How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream


Jürgen Albersdorfer
Hi, I’m obviously new to Spark Structured Streaming, and I want to

1.) Open a single connection to an MQTT broker/topic that emits JSON objects
2.) Transform the JSON into a wide table
3.) Run several different queries on the wide table

What I do:

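import java.sql.Timestamp
import org.apache.spark.sql.functions.{from_json, window}
import session.implicits._

// `schema` is the StructType of the JSON payload, defined elsewhere
val lines = session.readStream
  .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
  .option("topic", "t")
  .option("QoS", 1)
  .option("username", "u")
  .option("password", "p")
  .option("persistence", "memory")
  .load("ssl://mqtt ... :8883").as[(String, Timestamp)]

// 1st Sink (keep "timestamp" so the 2nd Sink can window on it)
val parsedData = lines.select(from_json($"value", schema).as("data"), $"timestamp")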
val parsedDataQuery = parsedData.writeStream
  .outputMode("append")
  .format("console")
  .start()  // <-------------- this opens one connection to Mqtt Broker

// 2nd Sink
val countPerMinute = parsedData
  .groupBy(window($"timestamp", "1 minute"))
  .count()
val query = countPerMinute.writeStream
  .outputMode("complete")
  .format("console")
  .start()  // <---------------- this opens another connection to Mqtt Broker

parsedDataQuery.awaitTermination()

How can I keep only one connection to the MQTT server and still run multiple analyses on 'parsedData'?

Thanks for your advice in advance,
Jürgen

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Amiya Mishra
Hi Jürgen,

Have you found a solution or workaround for writing to multiple sinks from a
single source, given that we cannot process multiple sinks at a time?

I also have an ETL scenario where we use a clone component that feeds
multiple sinks from a single input streaming DataFrame.

Please keep posting once you have a solution.

Thanks
Amiya





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

chandan prakash
Hi Amiya/Jürgen,
Did you get any lead on this?
I want to process records after some validation:
correct records should go to sink1 and incorrect records should go to sink2.
How can I achieve this with a single stream?

Regards,
Chandan

In reply to Amiya Mishra (Wed, Jun 13, 2018 at 2:30 PM)



--
Chandan Prakash


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Amiya Mishra
Hi Chandan/Jürgen,

I tried native code with a single input DataFrame and multiple sinks, as
shown below.

Spark provides a method called awaitAnyTermination() in
StreamingQueryManager.scala, which blocks until any of the session's
streaming queries terminates. From the Spark documentation:
  -> Wait until any of the queries on the associated SQLContext has
terminated since the creation of the context, or since `resetTerminated()`
was called. If any query was terminated with an exception, then the
exception will be thrown.
  -> If a query has terminated, then subsequent calls to
`awaitAnyTermination()` will either return immediately (if the query was
terminated by `query.stop()`), or throw the exception immediately (if the
query was terminated with exception). Use `resetTerminated()` to clear past
terminations and wait for new terminations.
  -> In the case where multiple queries have terminated since
`resetTerminated()` was called, if any query has terminated with exception,
then `awaitAnyTermination()` will throw any of the exceptions. For correctly
documenting exceptions across multiple queries, users need to stop all of
them after any of them terminates with exception, and then check the
`query.exception()` for each query.

import org.apache.spark.sql.DataFrame

// `schema` is the StructType of the CSV input, defined elsewhere
val inputdf: DataFrame = sparkSession.readStream.schema(schema)
  .option("delimiter", ",").csv("src/main/streamingInput")
// each query gets its own output path and its own checkpoint location
val query1 = inputdf.writeStream.format("csv").option("path", "first_output")
  .option("checkpointLocation", "checkpointloc").start()
val query2 = inputdf.writeStream.format("csv").option("path", "second_output")
  .option("checkpointLocation", "checkpoint2").start()
sparkSession.streams.awaitAnyTermination()


Now both "first_output" and "second_output" are written successfully.
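The last documentation point above (stop every query once one fails, then check each query's exception) can be sketched as a small helper that replaces the bare `sparkSession.streams.awaitAnyTermination()` call, e.g. `MultiSinkSupervisor.awaitAndCollectFailures(sparkSession, Seq(query1, query2))`. This is a minimal sketch, not an official Spark utility: the object and method names are hypothetical, and it relies only on `streams.awaitAnyTermination()`, `StreamingQuery.stop()`, `name`, `id` and `exception`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Hypothetical helper, not part of Spark's API.
object MultiSinkSupervisor {
  /** Blocks until any query on this session terminates, then stops all of
    * `queries` and returns each one's name (or id, if unnamed) paired with
    * its failure, if any. */
  def awaitAndCollectFailures(
      spark: SparkSession,
      queries: Seq[StreamingQuery]): Map[String, Option[StreamingQueryException]] = {
    try {
      spark.streams.awaitAnyTermination()
    } catch {
      // Thrown when a query failed; each query's own exception is read below.
      case _: StreamingQueryException => ()
    }
    // As the documentation advises: stop all queries, then check query.exception.
    queries.foreach(_.stop())
    queries.map(q => Option(q.name).getOrElse(q.id.toString) -> q.exception).toMap
  }
}
```

Note that this also stops the remaining queries when one of them ends normally via `query.stop()`; if the others should keep running in that case, call `resetTerminated()` and wait again instead.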

Try it out on your side and let me know if you find any limitations. And
please post here if you find any other way.

Please correct me if I made any grammatical mistakes.

Thanks
Amiya


Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Tathagata Das
Hey all, 

In Spark 2.4.0, there will be a new feature called foreachBatch which will expose the output rows of every micro-batch as a dataframe, on which you apply a user-defined function. With that, you can reuse existing batch sources for writing results as well as write results to multiple locations. 

Reuse existing batch data sources
For many storage systems, there may not be a streaming sink available yet, but there may already exist a data writer for batch queries. Using foreachBatch(), you can use the batch data writers on the output of each micro-batch. For example, writing from a stream to Cassandra using the Cassandra Spark Connector would look like this:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.cassandraFormat(...).save(...)
}

Write to multiple locations
If you want to write the output of a streaming query to multiple locations, then you can simply write the output DataFrame/Dataset multiple times. However, each attempt to write can cause the output data to be recomputed (including possible re-reading of the input data). To avoid recomputations, you should cache the output DataFrame/Dataset, write it to multiple locations, and then uncache it. Here is an outline.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.cache()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()  // Dataset has no uncache(); unpersist() releases the cached batch
}
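Applied to the earlier question of sending valid records to sink1 and invalid records to sink2, this cache-then-write-twice outline might look like the following. It is a sketch assuming Spark 2.4+ and Parquet output; the object name, the paths and the validation rule (a non-null `id` column) are hypothetical placeholders. Like any foreachBatch sink, it is at-least-once unless you deduplicate on `batchId`.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical sketch: ONE streaming query whose micro-batches are split
// into valid rows (sink1) and invalid rows (sink2).
object ValidInvalidSplit {
  // Placeholder validation rule: a record is valid if its "id" is not null.
  private val isValid = col("id").isNotNull

  def writeBatch(validPath: String, invalidPath: String)(batchDF: DataFrame, batchId: Long): Unit = {
    batchDF.persist()  // the batch is scanned twice below; compute it only once
    batchDF.filter(isValid).write.mode("append").parquet(validPath)     // sink1
    batchDF.filter(!isValid).write.mode("append").parquet(invalidPath)  // sink2
    batchDF.unpersist()
  }

  def start(streamingDF: DataFrame, validPath: String, invalidPath: String,
            checkpoint: String): StreamingQuery = {
    // A typed function value keeps foreachBatch's overload resolution unambiguous.
    val fn: (DataFrame, Long) => Unit = writeBatch(validPath, invalidPath)
    streamingDF.writeStream
      .option("checkpointLocation", checkpoint)
      .foreachBatch(fn)
      .start()
  }
}
```

Because the source is consumed by a single query, it is read once per micro-batch, unlike the two-query approach discussed earlier in the thread.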

Apply additional DataFrame operations
Many DataFrame and Dataset operations are not supported in streaming DataFrames because Spark does not support generating incremental plans in those cases. Using foreachBatch() you can apply some of these operations on each micro-batch output. However, you will have to reason about the end-to-end semantics of doing that operation yourself.
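For example, sorting a streaming DataFrame is only supported after an aggregation in complete output mode, but inside foreachBatch the micro-batch is an ordinary DataFrame. The sketch below keeps the top `n` rows of each micro-batch; the `score` column, the object name and the console printing are hypothetical. The semantic caveat applies directly: this is the top `n` per micro-batch, not over the whole stream.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.desc
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical example: per-micro-batch top-n by a "score" column.
object PerBatchTopN {
  // Plain batch operations, applied to the static DataFrame of one micro-batch.
  def topN(batchDF: DataFrame, n: Int): DataFrame =
    batchDF.orderBy(desc("score")).limit(n)

  def start(streamingDF: DataFrame, n: Int, checkpoint: String): StreamingQuery = {
    val printTopN: (DataFrame, Long) => Unit = (batchDF, batchId) => {
      println(s"Top $n rows of micro-batch $batchId:")
      topN(batchDF, n).show(truncate = false)
    }
    streamingDF.writeStream
      .option("checkpointLocation", checkpoint)
      .foreachBatch(printTopN)
      .start()
  }
}
```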

NOTE: By default foreachBatch() provides only at-least-once write guarantees. However, you can use the batchId provided to the function as a way to deduplicate the output and get an exactly-once guarantee.
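One way to use `batchId` for deduplication, sketched under the assumption that a re-executed batch produces the same rows: write each micro-batch into its own directory keyed by `batchId` in overwrite mode, so that a replay replaces that batch's output instead of appending a second copy. The directory layout (`batch_id=<id>` under a hypothetical `basePath`) and the names are illustrative choices, not something foreachBatch does for you.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical sketch of batchId-keyed, idempotent output.
object IdempotentBatchWriter {
  // Overwriting basePath/batch_id=<id> makes a replayed batch replace its
  // earlier output rather than duplicate it.
  def writeBatch(basePath: String)(batchDF: DataFrame, batchId: Long): Unit =
    batchDF.write.mode("overwrite").parquet(s"$basePath/batch_id=$batchId")

  def start(streamingDF: DataFrame, basePath: String, checkpoint: String): StreamingQuery = {
    val fn: (DataFrame, Long) => Unit = writeBatch(basePath)
    streamingDF.writeStream
      .option("checkpointLocation", checkpoint)
      .foreachBatch(fn)
      .start()
  }
}
```

Reading `basePath` back with `spark.read.parquet(basePath)` exposes `batch_id` as a partition column.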

TD

In reply to Amiya Mishra (Thu, Jul 5, 2018 at 12:33 AM)

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

Amiya Mishra
Hi Tathagata,

Are there any limitations to the code below when writing to multiple files?

import org.apache.spark.sql.DataFrame

val inputdf: DataFrame = sparkSession.readStream.schema(schema)
  .option("delimiter", ",").csv("src/main/streamingInput")
val query1 = inputdf.writeStream.format("csv").option("path", "first_output")
  .option("checkpointLocation", "checkpointloc").start()
val query2 = inputdf.writeStream.format("csv").option("path", "second_output")
  .option("checkpointLocation", "checkpoint2").start()
sparkSession.streams.awaitAnyTermination()


And when will Spark 2.4.0 be released?

Thanks
Amiya

Re: How to branch a Stream / have multiple Sinks / do multiple Queries on one Stream

chandan prakash
Thanks Amiya/TD for responding.

@TD,
Thanks for letting us know about the new foreachBatch API; having a handle on each micro-batch's DataFrame should be useful in many cases.

@Amiya, 
The input source will be read twice, and the entire DAG will be computed twice. That is not a functional limitation, but it costs extra resources and performance.

Regards,
Chandan


In reply to Amiya Mishra (Fri, Jul 6, 2018 at 2:42 PM)