beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stasle...@apache.org
Subject [1/2] beam git commit: [BEAM-2072] Fixed MicrobatchSource.reader stops reading after reaching maxNumRecords for the first time.
Date Mon, 01 May 2017 11:07:58 GMT
Repository: beam
Updated Branches:
  refs/heads/master b414f8de9 -> 254470e62


[BEAM-2072] Fixed MicrobatchSource.reader stops reading after reaching maxNumRecords for the
first time.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3b6f4f65
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3b6f4f65
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3b6f4f65

Branch: refs/heads/master
Commit: 3b6f4f65ea451f910769311f26e98e94f1ffe871
Parents: b414f8d
Author: Stas Levin <staslevin@apache.org>
Authored: Wed Apr 26 09:28:08 2017 +0300
Committer: Stas Levin <staslevin@apache.org>
Committed: Mon May 1 14:07:07 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/io/MicrobatchSource.java | 188 +++++++++----------
 .../beam/runners/spark/io/SourceDStream.java    |  12 +-
 .../apache/beam/runners/spark/io/SourceRDD.java |  11 +-
 .../spark/stateful/StateSpecFunctions.java      |  30 +--
 4 files changed, 125 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
index fde5f9a..53d1ba7 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/MicrobatchSource.java
@@ -33,7 +33,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -44,19 +44,14 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Mostly based on {@link org.apache.beam.sdk.io.BoundedReadFromUnboundedSource},
- * with some adjustments for this specific use-case.
- *
- * <p>A {@link BoundedSource} wrapping an {@link UnboundedSource} to complement Spark's micro-batch
- * nature.
- *
- * <p>By design, Spark's micro-batches are bounded by their duration. Spark also provides a
- * back-pressure mechanism that may signal a bound by max records.
+ * A {@link Source} that accommodates Spark's micro-batch oriented nature and wraps an
+ * {@link UnboundedSource}.
  */
 public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
