beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/4] incubator-beam git commit: Remove unused Thread variable in TransformExecutor
Date Wed, 16 Nov 2016 21:23:00 GMT
Remove unused Thread variable in TransformExecutor


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5ba4d181
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5ba4d181
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5ba4d181

Branch: refs/heads/master
Commit: 5ba4d181d4625d43078bb0b071635d563d925277
Parents: 3e6a4f4
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Nov 8 14:16:23 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Wed Nov 16 13:22:41 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/TransformExecutor.java  | 23 -----------
 .../runners/direct/TransformExecutorTest.java   | 43 --------------------
 2 files changed, 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
index c4002b5..1704955 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformExecutor.java
@@ -17,13 +17,9 @@
  */
 package org.apache.beam.runners.direct;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicReference;
-import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
 import org.apache.beam.sdk.metrics.MetricUpdates;
 import org.apache.beam.sdk.metrics.MetricsContainer;
@@ -70,8 +66,6 @@ class TransformExecutor<T> implements Runnable {
   private final TransformExecutorService transformEvaluationState;
   private final EvaluationContext context;
 
-  private final AtomicReference<Thread> thread;
-
   private TransformExecutor(
       EvaluationContext context,
       TransformEvaluatorFactory factory,
@@ -90,20 +84,12 @@ class TransformExecutor<T> implements Runnable {
 
     this.transformEvaluationState = transformEvaluationState;
     this.context = context;
-    this.thread = new AtomicReference<>();
   }
 
   @Override
   public void run() {
     MetricsContainer metricsContainer = new MetricsContainer(transform.getFullName());
     MetricsEnvironment.setMetricsContainer(metricsContainer);
-    checkState(
-        thread.compareAndSet(null, Thread.currentThread()),
-        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
-        TransformExecutor.class.getSimpleName(),
-        transform.getFullName(),
-        Thread.currentThread(),
-        thread.get());
     try {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -186,13 +172,4 @@ class TransformExecutor<T> implements Runnable {
     }
     return result;
   }
-
-  /**
-   * If this {@link TransformExecutor} is currently executing, return the thread it is executing
in.
-   * Otherwise, return null.
-   */
-  @Nullable
-  public Thread getThread() {
-    return thread.get();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5ba4d181/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
index 32f874d..0b7b882 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/TransformExecutorTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isA;
-import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -277,48 +276,6 @@ public class TransformExecutorTest {
   }
 
   @Test
-  public void duringCallGetThreadIsNonNull() throws Exception {
-    final TransformResult result =
-        StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();
-    final CountDownLatch testLatch = new CountDownLatch(1);
-    final CountDownLatch evaluatorLatch = new CountDownLatch(1);
-    TransformEvaluator<Object> evaluator =
-        new TransformEvaluator<Object>() {
-          @Override
-          public void processElement(WindowedValue<Object> element) throws Exception
{
-            throw new IllegalArgumentException("Shouldn't be called");
-          }
-
-          @Override
-          public TransformResult finishBundle() throws Exception {
-            testLatch.countDown();
-            evaluatorLatch.await();
-            return result;
-          }
-        };
-
-    when(registry.forApplication(created.getProducingTransformInternal(), null))
-        .thenReturn(evaluator);
-
-    TransformExecutor<String> executor =
-        TransformExecutor.create(
-            evaluationContext,
-            registry,
-            Collections.<ModelEnforcementFactory>emptyList(),
-            null,
-            created.getProducingTransformInternal(),
-            completionCallback,
-            transformEvaluationState);
-
-    Executors.newSingleThreadExecutor().submit(executor);
-    testLatch.await();
-    assertThat(executor.getThread(), not(nullValue()));
-
-    // Finish the execution so everything can get closed down cleanly.
-    evaluatorLatch.countDown();
-  }
-
-  @Test
   public void callWithEnforcementAppliesEnforcement() throws Exception {
     final TransformResult result =
         StepTransformResult.withoutHold(downstream.getProducingTransformInternal()).build();


Mime
View raw message