Infer JSON schema in structured streaming Kafka.

satyajit vegesna
Hi All,

I would like to infer a JSON schema from a sample of the data I receive from Kafka (a specific topic). I have to infer the schema because I receive arbitrary JSON strings with a different schema for each topic, so I chose the following steps:

a. readStream from a single Kafka topic, starting at the latest offset.
b. Somehow store the JSON string in a val and infer the schema.
c. Stop the stream.
d. Create a new readStream (starting at the earliest offset) and use the inferred schema to process the JSON with Spark's built-in JSON support, such as from_json, get_json_object and others, and run my actual business logic.
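
Step (d) needs no special tricks once a schema is known. A minimal sketch, assuming Spark 2.2+ with the spark-sql-kafka-0-10 package on the classpath; the ParseWithSchema object, its method names and the broker address localhost:9092 are hypothetical placeholders. In the Kafka source, startingOffsets set to "earliest" plays the role of what older Kafka consumers called the smallest offset.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

object ParseWithSchema {
  // Turns the Kafka "value" column (binary or string) into typed columns using a known schema.
  def parseJson(df: DataFrame, schema: StructType): DataFrame =
    df.select(from_json(col("value").cast("string"), schema).as("data"))
      .select("data.*")

  // Step (d): re-read the topic from the beginning and parse it with the inferred schema.
  def readTopic(spark: SparkSession, topic: String, schema: StructType): DataFrame = {
    val raw = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // hypothetical broker address
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
    parseJson(raw, schema)
  }
}
```

Because parseJson only needs a DataFrame with a value column, it can also be tried on a static DataFrame of sample strings before the stream is started.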

Now I am not sure how to accomplish step (b). Any help would be appreciated.
I would also like to know if there is a better approach.

Regards,
Satyajit.

Re: Infer JSON schema in structured streaming Kafka.

Jacek Laskowski
Hi,

What about the memory sink? That could work.
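
A minimal sketch of the memory-sink idea, assuming Spark 2.2+ (spark.read.json accepting a Dataset[String] first appeared in 2.2) and a streaming DataFrame already read from Kafka. The MemorySinkSample object, its method and the table name are hypothetical. StreamingQuery.processAllAvailable is documented as intended for testing, and it only waits for data that is already available: if the stream starts at the latest offset and nothing new arrives, the table can still be empty and the inferred schema will typically have no fields.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.StructType

object MemorySinkSample {
  // Copies the Kafka values into an in-memory table, waits for the data that is
  // available right now, stops the query and infers a schema from the table.
  def sampleSchema(spark: SparkSession, kafkaStream: DataFrame, table: String): StructType = {
    import spark.implicits._
    val query = kafkaStream
      .selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("memory")
      .queryName(table) // the memory sink registers a temporary view with this name
      .start()
    try query.processAllAvailable()
    finally query.stop()
    // The view keeps the collected rows after the query has stopped.
    spark.read.json(spark.table(table).as[String]).schema
  }
}
```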

Regards,
Jacek Laskowski
----
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Apache Spark 2 https://bit.ly/mastering-apache-spark

In reply to satyajit vegesna (Mon, Dec 11, 2017 at 3:28 AM).

Re: Infer JSON schema in structured streaming Kafka.

satyajit vegesna
Hi Jacek,

Thank you for responding.

I tried the memory sink, and below is what I did:

import org.apache.spark.sql.functions.{get_json_object, split, substring_index}
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._

// Kafka delivers the message value as binary, so cast it to a string once up front.
val fetchValue = debeziumRecords.selectExpr("CAST(value AS STRING) AS value")
  .withColumn("tableName", get_json_object($"value", "$.schema.name"))
  .withColumn("operation", get_json_object($"value", "$.payload.op"))
  // Keep the text between "after": and ,"source" (the row state after the change).
  .withColumn("payloadAfterValue",
    split(substring_index($"value", "\"after\":", -1), ",\"source\"").getItem(0))
  .drop("tableName", "operation", "value")
  .as[String]
  .writeStream
  .outputMode(OutputMode.Append())
  .queryName("record") // name of the in-memory table
  .format("memory")
  .start()

spark.sql("select * from record").show(truncate = false)

I was expecting to be able to read the JSON string from the record table, but the table is empty on the first call, and I do not see any DataFrame output after that first one.

That said, the steps above work in spark-shell and I can do what I need there. The problem is when I write the same code in IntelliJ: the streaming query keeps running, and I am not sure how to detect when to stop it so that I can use the record memory table.

So I would like to stop the streaming query once I know there is some data in the record memory table (is there a way to do that?), and then fetch my record from the memory table.
Any help on how to approach this programmatically, or pointers to examples, would be highly appreciated.
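
One way to stop the query once data has arrived, without guessing a fixed wait, is to poll the memory table with an upper bound on the total wait. A minimal sketch, assuming the query writes to the memory sink under a query name that is also the table name; WaitForRows, stopWhenTableHasRows, the timeout and the 500 ms polling interval are hypothetical choices, not Spark APIs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery

object WaitForRows {
  // Polls the memory-sink table until it holds at least one row, the query stops
  // running, or the timeout expires; then stops the query. Returns true if rows arrived.
  def stopWhenTableHasRows(spark: SparkSession, query: StreamingQuery, table: String,
                           timeoutMs: Long, pollMs: Long = 500L): Boolean = {
    def hasRows: Boolean = spark.table(table).head(1).nonEmpty
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!hasRows && query.isActive && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollMs)
    }
    val found = hasRows
    query.stop()
    found
  }
}
```

With the code above, the call would be WaitForRows.stopWhenTableHasRows(spark, fetchValue, "record", timeoutMs = 60000L), placed before spark.sql("select * from record").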

Regards,
Satyajit.

In reply to Jacek Laskowski (Sun, Dec 10, 2017 at 9:52 PM).

Re: Infer JSON schema in structured streaming Kafka.

satyajit vegesna
Hi Jacek,

For now, I am using Thread.sleep() on the driver to make sure my streaming query receives some data, and then I stop it before control reaches the code that queries the memory table.
Let me know if there is a better way of handling it.

Regards,
Satyajit.

In reply to satyajit vegesna (Sun, Dec 10, 2017 at 10:43 PM).

Re: Infer JSON schema in structured streaming Kafka.

Jacek Laskowski
Hi,

What about a custom streaming Sink that would stop the query after addBatch has been called?
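
In Spark 2.2 the Sink trait that a custom sink would implement lives in the org.apache.spark.sql.execution.streaming package, which Spark treats as internal. The sketch below therefore swaps in the public StreamingQueryListener API to get a similar effect: a listener watches progress events, and the driver stops the query once a micro-batch with input rows has been reported. This is an alternative technique, not the custom Sink described above. StopAfterFirstData, runUntilFirstRows and the timeout are hypothetical; the listener only releases a latch, and the stop happens on the calling thread rather than inside the callback.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryListener}
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

object StopAfterFirstData {
  // Starts a query (via `start`), waits until one of its micro-batches reports input
  // rows or the timeout expires, then stops it. Returns true if such a batch was seen.
  def runUntilFirstRows(spark: SparkSession, queryName: String, timeoutSec: Long)
                       (start: => StreamingQuery): Boolean = {
    val gotRows = new CountDownLatch(1)
    val listener = new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = ()
      // Progress events are posted after a micro-batch completes, including the sink write.
      override def onQueryProgress(event: QueryProgressEvent): Unit =
        if (event.progress.name == queryName && event.progress.numInputRows > 0) gotRows.countDown()
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
    }
    // Register before starting so the first batch cannot be missed.
    spark.streams.addListener(listener)
    try {
      val query = start
      try gotRows.await(timeoutSec, TimeUnit.SECONDS)
      finally query.stop() // stop from this thread, not from inside the listener callback
    } finally spark.streams.removeListener(listener)
  }
}
```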

Regards,
Jacek Laskowski

In reply to satyajit vegesna (Mon, Dec 11, 2017 at 9:15 AM).

Re: Infer JSON schema in structured streaming Kafka.

Burak Yavuz-2
In Spark 2.2, you can read from Kafka in batch mode and then use the JSON reader to infer the schema:

import spark.implicits._

val df = spark.read.format("kafka")...
  .select($"value".cast("string"))
  .as[String] // spark.read.json expects a Dataset[String]
val json = spark.read.json(df)
val schema = json.schema

The above will be slow (since you're reading almost all of the data in Kafka in batch), but it would work.
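
A slightly fuller version of the batch approach, still only a sketch: it assumes Spark 2.2+ with the spark-sql-kafka-0-10 package, and the broker list and topic are supplied by the caller. For batch queries the Kafka source already defaults to startingOffsets "earliest" and endingOffsets "latest"; they are spelled out here only for clarity. The cap on the number of records handed to the JSON reader is an added, hypothetical way to bound the work done by schema inference; the resulting schema then only reflects the records that were sampled.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.types.StructType

object BatchInferSchema {
  // Batch (non-streaming) read of every record currently in the topic.
  def readTopicValues(spark: SparkSession, brokers: String, topic: String): Dataset[String] = {
    import spark.implicits._
    spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", brokers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest") // the batch default, shown for clarity
      .option("endingOffsets", "latest")     // the batch default, shown for clarity
      .load()
      .select($"value".cast("string"))
      .as[String]
  }

  // Infers a schema from at most maxRecords JSON strings (hypothetical cap).
  def inferSchema(spark: SparkSession, json: Dataset[String], maxRecords: Int = 1000): StructType =
    spark.read.json(json.limit(maxRecords)).schema
}
```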

My question to you is: do you think it's worth it? Why do you have a random JSON schema being fed into your Kafka stream? Couldn't this randomness break everything in the future if someone makes a mistake? Not having fixed, known schemas for streaming data (or any data, for that matter) is dangerous for most purposes.
Just food for thought.

Best,
Burak

In reply to Jacek Laskowski (Mon, Dec 11, 2017 at 4:01 AM).

Re: Infer JSON schema in structured streaming Kafka.

satyajit vegesna
Hi Burak,

Thank you for the input; I will definitely try these options.

We don't have a unified schema because we are consuming data from different topics, each containing data from a different table in a database, and each table has different columns.
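
Since each topic maps to one table, one option is to subscribe to all of the topics at once and split the records on the topic column that the Kafka source adds to every row, applying the schema inferred for that topic. A minimal sketch, assuming one schema has already been inferred per topic (for example with the batch read Burak describes); PerTopicParsing and parseByTopic are hypothetical names.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.StructType

object PerTopicParsing {
  // kafkaDf: a Kafka DataFrame (batch or streaming) subscribed to several topics,
  // so each row carries the "topic" and "value" columns of the Kafka source.
  // Returns one parsed DataFrame per topic, using that topic's schema.
  def parseByTopic(kafkaDf: DataFrame, schemas: Map[String, StructType]): Map[String, DataFrame] =
    schemas.map { case (topic, schema) =>
      topic -> kafkaDf
        .filter(col("topic") === topic)
        .select(from_json(col("value").cast("string"), schema).as("data"))
        .select("data.*")
    }
}
```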

Regards,
Satyajit.

In reply to Burak Yavuz (Dec 11, 2017 at 9:29 AM).