beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From goe...@apache.org
Subject [beam] branch master updated: [BEAM-6725] share some Flink job invocation code with Samza runner
Date Fri, 01 Mar 2019 21:10:49 GMT
This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 194d85f  [BEAM-6725] share some Flink job invocation code with Samza runner
     new ee8f260  Merge pull request #7941 from ibzib/portable-job-invocation
194d85f is described below

commit 194d85f950f5dbe2c114e7cd20e6dbb54adfd5a5
Author: Kyle Weaver <kcweaver@google.com>
AuthorDate: Fri Feb 22 16:25:19 2019 -0800

    [BEAM-6725] share some Flink job invocation code with Samza runner
---
 .../apache/beam/runners/flink/FlinkJobInvoker.java |  45 +++---
 .../beam/runners/flink/FlinkPipelineOptions.java   |   2 +-
 ...JobInvocation.java => FlinkPipelineRunner.java} | 163 ++-------------------
 .../beam/runners/flink/FlinkSavepointTest.java     |   5 +-
 .../beam/runners/flink/PortableExecutionTest.java  |   5 +-
 .../runners/flink/PortableStateExecutionTest.java  |   5 +-
 .../runners/flink/PortableTimersExecutionTest.java |   5 +-
 .../fnexecution/jobsubmission/JobInvocation.java   | 148 ++++++++++++++++++-
 .../fnexecution/jobsubmission/JobInvoker.java      |  33 ++++-
 ...JobInvoker.java => PortablePipelineRunner.java} |  12 +-
 .../beam/runners/samza/SamzaJobInvocation.java     | 128 ----------------
 .../beam/runners/samza/SamzaJobServerDriver.java   |  14 +-
 .../beam/runners/samza/SamzaPipelineOptions.java   |   2 +-
 .../beam/runners/samza/SamzaPipelineRunner.java    |  57 +++++++
 14 files changed, 290 insertions(+), 334 deletions(-)

diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
index 9dde9e3..38dcdc8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvoker.java
@@ -20,9 +20,8 @@ package org.apache.beam.runners.flink;
 import static org.apache.beam.runners.core.construction.PipelineResources.detectClassPathResourcesToStage;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
