kafka-users mailing list archives

From Bruno Rassaerts <bruno.rassae...@novazone.be>
Subject Re: Stuck consumer with new consumer API in 0.9
Date Tue, 26 Jan 2016 09:15:53 GMT
We do not seek in `onPartitionsAssigned`.

In our test setup (we are evaluating Kafka for a new project) we put a constant load on one of
the topics.
A consumer group pulls messages from the different partitions of that topic.

At some point, poll() stops returning messages.
When this happens, we seek() to the current offset, and messages come in again.

It is a bit annoying, as we also seek when there are genuinely no new messages, but at least it
prevents the client from stalling.
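
A minimal sketch of that workaround, assuming the 0.9 consumer's poll(), assignment(), position() and seek() calls. So that it runs without a broker, it goes through a small hypothetical `OffsetConsumer` interface that mirrors those calls; `StallGuard`, `pollWithReseek` and the in-memory `FakeConsumer` are illustration names only and are not part of Kafka or of our actual code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the subset of KafkaConsumer used by the workaround.
// Partitions are plain ints here; the real client uses TopicPartition.
interface OffsetConsumer {
    List<String> poll(long timeoutMs);
    Set<Integer> assignment();
    long position(int partition);
    void seek(int partition, long offset);
}

final class StallGuard {
    // Polls once. If nothing came back, seeks every assigned partition to its
    // current position. The offsets do not change; the point is only to
    // trigger the seek, which in our tests gets the consumer going again.
    static List<String> pollWithReseek(OffsetConsumer consumer, long timeoutMs) {
        List<String> records = consumer.poll(timeoutMs);
        if (records.isEmpty()) {
            for (int partition : consumer.assignment()) {
                consumer.seek(partition, consumer.position(partition));
            }
        }
        return records;
    }
}

// In-memory fake that imitates the symptom: after stall() is called, poll()
// returns nothing until seek() is invoked.
final class FakeConsumer implements OffsetConsumer {
    private final Map<Integer, Long> positions = new HashMap<>();
    private boolean stalled;
    int seekCalls;

    FakeConsumer(int... partitions) {
        for (int p : partitions) {
            positions.put(p, 0L);
        }
    }

    void stall() {
        stalled = true;
    }

    @Override
    public List<String> poll(long timeoutMs) {
        List<String> out = new ArrayList<>();
        if (stalled) {
            return out;
        }
        // Hand out one fake record per partition and advance its position.
        for (Map.Entry<Integer, Long> e : positions.entrySet()) {
            out.add("p" + e.getKey() + "@" + e.getValue());
            e.setValue(e.getValue() + 1);
        }
        return out;
    }

    @Override
    public Set<Integer> assignment() {
        return new HashSet<>(positions.keySet());
    }

    @Override
    public long position(int partition) {
        return positions.get(partition);
    }

    @Override
    public void seek(int partition, long offset) {
        positions.put(partition, offset);
        stalled = false;
        seekCalls++;
    }
}
```

The downside mentioned above is visible here: `pollWithReseek` cannot tell a stalled consumer from an idle topic, so it also seeks whenever there is simply nothing new to read.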

Bruno


> On 25 Jan 2016, at 16:07, Han JU <ju.han.felix@gmail.com> wrote:
> 
> Hi Bruno,
> 
> Can you tell me a little bit more about that? A seek() in the
> `onPartitionsAssigned`?
> 
> Thanks.
> 
> 2016-01-25 10:51 GMT+01:00 Han JU <ju.han.felix@gmail.com>:
> 
>> Ok I'll create a JIRA issue on this.
>> 
>> Thanks!
>> 
>> 2016-01-23 21:47 GMT+01:00 Bruno Rassaerts <bruno.rassaerts@novazone.be>:
>> 
>>> +1 here
>>> 
>>> As a workaround we seek to the current offset, which resets the
>>> client's internal state, and everything continues.
>>> 
>>> Regards,
>>> Bruno Rassaerts | Freelance Java Developer
>>> 
>>> Novazone, Edingsesteenweg 302, B-1755 Gooik, Belgium
>>> T: +32(0)54/26.02.03 - M:+32(0)477/39.01.15
>>> bruno.rassaerts@novazone.be -www.novazone.be
>>> 
>>>> On 23 Jan 2016, at 17:52, Ismael Juma <ismael@juma.me.uk> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> Can you please file an issue in JIRA so that we make sure this is
>>>> investigated?
>>>> 
>>>> Ismael
>>>> 
>>>>> On Fri, Jan 22, 2016 at 3:13 PM, Han JU <ju.han.felix@gmail.com> wrote:
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> I'm prototyping with the new consumer API of Kafka 0.9 and I'm
>>>>> particularly interested in the `ConsumerRebalanceListener`.
>>>>> 
>>>>> My test setup is like the following:
>>>>> - 5M messages pre-loaded in one node kafka 0.9
>>>>> - 12 partitions, auto offset commit set to false
>>>>> - in `onPartitionsRevoked`, commit offset and flush the local state
>>>>> 
>>>>> The test run is like the following:
>>>>> - launch one process with 2 consumers and let it consume for a while
>>>>> - launch another process with 2 consumers, which triggers a rebalancing,
>>>>> and let both processes run until all messages are consumed
>>>>> 
>>>>> The code is here: https://gist.github.com/darkjh/fe1e5a5387bf13b4d4dd
>>>>> 
>>>>> So at first, the 2 consumers of the first process each got 6 partitions.
>>>>> After the rebalancing, each consumer got 3 partitions. This is confirmed
>>>>> by logging inside the `onPartitionsAssigned` callback.
>>>>> 
>>>>> But after the rebalancing, one of the 2 consumers of the first process
>>>>> stops receiving messages, even though it has partitions assigned:
>>>>> 
>>>>> balance-1 pulled 7237 msgs ...
>>>>> balance-0 pulled 7263 msgs ...
>>>>> 2016-01-22 15:50:37,533 [INFO] [pool-1-thread-2]
>>>>> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the
>>>>> group is rebalancing, try to re-join group.
>>>>> balance-1 flush @ 536637
>>>>> balance-1 committed offset for List(balance-11, balance-10, balance-9,
>>>>> balance-8, balance-7, balance-6)
>>>>> 2016-01-22 15:50:37,575 [INFO] [pool-1-thread-1]
>>>>> o.a.k.c.c.i.AbstractCoordinator - Attempt to heart beat failed since the
>>>>> group is rebalancing, try to re-join group.
>>>>> balance-0 flush @ 543845
>>>>> balance-0 committed offset for List(balance-5, balance-4, balance-3,
>>>>> balance-2, balance-1, balance-0)
>>>>> balance-0 got assigned List(balance-5, balance-4, balance-3)
>>>>> balance-1 got assigned List(balance-11, balance-10, balance-9)
>>>>> balance-1 pulled 3625 msgs ...
>>>>> balance-0 pulled 3621 msgs ...
>>>>> balance-0 pulled 3631 msgs ...
>>>>> balance-0 pulled 3631 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3643 msgs ...
>>>>> balance-0 pulled 3643 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3622 msgs ...
>>>>> balance-0 pulled 3632 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3637 msgs ...
>>>>> balance-0 pulled 3641 msgs ...
>>>>> balance-0 pulled 3640 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> balance-0 pulled 3632 msgs ...
>>>>> balance-0 pulled 3630 msgs ...
>>>>> balance-1 pulled 0 msgs ...
>>>>> ......
>>>>> 
>>>>> `balance-0` and `balance-1` are the names of the consumer threads. After
>>>>> the rebalancing, thread `balance-1` continues to poll but no messages
>>>>> arrive, even though it was assigned 3 partitions in the
>>>>> rebalancing.
>>>>> 
>>>>> Eventually the other 3 consumers pull all of their partitions' messages,
>>>>> and the situation looks like this:
>>>>> 
>>>>> GROUP, TOPIC, PARTITION, CURRENT OFFSET, LOG END OFFSET, LAG, OWNER
>>>>> balance-test, balance, 9, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 10, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 11, 417467, 417467, 0, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 6, 180269, 417467, 237198, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 7, 180036, 417468, 237432, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 8, 180197, 417467, 237270, consumer-2_/127.0.0.1
>>>>> balance-test, balance, 3, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 4, 417468, 417468, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 5, 417468, 417468, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 0, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 1, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> balance-test, balance, 2, 417467, 417467, 0, consumer-1_/127.0.0.1
>>>>> 
>>>>> As you can see, partitions [6, 7, 8] still have messages, but the
>>>>> consumer cannot pull them after the rebalancing.
>>>>> 
>>>>> I've tried the 0.9.0.0 release, trunk and the 0.9.0 branch, for both
>>>>> server/broker and client.
>>>>> 
>>>>> I hope the code is clear enough to illustrate/reproduce the problem. It's
>>>>> quite a surprise for me because this is the main feature of the new
>>>>> consumer API, but it does not seem to work properly.
>>>>> Feel free to talk to me for any details.
>>>>> --
>>>>> *JU Han*
>>>>> 
>>>>> Software Engineer @ Teads.tv
>>>>> 
>>>>> +33 0619608888
>>>>> 
>>> 
>> 
> 

