kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jason Rosenberg <...@squareup.com>
Subject Re: Producer.send questions
Date Mon, 26 Aug 2013 07:20:54 GMT
Ok,

I added a comment to: https://issues.apache.org/jira/browse/KAFKA-998

I added suggestion about exposing recoverability back to the caller.  I'm
not 100% sure it's the same issue (since I'm concerned about the client
api), and the Jira seems concerned about returning quickly from the
internal retry if it's not retryable.  Let me know if it makes more sense
to create a separate Jira instead.

It seems that ticket already had info about automatically retrying with a
smaller batch size in response to a MessageToLargeException.  Let me know
if that warrants it's own Jira (can create that too).

Jason


On Sun, Aug 25, 2013 at 10:01 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> Actually we do have a JIRA tracking this issue:
>
> https://issues.apache.org/jira/browse/KAFKA-998
>
> And BTW, any review comments are welcome :)
>
> Guozhang
>
>
> On Sat, Aug 24, 2013 at 8:25 PM, Neha Narkhede <neha.narkhede@gmail.com
> >wrote:
>
> > >> Ok, but perhaps the producer will handle something like this in the
> > future?
> >
> > Yes, I think we need a JIRA for this.
> >
> > >> UnretryableFailedToSendMessageException (wraps the root cause)
> > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from the
> > final attempt)
> >
> > Something like this makes sense. Would you mind creating a JIRA for this
> so
> > we can
> > discuss a solution there ?
> >
> > Thanks,
> > Neha
> >
> >
> > On Sat, Aug 24, 2013 at 10:41 AM, Jason Rosenberg <jbr@squareup.com>
> > wrote:
> >
> > > Thanks Neha,
> > >
> > > On Sat, Aug 24, 2013 at 10:06 AM, Neha Narkhede <
> neha.narkhede@gmail.com
> > > >wrote:
> > > >
> > > > >> I gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException
> > > > can be rectified with a smaller batch size.
> > > >  If so, does that imply that the message size limit is measured on
> the
> > > > broker by the cumulative size of the batch, and not of any one
> message?
> > > >
> > > > That's right. The broker does the message size check on the
> compressed
> > > > message. The size of the compressed message
> > > > is proportional to the batch size. Hence, reducing the batch size on
> a
> > > > retry might make sense here, but currently the
> > > > producer doesn't do this.
> > > >
> > >
> > > Ok, but perhaps the producer will handle something like this in the
> > future?
> > >
> > > >
> > > > >> If I want to implement guaranteed delivery semantics, using the
> new
> > > > request.required.acks
> > > > configuration, I need to expose retry logic beyond
> > > > that built into the producer?
> > > >
> > > > The kafka producer must handle recoverable exceptions with a
> > configurable
> > > > number of retries and must not retry
> > > > on unrecoverable exceptions. So ideally you shouldn't have to write
> > your
> > > > own batching and retry logic.
> > > >
> > >
> > > So, it seems there might be a bit of a gray area.  There is a
> > configurable
> > > retry count, which we can increase perhaps to gain confidence that
> > anything
> > > recoverable has been sent.  But, since this retry count is finite,
> > there's
> > > no way to know for sure that it won't succeed if it were retried just
> one
> > > more time.  So, it is then difficult to conclude that  if Producer.send
> > > throws a FailedToSendMessageException, the message shouldn't be
> retried.
> > >
> > > Perhaps it would be useful to define different exception types, so
> that a
> > > caller can have clearer semantics:
> > >
> > > UnretryableFailedToSendMessageException (wraps the root cause)
> > > NoMoreRetriesFailedToSendMessageException (wraps the root cause, from
> the
> > > final attempt)
> > >
> > > Probably shorter names are possible here!  Perhaps these could be
> > > subclasses of FailedToSendMessageException.  Alternately,
> > > FailedToSendMessageException could include information, such as the
> > number
> > > of retries attempted, and a flag indicating whether it's possible to
> > retry
> > > the message.
> > >
> > > Jason
> > >
> > >
> > > >
> > > >
> > > >
> > > > On Sat, Aug 24, 2013 at 9:54 AM, Jason Rosenberg <jbr@squareup.com>
> > > wrote:
> > > >
> > > > > Jun,
> > > > >
> > > > > Thanks, this is helpful.
> > > > >
> > > > > So, can QueueFullException occur in either sync or async mode (or
> > just
> > > > > async mode)?
> > > > >
> > > > > If there's a MessageSizeTooLargeException, is there any visibility
> of
> > > > this
> > > > > to the caller?  Or will it just be a FailedToSendMessageException.
>  I
> > > > > gathered from one of your previous responses, that a
> > > > > MessageSizeTooLargeException can be rectified with a smaller batch
> > > size.
> > > > >  If so, does that imply that the message size limit is measured on
> > the
> > > > > broker by the cumulative size of the batch, and not of any one
> > message?
> > > > >  (makes sense if the broker doesn't unwrap a batch of messages
> before
> > > > > storing on the server).
> > > > >
> > > > > If I want to implement guaranteed delivery semantics, using the new
> > > > > request.required.acks configuration, I need to expose retry logic
> > > beyond
> > > > > that built into the producer?  And to do this, I need to indicate
> to
> > > the
> > > > > caller whether it's possible to retry, or whether it will be
> > fruitless.
> > > >  I
> > > > > suppose allowing message.max.send.retries to allow infinite retries
> > > (e.g.
> > > > > by setting it to -1) might be useful.  But optionally, I'd like the
> > > > caller
> > > > > to be able to handle this retry logic itself.
> > > > >
> > > > > Jason
> > > > >
> > > > >
> > > > > On Sat, Aug 24, 2013 at 8:22 AM, Jun Rao <junrao@gmail.com>
wrote:
> > > > >
> > > > > > You don't need to restart the producer. The producer currently
> > > handles
> > > > > all
> > > > > > error/exceptions by refreshing the metadata and retrying. If
it
> > fails
> > > > all
> > > > > > retries, it throws a FailedToSendMessageException to the caller
> (in
> > > > sync
> > > > > > mode). The original cause is not included in this exception.
We
> > have
> > > > > > thought about being a bit smarter in the producer retry logic
> such
> > > that
> > > > > it
> > > > > > only retries on recoverable errors and could implement this
at
> some
> > > > > point.
> > > > > > Other than FailedToSendMessageException, the producer can also
> > throw
> > > > > > QueueFullException.
> > > > > > This is an indication that the producer is sending data at a
rate
> > > > faster
> > > > > > than the broker can handle. This may or may not be recoverable
> > since
> > > it
> > > > > > depends on the load.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > >
> > > > > > On Sat, Aug 24, 2013 at 1:44 AM, Jason Rosenberg <
> jbr@squareup.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Jun,
> > > > > > >
> > > > > > > There are several others I've seen that I would have thought
> > would
> > > be
> > > > > > > retryable (possibly after an exponential backoff delay).
 I'm
