Arbitrary stateful aggregation: updating state without setting timeout



Yuri Oleynikov (יורי אולייניקוב)

Hi all, I have the following question:

What happens to the state (in terms of expiration) if I update the state without setting a timeout?


E.g., in a FlatMapGroupsWithStateFunction:

  1. First batch:

     state.update(myObj)
     state.setTimeoutDuration(timeout)

  2. Second batch:

     state.update(myObj)

  3. Third batch (no data for a long time):
     Does the state time out once the initial timeout expires, or does it never time out?

Re: Arbitrary stateful aggregation: updating state without setting timeout

Jungtaek Lim
Hi,

That's not explained in the Structured Streaming (SS) programming guide, but it is explained in the Scala API doc.

The following statement, quoted from the Scala API doc, answers your question:

The timeout is reset every time the function is called on a group, that is, when the group has new data, or the group has timed out. So the user has to set the timeout duration every time the function is called, otherwise there will not be any timeout set.

Put simply, you should set the timeout again every time the function is called, unless you remove the state for the group (key).
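The reset rule quoted above can be illustrated with a small, stdlib-only Java model of the three batches from the question. This is a sketch, not Spark code: ModelState, TimeoutModel, invoke and wouldTimeOut are hypothetical names, and the model simplifies processing-time timeouts to a single deadline that is kept only while state exists.

```java
import java.util.function.Consumer;

// Toy model of the documented GroupState timeout rule.
// NOT Spark code: all class and method names here are hypothetical.
final class ModelState<S> {
    S value;                // current state for the group; null means "no state"
    Long timeoutDurationMs; // null means "no timeout was set during this call"

    void update(S newValue) { value = newValue; }
    void remove() { value = null; }
    void setTimeoutDuration(long durationMs) { timeoutDurationMs = durationMs; }
}

final class TimeoutModel<S> {
    private final ModelState<S> state = new ModelState<>();
    private Long deadlineMs; // processing-time deadline; null means "never times out"

    // Simulates one call of the user function for a single group at time nowMs.
    void invoke(long nowMs, Consumer<ModelState<S>> userFunction) {
        // The documented rule: the timeout is reset on every call...
        state.timeoutDurationMs = null;
        userFunction.accept(state);
        // ...so a deadline exists only if this call set a duration again.
        // Simplification: the model keeps a deadline only while state exists.
        deadlineMs = (state.value != null && state.timeoutDurationMs != null)
                ? Long.valueOf(nowMs + state.timeoutDurationMs)
                : null;
    }

    // True if a batch running at nowMs would call the function with a timeout.
    boolean wouldTimeOut(long nowMs) {
        return deadlineMs != null && nowMs > deadlineMs;
    }
}

final class TimeoutModelDemo {
    public static void main(String[] args) {
        TimeoutModel<String> group = new TimeoutModel<>();
        // Batch 1: update the state and set a 60 s timeout.
        group.invoke(0, s -> { s.update("myObj"); s.setTimeoutDuration(60_000); });
        // Batch 2: update the state only; the timeout from batch 1 is discarded.
        group.invoke(1_000, s -> s.update("myObj"));
        // Batch 3, an hour later: in this model no timeout fires.
        System.out.println(group.wouldTimeOut(3_600_000)); // prints "false"
    }
}
```

In real Spark code the corresponding pattern is to call both `state.update(...)` and `state.setTimeoutDuration(...)` on every invocation, to call `state.remove()` when `state.hasTimedOut()` is true, and to pass `GroupStateTimeout.ProcessingTimeTimeout()` to `flatMapGroupsWithState`, since `setTimeoutDuration` throws an exception when processing-time timeouts are not enabled.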

Hope this helps.

Thanks,
Jungtaek Lim (HeartSaVioR)

In reply to Yuri Oleynikov (יורי אולייניקוב) <[hidden email]>, Mon, Oct 5, 2020 at 6:16 PM.

Re: Arbitrary stateful aggregation: updating state without setting timeout

Yuri Oleynikov (יורי אולייניקוב)
Hi Jungtaek,
Thank you very much for the clarification.

In reply to Jungtaek Lim <[hidden email]>, 5 Oct 2020 at 15:17.