Controlling Spark StateStore retention

Controlling Spark StateStore retention

Sergey Oboguev
I am trying to write a Spark Structured Streaming application consisting of GroupState construct followed by aggregation.

Events arriving from event sources are bucketized by deviceId and quantized timestamp, composed together into group state key idTime.
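The key composition described above can be sketched in a few lines (a pure-Python illustration; the bucket width and key format are my assumptions, not taken from the post):

```python
# Illustrative sketch: compose a group-state key ("idTime") from deviceId
# and a quantized (bucketed) event timestamp.
BUCKET_SECONDS = 60  # hypothetical quantization interval

def make_id_time_key(device_id: str, timestamp_s: int) -> str:
    """Quantize the timestamp down to its bucket and combine it with the device id."""
    bucket = timestamp_s - (timestamp_s % BUCKET_SECONDS)
    return f"{device_id}:{bucket}"

# Two events in the same minute map to the same idTime key:
print(make_id_time_key("dev-1", 1613862005))  # -> dev-1:1613862000
print(make_id_time_key("dev-1", 1613862059))  # -> dev-1:1613862000
```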

Logical plan consists of stages (in the order of data flow):

FlatMapGroupsWithState
Aggregate

and translates to physical plan (in the same order)

FlatMapGroupsWithState
SerializeFromObject
Project (idTime, timestamp)
HashAggregate(keys=[idTime] ... partial aggregation)
Exchange hashpartitioning(idTime)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreRestore([idTime], state info)
HashAggregate(keys=[idTime] ... merge aggregates)
StateStoreSave([idTime], state info)
HashAggregate(keys=[idTime], functions= ...)

This all works, but it appears that partial aggregate state is never released.

If I send 10 events for some value of idTime, the stream produces an output batch with count = 10.

If, some significant time later (after the group state expires), I send 10 more events for the same value of idTime, the stream produces another output batch with count = 20. Other aggregates likewise show that both the old and the new events entered this subsequent aggregation output batch.

Thus, it appears state information is not cleared from the state store.

This is nice from the standpoint of handling latecomer events, but it also poses a problem: if the partial aggregate information for every idTime value is never cleared from the state store, the state store will eventually run out of space.
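The retention behaviour described above can be mimicked with a toy state store: each micro-batch's partial counts are merged into per-key state that is never evicted, so a second batch of 10 events for the same idTime yields 20. This is only a model of the restore/merge/save pattern in the physical plan, not Spark's actual implementation:

```python
# Toy model of StateStoreRestore -> merge -> StateStoreSave for a count
# aggregate keyed by idTime. All names and structures are illustrative.
state_store: dict = {}  # idTime key -> running count, never evicted

def run_batch(events: list) -> dict:
    """Merge this batch's partial counts into the saved state and return
    the updated aggregates (what the sink would observe)."""
    partial: dict = {}
    for key in events:                      # HashAggregate (partial aggregation)
        partial[key] = partial.get(key, 0) + 1
    out = {}
    for key, count in partial.items():      # StateStoreRestore + merge
        state_store[key] = state_store.get(key, 0) + count  # StateStoreSave
        out[key] = state_store[key]
    return out

print(run_batch(["dev-1:0"] * 10))  # -> {'dev-1:0': 10}
print(run_batch(["dev-1:0"] * 10))  # -> {'dev-1:0': 20}, old state never cleared
```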

Is there a way to control this retention and trigger the release of state store data for old values of idTime that are no longer needed?
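For streaming aggregation, Spark drops old aggregate state only when the query defines a watermark on the event-time column (e.g. `df.withWatermark("timestamp", "10 minutes")` before the groupBy); without one, aggregation state grows without bound. The eviction semantics can be modelled in plain Python (bucket numbers, names, and the eviction rule are illustrative, not Spark internals):

```python
# Toy model of watermark-driven state eviction. In Spark this behaviour
# comes from withWatermark(...); here the eviction rule is simulated directly.
agg_state: dict = {}  # event-time bucket -> running count

def run_batch_with_watermark(buckets: list, watermark: int) -> dict:
    """Merge counts for buckets at or above the watermark, then evict
    any state whose bucket has fallen below the watermark."""
    out = {}
    for bucket in buckets:
        if bucket < watermark:          # data later than the watermark
            continue                    # is dropped, not merged
        agg_state[bucket] = agg_state.get(bucket, 0) + 1
        out[bucket] = agg_state[bucket]
    for bucket in [b for b in agg_state if b < watermark]:
        del agg_state[bucket]           # retention: old state is released
    return out

run_batch_with_watermark([100] * 10, watermark=0)    # bucket 100 reaches 10
run_batch_with_watermark([], watermark=200)          # watermark passes; bucket 100 evicted
print(run_batch_with_watermark([100] * 10, watermark=200))  # -> {}, late events dropped
```

The trade-off is visible in the model: eviction bounds the state store's size, but events arriving after the watermark for an evicted idTime are discarded rather than merged into the old total.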

Thanks for advice.

Re: Controlling Spark StateStore retention

Jungtaek Lim-2
It looks like you're chaining two stateful operations - this runs into the limitation of the global watermark and may make the output incorrect.
We've documented these limitations in the Structured Streaming guide starting from Spark 3.0, so please take the time to read it to learn what issues are possible here and what workarounds are available.


Starting from Spark 3.0, Spark emits a warning log message when this pattern is detected. In the upcoming Spark 3.1, a query with such a pattern is disallowed unless end users explicitly set a config to force it to run.
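For reference, the Spark 3.1 escape hatch mentioned above is, to the best of my recollection, the following SQL config (please verify the exact name against your Spark version's configuration docs before relying on it):

```
# Illustrative config fragment: disables the correctness check that would
# otherwise reject queries chaining multiple stateful operators.
spark.sql.streaming.statefulOperator.checkCorrectness.enabled=false
```

Forcing the query to run this way does not remove the underlying global-watermark limitation; it only suppresses the check, so the output may still be incorrect.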

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)


On Sun, Feb 21, 2021 at 8:49 AM Sergey Oboguev <[hidden email]> wrote: