Key DoFnInstanceManager cache on AppliedPTransform Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c018cd4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c018cd4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c018cd4 Branch: refs/heads/master Commit: 2c018cd471f62e7d6e4d5f61b461c9c0e8cb9247 Parents: add2fa6 Author: Kenneth Knowles Authored: Tue Sep 12 11:39:12 2017 -0700 Committer: Kenneth Knowles Committed: Tue Sep 12 13:01:34 2017 -0700 ---------------------------------------------------------------------- .../runners/direct/ParDoEvaluatorFactory.java | 30 +++++++++----------- ...littableProcessElementsEvaluatorFactory.java | 3 +- .../direct/StatefulParDoEvaluatorFactory.java | 15 ++++++++-- .../direct/TransformEvaluatorRegistry.java | 5 +++- 4 files changed, 33 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 408a7df..47df0d4 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -40,33 +40,33 @@ import org.slf4j.LoggerFactory; final class ParDoEvaluatorFactory implements TransformEvaluatorFactory { private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); - private final LoadingCache, DoFnLifecycleManager> fnClones; + private final LoadingCache, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; private final ParDoEvaluator.DoFnRunnerFactory runnerFactory; ParDoEvaluatorFactory( EvaluationContext evaluationContext, - ParDoEvaluator.DoFnRunnerFactory runnerFactory) { + ParDoEvaluator.DoFnRunnerFactory runnerFactory, + CacheLoader, DoFnLifecycleManager> doFnCacheLoader) { this.evaluationContext = evaluationContext; this.runnerFactory = runnerFactory; fnClones = - CacheBuilder.newBuilder() - .build( - new CacheLoader, DoFnLifecycleManager>() { - @Override - public DoFnLifecycleManager load(DoFn key) throws Exception { - return DoFnLifecycleManager.of(key); - } - }); + CacheBuilder.newBuilder().build(doFnCacheLoader); + } + + static CacheLoader, DoFnLifecycleManager> basicDoFnCacheLoader() { + return new CacheLoader, DoFnLifecycleManager>() { + @Override + public DoFnLifecycleManager load(AppliedPTransform application) throws Exception { + return DoFnLifecycleManager.of(ParDoTranslation.getDoFn(application)); + } + }; } @Override public TransformEvaluator forApplication( AppliedPTransform application, CommittedBundle inputBundle) throws Exception { - final DoFn doFn = - (DoFn) ParDoTranslation.getDoFn(application); - @SuppressWarnings({"unchecked", "rawtypes"}) TransformEvaluator evaluator = (TransformEvaluator) @@ -74,7 +74,6 @@ final class ParDoEvaluatorFactory implements TransformEvaluator (AppliedPTransform) application, (PCollection) inputBundle.getPCollection(), inputBundle.getKey(), - doFn, ParDoTranslation.getSideInputs(application), (TupleTag) ParDoTranslation.getMainOutputTag(application), ParDoTranslation.getAdditionalOutputTags(application).getAll()); @@ -98,7 +97,6 @@ final class ParDoEvaluatorFactory implements TransformEvaluator AppliedPTransform, PCollectionTuple, ?> application, PCollection mainInput, StructuralKey inputBundleKey, - DoFn doFn, List> sideInputs, TupleTag mainOutputTag, List> additionalOutputTags) @@ -109,7 +107,7 @@ final class ParDoEvaluatorFactory implements TransformEvaluator .getExecutionContext(application, inputBundleKey) .getStepContext(stepName); - DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); + DoFnLifecycleManager fnManager = fnClones.getUnchecked(application); return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( createParDoEvaluator( http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index bc7b193..852ad2f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -64,7 +64,8 @@ class SplittableProcessElementsEvaluatorFactory< new ParDoEvaluatorFactory<>( evaluationContext, SplittableProcessElementsEvaluatorFactory - .processFnRunnerFactory()); + .processFnRunnerFactory(), + ParDoEvaluatorFactory.basicDoFnCacheLoader()); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index bdec9c8..42bfe0b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -66,7 +66,19 @@ final class StatefulParDoEvaluatorFactory implements Transfo StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) { this.delegateFactory = new ParDoEvaluatorFactory<>( - evaluationContext, ParDoEvaluator., OutputT>defaultRunnerFactory()); + evaluationContext, + ParDoEvaluator., OutputT>defaultRunnerFactory(), + new CacheLoader, DoFnLifecycleManager>() { + @Override + public DoFnLifecycleManager load(AppliedPTransform appliedStatefulParDo) + throws Exception { + // StatefulParDo is overridden after the portable pipeline is received, so we + // do not go through the portability translation layers + StatefulParDo statefulParDo = + (StatefulParDo) appliedStatefulParDo.getTransform(); + return DoFnLifecycleManager.of(statefulParDo.getDoFn()); + } + }); this.cleanupRegistry = CacheBuilder.newBuilder() .weakValues() @@ -119,7 +131,6 @@ final class StatefulParDoEvaluatorFactory implements Transfo (AppliedPTransform) application, (PCollection) inputBundle.getPCollection(), inputBundle.getKey(), - doFn, application.getTransform().getSideInputs(), application.getTransform().getMainOutputTag(), application.getTransform().getAdditionalOutputTags().getAll()); http://git-wip-us.apache.org/repos/asf/beam/blob/2c018cd4/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index 30666db..7bba7d5 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -64,7 +64,10 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { .put(READ_TRANSFORM_URN, new ReadEvaluatorFactory(ctxt)) .put( PAR_DO_TRANSFORM_URN, - new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory())) + new ParDoEvaluatorFactory<>( + ctxt, + ParDoEvaluator.defaultRunnerFactory(), + ParDoEvaluatorFactory.basicDoFnCacheLoader())) .put(FLATTEN_TRANSFORM_URN, new FlattenEvaluatorFactory(ctxt)) .put(WINDOW_TRANSFORM_URN, new WindowEvaluatorFactory(ctxt))