kafka-users mailing list archives

From Dana Powers <dana.pow...@rd.io>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Mon, 05 Jan 2015 20:21:02 GMT
ok, opened KAFKA-1841. KAFKA-1634 is also related.

-Dana

On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira <gshapira@cloudera.com> wrote:

> Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> part of the Map changed, which will modify the wire protocol.
>
> This is actually not handled in the Java client either. It will send
> the timestamp no matter which version is used.
>
> This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> it may prevent rolling upgrades.
>
> Are you opening the JIRA?
>
> Gwen
>
> On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers <dana.powers@rd.io> wrote:
> > specifically comparing 0.8.1 --
> >
> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > ```
> > (1 to partitionCount).map(_ => {
> >   val partitionId = buffer.getInt
> >   val offset = buffer.getLong
> >   val metadata = readShortString(buffer)
> >   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
> > })
> > ```
> >
> > to trunk --
> >
> > https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > ```
> > (1 to partitionCount).map(_ => {
> >   val partitionId = buffer.getInt
> >   val offset = buffer.getLong
> >   val timestamp = {
> >     val given = buffer.getLong
> >     if (given == -1L) now else given
> >   }
> >   val metadata = readShortString(buffer)
> >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
> > })
> > ```
> >
> > should the `timestamp` buffer read be wrapped in an api version check?
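The version check asked about above can be sketched in Java. This is a hypothetical decoder, not Kafka's actual `readFrom`: the class, method, and field names are invented for illustration, and it assumes that only version 1 requests carry the per-partition timestamp, which is one plausible answer to the question rather than something this thread settles. Field order follows the two Scala snippets above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch, not Kafka's OffsetCommitRequest.readFrom.
// Assumption: only version 1 requests carry the per-partition timestamp.
final class PartitionEntryDecoder {
    static final long NO_TIMESTAMP = -1L;

    static final class Entry {
        final int partition;
        final long offset;
        final long timestamp; // NO_TIMESTAMP when the request version carries none
        final String metadata;

        Entry(int partition, long offset, long timestamp, String metadata) {
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
            this.metadata = metadata;
        }
    }

    // Kafka short string: int16 length, then UTF-8 bytes; a length of -1 encodes null.
    static String readShortString(ByteBuffer buf) {
        short len = buf.getShort();
        if (len < 0) {
            return null;
        }
        byte[] bytes = new byte[len];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Decodes one (partition, offset, [timestamp], metadata) entry.
    // 'now' replaces a client-supplied -1 timestamp, as in the trunk snippet.
    static Entry decode(ByteBuffer buf, short versionId, long now) {
        int partition = buf.getInt();
        long offset = buf.getLong();
        long timestamp = NO_TIMESTAMP;
        if (versionId == 1) {
            long given = buf.getLong();
            timestamp = (given == -1L) ? now : given;
        }
        String metadata = readShortString(buf);
        return new Entry(partition, offset, timestamp, metadata);
    }
}
```

With the check in place, a version 0 payload written by a 0.8.1-era client is consumed exactly, instead of having its metadata bytes read as a timestamp.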
> >
> >
> > Dana Powers
> > Rdio, Inc.
> > dana.powers@rd.io
> > rdio.com/people/dpkp/
> >
> > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <gshapira@cloudera.com> wrote:
> >
> >> Ah, I see :)
> >>
> >> The readFrom function basically tries to read two extra fields if you
> >> are on version 1:
> >>
> >> if (versionId == 1) {
> >>   groupGenerationId = buffer.getInt
> >>   consumerId = readShortString(buffer)
> >> }
> >>
> >> The rest looks identical in version 0 and 1, and still no timestamp in
> >> sight...
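On the client side, the branch quoted above means a version 1 request writes two extra fields right after the consumer group id. A hypothetical Java sketch of just that prefix (the class and method names are illustrative, not part of any Kafka client), assuming Kafka's big-endian encoding with int16-length-prefixed strings:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: encodes the fields that precede the topic array in an
// OffsetCommitRequest body. Version 1 adds groupGenerationId and consumerId,
// mirroring the readFrom branch quoted above.
final class OffsetCommitPrefixEncoder {
    static byte[] encodePrefix(short versionId, String groupId, int generationId, String consumerId) {
        byte[] group = groupId.getBytes(StandardCharsets.UTF_8);
        byte[] consumer = consumerId.getBytes(StandardCharsets.UTF_8);
        boolean v1 = versionId == 1;
        // int16 length + bytes per short string, plus an int32 generation id in v1.
        int size = 2 + group.length + (v1 ? 4 + 2 + consumer.length : 0);
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putShort((short) group.length).put(group);
        if (v1) {
            buf.putInt(generationId);
            buf.putShort((short) consumer.length).put(consumer);
        }
        return buf.array();
    }
}
```

For group `"g"` the version 0 prefix is 3 bytes, while version 1 with consumer id `"c"` grows to 10 bytes; everything after this prefix is where the timestamp question above arises.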
> >>
> >> Gwen
> >>
> >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io> wrote:
> >> > Hi Gwen, I am using/writing kafka-python to construct api requests and
> >> > have not dug too deeply into the server source code.  But I believe it is
> >> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> >> > used to decode the wire protocol.
> >> >
> >> > -Dana
> >> > OffsetCommitRequest has two constructors now:
> >> >
> >> > For version 0:
> >> > OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
> >> >
> >> > And version 1:
> >> > OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData)
> >> >
> >> > None of them seem to require timestamps... so I'm not sure where you
> >> > see that this is required. Can you share an example?
> >> >
> >> > Gwen
> >> >
> >> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io> wrote:
> >> >> Hi Joel,
> >> >>
> >> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
> >> >> you mentioned below, and I cannot figure out how to explicitly construct
> >> >> a request with the earlier version.  Should the api version be different
> >> >> for requests that do not include it and/or servers that do not support
> >> >> the timestamp field?  It looks like 0.8.1.1 did not include the timestamp
> >> >> field and used api version 0.  But 0.8.2-beta seems to now require
> >> >> timestamps even when I explicitly encode OffsetCommitRequest api version
> >> >> 0 (server logs a BufferUnderflowException).
> >> >>
> >> >> Is this the expected server behavior?  Can you provide any tips on how
> >> >> third-party clients should manage the wire-protocol change for this api
> >> >> method (I'm working on kafka-python)?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> -Dana
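The BufferUnderflowException reported above is what one would expect if the server reads the timestamp unconditionally, as in the trunk snippet at the top of this thread. A minimal Java reproduction under simplifying assumptions: a single partition entry and an invented class name. A real request carries more bytes after each entry, so a server could misparse fields before it actually runs out of input.

```java
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical reproduction: a version 0 partition entry (no timestamp)
// read by a decoder that always expects an 8-byte timestamp.
final class UnderflowDemo {
    // Encodes one v0 entry: int32 partition, int64 offset, short-string metadata.
    static ByteBuffer encodeV0Entry(int partition, long offset, String metadata) {
        byte[] meta = metadata.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + 8 + 2 + meta.length);
        buf.putInt(partition).putLong(offset).putShort((short) meta.length).put(meta);
        buf.flip();
        return buf;
    }

    // Returns true if reading the (absent) timestamp runs past the end of the buffer.
    static boolean timestampReadUnderflows(ByteBuffer buf) {
        try {
            buf.getInt();  // partition
            buf.getLong(); // offset
            buf.getLong(); // timestamp, which a v0 payload never wrote
            return false;  // read "succeeded" by consuming metadata bytes instead
        } catch (BufferUnderflowException e) {
            return true;
        }
    }
}
```

With empty metadata only 2 bytes remain when the timestamp is read, so the read underflows. With six or more bytes of metadata the read silently succeeds and treats the metadata length and bytes as a timestamp, which is arguably worse than the exception.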
> >> >>
> >> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >> >>
> >> >>> Yes, it should be backwards compatible. So, for example, you should be
> >> >>> able to use an 0.8.1 client with an 0.8.2 broker. In general, you should
> >> >>> not upgrade your clients until after the brokers have been upgraded.
> >> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
> >> >>> protocol change I'm aware of is the OffsetCommitRequest. There is a
> >> >>> change in the OffsetCommitRequest format (KAFKA-1634), although you can
> >> >>> explicitly construct an OffsetCommitRequest with the earlier version.
> >> >>>
> >> >>> Thanks,
> >> >>>
> >> >>> Joel
> >> >>>
> >> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> >> >>> > Hi Joel,
> >> >>> >
> >> >>> > Thanks for all the clarifications!  Just another question on this: will
> >> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> >> >>> > Generally speaking, would there be any concerns with using the 0.8.2
> >> >>> > consumer with a 0.8.1 broker, for instance?
> >> >>> >
> >> >>> > Marius
> >> >>> >
> >> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >> >>> >
> >> >>> > > Inline..
> >> >>> > >
> >> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> >> >>> > > > Hello everyone,
> >> >>> > > >
> >> >>> > > > I have a few questions about the current status and future of the
> >> >>> > > > Kafka consumers.
> >> >>> > > >
> >> >>> > > > We have been working on adding Kafka support in Spring XD [1],
> >> >>> > > > currently using the high level consumer via Spring Integration Kafka
> >> >>> > > > [2]. We are working on adding features such as:
> >> >>> > > > - the ability to control offsets/replay topics;
> >> >>> > > > - the ability to control partition allocation across multiple consumers.
> >> >>> > > >
> >> >>> > > > We are currently at version 0.8.1.1, so using the simple consumer is a
> >> >>> > > > pretty straightforward choice right now. However, in the light of the
> >> >>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> >> >>> > > >
> >> >>> > > > 1) With respect to the consumer redesign for 0.9, what is the future of
> >> >>> > > > the Simple Consumer and High Level Consumer? To my best understanding,
> >> >>> > > > the existing high level consumer API will be deprecated in favour of
> >> >>> > > > the new consumer API. What is the future of the Simple Consumer in this
> >> >>> > > > case? Will it continue to exist as a low-level API implementing the
> >> >>> > > > Kafka protocol [3] and providing the building blocks for the new
> >> >>> > > > consumer, or will it be deprecated as well?
> >> >>> > >
> >> >>> > > The new consumer will subsume both use-cases (simple and high-level).
> >> >>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
> >> >>> > > protocol for fetch and other requests will still be supported.
> >> >>> > >
> >> >>> > > >
> >> >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> >> >>> > > > implementation of it, but since this is a feature scheduled only for
> >> >>> > > > 0.9, what is its status? Is it included only as a future reference
> >> >>> > > > and for stabilizing the API?
> >> >>> > >
> >> >>> > > It is a WIP so you cannot really use it.
> >> >>> > >
> >> >>> > > > 3) Obviously, offset management is a concern if using the simple
> >> >>> > > > consumer, so I am also wondering about the Offset Management API. The
> >> >>> > > > Kafka protocol document specifically indicates that it will be fully
> >> >>> > > > functional in 0.8.2 [4]; however, a functional implementation is
> >> >>> > > > already available in 0.8.1.1 (accessible via the SimpleConsumer API
> >> >>> > > > but not documented in [5]). Again, I am trying to understand the extent
> >> >>> > > > of what 0.8.1.1 already supports (ostensibly, the offset manager
> >> >>> > > > support seems to have been added only in 0.8.2; please correct me if
> >> >>> > > > I am wrong), and whether it is recommended for use in production in
> >> >>> > > > any form (with the caveats that accompany the use of ZooKeeper).
> >> >>> > >
> >> >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> >> >>> > > as the offsets storage mechanism (not ZooKeeper). High-level Java
> >> >>> > > consumers can choose to store offsets in ZooKeeper instead by setting
> >> >>> > > offsets.storage=zookeeper.
> >> >>> > >
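A sketch of the high-level consumer properties described above. Only `offsets.storage` comes from this thread; `zookeeper.connect` and `group.id` are the standard 0.8 consumer settings, and the values are placeholders. The class name is hypothetical. The resulting `Properties` would normally be handed to `kafka.consumer.ConsumerConfig`; that step is omitted so the sketch runs without Kafka on the classpath.

```java
import java.util.Properties;

// Hypothetical helper: high-level consumer settings that keep committed
// offsets in ZooKeeper. Connection string and group name are placeholders.
final class ConsumerOffsetsConfig {
    static Properties withZooKeeperOffsets(String zkConnect, String groupId) {
        Properties props = new Properties();
        props.setProperty("zookeeper.connect", zkConnect);
        props.setProperty("group.id", groupId);
        // Where the consumer commits and fetches offsets: "zookeeper" or "kafka".
        props.setProperty("offsets.storage", "zookeeper");
        return props;
    }
}
```

Because the switch lives in the consumer's own properties, two consumers talking to the same broker can make different choices.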
> >> >>> > > However, if you are using the simple consumer and wish to store
> >> >>> > > offsets in ZooKeeper, you will need to commit to ZooKeeper directly.
> >> >>> > > You can use ZkUtils in the kafka.utils package for this.
> >> >>> > >
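A SimpleConsumer application committing to ZooKeeper directly would typically write to the same znodes the high-level consumer uses, so that both kinds of consumer see the same offsets. The sketch below assumes the 0.8 layout `/consumers/<group>/offsets/<topic>/<partition>` with the offset stored as a decimal string; the class is hypothetical, and the actual write (for example with `ZkUtils.updatePersistentPath` and a `ZkClient`) is left out so the sketch runs standalone. Check the `ZkUtils` signature against the exact release you use.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for the ZooKeeper offset layout used by 0.8 consumers.
final class ZkOffsetPaths {
    // e.g. /consumers/my-group/offsets/my-topic/0
    static String offsetPath(String group, String topic, int partition) {
        return "/consumers/" + group + "/offsets/" + topic + "/" + partition;
    }

    // Offsets are stored as the decimal string form of the long value.
    static byte[] offsetData(long offset) {
        return Long.toString(offset).getBytes(StandardCharsets.UTF_8);
    }

    // Inverse of offsetData, for reading a committed offset back.
    static long parseOffset(byte[] data) {
        return Long.parseLong(new String(data, StandardCharsets.UTF_8));
    }
}
```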
> >> >>> > > If you wish to move to Kafka-based offsets, we will be adding a new
> >> >>> > > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> >> >>> > > This is currently not listed as a blocker for 0.8.2, but I think we
> >> >>> > > should include it. I will update that ticket.
> >> >>> > >
> >> >>> > > > 4) Trying to interpret the existing examples in [6] and the comments on
> >> >>> > > > [7]: the version of the Offset Management API that exists in 0.8.1.1
> >> >>> > > > uses ZooKeeper, whereas ZooKeeper will be optional in 0.8.2, to be
> >> >>> > > > replaced by Kafka and phased out if possible. To my understanding, the
> >> >>> > > > switch between the two will be controlled by the broker configuration
> >> >>> > > > (along with other parameters that control the performance of offset
> >> >>> > > > queues). Is that correct?
> >> >>> > >
> >> >>> > > The switch is a client-side configuration. That wiki is not
> >> >>> > > up-to-date. The most current documentation is available as a patch in
> >> >>> > > https://issues.apache.org/jira/browse/KAFKA-1729
> >> >>> > >
> >> >>> > > > 5) Also, I am wondering about the timeline of 0.8.2: according to the
> >> >>> > > > roadmaps, it should be released relatively soon. Is that correct?
> >> >>> > >
> >> >>> > > Yes - once the blockers are ironed out.
> >> >>> > >
> >> >>> > > >
> >> >>> > > > Thanks,
> >> >>> > > > Marius
> >> >>> > > >
> >> >>> > > > [1] http://projects.spring.io/spring-xd/
> >> >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
> >> >>> > > > [3] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >> >>> > > > [4] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> >> >>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> >> >>> > > > [6] https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> >> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
> >> >>> > >
> >> >>> > >
> >> >>>
> >> >>>
> >>
>
