re: spark streaming / AnalysisException on collect()

Peter Liu
Hello there,

I have a quick question about how to share data (a small data collection) between a Kafka producer and a Kafka consumer using Spark streaming (Spark 2.2):

(A)
The data published by the Kafka producer is received in order on the Kafka consumer side (see (a), copied below).

(B)
However, collect() or cache() on a streaming DataFrame does not seem to be supported (see the links in (b) below). I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

(C)
My question would be:

--- How can I use the collection data (in a streaming DataFrame) that has arrived on the consumer side, e.g., convert it to an array of objects?
--- Is there another quick way to use Kafka to share static data (instead of streaming) between two Spark application services (which have no common Spark context, session, etc.)?

I have copied some code snippets in (c).

Sharing a global collection between a Spark producer and consumer seems like a very simple use case, but I spent an entire day trying various options and going through online resources such as Google, Apache Spark, Stack Overflow, Cloudera, etc.

Any help would be very much appreciated!

Thanks!

Peter

(a) streaming data (df) received on the consumer side (console sink):

root
 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------+------------------------------------+-----------------------+
|ad_id                               |campaign_id                         |timestamp              |
+------------------------------------+------------------------------------+-----------------------+
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|

(b) online documentation and discussions on unsupported operations on streaming DataFrames:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#unsupported-operations

https://stackoverflow.com/questions/42062092/why-does-using-cache-on-streaming-datasets-fail-with-analysisexception-queries


(c) code snippet:

OK:

   val rawDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
      .option("startingOffsets", "earliest")
      .option("subscribe", Variables.CAMPAIGNS_TOPIC)
      .load()

OK:  

   // campaignsDf is not defined in this excerpt; presumably it is rawDf with the Kafka value cast to a string
   val mySchema = StructType(Array(
      StructField("ad_id", StringType),
      StructField("campaign_id", StringType)))

   val campaignsDf2 = campaignsDf.select(from_json($"value", mySchema).as("data"), $"timestamp")
      .select("data.*", "timestamp")

OK:

   campaignsDf2.writeStream
      .format("console")
      .option("truncate", "false")
      .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once, since this is one-time static data
      .start() // start() returns the StreamingQuery on which awaitTermination() is called
      .awaitTermination()
 

Exception:

      val campaignsArrayRows = campaignsDf2.collect()  //<==== not supported  ====> AnalysisException!
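
One possible way around the exception, as an untested sketch: since the data is static, the topic could be read as a batch query with spark.read instead of readStream, which gives an ordinary DataFrame on which collect() is allowed. The bootstrap server string and topic name below are placeholders (assumptions) standing in for kafkaCluster.kafkaNodesString and Variables.CAMPAIGNS_TOPIC, and the sketch assumes the spark-sql-kafka-0-10 package is on the classpath.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CampaignsBatchRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("campaigns-batch-read").getOrCreate()

    // Placeholder values (assumptions); substitute the real cluster string and topic.
    val bootstrapServers = "localhost:9092"
    val topic = "campaigns"

    val mySchema = StructType(Array(
      StructField("ad_id", StringType),
      StructField("campaign_id", StringType)))

    // spark.read (not readStream) produces a static DataFrame bounded by the given offsets.
    val campaignsDf = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      // The Kafka value column is binary, so cast it to a string before parsing the JSON.
      .select(from_json(col("value").cast("string"), mySchema).as("data"), col("timestamp"))
      .select("data.*", "timestamp")

    // collect() works here because this is a batch query, not a streaming one.
    val campaignsArrayRows = campaignsDf.collect()
    campaignsArrayRows.foreach(println)

    spark.stop()
  }
}
```

If the streaming query has to stay, another option to consider is the memory sink (format("memory") with a queryName), whose result table can then be queried and collected as a normal table.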