flink-user mailing list archives

From "Chauvet, Thomas" <Thomas.Chau...@ercom.fr>
Subject Flink / Kafka unit testing with scalatest-embedded-kafka
Date Thu, 19 Apr 2018 13:23:43 GMT
Hi,

I would like to unit test a Flink job with Kafka as a source (and sink). I am trying
to use the scalatest-embedded-kafka library to simulate Kafka for my test.
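For reference, this is the basic pattern I am relying on from the library (a minimal sketch, independent of Flink; the spec name, topic and message are just placeholders):

```scala
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest.{Matchers, WordSpec}

class EmbeddedKafkaRoundTripSpec extends WordSpec with Matchers with EmbeddedKafka {

  "embedded kafka" should {
    "round-trip a plain string message" in {
      // in-memory Kafka + ZooKeeper on these ports, alive for the duration of the block
      implicit val config = EmbeddedKafkaConfig(kafkaPort = 9092, zooKeeperPort = 2181)

      withRunningKafka {
        // publish a message and read it back with the library helpers
        publishStringMessageToKafka("some-topic", "hello")
        consumeFirstStringMessageFrom("some-topic") shouldEqual "hello"
      }
    }
  }
}
```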

For example, I would like to read data (a string stream) from Kafka, convert it to uppercase,
and write it to another topic.
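Conceptually, the pipeline I have in mind looks like this (a minimal sketch; I have not wired the Kafka sink into the test below yet, and I am assuming FlinkKafkaProducer011 for writing to the output topic):

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer011, FlinkKafkaProducer011}

object UppercasePipelineSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "uppercase-sketch")

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // read strings from the input topic, uppercase them, write them to the output topic
    env
      .addSource(new FlinkKafkaConsumer011("input-topic", new SimpleStringSchema, props))
      .map(_.toUpperCase)
      .addSink(new FlinkKafkaProducer011[String]("localhost:9092", "output-topic", new SimpleStringSchema))

    env.execute("uppercase-pipeline")
  }
}
```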

For now, I am just trying to use Flink's Kafka consumer to read from a topic (with embedded Kafka).

Here is the example code:

```scala

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem.WriteMode
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
import org.scalatest.{Matchers, WordSpec}
import scala.util.Random

object SimpleFlinkKafkaTest {
  val kafkaPort = 9092
  val zooKeeperPort = 2181

  val groupId = Random.nextInt(1000000).toString
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "earliest")
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

  val propsMap = Map(
   "bootstrap.servers" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "auto.offset.reset" -> "earliest",
    "group.id" -> groupId,
    "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
  )

  val inputString = "mystring"
  val expectedString = "MYSTRING"

}

class SimpleFlinkKafkaTest extends WordSpec with Matchers with EmbeddedKafka {

  "runs with embedded kafka" should {

    "work" in {

      implicit val config = EmbeddedKafkaConfig(
        kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
        zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
        customConsumerProperties = SimpleFlinkKafkaTest.propsMap
      )

      withRunningKafka {

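        // publish one test message to the input topic before starting the Flink job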
        publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.inputString)

        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)

        val kafkaConsumer = new FlinkKafkaConsumer011(
          "input-topic",
          new SimpleStringSchema,
          SimpleFlinkKafkaTest.props
        )

        val inputStream = env.addSource(kafkaConsumer)

        val outputStream = inputStream.map { msg =>
          msg.toUpperCase
        }

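        // write the result to a local text file (instead of a Kafka sink) just to inspect the output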
        outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)

        env.execute()

        consumeFirstStringMessageFrom("output-topic") shouldEqual SimpleFlinkKafkaTest.expectedString

      }
    }
  }
}
```

The Flink process is running, but nothing happens. I tried writing to a text file to see any
output, but there is nothing in the file.

Any idea? Does anybody use this library to test a Flink job using Kafka?

Thanks in advance,

Thomas
