Getting Message From Structured Streaming Format Kafka

Daniel de Oliveira Mantovani
Hello, I'm trying to run the following code,

var newContextCreated = false // Flag to detect whether new context was created or not
val kafkaBrokers = "localhost:9092" // comma-separated list of host:port broker addresses

private val batchDuration: Duration = Seconds(3)
private val master: String = "local[2]"
private val appName: String = this.getClass().getSimpleName()
private val checkpointDir: String = "/tmp/spark-streaming-amqp-tests"

// Create a Spark configuration

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1)) // To make sure data is not deleted by the time we query it interactively

val spark = SparkSession
  .builder
  .config(sparkConf)
  .getOrCreate()

val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "evil_queue")
  .load()

lines.printSchema()

import spark.implicits._
val noAggDF = lines.select("key")

noAggDF
  .writeStream
  .format("console")
  .start()


But I'm getting the following error:
http://paste.scsys.co.uk/565658

How do I read my messages from Kafka using the "kafka" format in Structured Streaming?

Thank you

--
Daniel de Oliveira Mantovani
Perl Evangelist/Data Hacker
+1 786 459 1341

Re: Getting Message From Structured Streaming Format Kafka

Burak Yavuz-2
Hi Daniel,

Several things:
 1) Your error suggests that your Spark version and the version of the spark-sql-kafka connector do not match. Could you make sure both are built for the same Spark version?
 2) With Structured Streaming, you can remove everything related to a StreamingContext.

val sparkConf = new SparkConf().setMaster(master).setAppName(appName)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(sparkConf, batchDuration)
ssc.checkpoint(checkpointDir)
ssc.remember(Minutes(1))
These lines are not doing anything for Structured Streaming.
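
For illustration, here is a minimal sketch of what the job might look like with the StreamingContext parts removed. It reuses the broker address and topic from the original post; the object name KafkaConsoleSketch and its two constants are made up for this example. It assumes the spark-sql-kafka-0-10 connector is on the classpath in the same version as Spark itself, and that a broker is reachable at localhost:9092. The cast to string and the awaitTermination() call are additions that were not in the original code.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical example object; the name and constants are assumptions for this sketch.
// Assumed launch (placeholders left for you to fill in, they must match your Spark build):
//   spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_<scala-binary-version>:<spark-version> ...
object KafkaConsoleSketch {
  val bootstrapServers = "localhost:9092" // comma-separated host:port list
  val topic = "evil_queue"

  def main(args: Array[String]): Unit = {
    // Structured Streaming only needs a SparkSession, no StreamingContext.
    val spark = SparkSession.builder
      .master("local[2]")
      .appName("KafkaConsoleSketch")
      .getOrCreate()

    // Each Kafka record arrives as a row with binary `key` and `value` columns
    // plus metadata columns (topic, partition, offset, timestamp, timestampType).
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .load()

    // Cast the binary columns to strings so the console sink prints readable text.
    val messages = lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    val query = messages.writeStream
      .format("console")
      .start()

    // Block the main thread; otherwise the application may exit before any batch runs.
    query.awaitTermination()
  }
}
```

Selecting only "key", as in the original post, would also work, but the console output would then show raw bytes rather than text.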

Best,
Burak

On Thu, Nov 2, 2017 at 11:36 AM, Daniel de Oliveira Mantovani <[hidden email]> wrote:
> Hello, I'm trying to run the following code,
> [...]