beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-2899) Universal Local Runner
Date Thu, 30 Nov 2017 02:55:02 GMT

    [ https://issues.apache.org/jira/browse/BEAM-2899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16272079#comment-16272079
] 

ASF GitHub Bot commented on BEAM-2899:
--------------------------------------

tgroh closed pull request #4105: [BEAM-2899]  Fork FnDataService from runners-core
URL: https://github.com/apache/beam/pull/4105
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/runners/core-java/pom.xml b/runners/core-java/pom.xml
index 48111348ff5..86acf826288 100644
--- a/runners/core-java/pom.xml
+++ b/runners/core-java/pom.xml
@@ -67,11 +67,6 @@
       <artifactId>beam-sdks-java-core</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.apache.beam</groupId>
-      <artifactId>beam-model-fn-execution</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>org.apache.beam</groupId>
       <artifactId>beam-runners-core-construction-java</artifactId>
@@ -100,26 +95,11 @@
       <artifactId>guava</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-core</artifactId>
-    </dependency>
-
-    <dependency>
-      <groupId>io.grpc</groupId>
-      <artifactId>grpc-stub</artifactId>
-    </dependency>
-
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
     </dependency>
 
-    <dependency>
-      <groupId>org.slf4j</groupId>
-      <artifactId>slf4j-api</artifactId>
-    </dependency>
-
     <!-- test dependencies -->
 
     <!-- Utilities such as WindowMatchers -->
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
deleted file mode 100644
index 811444c96ce..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClient.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import io.grpc.stub.StreamObserver;
-import java.io.Closeable;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A client for the control plane of an SDK harness, which can issue requests to it over
the Fn API.
- *
- * <p>This class presents a low-level Java API de-inverting the Fn API's gRPC layer.
- *
- * <p>The Fn API is inverted so the runner is the server and the SDK harness is the
client, for
- * firewalling reasons (the runner may execute in a more privileged environment forbidding
outbound
- * connections).
- *
- * <p>This low-level client is responsible only for correlating requests with responses.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
- *     functionality.
- */
-@Deprecated
-class FnApiControlClient implements Closeable {
-  private static final Logger LOG = LoggerFactory.getLogger(FnApiControlClient.class);
-
-  // All writes to this StreamObserver need to be synchronized.
-  private final StreamObserver<BeamFnApi.InstructionRequest> requestReceiver;
-  private final ResponseStreamObserver responseObserver = new ResponseStreamObserver();
-  private final Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequests;
-  private volatile boolean isClosed;
-
-  private FnApiControlClient(StreamObserver<BeamFnApi.InstructionRequest> requestReceiver)
{
-    this.requestReceiver = requestReceiver;
-    this.outstandingRequests = new ConcurrentHashMap<>();
-  }
-
-  /**
-   * Returns a {@link FnApiControlClient} which will submit its requests to the provided
-   * observer.
-   *
-   * <p>It is the responsibility of the caller to register this object as an observer
of incoming
-   * responses (this will generally be done as part of fulfilling the contract of a gRPC
service).
-   */
-  public static FnApiControlClient forRequestObserver(
-      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    return new FnApiControlClient(requestObserver);
-  }
-
-  public synchronized ListenableFuture<BeamFnApi.InstructionResponse> handle(
-      BeamFnApi.InstructionRequest request) {
-    LOG.debug("Sending InstructionRequest {}", request);
-    SettableFuture<BeamFnApi.InstructionResponse> resultFuture = SettableFuture.create();
-    outstandingRequests.put(request.getInstructionId(), resultFuture);
-    requestReceiver.onNext(request);
-    return resultFuture;
-  }
-
-  StreamObserver<BeamFnApi.InstructionResponse> asResponseObserver() {
-    return responseObserver;
-  }
-
-  @Override
-  public void close() {
-    closeAndTerminateOutstandingRequests(new IllegalStateException("Runner closed connection"));
-  }
-
-  /** Closes this client and terminates any outstanding requests exceptionally. */
-  private synchronized void closeAndTerminateOutstandingRequests(Throwable cause) {
-    if (isClosed) {
-      return;
-    }
-
-    // Make a copy of the map to make the view of the outstanding requests consistent.
-    Map<String, SettableFuture<BeamFnApi.InstructionResponse>> outstandingRequestsCopy
=
-        new ConcurrentHashMap<>(outstandingRequests);
-    outstandingRequests.clear();
-    isClosed = true;
-
-    if (outstandingRequestsCopy.isEmpty()) {
-      requestReceiver.onCompleted();
-      return;
-    }
-    requestReceiver.onError(
-        new StatusRuntimeException(Status.CANCELLED.withDescription(cause.getMessage())));
-
-    LOG.error(
-        "{} closed, clearing outstanding requests {}",
-        FnApiControlClient.class.getSimpleName(),
-        outstandingRequestsCopy);
-    for (SettableFuture<BeamFnApi.InstructionResponse> outstandingRequest :
-        outstandingRequestsCopy.values()) {
-      outstandingRequest.setException(cause);
-    }
-  }
-
-  /**
-   * A private view of this class as a {@link StreamObserver} for connecting as a gRPC listener.
-   */
-  private class ResponseStreamObserver implements StreamObserver<BeamFnApi.InstructionResponse>
{
-    /**
-     * Processes an incoming {@link BeamFnApi.InstructionResponse} by correlating it with
the
-     * corresponding {@link BeamFnApi.InstructionRequest} and completes the future that was
returned
-     * by {@link #handle}.
-     */
-    @Override
-    public void onNext(BeamFnApi.InstructionResponse response) {
-      LOG.debug("Received InstructionResponse {}", response);
-      SettableFuture<BeamFnApi.InstructionResponse> completableFuture =
-          outstandingRequests.remove(response.getInstructionId());
-      if (completableFuture != null) {
-        completableFuture.set(response);
-      }
-    }
-
-    /** */
-    @Override
-    public void onCompleted() {
-      closeAndTerminateOutstandingRequests(
-          new IllegalStateException("SDK harness closed connection"));
-    }
-
-    @Override
-    public void onError(Throwable cause) {
-      LOG.error("{} received error {}", FnApiControlClient.class.getSimpleName(), cause);
-      closeAndTerminateOutstandingRequests(cause);
-    }
-  }
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
deleted file mode 100644
index 21fc4f73fd0..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolService.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.BlockingQueue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A Fn API control service which adds incoming SDK harness connections to a pool.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
- *     functionality.
- */
-@Deprecated
-public class FnApiControlClientPoolService extends BeamFnControlGrpc.BeamFnControlImplBase
{
-  private static final Logger LOGGER = LoggerFactory.getLogger(FnApiControlClientPoolService.class);
-
-  private final BlockingQueue<FnApiControlClient> clientPool;
-
-  private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> clientPool)
{
-    this.clientPool = clientPool;
-  }
-
-  /**
-   * Creates a new {@link FnApiControlClientPoolService} which will enqueue and vend new
SDK harness
-   * connections.
-   */
-  public static FnApiControlClientPoolService offeringClientsToPool(
-      BlockingQueue<FnApiControlClient> clientPool) {
-    return new FnApiControlClientPoolService(clientPool);
-  }
-
-  /**
-   * Called by gRPC for each incoming connection from an SDK harness, and enqueue an available
SDK
-   * harness client.
-   *
-   * <p>Note: currently does not distinguish what sort of SDK it is, so a separate
instance is
-   * required for each.
-   */
-  @Override
-  public StreamObserver<BeamFnApi.InstructionResponse> control(
-      StreamObserver<BeamFnApi.InstructionRequest> requestObserver) {
-    LOGGER.info("Beam Fn Control client connected.");
-    FnApiControlClient newClient = FnApiControlClient.forRequestObserver(requestObserver);
-    try {
-      clientPool.put(newClient);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    }
-    return newClient.asResponseObserver();
-  }
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
deleted file mode 100644
index 639d678ad03..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataReceiver.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import java.io.Closeable;
-
-/**
- * A receiver of streamed data.
- *
- * <p>Provide a {@link FnDataReceiver} and target to a {@link FnDataService} to listen
for incoming
- * data.
- *
- * <p>Register a target with a {@link FnDataService} to gain a {@link FnDataReceiver}
to which you
- * may write outgoing data.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
- *     functionality.
- */
-@Deprecated
-public interface FnDataReceiver<T> extends Closeable {
-  void accept(T input) throws Exception;
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
deleted file mode 100644
index 091dea14168..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessClient.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import com.google.auto.value.AutoValue;
-import com.google.common.base.Function;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import java.io.IOException;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-
-/**
- * A high-level client for an SDK harness.
- *
- * <p>This provides a Java-friendly wrapper around {@link FnApiControlClient} and {@link
- * FnDataReceiver}, which handle lower-level gRPC message wrangling.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
- *     functionality.
- */
-@Deprecated
-public class SdkHarnessClient {
-
-  /**
-   * A supply of unique identifiers, used internally. These must be unique across all Fn
API
-   * clients.
-   */
-  public interface IdGenerator {
-    String getId();
-  }
-
-  /** A supply of unique identifiers that are simply incrementing longs. */
-  private static class CountingIdGenerator implements IdGenerator {
-    private final AtomicLong nextId = new AtomicLong(0L);
-
-    @Override
-    public String getId() {
-      return String.valueOf(nextId.incrementAndGet());
-    }
-  }
-
-  /**
-   * An active bundle for a particular {@link
-   * BeamFnApi.ProcessBundleDescriptor}.
-   */
-  @AutoValue
-  public abstract static class ActiveBundle<InputT> {
-    public abstract String getBundleId();
-
-    public abstract Future<BeamFnApi.ProcessBundleResponse> getBundleResponse();
-
-    public abstract FnDataReceiver<InputT> getInputReceiver();
-
-    public static <InputT> ActiveBundle<InputT> create(
-        String bundleId,
-        Future<BeamFnApi.ProcessBundleResponse> response,
-        FnDataReceiver<InputT> dataReceiver) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle(bundleId, response, dataReceiver);
-    }
-  }
-
-  private final IdGenerator idGenerator;
-  private final FnApiControlClient fnApiControlClient;
-
-  private SdkHarnessClient(
-      FnApiControlClient fnApiControlClient,
-      IdGenerator idGenerator) {
-    this.idGenerator = idGenerator;
-    this.fnApiControlClient = fnApiControlClient;
-  }
-
-  /**
-   * Creates a client for a particular SDK harness. It is the responsibility of the caller
to ensure
-   * that these correspond to the same SDK harness, so control plane and data plane messages
can be
-   * correctly associated.
-   */
-  public static SdkHarnessClient usingFnApiClient(FnApiControlClient fnApiControlClient)
{
-    return new SdkHarnessClient(fnApiControlClient, new CountingIdGenerator());
-  }
-
-  public SdkHarnessClient withIdGenerator(IdGenerator idGenerator) {
-    return new SdkHarnessClient(fnApiControlClient, idGenerator);
-  }
-
-  /**
-   * Registers a {@link BeamFnApi.ProcessBundleDescriptor} for future
-   * processing.
-   *
-   * <p>A client may block on the result future, but may also proceed without blocking.
-   */
-  public Future<BeamFnApi.RegisterResponse> register(
-      Iterable<BeamFnApi.ProcessBundleDescriptor> processBundleDescriptors) {
-
-    // TODO: validate that all the necessary data endpoints are known
-
-    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
-        fnApiControlClient.handle(
-            BeamFnApi.InstructionRequest.newBuilder()
-                .setInstructionId(idGenerator.getId())
-                .setRegister(
-                    BeamFnApi.RegisterRequest.newBuilder()
-                        .addAllProcessBundleDescriptor(processBundleDescriptors)
-                        .build())
-                .build());
-
-    return Futures.transform(
-        genericResponse,
-        new Function<BeamFnApi.InstructionResponse, BeamFnApi.RegisterResponse>() {
-          @Override
-          public BeamFnApi.RegisterResponse apply(BeamFnApi.InstructionResponse input) {
-            return input.getRegister();
-          }
-        });
-  }
-
-  /**
-   * Start a new bundle for the given {@link
-   * BeamFnApi.ProcessBundleDescriptor} identifier.
-   *
-   * <p>The input channels for the returned {@link ActiveBundle} are derived from the
-   * instructions in the {@link BeamFnApi.ProcessBundleDescriptor}.
-   */
-  public ActiveBundle newBundle(String processBundleDescriptorId) {
-    String bundleId = idGenerator.getId();
-
-    // TODO: acquire an input receiver from appropriate FnDataService
-    FnDataReceiver dataReceiver = new FnDataReceiver() {
-      @Override
-      public void accept(Object input) throws Exception {
-        throw new UnsupportedOperationException("Placeholder FnDataReceiver cannot accept
data.");
-      }
-
-      @Override
-      public void close() throws IOException {
-        // noop
-      }
-    };
-
-    ListenableFuture<BeamFnApi.InstructionResponse> genericResponse =
-        fnApiControlClient.handle(
-            BeamFnApi.InstructionRequest.newBuilder()
-                .setProcessBundle(
-                    BeamFnApi.ProcessBundleRequest.newBuilder()
-                        .setProcessBundleDescriptorReference(processBundleDescriptorId))
-                .build());
-
-    ListenableFuture<BeamFnApi.ProcessBundleResponse> specificResponse =
-        Futures.transform(
-            genericResponse,
-            new Function<BeamFnApi.InstructionResponse, BeamFnApi.ProcessBundleResponse>()
{
-              @Override
-              public BeamFnApi.ProcessBundleResponse apply(BeamFnApi.InstructionResponse
input) {
-                return input.getProcessBundle();
-              }
-            });
-
-    return ActiveBundle.create(bundleId, specificResponse, dataReceiver);
-  }
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
deleted file mode 100644
index d27077fdde3..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunner.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static com.google.common.base.Preconditions.checkState;
-
-import java.util.concurrent.ExecutionException;
-import javax.annotation.Nullable;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
-import org.apache.beam.runners.core.DoFnRunner;
-import org.apache.beam.sdk.state.TimeDomain;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.UserCodeException;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.joda.time.Instant;
-
-/**
- * Processes a bundle by sending it to an SDK harness over the Fn API.
- *
- * @deprecated Runners should interact with the Control and Data plane directly, rather than
through
- *     a {@link DoFnRunner}. Consider the beam-runners-java-fn-execution artifact instead.
- */
-@Deprecated
-public class SdkHarnessDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT,
OutputT> {
-
-  private final SdkHarnessClient sdkHarnessClient;
-  private final String processBundleDescriptorId;
-
-  /** {@code null} between bundles. */
-  @Nullable private SdkHarnessClient.ActiveBundle activeBundle;
-
-  private SdkHarnessDoFnRunner(
-      SdkHarnessClient sdkHarnessClient,
-      String processBundleDescriptorId) {
-    this.sdkHarnessClient = sdkHarnessClient;
-    this.processBundleDescriptorId = processBundleDescriptorId;
-  }
-
-  /**
-   * Returns a new {@link SdkHarnessDoFnRunner} suitable for just a particular {@link
-   * ProcessBundleDescriptor} (referenced by id here).
-   *
-   * <p>The {@link FnDataReceiver} must be the correct data plane service referenced
-   * in the primitive instructions in the
-   * {@link ProcessBundleDescriptor}.
-   *
-   * <p>Also outside of this class, the appropriate receivers must be registered with
the
-   * output data plane channels of the descriptor.
-   */
-  public static <InputT, OutputT> SdkHarnessDoFnRunner<InputT, OutputT> create(
-      SdkHarnessClient sdkHarnessClient,
-      String processBundleDescriptorId) {
-    return new SdkHarnessDoFnRunner(sdkHarnessClient, processBundleDescriptorId);
-  }
-
-  @Override
-  public void startBundle() {
-    this.activeBundle =
-        sdkHarnessClient.newBundle(processBundleDescriptorId);
-  }
-
-  @Override
-  public void processElement(WindowedValue<InputT> elem) {
-    checkState(
-        activeBundle != null,
-        "%s attempted to process an element without an active bundle",
-        SdkHarnessDoFnRunner.class.getSimpleName());
-
-    try {
-      activeBundle.getInputReceiver().accept(elem);
-    } catch (Exception exc) {
-      throw new RuntimeException(exc);
-    }
-  }
-
-  @Override
-  public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    throw new UnsupportedOperationException("Timers are not supported over the Fn API");
-  }
-
-  @Override
-  public void finishBundle() {
-    try {
-      activeBundle.getBundleResponse().get();
-    } catch (InterruptedException interrupted) {
-      Thread.interrupted();
-      return;
-    } catch (ExecutionException exc) {
-      throw UserCodeException.wrap(exc);
-    }
-  }
-}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
deleted file mode 100644
index bea8051c49d..00000000000
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/package-info.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Provides utilities for a Beam runner to interact with a client using the Fn API.
- */
-@DefaultAnnotation(NonNull.class)
-package org.apache.beam.runners.core.fn;
-
-import edu.umd.cs.findbugs.annotations.DefaultAnnotation;
-import edu.umd.cs.findbugs.annotations.NonNull;
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
deleted file mode 100644
index da02d924eef..00000000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientPoolServiceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/** Unit tests for {@link FnApiControlClientPoolService}. */
-@RunWith(JUnit4.class)
-public class FnApiControlClientPoolServiceTest {
-
-  // For ease of straight-line testing, we use a LinkedBlockingQueue; in practice a SynchronousQueue
-  // for matching incoming connections and server threads is likely.
-  private final BlockingQueue<FnApiControlClient> pool = new LinkedBlockingQueue<>();
-  private FnApiControlClientPoolService controlService =
-      FnApiControlClientPoolService.offeringClientsToPool(pool);
-
-  @Test
-  public void testIncomingConnection() throws Exception {
-    StreamObserver<BeamFnApi.InstructionRequest> requestObserver = mock(StreamObserver.class);
-    StreamObserver<BeamFnApi.InstructionResponse> responseObserver =
-        controlService.control(requestObserver);
-
-    FnApiControlClient client = pool.take();
-
-    // Check that the client is wired up to the request channel
-    String id = "fakeInstruction";
-    ListenableFuture<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-    verify(requestObserver).onNext(any(BeamFnApi.InstructionRequest.class));
-    assertThat(responseFuture.isDone(), is(false));
-
-    // Check that the response channel really came from the client
-    responseObserver.onNext(
-        BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
-    responseFuture.get();
-  }
-}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
deleted file mode 100644
index 279e974cc31..00000000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/FnApiControlClientTest.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.isA;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.verify;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import io.grpc.stub.StreamObserver;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link FnApiControlClient}. */
-@RunWith(JUnit4.class)
-public class FnApiControlClientTest {
-
-  @Rule public ExpectedException thrown = ExpectedException.none();
-
-  @Mock public StreamObserver<BeamFnApi.InstructionRequest> mockObserver;
-  private FnApiControlClient client;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    client = FnApiControlClient.forRequestObserver(mockObserver);
-  }
-
-  @Test
-  public void testRequestSent() {
-    String id = "instructionId";
-    client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    verify(mockObserver).onNext(any(BeamFnApi.InstructionRequest.class));
-  }
-
-  @Test
-  public void testRequestSuccess() throws Exception {
-    String id = "successfulInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-    client
-        .asResponseObserver()
-        .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
-
-    BeamFnApi.InstructionResponse response = responseFuture.get();
-
-    assertThat(response.getInstructionId(), equalTo(id));
-  }
-
-  @Test
-  public void testUnknownResponseIgnored() throws Exception {
-    String id = "actualInstruction";
-    String unknownId = "unknownInstruction";
-
-    ListenableFuture<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client
-        .asResponseObserver()
-        .onNext(BeamFnApi.InstructionResponse.newBuilder().setInstructionId(unknownId).build());
-
-    assertThat(responseFuture.isDone(), is(false));
-    assertThat(responseFuture.isCancelled(), is(false));
-  }
-
-  @Test
-  public void testOnCompletedCancelsOutstanding() throws Exception {
-    String id = "clientHangUpInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client.asResponseObserver().onCompleted();
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-    thrown.expectMessage("closed");
-    responseFuture.get();
-  }
-
-  @Test
-  public void testOnErrorCancelsOutstanding() throws Exception {
-    String id = "errorInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    class FrazzleException extends Exception {}
-    client.asResponseObserver().onError(new FrazzleException());
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(FrazzleException.class));
-    responseFuture.get();
-  }
-
-  @Test
-  public void testCloseCancelsOutstanding() throws Exception {
-    String id = "serverCloseInstruction";
-
-    Future<BeamFnApi.InstructionResponse> responseFuture =
-        client.handle(BeamFnApi.InstructionRequest.newBuilder().setInstructionId(id).build());
-
-    client.close();
-
-    thrown.expect(ExecutionException.class);
-    thrown.expectCause(isA(IllegalStateException.class));
-    thrown.expectMessage("closed");
-    responseFuture.get();
-  }
-}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
deleted file mode 100644
index 7783b2f2f88..00000000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessClientTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.util.concurrent.SettableFuture;
-import java.util.concurrent.Future;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link SdkHarnessClient}. */
-@RunWith(JUnit4.class)
-public class SdkHarnessClientTest {
-
-  @Mock public FnApiControlClient fnApiControlClient;
-
-  private SdkHarnessClient sdkHarnessClient;
-
-  @Before
-  public void setup() {
-    MockitoAnnotations.initMocks(this);
-    sdkHarnessClient = SdkHarnessClient.usingFnApiClient(fnApiControlClient);
-  }
-
-  @Test
-  public void testRegisterDoesNotCrash() throws Exception {
-    String descriptorId1 = "descriptor1";
-    String descriptorId2 = "descriptor2";
-
-    SettableFuture<BeamFnApi.InstructionResponse> registerResponseFuture = SettableFuture.create();
-    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
-        .thenReturn(registerResponseFuture);
-
-    Future<BeamFnApi.RegisterResponse> responseFuture = sdkHarnessClient.register(
-        ImmutableList.of(
-            BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build(),
-            BeamFnApi.ProcessBundleDescriptor.newBuilder().setId(descriptorId2).build()));
-
-    // Correlating the RegisterRequest and RegisterResponse is owned by the underlying
-    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping
-    // the response.
-    //
-    // Currently there are no fields so there's nothing to check. This test is formulated
-    // to match the pattern it should have if/when the response is meaningful.
-    BeamFnApi.RegisterResponse response = BeamFnApi.RegisterResponse.getDefaultInstance();
-    registerResponseFuture.set(
-        BeamFnApi.InstructionResponse.newBuilder().setRegister(response).build());
-    responseFuture.get();
-  }
-
-  @Test
-  public void testNewBundleNoDataDoesNotCrash() throws Exception {
-    String descriptorId1 = "descriptor1";
-
-    SettableFuture<BeamFnApi.InstructionResponse> processBundleResponseFuture =
-        SettableFuture.create();
-    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
-        .thenReturn(processBundleResponseFuture);
-
-    SdkHarnessClient.ActiveBundle activeBundle = sdkHarnessClient.newBundle(descriptorId1);
-
-    // Correlating the ProcessBundleRequest and ProcessBundleReponse is owned by the underlying
-    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request and unwrapping
-    // the response.
-    //
-    // Currently there are no fields so there's nothing to check. This test is formulated
-    // to match the pattern it should have if/when the response is meaningful.
-    BeamFnApi.ProcessBundleResponse response = BeamFnApi.ProcessBundleResponse.getDefaultInstance();
-    processBundleResponseFuture.set(
-        BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
-    activeBundle.getBundleResponse().get();
-  }
-}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
deleted file mode 100644
index 8f160049e05..00000000000
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/fn/SdkHarnessDoFnRunnerTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.core.fn;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.when;
-
-import com.google.common.util.concurrent.SettableFuture;
-import java.io.IOException;
-import org.apache.beam.model.fnexecution.v1.BeamFnApi;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-
-/** Unit tests for {@link SdkHarnessDoFnRunner}. */
-@RunWith(JUnit4.class)
-public class SdkHarnessDoFnRunnerTest {
-  @Mock private SdkHarnessClient mockClient;
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-  }
-
-  @Test
-  public void testStartAndFinishBundleDoesNotCrash() {
-    String processBundleDescriptorId = "testDescriptor";
-    String bundleId = "testBundle";
-    SdkHarnessDoFnRunner<Void, Void> underTest =
-        SdkHarnessDoFnRunner.<Void, Void>create(mockClient, processBundleDescriptorId);
-
-    SettableFuture<BeamFnApi.ProcessBundleResponse> processBundleResponseFuture =
-        SettableFuture.create();
-    FnDataReceiver dummyInputReceiver = new FnDataReceiver() {
-      @Override
-      public void accept(Object input) throws Exception {
-        fail("Dummy input receiver should not have received data");
-      }
-
-      @Override
-      public void close() throws IOException {
-        // noop
-      }
-    };
-    SdkHarnessClient.ActiveBundle activeBundle =
-        SdkHarnessClient.ActiveBundle.create(
-            bundleId, processBundleResponseFuture, dummyInputReceiver);
-
-    when(mockClient.newBundle(anyString())).thenReturn(activeBundle);
-    underTest.startBundle();
-    processBundleResponseFuture.set(BeamFnApi.ProcessBundleResponse.getDefaultInstance());
-    underTest.finishBundle();
-  }
-}
diff --git a/runners/java-fn-execution/build.gradle b/runners/java-fn-execution/build.gradle
index dd4eaaed47d..e948c7ce625 100644
--- a/runners/java-fn-execution/build.gradle
+++ b/runners/java-fn-execution/build.gradle
@@ -1,5 +1,4 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
+/* * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
@@ -30,18 +29,11 @@ description = "Apache Beam :: Runners :: Java Fn Execution"
  */
 evaluationDependsOn(":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-fn-execution")
 
