From "Alban Perillat-Merceroz (JIRA)" <j...@apache.org>
Subject [jira] [Created] (BEAM-990) KafkaIO does not commit offsets to Kafka
Date Wed, 16 Nov 2016 09:58:58 GMT
Alban Perillat-Merceroz created BEAM-990:
--------------------------------------------

             Summary: KafkaIO does not commit offsets to Kafka
                 Key: BEAM-990
                 URL: https://issues.apache.org/jira/browse/BEAM-990
             Project: Beam
          Issue Type: Bug
            Reporter: Alban Perillat-Merceroz


I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the
{{__consumer_offsets}} topic).

I'm configuring the Kafka reader with 
{code:java}
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with the default value either (5000 ms)
))
{code}
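
For context, the source is wired up roughly as follows (a sketch only: the bootstrap servers and topic name are placeholders, and method names may differ slightly between the Dataflow contrib version and Beam's KafkaIO):

{code:java}
// Sketch with placeholder broker/topic names; reads raw byte[] keys and values.
PCollection<KV<byte[], byte[]>> records = pipeline.apply(
    KafkaIO.read()
        .withBootstrapServers("broker-1:9092,broker-2:9092") // placeholder
        .withTopics(ImmutableList.of("my-topic"))            // placeholder
        .updateConsumerProperties(ImmutableMap.of(
            ConsumerConfig.GROUP_ID_CONFIG, "my-group",
            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE))
        .withoutMetadata());
{code}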

But the offsets are not stored in Kafka (nothing appears in {{__consumer_offsets}}, and the next job restarts from the latest offset).
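
For reference, here is a minimal sketch (plain Kafka consumer API, with placeholder broker and topic names) of how the offsets committed for the {{my-group}} group can be inspected:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CheckCommittedOffsets {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder
    props.put("group.id", "my-group");                // the group configured in KafkaIO
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      for (PartitionInfo p : consumer.partitionsFor("my-topic")) { // placeholder topic
        TopicPartition tp = new TopicPartition(p.topic(), p.partition());
        // committed() returns null when the group has never committed anything for this partition.
        OffsetAndMetadata committed = consumer.committed(tp);
        System.out.println(tp + " -> " + committed);
      }
    }
  }
}
{code}

If KafkaIO were committing, each partition would show a committed offset here.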

I can't find in the code where the offsets are supposed to be committed.

I tried adding a manual commit in the {{consumerPollLoop()}} method, and with that change the offsets are committed:

{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }

  LOG.info("{}: Returning from consumer pool loop", this);
}
{code}
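
(Assuming the usual {{java.util}} and Kafka client imports, a variant of the same manual commit that commits exactly the offsets of the batch just handed off, rather than whatever the consumer's current position happens to be, would look roughly like this:)

{code:java}
// Variant sketch, not KafkaIO code: commit only the offsets of the batch that was just
// enqueued, instead of consumer.commitSync() which commits the consumer's current position.
Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
  List<ConsumerRecord<byte[], byte[]>> partitionRecords = records.records(tp);
  long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
  toCommit.put(tp, new OffsetAndMetadata(lastOffset + 1)); // committed offset = next offset to read
}
consumer.commitSync(toCommit);
{code}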

Is this a bug in KafkaIO or am I misconfiguring something?

Disclaimer: I'm currently using KafkaIO on Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.



