How to handle Spark state that grows too big even with a timeout set



Robin Kuttaiah
Hello,

I have a use case where I need to read uncorrelated events from a Kafka topic, correlate them, and push the results to another topic.

I use Spark Structured Streaming with a FlatMapGroupsWithStateFunction and GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I correlate the events in the group and push the correlated events to another topic. Uncorrelated events are kept in the state until they can be correlated with events that arrive later.
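To make the pattern concrete, here is a plain-Python model of the per-key logic described above; it is not Spark code. In Spark, the function passed to flatMapGroupsWithState is called either with new values for a key or, once the processing-time timeout expires, with no values and `hasTimedOut()` returning true. The event fields (`correlation_id`, `kind`), the request/response matching rule, and the helper names are hypothetical placeholders for the actual correlation logic.

```python
from collections import defaultdict


def correlate(events):
    """Hypothetical rule: a correlation_id is complete once both a
    'request' and a 'response' event carrying that id have been seen."""
    by_id = defaultdict(list)
    for e in events:
        by_id[e["correlation_id"]].append(e)
    correlated, uncorrelated = [], []
    for cid, group in by_id.items():
        kinds = {e["kind"] for e in group}
        if {"request", "response"} <= kinds:
            correlated.append((cid, group))
        else:
            uncorrelated.extend(group)
    return correlated, uncorrelated


def update_group(buffered, new_events, has_timed_out):
    """Model of one call of the per-key state function.

    buffered:      events kept in state from earlier calls
    new_events:    values Spark passes for this key (empty on timeout)
    has_timed_out: mirrors GroupState.hasTimedOut()
    Returns (emitted, new_state).
    """
    if not has_timed_out:
        # New data for this key: buffer it. In Spark you would also
        # re-arm the timeout here with setTimeoutDuration(...).
        return [], buffered + list(new_events)
    # Timeout fired: correlate everything buffered so far and emit matches.
    correlated, uncorrelated = correlate(buffered)
    # Unmatched events are written back to state, so they accumulate
    # for as long as their counterparts never arrive.
    return correlated, uncorrelated


def ev(cid, kind):
    return {"correlation_id": cid, "kind": kind}


state = []
_, state = update_group(state, [ev("a", "request"), ev("b", "request")], False)
_, state = update_group(state, [ev("a", "response")], False)
emitted, state = update_group(state, [], True)
print([cid for cid, _ in emitted])  # ['a']
print(state)  # [{'correlation_id': 'b', 'kind': 'request'}]
```

The unmatched `b` request survives the timeout and is carried forward, which is exactly the state that keeps growing when many events never find a match.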

Sometimes there are many uncorrelated events, and the state holding them grows too big.
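This also explains the "even with a timeout set" part: a processing-time timeout only makes Spark call the function again for an idle key (with `hasTimedOut()` true); the key's state is dropped only if the function itself calls `state.remove()`. If it writes uncorrelated events back with `state.update(...)`, they stay indefinitely. If discarding events that have waited too long is acceptable for this use case (an assumption; the post does not say so), the buffer could be bounded on each timeout. The plain-Python sketch below (not Spark API) applies a hypothetical age limit `max_age_ms` and size cap `max_events`, and assumes each event carries a processing-time `ts` in milliseconds.

```python
def trim_state(uncorrelated, now_ms, max_age_ms, max_events):
    """Bound one key's buffer of uncorrelated events.

    Returns (kept, dropped). kept is None when nothing is left, which in
    the real function corresponds to calling state.remove() instead of
    state.update(...).
    """
    # Age-based eviction: assume events older than max_age_ms will never
    # be correlated (a policy choice, not something Spark does for you).
    dropped = [e for e in uncorrelated if now_ms - e["ts"] > max_age_ms]
    fresh = sorted(
        (e for e in uncorrelated if now_ms - e["ts"] <= max_age_ms),
        key=lambda e: e["ts"],
    )
    # Size cap: keep only the newest max_events events.
    if len(fresh) > max_events:
        overflow = len(fresh) - max_events
        dropped.extend(fresh[:overflow])
        fresh = fresh[overflow:]
    return (fresh or None), dropped


events = [{"id": i, "ts": ts} for i, ts in enumerate([0, 500, 900, 950])]
kept, dropped = trim_state(events, now_ms=1000, max_age_ms=600, max_events=2)
print([e["ts"] for e in kept])     # [900, 950]
print([e["ts"] for e in dropped])  # [0, 500]
```

Dropped events could be logged or forwarded elsewhere instead of silently discarded; which is appropriate depends on the pipeline.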

Does anyone know how to handle this use case, or will Spark take care of the state when it grows too big?

Thanks in advance.

Regards,
Robin Kuttaiah

Re: How to handle Spark state that grows too big even with a timeout set

Jungtaek Lim
For now, if the state doesn't fit in executor memory, you may want to consider a third-party RocksDB state store implementation (either an open-source one, or a commercial one if you're on Databricks or Qubole).
Fortunately, the Spark community has discussed providing a RocksDB state store out of the box, and the discussion was positive. Note, though, that even in the best case it would arrive in Spark 3.2.0, which is months away (the 3.1 release is still in progress, so probably 6+ months from now). So if you're hitting this problem in production, or waiting for Spark 3.2 isn't an option, your only choice for now is to try a third-party implementation.

In the meantime, I'm planning to look into "state migration", which would let users migrate their state from one state store provider (A) to another (B). The goal is to support migration between any two arbitrary providers.

Thanks,
Jungtaek Lim (HeartSaVioR)

(In reply to Kuttaiah Robin's message of Thu, Feb 11, 2021, 5:01 PM.)