kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [4/5] kudu git commit: KUDU-1648 - [python] Expose Setting of Range Partitions
Date Mon, 07 Nov 2016 20:25:07 GMT
KUDU-1648 - [python] Expose Setting of Range Partitions

Currently, the Python client does not allow developers
to set range partitions. This patch adds that capability
and includes updates to existing tests.

Change-Id: Ib1e2c9a49196c6dd6644388d08014acd7593d4aa
Reviewed-on: http://gerrit.cloudera.org:8080/4795
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0f87b044
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0f87b044
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0f87b044

Branch: refs/heads/master
Commit: 0f87b044e53a0eae062a28764154141097d3871e
Parents: 13ffec6
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Fri Oct 21 22:56:26 2016 -0400
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Mon Nov 7 20:02:29 2016 +0000

----------------------------------------------------------------------
 python/kudu/__init__.py          |   4 +-
 python/kudu/client.pyx           | 129 ++++++++++++++++++++++++++++++++--
 python/kudu/libkudu_client.pxd   |   9 +++
 python/kudu/tests/test_client.py |  26 ++++++-
 python/kudu/tests/util.py        |  17 ++++-
 5 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/__init__.py
----------------------------------------------------------------------
diff --git a/python/kudu/__init__.py b/python/kudu/__init__.py
index 828c3cf..2a6623c 100644
--- a/python/kudu/__init__.py
+++ b/python/kudu/__init__.py
@@ -26,7 +26,9 @@ from kudu.client import (Client, Table, Scanner, Session,  # noqa
                          FLUSH_AUTO_SYNC,
                          FLUSH_MANUAL,
                          READ_LATEST,
-                         READ_AT_SNAPSHOT)
+                         READ_AT_SNAPSHOT,
+                         EXCLUSIVE_BOUND,
+                         INCLUSIVE_BOUND)
 
 from kudu.errors import (KuduException, KuduBadStatus, KuduNotFound,  # noqa
                          KuduNotSupported,

http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 501aaaa..0977b30 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -67,6 +67,33 @@ cdef dict _type_names = {
     KUDU_UNIXTIME_MICROS : "KUDU_UNIXTIME_MICROS"
 }
 
+# Range Partition Bound Type enums
+EXCLUSIVE_BOUND = PartitionType_Exclusive
+INCLUSIVE_BOUND = PartitionType_Inclusive
+
+cdef dict _partition_bound_types = {
+    'exclusive': PartitionType_Exclusive,
+    'inclusive': PartitionType_Inclusive
+}
+
+def _check_convert_range_bound_type(bound):
+    # Convert bounds types to constants and raise exception if invalid.
+    def invalid_bound_type(bound_type):
+        raise ValueError('Invalid range partition bound type: {0}'
+                         .format(bound_type))
+
+    if isinstance(bound, int):
+        if bound >= len(_partition_bound_types) \
+                or bound < 0:
+            invalid_bound_type(bound)
+        else:
+            return bound
+    else:
+        try:
+            return _partition_bound_types[bound.lower()]
+        except KeyError:
+            invalid_bound_type(bound)
+
 
 cdef class TimeDelta:
     """
@@ -304,7 +331,7 @@ cdef class Client:
         try:
             c.table_name(tobytes(table_name))
             c.schema(schema.schema)
-            self._apply_partitioning(c, partitioning)
+            self._apply_partitioning(c, partitioning, schema)
             if n_replicas:
                 c.num_replicas(n_replicas)
             s = c.Create()
@@ -312,10 +339,13 @@ cdef class Client:
         finally:
             del c
 
-    cdef _apply_partitioning(self, KuduTableCreator* c, part):
+    cdef _apply_partitioning(self, KuduTableCreator* c, part, Schema schema):
         cdef:
             vector[string] v
-            PartialRow py_row
+            PartialRow lower_bound
+            PartialRow upper_bound
+            PartialRow split_row
+
         # Apply hash partitioning.
         for col_names, num_buckets, seed in part._hash_partitions:
             v.clear()
@@ -331,6 +361,32 @@ cdef class Client:
             for n in part._range_partition_cols:
                 v.push_back(tobytes(n))
             c.set_range_partition_columns(v)
+            if part._range_partitions:
+                for partition in part._range_partitions:
+                    if not isinstance(partition[0], PartialRow):
+                        lower_bound = schema.new_row(partition[0])
+                    else:
+                        lower_bound = partition[0]
+                    lower_bound._own = 0
+                    if not isinstance(partition[1], PartialRow):
+                        upper_bound = schema.new_row(partition[1])
+                    else:
+                        upper_bound = partition[1]
+                    upper_bound._own = 0
+                    c.add_range_partition(
+                        lower_bound.row,
+                        upper_bound.row,
+                        _check_convert_range_bound_type(partition[2]),
+                        _check_convert_range_bound_type(partition[3])
+                    )
+            if part._range_partition_splits:
+                for split in part._range_partition_splits:
+                    if not isinstance(split, PartialRow):
+                        split_row = schema.new_row(split)
+                    else:
+                        split_row = split
+                    split_row._own = 0
+                    c.add_range_partition_split(split_row.row)
 
     def delete_table(self, table_name):
         """
@@ -944,6 +1000,8 @@ class Partitioning(object):
     def __init__(self):
         self._hash_partitions = []
         self._range_partition_cols = None
+        self._range_partitions = []
+        self._range_partition_splits = []
 
     def add_hash_partitions(self, column_names, num_buckets, seed=None):
         """
@@ -994,9 +1052,62 @@ class Partitioning(object):
         self._range_partition_cols = column_names
         return self
 
-    # TODO: implement split_rows.
-    # This is slightly tricky since currently the PartialRow constructor requires a
-    # Table object, which doesn't exist yet. Should we use tuples instead?
+    def add_range_partition(self, lower_bound=None,
+                                  upper_bound=None,
+                                  lower_bound_type='inclusive',
+                                  upper_bound_type='exclusive'):
+        """
+        Add a range partition to the table.
+
+        Multiple range partitions may be added, but they must not overlap.
+        All range splits specified by add_range_partition_split must fall
+        in a range partition. The lower bound must be less than or equal
+        to the upper bound.
+
+        If this method is not called, the table's range will be unbounded.
+
+        Parameters
+        ----------
+        lower_bound : PartialRow/list/tuple/dict
+        upper_bound : PartialRow/list/tuple/dict
+        lower_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+        upper_bound_type : {'inclusive', 'exclusive'} or constants
+          kudu.EXCLUSIVE_BOUND and kudu.INCLUSIVE_BOUND
+
+        Returns
+        -------
+        self : Partitioning
+        """
+        if self._range_partition_cols:
+            self._range_partitions.append(
+                (lower_bound, upper_bound, lower_bound_type, upper_bound_type)
+            )
+        else:
+            raise ValueError("Range Partition Columns must be set before " +
+                             "adding a range partition.")
+
+        return self
+
+    def add_range_partition_split(self, split_row):
+        """
+        Add a range partition split at the provided row.
+
+        Parameters
+        ----------
+        split_row : PartialRow/list/tuple/dict
+
+        Returns
+        -------
+        self : Partitioning
+        """
+        if self._range_partition_cols:
+            self._range_partition_splits.append(split_row)
+        else:
+            raise ValueError("Range Partition Columns must be set before " +
+                             "adding a range partition split.")
+
+        return self
 
 
 cdef class Predicate:
@@ -2244,7 +2355,11 @@ cdef class PartialRow:
         if isinstance(key, basestring):
             self.set_field(key, value)
         else:
-            self.set_loc(key, value)
+            if 0 <= key < len(self.schema):
+                self.set_loc(key, value)
+            else:
+                raise IndexError("Column index {0} is out of bounds."
+                                 .format(key))
 
     def from_record(self, record):
         """

http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index 643e349..9f2bf09 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -464,6 +464,10 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         ReadMode_Latest " kudu::client::KuduScanner::READ_LATEST"
         ReadMode_Snapshot " kudu::client::KuduScanner::READ_AT_SNAPSHOT"
 
+    enum RangePartitionBound" kudu::client::KuduTableCreator::RangePartitionBound":
+        PartitionType_Exclusive " kudu::client::KuduTableCreator::EXCLUSIVE_BOUND"
+        PartitionType_Inclusive " kudu::client::KuduTableCreator::INCLUSIVE_BOUND"
+
     cdef cppclass KuduClient:
 
         Status DeleteTable(const string& table_name)
@@ -518,6 +522,11 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
                                               int num_buckets,
                                               int seed)
         KuduTableCreator& set_range_partition_columns(vector[string]& columns)
