Reload some static data during Structured Streaming

Reload some static data during Structured Streaming

spark receiver
Hi 

I’m using Structured Streaming (Spark 2.2) to receive Kafka messages, and it works great. The thing is, I need to join the Kafka messages with a relatively static table stored in a MySQL database (let’s call it metadata here).

So is it possible to reload the metadata table at some interval (daily, say) without restarting the running Structured Streaming query?

Code snippet follows:
// df_meta contains important information to join with the dataframe read from kafka
val df_meta = spark.read.format("jdbc")
  .option("url", mysql_url)
  .option("dbtable", "v_entity_ap_rel")
  .load()
df_meta.cache()

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "x.x.x.x:9092")
  .option("fetch.message.max.bytes", "50000000")
  .option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("failOnDataLoss", true)
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .as[ApRawData]
  .select("APMAC", "RSSI", "sourceMacAddress", "updatingTime")
  .as("a") // alias the streaming side so the "a.*" references below resolve
  .join(df_meta.as("b"), $"a.apmac" === $"b.apmac")

df.selectExpr(
    "ENTITYID", "CLIENTMAC", "STIME",
    "case when a.rrssi >= b.rssi then '1' when a.rrssi < b.nearbyrssi then '3' else '2' end FLAG",
    "substring(stime,1,13) STIME_HOUR")
  .distinct()
  .writeStream
  .format("parquet")
  .partitionBy("STIME_HOUR")
  .option("checkpointLocation", "/user/root/t_cf_table_chpt")
  .trigger(ProcessingTime("5 minutes"))
  .start("T_CF_TABLE")
  .awaitTermination()

Mason
Re: Reload some static data during Structured Streaming

Burak Yavuz-2
I think if you don't cache the JDBC table, then it should auto-refresh.
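
A minimal sketch of that uncached variant (kafkaStream here is a stand-in for the streaming Dataset built in the snippet above; the URL and table name are from the original post):

// Without .cache(), the stream-static join re-evaluates the JDBC scan on
// every micro-batch, so metadata changes in MySQL appear on the next trigger.
val df_meta = spark.read.format("jdbc")
  .option("url", mysql_url)
  .option("dbtable", "v_entity_ap_rel")
  .load() // note: no df_meta.cache()

val joined = kafkaStream.as("a").join(df_meta.as("b"), $"a.apmac" === $"b.apmac")

The trade-off, as the next reply points out, is a MySQL round trip on every micro-batch.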

Re: Reload some static data during Structured Streaming

spark receiver
I need it cached to improve throughput; I only hope it can be refreshed once a day, not every batch.


Mason
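
A possible middle ground (a sketch only, not something confirmed in this thread; it assumes df_meta is the cached JDBC DataFrame from the snippet above): keep the cache, but run a small scheduler thread that unpersists and re-caches it once a day. The first micro-batch after a refresh re-reads MySQL and re-populates the cache, while batches in between keep joining against cached data.

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical daily refresh loop for the cached df_meta.
val scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    df_meta.unpersist(blocking = true) // drop the stale cached rows
    df_meta.cache()                    // mark for caching again (lazy)
    df_meta.count()                    // force re-materialization from MySQL now
  }
}, 1, 1, TimeUnit.DAYS)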