flink-issues mailing list archives

From ChrisChinchilla <...@git.apache.org>
Subject [GitHub] flink pull request #4543: [FLINK-7449] [docs] Additional documentation for i...
Date Sat, 25 Nov 2017 00:26:56 GMT
Github user ChrisChinchilla commented on a diff in the pull request:

    --- Diff: docs/ops/state/checkpoints.md ---
    @@ -99,3 +99,296 @@ above).
     $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
    +## Incremental Checkpoints
    +### Synopsis
    +Incremental checkpoints can significantly reduce checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer recovery time. The core idea is that incremental checkpoints only record changes in state since the previously completed checkpoint instead of producing a full, self-contained backup of the state backend. In this way, incremental checkpoints can build upon previous checkpoints.
    +RocksDBStateBackend is currently the only backend that supports incremental checkpoints.
    +Flink leverages RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
    +*While we strongly encourage the use of incremental checkpoints for Flink jobs with large state, please note that this is a new feature and is currently not enabled by default.*
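    +To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
    +```java
    +// filebackend: the state backend used to write the checkpoint data (e.g. an FsStateBackend)
    +RocksDBStateBackend backend =
    +    new RocksDBStateBackend(filebackend, true); // true enables incremental checkpoints
    +```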
    +### Use Case for Incremental Checkpoints
    +Checkpoints are the centrepiece of Flink’s fault tolerance mechanism, and each checkpoint represents a consistent snapshot of the distributed state of a Flink job from which the system can recover in case of a software or machine failure (see the [stream checkpointing internals]({{ site.baseurl }}/internals/stream_checkpointing.html)).
    +Flink creates checkpoints periodically to track the progress of a job so that, in case of failure, only those (hopefully few) *events that have been processed after the last completed checkpoint* must be reprocessed from the data source. The number of events that must be reprocessed directly affects recovery time, so for the fastest recovery we want to *take checkpoints as often as possible*.
    +However, checkpoints are not without performance cost and can introduce *considerable overhead* to the system. This overhead can lead to lower throughput and higher latency while checkpoints are being created. One reason is that, traditionally, each checkpoint in Flink always represented the *complete state* of the job at the time of the checkpoint, and all of the state had to be written to stable storage (typically some distributed file system) for every single checkpoint. Writing multiple terabytes (or more) of state data for each checkpoint can create significant load for the I/O and network subsystems, on top of the normal load from the pipeline’s data processing work.
    +Before incremental checkpoints, users were stuck with a suboptimal tradeoff between recovery time and checkpointing overhead: fast recovery and low checkpointing overhead were conflicting goals. This is exactly the problem that incremental checkpoints solve.
    +### Basics of Incremental Checkpoints
    +In this section, for the sake of getting the concept across, we will briefly discuss the idea behind incremental checkpoints in a simplified manner.
    +Our motivation for incremental checkpointing stemmed from the observation that it is often wasteful to write the full state of a job for every single checkpoint. In most cases, the state between two checkpoints is not drastically different: only a fraction of the state data is modified and some new data is added. Instead of writing the full state into each checkpoint again and again, we could record only the changes in state since the previous checkpoint. As long as we have the previous checkpoint and the state changes for the current checkpoint, we can restore the full, current state for the job. This is the basic principle of incremental checkpoints: each checkpoint can build upon a history of previous checkpoints to avoid writing redundant information.
    +Figure 1 illustrates the basic idea of incremental checkpointing in comparison to full checkpointing.
    +The state of the job evolves over time and for checkpoints ``CP 1`` to ``CP 2``, a full checkpoint is simply a copy of the whole state.
    +<p class="text-center">
    +   <img alt="Figure 1: Full Checkpoints vs Incremental Checkpoints" width="80%" src="{{ site.baseurl }}/fig/incremental_cp_basic.svg"/>
    +</p>
    +With incremental checkpointing, each checkpoint contains only the state change since the previous checkpoint.
    +* For the first checkpoint ``CP 1``, there is no difference from a full checkpoint: it contains the complete state at the time the checkpoint is written.
    +* For ``CP 2``, incremental checkpointing will write only the changes since ``CP 1``: the value for ``K1`` has changed and a mapping for ``K3`` was added.
    +* For checkpoint ``CP 3``, incremental checkpointing only records the changes since the previous checkpoint.
    +Notice that, unlike in full checkpoints, we must also record changes that delete state in an incremental checkpoint, as in the case of ``K0``. In this simple example, we can see how incremental checkpointing can reduce the amount of data that is written for each checkpoint.
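To make the simplified model concrete, here is a minimal sketch, assuming the state is just a `Map<String, String>` from keys to values. The class `StateDelta` and its methods are hypothetical illustrations and do not reflect how Flink or RocksDB actually store checkpoints. A delta holds the entries that were added or changed since the previous checkpoint plus the keys that were deleted; without the deletions, a key such as `K0` would reappear after a restore.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified model of one incremental checkpoint (not Flink's implementation).
final class StateDelta {
    final Map<String, String> upserts = new HashMap<>(); // keys added or changed
    final Set<String> deletions = new HashSet<>();       // keys removed

    // Computes the changes between the state at the previous checkpoint and the current state.
    static StateDelta between(Map<String, String> previous, Map<String, String> current) {
        StateDelta delta = new StateDelta();
        for (Map.Entry<String, String> e : current.entrySet()) {
            // A missing key yields null, so new keys are recorded as upserts too.
            if (!Objects.equals(previous.get(e.getKey()), e.getValue())) {
                delta.upserts.put(e.getKey(), e.getValue());
            }
        }
        for (String key : previous.keySet()) {
            if (!current.containsKey(key)) {
                delta.deletions.add(key); // deletions must be recorded explicitly
            }
        }
        return delta;
    }

    // Rebuilds the current state from the previous checkpoint plus this delta.
    Map<String, String> applyTo(Map<String, String> previous) {
        Map<String, String> restored = new HashMap<>(previous);
        restored.putAll(upserts);
        restored.keySet().removeAll(deletions);
        return restored;
    }
}
```

For example, `StateDelta.between(cp1, cp2).applyTo(cp1)` yields a map equal to `cp2`, while the delta itself only contains the keys that were touched in between.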
    +The next interesting question is how restoring from incremental checkpoints compares to restoring from full checkpoints. Restoring a full checkpoint is as simple as loading all the data from the checkpoint back into the job state, because full checkpoints are self-contained. In contrast, to restore an incremental checkpoint, we need to replay the history of all incremental checkpoints that are in the reference chain of the checkpoint we are trying to restore.
    +In our example from Figure 1, those connections are represented by the orange arrows. If we want to restore ``CP 3``, as a first step, we need to apply all the changes of ``CP 1`` to the empty initial job state. On top of that, we apply the changes from ``CP 2``, and then the changes from ``CP 3``.
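Restoring along the reference chain can be sketched in the same simplified model. The `Increment` and `IncrementalRestore` classes below are hypothetical illustrations, not Flink APIs: each increment stores upserted entries and deleted keys, and the restore replays them oldest first on top of an empty initial state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified incremental checkpoint: upserted entries plus deleted keys.
final class Increment {
    final Map<String, String> upserts;
    final Set<String> deletions;

    Increment(Map<String, String> upserts, Set<String> deletions) {
        this.upserts = upserts;
        this.deletions = deletions;
    }
}

final class IncrementalRestore {
    // Replays the reference chain, oldest checkpoint first, on top of an empty initial state.
    static Map<String, String> restore(List<Increment> chain) {
        Map<String, String> state = new HashMap<>();
        for (Increment cp : chain) {
            state.putAll(cp.upserts);               // apply added or changed entries
            state.keySet().removeAll(cp.deletions); // apply recorded deletions
        }
        return state;
    }
}
```

The keys in a usage such as `restore(List.of(cp1, cp2, cp3))` can loosely follow the description of Figure 1 (``K1`` changed and ``K3`` added in ``CP 2``, ``K0`` deleted in ``CP 3``), with invented values. The chain must be replayed in order: applying ``CP 3``'s deletion of ``K0`` before ``CP 1`` would let ``K0`` reappear.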
    +A different way to think about basic incremental checkpoints is to imagine them as a changelog with some aggregation. By aggregation, we mean that if, for example, the state under key ``K1`` is overwritten multiple times between two consecutive checkpoints, the checkpoint records only the latest value at the time it is taken. All previous changes are thereby subsumed.
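The aggregation can be sketched as a buffer that collects changes between two checkpoints. `ChangeBuffer` is a hypothetical name; the sketch assumes string keys and values and only illustrates the changelog view described above, not Flink's internal data structures. Repeated writes to the same key collapse into a single entry, and deletions are kept as tombstones so that a restore can remove the key.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical change buffer (not Flink code): collects state changes between two
// checkpoints, keeping only the latest value per key so earlier overwrites are subsumed.
final class ChangeBuffer {
    private final Map<String, String> latest = new HashMap<>();
    private final Set<String> deleted = new HashSet<>();

    void put(String key, String value) {
        latest.put(key, value); // overwrites any earlier value for this key
        deleted.remove(key);    // a later write cancels an earlier delete
    }

    void delete(String key) {
        latest.remove(key);     // drops any pending value for this key
        deleted.add(key);       // tombstone, so a restore removes the key
    }

    // Entries the next incremental checkpoint would write: one per touched key.
    int entriesToWrite() {
        return latest.size() + deleted.size();
    }

    Map<String, String> upserts() { return new HashMap<>(latest); }

    Set<String> deletions() { return new HashSet<>(deleted); }
}
```

Writing ``K1`` three times and deleting ``K0`` leaves only two entries to write for the next incremental checkpoint, regardless of how many times ``K1`` was overwritten.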
    +This leads us to the potential *disadvantages* of incremental checkpoints compared to full checkpoints. While we save work in writing checkpoints, we have to do more work on recovery, reading the data from multiple checkpoints. Furthermore, we can no longer simply delete old checkpoints because new checkpoints rely upon them, and the history of old checkpoints can grow indefinitely over time (like a changelog).
    +At this point, it looks like we didn’t gain much from incremental checkpoints, because we are again trading off checkpointing overhead against recovery time. Fortunately, there are ways to improve on this naive approach to recovery. One simple and obvious way to restrict recovery time and the length of the checkpoint history is to write a full checkpoint from time to time. We can then drop all checkpoints prior to the most recent full checkpoint, and the full checkpoint serves as a new basis for future incremental checkpoints.
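The naive remedy of periodic full checkpoints can be sketched as follows. `CheckpointHistory` and its `fullInterval` parameter are hypothetical; this models only the simple scheme from the previous paragraph, not the approach Flink actually uses. Every `fullInterval`-th checkpoint is full, and everything before the most recent full checkpoint is dropped, so a restore never replays more than `fullInterval` checkpoints.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bookkeeping (not Flink's algorithm): every fullInterval-th checkpoint is
// written as a full checkpoint, and all checkpoints before the latest full one are pruned.
final class CheckpointHistory {
    private final int fullInterval;
    private final List<String> retained = new ArrayList<>(); // checkpoint names, oldest first
    private long count = 0;

    CheckpointHistory(int fullInterval) {
        if (fullInterval < 1) {
            throw new IllegalArgumentException("fullInterval must be >= 1");
        }
        this.fullInterval = fullInterval;
    }

    // Records a new checkpoint and returns the chain needed to restore it.
    List<String> checkpoint() {
        count++;
        boolean full = (count - 1) % fullInterval == 0; // the first checkpoint is always full
        String name = (full ? "full-" : "inc-") + count;
        if (full) {
            retained.clear(); // older checkpoints are no longer referenced
        }
        retained.add(name);
        return new ArrayList<>(retained);
    }
}
```

With `fullInterval = 3`, the retained chain grows to `full-1, inc-2, inc-3` and then collapses to `full-4`. The price of this scheme is that every third checkpoint again pays the full checkpointing overhead.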
    +Our actual implementation of incremental checkpoints in Flink is more involved and designed to address a number of different tradeoffs. It restricts the size of the checkpoint history and therefore never needs to take a full checkpoint to keep recovery efficient. We present more detail about this in the next section, but
    --- End diff --
    @StefanRRichter Is the whole section…
    > We present more detail about this in the next section…
    until the end of the paragraph needed? If you're about to cover it below anyway, why mention it? Or is there anything here that isn't mentioned in the
    > ### Incremental Checkpoints in Flink

