beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5324) Finish Python 3 porting for unpackaged files
Date Thu, 20 Sep 2018 19:12:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5324?focusedWorklogId=146101&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-146101 ]

ASF GitHub Bot logged work on BEAM-5324:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 20/Sep/18 19:11
            Start Date: 20/Sep/18 19:11
    Worklog Time Spent: 10m 
      Work Description: aaltay closed pull request #6424: [BEAM-5324] Partially port unpackaged modules to Python 3
URL: https://github.com/apache/beam/pull/6424
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff once a PR is merged, the diff is
reproduced below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below, because GitHub would not otherwise display it:

diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 9d4e79a9a9a..6397820e6e9 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -22,6 +22,7 @@
 import copy
 import logging
 import platform
+import sys
 import unittest
 from builtins import object
 from builtins import range
@@ -390,6 +391,8 @@ def process(self, element):
     assert_that(pcoll, equal_to([11, 12]))
     pipeline.run()
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_side_input_no_tag(self):
     class TestDoFn(DoFn):
       def process(self, element, prefix, suffix):
@@ -405,6 +408,8 @@ def process(self, element, prefix, suffix):
     assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
     pipeline.run()
 
+  @unittest.skipIf(sys.version_info[0] == 3, 'This test still needs to be '
+                                             'fixed on Python 3')
   def test_side_input_tagged(self):
     class TestDoFn(DoFn):
       def process(self, element, prefix, suffix=DoFn.SideInputParam):
@@ -520,13 +525,14 @@ def test_dir(self):
     options = Breakfast()
     self.assertEquals(
         set(['from_dictionary', 'get_all_options', 'slices', 'style',
-             'view_as', 'display_data', 'next']),
-        set([attr for attr in dir(options) if not attr.startswith('_')]))
+             'view_as', 'display_data']),
+        set([attr for attr in dir(options) if not attr.startswith('_') and
+             attr != 'next']))
     self.assertEquals(
         set(['from_dictionary', 'get_all_options', 'style', 'view_as',
-             'display_data', 'next']),
+             'display_data']),
         set([attr for attr in dir(options.view_as(Eggs))
-             if not attr.startswith('_')]))
+             if not attr.startswith('_') and attr != 'next']))
 
 
 class RunnerApiTest(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index 9e39ca82c84..f6e0d558346 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -541,7 +541,7 @@ def expand_gbk(stages):
                 pipeline_components.pcollections[pcoll_id], pipeline_components)
 
           # This is used later to correlate the read and write.
-          param = str("group:%s" % stage.name)
+          param = str("group:%s" % stage.name).encode('utf-8')
           if stage.name not in pipeline_components.transforms:
             pipeline_components.transforms[stage.name].CopyFrom(transform)
           gbk_write = Stage(
@@ -583,7 +583,7 @@ def sink_flattens(stages):
         transform = stage.transforms[0]
         if transform.spec.urn == common_urns.primitives.FLATTEN.urn:
           # This is used later to correlate the read and writes.
-          param = str("materialize:%s" % transform.unique_name)
+          param = str("materialize:%s" % transform.unique_name).encode('utf-8')
           output_pcoll_id, = list(transform.outputs.values())
           output_coder_id = pcollections[output_pcoll_id].coder_id
           flatten_writes = []
@@ -731,7 +731,7 @@ def fuse(producer, consumer):
 
       # Now try to fuse away all pcollections.
       for pcoll, producer in producers_by_pcoll.items():
-        pcoll_as_param = str("materialize:%s" % pcoll)
+        pcoll_as_param = str("materialize:%s" % pcoll).encode('utf-8')
         write_pcoll = None
         for consumer in consumers_by_pcoll[pcoll]:
           producer = replacement(producer)
@@ -929,7 +929,7 @@ def extract_endpoints(stage):
 
     # Store the required side inputs into state.
     for (transform_id, tag), (pcoll_id, si) in data_side_input.items():
-      actual_pcoll_id = pcoll_id[len("materialize:"):]
+      actual_pcoll_id = pcoll_id[len(b"materialize:"):]
       value_coder = context.coders[safe_coders[
           pipeline_components.pcollections[actual_pcoll_id].coder_id]]
       elements_by_window = _WindowGroupingBuffer(si, value_coder)
@@ -945,14 +945,14 @@ def extract_endpoints(stage):
         controller.state_handler.blocking_append(state_key, elements_data)
 
     def get_buffer(pcoll_id):
-      if pcoll_id.startswith('materialize:'):
+      if pcoll_id.startswith(b'materialize:'):
         if pcoll_id not in pcoll_buffers:
           # Just store the data chunks for replay.
           pcoll_buffers[pcoll_id] = list()
-      elif pcoll_id.startswith('group:'):
+      elif pcoll_id.startswith(b'group:'):
         # This is a grouping write, create a grouping buffer if needed.
         if pcoll_id not in pcoll_buffers:
-          original_gbk_transform = pcoll_id.split(':', 1)[1]
+          original_gbk_transform = pcoll_id.split(b':', 1)[1]
           transform_proto = pipeline_components.transforms[
               original_gbk_transform]
           input_pcoll = only_element(list(transform_proto.inputs.values()))
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 00ce3e6429b..b0bafa55d68 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -96,7 +96,7 @@ def create_runner(runner_name):
   if '.' in runner_name:
     module, runner = runner_name.rsplit('.', 1)
     try:
-      return getattr(__import__(module, {}, {}, [runner], -1), runner)()
+      return getattr(__import__(module, {}, {}, [runner], 0), runner)()
     except ImportError:
       if runner_name in _KNOWN_DATAFLOW_RUNNERS:
         raise ImportError(
diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py
index 4b7e9cda1bf..2ec670f3499 100644
--- a/sdks/python/apache_beam/runners/worker/bundle_processor.py
+++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py
@@ -108,7 +108,7 @@ def __init__(self, operation_name, step_name, consumers, counter_factory,
     self.receivers = [
         operations.ConsumerSet(
             self.counter_factory, self.name_context.step_name, 0,
-            next(itervalues(consumers)), self.windowed_coder)]
+            next(iter(itervalues(consumers))), self.windowed_coder)]
 
   def process(self, windowed_value):
     self.output(windowed_value)
@@ -141,7 +141,7 @@ def __getitem__(self, window):
               ptransform_id=self._transform_id,
               side_input_id=self._tag,
               window=self._target_window_coder.encode(target_window),
-              key=''))
+              key=b''))
       state_handler = self._state_handler
       access_pattern = self._side_input_data.access_pattern
 
