kafka-users mailing list archives

From Abhinav Solan <abhinav.so...@gmail.com>
Subject Kafka Consumer error handling
Date Wed, 18 May 2016 01:28:15 GMT
Hi Everyone,

I wanted to know what the best and most secure way of error handling is for
the KafkaConsumer. I am using Confluent's recommended consumer implementation.
My delivery semantics are at-least-once, and I am switching off auto commit
as well. Or should I just switch auto commit on?
The thing is that I am processing all the messages asynchronously, so I only
want to commit offsets once the processing comes back.
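For context, this is roughly how the consumer itself is configured; the
bootstrap servers and group id below are placeholders rather than my real
values:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");   // placeholder
props.put("group.id", "a2f-consumer-group");        // placeholder
props.put("enable.auto.commit", "false");           // commit manually after processing
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);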

Here is the KafkaConsumerRunner -

private void doCommitSync() {
    try {
        consumer.commitSync();
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so that the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed with an unrecoverable error. if there is any
        // internal state which depended on the commit, you can clean it
        // up here. otherwise it's reasonable to ignore the error and go on
        LOGGER.error("Commit failed", e);
    }
}

/**
 * Main consumer loop; runs on its own thread.
 */
@Override
public void run() {
    application.getCurrentSession();
    CountDownLatch batchCompleteLatch;
    try {
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                doCommitSync();
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
        });

        while (true) {
            // a count down latch blocks the loop until the current batch finishes processing
            batchCompleteLatch = new CountDownLatch(1);
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);

            LOGGER.info("Processing - " + records.count() + " records");

            Observable.<String>create(subscriber ->
                    records.forEach(consumerRecord -> subscriber.onNext(consumerRecord.value())))
                    .map(json -> JsonSerializer.deserialize(json, A2FKafkaMessage.class))
                    .groupBy(A2FKafkaMessage::getOrgId)
                    .flatMap(grp -> messageProcessor.process(grp.map(a2FKafkaMessage -> {
                        AssetDataPoint assetDataPoint = (AssetDataPoint) a2FKafkaMessage.getMessage();
                        assetDataPoint.setOrgId(a2FKafkaMessage.getOrgId());
                        return assetDataPoint;
                    })))
                    .subscribe(
                            assetDataPoint -> {}, // nothing to do per record
                            throwable -> {
                                // on error: log and release the latch, but don't commit
                                LOGGER.error(throwable.toString(), throwable);
                                batchCompleteLatch.countDown();
                            },
                            () -> {
                                // batch completed successfully: commit and release the latch
                                consumer.commitAsync();
                                batchCompleteLatch.countDown();
                            }
                    );
            batchCompleteLatch.await();
        }
    } catch (WakeupException e) {
        // ignore we're closing
    } catch (Exception e) {
        LOGGER.error(e.toString(), e);
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }

}
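In case it helps clarify what I mean by committing only once processing comes
back, this is the kind of explicit per-partition commit I was considering
instead of the plain commitAsync() above. It is just a sketch, not what I
currently run, and it assumes the usual Map/HashMap, TopicPartition and
OffsetAndMetadata imports, with "records" being the batch returned by poll:

// sketch: after the batch latch is released successfully, commit the exact
// offsets of the records processed in this poll (next offset to consume)
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
    offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
}
consumer.commitAsync(offsets, (committed, exception) -> {
    if (exception != null) {
        LOGGER.error("Commit of " + committed + " failed", exception);
    }
});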

Do let me know if you have any thoughts or improvements on this.

Thanks,
Abhinav
