Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4728#discussion_r141320991
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java ---
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Cache for {@link AccessExecutionGraph} which are obtained from the Flink cluster. Every cache entry
+ * has an associated time to live after which a new request will trigger the reloading of the
+ * {@link AccessExecutionGraph} from the cluster.
+ */
+public class ExecutionGraphCache implements Closeable {
+
+ private final Time timeout;
+
+ private final Time timeToLive;
+
+ private final ConcurrentHashMap<JobID, ExecutionGraphEntry> cachedExecutionGraphs;
+
+ private volatile boolean running = true;
+
+ public ExecutionGraphCache(
+ Time timeout,
+ Time timeToLive) {
+ this.timeout = checkNotNull(timeout);
+ this.timeToLive = checkNotNull(timeToLive);
+
+ cachedExecutionGraphs = new ConcurrentHashMap<>(4);
+ }
+
+ @Override
+ public void close() {
+ running = false;
+
+ // clear all cached AccessExecutionGraphs
+ cachedExecutionGraphs.clear();
+ }
+
+ /**
+ * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The
+ * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed
+ * or if the graph could not be retrieved from the given gateway.
+ *
+ * @param jobId identifying the {@link AccessExecutionGraph} to get
+ * @param restfulGateway to request the {@link AccessExecutionGraph} from
+ * @return Future containing the requested {@link AccessExecutionGraph}
+ */
+ public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) {
+
+ Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
+
+ while (true) {
+ final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId);
+
+ final long currentTime = System.currentTimeMillis();
+
+ if (oldEntry != null) {
+ if (currentTime < oldEntry.getTTL()) {
+ if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) {
+
+ // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph
+ try {
+ if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) {
+ return oldEntry.getExecutionGraphFuture();
+ }
+ // send a new request to get the ExecutionGraph from the new leader
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed exception. This should never happen.", e);
+ }
+ } else if (!oldEntry.getExecutionGraphFuture().isDone()) {
+ return oldEntry.getExecutionGraphFuture();
+ }
+ // otherwise it must be completed exceptionally
+ }
+ }
+
+ final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds());
+
+ final boolean success;
+
+ if (oldEntry == null) {
+ success = cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null;
+ } else {
+ success = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
+ // cancel potentially outstanding futures
+ oldEntry.getExecutionGraphFuture().cancel(false);
+ }
+
+ if (success) {
+ // request job graph
--- End diff --
true
---
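Editor's note: for context, below is a minimal, self-contained sketch of the TTL-based get-or-refresh pattern that the diff above implements, reduced to a generic key and loader function. It is not the PR's code; the names (TtlCache, Entry, loader) are hypothetical simplifications, whereas the actual class keys entries by JobID, loads AccessExecutionGraph futures from a RestfulGateway, and additionally handles the SUSPENDED state and cancellation of superseded futures.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.function.Function;

    final class TtlCache<K, V> {

        private static final class Entry<V> {
            final long expiry;                    // absolute expiry timestamp in milliseconds
            final CompletableFuture<V> future;    // pending or completed load for this entry

            Entry(long expiry) {
                this.expiry = expiry;
                this.future = new CompletableFuture<>();
            }
        }

        private final long timeToLiveMillis;
        private final ConcurrentHashMap<K, Entry<V>> entries = new ConcurrentHashMap<>();

        TtlCache(long timeToLiveMillis) {
            this.timeToLiveMillis = timeToLiveMillis;
        }

        CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> loader) {
            while (true) {
                final Entry<V> oldEntry = entries.get(key);
                final long now = System.currentTimeMillis();

                // Reuse a valid entry: not yet expired, and either still loading
                // or completed successfully.
                if (oldEntry != null && now < oldEntry.expiry
                        && !oldEntry.future.isCompletedExceptionally()) {
                    return oldEntry.future;
                }

                // Otherwise try to install a fresh entry; if another thread won
                // the race, loop and re-read that thread's entry instead.
                final Entry<V> newEntry = new Entry<>(now + timeToLiveMillis);
                final boolean installed = (oldEntry == null)
                        ? entries.putIfAbsent(key, newEntry) == null
                        : entries.replace(key, oldEntry, newEntry);

                if (installed) {
                    // Trigger the load and forward its result into the cached future.
                    loader.apply(key).whenComplete((value, error) -> {
                        if (error != null) {
                            newEntry.future.completeExceptionally(error);
                        } else {
                            newEntry.future.complete(value);
                        }
                    });
                    return newEntry.future;
                }
            }
        }
    }

A caller supplies the gateway request as the loader function; concurrent callers for the same key then share a single in-flight future until the entry expires or fails, which is the behaviour the compare-and-swap loop in the diff provides for AccessExecutionGraph lookups.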