beam-commits mailing list archives

Subject [beam] branch release-2.9.0 updated: [BEAM-6111] Fix flaky PortableTimersExecutionTest
Date Tue, 04 Dec 2018 13:34:34 GMT
This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.9.0
in repository

The following commit(s) were added to refs/heads/release-2.9.0 by this push:
     new 3b6ce57  [BEAM-6111] Fix flaky PortableTimersExecutionTest
3b6ce57 is described below

commit 3b6ce57116b11ffc37cc42409e57d5723336eedd
Author: Maximilian Michels <>
AuthorDate: Fri Nov 30 16:24:10 2018 +0100

    [BEAM-6111] Fix flaky PortableTimersExecutionTest
    This is caused by executing the state completion request in GrpcStateService
    asynchronously with the default thread pool. I think this wasn't intended because
    whenCompleteAsync was used instead of whenComplete. The default thread pool in
    the test is Flink's thread pool, which doesn't take care to log exceptions.
    With the current implementation the StateRequest will always be a completed
    CompletableFuture anyway, so there is no need to schedule asynchronously.
    The following exception was thrown here:
    java.lang.IllegalStateException: sendHeaders has already been called
 .../org/apache/beam/runners/fnexecution/state/     | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/
index eee0305..88738f3 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/state/
@@ -128,7 +128,7 @@ public class GrpcStateService extends BeamFnStateGrpc.BeamFnStateImplBase
           requestHandlers.getOrDefault(request.getInstructionReference(), this::handlerNotFound);
       try {
         CompletionStage<StateResponse.Builder> result = handler.handle(request);
-        result.whenCompleteAsync(
+        result.whenComplete(
             (StateResponse.Builder responseBuilder, Throwable t) ->
                 // note that this is threadsafe if and only if outboundObserver is threadsafe.
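
The one-word change in the diff can be illustrated with a small standalone sketch. This is not code from the Beam repository: the class name WhenCompleteDemo and the helper callbackRanOnCaller are hypothetical and exist only for illustration. It assumes, as the commit message states, that the stage is already completed when the callback is attached. In that case whenComplete runs the callback inline on the calling thread, while whenCompleteAsync hands it to the default asynchronous executor, so it runs on a different thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical demo class, not part of Apache Beam.
public class WhenCompleteDemo {

  // Attaches a callback to an already-completed stage and reports whether
  // the callback ran on the thread that attached it.
  static boolean callbackRanOnCaller(boolean async) throws Exception {
    Thread caller = Thread.currentThread();
    CompletionStage<String> done = CompletableFuture.completedFuture("response");

    // Records the thread on which the callback actually executes.
    CompletableFuture<Thread> ranOn = new CompletableFuture<>();
    BiConsumer<String, Throwable> callback =
        (value, error) -> ranOn.complete(Thread.currentThread());

    if (async) {
      // Scheduled on the default asynchronous executor.
      done.whenCompleteAsync(callback);
    } else {
      // Runs immediately on the calling thread because the stage is already done.
      done.whenComplete(callback);
    }
    return ranOn.get(5, TimeUnit.SECONDS) == caller;
  }

  public static void main(String[] args) throws Exception {
    System.out.println("whenComplete on caller thread: " + callbackRanOnCaller(false));
    System.out.println("whenCompleteAsync on caller thread: " + callbackRanOnCaller(true));
  }
}
```

Under that assumption, the synchronous variant keeps the response on the thread that received the request, which appears to be the behaviour the commit restores.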
