kafka-users mailing list archives

From Debraj Manna <subharaj.ma...@gmail.com>
Subject Re: Java Consumer Not reading message -
Date Fri, 16 Feb 2018 13:13:10 GMT
I have posted the same question on Stack Overflow as well, but I have not
received any reply there either.

https://stackoverflow.com/questions/48826279/kafka-0-10-java-consumer-not-reading-message-from-topic

On Fri, Feb 16, 2018 at 5:23 PM, Debraj Manna <subharaj.manna@gmail.com>
wrote:

> I have a simple java producer like below
>
> public class Producer
> {
>     private final static String TOPIC = "my-example-topi8";
>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
>     public static void main( String[] args ) throws Exception {
>         Producer<String, byte[]> producer = createProducer();
>         for(int i=0;i<3000;i++) {
>             String msg = "Test Message-" + i;
>             final ProducerRecord<String, byte[]> record =
>                     new ProducerRecord<String, byte[]>(TOPIC, "key" + i, msg.getBytes());
>             producer.send(record).get();
>             System.out.println("Sent message " + msg);
>         }
>         producer.close();
>     }
>
>     private static Producer<String, byte[]> createProducer() {
>         Properties props = new Properties();
>         props.put("metadata.broker.list", BOOTSTRAP_SERVERS);
>         props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
>         props.put("client.id", "AppFromJava");
>         props.put("serializer.class", "kafka.serializer.DefaultEncoder");
>         props.put("key.serializer.class", "kafka.serializer.StringEncoder");
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("compression.codec", "snappy");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.ByteArraySerializer");
>         return new KafkaProducer<String, byte[]>(props);
>     }
> }
>
> I am trying to read data as below
>
> public class Consumer
> {
>     private final static String TOPIC = "my-example-topi8";
>     private final static String BOOTSTRAP_SERVERS = "localhost:8092";
>
>     public static void main( String[] args ) throws Exception {
>         Consumer<String, byte[]> consumer = createConsumer();
>         start(consumer);
>     }
>
>     static void start(Consumer<String, byte[]> consumer)
>             throws InterruptedException {
>         final int giveUp = 10;
>         int noRecordsCount = 0;
>         int stopCount = 1000;
>
>         while (true) {
>             final ConsumerRecords<String, byte[]> consumerRecords =
>                     consumer.poll(1000);
>             if (consumerRecords.count()==0) {
>                 noRecordsCount++;
>                 if (noRecordsCount > giveUp) break;
>                 else continue;
>             }
>
>
>             consumerRecords.forEach(record -> {
>                 System.out.printf("\nConsumer Record:(%s, %s, %s)",
>                         record.key(), new String(record.value()), record.topic());
>             });
>
>             consumer.commitSync();
>             break;
>         }
>         consumer.close();
>         System.out.println("DONE");
>     }
>
>     private static Consumer<String, byte[]> createConsumer() {
>         final Properties props = new Properties();
>         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>                                     BOOTSTRAP_SERVERS);
>         props.put(ConsumerConfig.GROUP_ID_CONFIG,
>                                     "KafkaExampleConsumer");
>         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 StringDeserializer.class.getName());
>         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 ByteArrayDeserializer.class.getName());
>         props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
>         props.put("group.id", "test");
>         props.put("enable.auto.commit", "false");
>
>         // Create the consumer using props.
>         final Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
>         consumer.subscribe(Collections.singletonList(TOPIC));
>         return consumer;
>     }
> }
>
> But the consumer is not reading any messages from Kafka. If I add the lines
> below at the very beginning of start()
>
> consumer.poll(0);
>
> consumer.seekToBeginning(consumer.assignment());
>
>
> Then the consumer starts reading from the topic. But then each time the
> consumer is restarted it reads messages from the start of the topic,
> which I don't want. Can someone let me know what is going wrong and how
> I can fix this?
>
>
> Kafka Version 0.10
>
>
>
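
One possible explanation for the behaviour quoted above, offered as an
assumption rather than a confirmed diagnosis: when a consumer group has no
committed offsets, the consumer falls back to auto.offset.reset, whose default
is "latest", so it only sees records produced after it joins. Setting it to
"earliest" applies only when no committed offset exists; once commitSync() has
stored a position, a restarted consumer in the same group should resume from
that position instead of the start of the topic. The sketch below builds the
settings with plain string keys so that it compiles without kafka-clients; the
class name ConsumerPropsSketch and the method build are hypothetical and are
not part of the original post.

```java
import java.util.Properties;

// Hypothetical helper, not part of the original post: builds consumer settings
// using plain string keys so the sketch compiles without kafka-clients.
class ConsumerPropsSketch {

    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("enable.auto.commit", "false");
        // Assumption: the group has no committed offsets yet. In that case the
        // consumer uses auto.offset.reset (default "latest"); "earliest" makes
        // it start from the beginning of the log in that case only.
        props.put("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:8092", "test");
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties object could be passed to the KafkaConsumer
constructor in place of the props built in createConsumer(), without the
poll(0) and seekToBeginning() workaround. If the group already has committed
offsets, this setting has no effect and the consumer continues from them.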
