beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From boyu...@apache.org
Subject [beam] branch master updated: Make SDK worker resilient to bad logging services. (#9214)
Date Thu, 01 Aug 2019 17:01:10 GMT
This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ea4547  Make SDK worker resilient to bad logging services. (#9214)
7ea4547 is described below

commit 7ea4547a6f2530dffc06cb4dab3cdde58c2fd0dc
Author: Robert Bradshaw <robertwb@google.com>
AuthorDate: Thu Aug 1 19:00:54 2019 +0200

    Make SDK worker resilient to bad logging services. (#9214)
---
 .../apache_beam/runners/worker/log_handler.py      | 26 +++++++++++++---------
 .../apache_beam/runners/worker/sdk_worker_main.py  | 25 ++++++++++++---------
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/sdks/python/apache_beam/runners/worker/log_handler.py b/sdks/python/apache_beam/runners/worker/log_handler.py
index 4eac50c..b38aaed 100644
--- a/sdks/python/apache_beam/runners/worker/log_handler.py
+++ b/sdks/python/apache_beam/runners/worker/log_handler.py
@@ -99,16 +99,22 @@ class FnApiLogRecordHandler(logging.Handler):
 
   def close(self):
     """Flush out all existing log entries and unregister this handler."""
-    self._alive = False
-    # Acquiring the handler lock ensures ``emit`` is not run until the lock is
-    # released.
-    self.acquire()
-    self._log_entry_queue.put(self._FINISHED, timeout=5)
-    # wait on server to close.
-    self._reader.join()
-    self.release()
-    # Unregister this handler.
-    super(FnApiLogRecordHandler, self).close()
+    try:
+      self._alive = False
+      # Acquiring the handler lock ensures ``emit`` is not run until the lock is
+      # released.
+      self.acquire()
+      self._log_entry_queue.put(self._FINISHED, timeout=5)
+      # wait on server to close.
+      self._reader.join()
+      self.release()
+      # Unregister this handler.
+      super(FnApiLogRecordHandler, self).close()
+    except Exception:
+      # Log rather than raising exceptions, to avoid clobbering
+      # underlying errors that may have caused this to close
+      # prematurely.
+      logging.error("Error closing the logging channel.", exc_info=True)
 
   def _write_log_entries(self):
     done = False
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 4c7bee6..81bc1b5 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -91,16 +91,21 @@ class StatusServer(object):
 def main(unused_argv):
   """Main entry point for SDK Fn Harness."""
   if 'LOGGING_API_SERVICE_DESCRIPTOR' in os.environ:
-    logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
-    text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'],
-                      logging_service_descriptor)
-
-    # Send all logs to the runner.
-    fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor)
-    # TODO(BEAM-5468): This should be picked up from pipeline options.
-    logging.getLogger().setLevel(logging.INFO)
-    logging.getLogger().addHandler(fn_log_handler)
-    logging.info('Logging handler created.')
+    try:
+      logging_service_descriptor = endpoints_pb2.ApiServiceDescriptor()
+      text_format.Merge(os.environ['LOGGING_API_SERVICE_DESCRIPTOR'],
+                        logging_service_descriptor)
+
+      # Send all logs to the runner.
+      fn_log_handler = FnApiLogRecordHandler(logging_service_descriptor)
+      # TODO(BEAM-5468): This should be picked up from pipeline options.
+      logging.getLogger().setLevel(logging.INFO)
+      logging.getLogger().addHandler(fn_log_handler)
+      logging.info('Logging handler created.')
+    except Exception:
+      logging.error("Failed to set up logging handler, continuing without.",
+                    exc_info=True)
+      fn_log_handler = None
   else:
     fn_log_handler = None
 


Mime
View raw message