beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [2/3] incubator-beam git commit: Use Structural Value keys instead of User Values
Date Thu, 02 Jun 2016 00:57:03 GMT
Use Structural Value keys instead of User Values

This fixes problems with lookup by basing entirely on structural
equality.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3cc4fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3cc4fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3cc4fa4

Branch: refs/heads/master
Commit: e3cc4fa4724625f49e3e6690e878a4713615b2e1
Parents: b9a8cbe
Author: Thomas Groh <tgroh@google.com>
Authored: Wed Jun 1 14:28:18 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Wed Jun 1 17:56:29 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/direct/BundleFactory.java      |  4 +-
 .../direct/ExecutorServiceParallelExecutor.java |  8 +-
 .../ImmutabilityCheckingBundleFactory.java      |  4 +-
 .../direct/InMemoryWatermarkManager.java        | 70 ++++++++-------
 .../runners/direct/InProcessBundleFactory.java  | 25 +++---
 .../direct/InProcessEvaluationContext.java      | 10 +--
 .../direct/InProcessExecutionContext.java       |  4 +-
 ...InProcessGroupByKeyOnlyEvaluatorFactory.java |  6 +-
 .../runners/direct/InProcessPipelineRunner.java |  7 +-
 .../apache/beam/runners/direct/StepAndKey.java  |  8 +-
 .../beam/runners/direct/StructuralKey.java      | 77 ++++++++++++++++
 .../direct/GroupByKeyEvaluatorFactoryTest.java  | 27 ++++--
 .../ImmutabilityCheckingBundleFactoryTest.java  | 13 ++-
 .../direct/InMemoryWatermarkManagerTest.java    | 95 +++++++++++---------
 .../direct/InProcessBundleFactoryTest.java      | 54 ++++++-----
 .../direct/InProcessEvaluationContextTest.java  | 52 ++++++-----
 ...ocessGroupByKeyOnlyEvaluatorFactoryTest.java | 33 ++++---
 .../direct/InProcessPipelineRunnerTest.java     | 60 +++++++++++++
 .../direct/InProcessTimerInternalsTest.java     |  3 +-
 .../direct/ParDoInProcessEvaluatorTest.java     |  2 +-
 .../direct/ParDoMultiEvaluatorFactoryTest.java  | 29 +++---
 .../direct/ParDoSingleEvaluatorFactoryTest.java | 35 +++++---
 .../beam/runners/direct/StructuralKeyTest.java  | 81 +++++++++++++++++
 23 files changed, 500 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
