beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-4040) Direct runner does not wait for failed tasks to teardown
Date Mon, 03 Sep 2018 18:14:01 GMT

     [ https://issues.apache.org/jira/browse/BEAM-4040?focusedWorklogId=140689&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-140689 ]

ASF GitHub Bot logged work on BEAM-4040:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 03/Sep/18 18:13
            Start Date: 03/Sep/18 18:13
    Worklog Time Spent: 10m 
      Work Description: stale[bot] closed pull request #5096: [BEAM-4040][BEAM-4047] ensure direct runner wait teardown calls even on exception
URL: https://github.com/apache/beam/pull/5096
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
index 0e30e5c07bb..501dac66daf 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DoFnLifecycleManager.java
@@ -32,6 +32,8 @@
 import org.apache.beam.sdk.transforms.DoFn.Teardown;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.util.SerializableUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Manages {@link DoFn} setup, teardown, and serialization.
@@ -42,6 +44,8 @@
  * clearing all cached {@link DoFn DoFns}.
  */
 class DoFnLifecycleManager {
+  private static final Logger LOG = LoggerFactory.getLogger(TransformEvaluatorRegistry.class);
+
   public static DoFnLifecycleManager of(DoFn<?, ?> original) {
     return new DoFnLifecycleManager(original);
   }
@@ -99,6 +103,7 @@ public DeserializingCacheLoader(DoFn<?, ?> original) {
     public DoFn<?, ?> load(Thread key) throws Exception {
       DoFn<?, ?> fn = (DoFn<?, ?>) SerializableUtils.deserializeFromByteArray(original,
           "DoFn Copy in thread " + key.getName());
+      LOG.info("Loaded {}", fn);
       DoFnInvokers.invokerFor(fn).invokeSetup();
       return fn;
     }
@@ -107,9 +112,13 @@ public DeserializingCacheLoader(DoFn<?, ?> original) {
   private class TeardownRemovedFnListener implements RemovalListener<Thread, DoFn<?, ?>> {
     @Override
     public void onRemoval(RemovalNotification<Thread, DoFn<?, ?>> notification) {
+      final DoFn<?, ?> value = notification.getValue();
+      LOG.info("Removing {}", value);
       try {
-        DoFnInvokers.invokerFor(notification.getValue()).invokeTeardown();
+        DoFnInvokers.invokerFor(value).invokeTeardown();
+        LOG.info("Removed {}", value);
       } catch (Exception e) {
+        LOG.info("Failed to remove {}", value, e);
         thrownOnTeardown.put(notification.getKey(), e);
       }
     }
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index ab759287637..8c3bd07688b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -28,8 +28,11 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -58,6 +61,8 @@
     implements PipelineExecutor, BundleProcessor<CommittedBundle<?>, AppliedPTransform<?, ?, ?>> {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
+  private final String id = UUID.randomUUID().toString();
+
   private final int targetParallelism;
   private final ExecutorService executorService;
 
@@ -71,7 +76,9 @@
 
   private final QueueMessageReceiver visibleUpdates;
 
-  private AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING);
+  private final AtomicReference<State> pipelineState = new AtomicReference<>(State.RUNNING);
+  private final AtomicReference<Throwable> resultException = new AtomicReference<>();
+  private volatile CompletableFuture<Boolean> endingTask;
 
   public static ExecutorServiceParallelExecutor create(
       int targetParallelism,
@@ -91,11 +98,13 @@ private ExecutorServiceParallelExecutor(
     // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
     // are no other active threads (for example, because waitUntilFinish was not called)
     this.executorService =
+        // TODO: think how to make that configurable through options, threadfactory, poolfactory?
         Executors.newFixedThreadPool(
             targetParallelism,
             new ThreadFactoryBuilder()
                 .setThreadFactory(MoreExecutors.platformThreadFactory())
-                .setNameFormat("direct-runner-worker")
+                .setNameFormat("direct-runner-worker-" + id)
+                .setUncaughtExceptionHandler(onThreadException())
                 .build());
     this.registry = registry;
     this.evaluationContext = context;
@@ -154,19 +163,18 @@ public void start(DirectGraph graph, RootProviderRegistry rootProviderRegistry)
     final ExecutionDriver executionDriver =
         QuiescenceDriver.create(
             evaluationContext, graph, this, visibleUpdates, pendingRootBundles.build());
-    executorService.submit(
+    executorService.execute(
         new Runnable() {
           @Override
           public void run() {
             DriverState drive = executionDriver.drive();
             if (drive.isTermainal()) {
-              State newPipelineState = State.UNKNOWN;
               switch (drive) {
                 case FAILED:
-                  newPipelineState = State.FAILED;
+                  executeEndTask(() -> shutdown(State.FAILED));
                   break;
                 case SHUTDOWN:
-                  newPipelineState = State.DONE;
+                  executeEndTask(() -> shutdown(State.DONE));
                   break;
                 case CONTINUE:
                   throw new IllegalStateException(
@@ -175,14 +183,34 @@ public void run() {
                   throw new IllegalArgumentException(
                       String.format("Unknown %s %s", DriverState.class.getSimpleName(), drive));
               }
-              shutdownIfNecessary(newPipelineState);
             } else {
-              executorService.submit(this);
+              executorService.execute(this);
             }
           }
+
+          // use another thread to ensure we can stop current one in shutdown(state)
+          private void executeEndTask(final Runnable task) {
+            endingTask = new CompletableFuture<>();
+            final Thread thread = new Thread(() -> {
+              try {
+                task.run();
+                endingTask.complete(true);
+              } catch (final Throwable th) {
+                endingTask.completeExceptionally(th);
+              }
+            });
+            thread.setUncaughtExceptionHandler(onThreadException());
+            thread.setName("shutting-down-direct-runner-instance-" + id);
+            thread.start();
+          }
         });
   }
 
+  // when these threads hit an exception it is already processed so just log it
+  private Thread.UncaughtExceptionHandler onThreadException() {
+    return (t, e) -> LOG.debug(e.getMessage(), e);
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void process(
@@ -224,7 +252,8 @@ private boolean isKeyed(PValue pvalue) {
   @Override
   public State waitUntilFinish(Duration duration) throws Exception {
     Instant completionTime;
-    if (duration.equals(Duration.ZERO)) {
+    final boolean infinite = duration.equals(Duration.ZERO);
+    if (infinite) {
       completionTime = new Instant(Long.MAX_VALUE);
     } else {
       completionTime = Instant.now().plus(duration);
@@ -239,40 +268,79 @@ public State waitUntilFinish(Duration duration) throws Exception {
       if (update == null && pipelineState.get().isTerminal()) {
         // there are no updates to process and no updates will ever be published because the
         // executor is shutdown
-        return pipelineState.get();
-      } else if (update != null && update.thrown.isPresent()) {
-        Throwable thrown = update.thrown.get();
-        if (thrown instanceof Exception) {
-          throw (Exception) thrown;
-        } else if (thrown instanceof Error) {
-          throw (Error) thrown;
-        } else {
-          throw new Exception("Unknown Type of Throwable", thrown);
+        Throwable throwable = resultException.get();
+        if (throwable != null) {
+          if (infinite) {
+            waitEnd();
+          }
+          rethrow(throwable);
         }
+        break;
+      } else {
+        processUpdate(update);
+        // don't directly exit even on error,
+        // wait next iteration to let a chance to cleanup resources
       }
     }
+    if (infinite) {
+      waitEnd();
+    }
     return pipelineState.get();
   }
 
+  private void waitEnd() { // note that the best would be to return the completionstage to the user
+    if (endingTask == null) {
+      return;
+    }
+    try {
+      endingTask.get();
+    } catch (final InterruptedException e) {
+      Thread.currentThread().interrupt();
+    } catch (final ExecutionException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  private void processUpdate(final VisibleExecutorUpdate update) {
+    if (update != null && update.thrown.isPresent()) {
+      final Throwable thrown = update.thrown.get();
+      final Throwable previous = resultException.get();
+      if (previous != null) {
+        previous.addSuppressed(thrown);
+      } else {
+        if (!resultException.compareAndSet(null, thrown)) {
+          resultException.get().addSuppressed(thrown);
+        }
+      }
+    }
+  }
+
+  private void rethrow(final Throwable thrown) throws Exception {
+    if (thrown instanceof Exception) {
+      throw (Exception) thrown;
+    } else if (thrown instanceof Error) {
+      throw (Error) thrown;
+    }
+    throw new Exception("Unknown Type of Throwable", thrown);
+  }
+
   @Override
   public State getPipelineState() {
     return pipelineState.get();
   }
 
-  private boolean isTerminalStateUpdate(VisibleExecutorUpdate update) {
-    return !(update.getNewState() == null && update.getNewState().isTerminal());
+  private boolean isTerminalStateUpdate(final VisibleExecutorUpdate update) {
+    final State state = update.getNewState();
+    return state == null || state.isTerminal();
   }
 
   @Override
   public void stop() {
-    shutdownIfNecessary(State.CANCELLED);
+    shutdown(State.CANCELLED);
     visibleUpdates.cancelled();
   }
 
-  private void shutdownIfNecessary(State newState) {
-    if (!newState.isTerminal()) {
-      return;
-    }
+  private void shutdown(final State newState) {
     LOG.debug("Pipeline has terminated. Shutting down.");
 
     final Collection<Exception> errors = new ArrayList<>();
@@ -280,10 +348,6 @@ private void shutdownIfNecessary(State newState) {
     // to add work to the shutdown executor.
     try {
       serialExecutorServices.invalidateAll();
-    } catch (final RuntimeException re) {
-      errors.add(re);
-    }
-    try {
       serialExecutorServices.cleanUp();
     } catch (final RuntimeException re) {
       errors.add(re);
@@ -294,16 +358,23 @@ private void shutdownIfNecessary(State newState) {
       errors.add(re);
     }
     try {
-      executorService.shutdown();
+      executorService.shutdown(); // don't exec tasks, they can be infinite
     } catch (final RuntimeException re) {
       errors.add(re);
     }
+    while (!executorService.isTerminated()) {
+      try {
+          Thread.sleep(50L);
+      } catch (final InterruptedException e) {
+          Thread.currentThread().interrupt();
+      }
+    }
     try {
       registry.cleanup();
     } catch (final Exception e) {
       errors.add(e);
     }
-    pipelineState.compareAndSet(State.RUNNING, newState); // ensure we hit a terminal node
+    pipelineState.compareAndSet(State.RUNNING, newState);
     if (!errors.isEmpty()) {
       final IllegalStateException exception = new IllegalStateException(
         "Error" + (errors.size() == 1 ? "" : "s") + " during executor shutdown:\n"
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
index 4b44c8ebff8..bf38a8791a6 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java
@@ -153,7 +153,7 @@ private TransformEvaluatorRegistry(
   }
 
   @Override
-  public <InputT> TransformEvaluator<InputT> forApplication(
+  public synchronized <InputT> TransformEvaluator<InputT> forApplication(
       AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle)
       throws Exception {
     checkState(
@@ -168,7 +168,7 @@ private TransformEvaluatorRegistry(
   }
 
   @Override
-  public void cleanup() throws Exception {
+  public synchronized void cleanup() throws Exception {
     Collection<Exception> thrownInCleanup = new ArrayList<>();
     for (TransformEvaluatorFactory factory : factories.values()) {
       try {
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
index 9aa71f742d8..3f535568624 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutorServices.java
@@ -20,7 +20,7 @@
 import com.google.common.base.MoreObjects;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -28,7 +28,7 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Static factory methods for constructing instances of {@link TransformExecutorService}.
+ * Static factory methods for constructing instances of {@link TransformExecutor}.
  */
 final class TransformExecutorServices {
   private TransformExecutorServices() {
@@ -39,7 +39,7 @@ private TransformExecutorServices() {
    * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
    * parallel.
    */
-  public static TransformExecutorService parallel(ExecutorService executor) {
+  public static TransformExecutorService parallel(Executor executor) {
     return new ParallelTransformExecutor(executor);
   }
 
@@ -47,13 +47,13 @@ public static TransformExecutorService parallel(ExecutorService executor) {
    * Returns an EvaluationState that evaluates {@link TransformExecutor TransformExecutors} in
    * serial.
    */
-  public static TransformExecutorService serial(ExecutorService executor) {
+  public static TransformExecutorService serial(Executor executor) {
     return new SerialTransformExecutor(executor);
   }
 
   /**
    * A {@link TransformExecutorService} with unlimited parallelism. Any {@link TransformExecutor}
-   * scheduled will be immediately submitted to the {@link ExecutorService}.
+   * scheduled will be immediately submitted to the {@link Executor}.
    *
    * <p>A principal use of this is for the evaluation of an unkeyed Step. Unkeyed computations are
    * processed in parallel.
@@ -61,10 +61,10 @@ public static TransformExecutorService serial(ExecutorService executor) {
   private static class ParallelTransformExecutor implements TransformExecutorService {
     private static final Logger LOG = LoggerFactory.getLogger(ParallelTransformExecutor.class);
 
-    private final ExecutorService executor;
+    private final Executor executor;
     private final AtomicBoolean active = new AtomicBoolean(true);
 
-    private ParallelTransformExecutor(ExecutorService executor) {
+    private ParallelTransformExecutor(Executor executor) {
       this.executor = executor;
     }
 
@@ -72,7 +72,7 @@ private ParallelTransformExecutor(ExecutorService executor) {
     public void schedule(TransformExecutor work) {
       if (active.get()) {
         try {
-          executor.submit(work);
+          executor.execute(work);
         } catch (RejectedExecutionException rejected) {
           boolean stillActive = active.get();
           if (stillActive) {
@@ -104,19 +104,19 @@ public void shutdown() {
   /**
    * A {@link TransformExecutorService} with a single work queue. Any {@link TransformExecutor}
    * scheduled will be placed on the work queue. Only one item of work will be submitted to the
-   * {@link ExecutorService} at any time.
+   * {@link Executor} at any time.
    *
    * <p>A principal use of this is for the serial evaluation of a (Step, Key) pair.
    * Keyed computations are processed serially per step.
    */
   private static class SerialTransformExecutor implements TransformExecutorService {
-    private final ExecutorService executor;
+    private final Executor executor;
 
     private AtomicReference<TransformExecutor> currentlyEvaluating;
     private final Queue<TransformExecutor> workQueue;
     private boolean active = true;
 
-    private SerialTransformExecutor(ExecutorService executor) {
+    private SerialTransformExecutor(Executor executor) {
       this.executor = executor;
       this.currentlyEvaluating = new AtomicReference<>();
       this.workQueue = new ConcurrentLinkedQueue<>();
@@ -159,7 +159,7 @@ private void updateCurrentlyEvaluating() {
           TransformExecutor newWork = workQueue.poll();
           if (active && newWork != null) {
             if (currentlyEvaluating.compareAndSet(null, newWork)) {
-              executor.submit(newWork);
+              executor.execute(newWork);
             } else {
               workQueue.offer(newWork);
             }
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3f2a4772a97..6ad146b8a9b 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -20,9 +20,12 @@
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import com.google.common.collect.ImmutableMap;
@@ -39,6 +42,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
@@ -334,6 +338,55 @@ public void teardown() {
     assertThat(doneTs, greaterThan(tearDownTs));
   }
 
+  private static final AtomicLong TEARDOWN_CALL_ON_EXCEPTION_COUNT = new AtomicLong(-1);
+  private static final AtomicBoolean EXCEPTION_THROWN = new AtomicBoolean(false);
+
+  @Test
+  public void tearsDownFnsBeforeFinishingOnException() {
+    TEARDOWN_CALL_ON_EXCEPTION_COUNT.set(0);
+    final Pipeline pipeline = getPipeline();
+    pipeline.apply(GenerateSequence.from(0).to(100000))
+      .apply(ParDo.of(new DoFn<Long, String>() {
+        @Setup
+        public void setup() {
+          TEARDOWN_CALL_ON_EXCEPTION_COUNT.incrementAndGet();
+        }
+
+        @ProcessElement
+        public void onElement(final ProcessContext ctx) {
+          // no-op
+        }
+
+        @FinishBundle
+        public void onBundle(final FinishBundleContext ctx) {
+          EXCEPTION_THROWN.set(true);
+          final IllegalStateException exception = new IllegalStateException(
+            "expected exception for test");
+          exception.setStackTrace(new StackTraceElement[0]);
+          throw exception;
+        }
+
+        @Teardown
+        public void teardown() {
+          // just to not have a fast execution hiding an issue until we have a shutdown callback
+          try {
+            Thread.sleep(1000);
+          } catch (final InterruptedException e) {
+            fail();
+          }
+          TEARDOWN_CALL_ON_EXCEPTION_COUNT.decrementAndGet();
+        }
+      }));
+    try {
+      pipeline.run();
+      fail("should have failed");
+    } catch (final Pipeline.PipelineExecutionException ise) {
+      assertTrue(EXCEPTION_THROWN.get());
+      assertThat(ise.getCause(), instanceOf(IllegalStateException.class));
+      assertEquals(0, TEARDOWN_CALL_ON_EXCEPTION_COUNT.get(), 0);
+    }
+  }
+
   @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
diff --git a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
index 311509abe12..ed5c91cbc21 100644
--- a/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
+++ b/sdks/java/io/solr/src/test/java/org/apache/beam/sdk/io/solr/SolrIOTest.java
@@ -18,18 +18,15 @@
 package org.apache.beam.sdk.io.solr;
 
 import static org.apache.beam.sdk.io.solr.SolrIO.RetryConfiguration.DEFAULT_RETRY_PREDICATE;
-import static org.apache.beam.sdk.io.solr.SolrIOTestUtils.namedThreadIsAlive;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.hamcrest.Matchers.lessThan;
 
-import com.carrotsearch.ant.tasks.junit4.dependencies.com.google.common.collect.ImmutableSet;
 import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope;
 import com.google.common.io.BaseEncoding;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.List;
-import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.solr.SolrIOTestUtils.LenientRetryPredicate;
@@ -53,9 +50,7 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
-import org.apache.solr.common.cloud.SolrZkClient;
 import org.apache.solr.common.cloud.ZkStateReader;
-import org.apache.solr.common.util.ObjectReleaseTracker;
 import org.apache.solr.security.Sha256AuthenticationProvider;
 import org.joda.time.Duration;
 import org.junit.AfterClass;
@@ -290,9 +285,6 @@ public void testWriteRetry() throws Throwable {
     thrown.expect(IOException.class);
     thrown.expectMessage("Error writing to Solr");
 
-    // entry state of the release tracker to ensure we only unregister newly created objects
-    Set<Object> entryState = ImmutableSet.copyOf(ObjectReleaseTracker.OBJECTS.keySet());
-
     SolrIO.Write write =
         SolrIO.write()
             .withConnectionConfiguration(connectionConfiguration)
@@ -307,21 +299,6 @@ public void testWriteRetry() throws Throwable {
     try {
       pipeline.run();
     } catch (final Pipeline.PipelineExecutionException e) {
-      // Hack: await all worker threads completing (BEAM-4040)
-      int waitAttempts = 30; // defensive coding
-      while (namedThreadIsAlive("direct-runner-worker") && waitAttempts-- >= 0) {
-        LOG.info("Pausing to allow direct-runner-worker threads to finish");
-        Thread.sleep(1000);
-      }
-
-      // remove solrClients created by us as there are no guarantees on Teardown here
-      for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
-        if (o instanceof SolrZkClient && !entryState.contains(o)) {
-          LOG.info("Removing unreleased SolrZkClient");
-          ObjectReleaseTracker.release(o);
-        }
-      }
-
       // check 2 retries were initiated by inspecting the log before passing on the exception
       expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
       expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 2));


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 140689)
    Time Spent: 1h 10m  (was: 1h)

> Direct runner does not wait for failed tasks to teardown
> --------------------------------------------------------
>
>                 Key: BEAM-4040
>                 URL: https://issues.apache.org/jira/browse/BEAM-4040
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct
>    Affects Versions: 2.5.0
>            Reporter: Ismaël Mejía
>            Assignee: Romain Manni-Bucau
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> While testing write retry on SolrIO we found that the direct runner does not correctly handle the finalization of failed tasks. It seems it does not call teardown on pending tasks.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message