+        KuduTableCreator& add_range_partition(KuduPartialRow* lower_bound,
+                                              KuduPartialRow* upper_bound,
+                                              RangePartitionBound lower_bound_type,
+                                              RangePartitionBound upper_bound_type)
+        KuduTableCreator& add_range_partition_split(KuduPartialRow* split_row)
         KuduTableCreator& split_rows(vector[const KuduPartialRow*]& split_rows)
         KuduTableCreator& num_replicas(int n_replicas)
         KuduTableCreator& wait(c_bool wait)

http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/tests/test_client.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_client.py b/python/kudu/tests/test_client.py
index 0c97ee9..10bbda4 100644
--- a/python/kudu/tests/test_client.py
+++ b/python/kudu/tests/test_client.py
@@ -132,7 +132,12 @@ class TestClient(KuduTestBase, unittest.TestCase):
 
             self.client.create_table(
                 name, self.schema,
-                partitioning=Partitioning().set_range_partition_columns([]))
+                partitioning=Partitioning()
+                    .set_range_partition_columns(['key'])
+                    .add_range_partition_split({'key': 10})
+                    .add_range_partition_split([20])
+                    .add_range_partition_split((30,))
+            )
             self.client.delete_table(name)
 
             self.client.create_table(
@@ -246,6 +251,25 @@ class TestClient(KuduTestBase, unittest.TestCase):
             assert tserver.hostname() is not None
             assert tserver.port() is not None
 
+    def test_bad_partialrow(self):
+        table = self.client.table(self.ex_table)
+        op = table.new_insert()
+        # Test bad keys or indexes
+        keys = [
+            ('not-there', KeyError),
+            (len(self.schema) + 1, IndexError),
+            (-1, IndexError)
+        ]
+
+        for key in keys:
+            with self.assertRaises(key[1]):
+                op[key[0]] = 'test'
+
+        # Test incorrectly typed data
+        with self.assertRaises(TypeError):
+            op['int_val'] = 'incorrect'
+
+
 class TestMonoDelta(unittest.TestCase):
 
     def test_empty_ctor(self):

http://git-wip-us.apache.org/repos/asf/kudu/blob/0f87b044/python/kudu/tests/util.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/util.py b/python/kudu/tests/util.py
index a6bbae6..2783111 100644
--- a/python/kudu/tests/util.py
+++ b/python/kudu/tests/util.py
@@ -65,7 +65,7 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         table_name = 'type-test'
         # Create schema, partitioning and then table
         builder = kudu.schema_builder()
-        builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
+        builder.add_column('key').type(kudu.int64).nullable(False)
         builder.add_column('unixtime_micros_val', type_=kudu.unixtime_micros, nullable=False)
         builder.add_column('string_val', type_=kudu.string, compression=kudu.COMPRESSION_LZ4, encoding='prefix')
         builder.add_column('bool_val', type_=kudu.bool)
@@ -73,13 +73,26 @@ class TestScanBase(KuduTestBase, unittest.TestCase):
         builder.add_column('int8_val', type_=kudu.int8)
         builder.add_column('binary_val', type_='binary', compression=kudu.COMPRESSION_SNAPPY, encoding='prefix')
         builder.add_column('float_val', type_=kudu.float)
+        builder.set_primary_keys(['key', 'unixtime_micros_val'])
         schema = builder.build()
 
         self.projected_names_w_o_float = [
             col for col in schema.names if col != 'float_val'
         ]
 
-        partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
+        partitioning = Partitioning() \
+            .add_hash_partitions(column_names=['key'], num_buckets=3)\
+            .set_range_partition_columns(['unixtime_micros_val'])\
+            .add_range_partition(
+                upper_bound={'unixtime_micros_val': ("2016-01-01", "%Y-%m-%d")},
+                upper_bound_type=kudu.EXCLUSIVE_BOUND
+            )\
+            .add_range_partition(
+                lower_bound={'unixtime_micros_val': datetime.datetime(2016, 1, 1)},
+                lower_bound_type='INCLUSIVE',
+                upper_bound={'unixtime_micros_val': datetime.datetime(9999, 12, 31)}
+            )
+
 
         self.client.create_table(table_name, schema, partitioning)
         self.type_table = self.client.table(table_name)


Mime
View raw message