kafka-users mailing list archives

From Timothy Chen <tnac...@gmail.com>
Subject Re: New consumer APIs
Date Fri, 16 May 2014 23:33:54 GMT
Hi Neha,

Yes, a way that allows each partition to be committed separately.

Couldn't remember if the new consumer allows it, but looks like it does!

Tim

On Fri, May 16, 2014 at 9:37 AM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> Tim,
>
> I'm going to ask you the same question :-)
>
> By "per stream commit", do you mean a per partition commit like this API -
>
> public OffsetMetadata commit(Map<TopicPartition, Long> offsets);
>
> This API allows the consumer to commit the specified offsets only for
> selected partitions.
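
As a rough illustration of the per-partition commit Neha describes, a caller would build a map of only the partitions it wants committed. This is just a sketch: `TopicPartition` below is a minimal local stand-in, not Kafka's own class, and the `commit(Map<TopicPartition, Long>)` call is left as a comment since it needs a live consumer.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a per-partition commit against the proposed
// commit(Map<TopicPartition, Long>) API from this thread.
public class PerPartitionCommit {

    // Hypothetical stand-in for Kafka's TopicPartition identifier.
    public record TopicPartition(String topic, int partition) {}

    // Build an offsets map covering only the partitions we want to commit.
    public static Map<TopicPartition, Long> buildOffsets() {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("events", 0), 42L); // commit p0 through 42
        offsets.put(new TopicPartition("events", 1), 7L);  // commit p1 through 7
        // Partitions absent from the map are left untouched by the commit.
        return offsets;
    }

    public static void main(String[] args) {
        // consumer.commit(buildOffsets()); // would commit only these two partitions
        System.out.println(buildOffsets().size()); // prints 2
    }
}
```

The point of the map-based signature is exactly the selectivity discussed above: anything not in the map keeps its previously committed offset.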
>
> Thanks,
> Neha
>
>
> On Thu, May 15, 2014 at 8:42 AM, Timothy Chen <tnachen@gmail.com> wrote:
>
>> Also going to add that I know a per-stream commit is a strong requirement
>> for folks I know using Kafka, and I've seen custom code written just to
>> support it.
>>
>> Tim
>>
>> > On May 9, 2014, at 1:19 PM, Eric Sammer <esammer@scalingdata.com> wrote:
>> >
>> > All:
>> >
>> > I've been going over the new consumer APIs and it seems like we're
>> > squishing a lot of different concerns together into a single class. The
>> > scope of the new Consumer is kind of all over the place. Managing the
>> > lifecycle - and especially the thread safety - seems challenging.
>> > Specifically, Consumer seems to serve the following purposes:
>> > * Acts as a holder of subscription info (e.g. subscribe()).
>> > * Acts as a stream (e.g. poll(), seek()).
>> >
>> > I definitely think we want these to be separate. It's pretty common to
>> > have a consumer process that connects to the broker, creates N consumer
>> > threads, each of which works on a single stream (which could be composed
>> > of some number of partitions). In this scenario, you *really* want to
>> > explicitly control durability (e.g. commit()s) on a per-stream basis.
>> > You also have different lifecycle semantics and thread safety concerns
>> > at the stream level versus the global level. Is there a reason the API
>> > doesn't look more like:
>> >
>> > // Thread safe, owns the multiplexed connection
>> > Consumer:
>> >  def subscribe(topic: String, streams: Int): Set[Stream]
>> >  def close() // Release everything
>> >
>> > // Not at all thread safe, no synchronization.
>> > Stream:
>> >  def commit() // Really important this be here and not on Consumer.
>> >  def seek(...)
>> >  def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
>> >  def close() // Release these partitions
>> >  ...
>> >
>> > I think this also significantly reduces the complexity of the Consumer
>> > API and lets each thread in a consumer process handle stream lifecycle
>> > appropriately. Since the connection is multiplexed and things could get
>> > rebalanced, just toss an exception if the streams become invalid,
>> > forcing a resubscribe. That way we don't have crazy state logic.
>> >
>> > I'm sure I'm missing something, but I wanted to toss this out there for
>> > folks to poke at.
>> > (p.s. I *really* want per-stream commit baked into the API.)
>> > --
>> > E. Sammer
>> > CTO - ScalingData
>>
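
Eric's proposed Consumer/Stream split can be sketched with a toy in-memory model. Everything below is an illustration, not Kafka code: the names (`subscribe`, `poll`, `commit`) come from his outline, but the offset-tracking bodies are invented purely to show that commit() is scoped to one stream and leaves sibling streams untouched.

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the Consumer/Stream separation proposed in the thread.
public class StreamSplitSketch {

    // Thread-safe owner of the (here, imaginary) multiplexed connection.
    public static class Consumer {
        public List<Stream> subscribe(String topic, int streams) {
            List<Stream> result = new ArrayList<>();
            for (int i = 0; i < streams; i++) {
                result.add(new Stream(topic, i));
            }
            return result;
        }
    }

    // Not thread safe; meant to be owned by a single consumer thread.
    public static class Stream {
        public final String topic;
        public final int id;
        public long position = 0;    // next offset this stream will hand out
        public long committed = -1;  // last offset durably committed, -1 = none

        Stream(String topic, int id) { this.topic = topic; this.id = id; }

        // Stand-in for fetching one record: returns and advances the offset.
        public long poll() { return position++; }

        // Commits only this stream's progress; siblings are unaffected.
        public void commit() { committed = position - 1; }
    }

    public static void main(String[] args) {
        List<Stream> streams = new Consumer().subscribe("events", 2);
        Stream a = streams.get(0), b = streams.get(1);
        a.poll(); a.poll(); a.commit(); // stream a committed through offset 1
        b.poll();                       // stream b polled but never committed
        System.out.println(a.committed + " " + b.committed); // prints "1 -1"
    }
}
```

The asymmetric result (one stream committed, its sibling not) is the per-stream durability control the thread is asking for; hoisting commit() onto the shared Consumer would force all threads to coordinate around a single commit point.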
