beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Add runners/core-construction-java
Date Thu, 16 Feb 2017 03:07:22 GMT
Add runners/core-construction-java

This module contains pre-execution PipelineRunner utilities.

Move PTransformMatchers to core-construction-java


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1ffbc689
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1ffbc689
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1ffbc689

Branch: refs/heads/master
Commit: 1ffbc6899c6c3109da19f505c94d22f267ee0974
Parents: 013f118
Author: Thomas Groh <tgroh@google.com>
Authored: Wed Feb 15 12:57:52 2017 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Feb 15 19:07:09 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   6 +
 runners/core-construction-java/pom.xml          | 137 ++++++++++
 .../core/construction/PTransformMatchers.java   | 161 +++++++++++
 .../core/construction/ReplacementOutputs.java   | 105 +++++++
 .../runners/core/construction/package-info.java |  22 ++
 .../construction/PTransformMatchersTest.java    | 273 +++++++++++++++++++
 .../construction/ReplacementOutputsTest.java    | 254 +++++++++++++++++
 .../beam/runners/core/PTransformMatchers.java   | 142 ----------
 .../beam/runners/core/ReplacementOutputs.java   | 105 -------
 .../runners/core/PTransformMatchersTest.java    | 273 -------------------
 .../runners/core/ReplacementOutputsTest.java    | 254 -----------------
 runners/direct-java/pom.xml                     |   7 +-
 ...ectGBKIntoKeyedWorkItemsOverrideFactory.java |   2 +-
 .../direct/DirectGroupByKeyOverrideFactory.java |   2 +-
 .../direct/ParDoMultiOverrideFactory.java       |   2 +-
 .../ParDoSingleViaMultiOverrideFactory.java     |   2 +-
 .../direct/TestStreamEvaluatorFactory.java      |   2 +-
 .../runners/direct/ViewEvaluatorFactory.java    |   2 +-
 runners/pom.xml                                 |   1 +
 19 files changed, 971 insertions(+), 781 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d53502e..ac34016 100644
--- a/pom.xml
+++ b/pom.xml
@@ -421,6 +421,12 @@
 
       <dependency>
         <groupId>org.apache.beam</groupId>
+        <artifactId>beam-runners-core-construction-java</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>org.apache.beam</groupId>
         <artifactId>beam-runners-core-java</artifactId>
         <version>${project.version}</version>
       </dependency>

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/pom.xml b/runners/core-construction-java/pom.xml
new file mode 100644
index 0000000..868f743
--- /dev/null
+++ b/runners/core-construction-java/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <artifactId>beam-runners-parent</artifactId>
+    <groupId>org.apache.beam</groupId>
+    <version>0.6.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>beam-runners-core-construction-java</artifactId>
+  <name>Apache Beam :: Runners :: Core Java Construction</name>
+  <description>Beam Runners Core provides utilities to help runner authors interact with a Pipeline
+    prior to execution.
+  </description>
+
+  <packaging>jar</packaging>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <excludedGroups>
+            org.apache.beam.sdk.testing.NeedsRunner
+          </excludedGroups>
+          <systemPropertyVariables>
+            <beamUseDummyRunner>true</beamUseDummyRunner>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-shade-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>bundle-and-repackage</id>
+            <phase>package</phase>
+            <goals>
+              <goal>shade</goal>
+            </goals>
+            <configuration>
+              <shadeTestJar>true</shadeTestJar>
+              <artifactSet>
+                <includes>
+                  <include>com.google.guava:guava</include>
+                </includes>
+              </artifactSet>
+              <filters>
+                <filter>
+                  <artifact>*:*</artifact>
+                  <excludes>
+                    <exclude>META-INF/*.SF</exclude>
+                    <exclude>META-INF/*.DSA</exclude>
+                    <exclude>META-INF/*.RSA</exclude>
+                  </excludes>
+                </filter>
+              </filters>
+              <relocations>
+                <!-- TODO: Once ready, change the following pattern to 'com'
+                   only, exclude 'org.apache.beam.**', and remove
+                   the second relocation. -->
+                <relocation>
+                  <pattern>com.google.common</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.core.construction.repackaged.com.google.common
+                  </shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.google.thirdparty</pattern>
+                  <shadedPattern>
+                    org.apache.beam.runners.core.construction.repackaged.com.google.thirdparty
+                  </shadedPattern>
+                </relocation>
+              </relocations>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <!-- test dependencies -->
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
new file mode 100644
index 0000000..2823df8
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+
+/**
+ * Static factory methods for {@link PTransformMatcher PTransformMatchers} that match
+ * {@link PTransform PTransforms} based on the class of the transform or the {@link DoFn} it holds.
+ *
+ * <p>Once {@link PTransform PTransforms} have URNs, this will be removed and replaced with a
+ * UrnPTransformMatcher.
+ */
+@Experimental(Kind.CORE_RUNNERS_ONLY)
+public class PTransformMatchers {
+  private PTransformMatchers() {}
+
+  /**
+   * Returns a {@link PTransformMatcher} which matches a {@link PTransform} if any of the provided
+   * matchers match the {@link PTransform}.
+   */
+  public static PTransformMatcher anyOf(
+      final PTransformMatcher matcher, final PTransformMatcher... matchers) {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        for (PTransformMatcher component : matchers) {
+          if (component.matches(application)) {
+            return true;
+          }
+        }
+        return matcher.matches(application);
+      }
+    };
+  }
+
+  /**
+   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the
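+   * {@link PTransform} is equal to the {@link Class} provided to this matcher.
+   * @param clazz the class that a {@link PTransform} must be exactly equal to in order to match
+   * @return a matcher that matches only transforms whose class is exactly {@code clazz}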
+   */
+  public static PTransformMatcher classEqualTo(Class<? extends PTransform> clazz) {
+    return new EqualClassPTransformMatcher(clazz);
+  }
+
+  private static class EqualClassPTransformMatcher implements PTransformMatcher {
+    private final Class<? extends PTransform> clazz;
+
+    private EqualClassPTransformMatcher(Class<? extends PTransform> clazz) {
+      this.clazz = clazz;
+    }
+
+    @Override
+    public boolean matches(AppliedPTransform<?, ?, ?> application) {
+      return application.getTransform().getClass().equals(clazz);
+    }
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that
+   * is splittable, as signified by {@link ProcessElementMethod#isSplittable()}.
+   */
+  public static PTransformMatcher splittableParDoSingle() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        PTransform<?, ?> transform = application.getTransform();
+        if (transform instanceof ParDo.Bound) {
+          DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn();
+          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+          return signature.processElement().isSplittable();
+        }
+        return false;
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that
+   * uses state or timers, as specified by {@link DoFnSignature#usesState()} and
+   * {@link DoFnSignature#usesTimers()}.
+   */
+  public static PTransformMatcher stateOrTimerParDoSingle() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        PTransform<?, ?> transform = application.getTransform();
+        if (transform instanceof ParDo.Bound) {
+          DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn();
+          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+          return signature.usesState() || signature.usesTimers();
+        }
+        return false;
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn}
+   * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}.
+   */
+  public static PTransformMatcher splittableParDoMulti() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        PTransform<?, ?> transform = application.getTransform();
+        if (transform instanceof ParDo.BoundMulti) {
+          DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn();
+          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+          return signature.processElement().isSplittable();
+        }
+        return false;
+      }
+    };
+  }
+
+  /**
+   * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn}
+   * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and
+   * {@link DoFnSignature#usesTimers()}.
+   */
+  public static PTransformMatcher stateOrTimerParDoMulti() {
+    return new PTransformMatcher() {
+      @Override
+      public boolean matches(AppliedPTransform<?, ?, ?> application) {
+        PTransform<?, ?> transform = application.getTransform();
+        if (transform instanceof ParDo.BoundMulti) {
+          DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn();
+          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
+          return signature.usesState() || signature.usesTimers();
+        }
+        return false;
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
new file mode 100644
index 0000000..11b4449
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.construction;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+
+/**
+ * Utility methods for creating {@link ReplacementOutput} for known styles of {@link POutput}.
+ */
+public class ReplacementOutputs {
+  private ReplacementOutputs() {}
+
+  public static Map<PValue, ReplacementOutput> singleton(
+      List<TaggedPValue> original, PValue replacement) {
+    TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacement.expand());
+    return ImmutableMap.<PValue, ReplacementOutput>builder()
+        .put(
+            taggedReplacement.getValue(),
+            ReplacementOutput.of(Iterables.getOnlyElement(original), taggedReplacement))
+        .build();
+  }
+
+  public static Map<PValue, ReplacementOutput> ordered(
+      List<TaggedPValue> original, POutput replacement) {
+    ImmutableMap.Builder<PValue, ReplacementOutput> result = ImmutableMap.builder();
+    List<TaggedPValue> replacements = replacement.expand();
+    checkArgument(
+        original.size() == replacements.size(),
+        "Original and Replacements must be the same size. Original: %s Replacement: %s",
+        original.size(),
+        replacements.size());
+    int i = 0;
+    for (TaggedPValue replacementPvalue : replacements) {
+      result.put(
+          replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), replacementPvalue));
+      i++;
+    }
+    return result.build();
+  }
+
+  public static Map<PValue, ReplacementOutput> tagged(
+      List<TaggedPValue> original, POutput replacement) {
+    Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>();
+    for (TaggedPValue value : original) {
+      TaggedPValue former = originalTags.put(value.getTag(), value);
+      checkArgument(
+          former == null || former.equals(value),
+          "Found two tags in an expanded output which map to different values: output: %s "
+              + "Values: %s and %s",
+          original,
+          former,
+          value);
+    }
+    ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = ImmutableMap.builder();
+    Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet());
+    for (TaggedPValue replacementValue : replacement.expand()) {
+      TaggedPValue mapped = originalTags.get(replacementValue.getTag());
+      checkArgument(
+          mapped != null,
+          "Missing original output for Tag %s and Value %s Between original %s and replacement %s",
+          replacementValue.getTag(),
+          replacementValue.getValue(),
+          original,
+          replacement.expand());
+      resultBuilder.put(
+          replacementValue.getValue(), ReplacementOutput.of(mapped, replacementValue));
+      missingTags.remove(replacementValue.getTag());
+    }
+    ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build();
+    checkArgument(
+        missingTags.isEmpty(),
+        "Missing replacement for tags %s. Encountered tags: %s",
+        missingTags,
+        result.keySet());
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/package-info.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/package-info.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/package-info.java
new file mode 100644
index 0000000..314c02c
--- /dev/null
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Provides utilities for Beam runner authors to use prior to pipeline execution.
+ */
+package org.apache.beam.runners.core.construction;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
new file mode 100644
index 0000000..8218762
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.base.MoreObjects;
+import java.io.Serializable;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.runners.PTransformMatcher;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.AppliedPTransform;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.Timer;
+import org.apache.beam.sdk.util.TimerSpec;
+import org.apache.beam.sdk.util.TimerSpecs;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link PTransformMatchers}.
+ */
+@RunWith(JUnit4.class)
+public class PTransformMatchersTest implements Serializable {
+  @Rule
+  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  /**
+   * Gets the {@link AppliedPTransform} that has a created {@code PCollection<KV<String, Integer>>}
+   * as input.
+   */
+  private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
+    PCollection<KV<String, Integer>> input =
+        PCollection.createPrimitiveOutputInternal(
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+    PCollection<Integer> output =
+        PCollection.createPrimitiveOutputInternal(
+            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+    return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
+  }
+
+  @Test
+  public void classEqualToMatchesSameClass() {
+    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class);
+    AppliedPTransform<?, ?, ?> application =
+        getAppliedTransform(
+            ParDo.of(
+                new DoFn<KV<String, Integer>, Integer>() {
+                  @ProcessElement
+                  public void doStuff(ProcessContext ctxt) {}
+                }));
+
+    assertThat(matcher.matches(application), is(true));
+  }
+
+  @Test
+  public void classEqualToDoesNotMatchSubclass() {
+    class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
+      @Override
+      public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
+        return PCollection.createPrimitiveOutputInternal(
+            input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
+      }
+    }
+    PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
+    MyPTransform subclass = new MyPTransform() {};
+
+    assertThat(subclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
+    assertThat(subclass, instanceOf(MyPTransform.class));
+
+    AppliedPTransform<?, ?, ?> application =
+        getAppliedTransform(subclass);
+
+    assertThat(matcher.matches(application), is(false));
+  }
+
+  @Test
+  public void classEqualToDoesNotMatchUnrelatedClass() {
+    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class);
+    AppliedPTransform<?, ?, ?> application =
+        getAppliedTransform(Window.<KV<String, Integer>>into(new GlobalWindows()));
+
+    assertThat(matcher.matches(application), is(false));
+  }
+
+  private DoFn<KV<String, Integer>, Integer> doFn =
+      new DoFn<KV<String, Integer>, Integer>() {
+        @ProcessElement
+        public void simpleProcess(ProcessContext ctxt) {
+          ctxt.output(ctxt.element().getValue() + 1);
+        }
+      };
+  private abstract static class SomeTracker implements RestrictionTracker<Void> {}
+  private DoFn<KV<String, Integer>, Integer> splittableDoFn =
+      new DoFn<KV<String, Integer>, Integer>() {
+        @ProcessElement
+        public void processElement(ProcessContext context, SomeTracker tracker) {}
+
+        @GetInitialRestriction
+        public Void getInitialRestriction(KV<String, Integer> element) {
+          return null;
+        }
+
+        @NewTracker
+        public SomeTracker newTracker(Void restriction) {
+          return null;
+        }
+      };
+  private DoFn<KV<String, Integer>, Integer> doFnWithState =
+      new DoFn<KV<String, Integer>, Integer>() {
+        private final String stateId = "mystate";
+
+        @StateId(stateId)
+        private final StateSpec<Object, ValueState<Integer>> intState =
+            StateSpecs.value(VarIntCoder.of());
+
+        @ProcessElement
+        public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+          Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+          c.output(currentValue);
+          state.write(currentValue + 1);
+        }
+      };
+  private DoFn<KV<String, Integer>, Integer> doFnWithTimers =
+      new DoFn<KV<String, Integer>, Integer>() {
+        private final String timerId = "myTimer";
+
+        @TimerId(timerId)
+        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+        @ProcessElement
+        public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+          timer.setForNowPlus(Duration.standardSeconds(1));
+          context.output(3);
+        }
+
+        @OnTimer(timerId)
+        public void onTimer(OnTimerContext context) {
+          context.output(42);
+        }
+      };
+
+  /**
+   * Demonstrates that a {@link ParDo.Bound} of a plain {@link DoFn} matches no ParDo matcher.
+   */
+  @Test
+  public void parDoSingle() {
+    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFn));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoSingleSplittable() {
+    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(splittableDoFn));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoSingleWithState() {
+    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithState));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoSingleWithTimers() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(ParDo.of(doFnWithTimers));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoMulti() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(doFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoMultiSplittable() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoMultiWithState() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+
+  @Test
+  public void parDoMultiWithTimers() {
+    AppliedPTransform<?, ?, ?> parDoApplication =
+        getAppliedTransform(
+            ParDo.of(doFnWithTimers).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
+    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
+
+    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
+    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
new file mode 100644
index 0000000..abfdeef
--- /dev/null
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.core.construction;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.PValue;
+import org.apache.beam.sdk.values.TaggedPValue;
+import org.apache.beam.sdk.values.TupleTag;
+import org.hamcrest.Matchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link ReplacementOutputs}.
+ */
+@RunWith(JUnit4.class)
+public class ReplacementOutputsTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+  private TestPipeline p = TestPipeline.create();
+
+  private PCollection<Integer> ints =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<Integer> moreInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<String> strs =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+  private PCollection<Integer> replacementInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<Integer> moreReplacementInts =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+  private PCollection<String> replacementStrs =
+      PCollection.createPrimitiveOutputInternal(
+          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
+
+  @Test
+  public void singletonSucceeds() {
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.singleton(ints.expand(), replacementInts);
+
+    assertThat(replacements, Matchers.<PValue>hasKey(replacementInts));
+
+    ReplacementOutput replacement = replacements.get(replacementInts);
+    TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand());
+    assertThat(replacement.getOriginal(), equalTo(taggedInts));
+    assertThat(replacement.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
+  }
+
+  @Test
+  public void singletonMultipleOriginalsThrows() {
+    thrown.expect(IllegalArgumentException.class);
+    ReplacementOutputs.singleton(
+        ImmutableList.copyOf(Iterables.concat(ints.expand(), moreInts.expand())), replacementInts);
+  }
+
+  @Test
+  public void orderedSucceeds() {
+    List<TaggedPValue> originals = PCollectionList.of(ints).and(moreInts).expand();
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.ordered(
+            originals, PCollectionList.of(replacementInts).and(moreReplacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementInts, moreReplacementInts));
+
+    ReplacementOutput intsMapping = replacements.get(replacementInts);
+    assertThat(intsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(ints));
+    assertThat(intsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
+
+    ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts);
+    assertThat(moreIntsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(moreInts));
+    assertThat(
+        moreIntsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(moreReplacementInts));
+  }
+
+  @Test
+  public void orderedTooManyReplacements() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("same size");
+    ReplacementOutputs.ordered(
+        PCollectionList.of(ints).expand(),
+        PCollectionList.of(replacementInts).and(moreReplacementInts));
+  }
+
+  @Test
+  public void orderedTooFewReplacements() {
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("same size");
+    ReplacementOutputs.ordered(
+        PCollectionList.of(ints).and(moreInts).expand(), PCollectionList.of(moreReplacementInts));
+  }
+
+  private TupleTag<Integer> intsTag = new TupleTag<>();
+  private TupleTag<Integer> moreIntsTag = new TupleTag<>();
+  private TupleTag<String> strsTag = new TupleTag<>();
+
+  @Test
+  public void taggedSucceeds() {
+    PCollectionTuple original =
+        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, moreInts);
+
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.tagged(
+            original.expand(),
+            PCollectionTuple.of(strsTag, replacementStrs)
+                .and(moreIntsTag, moreReplacementInts)
+                .and(intsTag, replacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts, moreReplacementInts));
+    ReplacementOutput intsReplacement = replacements.get(replacementInts);
+    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
+    ReplacementOutput moreIntsReplacement = replacements.get(moreReplacementInts);
+
+    assertThat(
+        intsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
+    assertThat(
+        strsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
+    assertThat(
+        moreIntsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(moreIntsTag, moreInts),
+                TaggedPValue.of(moreIntsTag, moreReplacementInts))));
+  }
+
+  /**
+   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where the first
+   * argument contains multiple copies of the same {@link TaggedPValue}, the call succeeds using
+   * that mapping.
+   */
+  @Test
+  public void taggedMultipleInstances() {
+    List<TaggedPValue> original =
+        ImmutableList.of(
+            TaggedPValue.of(intsTag, ints),
+            TaggedPValue.of(strsTag, strs),
+            TaggedPValue.of(intsTag, ints));
+
+    Map<PValue, ReplacementOutput> replacements =
+        ReplacementOutputs.tagged(
+            original, PCollectionTuple.of(strsTag, replacementStrs).and(intsTag, replacementInts));
+    assertThat(
+        replacements.keySet(),
+        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts));
+    ReplacementOutput intsReplacement = replacements.get(replacementInts);
+    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
+
+    assertThat(
+        intsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
+    assertThat(
+        strsReplacement,
+        equalTo(
+            ReplacementOutput.of(
+                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
+  }
+
+  /**
+   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where a single tag
+   * has multiple {@link PValue PValues} mapped to it, the call fails.
+   */
+  @Test
+  public void taggedMultipleConflictingInstancesThrows() {
+    List<TaggedPValue> original =
+        ImmutableList.of(
+            TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, moreReplacementInts));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("different values");
+    thrown.expectMessage(intsTag.toString());
+    thrown.expectMessage(ints.toString());
+    thrown.expectMessage(moreReplacementInts.toString());
+    ReplacementOutputs.tagged(
+        original,
+        PCollectionTuple.of(strsTag, replacementStrs)
+            .and(moreIntsTag, moreReplacementInts)
+            .and(intsTag, replacementInts));
+  }
+
+  @Test
+  public void taggedMissingReplacementThrows() {
+    PCollectionTuple original =
+        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, moreInts);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Missing replacement");
+    thrown.expectMessage(intsTag.toString());
+    thrown.expectMessage(ints.toString());
+    ReplacementOutputs.tagged(
+        original.expand(),
+        PCollectionTuple.of(strsTag, replacementStrs).and(moreIntsTag, moreReplacementInts));
+  }
+
+  @Test
+  public void taggedExtraReplacementThrows() {
+    PCollectionTuple original = PCollectionTuple.of(intsTag, ints).and(strsTag, strs);
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Missing original output");
+    thrown.expectMessage(moreIntsTag.toString());
+    thrown.expectMessage(moreReplacementInts.toString());
+    ReplacementOutputs.tagged(
+        original.expand(),
+        PCollectionTuple.of(strsTag, replacementStrs)
+            .and(moreIntsTag, moreReplacementInts)
+            .and(intsTag, replacementInts));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
deleted file mode 100644
index 1d7e24e..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PTransformMatchers.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Experimental.Kind;
-import org.apache.beam.sdk.runners.PTransformMatcher;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod;
-import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
-
-/**
- * Static factory methods for {@link PTransformMatcher PTransformMatchers} that match
- * {@link PTransform PTransforms} based on the class of the transform and, for ParDos, its DoFn.
- *
- * <p>Once {@link PTransform PTransforms} have URNs, this will be removed and replaced with a
- * UrnPTransformMatcher.
- */
-@Experimental(Kind.CORE_RUNNERS_ONLY)
-public class PTransformMatchers {
-  private PTransformMatchers() {}
-
-  /**
-   * Returns a {@link PTransformMatcher} that matches a {@link PTransform} if the class of the
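-   * {@link PTransform} is equal to the {@link Class} provided to this matcher.
-   * @param clazz the exact class a {@link PTransform} must have in order to match
-   * @return a matcher that accepts only transforms whose class equals {@code clazz}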
-   */
-  public static PTransformMatcher classEqualTo(Class<? extends PTransform> clazz) {
-    return new EqualClassPTransformMatcher(clazz);
-  }
-
-  private static class EqualClassPTransformMatcher implements PTransformMatcher {
-    private final Class<? extends PTransform> clazz;
-
-    private EqualClassPTransformMatcher(Class<? extends PTransform> clazz) {
-      this.clazz = clazz;
-    }
-
-    @Override
-    public boolean matches(AppliedPTransform<?, ?, ?> application) {
-      return application.getTransform().getClass().equals(clazz);
-    }
-  }
-
-  /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that
-   * is splittable, as signified by {@link ProcessElementMethod#isSplittable()}.
-   */
-  public static PTransformMatcher splittableParDoSingle() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        PTransform<?, ?> transform = application.getTransform();
-        if (transform instanceof ParDo.Bound) {
-          DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn();
-          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
-          return signature.processElement().isSplittable();
-        }
-        return false;
-      }
-    };
-  }
-
-  /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo.Bound} containing a {@link DoFn} that
-   * uses state or timers, as specified by {@link DoFnSignature#usesState()} and
-   * {@link DoFnSignature#usesTimers()}.
-   */
-  public static PTransformMatcher stateOrTimerParDoSingle() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        PTransform<?, ?> transform = application.getTransform();
-        if (transform instanceof ParDo.Bound) {
-          DoFn<?, ?> fn = ((ParDo.Bound<?, ?>) transform).getFn();
-          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
-          return signature.usesState() || signature.usesTimers();
-        }
-        return false;
-      }
-    };
-  }
-
-  /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn}
-   * that is splittable, as signified by {@link ProcessElementMethod#isSplittable()}.
-   */
-  public static PTransformMatcher splittableParDoMulti() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        PTransform<?, ?> transform = application.getTransform();
-        if (transform instanceof ParDo.BoundMulti) {
-          DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn();
-          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
-          return signature.processElement().isSplittable();
-        }
-        return false;
-      }
-    };
-  }
-
-  /**
-   * A {@link PTransformMatcher} that matches a {@link ParDo.BoundMulti} containing a {@link DoFn}
-   * that uses state or timers, as specified by {@link DoFnSignature#usesState()} and
-   * {@link DoFnSignature#usesTimers()}.
-   */
-  public static PTransformMatcher stateOrTimerParDoMulti() {
-    return new PTransformMatcher() {
-      @Override
-      public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        PTransform<?, ?> transform = application.getTransform();
-        if (transform instanceof ParDo.BoundMulti) {
-          DoFn<?, ?> fn = ((ParDo.BoundMulti<?, ?>) transform).getFn();
-          DoFnSignature signature = DoFnSignatures.signatureForDoFn(fn);
-          return signature.usesState() || signature.usesTimers();
-        }
-        return false;
-      }
-    };
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
deleted file mode 100644
index 73c3c5d..0000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReplacementOutputs.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Iterables;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-
-/**
- * Utility methods for creating {@link ReplacementOutput} for known styles of {@link POutput}.
- */
-public class ReplacementOutputs {
-  private ReplacementOutputs() {}
-
-  public static Map<PValue, ReplacementOutput> singleton(
-      List<TaggedPValue> original, PValue replacement) {
-    TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacement.expand());
-    return ImmutableMap.<PValue, ReplacementOutput>builder()
-        .put(
-            taggedReplacement.getValue(),
-            ReplacementOutput.of(Iterables.getOnlyElement(original), taggedReplacement))
-        .build();
-  }
-
-  public static Map<PValue, ReplacementOutput> ordered(
-      List<TaggedPValue> original, POutput replacement) {
-    ImmutableMap.Builder<PValue, ReplacementOutput> result = ImmutableMap.builder();
-    List<TaggedPValue> replacements = replacement.expand();
-    checkArgument(
-        original.size() == replacements.size(),
-        "Original and Replacements must be the same size. Original: %s Replacement: %s",
-        original.size(),
-        replacements.size());
-    int i = 0;
-    for (TaggedPValue replacementPvalue : replacements) {
-      result.put(
-          replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), replacementPvalue));
-      i++;
-    }
-    return result.build();
-  }
-
-  public static Map<PValue, ReplacementOutput> tagged(
-      List<TaggedPValue> original, POutput replacement) {
-    Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>();
-    for (TaggedPValue value : original) {
-      TaggedPValue former = originalTags.put(value.getTag(), value);
-      checkArgument(
-          former == null || former.equals(value),
-          "Found two tags in an expanded output which map to different values: output: %s "
-              + "Values: %s and %s",
-          original,
-          former,
-          value);
-    }
-    ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = ImmutableMap.builder();
-    Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet());
-    for (TaggedPValue replacementValue : replacement.expand()) {
-      TaggedPValue mapped = originalTags.get(replacementValue.getTag());
-      checkArgument(
-          mapped != null,
-          "Missing original output for Tag %s and Value %s Between original %s and replacement %s",
-          replacementValue.getTag(),
-          replacementValue.getValue(),
-          original,
-          replacement.expand());
-      resultBuilder.put(
-          replacementValue.getValue(), ReplacementOutput.of(mapped, replacementValue));
-      missingTags.remove(replacementValue.getTag());
-    }
-    ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build();
-    checkArgument(
-        missingTags.isEmpty(),
-        "Missing replacement for tags %s. Encountered tags: %s",
-        missingTags,
-        result.keySet());
-    return result;
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
deleted file mode 100644
index 83795f1..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PTransformMatchersTest.java
+++ /dev/null
@@ -1,273 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.not;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.base.MoreObjects;
-import java.io.Serializable;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.runners.PTransformMatcher;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.util.TimeDomain;
-import org.apache.beam.sdk.util.Timer;
-import org.apache.beam.sdk.util.TimerSpec;
-import org.apache.beam.sdk.util.TimerSpecs;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateSpec;
-import org.apache.beam.sdk.util.state.StateSpecs;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.beam.sdk.values.TupleTagList;
-import org.hamcrest.Matchers;
-import org.joda.time.Duration;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link PTransformMatchers}.
- */
-@RunWith(JUnit4.class)
-public class PTransformMatchersTest implements Serializable {
-  @Rule
-  public transient TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
-
-  /**
-   * Gets the {@link AppliedPTransform} that has a created {@code PCollection<KV<String, Integer>>}
-   * as input.
-   */
-  private AppliedPTransform<?, ?, ?> getAppliedTransform(PTransform pardo) {
-    PCollection<KV<String, Integer>> input =
-        PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-    PCollection<Integer> output =
-        PCollection.createPrimitiveOutputInternal(
-            p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-
-    return AppliedPTransform.of("pardo", input.expand(), output.expand(), pardo, p);
-  }
-
-  @Test
-  public void classEqualToMatchesSameClass() {
-    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class);
-    AppliedPTransform<?, ?, ?> application =
-        getAppliedTransform(
-            ParDo.of(
-                new DoFn<KV<String, Integer>, Integer>() {
-                  @ProcessElement
-                  public void doStuff(ProcessContext ctxt) {}
-                }));
-
-    assertThat(matcher.matches(application), is(true));
-  }
-
-  @Test
-  public void classEqualToDoesNotMatchSubclass() {
-    class MyPTransform extends PTransform<PCollection<KV<String, Integer>>, PCollection<Integer>> {
-      @Override
-      public PCollection<Integer> expand(PCollection<KV<String, Integer>> input) {
-        return PCollection.createPrimitiveOutputInternal(
-            input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
-      }
-    }
-    PTransformMatcher matcher = PTransformMatchers.classEqualTo(MyPTransform.class);
-    MyPTransform subclass = new MyPTransform() {};
-
-    assertThat(subclass.getClass(), not(Matchers.<Class<?>>equalTo(MyPTransform.class)));
-    assertThat(subclass, instanceOf(MyPTransform.class));
-
-    AppliedPTransform<?, ?, ?> application =
-        getAppliedTransform(subclass);
-
-    assertThat(matcher.matches(application), is(false));
-  }
-
-  @Test
-  public void classEqualToDoesNotMatchUnrelatedClass() {
-    PTransformMatcher matcher = PTransformMatchers.classEqualTo(ParDo.Bound.class);
-    AppliedPTransform<?, ?, ?> application =
-        getAppliedTransform(Window.<KV<String, Integer>>into(new GlobalWindows()));
-
-    assertThat(matcher.matches(application), is(false));
-  }
-
-  private DoFn<KV<String, Integer>, Integer> doFn =
-      new DoFn<KV<String, Integer>, Integer>() {
-        @ProcessElement
-        public void simpleProcess(ProcessContext ctxt) {
-          ctxt.output(ctxt.element().getValue() + 1);
-        }
-      };
-  private abstract static class SomeTracker implements RestrictionTracker<Void> {}
-  private DoFn<KV<String, Integer>, Integer> splittableDoFn =
-      new DoFn<KV<String, Integer>, Integer>() {
-        @ProcessElement
-        public void processElement(ProcessContext context, SomeTracker tracker) {}
-
-        @GetInitialRestriction
-        public Void getInitialRestriction(KV<String, Integer> element) {
-          return null;
-        }
-
-        @NewTracker
-        public SomeTracker newTracker(Void restriction) {
-          return null;
-        }
-      };
-  private DoFn<KV<String, Integer>, Integer> doFnWithState =
-      new DoFn<KV<String, Integer>, Integer>() {
-        private final String stateId = "mystate";
-
-        @StateId(stateId)
-        private final StateSpec<Object, ValueState<Integer>> intState =
-            StateSpecs.value(VarIntCoder.of());
-
-        @ProcessElement
-        public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
-          Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
-          c.output(currentValue);
-          state.write(currentValue + 1);
-        }
-      };
-  private DoFn<KV<String, Integer>, Integer> doFnWithTimers =
-      new DoFn<KV<String, Integer>, Integer>() {
-        private final String timerId = "myTimer";
-
-        @TimerId(timerId)
-        private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
-
-        @ProcessElement
-        public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-          timer.setForNowPlus(Duration.standardSeconds(1));
-          context.output(3);
-        }
-
-        @OnTimer(timerId)
-        public void onTimer(OnTimerContext context) {
-          context.output(42);
-        }
-      };
-
-  /**
-   * Demonstrates that a {@link ParDo.Bound} of a plain {@link DoFn} matches no ParDo matcher.
-   */
-  @Test
-  public void parDoSingle() {
-    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFn));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoSingleSplittable() {
-    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(splittableDoFn));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoSingleWithState() {
-    AppliedPTransform<?, ?, ?> parDoApplication = getAppliedTransform(ParDo.of(doFnWithState));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoSingleWithTimers() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(ParDo.of(doFnWithTimers));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoMulti() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(
-            ParDo.of(doFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoMultiSplittable() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(
-            ParDo.of(splittableDoFn).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoMultiWithState() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(
-            ParDo.of(doFnWithState).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-
-  @Test
-  public void parDoMultiWithTimers() {
-    AppliedPTransform<?, ?, ?> parDoApplication =
-        getAppliedTransform(
-            ParDo.of(doFnWithTimers).withOutputTags(new TupleTag<Integer>(), TupleTagList.empty()));
-    assertThat(PTransformMatchers.stateOrTimerParDoMulti().matches(parDoApplication), is(true));
-
-    assertThat(PTransformMatchers.splittableParDoMulti().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.splittableParDoSingle().matches(parDoApplication), is(false));
-    assertThat(PTransformMatchers.stateOrTimerParDoSingle().matches(parDoApplication), is(false));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
deleted file mode 100644
index 49943d7..0000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReplacementOutputsTest.java
+++ /dev/null
@@ -1,254 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.core;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.junit.Assert.assertThat;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import java.util.List;
-import java.util.Map;
-import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.PCollectionList;
-import org.apache.beam.sdk.values.PCollectionTuple;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.PValue;
-import org.apache.beam.sdk.values.TaggedPValue;
-import org.apache.beam.sdk.values.TupleTag;
-import org.hamcrest.Matchers;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link ReplacementOutputs}.
- */
-@RunWith(JUnit4.class)
-public class ReplacementOutputsTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private TestPipeline p = TestPipeline.create();
-
-  private PCollection<Integer> ints =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-  private PCollection<Integer> moreInts =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-  private PCollection<String> strs =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-
-  private PCollection<Integer> replacementInts =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-  private PCollection<Integer> moreReplacementInts =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-  private PCollection<String> replacementStrs =
-      PCollection.createPrimitiveOutputInternal(
-          p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED);
-
-  @Test
-  public void singletonSucceeds() {
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.singleton(ints.expand(), replacementInts);
-
-    assertThat(replacements, Matchers.<PValue>hasKey(replacementInts));
-
-    ReplacementOutput replacement = replacements.get(replacementInts);
-    TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand());
-    assertThat(replacement.getOriginal(), equalTo(taggedInts));
-    assertThat(replacement.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
-  }
-
-  @Test
-  public void singletonMultipleOriginalsThrows() {
-    thrown.expect(IllegalArgumentException.class);
-    ReplacementOutputs.singleton(
-        ImmutableList.copyOf(Iterables.concat(ints.expand(), moreInts.expand())), replacementInts);
-  }
-
-  @Test
-  public void orderedSucceeds() {
-    List<TaggedPValue> originals = PCollectionList.of(ints).and(moreInts).expand();
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.ordered(
-            originals, PCollectionList.of(replacementInts).and(moreReplacementInts));
-    assertThat(
-        replacements.keySet(),
-        Matchers.<PValue>containsInAnyOrder(replacementInts, moreReplacementInts));
-
-    ReplacementOutput intsMapping = replacements.get(replacementInts);
-    assertThat(intsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(ints));
-    assertThat(intsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts));
-
-    ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts);
-    assertThat(moreIntsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(moreInts));
-    assertThat(
-        moreIntsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(moreReplacementInts));
-  }
-
-  @Test
-  public void orderedTooManyReplacements() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("same size");
-    ReplacementOutputs.ordered(
-        PCollectionList.of(ints).expand(),
-        PCollectionList.of(replacementInts).and(moreReplacementInts));
-  }
-
-  @Test
-  public void orderedTooFewReplacements() {
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("same size");
-    ReplacementOutputs.ordered(
-        PCollectionList.of(ints).and(moreInts).expand(), PCollectionList.of(moreReplacementInts));
-  }
-
-  private TupleTag<Integer> intsTag = new TupleTag<>();
-  private TupleTag<Integer> moreIntsTag = new TupleTag<>();
-  private TupleTag<String> strsTag = new TupleTag<>();
-
-  @Test
-  public void taggedSucceeds() {
-    PCollectionTuple original =
-        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, moreInts);
-
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.tagged(
-            original.expand(),
-            PCollectionTuple.of(strsTag, replacementStrs)
-                .and(moreIntsTag, moreReplacementInts)
-                .and(intsTag, replacementInts));
-    assertThat(
-        replacements.keySet(),
-        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts, moreReplacementInts));
-    ReplacementOutput intsReplacement = replacements.get(replacementInts);
-    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
-    ReplacementOutput moreIntsReplacement = replacements.get(moreReplacementInts);
-
-    assertThat(
-        intsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
-    assertThat(
-        strsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
-    assertThat(
-        moreIntsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(moreIntsTag, moreInts),
-                TaggedPValue.of(moreIntsTag, moreReplacementInts))));
-  }
-
-  /**
-   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where the first
-   * argument contains multiple copies of the same {@link TaggedPValue}, the call succeeds using
-   * that mapping.
-   */
-  @Test
-  public void taggedMultipleInstances() {
-    List<TaggedPValue> original =
-        ImmutableList.of(
-            TaggedPValue.of(intsTag, ints),
-            TaggedPValue.of(strsTag, strs),
-            TaggedPValue.of(intsTag, ints));
-
-    Map<PValue, ReplacementOutput> replacements =
-        ReplacementOutputs.tagged(
-            original, PCollectionTuple.of(strsTag, replacementStrs).and(intsTag, replacementInts));
-    assertThat(
-        replacements.keySet(),
-        Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts));
-    ReplacementOutput intsReplacement = replacements.get(replacementInts);
-    ReplacementOutput strsReplacement = replacements.get(replacementStrs);
-
-    assertThat(
-        intsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts))));
-    assertThat(
-        strsReplacement,
-        equalTo(
-            ReplacementOutput.of(
-                TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs))));
-  }
-
-  /**
-   * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where a single tag
-   * has multiple {@link PValue PValues} mapped to it, the call fails.
-   */
-  @Test
-  public void taggedMultipleConflictingInstancesThrows() {
-    List<TaggedPValue> original =
-        ImmutableList.of(
-            TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, moreReplacementInts));
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("different values");
-    thrown.expectMessage(intsTag.toString());
-    thrown.expectMessage(ints.toString());
-    thrown.expectMessage(moreReplacementInts.toString());
-    ReplacementOutputs.tagged(
-        original,
-        PCollectionTuple.of(strsTag, replacementStrs)
-            .and(moreIntsTag, moreReplacementInts)
-            .and(intsTag, replacementInts));
-  }
-
-  @Test
-  public void taggedMissingReplacementThrows() {
-    PCollectionTuple original =
-        PCollectionTuple.of(intsTag, ints).and(strsTag, strs).and(moreIntsTag, moreInts);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Missing replacement");
-    thrown.expectMessage(intsTag.toString());
-    thrown.expectMessage(ints.toString());
-    ReplacementOutputs.tagged(
-        original.expand(),
-        PCollectionTuple.of(strsTag, replacementStrs).and(moreIntsTag, moreReplacementInts));
-  }
-
-  @Test
-  public void taggedExtraReplacementThrows() {
-    PCollectionTuple original = PCollectionTuple.of(intsTag, ints).and(strsTag, strs);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Missing original output");
-    thrown.expectMessage(moreIntsTag.toString());
-    thrown.expectMessage(moreReplacementInts.toString());
-    ReplacementOutputs.tagged(
-        original.expand(),
-        PCollectionTuple.of(strsTag, replacementStrs)
-            .and(moreIntsTag, moreReplacementInts)
-            .and(intsTag, replacementInts));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/direct-java/pom.xml b/runners/direct-java/pom.xml
index 823c1e9..ced9cd6 100644
--- a/runners/direct-java/pom.xml
+++ b/runners/direct-java/pom.xml
@@ -151,6 +151,11 @@
 
     <dependency>
       <groupId>org.apache.beam</groupId>
+      <artifactId>beam-runners-core-construction-java</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-java</artifactId>
     </dependency>
 
@@ -261,7 +266,7 @@
       </exclusions>
       <scope>test</scope>
     </dependency>
-    
+
     <!-- needed for eclipse-jdt generated core as the test-jar references classes from this -->
     <dependency>
         <groupId>com.google.cloud.dataflow</groupId>

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
index 8de7b93..a957a17 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGBKIntoKeyedWorkItemsOverrideFactory.java
@@ -21,8 +21,8 @@ import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
-import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.core.SplittableParDo.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
index eedee31..1651987 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGroupByKeyOverrideFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.GroupByKey;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
index ccbde7a..c999093 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java
@@ -25,8 +25,8 @@ import java.util.Map;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.KeyedWorkItems;
-import org.apache.beam.runners.core.ReplacementOutputs;
 import org.apache.beam.runners.core.SplittableParDo;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
index 0ac8b04..990efb3 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoSingleViaMultiOverrideFactory.java
@@ -20,7 +20,7 @@ package org.apache.beam.runners.direct;
 import com.google.common.collect.Iterables;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.PTransform;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
index 082d33f..53e2671 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
-import org.apache.beam.runners.core.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.Pipeline;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
index 6ccc156..49faaa9 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java
@@ -21,7 +21,7 @@ import com.google.common.collect.Iterables;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.beam.runners.core.ReplacementOutputs;
+import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import org.apache.beam.runners.direct.CommittedResult.OutputType;
 import org.apache.beam.runners.direct.DirectRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.StepTransformResult.Builder;

http://git-wip-us.apache.org/repos/asf/beam/blob/1ffbc689/runners/pom.xml
----------------------------------------------------------------------
diff --git a/runners/pom.xml b/runners/pom.xml
index 6513a33..3f74f7b 100644
--- a/runners/pom.xml
+++ b/runners/pom.xml
@@ -33,6 +33,7 @@
   <name>Apache Beam :: Runners</name>
 
   <modules>
+    <module>core-construction-java</module>
     <module>core-java</module>
     <module>direct-java</module>
     <module>flink</module>


Mime
View raw message