beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/2] beam git commit: [BEAM-1392] DoFn teardown not called on empty partitions
Date Sun, 05 Feb 2017 10:03:05 GMT
Repository: beam
Updated Branches:
  refs/heads/master 6e220bb37 -> e5afbb27f


[BEAM-1392] DoFn teardown not called on empty partitions


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/25f91353
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/25f91353
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/25f91353

Branch: refs/heads/master
Commit: 25f913536019aff80d189cf7136f2acc3b6db7c2
Parents: 6e220bb
Author: Aviem Zur <aviemzur@gmail.com>
Authored: Sun Feb 5 11:22:00 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Sun Feb 5 11:35:31 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/translation/SparkProcessContext.java | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/25f91353/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index 9957bf3..60c9d4d 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -62,8 +62,10 @@ class SparkProcessContext<FnInputT, FnOutputT, OutputT> {
 
     // skip if partition is empty.
     if (!partition.hasNext()) {
+      DoFnInvokers.invokerFor(doFn).invokeTeardown();
       return Lists.newArrayList();
     }
+
     // call startBundle() before beginning to process the partition.
     doFnRunner.startBundle();
     // process the partition; finishBundle() is called from within the output iterator.


Mime
View raw message