flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Yu <yuzhih...@gmail.com>
Subject Re: Flink / Kafka unit testing with scalatest-embedded-kafka
Date Thu, 19 Apr 2018 13:41:48 GMT
Pardon - I missed the implicit config (which is used by withRunningKafka).

Without your manual message production, was there any indication in broker
log that it received message(s) ?

Thanks

On Thu, Apr 19, 2018 at 6:31 AM, Chauvet, Thomas <Thomas.Chauvet@ercom.fr>
wrote:

> Hi,
>
>
>
> withRunningKafka launch a kafka broker. This is one of the advantage of
> this library.
>
>
>
> I test to consume / produce messages with kafka command line, and it seems
> alright.
>
>
>
> Thanks
>
>
>
> *De :* Ted Yu [mailto:yuzhihong@gmail.com]
> *Envoyé :* jeudi 19 avril 2018 15:28
> *À :* Chauvet, Thomas <Thomas.Chauvet@ercom.fr>
> *Objet :* Re: Flink / Kafka unit testing with scalatest-embedded-kafka
>
>
>
> Looking at your code, Kafka broker was not started.
>
>
>
> Was there running broker on localhost ?
>
>
>
> Cheers
>
>
>
> On Thu, Apr 19, 2018 at 6:23 AM, Chauvet, Thomas <Thomas.Chauvet@ercom.fr>
> wrote:
>
> Hi,
>
>
>
> I would like to « unit test » a job flink with Kafka as source (and Sink).
> I am trying to use the library scalatest-embedded-kafka to simulate a Kafka
> for my test.
>
>
>
> For example, I would like to get data (string stream) from Kafka, convert
> it intro uppercase and put it into another topic.
>
>
>
> Now, I am just trying to use Flink’s kafka consumer to read into a topic
> (with embedded kafka).
>
>
>
> Here is the code for example :
>
>
>
> ```scala
>
>
>
> import java.util.Properties
>
>
>
> import org.apache.flink.streaming.api.scala._
>
> import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
>
> import org.apache.flink.api.common.serialization.SimpleStringSchema
>
> import org.apache.flink.core.fs.FileSystem.WriteMode
>
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
>
> import org.scalatest.{Matchers, WordSpec}
>
> import scala.util.Random
>
>
>
> object SimpleFlinkKafkaTest {
>
>   SimpleFlinkKafkaTest
>
>   val kafkaPort = 9092
>
>   val zooKeeperPort = 2181
>
>
>
>   val groupId = Random.nextInt(1000000).toString
>
>   val props = new Properties()
>
>   props.put("bootstrap.servers", "localhost:9092")
>
>   props.put("zookeeper.connect", "localhost:2181")
>
>   props.put("auto.offset.reset", "earliest")
>
>   props.put("group.id", groupId)
>
>   props.put("key.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer")
>
>   props.put("value.deserializer", "org.apache.kafka.common.serialization.
> StringDeserializer")
>
>
>
>   val propsMap = Map(
>
>    "bootstrap.servers" -> "localhost:9092",
>
>     "zookeeper.connect" -> "localhost:2181",
>
>     "auto.offset.reset" -> "earliest",
>
>     "group.id" -> groupId,
>
>     "key.deserializer" -> "org.apache.kafka.common.serialization.
> StringDeserializer",
>
>     "value.deserializer" -> "org.apache.kafka.common.serialization.
> StringDeserializer"
>
>   )
>
>
>
>   val inputString = "mystring"
>
>   val expectedString = "MYSTRING"
>
>
>
> }
>
>
>
> class SimpleFlinkKafkaTest extends WordSpec with Matchers with
> EmbeddedKafka {
>
>
>
>   "runs with embedded kafka" should {
>
>
>
>     "work" in {
>
>
>
>       implicit val config = EmbeddedKafkaConfig(
>
>         kafkaPort = SimpleFlinkKafkaTest.kafkaPort,
>
>         zooKeeperPort = SimpleFlinkKafkaTest.zooKeeperPort,
>
>         customConsumerProperties = SimpleFlinkKafkaTest.propsMap
>
>       )
>
>
>
>       withRunningKafka {
>
>
>
>         publishStringMessageToKafka("input-topic", SimpleFlinkKafkaTest.
> inputString)
>
>
>
>         val env = StreamExecutionEnvironment.getExecutionEnvironment
>
>         env.setParallelism(1)
>
>
>
>         val kafkaConsumer = new FlinkKafkaConsumer011(
>
>           "input-topic",
>
>           new SimpleStringSchema,
>
>           SimpleFlinkKafkaTest.props
>
>         )
>
>
>
>         val inputStream = env.addSource(kafkaConsumer)
>
>
>
>         val outputStream = inputStream.map { msg =>
>
>           msg.toUpperCase
>
>         }
>
>
>
>         outputStream.writeAsText("test.csv", WriteMode.OVERWRITE)
>
>
>
>         env.execute()
>
>
>
>         consumeFirstStringMessageFrom("output-topic") shouldEqual
> SimpleFlinkKafkaTest.expectedString
>
>
>
>       }
>
>     }
>
>   }
>
> }
>
> ```
>
>
>
> The flink process si running but nothing happen. I try ot write into a
> text file to see any output but there is nothing into the file.
>
>
>
> Any idea ? Does anybody use this library to test a Flink Job using Kafka ?
>
>
>
> Thanks in advance,
>
>
>
> Thomas
>
>
>

Mime
View raw message