kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jay Kreps <...@confluent.io>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Wed, 07 Jan 2015 18:04:59 GMT
Hey guys,

We need to take the versioning of the protocol seriously. People are
definitely using the offset commit functionality in 0.8.1 and I really
think we should treat this as a bug and revert the change to version 0.

-Jay

On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao <jun@confluent.io> wrote:

> Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
> which is a mistake. The incompatible change was introduced in KAFKA-1012 in
> Mar, 2014 when we added the kafka-based offset management support. However,
> we didn't realize that this breaks the wire protocol until much later. Now,
> the wire protocol has evolved again and it's a bit hard to fix the format
> in version 0. I can see a couple of options.
>
> Option 1: Just accept the incompatible change as it is.
> The argument is that even though we introduced OffsetCommitRequest in
> 0.8.1, it's not used in the high level consumer. It's possible that some
> users of SimpleConsumer started using it. However, that number is likely
> small. Also, the functionality of OffsetCommitRequest has changed since
> it's writing the offset to a Kafka log, instead of ZK (for good reasons).
> So, we can document this as a wire protocol and functionality incompatible
> change. For users who don't mind the functionality change, they will need
> to upgrade the client to the new protocol before they can use the new
> broker. For users who want to preserve the old functionality, they will
> have to write the offsets directly to ZK. In either case, hopefully the
> number of people being affected is small.
>
> Option 2: Revert version 0 format to what's in 0.8.1.
> There will be a few issues here. First, it's not clear how this affects
> other people who have been deploying from trunk. Second, I am not sure that
> we want to continue supporting writing the offset to ZK in
> OffsetCommitRequest
> since that can cause ZK to be overloaded.
>
> Joel Koshy,
>
> Any thoughts on this?
>
> Thanks,
>
> Jun
>
> On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein <joe.stein@stealth.ly> wrote:
>
> > In addition to the issue you bring up, the functionality as a whole has
> > changed.. when you call OffsetFetchRequest the version = 0 needs to
> > preserve the old functionality
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
> > and version = 1 the new
> >
> >
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
> > .
> > Also the OffsetFetchRequest functionality even though the wire protocol
> is
> > the same after the 0.8.2 upgrade for OffsetFetchRequest if you were using
> > 0.8.1.1 OffsetFetchRequest
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
> > will stop going to zookeeper and start going to Kafka storage
> >
> >
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
> > so more errors will happen and things break too.
> >
> > I think we should treat the version field not just to stop from breaking
> > the wire protocol calls but also as a "feature flag" preserving upgrades
> > and multiple pathways.
> >
> > I updated the JIRA for the feature flag needs for OffsetFetch and
> > OffsetCommit too.
> >
> > /*******************************************
> >  Joe Stein
> >  Founder, Principal Consultant
> >  Big Data Open Source Security LLC
> >  http://www.stealth.ly
> >  Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
> > ********************************************/
> >
> > On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers <dana.powers@rd.io> wrote:
> >
> > > ok, opened KAFKA-1841 .  KAFKA-1634 also related.
> > >
> > > -Dana
> > >
> > > On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira <gshapira@cloudera.com>
> > > wrote:
> > >
> > > > Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> > > > part of the Map changed, which will modify the wire protocol.
> > > >
> > > > This is actually not handled in the Java client either. It will send
> > > > the timestamp no matter which version is used.
> > > >
> > > > This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> > > > it may prevent rolling upgrades.
> > > >
> > > > Are you opening the JIRA?
> > > >
> > > > Gwen
> > > >
> > > > On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers <dana.powers@rd.io>
> > wrote:
> > > > > specifically comparing 0.8.1 --
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > > > > ```
> > > > > (1 to partitionCount).map(_ => {
> > > > >   val partitionId = buffer.getInt
> > > > >   val offset = buffer.getLong
> > > > >   val metadata = readShortString(buffer)
> > > > >   (TopicAndPartition(topic, partitionId),
> > > OffsetMetadataAndError(offset,
> > > > > metadata))
> > > > > })
> > > > > ```
> > > > >
> > > > > totrunk --
> > > > >
> > > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > > > > ```
> > > > > (1 to partitionCount).map(_ => {
> > > > >   val partitionId = buffer.getInt
> > > > >   val offset = buffer.getLong
> > > > >   val timestamp = {
> > > > >     val given = buffer.getLong
> > > > >     if (given == -1L) now else given
> > > > >   }
> > > > >   val metadata = readShortString(buffer)
> > > > >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> > > > > metadata, timestamp))
> > > > > })
> > > > > ```
> > > > >
> > > > > should the `timestamp` buffer read be wrapped in an api version
> > check?
> > > > >
> > > > >
> > > > > Dana Powers
> > > > > Rdio, Inc.
> > > > > dana.powers@rd.io
> > > > > rdio.com/people/dpkp/
> > > > >
> > > > > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <
> gshapira@cloudera.com>
> > > > wrote:
> > > > >
> > > > >> Ah, I see :)
> > > > >>
> > > > >> The readFrom function basically tries to read two extra fields
if
> > you
> > > > >> are on version 1:
> > > > >>
> > > > >> if (versionId == 1) {
> > > > >>       groupGenerationId = buffer.getInt
> > > > >>       consumerId = readShortString(buffer)
> > > > >>     }
> > > > >>
> > > > >> The rest looks identical in version 0 and 1, and still no
> timestamp
> > in
> > > > >> sight...
> > > > >>
> > > > >> Gwen
> > > > >>
> > > > >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io>
> > > wrote:
> > > > >> > Hi Gwen, I am using/writing kafka-python to construct api
> requests
> > > and
> > > > >> have
> > > > >> > not dug too deeply into the server source code.  But I believe
> it
> > is
> > > > >> > kafka/api/OffsetCommitRequest.scala and specifically the
> readFrom
> > > > method
> > > > >> > used to decode the wire protocol.
> > > > >> >
> > > > >> > -Dana
> > > > >> > OffsetCommitRequest has two constructors now:
> > > > >> >
> > > > >> > For version 0:
> > > > >> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
> > > > >> > PartitionData> offsetData)
> > > > >> >
> > > > >> > And version 1:
> > > > >> > OffsetCommitRequest(String groupId, int generationId, String
> > > > >> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> > > > >> >
> > > > >> > None of them seem to require timestamps... so I'm not sure
where
> > you
> > > > >> > see that this is required. Can you share an example?
> > > > >> >
> > > > >> > Gwen
> > > > >> >
> > > > >> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io
> >
> > > > wrote:
> > > > >> >> Hi Joel,
> > > > >> >>
> > > > >> >> I'm looking more closely at the OffsetCommitRequest
wire
> protocol
> > > > change
> > > > >> >> you mentioned below, and I cannot figure out how to
explicitly
> > > > >> construct a
> > > > >> >> request with the earlier version.  Should the api version
be
> > > > different
> > > > >> for
> > > > >> >> requests that do not include it and/or servers that
do not
> > support
> > > > the
> > > > >> >> timestamp field?  It looks like 0.8.1.1 did not include
the
> > > timestamp
> > > > >> > field
> > > > >> >> and used api version 0.  But 0.8.2-beta seems to now
require
> > > > timestamps
> > > > >> >> even when I explicitly encode OffsetCommitRequest api
version 0
> > > > (server
> > > > >> >> logs a BufferUnderflowException).
> > > > >> >>
> > > > >> >> Is this the expected server behavior?  Can you provide
any tips
> > on
> > > > how
> > > > >> >> third-party clients should manage the wire-protocol
change for
> > this
> > > > api
> > > > >> >> method (I'm working on kafka-python)?
> > > > >> >>
> > > > >> >> Thanks,
> > > > >> >>
> > > > >> >> -Dana
> > > > >> >>
> > > > >> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <
> jjkoshy.w@gmail.com
> > >
> > > > >> wrote:
> > > > >> >>
> > > > >> >>> Yes it should be backwards compatible. So for e.g.,
you should
> > be
> > > > able
> > > > >> >>> to use an 0.8.1 client with an 0.8.2 broker. In
general, you
> > > should
> > > > >> >>> not upgrade your clients until after the brokers
have been
> > > upgraded.
> > > > >> >>> However, you can point an 0.8.2 client at an 0.8.1
broker. One
> > > wire
> > > > >> >>> protocol change I'm aware of is the OffsetCommitRequest.
> There
> > > is a
> > > > >> >>> change in the OffsetCommitRequest format (KAFKA-1634)
although
> > you
> > > > can
> > > > >> >>> explicitly construct an OffsetCommitRequest with
the earlier
> > > > version.
> > > > >> >>>
> > > > >> >>> Thanks,
> > > > >> >>>
> > > > >> >>> Joel
> > > > >> >>>
> > > > >> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius
Bogoevici
> > wrote:
> > > > >> >>> > Hi Joel,
> > > > >> >>> >
> > > > >> >>> > Thanks for all the clarifications!  Just another
question on
> > > this:
> > > > >> will
> > > > >> >>> > 0.8.2 be backwards compatible with 0.8.1, just
as 0.8.1 was
> > with
> > > > 0.8?
> > > > >> >>> > Generally speaking, would there be any concerns
with using
> the
> > > > 0.8.2
> > > > >> >>> > consumer with a 0.8.1 broker, for instance?
> > > > >> >>> >
> > > > >> >>> > Marius
> > > > >> >>> >
> > > > >> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy
<
> > > jjkoshy.w@gmail.com>
> > > > >> > wrote:
> > > > >> >>> >
> > > > >> >>> > > Inline..
> > > > >> >>> > >
> > > > >> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500,
Marius Bogoevici
> > > > wrote:
> > > > >> >>> > > > Hello everyone,
> > > > >> >>> > > >
> > > > >> >>> > > > I have a few questions about the
current status and
> future
> > > of
> > > > the
> > > > >> >>> Kafka
> > > > >> >>> > > > consumers.
> > > > >> >>> > > >
> > > > >> >>> > > > We have been working to adding Kafka
support in Spring
> XD
> > > [1],
> > > > >> >>> currently
> > > > >> >>> > > > using the high level consumer via
Spring Integration
> Kafka
> > > > [2].
> > > > >> We
> > > > >> >>> are
> > > > >> >>> > > > working on adding features such as:
> > > > >> >>> > > > - the ability to control offsets/replay
topics;
> > > > >> >>> > > > - the ability to control partition
allocation across
> > > multiple
> > > > >> >>> consumers;
> > > > >> >>> > > >
> > > > >> >>> > > > We are currently at version 0.8.1.1,
so using the simple
> > > > consumer
> > > > >> > is
> > > > >> >>> a
> > > > >> >>> > > > pretty straightforward choice right
now. However, in the
> > > > light of
> > > > >> > the
> > > > >> >>> > > > upcoming consumer changes for 0.8.2
and 0.9, I have a
> few
> > > > >> > questions:
> > > > >> >>> > > >
> > > > >> >>> > > > 1) With respect to the consumer redesign
for 0.9, what
> is
> > > the
> > > > >> > future
> > > > >> >>> of
> > > > >> >>> > > the
> > > > >> >>> > > > Simple Consumer and High Level Consumer?
To my best
> > > > >> understanding,
> > > > >> >>> the
> > > > >> >>> > > > existing high level consumer API
will be deprecated in
> > > favour
> > > > of
> > > > >> > the
> > > > >> >>> new
> > > > >> >>> > > > consumer API. What is the future
of the Simple Consumer,
> > in
> > > > this
> > > > >> >>> case? it
> > > > >> >>> > > > will continue to exist as a low-level
API implementing
> the
> > > > Kafka
> > > > >> >>> protocol
> > > > >> >>> > > > [3] and providing the building blocks
for the new
> > consumer,
> > > or
> > > > >> will
> > > > >> >>> it be
> > > > >> >>> > > > deprecated as well?
> > > > >> >>> > >
> > > > >> >>> > > The new consumer will subsume both use-cases
(simple and
> > > > >> high-level).
> > > > >> >>> > > You can still use the old SimpleConsumer
if you wish -
> i.e.,
> > > the
> > > > >> wire
> > > > >> >>> > > protocol for fetch and other requests
will still be
> > supported.
> > > > >> >>> > >
> > > > >> >>> > > >
> > > > >> >>> > > > 2) Regarding the new consumer: the
v0.8.2 codebase
> > contains
> > > an
> > > > >> > early
> > > > >> >>> > > > implementation of it, but since this
a feature scheduled
> > > only
> > > > for
> > > > >> >>> 0.9,
> > > > >> >>> > > what
> > > > >> >>> > > > is its status as well? Is it included
only as a future
> > > > reference
> > > > >> > and
> > > > >> >>> for
> > > > >> >>> > > > stabilizing the API?
> > > > >> >>> > >
> > > > >> >>> > > It is a WIP so you cannot really use it.
> > > > >> >>> > >
> > > > >> >>> > > > 3) Obviously, offset management is
a concern if using
> the
> > > > simple
> > > > >> >>> > > consumer,
> > > > >> >>> > > > so - wondering about the Offset Management
API as well.
> > The
> > > > Kafka
> > > > >> >>> > > protocol
> > > > >> >>> > > > document specifically indicates that
it will be fully
> > > > functional
> > > > >> in
> > > > >> >>> 0.8.2
> > > > >> >>> > > > [4] - however, a functional implementation
is already
> > > > available
> > > > >> in
> > > > >> >>> > > 0.8.1.1
> > > > >> >>> > > > (accessible via the SimpleConsumer
API but not
> documented
> > in
> > > > >> [5]).
> > > > >> >>> Again,
> > > > >> >>> > > > trying to understand the extent of
what 0.8.1.1 already
> > > > supports
> > > > >> >>> > > > (ostensibly, the offset manager support
seems to have
> been
> > > > added
> > > > >> >>> only in
> > > > >> >>> > > > 0.8.2 - please correct me if I am
wrong), and whether if
> > it
> > > is
> > > > >> >>> > > recommended
> > > > >> >>> > > > for use in production in any form
(with the caveats that
> > > > >> accompany
> > > > >> >>> the
> > > > >> >>> > > use
> > > > >> >>> > > > of ZooKeeper).
> > > > >> >>> > >
> > > > >> >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest
> will
> > > use
> > > > >> > Kafka
> > > > >> >>> > > as the offsets storage mechanism (not
zookeeper).
> High-level
> > > > Java
> > > > >> >>> > > consumers can choose to store offsets
in ZooKeeper instead
> > by
> > > > >> setting
> > > > >> >>> > > offsets.storage=zookeeper
> > > > >> >>> > >
> > > > >> >>> > > However, if you are using the simple consumer
and wish to
> > > store
> > > > >> >>> > > offsets in ZooKeeper you will need to
commit to ZooKeeper
> > > > directly.
> > > > >> >>> > > You can use ZkUtils in the kafka.utils
package for this.
> > > > >> >>> > >
> > > > >> >>> > > If you wish to move to Kafka-based offsets
we will be
> > adding a
> > > > new
> > > > >> >>> > > OffsetsClient that can be used to commit/fetch
offsets
> > to/from
> > > > >> Kafka.
> > > > >> >>> > > This is currently not listed as a blocker
for 0.8.2 but I
> > > think
> > > > we
> > > > >> >>> > > should include it. I will update that
ticket.
> > > > >> >>> > >
> > > > >> >>> > > > 4) Trying to interpret the existing
examples in [6] and
> > the
> > > > >> > comments
> > > > >> >>> on
> > > > >> >>> > > [7]
> > > > >> >>> > > > - the version of the Offset Management
API that exists
> in
> > > > 0.8.1.1
> > > > >> > is
> > > > >> >>> > > using
> > > > >> >>> > > > ZooKeeper - whereas ZooKeeper will
be optional in 0.8.2
> -
> > to
> > > > be
> > > > >> >>> replaced
> > > > >> >>> > > by
> > > > >> >>> > > > Kafka, and phased out if possible.
To my understanding,
> > the
> > > > >> switch
> > > > >> >>> > > between
> > > > >> >>> > > > the two will be controlled by the
broker configuration
> > > (along
> > > > >> with
> > > > >> >>> other
> > > > >> >>> > > > parameters that control the performance
of offset
> queues.
> > Is
> > > > that
> > > > >> >>> > > correct?
> > > > >> >>> > >
> > > > >> >>> > > The switch is a client-side configuration.
That wiki is
> not
> > > > >> >>> > > up-to-date. The most current documentation
is available
> as a
> > > > patch
> > > > >> in
> > > > >> >>> > > https://issues.apache.org/jira/browse/KAFKA-1729
> > > > >> >>> > >
> > > > >> >>> > > > 5) Also, wondering about the timeline
of 0.8.2 -
> according
> > > to
> > > > the
> > > > >> >>> > > roadmaps
> > > > >> >>> > > > it should be released relatively
shortly. Is that
> correct?
> > > > >> >>> > >
> > > > >> >>> > > Yes - once the blockers are ironed out.
> > > > >> >>> > >
> > > > >> >>> > > >
> > > > >> >>> > > > Thanks,
> > > > >> >>> > > > Marius
> > > > >> >>> > > >
> > > > >> >>> > > > [1] http://projects.spring.io/spring-xd/
> > > > >> >>> > > > [2]
> > > > https://github.com/spring-projects/spring-integration-kafka
> > > > >> >>> > > > [3]
> > > > >> >>> > > >
> > > > >> >>> > >
> > > > >> >>>
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > >> >>> > > > [4]
> > > > >> >>> > > >
> > > > >> >>> > >
> > > > >> >>>
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > > > >> >>> > > > [5]
> > > > >> >
> http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> > > > >> >>> > > > [6]
> > > > >> >>> > > >
> > > > >> >>> > >
> > > > >> >>>
> > > > >> >
> > > > >>
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > > > >> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
> > > > >> >>> > >
> > > > >> >>> > >
> > > > >> >>>
> > > > >> >>>
> > > > >>
> > > >
> > >
> >
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message