kafka-users mailing list archives

From Evan Chan ...@ooyala.com>
Subject Re: Questions on consumerConnector.createMessageStreams()
Date Fri, 06 Jan 2012 20:02:09 GMT
Jun,

I can give it an initial stab.

-Evan


On Fri, Jan 6, 2012 at 10:30 AM, Jun Rao <junrao@gmail.com> wrote:

> Evan,
>
> We can add this in our wiki, probably under a new section "Details of Kafka
> components". We can put the compression and the mirroring pages under that
> section and add a page for consumer. Would you be interested in starting
> the consumer page?
>
> Thanks,
>
> Jun
>
> On Fri, Jan 6, 2012 at 9:55 AM, Evan Chan <ev@ooyala.com> wrote:
>
> > Jun,
> >
> > Thanks.  Could this be clearly documented in the ScalaDocs and other docs?
> > It would be very helpful.
> >
> > If we use a timeout, then there should be no problems with multiplexing
> > using one thread, right?  Just curious.
> >
> > thanks,
> > Evan
> >
> >
> > On Fri, Jan 6, 2012 at 9:00 AM, Jun Rao <junrao@gmail.com> wrote:
> >
> > > Evan,
> > >
> > > The number that you pass in the topic map controls how many streams a
> > > topic is divided into. In your case, if you pass in 1, data from all 10
> > > partitions will be fed into 1 stream. If you pass in 2, each of the 2
> > > streams will get data from 5 partitions. If you pass in 11, 10 of them
> > > will each get data from 1 partition and 1 stream will get nothing.
> > >
> > > Typically, you need to iterate each stream in its own thread. This is
> > > because each stream can block forever if there is no new event.
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Fri, Jan 6, 2012 at 12:05 AM, Evan Chan <ev@ooyala.com> wrote:
> > >
> > > > I have to pass in a Map from the topic -> #streams, according to the
> > > > scaladoc at http://incubator.apache.org/kafka/api-docs/0.6/.
> > > > Is this the same as or different from the # of partitions?  For example,
> > > > let's say that I have 10 partitions for a topic.  What partitions will the
> > > > following code fetch from?
> > > >
> > > >     val consumerConnector = Consumer.create(new ConsumerConfig(props))
> > > >     val topicMessageStreams =
> > > >       consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
> > > >     val kafkaStream = topicMessageStreams(topic)(0)
> > > >
> > > >     override def run = try {
> > > >       for (message <- kafkaStream) {
> > > >
> > > > Is there any way I can get the code above to read from all 10 partitions?
> > > > Or do I really have to create 10 separate threads for reading from 10
> > > > partitions at once?
> > > >
> > > > Does each KafkaMessageStream need its own thread, or can they be shared
> > > > somehow?
> > > >
> > > > Let's say I want to be able to attach additional consumers dynamically to
> > > > read from the same topic, such that the consumers get the messages
> > > > round-robin.
> > > > It seems we have the following constraints:
> > > > - The # of partitions must be >= the max # of consumers I would want to
> > > >   attach, since partitions cannot be divided amongst multiple consumers
> > > > - Each consumer must be able to consume up to ceil(# partitions /
> > > >   minimum # consumers) partitions or streams (still confused).  If the
> > > >   consumers don't grab all of the streams/partitions, then some
> > > >   partitions will not be allocated and no messages would be read from
> > > >   those partitions.
> > > >
> > > > Thanks for clarifying the confusion.
> > > >
> > > > --
> > > > --
> > > > *Evan Chan*
> > > > Senior Software Engineer |
> > > > ev@ooyala.com | (650) 996-4600
> > > > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > > > @ooyala<http://www.twitter.com/ooyala>
> > > >
> > >
> >
> >
> >
> >
>
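
To make the stream arithmetic in Jun's quoted reply concrete, here is a minimal sketch of how a stream count could divide a topic's partitions. The object and method names are hypothetical and not part of the Kafka API, and the rule used (an even split, with any remainder going to the first streams) is an assumption that merely reproduces the three cases Jun lists; it is not taken from the Kafka source.

```scala
object StreamAssignment {
  // Hypothetical helper, NOT a Kafka API: returns how many partitions each
  // stream would receive if partitions are split as evenly as possible.
  // Assumption: leftover partitions go to the lowest-numbered streams.
  def partitionsPerStream(numPartitions: Int, numStreams: Int): Seq[Int] = {
    require(numPartitions >= 0, "numPartitions must be non-negative")
    require(numStreams > 0, "numStreams must be positive")
    val base = numPartitions / numStreams   // every stream gets at least this many
    val extra = numPartitions % numStreams  // this many streams get one more
    (0 until numStreams).map(i => if (i < extra) base + 1 else base)
  }
}
```

With 10 partitions, `StreamAssignment.partitionsPerStream(10, 2)` gives two streams of 5 partitions each, and asking for 11 streams leaves the last stream with 0 partitions, matching the cases in the reply.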


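The advice in the quoted thread to iterate each stream in its own thread can be sketched roughly as follows. This does not use Kafka classes: a `LinkedBlockingQueue[String]` stands in for a blocking message stream, and `ThreadPerStream`, `FakeStream`, and `consumeAll` are hypothetical names chosen for illustration. The poll timeout is also an assumption, added only so the example terminates instead of blocking forever.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ThreadPerStream {
  // Stand-in for a blocking message stream; NOT a Kafka class.
  type FakeStream = LinkedBlockingQueue[String]

  // Consumes every stream on its own thread and returns the total number of
  // messages seen. A poll that times out (returns null) ends that stream's
  // loop, so an idle stream cannot stall the others.
  def consumeAll(streams: Seq[FakeStream], timeoutMs: Long): Int = {
    val pool = Executors.newFixedThreadPool(math.max(1, streams.size))
    val consumed = new AtomicInteger(0)
    streams.foreach { stream =>
      pool.execute(new Runnable {
        def run(): Unit = {
          var msg = stream.poll(timeoutMs, TimeUnit.MILLISECONDS)
          while (msg != null) {
            consumed.incrementAndGet()
            msg = stream.poll(timeoutMs, TimeUnit.MILLISECONDS)
          }
        }
      })
    }
    pool.shutdown()                              // accept no new tasks
    pool.awaitTermination(10, TimeUnit.SECONDS)  // wait for the consumers to finish
    consumed.get()
  }
}
```

One thread per stream keeps the consumers independent: a stream with no new events only blocks its own thread.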

