Stateful Spark Streaming: Required attribute 'value' not found

Something Something
In a stateful Spark Structured Streaming application, I emit an 'OutputRow' from 'updateAcrossEvents', but I keep getting the error "Required attribute 'value' not found" when the query tries to write to Kafka. I know from the documentation that the 'value' attribute needs to be set, but how do I do that in a stateful Structured Streaming query? Where and how do I add this 'value' attribute in the following code? Note: I am using Spark 2.3.1.

withEventTime
      .as[R00tJsonObject]
      .withWatermark("event_time", "5 minutes")
      .groupByKey(row => (row.value.Id, row.value.time.toString, row.value.cId))
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "myTopic")
      .option("checkpointLocation", "/Users/username/checkpointLocation")
      .outputMode("update")
      .start()
      .awaitTermination()

Re: Stateful Spark Streaming: Required attribute 'value' not found

Something Something
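Simply adding '.toJSON' before '.writeStream' fixed the problem. 'toJSON' turns each 'OutputRow' into a JSON string stored in a single column named 'value', which is the column the Kafka sink requires. Maybe it will help someone.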
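A minimal sketch of why this works, assuming a local SparkSession and a hypothetical 'OutputRow' case class (the thread does not show the real fields). The Kafka sink looks for a string or binary column named 'value' (plus optional 'key' and 'topic' columns), while a typed Dataset of case classes exposes each field as its own column. 'toJSON' collapses each row into one JSON string in a column named 'value'. The second option, 'to_json(struct(...))', is a standard Spark SQL alternative not mentioned in the thread; it is shown only because it also lets you set a message key.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}

// Hypothetical stand-in for the poster's OutputRow; the real fields are unknown.
case class OutputRow(id: String, cId: String, count: Long)

object KafkaValueColumnDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("kafka-value-column-demo")
      .getOrCreate()
    import spark.implicits._

    val out: Dataset[OutputRow] = Seq(OutputRow("a", "c1", 3L)).toDS()

    // A typed Dataset exposes one column per field and has no "value"
    // column, which is what the Kafka sink complains about.
    println(out.columns.mkString(","))     // id,cId,count

    // Option 1 (the fix from this thread): toJSON returns a Dataset[String]
    // whose single column is named "value".
    val asJson: Dataset[String] = out.toJSON
    println(asJson.columns.mkString(","))  // value

    // Option 2 (alternative): build "key" and "value" columns explicitly.
    val keyed = out.select(
      col("id").as("key"),
      to_json(struct(out.columns.map(c => col(c)): _*)).as("value"))
    println(keyed.columns.mkString(","))   // key,value

    spark.stop()
  }
}
```

In the original pipeline, the fix corresponds to inserting '.toJSON' between '.mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(updateAcrossEvents)' and '.writeStream'.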

In reply to the original message of Tue, Mar 3, 2020 at 6:02 PM.