beam-commits mailing list archives

From al...@apache.org
Subject [beam] branch master updated: BEAM-7018: Added regex transform on Python SDK.
Date Tue, 06 Aug 2019 15:57:15 GMT
This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 40936ba  BEAM-7018: Added regex transform on Python SDK.
     new 0a2ddc0  Merge pull request #8859 from mszb/BEAM-7018
40936ba is described below

commit 40936ba49df80e9518090dbf92346d11e7f89b9e
Author: Shoaib <shoaib.zafaf@venturedive.com>
AuthorDate: Mon Aug 5 20:09:47 2019 +0500

    BEAM-7018: Added regex transform on Python SDK.
---
 sdks/python/apache_beam/transforms/util.py      | 222 +++++++++++++++++++
 sdks/python/apache_beam/transforms/util_test.py | 280 ++++++++++++++++++++++++
 2 files changed, 502 insertions(+)
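
For orientation, here is a minimal usage sketch of the new transform (illustrative only, not part of the commit); the expected outputs mirror the tests added in util_test.py below:

    import apache_beam as beam
    from apache_beam.transforms import util

    with beam.Pipeline() as p:
      # Emit the match for elements whose beginning matches [xyz]: 'x', 'y', 'z'.
      matched = (p
                 | 'letters' >> beam.Create(['a', 'x', 'y', 'z'])
                 | util.Regex.matches('[xyz]'))

      # Derive key/value pairs from groups 1 and 2: ('b', 'c').
      pairs = (p
               | 'kv input' >> beam.Create(['a b c'])
               | util.Regex.matches_kv('a (b) (c)', 1, 2))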

diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index edd2e72..c2866d6 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -24,9 +24,11 @@ from __future__ import division
 import collections
 import contextlib
 import random
+import re
 import time
 import typing
 import warnings
+from builtins import filter
 from builtins import object
 from builtins import range
 from builtins import zip
@@ -69,6 +71,7 @@ __all__ = [
     'Distinct',
     'Keys',
     'KvSwap',
+    'Regex',
     'Reify',
     'RemoveDuplicates',
     'Reshuffle',
@@ -865,3 +868,222 @@ class Reify(object):
 
     def expand(self, pcoll):
       return pcoll | ParDo(self.add_window_info)
+
+
+class Regex(object):
+  """
+  PTransform  to use Regular Expression to process the elements in a
+  PCollection.
+  """
+
+  ALL = "__regex_all_groups"
+
+  @staticmethod
+  def _regex_compile(regex):
+    """Return re.compile if the regex has a string value"""
+    if isinstance(regex, str):
+      regex = re.compile(regex)
+    return regex
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def matches(pcoll, regex, group=0):
+    """
+    Returns the matches (group 0 by default) if zero or more characters at the
+    beginning of string match the regular expression. To match the entire
+    string, add "$" sign at the end of regex expression.
+
+    Group can be integer value or a string value.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name/number of the group, it can be integer or a string
+        value. Defaults to 0, meaning the entire matched string will be
+        returned.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      m = regex.match(element)
+      if m:
+        yield m.group(group)
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.List[str])
+  @ptransform_fn
+  def all_matches(pcoll, regex):
+    """
+    Returns all matches (groups) if zero or more characters at the beginning
+    of string match the regular expression.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      m = regex.match(element)
+      if m:
+        yield [m.group(ix) for ix in range(m.lastindex + 1)]
+
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Tuple[str, str])
+  @ptransform_fn
+  def matches_kv(pcoll, regex, keyGroup, valueGroup=0):
+    """
+    Returns the KV pairs if the string matches the regular expression, deriving
+    the key & value from the specified group of the regular expression.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      keyGroup: The Regex group to use as the key. Can be int or str.
+      valueGroup: (optional) Regex group to use the value. Can be int or str.
+        The default value "0" returns entire matched string.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      match = regex.match(element)
+      if match:
+        yield (match.group(keyGroup), match.group(valueGroup))
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def find(pcoll, regex, group=0):
+    """
+    Returns the matches if a portion of the line matches the Regex. Returns
+    the entire group (group 0 by default). Group can be integer value or a
+    string value.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name of the group, it can be integer or a string value.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      r = regex.search(element)
+      if r:
+        yield r.group(group)
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Union[typing.List[str],
+                                            typing.Tuple[str, str]])
+  @ptransform_fn
+  def find_all(pcoll, regex, group=0, outputEmpty=True):
+    """
+    Returns the matches if a portion of the line matches the Regex. By default,
+    list of group 0 will return with empty items. To get all groups, pass the
+    `Regex.ALL` flag in the `group` parameter which returns all the groups in
+    the tuple format.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      group: (optional) name of the group, it can be integer or a string value.
+      outputEmpty: (optional) Should empty be output. True to output empties
+        and false if not.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      matches = regex.finditer(element)
+      if group == Regex.ALL:
+        yield [(m.group(), m.groups()[0]) for m in matches if outputEmpty
+               or m.groups()[0]]
+      else:
+        yield [m.group(group) for m in matches if outputEmpty or m.group(group)]
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.Tuple[str, str])
+  @ptransform_fn
+  def find_kv(pcoll, regex, keyGroup, valueGroup=0):
+    """
+    Returns the matches if a portion of the line matches the Regex. Returns the
+    specified groups as the key and value pair.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      keyGroup: The Regex group to use as the key. Can be int or str.
+      valueGroup: (optional) Regex group to use the value. Can be int or str.
+        The default value "0" returns entire matched string.
+    """
+    regex = Regex._regex_compile(regex)
+
+    def _process(element):
+      matches = regex.finditer(element)
+      if matches:
+        for match in matches:
+          yield (match.group(keyGroup), match.group(valueGroup))
+
+    return pcoll | FlatMap(_process)
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def replace_all(pcoll, regex, replacement):
+    """
+    Returns the matches if a portion of the line  matches the regex and
+    replaces all matches with the replacement string.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      replacement: the string to be substituted for each match.
+    """
+    regex = Regex._regex_compile(regex)
+    return pcoll | Map(lambda elem: regex.sub(replacement, elem))
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(str)
+  @ptransform_fn
+  def replace_first(pcoll, regex, replacement):
+    """
+    Returns the matches if a portion of the line matches the regex and replaces
+    the first match with the replacement string.
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      replacement: the string to be substituted for each match.
+    """
+    regex = Regex._regex_compile(regex)
+    return pcoll | Map(lambda elem: regex.sub(replacement, elem, 1))
+
+  @staticmethod
+  @typehints.with_input_types(str)
+  @typehints.with_output_types(typing.List[str])
+  @ptransform_fn
+  def split(pcoll, regex, outputEmpty=False):
+    """
+    Returns the list string which was splitted on the basis of regular
+    expression. It will not output empty items (by defaults).
+
+    Args:
+      regex: the regular expression string or (re.compile) pattern.
+      outputEmpty: (optional) Should empty be output. True to output empties
+          and false if not.
+    """
+    regex = Regex._regex_compile(regex)
+    outputEmpty = bool(outputEmpty)
+
+    def _process(element):
+      r = regex.split(element)
+      if r and not outputEmpty:
+        r = list(filter(None, r))
+      yield r
+
+    return pcoll | FlatMap(_process)
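
To make the `Regex.ALL` flag above concrete, a small reference example (illustrative only; the values mirror the find_all tests in util_test.py below):

    # Applying util.Regex.find_all to the single element 'abb ax abbb':
    #   find_all('a(b*)')                        -> ['abb', 'a', 'abbb']   (group 0)
    #   find_all('a(b*)', 1)                     -> ['bb', '', 'bbb']      (group 1)
    #   find_all('a(b*)', 1, outputEmpty=False)  -> ['bb', 'bbb']
    #   find_all('a(b*)', util.Regex.ALL)        -> [('abb', 'bb'), ('a', ''), ('abbb', 'bbb')]
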
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index cf620f4..af2fc8c 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -24,6 +24,7 @@ import itertools
 import logging
 import math
 import random
+import re
 import time
 import unittest
 from builtins import object
@@ -618,6 +619,285 @@ class ReifyTest(unittest.TestCase):
       assert_that(reified_pc, equal_to(expected), reify_windows=True)
 
 
+class RegexTest(unittest.TestCase):
+
+  def test_find(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("[xyz]"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"]) | util.Regex.find(rc))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_group(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("([xyz])j", group=1))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_empty(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.find("[xyz]"))
+      assert_that(result, equal_to([]))
+
+  def test_find_group_name(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"])
+                | util.Regex.find("(?P<namedgroup>[xyz])j", group="namedgroup"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_group_name_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("(?P<namedgroup>[xyz])j")
+      result = (p | beam.Create(["aj", "xj", "yj", "zj"]) | util.Regex.find(
+          rc, group="namedgroup"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_find_all_groups(self):
+    data = ["abb ax abbb", "abc qwerty abcabcd xyz"]
+    with TestPipeline() as p:
+      pcol = (p | beam.Create(data))
+
+      assert_that(pcol | 'with default values' >> util.Regex.find_all('a(b*)'),
+                  equal_to([['abb', 'a', 'abbb'], ['ab', 'ab', 'ab']]),
+                  label='CheckWithDefaultValues')
+
+      assert_that(pcol | 'group 1' >> util.Regex.find_all('a(b*)', 1),
+                  equal_to([['b', 'b', 'b'], ['bb', '', 'bbb']]),
+                  label='CheckWithGroup1')
+
+      assert_that(pcol | 'group 1 non empty' >> util.Regex.find_all(
+          'a(b*)', 1, outputEmpty=False),
+                  equal_to([['b', 'b', 'b'], ['bb', 'bbb']]),
+                  label='CheckGroup1NonEmpty')
+
+      assert_that(pcol | 'named group' >> util.Regex.find_all(
+          'a(?P<namedgroup>b*)', 'namedgroup'),
+                  equal_to([['b', 'b', 'b'], ['bb', '', 'bbb']]),
+                  label='CheckNamedGroup')
+
+      assert_that(pcol | 'all groups' >> util.Regex.find_all(
+          'a(?P<namedgroup>b*)', util.Regex.ALL),
+                  equal_to([[('ab', 'b'), ('ab', 'b'), ('ab', 'b')],
+                            [('abb', 'bb'), ('a', ''), ('abbb', 'bbb')]]),
+                  label='CheckAllGroups')
+
+      assert_that(pcol | 'all non empty groups' >> util.Regex.find_all(
+          'a(b*)', util.Regex.ALL, outputEmpty=False),
+                  equal_to([[('ab', 'b'), ('ab', 'b'), ('ab', 'b')],
+                            [('abb', 'bb'), ('abbb', 'bbb')]]),
+                  label='CheckAllNonEmptyGroups')
+
+  def test_find_kv(self):
+    with TestPipeline() as p:
+      pcol = (p | beam.Create(['a b c d']))
+      assert_that(pcol | 'key 1' >> util.Regex.find_kv(
+          'a (b) (c)', 1,), equal_to([('b', 'a b c')]), label='CheckKey1')
+
+      assert_that(pcol | 'key 1 group 1' >> util.Regex.find_kv(
+          'a (b) (c)', 1, 2), equal_to([('b', 'c')]), label='CheckKey1Group1')
+
+  def test_find_kv_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (b) (c)")
+      result = (p | beam.Create(["a b c"]) | util.Regex.find_kv(rc, 1, 2))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_find_kv_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"])
+                | util.Regex.find_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([]))
+
+  def test_match(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x", "y", "z"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "ax", "yby", "zzc"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to(["y", "z"]))
+
+  def test_match_entire_line(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x", "y", "ay", "zz"])
+                | util.Regex.matches("[xyz]$"))
+      assert_that(result, equal_to(["x", "y"]))
+
+  def test_match_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["a", "x", "y", "z"]) | util.Regex.matches(rc))
+      assert_that(result, equal_to(["x", "y", "z"]))
+
+  def test_match_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.matches("[xyz]"))
+      assert_that(result, equal_to([]))
+
+  def test_match_group(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches("x ([xyz]*)", 1))
+      assert_that(result, equal_to(("xxx", "yyy", "zzz")))
+
+  def test_match_group_name(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches("x (?P<namedgroup>[xyz]*)", 'namedgroup'))
+      assert_that(result, equal_to(("xxx", "yyy", "zzz")))
+
+  def test_match_group_name_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("x (?P<namedgroup>[xyz]*)")
+      result = (p | beam.Create(["a", "x xxx", "x yyy", "x zzz"])
+                | util.Regex.matches(rc, 'namedgroup'))
+      assert_that(result, equal_to(("xxx", "yyy", "zzz")))
+
+  def test_match_group_empty(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a", "b", "c", "d"])
+                | util.Regex.matches("x (?P<namedgroup>[xyz]*)", 'namedgroup'))
+      assert_that(result, equal_to([]))
+
+  def test_all_matched(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a x", "x x", "y y", "z z"])
+                | util.Regex.all_matches("([xyz]) ([xyz])"))
+      expected_result = [["x x", "x", "x"], ["y y", "y", "y"],
+                         ["z z", "z", "z"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_all_matched_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("([xyz]) ([xyz])")
+      result = (p | beam.Create(["a x", "x x", "y y", "z z"])
+                | util.Regex.all_matches(rc))
+      expected_result = [["x x", "x", "x"], ["y y", "y", "y"],
+                         ["z z", "z", "z"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_match_group_kv(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a b c"])
+                | util.Regex.matches_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_group_kv_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (b) (c)")
+      pcol = (p | beam.Create(["a b c"]))
+      assert_that(pcol | 'key 1' >> util.Regex.matches_kv(
+          rc, 1), equal_to([("b", "a b c")]), label="CheckKey1")
+
+      assert_that(pcol | 'key 1 group 2' >> util.Regex.matches_kv(
+          rc, 1, 2), equal_to([("b", "c")]), label="CheckKey1Group2")
+
+  def test_match_group_kv_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"])
+                | util.Regex.matches_kv("a (b) (c)", 1, 2))
+      assert_that(result, equal_to([]))
+
+  def test_match_kv_group_names(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["a b c"]) | util.Regex.matches_kv(
+          "a (?P<keyname>b) (?P<valuename>c)", 'keyname', 'valuename'))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_kv_group_names_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("a (?P<keyname>b) (?P<valuename>c)")
+      result = (p | beam.Create(["a b c"]) | util.Regex.matches_kv(
+          rc, 'keyname', 'valuename'))
+      assert_that(result, equal_to([("b", "c")]))
+
+  def test_match_kv_group_name_none(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["x y z"]) | util.Regex.matches_kv(
+          "a (?P<keyname>b) (?P<valuename>c)", 'keyname', 'valuename'))
+      assert_that(result, equal_to([]))
+
+  def test_replace_all(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["xj", "yj", "zj"]) | util.Regex.replace_all(
+          "[xyz]", "new"))
+      assert_that(result, equal_to(["newj", "newj", "newj"]))
+
+  def test_replace_all_mixed(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["abc", "xj", "yj", "zj", "def"])
+                | util.Regex.replace_all("[xyz]", 'new'))
+      assert_that(result, equal_to(["abc", "newj", "newj", "newj", "def"]))
+
+  def test_replace_all_mixed_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(
+          ["abc", "xj", "yj", "zj", "def"]) | util.Regex.replace_all(rc, 'new'))
+      assert_that(result, equal_to(["abc", "newj", "newj", "newj", "def"]))
+
+  def test_replace_first(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["xjx", "yjy", "zjz"])
+                | util.Regex.replace_first("[xyz]", 'new'))
+      assert_that(result, equal_to(["newjx", "newjy", "newjz"]))
+
+  def test_replace_first_mixed(self):
+    with TestPipeline() as p:
+      result = (p | beam.Create(["abc", "xjx", "yjy", "zjz", "def"])
+                | util.Regex.replace_first("[xyz]", 'new'))
+      assert_that(result, equal_to(["abc", "newjx", "newjy", "newjz", "def"]))
+
+  def test_replace_first_mixed_pattern(self):
+    with TestPipeline() as p:
+      rc = re.compile("[xyz]")
+      result = (p | beam.Create(["abc", "xjx", "yjy", "zjz", "def"])
+                | util.Regex.replace_first(rc, 'new'))
+      assert_that(result, equal_to(["abc", "newjx", "newjy", "newjz", "def"]))
+
+  def test_split(self):
+    with TestPipeline() as p:
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\W+"))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_pattern(self):
+    with TestPipeline() as p:
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      rc = re.compile("\\W+")
+      result = (p | beam.Create(data) | util.Regex.split(rc))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_with_empty(self):
+    with TestPipeline() as p:
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\s", True))
+      expected_result = [['The', '', 'quick', '', '', 'brown', 'fox', 'jumps',
+                          'over', '', '', '', 'the', 'lazy', 'dog']]
+      assert_that(result, equal_to(expected_result))
+
+  def test_split_without_empty(self):
+    with TestPipeline() as p:
+      data = ["The  quick   brown fox jumps over    the lazy dog"]
+      result = (p | beam.Create(data) | util.Regex.split("\\s", False))
+      expected_result = [["The", "quick", "brown", "fox", "jumps", "over",
+                          "the", "lazy", "dog"]]
+      assert_that(result, equal_to(expected_result))
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

