kafka-users mailing list archives

From Dana Powers <dana.pow...@rd.io>
Subject Re: Consumer and offset management support in 0.8.2 and 0.9
Date Mon, 05 Jan 2015 18:28:57 GMT
specifically comparing 0.8.1 --

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
})
```

to trunk --

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val timestamp = {
    val given = buffer.getLong
    if (given == -1L) now else given
  }
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
})
```

Should the `timestamp` buffer read be wrapped in an api version check?
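
Something like the following is what I had in mind -- just a sketch, assuming
the parsed `versionId` is in scope inside readFrom (not a tested patch):
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  // hypothetical: only consume the timestamp field for request versions
  // that actually carry it, so a version 0 request no longer underflows
  val timestamp =
    if (versionId >= 1) {
      val given = buffer.getLong
      if (given == -1L) now else given
    } else now
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
})
```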


Dana Powers
Rdio, Inc.
dana.powers@rd.io
rdio.com/people/dpkp/

On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira <gshapira@cloudera.com> wrote:

> Ah, I see :)
>
> The readFrom function basically tries to read two extra fields if you
> are on version 1:
>
> if (versionId == 1) {
>   groupGenerationId = buffer.getInt
>   consumerId = readShortString(buffer)
> }
>
> The rest looks identical in version 0 and 1, and still no timestamp in
> sight...
>
> Gwen
>
> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers <dana.powers@rd.io> wrote:
> > Hi Gwen, I am using/writing kafka-python to construct api requests and have
> > not dug too deeply into the server source code.  But I believe it is
> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> > used to decode the wire protocol.
> >
> > -Dana
> > OffsetCommitRequest has two constructors now:
> >
> > For version 0:
> > OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
> >
> > And version 1:
> > OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData)
> >
> > None of them seem to require timestamps... so I'm not sure where you
> > see that this is required. Can you share an example?
> >
> > Gwen
> >
> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers <dana.powers@rd.io> wrote:
> >> Hi Joel,
> >>
> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
> >> you mentioned below, and I cannot figure out how to explicitly construct a
> >> request with the earlier version.  Should the api version be different for
> >> requests that do not include it and/or servers that do not support the
> >> timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
> >> and used api version 0.  But 0.8.2-beta seems to now require timestamps
> >> even when I explicitly encode OffsetCommitRequest api version 0 (server
> >> logs a BufferUnderflowException).
> >>
> >> Is this the expected server behavior?  Can you provide any tips on how
> >> third-party clients should manage the wire-protocol change for this api
> >> method (I'm working on kafka-python)?
> >>
> >> Thanks,
> >>
> >> -Dana
> >>
> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >>
> >>> Yes it should be backwards compatible. So for e.g., you should be able
> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
> >>> not upgrade your clients until after the brokers have been upgraded.
> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
> >>> explicitly construct an OffsetCommitRequest with the earlier version.
> >>>
> >>> Thanks,
> >>>
> >>> Joel
> >>>
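
(Constructing the older version explicitly from the Scala client would look
roughly like this -- the versionId/requestInfo parameter names are my reading
of the trunk sources, so treat it as an untested sketch:)

```
import kafka.api.OffsetCommitRequest
import kafka.common.{OffsetAndMetadata, TopicAndPartition}

// hypothetical topic, partition, offset and group values
val offsets = Map(
  TopicAndPartition("my-topic", 0) -> OffsetAndMetadata(42L, "example metadata", -1L)
)

// explicitly pin the request to version 0 so a pre-KAFKA-1634 broker can parse it
val request = OffsetCommitRequest(
  groupId = "my-group",
  requestInfo = offsets,
  versionId = 0.toShort
)
```
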
> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> >>> > Hi Joel,
> >>> >
> >>> > Thanks for all the clarifications!  Just another question on this: will
> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> >>> > Generally speaking, would there be any concerns with using the 0.8.2
> >>> > consumer with a 0.8.1 broker, for instance?
> >>> >
> >>> > Marius
> >>> >
> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> >>> >
> >>> > > Inline..
> >>> > >
> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> >>> > > > Hello everyone,
> >>> > > >
> >>> > > > I have a few questions about the current status and future of the
> >>> > > > Kafka consumers.
> >>> > > >
> >>> > > > We have been working to adding Kafka support in Spring XD [1],
> >>> > > > currently using the high level consumer via Spring Integration
> >>> > > > Kafka [2].  We are working on adding features such as:
> >>> > > > - the ability to control offsets/replay topics;
> >>> > > > - the ability to control partition allocation across multiple consumers;
> >>> > > >
> >>> > > > We are currently at version 0.8.1.1, so using the simple consumer is
> >>> > > > a pretty straightforward choice right now. However, in the light of the
> >>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> >>> > > >
> >>> > > > 1) With respect to the consumer redesign for 0.9, what is the future
> >>> > > > of the Simple Consumer and High Level Consumer? To my best
> >>> > > > understanding, the existing high level consumer API will be deprecated
> >>> > > > in favour of the new consumer API. What is the future of the Simple
> >>> > > > Consumer, in this case? it will continue to exist as a low-level API
> >>> > > > implementing the Kafka protocol [3] and providing the building blocks
> >>> > > > for the new consumer, or will it be deprecated as well?
> >>> > >
> >>> > > The new consumer will subsume both use-cases (simple and high-level).
> >>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
> >>> > > protocol for fetch and other requests will still be supported.
> >>> > >
> >>> > > >
> >>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> >>> > > > implementation of it, but since this a feature scheduled only for 0.9,
> >>> > > > what is its status as well? Is it included only as a future reference
> >>> > > > and for stabilizing the API?
> >>> > >
> >>> > > It is a WIP so you cannot really use it.
> >>> > >
> >>> > > > 3) Obviously, offset management is a concern if using the simple
> >>> > > > consumer, so - wondering about the Offset Management API as well. The
> >>> > > > Kafka protocol document specifically indicates that it will be fully
> >>> > > > functional in 0.8.2 [4] - however, a functional implementation is
> >>> > > > already available in 0.8.1.1 (accessible via the SimpleConsumer API but
> >>> > > > not documented in [5]). Again, trying to understand the extent of what
> >>> > > > 0.8.1.1 already supports (ostensibly, the offset manager support seems
> >>> > > > to have been added only in 0.8.2 - please correct me if I am wrong),
> >>> > > > and whether if it is recommended for use in production in any form
> >>> > > > (with the caveats that accompany the use of ZooKeeper).
> >>> > >
> >>> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> >>> > > as the offsets storage mechanism (not zookeeper). High-level Java
> >>> > > consumers can choose to store offsets in ZooKeeper instead by setting
> >>> > > offsets.storage=zookeeper
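
(For the old high-level consumer this is just another consumer property -- a
minimal sketch, with a local ZooKeeper and a made-up group id assumed:)

```
import java.util.Properties
import kafka.consumer.{Consumer, ConsumerConfig}

val props = new Properties()
props.put("zookeeper.connect", "localhost:2181")  // assumed local ZooKeeper
props.put("group.id", "example-group")            // hypothetical consumer group
// keep committing offsets to ZooKeeper rather than Kafka-based storage
props.put("offsets.storage", "zookeeper")

val connector = Consumer.create(new ConsumerConfig(props))
```
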
> >>> > >
> >>> > > However, if you are using the simple consumer and wish to store
> >>> > > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> >>> > > You can use ZkUtils in the kafka.utils package for this.
> >>> > >
> >>> > > If you wish to move to Kafka-based offsets we will be adding a new
> >>> > > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> >>> > > This is currently not listed as a blocker for 0.8.2 but I think we
> >>> > > should include it. I will update that ticket.
> >>> > >
> >>> > > > 4) Trying to interpret the existing examples in [6] and the comments
> >>> > > > on [7] - the version of the Offset Management API that exists in
> >>> > > > 0.8.1.1 is using ZooKeeper - whereas ZooKeeper will be optional in
> >>> > > > 0.8.2 - to be replaced by Kafka, and phased out if possible. To my
> >>> > > > understanding, the switch between the two will be controlled by the
> >>> > > > broker configuration (along with other parameters that control the
> >>> > > > performance of offset queues. Is that correct?
> >>> > >
> >>> > > The switch is a client-side configuration. That wiki is not
> >>> > > up-to-date. The most current documentation is available as a patch in
> >>> > > https://issues.apache.org/jira/browse/KAFKA-1729
> >>> > >
> >>> > > > 5) Also, wondering about the timeline of 0.8.2 - according to the
> >>> > > > roadmaps it should be released relatively shortly. Is that correct?
> >>> > >
> >>> > > Yes - once the blockers are ironed out.
> >>> > >
> >>> > > >
> >>> > > > Thanks,
> >>> > > > Marius
> >>> > > >
> >>> > > > [1] http://projects.spring.io/spring-xd/
> >>> > > > [2] https://github.com/spring-projects/spring-integration-kafka
> >>> > > > [3]
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> >>> > > > [4]
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> >>> > > > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> >>> > > > [6]
> >>> > > > https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> >>> > > > [7] https://issues.apache.org/jira/browse/KAFKA-1729
> >>> > >
> >>> > >
> >>>
> >>>
>
