Stateful Structured Spark Streaming: Timeout is not getting triggered

Something Something
I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
                          oldState: GroupState[MyState]): OutputRow = {

    println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
    var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

    if (oldState.hasTimedOut) {
      println("@@@@@ oldState has timed out @@@@")
      // Logic to Write OutputRow
      OutputRow("some values here...")
    } else {
      for (input <- inputs) {
        state = updateWithEvent(state, input)
        oldState.update(state)
        oldState.setTimeoutDuration("2 minutes")
      }
      OutputRow(null, null, null)
    }

  }
I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?



Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

Tathagata Das
Make sure that you are continuously feeding data into the query to trigger the batches. Only then are timeouts processed.
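
To illustrate the point about batches, here is a toy model in plain Scala. It is not Spark code and not how Spark implements timeouts; the names `TimeoutModel`, `Store`, and `runBatch` are made up for this sketch. It assumes each key stores a deadline and that deadlines are checked only when a batch runs, which is the behavior described above: if no batch runs, an expired key is never reported.

```scala
// Toy model only (hypothetical names, no Spark dependency).
// Assumption: a per-key deadline is checked only when a micro-batch runs.
object TimeoutModel {
  final case class Entry(value: Int, deadlineMs: Long)

  final class Store(timeoutMs: Long) {
    private var entries = Map.empty[String, Entry]

    /** Runs one micro-batch at time nowMs and returns the keys that timed out in it. */
    def runBatch(nowMs: Long, events: Seq[(String, Int)]): Set[String] = {
      val touched = events.map(_._1).toSet

      // Keys with new data are updated and get a fresh deadline.
      events.foreach { case (k, v) =>
        val old = entries.get(k).map(_.value).getOrElse(0)
        entries += (k -> Entry(old + v, nowMs + timeoutMs))
      }

      // Keys without new data whose deadline has passed are reported and removed.
      val expired = entries.collect {
        case (k, e) if !touched(k) && e.deadlineMs <= nowMs => k
      }.toSet
      entries --= expired
      expired
    }
  }
}
```

In this model, a key written at time 0 with a 2-minute timeout is reported only by the next call to `runBatch` made after the deadline, even if that call carries data for a different key. If `runBatch` is never called again, the key sits in the store indefinitely, which mirrors the symptom in the original question.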
 
On Wed, Mar 4, 2020 at 2:51 PM Something Something <[hidden email]> wrote:
I've set the timeout duration to "2 minutes" as follows:

def updateAcrossEvents (tuple3: Tuple3[String, String, String], inputs: Iterator[R00tJsonObject],
                          oldState: GroupState[MyState]): OutputRow = {

    println("$$$$ Inside updateAcrossEvents with : " + tuple3._1 + ", " + tuple3._2 + ", " + tuple3._3)
    var state: MyState = if (oldState.exists) oldState.get else MyState(tuple3._1, tuple3._2, tuple3._3)

    if (oldState.hasTimedOut) {
      println("@@@@@ oldState has timed out @@@@")
      // Logic to Write OutputRow
      OutputRow("some values here...")
    } else {
      for (input <- inputs) {
        state = updateWithEvent(state, input)
        oldState.update(state)
        oldState.setTimeoutDuration("2 minutes")
      }
      OutputRow(null, null, null)
    }

  }
I have also specified ProcessingTimeTimeout in 'mapGroupsWithState' as follows...
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)

But 'hasTimedOut' is never true so I don't get any output! What am I doing wrong?




Re: Stateful Structured Spark Streaming: Timeout is not getting triggered

Something Something
Yes, that was it! It seems timeouts fire only while input data keeps flowing. I had stopped the input job because I had enough data, so no further batches ran and nothing timed out. Not sure why it's designed that way. It makes unit/integration tests a bit harder to write, but I'm sure there's a reason for it. Thanks.

On Wed, Mar 4, 2020 at 6:31 PM Tathagata Das <[hidden email]> wrote:
Make sure that you are continuously feeding data into the query to trigger the batches. only then timeouts are processed. 
 