flatMapGroupsWithState not timing out (spark 2.2.1)


williamd1618

Hi,

I’m attempting to use flatMapGroupsWithState to handle some arbitrary aggregations and am noticing a few things:

  • ProcessingTimeTimeout + setTimeoutDuration: timeout not being honored
  • EventTimeTimeout + watermark: timeout not being honored
  • EventTimeTimeout + setTimeoutTimestamp: timeout not being honored

I’ve come to this conclusion because the conditional branch (with log output) that checks the hasTimedOut property is never reached. Each scenario was tested in isolation, and all three exhibited the same behavior: no timeout event ever occurred, and Spark introduced very long delays between batches.

The test read 2000 messages from a Kafka topic, split across two distinct groups (1000 messages per group).

To give an idea of what I’m attempting to do: aggregate all events into a single bucket until a timeout expires.

Note also that in this example I’m attempting to get the final value of the GroupState object as it times out. This is why I attempt a second pass on the timeout, but that doesn’t really matter, as I’m never getting the timeout event in the first place.

My code is here:

    val stream = reader
      .load()
      .selectExpr(
        "CAST(key AS STRING)",
        "topic",
        "CAST(value AS BINARY)",
        "timestamp"
      )
      .as[KafkaLoadType].map(el => getJacksonReader(classOf[Data]).readValue[Data](new String(el._3)))
      .withWatermark("when", "10 seconds")
      .groupByKey(f => (f.name, f.when))
      .flatMapGroupsWithState[SessionInfo, Result](OutputMode.Append, GroupStateTimeout.EventTimeTimeout()) {
      case ((name, when), events: Iterator[Data], state: GroupState[SessionInfo]) => {

        state.setTimeoutTimestamp(DateTime.now.plusMinutes(1).getMillis)

        info("Starting flatMapGroupsWithState func")

        val asList = events.toList

        info(s"${name} iterator size: ${asList.size}")

        if (state.exists) {
          info(s"State exists: ${state.get}")
        } 

        var session = state.getOption.getOrElse(SessionInfo.zero(when, name))

        asList.foreach(e => {
          session = session.add(e.value)
        })

        info(s"Updating value to ${session}")

        state.update(session)

        val result = if (state.hasTimedOut && !state.get.finalized) {
          info("State has timedout ... finalizing")

          state.update(state.get.copy(finalized = true))

          Iterator(Option(state.get).map(r => Result(r.when, r.name, r.value)).get)
        } else if (state.hasTimedOut && state.get.finalized) {
          info("State has timedout AND is finalized")

          val r = state.get

          state.remove()

          Iterator(Option(r).map(r => Result(r.when, r.name, r.value)).get)
        } else {
          val result = state.get

          info(s"Returning ${result}")

          //          state.remove()

          Iterator(Option(result).map(r => Result(r.when, r.name, r.value)).get)
        }

        info("Exiting flatMapGroupsWithState func")

        result
      }
    }.writeStream.trigger(Trigger.ProcessingTime(500))
      .format("console").option("truncate", false)
      .outputMode(OutputMode.Append)
      .start()



Thanks for any help.

dan

Re: flatMapGroupsWithState not timing out (spark 2.2.1)

Tathagata Das
Hello Dan, 

From your code, it seems you are setting the timeout timestamp based on the current processing time (wall-clock time), while the watermark is calculated from event time (the "when" column). The semantics of EventTimeTimeout are as follows: when the last timeout timestamp set for a group becomes older than the watermark (which is calculated across all groups) because that group has not received new data for a while, the group times out and the function is called with hasTimedOut set to true. In this case, however, the timeout timestamp is derived from a different source of time (the wall clock) than the watermark (event time), so the two may not correlate correctly. For example, if the event time in the test data is always one hour behind the wall-clock time, the watermark will be at least one hour older than the timeout timestamp you set, and the group would have to go without new data for more than an hour before it times out.

So I would check the gap between the event time in the data and the wall-clock time used to set the timeout, to understand what is going on. Or, even better, just use the event time in the data to calculate the timeout timestamp, and don't use processing time anywhere.
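To make the mismatch concrete, here is a minimal pure-Scala model (no Spark dependency) of the rule described above: a group with an EventTimeTimeout times out once the watermark (max event time seen minus the delay) passes the group's timeout timestamp. The object and method names are hypothetical, the model ignores batching, and the one-hour skew and timestamps are made-up numbers chosen to match the example in the previous paragraph.

```scala
object TimeoutClockSkew {
  val Minute: Long = 60 * 1000L
  val Hour: Long = 60 * Minute

  // Simplified watermark: max event time seen so far minus the allowed delay.
  def watermark(maxEventTimeMs: Long, delayMs: Long): Long = maxEventTimeMs - delayMs

  // Simplified EventTimeTimeout rule: fires once the watermark has passed the timeout timestamp.
  def timedOut(watermarkMs: Long, timeoutTimestampMs: Long): Boolean =
    watermarkMs > timeoutTimestampMs

  def main(args: Array[String]): Unit = {
    val delayMs = 10 * 1000L                    // withWatermark("when", "10 seconds")
    val wallClockNowMs = 10 * Hour              // hypothetical processing time of the last update
    val lastEventTimeMs = wallClockNowMs - Hour // assumption: event time lags the wall clock by 1 hour

    // Timeout derived from wall-clock time, like DateTime.now.plusMinutes(1) in the original code.
    val wallClockTimeout = wallClockNowMs + Minute
    // Timeout derived from the event time in the data instead.
    val eventTimeTimeout = lastEventTimeMs + Minute

    // Later data from other groups has pushed max event time 5 minutes past this group's last event.
    val wm = watermark(lastEventTimeMs + 5 * Minute, delayMs)

    println(s"wall-clock-based timeout fired: ${timedOut(wm, wallClockTimeout)}") // false
    println(s"event-time-based timeout fired: ${timedOut(wm, eventTimeTimeout)}") // true
  }
}
```

With these numbers, the wall-clock-based timeout would only fire once the maximum event time exceeds 10h 01m 10s, more than an hour of event time after the group's last event. In the real state function, it is the event-time-based value that would be passed to `state.setTimeoutTimestamp`.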

Let me know how it goes.

TD



On Fri, Jan 12, 2018 at 2:36 PM, daniel williams <[hidden email]> wrote:
[quoted text hidden]


Re: flatMapGroupsWithState not timing out (spark 2.2.1)

Tathagata Das
Aah okay!

How are you testing whether there is a timeout? An EventTimeTimeout would be triggered by the following sequence:
1. Send a bunch of data to group1, to set the timeout timestamp using event time.
2. Then send more data to group2 only, to advance the watermark (since it is computed from event time across all groups), and check that the timeout occurs.
Note that you have to keep sending some data to other groups so that micro-batches are triggered continuously and the watermark is recalculated. If you send a bunch of data, then stop sending and just wait, the watermark will not advance (there is no new data to recalculate it from), and therefore the condition watermark > timeout timestamp may never be met.
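The steps above can be simulated with a small pure-Scala model. It is a simplification under stated assumptions, not Spark's implementation: each micro-batch uses a watermark computed from the event times of earlier batches (max event time minus the delay), a group's timeout timestamp is its latest event time plus a fixed gap, and only groups that received no data in the current batch are checked for timeout. Names such as `WatermarkSim`, `Event` and `run` are hypothetical.

```scala
object WatermarkSim {
  final case class Event(group: String, eventTimeMs: Long)

  /** Returns, for each micro-batch, the set of groups that timed out in that batch. */
  def run(batches: List[List[Event]], delayMs: Long, gapMs: Long): List[Set[String]] = {
    var maxEventTimeMs: Option[Long] = None  // max event time seen in earlier batches
    var timeoutsMs = Map.empty[String, Long] // group -> event-time timeout timestamp

    batches.map { batch =>
      // The watermark for this batch comes from earlier batches only (none before any data).
      val watermarkMs = maxEventTimeMs.map(_ - delayMs)
      val groupsWithData = batch.map(_.group).toSet

      // Groups without new data whose timeout timestamp is behind the watermark time out.
      val expired = timeoutsMs.collect {
        case (g, t) if !groupsWithData(g) && watermarkMs.exists(_ > t) => g
      }.toSet
      timeoutsMs = timeoutsMs -- expired

      // Groups with data get a fresh timeout derived from event time, not wall-clock time.
      batch.groupBy(_.group).foreach { case (g, events) =>
        timeoutsMs = timeoutsMs + (g -> (events.map(_.eventTimeMs).max + gapMs))
      }

      // Max event time (and hence the watermark) only advances when a batch actually runs.
      if (batch.nonEmpty) {
        val batchMax = batch.map(_.eventTimeMs).max
        maxEventTimeMs = Some(maxEventTimeMs.fold(batchMax)(m => math.max(m, batchMax)))
      }
      expired
    }
  }

  def main(args: Array[String]): Unit = {
    val s = 1000L
    val batches = List(
      List(Event("group1", 0L), Event("group1", 1 * s)), // step 1: data for group1
      List(Event("group2", 30 * s)),                     // step 2: data for group2 only
      List(Event("group2", 80 * s)),
      List(Event("group2", 100 * s))
    )
    run(batches, delayMs = 10 * s, gapMs = 60 * s).zipWithIndex.foreach { case (t, i) =>
      println(s"batch $i timed out: $t")
    }
  }
}
```

In this run group1 is reported as timed out only in batch 3, once group2's data has pushed the watermark to 70 s, past group1's timeout of 61 s. Dropping the last batch (data stops arriving) leaves group1 waiting indefinitely.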

For ProcessingTimeTimeout, the situation is different: it should rely solely on wall-clock time, not on any watermark.
In that case, you still have to keep sending data to continuously trigger micro-batches, because without any data no micro-batches are triggered, and therefore no timeouts are processed. This is a known issue that we will fix. It should work fine if you keep pushing data to group2; group1 should then time out.
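The ProcessingTimeTimeout case can be modelled the same way. This hypothetical helper assumes a group's timeout is due at the wall-clock time of its last update plus the duration passed to `setTimeoutDuration`, and that the check only happens when a micro-batch actually runs (at the wall-clock times listed in `batchTimesMs`). It returns the first batch time at which the group would be reported as timed out.

```scala
object ProcessingTimeTimeoutSim {
  /** First micro-batch time at which the timeout would be processed, if any batch runs late enough. */
  def firstTimeoutBatch(lastUpdateMs: Long, durationMs: Long, batchTimesMs: Seq[Long]): Option[Long] =
    batchTimesMs.find(_ > lastUpdateMs + durationMs)

  def main(args: Array[String]): Unit = {
    val s = 1000L
    // Data keeps arriving (for group2), so batches keep running: group1 times out at t = 90 s.
    println(firstTimeoutBatch(0L, 60 * s, Seq(30 * s, 90 * s, 120 * s))) // Some(90000)
    // Data stops after t = 30 s, so no later batch runs and the timeout is never processed.
    println(firstTimeoutBatch(0L, 60 * s, Seq(30 * s)))                  // None
  }
}
```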

Did that make sense?

TD

On Fri, Jan 12, 2018 at 3:43 PM, daniel williams <[hidden email]> wrote:

Hi Tathagata,

Thanks for the response and consideration. As noted in the points in my email, that was actually one of the tests I ran (EventTimeTimeout with only the watermark) for the group, and again it never timed out. The code I posted was from a later test in which I was trying to use some of the additional GroupState methods to force a timeout. I suppose I could create an additional test that sets the timeout to when.plus(1 minute) and see what happens.

Thanks and let me know if you have any more thoughts.

dan



On Fri, Jan 12, 2018 at 4:39 PM, Tathagata Das <[hidden email]> wrote:
[quoted text hidden]




--
-dan