kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Exactly-once semantics with compression
Date Sun, 03 Jun 2012 16:15:29 GMT
Since one could do everything on the producer side using Producer, it will
likely be the only public api for the producer. On the consumer side, since
the high level consumer can't do everything, we may need to support
SimpleConsumer as a public api, although the exact api could change in 0.8.

Thanks,

Jun

On Sun, Jun 3, 2012 at 5:00 AM, Ross Black <ross.w.black@gmail.com> wrote:

> Hi Jun,
>
> Thanks.  I will stick with using the deep iterator then to avoid any
> internal changes.
>
> Are you able to comment on
>
> http://mail-archives.apache.org/mod_mbox/incubator-kafka-users/201205.mbox/%3CCAM%2BbZhhjGSDuR9_4-rgbTx3tZ4B%2BHscjX%2B6STXp9kLZUVnj0PQ%40mail.gmail.com%3E
> ?
>
> In particular I just wanted to check whether SyncProducer, AsyncProducer,
> and SimpleConsumer are considered part of the "public" API, so that they will
> not disappear?
>
> Thanks,
> Ross
>
>
> On 3 June 2012 02:19, Jun Rao <junrao@gmail.com> wrote:
>
> > You can get the same offset using the deep iterator. Whenever the offset
> > increases, you know you have crossed a compressed-unit boundary.
> >
> > Jun
> >
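Jun's suggestion (watch the deep-iterator offset and treat an increase as the end of a compressed unit) can be sketched in plain Java. This is a minimal sketch, not Kafka API: Msg, Batch and group are hypothetical names, the offsets are made-up example values, and it assumes the fetch starts at a batch boundary (for example, an offset previously persisted by this code) and that offsets behave as Ross describes further down the thread (messages 1..n-1 report the batch start, message n reports the next batch's start). Records need Java 16 or later.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBoundaries {

    // Hypothetical stand-in for the (message, offset) pair the deep iterator yields.
    record Msg(String payload, long offset) {}

    // A complete compressed batch plus the offset to persist once it has been applied.
    record Batch(List<Msg> messages, long commitOffset) {}

    // Groups deep-iterated messages into batches. A batch ends at the message whose
    // offset is greater than the previous offset; that offset is the start of the next
    // batch. Messages after the last boundary form an incomplete batch and are not returned.
    static List<Batch> group(long requestOffset, List<Msg> messages) {
        List<Batch> batches = new ArrayList<>();
        List<Msg> current = new ArrayList<>();
        long previous = requestOffset; // assumption: the fetch starts at a batch boundary
        for (Msg m : messages) {
            current.add(m);
            if (m.offset() > previous) {
                batches.add(new Batch(List.copyOf(current), m.offset()));
                current.clear();
                previous = m.offset();
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Two compressed batches: three messages starting at offset 0, two at offset 100.
        List<Msg> fetched = List.of(
                new Msg("a", 0), new Msg("b", 0), new Msg("c", 100),
                new Msg("d", 100), new Msg("e", 200));
        for (Batch b : group(0, fetched)) {
            System.out.println(b.messages().size() + " messages, commit offset " + b.commitOffset());
        }
    }
}
```

Running main prints one line per batch: 3 messages with commit offset 100, then 2 messages with commit offset 200. Assuming each uncompressed message reports the offset that follows it, every uncompressed message becomes a batch of one, which is the "batch size of 1" case Jun mentions later in the thread.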
> > On Sat, Jun 2, 2012 at 12:18 AM, Ross Black <ross.w.black@gmail.com> wrote:
> >
> > > Hi Jun,
> > >
> > > The only reason I would like the compressed messages exposed is so that I
> > > know the boundary to be able to safely persist my state with the offset.
> > > Is there a better way to achieve that?
> > > In my (probably poor) example attempt to expose batch messages, the only
> > > things you can do with a compressed message set are: get the offset, get
> > > the serialized form, and iterate over the contained messages.
> > >
> > > Is kafka attempting to support exactly-once semantics? If so, it would seem
> > > that something needs to be exposed in the API to make it a bit more
> > > explicit than having to keep track of offsets changing for individual
> > > messages.
> > >
> > > Thanks,
> > > Ross
> > >
> > >
> > >
> > > On 2 June 2012 14:19, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > Ross,
> > > >
> > > > The shallow iterator is intended for efficient mirroring between kafka
> > > > clusters. Not sure if it's a good idea to expose it as an external api.
> > > > Note that you really can't do much with a compressed message set other
> > > > than store it as raw bytes somewhere else.
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, May 31, 2012 at 11:32 PM, Ross Black <ross.w.black@gmail.com> wrote:
> > > >
> > > > > Hi Jun.
> > > > >
> > > > > I did find a way to process by batch, but it probably reaches a little too
> > > > > deep into the internals of kafka?
> > > > >
> > > > >     FetchRequest request = new FetchRequest("topic", partitionNumber, requestOffset, bufferSize);
> > > > >     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
> > > > >     Iterator<MessageAndOffset> batchIterator = messageSet.underlying().shallowIterator();
> > > > >     while (batchIterator.hasNext()) {
> > > > >         MessageAndOffset messageAndOffset = batchIterator.next();
> > > > >         Message batchMessage = messageAndOffset.message();
> > > > >         long offset = messageAndOffset.offset();
> > > > >         Iterator<MessageAndOffset> messages = CompressionUtils.decompress(batchMessage).iterator();
> > > > >         // process the batch of messages and persist with the offset
> > > > >     }
> > > > >
> > > > > This should work ok, but I am concerned that it is using internal kafka
> > > > > classes.  The code has to reach into the underlying (scala)
> > > > > ByteBufferMessageSet because shallowIterator is not exposed by the java
> > > > > variant.  The code also has to understand that the message is potentially
> > > > > compressed and then call CompressionUtils.
> > > > >
> > > > > How likely is the above approach to work with subsequent releases?
> > > > > Is it worth exposing the concept of batches in ByteBufferMessageSet to make
> > > > > it explicit?
> > > > >
> > > > > e.g. ByteBufferMessageSet.batchIterator : BatchMessage
> > > > > where BatchMessage is a simple extension of Message that has an additional
> > > > > method to allow getting a ByteBufferMessageSet (i.e. wraps the call to
> > > > > CompressionUtils).
> > > > >
> > > > >
> > > > > Thoughts?
> > > > >
> > > > > Thanks,
> > > > > Ross
> > > > >
> > > > >
> > > > >
> > > > > On 1 June 2012 14:51, Jun Rao <junrao@gmail.com> wrote:
> > > > >
> > > > > > Ross,
> > > > > >
> > > > > > With compression enabled, it's a bit hard to implement exactly-once since
> > > > > > offsets are only advanced after a compressed batch of messages has been
> > > > > > consumed. So, you will have to make sure that each batch of messages can be
> > > > > > consumed together as a unit. The other option is to compress with a batch
> > > > > > size of 1.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
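Jun's advice (consume each compressed batch as a unit) combined with Ross's scheme (persist the offset together with the consumer data) can be sketched as follows. This is a minimal in-memory simulation under stated assumptions, not Kafka code: Checkpoint, Batch, Store and consume are hypothetical names, the "state" is just a running sum, and a real store would need to write state and offset in a single atomic operation (for example, one transaction). The point it shows is that a crash in the middle of a batch discards that batch's partial effect, so the restart from the saved offset applies every message exactly once.

```java
import java.util.List;

public class BatchCheckpoint {

    // Consumer state (here just a running sum) and the offset it corresponds to.
    record Checkpoint(long sum, long offset) {}

    // Hypothetical compressed batch: its start offset, its values, and the next batch's start.
    record Batch(long startOffset, List<Long> values, long nextOffset) {}

    // Hypothetical durable store. State and offset are saved in ONE write,
    // so they can never disagree after a crash.
    static final class Store {
        private Checkpoint saved = new Checkpoint(0, 0);
        Checkpoint load() { return saved; }
        void save(Checkpoint c) { saved = c; }
    }

    // Replays the log from the persisted offset and applies whole batches only.
    // crashAt is the zero-based index (within this run) of the message at which a
    // simulated crash happens; pass -1 for no crash.
    static void consume(Store store, List<Batch> log, int crashAt) {
        Checkpoint cp = store.load();
        int seen = 0;
        for (Batch b : log) {
            if (b.startOffset() < cp.offset()) continue; // "fetch" starts at the saved offset
            long sum = cp.sum();
            for (long v : b.values()) {
                if (seen++ == crashAt) return; // crash: the partial batch is never saved
                sum += v;
            }
            cp = new Checkpoint(sum, b.nextOffset());
            store.save(cp); // commit the batch's effect and the next offset together
        }
    }

    public static void main(String[] args) {
        List<Batch> log = List.of(
                new Batch(0, List.of(1L, 2L, 3L), 100),
                new Batch(100, List.of(4L, 5L), 200));
        Store store = new Store();
        consume(store, log, 4);  // crashes inside the second batch
        System.out.println("after crash:   " + store.load());
        consume(store, log, -1); // restart from the saved offset
        System.out.println("after restart: " + store.load());
    }
}
```

After the simulated crash the store still holds sum 6 at offset 100 (the first batch only); after the restart it holds sum 15 at offset 200, i.e. 1 + 2 + 3 + 4 + 5 with no message counted twice. With a batch size of 1 the same loop degenerates to a per-message checkpoint.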
> > > > > > On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.black@gmail.com> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > Using SimpleConsumer, I get the offset of a message (from
> > > > > > > MessageAndOffset) and persist it with my consumer data to get exactly-once
> > > > > > > semantics for consumer state (as described in the kafka design docs). If
> > > > > > > the consumer fails then it is simply a matter of starting replay of
> > > > > > > messages from the persisted index.
> > > > > > >
> > > > > > > When using compression, the offset from MessageAndOffset appears to be the
> > > > > > > offset of the compressed batch.  e.g. For a batch of 10 messages, the
> > > > > > > offset returned for messages 1-9 is the start of the *current* batch, and
> > > > > > > the offset for message 10 is the start of the *next* batch.
> > > > > > >
> > > > > > > How can I get the exactly-once semantics for consumer state?
> > > > > > > Is there a way that I can get a batch of messages from SimpleConsumer?
> > > > > > > (otherwise I have to reconstruct a batch by watching for a change in the
> > > > > > > offset between messages)
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Ross
> > > > > > >
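To make the problem Ross describes concrete: if state is persisted after every message together with that message's reported offset, a restart in the middle of a compressed batch resumes from the batch start and re-delivers messages that were already applied. The sketch below only models the offset behaviour quoted above; reportedOffset and duplicatesOnRestart are hypothetical helpers, not Kafka API, and the offsets 0 and 500 are arbitrary example values.

```java
public class PerMessageCheckpoint {

    // Offset reported for the i-th (1-based) message of a compressed batch, following
    // the behaviour described above: messages 1..n-1 report the batch start, and
    // message n reports the start of the next batch.
    static long reportedOffset(int i, int batchSize, long batchStart, long nextBatchStart) {
        return i < batchSize ? batchStart : nextBatchStart;
    }

    // If state is persisted after message k together with its reported offset, a
    // restart fetches from that offset. Returns how many already-applied messages
    // of this batch would be delivered again.
    static int duplicatesOnRestart(int k, int batchSize, long batchStart, long nextBatchStart) {
        long resumeFrom = reportedOffset(k, batchSize, batchStart, nextBatchStart);
        return resumeFrom == batchStart ? k : 0;
    }

    public static void main(String[] args) {
        // A batch of 10 messages starting at offset 0; the next batch starts at offset 500.
        for (int k = 1; k <= 10; k++) {
            System.out.println("state saved after message " + k + ": "
                    + duplicatesOnRestart(k, 10, 0, 500) + " message(s) applied again on restart");
        }
    }
}
```

Only a checkpoint taken after the last message of a batch (k = 10 here) is safe, which is why the replies above suggest committing per compressed batch rather than per message.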
> > > > > >
> > > > >
> > > >
> > >
> >
>
