kafka-users mailing list archives

From Bhavesh Mistry <mistry.p.bhav...@gmail.com>
Subject Re: message size limit
Date Tue, 09 Sep 2014 19:59:19 GMT
Hi Jun,

Thanks for the clarification.  A follow-up question: does the new producer
solve the issues highlighted here?  With compression and async mode in the
new producer, will it break messages down to stay within this upper limit
before submitting, or does it strictly honor the batch size?  In other
words, if a compressed batch reaches this configured limit, is the batch
broken down into sub-batches that fit within the limit?  I would like to
minimize the data loss due to this limit.
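For illustration, here is a rough sketch of the kind of client-side
splitting I have in mind (a hypothetical helper, not the actual producer
API; note that when compression is on, the broker checks the compressed
size, so sizing on uncompressed bytes is a conservative over-estimate, and
per-message wire overhead is ignored here):

```
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: split serialized messages into sub-batches whose
// cumulative byte size stays under the broker's message.max.bytes, so one
// rejected batch does not drag the whole payload down with it.
public class BatchSplitter {

    public static List<List<byte[]>> split(List<byte[]> messages, int maxBatchBytes) {
        List<List<byte[]>> batches = new ArrayList<>();
        List<byte[]> current = new ArrayList<>();
        int currentBytes = 0;
        for (byte[] msg : messages) {
            // A single message over the limit cannot be saved by splitting;
            // it has to be rejected (or handled) up front.
            if (msg.length > maxBatchBytes) {
                continue; // or surface an error to the caller
            }
            // flush the current sub-batch before it would overflow the limit
            if (currentBytes + msg.length > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(msg);
            currentBytes += msg.length;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }
}
```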


Thanks,

Bhavesh

On Mon, Sep 8, 2014 at 9:17 PM, Jun Rao <junrao@gmail.com> wrote:

> Are you using compression in the producer? If so, message.max.bytes applies
> to the compressed size of a batch of messages. Otherwise, message.max.bytes
> applies to the size of each individual message.
>
> Thanks,
>
> Jun
>
> On Wed, Sep 3, 2014 at 3:25 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
>
> > I am referring to the wiki http://kafka.apache.org/08/configuration.html
> > and, as far as I know, the following parameter controls the max message
> > bytes.  Kafka Community, please correct me if I am wrong; I do not want
> > to create confusion for the Kafka User Community here.  Also, if you
> > increase this limit then you have to set a corresponding limit increase
> > on the consumer side as well (fetch.message.max.bytes).
> >
> > Since we are using batch async mode, our messages sometimes get dropped
> > if the total batch bytes exceed this limit, so I was asking the Kafka
> > developers whether there is an optimal way to determine the batch size
> > based on this limit to minimize the data loss, because the entire batch
> > is rejected by the brokers.
> >
> > message.max.bytes (default 1000000): The maximum size of a message that
> > the server can receive. It is important that this property be in sync
> > with the maximum fetch size your consumers use or else an unruly producer
> > will be able to publish messages too large for consumers to consume.
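As an aside, a minimal consumer-side sketch of what keeping the two
settings in sync looks like (the 2MB value and connection details are
examples only):

```
import java.util.Properties;

// Example only: if message.max.bytes is raised on the broker (in
// server.properties), the consumer's fetch size must be raised to at
// least the same value, or messages the broker accepts can never be
// consumed.
public class ConsumerSizeConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // example address
        props.put("group.id", "example-group");           // example group
        // keep >= the broker's message.max.bytes (2MB here):
        props.put("fetch.message.max.bytes", "2097152");
        return props;
    }
}
```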
> >
> > Thanks,
> >
> > Bhavesh
> >
> >
> > On Wed, Sep 3, 2014 at 2:59 PM, Alexis Midon <
> > alexis.midon@airbedandbreakfast.com> wrote:
> >
> > > Hi Bhavesh,
> > >
> > > Can you explain what limit you're referring to?
> > > I'm asking because `message.max.bytes` is applied per message, not per
> > > batch.
> > > Is there another limit I should be aware of?
> > >
> > > thanks
> > >
> > >
> > > On Wed, Sep 3, 2014 at 2:07 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com> wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > We have a similar problem.  Our messages have variable lengths, so
> > > > with a fixed batch size the batch sometimes exceeds the limit set on
> > > > the brokers (2MB).
> > > >
> > > > Could the producer have some extra logic to determine the optimal
> > > > batch size by looking at the configured message.max.bytes value?
> > > >
> > > > During the metadata update, the producer would get this value from
> > > > the broker for each topic; if the current batch size reaches this
> > > > limit, it would break the batch into smaller chunks such that none
> > > > exceeds the limit (unless a single message exceeds the limit).
> > > > Basically, try to avoid data loss as much as possible.
> > > >
> > > > Please let me know your opinion on this...
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > >
> > > > On Wed, Sep 3, 2014 at 6:21 AM, Alexis Midon <
> > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > >
> > > > > Thanks Jun.
> > > > >
> > > > > I'll create a jira and try to provide a patch. I think this is
> > > > > pretty serious.
> > > > >
> > > > > On Friday, August 29, 2014, Jun Rao <junrao@gmail.com> wrote:
> > > > >
> > > > > > The goal of batching is mostly to reduce the # RPC calls to the
> > > > > > broker. If compression is enabled, a larger batch typically
> > > > > > implies a better compression ratio.
> > > > > >
> > > > > > The reason that we have to fail the whole batch is that the error
> > > > > > code in the produce response is per partition, instead of per
> > > > > > message.
> > > > > >
> > > > > > Retrying individual messages on MessageSizeTooLarge seems
> > > > > > reasonable.
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
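A minimal sketch of that per-message retry idea (the transport calls here
are hypothetical placeholders, not the actual producer API):

```
import java.util.List;

// Sketch: if the broker rejects a whole batch because one message is too
// large, fall back to sending the messages one at a time so that only the
// oversized message(s) are lost instead of the entire batch.
public class FallbackSender {

    static class MessageSizeTooLargeException extends Exception {}

    interface Transport { // hypothetical stand-in for the real producer
        void sendBatch(List<byte[]> batch) throws MessageSizeTooLargeException;
        void sendOne(byte[] message) throws MessageSizeTooLargeException;
    }

    static void sendWithFallback(Transport transport, List<byte[]> batch) {
        try {
            transport.sendBatch(batch);
        } catch (MessageSizeTooLargeException wholeBatchRejected) {
            for (byte[] msg : batch) {
                try {
                    transport.sendOne(msg);
                } catch (MessageSizeTooLargeException oversized) {
                    // only this single message is dropped; log it here
                }
            }
        }
    }
}
```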
> > > > > >
> > > > > >
> > > > > > On Fri, Aug 29, 2014 at 4:28 PM, Alexis Midon <
> > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > >
> > > > > > > Could you explain the goals of batches? I was assuming this was
> > > > > > > simply a performance optimization, but this behavior makes me
> > > > > > > think I'm missing something.
> > > > > > > Is a batch more than a list of *independent* messages?
> > > > > > >
> > > > > > > Why would you reject the whole batch? One invalid message causes
> > > > > > > the loss of batch.num.messages-1 messages :(
> > > > > > > It seems pretty critical to me.
> > > > > > >
> > > > > > > If ack=0, the producer will never know about it.
> > > > > > > If ack!=0, the producer will retry the whole batch. If the issue
> > > > > > > was related to data corruption (etc.), retries might work. But in
> > > > > > > the case of a "big message", the batch will always be rejected
> > > > > > > and the producer will give up.
> > > > > > >
> > > > > > > If the messages are indeed considered independent, I think this
> > > > > > > is a pretty serious issue.
> > > > > > >
> > > > > > > I see 2 possible fix approaches:
> > > > > > > - the broker could reject only the invalid messages
> > > > > > > - the broker could reject the whole batch (like today) but the
> > > > > > > producer (if ack!=0) could retry messages one at a time on an
> > > > > > > exception like "MessageSizeTooLarge".
> > > > > > >
> > > > > > > opinions?
> > > > > > >
> > > > > > > Alexis
> > > > > > >
> > > > > > > ```
> > > > > > > [2014-08-29 16:00:35,170] WARN Produce request with correlation id 46
> > > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,284] WARN Produce request with correlation id 51
> > > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,392] WARN Produce request with correlation id 56
> > > > > > > failed due to [test,0]: kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,499] WARN Produce request with correlation id 61
> > > > > > > failed due to [test,1]: kafka.common.MessageSizeTooLargeException
> > > > > > > (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,603] ERROR Failed to send requests for topics test
> > > > > > > with correlation ids in [43,62] (kafka.producer.async.DefaultEventHandler)
> > > > > > > [2014-08-29 16:00:35,603] ERROR Error in handling batch of 3 events
> > > > > > > (kafka.producer.async.ProducerSendThread)
> > > > > > > kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
> > > > > > >   at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> > > > > > >   at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
> > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
> > > > > > >   at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
> > > > > > > ```
> > > > > > >
> > > > > > >
> > > > > > > On Thu, Aug 28, 2014 at 7:13 AM, Jun Rao <junrao@gmail.com> wrote:
> > > > > > >
> > > > > > > > That's right. If one message in a batch exceeds the size
> > > > > > > > limit, the whole batch is rejected.
> > > > > > > >
> > > > > > > > When determining message.max.bytes, the most important thing
> > > > > > > > to consider is probably memory, since currently we need to
> > > > > > > > allocate memory for a full message in the broker and in the
> > > > > > > > producer and consumer clients.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
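A back-of-the-envelope sketch of that memory point (the per-partition
allocation assumption is mine, not from this thread; the values are
examples):

```
// Rough estimate: the consumer may allocate up to fetch.message.max.bytes
// per partition it fetches from, so worst-case fetch memory grows with
// the partition count when the limits are raised.
public class FetchMemoryEstimate {
    public static void main(String[] args) {
        int partitions = 50;                  // example partition count
        long fetchMessageMaxBytes = 2L << 20; // 2MB, matching message.max.bytes
        long worstCase = partitions * fetchMessageMaxBytes;
        System.out.printf("worst-case consumer fetch memory: ~%d MB%n",
                worstCase >> 20);             // ~100 MB in this example
    }
}
```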
> > > > > > > >
> > > > > > > >
> > > > > > > > On Wed, Aug 27, 2014 at 9:52 PM, Alexis Midon <
> > > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > > >
> > > > > > > > > Am I misreading this loop:
> > > > > > > > >
> > > > > > > > > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/log/Log.scala#L265-L269
> > > > > > > > >
> > > > > > > > > It seems like all messages from `validMessages` (which is a
> > > > > > > > > ByteBufferMessageSet) are NOT appended if one of the message
> > > > > > > > > sizes exceeds the limit.
> > > > > > > > >
> > > > > > > > > I hope I'm missing something.
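In other words (a sketch paraphrasing that reading of the loop, not the
actual Scala code from Log.scala):

```
import java.util.List;

// Paraphrase of the linked check: every message in the set is validated
// before anything is appended, so a single oversized message prevents
// the entire set from being written.
public class AppendCheckSketch {
    public static void append(List<byte[]> validMessages, int maxMessageSize) {
        for (byte[] message : validMessages) {
            if (message.length > maxMessageSize) {
                // throwing here aborts the append of the whole set
                throw new IllegalArgumentException("message of "
                        + message.length + " bytes exceeds " + maxMessageSize);
            }
        }
        // reached only if all messages passed the size check;
        // the broker would append the full set here
    }
}
```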
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Aug 27, 2014 at 9:38 PM, Alexis Midon <
> > > > > > > > > alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Jun,
> > > > > > > > > >
> > > > > > > > > > thanks for your answer.
> > > > > > > > > > Unfortunately the size won't help much; I'd like to see
> > > > > > > > > > the actual message data.
> > > > > > > > > >
> > > > > > > > > > By the way, what are the things to consider when deciding
> > > > > > > > > > on the `message.max.bytes` value?
> > > > > > > > > >
> > > > > > > > > > On Wed, Aug 27, 2014 at 9:06 PM, Jun Rao <junrao@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > >> The message size check is currently only done on the
> > > > > > > > > >> broker. If you enable trace-level logging in
> > > > > > > > > >> RequestChannel, you will see the produce request, which
> > > > > > > > > >> includes the size of each partition.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >>
> > > > > > > > > >> Jun
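For reference, the stock config/log4j.properties that ships with the broker
has a request-log appender along these lines; worth double-checking against
your version (note the trailing $ in the logger name, which comes from the
Scala singleton object):

```
log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.File=kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

# raise this to TRACE to log each produce request, including sizes:
log4j.logger.kafka.network.RequestChannel$=TRACE, requestAppender
log4j.additivity.kafka.network.RequestChannel$=false
```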
> > > > > > > > > >>
> > > > > > > > > >>
> > > > > > > > > >> On Wed, Aug 27, 2014 at 4:40 PM, Alexis Midon <
> > > > > > > > > >> alexis.midon@airbedandbreakfast.com> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hello,
> > > > > > > > > >> >
> > > > > > > > > >> > my brokers are reporting that some received messages
> > > > > > > > > >> > exceed the `message.max.bytes` value. I'd like to know
> > > > > > > > > >> > which producers are at fault, but it is pretty much
> > > > > > > > > >> > impossible:
> > > > > > > > > >> > - the brokers don't log the content of the rejected
> > > > > > > > > >> > messages
> > > > > > > > > >> > - the log messages do not contain the IP of the
> > > > > > > > > >> > producers
> > > > > > > > > >> > - on the consumer side, no exception is thrown (afaik
> > > > > > > > > >> > it is because Ack-0 is used). The only kind of
> > > > > > > > > >> > notification is to close the connection.
> > > > > > > > > >> >
> > > > > > > > > >> > [1] Do you have any suggestions to track down the
> > > > > > > > > >> > guilty producers or find out the message content?
> > > > > > > > > >> >
> > > > > > > > > >> > Even though it makes total sense to have the limit
> > > > > > > > > >> > defined and applied on the brokers, I was thinking that
> > > > > > > > > >> > this check could also be applied by the producers. Some
> > > > > > > > > >> > Google results suggest that `message.max.bytes` might
> > > > > > > > > >> > be used by the producers, but I can't find any trace of
> > > > > > > > > >> > that behavior in the code.
> > > > > > > > > >> >
> > > > > > > > > >> > The closest thing I have is
> > > > > > > > > >> > https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/producer/SyncProducer.scala#L67
> > > > > > > > > >> > but it simply logs the message size and content, and
> > > > > > > > > >> > the log level is trace.
> > > > > > > > > >> >
> > > > > > > > > >> > [2] could you please confirm whether such a
> > > > > > > > > >> > producer-side check exists?
> > > > > > > > > >> >
> > > > > > > > > >> > thanks!
> > > > > > > > > >> >
> > > > > > > > > >> > Alexis
> > > > > > > > > >> >
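A hypothetical producer-side guard for [2], since no such check appears to
exist in the 0.8 producer (the class and logging are illustrative only;
with compression enabled an exact pre-send check is not possible, because
the broker compares the compressed size):

```
// Illustrative only: validate serialized size against the broker's
// message.max.bytes before handing the payload to the producer, so
// oversized messages are caught where the owning application can log
// topic, host, and payload details that the broker-side rejection lacks.
public class SizeGuard {
    private final int maxMessageBytes; // mirror the broker's message.max.bytes

    public SizeGuard(int maxMessageBytes) {
        this.maxMessageBytes = maxMessageBytes;
    }

    public boolean accept(byte[] payload) {
        if (payload.length > maxMessageBytes) {
            System.err.printf("dropping %d-byte message (limit %d)%n",
                    payload.length, maxMessageBytes);
            return false;
        }
        return true;
    }
}
```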
