flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (FLINK-9630) Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
Date Sun, 15 Jul 2018 13:33:00 GMT

     [ https://issues.apache.org/jira/browse/FLINK-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

ASF GitHub Bot updated FLINK-9630:
    Labels: pull-request-available  (was: )

> Kafka09PartitionDiscoverer cause connection leak on TopicAuthorizationException
> -------------------------------------------------------------------------------
>                 Key: FLINK-9630
>                 URL: https://issues.apache.org/jira/browse/FLINK-9630
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.5.0, 1.4.2
>         Environment: Linux 2.6, java 8, Kafka broker 0.10.x
>            Reporter: Youjun Yuan
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.2
> when the Kafka topic got deleted, during task starting process, Kafka09PartitionDiscoverer
will get a *TopicAuthorizationException* in getAllPartitionsForTopics(), and it get no chance
to close the  kafkaConsumer, hence resulting TCP connection leak (to Kafka broker).
> *this issue can bring down the whole Flink cluster*, because, in a default setup (fixedDelay
with INT.MAX restart attempt), job manager will randomly schedule the job to any TaskManager
that has free slot, and each attemp will cause the TaskManager to leak a TCP connection, eventually
almost every TaskManager will run out of file handle, hence no taskmanger could make snapshot,
or accept new job. Effectly stops the whole cluster.
> The leak happens when StreamTask.invoke() calls openAllOperators(), then FlinkKafkaConsumerBase.open()
calls partitionDiscoverer.discoverPartitions(), when kafkaConsumer.partitionsFor(topic)
in KafkaPartitionDiscoverer.getAllPartitionsForTopics() hit a *TopicAuthorizationException,* no
one catches this.
> Though StreamTask.open catches Exception and invoks the dispose() method of each operator,
which eventaully invoke FlinkKakfaConsumerBase.cancel(), however it does not close the kakfaConsumer
in partitionDiscoverer, not even invoke the partitionDiscoverer.wakeup(), because the discoveryLoopThread
was null.
> below is the code of FlinkKakfaConsumerBase.cancel() for your convenience
> public void cancel() {
>      // set ourselves as not running;
>      // this would let the main discovery loop escape as soon as possible
>      running = false;
>     if (discoveryLoopThread != null) {
>         if (partitionDiscoverer != null)
> {             // we cannot close the discoverer here, as it is error-prone to concurrent
access;             // only wakeup the discoverer, the discovery loop will clean itself
up after it escapes             partitionDiscoverer.wakeup();         }
>     // the discovery loop may currently be sleeping in-between
>      // consecutive discoveries; interrupt to shutdown faster
>      discoveryLoopThread.interrupt();
>      }
>     // abort the fetcher, if there is one
>      if (kafkaFetcher != null)
> {          kafkaFetcher.cancel();     }
> }

This message was sent by Atlassian JIRA

View raw message