flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8484) Kinesis consumer re-reads closed shards on job restart
Date Wed, 31 Jan 2018 13:27:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8484?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16346809#comment-16346809 ]

ASF GitHub Bot commented on FLINK-8484:

Github user tzulitai commented on the issue:

    @pluppens the changes look good to merge! Thanks a lot for working on this.
    Will merge this for `release-1.4`, `release-1.3`, and `master`.

> Kinesis consumer re-reads closed shards on job restart
> ------------------------------------------------------
>                 Key: FLINK-8484
>                 URL: https://issues.apache.org/jira/browse/FLINK-8484
>             Project: Flink
>          Issue Type: Bug
>          Components: Kinesis Connector
>    Affects Versions: 1.4.0, 1.3.2
>            Reporter: Philip Luppens
>            Assignee: Philip Luppens
>            Priority: Blocker
>              Labels: bug, flink, kinesis
>             Fix For: 1.3.3, 1.5.0, 1.4.1
> We’re using the connector to subscribe to streams varying from 1 to 100 shards, and
> used the kinesis-scaling-utils to dynamically scale the Kinesis stream up and down during
> peak times. What we noticed is that, once we had closed shards, any Flink job restart
> from a checkpoint or savepoint would result in those shards being re-read from the trim
> horizon, duplicating our events.
> We started checking the checkpoint state and found that the shards were stored correctly
> with the proper sequence number (including for closed shards), but that upon restart, the
> older closed shards would be read from the trim horizon, as if their restored state were
> being ignored.
> In the end, we believe we found the problem: in the FlinkKinesisConsumer’s run()
> method, we try to match each shard returned by the KinesisDataFetcher against the shards’
> metadata from the restore point, but we do this via a containsKey() call, which relies on
> StreamShardMetadata’s equals() method. That method compares all properties, including the
> endingSequenceNumber, which may have changed between the restored state’s checkpoint and
> our shard discovery. The equality check therefore fails, the containsKey() lookup misses,
> and the shard is re-read from the trim horizon even though it was present in the restored
> state (see the first sketch below).
> We’ve created a workaround where we match only on the shardId and stream name when
> restoring the state of shards we’ve already seen, and this seems to work correctly (see
> the second sketch below).
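The failure mode described above can be illustrated with a minimal, self-contained sketch. The `ShardMetadata` class here is a hypothetical stand-in for Flink's actual StreamShardMetadata, reduced to the three fields that matter; the map lookup mirrors the containsKey() path the reporter describes in run().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for StreamShardMetadata: equals()/hashCode()
// cover every field, including the mutable endingSequenceNumber.
final class ShardMetadata {
    final String streamName;
    final String shardId;
    final String endingSequenceNumber; // null while the shard is still open

    ShardMetadata(String streamName, String shardId, String endingSequenceNumber) {
        this.streamName = streamName;
        this.shardId = shardId;
        this.endingSequenceNumber = endingSequenceNumber;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ShardMetadata)) return false;
        ShardMetadata that = (ShardMetadata) o;
        return streamName.equals(that.streamName)
                && shardId.equals(that.shardId)
                && Objects.equals(endingSequenceNumber, that.endingSequenceNumber);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamName, shardId, endingSequenceNumber);
    }
}

public class EqualsPitfall {
    public static void main(String[] args) {
        Map<ShardMetadata, String> restoredState = new HashMap<>();
        // Checkpointed while the shard was open: no ending sequence number yet.
        restoredState.put(
                new ShardMetadata("my-stream", "shardId-000000000000", null), "seq-42");

        // Re-discovered after the shard was closed by a resharding: the
        // endingSequenceNumber is now set, so equals() and hashCode() differ.
        ShardMetadata rediscovered =
                new ShardMetadata("my-stream", "shardId-000000000000", "49590338271490256");

        // Prints "false": the restored offset is missed, and the shard would
        // be re-read from the trim horizon as if it were brand new.
        System.out.println(restoredState.containsKey(rediscovered));
    }
}
```

Because both hashCode() and equals() fold in the mutable endingSequenceNumber, the same shard no longer matches its own checkpointed key once it closes, so the restored offset can never be found.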
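And a sketch of the workaround the reporter describes, reusing the hypothetical `ShardMetadata` class above: rather than trusting equals(), scan the restored entries and match on stream name and shard id only. The helper name is illustrative, not Flink's actual API.

```java
import java.util.Map;

public class WorkaroundSketch {
    // Look up the restored sequence number by (streamName, shardId) alone,
    // ignoring the mutable endingSequenceNumber.
    static String findRestoredSequenceNumber(
            Map<ShardMetadata, String> restoredState, ShardMetadata discovered) {
        for (Map.Entry<ShardMetadata, String> entry : restoredState.entrySet()) {
            ShardMetadata restored = entry.getKey();
            if (restored.streamName.equals(discovered.streamName)
                    && restored.shardId.equals(discovered.shardId)) {
                // The checkpointed offset wins, even if the shard has since closed.
                return entry.getValue();
            }
        }
        // Genuinely new shard: fall back to the configured initial position.
        return null;
    }
}
```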
