crunch-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jwi...@apache.org
Subject crunch git commit: CRUNCH-665: Add crunch.max.poll.interval property
Date Thu, 08 Mar 2018 23:53:53 GMT
Repository: crunch
Updated Branches:
  refs/heads/master 6c0ae4131 -> 23a2da07d


CRUNCH-665: Add crunch.max.poll.interval property

Signed-off-by: Josh Wills <jwills@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/23a2da07
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/23a2da07
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/23a2da07

Branch: refs/heads/master
Commit: 23a2da07d3a80b3650568dde8973acc941f4f25d
Parents: 6c0ae41
Author: Clément MATHIEU <clement@unportant.info>
Authored: Wed Mar 7 10:13:51 2018 +0100
Committer: Josh Wills <jwills@apache.org>
Committed: Thu Mar 8 13:26:03 2018 -0800

----------------------------------------------------------------------
 .../apache/crunch/impl/mr/exec/MRExecutor.java  | 20 +++++++++++++++++---
 .../crunch/impl/mr/run/RuntimeParameters.java   |  2 ++
 2 files changed, 19 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/23a2da07/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
index 87546e1..45084f2 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/exec/MRExecutor.java
@@ -31,6 +31,7 @@ import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.mr.MRJob;
 import org.apache.crunch.impl.mr.MRPipelineExecution;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
 import org.apache.crunch.materialize.MaterializableIterable;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
@@ -90,9 +91,7 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
         monitorLoop();
       }
     });
-    this.pollInterval = isLocalMode()
-      ? new CappedExponentialCounter(50, 1000)
-      : new CappedExponentialCounter(500, 10000);
+    this.pollInterval = getPollInterval(conf);
 
     this.namedDotFiles = new ConcurrentHashMap<String, String>();
   }
@@ -255,6 +254,21 @@ public class MRExecutor extends AbstractFuture<PipelineResult> implements MRPipe
     }
   }
 
+  private CappedExponentialCounter getPollInterval(Configuration conf) {
+    long maxPollInterval = conf.getLong(RuntimeParameters.MAX_POLL_INTERVAL, -1);
+
+    if (maxPollInterval <= 0) {
+      if (isLocalMode()) {
+        maxPollInterval = 1_000;
+      } else {
+        maxPollInterval = 10_000;
+      }
+    }
+
+    long minPollInterval = Math.max(maxPollInterval / 20, 1);
+    return new CappedExponentialCounter(minPollInterval, maxPollInterval);
+  }
+
   private static boolean isLocalMode() {
     Configuration conf = new Configuration();
     String frameworkName = conf.get("mapreduce.framework.name", "");

http://git-wip-us.apache.org/repos/asf/crunch/blob/23a2da07/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index fe6f7ee..a36b910 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -45,6 +45,8 @@ public final class RuntimeParameters {
 
   public static final String FILE_TARGET_MAX_THREADS = "crunch.file.target.max.threads";
 
+  public static final String MAX_POLL_INTERVAL = "crunch.max.poll.interval";
+
   // Not instantiated
   private RuntimeParameters() {
   }


Mime
View raw message