[Spark Streaming][Spark SQL] Design suggestions needed for sessionization

Ramkumar Venkataraman
At a high level, I am looking to do sessionization: combine events based on some key, apply some transformations, and emit the data to HDFS. The catch is that there are time boundaries: say, I group events into 0.5-hour windows based on a timestamp field in the event. Typical event-time windowing plus key-based grouping.
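
To make the grouping concrete, here is a minimal plain-Python sketch (no Spark involved) of assigning events to 30-minute event-time windows per key. The field names `key` and `ts` (epoch seconds) and the helper names are assumptions made for illustration only.

```python
from collections import defaultdict

WINDOW_SECONDS = 30 * 60  # the 0.5-hour window described above


def window_start(event_ts):
    """Floor an epoch-seconds timestamp to the start of its 30-minute window."""
    return event_ts - (event_ts % WINDOW_SECONDS)


def group_events(events):
    """Group events by (key, window start), using the event's own timestamp."""
    sessions = defaultdict(list)
    for event in events:
        sessions[(event["key"], window_start(event["ts"]))].append(event)
    return dict(sessions)


# Hypothetical sample events; "key" and "ts" are assumed field names.
events = [
    {"key": "user-1", "ts": 1000},
    {"key": "user-1", "ts": 1700},
    {"key": "user-1", "ts": 1900},
    {"key": "user-2", "ts": 1000},
]
grouped = group_events(events)
```

With these sample events, the first two `user-1` events share the window starting at 0, while the third falls into the window starting at 1800.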

I have been trying to figure out ways to do it.

The following approaches would be the best fit, but are ruled out:

1) Use event-time windows with watermarks, with the possibility of updating previous windows when late data arrives. This is possible in Spark 2, but it is still in alpha there. Also, the company I work for doesn't support Spark 2 yet.
2) Use mapWithState in Spark 1.6, but we can't do event-time windows with it, if I am not mistaken.

Other feasible approaches:

3) Use another data store such as HBase to store unfinished sessions. Every window needs to find out which session a particular event fits into by querying HBase. This is the least favored option, since we would have to maintain another component operationally.
4) Use HDFS to store unfinished sessions. At every window, we create a DF (DataFrame) from the unfinished sessions, join it with the current DStream, do the transformations, emit a tuple stream of finished and unfinished sessions, and write them to HDFS.
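
The per-window step in option 4 can be sketched in plain Python as below. This only illustrates the merge-and-split logic, not Spark code: the dictionaries stand in for the DF of unfinished sessions and the current batch, and `cutoff_ts` is a hypothetical rule for deciding when a session counts as finished.

```python
WINDOW_SECONDS = 30 * 60


def process_window(unfinished, batch, cutoff_ts):
    """Merge carried-over sessions with a new batch, then split the result.

    unfinished: dict mapping (key, window_start) -> list of events
    batch:      list of new events, each with assumed fields "key" and "ts"
    cutoff_ts:  assumption: a window ending at or before this is finished
    Returns (finished, still_unfinished).
    """
    # Copy the carried-over state so the input is not mutated.
    merged = {k: list(v) for k, v in unfinished.items()}
    for event in batch:
        start = event["ts"] - event["ts"] % WINDOW_SECONDS
        merged.setdefault((event["key"], start), []).append(event)

    finished, still_unfinished = {}, {}
    for (key, start), session_events in merged.items():
        is_done = start + WINDOW_SECONDS <= cutoff_ts
        target = finished if is_done else still_unfinished
        target[(key, start)] = session_events
    return finished, still_unfinished


# Hypothetical state carried over from the previous window, plus a new batch.
carried = {("user-1", 0): [{"key": "user-1", "ts": 100}]}
batch = [{"key": "user-1", "ts": 1700}, {"key": "user-1", "ts": 2000}]
finished, still_unfinished = process_window(carried, batch, cutoff_ts=1800)
```

In this sketch `finished` would be emitted as output, and `still_unfinished` is the state that has to be persisted for the next window.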

Option #4 looks elegant, but the catch is the write of unfinished sessions. We read from and write to the same HDFS location, and the write has to use SaveMode.Overwrite. I am seeing concurrency problems where the read DF of the next window doesn't find the files in HDFS (because they are being overwritten by the write from the previous window).

Caused by: java.io.FileNotFoundException: File does not exist: hdfs://horton/tmp/inprogress-events/part-r-00000-a22e7b14-3207-4fb3-8db8-f5423ef0441d.gz.parquet
        at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1374)

So the questions I have:
1) Is option #4, reading and writing a DF to the same HDFS location (the same table, basically), the right approach? If not, are there any alternatives?
2) I have experimented with writing the DF at the end of a window to a temp location and moving the files from temp to the expected folder (using HDFS utils) at the beginning of the next window. That didn't help much.
3) Is there a way to make the write at the end of every window block the processing of the next window? I tried to force the creation of a new stage by using coalesce, but that didn't help much either.
4) Is there any other, totally different approach that is possible? We know option #3 works, but we don't want to maintain another component operationally.
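
For reference, a variation on the temp-and-move idea from question 2 can be sketched as follows: instead of moving files into the folder that is being read, each window writes to a temp directory and renames it to a new per-batch directory, and the reader picks the latest completed one. This is only a sketch under stated assumptions: it uses the local filesystem and JSON as stand-ins for HDFS and Parquet, all function and directory names are hypothetical, and cleanup of old directories is not shown.

```python
import json
import os
import tempfile


def write_snapshot(base_dir, batch_id, sessions):
    """Write sessions to a temp dir, then rename it to a per-batch dir."""
    tmp_dir = tempfile.mkdtemp(prefix="_tmp-", dir=base_dir)
    with open(os.path.join(tmp_dir, "part-00000.json"), "w") as f:
        json.dump(sessions, f)
    final_dir = os.path.join(base_dir, "batch=%010d" % batch_id)
    # The rename publishes the directory only after the write has completed.
    os.rename(tmp_dir, final_dir)
    return final_dir


def latest_snapshot(base_dir):
    """Return the newest completed per-batch dir, ignoring temp dirs."""
    done = sorted(d for d in os.listdir(base_dir) if d.startswith("batch="))
    return os.path.join(base_dir, done[-1]) if done else None


def read_snapshot(path):
    """Load the sessions stored in one per-batch dir."""
    with open(os.path.join(path, "part-00000.json")) as f:
        return json.load(f)


# Two consecutive windows, each writing its own directory.
base = tempfile.mkdtemp()
write_snapshot(base, 1, {"user-1": [100]})
write_snapshot(base, 2, {"user-1": [100, 1700]})
current = read_snapshot(latest_snapshot(base))
```

Because a window never overwrites the directory that another window may still be reading, the reader in this sketch always sees a complete set of files. Whether the same holds for a given HDFS and Spark 1.6 setup would need to be verified.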

Let me know your thoughts, or if you need any more information. Any existing pointers or SO answers on how people do sessionization in Spark 1.6 would also help (I couldn't find anything that helped me).