DStream reduceByKeyAndWindow not using checkpointed data for inverse reducing old data

NB
Hello,

We have a Spark Streaming application (currently using Spark 2.2.0) that uses reduceByKeyAndWindow() with inverse reduce functions and filter functions quite heavily. The batch interval is 30 seconds. 
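For reference, here is a minimal sketch (in Scala) of the kind of pipeline I am describing; the stream source, key extraction, reduce functions and checkpoint directory below are placeholders for illustration, not our actual code:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("WindowedCounts")
val ssc = new StreamingContext(conf, Seconds(30))      // 30-second batch interval
ssc.checkpoint("hdfs:///checkpoints/windowed-counts")  // checkpointing is required for the inverse-reduce form

// Receiver-based Flume input stream (host/port are placeholders)
val events = FlumeUtils.createStream(ssc, "flume-agent-host", 41414)

// Count events per key over a sliding 15-minute window, supplying an inverse
// reduce function and a filter function that drops keys whose count reaches 0.
val counts = events
  .map(e => (new String(e.event.getBody.array()), 1L))
  .reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,   // reduce: add the batch entering the window
    (a: Long, b: Long) => a - b,   // inverse reduce: subtract the batch leaving the window
    Minutes(15),                   // windowDuration
    Seconds(30),                   // slideDuration (every batch)
    filterFunc = (kv: (String, Long)) => kv._2 > 0)

counts.print()
ssc.start()
ssc.awaitTermination()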

One thing I have not been able to explain until now is the number of input blocks that Spark keeps cached in memory and on disk, and for how long, when logically their lineage should have been cut and they should have been cleared. I believe I may have spotted the cause of these input blocks being cached for so long. A section of the generated DAG, shown below, might help illustrate my theory.

If you follow the lineages of the various RDDs going into the reduceByKeyAndWindow @18:26:30 in stage 1395845, one of them is the current batch, whose lineage goes all the way back to the input flume stream[0] at the top of stage 1395843. A second RDD, from the immediately prior batch @18:26:00, is being read from the checkpoint as expected. However, the third RDD (the one that is going out of the 15-minute windowDuration and is fed to the inverse reduce function) @18:11:30 is not coming from the checkpoint; its lineage is shown going all the way back to the input flume stream[0] at the top of the skipped stage 1395844.


[image.png: screenshot of the DAG section described above]
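To make the roles of those three RDDs concrete, here is a hypothetical per-key illustration (with made-up counts) of the incremental update that reduceByKeyAndWindow performs when an inverse reduce function is supplied:

// newWindowValue = reduce(invReduce(previousWindowValue, valueLeavingWindow), valueEnteringWindow)
val previousWindowValue = 120L  // window ending @18:26:00, expected to be read from the checkpoint
val valueLeavingWindow  = 7L    // batch @18:11:30 that has just slid out of the 15-minute window
val valueEnteringWindow = 5L    // current batch @18:26:30
val newWindowValue = (previousWindowValue - valueLeavingWindow) + valueEnteringWindow  // 118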

I believe this is the reason that our input stream RDDs are being persisted in memory and on disk for close to 2-1/2 hours before being cleared up by Spark.
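For what it's worth, we see these blocks on the Storage tab of the Spark UI; as a rough cross-check from the driver, something like the following (just an illustrative snippet, not part of our application) also shows which RDDs are still being tracked as persisted:

// Log every RDD the driver still considers persisted, with its storage level,
// to spot ones that linger across many batches.
ssc.sparkContext.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id (${rdd.name}) persisted at ${rdd.getStorageLevel.description}")
}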

The reduceByKeyAndWindow above uses a 15-minute windowDuration for illustration purposes only. We have other reduceByKeyAndWindow operations with windows of up to 1 hour, and they all display the same characteristic.
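Those longer windows differ only in the windowDuration argument, e.g. (reusing the names from the earlier sketch, again purely for illustration):

// Same pattern with a 1-hour window, still sliding every 30-second batch.
val hourlyCounts = events
  .map(e => (new String(e.event.getBody.array()), 1L))
  .reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,
    (a: Long, b: Long) => a - b,
    Minutes(60),
    Seconds(30),
    filterFunc = (kv: (String, Long)) => kv._2 > 0)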

If anyone can suggest the underlying cause of this and what we can do to alleviate it, that would be much appreciated.

Thanks
N B

Re: DStream reduceByKeyAndWindow not using checkpointed data for inverse reducing old data

NB
Update on this:

I have tested a potential fix for this issue. In the process I discovered what look like two different issues, but they will need validation from a committer before I submit patches or pull requests, and may need more refinement and discussion. I have created two JIRA tickets: SPARK-25302 and SPARK-25303.

After making the fixes, I can successfully run our pipeline, and it now uses checkpointed data instead of cached data. The fixes also cut the lineage to the input DStream, so it is removed from the cache in about 30 seconds instead of persisting for 2-1/2 hours as it was doing earlier.

Here is a screenshot of the same portion of the pipeline that I pasted in the previous message, but with the fixes applied:

[image.png: screenshot of the same DAG section with the fixes applied]

If someone can jump in and guide me on how to submit patches or even pull requests, that would be very helpful.

Thanks,
N B

