I’m attempting to leverage
I’ve come to this conclusion because a conditional check (with log output) on the hasTimedOut property is never hit. Each of these scenarios was tested in isolation, and all three exhibited the same behavior: the timeout event was never reached, and Spark introduced very long gaps between batches.
The test consisted of 2,000 messages read from a Kafka topic and split across two distinct groups (1,000 messages per group).
To give an idea of what I’m attempting to do: aggregate all events into a single bucket until some timeout expires.
Note also that in this example I’m trying to get the final value of the GroupState object as it times out. This is why I attempt a second pass on the timeout, but that doesn’t really matter, since I’m not getting the timeout event at all.
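A single pass should be enough to emit the final value. When the function is invoked with hasTimedOut set, the group's existing state is still readable. Spark's StructuredSessionization example reads `state.get` in that branch before calling `state.remove()`. The sketch below models that flow in plain Python. `ToyGroupState` and `update_bucket` are hypothetical stand-ins, not Spark's GroupState API.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional, Tuple


@dataclass
class ToyGroupState:
    """Hypothetical stand-in for a per-group state holder (not Spark's API)."""
    value: Optional[List[str]] = None
    has_timed_out: bool = False


def update_bucket(
    key: str, events: Iterable[str], state: ToyGroupState
) -> Optional[Tuple[str, List[str]]]:
    """Accumulate events per group; emit the whole bucket once, on timeout."""
    if state.has_timed_out:
        # Timeout call: no new events arrive, but the last state is still
        # present, so the final bucket can be emitted in this same call.
        final = (key, state.value or [])
        state.value = None  # plays the role of removing the state
        return final
    # Regular call: fold the new events into the bucket and emit nothing yet.
    state.value = (state.value or []) + list(events)
    return None


state = ToyGroupState()
print(update_bucket("group1", ["a", "b"], state))  # None
state.has_timed_out = True  # in Spark the engine sets this, not user code
print(update_bucket("group1", [], state))  # ('group1', ['a', 'b'])
```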
My code is here:
Thanks for any help.
From your code, it seems you are setting the timeout timestamp based on the current processing time (wall-clock time), while the watermark is calculated on the event time (the "when" column). The semantics of EventTimeTimeout are as follows. When the most recently set timeout timestamp of a group becomes older than the watermark (which is calculated across all groups), because that group has not received any new data for a while, a timeout occurs. The function is then called with hasTimedOut set to true. In this case, however, the timeout timestamp is derived from a different time source (wall-clock time) than the watermark (event time), so the two may not correlate correctly. For example, suppose the event time in the test data is always one hour behind the wall-clock time. Then the watermark will be at least one hour older than the set timeout timestamp, and the group would have to receive no data for more than an hour before it times out.
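A few lines of Python can check the arithmetic behind the one-hour example. The timeout fires once watermark > timeout timestamp, and the watermark is the maximum event time seen minus the delay threshold. So event time has to move past the group's last event by more than (timeout timestamp + delay - last event time). The 5-minute timeout and 1-minute watermark delay below are hypothetical values, not taken from the original code.

```python
MINUTE_MS = 60_000
HOUR_MS = 60 * MINUTE_MS


def event_time_advance_needed(timeout_ts_ms: int, last_event_ms: int, delay_ms: int) -> int:
    """How far max event time must move strictly past the group's last event
    before the watermark (= max event time - delay) exceeds the timeout."""
    # watermark > timeout_ts  <=>  max_event > timeout_ts + delay
    return timeout_ts_ms + delay_ms - last_event_ms


last_event = 0                 # event time of the group's last record (ms)
lag = HOUR_MS                  # data runs one hour behind the wall clock
processed_at = last_event + lag
timeout, delay = 5 * MINUTE_MS, 1 * MINUTE_MS  # hypothetical settings

# Timeout timestamp taken from the wall clock vs. from the event time.
wall_based = event_time_advance_needed(processed_at + timeout, last_event, delay)
event_based = event_time_advance_needed(last_event + timeout, last_event, delay)
print(wall_based // MINUTE_MS, event_based // MINUTE_MS)  # 66 6
```

With a wall-clock timestamp, the one-hour lag is added on top of the intended timeout. That matches the behavior described above.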
So I would check the gap between the event time in the data and the wall-clock time being used to set the timeout, to understand what is going on. Or, even better, just use the event time in the data to calculate the timeout timestamp, and don't use processing time for the timeout anywhere.
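Here is a minimal sketch of that check. It assumes you can collect pairs of (event time, wall-clock time at which the record was processed), both in epoch milliseconds. `event_time_lag_ms` is a hypothetical helper. In the actual job, the same comparison would be made between the "when" column and the processing time.

```python
from statistics import median
from typing import Iterable, Tuple


def event_time_lag_ms(records: Iterable[Tuple[int, int]]) -> Tuple[int, float, int]:
    """Return (min, median, max) of wall-clock time minus event time, in ms.

    Each record is (event_time_ms, processed_at_ms). A large, steady lag means
    a timeout computed from the wall clock sits far ahead of the event-time
    watermark.
    """
    lags = [processed - event for event, processed in records]
    if not lags:
        raise ValueError("need at least one record")
    return min(lags), median(lags), max(lags)


# Hypothetical sample: data arriving about one hour behind the wall clock.
sample = [(0, 3_600_000), (1_000, 3_601_500), (2_000, 3_602_000)]
print(event_time_lag_ms(sample))  # (3600000, 3600000, 3600500)
```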
Let me know how it goes.
On Fri, Jan 12, 2018 at 2:36 PM, daniel williams <[hidden email]> wrote:
How are you testing whether there is a timeout? The situation that would lead to an EventTimeTimeout is the following:
1. Send a bunch of data to group1 to set the timeout timestamp using event time.
2. Then send more data to group2 only, to advance the watermark (since it is based on event time across all groups), and see the timeout occur.
Note that you have to keep sending some data to other groups so that microbatches are triggered continuously and the watermark is recalculated. If you send a bunch of data and then stop sending and just wait, the watermark will not advance, because there is no new data to recalculate it from. As a result, the condition watermark > timeout timestamp may never be hit.
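Both the two-step test and the caveat about stopping can be reproduced with a toy micro-batch loop. This is a simplified Python model, not Spark code, and Spark's actual bookkeeping is more involved. The model makes three assumptions:
- the timeout timestamp is the group's latest event time plus a fixed gap;
- the watermark used in a batch is the one computed at the end of the previous batch;
- only groups without new data in a batch are checked for expiry.

`ToyEventTimeEngine` and its parameters are hypothetical.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class ToyEventTimeEngine:
    """Toy model of EventTimeTimeout semantics (not Spark's implementation)."""
    delay_ms: int    # watermark delay threshold
    timeout_ms: int  # gap added to a group's latest event time
    max_event_ms: Optional[int] = None
    timeout_ts: Dict[str, int] = field(default_factory=dict)

    def watermark(self) -> Optional[int]:
        # Watermark = max event time seen so far (across ALL groups) - delay.
        if self.max_event_ms is None:
            return None
        return self.max_event_ms - self.delay_ms

    def run_batch(self, records: List[Tuple[str, int]]) -> List[str]:
        """Process one micro-batch of (group, event_ms); return expired groups."""
        wm = self.watermark()  # computed from earlier batches only
        latest: Dict[str, int] = {}
        for group, event_ms in records:
            latest[group] = max(latest.get(group, event_ms), event_ms)
        # Groups with data get a fresh timeout based on event time.
        for group, event_ms in latest.items():
            self.timeout_ts[group] = event_ms + self.timeout_ms
        expired = []
        if wm is not None:
            for group, ts in list(self.timeout_ts.items()):
                if group not in latest and wm > ts:
                    expired.append(group)
                    del self.timeout_ts[group]
        # Advance max event time; the new watermark applies from the next batch.
        for _, event_ms in records:
            if self.max_event_ms is None or event_ms > self.max_event_ms:
                self.max_event_ms = event_ms
        return sorted(expired)


# Steps 1 and 2: data for group1, then data for group2 only.
engine = ToyEventTimeEngine(delay_ms=1_000, timeout_ms=5_000)
print(engine.run_batch([("group1", 0), ("group1", 1_000)]))  # []
print(engine.run_batch([("group2", 10_000)]))                 # []
print(engine.run_batch([("group2", 11_000)]))                 # ['group1']

# Stopping after step 2: the watermark (9000) already exceeds group1's
# timeout (6000), but with no further batch nothing ever checks it.
idle = ToyEventTimeEngine(delay_ms=1_000, timeout_ms=5_000)
idle.run_batch([("group1", 0), ("group1", 1_000)])
idle.run_batch([("group2", 10_000)])
print(idle.watermark(), idle.timeout_ts["group1"])  # 9000 6000
```

The model captures the two points made in the reply. First, expiry is evaluated only inside a micro-batch. Second, the watermark moves only when new data, from any group, arrives.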
For ProcessingTimeTimeout the situation is different: it should rely solely on the wall-clock time, not on any watermark.
In that case, you still have to keep sending data to trigger microbatches continuously. Without any data, no microbatches are triggered, and therefore no timeouts are processed. This is a known issue that we will fix. It should work fine if you keep pushing data to group2; group1 should time out.
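The processing-time case can be modelled the same way. The deadline is based on the wall clock, but it is compared against the clock only when a micro-batch actually runs. This is a minimal sketch under three assumptions:
- the deadline is the time of the group's last update plus a fixed duration;
- group1 receives no data in the batches considered;
- batches are triggered only by incoming data.

`first_batch_processing_timeout` is a hypothetical helper, not a Spark API.

```python
from typing import Iterable, Optional


def first_batch_processing_timeout(
    batch_times_ms: Iterable[int], last_update_ms: int, timeout_ms: int
) -> Optional[int]:
    """Return the wall-clock time of the first micro-batch that would process
    the group's timeout, or None if no batch runs after the deadline."""
    deadline = last_update_ms + timeout_ms
    for t in sorted(batch_times_ms):
        # The timeout is only noticed when a batch runs past the deadline.
        if t > deadline:
            return t
    return None


# group1 last updated at t=0 with a 5 s timeout.
print(first_batch_processing_timeout([1_000, 2_000], 0, 5_000))          # None
print(first_batch_processing_timeout([1_000, 2_000, 60_000], 0, 5_000))  # 60000
```

In the second call the deadline passes at 5000 ms. The timeout is only observed at 60000 ms, when data for group2 next triggers a batch. If no further batch runs, the timeout is never observed.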
Did that make sense?
On Fri, Jan 12, 2018 at 3:43 PM, daniel williams <[hidden email]> wrote: