Hi,
I have a problem fetching messages from Kafka. I am using the simple
consumer API in Java (the same one shown in the Kafka introduction
example). After a while (anywhere from 30 minutes to a couple of hours),
the consumer stops receiving messages from Kafka, even though the data
is there and the producer is still streaming data into Kafka.
I can see that the data exists in Kafka by running the following command,
which lists the messages in the topic. Each message is around 80 bytes:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Any idea what could be the source of the problem? I also noticed that if I
stress the input to Kafka (sending 1000 messages per second) for an hour or
more, the same thing happens. It seems that something is wrong on the
fetching (consumer) side, right?
best,
/Shahab
Kafka runs on a single machine, with no cluster and no replication; it is
a very basic configuration.
The consumer config is:
props.put("zookeeper.connect", "myserver:2181");
props.put("group.id", "group1");
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("fetch.message.max.bytes", "1048576");
props.put("auto.offset.reset", "smallest");
and the server.config looks like this:
broker.id=0
port=9092
num.network.threads=5
num.io.threads=2
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=2
############################# Log Flush Policy #############################
log.flush.interval.messages=1000
# The maximum amount of time a message can sit in a log before we force a flush
log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# The minimum age of a log file to be eligible for deletion
log.retention.hours=1
log.retention.bytes=10485760
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.cleanup.interval.mins=1
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=1000000
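
For reference, here is a minimal sketch of how the consumer settings listed
earlier could be assembled in Java. It uses only java.util.Properties, so it
runs without Kafka on the classpath. The class and method names
(ConsumerConfigSketch, buildConsumerProperties) are made up for illustration,
and the step that hands the properties to Kafka is left as a comment, on the
assumption that the consumer is created the way the introduction example does it.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    // Collects the consumer settings from this mail in a Properties object.
    // The host name and group id are the values quoted in the mail.
    public static Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "myserver:2181");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("fetch.message.max.bytes", "1048576");
        props.put("auto.offset.reset", "smallest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConsumerProperties();
        // Assumption: with the Kafka client available, these properties would be
        // wrapped in a consumer config object and used to create the consumer
        // connector, as in the introduction example. Here they are only printed.
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + " = " + props.getProperty(name));
        }
    }
}
```

Printing the properties at startup is a cheap way to confirm that the consumer
is really running with the values listed here.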