Spark Streaming in Spark 2.1 with Kafka 0.9

khajaasmath786
Hi,

I have not been successful using Spark 2.1 with Kafka 0.9. Can anyone please share a code snippet that works?

val sparkSession: SparkSession = runMode match {
  case "local" => SparkSession.builder.config(sparkConfig).getOrCreate
  case "yarn"  => SparkSession.builder.config(sparkConfig).enableHiveSupport.getOrCreate
}
val streamingContext = new StreamingContext(sparkSession.sparkContext, Seconds(20))
println("streamingContext ---------> " + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("brokers ---------> " + config.getString(Constants.Properties.KafkaBrokerList))

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicsSet)

With the above code, the job gets aborted.
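For reference, this is the full shape of what I am trying with the 0-8 style API, as a minimal self-contained sketch; the broker list and topic name are placeholders, not my real values. My understanding from the Spark docs is that spark-streaming-kafka-0-8 works with brokers 0.8.2.1 and above, so it should cover a 0.9 cluster:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils   // from spark-streaming-kafka-0-8

val sparkConf = new SparkConf().setAppName("Kafka08DirectStreamSketch")
val streamingContext = new StreamingContext(sparkConf, Seconds(20))

// the 0-8 direct stream takes a Map[String, String] with the old consumer keys
val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092",   // placeholder broker list
  "auto.offset.reset"    -> "smallest"
)
val topicsSet = Set("my-topic")                            // placeholder topic

val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topicsSet)

messages.map(_._2).print()   // the 0-8 stream yields (key, value) pairs

streamingContext.start()
streamingContext.awaitTermination()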

------------------------------------

I also tried the 0.10 code snippet, but no luck.

val streamingContext = new StreamingContext(sparkConfig, Seconds(20))
println("streamingContext ---------> " + streamingContext)
streamingContext.checkpoint(config.getString(Constants.Properties.CheckPointDir))
println("brokers ---------> " + config.getString(Constants.Properties.KafkaBrokerList))

// old 0.8-style call, replaced with the 0.10 API below
//val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](streamingContext, kafkaParams, topicsSet)

val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
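For completeness, this is how I understand the full 0-10 wiring is supposed to look, with kafkaParams built as a Map[String, Object]; the broker list, group id and topic name below are placeholders. As far as I can tell from the docs, the 0-10 integration needs brokers 0.10.0 or newer, so it may simply not work against my 0.9 cluster:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils          // from spark-streaming-kafka-0-10
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val sparkConf = new SparkConf().setAppName("Kafka010DirectStreamSketch")
val streamingContext = new StreamingContext(sparkConf, Seconds(20))

// the 0-10 direct stream expects the new consumer configs as a Map[String, Object]
val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker1:9092,broker2:9092",         // placeholder broker list
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "my-consumer-group",                 // placeholder group id
  "auto.offset.reset"  -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val topicsSet = Set("my-topic")                                // placeholder topic

val messages = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topicsSet, kafkaParams))

messages.map(_.value).print()   // records are ConsumerRecord, not (key, value) tuples, in 0-10

streamingContext.start()
streamingContext.awaitTermination()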

Any suggestions on how to use Spark 2.1 with Kafka streaming?
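In case the problem is on the dependency side: my understanding is that the two integrations ship as separate artifacts, and for Spark 2.1 the coordinates should be roughly as below (the version is my assumption, and I assume only one of them should be on the classpath at a time):

libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8"  % "2.1.0"   // 0-8 style API (StringDecoder)
// or
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0"   // 0-10 API (ConsumerStrategies)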
Thanks,
Asmath