How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Jürgen Albersdorfer
Hi, I'm admittedly new to Spark Structured Streaming, and I want to:

1.) Open a single connection to an MQTT broker/topic that emits JSON objects
2.) Transform the JSON into a wide table
3.) Run several different queries on that wide table

What I do:

import java.sql.Timestamp
import org.apache.spark.sql.functions._
import session.implicits._  // enables the 'value and $"timestamp" column syntax

val lines = session.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "t")
  .option("QoS", 1)
  .option("username", "u")
  .option("password", "p")
  .option("persistence", "memory")
  .load("ssl://mqtt ... :8883").as[(String, Timestamp)]

// 1st Sink
// keep 'timestamp so the windowed count below can resolve it; 'schema' is assumed defined elsewhere
val parsedData = lines.select(from_json('value, schema).as("data"), 'timestamp)
val parsedDataQuery = parsedData.writeStream
  .outputMode("append")
  .format("console")
  .start()  // <-------------- this opens one connection to Mqtt Broker

// 2nd Sink
val countPerMinute = parsedData
  .groupBy(window($"timestamp", "1 minute"))
  .count()
val query = countPerMinute.writeStream
  .outputMode("complete")
  .format("console")
  .start()  // <---------------- this opens another connection to Mqtt Broker

parsedDataQuery.awaitTermination()

How can I keep just one connection to the MQTT broker and run multiple analyses on 'parsedData'?

Thanks for your advice in advance,
Jürgen
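
Each call to start() launches an independent streaming query, and every query reads the source on its own, which is why a second broker connection appears. Since Spark 2.4, foreachBatch lets a single query fan each micro-batch out to several sinks, so the source is read only once. A minimal sketch building on the parsedData frame above (the parquet path is a placeholder, and note that an aggregation computed inside the batch function only sees that one micro-batch, so the stateful per-minute count would still need its own query):

import org.apache.spark.sql.DataFrame

val query = parsedData.writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()              // reuse the same batch for both sinks
    batchDF.show()                 // sink 1: console
    batchDF.write
      .mode("append")
      .parquet("/tmp/parsed-data") // sink 2: placeholder path
    batchDF.unpersist()
    ()
  }
  .start()

query.awaitTermination()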


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Amiya Mishra
Hi Jürgen,

Have you found a solution or workaround for feeding multiple sinks from a single source, since we cannot process multiple sinks at a time?

I also have an ETL scenario where we use a clone component that feeds multiple sinks from a single input streaming DataFrame.

Could you post here once you find a solution?

Thanks
Amiya
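
For an ETL fan-out like the clone component described above, a commonly suggested workaround (sketched here with placeholder broker address, topic name, and checkpoint path) is to publish the single input stream to a replayable log such as Kafka once, and then let every branch run as its own streaming query against Kafka. Only the bridge query holds a connection to the original source:

// Bridge query: read the original source once, republish each row to Kafka.
val bridge = parsedData
  .selectExpr("to_json(struct(*)) AS value")  // the Kafka sink expects a 'value' column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")  // placeholder
  .option("topic", "parsed-data")                    // placeholder
  .option("checkpointLocation", "/tmp/ckpt/bridge")  // placeholder; the Kafka sink requires one
  .start()

// Each analysis then subscribes to Kafka independently and keeps its own state.
val replay = session.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "parsed-data")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")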




