|
|
Hi,
I have a use case where the file paths of JSON records stored in S3 arrive as messages in Kafka. I have to process the data using Spark Structured Streaming.
The design I have in mind is as follows:
1. In Spark Structured Streaming, read the Kafka message containing the data path.
2. Collect the message records on the driver (the messages are small in size).
3. Create the DataFrame from the data location.
kafkaDf.select($"value".cast(StringType)) .writeStream.foreachBatch((batchDf:DataFrame, batchId:Long) => { //rough code //collec to driver val records = batchDf.collect() //create dataframe and process records foreach((rec: Row) =>{ println("records:######################",rec.toString()) val path = rec.getAs[String]("data_path") val dfToProcess =spark.read.json(path) .... })
} I would like to know the views, if this approach is fine? Specifically if there is some problem with with creating the dataframe after calling collect. If there is any better approach, please let know the same.
Regards,
Amit Joshi
|
|
Hi Amit,
Why don't you just map()/mapXXX() over the kafkaDf with a mapping function that reads the paths?
Also, do you really have to read the JSON into an additional DataFrame?
Thanks, Boris
From: Amit Joshi <[hidden email]>
Sent: Monday, 18 January 2021 15:04
To: spark-user <[hidden email]>
Subject: [Spark Structured Streaming] Processing the data path coming from kafka.
|
|
Hi Boris,
I need to do processing on the data present at the path. That is the reason I am trying to build the DataFrame.
Can you please provide an example of your solution?
Regards,
Amit
|
|
Hi Amit,
I was thinking along the lines of the following (Python):
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, StructType, StructField

@udf(returnType=StringType())
def reader_udf(filename: str) -> str:
    with open(filename, "r") as f:
        return f.read()

def run_locally():
    with utils.build_spark_session("Local", local=True) as spark:
        df = spark.readStream.csv(r'testdata',
                                  schema=StructType([StructField('filename', StringType(), True)]))
        df = df.withColumn('content', reader_udf(col('filename')))
        q = df.select('content').writeStream.queryName('test').format('console').start()
        q.awaitTermination()
Now each row contains the contents of one file. Provided the files are not large, you can foreach() over the DF/RDD and do whatever you want with each row, such as json.loads(), etc.
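For instance, a minimal sketch of that idea (assuming df is the streaming DataFrame built above with the raw file text in its content column, that each file holds one JSON record per line, and that handle_row is just a placeholder for your own per-record logic):

import json

def handle_row(row):
    # Runs on the executors; the SparkSession is not available here.
    for line in row['content'].splitlines():
        record = json.loads(line)
        # ... per-record processing goes here ...
        print(record)

q = df.select('content').writeStream.foreach(handle_row).start()
q.awaitTermination()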
If you know the schema of the JSON records, you can later explode() them into a flat DF, as in
https://stackoverflow.com/questions/38243717/spark-explode-nested-json-with-array-in-scala
Note that, unless I am missing something, you cannot access the Spark session from foreach(), as that code does not run on the driver.
Please say whether this makes sense, or whether I missed anything.
Boris
|
|
Hi Boris,
Thanks for your code block. I understood what you are trying to achieve in the code.
But the content of each file is JSON records separated by newlines, and we have to make a DataFrame out of it, as some processing has to be done on it.
Regards,
Amit
|
|
Coming in late, but if I understand correctly, you can simply use the fact that spark.read (or readStream) will also accept a directory argument. If you provide a directory, Spark will automagically pull in all the files in that directory.

"""Reading in multiple files example"""
from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, DateType,
                               IntegerType, TimestampType)

spark = SparkSession.builder.master('local[*]').appName('spark_streaming').getOrCreate()

# Schema for incoming data
json_schema = StructType([StructField("username", StringType(), True),
                          StructField("name", StringType(), True),
                          StructField("sex", StringType(), True),
                          StructField("address", StringType(), True),
                          StructField("mail", StringType(), True),
                          StructField("birthdate", DateType(), True),
                          StructField("work", StringType(), True),
                          StructField("salary", IntegerType(), True),
                          StructField("timestamp", TimestampType(), True)])

# Read in a bunch of data files (files are in JSON-per-line format)
data_directory_path = './data/my_directory'

# Create a Spark DF with all the files in the directory
spark_df = spark.read.schema(json_schema).json(data_directory_path)
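The readStream case works the same way; a minimal sketch, assuming the same json_schema and data_directory_path as above (Spark then monitors the directory and picks up new files as they appear):

# Streaming variant: readStream also accepts a directory
stream_df = spark.readStream.schema(json_schema).json(data_directory_path)
query = stream_df.writeStream.format('console').start()
query.awaitTermination()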
|
|
There you go:
from typing import List

from pyspark.sql.functions import col, explode, udf
from pyspark.sql.types import ArrayType, StringType, StructType, StructField

@udf(returnType=ArrayType(StringType()))
def reader_udf(filename: str) -> List[str]:
    with open(filename, "r") as f:
        return f.read().split('\n')

def run_locally():
    with utils.build_spark_session("Local", local=True) as spark:
        df = spark.readStream.csv(r'testdata',
                                  schema=StructType([StructField('filename', StringType(), True)]))
        df = df.withColumn('content', reader_udf(col('filename')))
        q = df.select(explode('content')).writeStream.queryName('test').format('console') \
            .option('truncate', False).start()
        q.awaitTermination()
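If the schema of the records is known, a possible follow-up (only a sketch: record_schema and its fields are placeholders, and df is the streaming DataFrame with the content column from the snippet above) is to parse the exploded lines into a flat DataFrame with from_json:

from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType

# Placeholder schema: replace with the real schema of the JSON records
record_schema = StructType([StructField('id', StringType(), True),
                            StructField('payload', StringType(), True)])

parsed_df = (df.select(explode('content').alias('json_line'))
               .select(from_json('json_line', record_schema).alias('rec'))
               .select('rec.*'))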
|
|