kafka-users mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: API change in consumer
Date Fri, 30 Mar 2012 21:36:26 GMT
Thanks for putting detailed thought into this!

A few quick comments, and then I will think more:
1. Could you move this to the wiki so we could evolve some of the
naming as suggestions come in?
2. KafkaMessageAndMetadataStream is a pretty unwieldy name. Can we
just call it KafkaMessageStream or KafkaStream or something like that?
3. Do we need to have two methods? Couldn't we just allow regular
expressions in the topic name? I am not sure about the details of a
TopicFilterSpec; maybe you could explain that?
4. I feel the current API, which you retain, where you provide a map of
topic=>thread_num, is extremely unintuitive. I think we should rethink
this pattern.
5. Does the MessageAndOffset class also contain the partition? That is
needed, since offsets aren't unique. It might also be useful in some
situations to have the topic there as well, because if you merge the
contents of two iterators you can process many such iterators at once,
but you won't know which topic a given message came from. We would
probably need to rename this class to something else, not sure what.
MessageInfo? StreamElement? We should think on this....
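To illustrate the merging point with a purely hypothetical sketch (the class
and field names here are placeholders, not a naming proposal):

  // Hypothetical element type carrying per-message metadata.
  case class StreamElement(message: String, topic: String, partition: Int, offset: Long)

  val ordersStream = Iterator(StreamElement("m1", "orders", 0, 10L))
  val clicksStream = Iterator(StreamElement("m2", "clicks", 1, 42L))

  // Once the iterators are merged, only the metadata on each element tells us
  // which topic/partition a given message came from.
  (ordersStream ++ clicksStream).foreach { e =>
    println(e.topic + "/" + e.partition + "@" + e.offset + ": " + e.message)
  }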

-Jay

On Fri, Mar 30, 2012 at 12:21 PM, Joel Koshy <jjkoshy.w@gmail.com> wrote:
> Hi all,
>
> This is a follow-up to the email I sent out a few days ago on the consumer API
> extension as part of KAFKA-249 - after the code review, a larger API change may
> be more suitable, so here is an overview.
>
> The new method in the consumer connector that supports wildcarding (in the
> v2 patch) returns a list of KafkaMessageAndTopicStream[T] objects.  There
> were a couple of comments on this:
>
> - It is (somewhat oddly) different from the existing API (createMessageStreams,
>   which returns a map containing KafkaMessageStream[T] objects).
> - We already have a MessageAndOffset class, and at some point we may want to
>   give consumers access to logical partition/offset information.
>
> So this would be an opportunity to fix the consumer API to accommodate a more
> general consumer stream and iterator API that provides access to
> MessageAndMetadata elements, each of which contains the message plus metadata
> (such as topic, offset, partition, etc.).
>
> So I have incorporated this in a new patch (which I will upload soon after I
> address all the other review comments), and I wanted to share the API
> changes here since it is a more significant change that would require users
> of the consumer and iterator to update their code.
>
> --------------------------------------------------------------------------------
>
> Proposal for the new ConsumerConnector API:
>
>  /**
>   *  Create a list of MessageStreams for each topic.
>   *
>   *  @param topicCountMap  a map of (topic, #streams) pairs
>   *  @param decoder Decoder to decode each Message to type T
>   *  @return a map of (topic, list of KafkaMessageAndMetadataStream) pairs.
>   *          The number of items in the list is #streams. Each stream supports
>   *          an iterator over message/metadata pairs.
>   */
>  def createMessageStreams[T](topicCountMap: Map[String,Int],
>                              decoder: Decoder[T] = new DefaultDecoder)
>    : Map[String,List[KafkaMessageAndMetadataStream[T]]]
>
>
>  /**
>   *  Create a list of message streams for all topics that match a given filter.
>   *
>   *  @param filterSpec Either a Whitelist or Blacklist TopicFilterSpec object.
>   *  @param numStreams Number of streams to return
>   *  @param decoder Decoder to decode each Message to type T
>   *  @return a list of KafkaMessageAndMetadataStream, each of which provides an
>   *          iterator over message/metadata pairs over allowed topics.
>   */
>  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
>                                      numStreams: Int = 1,
>                                      decoder: Decoder[T] = new DefaultDecoder)
>    : Seq[KafkaMessageAndMetadataStream[T]]
>
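> For illustration, a minimal usage sketch of the two methods above (the
> connector value, the Whitelist constructor form, and DefaultDecoder yielding
> Message values are assumptions here, not part of the proposal):
>
>  // connector: ConsumerConnector, obtained from the consumer config as today
>  val byTopic: Map[String, List[KafkaMessageAndMetadataStream[Message]]] =
>    connector.createMessageStreams[Message](Map("my-topic" -> 2))
>  val streams = byTopic("my-topic")  // two streams for "my-topic"
>
>  // numStreams streams covering all topics matching the (assumed) Whitelist regex
>  val filtered: Seq[KafkaMessageAndMetadataStream[Message]] =
>    connector.createMessageStreamsByFilter[Message](new Whitelist("my-.*"), numStreams = 2)
>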
> --------------------------------------------------------------------------------
>
> The KafkaMessageAndMetadataStream[T]'s iterator is a ConsumerIterator[T]
> which is an iterator over MessageAndMetadata[T] objects:
>
> case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L)
>
> Although the MessageAndMetadata class is simple, it also needs to be evolved
> carefully - i.e., adding fields is easy, but removing fields would effectively
> break older clients at compile time.  I think it would be better to avoid
> schemas and/or explicit versioning, since that would make writing the
> client-side code more difficult.
>
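> As a sketch of that evolution concern (the partition field below is only
> illustrative): appending a field with a default keeps existing call sites
> compiling, whereas removing a field would not:
>
>  // adding a defaulted field is source-compatible for existing callers
>  case class MessageAndMetadata[T](message: T, topic: String = "", offset: Long = -1L,
>                                   partition: Int = -1)
>
>  // e.g. MessageAndMetadata("payload") still compiles; dropping, say, offset
>  // would break any client code that reads it.
>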
> --------------------------------------------------------------------------------
>
> This means the current pattern of:
>
> for (message <- stream) {
>  // process(message)
> }
>
> will change to:
>
> for (msgAndMetadata <- stream) {
>  // processMessage(msgAndMetadata.message)
>  // can also access msgAndMetadata.offset, topic, etc. if appropriate
> }
>
> --------------------------------------------------------------------------------
>
> Would love to get any thoughts on this. Given that this is an API
> change that would require code changes for consumers, I wanted to send this
> around for comments/objections before proceeding further.
>
> Thanks,
>
> Joel
