kafka-users mailing list archives

From "DuanSky" <skysubscr...@qq.com>
Subject kafka lost data when use scala API to send data.
Date Fri, 24 Jun 2016 10:56:37 GMT
Hello,
  I ran into a problem when using the Scala API to send data to and receive data from Kafka
brokers. I wrote a very simple producer and consumer (just like the official examples). The code
using the Java API works correctly, but the code using the Scala API may lose data. Here are the details.

Config: I downloaded the kafka_2.11- binary files and started Kafka in single-node mode: just
one broker and one ZooKeeper, with the default configuration.

(1) Java API Test
  I first wrote a simple consumer and producer program with the Java API. The producer code is
like this:
code A
void produce() {
    int messageNo = 1;
    while (messageNo <= Config.count) {
        // Send one message with the same key to every topic
        for (String topic : KafkaConfig.topics.split(",")) {
            String key = String.valueOf(messageNo);
            String data = topic + "-" + new Date();
            producer.send(new KeyedMessage<String, String>(topic, key, data));
            System.out.println(topic + "#" + key + "#" + data);
        }
        messageNo++;
    }
}
The consumer code is like this:
code B
void consume() {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

    StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

    // One stream (consumer thread) per topic
    for (String topic : Config.topics.split(",")) {
        topicCountMap.put(topic, new Integer(1));
    }

    // Assumption: "consumer" is the ConsumerConnector; this call was cut off in the archived message
    final Map<String, List<KafkaStream<String, String>>> consumerMap =
            consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);

    for (final String topic : Config.topics.split(",")) {
        new Thread(new Runnable() {
            public void run() {
                int count = 0;
                KafkaStream<String, String> stream = consumerMap.get(topic).get(0);
                ConsumerIterator<String, String> it = stream.iterator();
                while (it.hasNext()) {
                    count++;
                    MessageAndMetadata<String, String> message = it.next();
                    System.out.println(count + "#" + message.topic() + ":" + message.key()
                            + ":" + message.message());
                }
            }
        }).start();
    }
}


As I change Config.count (the number of messages sent to each topic; here I use two topics,
a and b), I find that the consumer always receives exactly as many messages as were sent, no
matter what the count is. So the Java API works correctly. But when I do the same thing using
the Scala API, I find that some data may be lost when it is sent to the Kafka brokers.

(2) Scala API Test
  I wrote a simple producer program with the Scala API; part of it looks like this:
code C
def main(args: Array[String]): Unit = {
  val producer = {
    // Broker connection properties
    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Config.brokers)

    new KafkaProducer[String, String](props)
  }

  (1 to Config.count).foreach(key => {
    Config.topics.split(",").foreach(topic => {
      val data = topic + new Date().toString
      val message = new ProducerRecord[String, String](topic, key + "", data)
      producer.send(message)
      System.out.println(topic + "#" + key + "#" + data)
    })
  })
}

(1) First I start a consumer node that waits to receive data (using code B above);
(2) Then I start a producer node to produce data (using code C above).

 When I start code C, the producer produces data very fast: sending 100 messages takes no
more than 1 second. The producer with the Java API (code A above) cannot send data that fast.
The consumer (code B) receives only 34 messages on topic a and 34 on topic b and then sits idle,
although I actually produced 100 messages. I have changed the number of messages that I send to
Kafka, but no matter how many I send, the consumer receives only about half of them, sometimes
less. The more messages I send, the more data I lose. [By "lost" I am not sure whether the
messages were never sent successfully to the brokers, or were sent successfully but could not be
received. What I observe is that I receive part of the data and then nothing more is consumed.]
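
To make the comparison between sent and received messages concrete, the printed lines can be tallied per topic. This is only a minimal sketch in plain Java: the class OutputTally and its methods are hypothetical helpers, not part of Kafka, and it assumes the producer prints topic#key#data and the consumer prints count#topic:key:message, as in the code above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper (not part of Kafka): tallies printed lines per topic. */
public class OutputTally {

    /** Counts producer lines of the form topic#key#data. */
    public static Map<String, Integer> countSent(List<String> producerLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : producerLines) {
            int hash = line.indexOf('#');
            if (hash <= 0) continue; // skip lines that do not match the format
            counts.merge(line.substring(0, hash), 1, Integer::sum);
        }
        return counts;
    }

    /** Counts consumer lines of the form count#topic:key:message. */
    public static Map<String, Integer> countReceived(List<String> consumerLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : consumerLines) {
            int hash = line.indexOf('#');
            int colon = line.indexOf(':', hash + 1);
            if (hash < 0 || colon < 0) continue; // skip lines that do not match the format
            counts.merge(line.substring(hash + 1, colon), 1, Integer::sum);
        }
        return counts;
    }

    /** Returns, per topic, how many sent messages never showed up at the consumer. */
    public static Map<String, Integer> missing(Map<String, Integer> sent,
                                               Map<String, Integer> received) {
        Map<String, Integer> diff = new TreeMap<>();
        for (Map.Entry<String, Integer> e : sent.entrySet()) {
            diff.put(e.getKey(), e.getValue() - received.getOrDefault(e.getKey(), 0));
        }
        return diff;
    }

    public static void main(String[] args) {
        // Made-up sample lines, not real output from my runs
        List<String> sent = Arrays.asList("a#1#a-x", "b#1#b-x", "a#2#a-y", "b#2#b-y");
        List<String> received = Arrays.asList("1#a:1:a-x", "1#b:1:b-x", "2#a:2:a-y");
        System.out.println(missing(countSent(sent), countReceived(received)));
    }
}
```

Feeding it the captured console output of both programs would show per topic how many messages are missing, instead of counting by eye.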
  I suspected that the producer with the Scala API sends data too fast, so I added
Thread.sleep(time) after sending each message. It does work! With time = 100, it rarely loses
data. But sometimes I still do not receive as many messages as I sent.
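
One possibility I have not ruled out: KafkaProducer.send() is asynchronous, so it only places the record in a buffer and returns, and delivery happens later in the background. If main returns before that buffer is drained, the tail of the data might never reach the broker, which would also fit with Thread.sleep helping. The following is only a toy model of that idea in plain Java, not Kafka's actual implementation; the class BufferedSender, its batch size of 16 and the in-memory "delivered" list are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only (not Kafka code): a sender that buffers records and delivers them in batches. */
public class BufferedSender implements AutoCloseable {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> delivered = new ArrayList<>(); // stands in for the broker

    public BufferedSender(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Returns right after buffering; delivery happens only when a batch fills up. */
    public void send(String record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Delivers everything that is still buffered. */
    public void flush() {
        delivered.addAll(buffer);
        buffer.clear();
    }

    /** Flushes the remaining records, analogous to closing a producer before exiting. */
    @Override
    public void close() {
        flush();
    }

    public int deliveredCount() {
        return delivered.size();
    }

    /** Sends n records and reports how many were delivered, with or without close(). */
    public static int run(int n, boolean closeAtEnd) {
        BufferedSender sender = new BufferedSender(16);
        for (int i = 1; i <= n; i++) {
            sender.send("a#" + i);
        }
        if (closeAtEnd) {
            sender.close();
        }
        return sender.deliveredCount();
    }

    public static void main(String[] args) {
        System.out.println("without close: " + run(100, false)); // 96: last partial batch is stuck
        System.out.println("with close:    " + run(100, true));  // 100
    }
}
```

If something like this is what happens in code C, calling producer.close() (or producer.flush()) after the loop would be the thing to try; I have not verified this yet.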

Am I using the Scala API wrong? Or is it something else?
I have attached my code. I look forward to your reply and advice.