storm-user mailing list archives

From Stig Rohde Døssing <s...@apache.org>
Subject Re: KafkaSpout reconsume all kafka messages after kafka rolling restart
Date Wed, 23 Aug 2017 11:41:17 GMT
I think you should add another Zookeeper; you generally want an odd number,
since Zookeeper requires a majority (floor(n/2) + 1) of nodes to be available
in order for the cluster to function. With 2 nodes your cluster will stop
serving requests if either node goes down.
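
To make the arithmetic concrete, here is a small sketch of it (mine, just for illustration; not taken from any Storm or Zookeeper code):

    import java.util.stream.IntStream;

    public class ZookeeperQuorumMath {

        // Majority needed for an ensemble of n nodes: floor(n/2) + 1.
        static int quorum(int nodes) {
            return nodes / 2 + 1; // integer division is floor(n/2)
        }

        // How many nodes can fail while the ensemble keeps serving requests.
        static int tolerableFailures(int nodes) {
            return nodes - quorum(nodes);
        }

        public static void main(String[] args) {
            IntStream.rangeClosed(2, 5).forEach(n ->
                System.out.printf("nodes=%d quorum=%d tolerable failures=%d%n",
                    n, quorum(n), tolerableFailures(n)));
            // 2 nodes tolerate 0 failures, 3 tolerate 1, 4 tolerate 1, 5 tolerate 2,
            // which is why an even-sized ensemble buys nothing over the odd size below it.
        }
    }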

I haven't been able to spot an issue with the spout that could cause the
behavior you are seeing. This is the code we use to determine which offsets to
commit:
https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L70
To me it doesn't look like it can return anything smaller than the current
committed offset. We initialize the committed offset here:
https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L170
based on the committed offset returned by Kafka and the first poll offset
strategy, which in your case should be UNCOMMITTED_EARLIEST.
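
For reference, here is a minimal sketch of how that strategy is chosen when building the spout. It is not your topology code: the broker list, topic name and component id are placeholders, and the builder methods are the storm-kafka-client 1.1.0 ones as far as I remember them, so check them against your version:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;

    import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;

    public class UsersTopology {
        public static void main(String[] args) {
            // UNCOMMITTED_EARLIEST (the default) means: start from the offset committed
            // for the consumer group if one exists, otherwise start from the earliest
            // available offset. It should never move an existing group backwards.
            KafkaSpoutConfig<String, String> spoutConfig =
                KafkaSpoutConfig.builder("kafka1:9092,kafka2:9092", "users") // placeholders
                    .setFirstPollOffsetStrategy(UNCOMMITTED_EARLIEST)
                    .build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafkaSpout", new KafkaSpout<>(spoutConfig), 1);
            // ... bolts and topology submission omitted.
        }
    }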


2017-08-22 9:03 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:

> I checked the __consumer_offsets topic, and here is an extract from it for
> the same consumer group, a specific topic (users) and a specific partition
> (15):
>
> [storm_kafka_topology,users,15]::[OffsetMetadata[8327,{topic-partition=users-15, offset=8327, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230031557,ExpirationTime 1503316431557]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230332504,ExpirationTime 1503316732504]
> [storm_kafka_topology,users,15]::[OffsetMetadata[6512,{topic-partition=users-15, offset=6512, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230748612,ExpirationTime 1503317148612]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8172,{topic-partition=users-15, offset=8172, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230791209,ExpirationTime 1503317191209]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8330,{topic-partition=users-15, offset=8330, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503230821337,ExpirationTime 1503317221337]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8333,{topic-partition=users-15, offset=8333, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231513311,ExpirationTime 1503317913311]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8338,{topic-partition=users-15, offset=8338, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231603513,ExpirationTime 1503318003513]
> [storm_kafka_topology,users,15]::[OffsetMetadata[8344,{topic-partition=users-15, offset=8344, numFails=0, thread='Thread-11-kafkaSpout-executor[4 4]'}],CommitTime 1503231693829,ExpirationTime 1503318093829]
>
> We can see here that the consumer was at offset 8330 at Sunday, August 20,
> 2017 11:53:51.557 AM and at offset 6512 some minutes later (the Kafka
> restart occurred at this time).
>
> On Tue, Aug 22, 2017 at 12:31 AM, Elyahou Ittah <elyahou.i@fiverr.com>
> wrote:
>
>> The topology works well and commits offsets for some time, and I also
>> restarted it and saw it start from the last committed offset. I only saw
>> the issue at the Kafka restart.
>>
>> I have two Zookeepers and they were not restarted.
>>
>>
>>
>> On Tue, Aug 22, 2017 at 12:24 AM, Stig Rohde Døssing <srdo@apache.org>
>> wrote:
>>
>>> I think the __consumer_offsets configuration looks fine, I just wanted
>>> to be sure there wasn't only one replica. How many Zookeepers do you have,
>>> and were they restarted as well?
>>>
>>> I would suspect that the spout isn't committing properly for some
>>> reason. The default behavior is to get the committed offset from Kafka when
>>> starting, and if it is present it is used. If it isn't there the spout
>>> starts over from the beginning of the partitions. You can check if the
>>> spout is committing by enabling debug logging for the classes in
>>> storm-kafka-client, to check logs like this one
>>> https://github.com/apache/storm/blob/v1.1.0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java#L118.
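
If you want to double-check what Kafka itself has stored for the group, independently of the spout, a plain consumer with the same group.id can read it back. A small sketch against the kafka-clients 0.11 API; the broker address is a placeholder, and the group, topic and partition are the ones discussed in this thread:

    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka1:9092");   // placeholder
            props.put("group.id", "storm_kafka_topology");   // the spout's consumer group
            props.put("enable.auto.commit", "false");        // read-only, never commit from here
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");

            TopicPartition tp = new TopicPartition("users", 15);
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.assign(Collections.singletonList(tp));
                // committed() returns null when no offset has been committed for the group.
                OffsetAndMetadata committed = consumer.committed(tp);
                System.out.println(tp + " committed offset: "
                        + (committed == null ? "none" : committed.offset()));
            }
        }
    }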
>>>
>>> 2017-08-21 22:20 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>>
>>>> Hi Stig,
>>>>
>>>> I don't have this kind of error normally. It just occurred at the
>>>> rolling restart of Kafka.
>>>>
>>>> Also the __consumer_offsets configuration is:
>>>> Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
>>>> Topic: __consumer_offsets Partition: 0 Leader: 0 Replicas: 2,1,0 Isr: 0,1
>>>> ...
>>>>
>>>> Could the fact that the replication factor is 3 while there are only
>>>> two brokers cause an issue?
>>>>
>>>>
>>>> On Mon, Aug 21, 2017 at 6:51 PM, Stig Rohde Døssing <srdo@apache.org>
>>>> wrote:
>>>>
>>>>> The spout will re-emit some messages if it fails to commit offsets to
>>>>> Kafka. Are these CommitFailedExceptions occurring in your logs normally?
>>>>>
>>>>> Also since the spout stores offsets in Kafka, you may want to check
>>>>> the replication factor on that topic by running `./kafka-topics.sh
>>>>> --zookeeper localhost:2181 --describe --topic __consumer_offsets` in one
>>>>> of your Kafka /bin directories.
>>>>>
>>>>> 2017-08-21 17:17 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>>>>
>>>>>> The config is the default one, I just set the bootstrap server.
>>>>>>
>>>>>> Kafka version is 0.11
>>>>>>
>>>>>> Storm-kafka-client is 1.1.0
>>>>>>
>>>>>> On Mon, Aug 21, 2017 at 5:48 PM, Stig Rohde Døssing <srdo@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Elyahou,
>>>>>>>
>>>>>>> Could you post your spout configuration, Kafka version and
>>>>>>> storm-kafka-client version? The logs imply that your spouts are not
>>>>>>> polling often enough.
>>>>>>>
>>>>>>> 2017-08-21 9:44 GMT+02:00 Elyahou Ittah <elyahou.i@fiverr.com>:
>>>>>>>
>>>>>>>> I noticed that the Storm Kafka spout re-consumes all Kafka messages
>>>>>>>> after a rolling restart of the Kafka cluster.
>>>>>>>>
>>>>>>>> This issue occurred only with the kafkaSpout consumer and not with my
>>>>>>>> other consumers (Ruby based, using the Kafka consumer API like
>>>>>>>> kafkaSpout).
>>>>>>>>
>>>>>>>> Attached are the logs of the spout.
>>>>>>>>
>>>>>>>> Do you know what can cause this kind of behavior?
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
