kafka-users mailing list archives

From Something Something <mailinglist...@gmail.com>
Subject Writing Integration test with Embedded Kafka
Date Sat, 07 Mar 2020 01:19:02 GMT
I am trying to write an integration test using Embedded Kafka, but I
keep getting a NullPointerException. My test case is very simple. It has
the following steps:

   1. Read a JSON file and publish its lines as messages to an input topic.
   2. Perform a 'readStream' on that topic.
   3. Do a 'select' on the stream. This throws a NullPointerException.

What am I doing wrong? The code is given below:

"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

    implicit val config: EmbeddedKafkaConfig =
      EmbeddedKafkaConfig(
        kafkaPort = 9066,
        zooKeeperPort = 2066,
        Map("log.dir" -> "./src/test/resources/")
      )

    withRunningKafka {
      createCustomTopic(inputTopic)
      val source = Source.fromFile("src/test/resources/test1.json")
      source.getLines.toList.filterNot(_.isEmpty).foreach(
        line => publishStringMessageToKafka(inputTopic, line)
      )
      source.close()
      implicit val deserializer: StringDeserializer = new StringDeserializer

      createCustomTopic(outputTopic)
      import spark2.implicits._

      val schema = spark.read.json("my.json").schema
      val myStream = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9066")
        .option("subscribe", inputTopic)
        .load()

      // Schema looks good
      myStream.printSchema()

      // The following line throws a NullPointerException. Why?
      val df = myStream.select(
        from_json($"value".cast("string"), schema).alias("value")
      )

      // There's more code... but let's not worry about that for now.
    }

  }
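One way to narrow this down is to exercise the same from_json expression in batch mode, where the select is evaluated eagerly and a bad schema or implicits problem surfaces immediately. This is only a sketch, not the test above: the SparkSession setup and the inline sample records are stand-ins I made up, replacing the spark/spark2 session and my.json file from the snippet.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType

// Stand-alone batch check (hypothetical; not the test from the thread).
val spark = SparkSession.builder().master("local[1]").appName("check").getOrCreate()
import spark.implicits._

// Infer a schema from one sample record instead of my.json (stand-in data).
val sample = Seq("""{"id": 1, "name": "a"}""").toDS()
val schema: StructType = spark.read.json(sample).schema
require(schema != null, "schema must not be null before from_json")

// The same expression as the streaming select, evaluated eagerly.
val df = Seq("""{"id": 2, "name": "b"}""").toDF("value")
  .select(from_json($"value".cast("string"), schema).alias("value"))
df.show()
spark.stop()
```

If this batch version works, the schema and the from_json call are fine, which would point the NullPointerException at something in the test wiring (for example the session behind the imported implicits) rather than at the select itself.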
