beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xi...@apache.org
Subject [beam] branch master updated: [BEAM-7311] merge internal commits to beam open source trunk to prepare for the security patch (#8582)
Date Wed, 15 May 2019 16:29:06 GMT
This is an automated email from the ASF dual-hosted git repository.

xinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad7f12  [BEAM-7311] merge internal commits to beam open source trunk to prepare for the security patch (#8582)
0ad7f12 is described below

commit 0ad7f12b5952f9776cfa8388b7fce433b9dca19f
Author: Hai Lu <halu@linkedin.com>
AuthorDate: Wed May 15 09:28:51 2019 -0700

    [BEAM-7311] merge internal commits to beam open source trunk to prepare for the security patch (#8582)
---
 .../beam/runners/samza/SamzaJobServerDriver.java   | 76 ++++++++--------------
 .../samza/SamzaPipelineLifeCycleListener.java      |  2 +-
 .../beam/runners/samza/SamzaPipelineResult.java    | 16 ++++-
 ...ener.java => SamzaPortablePipelineOptions.java} | 32 ++++-----
 .../org/apache/beam/runners/samza/SamzaRunner.java | 11 ++--
 5 files changed, 61 insertions(+), 76 deletions(-)

diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
index 9effd57..7f0aabc 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaJobServerDriver.java
@@ -18,7 +18,6 @@
 package org.apache.beam.runners.samza;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -32,11 +31,9 @@ import org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
 import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
 import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.vendor.grpc.v1p13p1.com.google.protobuf.Struct;
 import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
-import org.kohsuke.args4j.CmdLineException;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,41 +42,33 @@ import org.slf4j.LoggerFactory;
 public class SamzaJobServerDriver {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaJobServerDriver.class);
 
-  private final ServerConfiguration config;
+  private final SamzaPortablePipelineOptions pipelineOptions;
 
-  /** Configuration for the jobServer. */
-  private static class ServerConfiguration {
-    @Option(name = "--job-port", usage = "The job service port. (Default: 11440)")
-    private int jobPort = 11440;
-
-    @Option(name = "--control-port", usage = "The FnControl port. (Default: 11441)")
-    private int controlPort = 11441;
-  }
-
-  private SamzaJobServerDriver(ServerConfiguration config) {
-    this.config = config;
+  private SamzaJobServerDriver(SamzaPortablePipelineOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
   }
 
   public static void main(String[] args) throws Exception {
-    final ServerConfiguration configuration = new ServerConfiguration();
-    final CmdLineParser parser = new CmdLineParser(configuration);
-    try {
-      parser.parseArgument(args);
-      fromConfig(configuration).run();
-    } catch (CmdLineException e) {
-      LOG.error("Unable to parse command line arguments {}", Arrays.asList(args), e);
-      throw new IllegalArgumentException("Unable to parse command line arguments.", e);
-    } catch (Exception e) {
-      LOG.error("Hit exception with SamzaJobServer. Exiting...", e);
-      throw e;
-    }
+    SamzaPortablePipelineOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(args).as(SamzaPortablePipelineOptions.class);
+    fromOptions(pipelineOptions).run();
   }
 
-  public static SamzaJobServerDriver fromConfig(ServerConfiguration config) {
-    return new SamzaJobServerDriver(config);
+  public static SamzaJobServerDriver fromOptions(SamzaPortablePipelineOptions pipelineOptions) {
+    Map<String, String> overrideConfig =
+        pipelineOptions.getConfigOverride() != null
+            ? pipelineOptions.getConfigOverride()
+            : new HashMap<>();
+    overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true));
+    overrideConfig.put(
+        SamzaRunnerOverrideConfigs.FN_CONTROL_PORT,
+        String.valueOf(pipelineOptions.getControlPort()));
+    pipelineOptions.setConfigOverride(overrideConfig);
+    return new SamzaJobServerDriver(pipelineOptions);
   }
 
-  private static InMemoryJobService createJobService(int controlPort) throws IOException {
+  private static InMemoryJobService createJobService(SamzaPortablePipelineOptions pipelineOptions)
+      throws IOException {
     JobInvoker jobInvoker =
         new JobInvoker("samza-job-invoker") {
           @Override
@@ -89,26 +78,15 @@ public class SamzaJobServerDriver {
               @Nullable String retrievalToken,
               ListeningExecutorService executorService)
               throws IOException {
-            SamzaPipelineOptions samzaPipelineOptions =
-                PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
-            Map<String, String> overrideConfig =
-                samzaPipelineOptions.getConfigOverride() != null
-                    ? samzaPipelineOptions.getConfigOverride()
-                    : new HashMap<>();
-            overrideConfig.put(SamzaRunnerOverrideConfigs.IS_PORTABLE_MODE, String.valueOf(true));
-            overrideConfig.put(
-                SamzaRunnerOverrideConfigs.FN_CONTROL_PORT, String.valueOf(controlPort));
-            samzaPipelineOptions.setConfigOverride(overrideConfig);
             String invocationId =
-                String.format(
-                    "%s_%s", samzaPipelineOptions.getJobName(), UUID.randomUUID().toString());
-            SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(samzaPipelineOptions);
+                String.format("%s_%s", pipelineOptions.getJobName(), UUID.randomUUID().toString());
+            SamzaPipelineRunner pipelineRunner = new SamzaPipelineRunner(pipelineOptions);
             JobInfo jobInfo =
                 JobInfo.create(
                     invocationId,
-                    samzaPipelineOptions.getJobName(),
+                    pipelineOptions.getJobName(),
                     retrievalToken,
-                    PipelineOptionsTranslation.toProto(samzaPipelineOptions));
+                    PipelineOptionsTranslation.toProto(pipelineOptions));
             return new JobInvocation(jobInfo, executorService, pipeline, pipelineRunner);
           }
         };
@@ -126,11 +104,11 @@ public class SamzaJobServerDriver {
         jobInvoker);
   }
 
-  private void run() throws Exception {
-    final InMemoryJobService service = createJobService(config.controlPort);
+  public void run() throws Exception {
+    final InMemoryJobService service = createJobService(pipelineOptions);
     final GrpcFnServer<InMemoryJobService> jobServiceGrpcFnServer =
         GrpcFnServer.allocatePortAndCreateFor(
-            service, ServerFactory.createWithPortSupplier(() -> config.jobPort));
+            service, ServerFactory.createWithPortSupplier(pipelineOptions::getJobPort));
     LOG.info("JobServer started on {}", jobServiceGrpcFnServer.getApiServiceDescriptor().getUrl());
     try {
       jobServiceGrpcFnServer.getServer().awaitTermination();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
index 9585d77..47f36a2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
@@ -23,7 +23,7 @@ import org.apache.samza.context.ExternalContext;
 /** Life cycle listener for a Samza pipeline during runtime. */
 public interface SamzaPipelineLifeCycleListener {
   /** Callback when the pipeline options is created. */
-  void onInit(Config config);
+  void onInit(Config config, SamzaPipelineOptions options);
 
   /** Callback when the pipeline is started. */
   ExternalContext onStart();
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
index 063f0b4..e6d27fa 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineResult.java
@@ -25,6 +25,8 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.metrics.MetricResults;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.samza.application.StreamApplication;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.TaskConfig;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.joda.time.Duration;
@@ -34,21 +36,28 @@ import org.slf4j.LoggerFactory;
 /** The result from executing a Samza Pipeline. */
 public class SamzaPipelineResult implements PipelineResult {
   private static final Logger LOG = LoggerFactory.getLogger(SamzaPipelineResult.class);
+  private static final long DEFAULT_SHUTDOWN_MS = 5000L;
+  // allow some buffer on top of samza's own shutdown timeout
+  private static final long SHUTDOWN_TIMEOUT_BUFFER = 5000L;
 
   private final SamzaExecutionContext executionContext;
   private final ApplicationRunner runner;
   private final StreamApplication app;
   private final SamzaPipelineLifeCycleListener listener;
+  private final long shutdownTiemoutMs;
 
   public SamzaPipelineResult(
       StreamApplication app,
       ApplicationRunner runner,
       SamzaExecutionContext executionContext,
-      SamzaPipelineLifeCycleListener listener) {
+      SamzaPipelineLifeCycleListener listener,
+      Config config) {
     this.executionContext = executionContext;
     this.runner = runner;
     this.app = app;
     this.listener = listener;
+    this.shutdownTiemoutMs =
+        config.getLong(TaskConfig.SHUTDOWN_MS(), DEFAULT_SHUTDOWN_MS) + SHUTDOWN_TIMEOUT_BUFFER;
   }
 
   @Override
@@ -58,8 +67,10 @@ public class SamzaPipelineResult implements PipelineResult {
 
   @Override
   public State cancel() {
+    LOG.info("Start to cancel samza pipeline...");
     runner.kill();
-    return waitUntilFinish();
+    LOG.info("Start awaiting finish for {} ms.", shutdownTiemoutMs);
+    return waitUntilFinish(Duration.millis(shutdownTiemoutMs));
   }
 
   @Override
@@ -84,6 +95,7 @@ public class SamzaPipelineResult implements PipelineResult {
       throw stateInfo.error;
     }
 
+    LOG.info("Pipeline finished. Final state: {}", stateInfo.state);
     return stateInfo.state;
   }
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
similarity index 53%
copy from runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
copy to runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
index 9585d77..661c1a5 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPipelineLifeCycleListener.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaPortablePipelineOptions.java
@@ -17,28 +17,20 @@
  */
 package org.apache.beam.runners.samza;
 
-import org.apache.samza.config.Config;
-import org.apache.samza.context.ExternalContext;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
 
-/** Life cycle listener for a Samza pipeline during runtime. */
-public interface SamzaPipelineLifeCycleListener {
-  /** Callback when the pipeline options is created. */
-  void onInit(Config config);
+/** Samza pipeline option that contains portability specific logic. For internal usage only. */
+public interface SamzaPortablePipelineOptions extends SamzaPipelineOptions {
+  @Description("The job service port. (Default: 11440) ")
+  @Default.Integer(11440)
+  int getJobPort();
 
-  /** Callback when the pipeline is started. */
-  ExternalContext onStart();
+  void setJobPort(int port);
 
-  /**
-   * Callback after the pipeline is submmitted. This will be invoked only for Samza jobs submitted
-   * to a cluster.
-   */
-  void onSubmit();
+  @Description("The FnControl port. (Default: 11441) ")
+  @Default.Integer(11441)
+  int getControlPort();
 
-  /** Callback after the pipeline is finished. */
-  void onFinish();
-
-  /** A registrar for {@link SamzaPipelineLifeCycleListener}. */
-  interface Registrar {
-    SamzaPipelineLifeCycleListener getLifeCycleListener();
-  }
+  void setControlPort(int port);
 }
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
index 3a9e442..0827b8d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/SamzaRunner.java
@@ -78,13 +78,16 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
     options.setConfigOverride(config);
 
     if (listener != null) {
-      listener.onInit(config);
+      listener.onInit(config, options);
     }
 
     final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
+    final Map<String, MetricsReporterFactory> reporterFactories = getMetricsReporters();
     final StreamApplication app =
         appDescriptor -> {
-          appDescriptor.withApplicationContainerContextFactory(executionContext.new Factory());
+          appDescriptor
+              .withApplicationContainerContextFactory(executionContext.new Factory())
+              .withMetricsReporterFactories(reporterFactories);
           SamzaPortablePipelineTranslator.translate(
               pipeline, new PortableTranslationContext(appDescriptor, options));
         };
@@ -113,7 +116,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
     options.setConfigOverride(config);
 
     if (listener != null) {
-      listener.onInit(config);
+      listener.onInit(config, options);
     }
 
     final SamzaExecutionContext executionContext = new SamzaExecutionContext(options);
@@ -152,7 +155,7 @@ public class SamzaRunner extends PipelineRunner<SamzaPipelineResult> {
 
     final ApplicationRunner runner = ApplicationRunners.getApplicationRunner(app, config);
     final SamzaPipelineResult result =
-        new SamzaPipelineResult(app, runner, executionContext, listener);
+        new SamzaPipelineResult(app, runner, executionContext, listener, config);
 
     ExternalContext externalContext = null;
     if (listener != null) {


Mime
View raw message