[Structured Streaming] Two watermarks and StreamingQueryListener


subramgr
Hi,

We have two *flatMapGroupsWithState* operations in our job, and we have two
*withWatermark* calls.

We are getting the max event time, the event time and the watermarks from
*QueryProgressEvent*.

Right now it returns only one *watermark* value.

Does Spark maintain two watermarks or just one?
If just one, which one is it?
If one watermark is maintained per *DataFrame*, how do I get the values for
each of them?
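
For reference, `StreamingQueryProgress.eventTime` is a `java.util.Map[String, String]` whose entries (`max`, `min`, `avg`, `watermark`) are ISO-8601 UTC timestamps, and it carries a single `watermark` entry per query. The sketch below assumes only that shape: it parses such a map and computes how far the watermark trails the max event time. `EventTimeStats` and `parseEventTime` are hypothetical helper names, not Spark API. Inside a `StreamingQueryListener` the map would come from `event.progress.eventTime` in `onQueryProgress`.

```scala
import java.time.{Duration, Instant}
import java.util.{Map => JMap}

// Hypothetical holder for the two values of interest; not part of Spark.
final case class EventTimeStats(max: Option[Instant], watermark: Option[Instant]) {
  /** How far the watermark trails the newest event seen, when both values are present. */
  def lag: Option[Duration] =
    for (m <- max; w <- watermark) yield Duration.between(w, m)
}

object EventTimeStats {
  /** Parse a map shaped like StreamingQueryProgress.eventTime; missing keys become None. */
  def parseEventTime(eventTime: JMap[String, String]): EventTimeStats = {
    def read(key: String): Option[Instant] =
      Option(eventTime.get(key)).map(s => Instant.parse(s))
    EventTimeStats(read("max"), read("watermark"))
  }
}
```

The map can be empty (for example when a batch has no data), which is why both fields are optional.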




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: [Structured Streaming] Two watermarks and StreamingQueryListener

Tathagata Das
Structured Streaming internally maintains one global watermark by taking the minimum of the two watermarks. That's why only one gets reported. In Spark 2.4, there will be an option to choose the max instead of the min.
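
To make the min/max choice concrete, here is a minimal plain-Scala sketch (no Spark dependency) of combining per-operator watermarks into one global value. `WatermarkPolicy`, `MinPolicy`, `MaxPolicy` and `globalWatermark` are hypothetical names for illustration and are not Spark internals. In Spark 2.4 the choice is exposed through the `spark.sql.streaming.multipleWatermarkPolicy` setting, which accepts `min` (the default) or `max`.

```scala
// Hypothetical model of the policy choice; these are not Spark classes.
sealed trait WatermarkPolicy
case object MinPolicy extends WatermarkPolicy
case object MaxPolicy extends WatermarkPolicy

object GlobalWatermarkSketch {
  /** Combine per-operator watermarks (epoch millis) into a single global value. */
  def globalWatermark(operatorWatermarks: Seq[Long], policy: WatermarkPolicy): Option[Long] =
    if (operatorWatermarks.isEmpty) None
    else policy match {
      case MinPolicy => Some(operatorWatermarks.min) // the slowest operator holds everyone back
      case MaxPolicy => Some(operatorWatermarks.max) // the fastest operator moves everyone forward
    }
}
```

With `min`, no data is dropped as late until every watermarked operator has moved past it; with `max`, the global watermark follows the fastest one, so data on the slower path may be dropped as late.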

Just curious: why do you need two watermarks? What does the query look like?

TD

In reply to this post by subramgr (Thu, Aug 9, 2018 at 3:15 PM)



Re: [Structured Streaming] Two watermarks and StreamingQueryListener

Girish Subramanian
In reply to this post by subramgr

Thanks for the explanation.

We are doing something like this.

The *first* watermark is there to drop late events from *kafka*.

The *second* watermark is there to drop older aggregated metrics across *sessions*.

I know the second one could be replaced with a *window*, but I was not able to come up with a working solution.
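
To show what a *window* would compute, here is a plain-Scala sketch of tumbling-window bucketing: each metric is assigned to a fixed 30-minute bucket and the counts are summed per name and bucket. It only illustrates the arithmetic. `MetricRow`, `windowStart` and `sumPerWindow` are hypothetical names, and this is not a drop-in replacement for the second *flatMapGroupsWithState*.

```scala
// Hypothetical row shape: a metric name, an event timestamp (epoch millis) and a count.
final case class MetricRow(name: String, timestamp: Long, count: Int)

object WindowSketch {
  val WindowMillis: Long = 30L * 60000L // 30 minutes

  /** Start of the tumbling window containing ts (assumes a non-negative ts). */
  def windowStart(ts: Long): Long = ts - (ts % WindowMillis)

  /** Sum counts per (metric name, window start). */
  def sumPerWindow(rows: Seq[MetricRow]): Map[(String, Long), Int] =
    rows
      .groupBy(r => (r.name, windowStart(r.timestamp)))
      .map { case (key, group) => key -> group.map(_.count).sum }
}
```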

 

 

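import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode, Trigger}
import scala.concurrent.duration._

// Stage 1: per-session metrics. The first watermark drops late events from Kafka.
val sessionLevelMetrics: Dataset[Metric] = kafkaEvents
  .withWatermark("timestamp", "30 minutes")
  .groupByKey(e => e._2.getSiteIdentifierConcatenatedSessionId)
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.EventTimeTimeout())(updateSessionState(broadcastWrapper))

// Stage 2: aggregation across sessions. The second watermark drops older aggregated metrics.
val aggMetrics: Dataset[AggregatedMetric] = sessionLevelMetrics
  .withColumn("ts", conversion(col("timestamp")))
  .withWatermark("ts", "30 minutes")
  .groupByKey(m => m.getAs[String]("name") + "." + m.getAs[Long]("timestamp"))
  .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.EventTimeTimeout())(updateAggregateMetricsState)

// Write the aggregated metrics to the sink once a minute.
aggMetrics
  .writeStream
  .format("com.walmart.cxtools.expo.kairos.KairosSinkProvider")
  .option("checkpointLocation", checkpointLocation)
  .outputMode(OutputMode.Append())
  .trigger(Trigger.ProcessingTime(60.seconds))
  .start()
  .awaitTermination()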

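// Keeps a running count per metric key ("<name>.<timestamp>") and expires it 30 minutes after that timestamp.
def updateAggregateMetricsState(
    metricKey: String,
    sessionLevelMetrics: Iterator[Row],
    state: GroupState[AggregatedMetric]): Iterator[AggregatedMetric] = {
  if (state.hasTimedOut) {
    // The state has timed out: drop it and emit nothing.
    state.remove()
    Iterator()
  } else if (!sessionLevelMetrics.hasNext) {
    Iterator()
  } else {
    // Add the counts from this batch to the previously stored total, if any.
    val previousCount = state.getOption.map(_.count).getOrElse(0)
    val sum = sessionLevelMetrics.map(_.getAs[Int]("count")).sum + previousCount
    val aggMetric = AggregatedMetric(metricKey, sum)
    state.update(aggMetric)
    // The key ends in the event timestamp (epoch millis); time out 30 minutes after it.
    state.setTimeoutTimestamp(metricKey.substring(metricKey.lastIndexOf(".") + 1).toLong + (30 * 60000))
    Iterator(aggMetric)
  }
}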
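
The timeout arithmetic in `updateAggregateMetricsState` can be isolated as in the plain-Scala sketch below. With `EventTimeTimeout`, a group times out once the query's watermark advances beyond the timestamp passed to `setTimeoutTimestamp`, so the single global watermark is what decides when this state expires. `MetricTimeoutSketch`, `timestampOf`, `timeoutFor` and `isExpired` are hypothetical helpers, and the key format `<name>.<epochMillis>` is assumed from the `groupByKey` call in the query.

```scala
object MetricTimeoutSketch {
  val RetentionMillis: Long = 30L * 60000L // 30 minutes, as in the original function

  /** Extract the trailing epoch-millis part of a key such as "page.views.1533852000000". */
  def timestampOf(metricKey: String): Long =
    metricKey.substring(metricKey.lastIndexOf(".") + 1).toLong

  /** The value the original function passes to setTimeoutTimestamp. */
  def timeoutFor(metricKey: String): Long =
    timestampOf(metricKey) + RetentionMillis

  /** Event-time timeout rule: expired once the watermark has advanced beyond the timeout. */
  def isExpired(metricKey: String, watermarkMillis: Long): Boolean =
    watermarkMillis > timeoutFor(metricKey)
}
```

Because the split uses `lastIndexOf`, metric names that themselves contain dots are still parsed correctly.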

 

 

In reply to the message from Tathagata Das <[hidden email]> of Friday, August 10, 2018 at 4:16 PM.