flink-issues mailing list archives

From uce <...@git.apache.org>
Subject [GitHub] flink pull request: [FLINK-2976] Allow to trigger checkpoints manu...
Date Fri, 04 Dec 2015 12:04:18 GMT
GitHub user uce opened a pull request:


    [FLINK-2976] Allow to trigger checkpoints manually

    This PR contains **documentation**: https://github.com/uce/flink/blob/2976-savepoints/docs/apis/savepoints.md
    **In a nutshell**, savepoints `(*)` are **manually triggered checkpoints**, which take
a snapshot of a program and write it out to an external state backend. This allows you to
stop and resume your program without losing intermediate state.
    **Why is this nice?** Because you don't have to replay everything when you redeploy your
long running streaming job after changing it or updating to a newer Flink version.
    `(*)` Initially I wrote it as sa**F**epoints, but then settled on sa**V**epoints after
stumbling across a related feature in [an Oracle SQL reference](https://docs.oracle.com/cd/B19306_01/appdev.102/b14261/savepoint_statement.htm).
What do you think? :smile: http://doodle.com/poll/2z2cp9hxu7eucdsz
    ## Example
    Start your stateful streaming program via `bin/flink run ...`.
    $ bin/flink list
    ------------------ Running/Restarting Jobs -------------------
    04.12.2015 13:51:10 : 46da86f25ca8daa1bbff8ccae64d53af : Flink Streaming Job (RUNNING)
    Wait for some checkpoints to complete:
    $ tail -f log/flink-hadoop-client-uce-m.log
    13:50:59,806 INFO  org.apache.flink.runtime.jobmanager.JobManager - Status of job 46da86f25ca8daa1bbff8ccae64d53af (Flink Streaming Job) changed to RUNNING.
    13:55:37,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 1 @ 1449150937225
    13:55:37,581 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 1
    13:55:42,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 2 @ 1449150942225
    13:55:42,328 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 2
    13:56:27,225 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 11 @ 1449150987225
    13:56:27,237 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 11
    Trigger a savepoint and cancel the job:
    $ bin/flink savepoint 46da86f25ca8daa1bbff8ccae64d53af
    Triggering savepoint for job 46da86f25ca8daa1bbff8ccae64d53af. Waiting for response...
    Savepoint completed. Path: jobmanager://savepoints/1
    You can resume your program from this savepoint with the run command.
    $ bin/flink cancel 46da86f25ca8daa1bbff8ccae64d53af
    Now you can restart the program from the savepoint:
    $ bin/flink run --fromSavepoint jobmanager://savepoints/1 ...
    This will resume the application from the state of the savepoint.
    ## Changes to Flink
    I focussed on **not changing any major Flink component** for this. Savepoints use the
same checkpointing mechanism as the periodic checkpoints with some plumbing around it.
    ### Savepoint coordinator
    In addition to the regular `CheckpointCoordinator`, we add a second coordinator
called `SavepointCoordinator`. This class extends the regular coordinator and registers
callbacks for shutdown, fully ack'ed checkpoints, and cancelled checkpoints. For this, I've added
three callback methods to the checkpoint coordinator, which are overridden by the savepoint
coordinator. With two separate coordinators, periodic checkpoints and savepoints don't interfere
with each other.
    The savepoint coordinator manages a map of `checkpoint ID => future`. The futures are
completed when the checkpoint is ack'ed or cancelled (or the coordinator shuts down altogether).
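    The `checkpoint ID => future` bookkeeping described above could look roughly like the following. This is a minimal sketch, not the code from this PR: the class `PendingSavepoints`, its method names, and the use of `java.util.concurrent.CompletableFuture` are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not Flink code: one future per pending savepoint.
class PendingSavepoints {

    // checkpoint ID => future that is completed with the savepoint path
    private final Map<Long, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Called when a savepoint is triggered; the requester waits on the returned future.
    CompletableFuture<String> register(long checkpointId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(checkpointId, future);
        return future;
    }

    // Callback: all tasks acknowledged the checkpoint.
    void onFullyAcknowledged(long checkpointId, String savepointPath) {
        CompletableFuture<String> future = pending.remove(checkpointId);
        if (future != null) {
            future.complete(savepointPath);
        }
    }

    // Callback: the checkpoint was cancelled.
    void onCancelled(long checkpointId) {
        CompletableFuture<String> future = pending.remove(checkpointId);
        if (future != null) {
            future.completeExceptionally(
                new IllegalStateException("Checkpoint " + checkpointId + " was cancelled"));
        }
    }

    // Callback: the coordinator shuts down; fail every future that is still pending.
    void onShutdown() {
        for (Long checkpointId : pending.keySet()) {
            CompletableFuture<String> future = pending.remove(checkpointId);
            if (future != null) {
                future.completeExceptionally(new IllegalStateException("Coordinator shut down"));
            }
        }
    }

    int numPending() {
        return pending.size();
    }
}
```

    In this sketch, every registered future is completed exactly once, either with the savepoint path or with an exception, so a waiting requester always gets an answer.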
    #### Restoring
    Restore happens on job submission if a savepoint path is provided in the `JobSnapshottingSettings`.
The restore mechanism is similar to the regular checkpoint restore, but with some further
sanity checks to ensure that the state to task mapping is correct (see below). All state has
to be mapped to the restored program.
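    To illustrate the kind of sanity check meant here, the following is a minimal sketch under stated assumptions. It is not the restore code from this PR: the class `RestoreCheck`, the string vertex IDs, and the rule that parallelism must match exactly are assumptions for illustration. It only relies on the fact that state is keyed by `(jobVertexID, subtaskIndex)`.

```java
import java.util.Map;

// Hypothetical sketch, not Flink code.
class RestoreCheck {

    // savepointState: job vertex ID => number of subtask states stored in the savepoint
    // program:        job vertex ID => parallelism of that vertex in the submitted program
    static void verifyAllStateMapped(Map<String, Integer> savepointState,
                                     Map<String, Integer> program) {
        for (Map.Entry<String, Integer> entry : savepointState.entrySet()) {
            String vertexId = entry.getKey();
            Integer parallelism = program.get(vertexId);
            if (parallelism == null) {
                // State without a matching task cannot be restored.
                throw new IllegalStateException("No task found for state of vertex " + vertexId);
            }
            if (!parallelism.equals(entry.getValue())) {
                // Assumption: subtask indices must line up one-to-one.
                throw new IllegalStateException("Parallelism mismatch for vertex " + vertexId);
            }
        }
        // Vertices of the program that have no state in the savepoint are accepted here.
    }
}
```

    The check is one-directional: every piece of state needs a task, but a task without state (for example a newly added stateless operator) passes in this sketch.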
    ### JobManagerMessages
    Added `TriggerSavepoint(JobID)` and `DisposeSavepoint(String)` Akka messages to the job
manager. They trigger and dispose the savepoints respectively. These operations work asynchronously
and respond to the request when the savepoint futures complete. The requests are triggered
by the user (see CLI frontend).
    ### Hashing of StreamNodes
    The state to task mapping of checkpoints happens via `(jobVertexID, subtaskIndex)`. With
this change, the jobVertexIDs of streaming programs are generated deterministically with respect
to the structure of the program. This is needed to make sure that a restore with a new program
can map the savepoint state to the tasks.
    The hash starts from the sources and takes multiple things into account:
    - parallelism
    - user function class
    - hash of the input
    - hash of the outputs
    - stream node ID
    The automatic generation makes sure that you can just use the savepoints, but it is actually
*not recommended*, because you cannot change the program in any meaningful way (except changing
the user function internals).
    That's why the **recommended option** is to specify a unique ID as input to the hasher
on the DataStream API level:
    DataStream<String> stream = env
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource()).uid("source-id")
      // The stateful mapper with ID
      .map(new StatefulMapper()).uid("mapper-id");
    // Stateless sink (no specific ID required)
    If you give IDs to all stateful operators, you can happily re-arrange and change the topology
(except for parallelism, see below).
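    The difference between the two options can be sketched as follows. This is an illustrative sketch only, not the hasher from this PR: the class `NodeHasher`, the choice of SHA-256, the subset of inputs used, and the rule that a user-specified ID replaces the structural inputs entirely are assumptions.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical sketch, not the hasher from this PR.
class NodeHasher {

    // Structural hash: changes whenever the parallelism, the user function class,
    // or the hash of any input changes.
    static String structuralHash(int parallelism, String functionClassName,
                                 List<String> inputHashes) {
        StringBuilder sb = new StringBuilder();
        sb.append(parallelism).append('|').append(functionClassName);
        for (String inputHash : inputHashes) {
            sb.append('|').append(inputHash);
        }
        return sha256Hex(sb.toString());
    }

    // User-specified hash (assumption): depends only on the ID given via uid(...),
    // so it stays stable when the surrounding topology changes.
    static String uidHash(String uid) {
        return sha256Hex("uid|" + uid);
    }

    private static String sha256Hex(String value) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] bytes = digest.digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : bytes) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is required to be available on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

    Because each structural hash includes the hashes of its inputs, a change near the sources propagates to every downstream node, which is why the automatically generated IDs break as soon as the topology changes.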
    ### Application ID and DbStateBackend
    Savepoints are pairs of `(ApplicationID, CompletedCheckpoint)`. I've added a new `ApplicationID`
to allow scoping tasks across multiple job submissions (which have changing job IDs). This
is for example required to restore state in the `DbStateBackend`. After consulting with @gyfora
I've changed all references to `JobID` in `DbStateBackend` to  `ApplicationID`.
    The ApplicationID is assigned in the `ExecutionGraph` only and is reset to the application
ID of the savepoint if there is a restore operation. This touches almost nothing of the existing
code and it is only propagated to the `TaskDeploymentDescriptor` and `Environment` of the
`Task` instances.
    ### State storage
    The state storage **does not** instantiate the regular state backend on the job manager.
It is essentially a set of a few helper classes that put state to and get state from
the file system or the job manager's heap. I think this is fine for now, because I didn't
want to make changes to the central state abstractions, which are kind of in flux right now.
But we should think about it in the future.
    ### Configuration and CLIFrontend
    This works out of the box if the job is using checkpointing. The default state backend
for savepoints is `jobmanager`, which allows stopping and restoring a program while the same
job manager is running.
    For configuration, there are two new keys:
    If you don't specify these, the regular state backend configuration is used with the `jobmanager`
as a fallback if no viable config is found.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 2976-savepoints

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1434
commit d63ea457d11c89378c4d0f0173a5ac372b5a3f58
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T16:31:32Z

    [FLINK-2976] [streaming-java, streaming-scala] Set JobVertexID based on stream node hash

commit 4896bdcb0107059b2e0f57afc5d7776c26b820d7
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T16:51:44Z

    [FLINK-2976] [runtime] Add StateStore<T>

commit c748ac935edcd97a1cd7c49662420f55c9806354
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T17:33:31Z

    [FLINK-2976] [core, runtime, streaming-java] Add ApplicationID to ExecutionGraph

commit 6b20f6924df29c56783b0c0772a61a05639ef619
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T17:38:34Z

    [FLINK-2976] [runtime] Add setCount(long newCount) to CheckpointIDCounter

commit 60a0774133f460627a0d9949219299b1875d3c1f
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T17:47:07Z

    [FLINK-2976] [runtime, tests] Add SavepointCoordinator

commit 182b157cc92b09cdc5ce2867a4dd5cbc234a385d
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-01T17:49:14Z

    [FLINK-2976] [clients] Add savepoint commands to CliFrontend

commit a69c550967da3acb77ae2c8b5cef2982e835e6b4
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-02T15:21:06Z

    [FLINK-2976] [docs] Add docs about savepoints

commit b38481fe147a470127a045eb11edae3af198c134
Author: Ufuk Celebi <uce@apache.org>
Date:   2015-12-03T10:35:46Z

    [FLINK-2976] [streaming-contrib] Use ApplicationID in DbStateBackend instead of JobID


If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
