flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Luka Jurukovski (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10195) RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
Date Fri, 24 Aug 2018 21:22:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592182#comment-16592182

Luka Jurukovski commented on FLINK-10195:

For what I can tell no. Although this has been very much a crash course in RabbitMQ for me.
Looking at forums it looks like the prefetch.count is the way that this is handled normally.
Basically the consumer can tell RabbitMQ how many unacked messages to allow before stopping.
Ie if the prefetch.count is set to 10,000 that is how many messages rabbitmq will allow before
it needs acknowledgement, at which point it will send data until it hits this max.

I would imagine that Flink would not want to use this mechanism due to the fact that it doesn't
actually "backpressure" with how Checkpointing is tied to Acking. One would have to do a throughput
calculation and hope that there isn't any variance in that number that results in Flink waiting
on the next checkpoint. Additionally since sync checkpointing is a feature there is no guarantees
that checkpointing will happen at a regular interval.

Under the covers the Queueing consumer is using LinkedBlockingQueue and uses the "add" method
to append to the queue.

I tried changing it to use ArrayBlockingQueue with a set capacity and the blocking "put"
method, however this results in another problem with RabbitMQ. Basically this results in RabbitMQ
sometimes terminating the connection to Flink when Flink doesn't dequeue from the queue fast
enough (noticing this usually happens only when sync checkpointing is on and it there is long
running checkpoints). According to some of the forums this is due to Rabbit having some sort
of timeout with regards to how long it is willing to wait when writing to a clients buffer.

I have some ugly code that I am testing where I turn off the consumer when the buffer is full,
and a monitoring thread that turns it back on when it is below a certain capacity. Don't know
if this methodology will cause any other issues, and am testing more. I might be able to get
rid of the monitoring thread but I'll look into that when I proved out this way of doing things.

Welcoming any additional thoughts or comments here. Sorry for the wall of text

> RabbitMQ Source With Checkpointing Doesn't Backpressure Correctly
> -----------------------------------------------------------------
>                 Key: FLINK-10195
>                 URL: https://issues.apache.org/jira/browse/FLINK-10195
>             Project: Flink
>          Issue Type: Bug
>          Components: RabbitMQ Connector
>    Affects Versions: 1.4.0, 1.5.0, 1.5.1, 1.6.0
>            Reporter: Luka Jurukovski
>            Priority: Major
> The connection between the RabbitMQ server and the client does not appropriately back
pressure when auto acking is disabled. This becomes very problematic when a downstream process
throttles the data processing to slower then RabbitMQ sends the data to the client.
> The difference in records ends up being stored in the flink's heap space, which grows
indefinitely (or technically to "Integer Max" Deliveries). Looking at RabbitMQ's metrics the
number of unacked messages looks like steadily rising saw tooth shape.
> Upon further invesitgation it looks like this is due to how the QueueingConsumer works,
messages are added to the BlockingQueue faster then they are being removed and processed,
resulting in the previously described behavior.
> This may be intended behavior, however this isn't explicitly obvious in the documentation
or any of the examples I have seen.

This message was sent by Atlassian JIRA

View raw message