kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Sachin Nikumbh <saniku...@yahoo.com.INVALID>
Subject Unable to consume messages
Date Mon, 08 May 2017 03:55:33 GMT
Hi all,
I am relatively new to kafka and my initial attempts at consuming messages are failing. My
topic has 3 partitions and I am setting "auto.offset.reset" to "earliest". The call to poll
hangs. Here's my code: 
***********************************************************************************import
org.apache.kafka.clients.consumer.*;import org.apache.kafka.common.TopicPartition;import java.util.*;
public class TestConsumer {
    public static void main(String[] args){
        Properties props = new Properties();        props.put("bootstrap.servers",
"localhost:9092");        props.put("group.id", "mytest5");        props.put("enable.auto.commit",
"false");        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
      props.put("auto.offset.reset", "earliest");        props.put("max.poll.records",
"50");        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
      consumer.subscribe(Arrays.asList("mpsFleet4"), new RebalanceHandler(consumer)); 
      while (true) {            ConsumerRecords<String, String> records = consumer.poll(120000); 
          for (ConsumerRecord<String, String> record : records)           
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(),
record.value());        }    }}
class RebalanceHandler implements ConsumerRebalanceListener {
    Consumer<String, String> consumer;
    RebalanceHandler(Consumer<String, String> consumer){        this.consumer
= consumer;    }
    @Override    public void onPartitionsRevoked(Collection<TopicPartition> partitions)
{        System.out.println("Partitions revoked");    }    @Override    public void
onPartitionsAssigned(Collection<TopicPartition> partitions) {        System.out.println("Partitions
assigned");        //consumer.seekToBeginning(partitions);    }}
***********************************************************************************

When I use ConsumerGroupCommand to check the offsets, I see the following:

***********************************************************************************
GROUP      TOPIC         PARTITION     CURRENT-OFFSET     LOG-END-OFFSET    
   LAG                      OWNERmytest5     mpsFleet4             0
                   unknown                          397184      
         unknown         consumer-1_/172.28.11.138mytest5     mpsFleet4      
      1                    unknown                          650207
               unknown         consumer-1_/172.28.11.138mytest5     mpsFleet4
            2                    unknown                      
   451783                unknown         consumer-1_/172.28.11.138***********************************************************************************

If I set "enable.auto.commit" to "true", the call to poll still hangs but all the values for
CURRENT-OFFSET are same as LOG-END-OFFSET.
I am really not able to understand what's going on and would really appreciate any help. I
have even tried playing with explicitly seeking the partition offsets in my rebalance listener.
ThanksSachin
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message