beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4176) Java: Portable batch runner passes all ValidatesRunner tests that non-portable runner passes
Date Thu, 02 Aug 2018 01:43:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4176?focusedWorklogId=130053&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-130053 ]

ASF GitHub Bot logged work on BEAM-4176:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Aug/18 01:42
            Start Date: 02/Aug/18 01:42
    Worklog Time Spent: 10m 
      Work Description: tweise closed pull request #6110: [BEAM-4176] Tests for running Python on Flink.
URL: https://github.com/apache/beam/pull/6110
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
new file mode 100644
index 00000000000..51f01c9a66d
--- /dev/null
+++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import print_function
+
+import logging
+import shutil
+import sys
+import tempfile
+import unittest
+
+import apache_beam as beam
+from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
+from apache_beam.runners.portability import portable_runner
+from apache_beam.runners.portability import portable_runner_test
+from apache_beam.testing.util import assert_that
+
+if __name__ == '__main__':
+  # Run as
+  #
+  # python -m apache_beam.runners.portability.flink_runner_test \
+  #     /path/to/job_server.jar \
+  #     [FlinkRunnerTest.test_method, ...]
+  flinkJobServerJar = sys.argv.pop(1)
+  streaming = sys.argv.pop(1).lower() == 'streaming'
+
+  # This is defined here to only be run when we invoke this file explicitly.
+  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
+    _use_grpc = True
+    _use_subprocesses = True
+
+    @classmethod
+    def _subprocess_command(cls, port):
+      tmp_dir = tempfile.mkdtemp(prefix='flinktest')
+      try:
+        return [
+            'java',
+            '-jar', flinkJobServerJar,
+            '--artifacts-dir', tmp_dir,
+            '--job-host', 'localhost:%s' % port,
+        ]
+      finally:
+        shutil.rmtree(tmp_dir)
+
+    @classmethod
+    def get_runner(cls):
+      return portable_runner.PortableRunner()
+
+    def create_options(self):
+      options = super(FlinkRunnerTest, self).create_options()
+      options.view_as(DebugOptions).experiments = ['beam_fn_api']
+      options.view_as(SetupOptions).sdk_location = 'container'
+      if streaming:
+        options.view_as(StandardOptions).streaming = True
+      return options
+
+    # Can't read host files from within docker, read a "local" file there.
+    def test_read(self):
+      with self.create_pipeline() as p:
+        lines = p | beam.io.ReadFromText('/etc/profile')
+        assert_that(lines, lambda lines: len(lines) > 0)
+
+    def test_no_subtransform_composite(self):
+      raise unittest.SkipTest("BEAM-4781")
+
+    # Inherits all other tests.
+
+  # Run the tests.
+  logging.getLogger().setLevel(logging.INFO)
+  unittest.main()
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 6544c9d477a..277a817dd2a 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -199,7 +199,9 @@ def test_gbk_side_input(self):
   def test_multimap_side_input(self):
     with self.create_pipeline() as p:
       main = p | 'main' >> beam.Create(['a', 'b'])
-      side = p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
+      side = (p | 'side' >> beam.Create([('a', 1), ('b', 2), ('a', 3)])
+              # TODO(BEAM-4782): Obviate the need for this map.
+              | beam.Map(lambda kv: (kv[0], kv[1])))
       assert_that(
           main | beam.Map(lambda k, d: (k, sorted(d[k])),
                           beam.pvalue.AsMultiMap(side)),
diff --git a/sdks/python/apache_beam/runners/portability/portable_runner_test.py b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
index 119e3ffaece..edd981c9d98 100644
--- a/sdks/python/apache_beam/runners/portability/portable_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/portable_runner_test.py
@@ -82,19 +82,12 @@ def _pick_unused_port():
 
   @classmethod
   def _start_local_runner_subprocess_job_service(cls):
-    if cls._subprocess:
-      # Kill the old one if it exists.
-      cls._subprocess.kill()
+    cls._maybe_kill_subprocess()
     # TODO(robertwb): Consider letting the subprocess pick one and
     # communicate it back...
     port = cls._pick_unused_port()
     logging.info('Starting server on port %d.', port)
-    cls._subprocess = subprocess.Popen([
-        sys.executable, '-m',
-        'apache_beam.runners.portability.local_job_service_main', '-p',
-        str(port), '--worker_command_line',
-        '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable
-    ])
+    cls._subprocess = subprocess.Popen(cls._subprocess_command(port))
     address = 'localhost:%d' % port
     job_service = beam_job_api_pb2_grpc.JobServiceStub(
         grpc.insecure_channel(address))
@@ -116,7 +109,7 @@ def _start_local_runner_subprocess_job_service(cls):
               beam_job_api_pb2.GetJobStateRequest(job_id='[fake]'))
           break
         except grpc.RpcError as exn:
