kafka-users mailing list archives

From Eric Sammer <esam...@scalingdata.com>
Subject New consumer APIs
Date Fri, 09 May 2014 20:19:33 GMT

I've been going over the new consumer APIs and it seems like we're
squishing a lot of different concerns together into a single class. The
scope of the new Consumer is kind of all over the place. Managing the
lifecycle - and especially the thread safety - seems challenging.
Specifically, Consumer seems to serve the following purposes:
* Acts as a holder of subscription info (e.g. subscribe()).
* Acts as a stream (e.g. poll(), seek()).

I definitely think we want these to be separate. It's pretty common to have
a consumer process that connects to the broker, creates N consumer threads,
each of which works on a single stream (which could be composed of some
number of partitions). In this scenario, you *really* want to explicitly
control durability (e.g. commit()s) on a per-stream basis. You also have
different lifecycle semantics and thread safety concerns at the stream
level versus the global level. Is there a reason the API doesn't look more
like this?

// Thread safe, owns the multiplexed connection
trait Consumer {
  def subscribe(topic: String, streams: Int): Set[Stream]
  def close(): Unit // Release everything
}

// Not at all thread safe, no synchronization.
trait Stream {
  def commit(): Unit // Really important this be here and not on Consumer.
  def seek(...)
  def poll(duration: Long, unit: TimeUnit): List[MessageThingie]
  def close(): Unit // Release these partitions
}
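
To make the per-stream commit point concrete, here is a minimal in-memory sketch of the two-level API outlined above. It is a toy under stated assumptions, not the Kafka client: Message, InMemoryStream, committedOffset, batchSize and demo are hypothetical names, and seek is given a single offset parameter only because its signature is left open above.

```scala
import java.util.concurrent.TimeUnit

object PerStreamCommitSketch {
  // Hypothetical message type standing in for "MessageThingie".
  final case class Message(partition: Int, offset: Long, payload: String)

  // Per-thread handle: not thread safe, no synchronization.
  trait Stream {
    def poll(duration: Long, unit: TimeUnit): List[Message]
    def commit(): Unit
    def seek(offset: Long): Unit // parameter list is an assumption
    def close(): Unit
  }

  // Process-wide handle: meant to be thread safe and to own the connection.
  trait Consumer {
    def subscribe(topic: String, streams: Int): Set[Stream]
    def close(): Unit
  }

  // Toy stream backed by a fixed in-memory log; it ignores the poll timeout.
  final class InMemoryStream(log: Vector[Message], batchSize: Int) extends Stream {
    private var position = 0L  // next offset to hand out
    private var committed = 0L // durable position for this stream only

    def committedOffset: Long = committed

    def poll(duration: Long, unit: TimeUnit): List[Message] = {
      val batch = log.drop(position.toInt).take(batchSize).toList
      position += batch.size
      batch
    }
    def commit(): Unit = { committed = position }
    def seek(offset: Long): Unit = { position = offset }
    def close(): Unit = ()
  }

  // Two streams poll independently; only the first one commits.
  def demo(): (Long, Long) = {
    val a = new InMemoryStream(Vector.tabulate(5)(i => Message(0, i.toLong, s"a$i")), 2)
    val b = new InMemoryStream(Vector.tabulate(5)(i => Message(1, i.toLong, s"b$i")), 2)
    a.poll(100, TimeUnit.MILLISECONDS)
    b.poll(100, TimeUnit.MILLISECONDS)
    a.commit() // durability advances for stream a alone
    (a.committedOffset, b.committedOffset)
  }
}
```

In this sketch, committing on one stream leaves the other stream's committed position untouched, which is the behavior a commit() on Stream rather than on Consumer is meant to give each worker thread.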

I think this also significantly reduces the complexity of the Consumer API
and lets each thread in a consumer process handle stream lifecycle
appropriately. Since the connection is multiplexed and things could get
rebalanced, just toss an exception if the streams become invalid, forcing a
resubscribe. That way we don't have crazy state logic.
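
The "throw on invalid stream, then resubscribe" idea could be sketched roughly as follows. Everything here is hypothetical and illustrative: StreamInvalidatedException, ToyConsumer, ToyStream, and the generation counter standing in for rebalance state are assumptions, not part of any Kafka API.

```scala
import java.util.concurrent.atomic.AtomicLong

object ResubscribeSketch {
  // Hypothetical exception signalling that a rebalance invalidated a stream.
  final class StreamInvalidatedException(msg: String) extends RuntimeException(msg)

  // Toy consumer: a generation counter stands in for rebalance state.
  final class ToyConsumer {
    private val generation = new AtomicLong(0)
    def currentGeneration: Long = generation.get()
    def rebalance(): Unit = { generation.incrementAndGet(); () }
    def subscribe(topic: String): ToyStream =
      new ToyStream(topic, generation.get(), this)
  }

  // A stream is valid only for the generation it was created in.
  final class ToyStream(val topic: String, createdIn: Long, owner: ToyConsumer) {
    def poll(): List[String] = {
      if (owner.currentGeneration != createdIn)
        throw new StreamInvalidatedException(s"stream for $topic is stale")
      List(s"$topic-message")
    }
  }

  // Poll, observe the invalidation after a simulated rebalance, resubscribe.
  def demo(): (Boolean, List[String]) = {
    val consumer = new ToyConsumer
    var stream = consumer.subscribe("events")
    stream.poll()        // works: stream matches the current generation
    consumer.rebalance() // simulate a rebalance behind the stream's back
    val threw =
      try { stream.poll(); false }
      catch {
        case _: StreamInvalidatedException =>
          stream = consumer.subscribe("events") // forced resubscribe
          true
      }
    (threw, stream.poll())
  }
}
```

The stream itself keeps no recovery logic; the worker thread that owns it decides what to do when the exception arrives, which is the simplification argued for above.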

I'm sure I'm missing something, but I wanted to toss this out there for
folks to poke at.
(p.s. I *really* want per-stream commit baked into the API.)
E. Sammer
CTO - ScalingData
