beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] beam git commit: Key DoFnInstanceManager cache on AppliedPTransform
Date Tue, 12 Sep 2017 23:57:51 GMT
Key DoFnInstanceManager cache on AppliedPTransform


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c018cd4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c018cd4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c018cd4

Branch: refs/heads/master
Commit: 2c018cd471f62e7d6e4d5f61b461c9c0e8cb9247
Parents: add2fa6
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Sep 12 11:39:12 2017 -0700
Committer: Kenneth Knowles <klk@google.com>
Committed: Tue Sep 12 13:01:34 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/ParDoEvaluatorFactory.java   | 30 +++++++++-----------
 ...littableProcessElementsEvaluatorFactory.java |  3 +-
 .../direct/StatefulParDoEvaluatorFactory.java   | 15 ++++++++--
 .../direct/TransformEvaluatorRegistry.java      |  5 +++-
 4 files changed, 33 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index 408a7df..47df0d4 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -40,33 +40,33 @@ import org.slf4j.LoggerFactory;
 final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluatorFactory {
 
   private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class);
-  private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones;
+  private final LoadingCache<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> fnClones;
   private final EvaluationContext evaluationContext;
   private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory;
 
   ParDoEvaluatorFactory(
       EvaluationContext evaluationContext,
-      ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) {
+      ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory,
+      CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> doFnCacheLoader) {
     this.evaluationContext = evaluationContext;
     this.runnerFactory = runnerFactory;
     fnClones =
-        CacheBuilder.newBuilder()
-            .build(
-                new CacheLoader<DoFn<?, ?>, DoFnLifecycleManager>() {
-                  @Override
-                  public DoFnLifecycleManager load(DoFn<?, ?> key) throws Exception {
-                    return DoFnLifecycleManager.of(key);
-                  }
-                });
+        CacheBuilder.newBuilder().build(doFnCacheLoader);
+  }
+
+  static CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager> basicDoFnCacheLoader() {
+    return new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
+      @Override
+      public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> application) throws Exception {
+        return DoFnLifecycleManager.of(ParDoTranslation.getDoFn(application));
+      }
+    };
   }
 
   @Override
   public <T> TransformEvaluator<T> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws Exception {
 
-    final DoFn<InputT, OutputT> doFn =
-        (DoFn<InputT, OutputT>) ParDoTranslation.getDoFn(application);
-
     @SuppressWarnings({"unchecked", "rawtypes"})
     TransformEvaluator<T> evaluator =
         (TransformEvaluator<T>)
@@ -74,7 +74,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
                 (AppliedPTransform) application,
                 (PCollection<InputT>) inputBundle.getPCollection(),
                 inputBundle.getKey(),
-                doFn,
                 ParDoTranslation.getSideInputs(application),
                 (TupleTag<OutputT>) ParDoTranslation.getMainOutputTag(application),
                 ParDoTranslation.getAdditionalOutputTags(application).getAll());
@@ -98,7 +97,6 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
       AppliedPTransform<PCollection<InputT>, PCollectionTuple, ?> application,
       PCollection<InputT> mainInput,
       StructuralKey<?> inputBundleKey,
-      DoFn<InputT, OutputT> doFn,
       List<PCollectionView<?>> sideInputs,
       TupleTag<OutputT> mainOutputTag,
       List<TupleTag<?>> additionalOutputTags)
@@ -109,7 +107,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator
             .getExecutionContext(application, inputBundleKey)
             .getStepContext(stepName);
 
-    DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
+    DoFnLifecycleManager fnManager = fnClones.getUnchecked(application);
 
     return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
         createParDoEvaluator(

http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index bc7b193..852ad2f 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -64,7 +64,8 @@ class SplittableProcessElementsEvaluatorFactory<
         new ParDoEvaluatorFactory<>(
             evaluationContext,
             SplittableProcessElementsEvaluatorFactory
-                .<InputT, OutputT, RestrictionT>processFnRunnerFactory());
+                .<InputT, OutputT, RestrictionT>processFnRunnerFactory(),
+            ParDoEvaluatorFactory.basicDoFnCacheLoader());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index bdec9c8..42bfe0b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -66,7 +66,19 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
   StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) {
     this.delegateFactory =
         new ParDoEvaluatorFactory<>(
-            evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory());
+            evaluationContext,
+            ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory(),
+            new CacheLoader<AppliedPTransform<?, ?, ?>, DoFnLifecycleManager>() {
+              @Override
+              public DoFnLifecycleManager load(AppliedPTransform<?, ?, ?> appliedStatefulParDo)
+                  throws Exception {
+                // StatefulParDo is overridden after the portable pipeline is received, so we
+                // do not go through the portability translation layers
+                StatefulParDo<?, ?, ?> statefulParDo =
+                    (StatefulParDo<?, ?, ?>) appliedStatefulParDo.getTransform();
+                return DoFnLifecycleManager.of(statefulParDo.getDoFn());
+              }
+            });
     this.cleanupRegistry =
         CacheBuilder.newBuilder()
             .weakValues()
@@ -119,7 +131,6 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo
             (AppliedPTransform) application,
             (PCollection) inputBundle.getPCollection(),
             inputBundle.getKey(),
-            doFn,
             application.getTransform().getSideInputs(),
             application.getTransform().getMainOutputTag(),
             application.getTransform().getAdditionalOutputTags().getAll());

http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 30666db..7bba7d5 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -64,7 +64,10 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
             .put(READ_TRANSFORM_URN, new ReadEvaluatorFactory(ctxt))
             .put(
                 PAR_DO_TRANSFORM_URN,
-                new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory()))
+                new ParDoEvaluatorFactory<>(
+                    ctxt,
+                    ParDoEvaluator.defaultRunnerFactory(),
+                    ParDoEvaluatorFactory.basicDoFnCacheLoader()))
             .put(FLATTEN_TRANSFORM_URN, new FlattenEvaluatorFactory(ctxt))
             .put(WINDOW_TRANSFORM_URN, new WindowEvaluatorFactory(ctxt))
 


Mime
View raw message