beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [07/11] incubator-beam git commit: Output Keyed Bundles in GroupAlsoByWindowEvaluator
Date Thu, 24 Nov 2016 00:03:07 GMT
Output Keyed Bundles in GroupAlsoByWindowEvaluator

This allows reuse of keys for downstream serialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f03b4fe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f03b4fe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f03b4fe1

Branch: refs/heads/python-sdk
Commit: f03b4fe11cb605edf216903738a6c305b3a91066
Parents: 6fa8f65
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Nov 22 14:51:39 2016 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Wed Nov 23 16:02:04 2016 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/beam/runners/direct/DirectRunner.java  | 5 ++++-
 .../beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java  | 4 +++-
 2 files changed, 7 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index 0060e84..cb31947 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
+import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.runners.direct.TestStreamEvaluatorFactory.DirectTestStreamFactory;
@@ -310,7 +311,9 @@ public class DirectRunner
     KeyedPValueTrackingVisitor keyedPValueVisitor =
         KeyedPValueTrackingVisitor.create(
             ImmutableSet.<Class<? extends PTransform>>of(
-                GroupByKey.class, DirectGroupByKeyOnly.class));
+                GBKIntoKeyedWorkItems.class,
+                DirectGroupByKeyOnly.class,
+                DirectGroupAlsoByWindow.class));
     pipeline.traverseTopologically(keyedPValueVisitor);
 
     DisplayDataValidator.validatePipeline(pipeline);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f03b4fe1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index b946e4d..36c742b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -112,6 +112,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory
{
     private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow>
         windowingStrategy;
 
+    private final StructuralKey<?> structuralKey;
     private final Collection<UncommittedBundle<?>> outputBundles;
     private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>>
unprocessedElements;
     private final AggregatorContainer.Mutator aggregatorChanges;
@@ -130,6 +131,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory
{
       this.evaluationContext = evaluationContext;
       this.application = application;
 
+      structuralKey = inputBundle.getKey();
       stepContext = evaluationContext
           .getExecutionContext(application, inputBundle.getKey())
           .getOrCreateStepContext(
@@ -159,7 +161,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory
{
       K key = workItem.key();
 
       UncommittedBundle<KV<K, Iterable<V>>> bundle =
-          evaluationContext.createBundle(application.getOutput());
+          evaluationContext.createKeyedBundle(structuralKey, application.getOutput());
       outputBundles.add(bundle);
       CopyOnAccessInMemoryStateInternals<K> stateInternals =
           (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();


Mime
View raw message