Example of Stateful Spark Structured Streaming with Kafka

Something Something
There are lots of examples of stateful Structured Streaming in the 'The Definitive Guide' book, but all of them read JSON from a file path. That works for me.

Now I need to read from Kafka.

I Googled but couldn't find any example. I am struggling to map the 'value' column of the Kafka message to my JSON structure. Any help would be appreciated. Here's what I am trying:

val query = withEventTime
  .as[R00tJsonObject]
  .withWatermark("event_time", "5 minutes")
  .groupByKey(row => (row.report.id, row.report.time.toString, row.report.cId))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "myTopic")
  .option("checkpointLocation", "/Users/username/checkpointLocation")
  .outputMode("update")
  .start()
  .awaitTermination()

It fails with this error:

cannot resolve 'arrivalTime' given input columns: [value, event_time];
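
The error lists only the columns `value` and `event_time`, which suggests `.as[R00tJsonObject]` is being applied while the JSON is still packed inside the raw Kafka `value` column. A common way to unpack it is to cast `value` to a string, parse it with `from_json`, and flatten the result before converting to a typed Dataset. The sketch below is only an illustration under assumptions: `Report`, `RootJson`, `KafkaJsonSketch`, `parseValue` and `readFromKafka` are hypothetical names, and the case class fields are guessed from the code and the error message above, since the real `R00tJsonObject` is not shown in the post.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical, simplified stand-ins for the real classes (not shown in the post).
case class Report(id: String, time: Long, cId: String)
case class RootJson(arrivalTime: String, report: Report)

object KafkaJsonSketch {
  // Parses the binary Kafka `value` column as JSON and returns a typed Dataset.
  def parseValue(kafkaDf: DataFrame): Dataset[RootJson] = {
    val encoder = Encoders.product[RootJson]
    kafkaDf
      // Kafka delivers `value` as bytes; cast to string before parsing.
      .select(from_json(col("value").cast("string"), encoder.schema).as("data"))
      // Flatten the parsed struct so its fields become top-level columns.
      .select("data.*")
      .as(encoder)
  }

  // Reads the topic as a stream; broker and topic are taken from the post.
  def readFromKafka(spark: SparkSession): Dataset[RootJson] = {
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "myTopic")
      .load()
    parseValue(raw)
  }
}
```

Note that this sketch keeps only the parsed JSON fields. If the watermark needs an `event_time` column, it would have to be a field of the case class or be selected alongside `data.*` before the watermark is applied.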