Structured Streaming DataFrame/Dataset join (Java)


Mann Du
Hello there,

I am trying to calculate a simple difference between adjacent rows (ts = ts - 10) of a column in a dataset by joining the dataset with itself. The SQL expression worked for a static dataset (trackT):

Dataset<Row> trackDiff = spark.sql("select a.*, "
    + "a.posX - coalesce(b.posX, 0) as delX "
    + "from trackT a left join trackT b "
    + "on a.ts = b.ts - 10");
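
To make the intended result concrete, here is a rough plain-Java sketch (no Spark involved) of what I understand the static query to compute. The `AdjacentDiff` class, the `Track` type, and the `delX` helper are made-up names for illustration only, and I am assuming `ts` is an integer, `posX` is a double, and each `ts` value occurs at most once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AdjacentDiff {
    // Hypothetical row type holding only the two columns the query uses.
    static final class Track {
        final long ts;
        final double posX;

        Track(long ts, double posX) {
            this.ts = ts;
            this.posX = posX;
        }
    }

    // For each row a, look up the row b with b.ts == a.ts + 10
    // (the join condition a.ts = b.ts - 10) and compute
    // a.posX - coalesce(b.posX, 0), as the left join does.
    // Assumes ts values are unique; a real join would emit one row per match.
    static List<Double> delX(List<Track> rows) {
        Map<Long, Double> posByTs = new HashMap<>();
        for (Track t : rows) {
            posByTs.put(t.ts, t.posX);
        }
        List<Double> out = new ArrayList<>();
        for (Track a : rows) {
            // No matching b: fall back to 0, like coalesce(b.posX, 0).
            double bPosX = posByTs.getOrDefault(a.ts + 10, 0.0);
            out.add(a.posX - bPosX);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Track> rows = Arrays.asList(
            new Track(0, 1.0), new Track(10, 4.0), new Track(20, 9.0));
        System.out.println(delX(rows)); // prints [-3.0, -5.0, 9.0]
    }
}
```

The last row has no partner at ts + 10, so its delX is just its own posX, which is the behaviour the left join with coalesce gives on the static dataset.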

However, if the dataset is a Structured Streaming dataset, Spark reports that "Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys". Since the dataset is joined with itself, I was thinking of just using an arbitrary time interval as the watermark to create two streaming datasets and join them:

Dataset<Row> trackWM1 = trackT.withColumn("ts1", trackT.col("timestamp"))
    .drop("timestamp").withWatermark("ts1", "10 second");

Dataset<Row> trackWM2 = trackT.withColumn("ts2", trackT.col("timestamp"))
    .drop("timestamp").withWatermark("ts2", "10 second");

Dataset<Row> joinDF = trackWM1.join(trackWM2, "???")

I am stuck and don't know how to do for this streaming dataset what I did for the static dataset. The join seems to mean something different once I add the time-interval watermark, since the original query joined each row of the table with a different row of the same table. Can someone explain how I can implement the original logic on a streaming dataset? Perhaps I don't even need a join?