flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6006) Kafka Consumer can lose state if queried partition list is incomplete on restore
Date Fri, 10 Mar 2017 08:37:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6006?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15904712#comment-15904712 ]

ASF GitHub Bot commented on FLINK-6006:
---------------------------------------

GitHub user tzulitai opened a pull request:

    https://github.com/apache/flink/pull/3507

    [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer

    (This PR is the fix of FLINK-6006 for Flink 1.1)
    
    Previously, the Kafka consumer queried the partition list on
    restore and then used it to filter out the restored state of any
    partition that did not exist in the list.
    
    If the returned partition list is incomplete (i.e. missing
    partitions that existed before, perhaps due to temporary ZK / broker
    downtime), then the state of the missing partitions is dropped and
    can no longer be recovered.
    
    This PR fixes this by always restoring the complete state, without
    any filtering. We simply let the consumer fail if the partitions
    assigned to the consuming threads / Kafka clients are unreachable
    when the consumer starts running.
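
The difference between the two restore strategies described above can be illustrated with a minimal, self-contained sketch. This is not the actual FlinkKafkaConsumer code: the class name `RestoreSketch`, both method names, and the use of plain strings as partition identifiers are assumptions made purely for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical illustration only; names and types are not taken from Flink.
public class RestoreSketch {

    // Old behaviour (sketch): keep only the restored offsets whose partition
    // appears in the list queried at restore time. Any partition missing from
    // an incomplete query result silently loses its offset.
    static Map<String, Long> restoreFiltered(Map<String, Long> restoredOffsets,
                                             Set<String> queriedPartitions) {
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> entry : restoredOffsets.entrySet()) {
            if (queriedPartitions.contains(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    // New behaviour (sketch): use the complete restored state and ignore the
    // queried partition list entirely.
    static Map<String, Long> restoreComplete(Map<String, Long> restoredOffsets) {
        return new HashMap<>(restoredOffsets);
    }

    public static void main(String[] args) {
        // State from the checkpoint: offsets for two partitions.
        Map<String, Long> restored = new HashMap<>();
        restored.put("topic-0", 42L);
        restored.put("topic-1", 17L);

        // Incomplete query result, e.g. during a temporary broker outage.
        Set<String> queried = new HashSet<>();
        queried.add("topic-0");

        // TreeMap is used only to make the printed key order deterministic.
        System.out.println("filtered: " + new TreeMap<>(restoreFiltered(restored, queried)));
        System.out.println("complete: " + new TreeMap<>(restoreComplete(restored)));
    }
}
```

In the filtered variant the offset for `topic-1` is gone after restore, which corresponds to the state loss reported in this issue. In the complete variant it survives, and an unreachable partition surfaces later as a failure when the consumer starts running.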


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tzulitai/flink FLINK-6006-1.1

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3507.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3507
    
----
commit a57524bfb0158363be9a5bd4a6f18e053d96a030
Author: Tzu-Li (Gordon) Tai <tzulitai@apache.org>
Date:   2017-03-10T06:47:57Z

    [FLINK-6006] [kafka] Always use complete restored state in FlinkKafkaConsumer
    
    Previously, the Kafka consumer queried the partition list on
    restore and then used it to filter out the restored state of any
    partition that did not exist in the list.
    
    If the returned partition list is incomplete (i.e. missing
    partitions that existed before, perhaps due to temporary ZK / broker
    downtime), then the state of the missing partitions is dropped and
    can no longer be recovered.
    
    This commit fixes this by always restoring the complete state, without
    any filtering. We simply let the consumer fail if the partitions
    assigned to the consuming threads / Kafka clients are unreachable
    when the consumer starts running.

----


> Kafka Consumer can lose state if queried partition list is incomplete on restore
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-6006
>                 URL: https://issues.apache.org/jira/browse/FLINK-6006
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector, Streaming Connectors
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.1.5, 1.2.1
>
>
> In 1.1.x and 1.2.x, the FlinkKafkaConsumer queries the partition list on restore.
> Then, only the restored state of partitions that exist in the queried list is used to
> initialize the fetcher's state holders.
> If the returned partition list is incomplete (i.e. missing partitions that existed
> before, perhaps due to temporary ZK / broker downtime), then the state of the missing
> partitions is dropped and can no longer be recovered.
> In 1.3-SNAPSHOT, this is fixed by the changes in FLINK-4280, so only 1.1 and 1.2 are affected.
> We can backport some of the behavioural changes there to 1.1 and 1.2. Generally, we
> should not depend on the current partition list in Kafka when restoring, but just restore
> all previous state into the fetcher's state holders.
> This would therefore also require some checking of how the consumer threads / Kafka
> clients behave when their assigned partitions cannot be reached.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
