# imports assumed at module level; config is a module-level dict of settings
import sys

from pyspark.sql.functions import col, current_timestamp, from_json, lit
from pyspark.sql.types import FloatType, StringType, StructType, TimestampType


def fetch_data(self):
    self.sc.setLogLevel("ERROR")
    # sample message on the topic:
    # {"rowkey":"c9289c6e-77f5-4a65-9dfb-d6b675d67cff","ticker":"MSFT", "timeissued":"2021-02-23T08:42:23", "price":31.12}
    schema = StructType() \
        .add("rowkey", StringType()) \
        .add("ticker", StringType()) \
        .add("timeissued", TimestampType()) \
        .add("price", FloatType())
    try:
        # construct a streaming DataFrame that subscribes to the topic
        # config['MDVariables']['topic'] -> md (market data)
        streamingDataFrame = self.spark \
            .readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", config['MDVariables']['bootstrapServers']) \
            .option("schema.registry.url", config['MDVariables']['schemaRegistryURL']) \
            .option("group.id", config['common']['appName']) \
            .option("subscribe", config['MDVariables']['topic']) \
            .option("failOnDataLoss", "false") \
            .option("includeHeaders", "true") \
            .option("startingOffsets", "latest") \
            .load() \
            .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))
        # streamingDataFrame.printSchema()
        """
        "foreach" performs custom write logic on each row, while "foreachBatch" performs
        custom write logic on each micro-batch, here through the SendToBigQuery function.
        foreachBatch(SendToBigQuery) expects two parameters: first, the micro-batch as a
        DataFrame or Dataset, and second, a unique id for each batch.
        Using foreachBatch, we write each micro-batch to the storage defined in our custom
        logic; in this case we store the output of the streaming application in a Google
        BigQuery table.
        Note that we are appending data, and the column "rowkey" is a UUID, so it can be
        used as the primary key.
        """
        result = streamingDataFrame.select(
                col("parsed_value.rowkey").alias("rowkey"),
                col("parsed_value.ticker").alias("ticker"),
                col("parsed_value.timeissued").alias("timeissued"),
                col("parsed_value.price").alias("price")) \
            .withColumn("currency", lit(config['MDVariables']['currency'])) \
            .withColumn("op_type", lit(config['MDVariables']['op_type'])) \
            .withColumn("op_time", current_timestamp()) \
            .writeStream \
            .outputMode('append') \
            .option("truncate", "false") \
            .format('console') \
            .start()
    except Exception as e:
        print(f"{e}, quitting")
        sys.exit(1)

    result.awaitTermination()
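For reference, the foreachBatch(SendToBigQuery) variant described in the comment block above would look roughly like this. This is only a sketch: the body of SendToBigQuery, the table name and the temporary bucket are illustrative placeholders, not my actual code.

def SendToBigQuery(df, batchId):
    # placeholder write logic: append this micro-batch to a BigQuery table
    # via the spark-bigquery connector (table and bucket names are made up)
    df.write \
        .format("bigquery") \
        .option("table", "mydataset.md") \
        .option("temporaryGcsBucket", "my-temp-bucket") \
        .mode("append") \
        .save()

result = streamingDataFrame \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(SendToBigQuery) \
    .start()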
This works and produces the following output:
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|rowkey |ticker|timeissued |price |currency|op_type|op_time |
+------------------------------------+------+-------------------+------+--------+-------+-----------------------+
|35bc0378-a782-4183-999f-561a1dc162aa|MRW |2021-02-27 17:15:49|300.75|GBP |1 |2021-02-27 17:16:24.472|
|39c55b09-7f50-43fe-a0a1-f88e5bdd51e1|ORCL |2021-02-27 17:15:49|23.75 |GBP |1 |2021-02-27 17:16:24.472|
|22dfaf4f-2335-4658-aa74-3c0e4f05cc46|MKS |2021-02-27 17:15:49|441.9 |GBP |1 |2021-02-27 17:16:24.472|
However, GCP offers Dataproc compute servers that run Spark 3.1.1, and there the same code gets stuck at Batch 0 and never moves on:
Streaming DataFrame : True
21/02/27 18:01:09 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
-------------------------------------------
Batch: 0
-------------------------------------------
+------+------+----------+-----+--------+-------+-------+
|rowkey|ticker|timeissued|price|currency|op_type|op_time|
+------+------+----------+-----+--------+-------+-------+
+------+------+----------+-----+--------+-------+-------+
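While it sits at Batch 0 I can also poll the query state from the driver (before awaitTermination() blocks, or from a separate thread); these are standard StreamingQuery accessors, shown here just as a sketch of what I plan to check:

# inspect the stuck query from the driver
print(result.status)        # whether it is waiting for data or processing a batch
print(result.lastProgress)  # source offsets and timings for the most recent batch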
I am getting one additional warning line, highlighted above. Does that signify anything? Also, is there anything else I can do to debug this?
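From the message itself, it looks like Spark 3.1.1 simply disables AQE for streaming queries, so the warning should be benign; if that is right, setting it off explicitly before starting the query ought to suppress the message (a sketch, assuming AQE is enabled in my session defaults):

# assumption: spark.sql.adaptive.enabled is on in the session defaults;
# turning it off explicitly avoids the StreamingQueryManager warning
self.spark.conf.set("spark.sql.adaptive.enabled", "false")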
FYI, I can see that the data is coming through on the Kafka topic:
$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper ctpcluster-m:2181,ctpcluster-w-0:2181,ctpcluster-w-1:2181 --topic md
"rowkey":"56e9ef90-5113-4731-9f6e-1f91d5849799","ticker":"MSFT", "timeissued":"2021-02-27T18:20:42", "price":27.02}
In GCP we run ZooKeeper and the Kafka brokers in containers, but that should not matter, should it?
Thanks