How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Jürgen Albersdorfer
Hi, I’m obviously new to Spark Structured Streaming, and I want to

1.) Open a single connection to an MQTT broker / topic that emits JSON objects
2.) Transform the JSON into a wide table
3.) Run several different queries on that wide table

What I do:

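import java.sql.Timestamp
import org.apache.spark.sql.functions.{from_json, window}
import session.implicits._  // needed for 'value, $"..." and .as[...]

val lines = session.readStream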
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "t")
  .option("QoS", 1)
  .option("username", "u")
  .option("password", "p")
  .option("persistence", "memory")
  .load("ssl://mqtt ... :8883").as[(String, Timestamp)]

// 1st Sink
// keep the timestamp column so the windowed count below can use it
val parsedData = lines.select(from_json('value, schema).as("data"), $"timestamp")
val parsedDataQuery = parsedData.writeStream
  .outputMode("append")
  .format("console")
  .start()  // <-------------- this opens one connection to Mqtt Broker

// 2nd Sink
val countPerMinute = parsedData
  .groupBy(window($"timestamp", "1 minute"))
  .count()
val query = countPerMinute.writeStream
  .outputMode("complete")
  .format("console")
  .start()  // <---------------- this opens another connection to Mqtt Broker

parsedDataQuery.awaitTermination()

How can I keep a single connection to the MQTT server and still run multiple analyses on 'parsedData'?

Thanks for your advice in advance,
Jürgen
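
A possible direction, sketched rather than tested against the Bahir MQTT source: on Spark 2.4 or later, foreachBatch lets a single streaming query hand each micro-batch to several outputs, so only one query (and presumably only one source connection) is started. The names SingleSourceFanOut, writeBoth and start are hypothetical, and the sketch assumes 'parsedData' still carries a 'timestamp' column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper, assuming Spark 2.4+ (foreachBatch is not available earlier).
object SingleSourceFanOut {

  // Called once per micro-batch; `batch` is an ordinary, non-streaming DataFrame.
  // Declared with an explicit function type so the Scala overload of
  // foreachBatch is chosen rather than the Java one.
  val writeBoth: (DataFrame, Long) => Unit = (batch, batchId) => {
    batch.persist() // reuse the batch for both outputs instead of recomputing it

    // 1st output: the parsed rows of this batch
    batch.show(truncate = false)

    // 2nd output: rows per one-minute window, counted within this batch only
    // (assumes a `timestamp` column is present)
    batch.groupBy(window(col("timestamp"), "1 minute")).count().show(truncate = false)

    batch.unpersist()
  }

  // Starts exactly one streaming query on the given streaming DataFrame.
  def start(parsedData: DataFrame): StreamingQuery =
    parsedData.writeStream.foreachBatch(writeBoth).start()
}
```

It would be used as SingleSourceFanOut.start(parsedData).awaitTermination(). Note that the count here is computed per micro-batch, not accumulated across batches as with the "complete" output mode above; a running per-minute total would need extra state or a separate stateful query.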
