storm-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hugo Da Cruz Louro <hlo...@hortonworks.com>
Subject Re: Kafka spout stops commiting offsets on some partitions
Date Thu, 16 Feb 2017 21:34:01 GMT
Which version of Storm are you using ?

On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko <f1sherox@gmail.com<mailto:f1sherox@gmail.com>>
wrote:

Thanks for reply Hugo.
I'll double check log tomorrow looking for KafkaSpoutRetryExponentialBackoff calls.

I just noticed, that in log I have there's strange thing. First message is "Unexpected offset
found [2777849]". It's strange because if you look on partition 10 commited offset, it is
2777978 which is a little bit higher then offset found. The next message in log was "No offsets
ready to commit."

So, after checking 2777849 offset it immediately stoped seeking new offset to commit.

On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro <hlouro@hortonworks.com<mailto:hlouro@hortonworks.com>>
wrote:
Hi,

Most likely this is happening because some messages failed and/or got acked out of order.

For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is message (with
offset 4) that failed, the Spout will only commit offset 3. Until the message with offset
4 is acked, or reaches max number of retrials (which is configurable but by default is forever),
the messages with offsets 5,6,7,… will not get committed despite having been acked. That
is because you cannot do kafkaConsumer.commitSync(new TopicPartion(test_topic,5)) if the message
with offset 4 has not been acked or discarded by reaching the max number of retrials. Until
the spout moves on from message with offset 4, the lag will increase when new messages come
in.

You can try enabling the log level to ALL for org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff
to see which messages are getting retried. You can also set log level to DEBUG or ALL org.apache.storm.kafka.spout.KafkaSpout
to see exactly which offsets/records are being processed. However, it will print a lot of
messages, and may slow down processing considerably.

You can also set the maxNumberOfRetires to a small number (e.g. 3-5) to see if that solves
this situation.

Hugo

> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <f1sherox@gmail.com<mailto:f1sherox@gmail.com>>
wrote:
>
> Today in Storm UI I saw this Kafka Spouts Lag:
> Id                    Topic   Partition       Latest Offset   Spout Committed Offset
 Lag
> Kafka Spout   test_topic      0               5591087         5562814               
         28273
> Kafka Spout   test_topic      1               2803256         2789090               
         14166
> Kafka Spout   test_topic      2               2801927         2787767               
         14160
> Kafka Spout   test_topic      3               2800627         2800626               
         1
> Kafka Spout   test_topic      4               2799391         2785238               
         14153
> Kafka Spout   test_topic      5               2798126         2798125               
         1
> Kafka Spout   test_topic      6               2796874         2782726               
         14148
> Kafka Spout   test_topic      7               2795669         2781528               
         14141
> Kafka Spout   test_topic      8               2794419         2780280               
         14139
> Kafka Spout   test_topic      9               2793255         2793254               
         1
> Kafka Spout   test_topic      10              2792109         2777978               
         14131
> Kafka Spout   test_topic      11              2790939         2776817               
         14122
> Kafka Spout   test_topic      12              2789783         2775665               
         14118
> Kafka Spout   test_topic      13              2788651         2774539               
         14112
> Kafka Spout   test_topic      14              2787521         2773412               
         14109
>
>
> There was no new messages in that topic for a while, so I expected, that my topology
would process all messages. But lag shows me that there's some uncommitted messages in most
of topics. Topology stop working and didn't process any messages for few hours.
>
> In logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found [2777849].
OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10,
offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850, numFails=0},
> ........................................
>  {topic-partition=test_topic-10, offset=2792107, numFails=0}, {topic-partition=test_topic-10,
offset=2792108, numFails=0}]}
> 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready to commit. OffsetEntry{topic-partition=test_topic-10,
fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, offset=2777849,
numFails=0},
> .......................................
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
>
>
> So, I assume, messages, that showed as uncommitted, are actually processed by topology
and acked. After I start sending new messages to Kafka topic topology start working, but spout
lag increasing.
> Why spout could stop committing to Kafka?



Mime
View raw message