kafka-users mailing list archives

From Evan Chan ...@ooyala.com>
Subject Questions on consumerConnector.createMessageStreams()
Date Fri, 06 Jan 2012 08:05:04 GMT
I have to pass in a Map from topic -> # of streams, according to the
scaladoc at http://incubator.apache.org/kafka/api-docs/0.6/.
Is the # of streams the same as the # of partitions, or something different?
For example, let's say that I have 10 partitions for a topic.  Which
partitions will the following code fetch from?

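     import kafka.consumer.{Consumer, ConsumerConfig}
     val consumerConnector = Consumer.create(new ConsumerConfig(props))
     // Request a single stream for `topic`
     val topicMessageStreams =
       consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
     val kafkaStream = topicMessageStreams(topic)(0)

     override def run = try {
       for (message <- kafkaStream) {
         // ... handle each message ...
       }
     } finally {
       consumerConnector.shutdown()
     }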
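A simplified model of how the stream count in that Map could relate to partitions. It assumes (as far as I understand the 0.7 ZooKeeper-based consumer) that each requested stream acts as one consumer thread and that a rebalance hands each thread a contiguous range of the topic's sorted partitions. This is not Kafka code; `StreamAssignmentSketch` and `assignRanges` are hypothetical names. Under this model, `topic -> 1` on the only consumer in the group means stream 0 is fed by all 10 partitions, while `topic -> 3` splits them 4 / 3 / 3.

```scala
// Simplified model (not Kafka code): split sorted partition ids across
// consumer streams in contiguous ranges, range-assignment style.
object StreamAssignmentSketch {

  /** For each stream index 0 until numStreams, the partition ids it would own. */
  def assignRanges(numPartitions: Int, numStreams: Int): Vector[Vector[Int]] = {
    require(numPartitions >= 0 && numStreams > 0, "need at least one stream")
    val perStream = numPartitions / numStreams // base share for every stream
    val withExtra = numPartitions % numStreams // first `withExtra` streams get one more
    Vector.tabulate(numStreams) { s =>
      val start = perStream * s + math.min(s, withExtra)
      val count = perStream + (if (s < withExtra) 1 else 0)
      Vector.range(start, start + count)
    }
  }

  def main(args: Array[String]): Unit = {
    // topic -> 1: the single stream is fed by all 10 partitions
    println(assignRanges(10, 1))
    // topic -> 3: partitions are split 4 / 3 / 3 across three streams
    println(assignRanges(10, 3))
  }
}
```

If more streams are requested than there are partitions, the extra streams in this model own nothing and would simply never receive messages.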

Is there any way I can get the code above to read from all 10 partitions?
Or do I really have to create 10 separate threads for reading from 10
partitions at once?
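One way to handle several streams is to request N of them and give each its own thread, for example from a fixed-size pool. The sketch below uses plain Scala iterators as stand-ins for KafkaMessageStream (a real stream blocks while waiting for data instead of ending, which is why one thread per stream is assumed here); `ThreadPerStreamSketch` and `consumeAll` are hypothetical names, and only java.util.concurrent is used.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ThreadPerStreamSketch {

  /** Drains each stream on its own pool thread; returns the total number of messages seen. */
  def consumeAll(streams: Seq[Iterator[String]]): Int = {
    require(streams.nonEmpty, "need at least one stream")
    val processed = new AtomicInteger(0)
    // One thread per stream: a thread looping over a blocking stream is
    // stuck on that stream, so it cannot easily service several at once.
    val pool = Executors.newFixedThreadPool(streams.size)
    try {
      streams.foreach { stream =>
        pool.execute(new Runnable {
          def run(): Unit = stream.foreach(_ => processed.incrementAndGet())
        })
      }
    } finally {
      pool.shutdown() // stop accepting new work; submitted tasks still run
    }
    pool.awaitTermination(10, TimeUnit.SECONDS)
    processed.get
  }

  def main(args: Array[String]): Unit = {
    // Three fake "streams", standing in for createMessageStreams(Map(topic -> 3))
    val fakeStreams = (0 until 3).map(i => Iterator.tabulate(5)(j => s"stream$i-msg$j"))
    println(consumeAll(fakeStreams)) // prints 15
  }
}
```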

Does each KafkaMessageStream need its own thread, or can they be shared?

Let's say I want to be able to attach additional consumers dynamically to
read from the same topic, such that the consumers get the messages.
It seems we have the following constraints:
- The # of partitions must be >= the max # of consumers I would want to
attach, since partitions cannot be divided amongst multiple consumers
- Each consumer must be able to consume up to ceil(# partitions / minimum #
consumers) partitions or streams (still confused).  If the consumers
don't grab all of the streams/partitions, then some partitions will not be
allocated and no messages would be read from those partitions.
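The arithmetic behind these two constraints, as a small sketch assuming partitions are spread as evenly as possible: each of C consumers gets floor(P / C) partitions and the first P mod C consumers get one extra, so the busiest consumer handles ceil(P / C) and every consumer beyond the P-th gets none. `ConsumerCapacitySketch` and its methods are hypothetical names, not Kafka APIs.

```scala
object ConsumerCapacitySketch {

  /** Partitions per consumer when numPartitions are spread as evenly as possible. */
  def partitionCounts(numPartitions: Int, numConsumers: Int): Seq[Int] = {
    require(numPartitions >= 0 && numConsumers > 0)
    val base = numPartitions / numConsumers
    val extra = numPartitions % numConsumers // this many consumers get one more
    (0 until numConsumers).map(c => base + (if (c < extra) 1 else 0))
  }

  /** ceil(P / C) in integer arithmetic: the most partitions any one consumer must handle. */
  def maxPerConsumer(numPartitions: Int, numConsumers: Int): Int =
    (numPartitions + numConsumers - 1) / numConsumers

  def main(args: Array[String]): Unit = {
    val partitions = 10
    for (consumers <- Seq(1, 3, 10, 12)) {
      val counts = partitionCounts(partitions, consumers)
      val idle = counts.count(_ == 0)
      println(s"$consumers consumers: max ${maxPerConsumer(partitions, consumers)}, idle $idle")
    }
    // 1 consumers: max 10, idle 0
    // 3 consumers: max 4, idle 0
    // 10 consumers: max 1, idle 0
    // 12 consumers: max 1, idle 2
  }
}
```

With 10 partitions, three consumers each need capacity for up to 4 partitions, and attaching a 11th and 12th consumer leaves two of them idle.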

Thanks for clearing up my confusion.

*Evan Chan*
Senior Software Engineer |
ev@ooyala.com | (650) 996-4600
www.ooyala.com | blog <http://www.ooyala.com/blog> |
