beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] beam git commit: [BEAM-1347] Remove the usage of a thread local on a potentially hot path
Date Tue, 30 May 2017 21:15:41 GMT
Repository: beam
Updated Branches:
  refs/heads/master 2d3e9fe75 -> 49067b164


[BEAM-1347] Remove the usage of a thread local on a potentially hot path


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/60779e2e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/60779e2e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/60779e2e

Branch: refs/heads/master
Commit: 60779e2ecd76f1cb4766050e4560765c1bc3c19b
Parents: 2d3e9fe
Author: Luke Cwik <lcwik@google.com>
Authored: Tue May 30 13:15:31 2017 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Tue May 30 14:15:23 2017 -0700

----------------------------------------------------------------------
 .../fn/harness/logging/BeamFnLoggingClient.java | 36 +++++++++++---------
 1 file changed, 19 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/60779e2e/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
----------------------------------------------------------------------
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
index c8d11ed..d56ee6d 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/logging/BeamFnLoggingClient.java
@@ -38,7 +38,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.logging.Formatter;
 import java.util.logging.Handler;
@@ -179,11 +178,14 @@ public class BeamFnLoggingClient implements AutoCloseable {
     private final BlockingDeque<BeamFnApi.LogEntry> bufferedLogEntries =
         new LinkedBlockingDeque<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
     private final Future<?> bufferedLogWriter;
-    private final ThreadLocal<Consumer<BeamFnApi.LogEntry>> logEntryHandler;
+    /**
+     * Safe object publishing is not required since we only care if the thread that set
+     * this field is equal to the thread also attempting to add a log entry.
+     */
+    private Thread logEntryHandlerThread;
 
     private LogRecordHandler(ExecutorService executorService) {
       bufferedLogWriter = executorService.submit(this);
-      logEntryHandler = new ThreadLocal<>();
     }
 
     @Override
@@ -204,19 +206,18 @@ public class BeamFnLoggingClient implements AutoCloseable {
         builder.setTrace(getStackTraceAsString(record.getThrown()));
       }
       // The thread that sends log records should never perform a blocking publish and
-      // only insert log records best effort. We detect which thread is logging
-      // by using the thread local, defaulting to the blocking publish.
-      MoreObjects.firstNonNull(
-          logEntryHandler.get(), this::blockingPublish).accept(builder.build());
-    }
-
-    /** Blocks caller till enough space exists to publish this log entry. */
-    private void blockingPublish(BeamFnApi.LogEntry logEntry) {
-      try {
-        bufferedLogEntries.put(logEntry);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new RuntimeException(e);
+      // only insert log records best effort.
+      if (Thread.currentThread() != logEntryHandlerThread) {
+        // Blocks caller till enough space exists to publish this log entry.
+        try {
+          bufferedLogEntries.put(builder.build());
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      } else {
+        // Never blocks caller, will drop log message if buffer is full.
+        bufferedLogEntries.offer(builder.build());
       }
     }
 
@@ -225,7 +226,8 @@ public class BeamFnLoggingClient implements AutoCloseable {
       // Logging which occurs in this thread will attempt to publish log entries into the
       // above handler which should never block if the queue is full otherwise
       // this thread will get stuck.
-      logEntryHandler.set(bufferedLogEntries::offer);
+      logEntryHandlerThread = Thread.currentThread();
+
       List<BeamFnApi.LogEntry> additionalLogEntries =
           new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
       try {


Mime
View raw message