kafka-users mailing list archives

From Gwen Shapira <gshap...@cloudera.com>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Mon, 05 Jan 2015 17:49:45 GMT
Ah, I see :)

The readFrom function basically tries to read two extra fields if you
are on version 1:

    if (versionId == 1) {
      groupGenerationId = buffer.getInt
      consumerId = readShortString(buffer)
    }

The rest looks identical in version 0 and 1, and still no timestamp in sight...
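
For a third-party client, the same version branch can be sketched in Python.
This is only an illustration under assumptions: the function names are
hypothetical (not taken from kafka-python or the broker), it covers just the
group-level fields that precede the per-topic offset data, and it assumes the
protocol guide's "short string" encoding (big-endian int16 length, then the
bytes). Null strings are not handled.

```python
import struct


def write_short_string(value):
    """Encode a Kafka 'short string': int16 length, then UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def read_short_string(buf, pos):
    """Decode a short string at pos; return (value, next position)."""
    (length,) = struct.unpack_from(">h", buf, pos)
    pos += 2
    return buf[pos:pos + length].decode("utf-8"), pos + length


def encode_group_fields(version_id, group_id, generation_id=-1, consumer_id=""):
    """Group-level fields of an OffsetCommitRequest (hypothetical helper)."""
    out = write_short_string(group_id)
    if version_id == 1:
        # Version 1 carries two extra fields, mirroring the readFrom branch.
        out += struct.pack(">i", generation_id)
        out += write_short_string(consumer_id)
    return out


def decode_group_fields(version_id, buf, pos=0):
    """Inverse of encode_group_fields; returns (fields, next position)."""
    group_id, pos = read_short_string(buf, pos)
    fields = {"group_id": group_id}
    if version_id == 1:
        (fields["generation_id"],) = struct.unpack_from(">i", buf, pos)
        pos += 4
        fields["consumer_id"], pos = read_short_string(buf, pos)
    return fields, pos
```

Decoding version 0 bytes with version_id=1 runs off the end of the buffer and
raises struct.error, which is loosely analogous to a BufferUnderflowException
on the broker side when the declared version and the payload disagree.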

Gwen

On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io> wrote:
> Hi Gwen, I am using/writing kafka-python to construct api requests and have
> not dug too deeply into the server source code.  But I believe it is
> kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> used to decode the wire protocol.
>
> -Dana
> OffsetCommitRequest has two constructors now:
>
> For version 0:
>  OffsetCommitRequest(String groupId, Map<TopicPartition,
> PartitionData> offsetData)
>
> And version 1:
> OffsetCommitRequest(String groupId, int generationId, String
> consumerId, Map<TopicPartition, PartitionData> offsetData)
>
> None of them seem to require timestamps... so I'm not sure where you
> see that this is required. Can you share an example?
>
> Gwen
>
> On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io> wrote:
>> Hi Joel,
>>
>> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> you mentioned below, and I cannot figure out how to explicitly construct a
>> request with the earlier version.  Should the api version be different for
>> requests that do not include it and/or servers that do not support the
>> timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
>> and used api version 0.  But 0.8.2-beta seems to now require timestamps
>> even when I explicitly encode OffsetCommitRequest api version 0 (server
>> logs a BufferUnderflowException).
>>
>> Is this the expected server behavior?  Can you provide any tips on how
>> third-party clients should manage the wire-protocol change for this api
>> method (I'm working on kafka-python)?
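
For readers following along, here is a minimal Python sketch of where the api
version sits in a request frame. It assumes the request header layout from the
protocol guide (int32 size, int16 api key, int16 api version, int32
correlation id, short-string client id) and api key 8 for OffsetCommitRequest.
The function names are hypothetical and are not kafka-python's API.

```python
import struct

OFFSET_COMMIT_API_KEY = 8  # assumed from the Kafka protocol guide


def encode_request(api_key, api_version, correlation_id, client_id, body):
    """Frame a request: size prefix, header, then the already-encoded body."""
    client = client_id.encode("utf-8")
    header = struct.pack(">hhih", api_key, api_version, correlation_id, len(client))
    payload = header + client + body
    return struct.pack(">i", len(payload)) + payload


def decode_header(frame):
    """Return the header fields and the offset at which the body starts."""
    size, api_key, api_version, correlation_id, client_len = struct.unpack_from(
        ">ihhih", frame, 0
    )
    start = struct.calcsize(">ihhih")
    client_id = frame[start:start + client_len].decode("utf-8")
    return {
        "size": size,
        "api_key": api_key,
        "api_version": api_version,
        "correlation_id": correlation_id,
        "client_id": client_id,
    }, start + client_len
```

The api version is the only signal the broker has for choosing a body layout,
so a body encoded for one version but labelled with another cannot be parsed
correctly.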
>>
>> Thanks,
>>
>> -Dana
>>
>> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
>>
>>> Yes, it should be backwards compatible. For example, you should be able
>>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>>> not upgrade your clients until after the brokers have been upgraded.
>>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>>> explicitly construct an OffsetCommitRequest with the earlier version.
>>>
>>> Thanks,
>>>
>>> Joel
>>>
>>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>>> > Hi Joel,
>>> >
>>> > Thanks for all the clarifications!  Just another question on this: will
>>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>>> > Generally speaking, would there be any concerns with using the 0.8.2
>>> > consumer with a 0.8.1 broker, for instance?
>>> >
>>> > Marius
>>> >
>>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
>>> >
>>> > > Inline..
>>> > >
>>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>>> > > > Hello everyone,
>>> > > >
>>> > > > I have a few questions about the current status and future of the
>>> > > > Kafka consumers.
>>> > > >
>>> > > > We have been working on adding Kafka support in Spring XD [1],
>>> > > > currently using the high level consumer via Spring Integration
>>> > > > Kafka [2]. We are working on adding features such as:
>>> > > > - the ability to control offsets/replay topics;
>>> > > > - the ability to control partition allocation across multiple
>>> > > >   consumers.
>>> > > >
>>> > > > We are currently at version 0.8.1.1, so using the simple consumer is
>>> > > > a pretty straightforward choice right now. However, in the light of
>>> > > > the upcoming consumer changes for 0.8.2 and 0.9, I have a few
>>> > > > questions:
>>> > > >
>>> > > > 1) With respect to the consumer redesign for 0.9, what is the future
>>> > > > of the Simple Consumer and High Level Consumer? To my best
>>> > > > understanding, the existing high level consumer API will be
>>> > > > deprecated in favour of the new consumer API. What is the future of
>>> > > > the Simple Consumer, in this case? Will it continue to exist as a
>>> > > > low-level API implementing the Kafka protocol [3] and providing the
>>> > > > building blocks for the new consumer, or will it be deprecated as
>>> > > > well?
>>> > >
>>> > > The new consumer will subsume both use-cases (simple and high-level).
>>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>>> > > protocol for fetch and other requests will still be supported.
>>> > >
>>> > > >
>>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
>>> > > > implementation of it, but since this is a feature scheduled only for
>>> > > > 0.9, what is its status? Is it included only as a future reference
>>> > > > and for stabilizing the API?
>>> > >
>>> > > It is a WIP so you cannot really use it.
>>> > >
>>> > > > 3) Obviously, offset management is a concern if using the simple
>>> > > > consumer, so I am wondering about the Offset Management API as well.
>>> > > > The Kafka protocol document specifically indicates that it will be
>>> > > > fully functional in 0.8.2 [4] - however, a functional implementation
>>> > > > is already available in 0.8.1.1 (accessible via the SimpleConsumer
>>> > > > API but not documented in [5]). Again, I am trying to understand the
>>> > > > extent of what 0.8.1.1 already supports (ostensibly, the offset
>>> > > > manager support seems to have been added only in 0.8.2 - please
>>> > > > correct me if I am wrong), and whether it is recommended for use in
>>> > > > production in any form (with the caveats that accompany the use of
>>> > > > ZooKeeper).
>>> > >
>>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
>>> > > as the offsets storage mechanism (not ZooKeeper). High-level Java
>>> > > consumers can choose to store offsets in ZooKeeper instead by setting
>>> > > offsets.storage=zookeeper
>>> > >
>>> > > However, if you are using the simple consumer and wish to store
>>> > > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
>>> > > You can use ZkUtils in the kafka.utils package for this.
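
A minimal sketch of where such a direct commit would land, written in Python
for illustration. It assumes the znode layout conventionally used by the
high-level consumer, /consumers/<group>/offsets/<topic>/<partition>, with the
offset stored as a decimal string. Both helper names are hypothetical, and the
actual ZooKeeper write (for example through ZkUtils on the JVM) is left out.

```python
def zk_offset_path(group, topic, partition):
    """Znode path for one partition's committed offset (assumed layout)."""
    return "/consumers/{0}/offsets/{1}/{2}".format(group, topic, partition)


def zk_offset_payload(offset):
    """Znode data: the offset as a decimal string, UTF-8 encoded."""
    return str(int(offset)).encode("utf-8")
```

A simple-consumer client would write zk_offset_payload(offset) to
zk_offset_path(group, topic, partition) with whatever ZooKeeper client it
already uses.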
>>> > >
>>> > > If you wish to move to Kafka-based offsets we will be adding a new
>>> > > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
>>> > > This is currently not listed as a blocker for 0.8.2 but I think we
>>> > > should include it. I will update that ticket.
>>> > >
>>> > > > 4) Trying to interpret the existing examples in [6] and the comments
>>> > > > on [7] - the version of the Offset Management API that exists in
>>> > > > 0.8.1.1 is using ZooKeeper - whereas ZooKeeper will be optional in
>>> > > > 0.8.2 - to be replaced by Kafka, and phased out if possible. To my
>>> > > > understanding, the switch between the two will be controlled by the
>>> > > > broker configuration (along with other parameters that control the
>>> > > > performance of offset queues). Is that correct?
>>> > >
>>> > > The switch is a client-side configuration. That wiki is not
>>> > > up-to-date. The most current documentation is available as a patch in
>>> > > https://issues.apache.org/jira/browse/KAFKA-1729
>>> > >
>>> > > > 5) Also, wondering about the timeline of 0.8.2 - according to the
>>> > > > roadmaps it should be released relatively shortly. Is that correct?
>>> > >
>>> > > Yes - once the blockers are ironed out.
>>> > >
>>> > > >
>>> > > > Thanks,
>>> > > > Marius
>>> > > >
>>> > > > [1] http://projects.spring.io/spring-xd/
>>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
>>> > > > [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
>>> > > > [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
>>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
>>> > > > [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
>>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
>>> > >
>>> > >
>>>
>>>
