flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9489) Checkpoint timers as part of managed keyed state instead of raw keyed state
Date Sat, 14 Jul 2018 06:57:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9489?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544091#comment-16544091

ASF GitHub Bot commented on FLINK-9489:

GitHub user StefanRRichter opened a pull request:


    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

    ## What is the purpose of the change
    This PR integrates priority queue state (timers) with the snapshotting of Flink's state
backend ans also already includes backwards compatibility (FLINK-9490). Core idea is to have
a common abstraction for how state is registered in the state backend and how snapshots operator
on such state (`StateSnapshotRestore`, `RegisteredStateMetaInfoBase`). With this, the new
state integrates more or less seemless with existing snapshot logic. The notable exception
is a current lack of integration of RocksDB state backend with heap-based priority queue state.
This can currently still use the old snapshot code without causing any regression using a
temporary path (see `AbstractStreamOperator.snapshotState(...)`. As a result, after this PR
Flink supports asynchronous snapshots for heap kv / heap queue, rocks kv / rocks queue (full
and incremental), rocks kv / heap queue (only full)  and still uses synchronous snapshots
for rocks kv / heap queue (only incremental).
    This work was created in a bit of a rush to make it into the 1.6 release and still has
some known rough edges that we could fix up in the test phase. Here is a list of some things
that already come to my mind:
    - Integrate heap-based timers with incremental RocksDB snapshots, then kick out some code.
    - Check proper integration with serializer upgrade story (!!)
    - After that, we can also remove the key-partitioning in the set structure from `HeapPriorityQueueSet`.
    - Improve integration of the batch wrapper.
    - Improve general state registration logic in the backends, there is potential to remove
duplicated code, and generally still improve the integration of the queue state a bit.
    - Improve performance of RocksDB based timers, e.g. byte[] based cache, seek sharp to
the next potential timer instead of seeking to the key-group start, bulkPoll.
    - Improve some class/interface/method names
    - Add tests, e.g. bulkPoll etc.
    ## Verifying this change
    This change is already covered by existing tests, but I would add some more eventually.
You can activate RocksDB based timers by using the RocksDB backend and setting `RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE`
to `ROCKS`.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (yes)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (yes)
      - The S3 file system connector: (no)
    ## Documentation
      - Does this pull request introduce a new feature? (yes)
      - If yes, how is the feature documented? (JavaDocs only for now)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StefanRRichter/flink pq-snapshot-integration

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6333
commit 1bb8f70700deacc49a4d4ac7900425c10272959d
Author: Stefan Richter <s.richter@...>
Date:   2018-06-13T09:56:16Z

    [FLINK-9489] Checkpoint timers as part of managed keyed state instead of raw keyed state

commit fc20df8268decab6d9890d617787a4084284b2f0
Author: Stefan Richter <s.richter@...>
Date:   2018-07-13T23:19:30Z

    Optimization for relaxed bulk polls

commit 4db1bea90fd6881555172fe3d22ee928e97894a7
Author: Stefan Richter <s.richter@...>
Date:   2018-07-14T06:34:16Z

    Renaming of some classes/interfaces


> Checkpoint timers as part of managed keyed state instead of raw keyed state
> ---------------------------------------------------------------------------
>                 Key: FLINK-9489
>                 URL: https://issues.apache.org/jira/browse/FLINK-9489
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Stefan Richter
>            Assignee: Stefan Richter
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
> Timer state should now become part of the keyed state backend snapshot, i.e., stored
inside the managed keyed state. This means that we have to connect our preparation for asynchronous
checkpoints with the backend, so that the timers are written as part of the state for each
key-group. This means that we will also free up the raw keyed state an might expose it to
user functions in the future.

This message was sent by Atlassian JIRA

View raw message