flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8344) Add support for HA to RestClusterClient
Date Fri, 19 Jan 2018 19:45:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16332814#comment-16332814 ]

ASF GitHub Bot commented on FLINK-8344:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5312#discussion_r162714906
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
---
    @@ -61,46 +69,77 @@
     import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
     import org.apache.flink.runtime.rest.messages.queue.AsynchronouslyCreatedResource;
     import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.rest.util.RestClientException;
     import org.apache.flink.runtime.util.ExecutorThreadFactory;
    +import org.apache.flink.util.ExceptionUtils;
     import org.apache.flink.util.ExecutorUtils;
     import org.apache.flink.util.FlinkException;
     import org.apache.flink.util.Preconditions;
     import org.apache.flink.util.SerializedThrowable;
    -import org.apache.flink.util.function.SupplierWithException;
    +import org.apache.flink.util.function.CheckedSupplier;
    +
    +import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException;
    +
    +import akka.actor.AddressFromURIString;
     
     import javax.annotation.Nullable;
     
     import java.io.IOException;
     import java.net.InetSocketAddress;
    +import java.net.MalformedURLException;
     import java.net.URL;
    -import java.util.ArrayList;
     import java.util.Collection;
     import java.util.Collections;
     import java.util.List;
    +import java.util.UUID;
     import java.util.concurrent.CompletableFuture;
     import java.util.concurrent.CompletionException;
     import java.util.concurrent.ExecutionException;
     import java.util.concurrent.ExecutorService;
     import java.util.concurrent.Executors;
    +import java.util.concurrent.ScheduledExecutorService;
     import java.util.concurrent.TimeUnit;
     import java.util.concurrent.TimeoutException;
    +import java.util.function.Predicate;
    +import java.util.function.Supplier;
    +import java.util.stream.Collectors;
     
    -import static java.util.Objects.requireNonNull;
    +import scala.Option;
    +
    +import static org.apache.flink.util.Preconditions.checkArgument;
     
     /**
      * A {@link ClusterClient} implementation that communicates via HTTP REST requests.
      */
     public class RestClusterClient<T> extends ClusterClient<T> {
     
    +	private static final long AWAIT_LEADER_TIMEOUT = 10_000;
    +
    +	private static final int MAX_RETRIES = 20;
    +
    +	private static final Time RETRY_DELAY = Time.seconds(3);
    +
     	private final RestClusterClientConfiguration restClusterClientConfiguration;
     
     	private final RestClient restClient;
     
     	private final ExecutorService executorService = Executors.newFixedThreadPool(4, new ExecutorThreadFactory("Flink-RestClusterClient-IO"));
    +
     	private final WaitStrategy waitStrategy;
     
     	private final T clusterId;
     
    +	private LeaderRetrievalService restServerRetrievalService;
    +
    +	private LeaderRetrievalService dispatcherLeaderRetriever;
    +
    +	private final LeaderHolder<URL> restServerLeaderHolder = new LeaderHolder<>("RestServer", AWAIT_LEADER_TIMEOUT);
    +
    +	private final LeaderHolder<String> dispatcherLeaderHolder = new LeaderHolder<>("Dispatcher", AWAIT_LEADER_TIMEOUT);
    --- End diff --
    
    Ok, I will look into it. Somehow I missed this class.
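The diff adds two `LeaderHolder` fields, one for the REST server URL and one for the Dispatcher address, each built from a name and `AWAIT_LEADER_TIMEOUT` (10_000), but the `LeaderHolder` class itself is not part of this hunk. A minimal sketch of what such a holder might look like, assuming that leader-retrieval callbacks publish the current leader, that callers block until a leader is known, and that the timeout is in milliseconds (the diff does not state the unit). The methods `setLeader` and `getLeader` are hypothetical names, not taken from the pull request.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical sketch of a LeaderHolder like the one referenced in the diff; not the PR's code.
final class LeaderHolder<T> {

    private final String name;
    private final long awaitTimeoutMillis; // assumed to be milliseconds

    // Guarded by "this"; null while no leader is known.
    private T leader;

    LeaderHolder(String name, long awaitTimeoutMillis) {
        this.name = name;
        this.awaitTimeoutMillis = awaitTimeoutMillis;
    }

    /** Called from a leader-retrieval callback; null means leadership was lost. */
    public synchronized void setLeader(T newLeader) {
        leader = newLeader;
        notifyAll(); // wake callers blocked in getLeader()
    }

    /** Blocks until a leader is known, or throws once the configured timeout has elapsed. */
    public synchronized T getLeader() throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(awaitTimeoutMillis);
        while (leader == null) {
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                throw new TimeoutException(
                        "No " + name + " leader known after " + awaitTimeoutMillis + " ms");
            }
            // Releases the monitor while waiting; loops to guard against spurious wake-ups.
            TimeUnit.NANOSECONDS.timedWait(this, remainingNanos);
        }
        return leader;
    }
}
```

A non-blocking alternative would keep a `CompletableFuture<T>` that is swapped out on every leadership change, so callers never park an I/O thread; the monitor-based version is shown only because it is the shortest correct form.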


> Add support for HA to RestClusterClient
> ---------------------------------------
>
>                 Key: FLINK-8344
>                 URL: https://issues.apache.org/jira/browse/FLINK-8344
>             Project: Flink
>          Issue Type: Improvement
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Gary Yao
>            Priority: Major
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The {{RestClusterClient}} must be able to deal with changing JobMasters in case of HA.
> We have to add functionality to reconnect to a newly elected leader in case of HA.
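Reconnecting to a newly elected leader implies retrying requests that failed against the old one, and the diff pairs `MAX_RETRIES = 20` with `RETRY_DELAY = Time.seconds(3)`. One plausible reading is a bounded retry with a fixed delay, sketched here under that assumption. `Retry.retryWithDelay` and its parameters are hypothetical names, not Flink's API; the predicate stands in for whatever "retryable" check the client uses (the diff's `ConnectTimeoutException` import hints at one such case).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical helper: one way to realize "retry up to N times with a fixed delay".
final class Retry {

    private Retry() {}

    /**
     * Runs {@code operation}; if it fails with an error accepted by {@code retryable},
     * schedules another attempt after {@code delayMillis}, at most {@code retries} more times.
     */
    static <T> CompletableFuture<T> retryWithDelay(
            Supplier<CompletableFuture<T>> operation,
            int retries,
            long delayMillis,
            Predicate<Throwable> retryable,
            ScheduledExecutorService scheduler) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, retries, delayMillis, retryable, scheduler, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            long delayMillis,
            Predicate<Throwable> retryable,
            ScheduledExecutorService scheduler,
            CompletableFuture<T> result) {
        CompletableFuture<T> future;
        try {
            // The supplier should re-resolve the current leader on every call.
            future = operation.get();
        } catch (RuntimeException e) {
            future = new CompletableFuture<>();
            future.completeExceptionally(e);
        }
        future.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            // Unwrap the CompletionException that dependent stages add.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                    ? error.getCause() : error;
            if (retriesLeft > 0 && retryable.test(cause)) {
                scheduler.schedule(
                        () -> attempt(operation, retriesLeft - 1, delayMillis, retryable, scheduler, result),
                        delayMillis, TimeUnit.MILLISECONDS);
            } else {
                result.completeExceptionally(cause); // non-retryable, or retries exhausted
            }
        });
    }
}
```

With the diff's constants, a caller might pass `MAX_RETRIES`, `RETRY_DELAY.toMilliseconds()` and a predicate such as `t -> t instanceof ConnectTimeoutException`. If the supplier reads the current leader (for example from a leader holder) on each call, a retried request goes to whichever leader is current at that moment.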



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
