kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Handling errors in the new (0.8.2) Java Client's Producer
Date Fri, 19 Sep 2014 19:31:42 GMT
Hello Andrew,

I think you would want a sync producer for your use case? You can try to
call get() on the returned metadata future of the send() call instead of
using a callback; the pattern is something like:

for (message in messages)
    producer.send(message).get()

The get() call will block until the response for that message has been
received, and will throw an exception if the send failed. You can then catch
the exception and pause the producer.
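
To make the pattern above concrete, here is a minimal sketch in plain Java. It
does not use the Kafka client itself: the `send` function is a hypothetical
stand-in for producer.send(record), assumed only to return a
java.util.concurrent.Future the way the new producer does. The class and method
names (SyncSendSketch, sendUntilFailure) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Function;

public class SyncSendSketch {

    /**
     * Sends messages one at a time, blocking on each returned future, and
     * stops at the first failure. `send` is a hypothetical stand-in for
     * producer.send(record); it is not part of the Kafka API.
     * Returns the number of messages acknowledged before stopping.
     */
    public static <M> int sendUntilFailure(List<M> messages,
                                           Function<M, Future<?>> send) {
        int acked = 0;
        for (M message : messages) {
            try {
                send.apply(message).get(); // blocks until the response arrives
                acked++;
            } catch (ExecutionException e) {
                // The send failed: stop here instead of queueing more data.
                System.err.println("Send failed, pausing: " + e.getCause());
                break;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
        return acked;
    }

    public static void main(String[] args) {
        // Fake sender (assumption): fails for the message "bad".
        Function<String, Future<?>> fakeSend = m -> {
            CompletableFuture<String> f = new CompletableFuture<>();
            if (m.equals("bad")) {
                f.completeExceptionally(new RuntimeException("broker unreachable"));
            } else {
                f.complete(m);
            }
            return f;
        };
        int acked = sendUntilFailure(Arrays.asList("a", "b", "bad", "c"), fakeSend);
        System.out.println(acked); // prints 2
    }
}
```

Because each get() returns before the next send() is issued, at most one
message is ever in flight, so nothing is left queued in the producer when you
decide to stop. The cost is throughput, since there is no batching across
messages.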

Guozhang

On Thu, Sep 18, 2014 at 6:30 PM, Andrew Stein <
andrew.stein@quantumretail.com> wrote:

> I am trying to understand the best practices for working with the new
> (0.8.2) Producer interface.
>
> We have a process in a large server that writes a lot of data to Kafka.
> However, this data is not mission critical. When a problem arises writing
> to Kafka, most specifically network issues, but also full Producer buffers,
> we want the server to continue working, but to stop sending data to Kafka,
> allowing other tasks to continue. The issue I have is handling messages
> that have been "sent" to the producer but are waiting to go to Kafka. These
> messages remain long after my processing is over, timing out, writing to
> the logs, and preventing me from moving forward. I am looking for some way
> to tell the client to stop forwarding messages to Kafka.
>
> This is what I have so far:
>
>     class ErrorCallback implements Callback {
>         @Override
>         public void onCompletion(RecordMetadata metadata, Exception exception) {
>             if (exception == null) { // The message was sent.
>                 return;
>             }
>
>             stopProducerSendAndClose();
>             String threadName = Thread.currentThread().getName();
>             if (!threadName.equals("kafka-producer-network-thread")) {
>                 // Some of the callbacks happen on my thread.
>             } else {
>                 // We are in KafkaProducer's ioThread ==> commit suicide.
>                 Thread.currentThread().interrupt();
>                 // Cannot throw an Exception as it will just be caught and logged.
>                 throw new ThreadDeath();
>             }
>         }
>     }
>
> My question is: is this the correct approach, or is there some other way to
> stop sending messages (short of going "sync"ed)?
>
> Andrew Stein
>