index fea4841..a0511df 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BundleFactory.java
@@ -44,6 +44,6 @@ public interface BundleFactory {
    * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}. Elements added to the bundle
    * belong to the {@code output} {@link PCollection}.
    */
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output);
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output);
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 70a8035..a627125 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -396,17 +396,19 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     private boolean fireTimers() throws Exception {
       try {
         boolean firedTimers = false;
-        for (Map.Entry<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> transformTimers :
+        for (Map.Entry<
+               AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> transformTimers :
             evaluationContext.extractFiredTimers().entrySet()) {
           AppliedPTransform<?, ?, ?> transform = transformTimers.getKey();
-          for (Map.Entry<Object, FiredTimers> keyTimers : transformTimers.getValue().entrySet()) {
+          for (Map.Entry<StructuralKey<?>, FiredTimers> keyTimers :
+              transformTimers.getValue().entrySet()) {
             for (TimeDomain domain : TimeDomain.values()) {
               Collection<TimerData> delivery = keyTimers.getValue().getTimers(domain);
               if (delivery.isEmpty()) {
                 continue;
               }
               KeyedWorkItem<Object, Object> work =
-                  KeyedWorkItems.timersWorkItem(keyTimers.getKey(), delivery);
+                  KeyedWorkItems.timersWorkItem(keyTimers.getKey().getKey(), delivery);
               @SuppressWarnings({"unchecked", "rawtypes"})
               CommittedBundle<?> bundle =
                   evaluationContext

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
index 3b38211..92a57dd 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactory.java
@@ -72,8 +72,8 @@ class ImmutabilityCheckingBundleFactory implements BundleFactory {
   }
 
   @Override
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output) {
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
     return new ImmutabilityEnforcingBundle<>(underlying.createKeyedBundle(input, key, output));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
index f8cf343..95095fa 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InMemoryWatermarkManager.java
@@ -207,7 +207,7 @@ public class InMemoryWatermarkManager {
   private static class AppliedPTransformInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWatermarks;
     private final SortedMultiset<WindowedValue<?>> pendingElements;
-    private final Map<Object, NavigableSet<TimerData>> objectTimers;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers;
 
     private AtomicReference<Instant> currentWatermark;
 
@@ -286,7 +286,7 @@ public class InMemoryWatermarkManager {
       // We don't keep references to timers that have been fired and delivered via #getFiredTimers()
     }
 
-    private synchronized Map<Object, List<TimerData>> extractFiredEventTimeTimers() {
+    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredEventTimeTimers() {
       return extractFiredTimers(currentWatermark.get(), objectTimers);
     }
 
@@ -384,8 +384,8 @@ public class InMemoryWatermarkManager {
   private static class SynchronizedProcessingTimeInputWatermark implements Watermark {
     private final Collection<? extends Watermark> inputWms;
     private final Collection<CommittedBundle<?>> pendingBundles;
-    private final Map<Object, NavigableSet<TimerData>> processingTimers;
-    private final Map<Object, NavigableSet<TimerData>> synchronizedProcessingTimers;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> processingTimers;
+    private final Map<StructuralKey<?>, NavigableSet<TimerData>> synchronizedProcessingTimers;
 
     private final PriorityQueue<TimerData> pendingTimers;
 
@@ -490,9 +490,9 @@ public class InMemoryWatermarkManager {
       }
     }
 
-    private synchronized Map<Object, List<TimerData>> extractFiredDomainTimers(
+    private synchronized Map<StructuralKey<?>, List<TimerData>> extractFiredDomainTimers(
         TimeDomain domain, Instant firingTime) {
-      Map<Object, List<TimerData>> firedTimers;
+      Map<StructuralKey<?>, List<TimerData>> firedTimers;
       switch (domain) {
         case PROCESSING_TIME:
           firedTimers = extractFiredTimers(firingTime, processingTimers);
@@ -509,13 +509,14 @@ public class InMemoryWatermarkManager {
                   + " and gave a non-processing time domain "
                   + domain);
       }
-      for (Map.Entry<Object, ? extends Collection<TimerData>> firedTimer : firedTimers.entrySet()) {
+      for (Map.Entry<StructuralKey<?>, ? extends Collection<TimerData>> firedTimer :
+          firedTimers.entrySet()) {
         pendingTimers.addAll(firedTimer.getValue());
       }
       return firedTimers;
     }
 
-    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(Object key) {
+    private Map<TimeDomain, NavigableSet<TimerData>> timerMap(StructuralKey<?> key) {
       NavigableSet<TimerData> processingQueue = processingTimers.get(key);
       if (processingQueue == null) {
         processingQueue = new TreeSet<>();
@@ -647,11 +648,12 @@ public class InMemoryWatermarkManager {
    *
    * The result collection retains ordering of timers (from earliest to latest).
    */
-  private static Map<Object, List<TimerData>> extractFiredTimers(
-      Instant latestTime, Map<Object, NavigableSet<TimerData>> objectTimers) {
-    Map<Object, List<TimerData>> result = new HashMap<>();
-    Set<Object> emptyKeys = new HashSet<>();
-    for (Map.Entry<Object, NavigableSet<TimerData>> pendingTimers : objectTimers.entrySet()) {
+  private static Map<StructuralKey<?>, List<TimerData>> extractFiredTimers(
+      Instant latestTime, Map<StructuralKey<?>, NavigableSet<TimerData>> objectTimers) {
+    Map<StructuralKey<?>, List<TimerData>> result = new HashMap<>();
+    Set<StructuralKey<?>> emptyKeys = new HashSet<>();
+    for (Map.Entry<StructuralKey<?>, NavigableSet<TimerData>> pendingTimers :
+        objectTimers.entrySet()) {
       NavigableSet<TimerData> timers = pendingTimers.getValue();
       if (!timers.isEmpty() && timers.first().getTimestamp().isBefore(latestTime)) {
         ArrayList<TimerData> keyFiredTimers = new ArrayList<>();
@@ -923,11 +925,12 @@ public class InMemoryWatermarkManager {
    * Returns a map of each {@link PTransform} that has pending timers to those timers. All of the
    * pending timers will be removed from this {@link InMemoryWatermarkManager}.
    */
-  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> allTimers = new HashMap<>();
+  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> allTimers = new HashMap<>();
     for (Map.Entry<AppliedPTransform<?, ?, ?>, TransformWatermarks> watermarksEntry :
         transformToWatermarks.entrySet()) {
-      Map<Object, FiredTimers> keyFiredTimers = watermarksEntry.getValue().extractFiredTimers();
+      Map<StructuralKey<?>, FiredTimers> keyFiredTimers =
+          watermarksEntry.getValue().extractFiredTimers();
       if (!keyFiredTimers.isEmpty()) {
         allTimers.put(watermarksEntry.getKey(), keyFiredTimers);
       }
@@ -1130,10 +1133,11 @@ public class InMemoryWatermarkManager {
       return FluentIterable.from(bundle.getElements()).transformAndConcat(EXPLODE_WINDOWS_FN);
     }
 
-    private Map<Object, FiredTimers> extractFiredTimers() {
-      Map<Object, List<TimerData>> eventTimeTimers = inputWatermark.extractFiredEventTimeTimers();
-      Map<Object, List<TimerData>> processingTimers;
-      Map<Object, List<TimerData>> synchronizedTimers;
+    private Map<StructuralKey<?>, FiredTimers> extractFiredTimers() {
+      Map<StructuralKey<?>, List<TimerData>> eventTimeTimers =
+          inputWatermark.extractFiredEventTimeTimers();
+      Map<StructuralKey<?>, List<TimerData>> processingTimers;
+      Map<StructuralKey<?>, List<TimerData>> synchronizedTimers;
       if (inputWatermark.get().equals(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
         processingTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
             TimeDomain.PROCESSING_TIME, BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -1145,11 +1149,11 @@ public class InMemoryWatermarkManager {
         synchronizedTimers = synchronizedProcessingInputWatermark.extractFiredDomainTimers(
             TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
       }
-      Map<Object, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
+      Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedTimers = new HashMap<>();
       groupFiredTimers(groupedTimers, eventTimeTimers, processingTimers, synchronizedTimers);
 
-      Map<Object, FiredTimers> keyFiredTimers = new HashMap<>();
-      for (Map.Entry<Object, Map<TimeDomain, List<TimerData>>> firedTimers :
+      Map<StructuralKey<?>, FiredTimers> keyFiredTimers = new HashMap<>();
+      for (Map.Entry<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> firedTimers :
           groupedTimers.entrySet()) {
         keyFiredTimers.put(firedTimers.getKey(), new FiredTimers(firedTimers.getValue()));
       }
@@ -1158,10 +1162,10 @@ public class InMemoryWatermarkManager {
 
     @SafeVarargs
     private final void groupFiredTimers(
-        Map<Object, Map<TimeDomain, List<TimerData>>> groupedToMutate,
-        Map<Object, List<TimerData>>... timersToGroup) {
-      for (Map<Object, List<TimerData>> subGroup : timersToGroup) {
-        for (Map.Entry<Object, List<TimerData>> newTimers : subGroup.entrySet()) {
+        Map<StructuralKey<?>, Map<TimeDomain, List<TimerData>>> groupedToMutate,
+        Map<StructuralKey<?>, List<TimerData>>... timersToGroup) {
+      for (Map<StructuralKey<?>, List<TimerData>> subGroup : timersToGroup) {
+        for (Map.Entry<StructuralKey<?>, List<TimerData>> newTimers : subGroup.entrySet()) {
           Map<TimeDomain, List<TimerData>> grouped = groupedToMutate.get(newTimers.getKey());
           if (grouped == null) {
             grouped = new HashMap<>();
@@ -1196,7 +1200,7 @@ public class InMemoryWatermarkManager {
    * the input to the executed step.
    */
   public static class TimerUpdate {
-    private final Object key;
+    private final StructuralKey<?> key;
     private final Iterable<? extends TimerData> completedTimers;
 
     private final Iterable<? extends TimerData> setTimers;
@@ -1217,7 +1221,7 @@ public class InMemoryWatermarkManager {
      * Creates a new {@link TimerUpdate} builder with the provided completed timers that needs the
      * set and deleted timers to be added to it.
      */
-    public static TimerUpdateBuilder builder(Object key) {
+    public static TimerUpdateBuilder builder(StructuralKey<?> key) {
       return new TimerUpdateBuilder(key);
     }
 
@@ -1225,12 +1229,12 @@ public class InMemoryWatermarkManager {
      * A {@link TimerUpdate} builder that needs to be provided with set timers and deleted timers.
      */
     public static final class TimerUpdateBuilder {
-      private final Object key;
+      private final StructuralKey<?> key;
       private final Collection<TimerData> completedTimers;
       private final Collection<TimerData> setTimers;
       private final Collection<TimerData> deletedTimers;
 
-      private TimerUpdateBuilder(Object key) {
+      private TimerUpdateBuilder(StructuralKey<?> key) {
         this.key = key;
         this.completedTimers = new HashSet<>();
         this.setTimers = new HashSet<>();
@@ -1280,7 +1284,7 @@ public class InMemoryWatermarkManager {
     }
 
     private TimerUpdate(
-        Object key,
+        StructuralKey<?> key,
         Iterable<? extends TimerData> completedTimers,
         Iterable<? extends TimerData> setTimers,
         Iterable<? extends TimerData> deletedTimers) {
@@ -1291,7 +1295,7 @@ public class InMemoryWatermarkManager {
     }
 
     @VisibleForTesting
-    Object getKey() {
+    StructuralKey<?> getKey() {
       return key;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
index bc9b04c..52bc575 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessBundleFactory.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.values.PCollection;
 
@@ -29,8 +30,6 @@ import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
 
-import javax.annotation.Nullable;
-
 /**
  * A factory that produces bundles that perform no additional validation.
  */
@@ -43,7 +42,7 @@ class InProcessBundleFactory implements BundleFactory {
 
   @Override
   public <T> UncommittedBundle<T> createRootBundle(PCollection<T> output) {
-    return InProcessBundle.create(output, null);
+    return InProcessBundle.create(output, StructuralKey.of(null, VoidCoder.of()));
   }
 
   @Override
@@ -52,8 +51,8 @@ class InProcessBundleFactory implements BundleFactory {
   }
 
   @Override
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, @Nullable Object key, PCollection<T> output) {
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
     return InProcessBundle.create(output, key);
   }
 
@@ -62,18 +61,18 @@ class InProcessBundleFactory implements BundleFactory {
    */
   private static final class InProcessBundle<T> implements UncommittedBundle<T> {
     private final PCollection<T> pcollection;
-    @Nullable private final Object key;
+    private final StructuralKey<?> key;
     private boolean committed = false;
     private ImmutableList.Builder<WindowedValue<T>> elements;
 
     /**
      * Create a new {@link InProcessBundle} for the specified {@link PCollection}.
      */
-    public static <T> InProcessBundle<T> create(PCollection<T> pcollection, @Nullable Object key) {
-      return new InProcessBundle<T>(pcollection, key);
+    public static <T> InProcessBundle<T> create(PCollection<T> pcollection, StructuralKey<?> key) {
+      return new InProcessBundle<>(pcollection, key);
     }
 
-    private InProcessBundle(PCollection<T> pcollection, Object key) {
+    private InProcessBundle(PCollection<T> pcollection, StructuralKey<?> key) {
       this.pcollection = pcollection;
       this.key = key;
       this.elements = ImmutableList.builder();
@@ -108,7 +107,7 @@ class InProcessBundleFactory implements BundleFactory {
   private static class CommittedInProcessBundle<T> implements CommittedBundle<T> {
     public CommittedInProcessBundle(
         PCollection<T> pcollection,
-        Object key,
+        StructuralKey<?> key,
         Iterable<WindowedValue<T>> committedElements,
         Instant synchronizedCompletionTime) {
       this.pcollection = pcollection;
@@ -118,13 +117,13 @@ class InProcessBundleFactory implements BundleFactory {
     }
 
     private final PCollection<T> pcollection;
-    private final Object key;
+    /** The structural value key of the Bundle, as specified by the coder that created it. */
+    private final StructuralKey<?> key;
     private final Iterable<WindowedValue<T>> committedElements;
     private final Instant synchronizedCompletionTime;
 
     @Override
-    @Nullable
-    public Object getKey() {
+    public StructuralKey<?> getKey() {
       return key;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
index 732a279..981a842 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessEvaluationContext.java
@@ -233,8 +233,8 @@ class InProcessEvaluationContext {
    * Create a {@link UncommittedBundle} with the specified keys at the specified step. For use by
    * {@link InProcessGroupByKeyOnly} {@link PTransform PTransforms}.
    */
-  public <T> UncommittedBundle<T> createKeyedBundle(
-      CommittedBundle<?> input, Object key, PCollection<T> output) {
+  public <K, T> UncommittedBundle<T> createKeyedBundle(
+      CommittedBundle<?> input, StructuralKey<K> key, PCollection<T> output) {
     return bundleFactory.createKeyedBundle(input, key, output);
   }
 
@@ -302,7 +302,7 @@ class InProcessEvaluationContext {
    * Get an {@link ExecutionContext} for the provided {@link AppliedPTransform} and key.
    */
   public InProcessExecutionContext getExecutionContext(
-      AppliedPTransform<?, ?, ?> application, Object key) {
+      AppliedPTransform<?, ?, ?> application, StructuralKey<?> key) {
     StepAndKey stepAndKey = StepAndKey.of(application, key);
     return new InProcessExecutionContext(
         options.getClock(),
@@ -372,9 +372,9 @@ class InProcessEvaluationContext {
    * <p>This is a destructive operation. Timers will only appear in the result of this method once
    * for each time they are set.
    */
-  public Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> extractFiredTimers() {
+  public Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> extractFiredTimers() {
     forceRefresh();
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
         watermarkManager.extractFiredTimers();
     return fired;
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
index 44d8bd9..4f10b3a 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessExecutionContext.java
@@ -33,11 +33,11 @@ import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 class InProcessExecutionContext
     extends BaseExecutionContext<InProcessExecutionContext.InProcessStepContext> {
   private final Clock clock;
-  private final Object key;
+  private final StructuralKey<?> key;
   private final CopyOnAccessInMemoryStateInternals<Object> existingState;
   private final TransformWatermarks watermarks;
 
-  public InProcessExecutionContext(Clock clock, Object key,
+  public InProcessExecutionContext(Clock clock, StructuralKey<?> key,
       CopyOnAccessInMemoryStateInternals<Object> existingState, TransformWatermarks watermarks) {
     this.clock = clock;
     this.key = key;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
index 79db5b6..a10d496 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactory.java
@@ -147,8 +147,10 @@ class InProcessGroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFacto
         K key = groupedEntry.getKey().key;
         KeyedWorkItem<K, V> groupedKv =
             KeyedWorkItems.elementsWorkItem(key, groupedEntry.getValue());
-        UncommittedBundle<KeyedWorkItem<K, V>> bundle =
-            evaluationContext.createKeyedBundle(inputBundle, key, application.getOutput());
+        UncommittedBundle<KeyedWorkItem<K, V>> bundle = evaluationContext.createKeyedBundle(
+            inputBundle,
+            StructuralKey.of(key, keyCoder),
+            application.getOutput());
         bundle.add(WindowedValue.valueInGlobalWindow(groupedKv));
         resultBuilder.addOutput(bundle);
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
index 5a04af4..8847c58 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/InProcessPipelineRunner.java
@@ -60,8 +60,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import javax.annotation.Nullable;
-
 /**
  * An In-Memory implementation of the Dataflow Programming Model. Supports Unbounded
  * {@link PCollection PCollections}.
@@ -130,11 +128,10 @@ public class InProcessPipelineRunner
     PCollection<T> getPCollection();
 
     /**
-     * Returns the (possibly null) key that was output in the most recent {@link GroupByKey} in the
+     * Returns the key that was output in the most recent {@link GroupByKey} in the
      * execution of this bundle.
      */
-    @Nullable
-    Object getKey();
+    StructuralKey<?> getKey();
 
     /**
      * Returns an {@link Iterable} containing all of the elements that have been added to this

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
index 1c7cf6c..18fe04f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StepAndKey.java
@@ -29,16 +29,16 @@ import java.util.Objects;
  */
 final class StepAndKey {
   private final AppliedPTransform<?, ?, ?> step;
-  private final Object key;
+  private final StructuralKey<?> key;
 
   /**
    * Create a new {@link StepAndKey} with the provided step and key.
    */
-  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, Object key) {
+  public static StepAndKey of(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
     return new StepAndKey(step, key);
   }
 
-  private StepAndKey(AppliedPTransform<?, ?, ?> step, Object key) {
+  private StepAndKey(AppliedPTransform<?, ?, ?> step, StructuralKey<?> key) {
     this.step = step;
     this.key = key;
   }
@@ -47,7 +47,7 @@ final class StepAndKey {
   public String toString() {
     return MoreObjects.toStringHelper(StepAndKey.class)
         .add("step", step.getFullName())
-        .add("key", key)
+        .add("key", key.getKey())
         .toString();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
new file mode 100644
index 0000000..249ccfe
--- /dev/null
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StructuralKey.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.direct;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+
+/**
+ * A (Key, Coder) pair that uses the structural value of the key (as provided by
+ * {@link Coder#structuralValue(Object)}) to perform equality and hashing.
+ */
+class StructuralKey<K> {
+  /**
+   * Create a new Structural Key of the provided key that can be encoded by the provided coder.
+   */
+  public static <K> StructuralKey<K> of(K key, Coder<K> coder) {
+    try {
+      return new StructuralKey<>(coder, key);
+    } catch (Exception e) {
+      throw new IllegalArgumentException(
+          "Could not encode a key with its provided coder " + coder.getClass().getSimpleName(), e);
+    }
+  }
+
+  private final Coder<K> coder;
+  private final Object structuralValue;
+  private final byte[] encoded;
+
+  private StructuralKey(Coder<K> coder, K key) throws Exception {
+    this.coder = coder;
+    this.structuralValue = coder.structuralValue(key);
+    this.encoded = CoderUtils.encodeToByteArray(coder, key);
+  }
+
+  public K getKey() {
+    try {
+      return CoderUtils.decodeFromByteArray(coder, encoded);
+    } catch (CoderException e) {
+      throw new IllegalArgumentException(
+          "Could not decode Key with coder of type " + coder.getClass().getSimpleName());
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    }
+    if (other instanceof StructuralKey) {
+      StructuralKey that = (StructuralKey) other;
+      return structuralValue.equals(that.structuralValue);
+    }
+    return false;
+  }
+
+  @Override
+  public int hashCode() {
+    return structuralValue.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
index 92f845c..a4f900c 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/GroupByKeyEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
@@ -72,17 +73,27 @@ public class GroupByKeyEvaluatorFactoryTest {
     CommittedBundle<KV<String, WindowedValue<Integer>>> inputBundle =
         bundleFactory.createRootBundle(kvs).commit(Instant.now());
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
-
+    StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
+        bundleFactory.createKeyedBundle(null, fooKey, groupedKvs);
+
+    StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
     UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
+        bundleFactory.createKeyedBundle(null, barKey, groupedKvs);
 
-    when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+    StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
+    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
+        bundleFactory.createKeyedBundle(null, bazKey, groupedKvs);
+
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        fooKey,
+        groupedKvs)).thenReturn(fooBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        barKey,
+        groupedKvs)).thenReturn(barBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        bazKey,
+        groupedKvs)).thenReturn(bazBundle);
 
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
index 557ebff..2e7847d 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ImmutabilityCheckingBundleFactoryTest.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertThat;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -75,7 +76,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
   @Test
   public void noMutationKeyedBundleSucceeds() {
     CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+        StructuralKey.of("mykey", StringUtf8Coder.of()),
+        transformed);
 
     WindowedValue<byte[]> windowedArray =
         WindowedValue.of(
@@ -121,7 +124,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
   @Test
   public void mutationBeforeAddKeyedBundleSucceeds() {
     CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+        StructuralKey.of("mykey", StringUtf8Coder.of()),
+        transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     array[0] = Byte.MAX_VALUE;
@@ -172,7 +177,9 @@ public class ImmutabilityCheckingBundleFactoryTest {
   @Test
   public void mutationAfterAddKeyedBundleThrows() {
     CommittedBundle<byte[]> root = factory.createRootBundle(created).commit(Instant.now());
-    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root, "mykey", transformed);
+    UncommittedBundle<byte[]> keyed = factory.createKeyedBundle(root,
+        StructuralKey.of("mykey", StringUtf8Coder.of()),
+        transformed);
 
     byte[] array = new byte[] {4, 8, 12};
     WindowedValue<byte[]> windowedArray =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
index 7f202fb..af08d02 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InMemoryWatermarkManagerTest.java
@@ -31,6 +31,9 @@ import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.Timer
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Create;
@@ -397,16 +400,18 @@ public class InMemoryWatermarkManagerTest implements Serializable {
    */
   @Test
   public void updateWatermarkWithKeyedWatermarkHolds() {
-    CommittedBundle<Integer> firstKeyBundle =
-        bundleFactory.createKeyedBundle(null, "Odd", createdInts)
-            .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
-            .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
-            .commit(clock.now());
+    CommittedBundle<Integer> firstKeyBundle = bundleFactory.createKeyedBundle(null,
+        StructuralKey.of("Odd", StringUtf8Coder.of()),
+        createdInts)
+        .add(WindowedValue.timestampedValueInGlobalWindow(1, new Instant(1_000_000L)))
+        .add(WindowedValue.timestampedValueInGlobalWindow(3, new Instant(-1000L)))
+        .commit(clock.now());
 
-    CommittedBundle<Integer> secondKeyBundle =
-        bundleFactory.createKeyedBundle(null, "Even", createdInts)
-            .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
-            .commit(clock.now());
+    CommittedBundle<Integer> secondKeyBundle = bundleFactory.createKeyedBundle(null,
+        StructuralKey.of("Even", StringUtf8Coder.of()),
+        createdInts)
+        .add(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(1234L)))
+        .commit(clock.now());
 
     manager.updateWatermarks(null,
         TimerUpdate.empty(),
@@ -435,8 +440,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         not(earlierThan(BoundedWindow.TIMESTAMP_MAX_VALUE)));
     assertThat(filteredWatermarks.getOutputWatermark(), not(laterThan(new Instant(-1000L))));
 
-    CommittedBundle<Integer> fauxFirstKeyTimerBundle =
-        bundleFactory.createKeyedBundle(null, "Odd", createdInts).commit(clock.now());
+    CommittedBundle<Integer> fauxFirstKeyTimerBundle = bundleFactory.createKeyedBundle(null,
+        StructuralKey.of("Odd", StringUtf8Coder.of()),
+        createdInts).commit(clock.now());
     manager.updateWatermarks(fauxFirstKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
@@ -447,8 +453,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     assertThat(filteredWatermarks.getOutputWatermark(), equalTo(new Instant(1234L)));
 
-    CommittedBundle<Integer> fauxSecondKeyTimerBundle =
-        bundleFactory.createKeyedBundle(null, "Even", createdInts).commit(clock.now());
+    CommittedBundle<Integer> fauxSecondKeyTimerBundle = bundleFactory.createKeyedBundle(null,
+        StructuralKey.of("Even", StringUtf8Coder.of()),
+        createdInts).commit(clock.now());
     manager.updateWatermarks(fauxSecondKeyTimerBundle,
         TimerUpdate.empty(),
         result(filtered.getProducingTransformInternal(),
@@ -846,13 +853,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     Instant initialFilteredWm = filteredWms.getSynchronizedProcessingOutputTime();
     Instant initialFilteredDoubledWm = filteredDoubledWms.getSynchronizedProcessingOutputTime();
 
+    StructuralKey<String> key = StructuralKey.of("key", StringUtf8Coder.of());
     CommittedBundle<Integer> filteredBundle = multiWindowedBundle(filtered, 2, 8);
     TimerData pastTimer =
         TimerData.of(StateNamespaces.global(), new Instant(250L), TimeDomain.PROCESSING_TIME);
     TimerData futureTimer =
         TimerData.of(StateNamespaces.global(), new Instant(4096L), TimeDomain.PROCESSING_TIME);
     TimerUpdate timers =
-        TimerUpdate.builder("key").setTimer(pastTimer).setTimer(futureTimer).build();
+        TimerUpdate.builder(key).setTimer(pastTimer).setTimer(futureTimer).build();
     manager.updateWatermarks(createdBundle,
         timers,
         result(filtered.getProducingTransformInternal(),
@@ -872,11 +880,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         filteredDoubledWms.getSynchronizedProcessingOutputTime(),
         not(earlierThan(initialFilteredDoubledWm)));
 
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firedTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firedTimers =
         manager.extractFiredTimers();
     assertThat(
         firedTimers.get(filtered.getProducingTransformInternal())
-            .get("key")
+            .get(key)
             .getTimers(TimeDomain.PROCESSING_TIME),
         contains(pastTimer));
     // Our timer has fired, but has not been completed, so it holds our synchronized processing WM
@@ -885,14 +893,14 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<Integer> filteredTimerBundle =
         bundleFactory
-            .createKeyedBundle(null, "key", filtered)
+            .createKeyedBundle(null, key, filtered)
             .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
     CommittedBundle<Integer> filteredTimerResult =
-        bundleFactory.createKeyedBundle(null, "key", filteredTimesTwo)
+        bundleFactory.createKeyedBundle(null, key, filteredTimesTwo)
             .commit(filteredWms.getSynchronizedProcessingOutputTime());
     // Complete the processing time timer
     manager.updateWatermarks(filteredTimerBundle,
-        TimerUpdate.builder("key")
+        TimerUpdate.builder(key)
             .withCompletedTimers(Collections.<TimerData>singleton(pastTimer)).build(),
         result(filtered.getProducingTransformInternal(),
             filteredTimerBundle.withElements(Collections.<WindowedValue<Integer>>emptyList()),
@@ -988,7 +996,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
     TimerData upstreamProcessingTimer =
         TimerData.of(StateNamespaces.global(), upstreamHold, TimeDomain.PROCESSING_TIME);
     manager.updateWatermarks(created,
-        TimerUpdate.builder("key").setTimer(upstreamProcessingTimer).build(),
+        TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
+            .setTimer(upstreamProcessingTimer)
+            .build(),
         result(filtered.getProducingTransformInternal(),
             created.withElements(Collections.<WindowedValue<Integer>>emptyList()),
             Collections.<CommittedBundle<?>>singleton(filteredBundle)),
@@ -1009,7 +1019,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
     CommittedBundle<Integer> otherCreated = multiWindowedBundle(createdInts, 4, 8, 12);
     manager.updateWatermarks(otherCreated,
-        TimerUpdate.builder("key")
+        TimerUpdate.builder(StructuralKey.of("key", StringUtf8Coder.of()))
             .withCompletedTimers(Collections.singleton(upstreamProcessingTimer)).build(),
         result(filtered.getProducingTransformInternal(),
             otherCreated.withElements(Collections.<WindowedValue<Integer>>emptyList()),
@@ -1032,8 +1042,9 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         new Instant(29_919_235L));
 
     Instant upstreamHold = new Instant(2048L);
-    CommittedBundle<Integer> filteredBundle =
-        bundleFactory.createKeyedBundle(created, "key", filtered).commit(upstreamHold);
+    CommittedBundle<Integer> filteredBundle = bundleFactory.createKeyedBundle(created,
+        StructuralKey.of("key", StringUtf8Coder.of()),
+        filtered).commit(upstreamHold);
     manager.updateWatermarks(
         created,
         TimerUpdate.empty(),
@@ -1053,7 +1064,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
   @Test
   public void extractFiredTimersReturnsFiredEventTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
         manager.extractFiredTimers();
     // Watermarks haven't advanced
     assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1074,7 +1085,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.EVENT_TIME);
     TimerData lastTimer =
         TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.EVENT_TIME);
-    Object key = new Object();
+    StructuralKey<byte[]> key = StructuralKey.of(new byte[] {1, 4, 9}, ByteArrayCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
             .setTimer(earliestTimer)
@@ -1090,11 +1101,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(firstFilteredTimers.get(key), not(nullValue()));
     FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1107,11 +1118,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(secondFilteredTimers.get(key), not(nullValue()));
     FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1121,7 +1132,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
   @Test
   public void extractFiredTimersReturnsFiredProcessingTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
         manager.extractFiredTimers();
     // Watermarks haven't advanced
     assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1141,7 +1152,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerData.of(StateNamespaces.global(), new Instant(5000L), TimeDomain.PROCESSING_TIME);
     TimerData lastTimer =
         TimerData.of(StateNamespaces.global(), new Instant(10000L), TimeDomain.PROCESSING_TIME);
-    Object key = new Object();
+    StructuralKey<?> key = StructuralKey.of(-12L, VarLongCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
             .setTimer(lastTimer)
@@ -1158,11 +1169,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(firstFilteredTimers.get(key), not(nullValue()));
     FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1176,11 +1187,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(secondFilteredTimers.get(key), not(nullValue()));
     FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1190,7 +1201,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
 
   @Test
   public void extractFiredTimersReturnsFiredSynchronizedProcessingTimeTimers() {
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> initialTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> initialTimers =
         manager.extractFiredTimers();
     // Watermarks haven't advanced
     assertThat(initialTimers.entrySet(), emptyIterable());
@@ -1210,7 +1221,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         StateNamespaces.global(), new Instant(5000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
     TimerData lastTimer = TimerData.of(
         StateNamespaces.global(), new Instant(10000L), TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
-    Object key = new Object();
+    StructuralKey<byte[]> key = StructuralKey.of(new byte[] {2, -2, 22}, ByteArrayCoder.of());
     TimerUpdate update =
         TimerUpdate.builder(key)
             .setTimer(lastTimer)
@@ -1227,11 +1238,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         new Instant(1000L));
     manager.refreshAll();
 
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> firstTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> firstTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> firstFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> firstFilteredTimers =
         firstTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(firstFilteredTimers.get(key), not(nullValue()));
     FiredTimers firstFired = firstFilteredTimers.get(key);
@@ -1246,11 +1257,11 @@ public class InMemoryWatermarkManagerTest implements Serializable {
             Collections.<CommittedBundle<?>>emptyList()),
         new Instant(50_000L));
     manager.refreshAll();
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> secondTransformFiredTimers =
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> secondTransformFiredTimers =
         manager.extractFiredTimers();
     assertThat(
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal()), not(nullValue()));
-    Map<Object, FiredTimers> secondFilteredTimers =
+    Map<StructuralKey<?>, FiredTimers> secondFilteredTimers =
         secondTransformFiredTimers.get(filtered.getProducingTransformInternal());
     assertThat(secondFilteredTimers.get(key), not(nullValue()));
     FiredTimers secondFired = secondFilteredTimers.get(key);
@@ -1271,7 +1282,7 @@ public class InMemoryWatermarkManagerTest implements Serializable {
         TimerData.of(StateNamespaces.global(), new Instant(2048L), TimeDomain.EVENT_TIME);
 
     TimerUpdate update =
-        TimerUpdate.builder("foo")
+        TimerUpdate.builder(StructuralKey.of("foo", StringUtf8Coder.of()))
             .withCompletedTimers(ImmutableList.of(completedOne, completedTwo))
             .setTimer(set)
             .deletedTimer(deleted)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
index 1809dc6..abe2a19 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessBundleFactoryTest.java
@@ -19,11 +19,15 @@ package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.WithKeys;
@@ -69,34 +73,38 @@ public class InProcessBundleFactoryTest {
   }
 
   @Test
-  public void createRootBundleShouldCreateWithNullKey() {
+  public void createRootBundleShouldCreateWithEmptyKey() {
     PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
 
     UncommittedBundle<Integer> inFlightBundle = bundleFactory.createRootBundle(pcollection);
 
     CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
 
-    assertThat(bundle.getKey(), nullValue());
+    assertThat(bundle.getKey(),
+        Matchers.<StructuralKey<?>>equalTo(StructuralKey.of(null, VoidCoder.of())));
   }
 
-  private void createKeyedBundle(Object key) {
+  private <T> void createKeyedBundle(Coder<T> coder, T key) throws Exception {
     PCollection<Integer> pcollection = TestPipeline.create().apply(Create.of(1));
+    StructuralKey skey = StructuralKey.of(key, coder);
 
     UncommittedBundle<Integer> inFlightBundle =
-        bundleFactory.createKeyedBundle(null, key, pcollection);
+        bundleFactory.createKeyedBundle(null, skey, pcollection);
 
     CommittedBundle<Integer> bundle = inFlightBundle.commit(Instant.now());
-    assertThat(bundle.getKey(), equalTo(key));
+    assertThat(bundle.getKey(), equalTo(skey));
   }
 
   @Test
-  public void keyedWithNullKeyShouldCreateKeyedBundle() {
-    createKeyedBundle(null);
+  public void keyedWithNullKeyShouldCreateKeyedBundle() throws Exception {
+    createKeyedBundle(VoidCoder.of(), null);
   }
 
   @Test
-  public void keyedWithKeyShouldCreateKeyedBundle() {
-    createKeyedBundle(new Object());
+  public void keyedWithKeyShouldCreateKeyedBundle() throws Exception {
+    createKeyedBundle(StringUtf8Coder.of(), "foo");
+    createKeyedBundle(VarIntCoder.of(), 1234);
+    createKeyedBundle(ByteArrayCoder.of(), new byte[] {0, 2, 4, 99});
   }
 
   private <T> CommittedBundle<T>
@@ -154,7 +162,7 @@ public class InProcessBundleFactoryTest {
 
     assertThat(withed.getElements(), containsInAnyOrder(firstReplacement, secondReplacement));
     assertThat(committed.getElements(), containsInAnyOrder(firstValue, secondValue));
-    assertThat(withed.getKey(), equalTo(committed.getKey()));
+    assertThat(withed.getKey(), Matchers.<StructuralKey<?>>equalTo(committed.getKey()));
     assertThat(withed.getPCollection(), equalTo(committed.getPCollection()));
     assertThat(
         withed.getSynchronizedProcessingOutputWatermark(),
@@ -203,21 +211,21 @@ public class InProcessBundleFactoryTest {
   @Test
   public void createBundleKeyedResultPropagatesKey() {
     CommittedBundle<KV<String, Integer>> newBundle =
-        bundleFactory
-            .createBundle(
-                bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
-                downstream)
-            .commit(Instant.now());
-    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+        bundleFactory.createBundle(
+            bundleFactory.createKeyedBundle(
+                null,
+                StructuralKey.of("foo", StringUtf8Coder.of()),
+                created).commit(Instant.now()),
+            downstream).commit(Instant.now());
+    assertThat(newBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
   }
 
   @Test
   public void createKeyedBundleKeyed() {
-    CommittedBundle<KV<String, Integer>> keyedBundle =
-        bundleFactory
-            .createKeyedBundle(
-                bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
-            .commit(Instant.now());
-    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+    CommittedBundle<KV<String, Integer>> keyedBundle = bundleFactory.createKeyedBundle(
+        bundleFactory.createRootBundle(created).commit(Instant.now()),
+        StructuralKey.of("foo", StringUtf8Coder.of()),
+        downstream).commit(Instant.now());
+    assertThat(keyedBundle.getKey().getKey(), Matchers.<Object>equalTo("foo"));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
index 10b8721..18db400 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessEvaluationContextTest.java
@@ -31,6 +31,8 @@ import org.apache.beam.runners.direct.InProcessExecutionContext.InProcessStepCon
 import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.PCollectionViewWriter;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
+import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.io.CountingInput;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -159,16 +161,16 @@ public class InProcessEvaluationContextTest {
   @Test
   public void getExecutionContextSameStepSameKeyState() {
     InProcessExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+        context.getExecutionContext(created.getProducingTransformInternal(),
+            StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
     InProcessStepContext stepContext = fooContext.getOrCreateStepContext("s1", "s1");
     stepContext.stateInternals().state(StateNamespaces.global(), intBag).add(1);
 
-    context.handleResult(
-        InProcessBundleFactory.create()
-            .createKeyedBundle(null, "foo", created)
+    context.handleResult(InProcessBundleFactory.create()
+            .createKeyedBundle(null, StructuralKey.of("foo", StringUtf8Coder.of()), created)
             .commit(Instant.now()),
         ImmutableList.<TimerData>of(),
         StepTransformResult.withoutHold(created.getProducingTransformInternal())
@@ -176,7 +178,8 @@ public class InProcessEvaluationContextTest {
             .build());
 
     InProcessExecutionContext secondFooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+        context.getExecutionContext(created.getProducingTransformInternal(),
+            StructuralKey.of("foo", StringUtf8Coder.of()));
     assertThat(
         secondFooContext
             .getOrCreateStepContext("s1", "s1")
@@ -190,7 +193,8 @@ public class InProcessEvaluationContextTest {
   @Test
   public void getExecutionContextDifferentKeysIndependentState() {
     InProcessExecutionContext fooContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "foo");
+        context.getExecutionContext(created.getProducingTransformInternal(),
+            StructuralKey.of("foo", StringUtf8Coder.of()));
 
     StateTag<Object, BagState<Integer>> intBag = StateTags.bag("myBag", VarIntCoder.of());
 
@@ -201,7 +205,8 @@ public class InProcessEvaluationContextTest {
         .add(1);
 
     InProcessExecutionContext barContext =
-        context.getExecutionContext(created.getProducingTransformInternal(), "bar");
+        context.getExecutionContext(created.getProducingTransformInternal(),
+            StructuralKey.of("bar", StringUtf8Coder.of()));
     assertThat(barContext, not(equalTo(fooContext)));
     assertThat(
         barContext
@@ -214,7 +219,7 @@ public class InProcessEvaluationContextTest {
 
   @Test
   public void getExecutionContextDifferentStepsIndependentState() {
-    String myKey = "foo";
+    StructuralKey<?> myKey = StructuralKey.of("foo", StringUtf8Coder.of());
     InProcessExecutionContext fooContext =
         context.getExecutionContext(created.getProducingTransformInternal(), myKey);
 
@@ -269,7 +274,7 @@ public class InProcessEvaluationContextTest {
 
   @Test
   public void handleResultStoresState() {
-    String myKey = "foo";
+    StructuralKey<?> myKey = StructuralKey.of("foo".getBytes(), ByteArrayCoder.of());
     InProcessExecutionContext fooContext =
         context.getExecutionContext(downstream.getProducingTransformInternal(), myKey);
 
@@ -359,7 +364,7 @@ public class InProcessEvaluationContextTest {
             .build();
     context.handleResult(null, ImmutableList.<TimerData>of(), holdResult);
 
-    String key = "foo";
+    StructuralKey<?> key = StructuralKey.of("foo".length(), VarIntCoder.of());
     TimerData toFire =
         TimerData.of(StateNamespaces.global(), new Instant(100L), TimeDomain.EVENT_TIME);
     InProcessTransformResult timerResult =
@@ -383,11 +388,12 @@ public class InProcessEvaluationContextTest {
     // Should cause the downstream timer to fire
     context.handleResult(null, ImmutableList.<TimerData>of(), advanceResult);
 
-    Map<AppliedPTransform<?, ?, ?>, Map<Object, FiredTimers>> fired = context.extractFiredTimers();
+    Map<AppliedPTransform<?, ?, ?>, Map<StructuralKey<?>, FiredTimers>> fired =
+        context.extractFiredTimers();
     assertThat(
         fired,
         Matchers.<AppliedPTransform<?, ?, ?>>hasKey(downstream.getProducingTransformInternal()));
-    Map<Object, FiredTimers> downstreamFired =
+    Map<StructuralKey<?>, FiredTimers> downstreamFired =
         fired.get(downstream.getProducingTransformInternal());
     assertThat(downstreamFired, Matchers.<Object>hasKey(key));
 
@@ -402,23 +408,27 @@ public class InProcessEvaluationContextTest {
 
   @Test
   public void createBundleKeyedResultPropagatesKey() {
+    StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
     CommittedBundle<KV<String, Integer>> newBundle =
         context
             .createBundle(
-                bundleFactory.createKeyedBundle(null, "foo", created).commit(Instant.now()),
-                downstream)
-            .commit(Instant.now());
-    assertThat(newBundle.getKey(), Matchers.<Object>equalTo("foo"));
+                bundleFactory.createKeyedBundle(
+                    null, key,
+                    created).commit(Instant.now()),
+                downstream).commit(Instant.now());
+    assertThat(newBundle.getKey(), Matchers.<StructuralKey<?>>equalTo(key));
   }
 
   @Test
   public void createKeyedBundleKeyed() {
+    StructuralKey<String> key = StructuralKey.of("foo", StringUtf8Coder.of());
     CommittedBundle<KV<String, Integer>> keyedBundle =
-        context
-            .createKeyedBundle(
-                bundleFactory.createRootBundle(created).commit(Instant.now()), "foo", downstream)
-            .commit(Instant.now());
-    assertThat(keyedBundle.getKey(), Matchers.<Object>equalTo("foo"));
+        context.createKeyedBundle(
+            bundleFactory.createRootBundle(created).commit(Instant.now()),
+            key,
+            downstream).commit(Instant.now());
+    assertThat(keyedBundle.getKey(),
+        Matchers.<StructuralKey<?>>equalTo(key));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
index 1172a4d..28a3cf6 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessGroupByKeyOnlyEvaluatorFactoryTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.runners.direct.InProcessPipelineRunner.CommittedBundle;
 import org.apache.beam.runners.direct.InProcessPipelineRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.util.GroupByKeyViaGroupByKeyOnly.ReifyTimestampsAndWindows;
@@ -73,16 +74,28 @@ public class InProcessGroupByKeyOnlyEvaluatorFactoryTest {
         bundleFactory.createRootBundle(kvs).commit(Instant.now());
     InProcessEvaluationContext evaluationContext = mock(InProcessEvaluationContext.class);
 
-    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle =
-        bundleFactory.createKeyedBundle(null, "foo", groupedKvs);
-    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle =
-        bundleFactory.createKeyedBundle(null, "bar", groupedKvs);
-    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle =
-        bundleFactory.createKeyedBundle(null, "baz", groupedKvs);
-
-    when(evaluationContext.createKeyedBundle(inputBundle, "foo", groupedKvs)).thenReturn(fooBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "bar", groupedKvs)).thenReturn(barBundle);
-    when(evaluationContext.createKeyedBundle(inputBundle, "baz", groupedKvs)).thenReturn(bazBundle);
+    StructuralKey<String> fooKey = StructuralKey.of("foo", StringUtf8Coder.of());
+    UncommittedBundle<KeyedWorkItem<String, Integer>> fooBundle = bundleFactory.createKeyedBundle(
+        null, fooKey,
+        groupedKvs);
+    StructuralKey<String> barKey = StructuralKey.of("bar", StringUtf8Coder.of());
+    UncommittedBundle<KeyedWorkItem<String, Integer>> barBundle = bundleFactory.createKeyedBundle(
+        null, barKey,
+        groupedKvs);
+    StructuralKey<String> bazKey = StructuralKey.of("baz", StringUtf8Coder.of());
+    UncommittedBundle<KeyedWorkItem<String, Integer>> bazBundle = bundleFactory.createKeyedBundle(
+        null, bazKey,
+        groupedKvs);
+
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        fooKey,
+        groupedKvs)).thenReturn(fooBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        barKey,
+        groupedKvs)).thenReturn(barBundle);
+    when(evaluationContext.createKeyedBundle(inputBundle,
+        bazKey,
+        groupedKvs)).thenReturn(bazBundle);
 
     // The input to a GroupByKey is assumed to be a KvCoder
     @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
index 5a92ce3..9314f5e 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessPipelineRunnerTest.java
@@ -18,23 +18,34 @@
 package org.apache.beam.runners.direct;
 
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.fail;
 
 import org.apache.beam.runners.direct.InProcessPipelineRunner.InProcessPipelineResult;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.collect.ImmutableMap;
 
 import com.fasterxml.jackson.annotation.JsonValue;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.internal.matchers.ThrowableMessageMatcher;
@@ -43,6 +54,7 @@ import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 import java.io.Serializable;
+import java.util.Map;
 
 /**
  * Tests for basic {@link InProcessPipelineRunner} functionality.
@@ -79,6 +91,54 @@ public class InProcessPipelineRunnerTest implements Serializable {
     result.awaitCompletion();
   }
 
+  @Test(timeout = 5000L)
+  public void byteArrayCountShouldSucceed() {
+    Pipeline p = getPipeline();
+
+    SerializableFunction<Integer, byte[]> getBytes = new SerializableFunction<Integer, byte[]>() {
+      @Override
+      public byte[] apply(Integer input) {
+        try {
+          return CoderUtils.encodeToByteArray(VarIntCoder.of(), input);
+        } catch (CoderException e) {
+          fail("Unexpected Coder Exception " + e);
+          throw new AssertionError("Unreachable");
+        }
+      }
+    };
+    TypeDescriptor<byte[]> td = new TypeDescriptor<byte[]>() {
+    };
+    PCollection<byte[]> foos =
+        p.apply(Create.of(1, 1, 1, 2, 2, 3)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> msync =
+        p.apply(Create.of(1, -2, -8, -16)).apply(MapElements.via(getBytes).withOutputType(td));
+    PCollection<byte[]> bytes =
+        PCollectionList.of(foos).and(msync).apply(Flatten.<byte[]>pCollections());
+    PCollection<KV<byte[], Long>> counts = bytes.apply(Count.<byte[]>perElement());
+    PCollection<KV<Integer, Long>> countsBackToString =
+        counts.apply(MapElements.via(new SimpleFunction<KV<byte[], Long>, KV<Integer, Long>>() {
+          @Override
+          public KV<Integer, Long> apply(KV<byte[], Long> input) {
+            try {
+              return KV.of(CoderUtils.decodeFromByteArray(VarIntCoder.of(), input.getKey()),
+                  input.getValue());
+            } catch (CoderException e) {
+              fail("Unexpected Coder Exception " + e);
+              throw new AssertionError("Unreachable");
+        }
+      }
+    }));
+
+    Map<Integer, Long> expected = ImmutableMap.<Integer, Long>builder().put(1, 4L)
+        .put(2, 2L)
+        .put(3, 1L)
+        .put(-2, 1L)
+        .put(-8, 1L)
+        .put(-16, 1L)
+        .build();
+    PAssert.thatMap(countsBackToString).isEqualTo(expected);
+  }
+
   @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
index 34a8980..3e01f44 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/InProcessTimerInternalsTest.java
@@ -25,6 +25,7 @@ import static org.mockito.Mockito.when;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.InMemoryWatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals.TimerData;
 import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -55,7 +56,7 @@ public class InProcessTimerInternalsTest {
     MockitoAnnotations.initMocks(this);
     clock = MockClock.fromInstant(new Instant(0));
 
-    timerUpdateBuilder = TimerUpdate.builder(1234);
+    timerUpdateBuilder = TimerUpdate.builder(StructuralKey.of(1234, VarIntCoder.of()));
 
     internals = InProcessTimerInternals.create(clock, watermarks, timerUpdateBuilder);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
index ca15d9c..1127ed2 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoInProcessEvaluatorTest.java
@@ -154,7 +154,7 @@ public class ParDoInProcessEvaluatorTest {
     when(stepContext.getTimerUpdate()).thenReturn(TimerUpdate.empty());
     when(
             evaluationContext.getExecutionContext(
-                Mockito.any(AppliedPTransform.class), Mockito.any(Object.class)))
+                Mockito.any(AppliedPTransform.class), Mockito.any(StructuralKey.class)))
         .thenReturn(executionContext);
     when(evaluationContext.createCounterSet()).thenReturn(new CounterSet());
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
index cecfe01..a6f31c0 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoMultiEvaluatorFactoryTest.java
@@ -114,8 +114,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -199,8 +199,8 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
 
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -287,10 +287,12 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createBundle(inputBundle, elementOutput))
         .thenReturn(elementOutputBundle);
 
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null,
+        null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -397,10 +399,11 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createBundle(inputBundle, elementOutput))
         .thenReturn(elementOutputBundle);
 
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -419,7 +422,7 @@ public class ParDoMultiEvaluatorFactoryTest implements Serializable {
     assertThat(
         result.getTimerUpdate(),
         equalTo(
-            TimerUpdate.builder("myKey")
+            TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
                 .setTimer(addedTimer)
                 .setTimer(addedTimer)
                 .setTimer(addedTimer)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3cc4fa4/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
index 236ad17..a1480e5 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoSingleEvaluatorFactoryTest.java
@@ -90,8 +90,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -142,8 +142,8 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     when(evaluationContext.createBundle(inputBundle, collection)).thenReturn(outputBundle);
     InProcessExecutionContext executionContext =
         new InProcessExecutionContext(null, null, null, null);
-    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(), null))
-        .thenReturn(executionContext);
+    when(evaluationContext.getExecutionContext(collection.getProducingTransformInternal(),
+        inputBundle.getKey())).thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
 
@@ -204,9 +204,11 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+        StructuralKey.of("myKey", StringUtf8Coder.of()),
+        null, null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey()))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -292,6 +294,7 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
             });
     PCollection<KV<String, Integer>> mainOutput = input.apply(pardo);
 
+    StructuralKey<?> key = StructuralKey.of("myKey", StringUtf8Coder.of());
     CommittedBundle<String> inputBundle =
         bundleFactory.createRootBundle(input).commit(Instant.now());
 
@@ -301,9 +304,12 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
 
     when(evaluationContext.createBundle(inputBundle, mainOutput)).thenReturn(mainOutputBundle);
 
-    InProcessExecutionContext executionContext =
-        new InProcessExecutionContext(null, "myKey", null, null);
-    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(), null))
+    InProcessExecutionContext executionContext = new InProcessExecutionContext(null,
+        key,
+        null,
+        null);
+    when(evaluationContext.getExecutionContext(mainOutput.getProducingTransformInternal(),
+        inputBundle.getKey()))
         .thenReturn(executionContext);
     CounterSet counters = new CounterSet();
     when(evaluationContext.createCounterSet()).thenReturn(counters);
@@ -316,9 +322,10 @@ public class ParDoSingleEvaluatorFactoryTest implements Serializable {
     evaluator.processElement(WindowedValue.valueInGlobalWindow("foo"));
 
     InProcessTransformResult result = evaluator.finishBundle();
-    assertThat(
-        result.getTimerUpdate(),
-        equalTo(
-            TimerUpdate.builder("myKey").setTimer(addedTimer).deletedTimer(deletedTimer).build()));
+    assertThat(result.getTimerUpdate(),
+        equalTo(TimerUpdate.builder(StructuralKey.of("myKey", StringUtf8Coder.of()))
+            .setTimer(addedTimer)
+            .deletedTimer(deletedTimer)
+            .build()));
   }
 }



Mime
View raw message