GitHub user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/4645#discussion_r137269070
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java ---
@@ -159,24 +155,17 @@ public void shutdown(Time timeout) {
     }

     private <P extends ResponseBody> CompletableFuture<P> submitRequest(String targetAddress, int targetPort, FullHttpRequest httpRequest, Class<P> responseClass) {
-        return CompletableFuture.supplyAsync(() -> bootstrap.connect(targetAddress, targetPort), executor)
-            .thenApply((channel) -> {
-                try {
-                    return channel.sync();
-                } catch (InterruptedException e) {
-                    throw new FlinkRuntimeException(e);
-                }
-            })
-            .thenApply((ChannelFuture::channel))
-            .thenCompose(channel -> {
-                ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                CompletableFuture<JsonResponse> future = handler.getJsonFuture();
-                channel.writeAndFlush(httpRequest);
-                return future;
-            }).thenComposeAsync(
-                (JsonResponse rawResponse) -> parseResponse(rawResponse, responseClass),
-                executor
-            );
+        ChannelFuture connect = bootstrap.connect(targetAddress, targetPort);
+        Channel channel;
--- End diff --
I understand, but it can be confusing when a method that returns a future actually
performs a (potentially long) blocking operation before it returns.
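
To illustrate the concern, here is a minimal JDK-only sketch. It is not the actual `RestClient` code: `slowConnect`, `submitBlocking` and `submitAsync` are hypothetical names, and `Thread.sleep` merely stands in for waiting on a connection. Both methods return a `CompletableFuture`, but only the second one returns to the caller before the slow step has finished.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class BlockingFutureSketch {

    // Hypothetical stand-in for a slow, blocking step (e.g. waiting for a connection).
    static String slowConnect(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "connected";
    }

    // Returns a future, but the calling thread is blocked until slowConnect finishes.
    static CompletableFuture<String> submitBlocking(long millis) {
        String result = slowConnect(millis);
        return CompletableFuture.completedFuture(result);
    }

    // Returns right away; the blocking step runs on the supplied executor instead.
    static CompletableFuture<String> submitAsync(long millis, Executor executor) {
        return CompletableFuture.supplyAsync(() -> slowConnect(millis), executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            long start = System.nanoTime();
            CompletableFuture<String> blocking = submitBlocking(200);
            long blockingMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

            start = System.nanoTime();
            CompletableFuture<String> async = submitAsync(200, executor);
            long asyncMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

            System.out.println("submitBlocking returned after ~" + blockingMs + " ms, done=" + blocking.isDone());
            System.out.println("submitAsync returned after ~" + asyncMs + " ms, done=" + async.isDone());
            System.out.println("async result: " + async.join());
        } finally {
            executor.shutdown();
        }
    }
}
```

From the signature alone, a caller cannot tell the two variants apart, which is why the blocking one is surprising. Moving the wait onto an executor is only one option; registering a completion callback on the connect future would avoid tying up a thread at all, but that is outside this sketch.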
---