beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/3] beam git commit: Port the DirectRunner to the Batch Surgery API
Date Thu, 23 Mar 2017 01:13:25 GMT
Repository: beam
Updated Branches:
  refs/heads/master d6f6351f1 -> 75b6567f6


Port the DirectRunner to the Batch Surgery API


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdc2eddb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdc2eddb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdc2eddb

Branch: refs/heads/master
Commit: fdc2eddb633ed0e0dde80948d4588757e7a552e6
Parents: 85af898
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Mar 17 16:39:58 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 60 +++++++++++---------
 1 file changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdc2eddb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 94f0521..62df6c8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -26,6 +26,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -44,9 +45,7 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
 import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.testing.TestStream;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -260,10 +259,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
 
   @Override
   public DirectPipelineResult run(Pipeline pipeline) {
-    for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
-        defaultTransformOverrides().entrySet()) {
-      pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
-    }
+    pipeline.replaceAll(defaultTransformOverrides());
     MetricsEnvironment.setMetricsSupported(true);
     DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
     pipeline.traverseTopologically(graphVisitor);
@@ -321,27 +317,37 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult>
{
    * iteration order based on the order at which elements are added to it.
    */
   @SuppressWarnings("rawtypes")
-  private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides()
{
-    return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
-        .put(
-            PTransformMatchers.writeWithRunnerDeterminedSharding(),
-            new WriteWithShardingFactory()) /* Uses a view internally. */
-        .put(
-            PTransformMatchers.classEqualTo(CreatePCollectionView.class),
-            new ViewOverrideFactory()) /* Uses pardos and GBKs */
-        .put(
-            PTransformMatchers.classEqualTo(TestStream.class),
-            new DirectTestStreamFactory(this)) /* primitive */
-        // SplittableParDo is implemented in terms of GroupByKeys and Primitives
-        .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
-        // state and timer ParDos are implemented in terms of GroupByKeys and Primitives
-        .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
-        .put(
-            PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
-            new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
-        .put(
-            PTransformMatchers.classEqualTo(GroupByKey.class),
-            new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+  private List<PTransformOverride> defaultTransformOverrides() {
+    return ImmutableList.<PTransformOverride>builder()
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.writeWithRunnerDeterminedSharding(),
+                new WriteWithShardingFactory())) /* Uses a view internally. */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+                new ViewOverrideFactory())) /* Uses pardos and GBKs */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(TestStream.class),
+                new DirectTestStreamFactory(this))) /* primitive */
+        // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and
extra
+        // primitives
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+        // state and timer pardos are implemented in terms of simple ParDos and extra primitives
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+                new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
+        .add(
+            PTransformOverride.of(
+                PTransformMatchers.classEqualTo(GroupByKey.class),
+                new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives.
*/
         .build();
   }
 


Mime
View raw message