kafka-users mailing list archives

From Jun Rao <...@confluent.io>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Wed, 07 Jan 2015 17:24:25 GMT
Yes, we did make an incompatible change to OffsetCommitRequest in 0.8.2,
which was a mistake. The incompatible change was introduced in KAFKA-1012 in
March 2014, when we added the Kafka-based offset management support. However,
we didn't realize that this breaks the wire protocol until much later. Now
the wire protocol has evolved again, and it's a bit hard to fix the format
in version 0. I can see a couple of options.

Option 1: Just accept the incompatible change as it is.
The argument is that even though we introduced OffsetCommitRequest in
0.8.1, it's not used in the high level consumer. It's possible that some
users of SimpleConsumer started using it; however, that number is likely
small. Also, the functionality of OffsetCommitRequest has changed, since
it now writes the offset to a Kafka log instead of ZK (for good reasons).
So we can document this as a wire-protocol and functionality incompatible
change. Users who don't mind the functionality change will need to
upgrade the client to the new protocol before they can use the new
broker. Users who want to preserve the old functionality will have to
write the offsets directly to ZK. In either case, hopefully the number of
people affected is small.
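For clients that keep the old behavior, writing offsets directly to ZK means updating the consumer group's offsets znode. A rough sketch of the path layout used by the 0.8.1-era high level consumer (the helper below is illustrative, not part of any Kafka client):

```python
def zk_offset_path(group, topic, partition):
    """Znode path where the 0.8.1-era high level consumer keeps an offset."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

# With a ZooKeeper client (e.g. kazoo), a commit would then be roughly:
#   zk.ensure_path(path)
#   zk.set(path, str(offset).encode("utf-8"))
print(zk_offset_path("my-group", "my-topic", 0))
# /consumers/my-group/offsets/my-topic/0
```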

Option 2: Revert the version 0 format to what's in 0.8.1.
There will be a few issues here. First, it's not clear how this affects
other people who have been deploying from trunk. Second, I am not sure that
we want to continue supporting writing the offset to ZK in
OffsetCommitRequest, since that can overload ZK.

Joel Koshy,

Any thoughts on this?

Thanks,

Jun

On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein <joe.stein@stealth.ly> wrote:

> In addition to the issue you bring up, the functionality as a whole has
> changed... when you call OffsetFetchRequest, version = 0 needs to preserve
> the old functionality
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
>
> and version = 1 the new behavior
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
> Also, even though the wire protocol for OffsetFetchRequest is unchanged,
> its behavior changes after the 0.8.2 upgrade: if you were using the
> 0.8.1.1 OffsetFetchRequest
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
>
> it will stop going to zookeeper and start going to Kafka storage
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
>
> so more errors will happen and things break too.
>
> I think we should treat the version field not just as a guard against
> breaking wire-protocol calls, but also as a "feature flag" that preserves
> upgrade paths and multiple code paths.
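Treating the version as a feature flag amounts to dispatching to entirely separate code paths per request version, not merely parsing differently. A hypothetical sketch (handler names invented for illustration; this is not the broker's actual code):

```python
def handle_offset_fetch(version, request):
    # Hypothetical version dispatch: v0 keeps the 0.8.1 ZooKeeper-backed
    # behavior, v1 switches to the new Kafka-based offset storage.
    if version == 0:
        return fetch_offsets_from_zookeeper(request)
    if version == 1:
        return fetch_offsets_from_kafka(request)
    raise ValueError("unsupported OffsetFetchRequest version: %d" % version)

def fetch_offsets_from_zookeeper(request):
    return "zookeeper"  # stand-in for the legacy code path

def fetch_offsets_from_kafka(request):
    return "kafka"      # stand-in for the new code path
```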
>
> I updated the JIRA for the feature flag needs for OffsetFetch and
> OffsetCommit too.
>
> /*******************************************
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> ********************************************/
>
> On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers <dana.powers@rd.io> wrote:
>
> > ok, opened KAFKA-1841.  KAFKA-1634 is also related.
> >
> > -Dana
> >
> > On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira <gshapira@cloudera.com>
> > wrote:
> >
> > > Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> > > part of the Map changed, which will modify the wire protocol.
> > >
> > > This is actually not handled in the Java client either. It will send
> > > the timestamp no matter which version is used.
> > >
> > > This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> > > it may prevent rolling upgrades.
> > >
> > > Are you opening the JIRA?
> > >
> > > Gwen
> > >
> > > On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers <dana.powers@rd.io> wrote:
> > > > specifically comparing 0.8.1 --
> > > >
> > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > > > ```
> > > > (1 to partitionCount).map(_ => {
> > > >   val partitionId = buffer.getInt
> > > >   val offset = buffer.getLong
> > > >   val metadata = readShortString(buffer)
> > > >   (TopicAndPartition(topic, partitionId),
> > OffsetMetadataAndError(offset,
> > > > metadata))
> > > > })
> > > > ```
> > > >
> > > > to trunk --
> > > >
> > > > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > > > ```
> > > > (1 to partitionCount).map(_ => {
> > > >   val partitionId = buffer.getInt
> > > >   val offset = buffer.getLong
> > > >   val timestamp = {
> > > >     val given = buffer.getLong
> > > >     if (given == -1L) now else given
> > > >   }
> > > >   val metadata = readShortString(buffer)
> > > >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> > > > metadata, timestamp))
> > > > })
> > > > ```
> > > >
> > > > should the `timestamp` buffer read be wrapped in an api version check?
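One way to picture the version-gated read Dana is asking about, sketched in Python (the struct layout follows the Scala snippets above; this is illustrative, not kafka-python's or the broker's actual code):

```python
import struct

def decode_partition_entry(buf, pos, version):
    """Decode one (partition, offset[, timestamp], metadata) entry, gating
    the timestamp read on the request's api version."""
    partition, offset = struct.unpack_from(">iq", buf, pos)
    pos += 12
    timestamp = None
    if version >= 1:  # only v1+ carries the commit timestamp
        (timestamp,) = struct.unpack_from(">q", buf, pos)
        pos += 8
    (mlen,) = struct.unpack_from(">h", buf, pos)  # short-string length
    pos += 2
    metadata = buf[pos:pos + mlen].decode("utf-8")
    return (partition, offset, timestamp, metadata), pos + mlen
```

Decoding a v0 entry with version=1 would read 8 bytes past the data that was sent -- which is exactly the failure mode discussed further down the thread.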
> > > >
> > > >
> > > > Dana Powers
> > > > Rdio, Inc.
> > > > dana.powers@rd.io
> > > > rdio.com/people/dpkp/
> > > >
> > > > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
> > > >
> > > >> Ah, I see :)
> > > >>
> > > >> The readFrom function basically tries to read two extra fields if you
> > > >> are on version 1:
> > > >>
> > > >> if (versionId == 1) {
> > > >>       groupGenerationId = buffer.getInt
> > > >>       consumerId = readShortString(buffer)
> > > >>     }
> > > >>
> > > >> The rest looks identical in version 0 and 1, and still no timestamp
> > > >> in sight...
> > > >>
> > > >> Gwen
> > > >>
> > > >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io> wrote:
> > > >> > Hi Gwen, I am using/writing kafka-python to construct api requests
> > > >> > and have not dug too deeply into the server source code.  But I
> > > >> > believe it is kafka/api/OffsetCommitRequest.scala and specifically
> > > >> > the readFrom method used to decode the wire protocol.
> > > >> >
> > > >> > -Dana
> > > >> > OffsetCommitRequest has two constructors now:
> > > >> >
> > > >> > For version 0:
> > > >> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
> > > >> > PartitionData> offsetData)
> > > >> >
> > > >> > And version 1:
> > > >> > OffsetCommitRequest(String groupId, int generationId, String
> > > >> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> > > >> >
> > > >> > None of them seem to require timestamps... so I'm not sure where
> > > >> > you see that this is required. Can you share an example?
> > > >> >
> > > >> > Gwen
> > > >> >
> > > >> >> On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io> wrote:
> > > >> >> Hi Joel,
> > > >> >>
> > > >> >> I'm looking more closely at the OffsetCommitRequest wire protocol
> > > >> >> change you mentioned below, and I cannot figure out how to
> > > >> >> explicitly construct a request with the earlier version.  Should
> > > >> >> the api version be different for requests that do not include it
> > > >> >> and/or servers that do not support the timestamp field?  It looks
> > > >> >> like 0.8.1.1 did not include the timestamp field and used api
> > > >> >> version 0.  But 0.8.2-beta seems to now require timestamps even
> > > >> >> when I explicitly encode OffsetCommitRequest api version 0 (the
> > > >> >> server logs a BufferUnderflowException).
> > > >> >>
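The BufferUnderflowException is consistent with the broker unconditionally reading a timestamp that a v0 request never sent: each v0 partition entry is 8 bytes shorter than what the 0.8.2 decoder expects. A minimal sketch of the two layouts (field order per the Scala snippets quoted earlier in the thread; illustrative only):

```python
import struct

def encode_partition_v0(partition, offset, metadata=b""):
    # 0.8.1 layout: partition(int32) offset(int64) metadata(short string)
    return struct.pack(">iqh", partition, offset, len(metadata)) + metadata

def encode_partition_v1(partition, offset, timestamp, metadata=b""):
    # trunk/0.8.2 layout adds timestamp(int64) before the metadata string
    return struct.pack(">iqqh", partition, offset, timestamp,
                       len(metadata)) + metadata

# A decoder that always expects the v1 layout runs 8 bytes past the end of
# a v0-encoded entry -- hence the underflow.
```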
> > > >> >> Is this the expected server behavior?  Can you provide any tips
> > > >> >> on how third-party clients should manage the wire-protocol change
> > > >> >> for this api method (I'm working on kafka-python)?
> > > >> >>
> > > >> >> Thanks,
> > > >> >>
> > > >> >> -Dana
> > > >> >>
> > > >> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > > >> >>
> > > >> >>> Yes it should be backwards compatible. So for e.g., you should
> > > >> >>> be able to use an 0.8.1 client with an 0.8.2 broker. In general,
> > > >> >>> you should not upgrade your clients until after the brokers have
> > > >> >>> been upgraded.  However, you can point an 0.8.2 client at an
> > > >> >>> 0.8.1 broker.  One wire protocol change I'm aware of is the
> > > >> >>> OffsetCommitRequest.  There is a change in the
> > > >> >>> OffsetCommitRequest format (KAFKA-1634) although you can
> > > >> >>> explicitly construct an OffsetCommitRequest with the earlier
> > > >> >>> version.
> > > >> >>>
> > > >> >>> Thanks,
> > > >> >>>
> > > >> >>> Joel
> > > >> >>>
> > > >> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> > > >> >>> > Hi Joel,
> > > >> >>> >
> > > >> >>> > Thanks for all the clarifications!  Just another question on
> > > >> >>> > this: will 0.8.2 be backwards compatible with 0.8.1, just as
> > > >> >>> > 0.8.1 was with 0.8?  Generally speaking, would there be any
> > > >> >>> > concerns with using the 0.8.2 consumer with a 0.8.1 broker,
> > > >> >>> > for instance?
> > > >> >>> >
> > > >> >>> > Marius
> > > >> >>> >
> > > >> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> > > >> >>> >
> > > >> >>> > > Inline..
> > > >> >>> > >
> > > >> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > > >> >>> > > > Hello everyone,
> > > >> >>> > > >
> > > >> >>> > > > I have a few questions about the current status and future
> > > >> >>> > > > of the Kafka consumers.
> > > >> >>> > > >
> > > >> >>> > > > We have been working on adding Kafka support in Spring XD
> > > >> >>> > > > [1], currently using the high level consumer via Spring
> > > >> >>> > > > Integration Kafka [2].  We are working on adding features
> > > >> >>> > > > such as:
> > > >> >>> > > > - the ability to control offsets/replay topics;
> > > >> >>> > > > - the ability to control partition allocation across
> > > >> >>> > > > multiple consumers;
> > > >> >>> > > >
> > > >> >>> > > > We are currently at version 0.8.1.1, so using the simple
> > > >> >>> > > > consumer is a pretty straightforward choice right now.
> > > >> >>> > > > However, in the light of the upcoming consumer changes for
> > > >> >>> > > > 0.8.2 and 0.9, I have a few questions:
> > > >> >>> > > >
> > > >> >>> > > > 1) With respect to the consumer redesign for 0.9, what is
> > > >> >>> > > > the future of the Simple Consumer and High Level Consumer?
> > > >> >>> > > > To my best understanding, the existing high level consumer
> > > >> >>> > > > API will be deprecated in favour of the new consumer API.
> > > >> >>> > > > What is the future of the Simple Consumer, in this case?
> > > >> >>> > > > Will it continue to exist as a low-level API implementing
> > > >> >>> > > > the Kafka protocol [3] and providing the building blocks
> > > >> >>> > > > for the new consumer, or will it be deprecated as well?
> > > >> >>> > >
> > > >> >>> > > The new consumer will subsume both use-cases (simple and
> > > >> >>> > > high-level).  You can still use the old SimpleConsumer if you
> > > >> >>> > > wish - i.e., the wire protocol for fetch and other requests
> > > >> >>> > > will still be supported.
> > > >> >>> > >
> > > >> >>> > > >
> > > >> >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase
> > > >> >>> > > > contains an early implementation of it, but since this is
> > > >> >>> > > > a feature scheduled only for 0.9, what is its status as
> > > >> >>> > > > well?  Is it included only as a future reference and for
> > > >> >>> > > > stabilizing the API?
> > > >> >>> > >
> > > >> >>> > > It is a WIP so you cannot really use it.
> > > >> >>> > >
> > > >> >>> > > > 3) Obviously, offset management is a concern if using the
> > > >> >>> > > > simple consumer, so - wondering about the Offset Management
> > > >> >>> > > > API as well.  The Kafka protocol document specifically
> > > >> >>> > > > indicates that it will be fully functional in 0.8.2 [4] -
> > > >> >>> > > > however, a functional implementation is already available
> > > >> >>> > > > in 0.8.1.1 (accessible via the SimpleConsumer API but not
> > > >> >>> > > > documented in [5]).  Again, trying to understand the extent
> > > >> >>> > > > of what 0.8.1.1 already supports (ostensibly, the offset
> > > >> >>> > > > manager support seems to have been added only in 0.8.2 -
> > > >> >>> > > > please correct me if I am wrong), and whether it is
> > > >> >>> > > > recommended for use in production in any form (with the
> > > >> >>> > > > caveats that accompany the use of ZooKeeper).
> > > >> >>> > >
> > > >> >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will
> > > >> >>> > > use Kafka as the offsets storage mechanism (not zookeeper).
> > > >> >>> > > High-level Java consumers can choose to store offsets in
> > > >> >>> > > ZooKeeper instead by setting
> > > >> >>> > > offsets.storage=zookeeper
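In consumer configuration terms, the choice Joel describes would look something like this (offsets.storage and dual.commit.enabled are the properties discussed in KAFKA-1729; treat the exact names and defaults as subject to that patch):

```
# consumer.properties -- keep the pre-0.8.2 behavior:
offsets.storage=zookeeper

# or move to Kafka-based storage, optionally dual-committing to ZooKeeper
# while migrating:
# offsets.storage=kafka
# dual.commit.enabled=true
```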
> > > >> >>> > >
> > > >> >>> > > However, if you are using the simple consumer and wish to
> > > >> >>> > > store offsets in ZooKeeper you will need to commit to
> > > >> >>> > > ZooKeeper directly.  You can use ZkUtils in the kafka.utils
> > > >> >>> > > package for this.
> > > >> >>> > >
> > > >> >>> > > If you wish to move to Kafka-based offsets we will be adding
> > > >> >>> > > a new OffsetsClient that can be used to commit/fetch offsets
> > > >> >>> > > to/from Kafka.  This is currently not listed as a blocker for
> > > >> >>> > > 0.8.2 but I think we should include it.  I will update that
> > > >> >>> > > ticket.
> > > >> >>> > >
> > > >> >>> > > > 4) Trying to interpret the existing examples in [6] and
> > > >> >>> > > > the comments on [7] - the version of the Offset Management
> > > >> >>> > > > API that exists in 0.8.1.1 is using ZooKeeper - whereas
> > > >> >>> > > > ZooKeeper will be optional in 0.8.2 - to be replaced by
> > > >> >>> > > > Kafka, and phased out if possible.  To my understanding,
> > > >> >>> > > > the switch between the two will be controlled by the broker
> > > >> >>> > > > configuration (along with other parameters that control the
> > > >> >>> > > > performance of offset queues).  Is that correct?
> > > >> >>> > >
> > > >> >>> > > The switch is a client-side configuration.  That wiki is not
> > > >> >>> > > up-to-date.  The most current documentation is available as a
> > > >> >>> > > patch in https://issues.apache.org/jira/browse/KAFKA-1729
> > > >> >>> > >
> > > >> >>> > > > 5) Also, wondering about the timeline of 0.8.2 - according
> > > >> >>> > > > to the roadmaps it should be released relatively shortly.
> > > >> >>> > > > Is that correct?
> > > >> >>> > >
> > > >> >>> > > Yes - once the blockers are ironed out.
> > > >> >>> > >
> > > >> >>> > > >
> > > >> >>> > > > Thanks,
> > > >> >>> > > > Marius
> > > >> >>> > > >
> > > >> >>> > > > [1] http://projects.spring.io/spring-xd/
> > > >> >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
> > > >> >>> > > > [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > >> >>> > > > [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > > >> >>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> > > >> >>> > > > [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > > >> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
