flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zentol <...@git.apache.org>
Subject [GitHub] flink pull request #5038: [FLINK-7880][FLINK-7975][FLINK-7974][QS] QS test i...
Date Tue, 21 Nov 2017 15:04:16 GMT
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5038#discussion_r152300724
  
    --- Diff: flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
---
    @@ -166,28 +167,57 @@ public String getClientName() {
     	 * Shuts down the client and closes all connections.
     	 *
     	 * <p>After a call to this method, all returned futures will be failed.
    +	 *
    +	 * @return A {@link CompletableFuture} that will be completed when the shutdown process
is done.
     	 */
    -	public void shutdown() {
    -		if (shutDown.compareAndSet(false, true)) {
    +	public CompletableFuture<?> shutdown() {
    +		final CompletableFuture<?> newShutdownFuture = new CompletableFuture<>();
    +		if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
    +
    +			final List<CompletableFuture<?>> connectionFutures = new ArrayList<>();
    +
     			for (Map.Entry<InetSocketAddress, EstablishedConnection> conn : establishedConnections.entrySet())
{
     				if (establishedConnections.remove(conn.getKey(), conn.getValue())) {
    -					conn.getValue().close();
    +					connectionFutures.add(conn.getValue().close());
     				}
     			}
     
     			for (Map.Entry<InetSocketAddress, PendingConnection> conn : pendingConnections.entrySet())
{
     				if (pendingConnections.remove(conn.getKey()) != null) {
    -					conn.getValue().close();
    +					connectionFutures.add(conn.getValue().close());
     				}
     			}
     
    -			if (bootstrap != null) {
    -				EventLoopGroup group = bootstrap.group();
    -				if (group != null) {
    -					group.shutdownGracefully(0L, 10L, TimeUnit.SECONDS);
    +			CompletableFuture.allOf(
    +					connectionFutures.toArray(new CompletableFuture<?>[connectionFutures.size()])
    +			).whenComplete((result, throwable) -> {
    +				if (throwable != null) {
    +					newShutdownFuture.completeExceptionally(throwable);
    +				} else if (bootstrap != null) {
    +					EventLoopGroup group = bootstrap.group();
    +					if (group != null && !group.isShutdown()) {
    +						group.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS)
    +								.addListener(finished -> {
    +									if (finished.isSuccess()) {
    +										newShutdownFuture.complete(null);
    +									} else {
    +										newShutdownFuture.completeExceptionally(finished.cause());
    +									}
    +								});
    +					} else {
    +						newShutdownFuture.complete(null);
    +					}
    +				} else {
    +					newShutdownFuture.complete(null);
     				}
    +			});
    +
    +			// check again if in the meantime another thread completed the future
    +			if (clientShutdownFuture.compareAndSet(null, newShutdownFuture)) {
    --- End diff --
    
    where in close() do we set the shutdown future to null? I only see that being done in
sendRequest. (which seems fishy)


---

Mime
View raw message