-    extends BoundedSource<T> {
+    extends Source<T> {
+
   private static final Logger LOG = LoggerFactory.getLogger(MicrobatchSource.class);
-  private static volatile Cache<MicrobatchSource<?, ?>, BoundedReader<?>> readerCache;
+  private static volatile Cache<MicrobatchSource<?, ?>, Source.Reader<?>> readerCache;
 
   private final UnboundedSource<T, CheckpointMarkT> source;
   private final Duration maxReadTime;
@@ -70,13 +65,13 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   private final int splitId;
 
   MicrobatchSource(
-      UnboundedSource<T, CheckpointMarkT> source,
-      Duration maxReadTime,
-      int numInitialSplits,
-      long maxNumRecords,
-      int splitId,
-      int sourceId,
-      double readerCacheInterval) {
+      final UnboundedSource<T, CheckpointMarkT> source,
+      final Duration maxReadTime,
+      final int numInitialSplits,
+      final long maxNumRecords,
+      final int splitId,
+      final int sourceId,
+      final double readerCacheInterval) {
     this.source = source;
     this.maxReadTime = maxReadTime;
     this.numInitialSplits = numInitialSplits;
@@ -86,12 +81,23 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     this.readerCacheInterval = readerCacheInterval;
   }
 
+  private static synchronized void initReaderCache(final long readerCacheInterval) {
+    if (readerCache == null) {
+      LOG.info("Creating reader cache. Cache interval = {} ms.", readerCacheInterval);
+      readerCache =
+          CacheBuilder.newBuilder()
+                      .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
+                      .removalListener(new ReaderCacheRemovalListener())
+                      .build();
+    }
+  }
+
   /**
    * Divide the given number of records into {@code numSplits} approximately
    * equal parts that sum to {@code numRecords}.
    */
-  private static long[] splitNumRecords(long numRecords, int numSplits) {
-    long[] splitNumRecords = new long[numSplits];
+  private static long[] splitNumRecords(final long numRecords, final int numSplits) {
+    final long[] splitNumRecords = new long[numSplits];
     for (int i = 0; i < numSplits; i++) {
       splitNumRecords[i] = numRecords / numSplits;
     }
@@ -101,56 +107,33 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     return splitNumRecords;
   }
 
-  @Override
-  public List<? extends BoundedSource<T>> split(
-      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
-    List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
-    List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
+  List<? extends Source<T>> split(final PipelineOptions options) throws Exception {
+    final List<MicrobatchSource<T, CheckpointMarkT>> result = new ArrayList<>();
+    final List<? extends UnboundedSource<T, CheckpointMarkT>> splits =
         source.split(numInitialSplits, options);
-    int numSplits = splits.size();
-    long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
+    final int numSplits = splits.size();
+    final long[] numRecords = splitNumRecords(maxNumRecords, numSplits);
     for (int i = 0; i < numSplits; i++) {
       // splits must be stable, and cannot change during consecutive executions
       // for example: Kafka should not add partitions if more then one topic is read.
-      result.add(new MicrobatchSource<>(splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId,
-          readerCacheInterval));
+      result.add(
+          new MicrobatchSource<>(
+              splits.get(i), maxReadTime, 1, numRecords[i], i, sourceId, readerCacheInterval));
     }
     return result;
   }
 
-  @Override
-  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
-    return 0;
-  }
-
-  @Override
-  public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
-    return getOrCreateReader(options, null);
-  }
-
   @SuppressWarnings("unchecked")
-  public BoundedReader<T> getOrCreateReader(
-      PipelineOptions options,
-      CheckpointMarkT checkpointMark) throws IOException {
+  public Source.Reader<T> getOrCreateReader(
+      final PipelineOptions options, final CheckpointMarkT checkpointMark) throws IOException {
     try {
       initReaderCache((long) readerCacheInterval);
-      return (BoundedReader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
-    } catch (ExecutionException e) {
+      return (Source.Reader<T>) readerCache.get(this, new ReaderLoader(options, checkpointMark));
+    } catch (final ExecutionException e) {
       throw new RuntimeException("Failed to get or create reader", e);
     }
   }
 
-  private static synchronized void initReaderCache(long readerCacheInterval) {
-    if (readerCache == null) {
-      LOG.info("Creating reader cache. Cache interval = " + readerCacheInterval + " ms.");
-      readerCache =
-          CacheBuilder.newBuilder()
-              .expireAfterAccess(readerCacheInterval, TimeUnit.MILLISECONDS)
-              .removalListener(new ReaderCacheRemovalListener())
-              .build();
-    }
-  }
-
   @Override
   public void validate() {
     source.validate();
@@ -193,21 +176,26 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
   }
 
   /**
-   * A {@link BoundedSource.BoundedReader}
-   * wrapping an {@link UnboundedSource.UnboundedReader}.
+   * Mostly based on
+   * {@link org.apache.beam.sdk.io.BoundedReadFromUnboundedSource}'s
+   * <code>UnboundedToBoundedSourceAdapter</code>,
+   * with some adjustments for Spark specifics.
    *
-   * <p>This Reader will read until it reached the bound of duration, or max records,
-   * whichever comes first.
+   * <p>This Reader reads until one of the following thresholds has been reached:
+   * <ol>
+   *   <li>max records (per batch)</li>
+   *   <li>max read duration (per batch)</li>
+   * </ol>
    */
-  public class Reader extends BoundedSource.BoundedReader<T> {
+  public class Reader extends Source.Reader<T> {
     private long recordsRead = 0L;
-    private Instant endTime;
+    private Instant readEndTime;
     private final FluentBackoff backoffFactory;
-    private final UnboundedSource.UnboundedReader<T> reader;
+    private final UnboundedSource.UnboundedReader<T> unboundedReader;
     private boolean started;
 
-    private Reader(UnboundedSource.UnboundedReader<T> reader) {
-      this.reader = reader;
+    private Reader(final UnboundedSource.UnboundedReader<T> unboundedReader) {
+      this.unboundedReader = unboundedReader;
       backoffFactory =
           FluentBackoff.DEFAULT
               .withInitialBackoff(Duration.millis(10))
@@ -215,21 +203,28 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
               .withMaxCumulativeBackoff(maxReadTime.minus(1));
     }
 
+    private boolean startIfNeeded() throws IOException {
+      return !started && ((started = true) && unboundedReader.start());
+    }
+
+    private void prepareForNewBatchReading() {
+      readEndTime = Instant.now().plus(maxReadTime);
+      recordsRead = 0L;
+    }
+
     @Override
     public boolean start() throws IOException {
-      LOG.debug("MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a "
-          + "max read time of {} msec, and max number of records {}.", splitId, maxReadTime,
-              maxNumRecords);
-      endTime = Instant.now().plus(maxReadTime);
-      // Since reader is reused in microbatches only start it if it has not already been started.
-      if (!started) {
-        started = true;
-        if (reader.start()) {
-          recordsRead++;
-          return true;
-        }
-      }
-      return advanceWithBackoff();
+      LOG.debug(
+          "MicrobatchReader-{}: Starting a microbatch read from an unbounded source with a max "
+              + "read time of {} millis, and max number of records {}.",
+          splitId,
+          maxReadTime,
+          maxNumRecords);
+
+      prepareForNewBatchReading();
+
+      // either start a new read, or continue an existing one
+      return startIfNeeded() || advanceWithBackoff();
     }
 
     @Override
@@ -237,20 +232,21 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
       if (recordsRead >= maxNumRecords) {
         finalizeCheckpoint();
         return false;
+      } else {
+        return advanceWithBackoff();
       }
-      return advanceWithBackoff();
     }
 
     private boolean advanceWithBackoff() throws IOException {
       // Try reading from the source with exponential backoff
-      BackOff backoff = backoffFactory.backoff();
+      final BackOff backoff = backoffFactory.backoff();
       long nextSleep = backoff.nextBackOffMillis();
       while (nextSleep != BackOff.STOP) {
-        if (endTime != null && Instant.now().isAfter(endTime)) {
+        if (readEndTime != null && Instant.now().isAfter(readEndTime)) {
           finalizeCheckpoint();
           return false;
         }
-        if (reader.advance()) {
+        if (unboundedReader.advance()) {
           recordsRead++;
           return true;
         }
@@ -262,55 +258,55 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
     }
 
     private void finalizeCheckpoint() throws IOException {
-      reader.getCheckpointMark().finalizeCheckpoint();
+      unboundedReader.getCheckpointMark().finalizeCheckpoint();
       LOG.debug("MicrobatchReader-{}: finalized CheckpointMark successfully after "
           + "reading {} records.", splitId, recordsRead);
     }
 
     @Override
     public T getCurrent() throws NoSuchElementException {
-      return reader.getCurrent();
+      return unboundedReader.getCurrent();
     }
 
     @Override
     public Instant getCurrentTimestamp() throws NoSuchElementException {
-      return reader.getCurrentTimestamp();
+      return unboundedReader.getCurrentTimestamp();
     }
 
     @Override
     public void close() throws IOException {
-      reader.close();
+      unboundedReader.close();
     }
 
     @Override
-    public BoundedSource<T> getCurrentSource() {
+    public Source<T> getCurrentSource() {
       return MicrobatchSource.this;
     }
 
     @SuppressWarnings("unchecked")
     public CheckpointMarkT getCheckpointMark() {
-      return (CheckpointMarkT) reader.getCheckpointMark();
+      return (CheckpointMarkT) unboundedReader.getCheckpointMark();
     }
 
     public Instant getWatermark() {
-      return reader.getWatermark();
+      return unboundedReader.getWatermark();
     }
   }
 
   /**
    * {@link Callable} which creates a {@link Reader}.
    */
-  private class ReaderLoader implements Callable<BoundedReader<T>> {
+  private class ReaderLoader implements Callable<Source.Reader<T>> {
     private final PipelineOptions options;
     private final CheckpointMarkT checkpointMark;
 
-    ReaderLoader(PipelineOptions options, CheckpointMarkT checkpointMark) {
+    ReaderLoader(final PipelineOptions options, final CheckpointMarkT checkpointMark) {
       this.options = options;
       this.checkpointMark = checkpointMark;
     }
 
     @Override
-    public BoundedReader<T> call() throws Exception {
+    public Reader call() throws Exception {
       LOG.info("No cached reader found for split: [" + source
           + "]. Creating new reader at checkpoint mark " + checkpointMark);
       return new Reader(source.createReader(options, checkpointMark));
@@ -321,12 +317,14 @@ public class MicrobatchSource<T, CheckpointMarkT extends UnboundedSource.Checkpo
    * Listener to be called when a reader is removed from {@link MicrobatchSource#readerCache}.
    */
   private static class ReaderCacheRemovalListener
-      implements RemovalListener<MicrobatchSource<?, ?>, BoundedReader<?>> {
-    @Override public void onRemoval(
-        RemovalNotification<MicrobatchSource<?, ?>, BoundedReader<?>> notification) {
+      implements RemovalListener<MicrobatchSource<?, ?>, Source.Reader<?>> {
+
+    @Override
+    public void onRemoval(
+        final RemovalNotification<MicrobatchSource<?, ?>, Source.Reader<?>> notification) {
       try {
         notification.getValue().close();
-      } catch (IOException e) {
+      } catch (final IOException e) {
         throw new RuntimeException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index d33529c..d8f414a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -104,7 +104,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     try {
       this.numPartitions =
           createMicrobatchSource()
-              .split(initialParallelism, options)
+              .split(options)
               .size();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -124,8 +124,14 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 
 
   private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
-    return new MicrobatchSource<>(unboundedSource, boundReadDuration, initialParallelism,
-        boundMaxRecords, -1, id(), readerCacheInterval);
+    return new MicrobatchSource<>(
+        unboundedSource,
+        boundReadDuration,
+        initialParallelism,
+        boundMaxRecords,
+        -1,
+        id(),
+        readerCacheInterval);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
index b99ae10..e294359 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceRDD.java
@@ -258,12 +258,13 @@ public class SourceRDD {
     @Override
     public Partition[] getPartitions() {
       try {
-        List<? extends Source<T>> partitionedSources = microbatchSource.split(
-            -1 /* ignored */, runtimeContext.getPipelineOptions());
-        Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
+        final List<? extends Source<T>> partitionedSources =
+            microbatchSource.split(runtimeContext.getPipelineOptions());
+        final Partition[] partitions = new CheckpointableSourcePartition[partitionedSources.size()];
         for (int i = 0; i < partitionedSources.size(); i++) {
-          partitions[i] = new CheckpointableSourcePartition<>(id(), i, partitionedSources.get(i),
-              EmptyCheckpointMark.get());
+          partitions[i] =
+              new CheckpointableSourcePartition<>(
+                  id(), i, partitionedSources.get(i), EmptyCheckpointMark.get());
         }
         return partitions;
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/beam/blob/3b6f4f65/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
index c9de7fa..30ee639 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/StateSpecFunctions.java
@@ -34,7 +34,6 @@ import org.apache.beam.runners.spark.io.SparkUnboundedSource.Metadata;
 import org.apache.beam.runners.spark.metrics.SparkMetricsContainer;
 import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.Source;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -124,7 +123,7 @@ public class StateSpecFunctions {
 
         // Initial high/low watermarks.
         Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;
-        Instant highWatermark;
+        final Instant highWatermark;
 
         // if state exists, use it, otherwise it's first time so use the startCheckpointMark.
         // startCheckpointMark may be EmptyCheckpointMark (the Spark Java API tries to apply
@@ -146,13 +145,15 @@ public class StateSpecFunctions {
         }
 
         // create reader.
-        BoundedSource.BoundedReader<T> reader;
-        Stopwatch stopwatch = Stopwatch.createStarted();
+        final MicrobatchSource.Reader/*<T>*/ microbatchReader;
+        final Stopwatch stopwatch = Stopwatch.createStarted();
         long readDurationMillis = 0;
 
         try {
-          reader = microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
-              checkpointMark);
+          microbatchReader =
+              (MicrobatchSource.Reader)
+                  microbatchSource.getOrCreateReader(runtimeContext.getPipelineOptions(),
+                                                     checkpointMark);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
@@ -165,16 +166,19 @@ public class StateSpecFunctions {
                 GlobalWindow.Coder.INSTANCE);
         try {
           // measure how long a read takes per-partition.
-          boolean finished = !reader.start();
+          boolean finished = !microbatchReader.start();
           while (!finished) {
-            WindowedValue<T> wv = WindowedValue.of(reader.getCurrent(),
-                reader.getCurrentTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING);
+            final WindowedValue<T> wv =
+                WindowedValue.of((T) microbatchReader.getCurrent(),
+                                 microbatchReader.getCurrentTimestamp(),
+                                 GlobalWindow.INSTANCE,
+                                 PaneInfo.NO_FIRING);
             readValues.add(CoderHelpers.toByteArray(wv, coder));
-            finished = !reader.advance();
+            finished = !microbatchReader.advance();
           }
 
           // end-of-read watermark is the high watermark, but don't allow decrease.
-          Instant sourceWatermark = ((MicrobatchSource.Reader) reader).getWatermark();
+          final Instant sourceWatermark = microbatchReader.getWatermark();
           highWatermark = sourceWatermark.isAfter(lowWatermark) ? sourceWatermark : lowWatermark;
 
           readDurationMillis = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
@@ -186,8 +190,8 @@ public class StateSpecFunctions {
 
           // if the Source does not supply a CheckpointMark skip updating the state.
           @SuppressWarnings("unchecked")
-          CheckpointMarkT finishedReadCheckpointMark =
-              (CheckpointMarkT) ((MicrobatchSource.Reader) reader).getCheckpointMark();
+          final CheckpointMarkT finishedReadCheckpointMark =
+              (CheckpointMarkT) microbatchReader.getCheckpointMark();
           byte[] codedCheckpoint = new byte[0];
           if (finishedReadCheckpointMark != null) {
             codedCheckpoint = CoderHelpers.toByteArray(finishedReadCheckpointMark, checkpointCoder);


Mime
View raw message