beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/3] beam git commit: Add a Base OverrideFactory class for 1-to-1 overrides
Date Wed, 22 Feb 2017 17:36:42 GMT
Add a Base OverrideFactory class for 1-to-1 overrides

These overrides are relatively common, and this reduces the
reimplementation of mapping singletons and casting them.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/926385c4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/926385c4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/926385c4

Branch: refs/heads/master
Commit: 926385c474f652117d814e966f476ca6280ba506
Parents: 453e37b
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Feb 16 19:14:23 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Feb 22 09:36:17 2017 -0800

----------------------------------------------------------------------
 .../SingleInputOutputOverrideFactory.java       |  50 ++++++++
 .../SingleInputOutputOverrideFactoryTest.java   | 114 +++++++++++++++++++
 2 files changed, 164 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/926385c4/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
new file mode 100644
index 0000000..43bf556
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+
+/**
+ * A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces
a
+ * {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)}
are
+ * implemented.
+ */
+public abstract class SingleInputOutputOverrideFactory<
+        InputT extends PValue,
+        OutputT extends PValue,
+        TransformT extends PTransform<InputT, OutputT>>
+    implements PTransformOverrideFactory<InputT, OutputT, TransformT> {
+  @Override
+  public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) {
+    return (InputT) Iterables.getOnlyElement(inputs).getValue();
+  }
+
+  @Override
+  public final Map<PValue, ReplacementOutput> mapOutputs(
+      List<TaggedPValue> outputs, OutputT newOutput) {
+    return ReplacementOutputs.singleton(outputs, newOutput);
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/926385c4/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
new file mode 100644
index 0000000..b4cdd1f
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.Iterables;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PValue;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link SingleInputOutputOverrideFactory}. */
+@RunWith(JUnit4.class)
+public class SingleInputOutputOverrideFactoryTest implements Serializable {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  @Rule
+  public transient TestPipeline pipeline =
+      TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  private transient SingleInputOutputOverrideFactory<
+          PCollection<? extends Integer>, PCollection<Integer>, MapElements<Integer,
Integer>>
+      factory =
+          new SingleInputOutputOverrideFactory<
+              PCollection<? extends Integer>, PCollection<Integer>,
+              MapElements<Integer, Integer>>() {
+            @Override
+            public PTransform<PCollection<? extends Integer>, PCollection<Integer>>
+                getReplacementTransform(MapElements<Integer, Integer> transform) {
+              return transform;
+            }
+          };
+
+  private SimpleFunction<Integer, Integer> fn = new SimpleFunction<Integer, Integer>()
{
+      @Override
+      public Integer apply(Integer input) {
+        return input - 1;
+      }
+    };
+
+  @Test
+  public void testGetInput() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    assertThat(
+        factory.getInput(input.expand(), pipeline),
+        Matchers.<PCollection<? extends Integer>>equalTo(input));
+  }
+
+  @Test
+  public void testGetInputMultipleInputsFails() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> otherInput = pipeline.apply("OtherCreate", Create.of(1, 2,
3));
+
+    thrown.expect(IllegalArgumentException.class);
+    factory.getInput(PCollectionList.of(input).and(otherInput).expand(), pipeline);
+  }
+
+  @Test
+  public void testMapOutputs() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
+    PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
+    Map<PValue, ReplacementOutput> replacementMap =
+        factory.mapOutputs(output.expand(), reappliedOutput);
+    assertThat(
+        replacementMap,
+        Matchers.<PValue, ReplacementOutput>hasEntry(
+            reappliedOutput,
+            ReplacementOutput.of(
+                Iterables.getOnlyElement(output.expand()),
+                Iterables.getOnlyElement(reappliedOutput.expand()))));
+  }
+
+  @Test
+  public void testMapOutputsMultipleOriginalOutputsFails() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3));
+    PCollection<Integer> output = input.apply("Map", MapElements.via(fn));
+    PCollection<Integer> reappliedOutput = input.apply("ReMap", MapElements.via(fn));
+    thrown.expect(IllegalArgumentException.class);
+    Map<PValue, ReplacementOutput> replacementMap =
+        factory.mapOutputs(
+            PCollectionList.of(output).and(input).and(reappliedOutput).expand(), reappliedOutput);
+  }
+}


Mime
View raw message