storm-user mailing list archives

From Elyahou Ittah <elyaho...@fiverr.com>
Subject Re: KafkaSpout reconsume all kafka messages after kafka rolling restart
Date Tue, 22 Aug 2017 07:03:13 GMT
I checked the __consumer_offsets topic and here is an extraction from this
log for the same consumer group, a specific topic (users) and specific
partition (15):

[storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
[storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
[storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
[storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
[storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
[storm_kafka_topology,users,15]::[OffsetMetadata[8333,{topic-partition=users-15, offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
[storm_kafka_topology,users,15]::[OffsetMetadata[8338,{topic-partition=users-15, offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
[storm_kafka_topology,users,15]::[OffsetMetadata[8344,{topic-partition=users-15, offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[44]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]

We can see here that the consumer was at offset 8330 at Sunday, August 20, 2017 11:53:51.557 AM, and at offset 6512 some minutes later (the Kafka restart occurred at this time).

On Tue, Aug 22, 2017 at 12:31 AM, Elyahou Ittah <elyahou.i@fiverr.com>
wrote:

> The topology is working well and committing offsets for some time, and I
> also restarted it and saw it start from the last committed offset. I saw
> the issue only at the Kafka restart.
>
> I have two zookeeper and they were not restarted.
>
>
>
> On Tue, Aug 22, 2017 at 12:24 AM, Stig Rohde Døssing <srdo@apache.org>
> wrote:
>
>> I think the __consumer_offsets configuration looks fine, I just wanted to
>> be sure there wasn't only one replica. How many Zookeepers do you have, and
>> were they restarted as well?
>>
>> I would suspect that the spout isn't committing properly for some reason.
>> The default behavior is to get the committed offset from Kafka when
>> starting, and if it is present it is used. If it isn't there the spout
>> starts over from the beginning of the partitions. You can check if the
>> spout is committing by enabling debug logging for the classes in
>> storm-kafka-client, to check logs like this one
>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L118.
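In case it helps, enabling that debug logging in the worker's log4j2 configuration might look roughly like this. This is a sketch, not a definitive config: the logger package comes from the OffsetManager path above, but the appender name ("A1") and file location depend on your Storm installation.

```xml
<!-- Fragment for the worker log4j2 config (e.g. log4j2/worker.xml); adjust to your setup. -->
<Loggers>
    <!-- DEBUG for the storm-kafka-client spout classes, including OffsetManager commit logs. -->
    <Logger name="org.apache.storm.kafka.spout" level="debug" additivity="false">
        <AppenderRef ref="A1"/>
    </Logger>
</Loggers>
```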
>>
>> 2017-08-21 22:20 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>
>>> Hi Stig,
>>>
>>> I don't have these kinds of errors normally. They just occurred at the
>>> rolling restart of Kafka.
>>>
>>> Also the __consumer_offsets configuration is:
>>> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3
>>> Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
>>> Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1
>>> ...
>>>
>>> Could the fact that the replication factor is 3 even though there are
>>> only two brokers cause an issue?
>>>
>>>
>>> On Mon, Aug 21, 2017 at 6:51 PM, Stig Rohde Døssing <srdo@apache.org>
>>> wrote:
>>>
>>>> The spout will reemit some messages if it fails to commit offsets to
>>>> Kafka. Are these CommitFailedExceptions occurring in your logs normally?
>>>>
>>>> Also since the spout stores offsets in Kafka, you may want to check the
>>>> replication factor on that topic by running `./kafka-topics.sh --zookeeper
>>>> localhost:2181 --describe --topic __consumer_offsets` in one of your Kafka
>>>> /bin directories.
>>>>
>>>> 2017-08-21 17:17 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>>>
>>>>> The config is the default one, I just set the bootstrap server.
>>>>>
>>>>> Kafka version is 0.11
>>>>>
>>>>> Storm-kafka-client is 1.1.0
>>>>>
>>>>> On Mon, Aug 21, 2017 at 5:48 PM, Stig Rohde Døssing <srdo@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Hi Elyahou,
>>>>>>
>>>>>> Could you post your spout configuration, Kafka version and
>>>>>> storm-kafka-client version? The logs imply that your spouts are not
>>>>>> polling often enough.
>>>>>>
>>>>>> 2017-08-21 9:44 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>>>>>
>>>>>>> I noticed that the storm Kafka spout reconsumes all Kafka messages
>>>>>>> after a rolling restart of the Kafka cluster.
>>>>>>>
>>>>>>> This issue occurred only with the kafkaSpout consumer and not for my
>>>>>>> other consumers (Ruby based, using the Kafka consumer API like
>>>>>>> kafkaSpout).
>>>>>>>
>>>>>>> Attached are logs of the spout.
>>>>>>>
>>>>>>> Do you know what can cause this kind of behavior?
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
