kafka-users mailing list archives

From Jun Rao <jun...@gmail.com>
Subject Re: New consumer APIs
Date Sat, 17 May 2014 15:34:16 GMT
Eric,

With the new proposal, it seems what you want can be achieved by just
instantiating N Consumer instances. Then, you can wrap each in a thread and
call poll() in it. Will that work for you?
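
Something along these lines (a rough sketch against the proposed javadoc;
exact method names, e.g. commit(), may differ in the final API):

executor = Executors.newFixedThreadPool(numberOfThreads);
for (int i = 0; i < numberOfThreads; i++) {
  executor.submit(() -> {
    // Each thread owns its own KafkaConsumer instance, so there is no
    // shared state and nothing to synchronize. props, process() and
    // running are placeholders.
    KafkaConsumer consumer = new KafkaConsumer(props);
    consumer.subscribe("mytopic");
    while (running) {
      process(consumer.poll(100)); // poll() only ever called from this thread
      consumer.commit(true);       // commits only this instance's offsets
    }
    consumer.close();
  });
}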

Thanks

Jun


On Fri, May 16, 2014 at 4:05 PM, Eric Sammer <esammer@scalingdata.com> wrote:

> Neha:
>
> Here's the basic pseudo code of the process acting as the Kafka consumer:
>
> executor = Executors.newFixedThreadPool(numberOfThreads)
> consumer = // get a handle to the broker.
> mytopicStreams =
>     consumer.getStreams({ "mytopic" => numberOfThreads }).get("mytopic")
>
> for (stream : mytopicStreams) {
>   executor.submit(() => {
>     i = 0;
>     for (message : stream) {
>       writeMessageToSomething(message);
>       i++
>       if (i % 1000 == 0) {
>         commitSomething()
>         // I understand I can get dupes as a result of this.
>         stream.commit()
>       }
>     }
>   })
> }
>
> You get the idea. I want to be able to indicate that thread N (which owns
> stream N, itself made up of partitions a..z) is in a reasonable state to
> commit; that the messages it has consumed are "on disk." Specifically, I
> don't want to have to synchronize all threads when I commit, nor do I want
> to enumerate the partitions in each stream to commit them individually.
> From a library perspective, it's pretty obvious that the stream should
> manage the offsets for each underlying partition.
>
>
>
> On Thu, May 15, 2014 at 3:09 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
>
> > >> (p.s. I *really* want per-stream commit baked into the API.)
> >
> > Assuming that you mean being able to control commit() per partition, then
> > yes. This is included. You can see some code examples here:
> > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/org/apache/kafka/clients/consumer/KafkaConsumer.html
> > to get a better idea of how that can be used.
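> >
> > For example, roughly (a sketch only; see the javadoc above for the actual
> > signatures):
> >
> > Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
> > offsets.put(new TopicPartition("mytopic", 0), lastProcessedOffset);
> > consumer.commit(offsets, true); // sync commit for just these partitions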
> >
> > However, I'd also like to understand your use case better. Are you trying
> > to use the consumer for group management (auto rebalancing) and, at the
> > same time, control per partition commit()? In general, before proposing any
> > changes, I'd like to understand what you are hoping to achieve with the
> > consumer APIs.
> >
> > Thanks,
> > Neha
> >
> >
> > On Fri, May 9, 2014 at 1:19 PM, Eric Sammer <esammer@scalingdata.com>
> > wrote:
> >
> > > All:
> > >
> > > I've been going over the new consumer APIs and it seems like we're
> > > squishing a lot of different concerns together into a single class. The
> > > scope of the new Consumer is kind of all over the place. Managing the
> > > lifecycle - and especially the thread safety - seems challenging.
> > > Specifically, Consumer seems to serve the following purposes:
> > > * Acts as a holder of subscription info (e.g. subscribe()).
> > > * Acts as a stream (e.g. poll(), seek()).
> > >
> > > I definitely think we want these to be separate. It's pretty common to
> > > have a consumer process that connects to the broker, creates N consumer
> > > threads, each of which works on a single stream (which could be composed
> > > of some number of partitions). In this scenario, you *really* want to
> > > explicitly control durability (e.g. commit()s) on a per-stream basis. You
> > > also have different lifecycle semantics and thread safety concerns at the
> > > stream level versus the global level. Is there a reason the API doesn't
> > > look more like:
> > >
> > > // Thread safe, owns the multiplexed connection
> > > Consumer:
> > >   def subscribe(topic: String, streams: Int): Set[Stream]
> > >   def close() // Release everything
> > >
> > > // Not at all thread safe, no synchronization.
> > > Stream:
> > >   def commit() // Really important this be here and not on Consumer.
> > >   def seek(...)
> > >   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
> > >   def close() // Release these partitions
> > >   ...
> > >
> > > I think this also significantly reduces the complexity of the Consumer
> > > API and lets each thread in a consumer process handle stream lifecycle
> > > appropriately. Since the connection is multiplexed and things could get
> > > rebalanced, just toss an exception if the streams become invalid, forcing
> > > a resubscribe. That way we don't have crazy state logic.
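> > >
> > > Each worker thread would then be something like this (a sketch against
> > > the hypothetical API above; StreamInvalidatedException is made up):
> > >
> > > for (stream : consumer.subscribe("mytopic", numberOfThreads)) {
> > >   executor.submit(() => {
> > >     try {
> > >       while (true) {
> > >         for (message : stream.poll(100, TimeUnit.MILLISECONDS)) {
> > >           writeMessageToSomething(message);
> > >         }
> > >         stream.commit(); // commits only this stream's partitions
> > >       }
> > >     } catch (StreamInvalidatedException e) {
> > >       // rebalance invalidated the stream; resubscribe from scratch
> > >     } finally {
> > >       stream.close();
> > >     }
> > >   });
> > > }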
> > >
> > > I'm sure I'm missing something, but I wanted to toss this out there for
> > > folks to poke at.
> > > (p.s. I *really* want per-stream commit baked into the API.)
> > > --
> > > E. Sammer
> > > CTO - ScalingData
> > >
> >
>
>
>
> --
> E. Sammer
> CTO - ScalingData
>