-configurations.all {
-  // Fn Execution contains shared utilities for Runners and Harnesses which use         
  
-  // the Portability framework. Runner-side interactions must not require a
-  // dependency on any particular SDK, so this library must not introduce such an
-  // edge.
-  exclude group: "org.apache.beam", module: "beam-sdks-java-core"
-}
-
 dependencies {
   compile library.java.guava
   shadow project(path: ":beam-model-parent:beam-model-pipeline", configuration: "shadow")
   shadow project(path: ":beam-model-parent:beam-model-fn-execution", configuration: "shadow")
+  shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-core", configuration:
"shadow")
   shadow project(path: ":beam-sdks-parent:beam-sdks-java-parent:beam-sdks-java-fn-execution",
configuration: "shadow")
   shadow library.java.grpc_core
   shadow library.java.grpc_stub
diff --git a/runners/java-fn-execution/pom.xml b/runners/java-fn-execution/pom.xml
index f275d69207e..85d4da1a6fc 100644
--- a/runners/java-fn-execution/pom.xml
+++ b/runners/java-fn-execution/pom.xml
@@ -48,6 +48,11 @@
       <artifactId>beam-sdks-java-fn-execution</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.beam</groupId>
+      <artifactId>beam-sdks-java-core</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-core</artifactId>
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
similarity index 93%
rename from runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
rename to runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
index 2a6777e4bcd..4366cfac514 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/fn/FnDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/FnDataService.java
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.runners.core.fn;
+
+package org.apache.beam.runners.fnexecution.data;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -27,13 +28,8 @@
  * The {@link FnDataService} is able to forward inbound elements to a consumer and is also
