re: spark streaming / AnalysisException on collect()

Peter Liu
Hello there,

I have a quick question about how to share a small data collection between a Kafka producer and a Kafka consumer using Spark streaming (Spark 2.2).

The data published by the Kafka producer is received in order on the consumer side (see (a) below).

However, collect() or cache() on a streaming DataFrame does not seem to be supported (see the links in (b) below). I got this:
Exception in thread "DataProducer" org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;

My questions are:

--- How can I use the collection data that arrives on the consumer side as a streaming DataFrame, e.g. convert it to an array of objects?
--- Is there another quick way to use Kafka for sharing static (rather than streaming) data between two Spark application services that have no common Spark context or session?

I have copied a code snippet in (c).

Sharing a global collection between a Spark producer and consumer seems like a very simple use case, but I spent an entire day trying various options and going through online resources such as Google, the apache-spark list, Stack Overflow, Cloudera, etc.

Any help would be very much appreciated!



(a) streaming data (df) received on the consumer side (console sink):

 |-- ad_id: string (nullable = true)
 |-- campaign_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)

Batch: 0
|ad_id                               |campaign_id                         |timestamp              |
|b5629b58-376e-462c-9e65-726184390c84|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|64e93f73-15bb-478c-9f96-fd38f6a24da2|bf412fa4-aeaa-4148-8467-1e1e2a6e0945|2018-04-27 14:35:45.475|
|05fa1349-fcb3-432e-9b58-2bb0559859a2|060810fd-0430-444f-808c-8a177613226a|2018-04-27 14:35:45.478|
|ae0a176e-236a-4d3a-acb9-141157e81568|42b68023-6a3a-4618-a54a-e6f71df26710|2018-04-27 14:35:45.484|

(b) online discussions on unsupported operations on streaming dataframe:

(c) code snippet:


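    // Streaming read from Kafka; this is what makes rawDf a streaming DataFrame
    val rawDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
      .option("startingOffsets", "earliest")
      .option("subscribe", Variables.CAMPAIGNS_TOPIC)
      .load()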


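    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import spark.implicits._ // enables the $"..." column syntax

    val mySchema = StructType(Array(
      StructField("ad_id", StringType),
      StructField("campaign_id", StringType)))

    // Kafka delivers value as binary, so cast it to a string before parsing the JSON
    val campaignsDf2 = rawDf
      .select(from_json($"value".cast("string"), mySchema).as("data"), $"timestamp")
      .select("data.*", "timestamp")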


    campaignsDf2.writeStream
      .format("console")
      .trigger(org.apache.spark.sql.streaming.Trigger.Once()) // trigger once, since this is one-time static data
      .start()


      val campaignsArrayRows = campaignsDf2.collect()  //<==== not supported  ====> AnalysisException!
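
One direction that may address the second question, as a minimal sketch under assumptions: since the campaigns are one-time static data, the topic could be read with the Kafka batch source (spark.read instead of spark.readStream). That yields a regular, non-streaming DataFrame on which collect() is allowed. The object name CampaignsBatchRead, the helpers parseCampaigns and readCampaigns, and their parameters are hypothetical names introduced here for illustration; the sketch also assumes the spark-sql-kafka-0-10 package is on the classpath and that the message values are JSON with the two fields from mySchema above.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical helper object, not part of the original code.
object CampaignsBatchRead {

  // Same schema as mySchema in the snippet above.
  val campaignSchema: StructType = StructType(Array(
    StructField("ad_id", StringType),
    StructField("campaign_id", StringType)))

  // Parses the JSON payload in the "value" column and keeps the Kafka timestamp.
  // Works on any DataFrame that has "value" and "timestamp" columns.
  def parseCampaigns(raw: DataFrame): DataFrame =
    raw
      .select(from_json(col("value").cast("string"), campaignSchema).as("data"), col("timestamp"))
      .select("data.*", "timestamp")

  // Batch (non-streaming) read of everything currently in the topic.
  // bootstrapServers and topic are supplied by the caller (assumed values).
  def readCampaigns(spark: SparkSession, bootstrapServers: String, topic: String): Array[Row] = {
    val raw = spark.read // read, not readStream: the result is a static DataFrame
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()

    // collect() is permitted here because no streaming source is involved.
    parseCampaigns(raw).collect()
  }
}
```

On the consumer side this might be called as CampaignsBatchRead.readCampaigns(spark, kafkaCluster.kafkaNodesString, Variables.CAMPAIGNS_TOPIC). Note that a batch read only sees what is already in the topic at the time of the call, so the producer would need to have finished publishing first.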