> > > curious
> > > > > > > about:
> > > > > > >
> > > > > > > BrokerNotAvailableException
> > > > > > > FailedToSendMessageException
> > > > > > > QueueFullException (happens if producerType is 'async')
> > > > > > > KafkaException (this seems to wrap lots of base conditions,
> does
> > > one
> > > > > have
> > > > > > > to sort through the different wrapped exception types?)
> > > > > > > LeaderNotAvailableException
> > > > > > > MessageSizeTooLargeException (does a batch of messages
get
> > treated
> > > > as a
> > > > > > > single message, when checking for message size too large?)
> > > > > > > ReplicaNotAvailableException
> > > > > > > UnavailableProducerException
> > > > > > > UnknownException
> > > > > > >
> > > > > > > Also, what about my first question, regarding whether it
makes
> > > sense
> > > > to
> > > > > > > refresh a producer by closing it and restarting it after
a
> > failure?
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jason
> > > > > > >
> > > > > > >
> > > > > > > On Fri, Aug 23, 2013 at 9:07 PM, Jun Rao <junrao@gmail.com>
> > wrote:
> > > > > > >
> > > > > > > > For the most part, only SocketExceptions and
> > > > > > > NotLeaderForPartitionException
> > > > > > > > are recoverable. MessageSizeTooLargeException may
be
> > recoverable
> > > > > with a
> > > > > > > > smaller batch size.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jun
> > > > > > > >
> > > > > > > >
> > > > > > > > On Fri, Aug 23, 2013 at 4:09 PM, Jason Rosenberg <
> > > jbr@squareup.com
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > > I'm using the kafka.javaapi.producer.Producer
class from a
> > java
> > > > > > client.
> > > > > > > > >  I'm wondering if it ever makes sense to refresh
a producer
> > by
> > > > > > stopping
> > > > > > > > it
> > > > > > > > > and creating a new one, for example in response
to a
> > downstream
> > > > IO
> > > > > > > error
> > > > > > > > > (e.g. a broker got restarted, or a stale socket,
etc.).
> > > > > > > > >
> > > > > > > > > Or should it always be safe to rely on the producer's
> > > > > implementation
> > > > > > to
> > > > > > > > > manage it's pool of BlockingChannel connections,
etc.
> > > > > > > > >
> > > > > > > > > I'm also interested in trying to understand which
> exceptions
> > > > > > indicate a
> > > > > > > > > failed send() request might be retryable (basically
> anything
> > > that
> > > > > > > doesn't
> > > > > > > > > involve a data-dependent problem, like a malformed
message,
> > or
> > > a
> > > > > > > message
> > > > > > > > > too large, etc.).
> > > > > > > > >
> > > > > > > > > Unfortunately, the range of Exceptions that can
be thrown
> by
> > > the
> > > > > > > various
> > > > > > > > > javaapi methods is not yet well documented. 
It would be
> nice
> > > to
> > > > > have
> > > > > > > > some
> > > > > > > > > notion of whether an exception is the result
of a data
> error,
> > > or
> > > > a
> > > > > > > > > transient downstream connection error, etc.
> > > > > > > > >
> > > > > > > > > Jason
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > > So, can QueueFullException occur in either sync or async mode (or
> just
> > > > async mode)?
> > > >
> > > > If there's a MessageSizeTooLargeException, is there any visibility of
> > > this
> > > > to the caller?  Or will it just be a FailedToSendMessageException.  I
> > > > gathered from one of your previous responses, that a
> > > > MessageSizeTooLargeException can be rectified with a smaller batch
> > size.
> > > >  If so, does that imply that the message size limit is measured on
> the
> > > > broker by the cumulative size of the batch, and not of any one
> message?
> > > >  (makes sense if the broker doesn't unwrap a batch of messages before
> > > > storing on the server).
> > > >
> > > > If I want to implement guaranteed delivery semantics, using the new
> > > > request.required.acks configuration, I need to expose retry logic
> > beyond
> > > > that built into the producer?
> > > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message