kafka-users mailing list archives

From Dana Powers <dana.pow...@gmail.com>
Subject Re: Can't save Kafka offset in Zookeeper
Date Tue, 19 Jan 2016 17:39:28 GMT
Sadly, HDP 2.3.2 shipped with a broken OffsetCommit API (the 0.8.2-beta
version). You should use the Apache releases, or upgrade to HDP 2.3.4.0 or
later.
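
To illustrate how a wire-format mismatch of this kind can surface as the
BufferUnderflowException in the quoted broker log, here is a minimal sketch.
It assumes the broker-side parser expects one more 8-byte field (a timestamp)
per partition than the client sent; the class name, method names, and field
layout are hypothetical and are not taken from the Kafka source.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class OffsetCommitMismatch {
    // Hypothetical client-side layout: offset (8 bytes), then a
    // length-prefixed metadata string (2-byte length + bytes).
    static ByteBuffer writeWithoutTimestamp(long offset, String metadata) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(8 + 2 + meta.length);
        buf.putLong(offset);
        buf.putShort((short) meta.length);
        buf.put(meta);
        buf.flip(); // switch the buffer from writing to reading
        return buf;
    }

    // Reader that agrees with the writer's layout.
    static String readWithoutTimestamp(ByteBuffer buf) {
        long offset = buf.getLong();
        byte[] meta = new byte[buf.getShort()];
        buf.get(meta);
        return "offset=" + offset;
    }

    // Hypothetical broker-side layout (assumption): an extra 8-byte
    // timestamp is expected between the offset and the metadata.
    static String readExpectingTimestamp(ByteBuffer buf) {
        try {
            long offset = buf.getLong();
            long timestamp = buf.getLong(); // too few bytes remain here
            byte[] meta = new byte[buf.getShort()];
            buf.get(meta);
            return "offset=" + offset + " timestamp=" + timestamp;
        } catch (BufferUnderflowException e) {
            return "underflow";
        }
    }

    public static void main(String[] args) {
        System.out.println(readWithoutTimestamp(writeWithoutTimestamp(42L, "")));
        System.out.println(readExpectingTimestamp(writeWithoutTimestamp(42L, "")));
    }
}
```

With empty metadata the client writes 10 bytes, so the second getLong() in
the mismatched reader finds only 2 bytes left and throws. In the sketch the
exception is caught; a server that does not catch it would drop the
connection, which would match the EOFException seen on the client side.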

-Dana

On Tue, Jan 19, 2016 at 12:03 AM, Krzysztof Zarzycki <k.zarzycki@gmail.com>
wrote:

> Hi Kafka users,
> I have an issue with saving Kafka offsets to Zookeeper through
> OffsetCommitRequest. It's the same issue I found unanswered on SO, so I
> kindly borrow the description:
>
> http://stackoverflow.com/questions/33897683/cant-save-kafka-offset-in-zookeeper
>
> "I've installed Zookeeper and Kafka from Ambari, on CentOS 7.
>
> Ambari version: 2.1.2.1
> Zookeeper version: 3.4.6.2.3
> Kafka version: 0.8.2.2.3
> Java Kafka client: kafka_2.10, 0.8.2.2
>
> I'm trying to save the Kafka offset, using the following code:
>
> SimpleConsumer simpleConsumer = new SimpleConsumer(host, port,
>     soTimeout, bufferSize, clientId);
> TopicAndPartition topicAndPartition =
>     new TopicAndPartition(topicName, partitionId);
> Map<TopicAndPartition, OffsetAndMetadata> requestInfo = new HashMap<>();
> requestInfo.put(topicAndPartition,
>     new OffsetAndMetadata(readOffset, "", ErrorMapping.NoError()));
> OffsetCommitRequest offsetCommitRequest = new OffsetCommitRequest(
>     groupName, requestInfo, correlationId, clientName, (short) 0);
> simpleConsumer.commitOffsets(offsetCommitRequest);
> simpleConsumer.close();
>
> But when I run this, I get the following error in my client:
>
> java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
>
> Also in the Kafka logs I have the following error:
>
> [2015-11-24 15:38:53,566] ERROR Closing socket for /192.168.186.1 because of error (kafka.network.Processor)
> java.nio.BufferUnderflowException
>     at java.nio.Buffer.nextGetIndex(Buffer.java:498)
>     at java.nio.HeapByteBuffer.getLong(HeapByteBuffer.java:406)
>     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:73)
>     at kafka.api.OffsetCommitRequest$$anonfun$1$$anonfun$apply$1.apply(OffsetCommitRequest.scala:68)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>     at scala.collection.immutable.Range.foreach(Range.scala:141)
>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:68)
>     at kafka.api.OffsetCommitRequest$$anonfun$1.apply(OffsetCommitRequest.scala:65)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.immutable.Range.foreach(Range.scala:141)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at kafka.api.OffsetCommitRequest$.readFrom(OffsetCommitRequest.scala:65)
>     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
>     at kafka.api.RequestKeys$$anonfun$9.apply(RequestKeys.scala:47)
>     at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:55)
>     at kafka.network.Processor.read(SocketServer.scala:547)
>     at kafka.network.Processor.run(SocketServer.scala:405)
>     at java.lang.Thread.run(Thread.java:745)
>
> Now I've also downloaded and installed the official Kafka 0.8.2.2 version
> from
> https://www.apache.org/dyn/closer.cgi?path=/kafka/0.8.2.2/kafka_2.10-0.8.2.2.tgz
> and it works ok; you can save the Kafka offset without any error.
>
> Can anybody give me some direction on why the Ambari Kafka is failing to
> save the offset?
>
> P.S.: I know that if versionId is 0 (in OffsetCommitRequest), then the
> offset is actually saved in Zookeeper.
> "
> My only difference (IMHO irrelevant) is that I'm using HDP version
> 2.3.2; other than that, the versions are the same.
>
> Do you guys have any hints on what could be wrong? Is something wrong
> with how I commit offsets, or is it a conflict of versions?
> Any hints would be highly appreciated :)
> Cheers,
> Krzysztof
>
