kafka-users mailing list archives

From Gwen Shapira <gshap...@cloudera.com>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Mon, 05 Jan 2015 18:55:18 GMT
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.

This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.

This looks like a bug and I'd even mark it as a blocker for 0.8.2 since
it may prevent rolling upgrades.
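
A minimal sketch of version-aware encoding on the client side, assuming (as the thread suggests) that version 0 keeps the 0.8.1 per-partition layout of partition id, offset and metadata, and that only version 1 and later write the timestamp. `OffsetCommitEncoder`, `partitionSize` and `writePartition` are hypothetical names, not the Kafka client's API; the string encoding (int16 length, then UTF-8 bytes, -1 for null) follows the protocol guide's short-string format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical encoder sketch, not the Kafka client's API.
// Assumption: version 0 keeps the 0.8.1 entry layout and only
// version >= 1 carries the per-partition timestamp.
object OffsetCommitEncoder {
  // Protocol short string: int16 length, then UTF-8 bytes; -1 encodes null.
  def shortStringSize(s: String): Int =
    2 + (if (s == null) 0 else s.getBytes(StandardCharsets.UTF_8).length)

  def writeShortString(buffer: ByteBuffer, s: String): Unit =
    if (s == null) buffer.putShort((-1).toShort)
    else {
      val bytes = s.getBytes(StandardCharsets.UTF_8)
      buffer.putShort(bytes.length.toShort)
      buffer.put(bytes)
    }

  // Bytes for one partition entry: partition id, offset,
  // optional timestamp, metadata string.
  def partitionSize(versionId: Short, metadata: String): Int =
    4 + 8 + (if (versionId >= 1) 8 else 0) + shortStringSize(metadata)

  def writePartition(buffer: ByteBuffer, versionId: Short, partitionId: Int,
                     offset: Long, timestamp: Long, metadata: String): Unit = {
    buffer.putInt(partitionId)
    buffer.putLong(offset)
    if (versionId >= 1) buffer.putLong(timestamp) // never written for version 0
    writeShortString(buffer, metadata)
  }
}
```

With a one-character metadata string, a version-0 entry takes 15 bytes (4 + 8 + 2 + 1) versus 23 at version 1; that 8-byte difference is exactly what a version-blind peer trips over.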

Are you opening the JIRA?

Gwen

On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers <dana.powers@rd.io> wrote:
> specifically comparing 0.8.1 --
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
> })
> ```
>
> to trunk --
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val timestamp = {
>     val given = buffer.getLong
>     if (given == -1L) now else given
>   }
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
> })
> ```
>
> should the `timestamp` buffer read be wrapped in an api version check?
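
One way to answer that, sketched under the same assumption that only version 1 carries the timestamp, is to gate the read on `versionId` the same way `readFrom` already gates `groupGenerationId` and `consumerId`. `PartitionEntry` and `OffsetCommitDecoder` are hypothetical stand-ins, not Kafka's classes; for version 0 the sketch falls back to `now`, mirroring how the trunk code treats a given timestamp of -1.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for OffsetAndMetadata; not Kafka's own class.
final case class PartitionEntry(partitionId: Int, offset: Long, metadata: String, timestamp: Long)

object OffsetCommitDecoder {
  // Protocol short string: int16 length, then UTF-8 bytes; -1 means null.
  def readShortString(buffer: ByteBuffer): String = {
    val size = buffer.getShort.toInt
    if (size < 0) null
    else {
      val bytes = new Array[Byte](size)
      buffer.get(bytes)
      new String(bytes, StandardCharsets.UTF_8)
    }
  }

  // Assumption: the timestamp is on the wire only for versionId >= 1.
  def readPartition(buffer: ByteBuffer, versionId: Short, now: Long): PartitionEntry = {
    val partitionId = buffer.getInt
    val offset = buffer.getLong
    val timestamp =
      if (versionId >= 1) {
        val given = buffer.getLong
        if (given == -1L) now else given
      } else now // version 0: nothing to read, use the broker's clock
    val metadata = readShortString(buffer)
    PartitionEntry(partitionId, offset, metadata, timestamp)
  }
}
```

A 15-byte version-0 entry is consumed exactly by this reader, whereas the unconditional `getLong` in the trunk code needs 8 more bytes than a version-0 entry provides, which is consistent with the BufferUnderflowException Dana reports.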
>
>
> Dana Powers
> Rdio, Inc.
> dana.powers@rd.io
> rdio.com/people/dpkp/
>
> On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
>
>> Ah, I see :)
>>
>> The readFrom function basically tries to read two extra fields if you
>> are on version 1:
>>
>> if (versionId == 1) {
>>   groupGenerationId = buffer.getInt
>>   consumerId = readShortString(buffer)
>> }
>>
>> The rest looks identical in version 0 and 1, and still no timestamp in
>> sight...
>>
>> Gwen
>>
>> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io> wrote:
>> > Hi Gwen, I am using/writing kafka-python to construct api requests and
>> > have not dug too deeply into the server source code.  But I believe it is
>> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
>> > used to decode the wire protocol.
>> >
>> > -Dana
>> > OffsetCommitRequest has two constructors now:
>> >
>> > For version 0:
>> > OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
>> >
>> > And version 1:
>> > OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData)
>> >
>> > None of them seem to require timestamps... so I'm not sure where you
>> > see that this is required. Can you share an example?
>> >
>> > Gwen
>> >
>> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io> wrote:
>> >> Hi Joel,
>> >>
>> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> >> you mentioned below, and I cannot figure out how to explicitly construct
>> >> a request with the earlier version.  Should the api version be different
>> >> for requests that do not include the timestamp field and/or for servers
>> >> that do not support it?  It looks like 0.8.1.1 did not include the
>> >> timestamp field and used api version 0.  But 0.8.2-beta seems to now
>> >> require timestamps even when I explicitly encode OffsetCommitRequest api
>> >> version 0 (the server logs a BufferUnderflowException).
>> >>
>> >> Is this the expected server behavior?  Can you provide any tips on how
>> >> third-party clients should manage the wire-protocol change for this api
>> >> method (I'm working on kafka-python)?
>> >>
>> >> Thanks,
>> >>
>> >> -Dana
>> >>
>> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
>> >>
>> >>> Yes, it should be backwards compatible. So, e.g., you should be able
>> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> >>> not upgrade your clients until after the brokers have been upgraded.
>> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> >>> explicitly construct an OffsetCommitRequest with the earlier version.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Joel
>> >>>
>> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>> >>> > Hi Joel,
>> >>> >
>> >>> > Thanks for all the clarifications!  Just another question on this: will
>> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>> >>> > Generally speaking, would there be any concerns with using the 0.8.2
>> >>> > consumer with a 0.8.1 broker, for instance?
>> >>> >
>> >>> > Marius
>> >>> >
>> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
>> >>> >
>> >>> > > Inline..
>> >>> > >
>> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>> >>> > > > Hello everyone,
>> >>> > > >
>> >>> > > > I have a few questions about the current status and future of the
>> >>> > > > Kafka consumers.
>> >>> > > >
>> >>> > > > We have been working on adding Kafka support in Spring XD [1],
>> >>> > > > currently using the high level consumer via Spring Integration
>> >>> > > > Kafka [2]. We are working on adding features such as:
>> >>> > > > - the ability to control offsets/replay topics;
>> >>> > > > - the ability to control partition allocation across multiple consumers;
>> >>> > > >
>> >>> > > > We are currently at version 0.8.1.1, so using the simple consumer is a
>> >>> > > > pretty straightforward choice right now. However, in the light of the
>> >>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
>> >>> > > >
>> >>> > > > 1) With respect to the consumer redesign for 0.9, what is the future of
>> >>> > > > the Simple Consumer and High Level Consumer? To the best of my
>> >>> > > > understanding, the existing high level consumer API will be deprecated
>> >>> > > > in favour of the new consumer API. What is the future of the Simple
>> >>> > > > Consumer, in this case? Will it continue to exist as a low-level API
>> >>> > > > implementing the Kafka protocol [3] and providing the building blocks
>> >>> > > > for the new consumer, or will it be deprecated as well?
>> >>> > >
>> >>> > > The new consumer will subsume both use-cases (simple and high-level).
>> >>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>> >>> > > protocol for fetch and other requests will still be supported.
>> >>> > >
>> >>> > > >
>> >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
>> >>> > > > implementation of it, but since this is a feature scheduled only for
>> >>> > > > 0.9, what is its status? Is it included only as a future reference
>> >>> > > > and for stabilizing the API?
>> >>> > >
>> >>> > > It is a WIP so you cannot really use it.
>> >>> > >
>> >>> > > > 3) Obviously, offset management is a concern if using the simple
>> >>> > > > consumer, so I am wondering about the Offset Management API as well.
>> >>> > > > The Kafka protocol document specifically indicates that it will be
>> >>> > > > fully functional in 0.8.2 [4]; however, a functional implementation is
>> >>> > > > already available in 0.8.1.1 (accessible via the SimpleConsumer API but
>> >>> > > > not documented in [5]). Again, I am trying to understand the extent of
>> >>> > > > what 0.8.1.1 already supports (ostensibly, the offset manager support
>> >>> > > > seems to have been added only in 0.8.2 - please correct me if I am
>> >>> > > > wrong), and whether it is recommended for use in production in any
>> >>> > > > form (with the caveats that accompany the use of ZooKeeper).
>> >>> > >
>> >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
>> >>> > > as the offsets storage mechanism (not ZooKeeper). High-level Java
>> >>> > > consumers can choose to store offsets in ZooKeeper instead by setting
>> >>> > > offsets.storage=zookeeper
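
For the high-level consumer this switch is an ordinary consumer property. A minimal sketch that builds the properties object; `group.id` and `zookeeper.connect` are the old high-level consumer's standard keys, `offsets.storage` comes from the message above, and `OffsetStorageConfig`, the group name and the connection string are hypothetical placeholders.

```scala
import java.util.Properties

// Hypothetical helper; only offsets.storage is taken from the thread.
object OffsetStorageConfig {
  def highLevelConsumerProps(groupId: String, zkConnect: String, storage: String): Properties = {
    // Client-side switch between ZooKeeper- and Kafka-based offset storage.
    require(storage == "zookeeper" || storage == "kafka", s"unexpected offsets.storage: $storage")
    val props = new Properties()
    props.put("group.id", groupId)
    props.put("zookeeper.connect", zkConnect)
    props.put("offsets.storage", storage)
    props
  }
}
```

The resulting `Properties` would then be passed to the high-level consumer's configuration class; because the setting lives on the client, two consumer groups on the same brokers can make different choices.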
>> >>> > >
>> >>> > > However, if you are using the simple consumer and wish to store
>> >>> > > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
>> >>> > > You can use ZkUtils in the kafka.utils package for this.
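
When committing to ZooKeeper directly, keeping the same znode layout the high-level consumer uses, `/consumers/<group>/offsets/<topic>/<partition>` with the offset stored as a decimal string, lets existing tooling read the offsets. The sketch below only builds the path and payload; the actual write (for example through ZkUtils) is left out, and `ZkOffsetPath` is a hypothetical helper.

```scala
// Hypothetical helper; mirrors the high-level consumer's znode layout
// /consumers/<group>/offsets/<topic>/<partition>.
object ZkOffsetPath {
  def offsetPath(group: String, topic: String, partition: Int): String = {
    require(partition >= 0, "partition must be non-negative")
    s"/consumers/$group/offsets/$topic/$partition"
  }

  // The offset is stored as its decimal string representation.
  def offsetData(offset: Long): String = offset.toString
}
```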
>> >>> > >
>> >>> > > If you wish to move to Kafka-based offsets we will be adding a new
>> >>> > > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
>> >>> > > This is currently not listed as a blocker for 0.8.2 but I think we
>> >>> > > should include it. I will update that ticket.
>> >>> > >
>> >>> > > > 4) Trying to interpret the existing examples in [6] and the comments
>> >>> > > > on [7]: the version of the Offset Management API that exists in
>> >>> > > > 0.8.1.1 is using ZooKeeper, whereas ZooKeeper will be optional in
>> >>> > > > 0.8.2, to be replaced by Kafka and phased out if possible. To my
>> >>> > > > understanding, the switch between the two will be controlled by the
>> >>> > > > broker configuration (along with other parameters that control the
>> >>> > > > performance of offset queues). Is that correct?
>> >>> > >
>> >>> > > The switch is a client-side configuration. That wiki is not
>> >>> > > up-to-date. The most current documentation is available as a patch in
>> >>> > > https://issues.apache.org/jira/browse/KAFKA-1729
>> >>> > >
>> >>> > > > 5) Also, wondering about the timeline of 0.8.2 - according to the
>> >>> > > > roadmaps it should be released relatively shortly. Is that correct?
>> >>> > >
>> >>> > > Yes - once the blockers are ironed out.
>> >>> > >
>> >>> > > >
>> >>> > > > Thanks,
>> >>> > > > Marius
>> >>> > > >
>> >>> > > > [1] http://projects.spring.io/spring-xd/
>> >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
>> >>> > > > [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>> >>> > > > [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>> >>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
>> >>> > > > [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
>> >>> > >
>> >>> > >
>> >>>
>> >>>
>>
