flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8421) HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore
Date Thu, 25 Jan 2018 13:54:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8421?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16339236#comment-16339236 ]

ASF GitHub Bot commented on FLINK-8421:

GitHub user tzulitai opened a pull request:


    [FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore

    ## What is the purpose of the change
    Previously, key and namespace serializers of the `HeapInternalTimerService` were not reconfigured
on restore.
    In Flink 1.4.0, we removed the Avro dependency. On restore, if the Avro dependency is not
present, a `DummyAvroKryoSerializerClass` was registered with Kryo as a placeholder, which altered
the base Kryo registrations in the `KryoSerializer`. This change required a serializer reconfiguration
in order for restores to work. Effectively, this allowed the issue in the `HeapInternalTimerService`
to surface.
    This PR fixes this by also writing the `TypeSerializerConfigSnapshot`s of the key and
namespace serializers of the `HeapInternalTimerService` into savepoints, and using them to reconfigure
new serializers on restore.
    Since this changes the binary format of the written timer services, this PR also
uses this opportunity to properly version the format.
    More details of the change are explained below.
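    As a rough illustration of why a plain `equals` check between the restored and the new serializer is not enough, the sketch below models a serializer whose wire format depends on registration order. It is not Flink code: `RegistrationSerializer`, `ConfigSnapshot`, and the reorder-on-restore policy are hypothetical stand-ins for the `KryoSerializer`, its `TypeSerializerConfigSnapshot`, and the reconfiguration step. The only assumption is that restored registrations must keep their original IDs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model. None of these types are Flink classes. */
final class SerializerRestoreSketch {

    /** Stand-in for a config snapshot: the registrations written into the savepoint. */
    static final class ConfigSnapshot {
        final List<String> registrations;

        ConfigSnapshot(List<String> registrations) {
            this.registrations = new ArrayList<>(registrations);
        }
    }

    /** Stand-in for a serializer that encodes a class as its registration index. */
    static final class RegistrationSerializer {
        private List<String> registrations;

        RegistrationSerializer(List<String> registrations) {
            this.registrations = new ArrayList<>(registrations);
        }

        ConfigSnapshot snapshotConfiguration() {
            return new ConfigSnapshot(registrations);
        }

        /** ID that would be written for the given class name, or -1 if unregistered. */
        int idOf(String className) {
            return registrations.indexOf(className);
        }

        /**
         * Assumed policy: keep the restored registrations first, in their original
         * order, then append registrations that only this serializer knows about.
         * Returns false if a restored registration is unknown to this serializer.
         */
        boolean ensureCompatibility(ConfigSnapshot restored) {
            if (!registrations.containsAll(restored.registrations)) {
                return false;
            }
            List<String> reconfigured = new ArrayList<>(restored.registrations);
            for (String name : registrations) {
                if (!reconfigured.contains(name)) {
                    reconfigured.add(name);
                }
            }
            registrations = reconfigured;
            return true;
        }

        @Override
        public boolean equals(Object other) {
            return other instanceof RegistrationSerializer
                && registrations.equals(((RegistrationSerializer) other).registrations);
        }

        @Override
        public int hashCode() {
            return registrations.hashCode();
        }
    }

    public static void main(String[] args) {
        // Serializer as it looked when the savepoint was taken.
        RegistrationSerializer old = new RegistrationSerializer(Arrays.asList("TypeA", "TypeB"));
        ConfigSnapshot snapshot = old.snapshotConfiguration();

        // New serializer with an extra base registration in front.
        RegistrationSerializer fresh =
            new RegistrationSerializer(Arrays.asList("Placeholder", "TypeA", "TypeB"));

        System.out.println("equals check passes: " + fresh.equals(old));
        System.out.println("reconfigured: " + fresh.ensureCompatibility(snapshot));
        System.out.println("id of TypeA after reconfiguration: " + fresh.idOf("TypeA"));
    }
}
```

    In this model the `equals` check rejects the new serializer outright, while the snapshot-based path reorders its registrations so that previously written IDs still resolve to the same classes.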
    ## Brief change log
    - 1bc3cd0: A preliminary migration test that uses savepoints of a `WindowOperator` with
keys that required serialization using the `KryoSerializer`. Savepoints were taken for Flink
versions 1.2 and 1.3. Restoring from these savepoints in Flink 1.4 fails without the following
commits.
    - b9a1695: Always use the `FailureTolerantObjectInputStream` to read objects in the `InstantiationUtil.deserializeObject(...)`
methods. That special stream avoids restore failures with `ClassNotFoundException` if Avro
is not present, but some code paths in the restore process did not use it.
    - ff2e6b7 and 8bd955d: Introduced `ByteArrayPrependedInputStream` and `PostVersionedIOReadableWritable`.
These are utility classes that were required to migrate the serialization format of the timer
services from non-versioned to versioned.
    - bcdc1f1: The main change, which adds key / namespace serializer config snapshots and
uses them for serializer reconfiguration on restore. This commit also makes the format versioned.
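    The migration from a non-versioned to a versioned format can be pictured roughly as follows. This is a minimal sketch under stated assumptions, not the actual implementation of `ByteArrayPrependedInputStream` or `PostVersionedIOReadableWritable`: the `MAGIC` marker, `LEGACY_VERSION`, and the string payload are hypothetical, and the JDK's `SequenceInputStream` stands in for a stream that prepends already-consumed bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.io.UncheckedIOException;
import java.util.Arrays;

/** Hypothetical sketch. Not the Flink implementation of either utility class. */
final class PostVersionedReadSketch {

    /** Assumed marker that only the versioned format starts with. */
    static final byte[] MAGIC = {0x7F, 'T', 'M', 'R'};

    /** Version reported for data written before the format was versioned. */
    static final int LEGACY_VERSION = 0;

    static final class Result {
        final int version;
        final String payload;

        Result(int version, String payload) {
            this.version = version;
            this.payload = payload;
        }
    }

    /** New format: marker, then version, then payload. */
    static byte[] writeVersioned(int version, String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.write(MAGIC);
            out.writeInt(version);
            out.writeUTF(payload);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Old format: payload only, no marker and no version. */
    static byte[] writeLegacy(String payload) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeUTF(payload);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Result read(byte[] data) {
        try {
            return read(new ByteArrayInputStream(data));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static Result read(InputStream in) throws IOException {
        // Read up to MAGIC.length bytes; the stream may be shorter than the marker.
        byte[] prefix = new byte[MAGIC.length];
        int n = 0;
        while (n < prefix.length) {
            int r = in.read(prefix, n, prefix.length - n);
            if (r < 0) {
                break;
            }
            n += r;
        }
        if (n == MAGIC.length && Arrays.equals(prefix, MAGIC)) {
            DataInputStream data = new DataInputStream(in);
            int version = data.readInt();
            return new Result(version, data.readUTF());
        }
        // Not versioned: put the consumed bytes back in front and read the old layout.
        InputStream replayed =
            new SequenceInputStream(new ByteArrayInputStream(prefix, 0, n), in);
        return new Result(LEGACY_VERSION, new DataInputStream(replayed).readUTF());
    }
}
```

    A scheme like this only works if legacy data can never begin with the marker bytes, so the marker has to be chosen with the old layout in mind.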
    ## Verifying this change
    - The migration test added in 1bc3cd0 will not pass without all fixes.
    - Unit tests are added for the new `ByteArrayPrependedInputStream` and `PostVersionedIOReadableWritable`
    - The `testSnapshotAndRestore` and `testSnapshotAndRebalancedRestore` tests in `HeapInternalTimerServiceTest`
are adapted to test both versioned and previous non-versioned formats.
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes
/ **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (**YES** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    ## Documentation
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-8421

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5362
commit 1bc3cd0214d2d17f19d76a9aa094429730b5ba13
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-24T16:08:13Z

    [FLINK-8421] [DataStream, tests] Add WindowOperator migration test for Kryo-serialized
window keys

commit b9a169535a91d5678ae916d8e54b7e60724a7486
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-24T16:15:08Z

    [FLINK-8421] [core] Let InstantiationUtil.deserializeObject() always use FailureTolerantObjectInputStream

commit ff2e6b75f39f0d474ecca451ac1a47c0183e9a6f
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-24T17:07:14Z

    [FLINK-8421] [core] Introduce ByteArrayPrependedInputStream

commit 8bd955d701f9f9278a5e52befea4308f42a60b45
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-24T17:08:52Z

    [FLINK-8421] [core] Introduce PostVersionedIOReadableWritable

commit bcdc1f14d29ef272d07c8e52c46a355ac565d853
Author: Tzu-Li (Gordon) Tai <tzulitai@...>
Date:   2018-01-24T17:09:44Z

    [FLINK-8421] [DataStream] Make timer serializers reconfigurable on restore
    Previously, the key and namespace serializers for the
    HeapInternalTimerService were not reconfigured on restore to be compatible
    with previously written serializers.
    This caused savepoint restores to fail immediately in Flink 1.4.0,
    since in Flink 1.4.0 we changed the base registrations in the Kryo
    serializer. That change requires serializer reconfiguration.
    This commit fixes this by also writing the serializer configuration
    snapshots of the key and namespace serializers into savepoints, and using
    them to reconfigure the new serializers on restore. This improvement also
    comes along with making the written data for timer service snapshots
    versioned. Backwards compatibility with previous non-versioned formats
    is not broken.


> HeapInternalTimerService should reconfigure compatible key / namespace serializers on restore
> ---------------------------------------------------------------------------------------------
>                 Key: FLINK-8421
>                 URL: https://issues.apache.org/jira/browse/FLINK-8421
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.4.0, 1.5.0
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.3.3, 1.5.0, 1.4.1
> The {{HeapInternalTimerService}} still uses simple {{equals}} checks on restored / newly
provided serializers for compatibility checks. This should be replaced with the {{TypeSerializer::ensureCompatibility}}
checks instead, so that new serializers can be reconfigured.
> This would entail that the {{TypeSerializerConfiguration}} of the key and namespace serializer
in the {{HeapInternalTimerService}} also needs to be written to the raw state.
> For the Flink 1.4.0 release and current master, this is a critical bug since the {{KryoSerializer}}
has different default base registrations than before due to FLINK-7420, i.e., if the key of
a window is serialized using the {{KryoSerializer}} in 1.3.x, the restore would never succeed
in 1.4.0.
> For 1.3.x, this fix would be an improvement, such that the {{HeapInternalTimerService}}
restore will make use of serializer reconfiguration.
> Other remarks:
> * We need to double check all operators that checkpoint / restore from **raw** state.
Apparently, the serializer compatibility checks were only implemented for managed state.
> * Migration ITCases apparently do not have enough coverage. A migration test job that
uses a key type which required the {{KryoSerializer}}, and uses windows, would have caught
this issue.

This message was sent by Atlassian JIRA
