kafka-users mailing list archives

From ajeet singh <ajeetpr.si...@gmail.com>
Subject CommitOffsets for Simple consumer is returning EOF Exception
Date Tue, 27 Jan 2015 11:31:59 GMT
Hi,

I am new to Kafka. I have a use case in which my consumer can't use the
auto-commit offset feature, so I have to commit offsets manually. The
high-level consumer has the constraint that it can commit only the current
offset, but in my case I will be committing an earlier offset value.

So the only possible solution seems to be the Simple Consumer. This is how
I am using the Simple Consumer to commit an offset:

TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, OffsetMetadataAndError> requestInfo =
    new HashMap<TopicAndPartition, OffsetMetadataAndError>();
requestInfo.put(topicAndPartition,
    new OffsetMetadataAndError(0L, "no_metadata", (short) 0));
kafka.javaapi.OffsetCommitRequest request =
    new kafka.javaapi.OffsetCommitRequest("test", requestInfo,
        kafka.api.OffsetRequest.CurrentVersion(), 0, clientName);
kafka.javaapi.OffsetCommitResponse response = consumer.commitOffsets(request);


I am getting an EOFException:

Oops:java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at kafka.utils.Utils$.read(Utils.scala:376)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
at kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:81)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:71)
at kafka.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:138)
at kafka.javaapi.consumer.SimpleConsumer.fetchOffsets(SimpleConsumer.scala:99)
at com.vizury.rtb.realtimelogging.OfflineLogConsumer.commitOffsetTest(OfflineLogConsumer.java:205)
at com.vizury.rtb.realtimelogging.OfflineLogConsumer.run(OfflineLogConsumer.java:147)
at com.vizury.rtb.realtimelogging.OfflineLogConsumer.main(OfflineLogConsumer.java:31)


Any help? I get the same error with the fetchOffsets() method, whereas
getOffsetsBefore() works fine.