-          if exn.code != grpc.StatusCode.UNAVAILABLE:
+          if exn.code() != grpc.StatusCode.UNAVAILABLE:
             # We were able to contact the service for our fake state request.
             break
     logging.info('Server ready.')
@@ -147,14 +140,21 @@ def get_runner(cls):
 
   @classmethod
   def tearDownClass(cls):
-    if hasattr(cls, '_subprocess'):
+    cls._maybe_kill_subprocess()
+
+  @classmethod
+  def _maybe_kill_subprocess(cls):
+    if hasattr(cls, '_subprocess') and cls._subprocess.poll() is None:
       cls._subprocess.kill()
       time.sleep(0.1)
 
-  def create_pipeline(self):
+  def create_options(self):
     options = PipelineOptions()
     options.view_as(PortableOptions).job_endpoint = self._get_job_endpoint()
-    return beam.Pipeline(self.get_runner(), options)
+    return options
+
+  def create_pipeline(self):
+    return beam.Pipeline(self.get_runner(), self.create_options())
 
   def test_assert_that(self):
     # TODO: figure out a way for runner to parse and raise the
@@ -195,6 +195,16 @@ class PortableRunnerTestWithSubprocesses(PortableRunnerTest):
   _use_grpc = True
   _use_subprocesses = True
 
+  @classmethod
+  def _subprocess_command(cls, port):
+    return [
+        sys.executable,
+        '-m', 'apache_beam.runners.portability.local_job_service_main',
+        '-p', str(port),
+        '--worker_command_line',
+        '%s -m apache_beam.runners.worker.sdk_worker_main' % sys.executable,
+    ]
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle
index 0cbd61df83d..240ed3ccec7 100644
--- a/sdks/python/build.gradle
+++ b/sdks/python/build.gradle
@@ -260,6 +260,25 @@ task hdfsIntegrationTest(dependsOn: 'installGcpTest') {
   }
 }
 
+def flinkCompatibilityMatrix = {
+  def type = it
+  def name = 'flinkCompatibilityMatrix' + type
+  tasks.create(name: name) {
+    dependsOn 'setupVirtualenv'
+    dependsOn ':beam-sdks-python-container:docker'
+    dependsOn ':beam-runners-flink_2.11-job-server:jar'
+    doLast {
+      exec {
+        executable 'sh'
+        args '-c', ". ${envdir}/bin/activate && pip install -e . && python -m apache_beam.runners.portability.flink_runner_test ${project(":beam-runners-flink_2.11-job-server:").shadowJar.archivePath} ${type}"
+      }
+    }
+  }
+}
+
+flinkCompatibilityMatrix('Batch')
+flinkCompatibilityMatrix('Streaming')
+
 task postCommit() {
   dependsOn "preCommit"
   dependsOn "localWordCount"


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 130053)
    Time Spent: 9.5h  (was: 9h 20m)

> Java: Portable batch runner passes all ValidatesRunner tests that non-portable runner passes
> --------------------------------------------------------------------------------------------
>
>                 Key: BEAM-4176
>                 URL: https://issues.apache.org/jira/browse/BEAM-4176
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Ben Sidhom
>            Priority: Major
>          Time Spent: 9.5h
>  Remaining Estimate: 0h
>
> We need this as a sanity check that runner execution is correct.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message