How to optimize iterative data processing in spark application

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

How to optimize iterative data processing in spark application

Federico D'Ambrosio
Hello everyone,

I have a spark application processing data iteratively within an RDD until .isEmpty() is true. Now the loop is sort of like it follows

mainRDD = sc.parallelize(...) //initialize mainRDD
do {

rdd1 = mainRDD.flatMapToPair(advanceState)//advance state of element

rdd2 = rdd1.filter().mapToPair()
rdd3 = rdd1.filter().mapToPair()
rdd4 = rdd1.filter().mapToPair()

finalRDD = rdd3.leftOuterJoin(rdd4).map()

mainRDD = rdd2.union(rdd4).cache()
mainRDD.checkpoint()

} while(!mainRDD.isEmpty())

Eventually mainRDD will be empty and the computation will stop.
Functionally speaking, this loop is working, but I'm finding an issue with a certain class of elements which could be contained within mainRDD.
These particular elements have an internal timer, a time constraint, which causes the element to not advance in state, basically causing more than a few empty loops.
Even though those timers function, in that when a timer terminates the iterations continue, the last .isEmpty() action lasts more than the previous iterations, causing a total execution time six times longer than the value of the timer.
From the application logs I can notice that this kind of slow down is really caused by a longer than normal isEmpty execution, in which a greater than normal number of tasks are executed (both local and cluster mode) and it's particularly evident if I test locally with a single element within the mainRDD.
Is it possible that Spark Streaming may be more recommended givent that I'm dealing with some sort of time constraint?
Is there any way I can optimize the transformations flow I should look into?

Thank you,
Federico
--
Federico D'Ambrosio