flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
Date Sat, 13 Jan 2018 09:39:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325064#comment-16325064
] 

ASF GitHub Bot commented on FLINK-8317:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5223#discussion_r161368455
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
---
    @@ -0,0 +1,337 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.handler.job.savepoints;
    +
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.JobID;
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.configuration.CoreOptions;
    +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
    +import org.apache.flink.runtime.concurrent.FutureUtils;
    +import org.apache.flink.runtime.rest.NotFoundException;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
    +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
    +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
    +import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.types.Either;
    +import org.apache.flink.util.SerializedThrowable;
    +
    +import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
    +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
    +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import javax.annotation.Nonnull;
    +import javax.annotation.Nullable;
    +import javax.annotation.concurrent.Immutable;
    +import javax.annotation.concurrent.ThreadSafe;
    +
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.concurrent.CompletableFuture;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + * HTTP handlers for asynchronous triggering of savepoints.
    + *
    + * <p>Drawing savepoints is a potentially long-running operation. To avoid blocking
HTTP
    + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued
to trigger
    + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId},
    + * which is returned in the response body. Next, the returned id should be used to poll
the status
    + * of the savepoint until it is finished.
    + *
    + * <p>A savepoint is triggered by sending an HTTP {@code POST} request to
    + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify
the target
    + * directory of the savepoint, e.g.,
    + * <pre>
    + * { "target-directory": "/tmp" }
    + * </pre>
    + * If the body is omitted, or the field {@code target-directory} is {@code null}, the
default
    + * savepoint directory as specified by {@link CoreOptions#SAVEPOINT_DIRECTORY} will be
used.
    + * As written above, the response will contain a request id, e.g.,
    + * <pre>
    + * { "request-id": "7d273f5a62eb4730b9dea8e833733c1e" }
    + * </pre>
    + *
    + * <p>To poll for the status of an ongoing savepoint, an HTTP {@code GET} request
is issued to
    + * {@code /jobs/:jobid/savepoints/:savepointtriggerid}. If the specified savepoint is
still ongoing,
    + * the response will be
    + * <pre>
    + * {
    + *     "status": {
    + *         "id": "IN_PROGRESS"
    + *     }
    + * }
    + * </pre>
    + * If the specified savepoint has completed, the status id will transition to {@code
COMPLETED}, and
    + * the response will additionally contain information about the savepoint, such as the
location:
    + * <pre>
    + * {
    + *     "status": {
    + *         "id": "COMPLETED"
    + *     },
    + *     "savepoint": {
    + *         "request-id": "7d273f5a62eb4730b9dea8e833733c1e",
    + *         "location": "/tmp/savepoint-d9813b-8a68e674325b"
    + *     }
    + * }
    + * </pre>
    + */
    +public class SavepointHandlers {
    +
    +	private final CompletedCheckpointCache completedCheckpointCache = new CompletedCheckpointCache();
    +
    +	@Nullable
    +	private String defaultSavepointDir;
    --- End diff --
    
    done


> Enable Triggering of Savepoints via RestfulGateway
> --------------------------------------------------
>
>                 Key: FLINK-8317
>                 URL: https://issues.apache.org/jira/browse/FLINK-8317
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination, REST
>    Affects Versions: 1.5.0
>            Reporter: Gary Yao
>            Assignee: Gary Yao
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture<CompletedCheckpoint> triggerSavepoint(long
timestamp, String targetDirectory)}} to the {{RestfulGateway}} interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}
> * Implement a new {{AbstractRestHandler}} which allows asynchronous triggering of savepoints




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message