beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [beam] branch master updated: [BEAM-5332] Do not attempt to evict cache after shutdown (#6342)
Date Fri, 07 Sep 2018 12:18:33 GMT
This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 992ece0  [BEAM-5332] Do not attempt to evict cache after shutdown (#6342)
992ece0 is described below

commit 992ece0a4ad444dee9faea21e4729c495dea6b42
Author: Maximilian Michels <max@posteo.de>
AuthorDate: Fri Sep 7 14:18:29 2018 +0200

    [BEAM-5332] Do not attempt to evict cache after shutdown (#6342)
---
 ...CountingFlinkExecutableStageContextFactory.java | 11 ++++---
 ...tingFlinkExecutableStageContextFactoryTest.java | 35 ++++++++++++++++++++++
 .../control/DockerJobBundleFactory.java            | 13 --------
 .../fnexecution/control/JobBundleFactoryBase.java  | 12 ++++----
 4 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
index 565e0b9..988a948 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactory.java
@@ -24,7 +24,6 @@ import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.fnexecution.control.StageBundleFactory;
@@ -42,7 +41,6 @@ public class ReferenceCountingFlinkExecutableStageContextFactory
     implements FlinkExecutableStageContext.Factory {
   private static final Logger LOG =
       LoggerFactory.getLogger(ReferenceCountingFlinkExecutableStageContextFactory.class);
-  private static final int TTL_IN_SECONDS = 30;
   private static final int MAX_RETRY = 3;
 
   private final Creator creator;
@@ -106,8 +104,9 @@ public class ReferenceCountingFlinkExecutableStageContextFactory
     WrappedContext wrapper = getCache().get(jobInfo.jobId());
     Preconditions.checkState(
         wrapper != null, "Releasing context for unknown job: " + jobInfo.jobId());
-    // Schedule task to clean the container later.
-    getExecutor().schedule(() -> release(wrapper), TTL_IN_SECONDS, TimeUnit.SECONDS);
+    // Do not release this asynchronously, as the releasing could fail due to the classloader not being
+    // available anymore after the tasks have been removed from the execution engine.
+    release(wrapper);
   }
 
   private ConcurrentHashMap<String, WrappedContext> getCache() {
@@ -148,8 +147,8 @@ public class ReferenceCountingFlinkExecutableStageContextFactory
         if (getCache().remove(wrapper.jobInfo.jobId(), wrapper)) {
           try {
             wrapper.closeActual();
-          } catch (Exception e) {
-            LOG.error("Unable to close.", e);
+          } catch (Throwable t) {
+            LOG.error("Unable to close FlinkExecutableStageContext.", t);
           }
         }
       }
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
index cf75864..06a43c8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/functions/ReferenceCountingFlinkExecutableStageContextFactoryTest.java
@@ -17,10 +17,16 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.google.common.base.Charsets;
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory.Creator;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.junit.Assert;
@@ -77,4 +83,33 @@ public class ReferenceCountingFlinkExecutableStageContextFactoryTest {
     Assert.assertNotSame("We should get a new instance.", ac2B, ac4B);
     factory.release(ac4B); // 0 open jobB
   }
+
+  @Test
+  public void testCatchThrowablesAndLogThem() throws Exception {
+    PrintStream oldErr = System.err;
+    oldErr.flush();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    PrintStream newErr = new PrintStream(baos);
+    try {
+      System.setErr(newErr);
+      Creator creator = mock(Creator.class);
+      FlinkExecutableStageContext c1 = mock(FlinkExecutableStageContext.class);
+      when(creator.apply(any(JobInfo.class))).thenReturn(c1);
+      // throw an Throwable and ensure that it is caught and logged.
+      doThrow(new NoClassDefFoundError()).when(c1).close();
+      ReferenceCountingFlinkExecutableStageContextFactory factory =
+          ReferenceCountingFlinkExecutableStageContextFactory.create(creator);
+      JobInfo jobA = mock(JobInfo.class);
+      when(jobA.jobId()).thenReturn("jobA");
+      FlinkExecutableStageContext ac1A = factory.get(jobA);
+      factory.release(ac1A);
+      newErr.flush();
+      String output = new String(baos.toByteArray(), Charsets.UTF_8);
+      // Ensure that the error is logged
+      assertThat(output.contains("Unable to close FlinkExecutableStageContext"), is(true));
+    } finally {
+      newErr.flush();
+      System.setErr(oldErr);
+    }
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
index fec2923..a305a59 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/DockerJobBundleFactory.java
@@ -86,19 +86,6 @@ public class DockerJobBundleFactory extends JobBundleFactoryBase {
   }
 
   @Override
-  public void close() throws Exception {
-    // Clear the cache. This closes all active environments.
-    environmentCache.invalidateAll();
-    environmentCache.cleanUp();
-
-    // Tear down common servers.
-    controlServer.close();
-    loggingServer.close();
-    retrievalServer.close();
-    provisioningServer.close();
-  }
-
-  @Override
   protected ServerFactory getServerFactory() {
     switch (getPlatform()) {
       case LINUX:
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
index 3c1bd6e..d171726 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/JobBundleFactoryBase.java
@@ -68,13 +68,13 @@ import org.slf4j.LoggerFactory;
 public abstract class JobBundleFactoryBase implements JobBundleFactory {
   private static final Logger LOG = LoggerFactory.getLogger(JobBundleFactoryBase.class);
 
-  final IdGenerator stageIdGenerator;
-  final GrpcFnServer<FnApiControlClientPoolService> controlServer;
-  final GrpcFnServer<GrpcLoggingService> loggingServer;
-  final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
-  final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
+  private final IdGenerator stageIdGenerator;
+  private final GrpcFnServer<FnApiControlClientPoolService> controlServer;
+  private final GrpcFnServer<GrpcLoggingService> loggingServer;
+  private final GrpcFnServer<ArtifactRetrievalService> retrievalServer;
+  private final GrpcFnServer<StaticGrpcProvisionService> provisioningServer;
 
-  final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
+  private final LoadingCache<Environment, WrappedSdkHarnessClient> environmentCache;
 
   JobBundleFactoryBase(JobInfo jobInfo) throws Exception {
     ServerFactory serverFactory = getServerFactory();


Mime
View raw message