beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From al...@apache.org
Subject [1/2] beam git commit: Retry on correct error codes for datastoreio
Date Wed, 03 May 2017 21:33:11 GMT
Repository: beam
Updated Branches:
  refs/heads/master f55d00253 -> 9b6b9060b


Retry on correct error codes for datastoreio


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d8e2522e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d8e2522e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d8e2522e

Branch: refs/heads/master
Commit: d8e2522eb04a2a0b5cb28415e55d467d8905d841
Parents: f55d002
Author: Vikas Kedigehalli <vikasrk@google.com>
Authored: Wed May 3 13:14:20 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed May 3 14:32:59 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/io/gcp/datastore/v1/helper.py   | 16 ++++++++-----
 .../io/gcp/datastore/v1/helper_test.py          | 24 +++++++++++++++-----
 2 files changed, 28 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
index d544226..a61884f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper.py
@@ -24,13 +24,13 @@ try:
   from google.cloud.proto.datastore.v1 import datastore_pb2
   from google.cloud.proto.datastore.v1 import entity_pb2
   from google.cloud.proto.datastore.v1 import query_pb2
+  from google.rpc import code_pb2
   from googledatastore import PropertyFilter, CompositeFilter
   from googledatastore import helper as datastore_helper
   from googledatastore.connection import Datastore
   from googledatastore.connection import RPCError
-  QUERY_NOT_FINISHED = query_pb2.QueryResultBatch.NOT_FINISHED
 except ImportError:
-  QUERY_NOT_FINISHED = None
+  pass
 # pylint: enable=wrong-import-order, wrong-import-position
 
 from apache_beam.internal.gcp import auth
@@ -129,8 +129,12 @@ def make_partition(project, namespace):
 def retry_on_rpc_error(exception):
   """A retry filter for Cloud Datastore RPCErrors."""
   if isinstance(exception, RPCError):
-    return exception.code >= 500
-  # TODO(vikasrk): Figure out what other errors should be retried.
+    err_code = exception.code
+    # TODO(BEAM-2156): put these codes in a global list and use that instead.
+    return (err_code == code_pb2.DEADLINE_EXCEEDED or
+            err_code == code_pb2.UNAVAILABLE or
+            err_code == code_pb2.UNKNOWN or
+            err_code == code_pb2.INTERNAL)
   return False
 
 
@@ -221,7 +225,6 @@ class QueryIterator(object):
 
   Entities are read in batches. Retries on failures.
   """
-  _NOT_FINISHED = QUERY_NOT_FINISHED
   # Maximum number of results to request per query.
   _BATCH_SIZE = 500
 
@@ -265,4 +268,5 @@ class QueryIterator(object):
       # read).
       more_results = ((self._limit > 0) and
                       ((num_results == self._BATCH_SIZE) or
-                       (resp.batch.more_results == self._NOT_FINISHED)))
+                       (resp.batch.more_results ==
+                        query_pb2.QueryResultBatch.NOT_FINISHED)))

http://git-wip-us.apache.org/repos/asf/beam/blob/d8e2522e/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
index 582a5b3..5d4bb6f 100644
--- a/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
+++ b/sdks/python/apache_beam/io/gcp/datastore/v1/helper_test.py
@@ -33,6 +33,7 @@ try:
   from google.cloud.proto.datastore.v1 import entity_pb2
   from google.cloud.proto.datastore.v1 import query_pb2
   from google.cloud.proto.datastore.v1.entity_pb2 import Key
+  from google.rpc import code_pb2
   from googledatastore.connection import RPCError
   from googledatastore import helper as datastore_helper
 except ImportError:
@@ -49,19 +50,22 @@ class HelperTest(unittest.TestCase):
     self._query.kind.add().name = 'dummy_kind'
     patch_retry(self, helper)
 
-  def permanent_datastore_failure(self, req):
-    raise RPCError("dummy", 500, "failed")
+  def permanent_retriable_datastore_failure(self, req):
+    raise RPCError("dummy", code_pb2.UNAVAILABLE, "failed")
 
-  def transient_datastore_failure(self, req):
+  def transient_retriable_datastore_failure(self, req):
     if self._transient_fail_count:
       self._transient_fail_count -= 1
-      raise RPCError("dummy", 500, "failed")
+      raise RPCError("dummy", code_pb2.INTERNAL, "failed")
     else:
       return datastore_pb2.RunQueryResponse()
 
+  def non_retriable_datastore_failure(self, req):
+    raise RPCError("dummy", code_pb2.UNAUTHENTICATED, "failed")
+
   def test_query_iterator(self):
     self._mock_datastore.run_query.side_effect = (
-        self.permanent_datastore_failure)
+        self.permanent_retriable_datastore_failure)
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
     self.assertRaises(RPCError, iter(query_iterator).next)
@@ -69,7 +73,7 @@ class HelperTest(unittest.TestCase):
 
   def test_query_iterator_with_transient_failures(self):
     self._mock_datastore.run_query.side_effect = (
-        self.transient_datastore_failure)
+        self.transient_retriable_datastore_failure)
     query_iterator = helper.QueryIterator("project", None, self._query,
                                           self._mock_datastore)
     fail_count = 2
@@ -80,6 +84,14 @@ class HelperTest(unittest.TestCase):
     self.assertEqual(fail_count + 1,
                      len(self._mock_datastore.run_query.call_args_list))
 
+  def test_query_iterator_with_non_retriable_failures(self):
+    self._mock_datastore.run_query.side_effect = (
+        self.non_retriable_datastore_failure)
+    query_iterator = helper.QueryIterator("project", None, self._query,
+                                          self._mock_datastore)
+    self.assertRaises(RPCError, iter(query_iterator).next)
+    self.assertEqual(1, len(self._mock_datastore.run_query.call_args_list))
+
   def test_query_iterator_with_single_batch(self):
     num_entities = 100
     batch_size = 500


Mime
View raw message