Detecting latecomer events in Spark structured streaming


Detecting latecomer events in Spark structured streaming

Sergey Oboguev
I have a Spark Structured Streaming-based application that performs a window(...) construct followed by aggregation.

This construct discards latecomer events that arrive past the watermark. I need to detect these late events so I can handle them out-of-band.
The application maintains a raw store of all received events and can re-aggregate a particular time interval for a particular device in a secondary batch mode, provided it knows that the interval has to be re-aggregated, i.e. that it contains latecomer data discarded by Structured Streaming due to the watermark.
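For intuition, the discard rule can be modeled in a few lines of plain Java (a sketch only; the class name is mine, and real Spark updates the watermark at microbatch boundaries and drops late rows at window granularity rather than per event):

```java
// Minimal model of Spark's watermark-based discard rule (illustrative only).
// Spark derives watermark = max(event time seen so far) - allowed delay,
// and rows that fall behind the watermark are dropped from windowed
// aggregation state.
class WatermarkTracker {
    private final long delayMs;
    private long maxEventTimeMs = Long.MIN_VALUE;

    WatermarkTracker(long delayMs) { this.delayMs = delayMs; }

    void observe(long eventTimeMs) {
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
    }

    long watermarkMs() {
        return maxEventTimeMs == Long.MIN_VALUE ? Long.MIN_VALUE
                                                : maxEventTimeMs - delayMs;
    }

    // In this simplified model, an event is a latecomer if it falls
    // at or behind the current watermark.
    boolean isLate(long eventTimeMs) {
        return eventTimeMs <= watermarkMs();
    }
}
```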

I am trying to come up with a way to perform such detection.

One approach would perhaps be to insert an additional stage before window(...): a stage that monitors all events received by the stream, looks at their timestamps, and predicts which events window(...) will discard due to the watermark. Such events can then be handled outside of Spark Structured Streaming. The stage could be based on Dataset.foreach, Dataset.filter, or Dataset.map: it would pass all events through unchanged, but when it detects a latecomer condition it would issue a side-channel notification causing the data for the affected device and interval to be re-aggregated later from raw event storage, outside the stream.
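A pass-through monitor of this kind could look roughly like the following sketch (pure Java, no Spark; the Notification shape and the fixed interval granularity are my inventions, and in a real job the passThrough body would run inside a Dataset.filter lambda on the executors):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of a pass-through stage: every event continues downstream, but a
// late-looking event also produces a side-channel notification naming the
// device and the raw-storage interval to re-aggregate.
class LateMonitor {
    static final class Notification {
        final String deviceId;
        final long intervalStartMs;
        Notification(String deviceId, long intervalStartMs) {
            this.deviceId = deviceId;
            this.intervalStartMs = intervalStartMs;
        }
    }

    private final long delayMs;
    private final long intervalMs;   // granularity of re-aggregation
    private long maxEventTimeMs = Long.MIN_VALUE;
    final List<Notification> sideChannel = new ArrayList<>();

    LateMonitor(long delayMs, long intervalMs) {
        this.delayMs = delayMs;
        this.intervalMs = intervalMs;
    }

    // Always returns true (the event passes through unchanged), but
    // records the interval of any event that looks late.
    boolean passThrough(String deviceId, long eventTimeMs) {
        long watermark = maxEventTimeMs == Long.MIN_VALUE
                ? Long.MIN_VALUE : maxEventTimeMs - delayMs;
        if (eventTimeMs <= watermark) {
            long intervalStart = (eventTimeMs / intervalMs) * intervalMs;
            sideChannel.add(new Notification(deviceId, intervalStart));
        }
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        return true;
    }
}
```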

I have a couple of questions related to the feasibility of such a construct.

Q1:

Can the data behind window(...) be shared by multiple executors or nodes, or is it owned by one executor at any given time? If it is shared, local monitoring of passing timestamps would appear insufficient, since it lacks global context.
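If the watermark is indeed computed globally (as far as I understand, each partition tracks its maximum event time and the engine merges those maxima at batch boundaries), a purely local monitor would underestimate it, as this toy comparison shows:

```java
import java.util.Arrays;

// Sketch: the engine merges per-partition event-time maxima, so the
// effective watermark is global. A monitor that sees only one partition's
// events computes a smaller (too-permissive) local watermark and can
// miss events that the global watermark has already passed.
class GlobalWatermark {
    static long globalWatermarkMs(long[] perPartitionMaxMs, long delayMs) {
        return Arrays.stream(perPartitionMaxMs).max().getAsLong() - delayMs;
    }

    static long localWatermarkMs(long localMaxMs, long delayMs) {
        return localMaxMs - delayMs;
    }
}
```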

Q2:

To monitor the stream, the stage needs to maintain a context. The context can be checkpointed periodically in an external store, but I do not want to persist/readback the context for every microbatch (or, in the foreach case, for every individual event). I want to checkpoint the context infrequently, and maintain it across microbatches just in memory.
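The intended checkpoint cadence might be sketched like this (the class name is mine and the persistence backend is left abstract; on recovery, replaying events since the last checkpoint would rebuild the in-memory part):

```java
import java.util.function.Consumer;

// Sketch: keep the monitoring context in memory across microbatches and
// persist it only every N batches, instead of once per batch or per event.
class CheckpointedContext<S> {
    private S state;
    private int batches = 0;
    private final int everyN;
    private final Consumer<S> persist;

    CheckpointedContext(S initial, int everyN, Consumer<S> persist) {
        this.state = initial;
        this.everyN = everyN;
        this.persist = persist;
    }

    S current() { return state; }

    // Call once per microbatch with the updated context.
    void endBatch(S updated) {
        state = updated;
        batches++;
        if (batches % everyN == 0) persist.accept(state);
    }
}
```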

Which brings up a question... The handler function inside the stage (called by foreach, map, or filter) needs to refer to the context object, yet it is unclear how to obtain such a reference.

I could attach a context to the stream via some global map object (translating stream -> context), but the handler functions for Dataset.foreach, Dataset.map, or Dataset.filter do not receive a stream handle, and thus have no key with which to look up the context object.

The association cannot be made via a thread-local (TLS) variable either, since Spark can internally create threads for stream processing that will not inherit the parent's thread-locals (and may not even have the thread that started the stream as their parent).

This appears to leave a Java static variable as the only option for the context pointer, limiting the model to one active stream per executor. But does the Spark specification guarantee that different executors run in different JVM instances?
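One possible way around the single-static-variable limitation: key a JVM-wide registry by a stream-specific string chosen when the query is built, and capture that string in the handler's closure so it travels to the executors along with the serialized closure. A sketch (the class name and key scheme are mine):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Sketch: one static registry per JVM, keyed by a per-stream string that
// is captured in the handler closure at query-construction time. Each
// stream's handler can then find its own context, so a single static
// field does not limit the JVM to one active stream.
class ContextRegistry {
    private static final ConcurrentMap<String, Object> CONTEXTS =
            new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    static <C> C getOrCreate(String streamKey, Supplier<C> create) {
        return (C) CONTEXTS.computeIfAbsent(streamKey, k -> create.get());
    }
}
```

At query-construction time one would pick, say, `final String key = "deviceStream-1";` (a hypothetical name) and have the Dataset.filter lambda call `ContextRegistry.getOrCreate(key, ...)`. Note this still yields one context per (executor JVM, stream), not a globally shared one.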

Thanks for any advice.

Re: Detecting latecomer events in Spark structured streaming

Jungtaek Lim-2
Hi,

If I remember correctly, Spark does not provide the watermark value for the current batch through any public API. So if you're dealing with "event time" (and I guess you are, since you worry about late events), then unless you add a new logical/physical plan to expose the watermark to user-level functions, it's not possible to do what you plan.

I've tried a similar thing, counting the number of late events by making changes to the Spark codebase (see https://github.com/apache/spark/pull/24936). My initial goal was to provide a side output of late events so that end users could deal with them outside the query, but I soon realized that is non-trivial, and just took the simplest approach at the time.
(There are still possible approaches, e.g. sending the late events to the driver via RPC, assuming they are a "minority", but we never concluded it was worth the effort. If your business logic requires it, you could hack on this yourself, and share the result if you succeed.)
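The side-output idea could be sketched outside the engine: given a watermark value, split a microbatch into on-time and late sets and route the late set elsewhere (pure-Java illustration; since user code cannot see Spark's watermark, doing this inside a query would need the kind of engine change described above):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of side-output routing: partition a microbatch by the watermark.
// Index 0 of the result holds on-time event times, index 1 the late ones.
class SideOutput {
    static List<List<Long>> split(List<Long> eventTimesMs, long watermarkMs) {
        List<Long> onTime = new ArrayList<>();
        List<Long> late = new ArrayList<>();
        for (long t : eventTimesMs) {
            if (t > watermarkMs) onTime.add(t); else late.add(t);
        }
        List<List<Long>> out = new ArrayList<>();
        out.add(onTime);
        out.add(late);
        return out;
    }
}
```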

I'll skip answering the questions themselves, since, as explained above, you'd be stuck even before reaching them.

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Tue, Mar 9, 2021 at 6:49 AM Sergey Oboguev <[hidden email]> wrote: