Spark watermarked aggregation query and append output mode


Sergey Oboguev

Hi,

I am trying to aggregate a time-stamped Spark structured stream to get per-device (source) averages for every second of incoming data.

dataset.printSchema();   // see the output below

Dataset<Row> ds1 = dataset
                          .withWatermark("timestamp", "1 second")
                          .groupBy(
                                   functions.window(dataset.col("timestamp"), "1 second", "1 second"),
                                   dataset.col("source"))
                          .agg(
                               functions.avg("D0").as("AVG_D0"),
                               functions.avg("I0").as("AVG_I0"))
                          .orderBy("window");

StreamingQuery query = ds1.writeStream()
                          .outputMode(OutputMode.Append())
                          .format("console")
                          .option("truncate", "false")
                          .option("numRows", Integer.MAX_VALUE)
                          .start();

query.awaitTermination();

I am using Spark 2.4.6.

According to
https://spark.apache.org/docs/2.4.6/structured-streaming-programming-guide.html#output-modes
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes
the above construct should work fine.

Yet I get an exception from the query's start():

11:05:27.282 [main] ERROR my.sparkbench.example.Example - Exception
org.apache.spark.sql.AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;;
Sort [window#44 ASC NULLS FIRST], true
+- Aggregate [window#71, source#0], [window#71 AS window#44, source#0, avg(D0#12) AS AVG_D0#68, avg(I0#2L) AS AVG_I0#70]
   +- Filter isnotnull(timestamp#1)
      +- Project [named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 1000000) + 0), LongType, TimestampType), end, precisetimestampconversion((((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) as double) = (cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) THEN (CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) + cast(1 as bigint)) ELSE CEIL((cast((precisetimestampconversion(timestamp#1, TimestampType, LongType) - 0) as double) / cast(1000000 as double))) END + cast(0 as bigint)) - cast(1 as bigint)) * 1000000) + 0) + 1000000), LongType, TimestampType)) AS window#71, source#0, timestamp#1-T1000ms, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]
         +- EventTimeWatermark timestamp#1: timestamp, interval 1 seconds
            +- StreamingRelationV2 my.sparkbench.datastreamreader.MyStreamingSource@6897a4a, my.sparkbench.datastreamreader.MyStreamingSource, [source#0, timestamp#1, I0#2L, I1#3L, I2#4L, I3#5L, I4#6L, I5#7L, I6#8L, I7#9L, I8#10L, I9#11L, D0#12, D1#13, D2#14, D3#15, D4#16, D5#17, D6#18, D7#19, D8#20, D9#21]

    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:389)
    at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForStreaming(UnsupportedOperationChecker.scala:111)
    at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:256)
    at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:322)
    at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:325)
    at my.sparkbench.example.Example.streamGroupByResult(Example.java:113)
    at my.sparkbench.example.Example.exec_main(Example.java:76)
    at my.sparkbench.example.Example.do_main(Example.java:42)
    at my.sparkbench.example.Example.main(Example.java:34)

even though there is a watermark on the stream.

Schema printout looks fine:

root
 |-- source: string (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- I0: long (nullable = false)
 |-- I1: long (nullable = false)
 |-- I2: long (nullable = false)
 |-- I3: long (nullable = false)
 |-- I4: long (nullable = false)
 |-- I5: long (nullable = false)
 |-- I6: long (nullable = false)
 |-- I7: long (nullable = false)
 |-- I8: long (nullable = false)
 |-- I9: long (nullable = false)
 |-- D0: double (nullable = false)
 |-- D1: double (nullable = false)
 |-- D2: double (nullable = false)
 |-- D3: double (nullable = false)
 |-- D4: double (nullable = false)
 |-- D5: double (nullable = false)
 |-- D6: double (nullable = false)
 |-- D7: double (nullable = false)
 |-- D8: double (nullable = false)
 |-- D9: double (nullable = false)

The actual data looks fine too. If I feed it to

dataset.writeStream().format("console").option("truncate", "false").outputMode(OutputMode.Append()).start();

then I get this output:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
|source  |timestamp            |I0 |I1 |I2 |I3 |I4 |I5 |I6 |I7 |I8 |I9 |D0  |D1  |D2  |D3  |D4  |D5  |D6  |D7  |D8  |D9  |
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
|DEV-0001|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:40  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:40.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:41  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:41.5|10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0001|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0002|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0003|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
|DEV-0004|1970-01-01 00:01:42  |10 |10 |10 |10 |10 |10 |10 |10 |10 |10 |10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|10.0|
+--------+---------------------+---+---+---+---+---+---+---+---+---+---+----+----+----+----+----+----+----+----+----+----+
only showing top 20 rows

followed by further batches that look similar.

There is no exception if I use Complete output mode, but then old results (from the start of the timeline) are reported again in every batch, and that is not what I want. I want only new query result records to be reported. So I want Append mode, but it causes the exception above.
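To make the difference between the two output modes concrete, here is a minimal plain-Java sketch. It is not Spark code: the class `OutputModeSketch` and its methods are hypothetical names invented for illustration, it handles a single device, and it ignores details of the real engine such as late-data dropping and the watermark only advancing between micro-batches. It assumes 1-second tumbling windows and treats a window as final once the watermark (maximum event time seen minus the delay) reaches the window's end.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of 1-second tumbling-window averages; an illustration only, not Spark code. */
public class OutputModeSketch {
    private static final long WINDOW_MS = 1000L;

    // Window start (ms) -> {sum, count}; the TreeMap keeps windows in time order.
    private final TreeMap<Long, double[]> state = new TreeMap<>();
    private final long delayMs;                      // watermark delay, e.g. 1000 for "1 second"
    private long maxEventTimeMs = Long.MIN_VALUE;    // largest event time seen so far

    public OutputModeSketch(long delayMs) {
        this.delayMs = delayMs;
    }

    /** Adds one event to the window it falls into. */
    public void add(long eventTimeMs, double value) {
        long windowStart = Math.floorDiv(eventTimeMs, WINDOW_MS) * WINDOW_MS;
        double[] acc = state.computeIfAbsent(windowStart, k -> new double[2]);
        acc[0] += value;
        acc[1] += 1;
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
    }

    /** Complete mode: every window seen so far is reported again on each call. */
    public List<String> emitComplete() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, double[]> w : state.entrySet()) {
            out.add(w.getKey() + ":" + (w.getValue()[0] / w.getValue()[1]));
        }
        return out;
    }

    /** Append mode: a window is reported once, when the watermark reaches its end, then dropped. */
    public List<String> emitAppend() {
        List<String> out = new ArrayList<>();
        if (state.isEmpty()) {
            return out;
        }
        long watermark = maxEventTimeMs - delayMs;
        while (!state.isEmpty() && state.firstKey() + WINDOW_MS <= watermark) {
            Map.Entry<Long, double[]> w = state.pollFirstEntry();
            out.add(w.getKey() + ":" + (w.getValue()[0] / w.getValue()[1]));
        }
        return out;
    }
}
```

In this model the Append variant can only decide that a window is finished because it knows the watermark. That matches the wording of the exception: without a watermark on the grouping key, no group would ever be considered complete.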

Why does the exception occur, and how can I make this work?

A tiny project that isolates the problem is here: https://github.com/oboguev/SparkQuestion

Thanks for any advice.

Re: Spark watermarked aggregation query and append output mode

German Schiavon Matteo
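Hi,

Try this:

// requires: import static org.apache.spark.sql.functions.col;

Dataset<Row> ds1 = dataset
                          .withWatermark("timestamp", "1 second")
                          .groupBy(
                                   // col(...) is resolved by name against the watermarked Dataset
                                   functions.window(col("timestamp"), "1 second", "1 second"),
                                   col("source"))
                          .agg(
                               functions.avg("D0").as("AVG_D0"),
                               functions.avg("I0").as("AVG_I0"))
                          // Per the Structured Streaming guide, sorting is supported only after
                          // aggregation in Complete mode; this orderBy may have to go for Append mode.
                          .orderBy("window");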

On Wed, 23 Sep 2020 at 22:51, Sergey Oboguev <[hidden email]> wrote:

[full quote of the original message trimmed]

Re: Spark watermarked aggregation query and append output mode

Sergey Oboguev
Thanks!

It appears one should use functions.col("timestamp") rather than dataset.col("timestamp").
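
A likely explanation, consistent with the plan printed in the error message: withWatermark returns a new Dataset whose timestamp column carries the watermark delay as metadata (it shows up as timestamp#1-T1000ms in the Project output), whereas dataset.col("timestamp") is bound to the column of the original Dataset, which has no such metadata (plain timestamp#1 inside the window expression). functions.col("timestamp") is only a name, so it is resolved against the Dataset that groupBy is called on, which is the watermarked one. The plain-Java toy model below sketches that difference. All class and method names in it (`ColumnResolutionSketch`, `ToyDataset`, `Attribute`, `appendAllowed`, the `watermarkDelayMs` key) are hypothetical and stand in for Spark's internals only loosely.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Toy model of column resolution; hypothetical classes, not Spark's API or internals. */
public class ColumnResolutionSketch {

    /** A resolved column: a name plus metadata (here only an optional watermark delay). */
    public static final class Attribute {
        final String name;
        final Map<String, Long> metadata;

        Attribute(String name, Map<String, Long> metadata) {
            this.name = name;
            this.metadata = metadata;
        }

        public boolean hasWatermark() {
            return metadata.containsKey("watermarkDelayMs");
        }
    }

    /** An immutable dataset: every transformation returns a new instance. */
    public static final class ToyDataset {
        private final Map<String, Attribute> columns;

        public ToyDataset(String... columnNames) {
            Map<String, Attribute> cols = new HashMap<>();
            for (String n : columnNames) {
                cols.put(n, new Attribute(n, Collections.emptyMap()));
            }
            this.columns = cols;
        }

        private ToyDataset(Map<String, Attribute> columns) {
            this.columns = columns;
        }

        /** Like dataset.col(name): resolved eagerly against THIS dataset. */
        public Attribute col(String name) {
            Attribute a = columns.get(name);
            if (a == null) {
                throw new IllegalArgumentException("No such column: " + name);
            }
            return a;
        }

        /** Like withWatermark: returns a NEW dataset whose column carries the delay. */
        public ToyDataset withWatermark(String name, long delayMs) {
            col(name); // fail early if the column does not exist
            Map<String, Attribute> copy = new HashMap<>(columns);
            copy.put(name, new Attribute(name,
                    Collections.singletonMap("watermarkDelayMs", delayMs)));
            return new ToyDataset(copy);
        }

        /** Append-mode check when grouping by an already resolved attribute. */
        public boolean appendAllowed(Attribute groupingKey) {
            return groupingKey.hasWatermark();
        }

        /** Append-mode check when grouping by name: the name is resolved against this dataset. */
        public boolean appendAllowed(String groupingColumnName) {
            return col(groupingColumnName).hasWatermark();
        }
    }
}
```

In the toy model, `watermarked.appendAllowed(dataset.col("timestamp"))` is false while `watermarked.appendAllowed("timestamp")` is true. By the same reasoning, keeping a reference to the watermarked Dataset and calling col on that reference should presumably also work in Spark, though that is an untested assumption here.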