Spark Structured streaming - dropDuplicates with watermark

Nirmal Manoharan
I am trying to deduplicate streaming data using the dropDuplicates function with a watermark. The problem I am currently facing is that I have two timestamps for a given record:
1. The eventtimestamp: the timestamp of the record's creation at the source.
2. The transfer timestamp: the timestamp from an intermediate process that is responsible for streaming the data.
The duplicates are introduced during the intermediate stage, so for a given record and its duplicate, the eventtimestamp is the same but the transfer timestamp is different.

For the watermark, I would like to use the transferTimestamp, because I know duplicates can't occur more than 3 minutes apart in transfer. But I can't use it within dropDuplicates, because then it won't catch the duplicates: they have different transfer timestamps.

Here is an example:
        Event 1:{ "EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:05:00.00" }
        Event 2 (duplicate): {"EventString":"example1", "Eventtimestamp": "2018-11-29T10:00:00.00", "TransferTimestamp": "2018-11-29T10:08:00.00"}

In this case, the duplicate was created during transfer, 3 minutes after the original was transferred (10:05 vs. 10:08).
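
A minimal plain-Java check of the arithmetic behind the watermark choice, assuming the 4-minute delay used in the code below: the gap between the two transfer timestamps in the example is 3 minutes, which fits inside the delay. The class and method names are hypothetical and this is not Spark code.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class TransferGap {
    // Watermark delay assumed from the query in this post ("4 minutes").
    static final Duration WATERMARK_DELAY = Duration.ofMinutes(4);

    // Time between the original's and the duplicate's transfer timestamps.
    static Duration gap(String originalTransfer, String duplicateTransfer) {
        return Duration.between(LocalDateTime.parse(originalTransfer),
                                LocalDateTime.parse(duplicateTransfer));
    }

    public static void main(String[] args) {
        Duration g = gap("2018-11-29T10:05:00.00", "2018-11-29T10:08:00.00");
        // A duplicate can only be caught if it arrives within the watermark delay.
        System.out.println(g + " within delay: " + (g.compareTo(WATERMARK_DELAY) <= 0));
    }
}
```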

My code looks like this:

streamDataset
    .withWatermark("transferTimestamp", "4 minutes")   // watermark on transfer time
    .dropDuplicates("eventstring", "transferTimestamp");

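The above code won't drop the duplicates, because transferTimestamp differs between an event and its duplicate. But currently this is the only way to get watermark-based state eviction: Spark only cleans up the dropDuplicates state when the watermark column is one of the deduplication columns. Without it, the state for every key seen is kept indefinitely.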
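
To illustrate why the composite key misses the duplicate, here is a plain-Java sketch (not Spark) that keeps the first event per (eventString, transferTimestamp) pair, the same key passed to dropDuplicates above. The Event record and method names are hypothetical; the data is the two events from the example.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CompositeKeyDedup {
    // Hypothetical event type holding the fields from the example events.
    record Event(String eventString, String eventTimestamp, String transferTimestamp) {}

    // Keeps the first event for each (eventString, transferTimestamp) key.
    static List<Event> dedupByStringAndTransfer(List<Event> events) {
        Set<List<String>> seen = new HashSet<>();
        List<Event> kept = new ArrayList<>();
        for (Event e : events) {
            // Set.add returns false when the key is already present.
            if (seen.add(List.of(e.eventString(), e.transferTimestamp()))) {
                kept.add(e);
            }
        }
        return kept;
    }

    static List<Event> exampleEvents() {
        return List.of(
            new Event("example1", "2018-11-29T10:00:00.00", "2018-11-29T10:05:00.00"),
            new Event("example1", "2018-11-29T10:00:00.00", "2018-11-29T10:08:00.00"));
    }

    public static void main(String[] args) {
        // Both events survive because their transfer timestamps differ.
        System.out.println(dedupByStringAndTransfer(exampleEvents()).size());
    }
}
```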

I would really like to see a dropDuplicates implementation like the one below, which would be a valid case for any stream with at-least-once semantics: I don't have to include the watermark field in dropDuplicates, and the watermark-based state eviction is still honored.

streamDataset
    .withWatermark("transferTimestamp", "4 minutes")
    .dropDuplicates("eventstring");   // key excludes the watermark column
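
The requested behavior can be modeled in plain Java, assuming each event is processed as its own micro-batch: state is keyed on eventString alone, stores the transfer timestamp of the first copy, and an entry is evicted once it falls at or below the watermark (max transfer timestamp seen so far minus the delay), roughly the way watermark-based state cleanup works. This is a simplified, single-threaded sketch with hypothetical names, not Spark's implementation.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

// Simplified model (not Spark code): dedup on eventString, evict by transfer-time watermark.
public class WatermarkDedupModel {
    private final Duration delay;
    // State: eventString -> transfer timestamp of the first copy kept.
    private final Map<String, LocalDateTime> state = new HashMap<>();
    // Largest transfer timestamp seen so far; null until the first event.
    private LocalDateTime maxTransfer;

    WatermarkDedupModel(Duration delay) {
        this.delay = delay;
    }

    // Returns true if the event is emitted, false if it is dropped as a duplicate.
    boolean process(String eventString, LocalDateTime transferTimestamp) {
        if (maxTransfer != null) {
            // The watermark comes from earlier events only.
            LocalDateTime watermark = maxTransfer.minus(delay);
            // Evict keys whose stored transfer timestamp is at or below the watermark.
            state.values().removeIf(ts -> !ts.isAfter(watermark));
        }
        boolean isNew = state.putIfAbsent(eventString, transferTimestamp) == null;
        if (maxTransfer == null || transferTimestamp.isAfter(maxTransfer)) {
            maxTransfer = transferTimestamp;
        }
        return isNew;
    }

    int stateSize() {
        return state.size();
    }

    public static void main(String[] args) {
        WatermarkDedupModel m = new WatermarkDedupModel(Duration.ofMinutes(4));
        System.out.println(m.process("example1", LocalDateTime.parse("2018-11-29T10:05:00"))); // true
        System.out.println(m.process("example1", LocalDateTime.parse("2018-11-29T10:08:00"))); // false
        System.out.println(m.process("example2", LocalDateTime.parse("2018-11-29T10:15:00"))); // true
        // Watermark is now 10:11, so the example1 entry (10:05) is evicted first.
        System.out.println(m.process("example1", LocalDateTime.parse("2018-11-29T10:20:00"))); // true
        System.out.println(m.stateSize()); // 2
    }
}
```

The duplicate at 10:08 is dropped even though its transfer timestamp differs, while state stays bounded. The last copy of example1 is emitted again only because its state had already expired; that trade-off is acceptable here since duplicates never arrive more than 3 minutes apart in transfer.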

If anyone has an alternative solution for this, please let me know. I can't use the eventtimestamp for the watermark, as it is not ordered and its range varies drastically (delayed events and junk events).

Thanks in advance
-Nirmal