beam-commits mailing list archives

From "Alban Perillat-Merceroz (JIRA)"
Subject [jira] [Created] (BEAM-990) KafkaIO does not commit offsets to Kafka
Date Wed, 16 Nov 2016 09:58:58 GMT
Alban Perillat-Merceroz created BEAM-990:

             Summary: KafkaIO does not commit offsets to Kafka
                 Key: BEAM-990
             Project: Beam
          Issue Type: Bug
            Reporter: Alban Perillat-Merceroz

I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the
{{__consumer_offsets}} topic).

I'm configuring the Kafka reader with the following consumer properties:

              ConsumerConfig.GROUP_ID_CONFIG, "my-group",
              ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
              ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value (5000 ms) either
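
For reference, the three {{ConsumerConfig}} constants above resolve to the standard Kafka consumer property names {{group.id}}, {{enable.auto.commit}} and {{auto.commit.interval.ms}}. The sketch below builds the same settings as a plain {{java.util.Map}} using only the JDK, so it runs without Kafka on the classpath. The class {{ConsumerPropsSketch}} is hypothetical, and whether KafkaIO actually acts on these properties is exactly the question this issue raises.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class ConsumerPropsSketch {
    // Hypothetical helper: the report's settings keyed by the literal property
    // names behind ConsumerConfig.GROUP_ID_CONFIG, ENABLE_AUTO_COMMIT_CONFIG
    // and AUTO_COMMIT_INTERVAL_MS_CONFIG.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("group.id", "my-group");          // consumer group whose offsets go to __consumer_offsets
        props.put("enable.auto.commit", Boolean.TRUE);
        props.put("auto.commit.interval.ms", "10"); // Kafka's default is 5000 ms
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```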

However, the offsets are not stored in Kafka: nothing appears in {{__consumer_offsets}}, and the next job restarts from the latest offset.
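
The restart behavior follows from how a Kafka consumer chooses its starting position: it resumes from the group's committed offset for a partition if one exists, and otherwise falls back to the {{auto.offset.reset}} policy, whose default is {{latest}}. A minimal sketch of that rule, assuming a hypothetical {{resolveStartOffset}} helper and plain longs standing in for a partition's earliest and latest offsets:

```java
import java.util.OptionalLong;

class StartOffsetSketch {
    // Hypothetical helper mirroring the consumer's rule: use the group's
    // committed offset if present, otherwise apply auto.offset.reset.
    static long resolveStartOffset(OptionalLong committed, String autoOffsetReset,
                                   long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // committed offset = next record to read
        }
        switch (autoOffsetReset) {
            case "earliest":
                return earliest;
            case "latest": // Kafka's default policy
                return latest;
            default:
                // Simplified stand-in for "none"; the real consumer throws
                // NoOffsetForPartitionException in this case.
                throw new IllegalStateException("no committed offset and no reset policy");
        }
    }

    public static void main(String[] args) {
        // Nothing was ever committed for the group, so the default policy wins.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "latest", 0L, 1200L));
        // With a committed offset, the next job would resume where the last one stopped.
        System.out.println(resolveStartOffset(OptionalLong.of(950L), "latest", 0L, 1200L));
    }
}
```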

I can't find where in the code the offsets are supposed to be committed.

I tried adding a manual commit to the {{consumerPollLoop()}} method, and that works: the offsets are committed.

private void consumerPollLoop() {
    // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
    while (!closed.get()) {
        try {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
            if (!records.isEmpty() && !closed.get()) {
                availableRecordsQueue.put(records); // blocks until dequeued.
                // Manual commit (commitSync() is assumed here as the commit call)
                consumer.commitSync();
            }
        } catch (InterruptedException e) {
            LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
            break;
        } catch (WakeupException e) {
            break; // consumer.wakeup() was called to stop the loop
        }
    }

    LOG.info("{}: Returning from consumer pool loop", this);
}
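
To show the pattern the workaround relies on in isolation, here is a self-contained sketch in which a hypothetical in-memory {{FakeConsumer}} stands in for {{KafkaConsumer}} on a single partition: each {{poll()}} returns the next batch of offsets, and {{commitSync()}} records the position reached by the last poll, which is what Kafka's no-argument {{commitSync()}} commits. An {{ArrayBlockingQueue}} is assumed in place of {{availableRecordsQueue}}. Note that, as in the patched loop, a batch is committed as soon as it is enqueued, before anything downstream has processed it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class PollLoopSketch {
    // Hypothetical stand-in for KafkaConsumer reading one partition.
    static class FakeConsumer {
        private final long endOffset;
        private final int maxBatch;
        private long position = 0;   // next offset poll() will return
        private long committed = -1; // -1 means nothing committed yet

        FakeConsumer(long endOffset, int maxBatch) {
            this.endOffset = endOffset;
            this.maxBatch = maxBatch;
        }

        // Returns the offsets of the next batch (empty once the partition is drained).
        List<Long> poll() {
            List<Long> batch = new ArrayList<>();
            while (position < endOffset && batch.size() < maxBatch) {
                batch.add(position++);
            }
            return batch;
        }

        // Like KafkaConsumer.commitSync(): commit the position reached by the last poll().
        void commitSync() {
            committed = position;
        }

        long committed() {
            return committed;
        }
    }

    // Mirrors the patched loop: poll, enqueue non-empty batches, then commit.
    // Returns the committed offset after each batch.
    static List<Long> runLoop(FakeConsumer consumer, BlockingQueue<List<Long>> queue) {
        List<Long> commitHistory = new ArrayList<>();
        while (true) {
            List<Long> records = consumer.poll();
            if (records.isEmpty()) {
                break; // stands in for the closed flag / WakeupException exit
            }
            try {
                queue.put(records); // blocks if the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop
                break;
            }
            consumer.commitSync(); // manual commit right after enqueueing
            commitHistory.add(consumer.committed());
        }
        return commitHistory;
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer(10, 4);
        // Capacity large enough that put() never blocks in this single-threaded demo.
        BlockingQueue<List<Long>> queue = new ArrayBlockingQueue<>(16);
        System.out.println(runLoop(consumer, queue)); // [4, 8, 10]
    }
}
```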

Is this a bug in KafkaIO or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO in Dataflow, through the backport in the Dataflow SDK, but I'm confident the code is the same in this respect.

This message was sent by Atlassian JIRA
