beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From stasle...@apache.org
Subject [1/2] beam git commit: [BEAM-2074, BEAM-2073] Fixed SourceDStream's rate control usage.
Date Mon, 01 May 2017 12:32:42 GMT
Repository: beam
Updated Branches:
  refs/heads/master 254470e62 -> 535761a74


[BEAM-2074,BEAM-2073] Fixed SourceDStream's rate control usage.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fcb61ae6
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fcb61ae6
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fcb61ae6

Branch: refs/heads/master
Commit: fcb61ae603ba61ed94bbbe75f4d5c8257eaa1c32
Parents: 254470e
Author: Stas Levin <staslevin@apache.org>
Authored: Wed Apr 26 11:46:31 2017 +0300
Committer: Stas Levin <staslevin@apache.org>
Committed: Mon May 1 14:13:43 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/spark/io/SourceDStream.java    | 61 +++++++++++++-------
 1 file changed, 40 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fcb61ae6/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
index d8f414a..20aca5f 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/io/SourceDStream.java
@@ -99,7 +99,7 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     this.initialParallelism = ssc().sparkContext().defaultParallelism();
     checkArgument(this.initialParallelism > 0, "Number of partitions must be greater than zero.");
 
-    this.boundMaxRecords = boundMaxRecords > 0 ? boundMaxRecords : rateControlledMaxRecords();
+    this.boundMaxRecords = boundMaxRecords;
 
     try {
       this.numPartitions =
@@ -124,14 +124,34 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 
 
   private MicrobatchSource<T, CheckpointMarkT> createMicrobatchSource() {
-    return new MicrobatchSource<>(
-        unboundedSource,
-        boundReadDuration,
-        initialParallelism,
-        boundMaxRecords,
-        -1,
-        id(),
-        readerCacheInterval);
+    return new MicrobatchSource<>(unboundedSource,
+                                  boundReadDuration,
+                                  initialParallelism,
+                                  computeReadMaxRecords(),
+                                  -1,
+                                  id(),
+                                  readerCacheInterval);
+  }
+
+  private long computeReadMaxRecords() {
+    if (boundMaxRecords > 0) {
+      LOG.info("Max records per batch has been set to {}, as configured in the PipelineOptions.",
+               boundMaxRecords);
+      return boundMaxRecords;
+    } else {
+      final scala.Option<Long> rateControlledMax = rateControlledMaxRecords();
+      if (rateControlledMax.isDefined()) {
+        LOG.info("Max records per batch has been set to {}, as advised by the rate controller.",
+                 rateControlledMax.get());
+        return rateControlledMax.get();
+      } else {
+        LOG.info("Max records per batch has not been limited by neither configuration "
+                     + "nor the rate controller, and will remain unlimited for the current batch "
+                     + "({}).",
+                 Long.MAX_VALUE);
+        return Long.MAX_VALUE;
+      }
+    }
   }
 
   @Override
@@ -170,19 +190,18 @@ class SourceDStream<T, CheckpointMarkT extends UnboundedSource.CheckpointMark>
 
   //---- Bound by records.
 
-  private long rateControlledMaxRecords() {
-    scala.Option<RateController> rateControllerOption = rateController();
-    if (rateControllerOption.isDefined()) {
-      long rateLimitPerSecond = rateControllerOption.get().getLatestRate();
-      if (rateLimitPerSecond > 0) {
-        long totalRateLimit =
-            rateLimitPerSecond * (ssc().graph().batchDuration().milliseconds() / 1000);
-        LOG.info("RateController set limit to {}", totalRateLimit);
-        return totalRateLimit;
-      }
+  private scala.Option<Long> rateControlledMaxRecords() {
+    final scala.Option<RateController> rateControllerOption = rateController();
+    final scala.Option<Long> rateLimitPerBatch;
+    final long rateLimitPerSec;
+    if (rateControllerOption.isDefined()
+        && ((rateLimitPerSec = rateControllerOption.get().getLatestRate()) > 0)) {
+      final long batchDurationSec = ssc().graph().batchDuration().milliseconds() / 1000;
+      rateLimitPerBatch = scala.Option.apply(rateLimitPerSec * batchDurationSec);
+    } else {
+      rateLimitPerBatch = scala.Option.empty();
     }
-    LOG.info("RateController had nothing to report, default is Long.MAX_VALUE");
-    return Long.MAX_VALUE;
+    return rateLimitPerBatch;
   }
 
   private final RateController rateController = new SourceRateController(id(),


Mime
View raw message