Spark Structured Streaming - DF shows only one column with list of byte array


salemi
Hi All,

I am trying to read messages from Kafka, deserialize the values using Avro,
and then convert the JSON content to a DF.
For a Kafka message value like {"a": "1", "b": "2"}, I would like to see a
dataframe like the following:
+---+---+
|a  |b  |
+---+---+
|1  |2  |
+---+---+

I have written the following code, which reads messages from a Kafka topic.
The messages are deserialized with the Confluent Avro deserializer. Next, the
code converts the Avro records to JSON format, and finally it converts the
JSON messages to Map objects. The output of this code is just a single column
"value" containing serialized byte arrays (see output below).

How should I change this code to map each field/value of the JSON object to
a column of the DF?

Thank you,
Ali


Code:

  import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
  import io.confluent.kafka.serializers.KafkaAvroDeserializer
  import org.apache.avro.generic.GenericData
  import org.apache.spark.sql.{Encoders, SparkSession}
  import scala.util.parsing.json.JSON

  private val cachedSchemaRegistryClient =
    new CachedSchemaRegistryClient(SCHEMA_REGISTRY_URL, 20)
  private val kafkaAvroDeserializer =
    new KafkaAvroDeserializer(cachedSchemaRegistryClient)
  // Encoder for the Dataset[Map[String, Any]] produced by map() below
  implicit val mapEncoder = Encoders.kryo[Map[String, Any]]

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark Consumption")
      .getOrCreate()

    val kafkaDf = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", TOPIC_ID)
      //      .option("checkpointLocation", "s3://checkpoint")
      .load()

    val messageDf = kafkaDf
      .map(record => {
        // Column 1 of the Kafka source row is the message value
        val avroMsg = record.get(1).asInstanceOf[Array[Byte]]
        val msgObj = kafkaAvroDeserializer
          .deserialize("topic", avroMsg)
          .asInstanceOf[GenericData.Record]
        val result = JSON.parseFull(msgObj.toString)
        result match {
          // Matches if the string is valid JSON representing a Map of Strings to Any
          case Some(map: Map[String, Any] @unchecked) => map
          case _ => null
        }
      })
      .writeStream
      .format("console") // <-- use ConsoleSink
      .option("truncate", "false")
      .option("numRows", 10)
      .start()
      .awaitTermination()
  }
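A note on the output below: this behavior is expected with a Kryo encoder. `Encoders.kryo` serializes the whole object to bytes and exposes it as a single `BinaryType` column named `value`, so the console sink can only print serialized bytes, never per-field columns. A minimal sketch reproducing just that schema (class and app names are made up; only Spark is assumed on the classpath):

```scala
import org.apache.spark.sql.{Encoders, SparkSession}

object KryoColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("kryo sketch") // hypothetical app name
      .getOrCreate()

    // Same encoder as in the streaming job above
    implicit val mapEncoder = Encoders.kryo[Map[String, Any]]
    val ds = spark.createDataset(Seq(Map[String, Any]("a" -> "1", "b" -> "2")))(mapEncoder)

    // Prints a single binary column regardless of the Map's contents:
    // root
    //  |-- value: binary (nullable = true)
    ds.printSchema()
    spark.stop()
  }
}
```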



Output:

+----------------------------------------------------------------------------------------------------+
|value                                                                                               |
+----------------------------------------------------------------------------------------------------+
|[374 69 6E 61 74 69 669 66 64 3A 20 53 65 61 72 63 68 20 45 6E 67 69 6E 65 73 20 26 20 50 6F 72 74 ]|
|[37 01 20 40 01 03 01 64 65 76 69 63 65 5F 6E 61 6D E5 00 40 01 03 01 66 6C 6F 77 5F 73 74 61 72 74 ]|
|...                                                                                                 |
+----------------------------------------------------------------------------------------------------+
only showing top 10 rows
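One possible direction (a sketch under assumptions, not the definitive fix): instead of returning a Map through a Kryo encoder, return the record's JSON text as a plain String column and let Spark parse it with `from_json` and an explicit schema, then flatten the struct with `select("data.*")`. The schema below is assumed from the example message {"a": "1", "b": "2"}; the literal JSON string stands in for what `msgObj.toString` would produce after the Avro deserialization step:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object FromJsonSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("from_json sketch") // hypothetical app name
      .getOrCreate()
    import spark.implicits._

    // Stand-in for the JSON text produced by msgObj.toString
    val jsonDf = Seq("""{"a": "1", "b": "2"}""").toDF("json")

    // Schema assumed from the example message; replace with the real fields
    val schema = StructType(Seq(
      StructField("a", StringType),
      StructField("b", StringType)))

    jsonDf
      .select(from_json(col("json"), schema).as("data"))
      .select("data.*") // one DataFrame column per JSON field
      .show(truncate = false)

    spark.stop()
  }
}
```

In the streaming job, the same `select` chain would replace the `map(...)` step; the Confluent deserializer can live in a UDF that returns `msgObj.toString` as a String, so no Kryo encoder is needed and the console sink shows real columns.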



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org