beam-commits mailing list archives

From ming...@apache.org
Subject [beam] branch master updated: [BEAM-3526] KafkaIO support for finalizeCheckpoint() (#4481)
Date Fri, 26 Jan 2018 18:41:10 GMT
This is an automated email from the ASF dual-hosted git repository.

mingmxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1274938  [BEAM-3526] KafkaIO support for finalizeCheckpoint() (#4481)
1274938 is described below

commit 1274938bcfd1c2233f27b9b6c5fa5d10370e5ade
Author: Raghu Angadi <rangadi@apache.org>
AuthorDate: Fri Jan 26 10:41:05 2018 -0800

    [BEAM-3526] KafkaIO support for finalizeCheckpoint() (#4481)
    
    * KafkaIO: Add support for finalizeCheckpoint()
    
    Adds an option to the KafkaIO source to commit offsets to Kafka inside
    `CheckpointMark#finalizeCheckpoint()`. The Kafka consumer is single
    threaded, so the actual commit to Kafka might take up to 1 second
    (KAFKA_POLL_TIMEOUT). This is not necessarily bad, since it can avoid
    excessive commits when the runner finalizes multiple checkpoint marks
    at once. A usage sketch follows the change list below.
    
    * @AvroIgnore reader field in checkpoint mark.
    
    * reset records to empty.
    
    * minor
    
    * Set enqueue timeout to 100 millis
    
    * check return of offer(); remove metric check in unit test.
    
    * Warn if auto commit is enabled.
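
    A minimal usage sketch of the new option (it mirrors the unit-test change
    in the diff below); the bootstrap servers, topic, and group id values are
    placeholders, not part of this commit:

        import com.google.common.collect.ImmutableMap;
        import org.apache.beam.sdk.Pipeline;
        import org.apache.beam.sdk.io.kafka.KafkaIO;
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.common.serialization.LongDeserializer;
        import org.apache.kafka.common.serialization.StringDeserializer;

        Pipeline p = Pipeline.create();
        p.apply(KafkaIO.<Long, String>read()
            .withBootstrapServers("broker-1:9092")        // placeholder broker
            .withTopic("my-topic")                        // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Offset management in Kafka requires group.id:
            .updateConsumerProperties(
                ImmutableMap.<String, Object>of(ConsumerConfig.GROUP_ID_CONFIG, "my.group"))
            // Commit finalized offsets back to Kafka:
            .commitOffsetsInFinalize()
            .withoutMetadata());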
---
 .../beam/sdk/io/kafka/KafkaCheckpointMark.java     |  20 +++-
 .../java/org/apache/beam/sdk/io/kafka/KafkaIO.java | 118 ++++++++++++++++++---
 .../org/apache/beam/sdk/io/kafka/KafkaIOTest.java  |   9 +-
 3 files changed, 126 insertions(+), 21 deletions(-)

diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
index 64dbad4..0856c7c 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaCheckpointMark.java
@@ -21,6 +21,8 @@ import com.google.common.base.Joiner;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
+
+import org.apache.avro.reflect.AvroIgnore;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.UnboundedSource;
@@ -34,10 +36,15 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
 
   private List<PartitionMark> partitions;
 
+  @AvroIgnore
+  private KafkaIO.UnboundedKafkaReader<?, ?> reader; // Non-null when offsets need to be committed.
+
   private KafkaCheckpointMark() {} // for Avro
 
-  public KafkaCheckpointMark(List<PartitionMark> partitions) {
+  public KafkaCheckpointMark(List<PartitionMark> partitions,
+                             KafkaIO.UnboundedKafkaReader<?, ?> reader) {
     this.partitions = partitions;
+    this.reader = reader;
   }
 
   public List<PartitionMark> getPartitions() {
@@ -46,10 +53,13 @@ public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {
 
   @Override
   public void finalizeCheckpoint() throws IOException {
-    /* nothing to do */
-
-    // We might want to support committing offset in Kafka for better resume point when the job
-    // is restarted (checkpoint is not available for job restarts).
+    if (reader != null) {
+      // Is it ok to commit asynchronously, or should we wait till this (or a newer) mark is
+      // committed? Often multiple marks are finalized at once; since only the latest needs to
+      // be committed, it is better to wait a little while. Currently the maximum delay is the
+      // same as KAFKA_POLL_TIMEOUT in the reader (1 second).
+      reader.finalizeCheckpointMarkAsync(this);
+    }
   }
 
   @Override
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 374dbc7..aa931e6 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -62,6 +62,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AtomicCoder;
@@ -302,6 +304,7 @@ public class KafkaIO {
         .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
+        .setCommitOffsetsInFinalizeEnabled(false)
         .build();
   }
 
@@ -344,6 +347,7 @@ public class KafkaIO {
     @Nullable abstract Duration getMaxReadTime();
 
     @Nullable abstract Instant getStartReadTime();
+    abstract boolean isCommitOffsetsInFinalizeEnabled();
 
     abstract Builder<K, V> toBuilder();
 
@@ -364,6 +368,7 @@ public class KafkaIO {
       abstract Builder<K, V> setMaxNumRecords(long maxNumRecords);
       abstract Builder<K, V> setMaxReadTime(Duration maxReadTime);
       abstract Builder<K, V> setStartReadTime(Instant startReadTime);
+      abstract Builder<K, V> setCommitOffsetsInFinalizeEnabled(boolean commitOffsetInFinalize);
 
       abstract Read<K, V> build();
     }
@@ -562,6 +567,19 @@ public class KafkaIO {
     }
 
     /**
+     * Finalized offsets are committed to Kafka. See {@link CheckpointMark#finalizeCheckpoint()}.
+     * It helps with minimizing gaps or duplicate processing of records while restarting a
+     * pipeline from scratch. But it does not provide hard processing guarantees. There
+     * could be a short delay to commit after {@link CheckpointMark#finalizeCheckpoint()} is
+     * invoked, as the reader might be blocked on reading from Kafka. Note that it is independent
+     * of the 'AUTO_COMMIT' Kafka consumer configuration. Usually either this or AUTO_COMMIT in
+     * the Kafka consumer is enabled, but not both.
+     */
+    public Read<K, V> commitOffsetsInFinalize() {
+      return toBuilder().setCommitOffsetsInFinalizeEnabled(true).build();
+    }
+
+    /**
      * Returns a {@link PTransform} for PCollection of {@link KV}, dropping Kafka metadata.
      */
     public PTransform<PBegin, PCollection<KV<K, V>>> withoutMetadata() {
@@ -584,6 +602,16 @@ public class KafkaIO {
                 + ". If you are building with maven, set \"kafka.clients.version\" "
                 + "maven property to 0.10.1.0 or newer.");
       }
+      if (isCommitOffsetsInFinalizeEnabled()) {
+        checkArgument(getConsumerConfig().get(ConsumerConfig.GROUP_ID_CONFIG) != null,
+          "commitOffsetsInFinalize() is enabled, but group.id in Kafka consumer config "
+              + "is not set. Offset management requires group.id.");
+        if (Boolean.TRUE.equals(
+          getConsumerConfig().get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))) {
+          LOG.warn("'{}' in consumer config is enabled even though commitOffsetsInFinalize() "
+              + "is set. You need only one of them.", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        }
+      }
 
       // Infer key/value coders if not specified explicitly
       CoderRegistry registry = input.getPipeline().getCoderRegistry();
@@ -877,7 +905,16 @@ public class KafkaIO {
     }
   }
 
-  private static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+  @VisibleForTesting
+  static class UnboundedKafkaReader<K, V> extends UnboundedReader<KafkaRecord<K, V>> {
+    // package private, accessed in KafkaCheckpointMark.finalizeCheckpoint.
+
+    @VisibleForTesting
+    static final String METRIC_NAMESPACE = "KafkaIOReader";
+    @VisibleForTesting
+    static final String CHECKPOINT_MARK_COMMITS_METRIC = "checkpointMarkCommits";
+    private static final String CHECKPOINT_MARK_COMMIT_SKIPS_METRIC = "checkpointMarkCommitSkips";
+
 
     private final UnboundedKafkaSource<K, V> source;
     private final String name;
@@ -896,9 +933,25 @@ public class KafkaIO {
     private final Counter bytesReadBySplit;
     private final Gauge backlogBytesOfSplit;
     private final Gauge backlogElementsOfSplit;
+    private final Counter checkpointMarkCommits = Metrics.counter(
+      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMITS_METRIC);
+    // Checkpoint marks skipped in favor of newer mark (only the latest needs to be committed).
+    private final Counter checkpointMarkCommitSkips = Metrics.counter(
+      METRIC_NAMESPACE, CHECKPOINT_MARK_COMMIT_SKIPS_METRIC);
 
+    /**
+     * The poll timeout while reading records from Kafka. If the option to commit reader
+     * offsets to Kafka in {@link KafkaCheckpointMark#finalizeCheckpoint()} is enabled, the
+     * commit is delayed until this poll returns, so the timeout should be reasonably low.
+     * At the same time it probably can't be very low, like 10 millis; I am not sure how it
+     * behaves when latency is high. Probably good to experiment. Often multiple marks are
+     * finalized in a batch, and it reduces finalization overhead to wait a short while and
+     * commit only the last checkpoint mark.
+     */
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
-    private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
+    private static final Duration RECORDS_DEQUEUE_POLL_TIMEOUT = Duration.millis(10);
+    private static final Duration RECORDS_ENQUEUE_POLL_TIMEOUT = Duration.millis(100);
 
     // Use a separate thread to read Kafka messages. Kafka Consumer does all its work including
     // network I/O inside poll(). Polling only inside #advance(), especially with a small timeout
@@ -907,6 +960,7 @@ public class KafkaIO {
     private final ExecutorService consumerPollThread = Executors.newSingleThreadExecutor();
     private final SynchronousQueue<ConsumerRecords<byte[], byte[]>> availableRecordsQueue =
         new SynchronousQueue<>();
+    private AtomicReference<KafkaCheckpointMark> finalizedCheckpointMark = new AtomicReference<>();
     private AtomicBoolean closed = new AtomicBoolean(false);
 
     // Backlog support :
@@ -1036,12 +1090,21 @@ public class KafkaIO {
     }
 
     private void consumerPollLoop() {
-      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
+      // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue.
+
+      ConsumerRecords<byte[], byte[]> records = ConsumerRecords.empty();
       while (!closed.get()) {
         try {
-          ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
-          if (!records.isEmpty() && !closed.get()) {
-            availableRecordsQueue.put(records); // blocks until dequeued.
+          if (records.isEmpty()) {
+            records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
+          } else if (availableRecordsQueue.offer(records,
+                                                 RECORDS_ENQUEUE_POLL_TIMEOUT.getMillis(),
+                                                 TimeUnit.MILLISECONDS)) {
+            records = ConsumerRecords.empty();
+          }
+          KafkaCheckpointMark checkpointMark = finalizedCheckpointMark.getAndSet(null);
+          if (checkpointMark != null) {
+            commitCheckpointMark(checkpointMark);
           }
         } catch (InterruptedException e) {
           LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
@@ -1054,13 +1117,40 @@ public class KafkaIO {
       LOG.info("{}: Returning from consumer pool loop", this);
     }
 
+    private void commitCheckpointMark(KafkaCheckpointMark checkpointMark) {
+      consumer.commitSync(
+        checkpointMark
+          .getPartitions()
+          .stream()
+          .filter(p -> p.getNextOffset() != UNINITIALIZED_OFFSET)
+          .collect(Collectors.toMap(
+            p -> new TopicPartition(p.getTopic(), p.getPartition()),
+            p -> new OffsetAndMetadata(p.getNextOffset())
+          ))
+      );
+      checkpointMarkCommits.inc();
+    }
+
+    /**
+     * Enqueues the checkpoint mark to be committed to Kafka. This does not block waiting
+     * for the commit; there could be a delay of up to KAFKA_POLL_TIMEOUT (1 second).
+     * Any checkpoint mark enqueued earlier is dropped in favor of this one, since the
+     * documentation for {@link CheckpointMark#finalizeCheckpoint()} says marks are finalized
+     * in order and only the latest offsets need to be committed.
+     */
+    void finalizeCheckpointMarkAsync(KafkaCheckpointMark checkpointMark) {
+      if (finalizedCheckpointMark.getAndSet(checkpointMark) != null) {
+        checkpointMarkCommitSkips.inc();
+      }
+    }
+
     private void nextBatch() {
       curBatch = Collections.emptyIterator();
 
       ConsumerRecords<byte[], byte[]> records;
       try {
         // poll available records, wait (if necessary) up to the specified timeout.
-        records = availableRecordsQueue.poll(NEW_RECORDS_POLL_TIMEOUT.getMillis(),
+        records = availableRecordsQueue.poll(RECORDS_DEQUEUE_POLL_TIMEOUT.getMillis(),
                                              TimeUnit.MILLISECONDS);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
@@ -1307,13 +1397,13 @@ public class KafkaIO {
     public CheckpointMark getCheckpointMark() {
       reportBacklog();
       return new KafkaCheckpointMark(
-          // avoid lazy (consumedOffset can change)
-          ImmutableList.copyOf(
-              Lists.transform(
-                  partitionStates,
-                  p ->
-                      new PartitionMark(
-                          p.topicPartition.topic(), p.topicPartition.partition(), p.nextOffset))));
+        partitionStates.stream()
+            .map((p) -> new PartitionMark(p.topicPartition.topic(),
+                                          p.topicPartition.partition(),
+                                          p.nextOffset))
+            .collect(Collectors.toList()),
+        source.spec.isCommitOffsetsInFinalizeEnabled() ? this : null
+      );
     }
 
     @Override
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index c29c7a7..084731a 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -36,6 +36,7 @@ import java.lang.reflect.Method;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -644,7 +645,11 @@ public class KafkaIOTest {
     String readStep = "readFromKafka";
 
     p.apply(readStep,
-        mkKafkaReadTransform(numElements, new ValueAsTimestampFn()).withoutMetadata());
+        mkKafkaReadTransform(numElements, new ValueAsTimestampFn())
+          .updateConsumerProperties(ImmutableMap.<String, Object>of(ConsumerConfig.GROUP_ID_CONFIG,
+                                                                    "test.group"))
+          .commitOffsetsInFinalize()
+          .withoutMetadata());
 
     PipelineResult result = p.run();
 
@@ -1085,7 +1090,7 @@ public class KafkaIOTest {
     List<ProducerRecord<Integer, Long>> sent = mockProducer.history();
 
     // sort by values
-    Collections.sort(sent, (o1, o2) -> Long.compare(o1.value(), o2.value()));
+    Collections.sort(sent, Comparator.comparingLong((o) -> o.value()));
 
     for (int i = 0; i < numElements; i++) {
       ProducerRecord<Integer, Long> record = sent.get(i);
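
A note on the core pattern: the patch hands finalized checkpoint marks from
finalizeCheckpoint() to the single consumer thread through an AtomicReference,
and the poll loop drains and commits at most the newest mark between polls
("latest wins"). A self-contained sketch of that hand-off, with illustrative
names (LatestWinsCommitter is not part of this commit):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Consumer;

    class LatestWinsCommitter<T> {
      private final AtomicReference<T> pending = new AtomicReference<>();

      /** Called from finalizeCheckpoint(); returns true if an older mark was skipped. */
      boolean enqueue(T mark) {
        return pending.getAndSet(mark) != null; // older mark is dropped, never committed
      }

      /** Called by the consumer thread between polls; commits at most the newest mark. */
      void drain(Consumer<T> commit) {
        T mark = pending.getAndSet(null);
        if (mark != null) {
          commit.accept(mark); // e.g. consumer.commitSync(offsets)
        }
      }
    }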

-- 
To stop receiving notification emails like this one, please contact
mingmxu@apache.org.
