kafka-users mailing list archives

From Joel Koshy <jjkosh...@gmail.com>
Subject Consumer API extension in KAFKA-249
Date Wed, 28 Mar 2012 01:13:50 GMT
The newest patch for KAFKA-249 adds a new method to the consumer connector
(after feedback on the first patch). Since this is an addition to the
consumer API, I wanted to send it around for comments/concerns.

The new method lets the consumer connector subscribe to all topics that
match a whitelist or blacklist.

Scala API:

  /**
   *  Create a list of message streams for all topics that match a given
   *  filter.
   *
   *  @param filterSpec TopicFilterSpec encapsulating a Java-style regex
   *         whitelist or blacklist.
   *  @param numStreams number of KafkaMessageAndTopicStream to create.
   *  @param decoder Decoder[T] applied to each message.
   *  @return a list of KafkaMessageAndTopicStream that provide an iterator
   *          over messages from allowed topics.
   */
  def createMessageStreamsByFilter[T](filterSpec: TopicFilterSpec,
                                      numStreams: Int = 1,
                                      decoder: Decoder[T] = new DefaultDecoder)
    : Seq[KafkaMessageAndTopicStream[T]]

TopicFilterSpec can be either a Whitelist or Blacklist.
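
To make the whitelist/blacklist semantics concrete, here is a minimal,
self-contained sketch of how such a filter could decide whether a topic is
allowed. The names TopicFilterSketch, WhitelistSketch, BlacklistSketch and
isAllowed are hypothetical stand-ins, not the classes in the patch, and the
sketch assumes the regex must match the whole topic name.

```scala
import java.util.regex.Pattern

// Hypothetical stand-in for TopicFilterSpec; the real class in the
// KAFKA-249 patch may look different.
sealed trait TopicFilterSketch {
  def regex: String
  // Compiled lazily so the pattern is built once, on first use.
  protected lazy val pattern: Pattern = Pattern.compile(regex)
  def isAllowed(topic: String): Boolean
}

// Allows only topics whose full name matches the regex (assumption).
final case class WhitelistSketch(regex: String) extends TopicFilterSketch {
  def isAllowed(topic: String): Boolean = pattern.matcher(topic).matches()
}

// Allows every topic except those whose full name matches the regex.
final case class BlacklistSketch(regex: String) extends TopicFilterSketch {
  def isAllowed(topic: String): Boolean = !pattern.matcher(topic).matches()
}

object TopicFilterDemo {
  def main(args: Array[String]): Unit = {
    val topics = Seq("whitetopics.a", "whitetopics.b", "other")
    // Topics selected by a whitelist, then by a blacklist, on the same regex.
    println(topics.filter(WhitelistSketch("whitetopics.*").isAllowed))
    println(topics.filter(BlacklistSketch("whitetopics.*").isAllowed))
  }
}
```

With the same regex, the two filters split the topic list into complementary
sets, which is the behaviour I would expect from Whitelist and Blacklist.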

The consumer may now receive messages from multiple topics due to
wildcarding. This is why it returns a KafkaMessageAndTopicStream which
allows iteration over MessageAndTopic objects.

Example:

val stream = zkConnector.createMessageStreamsByFilter(
  new Whitelist("whitetopics.*")).head

for (messageAndTopic <- stream) {
  println("Message from topic %s: %s".format(
    messageAndTopic.topic, messageAndTopic.message))
}

The same method (and default argument variants) is also exposed in the Java
API.

Feedback/concerns/objections?

Thanks,

Joel
