Using two WriteStreams in same spark structured streaming job


Using two WriteStreams in same spark structured streaming job

act_coder
I have a scenario where I would like to save the same streaming DataFrame to
two different streaming sinks.

I have created a streaming DataFrame which I need to send to both a Kafka
topic and Delta Lake.

I thought of using foreachBatch, but it looks like it doesn't support
multiple streaming sinks.

I also tried spark session.awaitAnyTermination() with multiple write
streams, but the second stream is not getting processed.

Is there a way to achieve this?
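
For reference, a minimal sketch of the two-query setup being described here, assuming one streaming query per sink; the DataFrame name `df`, the broker address, topic, and paths are illustrative assumptions, not from the original post:

```
// Two independent streaming queries over the same streaming DataFrame.
// `df`, the broker address, topic, and paths are illustrative assumptions.

// Query 1: write to a Kafka topic (the Kafka sink expects a "value" column).
df.selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092")
  .option("topic", "my-topic")
  .option("checkpointLocation", "/tmp/checkpoints/kafka")
  .start()

// Query 2: write to a Delta Lake table.
df.writeStream
  .format("delta")
  .option("checkpointLocation", "/tmp/checkpoints/delta")
  .start("/tmp/delta/events")

// Block until any query terminates. Note each query needs its own
// checkpoint location, and both start() calls must run before this.
spark.streams.awaitAnyTermination()
```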




Re: Using two WriteStreams in same spark structured streaming job

lec ssmi
You can use the foreach sink to achieve the logic you want.
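
As a rough illustration of the foreach-sink suggestion, here is a skeleton ForeachWriter that publishes each row to Kafka; the class name, broker address, topic, and row formatting are assumptions for the sketch:

```
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.sql.{ForeachWriter, Row}

// Skeleton foreach sink that sends each row to Kafka as a comma-joined string.
// Broker address and topic are illustrative assumptions.
class KafkaForeachWriter(servers: String, topic: String) extends ForeachWriter[Row] {
  @transient private var producer: KafkaProducer[String, String] = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    val props = new Properties()
    props.put("bootstrap.servers", servers)
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    producer = new KafkaProducer[String, String](props)
    true // process all rows in this partition/epoch
  }

  override def process(row: Row): Unit = {
    producer.send(new ProducerRecord[String, String](topic, row.mkString(",")))
  }

  override def close(errorOrNull: Throwable): Unit = {
    if (producer != null) producer.close()
  }
}

// Usage sketch:
// df.writeStream.foreach(new KafkaForeachWriter("host:9092", "my-topic")).start()
```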


Re: Using two WriteStreams in same spark structured streaming job

act_coder
If I use the foreach() sink, then I may need to write a custom Kafka stream
writer, right?

And I might not be able to use the default writeStream.format("kafka") method?
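
For what it's worth, a hedged sketch of the foreachBatch alternative: inside the batch function each micro-batch is a plain batch DataFrame, so the built-in batch writers can be reused for both sinks and no custom Kafka writer is needed. The name `df`, the broker address, topic, and paths here are assumptions:

```
import org.apache.spark.sql.DataFrame

// foreachBatch hands each micro-batch to a user function as a batch
// DataFrame, which can then be written to several batch sinks.
df.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist() // avoid recomputing the batch once per sink

    // Batch write to Kafka using the built-in connector.
    batchDF.selectExpr("to_json(struct(*)) AS value")
      .write
      .format("kafka")
      .option("kafka.bootstrap.servers", "host:9092")
      .option("topic", "my-topic")
      .save()

    // Batch append to a Delta Lake table.
    batchDF.write
      .format("delta")
      .mode("append")
      .save("/tmp/delta/events")

    batchDF.unpersist()
  }
  .option("checkpointLocation", "/tmp/checkpoints/both")
  .start()
```

One caveat from the Structured Streaming guide: writing to multiple locations inside foreachBatch gives at-least-once rather than exactly-once guarantees, unless the writes are made idempotent.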




Re: Using two WriteStreams in same spark structured streaming job

Kevin Pis
Do you mean sparkSession.streams.awaitAnyTermination()? May I see your code? Or you can look at the following.

My demo code:

```
val hourDevice = beginTimeDevice
  .groupBy($"subsId", $"eventBeginHour", $"serviceType")
  .agg("duration" -> "sum")
  .withColumnRenamed("sum(duration)", "durationForHour")

// First streaming query.
hourDevice.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()

// Note: the aggregation result must be assigned, or dayDevice is undefined.
val dayDevice = beginTimeDevice
  .groupBy($"subsId", $"eventBeginDay", $"serviceType")
  .agg("duration" -> "sum")
  .withColumnRenamed("sum(duration)", "durationForDay")

// Second streaming query.
dayDevice.writeStream
  .outputMode("update")
  .option("truncate", "false")
  .format("console")
  .start()

// Blocks until any active query terminates.
sparkSession.streams.awaitAnyTermination()
```

sparkSession.streams.awaitAnyTermination() is OK; maybe something is wrong elsewhere in your code.




--

Best,

Kevin Pis