beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [1/3] incubator-beam git commit: [BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking cancel/waituntilfinish in batch. Added a SparkPipelineResult class to address PipelineResult#waitUntilFinish() and SparkRunner#r
Date Mon, 05 Dec 2016 11:26:56 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master ef9871c36 -> 6893a7270


[BEAM-1000, BEAM-1050] Fixed PipelineResult.State Failed for streaming, support non-blocking
cancel/waituntilfinish in batch.
Added a SparkPipelineResult class to address PipelineResult#waitUntilFinish() and SparkRunner#run() semantics.

* Simplified beamExceptionFrom() to abstract away SparkExceptions.
* Reordered methods according to access level.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b1a67934
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b1a67934
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b1a67934

Branch: refs/heads/master
Commit: b1a67934d1496e221718599301635c38f8e3b7ec
Parents: ef9871c
Author: Stas Levin <staslevin@gmail.com>
Authored: Mon Nov 28 11:11:10 2016 +0200
Committer: Sela <ansela@paypal.com>
Committed: Mon Dec 5 12:56:39 2016 +0200

----------------------------------------------------------------------
 .../beam/runners/spark/EvaluationResult.java    |  67 ------
 .../beam/runners/spark/SparkPipelineResult.java | 179 +++++++++++++++
 .../apache/beam/runners/spark/SparkRunner.java  |  98 +++++----
 .../beam/runners/spark/TestSparkRunner.java     |  10 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../spark/translation/EvaluationContext.java    | 119 ++--------
 .../spark/translation/SparkContextFactory.java  |   2 +-
 .../runners/spark/ProvidedSparkContextTest.java |   6 +-
 .../runners/spark/SparkPipelineStateTest.java   | 219 +++++++++++++++++++
 .../metrics/sink/ClearAggregatorsRule.java      |   2 +-
 .../metrics/sink/NamedAggregatorsTest.java      |   2 +-
 .../beam/runners/spark/io/AvroPipelineTest.java |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   2 +-
 .../io/hadoop/HadoopFileFormatPipelineTest.java |   2 +-
 .../spark/translation/SideEffectsTest.java      |  59 -----
 .../streaming/EmptyStreamAssertionTest.java     |   4 +
 .../ResumeFromCheckpointStreamingTest.java      |   8 +-
 .../streaming/utils/PAssertStreaming.java       |   9 +-
 18 files changed, 500 insertions(+), 292 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
deleted file mode 100644
index 52606a3..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/EvaluationResult.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark;
-
-import org.apache.beam.sdk.PipelineResult;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PValue;
-
-/**
- * Interface for retrieving the result(s) of running a pipeline. Allows us to translate between
- * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or collections of Ts.
- */
-public interface EvaluationResult extends PipelineResult {
-  /**
-   * Retrieves an iterable of results associated with the PCollection passed in.
-   *
-   * @param pcollection Collection we wish to translate.
-   * @param <T>         Type of elements contained in collection.
-   * @return Natively types result associated with collection.
-   */
-  <T> Iterable<T> get(PCollection<T> pcollection);
-
-  /**
-   * Retrieve an object of Type T associated with the PValue passed in.
-   *
-   * @param pval PValue to retrieve associated data for.
-   * @param <T>  Type of object to return.
-   * @return Native object.
-   */
-  <T> T get(PValue pval);
-
-  /**
-   * Retrieves the final value of the aggregator.
-   *
-   * @param aggName    name of aggregator.
-   * @param resultType Class of final result of aggregation.
-   * @param <T>        Type of final result of aggregation.
-   * @return Result of aggregation associated with specified name.
-   */
-  <T> T getAggregatorValue(String aggName, Class<T> resultType);
-
-  /**
-   * Releases any runtime resources, including distributed-execution contexts currently held by
-   * this EvaluationResult; once close() has been called,
-   * {@link EvaluationResult#get(PCollection)} might
-   * not work for subsequent calls.
-   *
-   * @param gracefully true if Spark should finish all ongoing work before closing.
-   */
-  void close(boolean gracefully);
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
new file mode 100644
index 0000000..ec0610c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkPipelineResult.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.beam.runners.spark.translation.EvaluationContext;
+import org.apache.beam.runners.spark.translation.SparkContextFactory;
+import org.apache.beam.sdk.AggregatorRetrievalException;
+import org.apache.beam.sdk.AggregatorValues;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.spark.SparkException;
+import org.joda.time.Duration;
+
+/**
+ * Represents a Spark pipeline execution result.
+ */
+public abstract class SparkPipelineResult implements PipelineResult {
+
+  protected final Future pipelineExecution;
+  protected final EvaluationContext context;
+
+  protected PipelineResult.State state;
+
+  SparkPipelineResult(final Future<?> pipelineExecution,
+                      final EvaluationContext evaluationContext) {
+    this.pipelineExecution = pipelineExecution;
+    this.context = evaluationContext;
+    // pipelineExecution is expected to have started executing eagerly.
+    state = State.RUNNING;
+  }
+
+  private RuntimeException runtimeExceptionFrom(Throwable e) {
+    return (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
+  }
+
+  private RuntimeException beamExceptionFrom(Throwable e) {
+    // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
+    // won't let you catch something that is not declared, so we can't catch
+    // SparkException directly, instead we do an instanceof check.
+    return (e instanceof SparkException)
+        ? new Pipeline.PipelineExecutionException(e.getCause() != null ? e.getCause() : e)
+        : runtimeExceptionFrom(e);
+  }
+
+  protected abstract void stop();
+
+  protected abstract State awaitTermination(Duration duration)
+      throws TimeoutException, ExecutionException, InterruptedException;
+
+  public <T> T getAggregatorValue(String named, Class<T> resultType) {
+    return context.getAggregatorValue(named, resultType);
+  }
+
+  @Override
+  public PipelineResult.State getState() {
+    return state;
+  }
+
+  @Override
+  public PipelineResult.State waitUntilFinish() {
+    return waitUntilFinish(Duration.millis(Long.MAX_VALUE));
+  }
+
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    try {
+      state = awaitTermination(duration);
+    } catch (TimeoutException e) {
+      state = null;
+    } catch (ExecutionException e) {
+      state = PipelineResult.State.FAILED;
+      throw beamExceptionFrom(e.getCause());
+    } catch (Exception e) {
+      state = PipelineResult.State.FAILED;
+      throw beamExceptionFrom(e);
+    } finally {
+      stop();
+    }
+
+    return state;
+  }
+
+  @Override
+  public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
+      throws AggregatorRetrievalException {
+    return context.getAggregatorValues(aggregator);
+  }
+
+  @Override
+  public MetricResults metrics() {
+    throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+  }
+
+  @Override
+  public PipelineResult.State cancel() throws IOException {
+    if (state != null && !state.isTerminal()) {
+      stop();
+      state = PipelineResult.State.CANCELLED;
+    }
+
+    return state;
+  }
+
+  /**
+   * Represents the result of running a batch pipeline.
+   */
+  static class BatchMode extends SparkPipelineResult {
+
+    BatchMode(final Future<?> pipelineExecution,
+              final EvaluationContext evaluationContext) {
+      super(pipelineExecution, evaluationContext);
+    }
+
+    @Override
+    protected void stop() {
+      SparkContextFactory.stopSparkContext(context.getSparkContext());
+    }
+
+    @Override
+    protected State awaitTermination(Duration duration)
+        throws TimeoutException, ExecutionException, InterruptedException {
+      pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+      return PipelineResult.State.DONE;
+    }
+  }
+
+  /**
+   * Represents a streaming Spark pipeline result.
+   */
+  static class StreamingMode extends SparkPipelineResult {
+
+    StreamingMode(final Future<?> pipelineExecution,
+                  final EvaluationContext evaluationContext) {
+      super(pipelineExecution, evaluationContext);
+    }
+
+    @Override
+    protected void stop() {
+      context.getStreamingContext().stop(false, true);
+      SparkContextFactory.stopSparkContext(context.getSparkContext());
+    }
+
+    @Override
+    protected State awaitTermination(Duration duration) throws TimeoutException,
+        ExecutionException, InterruptedException {
+      pipelineExecution.get(duration.getMillis(), TimeUnit.MILLISECONDS);
+      if (context.getStreamingContext().awaitTerminationOrTimeout(duration.getMillis())) {
+        return State.DONE;
+      } else {
+        return null;
+      }
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index 63f77c0..a8c600e 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -20,6 +20,9 @@ package org.apache.beam.runners.spark;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import org.apache.beam.runners.spark.translation.EvaluationContext;
 import org.apache.beam.runners.spark.translation.SparkContextFactory;
 import org.apache.beam.runners.spark.translation.SparkPipelineTranslator;
@@ -36,14 +39,12 @@ import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
-import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
 import org.slf4j.Logger;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
  *
  * {@code
  * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
  * }
  *
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
@@ -68,10 +69,10 @@ import org.slf4j.LoggerFactory;
  * Pipeline p = [logic for pipeline creation]
  * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
  * }
  */
-public final class SparkRunner extends PipelineRunner<EvaluationResult> {
+public final class SparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private static final Logger LOG = LoggerFactory.getLogger(SparkRunner.class);
   /**
@@ -122,50 +123,57 @@ public final class SparkRunner extends PipelineRunner<EvaluationResult> {
   }
 
   @Override
-  public EvaluationResult run(Pipeline pipeline) {
-    try {
-      LOG.info("Executing pipeline using the SparkRunner.");
-
-      detectTranslationMode(pipeline);
-      if (mOptions.isStreaming()) {
-        SparkRunnerStreamingContextFactory contextFactory =
-            new SparkRunnerStreamingContextFactory(pipeline, mOptions);
-        JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(),
-            contextFactory);
-
-        LOG.info("Starting streaming pipeline execution.");
-        jssc.start();
-
-        // if recovering from checkpoint, we have to reconstruct the EvaluationResult instance.
-        return contextFactory.getCtxt() == null ? new EvaluationContext(jssc.sparkContext(),
-            pipeline, jssc) : contextFactory.getCtxt();
-      } else {
-        JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
-        EvaluationContext ctxt = new EvaluationContext(jsc, pipeline);
-        SparkPipelineTranslator translator = new TransformTranslator.Translator();
-        pipeline.traverseTopologically(new Evaluator(translator, ctxt));
-        ctxt.computeOutputs();
+  public SparkPipelineResult run(final Pipeline pipeline) {
+    LOG.info("Executing pipeline using the SparkRunner.");
+
+    final SparkPipelineResult result;
+    final EvaluationContext evaluationContext;
+    final Future<?> startPipeline;
+    final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+    detectTranslationMode(pipeline);
+
+    if (mOptions.isStreaming()) {
+      final SparkRunnerStreamingContextFactory contextFactory =
+          new SparkRunnerStreamingContextFactory(pipeline, mOptions);
+      final JavaStreamingContext jssc =
+          JavaStreamingContext.getOrCreate(mOptions.getCheckpointDir(), contextFactory);
+
+      // if recovering from checkpoint, we have to reconstruct the Evaluation instance.
+      evaluationContext =
+          contextFactory.getCtxt() == null
+              ? new EvaluationContext(jssc.sparkContext(), pipeline, jssc)
+              : contextFactory.getCtxt();
+
+      startPipeline = executorService.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          LOG.info("Starting streaming pipeline execution.");
+          jssc.start();
+        }
+      });
 
-        LOG.info("Pipeline execution complete.");
+      result = new SparkPipelineResult.StreamingMode(startPipeline, evaluationContext);
+    } else {
+      final JavaSparkContext jsc = SparkContextFactory.getSparkContext(mOptions);
+      evaluationContext = new EvaluationContext(jsc, pipeline);
 
-        return ctxt;
-      }
-    } catch (Exception e) {
-      // Scala doesn't declare checked exceptions in the bytecode, and the Java compiler
-      // won't let you catch something that is not declared, so we can't catch
-      // SparkException here. Instead we do an instanceof check.
-      // Then we find the cause by seeing if it's a user exception (wrapped by Beam's
-      // UserCodeException), or just use the SparkException cause.
-      if (e instanceof SparkException && e.getCause() != null) {
-        if (e.getCause() instanceof UserCodeException && e.getCause().getCause() != null) {
-          throw UserCodeException.wrap(e.getCause().getCause());
-        } else {
-          throw new RuntimeException(e.getCause());
+      startPipeline = executorService.submit(new Runnable() {
+
+        @Override
+        public void run() {
+          pipeline.traverseTopologically(new Evaluator(new TransformTranslator.Translator(),
+                                                       evaluationContext));
+          evaluationContext.computeOutputs();
+          LOG.info("Batch pipeline execution complete.");
         }
-      }
-      // otherwise just wrap in a RuntimeException
-      throw new RuntimeException(e);
+      });
+
+      result = new SparkPipelineResult.BatchMode(startPipeline, evaluationContext);
     }
+
+    return result;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index a4ddca0..9a67f9c 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -37,7 +37,7 @@ import org.apache.beam.sdk.values.POutput;
  *
  * {@code
  * Pipeline p = [logic for pipeline creation]
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
  * }
  *
  * <p>To create a pipeline runner to run against a different spark cluster, with a custom master url
@@ -47,10 +47,10 @@ import org.apache.beam.sdk.values.POutput;
  * Pipeline p = [logic for pipeline creation]
  * SparkPipelineOptions options = SparkPipelineOptionsFactory.create();
  * options.setSparkMaster("spark://host:port");
- * EvaluationResult result = (EvaluationResult) p.run();
+ * SparkPipelineResult result = (SparkPipelineResult) p.run();
  * }
  */
-public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
+public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> {
 
   private SparkRunner delegate;
 
@@ -72,9 +72,9 @@ public final class TestSparkRunner extends PipelineRunner<EvaluationResult> {
   };
 
   @Override
-  public EvaluationResult run(Pipeline pipeline) {
+  public SparkPipelineResult run(Pipeline pipeline) {
     TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class);
-    EvaluationResult result = delegate.run(pipeline);
+    SparkPipelineResult result = delegate.run(pipeline);
     assertThat(result, testPipelineOptions.getOnCreateMatcher());
     assertThat(result, testPipelineOptions.getOnSuccessMatcher());
     return result;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 0ae78f2..38dae38 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -132,6 +132,6 @@ public class WordCount {
      .apply(MapElements.via(new FormatAsTextFn()))
      .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
-    p.run();
+    p.run().waitUntilFinish();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
index ae45609..425f114 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/EvaluationContext.java
@@ -21,12 +21,10 @@ package org.apache.beam.runners.spark.translation;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.collect.Iterables;
-import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.translation.streaming.UnboundedDataset;
@@ -34,7 +32,6 @@ import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -46,15 +43,13 @@ import org.apache.beam.sdk.values.POutput;
 import org.apache.beam.sdk.values.PValue;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.streaming.StreamingContextState;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.joda.time.Duration;
-
 
 /**
- * Evaluation context allows us to define how pipeline instructions.
+ * The EvaluationContext allows us to define pipeline instructions and translate between
+ * {@code PObject<T>}s or {@code PCollection<T>}s and Ts or DStreams/RDDs of Ts.
  */
-public class EvaluationContext implements EvaluationResult {
+public class EvaluationContext {
   private final JavaSparkContext jsc;
   private JavaStreamingContext jssc;
   private final SparkRuntimeContext runtime;
@@ -66,24 +61,20 @@ public class EvaluationContext implements EvaluationResult {
   private final Map<PValue, Object> pobjects = new LinkedHashMap<>();
   private final Map<PValue, Iterable<? extends WindowedValue<?>>> pview = new LinkedHashMap<>();
   private AppliedPTransform<?, ?, ?> currentTransform;
-  private State state;
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline) {
     this.jsc = jsc;
     this.pipeline = pipeline;
     this.runtime = new SparkRuntimeContext(pipeline, jsc);
-    // A batch pipeline is blocking by nature
-    this.state = State.DONE;
   }
 
   public EvaluationContext(JavaSparkContext jsc, Pipeline pipeline,
                            JavaStreamingContext jssc) {
     this(jsc, pipeline);
     this.jssc = jssc;
-    this.state = State.RUNNING;
   }
 
-  JavaSparkContext getSparkContext() {
+  public JavaSparkContext getSparkContext() {
     return jsc;
   }
 
@@ -179,8 +170,14 @@ public class EvaluationContext implements EvaluationResult {
     }
   }
 
+  /**
+   * Retrieve an object of Type T associated with the PValue passed in.
+   *
+   * @param value PValue to retrieve associated data for.
+   * @param <T>  Type of object to return.
+   * @return Native object.
+   */
   @SuppressWarnings("unchecked")
-  @Override
   public <T> T get(PValue value) {
     if (pobjects.containsKey(value)) {
       T result = (T) pobjects.get(value);
@@ -195,23 +192,24 @@ public class EvaluationContext implements EvaluationResult {
     throw new IllegalStateException("Cannot resolve un-known PObject: " + value);
   }
 
-  @Override
-  public <T> T getAggregatorValue(String named, Class<T> resultType) {
-    return runtime.getAggregatorValue(AccumulatorSingleton.getInstance(jsc), named, resultType);
-  }
-
-  @Override
   public <T> AggregatorValues<T> getAggregatorValues(Aggregator<?, T> aggregator)
       throws AggregatorRetrievalException {
     return runtime.getAggregatorValues(AccumulatorSingleton.getInstance(jsc), aggregator);
   }
 
-  @Override
-  public MetricResults metrics() {
-    throw new UnsupportedOperationException("The SparkRunner does not currently support metrics.");
+  public <T> T getAggregatorValue(String named, Class<T> resultType) {
+    return runtime.getAggregatorValue(AccumulatorSingleton.getInstance(jsc),
+                                      named,
+                                      resultType);
   }
 
-  @Override
+  /**
+   * Retrieves an iterable of results associated with the PCollection passed in.
+   *
+   * @param pcollection Collection we wish to translate.
+   * @param <T>         Type of elements contained in collection.
+   * @return Natively types result associated with collection.
+   */
   public <T> Iterable<T> get(PCollection<T> pcollection) {
     @SuppressWarnings("unchecked")
     BoundedDataset<T> boundedDataset = (BoundedDataset<T>) datasets.get(pcollection);
@@ -225,79 +223,6 @@ public class EvaluationContext implements EvaluationResult {
     return boundedDataset.getValues(pcollection);
   }
 
-  @Override
-  public void close(boolean gracefully) {
-    // Stopping streaming job if running
-    if (isStreamingPipeline() && !state.isTerminal()) {
-      try {
-        cancel(gracefully);
-      } catch (IOException e) {
-        throw new RuntimeException("Failed to cancel streaming job", e);
-      }
-    }
-    SparkContextFactory.stopSparkContext(jsc);
-  }
-
-  @Override
-  public State getState() {
-    return state;
-  }
-
-  @Override
-  public State cancel() throws IOException {
-    return cancel(true);
-  }
-
-  private State cancel(boolean gracefully) throws IOException {
-    if (isStreamingPipeline()) {
-      if (!state.isTerminal()) {
-        jssc.stop(false, gracefully);
-        state = State.CANCELLED;
-      }
-      return state;
-    } else {
-      // Batch is currently blocking so
-      // there is no way to cancel a batch job
-      // will be handled at BEAM-1000
-      throw new UnsupportedOperationException(
-          "Spark runner EvaluationContext does not support cancel.");
-    }
-  }
-
-  @Override
-  public State waitUntilFinish() {
-    return waitUntilFinish(Duration.ZERO);
-  }
-
-  @Override
-  public State waitUntilFinish(Duration duration) {
-    if (isStreamingPipeline()) {
-      // According to PipelineResult: Provide a value less than 1 ms for an infinite wait
-      if (duration.getMillis() < 1L) {
-        jssc.awaitTermination();
-        state = State.DONE;
-      } else {
-        jssc.awaitTermination(duration.getMillis());
-        // According to PipelineResult: The final state of the pipeline or null on timeout
-        if (jssc.getState().equals(StreamingContextState.STOPPED)) {
-          state = State.DONE;
-        } else {
-          return null;
-        }
-      }
-      return state;
-    } else {
-      // This is no-op, since Spark runner in batch is blocking.
-      // It needs to be updated once SparkRunner supports non-blocking execution:
-      // https://issues.apache.org/jira/browse/BEAM-595
-      return State.DONE;
-    }
-  }
-
-  private boolean isStreamingPipeline() {
-    return jssc != null;
-  }
-
   private String storageLevel() {
     return runtime.getPipelineOptions().as(SparkPipelineOptions.class).getStorageLevel();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
index c7f90b4..67839a8 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkContextFactory.java
@@ -66,7 +66,7 @@ public final class SparkContextFactory {
     }
   }
 
-  static synchronized void stopSparkContext(JavaSparkContext context) {
+  public static synchronized void stopSparkContext(JavaSparkContext context) {
     if (!Boolean.getBoolean(TEST_REUSE_SPARK_CONTEXT)) {
       context.stop();
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
index fe73aba..2982844 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/ProvidedSparkContextTest.java
@@ -76,7 +76,7 @@ public class ProvidedSparkContextTest {
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
         // Run test from pipeline
-        p.run();
+        p.run().waitUntilFinish();
 
         jsc.stop();
     }
@@ -100,7 +100,7 @@ public class ProvidedSparkContextTest {
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
         try {
-            p.run();
+            p.run().waitUntilFinish();
             fail("Should throw an exception when The provided Spark context is null");
         } catch (RuntimeException e){
             assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));
@@ -128,7 +128,7 @@ public class ProvidedSparkContextTest {
         PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET);
 
         try {
-            p.run();
+            p.run().waitUntilFinish();
             fail("Should throw an exception when The provided Spark context is stopped");
         } catch (RuntimeException e){
             assert(e.getMessage().contains(PROVIDED_CONTEXT_EXCEPTION));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
new file mode 100644
index 0000000..69cf1c4
--- /dev/null
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkPipelineStateTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.spark;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.runners.spark.io.CreateStream;
+import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.util.UserCodeException;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * This suite tests that various scenarios result in proper states of the pipeline.
+ */
+public class SparkPipelineStateTest implements Serializable {
+
+  private static class UserException extends RuntimeException {
+
+    UserException(String message) {
+      super(message);
+    }
+  }
+
+  @Rule
+  public transient SparkTestPipelineOptions commonOptions = new SparkTestPipelineOptions();
+
+  @Rule
+  public transient TestName testName = new TestName();
+
+  private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";
+
+  private static final List<String> BATCH_WORDS = Arrays.asList("one", "two");
+
+  private static final List<Iterable<String>> STREAMING_WORDS =
+      Lists.<Iterable<String>>newArrayList(BATCH_WORDS);
+
+  private ParDo.Bound<String, String> printParDo(final String prefix) {
+    return ParDo.of(new DoFn<String, String>() {
+
+      @ProcessElement
+      public void processElement(ProcessContext c) {
+        System.out.println(prefix + " " + c.element());
+      }
+    });
+  }
+
+  private PTransform<PBegin, PCollection<String>> getValues(SparkPipelineOptions options) {
+    return options.isStreaming()
+        ? CreateStream.fromQueue(STREAMING_WORDS)
+        : Create.of(BATCH_WORDS);
+  }
+
+  private SparkPipelineOptions getStreamingOptions() {
+    final SparkPipelineOptions options = commonOptions.getOptions();
+    options.setStreaming(true);
+    return options;
+  }
+
+  private SparkPipelineOptions getBatchOptions() {
+    return commonOptions.getOptions();
+  }
+
+  private Pipeline getPipeline(SparkPipelineOptions options) {
+
+    final Pipeline pipeline = Pipeline.create(options);
+    final String name = testName.getMethodName() + "(isStreaming=" + options.isStreaming() + ")";
+
+    pipeline
+        .apply(getValues(options)).setCoder(StringUtf8Coder.of())
+        .apply(printParDo(name));
+
+    return pipeline;
+  }
+
+  private void testFailedPipeline(SparkPipelineOptions options) throws Exception {
+
+    SparkPipelineResult result = null;
+
+    try {
+      final Pipeline pipeline = Pipeline.create(options);
+      pipeline
+          .apply(getValues(options)).setCoder(StringUtf8Coder.of())
+          .apply(MapElements.via(new SimpleFunction<String, String>() {
+
+            @Override
+            public String apply(String input) {
+              throw new UserException(FAILED_THE_BATCH_INTENTIONALLY);
+            }
+          }));
+
+      result = (SparkPipelineResult) pipeline.run();
+      result.waitUntilFinish();
+    } catch (Exception e) {
+      assertThat(e, instanceOf(Pipeline.PipelineExecutionException.class));
+      assertThat(e.getCause(), instanceOf(UserCodeException.class));
+      assertThat(e.getCause().getCause(), instanceOf(UserException.class));
+      assertThat(e.getCause().getCause().getMessage(), is(FAILED_THE_BATCH_INTENTIONALLY));
+      assertThat(result.getState(), is(PipelineResult.State.FAILED));
+      result.cancel();
+      return;
+    }
+
+    fail("An injected failure did not affect the pipeline as expected.");
+  }
+
+  private void testTimeoutPipeline(SparkPipelineOptions options) throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
+
+    result.waitUntilFinish(Duration.millis(1));
+
+    assertThat(result.getState(), nullValue());
+
+    result.cancel();
+  }
+
+  private void testCanceledPipeline(SparkPipelineOptions options) throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
+
+    result.cancel();
+
+    assertThat(result.getState(), is(PipelineResult.State.CANCELLED));
+  }
+
+  private void testRunningPipeline(SparkPipelineOptions options) throws Exception {
+
+    final Pipeline pipeline = getPipeline(options);
+
+    SparkPipelineResult result = (SparkPipelineResult) pipeline.run();
+
+    assertThat(result.getState(), is(PipelineResult.State.RUNNING));
+
+    result.cancel();
+  }
+
+  @Test
+  public void testStreamingPipelineRunningState() throws Exception {
+    testRunningPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineRunningState() throws Exception {
+    testRunningPipeline(getBatchOptions());
+  }
+
+  @Test
+  public void testStreamingPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineCanceledState() throws Exception {
+    testCanceledPipeline(getBatchOptions());
+  }
+
+  @Test
+  public void testStreamingPipelineFailedState() throws Exception {
+    testFailedPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineFailedState() throws Exception {
+    testFailedPipeline(getBatchOptions());
+  }
+
+  @Test
+  public void testStreamingPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getStreamingOptions());
+  }
+
+  @Test
+  public void testBatchPipelineTimeoutState() throws Exception {
+    testTimeoutPipeline(getBatchOptions());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
index 79c58a7..52ae019 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/ClearAggregatorsRule.java
@@ -25,7 +25,7 @@ import org.junit.rules.ExternalResource;
  * A rule that clears the {@link org.apache.beam.runners.spark.aggregators.AccumulatorSingleton}
  * which represents the Beam {@link org.apache.beam.sdk.transforms.Aggregator}s.
  */
-class ClearAggregatorsRule extends ExternalResource {
+public class ClearAggregatorsRule extends ExternalResource {
   @Override
   protected void before() throws Throwable {
     AccumulatorSingleton.clear();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
index c16574c..6b36bcc 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/aggregators/metrics/sink/NamedAggregatorsTest.java
@@ -78,7 +78,7 @@ public class NamedAggregatorsTest {
 
     PAssert.that(output).containsInAnyOrder(expectedCounts);
 
-    pipeline.run();
+    pipeline.run().waitUntilFinish();
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
index 03f96d5..c5bb583 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/AvroPipelineTest.java
@@ -76,7 +76,7 @@ public class AvroPipelineTest {
     PCollection<GenericRecord> input = p.apply(
         AvroIO.Read.from(inputFile.getAbsolutePath()).withSchema(schema));
     input.apply(AvroIO.Write.to(outputDir.getAbsolutePath()).withSchema(schema));
-    p.run();
+    p.run().waitUntilFinish();
 
     List<GenericRecord> records = readGenericFile();
     assertEquals(Lists.newArrayList(savedRecord), records);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index 4e5435f..34d6818 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -74,7 +74,7 @@ public class NumShardsTest {
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
     output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
-    p.run();
+    p.run().waitUntilFinish();
 
     int count = 0;
     Set<String> expected = Sets.newHashSet("hi: 5", "there: 1", "sue: 2", "bob: 2");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
index b68e8d4..9efc670 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/hadoop/HadoopFileFormatPipelineTest.java
@@ -88,7 +88,7 @@ public class HadoopFileFormatPipelineTest {
     HadoopIO.Write.Bound<IntWritable, Text> write = HadoopIO.Write.to(outputFile.getAbsolutePath(),
         outputFormatClass, IntWritable.class, Text.class);
     input.apply(write.withoutSharding());
-    p.run();
+    p.run().waitUntilFinish();
 
     IntWritable key = new IntWritable();
     Text value = new Text();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
deleted file mode 100644
index 3b79d03..0000000
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SideEffectsTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.runners.spark.translation;
-
-import static org.hamcrest.core.Is.isA;
-
-import java.io.Serializable;
-import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptions;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-/**
- * Side effects test.
- */
-public class SideEffectsTest implements Serializable {
-  private static class UserException extends RuntimeException {
-  }
-
-  @Rule
-  public final transient SparkTestPipelineOptions pipelineOptions = new SparkTestPipelineOptions();
-  @Rule
-  public final transient ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void test() throws Exception {
-    Pipeline p = Pipeline.create(pipelineOptions.getOptions());
-
-    p.apply(Create.of("a")).apply(ParDo.of(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        throw new UserException();
-      }
-    }));
-
-    expectedException.expectCause(isA(UserException.class));
-    p.run();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
index 656107a..e3561d6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/EmptyStreamAssertionTest.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
 import java.io.Serializable;
 import java.util.Collections;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.aggregators.metrics.sink.ClearAggregatorsRule;
 import org.apache.beam.runners.spark.io.CreateStream;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
 import org.apache.beam.runners.spark.translation.streaming.utils.SparkTestPipelineOptionsForStreaming;
@@ -54,6 +55,9 @@ public class EmptyStreamAssertionTest implements Serializable {
   public SparkTestPipelineOptionsForStreaming commonOptions =
       new SparkTestPipelineOptionsForStreaming();
 
+  @Rule
+  public ClearAggregatorsRule clearAggregatorsRule = new ClearAggregatorsRule();
+
   @Test
   public void testAssertion() throws Exception {
     SparkPipelineOptions options = commonOptions.withTmpCheckpointDir(checkpointParentDir);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index b57787f..e0d71d4 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -27,8 +27,8 @@ import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.runners.spark.EvaluationResult;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.aggregators.AccumulatorSingleton;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
@@ -118,7 +118,7 @@ public class ResumeFromCheckpointStreamingTest {
     options.setCheckpointDurationMillis(options.getBatchIntervalMillis());
 
     // first run will read from Kafka backlog - "auto.offset.reset=smallest"
-    EvaluationResult res = run(options);
+    SparkPipelineResult res = run(options);
     long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class);
     assertThat(String.format("Expected %d processed messages count but "
         + "found %d", EXPECTED_AGG_FIRST, processedMessages1), processedMessages1,
@@ -132,14 +132,14 @@ public class ResumeFromCheckpointStreamingTest {
             equalTo(EXPECTED_AGG_FIRST));
   }
 
-  private static EvaluationResult runAgain(SparkPipelineOptions options) {
+  private static SparkPipelineResult runAgain(SparkPipelineOptions options) {
     AccumulatorSingleton.clear();
     // sleep before next run.
     Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
     return run(options);
   }
 
-  private static EvaluationResult run(SparkPipelineOptions options) {
+  private static SparkPipelineResult run(SparkPipelineOptions options) {
     // write to Kafka
     produce();
     Map<String, Object> consumerProps = ImmutableMap.<String, Object>of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b1a67934/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
index 23aca43..471ec92 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java
@@ -23,7 +23,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertThat;
 
 import java.io.Serializable;
-import org.apache.beam.runners.spark.EvaluationResult;
+import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -55,7 +55,7 @@ public final class PAssertStreaming implements Serializable {
    * Note that it is oblivious to windowing, so the assertion will apply indiscriminately to all
    * windows.
    */
-  public static <T> EvaluationResult runAndAssertContents(Pipeline p,
+  public static <T> SparkPipelineResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected,
                                                           Duration timeout,
@@ -69,9 +69,8 @@ public final class PAssertStreaming implements Serializable {
         .apply(ParDo.of(new AssertDoFn<>(expected)));
 
     // run the pipeline.
-    EvaluationResult res = (EvaluationResult) p.run();
+    SparkPipelineResult res = (SparkPipelineResult) p.run();
     res.waitUntilFinish(timeout);
-    res.close(stopGracefully);
     // validate assertion succeeded (at least once).
     int success = res.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
     Assert.assertThat("Success aggregator should be greater than zero.", success, not(0));
@@ -87,7 +86,7 @@ public final class PAssertStreaming implements Serializable {
    * Default to stop gracefully so that tests will finish processing even if slower for reasons
    * such as a slow runtime environment.
    */
-  public static <T> EvaluationResult runAndAssertContents(Pipeline p,
+  public static <T> SparkPipelineResult runAndAssertContents(Pipeline p,
                                                           PCollection<T> actual,
                                                           T[] expected,
                                                           Duration timeout) {


Mime
View raw message