diff --git a/sdks/python/apache_beam/runners/worker/operation_specs.py b/sdks/python/apache_beam/runners/worker/operation_specs.py
index d64920f18fe..ebae476e9bc 100644
--- a/sdks/python/apache_beam/runners/worker/operation_specs.py
+++ b/sdks/python/apache_beam/runners/worker/operation_specs.py
@@ -58,7 +58,7 @@ def worker_printable_fields(workerproto):
   return ['%s=%s' % (name, value)
           # _asdict is the only way and cannot subclass this generated class
           # pylint: disable=protected-access
-          for name, value in workerproto._asdict().iteritems()
+          for name, value in workerproto._asdict().items()
           # want to output value 0 but not None nor []
           if (value or value == 0)
           and name not in
diff --git a/sdks/python/apache_beam/runners/worker/operations.py b/sdks/python/apache_beam/runners/worker/operations.py
index 0488fe928d3..dc2fe2b4c6e 100644
--- a/sdks/python/apache_beam/runners/worker/operations.py
+++ b/sdks/python/apache_beam/runners/worker/operations.py
@@ -495,13 +495,7 @@ def __init__(self, name_context, spec, counter_factory, state_sampler):
     # simpler than for the DoFn's of ParDo.
     fn, args, kwargs = pickler.loads(self.spec.combine_fn)[:3]
     self.combine_fn = curry_combine_fn(fn, args, kwargs)
-    if (getattr(fn.add_input, 'im_func', None)
-        is core.CombineFn.add_input.__func__):
-      # Old versions of the SDK have CombineFns that don't implement add_input.
-      self.combine_fn_add_input = (
-          lambda a, e: self.combine_fn.add_inputs(a, [e]))
-    else:
-      self.combine_fn_add_input = self.combine_fn.add_input
+    self.combine_fn_add_input = self.combine_fn.add_input
     # Optimization for the (known tiny accumulator, often wide keyspace)
     # combine functions.
     # TODO(b/36567833): Bound by in-memory size rather than key count.
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 7a53fbe25b0..4a89cbf8419 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -550,6 +550,7 @@ def to_runner_api(self, context, has_parts=False):
         urn=urn,
         payload=typed_param.SerializeToString()
         if isinstance(typed_param, message.Message)
+        else typed_param.encode('utf-8') if isinstance(typed_param, str)
         else typed_param)
 
   @classmethod
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index eb020c4d16a..9cdfcc62c02 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -57,7 +57,7 @@ commands =
 setenv =
   BEAM_EXPERIMENTAL_PY3=1
 modules =
-  apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability
+  apache_beam.coders,apache_beam.options,apache_beam.tools,apache_beam.utils,apache_beam.internal,apache_beam.metrics,apache_beam.portability,apache_beam.pipeline_test,apache_beam.pvalue_test
 commands =
   python --version
   pip --version


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 146101)
    Time Spent: 2h 40m  (was: 2h 30m)

> Finish Python 3 porting for unpackaged files
> --------------------------------------------
>
>                 Key: BEAM-5324
>                 URL: https://issues.apache.org/jira/browse/BEAM-5324
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-py-core
>            Reporter: Robbe
>            Assignee: Robbe
>            Priority: Major
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message