kafka-users mailing list archives

From Neha Narkhede <neha.narkh...@gmail.com>
Subject Re: Questions on consumerConnector.createMessageStreams()
Date Fri, 06 Jan 2012 19:52:46 GMT
Evan,

>> Could this be clearly documented in the ScalaDocs and other docs? It would be
>> very helpful.

Certainly. Filed https://issues.apache.org/jira/browse/KAFKA-243 to track this.

Thanks,
Neha

On Fri, Jan 6, 2012 at 10:30 AM, Jun Rao <junrao@gmail.com> wrote:
> Evan,
>
> We can add this in our wiki, probably under a new section "Details of Kafka
> components". We can put the compression and the mirroring pages under that
> section and add a page for consumer. Would you be interested in starting
> the consumer page?
>
> Thanks,
>
> Jun
>
> On Fri, Jan 6, 2012 at 9:55 AM, Evan Chan <ev@ooyala.com> wrote:
>
>> Jun,
>>
>> Thanks.  Could this be clearly documented in the ScalaDocs and other docs?
>>  It would be very helpful.
>>
>> If we use a timeout, then there should be no problems with multiplexing
>> using one thread, right?  Just curious.
>>
>> thanks,
>> Evan
>>
>>
>> On Fri, Jan 6, 2012 at 9:00 AM, Jun Rao <junrao@gmail.com> wrote:
>>
>> > Evan,
>> >
>> > The number that you pass in the topic map controls how many streams a topic
>> > is divided into. In your case, if you pass in 1, all 10 partitions' data
>> > will be fed into 1 stream. If you pass in 2, each of the 2 streams will get
>> > data from 5 partitions. If you pass in 11, 10 of them will each get data
>> > from 1 partition and 1 stream will get nothing.
>> >
>> > Typically, you need to iterate over each stream in its own thread. This is
>> > because each stream can block forever if there are no new events.
>> >
>> > Thanks,
>> >
>> > Jun
>> >
>> > On Fri, Jan 6, 2012 at 12:05 AM, Evan Chan <ev@ooyala.com> wrote:
>> >
>> > > I have to pass in a Map from the topic -> #streams, according to the
>> > > scaladoc at http://incubator.apache.org/kafka/api-docs/0.6/.
>> > > Is this the same as, or different from, the # of partitions?  For example,
>> > > let's say that I have 10 partitions for a topic.  What partitions will the
>> > > following code fetch from?
>> > >
>> > >     val consumerConnector = Consumer.create(new ConsumerConfig(props))
>> > >     val topicMessageStreams =
>> > >       consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
>> > >     val kafkaStream = topicMessageStreams(topic)(0)
>> > >
>> > >     override def run = try {
>> > >       for (message <- kafkaStream) {
>> > >
>> > > Is there any way I can get the code above to read from all 10 partitions?
>> > > Or do I really have to create 10 separate threads for reading from 10
>> > > partitions at once?
>> > >
>> > > Does each KafkaMessageStream need its own thread, or can they be shared
>> > > somehow?
>> > >
>> > > Let's say I want to be able to attach additional consumers dynamically to
>> > > read from the same topic, such that the consumers get the messages
>> > > round-robin.
>> > > It seems we have the following constraints:
>> > > - The # of partitions must be >= the max # of consumers I would want to
>> > >   attach, since partitions cannot be divided amongst multiple consumers
>> > > - Each consumer must be able to consume up to ceil(# partitions / minimum #
>> > >   consumers) partitions or streams (still confused).  If the consumers
>> > >   don't grab all of the streams/partitions, then some partitions will not be
>> > >   allocated and no messages would be read from those partitions.
>> > >
>> > > Thanks for clarifying my confusion.
>> > >
>> > > --
>> > > --
>> > > *Evan Chan*
>> > > Senior Software Engineer |
>> > > ev@ooyala.com | (650) 996-4600
>> > > www.ooyala.com | blog <http://www.ooyala.com/blog> |
>> > > @ooyala<http://www.twitter.com/ooyala>
>> > >
>> >
>>
>>
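
Jun's description of how the number passed in the topic map splits partitions across streams can be sketched as plain arithmetic. The following is a minimal Scala illustration, not Kafka's actual assignment code: `StreamAssignmentSketch` and `partitionsPerStream` are hypothetical names, and the rule that earlier streams absorb any remainder when the split is uneven is an assumption that the thread does not confirm.

```scala
object StreamAssignmentSketch {

  /** Returns how many partitions each stream would be fed from.
    *
    * Assumption (not confirmed in the thread): the split is as even as
    * possible, and when it is uneven the earlier streams take one extra
    * partition each.
    */
  def partitionsPerStream(numPartitions: Int, numStreams: Int): List[Int] = {
    require(numPartitions >= 0, "numPartitions must be non-negative")
    require(numStreams > 0, "numStreams must be positive")
    val base  = numPartitions / numStreams // partitions every stream gets
    val extra = numPartitions % numStreams // streams that get one more
    List.tabulate(numStreams)(i => if (i < extra) base + 1 else base)
  }

  def main(args: Array[String]): Unit = {
    // The three cases from Jun's reply: a 10-partition topic with 1, 2 and 11 streams.
    for (n <- List(1, 2, 11))
      println(s"$n stream(s): ${partitionsPerStream(10, n).mkString(", ")}")
  }
}
```

For the 10-partition topic discussed above, `partitionsPerStream(10, 2)` gives two streams of 5 partitions each, and `partitionsPerStream(10, 11)` leaves the last stream with no partitions, matching Jun's examples.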
