kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Kafka Consumer does not receive any message after a while
Date Wed, 11 Dec 2013 16:16:20 GMT
Have you looked at
https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Myconsumerseemstohavestopped%2Cwhy%3F?

Thanks,

Jun


On Wed, Dec 11, 2013 at 3:59 AM, shahab <shahab.mokari@gmail.com> wrote:

> Hi,
>
> I have a problem fetching messages from Kafka. I am using the simple
> consumer API in Java (the same one shown in the Kafka introduction
> example). After a while (30 minutes to a couple of hours), the consumer
> stops receiving messages, even though the data exists in Kafka and the
> producer is still streaming to it.
> I can confirm the data is there by running the following command, which
> lists the messages in Kafka. Each message is around 80 bytes:
>
> *bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
> --from-beginning*
>
>
> Any idea what could be the source of the problem? I also noticed that if I
> stress the input to Kafka (sending 1000 messages per second) for an hour or
> more, the same thing happens. It seems that something is wrong with the
> fetching (consumer) side, right?
>
> best,
> /Shahab
>
>
>
> Kafka runs on a single machine with no clustering or replication, a very
> basic configuration.
>
> The consumer config is:
>
> props.put("zookeeper.connect", "myserver:2181");
> props.put("group.id", "group1");
> props.put("zookeeper.session.timeout.ms", "400");
> props.put("zookeeper.sync.time.ms", "200");
> props.put("auto.commit.interval.ms", "1000");
> props.put("fetch.message.max.bytes", "1048576");
> props.put("auto.offset.reset", "smallest");
>
>
>
>
> and the server config looks like this:
>
> broker.id=0
> port=9092
> num.network.threads=5
> num.io.threads=2
> socket.send.buffer.bytes=1048576
>
> socket.receive.buffer.bytes=1048576
>
> socket.request.max.bytes=104857600
>
> log.dirs=/tmp/kafka-logs
>
> num.partitions=2
>
> ############################# Log Flush Policy
> #############################
> log.flush.interval.messages=1000
>
> # The maximum amount of time a message can sit in a log before we force a
> # flush
> log.flush.interval.ms=1000
>
> ############################ Log Retention Policy
> #############################
>
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=1
>
>
> log.retention.bytes=10485760
>
> # The maximum size of a log segment file. When this size is reached a new
> # log segment will be created.
> log.segment.bytes=536870912
>
> # The interval at which log segments are checked to see if they can be
> # deleted according to the retention policies
> log.cleanup.interval.mins=1
> zookeeper.connect=localhost:2181
>
> # Timeout in ms for connecting to zookeeper
> zookeeper.connection.timeout.ms=1000000
>
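
For readers following along, the consumer settings quoted above can be collected into a java.util.Properties object roughly as sketched below. This is only an illustration: the class name ConsumerConfigSketch and the build() helper are hypothetical, and the values are copied from the quoted message rather than recommended. In the 0.8 high-level consumer, a Properties object like this is what gets passed to kafka.consumer.ConsumerConfig; that step is left out so the sketch runs without Kafka on the classpath.

```java
import java.util.Properties;
import java.util.TreeSet;

public class ConsumerConfigSketch {

    // Hypothetical helper: gathers the consumer settings from the quoted
    // message into one Properties object. Values are taken as posted.
    static Properties build(String zookeeper, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.message.max.bytes", "1048576");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("myserver:2181", "group1");
        // Print the settings in sorted order so they are easy to compare
        // against the values in the message.
        for (String name : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

Printing the effective settings at startup makes it easier to check them against the FAQ entry linked in the reply.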
