Repository: beam
Updated Branches:
refs/heads/master 65135fd7a -> 70efdd0fe
[BEAM-1715] Fix Python WordCount on Dataflow Mismatch
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fa6f5f0f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fa6f5f0f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fa6f5f0f
Branch: refs/heads/master
Commit: fa6f5f0f45a3cb8343d0a30dac8f75a8097d65d1
Parents: 65135fd
Author: Mark Liu <markliu@google.com>
Authored: Tue Mar 14 12:43:45 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Mar 22 17:34:18 2017 -0700
----------------------------------------------------------------------
.../apache_beam/examples/wordcount_it_test.py | 5 +++-
.../apache_beam/tests/pipeline_verifiers.py | 31 +++++++++++++++++++-
.../tests/pipeline_verifiers_test.py | 21 +++++++++++++
sdks/python/run_postcommit.sh | 3 +-
4 files changed, 57 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/examples/wordcount_it_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py
index 77926bb..1c700b6 100644
--- a/sdks/python/apache_beam/examples/wordcount_it_test.py
+++ b/sdks/python/apache_beam/examples/wordcount_it_test.py
@@ -43,9 +43,12 @@ class WordCountIT(unittest.TestCase):
output = '/'.join([test_pipeline.get_option('output'),
test_pipeline.get_option('job_name'),
'results'])
+ arg_sleep_secs = test_pipeline.get_option('sleep_secs')
+ sleep_secs = int(arg_sleep_secs) if arg_sleep_secs is not None else None
pipeline_verifiers = [PipelineStateMatcher(),
FileChecksumMatcher(output + '*-of-*',
- self.DEFAULT_CHECKSUM)]
+ self.DEFAULT_CHECKSUM,
+ sleep_secs)]
extra_opts = {'output': output,
'on_success_matcher': all_of(*pipeline_verifiers)}
http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers.py b/sdks/python/apache_beam/tests/pipeline_verifiers.py
index 0d6814e..3cac658 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers.py
@@ -23,6 +23,7 @@ of test pipeline job. Customized verifier should extend
"""
import logging
+import time
from hamcrest.core.base_matcher import BaseMatcher
@@ -79,7 +80,27 @@ class FileChecksumMatcher(BaseMatcher):
is a hash string computed from content of file(s).
"""
- def __init__(self, file_path, expected_checksum):
+ def __init__(self, file_path, expected_checksum, sleep_secs=None):
+ """Initialize a FileChecksumMatcher object
+
+ Args:
+ file_path : A string that is the full path of output file. This path
+ can contain globs.
+ expected_checksum : A hash string that is computed from expected
+ result.
+ sleep_secs : Number of seconds to wait before verification starts.
+ Extra time is given to make sure output files are ready on FS.
+ """
+ if sleep_secs is not None:
+ if isinstance(sleep_secs, int):
+ self.sleep_secs = sleep_secs
+ else:
+ raise ValueError('Sleep seconds, if received, must be int. '
+ 'But received: %r, %s' % (sleep_secs,
+ type(sleep_secs)))
+ else:
+ self.sleep_secs = None
+
self.file_path = file_path
self.file_system = get_filesystem(self.file_path)
self.expected_checksum = expected_checksum
@@ -94,6 +115,9 @@ class FileChecksumMatcher(BaseMatcher):
matched_path = [f.path for f in match_result.metadata_list]
if not matched_path:
raise IOError('No such file or directory: %s' % self.file_path)
+
+ logging.info('Find %d files in %s: \n%s',
+ len(matched_path), self.file_path, '\n'.join(matched_path))
for path in matched_path:
with self.file_system.open(path, 'r') as f:
for line in f:
@@ -101,6 +125,11 @@ class FileChecksumMatcher(BaseMatcher):
return read_lines
def _matches(self, _):
+ if self.sleep_secs:
+ # Wait to have output file ready on FS
+ logging.info('Wait %d seconds...', self.sleep_secs)
+ time.sleep(self.sleep_secs)
+
# Read from given file(s) path
read_lines = self._read_with_retry()
http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
index af8f441..909917d 100644
--- a/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
+++ b/sdks/python/apache_beam/tests/pipeline_verifiers_test.py
@@ -121,6 +121,27 @@ class PipelineVerifiersTest(unittest.TestCase):
self.assertTrue(mock_match.called)
self.assertEqual(verifiers.MAX_RETRIES + 1, mock_match.call_count)
+ def test_file_checksum_matchcer_invalid_sleep_time(self):
+ with self.assertRaises(ValueError) as cm:
+ verifiers.FileChecksumMatcher('file_path',
+ 'expected_checksum',
+ 'invalid_sleep_time')
+ self.assertEqual(cm.exception.message,
+ 'Sleep seconds, if received, must be int. '
+ 'But received: \'invalid_sleep_time\', '
+ '<type \'str\'>')
+
+ @patch('time.sleep', return_value=None)
+ def test_file_checksum_matcher_sleep_before_verify(self, mocked_sleep):
+ temp_dir = tempfile.mkdtemp()
+ case = self.test_cases[0]
+ self.create_temp_file(case['content'], temp_dir)
+ matcher = verifiers.FileChecksumMatcher(temp_dir + '/*',
+ case['expected_checksum'],
+ 10)
+ hc_assert_that(self._mock_result, matcher)
+ self.assertTrue(mocked_sleep.called)
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
http://git-wip-us.apache.org/repos/asf/beam/blob/fa6f5f0f/sdks/python/run_postcommit.sh
----------------------------------------------------------------------
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 4d17942..50338e2 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -101,4 +101,5 @@ python setup.py nosetests \
--output=$GCS_LOCATION/py-wordcount-cloud/output \
--sdk_location=$SDK_LOCATION \
--job_name=$JOBNAME_E2E_WC \
- --num_workers=1"
+ --num_workers=1 \
+ --sleep_secs=20"
|