kafka-users mailing list archives

From Asaf Mesika <asaf.mes...@gmail.com>
Subject Re: Detecting when all the retries are expired for a message
Date Tue, 06 Dec 2016 10:10:21 GMT
Vatsal:

I don't think the fix for this bug in 0.9.x (retries don't work) was merged
into 0.10.0.1: https://github.com/apache/kafka/pull/1547


On Tue, Dec 6, 2016 at 10:19 AM Mevada, Vatsal <Mevada@sky.optymyze.com>
wrote:

> Hello,
>
> Bumping this thread in case any of you has input on this issue.
>
> Regards,
> Vatsal
>
> -----Original Message-----
> From: Mevada, Vatsal
> Sent: 02 December 2016 16:16
> To: Kafka Users <users@kafka.apache.org>
> Subject: RE: Detecting when all the retries are expired for a message
>
> I executed the same producer code for a single-record file with the
> following config:
>
>         properties.put("bootstrap.servers", bootstrapServer);
>         properties.put("key.serializer", StringSerializer.class.getCanonicalName());
>         properties.put("value.serializer", StringSerializer.class.getCanonicalName());
>         properties.put("acks", "-1");
>         properties.put("retries", 50000);
>         properties.put("request.timeout.ms", 1);
>
> I set request.timeout.ms=1 to make sure that message delivery fails with
> a TimeoutException. Since retries is 50000, the program should take at
> least 50000 ms (50 seconds) to complete for a single record. However, the
> program completes almost instantly, with only one callback carrying a
> TimeoutException. I suspect that the producer is not retrying at all. Or
> am I missing something in my code?
>
> My Kafka version is 0.10.0.1.
>
> Regards,
> Vatsal
> Am I missing any configuration or
> -----Original Message-----
> From: Ismael Juma [mailto:ismaelj@gmail.com]
> Sent: 02 December 2016 13:30
> To: Kafka Users <users@kafka.apache.org>
> Subject: RE: Detecting when all the retries are expired for a message
>
> The callback is called after the retries have been exhausted.
>
> Ismael
>
> On 2 Dec 2016 3:34 am, "Mevada, Vatsal" <Mevada@sky.optymyze.com> wrote:
>
> > @Ismael:
> >
> > I can handle TimeoutException in the callback. However, as per the
> > documentation of Callback (link:
> > https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/producer/Callback.html),
> > TimeoutException is a retriable exception, and it says that it "may be
> > covered by increasing #.retries". So even if I get a TimeoutException in
> > the callback, wouldn't the producer try to send the message again until
> > all the retries are done? Would it be safe to assume that message
> > delivery has failed permanently just by encountering a TimeoutException
> > in the callback?
> >
> > Here is a snippet from the above-mentioned documentation:
> > "exception - The exception thrown during processing of this record.
> > Null if no error occurred. Possible thrown exceptions include:
> >
> > Non-Retriable exceptions (fatal, the message will never be sent):
> >   - InvalidTopicException
> >   - OffsetMetadataTooLargeException
> >   - RecordBatchTooLargeException
> >   - RecordTooLargeException
> >   - UnknownServerException
> >
> > Retriable exceptions (transient, may be covered by increasing #.retries):
> >   - CorruptRecordException
> >   - InvalidMetadataException
> >   - NotEnoughReplicasAfterAppendException
> >   - NotEnoughReplicasException
> >   - OffsetOutOfRangeException
> >   - TimeoutException
> >   - UnknownTopicOrPartitionException"
> >
> > @Asaf: My Kafka API version is 0.10.0.1, so I think I should not
> > face the issue that you are mentioning. I mentioned the documentation
> > link of 0.9 by mistake.
> >
> > Regards,
> > Vatsal
> > -----Original Message-----
> > From: Asaf Mesika [mailto:asaf.mesika@gmail.com]
> > Sent: 02 December 2016 00:32
> > To: Kafka Users <users@kafka.apache.org>
> > Subject: Re: Detecting when all the retries are expired for a message
> >
> > There's a critical bug in that section that has only been fixed in
> > 0.9.0.2, which has not been released yet. Without the fix it doesn't
> > really retry.
> > I forked the kafka repo, applied the fix, built it and placed it in
> > our own Nexus Maven repository until 0.9.0.2 is released.
> >
> > https://github.com/logzio/apache-kafka/commits/0.9.0.1-logzio
> >
> > Feel free to use it.
> >
> > On Thu, Dec 1, 2016 at 4:52 PM Ismael Juma <ismael@juma.me.uk> wrote:
> >
> > > The callback should give you what you are asking for. Has it not
> > > worked as you expect when you tried it?
> > >
> > > Ismael
> > >
> > > On Thu, Dec 1, 2016 at 1:22 PM, Mevada, Vatsal
> > > <Mevada@sky.optymyze.com>
> > > wrote:
> > >
> > > > Hi,
> > > >
> > > >
> > > >
> > > > I am reading a file and writing each record to Kafka. Here is my
> > > > producer code:
> > > >
> > > >
> > > >
> > > > public void produce(String topicName, String filePath, String bootstrapServers, String encoding) {
> > > >     try (BufferedReader bf = getBufferedReader(filePath, encoding);
> > > >          KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers)) {
> > > >         String line;
> > > >         while ((line = bf.readLine()) != null) {
> > > >             producer.send(new ProducerRecord<>(topicName, line), (metadata, e) -> {
> > > >                 if (e != null) {
> > > >                     e.printStackTrace();
> > > >                 }
> > > >             });
> > > >         }
> > > >         producer.flush();
> > > >     } catch (IOException e) {
> > > >         Throwables.propagate(e);
> > > >     }
> > > > }
> > > >
> > > >
> > > >
> > > > private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer) {
> > > >     Properties properties = new Properties();
> > > >     properties.put("bootstrap.servers", bootstrapServer);
> > > >     properties.put("key.serializer", StringSerializer.class.getCanonicalName());
> > > >     properties.put("value.serializer", StringSerializer.class.getCanonicalName());
> > > >     properties.put("acks", "-1");
> > > >     properties.put("retries", 10);
> > > >     return new KafkaProducer<>(properties);
> > > > }
> > > >
> > > >
> > > >
> > > > private BufferedReader getBufferedReader(String filePath, String encoding)
> > > >         throws UnsupportedEncodingException, FileNotFoundException {
> > > >     return new BufferedReader(new InputStreamReader(
> > > >             new FileInputStream(filePath),
> > > >             Optional.ofNullable(encoding).orElse("UTF-8")));
> > > > }
> > > >
> > > >
> > > >
> > > > As per the official documentation of Callback
> > > > <https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
> > > > TimeoutException is a retriable exception. As I have set retries to
> > > > 10, the producer will try to resend the message if delivering a
> > > > message fails with a TimeoutException. I am looking for a reliable
> > > > way to detect when delivery of a message has failed permanently
> > > > after all retries.
> > > >
> > > >
> > > >
> > > > Regards,
> > > >
> > > > Vatsal
> > > >
> > >
> >
>
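
Following Ismael's point in the thread that the callback is called after the retries have been exhausted, one way to detect permanently failed records is to collect every non-null exception the callback receives. The sketch below is a minimal, stdlib-only illustration. `DeliveryFailureTracker` and its methods are hypothetical names, not part of the Kafka client, and `onCompletion` only mirrors the `(metadata, exception)` shape of the callback used in the original code. Whether retries really happen before the callback fires depends on the client version, as discussed in the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper (not a Kafka class): remembers which records the
 * producer gave up on, based on what the send callback reports.
 */
class DeliveryFailureTracker {

    /** One failed record: the payload that was sent and the reported cause. */
    static final class Failure {
        final String record;
        final Exception cause;

        Failure(String record, Exception cause) {
            this.record = record;
            this.cause = cause;
        }
    }

    // Synchronized because callbacks may run on a different thread
    // than the one that calls send().
    private final List<Failure> failures =
            Collections.synchronizedList(new ArrayList<>());

    /** Call from the send callback; a null exception means success. */
    void onCompletion(String record, Exception exception) {
        if (exception != null) {
            failures.add(new Failure(record, exception));
        }
    }

    /** Number of records reported as failed so far. */
    int failedCount() {
        return failures.size();
    }

    /** Snapshot copy of the failures recorded so far. */
    List<Failure> failures() {
        synchronized (failures) {
            return new ArrayList<>(failures);
        }
    }
}
```

Assuming a `tracker` instance created before the read loop, the lambda in `produce` could become `(metadata, e) -> tracker.onCompletion(record, e)`, where `record` is a final copy of `line` (the loop variable is reassigned, so a lambda cannot capture it directly). Once `producer.flush()` has returned, `tracker.failedCount()` should show how many records were given up on.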

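
Vatsal's estimate that 50000 retries with request.timeout.ms=1 should take at least 50 seconds can be written down as a small calculation. The sketch below assumes a deliberately simplified model in which every attempt waits for the full request timeout and attempts run strictly one after another; `RetryTimeEstimate` and `minTotalMillis` are hypothetical names, and the real producer's timing may differ. The third parameter stands for `retry.backoff.ms`, which the thread does not set and which defaults to 100 ms in the producer configuration.

```java
/** Hypothetical helper: rough lower bound on time spent before retries run out. */
class RetryTimeEstimate {

    /**
     * Simplified model: one initial attempt plus `retries` retries, each
     * attempt lasting `requestTimeoutMs`, with `retryBackoffMs` of waiting
     * before every retry.
     */
    static long minTotalMillis(long retries, long requestTimeoutMs, long retryBackoffMs) {
        long attempts = retries + 1; // the first attempt plus each retry
        return attempts * requestTimeoutMs + retries * retryBackoffMs;
    }
}
```

Under this model, `minTotalMillis(50000, 1, 0)` gives 50001 ms, which matches the 50-second figure in the thread, and `minTotalMillis(50000, 1, 100)` gives 5050001 ms once the default backoff is included. Either way, a run that finishes almost instantly with a single callback is consistent with the suspicion that no retries took place.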