to_avro/from_avro inserts extra values from Kafka

Alex Nastetsky
Hi all,

I create a DataFrame, convert it to Avro with to_avro, and write it to Kafka.
Then I read it back with from_avro.
(I am not using Schema Registry.)
The problem is that every other field in the result is empty, and only the first two values show up, in the wrong columns.

I expect:
+---------+--------+------+-------+
|firstName|lastName| color|   mood|
+---------+--------+------+-------+
|     Suzy|  Samson|  blue|grimmer|
|      Jim| Johnson|indigo|   grim|
+---------+--------+------+-------+

Instead I get:

+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

Here's what I'm doing:

$ kt admin -createtopic persons-avro-spark9 -topicdetail <(jsonify =NumPartitions 1 =ReplicationFactor 1)

$ cat person.avsc
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}

$ spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.5,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.5

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths

val topic = "persons-avro-spark9"


// `from_avro` requires Avro schema in JSON string format.
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("person.avsc")))


val personDF = sc.parallelize(Seq(
    ("Jim","Johnson","indigo","grim"),
    ("Suzy","Samson","blue","grimmer")
)).toDF("firstName","lastName","color","mood")

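// Pack all columns into one struct, encode it as Avro, and write it as the Kafka message value.
personDF.select(to_avro(struct(personDF.columns.map(column): _*)).alias("value"))
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", topic)
    // Note: "avroSchema" is not a documented option of the Kafka sink, so it is probably ignored here.
    .option("avroSchema", jsonFormatSchema)
    .save()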

val stream = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
    .select(from_avro('value, jsonFormatSchema) as 'person)
    .select($"person.firstName",$"person.lastName",$"person.color",$"person.mood")
    .writeStream
    .format("console")
    .start()

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.avro._
import java.nio.file.Files
import java.nio.file.Paths
topic: String = persons-avro-spark9
jsonFormatSchema: String =
{
  "type": "record",
  "name": "Person",
  "namespace": "com.ippontech.kafkatutorials",
  "fields": [
    {
      "name": "firstName",
      "type": "string"
    },
    {
      "name": "lastName",
      "type": "string"
    },
    {
      "name": "color",
      "type": "string"
    },
    {
      "name": "mood",
      "type": "string"
    }
  ]
}
personDF: org.apache.spark.sql.DataFrame = [firstName: string, lastName: string ... 2 more fields]
stream: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3990c36c

scala> -------------------------------------------
Batch: 0
-------------------------------------------
+---------+--------+-----+-------+
|firstName|lastName|color|   mood|
+---------+--------+-----+-------+
|         |    Suzy|     | Samson|
|         |     Jim|     |Johnson|
+---------+--------+-----+-------+

See the raw bytes:

$ kt consume -topic persons-avro-spark9
{
  "partition": 0,
  "offset": 0,
  "key": null,
  "value": "\u0000\u0008Suzy\u0000\u000cSamson\u0000\u0008blue\u0000\u000egrimmer",
  "timestamp": "2020-05-12T17:18:53.858-04:00"
}
{
  "partition": 0,
  "offset": 1,
  "key": null,
  "value": "\u0000\u0006Jim\u0000\u000eJohnson\u0000\u000cindigo\u0000\u0008grim",
  "timestamp": "2020-05-12T17:18:53.859-04:00"
}
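
Decoding the first value by hand may help narrow this down. Below is a minimal sketch in plain Scala (no Spark or Avro dependency) that reads those bytes two ways: as four plain Avro strings, which is what person.avsc describes, and as four fields that each carry a one-byte union branch index in front of the string. The second layout is only an assumption about what to_avro wrote (for example, if it treated the columns as nullable). The names AvroBytesSketch, Reader, asPlainStrings and asUnionStrings are made up for this illustration.

```scala
import java.nio.charset.StandardCharsets.UTF_8

object AvroBytesSketch {
  // Value at offset 0 as printed by `kt consume`: each string is preceded
  // by a 0x00 byte and then a length byte (0x08, 0x0c, 0x08, 0x0e).
  val raw: Array[Byte] =
    Array[Byte](0, 8) ++ "Suzy".getBytes(UTF_8) ++
    Array[Byte](0, 12) ++ "Samson".getBytes(UTF_8) ++
    Array[Byte](0, 8) ++ "blue".getBytes(UTF_8) ++
    Array[Byte](0, 14) ++ "grimmer".getBytes(UTF_8)

  // Tiny hand-written reader for the two Avro primitives needed here.
  final class Reader(bytes: Array[Byte]) {
    private var pos = 0

    // Avro int/long: variable-length, zigzag-encoded.
    def readLong(): Long = {
      var shift = 0
      var acc = 0L
      var more = true
      while (more) {
        val b = bytes(pos) & 0xff
        pos += 1
        acc |= (b & 0x7fL) << shift
        shift += 7
        more = (b & 0x80) != 0
      }
      (acc >>> 1) ^ -(acc & 1)
    }

    // Avro string: a long byte count followed by that many UTF-8 bytes.
    def readString(): String = {
      val len = readLong().toInt
      val s = new String(bytes, pos, len, UTF_8)
      pos += len
      s
    }
  }

  // Reading 1: four plain strings, as person.avsc declares them.
  // Trailing bytes after the fourth string are simply left unread.
  def asPlainStrings(bytes: Array[Byte]): Seq[String] = {
    val r = new Reader(bytes)
    Seq.fill(4)(r.readString())
  }

  // Reading 2 (assumption): every field is a union, so a branch index
  // comes first and the string follows.
  def asUnionStrings(bytes: Array[Byte]): Seq[String] = {
    val r = new Reader(bytes)
    Seq.fill(4) {
      val branch = r.readLong()
      require(branch == 0, s"unexpected union branch $branch")
      r.readString()
    }
  }

  def main(args: Array[String]): Unit = {
    println(asPlainStrings(raw)) // elements: "", "Suzy", "", "Samson"
    println(asUnionStrings(raw)) // elements: "Suzy", "Samson", "blue", "grimmer"
  }
}
```

The first reading reproduces the shifted row from the console output, and the second recovers all four values. If the assumption holds, that would point at a mismatch between the schema to_avro used when writing and the one passed to from_avro.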

Thanks,
Alex.