Repository: beam
Updated Branches:
refs/heads/master d6f6351f1 -> 75b6567f6
Port the DirectRunner to the Batch Surgery API
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdc2eddb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdc2eddb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdc2eddb
Branch: refs/heads/master
Commit: fdc2eddb633ed0e0dde80948d4588757e7a552e6
Parents: 85af898
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Mar 17 16:39:58 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Mar 22 18:11:54 2017 -0700
----------------------------------------------------------------------
.../beam/runners/direct/DirectRunner.java | 60 +++++++++++---------
1 file changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fdc2eddb/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 94f0521..62df6c8 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
@@ -44,9 +45,7 @@ import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.metrics.MetricResults;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverride;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.PipelineRunner;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Aggregator;
@@ -260,10 +259,7 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
@Override
public DirectPipelineResult run(Pipeline pipeline) {
- for (Map.Entry<PTransformMatcher, PTransformOverrideFactory> override :
- defaultTransformOverrides().entrySet()) {
- pipeline.replace(PTransformOverride.of(override.getKey(), override.getValue()));
- }
+ pipeline.replaceAll(defaultTransformOverrides());
MetricsEnvironment.setMetricsSupported(true);
DirectGraphVisitor graphVisitor = new DirectGraphVisitor();
pipeline.traverseTopologically(graphVisitor);
@@ -321,27 +317,37 @@ public class DirectRunner extends PipelineRunner<DirectPipelineResult> {
* iteration order based on the order at which elements are added to it.
*/
@SuppressWarnings("rawtypes")
- private Map<PTransformMatcher, PTransformOverrideFactory> defaultTransformOverrides() {
- return ImmutableMap.<PTransformMatcher, PTransformOverrideFactory>builder()
- .put(
- PTransformMatchers.writeWithRunnerDeterminedSharding(),
- new WriteWithShardingFactory()) /* Uses a view internally. */
- .put(
- PTransformMatchers.classEqualTo(CreatePCollectionView.class),
- new ViewOverrideFactory()) /* Uses pardos and GBKs */
- .put(
- PTransformMatchers.classEqualTo(TestStream.class),
- new DirectTestStreamFactory(this)) /* primitive */
- // SplittableParDo is implemented in terms of GroupByKeys and Primitives
- .put(PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory())
- // state and timer ParDos are implemented in terms of GroupByKeys and Primitives
- .put(PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory())
- .put(
- PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
- new DirectGBKIntoKeyedWorkItemsOverrideFactory()) /* Returns a GBKO */
- .put(
- PTransformMatchers.classEqualTo(GroupByKey.class),
- new DirectGroupByKeyOverrideFactory()) /* returns two chained primitives. */
+ private List<PTransformOverride> defaultTransformOverrides() {
+ return ImmutableList.<PTransformOverride>builder()
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.writeWithRunnerDeterminedSharding(),
+ new WriteWithShardingFactory())) /* Uses a view internally. */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(CreatePCollectionView.class),
+ new ViewOverrideFactory())) /* Uses pardos and GBKs */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(TestStream.class),
+ new DirectTestStreamFactory(this))) /* primitive */
+ // SplittableParMultiDo is implemented in terms of nonsplittable simple ParDos and extra
+ // primitives
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.splittableParDoMulti(), new ParDoMultiOverrideFactory()))
+ // state and timer pardos are implemented in terms of simple ParDos and extra primitives
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.stateOrTimerParDoMulti(), new ParDoMultiOverrideFactory()))
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(GBKIntoKeyedWorkItems.class),
+ new DirectGBKIntoKeyedWorkItemsOverrideFactory())) /* Returns a GBKO */
+ .add(
+ PTransformOverride.of(
+ PTransformMatchers.classEqualTo(GroupByKey.class),
+ new DirectGroupByKeyOverrideFactory())) /* returns two chained primitives. */
.build();
}
|