kafka-users mailing list archives

From: Ross Black <ross.w.bl...@gmail.com>
Subject: Re: Exactly-once semantics with compression
Date: Sat, 02 Jun 2012 07:18:32 GMT
Hi Jun,

The only reason I would like the compressed messages exposed is so that I
know the batch boundary and can safely persist my state along with the
offset.  Is there a better way to achieve that?
In my (probably poor) example attempt to expose batch messages, the only
things you can do with a compressed message set are: get the offset, get
the serialized form, and iterate over the contained messages.
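
For what it's worth, the pattern I am aiming for is roughly the following
(just a sketch - "store" and "process" here stand in for whatever
transactional storage and processing code the consumer actually uses):

    // process one decompressed batch, then persist state and offset together
    store.beginTransaction();
    while (messages.hasNext()) {
        store.apply(process(messages.next().message()));
    }
    store.saveOffset(batchOffset);   // offset marking the batch boundary (the resume point)
    store.commit();                  // after a crash, replaying from the saved offset is safe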

Is kafka attempting to support exactly-once semantics? If so, it would seem
that something needs to be exposed in the API to make it a bit more
explicit than having to keep track of offsets changing for individual
messages.

Thanks,
Ross



On 2 June 2012 14:19, Jun Rao <junrao@gmail.com> wrote:

> Ross,
>
> The shallow iterator is intended for efficient mirroring between kafka
> clusters. Not sure if it's a good idea to expose it as an external api.
> Note that you really can't do much with a compressed message set other
> than store it as raw bytes somewhere else.
>
> Thanks,
>
> Jun
>
> On Thu, May 31, 2012 at 11:32 PM, Ross Black <ross.w.black@gmail.com>
> wrote:
>
> > Hi Jun.
> >
> > I did find a way to process by batch, but it probably reaches a little
> > too deep into the internals of kafka?
> >
> >     FetchRequest request = new FetchRequest("topic", partitionNumber, requestOffset, bufferSize);
> >     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
> >     Iterator<MessageAndOffset> batchIterator = messageSet.underlying().shallowIterator();
> >     while (batchIterator.hasNext()) {
> >         MessageAndOffset messageAndOffset = batchIterator.next();
> >         Message batchMessage = messageAndOffset.message();
> >         long offset = messageAndOffset.offset();
> >         Iterator<MessageAndOffset> messages = CompressionUtils.decompress(batchMessage).iterator();
> >         // process the batch of messages and persist with the offset
> >     }
> >
> > This should work ok, but I am concerned that it is using internal kafka
> > classes.  The code has to reach into the underlying (scala)
> > ByteBufferMessageSet because shallowIterator is not exposed by the java
> > variant.  The code also has to understand that the message is potentially
> > compressed and then call CompressionUtils.
> >
> > How likely is the above approach to work with subsequent releases?
> > Is it worth exposing the concept of batches in ByteBufferMessageSet to
> > make it explicit?
> >
> > e.g. ByteBufferMessageSet.batchIterator : BatchMessage
> > where BatchMessage is a simple extension of Message that has an
> > additional method to allow getting a ByteBufferMessageSet (i.e. it wraps
> > the call to CompressionUtils).
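> >
> > To make the shape concrete, something like this (only a sketch - the
> > names are a suggestion, and BatchMessage would presumably extend
> > kafka.message.Message rather than be a standalone interface):
> >
> >     import kafka.javaapi.message.ByteBufferMessageSet;
> >
> >     // hypothetical: the batch view of one compressed message
> >     public interface BatchMessage {
> >         // decompressed contents, wrapping the CompressionUtils call internally
> >         ByteBufferMessageSet messageSet();
> >     }
> >
> >     // and on ByteBufferMessageSet, something like:
> >     //     Iterator<BatchMessage> batchIterator();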
> >
> >
> > Thoughts?
> >
> > Thanks,
> > Ross
> >
> >
> >
> > On 1 June 2012 14:51, Jun Rao <junrao@gmail.com> wrote:
> >
> > > Ross,
> > >
> > > With compression enabled, it's a bit hard to implement exactly-once
> > > since offsets are only advanced after a compressed batch of messages
> > > has been consumed. So, you will have to make sure that each batch of
> > > messages can be consumed together as a unit. The other option is to
> > > compress with a batch size of 1.
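> > >
> > > (For the batch-size-of-1 option with the async producer, that would be
> > > something along these lines - property names from memory for 0.7, so
> > > please double-check against the producer config:)
> > >
> > >     # async producer, gzip compression, one message per compressed set
> > >     producer.type=async
> > >     compression.codec=1
> > >     batch.size=1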
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.black@gmail.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > > Using SimpleConsumer, I get the offset of a message (from
> > > > MessageAndOffset) and persist it with my consumer data to get
> > > > exactly-once semantics for consumer state (as described in the kafka
> > > > design docs).  If the consumer fails then it is simply a matter of
> > > > starting replay of messages from the persisted index.
> > > >
> > > > When using compression, the offset from MessageAndOffset appears to
> > > > be the offset of the compressed batch.  e.g. For a batch of 10
> > > > messages, the offset returned for messages 1-9 is the start of the
> > > > *current* batch, and the offset for message 10 is the start of the
> > > > *next* batch.
> > > >
> > > > How can I get the exactly-once semantics for consumer state?
> > > > Is there a way that I can get a batch of messages from
> > > > SimpleConsumer?
> > > > (otherwise I have to reconstruct a batch by watching for a change in
> > > > the offset between messages)
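> > > >
> > > > (The reconstruction I have in mind is roughly the following - a sketch
> > > > only, with processAndPersist standing in for my own processing and
> > > > state-saving code:)
> > > >
> > > >     // collect messages until the reported offset changes; the message
> > > >     // where it changes is the last of the current batch, and its offset
> > > >     // is the start of the *next* batch (i.e. the safe resume point)
> > > >     List<Message> batch = new ArrayList<Message>();
> > > >     long previousOffset = -1;
> > > >     Iterator<MessageAndOffset> it = messageSet.iterator();
> > > >     while (it.hasNext()) {
> > > >         MessageAndOffset m = it.next();
> > > >         batch.add(m.message());
> > > >         if (previousOffset != -1 && m.offset() != previousOffset) {
> > > >             processAndPersist(batch, m.offset());
> > > >             batch.clear();
> > > >         }
> > > >         previousOffset = m.offset();
> > > >     }
> > > >     // anything left in the batch carries over to the next fetch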
> > > >
> > > > Thanks,
> > > > Ross
> > > >
> > >
> >
>
