kafka-users mailing list archives

From Something Something <mailinglist...@gmail.com>
Subject Writing Integration test with Embedded Kafka
Date Sat, 07 Mar 2020 01:19:02 GMT
I am trying to write an integration test using Embedded Kafka, but I
keep getting a NullPointerException. My test case is very simple. It has
the following steps:

   1. Read a JSON file & write messages to an inputTopic.
   2. Perform a 'readStream' operation.
   3. Do a 'select' on the stream. This throws a NullPointerException.

What am I doing wrong? Code is given below:

"My Test which runs with Embedded Kafka" should "Generate correct Result" in {

    implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
      kafkaPort = 9066,
      zooKeeperPort = 2066,
      customBrokerProperties = Map("log.dir" -> "./src/test/resources/")
    )

    withRunningKafka {
      // Publish one Kafka message per line of the test file
      val source = Source.fromFile("src/test/resources/test1.json")
      source.getLines().foreach(line => publishStringMessageToKafka(inputTopic, line))
      implicit val deserializer: StringDeserializer = new StringDeserializer

      import spark2.implicits._

      val schema = spark.read.json("my.json").schema
      val myStream = spark
        .readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9066")
        .option("subscribe", inputTopic)
        .load()

      // Schema looks good

      // Following line throws NullPointerException! Why?
      val df = myStream.select(from_json($"value".cast("string"), schema))

      // There's more code... but let's not worry about that for now.

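As a sanity check independent of the NPE, step 1 above (read the file, publish one message per line) can be exercised stand-alone. This is only a sketch: `publishStringMessageToKafka` here is a stand-in that collects messages into a buffer instead of talking to a broker, and the temp-file setup replaces `src/test/resources/test1.json`.

```scala
import scala.io.Source
import scala.collection.mutable.ListBuffer
import java.nio.file.Files

object PublishSketch {
  // Stand-in for embedded-kafka's publishStringMessageToKafka:
  // records "topic:message" instead of sending to a broker.
  val published = ListBuffer.empty[String]
  def publishStringMessageToKafka(topic: String, message: String): Unit =
    published += s"$topic:$message"

  // Mirrors the test's publish step: one message per line of the file.
  def publishFile(path: String, inputTopic: String): Int = {
    val source = Source.fromFile(path)
    try {
      source.getLines().foreach(line => publishStringMessageToKafka(inputTopic, line))
      published.size
    } finally source.close()
  }
}

// Usage: write two JSON lines to a temp file and "publish" them.
val tmp = Files.createTempFile("test1", ".json")
Files.write(tmp, java.util.Arrays.asList("""{"a":1}""", """{"a":2}"""))
val n = PublishSketch.publishFile(tmp.toString, "inputTopic")
// n == 2: one message per input line
```

If this count matches the file, the publish side is fine and the problem is on the Spark read side.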
