kafka-users mailing list archives

From: Jun Rao <jun...@gmail.com>
Subject: Re: Partition Question
Date: Sun, 14 Aug 2011 03:54:54 GMT
That property is available at ProducerConfig as well.

Yes, compression occurs at the producer side. The compressed data is
propagated to the broker and then to the consumer. The consumer library
decompresses the data.
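
A minimal sketch of setting both knobs on the producer side (property
names follow the 0.6/0.7-era configuration discussed in this thread and
should be verified against your release; the ZooKeeper address and
values are placeholders):

    import java.util.Properties;
    import kafka.producer.ProducerConfig;

    public class ProducerSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            // placeholder ZooKeeper connect string
            props.put("zk.connect", "localhost:2181");
            // assumption: 0 = no compression, 1 = gzip (0.7 and later)
            props.put("compression.codec", "1");
            // messages larger than this are not sent by the producer
            props.put("max.message.size", "1000000");
            ProducerConfig config = new ProducerConfig(props);
        }
    }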

Jun

On Sat, Aug 13, 2011 at 8:44 PM, Blake Matheny <blake@tumblr.com> wrote:

> It looks like that's only applicable to the SyncProducer? Also, I
> assume the upcoming (0.7) compression functionality will be
> on-the-wire compression?
>
> Thanks,
>
> -Blake
>
> On Sat, Aug 13, 2011 at 11:27 PM, Jun Rao <junrao@gmail.com> wrote:
> > Blake,
> >
> > Once an unexpectedly large message hits the broker, it's going to be
> > hard for the consumer to automatically recover from this error. The
> > best way is to prevent such a message from being produced in the first
> > place. There is a config property, max.message.size, in the producer
> > that controls the size of the largest allowed message. Messages
> > exceeding that size will not be sent.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Aug 13, 2011 at 8:14 PM, Blake Matheny <blake@tumblr.com> wrote:
> >
> >> Thanks for clarifying things, Jay, that makes sense. It was indeed a
> >> 'bug'. Once I looked at the logs (oh yeah, those things), I found:
> >>
> >> ERROR [20110813-21:50:50.227] consumer.FetcherRunnable: error in
> >> FetcherRunnable
> >> kafka.common.InvalidMessageSizeException: invalid message size:488145
> >> only received bytes:307196 at 0 possible causes (1) a single message
> >> larger than the fetch size; (2) log corruption
> >> INFO [20110813-21:50:50.228] consumer.FetcherRunnable: stopping
> >> fetcher FetchRunnable-0 to host 10.60.26.38
> >> com.tumblr.motherboy.workers.UpdatePostWorker@1d573137: caught
> >> kafka.common.InvalidMessageSizeException
> >>
> >> After increasing the fetch.size the consumer continued on and is now
> >> catching up. For us this is safe since our codec would catch any
> >> corruption in the message, but it brings up a question. What is the
> >> right way to deal with this? Catch it and skip the message? Can you
> >> safely update the offset in zookeeper to force the consumer to move
> >> ahead?
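
A sketch of the two pieces in play here, with property and znode names
that assume the 0.6/0.7 consumer configuration and ZooKeeper layout
(all values are placeholders to verify):

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    public class ConsumerSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181"); // placeholder ZK address
            props.put("groupid", "mygroup");           // placeholder group
            // fetch.size must exceed the largest single message, or the
            // InvalidMessageSizeException quoted above appears
            props.put("fetch.size", "1048576");
            ConsumerConfig config = new ConsumerConfig(props);
        }
    }

To skip past a corrupt message, one approach (again assuming the 0.7
layout) would be to stop the consumer and manually set the offset znode,
/consumers/<group>/offsets/<topic>/<brokerId>-<partitionId>, to the next
valid offset before restarting.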
> >>
> >> Thanks,
> >>
> >> -Blake
> >>
> >> On Sat, Aug 13, 2011 at 10:57 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >> > Hi Blake,
> >> >
> >> > Yes, as you say, num.partitions is the number of partitions per
> >> > node, so 5 topics with 5 partitions each will give 25 topic
> >> > partitions per node, for a total of 50 across the two brokers.
> >> >
> >> > Partitions can be merged for consumption but not split, which means
> >> > across all consumer machines/processes you can have as few as 1 or
> >> > as many as 50 active consumers (if you have more than 50, the extras
> >> > won't get any data). Provided this is the case, and you have an
> >> > active thread consuming off of each of the 5 topics, you should see
> >> > offset updates on each topic appearing in zookeeper for all the
> >> > partitions. Be aware that data is fetched in chunks from each
> >> > partition and those partitions are processed sequentially, so the
> >> > updates will not be continual on all partitions.
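
A short sketch of the stream-per-thread wiring Jay describes (class and
method names follow the 0.6/0.7 Java consumer API and are assumptions
to check against your build; topic and group names are placeholders):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.KafkaMessageStream;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class StreamSetup {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("zk.connect", "localhost:2181"); // placeholder
            props.put("groupid", "mygroup");           // placeholder
            ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // ask for 2 streams on one topic, as in Blake's setup
            Map<String, Integer> topicCount = new HashMap<String, Integer>();
            topicCount.put("mytopic", 2);
            Map<String, List<KafkaMessageStream>> streams =
                connector.createMessageStreams(topicCount);

            // each stream should be drained by a dedicated thread; a
            // fetched chunk is iterated sequentially, which is why offset
            // updates are not continual across partitions
            for (KafkaMessageStream stream : streams.get("mytopic")) {
                // start a consumer thread per stream here
            }
        }
    }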
> >> >
> >> > What you are describing sounds like a bug somewhere. Can you turn
> >> > on debug logging in the consumers and add logging to make sure all
> >> > your threads are really getting data and consuming?
> >> >
> >> > -Jay
> >> >
> >> > On Sat, Aug 13, 2011 at 7:39 PM, Blake Matheny <blake@tumblr.com> wrote:
> >> >
> >> >> Our current setup:
> >> >>
> >> >>  2 brokers, each with num.partitions set to 5
> >> >>  n producers, publishing to 5 topics
> >> >>  5 consumers
> >> >>   All in same consumer group
> >> >>   Each is consuming from all 5 topics
> >> >>   Each is reading from 2 KafkaMessageStreams
> >> >>  Custom Partitioner, provides uniform distribution
> >> >>
> >> >> Having read the recently recommended Kafka paper that describes some
> >> >> of the partitioning semantics, I have a few questions wrt the above
> >> >> setup.
> >> >>
> >> >> First, from the way the ZK info for the brokers reads, it looks
> >> >> like setting num.partitions to 5 on each broker has actually
> >> >> created 10 total partitions, 5 on each broker; is that correct?
> >> >> Second, with 5 topics, 5 partitions, and 2 brokers, does that give
> >> >> you 50 distinct message streams? I understand that a consumer can
> >> >> pull from more than one partition, but assuming you would like to
> >> >> map a single topic/partition to each consumer, would you in the
> >> >> above setup want to run 50 consumers?
> >> >> Lastly, I'm seeing updates to the log files on the second broker
> >> >> (/tmp/kafka-logs/[topic]-[partition-id]/[logfile].kafka is
> >> >> growing), but the corresponding offset znode isn't being updated by
> >> >> the consumer. The same consumer is updating the offset for the same
> >> >> topic, different partition/consumer just fine (which leads me to
> >> >> believe the consumer is working properly). Is there something in
> >> >> the above described config that sounds incorrect? I'm wondering if
> >> >> there is a bug (in my code or elsewhere) when a consumer is reading
> >> >> from two partitions on the same topic across more than one broker.
> >> >> Just guessing though.
> >> >>
> >> >> Thanks in advance,
> >> >>
> >> >> -Blake
> >> >>
> >> >> --
> >> >> Blake Matheny
> >> >>
> >> >
> >>
> >>
> >>
> >> --
> >> Blake Matheny
> >>
> >
>
>
>
> --
> Blake Matheny
>
