storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hugo Da Cruz Louro <hlo...@hortonworks.com>
Subject Re: Kafka spout stops commiting offsets on some partitions
Date Thu, 16 Feb 2017 17:23:11 GMT
Hi,

Most likely this is happening because some messages failed and/or got acked out of order.

For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is message (with
offset 4) that failed, the Spout will only commit offset 3. Until the message with offset
4 is acked, or reaches max number of retrials (which is configurable but by default is forever),
the messages with offsets 5,6,7,… will not get committed despite having been acked. That
is because you cannot do kafkaConsumer.commitSync(new TopicPartion(test_topic,5)) if the message
with offset 4 has not been acked or discarded by reaching the max number of retrials. Until
the spout moves on from message with offset 4, the lag will increase when new messages come
in.

You can try enabling the log level to ALL for org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff
to see which messages are getting retried. You can also set log level to DEBUG or ALL org.apache.storm.kafka.spout.KafkaSpout
to see exactly which offsets/records are being processed. However, it will print a lot of
messages, and may slow down processing considerably.

You can also set the maxNumberOfRetires to a small number (e.g. 3-5) to see if that solves
this situation.

Hugo

> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <f1sherox@gmail.com> wrote:
> 
> Today in Storm UI I saw this Kafka Spouts Lag:
> Id	                Topic	Partition	Latest Offset	Spout Committed Offset	Lag
> Kafka Spout	test_topic	0	        5591087	        5562814	                        28273
> Kafka Spout	test_topic	1	        2803256	        2789090	                        14166
> Kafka Spout	test_topic	2	        2801927	        2787767	                        14160
> Kafka Spout	test_topic	3	        2800627	        2800626	                        1
> Kafka Spout	test_topic	4	        2799391	        2785238	                        14153
> Kafka Spout	test_topic	5	        2798126	        2798125	                        1
> Kafka Spout	test_topic	6	        2796874	        2782726	                        14148
> Kafka Spout	test_topic	7	        2795669	        2781528	                        14141
> Kafka Spout	test_topic	8	        2794419	        2780280	                        14139
> Kafka Spout	test_topic	9	        2793255	        2793254	                        1
> Kafka Spout	test_topic	10	        2792109	        2777978	                        14131
> Kafka Spout	test_topic	11	        2790939	        2776817	                        14122
> Kafka Spout	test_topic	12	        2789783	        2775665	                        14118
> Kafka Spout	test_topic	13	        2788651	        2774539	                        14112
> Kafka Spout	test_topic	14	        2787521	        2773412	                        14109
> 
> 
> There was no new messages in that topic for a while, so I expected, that my topology
would process all messages. But lag shows me that there's some uncommitted messages in most
of topics. Topology stop working and didn't process any messages for few hours.
> 
> In logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found [2777849].
OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10,
offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850, numFails=0},

> ........................................
>  {topic-partition=test_topic-10, offset=2792107, numFails=0}, {topic-partition=test_topic-10,
offset=2792108, numFails=0}]}
> 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready to commit. OffsetEntry{topic-partition=test_topic-10,
fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, offset=2777849,
numFails=0}, 
> .......................................
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
> 
> 
> So, I assume, messages, that showed as uncommitted, are actually processed by topology
and acked. After I start sending new messages to Kafka topic topology start working, but spout
lag increasing.
> Why spout could stop committing to Kafka?

Mime
View raw message