kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: Exactly-once semantics with compression
Date Sat, 02 Jun 2012 16:19:13 GMT
You can get the same offset using deep iterator. Whenever the offset
increases, you know you have crossed the compressed unit.

Jun
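[Jun's suggestion above can be sketched as a toy model. None of this is kafka's actual API — the MessageAndOffset record and the offsets below are invented stand-ins for illustration; the point is only the boundary rule: when the reported offset advances, the previous compressed unit has been fully consumed and it is safe to persist state.]

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBoundaryDetector {
    // Stand-in for kafka's MessageAndOffset: a payload plus the offset
    // reported by the deep iterator (held constant within a compressed
    // batch in this toy model).
    record MessageAndOffset(String message, long offset) {}

    // Returns the offsets at which it is safe to persist consumer state:
    // whenever the reported offset increases, the previous compressed
    // unit has been consumed in full.
    static List<Long> safeCommitPoints(List<MessageAndOffset> deepIterated) {
        List<Long> commitPoints = new ArrayList<>();
        Long previous = null;
        for (MessageAndOffset mo : deepIterated) {
            if (previous != null && mo.offset() > previous) {
                commitPoints.add(mo.offset()); // crossed a compressed unit
            }
            previous = mo.offset();
        }
        return commitPoints;
    }

    public static void main(String[] args) {
        // Two compressed batches: the first reports offset 100 for its
        // messages, the second reports offset 250.
        List<MessageAndOffset> fetched = List.of(
            new MessageAndOffset("a", 100),
            new MessageAndOffset("b", 100),
            new MessageAndOffset("c", 100),
            new MessageAndOffset("d", 250),
            new MessageAndOffset("e", 250));
        System.out.println(safeCommitPoints(fetched)); // prints [250]
    }
}
```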

On Sat, Jun 2, 2012 at 12:18 AM, Ross Black <ross.w.black@gmail.com> wrote:

> Hi Jun,
>
> The only reason I would like the compressed messages exposed is so that I
> know the boundary to be able to safely persist my state with the offset.
> Is there a better way to achieve that?
> In my (probably poor) example attempt to expose batch messages, the only
> things you can do with a compressed message set are: get the offset, get
> the serialized form, and iterate over the contained messages.
>
> Is kafka attempting to support exactly-once semantics? If so, it would seem
> that something needs to be exposed in the API to make it a bit more
> explicit than having to keep track of offsets changing for individual
> messages.
>
> Thanks,
> Ross
>
>
>
> On 2 June 2012 14:19, Jun Rao <junrao@gmail.com> wrote:
>
> > Ross,
> >
> > The shallow iterator is intended for efficient mirroring between kafka
> > clusters. Not sure if it's a good idea to expose it as an external api.
> > Note that you really can't do much with a compressed message set other
> > than store it as raw bytes somewhere else.
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, May 31, 2012 at 11:32 PM, Ross Black <ross.w.black@gmail.com>
> > wrote:
> >
> > > Hi Jun.
> > >
> > > I did find a way to process by batch, but it probably reaches a
> > > little too deep into the internals of kafka?
> > >
> > >     FetchRequest request = new FetchRequest("topic",
> > >             partitionNumber, requestOffset, bufferSize);
> > >     ByteBufferMessageSet messageSet = simpleConsumer.fetch(request);
> > >     Iterator<MessageAndOffset> batchIterator =
> > >             messageSet.underlying().shallowIterator();
> > >     while (batchIterator.hasNext()) {
> > >         MessageAndOffset messageAndOffset = batchIterator.next();
> > >         Message batchMessage = messageAndOffset.message();
> > >         long offset = messageAndOffset.offset();
> > >         Iterator<MessageAndOffset> messages =
> > >                 CompressionUtils.decompress(batchMessage).iterator();
> > >         // process the batch of messages and persist with the offset
> > >     }
> > >
> > > This should work ok, but I am concerned that it is using internal
> > > kafka classes.  The code has to reach into the underlying (scala)
> > > ByteBufferMessageSet because shallowIterator is not exposed by the
> > > java variant.  The code also has to understand that the message is
> > > potentially compressed and then call CompressionUtils.
> > >
> > > How likely is the above approach to work with subsequent releases?
> > > Is it worth exposing the concept of batches in ByteBufferMessageSet
> > > to make it explicit?
> > >
> > > eg ByteBufferMessageSet.batchIterator : BatchMessage
> > > where BatchMessage is a simple extension of Message that has an
> > > additional method to allow getting a ByteBufferMessageSet (ie. wraps
> > > the call to CompressionUtils).
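[Ross's proposed BatchMessage could look something like the sketch below. This is a hypothetical shape, not an actual kafka class; plain GZIP from java.util.zip stands in for CompressionUtils, and a batch is modeled as newline-joined strings so the example is self-contained.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class BatchMessageSketch {
    // Hypothetical BatchMessage: wraps a compressed payload together with
    // its batch offset, and hides the decompression step behind messages().
    record BatchMessage(long offset, byte[] compressed) {
        List<String> messages() {
            try (GZIPInputStream in =
                     new GZIPInputStream(new ByteArrayInputStream(compressed))) {
                String joined = new String(in.readAllBytes(), StandardCharsets.UTF_8);
                return Arrays.asList(joined.split("\n"));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    // Helper to build a compressed batch for the demo.
    static BatchMessage compress(long offset, List<String> messages) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(String.join("\n", messages).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new BatchMessage(offset, buf.toByteArray());
    }

    public static void main(String[] args) {
        BatchMessage batch = compress(42L, List.of("m1", "m2", "m3"));
        System.out.println(batch.offset());   // prints 42
        System.out.println(batch.messages()); // prints [m1, m2, m3]
    }
}
```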
> > >
> > >
> > > Thoughts?
> > >
> > > Thanks,
> > > Ross
> > >
> > >
> > >
> > > On 1 June 2012 14:51, Jun Rao <junrao@gmail.com> wrote:
> > >
> > > > Ross,
> > > >
> > > > With compression enabled, it's a bit hard to implement exactly-once
> > > > since offsets are only advanced after a compressed batch of
> > > > messages has been consumed. So, you will have to make sure that
> > > > each batch of messages can be consumed together as a unit. The
> > > > other option is to compress with a batch size of 1.
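[Consuming each compressed batch as a unit, as Jun advises above, amounts to persisting the consumer state and the next fetch offset together in one atomic step per batch. A minimal sketch, with an invented Checkpoint record standing in for whatever durable store the consumer actually uses:]

```java
import java.util.List;

public class BatchAtomicConsumer {
    // Durable "store": consumer state (a running count here) saved
    // atomically with the offset of the next batch to fetch.
    record Checkpoint(long nextOffset, long messagesSeen) {}

    // Process one compressed batch as a unit: fold every message into the
    // state, then persist state + next offset in a single step.  In a real
    // system this write would be one transaction, so a crash either keeps
    // the old checkpoint (the batch is replayed in full) or the new one
    // (the batch is counted exactly once).
    static Checkpoint consumeBatch(Checkpoint current, List<String> batch,
                                   long nextOffset) {
        long seen = current.messagesSeen() + batch.size();
        return new Checkpoint(nextOffset, seen);
    }

    public static void main(String[] args) {
        Checkpoint cp = new Checkpoint(0, 0);
        cp = consumeBatch(cp, List.of("a", "b", "c"), 120);
        cp = consumeBatch(cp, List.of("d", "e"), 250);
        // A crash here loses nothing: restart fetches from offset 250.
        System.out.println(cp); // prints Checkpoint[nextOffset=250, messagesSeen=5]
    }
}
```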
> > > >
> > > > Thanks,
> > > >
> > > > Jun
> > > >
> > > > On Thu, May 31, 2012 at 8:05 PM, Ross Black <ross.w.black@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Using SimpleConsumer, I get the offset of a message (from
> > > > > MessageAndOffset) and persist it with my consumer data to get
> > > > > exactly-once semantics for consumer state (as described in the
> > > > > kafka design docs).  If the consumer fails then it is simply a
> > > > > matter of starting replay of messages from the persisted index.
> > > > >
> > > > > When using compression, the offset from MessageAndOffset appears
> > > > > to be the offset of the compressed batch.  e.g. For a batch of 10
> > > > > messages, the offset returned for messages 1-9 is the start of
> > > > > the *current* batch, and the offset for message 10 is the start
> > > > > of the *next* batch.
> > > > >
> > > > > How can I get the exactly-once semantics for consumer state?
> > > > > Is there a way that I can get a batch of messages from
> > > > > SimpleConsumer?  (otherwise I have to reconstruct a batch by
> > > > > watching for a change in the offset between messages)
> > > > >
> > > > > Thanks,
> > > > > Ross
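[The batch reconstruction Ross describes above — watching for a change in the offset between messages — can be sketched like this. MessageAndOffset is again an invented stand-in record, with the reported offset held constant within a batch for illustration:]

```java
import java.util.ArrayList;
import java.util.List;

public class BatchReconstructor {
    // Stand-in for kafka's MessageAndOffset.
    record MessageAndOffset(String message, long offset) {}

    // Rebuild compressed batches from a deep-iterated stream by starting
    // a new group whenever the reported offset changes.
    static List<List<String>> groupByOffset(List<MessageAndOffset> stream) {
        List<List<String>> batches = new ArrayList<>();
        Long current = null;
        for (MessageAndOffset mo : stream) {
            if (current == null || mo.offset() != current) {
                batches.add(new ArrayList<>()); // offset changed: new batch
                current = mo.offset();
            }
            batches.get(batches.size() - 1).add(mo.message());
        }
        return batches;
    }

    public static void main(String[] args) {
        List<MessageAndOffset> stream = List.of(
            new MessageAndOffset("a", 100),
            new MessageAndOffset("b", 100),
            new MessageAndOffset("c", 250),
            new MessageAndOffset("d", 250));
        System.out.println(groupByOffset(stream)); // prints [[a, b], [c, d]]
    }
}
```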
> > > > >
> > > >
> > >
> >
>
