kafka-users mailing list archives

From: Ratha v <vijayara...@gmail.com>
Subject: Re: If my producer producing, then why the consumer couldn't consume? it stuck @ poll()
Date: Mon, 04 Apr 2016 06:23:06 GMT

When I list the partition info with

List<PartitionInfo> info = consumer.partitionsFor(topic);


it returns:

[Partition(topic = MY_TOPIC, partition = 0, leader = 1012, replicas = [1012,], isr = [1012,]]


My communication with the broker looks fine. I can see more than 1000
messages for the topic using Kafka Tool (a UI tool for viewing topics,
consumers, ...). But I cannot figure out what blocks my consumer in poll().
Can anyone spot the issue?
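
One way to confirm that the thread really is blocked inside a single call, rather than looping without printing, might be a small probe like the one below. This is only a sketch using the JDK (Java 8 or later) and nothing from Kafka: `PollProbe` and `returnsWithin` are hypothetical names introduced here, and the lambdas in `main` merely stand in for a call such as `consumer.poll(100)`.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Kafka client library.
public class PollProbe {

    /**
     * Runs the call on a separate daemon thread and reports whether it
     * returned (or threw) within timeoutMs milliseconds.
     */
    public static boolean returnsWithin(Callable<?> call, long timeoutMs)
            throws InterruptedException {
        ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "poll-probe");
            t.setDaemon(true); // do not keep the JVM alive if the call never returns
            return t;
        });
        Future<?> future = executor.submit(call);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;                 // the call completed normally
        } catch (ExecutionException e) {
            return true;                 // the call failed, but it did come back
        } catch (TimeoutException e) {
            return false;                // still blocked after the deadline
        } finally {
            executor.shutdownNow();      // interrupts the worker if it is still running
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-ins for the real call: one returns at once, one blocks for 5 s.
        boolean fast = returnsWithin(() -> "ok", 1000);
        boolean slow = returnsWithin(() -> { Thread.sleep(5000); return "late"; }, 200);
        System.out.println("fast call returned in time: " + fast);
        System.out.println("slow call returned in time: " + slow);
    }
}
```

With the real consumer, the callable would have to create the KafkaConsumer, subscribe, and call poll() itself, because KafkaConsumer is not safe for use from several threads at once. A `false` result for a timeout well above the poll timeout would suggest the call is blocked before any fetch happens; it does not by itself say why.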



On 4 April 2016 at 10:58, Ratha v <vijayaratha@gmail.com> wrote:

>
> Hi all,
> I'm publishing to a remote Kafka server and trying to consume messages from
> that remote server. (Kafka v 0.90.1)
> Publishing works fine, but consuming does not.
>
> *Publisher*
>
> package org.test;
>
> import java.io.IOException;
> import java.util.Properties;
>
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> public class Producer {
>
>     private void generateMessgaes() throws IOException {
>         String topic = "MY_TOPIC";
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("acks", "all");
>         props.put("retries", 0);
>         props.put("batch.size", 16384);
>         props.put("linger.ms", 1);
>         props.put("buffer.memory", 33554432);
>         props.put("key.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("value.serializer",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>         props.put("serializer.class",
>                 "org.apache.kafka.common.serialization.StringSerializer");
>
>         KafkaProducer<String, String> producer = null;
>         try {
>             producer = new KafkaProducer<>(props);
>             for (int i = 0; i < 10; i++) {
>                 producer.send(new ProducerRecord<String, String>(topic, "test msg"));
>                 System.out.println("producing---");
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("Error in publishing messages to the topic : " + topic);
>         } finally {
>             producer.close();
>         }
>     }
>
>     public static void main(String[] args) throws IOException {
>         Producer producer = new Producer();
>         producer.generateMessgaes();
>         System.out.println("$$$$$");
>     }
> }
>
>
> I can see the "producing---" and "$$$$$" prints. But when I try to consume, I
> do not see the "polling msges" output. It gets stuck at poll(timeout).
>
> Any clue? (I have asked this question before; asking again :()
>
> *Consumer*
>
> package org.test;
>
> import java.util.Arrays;
> import java.util.List;
> import java.util.Properties;
>
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.clients.consumer.ConsumerRecords;
> import org.apache.kafka.clients.consumer.KafkaConsumer;
>
> public class Listener {
>
>     public void start() throws CoreException {
>         String topic = "MY_TOPIC";
>         List<String> topics = Arrays.asList(topic);
>
>         Properties props = new Properties();
>         props.put("bootstrap.servers", "kafka.xx.com:9092");
>         props.put("enable.auto.commit", true);
>         props.put("receive.buffer.bytes", 262144);
>         props.put("consumer.timeout.ms", 10000);
>         props.put("session.timeout.ms", 7000);
>         props.put("heartbeat.interval.ms", 1000);
>         props.put("auto.offset.reset", "earliest");
>         props.put("group.id", "test");
>         props.put("fetch.min.bytes", 1);
>         props.put("key.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("serializer.class",
>                 "org.apache.kafka.common.serialization.StringDeserializer");
>
>         KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
>         consumer.subscribe(topics);
>
>         try {
>             while (true) {
>                 ConsumerRecords<String, String> records = consumer.poll(100);
>                 System.out.println("polling msges : " + records.count());
>                 for (ConsumerRecord<String, String> record : records) {
>                     System.out.println("kafka record : " + record.value());
>                 }
>             }
>         } catch (Throwable e) {
>             e.printStackTrace();
>             System.out.println("eror in polling");
>         } finally {
>             consumer.close();
>         }
>     }
>
>     public static void main(String args[]) throws CoreException {
>         Listener listener = new Listener();
>         listener.start();
>     }
> }
>
> Thanks
> --
> -Ratha
> http://vvratha.blogspot.com/
>



-- 
-Ratha
http://vvratha.blogspot.com/
