Spark can't identify the event time column being supplied to withWatermark()


Spark can't identify the event time column being supplied to withWatermark()

frankdede
I was trying to find a way to resessionize features in different events based
on their event timestamps using Spark, and I found a code example in the Spark
repo that uses mapGroupsWithState to sessionize events using processing
timestamps:

https://github.com/apache/spark/blob/v2.3.0/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala

To quickly test whether this sessionization works with event timestamps, I
added withWatermark("timestamp", "10 seconds") (treating the processing time as
the event timestamp) and changed ProcessingTimeTimeout to EventTimeTimeout.

  val lines = spark.readStream
    .format("socket")
    .option("host", host)
    .option("port", port)
    .option("includeTimestamp", value = true)
    .load()

  // Split the lines into words, treating each word as the sessionId of an event
  val events = lines
    .withWatermark("timestamp", "10 seconds") // added
    .as[(String, Timestamp)]
    .flatMap { case (line, timestamp) =>
      line.split(" ").map(word => Event(sessionId = word, timestamp))
    }

  val sessionUpdates = events
    .groupByKey(event => event.sessionId)
    .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
      ...
    }

  // Start running the query that prints the session updates to the console
  val query = sessionUpdates
    .writeStream
    .outputMode("update")
    .format("console")
    .start()

  query.awaitTermination()
However, when I ran it, Spark threw an org.apache.spark.sql.AnalysisException
saying: "Watermark must be specified in the query using
'[Dataset/DataFrame].withWatermark()' for using event-time timeout in a
[map|flatMap]GroupsWithState. Event-time timeout not supported without
watermark." This seems untrue and is confusing, because the 'timestamp' column
and its watermark clearly appear in the plan printed after the exception message:

        ...
        +- EventTimeWatermark timestamp#3: timestamp, interval 10 seconds
           +- StreamingRelation
DataSource(org.apache.spark.sql.SparkSession@394a6d2b,socket,List(),...,
[value#2, timestamp#3]
Did I miss something, or did I do something wrong?

Thanks in advance!



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Spark can't identify the event time column being supplied to withWatermark()

Tathagata Das
Try defining the watermark on the right column immediately before calling `groupByKey(...).mapGroupsWithState(...)`. You are applying the watermark and then doing opaque transformations (a user-defined flatMap that the planner has no visibility into), which prevents the planner from propagating the watermark tag through those operations.

Specifically, you are applying a flatMap that takes a timestamp and splits each line into multiple records, each carrying a timestamp column. The SQL analyzer/planner cannot reason from the opaque user-defined code whether the generated timestamp is the same as the input timestamp column, hence it cannot propagate the watermark information down to the mapGroupsWithState.


Hope this helps.
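As a minimal sketch of that reordering (assuming the Event, SessionInfo, and SessionUpdate case classes, spark.implicits._, and a state-update function — here given the hypothetical name updateSession — as in the linked example):

```scala
// Sketch only: Event, SessionInfo, SessionUpdate, and updateSession are
// assumed to be in scope, modeled on the StructuredSessionization example.
val events = lines
  .as[(String, Timestamp)]
  .flatMap { case (line, timestamp) =>
    line.split(" ").map(word => Event(sessionId = word, timestamp))
  }

val sessionUpdates = events
  // The watermark is now defined on the flatMap output, immediately before the
  // stateful operation, so the planner can see which column it applies to.
  .withWatermark("timestamp", "10 seconds")
  .groupByKey(event => event.sessionId)
  .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout) {
    case (sessionId, eventsInGroup, state) =>
      updateSession(sessionId, eventsInGroup, state)
  }
```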

On Fri, Jun 8, 2018 at 7:50 AM, frankdede <[hidden email]> wrote:



Re: Spark can't identify the event time column being supplied to withWatermark()

frankdede
You are exactly right! A few hours ago, I tried many things and finally got
the example working by defining the event timestamp column right before
groupByKey, just as you suggested, but I wasn't able to figure out the
reasoning behind my fix.

    val sessionUpdates = events
      .withWatermark("timestamp", "10 seconds")
      .groupByKey(event => event.sessionId)
      .mapGroupsWithState[SessionInfo, SessionUpdate](GroupStateTimeout.EventTimeTimeout())

It turns out that it's just impossible for the planner to figure out the
source of the watermark column after the flatMap is applied.
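One more thing worth noting with EventTimeTimeout: the state function itself has to arm the timeout via GroupState.setTimeoutTimestamp, and the timeout fires only once the watermark advances past that timestamp. A hedged sketch of what such an update function might look like — the case class fields, the 10-second gap, and the updateSession name are assumptions modeled loosely on the linked example, not its actual code:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

case class Event(sessionId: String, timestamp: Timestamp)
case class SessionInfo(numEvents: Int, startMs: Long, endMs: Long) {
  def durationMs: Long = endMs - startMs
}
case class SessionUpdate(id: String, durationMs: Long, numEvents: Int, expired: Boolean)

def updateSession(
    id: String,
    events: Iterator[Event],
    state: GroupState[SessionInfo]): SessionUpdate = {
  if (state.hasTimedOut) {
    // The watermark has passed the timeout timestamp: emit a final update, drop state.
    val finished = state.get
    state.remove()
    SessionUpdate(id, finished.durationMs, finished.numEvents, expired = true)
  } else {
    val ts = events.map(_.timestamp.getTime).toSeq
    val updated = state.getOption match {
      case Some(s) => SessionInfo(s.numEvents + ts.size, s.startMs, math.max(s.endMs, ts.max))
      case None    => SessionInfo(ts.size, ts.min, ts.max)
    }
    state.update(updated)
    // Arm the event-time timeout: the session expires once the watermark
    // moves past the last seen event timestamp plus a 10-second gap.
    state.setTimeoutTimestamp(updated.endMs + 10 * 1000)
    SessionUpdate(id, updated.durationMs, updated.numEvents, expired = false)
  }
}
```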

Thanks Tathagata!





Re: Spark can't identify the event time column being supplied to withWatermark()

Tathagata Das
Glad that it worked out! It's unfortunate that such pitfalls exist, and there is no easy way around them.
If you can, let us know how your experience with mapGroupsWithState has been.

TD

On Fri, Jun 8, 2018 at 1:49 PM, frankdede <[hidden email]> wrote: