kafka-users mailing list archives

From Li Li <fancye...@gmail.com>
Subject Re: high level consumer not working
Date Tue, 24 Jun 2014 01:09:23 GMT
No luck after adding props.put("auto.offset.reset", "smallest");
but running the consumer after the producer works.
In my use case, though, that is not always the case.
Another question: the consumer should remember the offset. It's not
very easy to use.
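
One possible explanation, not confirmed in this thread: in the 0.8 high-level consumer, auto.offset.reset is only consulted when the group has no committed offset in ZooKeeper (or the stored offset is out of range). A group id that has already run, such as testgroup22, may have had its tail position committed by auto-commit, in which case "smallest" would have no effect. Below is a minimal sketch of the consumer properties under that assumption. The class name ConsumerProps and the group id testgroup23 are hypothetical, and the sketch uses only java.util.Properties so it runs without the Kafka jars.

```java
import java.util.Properties;

public class ConsumerProps {
    // Builds properties for the 0.8 high-level consumer (property names as used in this thread).
    public static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        // Assumption: a group id with no offsets stored in ZooKeeper yet,
        // so that auto.offset.reset is actually applied.
        props.put("group.id", groupId);
        // Start from the earliest available message when no offset is stored.
        props.put("auto.offset.reset", "smallest");
        // Consumed offsets are committed to ZooKeeper at this interval.
        props.put("auto.commit.interval.ms", "1000");
        return props;
    }

    public static void main(String[] args) {
        // "testgroup23" is a hypothetical, previously unused group id.
        Properties props = build("linux157:2181", "testgroup23");
        System.out.println(props.getProperty("group.id") + " "
                + props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties would be passed to new ConsumerConfig(props), as in the consumer class quoted below. On the second question: with auto-commit, offsets are stored per group id in ZooKeeper, so a consumer restarted with the same group id resumes from its last committed position.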

On Mon, Jun 23, 2014 at 11:05 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> Did you start the consumer after the producer? The default behavior of the
> consumer is to "consume from the tail of the log", and hence if there are no
> new messages coming in after the consumer started, it will get nothing. You
> may set
>
> auto.offset.reset="smallest"
>
> and try again.
>
> Guozhang
>
>
> On Sun, Jun 22, 2014 at 9:10 PM, Li Li <fancyerii@gmail.com> wrote:
>
>> Hi all,
>>    I am reading the book "Apache Kafka" and wrote a simple producer
>> and consumer class. The producer works but the consumer hangs.
>>    The producer class:
>> public static void main(String[] args) {
>>     String topic="test-topic";
>>     Properties props = new Properties();
>>     props.put("metadata.broker.list","linux157:9092");
>>     props.put("serializer.class","kafka.serializer.StringEncoder");
>>     props.put("request.required.acks", "1");
>>     ProducerConfig config = new ProducerConfig(props);
>>     Producer<Integer, String> producer = new
>> Producer<Integer,String>(config);
>>     for(int i=0;i<100;i++){
>>         KeyedMessage<Integer, String> data = new
>> KeyedMessage<Integer,String>(topic, "msg"+i);
>>         producer.send(data);
>>     }
>>     producer.close();
>>
>> }
>>
>> public class TestKafkaConsumer {
>> private final ConsumerConnector consumer;
>> private final String topic;
>>
>> public TestKafkaConsumer(String zookeeper, String groupId, String topic) {
>> Properties props = new Properties();
>> props.put("zookeeper.connect", zookeeper);
>> props.put("group.id", groupId);
>> props.put("zookeeper.session.timeout.ms", "500");
>> props.put("zookeeper.sync.time.ms", "250");
>> props.put("auto.commit.interval.ms", "1000");
>> consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(
>> props));
>> this.topic = topic;
>> }
>>
>> public void testConsumer() {
>>     Map<String, Integer> topicCount = new HashMap<String, Integer>();
>>     // Define single thread for topic
>>     topicCount.put(topic, new Integer(1));
>>     Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
>>             consumer.createMessageStreams(topicCount);
>>     List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
>>     for (final KafkaStream<byte[], byte[]> stream : streams) {
>>         ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
>>         // hasNext() blocks until a message arrives, so this loop does not end on its own
>>         while (consumerIte.hasNext()) {
>>             System.out.println("Message from Single Topic :: "
>>                        + new String(consumerIte.next().message()));
>>         }
>>     }
>>     if (consumer != null)
>>         consumer.shutdown();
>> }
>>
>> public static void main(String[] args) {
>>     String topic = "test-topic";
>>     TestKafkaConsumer simpleHLConsumer = new
>> TestKafkaConsumer("linux157:2181", "testgroup22", topic);
>>     simpleHLConsumer.testConsumer();
>>
>> }
>>
>> }
>>
>
>
>
> --
> -- Guozhang
