kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Mevada, Vatsal" <Mev...@sky.optymyze.com>
Subject Detecting when all the retries are expired for a message
Date Thu, 01 Dec 2016 13:22:43 GMT
Hi,



I am reading a file and dumping each record on Kafka. Here is my producer code:



public void produce(String topicName, String filePath, String bootstrapServers, String encoding)
{

                try (BufferedReader bf = getBufferedReader(filePath, encoding);

                                KafkaProducer<Object, String> producer = initKafkaProducer(bootstrapServers))
{

                                String line;

                                while ((line = bf.readLine()) != null) {

                                                producer.send(new ProducerRecord<>(topicName,
line), (metadata, e) -> {

                                                                if (e != null) {

                                                                                e.printStackTrace();

                                                                }

                                                });

                                }

                                producer.flush();

                } catch (IOException e) {

                                Throwables.propagate(e);

                }

}



private static KafkaProducer<Object, String> initKafkaProducer(String bootstrapServer)
{

                Properties properties = new Properties();

                properties.put("bootstrap.servers", bootstrapServer);

                properties.put("key.serializer", StringSerializer.class.getCanonicalName());

                properties.put("value.serializer", StringSerializer.class.getCanonicalName());

                properties.put("acks", "-1");

                properties.put("retries", 10);

                return new KafkaProducer<>(properties);

}



private BufferedReader getBufferedReader(String filePath, String encoding) throws UnsupportedEncodingException,
FileNotFoundException {

                return new BufferedReader(new InputStreamReader(new FileInputStream(filePath),
Optional.ofNullable(encoding).orElse("UTF-8")));

}



As per the official documentation of Callback<https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/producer/Callback.html>,
TimeoutException is a retriable exception. As I have kept retries 10, producer will try to
resend the message if delivering some message fails with TimeoutException. I am looking for
some reliable to way to detect when delivery of a message is failed permanently after all
retries.



Regards,

Vatsal

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message