Generate windows on processing time in Spark Structured Streaming

Generate windows on processing time in Spark Structured Streaming

wangsan
Hi all,

How can I use the current processing time to generate windows in stream processing?
The window function's Scala doc says, "For a streaming query, you may use the function current_timestamp to generate windows on processing time." But when I use current_timestamp as a column in the window function, an exception occurs.

Here is my code:
val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

socketDF.createOrReplaceTempView("words")
val windowedCounts = spark.sql(
  """
    |SELECT value as word, current_timestamp() as time, count(1) as count FROM words
    |  GROUP BY window(time, "5 seconds"), word
  """.stripMargin)

windowedCounts
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
And here is the exception info:
Caused by: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:



Re: Generate windows on processing time in Spark Structured Streaming

Michael Armbrust
Hmmm, we should allow that. current_timestamp() is actually deterministic within any given batch. Could you open a JIRA ticket?
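In the meantime, one possible workaround (just a sketch, not verified against your Spark version) might be to materialize current_timestamp() into an ordinary column with a projection first, so that window() in the aggregation only references that column rather than the nondeterministic call itself. Column and variable names below mirror your example:

import org.apache.spark.sql.functions.{col, current_timestamp, window}

val socketDF = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Materialize the processing-time timestamp in a projection first;
// nondeterministic expressions are allowed in a Project, and the
// window() below then only sees a plain column.
val withTime = socketDF.withColumn("time", current_timestamp())

val windowedCounts = withTime
  .groupBy(window(col("time"), "5 seconds"), col("value"))
  .count()
  .withColumnRenamed("value", "word")

windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()

Whether the analyzer accepts this may depend on the Spark version, so treat it as a starting point rather than a guaranteed fix.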
