beam-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-3183) Add the ability to run a pipeline via Runner.run(transform)
Date Tue, 02 Jan 2018 20:16:00 GMT

    [ https://issues.apache.org/jira/browse/BEAM-3183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16308641#comment-16308641 ]

ASF GitHub Bot commented on BEAM-3183:
--------------------------------------

robertwb closed pull request #4223: [BEAM-3183] Simplifying beam pipeline in python
URL: https://github.com/apache/beam/pull/4223

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 5725e51a775..76398904b81 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -321,13 +321,15 @@ def replace_all(self, replacements):
     for override in replacements:
       self._check_replacement(override)
 
-  def run(self, test_runner_api=True):
+  def run(self, test_runner_api=True, runner=None):
     """Runs the pipeline. Returns whatever our runner returns after running."""
-
+    # Override default pipeline runner if runner arg is specified.
+    if not runner:
+      runner = self.runner
     # When possible, invoke a round trip through the runner API.
     if test_runner_api and self._verify_runner_api_compatible():
       return Pipeline.from_runner_api(
-          self.to_runner_api(), self.runner, self._options).run(False)
+          self.to_runner_api(), runner, self._options).run(False)
 
     if self._options.view_as(SetupOptions).save_main_session:
       # If this option is chosen, verify we can pickle the main session early.
@@ -336,7 +338,8 @@ def run(self, test_runner_api=True):
         pickler.dump_session(os.path.join(tmpdir, 'main_session.pickle'))
       finally:
         shutil.rmtree(tmpdir)
-    return self.runner.run(self)
+
+    return runner.run_pipeline(self)
 
   def __enter__(self):
     return self
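
The change above lets callers pass an explicit runner to Pipeline.run rather than relying on the runner the pipeline was constructed with. A minimal sketch of the new call pattern, assuming the DirectRunner (the example itself is illustrative and not part of the diff):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.direct.direct_runner import DirectRunner

    p = beam.Pipeline(options=PipelineOptions())
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x * 2)
    # The new runner kwarg overrides p.runner for this run only.
    result = p.run(runner=DirectRunner())
    result.wait_until_finish()
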
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 6253c80f83b..206ab3755c3 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -261,7 +261,7 @@ def visit_transform(self, transform_node):
 
     return FlattenInputVisitor()
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
     # Import here to avoid adding the dependency for local running scenarios.
     try:
@@ -296,7 +296,7 @@ def run(self, pipeline):
     pipeline.visit(self.flatten_input_visitor())
 
     # The superclass's run will trigger a traversal of all reachable nodes.
-    super(DataflowRunner, self).run(pipeline)
+    super(DataflowRunner, self).run_pipeline(pipeline)
 
     test_options = pipeline._options.view_as(TestOptions)
     # If it is a dry run, return without submitting the job.
diff --git a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
index b2330c04d57..aad3fc7b88f 100644
--- a/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/test_dataflow_runner.py
@@ -27,7 +27,7 @@
 
 
 class TestDataflowRunner(DataflowRunner):
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     """Execute test pipeline and verify test matcher"""
     options = pipeline._options.view_as(TestOptions)
     on_success_matcher = options.on_success_matcher
@@ -36,7 +36,7 @@ def run(self, pipeline):
     # send this option to remote executors.
     options.on_success_matcher = None
 
-    self.result = super(TestDataflowRunner, self).run(pipeline)
+    self.result = super(TestDataflowRunner, self).run_pipeline(pipeline)
     if self.result.has_job:
       project = pipeline._options.view_as(GoogleCloudOptions).project
       region_id = pipeline._options.view_as(GoogleCloudOptions).region
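
Each concrete runner is renamed the same way: run becomes run_pipeline, and the super() calls follow. Under the new contract a custom runner overrides run_pipeline and leaves the inherited run() as the public entry point. A hypothetical sketch of that pattern (LoggingRunner is not part of the PR):

    import logging

    from apache_beam.runners.runner import PipelineRunner

    class LoggingRunner(PipelineRunner):
      """Hypothetical runner illustrating the renamed override point."""

      def run_pipeline(self, pipeline):
        # Override run_pipeline, not run(); run() now handles both
        # Pipeline and PTransform arguments before delegating here.
        logging.info('Submitting pipeline %s', pipeline)
        return super(LoggingRunner, self).run_pipeline(pipeline)
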
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 794a96be12b..95f763dad39 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -183,7 +183,7 @@ def _flush(self):
     output.element_type = unicode
     return output
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     """Execute the entire pipeline and returns an DirectPipelineResult."""
 
     # Performing configured PTransform overrides.
diff --git a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
index 84bed4270bc..0eb9fba89b2 100644
--- a/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
+++ b/sdks/python/apache_beam/runners/experimental/python_rpc_direct/python_rpc_direct_runner.py
@@ -47,7 +47,7 @@ class PythonRPCDirectRunner(PipelineRunner):
   def __init__(self):
     self._cache = None
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     """Remotely executes entire pipeline or parts reachable from node."""
 
     # Performing configured PTransform overrides.
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e40faa5a38d..c221bc00c8f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -171,7 +171,6 @@ def items(self):
 class FnApiRunner(runner.PipelineRunner):
 
   def __init__(self, use_grpc=False, sdk_harness_factory=None):
-    super(FnApiRunner, self).__init__()
     self._last_uid = -1
     self._use_grpc = use_grpc
     if sdk_harness_factory and not use_grpc:
@@ -182,7 +181,7 @@ def _next_uid(self):
     self._last_uid += 1
     return str(self._last_uid)
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     MetricsEnvironment.set_metrics_supported(False)
     return self.run_via_runner_api(pipeline.to_runner_api())
 
diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
index 5b580a619df..913ccd59585 100644
--- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
+++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner.py
@@ -62,7 +62,7 @@ def has_metrics_support(self):
     """
     return False
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     MetricsEnvironment.set_metrics_supported(self.has_metrics_support())
     # List of map tasks  Each map task is a list of
     # (stage_name, operation_specs.WorkerOperation) instructions.
@@ -80,7 +80,7 @@ def run(self, pipeline):
     self.dependencies = collections.defaultdict(set)
 
     # Visit the graph, building up the map_tasks and their metadata.
-    super(MapTaskExecutorRunner, self).run(pipeline)
+    super(MapTaskExecutorRunner, self).run_pipeline(pipeline)
 
     # Now run the tasks in topological order.
     def compute_depth_map(deps):
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index b9511949973..ecc09b54ae8 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -57,7 +57,6 @@ class UniversalLocalRunner(runner.PipelineRunner):
   def __init__(self, use_grpc=True, use_subprocesses=False):
     if use_subprocesses and not use_grpc:
       raise ValueError("GRPC must be used with subprocesses")
-    super(UniversalLocalRunner, self).__init__()
     self._use_grpc = use_grpc
     self._use_subprocesses = use_subprocesses
 
@@ -135,7 +134,7 @@ def _start_local_runner_subprocess_job_service(self):
     logging.info("Server ready.")
     return job_service
 
-  def run(self, pipeline):
+  def run_pipeline(self, pipeline):
     job_service = self._get_job_service()
     prepare_response = job_service.Prepare(
         beam_job_api_pb2.PrepareJobRequest(
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index bdabd8189fe..7a244bd01e8 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -116,7 +116,41 @@ class PipelineRunner(object):
   materialized values in order to reduce footprint.
   """
 
-  def run(self, pipeline):
+  def run(self, graph, options=None):
+    """Run a pipeline or ptransform with the runner.
+
+    If graph is a PTransform object, run the single step by creating a pipeline
+    using this runner and the provided options.
+
+    If graph is a Pipeline object, runs the graph using this runner instead of
+    the graph's default runner, but keep the graph's PipelineOptions.
+
+    Args:
+      graph: a Pipeline or PTransform object
+
+    Returns:
+      None
+    """
+
+    # Local import avoids circular dependencies for importing modules
+    from apache_beam.pipeline import Pipeline
+    from apache_beam.transforms.ptransform import PTransform
+
+    if isinstance(graph, PTransform):
+      # Create a pipeline from this runner and the options provided,
+      # and run the ptransform as a standalone step.
+      from apache_beam.pipeline import Pipeline
+      p = Pipeline(runner=self, options=options)
+      p | graph
+      result = p.run()
+      result.wait_until_finish()
+      return result
+    elif isinstance(graph, Pipeline):
+      # Run the pipeline directly
+      return graph.run(runner=self)
+
+
+  def run_pipeline(self, pipeline):
     """Execute the entire pipeline or the sub-DAG reachable from a node."""
 
     # Imported here to avoid circular dependencies.
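
With the new base-class run(), a runner can execute either a whole Pipeline or a single standalone PTransform; in the transform case it builds a one-off pipeline, runs it, and waits for completion before returning the result. A short usage sketch against the DirectRunner (illustrative, not part of the diff):

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.runners.direct.direct_runner import DirectRunner

    # Passing a PTransform: run() wraps it in a fresh pipeline,
    # executes it, and blocks until the result is final.
    result = DirectRunner().run(
        beam.Create(['a', 'b', 'c']),
        options=PipelineOptions())
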
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 063c8a2bcec..f4dcab00938 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -35,7 +35,7 @@
 from apache_beam.metrics.metricbase import MetricName
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.pipeline import Pipeline
-from apache_beam.runners import DirectRunner
+from apache_beam.runners import DirectRunner, DataflowRunner
 from apache_beam.runners import create_runner
 from apache_beam.testing.util import assert_that
 from apache_beam.testing.util import equal_to
@@ -67,7 +67,8 @@ def test_create_runner_shorthand(self):
     self.assertTrue(
         isinstance(create_runner('Direct'), DirectRunner))
 
-  def test_direct_runner_metrics(self):
+  @staticmethod
+  def _create_counting_do_fn_class():
     from apache_beam.metrics.metric import Metrics
 
     class MyDoFn(beam.DoFn):
@@ -86,18 +87,10 @@ def process(self, element):
         distro.update(element)
         return [element]
 
-    runner = DirectRunner()
-    p = Pipeline(runner,
-                 options=PipelineOptions(self.default_properties))
-    pcoll = (p | ptransform.Create([1, 2, 3, 4, 5])
-             | 'Do' >> beam.ParDo(MyDoFn()))
-    assert_that(pcoll, equal_to([1, 2, 3, 4, 5]))
-    result = p.run()
-    result.wait_until_finish()
-    metrics = result.metrics().query()
-    namespace = '{}.{}'.format(MyDoFn.__module__,
-                               MyDoFn.__name__)
+    return MyDoFn
 
+  @staticmethod
+  def _check_metric_counters(metrics, namespace):
     hc.assert_that(
         metrics['counters'],
         hc.contains_inanyorder(
@@ -118,6 +111,100 @@ def process(self, element):
                 DistributionResult(DistributionData(15, 5, 1, 5)),
                 DistributionResult(DistributionData(15, 5, 1, 5)))))
 
+  def test_direct_runner_metrics(self):
+    my_do_fn = self._create_counting_do_fn_class()
+
+    runner = DirectRunner()
+    p = Pipeline(runner=runner,
+                 options=PipelineOptions(self.default_properties))
+    pcoll = (p | ptransform.Create([1, 2, 3, 4, 5])
+             | 'Do' >> beam.ParDo(my_do_fn()))
+    assert_that(pcoll, equal_to([1, 2, 3, 4, 5]))
+    result = p.run()
+    result.wait_until_finish()
+    metrics = result.metrics().query()
+    namespace = '{}.{}'.format(my_do_fn.__module__,
+                               my_do_fn.__name__)
+
+    self._check_metric_counters(metrics, namespace)
+
+  def test_run_pipeline(self):
+    my_do_fn = self._create_counting_do_fn_class()
+
+    # Test that the pipeline runs successfully using the direct runner
+    runner = DirectRunner()
+    p = Pipeline(options=PipelineOptions(self.default_properties))
+    pcoll = (p | ptransform.Create([1, 2, 3, 4, 5])
+             | 'Do' >> beam.ParDo(my_do_fn()))
+    assert_that(pcoll, equal_to([1, 2, 3, 4, 5]))
+    result = runner.run(p)
+    result.wait_until_finish()
+    metrics = result.metrics().query()
+    namespace = '{}.{}'.format(my_do_fn.__module__,
+                               my_do_fn.__name__)
+
+    self._check_metric_counters(metrics, namespace)
+
+  def test_single_step(self):
+    from apache_beam.metrics.metric import Metrics
+
+    class CreateAndScaleTransform(ptransform.PTransform):
+
+      def __init__(self, label=None, scalar=2):
+        super(CreateAndScaleTransform, self).__init__(label)
+        self._scalar=scalar
+
+      def expand(self, pbegin):
+        assert isinstance(pbegin, beam.pvalue.PBegin)
+        ret = (pbegin
+               | 'create' >> ptransform.Create([1, 2, 3, 4, 5])
+               | 'scale' >> beam.ParDo(ScaleDoFn(self._scalar)))
+        return ret
+
+    class ScaleDoFn(beam.DoFn):
+      def __init__(self, scalar):
+        self._scalar = scalar
+
+      def process(self, element):
+        transformed_element = element * self._scalar
+        counter = Metrics.counter(self.__class__, 'elements')
+        counter.inc()
+        sum_counter = Metrics.counter(self.__class__, 'sum_inputs')
+        sum_counter.inc(element)
+        sum_counter = Metrics.counter(self.__class__, 'sum_outputs')
+        sum_counter.inc(transformed_element)
+        return [transformed_element]
+
+    runner = DirectRunner()
+    result = runner.run(
+      CreateAndScaleTransform('create_and_scale'),
+      options=PipelineOptions(self.default_properties)
+    )
+
+    metrics = result.metrics().query()
+    namespace = '{}.{}'.format(ScaleDoFn.__module__,
+                               ScaleDoFn.__name__)
+    hc.assert_that(
+        metrics['counters'],
+        hc.contains_inanyorder(
+            MetricResult(
+                MetricKey('create_and_scale/scale',
+                          MetricName(namespace, 'elements')),
+                5, 5
+            ),
+            MetricResult(
+                MetricKey('create_and_scale/scale',
+                          MetricName(namespace, 'sum_inputs')),
+                15, 15
+            ),
+            MetricResult(
+                MetricKey('create_and_scale/scale',
+                          MetricName(namespace, 'sum_outputs')),
+                30, 30
+            )
+        )
+    )
+
 
 if __name__ == '__main__':
   unittest.main()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add the ability to run a pipeline via Runner.run(transform)
> -----------------------------------------------------------
>
>                 Key: BEAM-3183
>                 URL: https://issues.apache.org/jira/browse/BEAM-3183
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-core, sdk-py-core
>            Reporter: Robert Bradshaw
>
> See http://s.apache.org/no-beam-pipeline



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
