kafka-users mailing list archives

From Debraj Manna <subharaj.ma...@gmail.com>
Subject Re: Java Consumer Not reading message -
Date Mon, 19 Feb 2018 03:32:31 GMT
Thanks Matthias for replying.

The answer is discussed at the Stack Overflow link that I posted in the
question.

On 16-Feb-2018 11:35 PM, "Matthias J. Sax" <matthias@confluent.io> wrote:

Can you check the committed offsets using bin/kafka-consumer-groups.sh?

Also inspect your consumer's position via KafkaConsumer#position() to
see where the consumer actually is in the topic.
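
As a rough illustration of why the committed offset and the position matter: when a consumer is assigned a partition, it resumes from the group's committed offset if one exists, and otherwise falls back to the `auto.offset.reset` policy, which defaults to `latest`. The sketch below is a plain-Java model of that rule for a single partition, not Kafka client code. The class and method names are hypothetical, and it ignores the case where a committed offset is out of range.

```java
import java.util.OptionalLong;

// Hypothetical model of how a consumer's starting position is resolved.
class StartPositionModel {

    /**
     * @param committed   offset committed by the group for this partition, if any
     * @param logStart    first offset still available in the partition
     * @param logEnd      offset that the next produced record will receive
     * @param resetPolicy value of auto.offset.reset: "earliest", "latest" or "none"
     */
    static long startPosition(OptionalLong committed, long logStart,
                              long logEnd, String resetPolicy) {
        if (committed.isPresent()) {
            // A committed offset always wins over the reset policy.
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // The real client raises an exception for "none"; modelled loosely here.
                throw new IllegalStateException(
                    "no committed offset and auto.offset.reset=" + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Partition already holds 3000 records (offsets 0..2999).
        System.out.println(startPosition(OptionalLong.empty(), 0, 3000, "latest"));   // 3000
        System.out.println(startPosition(OptionalLong.empty(), 0, 3000, "earliest")); // 0
        System.out.println(startPosition(OptionalLong.of(1200), 0, 3000, "latest"));  // 1200
    }
}
```

In the first case the position equals the log end offset, so polling returns nothing until new records are produced.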


-Matthias


On 2/16/18 5:13 AM, Debraj Manna wrote:
> I have posted the same question on Stack Overflow as well, but I have not
> received any reply there either.
>
> https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic
>
> On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.manna@gmail.com>
> wrote:
>
>> I have a simple Java producer like the one below:
>>
>> public class Producer
>> {
>>     private final static String TOPIC = "my-example-topi8";
>>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>>     public static void main( String[] args ) throws Exception {
>>         Producer<String, byte[]> producer = createProducer();
>>         for(int i=0;i<3000;i++) {
>>             String msg = "Test Message-" + i;
>>             final ProducerRecord<String, byte[]> record =
>>                 new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
>>             producer.send(record).get();
>>             System.out.println("Sent message " + msg);
>>         }
>>         producer.close();
>>     }
>>
>>     private static Producer<String, byte[]> createProducer() {
>>         Properties props = new Properties();
>>         props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>>         props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>>         props.put("client.id", "AppFromJava");
>>         props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>>         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
>>         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
>>         props.put("compression.codec", "snappy");
>>         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
>>         return new KafkaProducer<String, byte[]>(props);
>>     }
>> }
>>
>> I am trying to read data as below
>>
>> public class Consumer
>> {
>>     private final static String TOPIC = "my-example-topi8";
>>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>>
>>     public static void main( String[] args ) throws Exception {
>>         Consumer<String, byte[]> consumer = createConsumer();
>>         start(consumer);
>>     }
>>
>>     static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
>>         final int giveUp = 10;
>>         int noRecordsCount = 0;
>>         int stopCount = 1000;
>>
>>         while (true) {
>>             final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
>>             if (consumerRecords.count()==0) {
>>                 noRecordsCount++;
>>                 if (noRecordsCount > giveUp) break;
>>                 else continue;
>>             }
>>
>>
>>             consumerRecords.forEach(record -> {
>>                 System.out.printf("\nConsumer Record:(%s, %s, %s)",
>>                     record.key(), new String(record.value()), record.topic());
>>             });
>>
>>             consumer.commitSync();
>>             break;
>>         }
>>         consumer.close();
>>         System.out.println("DONE");
>>     }
>>
>>     private static Consumer<String, byte[]> createConsumer() {
>>         final Properties props = new Properties();
>>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>                                     BOOTSTRAP_SERVERS);
>>         props.put(ConsumerConfig.GROUP_ID_CONFIG,
>>                                     "KafkaExampleConsumer");
>>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>                 StringDeserializer.class.getName());
>>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>                 ByteArrayDeserializer.class.getName());
>>         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>>         props.put("group.id", "test");
>>         props.put("enable.auto.commit", "false");
>>
>>         // Create the consumer using props.
>>         final Consumer<String, byte[]> consumer = new KafkaConsumer(props);
>>         consumer.subscribe(Collections.singletonList(TOPIC));
>>         return consumer;
>>     }
>> }
>>
>> But the consumer is not reading any messages from Kafka. If I add the lines
>> below at the very beginning of start():
>>
>> consumer.poll(0);
>>
>> consumer.seekToBeginning(consumer.assignment());
>>
>>
>> Then the consumer starts reading from the topic. But each time the consumer
>> is restarted, it reads messages from the start of the topic, which I don't
>> want. Can someone let me know what is going wrong and how I can fix this?
>>
>>
>> Kafka Version 0.10
>>
>>
>>
>
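
One likely explanation for the behaviour in the quoted question, assuming the consumer group has no committed offsets yet: `auto.offset.reset` defaults to `latest`, so a consumer started after the producer has finished begins at the end of the log and receives nothing. Setting it to `earliest` makes a group without committed offsets start from the beginning, while later restarts resume from whatever `commitSync()` stored. The sketch below builds such a configuration with plain string keys so that it needs only `java.util`. The class name is hypothetical, and the server and group values are copied from the question.

```java
import java.util.Properties;

// Hypothetical helper that assembles consumer settings for the quoted example.
class ConsumerProps {

    static Properties build() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:8092");
        // Set group.id once; the quoted code sets it twice and the later value wins.
        props.put("group.id", "KafkaExampleConsumer");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");
        // Applies only when the group has no committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        build().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

These properties would be passed to `new KafkaConsumer<>(props)` in place of the ones built in `createConsumer()`. Note also that the loop in the question breaks after the first non-empty batch, so each run prints at most one batch of records even when more are available.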
