Re: About the question of Spark Structured Streaming window output

6 messages
Re: About the question of Spark Structured Streaming window output

maasg
Hi,

When you use a window in append mode, the final record for a window is emitted only after the event-time watermark passes the end of that window (window end + watermark delay).
This is your query over time; note the timestamp at the right side of each cell and the data present in it.

val windowedCounts = dataSrc
      .withWatermark("timestamp", "1 minutes")
      .groupBy(window($"timestamp", "5 minutes"))
      .agg(sum("value") as "sumvalue")
      .select("window.start", "window.end","sumvalue")


[screenshot: console output of the query over several batches, not preserved]
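The rule above can be sketched without Spark. A minimal Python model, under the assumed append-mode semantics (a window's aggregate is released exactly once, in the first batch whose watermark lies past the window's end):

```python
from datetime import datetime

# Assumed append-mode rule: a window's aggregate is emitted exactly once,
# in the first batch whose watermark has passed the window's end.
def append_mode_emits(window_end: datetime, watermark: datetime) -> bool:
    return watermark > window_end

# Window [09:50, 09:55) with a 1-minute watermark delay:
end = datetime(2018, 8, 27, 9, 55)
print(append_mode_emits(end, datetime(2018, 8, 27, 9, 52, 1)))  # watermark still before window end
print(append_mode_emits(end, datetime(2018, 8, 27, 11, 3, 1)))  # watermark past window end
```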

Going back to your questions:
1. When I set the append output mode and send input data, there is no result in the output. How do I use append mode with a window aggregate?
Wait for the window end + watermark delay to pass and you'll see the appended record in the output.

2. When I set the update output mode, the result is output every batch, but I want the result output only once, when the window ends. How can I do that?
Use `append` mode.

kr, Gerard.

On Thu, Aug 23, 2018 at 4:31 AM [hidden email] <[hidden email]> wrote:
Hi:
   I have some questions about Spark Structured Streaming window output in Spark 2.3.1. I wrote the application code as follows:

import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

case class DataType(time: Timestamp, value: Long)

val spark = SparkSession
  .builder
  .appName("StructuredNetworkWordCount")
  .master("local[1]")
  .getOrCreate()

import spark.implicits._

// DataFrame representing the stream of input lines from localhost:9999
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Parse each line of the form "<timestamp>,<value>"
val words = lines.as[String].map { l =>
  val parts = l.split(",")
  DataType(Timestamp.valueOf(parts(0)), parts(1).toLong)
}

// 5-minute tumbling windows with a 1-minute watermark
val windowedCounts = words
  .withWatermark("time", "1 minutes")
  .groupBy(window($"time", "5 minutes"))
  .agg(sum("value") as "sumvalue")
  .select("window.start", "window.end", "sumvalue")

val query = windowedCounts.writeStream
  .outputMode("update")
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

query.awaitTermination()

The input data format is:
2018-08-20 12:01:00,1
2018-08-20 12:02:01,1

My questions are:
1. When I set the append output mode and send input data, there is no result in the output. How do I use append mode with a window aggregate?
2. When I set the update output mode and send input data, the result is output every batch. But I want the result output only once, when the window ends. How can I do that?

Thanks in advance!



Re: Re: About the question of Spark Structured Streaming window output

zrc@zjdex.com
Hi Gerard Maas:
Thanks a lot for your reply.
When I use "append" mode, I sent the following data:
2018-08-27 09:53:00,1
2018-08-27 09:53:01,1
The result, which contained only the schema (no rows), was printed after the batch ended. But when the window end + watermark time had passed, there was still no result in the output. Is there something I missed? Thanks in advance.




 


Re: Re: About the question of Spark Structured Streaming window output

Jungtaek Lim
You may want to add a streaming listener to your query and see when/how the watermark is updated. In short, the watermark is calculated from the previous batch and the calculated value is applied to the current batch, so the result may be produced later than you expect, possibly by one batch.
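This one-batch lag can be modelled in plain Python. This is a sketch of the assumed update rule (watermark recomputed after each batch from the max event time, applied from the next batch on), not Spark's actual implementation:

```python
from datetime import datetime, timedelta

def watermark_per_batch(batches, delay):
    """Return the watermark in effect while each batch is processed.
    The watermark is recomputed from the max event time *after* a batch
    finishes, so it only takes effect from the next batch on."""
    wm = datetime.min           # no watermark yet for the very first batch
    max_seen = datetime.min
    in_effect = []
    for events in batches:
        in_effect.append(wm)
        max_seen = max(max_seen, *events)
        wm = max_seen - delay   # applied from the next batch on
    return in_effect

wms = watermark_per_batch(
    [[datetime(2018, 8, 27, 9, 53, 1)], [datetime(2018, 8, 27, 11, 4, 1)]],
    timedelta(minutes=1))
# Batch 1 still runs under the watermark derived from batch 0 (09:52:01).
```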

Re: Re: About the question of Spark Structured Streaming window output

zrc@zjdex.com
Hi Jungtaek Lim & Gerard Maas:
Thanks very much.
When I send three batches of data like the following:

batch 0:
2018-08-27 09:53:00,1
2018-08-27 09:53:01,1

batch 1:
2018-08-27 11:04:00,1
2018-08-27 11:04:01,1

batch 2:
2018-08-27 11:17:00,1
2018-08-27 11:17:01,1

the aggregate result for time "2018-08-27 09:53:00" is output as follows:
Batch: 2
-------------------------------------------
+-------------------+-------------------+--------+
|              start|                end|sumvalue|
+-------------------+-------------------+--------+
|2018-08-27 09:50:00|2018-08-27 09:55:00|       2|
+-------------------+-------------------+--------+

For this result, I would like to know:
1. Why is the start time "2018-08-27 09:50:00" and not "2018-08-27 09:53:00"? When I defined the window, no start time was set.
2. Why is the aggregate result for "2018-08-27 09:53:00" not output when the batch 1 data arrives?

Thanks a lot!




 


Re: Re: About the question of Spark Structured Streaming window output

maasg
Hi,

> 1. Why is the start time "2018-08-27 09:50:00" and not "2018-08-27 09:53:00"? When I define the window, the start time is not set.
When no 'startTime' is defined, windows are aligned to the start of the next-larger time unit. So if your window is defined in minutes, it will be aligned to the start of the hour, and the first window to fill will be the one containing the first event.
With a window of 5:00 minutes, the windows are 9:00-9:05, 9:05-9:10, 9:10-9:15, ..., 9:45-9:50, 9:50-9:55, ...
The first data point sets the internal 'event-time clock', and the window corresponding to 9:53 is 9:50-9:55.

Also note that 'startTime' is a very misleading name for that window parameter; it is really a start-time offset. If you set 'startTime' to `1 minute`, the windows become 9:01-9:06, 9:06-9:11, ...
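The alignment rule reduces to simple arithmetic. A plain-Python model, under the assumption that windows are aligned to the epoch plus the startTime offset:

```python
from datetime import datetime, timedelta

def window_bounds(ts, duration, start_offset=timedelta(0)):
    """Bounds of the tumbling window containing ts, aligned to the epoch
    plus the optional startTime offset (not to the first event seen)."""
    epoch = datetime(1970, 1, 1)
    into_window = (ts - epoch - start_offset) % duration
    start = ts - into_window
    return start, start + duration

five = timedelta(minutes=5)
print(window_bounds(datetime(2018, 8, 27, 9, 53), five))
# 09:53 falls in the aligned window [09:50, 09:55)
print(window_bounds(datetime(2018, 8, 27, 9, 53), five, timedelta(minutes=1)))
# with startTime = 1 minute, the window shifts to [09:51, 09:56)
```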

> 2. Why is the aggregate result for "2018-08-27 09:53:00" not output when the batch 1 data arrives?
The resulting value is clearly there:
2018-08-27 09:50:00|2018-08-27 09:55:00|       2|
The two data points that fall in this window are summed, and the result is 2.

What were you expecting?

kr, Gerard.





Re: Re: About the question of Spark Structured Streaming window output

zrc@zjdex.com
Hi Gerard:
   Thank you very much!
   About the second question: what I wonder is that when the batch 1 data (2018-08-27 11:04:00,1) arrives, the max event time is "2018-08-27 11:04:00", which is later than the window end + watermark for the batch 0 data (2018-08-27 09:53:00,1). So it should trigger the output of the batch 0 aggregate, but it does not; only the batch 2 data does.

Thanks!
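The one-batch watermark lag mentioned earlier in the thread accounts for this. A plain-Python trace (same assumed update rule, timestamps from this thread) shows the 09:50-09:55 window being released only in batch 2:

```python
from datetime import datetime, timedelta

delay = timedelta(minutes=1)
window_end = datetime(2018, 8, 27, 9, 55)          # window [09:50, 09:55)

batches = [
    [datetime(2018, 8, 27, 9, 53, 0), datetime(2018, 8, 27, 9, 53, 1)],    # batch 0
    [datetime(2018, 8, 27, 11, 4, 0), datetime(2018, 8, 27, 11, 4, 1)],    # batch 1
    [datetime(2018, 8, 27, 11, 17, 0), datetime(2018, 8, 27, 11, 17, 1)],  # batch 2
]

emitted_in = None
wm = datetime.min                 # watermark in effect for the current batch
max_seen = datetime.min
for i, events in enumerate(batches):
    if emitted_in is None and wm > window_end:
        emitted_in = i            # window released in this batch
    max_seen = max(max_seen, *events)
    wm = max_seen - delay         # takes effect from the next batch on

print(emitted_in)
# Batch 1 still runs under the batch-0 watermark (09:52:01 < 09:55), so
# the window is released only in batch 2.
```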

 