beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [08/50] [abbrv] beam git commit: Properly handle side input exception when all reader threads complete
Date Fri, 17 Nov 2017 20:30:59 GMT
Properly handle side input exception when all reader threads complete


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6779b8ec
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6779b8ec
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6779b8ec

Branch: refs/heads/tez-runner
Commit: 6779b8ec0e872de86ed13fdfc9b260f69f44dfab
Parents: a0eb00e
Author: Charles Chen <ccy@google.com>
Authored: Fri Nov 10 11:28:43 2017 -0800
Committer: chamikara@google.com <chamikara@google.com>
Committed: Fri Nov 10 15:21:15 2017 -0800

----------------------------------------------------------------------
 .../apache_beam/runners/worker/sideinputs.py      |  3 +++
 .../apache_beam/runners/worker/sideinputs_test.py | 18 ++++++++++++++++++
 2 files changed, 21 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs.py b/sdks/python/apache_beam/runners/worker/sideinputs.py
index bdf9f4e..c91fe95 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs.py
@@ -116,6 +116,7 @@ class PrefetchingSourceSetIterable(object):
       self.element_queue.put(READER_THREAD_IS_DONE_SENTINEL)
 
   def __iter__(self):
+    # pylint: disable=too-many-nested-blocks
     if self.already_iterated:
       raise RuntimeError(
           'Can only iterate once over PrefetchingSourceSetIterable instance.')
@@ -132,6 +133,8 @@ class PrefetchingSourceSetIterable(object):
         if element is READER_THREAD_IS_DONE_SENTINEL:
           num_readers_finished += 1
           if num_readers_finished == self.num_reader_threads:
+            if self.has_errored:
+              raise self.reader_exceptions.get()
             return
         elif self.has_errored:
           raise self.reader_exceptions.get()

http://git-wip-us.apache.org/repos/asf/beam/blob/6779b8ec/sdks/python/apache_beam/runners/worker/sideinputs_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/worker/sideinputs_test.py b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
index d243bbe..73d34fb 100644
--- a/sdks/python/apache_beam/runners/worker/sideinputs_test.py
+++ b/sdks/python/apache_beam/runners/worker/sideinputs_test.py
@@ -91,6 +91,24 @@ class PrefetchingSourceIteratorTest(unittest.TestCase):
         sources, max_reader_threads=1)
     assert list(strip_windows(iterator_fn())) == range(11)
 
+  def test_source_iterator_single_source_exception(self):
+    class MyException(Exception):
+      pass
+
+    def exception_generator():
+      yield 0
+      raise MyException('I am an exception!')
+
+    sources = [
+        FakeSource(exception_generator()),
+    ]
+    iterator_fn = sideinputs.get_iterator_fn_for_sources(sources)
+    seen = set()
+    with self.assertRaises(MyException):
+      for value in iterator_fn():
+        seen.add(value.value)
+    self.assertEqual(sorted(seen), [0])
+
   def test_source_iterator_fn_exception(self):
     class MyException(Exception):
       pass


Mime
View raw message