@@ -31,39 +30,30 @@ import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.sdk.options.PortablePipelineOptions;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Job Invoker for the {@link FlinkRunner}. */
-public class FlinkJobInvoker implements JobInvoker {
+public class FlinkJobInvoker extends JobInvoker {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvoker.class);
 
   public static FlinkJobInvoker create(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
-    ThreadFactory threadFactory =
-        new ThreadFactoryBuilder()
-            .setNameFormat("flink-runner-job-invoker")
-            .setDaemon(true)
-            .build();
-    ListeningExecutorService executorService =
-        MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
-    return new FlinkJobInvoker(executorService, serverConfig);
+    return new FlinkJobInvoker(serverConfig);
   }
 
-  private final ListeningExecutorService executorService;
   private final FlinkJobServerDriver.FlinkServerConfiguration serverConfig;
 
-  private FlinkJobInvoker(
-      ListeningExecutorService executorService,
-      FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
-    this.executorService = executorService;
+  private FlinkJobInvoker(FlinkJobServerDriver.FlinkServerConfiguration serverConfig) {
+    super("flink-runner-job-invoker");
     this.serverConfig = serverConfig;
   }
 
   @Override
-  public JobInvocation invoke(
-      RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+  protected JobInvocation invokeWithExecutor(
+      RunnerApi.Pipeline pipeline,
+      Struct options,
+      @Nullable String retrievalToken,
+      ListeningExecutorService executorService)
       throws IOException {
     // TODO: How to make Java/Python agree on names of keys and their values?
     LOG.trace("Parsing pipeline options");
@@ -85,7 +75,7 @@ public class FlinkJobInvoker implements JobInvoker {
 
     flinkOptions.setRunner(null);
 
-    return FlinkJobInvocation.create(
+    return createJobInvocation(
         invocationId,
         retrievalToken,
         executorService,
@@ -94,4 +84,17 @@ public class FlinkJobInvoker implements JobInvoker {
         serverConfig.getFlinkConfDir(),
         detectClassPathResourcesToStage(FlinkJobInvoker.class.getClassLoader()));
   }
+
+  static JobInvocation createJobInvocation(
+      String invocationId,
+      String retrievalToken,
+      ListeningExecutorService executorService,
+      RunnerApi.Pipeline pipeline,
+      FlinkPipelineOptions flinkOptions,
+      @Nullable String confDir,
+      List<String> filesToStage) {
+    FlinkPipelineRunner pipelineRunner =
+        new FlinkPipelineRunner(invocationId, retrievalToken, flinkOptions, confDir, filesToStage);
+    return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
+  }
 }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
index cc94793..e1103bc 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.ExecutionMode;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.CheckpointingMode;
 
-/** Options which can be used to configure a Flink PipelineRunner. */
+/** Options which can be used to configure a Flink PortablePipelineRunner. */
 public interface FlinkPipelineOptions
     extends PipelineOptions, ApplicationNameOptions, StreamingOptions {
 
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
similarity index 50%
rename from runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
index 421c38e..9ccd973 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkJobInvocation.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java
@@ -17,91 +17,53 @@
  */
 package org.apache.beam.runners.flink;
 
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
-import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;
 
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
-import java.util.function.Consumer;
 import javax.annotation.Nullable;
-import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
-import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
 import org.apache.beam.runners.core.construction.PTransformTranslation;
 import org.apache.beam.runners.core.construction.PipelineOptionsTranslation;
 import org.apache.beam.runners.core.construction.graph.ExecutableStage;
 import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Sets;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
-import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** Invocation of a Flink Job via {@link FlinkRunner}. */
-public class FlinkJobInvocation implements JobInvocation {
-  private static final Logger LOG = LoggerFactory.getLogger(FlinkJobInvocation.class);
-
-  public static FlinkJobInvocation create(
-      String id,
-      String retrievalToken,
-      ListeningExecutorService executorService,
-      Pipeline pipeline,
-      FlinkPipelineOptions pipelineOptions,
-      @Nullable String confDir,
-      List<String> filesToStage) {
-    return new FlinkJobInvocation(
-        id, retrievalToken, executorService, pipeline, pipelineOptions, confDir, filesToStage);
-  }
+/** Runs a Pipeline on Flink via {@link FlinkRunner}. */
+public class FlinkPipelineRunner implements PortablePipelineRunner {
+  private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineRunner.class);
 
   private final String id;
   private final String retrievalToken;
-  private final ListeningExecutorService executorService;
-  private final RunnerApi.Pipeline pipeline;
   private final FlinkPipelineOptions pipelineOptions;
   private final String confDir;
   private final List<String> filesToStage;
-  private JobState.Enum jobState;
-  private List<Consumer<JobState.Enum>> stateObservers;
-  private List<Consumer<JobMessage>> messageObservers;
-
-  @Nullable private ListenableFuture<PipelineResult> invocationFuture;
 
-  private FlinkJobInvocation(
+  public FlinkPipelineRunner(
       String id,
       String retrievalToken,
-      ListeningExecutorService executorService,
-      Pipeline pipeline,
       FlinkPipelineOptions pipelineOptions,
       @Nullable String confDir,
       List<String> filesToStage) {
     this.id = id;
     this.retrievalToken = retrievalToken;
-    this.executorService = executorService;
-    this.pipeline = pipeline;
     this.pipelineOptions = pipelineOptions;
     this.confDir = confDir;
     this.filesToStage = filesToStage;
-    this.invocationFuture = null;
-    this.jobState = JobState.Enum.STOPPED;
-    this.stateObservers = new ArrayList<>();
-    this.messageObservers = new ArrayList<>();
   }
 
-  private PipelineResult runPipeline() throws Exception {
+  @Override
+  public PipelineResult run(final Pipeline pipeline) throws Exception {
     MetricsEnvironment.setMetricsSupported(false);
 
     FlinkPortablePipelineTranslator<?> translator;
@@ -111,12 +73,12 @@ public class FlinkJobInvocation implements JobInvocation {
     } else {
       translator = new FlinkStreamingPortablePipelineTranslator();
     }
-    return runPipelineWithTranslator(translator);
+    return runPipelineWithTranslator(pipeline, translator);
   }
 
   private <T extends FlinkPortablePipelineTranslator.TranslationContext>
-      PipelineResult runPipelineWithTranslator(FlinkPortablePipelineTranslator<T> translator)
-          throws Exception {
+      PipelineResult runPipelineWithTranslator(
+          final Pipeline pipeline, FlinkPortablePipelineTranslator<T> translator) throws Exception {
     LOG.info("Translating pipeline to Flink program.");
 
     // Don't let the fuser fuse any subcomponents of native transforms.
@@ -178,111 +140,6 @@ public class FlinkJobInvocation implements JobInvocation {
     }
   }
 
-  @Override
-  public synchronized void start() {
-    LOG.info("Starting job invocation {}", getId());
-    if (getState() != JobState.Enum.STOPPED) {
-      throw new IllegalStateException(String.format("Job %s already running.", getId()));
-    }
-    setState(JobState.Enum.STARTING);
-    invocationFuture = executorService.submit(this::runPipeline);
-    // TODO: Defer transitioning until the pipeline is up and running.
-    setState(JobState.Enum.RUNNING);
-    Futures.addCallback(
-        invocationFuture,
-        new FutureCallback<PipelineResult>() {
-          @Override
-          public void onSuccess(@Nullable PipelineResult pipelineResult) {
-            if (pipelineResult != null) {
-              checkArgument(
-                  pipelineResult.getState() == PipelineResult.State.DONE,
-                  "Success on non-Done state: " + pipelineResult.getState());
-              setState(JobState.Enum.DONE);
-            } else {
-              setState(JobState.Enum.UNSPECIFIED);
-            }
-          }
-
-          @Override
-          public void onFailure(Throwable throwable) {
-            String message = String.format("Error during job invocation %s.", getId());
-            LOG.error(message, throwable);
-            sendMessage(
-                JobMessage.newBuilder()
-                    .setMessageText(getStackTraceAsString(throwable))
-                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
-                    .build());
-            sendMessage(
-                JobMessage.newBuilder()
-                    .setMessageText(getRootCause(throwable).toString())
-                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
-                    .build());
-            setState(JobState.Enum.FAILED);
-          }
-        },
-        executorService);
-  }
-
-  @Override
-  public String getId() {
-    return id;
-  }
-
-  @Override
-  public synchronized void cancel() {
-    LOG.info("Canceling job invocation {}", getId());
-    if (this.invocationFuture != null) {
-      this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
-      Futures.addCallback(
-          invocationFuture,
-          new FutureCallback<PipelineResult>() {
-            @Override
-            public void onSuccess(@Nullable PipelineResult pipelineResult) {
-              if (pipelineResult != null) {
-                try {
-                  pipelineResult.cancel();
-                } catch (IOException exn) {
-                  throw new RuntimeException(exn);
-                }
-              }
-            }
-
-            @Override
-            public void onFailure(Throwable throwable) {}
-          },
-          executorService);
-    }
-  }
-
-  @Override
-  public JobState.Enum getState() {
-    return this.jobState;
-  }
-
-  @Override
-  public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
-    stateStreamObserver.accept(getState());
-    stateObservers.add(stateStreamObserver);
-  }
-
-  @Override
-  public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
-    messageObservers.add(messageStreamObserver);
-  }
-
-  private synchronized void setState(JobState.Enum state) {
-    this.jobState = state;
-    for (Consumer<JobState.Enum> observer : stateObservers) {
-      observer.accept(state);
-    }
-  }
-
-  private synchronized void sendMessage(JobMessage message) {
-    for (Consumer<JobMessage> observer : messageObservers) {
-      observer.accept(message);
-    }
-  }
-
   /** Indicates whether the given pipeline has any unbounded PCollections. */
   private static boolean hasUnboundedPCollections(RunnerApi.Pipeline pipeline) {
     checkNotNull(pipeline);
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
index 756b706..41e9299 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkSavepointTest.java
@@ -27,6 +27,7 @@ import java.util.concurrent.Executors;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -194,8 +195,8 @@ public class FlinkSavepointTest implements Serializable {
     ListeningExecutorService executorService =
         MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
     try {
-      FlinkJobInvocation jobInvocation =
-          FlinkJobInvocation.create(
+      JobInvocation jobInvocation =
+          FlinkJobInvoker.createJobInvocation(
               "id",
               "none",
               executorService,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
index c817cb9..6214ff4 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.BigEndianLongCoder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -140,8 +141,8 @@ public class PortableExecutionTest implements Serializable {
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
     // execute the pipeline
-    FlinkJobInvocation jobInvocation =
-        FlinkJobInvocation.create(
+    JobInvocation jobInvocation =
+        FlinkJobInvoker.createJobInvocation(
             "fakeId",
             "fakeRetrievalToken",
             flinkJobExecutor,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
index e5dd6b9..c96ebb8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableStateExecutionTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -194,8 +195,8 @@ public class PortableStateExecutionTest implements Serializable {
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
 
-    FlinkJobInvocation jobInvocation =
-        FlinkJobInvocation.create(
+    JobInvocation jobInvocation =
+        FlinkJobInvoker.createJobInvocation(
             "id",
             "none",
             flinkJobExecutor,
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
index 544a4e5..bffc903 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PortableTimersExecutionTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.runners.core.construction.Environments;
 import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
+import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -181,8 +182,8 @@ public class PortableTimersExecutionTest implements Serializable {
 
     RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
 
-    FlinkJobInvocation jobInvocation =
-        FlinkJobInvocation.create(
+    JobInvocation jobInvocation =
+        FlinkJobInvoker.createJobInvocation(
             "id",
             "none",
             flinkJobExecutor,
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
index 831edc9..292e6e8 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvocation.java
@@ -17,31 +17,165 @@
  */
 package org.apache.beam.runners.fnexecution.jobsubmission;
 
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getRootCause;
+import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Throwables.getStackTraceAsString;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Consumer;
+import javax.annotation.Nullable;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobMessage;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState;
 import org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.FutureCallback;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.Futures;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListenableFuture;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** Internal representation of a Job which has been invoked (prepared and run) by a client. */
-public interface JobInvocation {
+public class JobInvocation {
+
+  private static final Logger LOG = LoggerFactory.getLogger(JobInvocation.class);
+
+  private final RunnerApi.Pipeline pipeline;
+  private final PortablePipelineRunner pipelineRunner;
+  private final String id;
+  private final ListeningExecutorService executorService;
+  private List<Consumer<Enum>> stateObservers;
+  private List<Consumer<JobMessage>> messageObservers;
+  private JobState.Enum jobState;
+  @Nullable private ListenableFuture<PipelineResult> invocationFuture;
+
+  public JobInvocation(
+      String id,
+      ListeningExecutorService executorService,
+      Pipeline pipeline,
+      PortablePipelineRunner pipelineRunner) {
+    this.id = id;
+    this.executorService = executorService;
+    this.pipeline = pipeline;
+    this.pipelineRunner = pipelineRunner;
+    this.stateObservers = new ArrayList<>();
+    this.messageObservers = new ArrayList<>();
+    this.invocationFuture = null;
+    this.jobState = JobState.Enum.STOPPED;
+  }
+
+  private PipelineResult runPipeline() throws Exception {
+    return pipelineRunner.run(pipeline);
+  }
 
   /** Start the job. */
-  void start();
+  public synchronized void start() {
+    LOG.info("Starting job invocation {}", getId());
+    if (getState() != JobState.Enum.STOPPED) {
+      throw new IllegalStateException(String.format("Job %s already running.", getId()));
+    }
+    setState(JobState.Enum.STARTING);
+    invocationFuture = executorService.submit(this::runPipeline);
+    // TODO: Defer transitioning until the pipeline is up and running.
+    setState(JobState.Enum.RUNNING);
+    Futures.addCallback(
+        invocationFuture,
+        new FutureCallback<PipelineResult>() {
+          @Override
+          public void onSuccess(@Nullable PipelineResult pipelineResult) {
+            if (pipelineResult != null) {
+              checkArgument(
+                  pipelineResult.getState() == PipelineResult.State.DONE,
+                  "Success on non-Done state: " + pipelineResult.getState());
+              setState(JobState.Enum.DONE);
+            } else {
+              setState(JobState.Enum.UNSPECIFIED);
+            }
+          }
+
+          @Override
+          public void onFailure(Throwable throwable) {
+            String message = String.format("Error during job invocation %s.", getId());
+            LOG.error(message, throwable);
+            sendMessage(
+                JobMessage.newBuilder()
+                    .setMessageText(getStackTraceAsString(throwable))
+                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_DEBUG)
+                    .build());
+            sendMessage(
+                JobMessage.newBuilder()
+                    .setMessageText(getRootCause(throwable).toString())
+                    .setImportance(JobMessage.MessageImportance.JOB_MESSAGE_ERROR)
+                    .build());
+            setState(JobState.Enum.FAILED);
+          }
+        },
+        executorService);
+  }
 
   /** @return Unique identifier for the job invocation. */
-  String getId();
+  public String getId() {
+    return id;
+  }
 
   /** Cancel the job. */
-  void cancel();
+  public synchronized void cancel() {
+    LOG.info("Canceling job invocation {}", getId());
+    if (this.invocationFuture != null) {
+      this.invocationFuture.cancel(true /* mayInterruptIfRunning */);
+      Futures.addCallback(
+          invocationFuture,
+          new FutureCallback<PipelineResult>() {
+            @Override
+            public void onSuccess(@Nullable PipelineResult pipelineResult) {
+              if (pipelineResult != null) {
+                try {
+                  pipelineResult.cancel();
+                } catch (IOException exn) {
+                  throw new RuntimeException(exn);
+                }
+              }
+            }
+
+            @Override
+            public void onFailure(Throwable throwable) {}
+          },
+          executorService);
+    }
+  }
 
   /** Retrieve the job's current state. */
-  JobState.Enum getState();
+  public JobState.Enum getState() {
+    return this.jobState;
+  }
 
   /** Listen for job state changes with a {@link Consumer}. */
-  void addStateListener(Consumer<Enum> stateStreamObserver);
+  public synchronized void addStateListener(Consumer<JobState.Enum> stateStreamObserver) {
+    stateStreamObserver.accept(getState());
+    stateObservers.add(stateStreamObserver);
+  }
 
   /** Listen for job messages with a {@link Consumer}. */
-  void addMessageListener(Consumer<JobMessage> messageStreamObserver);
+  public synchronized void addMessageListener(Consumer<JobMessage> messageStreamObserver) {
+    messageObservers.add(messageStreamObserver);
+  }
+
+  private synchronized void setState(JobState.Enum state) {
+    this.jobState = state;
+    for (Consumer<JobState.Enum> observer : stateObservers) {
+      observer.accept(state);
+    }
+  }
+
+  private synchronized void sendMessage(JobMessage message) {
+    for (Consumer<JobMessage> observer : messageObservers) {
+      observer.accept(message);
+    }
+  }
 
   static Boolean isTerminated(Enum state) {
     switch (state) {
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
index cfd18a1..1ed74ad 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
@@ -18,13 +18,40 @@
 package org.apache.beam.runners.fnexecution.jobsubmission;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/** Factory to create {@link JobInvocation} instances. */
+public abstract class JobInvoker {
+
+  private final ListeningExecutorService executorService;
 
-/** Factory to create a {@link JobInvocation} instances. */
-public interface JobInvoker {
   /** Start running a job, abstracting its state as a {@link JobInvocation} instance. */
-  JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+  protected abstract JobInvocation invokeWithExecutor(
+      RunnerApi.Pipeline pipeline,
+      Struct options,
+      @Nullable String retrievalToken,
+      ListeningExecutorService executorService)
       throws IOException;
+
+  JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+      throws IOException {
+    return invokeWithExecutor(pipeline, options, retrievalToken, this.executorService);
+  }
+
+  private ListeningExecutorService createExecutorService(String name) {
+    ThreadFactory threadFactory =
+        new ThreadFactoryBuilder().setNameFormat(name).setDaemon(true).build();
+    return MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(threadFactory));
+  }
+
+  protected JobInvoker(String name) {
+    this.executorService = createExecutorService(name);
+  }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
similarity index 67%
copy from runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
copy to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
index cfd18a1..d6cf083 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/JobInvoker.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/jobsubmission/PortablePipelineRunner.java
@@ -17,14 +17,10 @@
  */
 package org.apache.beam.runners.fnexecution.jobsubmission;
 
-import java.io.IOException;
-import javax.annotation.Nullable;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.sdk.PipelineResult;
 
-/** Factory to create a {@link JobInvocation} instances. */
-public interface JobInvoker {
-  /** Start running a job, abstracting its state as a {@link JobInvocation} instance. */
-  JobInvocation invoke(RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
-      throws IOException;
+/** Runs a portable Beam pipeline on some execution engine. */
+public interface PortablePipelineRunner {
+  PipelineResult run(RunnerApi.Pipeline pipeline) throws Exception;
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
deleted file mode 100644
index d70e10a..0000000
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobInvocation.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.samza;
-
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.CANCELLED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.DONE;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.FAILED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.RUNNING;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STARTING;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.STOPPED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UNRECOGNIZED;
-import static org.apache.beam.model.jobmanagement.v1.JobApi.JobState.Enum.UPDATED;
-
-import java.util.function.Consumer;
-import org.apache.beam.model.jobmanagement.v1.JobApi;
-import org.apache.beam.model.pipeline.v1.RunnerApi;
-import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
-import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
-import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
-import org.apache.beam.sdk.fn.IdGenerator;
-import org.apache.beam.sdk.fn.IdGenerators;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/** Invocation of a Samza job via {@link SamzaRunner}. */
-public class SamzaJobInvocation implements JobInvocation {
-  private static final Logger LOG = LoggerFactory.getLogger(SamzaJobInvocation.class);
-  private static final IdGenerator idGenerator = IdGenerators.incrementingLongs();
-
-  private final SamzaPipelineOptions options;
-  private final RunnerApi.Pipeline originalPipeline;
-  private volatile SamzaPipelineResult pipelineResult;
-  private final String id;
-
-  public SamzaJobInvocation(RunnerApi.Pipeline pipeline, SamzaPipelineOptions options, String id) {
-    this.originalPipeline = pipeline;
-    this.options = options;
-    this.id = id;
-  }
-
-  private SamzaPipelineResult invokeSamzaJob() {
-    // Fused pipeline proto.
-    final RunnerApi.Pipeline fusedPipeline =
-        GreedyPipelineFuser.fuse(originalPipeline).toPipeline();
-    LOG.info("Portable pipeline to run:");
-    LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
-    // the pipeline option coming from sdk will set the sdk specific runner which will break
-    // serialization
-    // so we need to reset the runner here to a valid Java runner
-    options.setRunner(SamzaRunner.class);
-    try {
-      final SamzaRunner runner = new SamzaRunner(options);
-      return runner.runPortablePipeline(fusedPipeline);
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to invoke samza job", e);
-    }
-  }
-
-  @Override
-  public void start() {
-    LOG.info("Starting job invocation {}", getId());
-    pipelineResult = invokeSamzaJob();
-  }
-
-  @Override
-  public String getId() {
-    return id;
-  }
-
-  @Override
-  public void cancel() {
-    try {
-      if (pipelineResult != null) {
-        pipelineResult.cancel();
-      }
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to cancel job.", e);
-    }
-  }
-
-  @Override
-  public JobApi.JobState.Enum getState() {
-    if (pipelineResult == null) {
-      return STARTING;
-    }
-    switch (pipelineResult.getState()) {
-      case RUNNING:
-        return RUNNING;
-      case FAILED:
-        return FAILED;
-      case DONE:
-        return DONE;
-      case STOPPED:
-        return STOPPED;
-      case UPDATED:
-        return UPDATED;
-      case CANCELLED:
-        return CANCELLED;
-      default:
-        return UNRECOGNIZED;
-    }
-  }
-
-  @Override
-  public void addStateListener(Consumer<JobApi.JobState.Enum> stateStreamObserver) {
-    LOG.info("state listener not yet implemented. Directly use getState() instead");
-  }
-
-  @Override
-  public synchronized void addMessageListener(Consumer<JobApi.JobMessage> messageStreamObserver) {
-    LOG.info("message listener not yet implemented.");
-  }
-}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index 834f066..c582d17 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -32,6 +32,7 @@ import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
+import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
 import org.kohsuke.args4j.CmdLineException;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
@@ -39,6 +40,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Driver program that starts a job server. */
+// TODO extend JobServerDriver
 public class SamzaJobServerDriver {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
 
@@ -78,10 +80,13 @@ public class SamzaJobServerDriver {
 
   private static InMemoryJobService createJobService(int controlPort) throws IOException {
     JobInvoker jobInvoker =
-        new JobInvoker() {
+        new JobInvoker("samza-job-invoker") {
           @Override
-          public JobInvocation invoke(
-              RunnerApi.Pipeline pipeline, Struct options, @Nullable String retrievalToken)
+          protected JobInvocation invokeWithExecutor(
+              RunnerApi.Pipeline pipeline,
+              Struct options,
+              @Nullable String retrievalToken,
+              ListeningExecutorService executorService)
               throws IOException {
             SamzaPipelineOptions samzaPipelineOptions =
                 PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
@@ -96,7 +101,8 @@ public class SamzaJobServerDriver {
             String invocationId =
                 String.format(
                     "%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
-            return new SamzaJobInvocation(pipeline, samzaPipelineOptions, invocationId);
+            SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions);
+            return new JobInvocation(invocationId, executorService, pipeline, pipelineRunner);
           }
         };
     return InMemoryJobService.create(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
index d49dece..65204c4 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineOptions.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.samza.config.ConfigFactory;
 import org.apache.samza.config.factories.PropertiesConfigFactory;
 
-/** Options which can be used to configure a Samza PipelineRunner. */
+/** Options which can be used to configure a Samza PortablePipelineRunner. */
 public interface SamzaPipelineOptions extends PipelineOptions {
 
   @Description(
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
new file mode 100644
index 0000000..1a94e1d
--- /dev/null
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineRunner.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.samza;
+
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Pipeline;
+import org.apache.beam.runners.core.construction.graph.GreedyPipelineFuser;
+import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
+import org.apache.beam.runners.samza.util.PortablePipelineDotRenderer;
+import org.apache.beam.sdk.PipelineResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Runs a Samza job via {@link SamzaRunner}. */
+public class SamzaPipelineRunner implements PortablePipelineRunner {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SamzaPipelineRunner.class);
+
+  private final SamzaPipelineOptions options;
+
+  @Override
+  public PipelineResult run(final Pipeline pipeline) {
+    // Fused pipeline proto.
+    final RunnerApi.Pipeline fusedPipeline = GreedyPipelineFuser.fuse(pipeline).toPipeline();
+    LOG.info("Portable pipeline to run:");
+    LOG.info(PortablePipelineDotRenderer.toDotString(fusedPipeline));
+    // the pipeline option coming from sdk will set the sdk specific runner which will break
+    // serialization
+    // so we need to reset the runner here to a valid Java runner
+    options.setRunner(SamzaRunner.class);
+    try {
+      final SamzaRunner runner = new SamzaRunner(options);
+      return runner.runPortablePipeline(fusedPipeline);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke samza job", e);
+    }
+  }
+
+  public SamzaPipelineRunner(SamzaPipelineOptions options) {
+    this.options = options;
+  }
+}


Mime
View raw message