flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5048) Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation behavior
Date Fri, 11 Nov 2016 14:38:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15657219#comment-15657219 ]

ASF GitHub Bot commented on FLINK-5048:
---------------------------------------

GitHub user StephanEwen opened a pull request:

    https://github.com/apache/flink/pull/2789

    [FLINK-5048] [Kafka Consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations

    **NOTE:** Only the second commit is relevant; the first commit only prepares the ground by cleaning up some code in the Flink Kafka Consumers for 0.9 and 0.10.
    
    ## Rationale
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate thread that operated Kafka's consumer. That thread was shielded from interrupts, because the Kafka Consumer has not been handling thread interrupts well.
    
    Since that thread was also the thread that emitted records, it would block in the network stack (backpressure) or in chained operators. The latter case led to situations where cancellation became very slow unless that thread was interrupted (which it could not be).
    
    ## Core changes
    
    This commit changes the thread model:
    
      - A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes that batch into a blocking-queue-like handover (a sketch follows below).
      - The main thread of the task pulls the record batches from that handover and emits the records.
    
    The "batches" are the fetch batches from Kafka's consumer; there is no additional buffering that would impact latency.
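
    To make the intended split concrete, here is a minimal, self-contained sketch of the described model (not the code from this PR): a plain size-one `ArrayBlockingQueue` stands in for the handover described below, and the topic name, consumer properties, and `emitRecord()` are placeholders rather than Flink APIs.

```java
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Sketch of the two-thread model: a dedicated thread talks to the KafkaConsumer,
 * the task's main thread emits the records.
 */
public class TwoThreadConsumerSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "sketch");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Size-one queue: at most one fetched batch is ever buffered between the threads.
        BlockingQueue<ConsumerRecords<byte[], byte[]>> handover = new ArrayBlockingQueue<>(1);

        // Spawned consumer thread: the only thread that ever touches the KafkaConsumer.
        Thread consumerThread = new Thread(() -> {
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("my-topic"));
                while (true) {
                    ConsumerRecords<byte[], byte[]> batch = consumer.poll(100);
                    if (batch.count() > 0) {
                        // Blocks until the main thread has taken the previous batch.
                        handover.put(batch);
                    }
                }
            } catch (InterruptedException e) {
                // Shutdown path of this sketch; the PR avoids interrupts via the handover instead.
                Thread.currentThread().interrupt();
            }
        }, "kafka-consumer-thread");
        consumerThread.start();

        // Main task thread: pulls batches and emits records.
        // Backpressure now blocks here, not inside the Kafka consumer thread.
        while (true) {
            for (ConsumerRecord<byte[], byte[]> record : handover.take()) {
                emitRecord(record);
            }
        }
    }

    // Placeholder for handing a record into the task's operator chain.
    private static void emitRecord(ConsumerRecord<byte[], byte[]> record) {
        System.out.println(record.topic() + "-" + record.partition() + "@" + record.offset());
    }
}
```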
    
    The thread-to-thread handover of the record batches is handled by a class `Handover`, which is essentially a size-one blocking queue with the additional ability to gracefully wake up the consumer thread when the main thread decides to shut down. That way, no interrupts are needed on the KafkaConsumerThread.
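
    For illustration only, here is a sketch of those semantics, assuming a single slot guarded by a lock; it is not the `Handover` class from this PR, and the method names are made up for the sketch.

```java
import org.apache.kafka.clients.consumer.ConsumerRecords;

/**
 * Sketch of a handover: a size-one exchange point whose close() wakes up a
 * thread blocked in produce() or pollNext(), so neither side ever needs to
 * be interrupted to shut down.
 */
public final class HandoverSketch {

    /** Signals that the handover was closed while a thread was blocked on it. */
    public static final class ClosedException extends Exception {
        private static final long serialVersionUID = 1L;
    }

    private final Object lock = new Object();
    private ConsumerRecords<byte[], byte[]> next;
    private boolean closed;

    /** Called by the consumer thread; blocks while the previous batch has not been taken yet. */
    public void produce(ConsumerRecords<byte[], byte[]> batch)
            throws InterruptedException, ClosedException {
        synchronized (lock) {
            while (next != null && !closed) {
                lock.wait();
            }
            if (closed) {
                throw new ClosedException();
            }
            next = batch;
            lock.notifyAll();
        }
    }

    /** Called by the task's main thread; blocks until the next batch is available. */
    public ConsumerRecords<byte[], byte[]> pollNext()
            throws InterruptedException, ClosedException {
        synchronized (lock) {
            while (next == null && !closed) {
                lock.wait();
            }
            if (next == null) {
                throw new ClosedException();
            }
            ConsumerRecords<byte[], byte[]> batch = next;
            next = null;
            lock.notifyAll();
            return batch;
        }
    }

    /** Called on shutdown; gracefully wakes up whichever side is currently blocked. */
    public void close() {
        synchronized (lock) {
            closed = true;
            next = null;
            lock.notifyAll();
        }
    }
}
```

    The `InterruptedException` remains on the signatures only because `Object.wait()` declares it; in the normal shutdown path the threads leave via `ClosedException`, not via interrupts.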
    
    This also pulls the KafkaConsumerThread out of the fetcher class for some code cleanup (scope simplifications).
    The method calls that broke compatibility between Kafka 0.9 and 0.10 are handled via a "call bridge", which means fewer code changes in the fetchers for each method that needs to be adapted.
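
    A sketch of what such a call bridge can look like (not the bridge class from this PR; the assign() signature change between the 0.9 and 0.10 clients, List vs. Collection, is used as an assumed example, since the PR text does not list the bridged methods):

```java
import java.util.List;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

/**
 * Sketch of a call bridge: KafkaConsumer calls whose signatures differ between
 * client versions go through this one class. The shared fetcher code only ever
 * calls the bridge; each connector module (0.9, 0.10) compiles its own bridge
 * against its own client, so the fetcher itself needs no per-version changes.
 */
public class ConsumerCallBridgeSketch {

    /**
     * Example: assign() takes a List in the 0.9 client and a Collection in the
     * 0.10 client. The call below is source-compatible with both, but not
     * binary-compatible, which is why the compiled bridge is version-specific.
     */
    public void assignPartitions(KafkaConsumer<?, ?> consumer, List<TopicPartition> partitions) {
        consumer.assign(partitions);
    }
}
```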
    
    ## Tests
    
    This adjusts some tests, but it removes the "short retention" IT cases for the Kafka 0.9 and 0.10 consumers.
    While that type of test makes sense for the 0.8 consumer, for the newer consumers those tests exercise purely Kafka code and no Flink code.
    
    In addition, they are virtually impossible to run stably and fast, because they rely on an artificial slowdown in the KafkaConsumer threads. That type of unhealthy interference is exactly what this patch prevents ;-)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/StephanEwen/incubator-flink kafka_consumer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2789.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2789
    
----
commit f6cd417cdf37213f88c62e9342206e249402eac6
Author: Stephan Ewen <sewen@apache.org>
Date:   2016-11-09T16:58:54Z

    [hotfix] [Kafka Consumer] Clean up some code confusion and style in the Fetchers for Kafka
0.9/0.10

commit 9a0786508b9a13cd986de593c6bdb2ecdb1737a8
Author: Stephan Ewen <sewen@apache.org>
Date:   2016-11-10T10:13:43Z

    [FLINK-5048] [kafka consumer] Change thread model of FlinkKafkaConsumer to better handle shutdown/interrupt situations
    
    Prior to this commit, the FlinkKafkaConsumer (0.9 / 0.10) spawned a separate thread that operated Kafka's consumer.
    That thread was shielded from interrupts, because the Kafka Consumer has not been handling thread interrupts well.
    Since that thread was also the thread that emitted records, it would block in the network stack (backpressure) or in chained operators.
    The latter case led to situations where cancellation became very slow unless that thread was interrupted (which it could not be).
    
    This commit changes the thread model:
      - A spawned consumer thread polls a batch of records from the KafkaConsumer and pushes
the
        batch of records into a blocking queue (size one)
      - The main thread of the task will pull the record batches from the blocking queue and
        emit the records.

----


> Kafka Consumer (0.9/0.10) threading model leads to problematic cancellation behavior
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-5048
>                 URL: https://issues.apache.org/jira/browse/FLINK-5048
>             Project: Flink
>          Issue Type: Bug
>          Components: Kafka Connector
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0
>
>
> The {{FlinkKafkaConsumer}} (0.9 / 0.10) spawns a separate thread that operates the KafkaConsumer. That thread is shielded from interrupts, because the Kafka Consumer has not been handling thread interrupts well.
> Since that thread is also the thread that emits records, it may block in the network stack (backpressure) or in chained operators. The latter case leads to situations where cancellation gets very slow unless that thread is interrupted (which it cannot be).
> I propose to change the thread model as follows:
>   - A spawned consumer thread pulls from the KafkaConsumer and pushes its pulled batch of records into a blocking queue (size one)
>   - The main thread of the task will pull the record batches from the blocking queue and emit the records.
> This actually allows for some additional I/O overlap while limiting the additional memory consumption: only two batches are ever held, one being fetched and one being emitted.



