Spark structured streaming Stuck on Batch = 0 on spark 3.1.1, Dataproc cluster


Mich Talebzadeh
Hi,

I have a PySpark program that uses Spark 3.0.1 to read a Kafka topic and write it to Google BigQuery. This works fine on-premise and loops over micro-batches of data.

 def fetch_data(self):
        self.sc.setLogLevel("ERROR")
        #{"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
        schema = StructType().add("rowkey", StringType()).add("ticker", StringType()).add("timeissued", TimestampType()).add("price", FloatType())
        try:
            # construct a streaming dataframe streamingDataFrame that subscribes to topic config['MDVariables']['topic']) -> md (market data)
            streamingDataFrame = self.spark \
                .readStream \
                .format("kafka") \
                .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers'],) \
                .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
                .option("group.id", config['common']['appName']) \
                .option("zookeeper.connection.timeout.ms", config['MDVariables']['zookeeperConnectionTimeoutMs']) \
                .option("rebalance.backoff.ms", config['MDVariables']['rebalanceBackoffMS']) \
                .option("zookeeper.session.timeout.ms", config['MDVariables']['zookeeperSessionTimeOutMs']) \
                .option("auto.commit.interval.ms", config['MDVariables']['autoCommitIntervalMS']) \
                .option("subscribe", config['MDVariables']['topic']) \
                .option("failOnDataLoss", "false") \
                .option("includeHeaders", "true") \
                .option("startingOffsets", "latest") \
                .load() \
                .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

            #streamingDataFrame.printSchema()

            """
               "foreach" performs custom write logic on each row and "foreachBatch" performs custom write logic on each micro-batch through SendToBigQuery function
                foreachBatch(SendToBigQuery) expects 2 parameters, first: micro-batch as DataFrame or Dataset and second: unique id for each batch
               Using foreachBatch, we write each micro batch to storage defined in our custom logic. In this case, we store the output of our streaming application to Google BigQuery table.
               Note that we are appending data and column "rowkey" is defined as UUID so it can be used as the primary key
            """
            result = streamingDataFrame.select( \
                     col("parsed_value.rowkey").alias("rowkey") \
                   , col("parsed_value.ticker").alias("ticker") \
                   , col("parsed_value.timeissued").alias("timeissued") \
                   , col("parsed_value.price").alias("price")). \
                     withColumn("currency", lit(config['MDVariables']['currency'])). \
                     withColumn("op_type", lit(config['MDVariables']['op_type'])). \
                     withColumn("op_time", current_timestamp()). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     format('console'). \
                     start()
        except Exception as e:
                print(f"""{e}, quitting""")
                sys.exit(1)
        result.awaitTermination()
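
For completeness, this is roughly what the foreachBatch(SendToBigQuery) variant mentioned in the docstring looks like. It is only a sketch: the body of SendToBigQuery, the table id config key and the spark-bigquery connector write options are assumptions from my own setup, not shown in the snippet above.

    def SendToBigQuery(df, batchId):
        # df is the micro-batch DataFrame, batchId is the unique id for this trigger
        # the config key and write options below are assumptions from my setup
        df.write \
          .format("bigquery") \
          .mode("append") \
          .save(config['MDVariables']['fullyQualifiedoutputTableId'])

    # and in the streaming chain above, replace format('console') with:
    #          foreachBatch(SendToBigQuery). \
    #          start()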

With format('console') I get this output:


-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time                |
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|35bc0378-a782-4183-999f-561a1dc162aa|MRW   |2021-02-27 17:15:49|300.75|GBP     |1      |2021-02-27 17:16:24.472|
|39c55b09-7f50-43fe-a0a1-f88e5bdd51e1|ORCL  |2021-02-27 17:15:49|23.75 |GBP     |1      |2021-02-27 17:16:24.472|
|22dfaf4f-2335-4658-aa74-3c0e4f05cc46|MKS   |2021-02-27 17:15:49|441.9 |GBP     |1      |2021-02-27 17:16:24.472|


However, GCP offers Dataproc clusters that run Spark 3.1.1.

There, the same code gets stuck at batch 0 and never moves on.

Streaming DataFrame :  True
21/02/27 18:01:09 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+


I am getting one additional WARN line above (the spark.sql.adaptive.enabled one). Does that signify anything? Is there anything else I can do to debug this? FYI, I can see that data is coming through on the Kafka topic:

 $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper ctpcluster-m:2181,ctpcluster-w-0:2181,ctpcluster-w-1:2181 --topic md

"rowkey":"56e9ef90-5113-4731-9f6e-1f91d5849799","ticker":"MSFT", "timeissued":"2021-02-27T18:20:42", "price":27.02}


In GCP the ZooKeeper nodes and Kafka brokers run in containers, but that should not matter, should it?
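
As an extra check on the Dataproc side, I can try a plain batch read from the same topic, which bypasses the streaming trigger entirely and shows whether Spark can reach the brokers and see the records at all. A rough sketch (not part of the code above):

        batchDF = self.spark.read \
            .format("kafka") \
            .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
            .option("subscribe", config['MDVariables']['topic']) \
            .option("startingOffsets", "earliest") \
            .option("endingOffsets", "latest") \
            .load()
        # if this count is zero, the problem is connectivity/offsets, not the streaming query
        print(batchDF.count())
        batchDF.selectExpr("CAST(value AS STRING)").show(5, False)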


Thanks



LinkedIn  https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

 



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: Spark structured streaming Stuck on Batch = 0 on spark 3.1.1, Dataproc cluster

Mich Talebzadeh
By the way, as per the streaming documentation, one can monitor the streaming query status with:

        # (same select/withColumn chain as in the previous code, then)
        result = streamingDataFrame.select(...). \
                     writeStream. \
                     outputMode('append'). \
                     option("truncate", "false"). \
                     format('console'). \
                     start()

        print(result.status)
        print(result.recentProgress)
        print(result.lastProgress)

OK, so these should tell us something.

When I run it on-premise, where the streaming data is displayed via format('console'), I see the following:

{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|rowkey                              |ticker|timeissued         |price |currency|op_type|op_time               |
+------------------------------------+------+-------------------+------+--------+-------+----------------------+
|e4c02434-fa1f-4e8e-ad94-40c2782e9681|MRW   |2021-03-01 15:11:44|293.75|GBP     |1      |2021-03-01 15:12:16.49|

etc ..
 
On the other hand, when I run it on the Google Cloud cluster, I see exactly the same diagnostics BUT no data!

{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}
[]
None
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+

So the monitoring does not say anything. 
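
One more thing I may try is to poll lastProgress in a loop while the query runs; it carries the Kafka start/end offsets per completed trigger, so it should show whether the source ever sees new offsets. A rough sketch (not in the code above):

        import time
        for _ in range(10):
            print(result.status)
            # lastProgress is None until the first trigger completes
            if result.lastProgress is not None:
                print(result.lastProgress['sources'])
            time.sleep(30)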

What does the following signify?

print(result.status)
 

Thanks