a
  * consumer of outbound elements. Callers can register themselves as consumers for inbound
elements
  * or can get a handle for a consumer for outbound elements.
- *
- * @deprecated Runners should depend on the beam-runners-java-fn-execution module for this
- *     functionality.
  */
-@Deprecated
 public interface FnDataService {
-
   /**
    * A logical endpoint is a pair of an instruction ID corresponding to the {@link
    * BeamFnApi.ProcessBundleRequest} and the {@link
@@ -64,7 +60,7 @@ public static LogicalEndpoint of(String instructionId, BeamFnApi.Target
target)
    *
    * <p>The provided receiver is not required to be thread safe.
    */
-  <T> ListenableFuture<Void> listen(
+  <T> ListenableFuture<Void> receive(
       LogicalEndpoint inputLocation,
       Coder<WindowedValue<T>> coder,
       FnDataReceiver<WindowedValue<T>> listener)
@@ -82,4 +78,5 @@ public static LogicalEndpoint of(String instructionId, BeamFnApi.Target
target)
    */
   <T> FnDataReceiver<WindowedValue<T>> send(
       LogicalEndpoint outputLocation, Coder<WindowedValue<T>> coder) throws Exception;
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Universal Local Runner
> ----------------------
>
>                 Key: BEAM-2899
>                 URL: https://issues.apache.org/jira/browse/BEAM-2899
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Henning Rohde
>            Assignee: Thomas Groh
>              Labels: portability
>
> To make the portability effort tractable, we should implement a Universal Local Runner
(ULR) in Java that runs in a single server process plus docker containers for the SDK harness
containers. It would serve multiple purposes:
>   (1) A reference implementation for other runners. Ideally, any new feature should be
implemented in the ULR first.
>   (2) A fully-featured test runner for SDKs who participate in the portability framework.
It thus complements the direct runners.
>   (3) A test runner for user code that depends on or customizes the runtime environment.
For example, a DoFn that shells out has a dependency that may be satisfied on the user's desktop
(and thus works fine on the direct runner), but perhaps not by the container harness image.
The ULR allows for an easy way to find out.
> The Java direct runner presumably has lots of pieces that can be reused.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


Mime
View raw message