[Spark-Kafka-Streaming] Verifying the approach for multiple queries

[Spark-Kafka-Streaming] Verifying the approach for multiple queries

Amit Joshi
Hi,

I have a scenario where a Kafka topic is being written with different types of JSON records.
I have to regroup the records by type, then fetch the corresponding schema, parse the records, and write them out as Parquet.
I have tried Structured Streaming, but the dynamic schema is a constraint.
So I have used DStreams, though I know the approach I have taken may not be good.
Could anyone please let me know whether this approach will scale, and its possible pros and cons?
I am collecting the grouped records and then forming a DataFrame for each group.
createKeyValue -> This creates the key-value pair with schema information.
stream.foreachRDD { (rdd, time) =>
  // Capture the Kafka offsets of this batch so they can be committed after the write.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // Group the raw JSON records by their (type, schema) key and bring them to the driver.
  val result = rdd.map(createKeyValue).reduceByKey((x, y) => x ++ y).collect()
  result.foreach(x => println(x._1))

  // For each group: build a DataFrame, parse the JSON with that group's schema, write Parquet.
  result.foreach { case (key, records) =>
    val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    import org.apache.spark.sql.functions._

    val df = records.toDF("value")
    df.select(from_json($"value", key._2, Map.empty[String, String]).as("data"))
      .select($"data.*")
      //.withColumn("entity", lit("invoice"))
      .withColumn("year", year($"TimeUpdated"))
      .withColumn("month", month($"TimeUpdated"))
      .withColumn("day", dayofmonth($"TimeUpdated"))
      .write.partitionBy("name", "year", "month", "day").mode("append").parquet(path)
  }

  // Commit the offsets only after the batch has been processed.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
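
For reference, createKeyValue is not shown in the post. From the way it is used above, it has to return a ((type, schema), Seq[jsonString]) pair so that reduceByKey can concatenate the payloads of each type. A hypothetical sketch, assuming the record type travels in the Kafka message key and schemas come from a static map (a schema-registry lookup would take the same shape):

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}

// Hypothetical sketch only -- createKeyValue is not shown in the original post.
// Assumed: the record type is carried in the Kafka message key, and schemas are
// looked up from a static map keyed by that type.
val schemas: Map[String, StructType] = Map(
  "invoice" -> StructType(Seq(
    StructField("name", StringType),
    StructField("TimeUpdated", TimestampType)
  ))
  // ... one entry per record type
)

def createKeyValue(record: ConsumerRecord[String, String]): ((String, StructType), Seq[String]) =
  ((record.key(), schemas(record.key())), Seq(record.value()))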

Re: [Spark-Kafka-Streaming] Verifying the approach for multiple queries

tianlangstudio
Hello, Sir! 
What about processing and grouping the data first, then writing the grouped data to Kafka topics A and B? Another Spark application could then read topic A or B and process it further, in the spirit of ETL.
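
A minimal sketch of that first stage with Structured Streaming's Kafka source and sink; the broker address, topic names, and the JSON "type" field used for routing are illustrative assumptions, not taken from the original post:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Sketch: read the mixed topic and fan each record out to a per-type topic,
// so every downstream job sees a single, fixed-schema topic.
val spark = SparkSession.builder().appName("topic-splitter").getOrCreate()

val raw = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")   // assumed broker address
  .option("subscribe", "mixed-topic")                 // assumed source topic
  .load()

val routed = raw
  .selectExpr("CAST(value AS STRING) AS json")
  .withColumn("type", get_json_object(col("json"), "$.type"))   // assumed type field
  .select(
    concat(lit("typed-"), col("type")).as("topic"),             // e.g. typed-invoice
    col("json").as("value")
  )

routed.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("checkpointLocation", "/tmp/checkpoints/topic-splitter")
  .start()
  .awaitTermination()

Because no fixed "topic" option is set on the sink, the Kafka writer routes each row to the topic named in its "topic" column, so each downstream job can subscribe to a single topic with one schema.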


------------------------------------------------------------------
From: Amit Joshi <[hidden email]>
Sent: Monday, August 10, 2020, 02:37
To: user <[hidden email]>
Subject: [Spark-Kafka-Streaming] Verifying the approach for multiple queries
