beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [14/50] [abbrv] beam git commit: Integrate bundle retry code for the DirectRunner
Date Fri, 17 Nov 2017 20:31:05 GMT
Integrate bundle retry code for the DirectRunner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9a183f95
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9a183f95
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9a183f95

Branch: refs/heads/tez-runner
Commit: 9a183f95d2a9f1d1dd4124da7d609b81a3b69d8e
Parents: 2b4a6b5
Author: Maria Garcia Herrero <mariagh@google.com>
Authored: Fri Nov 10 14:50:16 2017 -0800
Committer: chamikara@google.com <chamikara@google.com>
Committed: Mon Nov 13 11:36:27 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/options/pipeline_options.py |  7 -------
 sdks/python/apache_beam/pipeline_test.py            |  3 +--
 sdks/python/apache_beam/runners/direct/executor.py  | 14 +-------------
 3 files changed, 2 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/options/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5278b8a..aaac9a4 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -313,13 +313,6 @@ class DirectOptions(PipelineOptions):
         help='DirectRunner uses stacked WindowedValues within a Bundle for '
         'memory optimization. Set --no_direct_runner_use_stacked_bundle to '
         'avoid it.')
-    parser.add_argument(
-        '--direct_runner_bundle_retry',
-        action='store_true',
-        default=False,
-        help=
-        ('Whether to allow bundle retries. If True the maximum'
-         'number of attempts to process a bundle is 4. '))
 
 
 class GoogleCloudOptions(PipelineOptions):

http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/pipeline_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 9bbb0d7..567ab92 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -506,8 +506,7 @@ class RunnerApiTest(unittest.TestCase):
 class DirectRunnerRetryTests(unittest.TestCase):
 
   def test_retry_fork_graph(self):
-    pipeline_options = PipelineOptions(['--direct_runner_bundle_retry'])
-    p = beam.Pipeline(options=pipeline_options)
+    p = beam.Pipeline()
 
     # TODO(mariagh): Remove the use of globals from the test.
     global count_b, count_c # pylint: disable=global-variable-undefined

http://git-wip-us.apache.org/repos/asf/beam/blob/9a183f95/sdks/python/apache_beam/runners/direct/executor.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/direct/executor.py b/sdks/python/apache_beam/runners/direct/executor.py
index 51fe908..853f19f 100644
--- a/sdks/python/apache_beam/runners/direct/executor.py
+++ b/sdks/python/apache_beam/runners/direct/executor.py
@@ -30,7 +30,6 @@ from weakref import WeakValueDictionary
 
 from apache_beam.metrics.execution import MetricsContainer
 from apache_beam.metrics.execution import ScopedMetricsContainer
-from apache_beam.options.pipeline_options import DirectOptions
 
 
 class _ExecutorService(object):
@@ -278,13 +277,7 @@ class TransformExecutor(_ExecutorService.CallableTask):
     self.blocked = False
     self._call_count = 0
     self._retry_count = 0
-    # Switch to turn on/off the retry of bundles.
-    pipeline_options = self._evaluation_context.pipeline_options
-    # TODO(mariagh): Remove once "bundle retry" is no longer experimental.
-    if not pipeline_options.view_as(DirectOptions).direct_runner_bundle_retry:
-      self._max_retries_per_bundle = 1
-    else:
-      self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
+    self._max_retries_per_bundle = TransformExecutor._MAX_RETRY_PER_BUNDLE
 
   def call(self):
     self._call_count += 1
@@ -319,11 +312,6 @@ class TransformExecutor(_ExecutorService.CallableTask):
         if self._retry_count == self._max_retries_per_bundle:
           logging.error('Giving up after %s attempts.',
                         self._max_retries_per_bundle)
-          if self._retry_count == 1:
-            logging.info(
-                'Use the experimental flag --direct_runner_bundle_retry'
-                ' to retry failed bundles (up to %d times).',
-                TransformExecutor._MAX_RETRY_PER_BUNDLE)
           self._completion_callback.handle_exception(self, e)
 
     self._evaluation_context.metrics().commit_physical(


Mime
View raw message