Blacklisting in Spark Stateful Structured Streaming

Eric Beabes

Currently we have a “Stateful” Spark Structured Streaming job that computes aggregates for each ID. I need to implement a new requirement: if the number of incoming messages for a particular ID exceeds a certain value, add this ID to a blacklist and remove its state. Going forward, we will not create state for any blacklisted ID; its messages will simply be filtered out.

What’s the best way to implement this in Spark Structured Streaming? Essentially, we need a distributed HashSet that gets updated intermittently and is available to all executors so that they can filter out unwanted messages.

Any pointers would be greatly appreciated. Is the only option to use a third-party distributed cache tool such as Ehcache, Redis, etc.?




Re: Blacklisting in Spark Stateful Structured Streaming

liyuanjian
If you use the `flatMapGroupsWithState`/`mapGroupsWithState` API for a "stateful" SS job, the blacklisting structure can be put into the user-defined state.
Using a third-party cache should also be a good choice.
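
A minimal sketch of the first suggestion, assuming a per-ID state that carries a `blacklisted` flag. The types `Event`, `IdState`, `Output`, the object `BlacklistInState`, and the threshold `MaxMessages` are all hypothetical names invented for illustration; only `GroupState` and its `getOption`/`update` methods come from Spark. Once an ID crosses the threshold, its aggregates are dropped and only the flag is kept, so later messages for that ID are ignored:

```scala
import org.apache.spark.sql.streaming.GroupState

// Hypothetical types, not taken from the thread.
case class Event(id: String, value: Double)
case class IdState(count: Long, sum: Double, blacklisted: Boolean)
case class Output(id: String, count: Long, sum: Double, blacklisted: Boolean)

object BlacklistInState {
  // Assumed threshold; the thread only says "a certain value".
  val MaxMessages = 1000L

  // Pure state transition, kept separate from Spark so it is easy to unit test.
  def next(old: Option[IdState], values: Seq[Double]): IdState = old match {
    // Already blacklisted: ignore the new messages and keep the marker.
    case Some(s) if s.blacklisted => s
    case _ =>
      val prev = old.getOrElse(IdState(0L, 0.0, blacklisted = false))
      val count = prev.count + values.size
      if (count > MaxMessages) {
        // Threshold exceeded: drop the aggregates, keep only the flag.
        IdState(0L, 0.0, blacklisted = true)
      } else {
        IdState(count, prev.sum + values.sum, blacklisted = false)
      }
  }

  // Has the (key, values, state) shape that mapGroupsWithState expects.
  def updateAcrossEvents(
      id: String,
      inputs: Iterator[Event],
      state: GroupState[IdState]): Output = {
    val s = next(state.getOption, inputs.map(_.value).toList)
    state.update(s)
    Output(id, s.count, s.sum, s.blacklisted)
  }
}
```

With `spark.implicits._` in scope, this could be wired in as `events.groupByKey(_.id).mapGroupsWithState(GroupStateTimeout.NoTimeout)(BlacklistInState.updateAcrossEvents _)`. Note the trade-off in this sketch: a blacklisted ID still occupies a state entry, though it shrinks to a single flag, and with no timeout configured that entry is never removed.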

(In reply to Eric Beabes <[hidden email]>, message of Wed, Nov 11, 2020, 6:54 AM.)




Re: Blacklisting in Spark Stateful Structured Streaming

Eric Beabes
Yes, I agree that the blacklisting structure can be put in the user-defined state, but the state would still remain open for a long time, right? Am I misunderstanding something?

I like the idea of keeping the blacklist in a broadcast variable, but I can't figure out how to use a broadcast variable inside the `mapGroupsWithState` function. For example, I have this code:

    .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(
      updateAcrossEvents
    )

and updateAcrossEvents is defined as:

    def updateAcrossEvents(
        tuple5: (String, String, String, String, String),
        inputs: Iterator[MyObject],
        oldState: GroupState[MyState])

How do I pass a broadcast variable into the `updateAcrossEvents` method? Please advise. Thanks.
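
One possible way to do this, sketched under assumptions, is to give the function a second (curried) parameter list so that the broadcast handle is bound first and the remaining three parameters match what `mapGroupsWithState` expects. The fields of `MyObject` and `MyState`, the helper `step`, the object name `BroadcastIntoState`, the `MyState` return type, and the choice of the first tuple element as the ID are all hypothetical here; the thread does not show them:

```scala
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.streaming.GroupState

// Hypothetical shapes; the thread does not show the real definitions.
case class MyObject(id: String, value: Double)
case class MyState(count: Long)

object BroadcastIntoState {
  type Key = (String, String, String, String, String)

  // Pure decision logic: None means "blacklisted, keep no state".
  def step(blacklist: Set[String], id: String, old: Option[MyState], n: Int): Option[MyState] =
    if (blacklist.contains(id)) None
    else Some(MyState(old.map(_.count).getOrElse(0L) + n))

  // First parameter list binds the broadcast handle; the second has the
  // (key, values, state) shape that mapGroupsWithState expects.
  def updateAcrossEvents(blacklist: Broadcast[Set[String]])(
      tuple5: Key,
      inputs: Iterator[MyObject],
      oldState: GroupState[MyState]): MyState = {
    // Assumption: the first element of the key tuple is the ID to check.
    step(blacklist.value, tuple5._1, oldState.getOption, inputs.size) match {
      case Some(s) =>
        oldState.update(s)
        s
      case None =>
        // Blacklisted: drop any existing state and emit an empty result.
        if (oldState.exists) oldState.remove()
        MyState(0L)
    }
  }
}
```

On the driver this might look like `val blacklist = spark.sparkContext.broadcast(Set("some-id"))` followed by `.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(BroadcastIntoState.updateAcrossEvents(blacklist) _)`. Keep in mind that a broadcast variable is read-only once created, so this sketch only covers a blacklist that is fixed when the query starts; refreshing it while the query runs is a separate problem that this example does not solve.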


(In reply to Yuanjian Li <[hidden email]>, message of Mon, Nov 16, 2020, 3:40 AM.)