kafka-users mailing list archives

From Patricio Echagüe <patric...@gmail.com>
Subject Re: Questions on consumerConnector.createMessageStreams()
Date Fri, 06 Jan 2012 18:09:01 GMT
Evan, apparently you can use timeouts, but be aware of this issue:
https://issues.apache.org/jira/browse/KAFKA-241
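
To illustrate the timeout idea: a minimal sketch of one thread visiting several streams in turn and giving up on each after a short wait. It deliberately does not touch the Kafka API. `FakeStream` is a hypothetical stand-in built on `java.util.concurrent.LinkedBlockingQueue`, and `drainRoundRobin`, `timeoutMs` and `passes` are names made up for the example. How a real consumer stream behaves after a timeout is exactly what the issue above is about, so treat this as a model of the polling pattern only.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object TimeoutMultiplexSketch {
  // Hypothetical stand-in for a message stream: a queue whose poll() gives up
  // after a timeout instead of blocking forever. Not Kafka's stream class.
  type FakeStream = LinkedBlockingQueue[String]

  /** Visit every stream in turn from the calling thread, waiting at most
    * timeoutMs on each one, for a fixed number of passes.
    * Returns the messages in the order they were read. */
  def drainRoundRobin(streams: Seq[FakeStream], timeoutMs: Long, passes: Int): Vector[String] = {
    val out = Vector.newBuilder[String]
    for (_ <- 1 to passes; s <- streams) {
      // poll returns null when nothing arrived within the timeout
      val msg = s.poll(timeoutMs, TimeUnit.MILLISECONDS)
      if (msg != null) out += msg
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val a = new FakeStream()
    val b = new FakeStream()
    a.put("a1"); a.put("a2"); b.put("b1")
    // An empty stream costs at most timeoutMs per pass; it cannot stall the others.
    println(drainRoundRobin(Seq(a, b), timeoutMs = 10, passes = 2))
  }
}
```

The trade-off is latency: a message on the last stream can wait up to the sum of the timeouts on the streams ahead of it.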

On Fri, Jan 6, 2012 at 9:55 AM, Evan Chan <ev@ooyala.com> wrote:

> Jun,
>
> Thanks.  Could this be clearly documented in the ScalaDocs and other docs?
> It would be very helpful.
>
> If we use a timeout, then there should be no problems with multiplexing
> using one thread, right?  Just curious.
>
> thanks,
> Evan
>
>
> On Fri, Jan 6, 2012 at 9:00 AM, Jun Rao <junrao@gmail.com> wrote:
>
> > Evan,
> >
> > The number that you pass in the topic map controls how many streams a
> > topic is divided into. In your case, if you pass in 1, all 10 partitions'
> > data will be fed into 1 stream. If you pass in 2, each of the 2 streams
> > will get data from 5 partitions. If you pass in 11, 10 of them will each
> > get data from 1 partition and 1 stream will get nothing.
> >
> > Typically, you need to iterate each stream in its own thread. This is
> > because each stream can block forever if there is no new event.
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Jan 6, 2012 at 12:05 AM, Evan Chan <ev@ooyala.com> wrote:
> >
> > > I have to pass in a Map from the topic -> #streams, according to the
> > > scaladoc at http://incubator.apache.org/kafka/api-docs/0.6/.
> > > Is this the same as or different from the # of partitions?  For example,
> > > let's say that I have 10 partitions for a topic.  What partitions will
> > > the following code fetch from?
> > >
> > >     val consumerConnector = Consumer.create(new ConsumerConfig(props))
> > >     val topicMessageStreams =
> > >       consumerConnector.createMessageStreams(Predef.Map(topic -> 1))
> > >     val kafkaStream = topicMessageStreams(topic)(0)
> > >
> > >     override def run = try {
> > >       for (message <- kafkaStream) {
> > >
> > > Is there any way I can get the code above to read from all 10
> > > partitions? Or do I really have to create 10 separate threads for
> > > reading from 10 partitions at once?
> > >
> > > Does each KafkaMessageStream need its own thread, or can they be shared
> > > somehow?
> > >
> > > Let's say I want to be able to attach additional consumers dynamically
> > > to read from the same topic, such that the consumers get the messages
> > > round-robin.
> > > It seems we have the following constraints:
> > > - The # of partitions must be >= the max # of consumers I would want to
> > > attach, since partitions cannot be divided amongst multiple consumers
> > > - Each consumer must be able to consume up to ceil(# partitions /
> > > minimum # consumers) partitions or streams (still confused).  If the
> > > consumers don't grab all of the streams/partitions, then some partitions
> > > will not be allocated and no messages would be read from those
> > > partitions.
> > >
> > > Thanks for clarifying my confusion.
> > >
> > > --
> > > --
> > > *Evan Chan*
> > > Senior Software Engineer |
> > > ev@ooyala.com | (650) 996-4600
> > > www.ooyala.com | blog <http://www.ooyala.com/blog> |
> > > @ooyala<http://www.twitter.com/ooyala>
> > >
> >
>
>
>
>
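
The counts Jun gives for 10 partitions (1 stream gets all 10; 2 streams get 5 each; with 11 streams, 10 get one partition and 1 gets nothing) can be reproduced with a small model. This is only a sketch of the arithmetic, not Kafka's assignment code: `StreamAssignmentSketch` and `assign` are hypothetical names, and the choice of contiguous partition ranges per stream is an assumption the thread does not confirm.

```scala
object StreamAssignmentSketch {
  /** Split partition ids 0 until numPartitions into numStreams contiguous
    * groups. The first (numPartitions % numStreams) streams get one extra
    * partition; streams beyond the partition count get none.
    * Assumption: contiguous ranges; the thread only states the counts. */
  def assign(numPartitions: Int, numStreams: Int): Vector[Vector[Int]] = {
    require(numPartitions >= 0 && numStreams > 0)
    val base = numPartitions / numStreams
    val extra = numPartitions % numStreams
    Vector.tabulate(numStreams) { i =>
      val start = i * base + math.min(i, extra)
      val size = base + (if (i < extra) 1 else 0)
      Vector.range(start, start + size)
    }
  }

  def main(args: Array[String]): Unit = {
    // Partitions per stream for the three cases discussed in the thread
    for (n <- Seq(1, 2, 11))
      println(s"$n stream(s): " + assign(10, n).map(_.size).mkString(", "))
  }
}
```

In this model every partition lands in exactly one stream, which matches the point that a consumer asking for a single stream still receives data from all 10 partitions.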
