[Spark Structured Streaming] Exception while using watermark with type of timestamp



Biplob Biswas
Hi,

I am playing around with Spark Structured Streaming, and we have a use case for using it as a CEP engine.

I am reading from 3 different Kafka topics together. I want to perform windowing on this structured stream and then run queries over each sliding window. All of this needs to happen on event time, and each event carries its own timestamp.

Now I have something like this:

val windowedEvents = filteredBamEvents.withWatermark("timestamp", "10 minutes")
                                          .groupBy(functions.window($"timestamp", "10 minutes", "5 minutes"))


where timestamp is of type Long in my case class, as follows:

case class BAMIngestedEvent(id: String, eventName: String, eventID: String, correlationID: Seq[String], timestamp: Long)

But when I run this example with my data from Kafka, I get the following exception:



Exception in thread "main" org.apache.spark.sql.AnalysisException: Event time must be defined on a window or a timestamp, but timestamp is of type bigint;;
EventTimeWatermark timestamp#36: bigint, interval 10 minutes
+- TypedFilter <function1>, class com.airplus.poc.edl.model.BAMIngestedEvent, [StructField(id,StringType,true), StructField(eventName,StringType,true), StructField(eventID,StringType,true), StructField(correlationID,ArrayType(StringType,true),true), StructField(timestamp,LongType,false)], newInstance(class com.airplus.poc.edl.model.BAMIngestedEvent)
   +- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).id, true) AS id#32, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventName, true) AS eventName#33, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).eventID, true) AS eventID#34, mapobjects(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String), staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, lambdavariable(MapObjects_loopValue0, MapObjects_loopIsNull1, ObjectType(class java.lang.String)), true), assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).correlationID) AS correlationID#35, assertnotnull(input[0, com.airplus.poc.edl.model.BAMIngestedEvent, true]).timestamp AS timestamp#36L]
      +- MapElements <function1>, interface org.apache.spark.sql.Row, [StructField(value,StringType,true)], obj#31: com.airplus.poc.edl.model.BAMIngestedEvent
         +- DeserializeToObject createexternalrow(value#16.toString, StructField(value,StringType,true)), obj#30: org.apache.spark.sql.Row
            +- Project [value#16]
               +- Project [cast(key#0 as string) AS key#15, cast(value#1 as string) AS value#16]
                  +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@222acad,kafka,List(),None,List(),None,Map(startingOffsets -> latest, subscribe -> iom, edl, kafka.bootstrap.servers -> airpluspoc-hdp-dn0.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn1.germanycentral.cloudapp.microsoftazure.de:9092,airpluspoc-hdp-dn2.germanycentral.cloudapp.microsoftazure.de:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:204)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
        at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:76)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:161)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:167)
        at org.apache.spark.sql.Dataset$.apply(Dataset.scala:58)
        at org.apache.spark.sql.Dataset.withTypedPlan(Dataset.scala:2850)
        at org.apache.spark.sql.Dataset.withWatermark(Dataset.scala:571)
        at com.airplus.poc.edl.CEPForBAM$.main(CEPForBAM.scala:47)
        at com.airplus.poc.edl.CEPForBAM.main(CEPForBAM.scala)

Regards,
Biplob
Re: [Spark Structured Streaming] Exception while using watermark with type of timestamp

abaghel
In your BAMIngestedEvent case class, the timestamp field is of type Long, which Spark maps to bigint, but withWatermark requires a column of timestamp type. Change the field's type to java.sql.Timestamp.
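A minimal sketch of the fix, assuming the events carry epoch milliseconds (the case class fields are taken from the question; the commented alternative and the eventTime column name are illustrative, not from the original code):

```scala
import java.sql.Timestamp

// timestamp is now java.sql.Timestamp, which Spark's encoders map to
// TimestampType, so withWatermark("timestamp", "10 minutes") passes analysis.
case class BAMIngestedEvent(
  id: String,
  eventName: String,
  eventID: String,
  correlationID: Seq[String],
  timestamp: Timestamp
)

// Alternatively, keep the Long field and cast the column before applying
// the watermark. Note that cast("timestamp") interprets a numeric value as
// *seconds* since the epoch, so epoch milliseconds must be divided by 1000:
//
// val withEventTime = filteredBamEvents
//   .withColumn("eventTime", ($"timestamp" / 1000).cast("timestamp"))
//   .withWatermark("eventTime", "10 minutes")
//   .groupBy(functions.window($"eventTime", "10 minutes", "5 minutes"))
```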