Spark not able to read from an Embedded Kafka Topic


Something Something

I am trying to write an integration test using Embedded Kafka, but I keep getting a NullPointerException. My test case is very simple. It has the following steps:

  1. Read a JSON file & write messages to an inputTopic.
  2. Perform a 'readStream' operation.
  3. Do a 'select' on the Stream. This throws a NullPointerException.

What am I doing wrong? Code is given below:

"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(
        kafkaPort = 9066,
        zooKeeperPort = 2066,
        Map("log.dir" -> "./src/test/resources/")
      )

    withRunningKafka {
      val source = Source.fromFile("src/test/resources/test1.json")
      source.getLines.foreach(
        line => publishStringMessageToKafka(inputTopic, line)
      )
      source.close()
      implicit val deserializer: StringDeserializer = new StringDeserializer

      import spark2.implicits._

      val schema = spark.read.json("my.json").schema
      val myStream = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9066")
        .option("subscribe", inputTopic)
        .load()

      // Schema looks good
      myStream.printSchema()

      // Following line throws NullPointerException! Why?
      val df = myStream.select(from_json($"value".cast("string"), schema).alias("value"))

      // There's more code... but let's not worry about that for now.
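
To rule out the input side, step 1 (reading the file and publishing each line) can be exercised on its own, without Kafka or Spark. What follows is only a minimal sketch under stated assumptions: `publish` is a hypothetical in-memory stand-in for publishStringMessageToKafka, the sample JSON lines are made up, and skipping blank lines is an extra that the test above does not do.

```scala
import scala.collection.mutable.ListBuffer
import scala.io.Source

object PublishLinesSketch {
  // Hypothetical stand-in for publishStringMessageToKafka:
  // records (topic, message) pairs in memory instead of sending them.
  val published: ListBuffer[(String, String)] = ListBuffer.empty

  def publish(topic: String, message: String): Unit =
    published += ((topic, message))

  // Publishes every non-blank line of `source` to `topic`,
  // returns the number of lines published, and always closes the source.
  def publishLines(source: Source, topic: String): Int = {
    try {
      val lines = source.getLines().map(_.trim).filter(_.nonEmpty).toList
      lines.foreach(line => publish(topic, line))
      lines.size
    } finally source.close()
  }

  def main(args: Array[String]): Unit = {
    // Assumed sample data: one JSON object per line, with a blank line in between.
    val sample = Source.fromString("{\"id\": 1}\n\n{\"id\": 2}\n")
    val count = publishLines(sample, "inputTopic")
    println(s"published $count messages")
  }
}
```

In the real test, `publish` would be replaced by publishStringMessageToKafka and the source by Source.fromFile("src/test/resources/test1.json"). If the count matches the number of lines in the file, the problem is more likely on the Spark side of the test.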