kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jay Kreps <jay.kr...@gmail.com>
Subject Re: New consumer APIs
Date Sat, 17 May 2014 00:18:06 GMT
Hey Eric,

Yeah this is more similar to what we currently have but with a richer api
then a simple Iterator.

I think the question is how the poll() on the various streams translates
into the ultimate poll that we need to do against the individual socket

Some of the things that make the current api not ideal are the following:
1. Currently we have fetcher threads that do network I/O and then blocking
queues that feed iterators (the equivalent of the Stream) in your proposal.
However it is a bit unfortunate to have the client dictate a particular
threading model. One thing we liked about the current proposal was that it
is all one thread. It only does I/O when you call poll(). If the user then
wants to feed these into queues to go to thread pools that is fine but not
required. In a sense the client is sort of a lower level building block--I
think there are actually a couple of higher level APIs that could be built
on top of this to (such as the Java 8 Stream stuff or something that
manages a thread pool of processors for you). We thought about some of
these higher-level APIs, and though they are definitely more convenient for
certain uses they are not as general.
2. There are also a ton of gotchas in terms of cleanly re-assigning
partitions. You need some "safe point" at which the user isn't processing
any more. In the current API when the user calls poll() the meaning of that
call is that all previous messages have been processed. Hence you can
commit offsets (if using autocommit) or reassign partitions transparently.
This gets a bit more complex if there are many polls() on different time
windows (though perhaps still possible).
3. We would have to figure out how unioning two streams would work. You
need to have a way of polling over all the streams for things that consume
multiple inputs.

Not sure how much of that makes sense...


On Fri, May 9, 2014 at 1:19 PM, Eric Sammer <esammer@scalingdata.com> wrote:

> All:
> I've been going over the new consumer APIs and it seems like we're
> squishing a lot of different concerns together into a single class. The
> scope of the new Consumer is kind of all over the place. Managing the
> lifecycle - and especially the thread safety - seems challenging.
> Specifically, Consumer seems to serve the following purposes:
> * Acts as a holder of subscription info (e.g. subscribe()).
> * Acts as a stream (e.g. poll(), seek()).
> I definitely think we want these to be separate. It's pretty common to have
> a consumer process that connects to the broker, creates N consumer threads,
> each of which working on a single stream (which could be composed of some
> number of partitions). In this scenario, you *really* want to explicitly
> control durability (e.g. commit()s) on a per-stream basis. You also have
> different lifecycle semantics and thread safety concerns at the stream
> level versus the global level. Is there a reason the API doesn't look more
> like:
> // Thread safe, owns the multiplexed connection
> Consumer:
>   def subscribe(topic: String, streams: Int): Set[Stream]
>   def close() // Release everything
> // Not at all thread safe, no synchronization.
> Stream:
>   def commit() // Really important this be here and not on Consumer.
>   def seek(...)
>   def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
>   def close() // Release these partitions
>   ...
> I think this also significantly reduces the complexity of the Consumer API
> and lets each thread in a consumer process handle stream lifecycle
> appropriately. Since the connection is multiplexed and things could get
> rebalanced, just toss an exception if the streams become invalid, forcing a
> resubscribe. That way we don't have crazy state logic.
> I'm sure I'm missing something, but I wanted to toss this out there for
> folks to poke at.
> (p.s. I *really* want per-stream commit baked into the API.)
> --
> E. Sammer
> CTO - ScalingData

  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message