flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
Date Sun, 15 Jul 2018 13:33:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16544541#comment-16544541 ]

ASF GitHub Bot commented on FLINK-9630:

GitHub user ubyyj opened a pull request:


    [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection …

    …leak on TopicAuthorizationException
    ## What is the purpose of the change
    Fix a bug in which Kafka09PartitionDiscoverer leaks a TCP connection if getAllPartitionsForTopics()
throws a TopicAuthorizationException.
    ## Brief change log
    Catch TopicAuthorizationException in getAllPartitionsForTopics() and close the kafkaConsumer.
    ## Verifying this change
    This change added tests and can be verified as follows:
     - *Manually verified the change by running a job that consumes from a non-existent Kafka
topic, and confirmed that the number of open TCP connections and file handles of the
task manager process did not increase. The fix has been running in our production environment
for weeks without problems.*
    ## Does this pull request potentially affect one of the following parts:
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing,
Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    ## Documentation
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
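
For readers without the diff at hand, the change described in the brief change log can be sketched roughly as follows. This is only an illustration of the catch-close-rethrow pattern, not the code of the pull request: `StubConsumer` and `TopicAuthException` are hypothetical stand-ins for Kafka's `KafkaConsumer` and `TopicAuthorizationException`, so that the example runs without Kafka on the classpath.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DiscovererCloseSketch {

    /** Hypothetical stand-in for Kafka's TopicAuthorizationException. */
    static class TopicAuthException extends RuntimeException {
        TopicAuthException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for a KafkaConsumer; it only records whether close() was called. */
    static class StubConsumer {
        boolean closed = false;

        List<Integer> partitionsFor(String topic) {
            // Simulate the broker rejecting the metadata request for a deleted topic.
            throw new TopicAuthException("Not authorized to access topic: " + topic);
        }

        void close() {
            closed = true;
        }
    }

    /** Collects partitions for all topics; on failure, closes the consumer before rethrowing. */
    static List<Integer> getAllPartitionsForTopics(StubConsumer consumer, List<String> topics) {
        List<Integer> partitions = new ArrayList<>();
        try {
            for (String topic : topics) {
                partitions.addAll(consumer.partitionsFor(topic));
            }
        } catch (TopicAuthException e) {
            // Release the connection here, because no later code path will do it.
            consumer.close();
            throw e;
        }
        return partitions;
    }

    public static void main(String[] args) {
        StubConsumer consumer = new StubConsumer();
        try {
            getAllPartitionsForTopics(consumer, Arrays.asList("deleted-topic"));
        } catch (TopicAuthException e) {
            System.out.println("closed after failure: " + consumer.closed);
        }
    }
}
```

The exception is rethrown after the close so that the task still fails and the restart strategy still applies; only the leaked connection is avoided.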

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ubyyj/flink master

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6336
commit 0aa8d75af085c2465e8cfd9e5a572770a5d95738
Author: yuanyoujun <yuanyoujun@...>
Date:   2018-07-15T13:07:49Z

    [FLINK-9630] [connector] Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException


> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>                 Key: FLINK-9630
>                 URL: https://issues.apache.org/jira/browse/FLINK-9630
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.4.2
>         Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>            Reporter: Youjun Yuan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.2
> When a Kafka topic is deleted during task startup, Kafka09PartitionDiscoverer
gets a *TopicAuthorizationException* in getAllPartitionsForTopics() and has no chance
to close the kafkaConsumer, resulting in a TCP connection leak (to the Kafka broker).
> *This issue can bring down the whole Flink cluster*, because in a default setup (fixedDelay
with Integer.MAX_VALUE restart attempts) the JobManager will randomly schedule the job to any TaskManager
that has a free slot, and each attempt causes that TaskManager to leak a TCP connection. Eventually
almost every TaskManager runs out of file handles, so no TaskManager can take a snapshot
or accept a new job, which effectively stops the whole cluster.
> The leak happens when StreamTask.invoke() calls openAllOperators(), and FlinkKafkaConsumerBase.open()
then calls partitionDiscoverer.discoverPartitions(). When kafkaConsumer.partitionsFor(topic)
in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hits a *TopicAuthorizationException*, nothing
catches it.
> StreamTask.open catches Exception and invokes the dispose() method of each operator,
which eventually invokes FlinkKafkaConsumerBase.cancel(). However, cancel() does not close the kafkaConsumer
in partitionDiscoverer, and does not even invoke partitionDiscoverer.wakeup(), because discoveryLoopThread
is null.
> Below is the code of FlinkKafkaConsumerBase.cancel() for your convenience:
> public void cancel() {
>     // set ourselves as not running;
>     // this would let the main discovery loop escape as soon as possible
>     running = false;
>
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null) {
>             // we cannot close the discoverer here, as it is error-prone to concurrent access;
>             // only wakeup the discoverer, the discovery loop will clean itself up after it escapes
>             partitionDiscoverer.wakeup();
>         }
>
>         // the discovery loop may currently be sleeping in-between
>         // consecutive discoveries; interrupt to shutdown faster
>         discoveryLoopThread.interrupt();
>     }
>
>     // abort the fetcher, if there is one
>     if (kafkaFetcher != null) {
>         kafkaFetcher.cancel();
>     }
> }
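
The gap in cancel() described above can be reproduced with a small stand-alone model. This is a simplified sketch, not Flink code: `StubSource` and `StubDiscoverer` are hypothetical classes that mirror only the control flow of the quoted method, under the assumption that open() failed before the discovery loop thread was created.

```java
public class CancelPathSketch {

    /** Hypothetical stand-in for the partition discoverer; records which calls it received. */
    static class StubDiscoverer {
        boolean wokenUp = false;
        boolean closed = false;

        void wakeup() {
            wokenUp = true;
        }

        void close() {
            closed = true;
        }
    }

    /** Hypothetical, trimmed-down model of the cancel() control flow quoted in the issue. */
    static class StubSource {
        volatile boolean running = true;
        // Stays null when open() fails before the discovery loop is started.
        Thread discoveryLoopThread;
        StubDiscoverer partitionDiscoverer = new StubDiscoverer();

        void cancel() {
            running = false;
            if (discoveryLoopThread != null) {
                if (partitionDiscoverer != null) {
                    partitionDiscoverer.wakeup();
                }
                discoveryLoopThread.interrupt();
            }
        }
    }

    public static void main(String[] args) {
        StubSource source = new StubSource();
        source.cancel();
        // Neither wakeup() nor close() is reached, so the discoverer's connection stays open.
        System.out.println("wokenUp=" + source.partitionDiscoverer.wokenUp
                + ", closed=" + source.partitionDiscoverer.closed);
    }
}
```

With discoveryLoopThread left at null, the model's discoverer is neither woken up nor closed, which matches the leak described in the issue.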

This message was sent by Atlassian JIRA
