[SS]How to add a column with custom system time?

[SS]How to add a column with custom system time?

KevinZwx
Hi,

In Structured Streaming, how can I add a column to a dataset containing the current system time, aligned to 15-minute intervals?

Thanks.

Re: [SS]How to add a column with custom system time?

Michael Armbrust
import org.apache.spark.sql.functions._

df.withColumn("window", window(current_timestamp(), "15 minutes"))


Re: [SS]How to add a column with custom system time?

KevinZwx
Thanks for the reply, but using this method I got an exception:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: nondeterministic expressions are only allowed in Project, Filter, Aggregate or Window

Can you give more advice?



Re: [SS]How to add a column with custom system time?

Michael Armbrust
Which version of Spark?


Re: [SS]How to add a column with custom system time?

KevinZwx
The Spark version is 2.2.0.


Re: [SS]How to add a column with custom system time?

Michael Armbrust
Can you show all the code?  This works for me.


Re: [SS]How to add a column with custom system time?

KevinZwx
Yes, my code is shown below:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._ // assumes `spark` is the active SparkSession

  /**
    * input
    */
  val logs = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", BROKER_SERVER)
    .option("subscribe", TOPIC)
    .option("startingOffsets", "latest") // note: the Kafka source option name is plural, "startingOffsets"
    .load()

  /**
    * process
    */
  val logValues = logs.selectExpr("CAST(value AS STRING)").as[String]

  val events = logValues
    .map(parseFunction)
    .select(
      $"_1".alias("date").cast("timestamp"),
      $"_2".alias("uuid").cast("string")
    )

  val results = events
    .withWatermark("date", "1 day")  // accept events up to one day late
    .dropDuplicates("uuid", "date")  // de-duplicate within the watermark
    .groupBy($"date")
    .count()
    .withColumn("window", window(current_timestamp(), "15 minutes")) // this is the line that fails

  /**
    * output
    */
  val query = results
    .writeStream
    .outputMode("update")
    .format("console")
    .option("truncate", "false")
    .trigger(Trigger.ProcessingTime("1 seconds"))
    .start()

  query.awaitTermination()

and I use Play JSON to parse the input logs from Kafka; the parse function looks like this:

  import play.api.libs.json.Json

  def parseFunction(str: String): (Long, String) = {
    val json = Json.parse(str)
    val timestamp = (json \ "time").get.toString().toLong
    // truncate the epoch-second timestamp to the start of its UTC day,
    // then shift back 8 hours (presumably local midnight in UTC+8)
    val date = (timestamp / (60 * 60 * 24) * 24 - 8) * 60 * 60
    val uuid = (json \ "uuid").get.toString()
    (date, uuid)
  }


Re: [SS]How to add a column with custom system time?

KevinZwx
And I apply .withColumn("window", window(current_timestamp(), "15 minutes")) after the count().


Re: [SS]How to add a column with custom system time?

KevinZwx
It seems current_timestamp() cannot be used directly inside the window() function, because after some attempts I found that using

df.count.withColumn("pTime", current_timestamp).select(window($"pTime", "15 minutes"), $"count")

instead of

df.count.withColumn("window", window(current_timestamp(), "15 minutes"))

throws no exception and works fine. I don't know whether this is a problem that needs improvement.
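
For completeness, here is that workaround applied to the pipeline posted earlier in the thread (a minimal sketch; events and the column names come from the code above, and spark.implicits._ is assumed to be in scope):

// Materialize current_timestamp() as a plain column first, then pass
// that column to window(), instead of nesting the two calls.
val results = events
  .withWatermark("date", "1 day")
  .dropDuplicates("uuid", "date")
  .groupBy($"date")
  .count()
  .withColumn("pTime", current_timestamp())
  .select(window($"pTime", "15 minutes"), $"date", $"count")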



Re: [SS]How to add a column with custom system time?

Michael Armbrust
Can you show the explain() for the version that doesn't work?  You might just be hitting a bug.
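
For anyone following along, the plan can be printed either from the streaming Dataset before the query starts or from the running StreamingQuery handle (a short sketch; results and query refer to the code posted above):

// Extended (logical + physical) plan of the failing version,
// printed before starting the stream:
results.explain(true)

// Or, once the query is running, the plan of the most recently
// executed micro-batch:
query.explain()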
