kudu-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jdcry...@apache.org
Subject [3/5] kudu git commit: [python] - Expose additional scanner methods
Date Mon, 07 Nov 2016 20:25:06 GMT
[python] - Expose additional scanner methods

This patch exposes several of the remaining C++ client scanner methods
in the Python client:
	- set_cache_blocks
	- keep_alive
	- close
	- get_current_server

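Additionally, this patch fixes an inappropriate deallocation of replicas.
Replica objects no longer delete their underlying C++ object, because it is
freed when the owning ScanToken is deallocated. TabletServer wrappers now
delete their underlying object only when they own it. Tests are included.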

Change-Id: Ifa6070a96a5daca796d463ffc3ffcbe5f0a5e08a
Reviewed-on: http://gerrit.cloudera.org:8080/4888
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jdcryans@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/13ffec6e
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/13ffec6e
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/13ffec6e

Branch: refs/heads/master
Commit: 13ffec6eae6f498b7c21ccdd2fdb66f53df07afb
Parents: c25aea4
Author: Jordan Birdsell <jordantbirdsell@gmail.com>
Authored: Sun Oct 30 20:55:53 2016 -0400
Committer: Jean-Daniel Cryans <jdcryans@apache.org>
Committed: Mon Nov 7 19:59:21 2016 +0000

----------------------------------------------------------------------
 python/kudu/client.pyx              | 90 ++++++++++++++++++++++++++++++--
 python/kudu/libkudu_client.pxd      |  5 +-
 python/kudu/tests/test_scanner.py   |  7 +--
 python/kudu/tests/test_scantoken.py | 30 +++++++----
 4 files changed, 113 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/client.pyx
----------------------------------------------------------------------
diff --git a/python/kudu/client.pyx b/python/kudu/client.pyx
index 3cfd92a..501aaaa 100644
--- a/python/kudu/client.pyx
+++ b/python/kudu/client.pyx
@@ -447,6 +447,7 @@ cdef class Client:
         result = []
         for i in range(tservers.size()):
             ts = TabletServer()
+            ts._own = 1
             result.append(ts._init(tservers[i]))
         return result
 
@@ -592,15 +593,27 @@ cdef class TabletServer:
 
     cdef:
         const KuduTabletServer* _tserver
+        public bint _own
 
     cdef _init(self, const KuduTabletServer* tserver):
         self._tserver = tserver
+        self._own = 0
         return self
 
     def __dealloc__(self):
-        if self._tserver != NULL:
+        if self._tserver != NULL and self._own:
             del self._tserver
 
+    def __richcmp__(TabletServer self, TabletServer other, int op):
+        if op == 2: # ==
+            return ((self.uuid(), self.hostname(), self.port()) ==
+                    (other.uuid(), other.hostname(), other.port()))
+        elif op == 3: # !=
+            return ((self.uuid(), self.hostname(), self.port()) !=
+                    (other.uuid(), other.hostname(), other.port()))
+        else:
+            raise NotImplementedError
+
     def uuid(self):
         return frombytes(self._tserver.uuid())
 
@@ -1649,6 +1662,77 @@ cdef class Scanner:
         check_status(self.scanner.NextBatch(&batch.batch))
         return batch
 
+    def set_cache_blocks(self, cache_blocks):
+        """
+        Sets the block caching policy.
+        Returns a reference to itself to facilitate chaining.
+
+        Parameters
+        ----------
+        cache_blocks : bool
+
+        Returns
+        -------
+        self : Scanner
+        """
+        check_status(self.scanner.SetCacheBlocks(cache_blocks))
+        return self
+
+    def keep_alive(self):
+        """
+        Keep the current remote scanner alive.
+
+        Keep the current remote scanner alive on the Tablet server for an
+        additional time-to-live (set by a configuration flag on the tablet
+        server). This is useful if the interval in between NextBatch() calls is
+        big enough that the remote scanner might be garbage collected (default
+        ttl is set to 60 secs.). This does not invalidate any previously
+        fetched results.
+
+        Returns
+        -------
+        self : Scanner
+        """
+        check_status(self.scanner.KeepAlive())
+        return self
+
+    def get_current_server(self):
+        """
+        Get the TabletServer that is currently handling the scan.
+
+        More concretely, this is the server that handled the most recent open()
+        or next_batch() RPC made by the server.
+
+        Returns
+        -------
+        tserver : TabletServer
+        """
+        cdef:
+            TabletServer tserver = TabletServer()
+            KuduTabletServer* tserver_p = NULL
+
+        check_status(self.scanner.GetCurrentServer(&tserver_p))
+        tserver._own = 1
+        tserver._init(tserver_p)
+        return tserver
+
+    def close(self):
+        """
+        Close the scanner.
+
+        Closing the scanner releases resources on the server. This call does
+        not block, and will not ever fail, even if the server cannot be
+        contacted.
+
+        Note: The scanner is reset to its initial state by this function.
+        You'll have to re-add any projection, predicates, etc if you want to
+        reuse this object.
+        Note: When the Scanner object is garbage collected, this method is run.
+        This method call is only needed if you want to explicitly release the
+        resources on the server.
+        """
+        self.scanner.Close()
+
 
 cdef class ScanToken:
     """
@@ -2113,10 +2197,6 @@ cdef class Replica:
         self._replica = replica
         return self
 
-    def __dealloc__(self):
-        if self._replica != NULL:
-            del self._replica
-
     def is_leader(self):
         return self._replica.is_leader()
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/libkudu_client.pxd
----------------------------------------------------------------------
diff --git a/python/kudu/libkudu_client.pxd b/python/kudu/libkudu_client.pxd
index b56cc11..643e349 100644
--- a/python/kudu/libkudu_client.pxd
+++ b/python/kudu/libkudu_client.pxd
@@ -627,9 +627,8 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         c_bool HasMoreRows()
         Status NextBatch(KuduScanBatch* batch)
         Status SetBatchSizeBytes(uint32_t batch_size)
-
         Status SetSelection(ReplicaSelection selection)
-
+        Status SetCacheBlocks(c_bool cache_blocks)
         Status SetReadMode(ReadMode read_mode)
         Status SetSnapshotMicros(uint64_t snapshot_timestamp_micros)
         Status SetTimeoutMillis(int millis)
@@ -638,6 +637,8 @@ cdef extern from "kudu/client/client.h" namespace "kudu::client" nogil:
         Status SetFaultTolerant()
         Status AddLowerBound(const KuduPartialRow& key)
         Status AddExclusiveUpperBound(const KuduPartialRow& key)
+        Status KeepAlive()
+        Status GetCurrentServer(KuduTabletServer** server)
 
         KuduSchema GetProjectionSchema()
         const ResourceMetrics& GetResourceMetrics()

http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/tests/test_scanner.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scanner.py b/python/kudu/tests/test_scanner.py
index 0b9aeb4..b483b38 100644
--- a/python/kudu/tests/test_scanner.py
+++ b/python/kudu/tests/test_scanner.py
@@ -211,14 +211,15 @@ class TestScanner(TestScanBase):
             # Avoid tight looping
             time.sleep(0.05)
 
-    def test_resource_metrics(self):
+    def test_resource_metrics_and_cache_blocks(self):
         """
-        Test getting the resource metrics after scanning.
+        Test getting the resource metrics after scanning and
+        setting the scanner to not cache blocks.
         """
 
         # Build scanner and read through all batches and retrieve metrics.
         scanner = self.table.scanner()
-        scanner.set_fault_tolerant().open()
+        scanner.set_fault_tolerant().set_cache_blocks(False).open()
         scanner.read_all_tuples()
         metrics = scanner.get_resource_metrics()
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/13ffec6e/python/kudu/tests/test_scantoken.py
----------------------------------------------------------------------
diff --git a/python/kudu/tests/test_scantoken.py b/python/kudu/tests/test_scantoken.py
index 115ac30..392e2a9 100644
--- a/python/kudu/tests/test_scantoken.py
+++ b/python/kudu/tests/test_scantoken.py
@@ -22,12 +22,16 @@ from kudu.tests.common import KuduTestBase
 import kudu
 from multiprocessing import Pool
 import datetime
+import time
 
 def _get_scan_token_results(input):
     client = kudu.connect(input[1], input[2])
     scanner = client.deserialize_token_into_scanner(input[0])
     scanner.open()
-    return scanner.read_all_tuples()
+    tuples = scanner.read_all_tuples()
+    # Test explicit closing of scanner
+    scanner.close()
+    return tuples
 
 class TestScanToken(TestScanBase):
 
@@ -115,6 +119,16 @@ class TestScanToken(TestScanBase):
         with self.assertRaises(TypeError):
             builder.add_predicates([sv >= 1])
 
+    def _subtest_open_and_confirm_leader_tserver(self, token):
+        for replica in token.tablet().replicas():
+            if replica.is_leader():
+                leader_tserver = replica.ts()
+
+        scanner = token.into_kudu_scanner()
+        scanner.open()
+        self.assertEqual(scanner.get_current_server(), leader_tserver)
+        return scanner
+
     def test_scan_token_batch_by_batch_with_local_scanner(self):
         builder = self.table.scan_token_builder()
         lower_bound = builder.new_bound()
@@ -128,8 +142,7 @@ class TestScanToken(TestScanBase):
 
         tuples = []
         for token in tokens:
-            scanner = token.into_kudu_scanner()
-            scanner.open()
+            scanner = self._subtest_open_and_confirm_leader_tserver(token)
 
             while scanner.has_more_rows():
                 batch = scanner.next_batch()
@@ -150,10 +163,10 @@ class TestScanToken(TestScanBase):
 
         tuples = []
         for token in tokens:
-            scanner = token.into_kudu_scanner()
-            scanner.open()
+            scanner = self._subtest_open_and_confirm_leader_tserver(token)
 
             while scanner.has_more_rows():
+                scanner.keep_alive()
                 batch = scanner.next_batch()
                 tuples.extend(batch.as_tuples())
 
@@ -192,7 +205,7 @@ class TestScanToken(TestScanBase):
 
         tuples = []
         for token in tokens:
-            scanner = token.into_kudu_scanner().open()
+            scanner = self._subtest_open_and_confirm_leader_tserver(token)
             tuples.extend(scanner.read_all_tuples())
 
         self.assertEqual(sorted(self.tuples[1:]), sorted(tuples))
@@ -204,7 +217,7 @@ class TestScanToken(TestScanBase):
 
         tuples = []
         for token in tokens:
-            scanner = token.into_kudu_scanner().open()
+            scanner = self._subtest_open_and_confirm_leader_tserver(token)
             tuples.extend(scanner.read_all_tuples())
 
         self.assertEqual(sorted(self.tuples), sorted(tuples))
@@ -260,8 +273,7 @@ class TestScanToken(TestScanBase):
 
             tuples = []
             for token in tokens:
-                scanner = token.into_kudu_scanner()
-                scanner.open()
+                scanner = self._subtest_open_and_confirm_leader_tserver(token)
                 tuples.extend(scanner.read_all_tuples())
 
             self.assertEqual(sorted(tuples),


Mime
View raw message