storm-user mailing list archives

From Hugo Da Cruz Louro <>
Subject Re: Kafka spout stops committing offsets on some partitions
Date Tue, 21 Feb 2017 17:54:28 GMT
Hi Igor,

In normal operation this shouldn’t happen, but it could have happened due to a consumer rebalance.
We have improved the code recently. From this info alone it’s hard to tell what really happened.
I can suggest a couple of things:

1. Try reproducing the issue again
2. Set the log level to ALL (not only DEBUG) to see which tuples get committed and which don’t
(this may print a lot of messages)
3. If you have the chance, test a more up-to-date version of the spout. There have been a
few bug fixes that could handle this case (if it’s indeed an issue)


On Feb 16, 2017, at 12:59 PM, Igor Kuzmenko <<>> wrote:

Thanks for the reply, Hugo.
I'll double-check the log tomorrow for KafkaSpoutRetryExponentialBackoff calls.

I just noticed something strange in my log. The first message is "Unexpected offset
found [2777849]". That's odd, because the committed offset for partition 10 is
2777978, which is a little higher than the offset found. The next message in the log was "No offsets
ready to commit."

So, after checking offset 2777849, the spout immediately stopped looking for further offsets to commit.
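The observation above can be reproduced with a small, hypothetical model. This is not the spout's actual code. It only assumes that the spout walks the sorted acked offsets and stops at the first one that is not exactly committedOffset + 1, which would match the "Unexpected offset found" / "No offsets ready to commit." sequence in the log. It uses the partition 10 numbers from the log and assumes the elided middle of the ackedMsgs list is contiguous. The class and method names are made up for illustration.

```java
import java.util.TreeSet;

/**
 * Hypothetical model of the logged symptom (not the spout's real code): the acked set for
 * test_topic-10 still holds offsets at or below committedOffset, so a loop that stops at the
 * first unexpected offset never reaches the contiguous run above the commit point.
 */
class StaleAckedOffsetSketch {

    /** Stops at the first acked offset that is not exactly (current commit point + 1). */
    static long nextCommitStopping(TreeSet<Long> acked, long committed) {
        long next = committed;
        for (long offset : acked) {
            if (offset == next + 1) {
                next = offset;
            } else {
                break; // offset <= next ("unexpected") or a gap: give up for this round
            }
        }
        return next;
    }

    /** Ignores acked offsets already covered by the commit point, stops only at a real gap. */
    static long nextCommitSkipping(TreeSet<Long> acked, long committed) {
        long next = committed;
        for (long offset : acked) {
            if (offset <= next) {
                continue; // stale entry, already committed
            }
            if (offset != next + 1) {
                break; // genuine gap: wait for the missing offset
            }
            next = offset;
        }
        return next;
    }

    public static void main(String[] args) {
        // Assumption: acked offsets 2777849..2792108 are contiguous (the log elides the middle).
        TreeSet<Long> acked = new TreeSet<>();
        for (long o = 2777849L; o <= 2792108L; o++) {
            acked.add(o);
        }
        long committed = 2777978L; // committedOffset from the log
        System.out.println(nextCommitStopping(acked, committed)); // 2777978: no progress
        System.out.println(nextCommitSkipping(acked, committed)); // 2792108
    }
}
```

In this model, the first acked offset (2777849) is already below the commit point, so the stopping loop breaks immediately. A variant that ignores such stale entries would not stall on this input.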

On Thu, Feb 16, 2017 at 8:23 PM, Hugo Da Cruz Louro <<>> wrote:

Most likely this is happening because some messages failed and/or got acked out of order.

For example, if you process messages with offsets 1,2,3,X,5,6,7,… where X is the message (with
offset 4) that failed, the spout will only commit offset 3. Until the message with offset
4 is acked, or reaches the maximum number of retries (which is configurable but unlimited by default),
the messages with offsets 5,6,7,… will not get committed despite having been acked. That
is because Kafka stores a single committed position per partition: calling kafkaConsumer.commitSync()
with offset 5 for that partition of test_topic would mark everything before it, including offset 4,
as consumed, even though that message has not been acked or discarded by reaching the maximum
number of retries. Until the spout moves on from the message with offset 4, the lag will keep
increasing as new messages arrive.
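The commit rule described above can be sketched as a per-partition tracker that only advances over a contiguous run of acked offsets. This is a simplified, hypothetical model rather than the spout's code. ContiguousCommitTracker is a made-up name, and it follows the numbering in the example (offset 3 is committed after 1, 2, 3 are acked).

```java
import java.util.TreeSet;

/**
 * Hypothetical per-partition model: acked offsets are buffered, and the commit point only
 * moves forward while the next offset (commit point + 1) has been acked.
 */
class ContiguousCommitTracker {
    private final TreeSet<Long> acked = new TreeSet<>();
    private long committed; // last offset considered committed, as in the example above

    ContiguousCommitTracker(long lastCommitted) {
        this.committed = lastCommitted;
    }

    void ack(long offset) {
        if (offset > committed) { // ignore offsets already covered by the commit point
            acked.add(offset);
        }
    }

    /** Advances the commit point over contiguous acked offsets and returns it. */
    long commit() {
        while (!acked.isEmpty() && acked.first() == committed + 1) {
            committed = acked.pollFirst();
        }
        return committed;
    }

    public static void main(String[] args) {
        ContiguousCommitTracker tracker = new ContiguousCommitTracker(0);
        for (long offset : new long[] {1, 2, 3, 5, 6, 7}) { // offset 4 failed, not acked
            tracker.ack(offset);
        }
        System.out.println(tracker.commit()); // 3: blocked by the missing offset 4
        tracker.ack(4);                       // the retried message is finally acked
        System.out.println(tracker.commit()); // 7
    }
}
```

Offsets 5, 6 and 7 stay buffered until offset 4 is acked. This is why the lag keeps growing while one message is still being retried.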

You can try setting the log level to ALL for org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff
to see which messages are being retried. You can also set the log level to DEBUG or ALL for org.apache.storm.kafka.spout.KafkaSpout
to see exactly which offsets/records are being processed. However, this will print a lot of
messages and may slow down processing considerably.

You can also set the maximum number of retries to a small number (e.g. 3-5) to see if that resolves
the situation.
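Here is a rough sketch of why a finite retry limit helps, using the same simplified model. Once offset 4 has failed more than maxRetries times, it is given up on, and the commit point can move past it. RetryLimitSketch and its methods are hypothetical. The real spout schedules retries through KafkaSpoutRetryExponentialBackoff, whose backoff timing is not modeled here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

/**
 * Hypothetical model, not the spout's code: a failed offset is retried until it has failed
 * more than maxRetries times; after that it is dropped and no longer blocks the commit.
 */
class RetryLimitSketch {
    private final int maxRetries;
    private final TreeSet<Long> done = new TreeSet<>();            // acked or given up on
    private final Map<Long, Integer> failCounts = new HashMap<>(); // failures per offset
    private long committed;                                         // last offset considered committed

    RetryLimitSketch(long lastCommitted, int maxRetries) {
        this.committed = lastCommitted;
        this.maxRetries = maxRetries;
    }

    private void markDone(long offset) {
        failCounts.remove(offset);
        if (offset > committed) {
            done.add(offset);
        }
    }

    void ack(long offset) {
        markDone(offset);
    }

    /** Records a failure; returns true if the offset will be retried, false if it is dropped. */
    boolean fail(long offset) {
        int fails = failCounts.merge(offset, 1, Integer::sum);
        if (fails > maxRetries) {
            markDone(offset); // dropped: treated like an ack for commit purposes
            return false;
        }
        return true;
    }

    /** Advances the commit point over the contiguous run of done offsets. */
    long commit() {
        while (!done.isEmpty() && done.first() == committed + 1) {
            committed = done.pollFirst();
        }
        return committed;
    }

    public static void main(String[] args) {
        RetryLimitSketch spout = new RetryLimitSketch(0, 3);
        for (long offset : new long[] {1, 2, 3, 5, 6, 7}) {
            spout.ack(offset);
        }
        for (int i = 0; i < 3; i++) {
            spout.fail(4); // offset 4 keeps failing and is retried
        }
        System.out.println(spout.commit()); // 3: still blocked by offset 4
        spout.fail(4);                      // fourth failure exceeds maxRetries = 3
        System.out.println(spout.commit()); // 7
    }
}
```

The trade-off is that a dropped message is never processed successfully. A small limit is therefore mainly a diagnostic here, not a fix.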


> On Feb 16, 2017, at 8:36 AM, Igor Kuzmenko <<>> wrote:
> Today in Storm UI I saw this Kafka Spouts Lag:
> Id            Topic       Partition   Latest Offset   Spout Committed Offset
> Kafka Spout   test_topic  0           5591087         5562814
> Kafka Spout   test_topic  1           2803256         2789090
> Kafka Spout   test_topic  2           2801927         2787767
> Kafka Spout   test_topic  3           2800627         2800626
> Kafka Spout   test_topic  4           2799391         2785238
> Kafka Spout   test_topic  5           2798126         2798125
> Kafka Spout   test_topic  6           2796874         2782726
> Kafka Spout   test_topic  7           2795669         2781528
> Kafka Spout   test_topic  8           2794419         2780280
> Kafka Spout   test_topic  9           2793255         2793254
> Kafka Spout   test_topic  10          2792109         2777978
> Kafka Spout   test_topic  11          2790939         2776817
> Kafka Spout   test_topic  12          2789783         2775665
> Kafka Spout   test_topic  13          2788651         2774539
> Kafka Spout   test_topic  14          2787521         2773412
> There were no new messages in that topic for a while, so I expected my topology to have
> processed all of them. But the lag shows uncommitted messages in most of the partitions.
> The topology stopped working and didn't process any messages for a few hours.
> In the logs I found these messages:
> 2017-02-16 14:50:20.187 o.a.s.k.s.KafkaSpout [DEBUG] Unexpected offset found [2777849]. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, offset=2777849, numFails=0}, {topic-partition=test_topic-10, offset=2777850, numFails=0},
> ........................................
> {topic-partition=test_topic-10, offset=2792107, numFails=0}, {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
> 2017-02-16 14:50:20.201 o.a.s.k.s.KafkaSpout [DEBUG] No offsets ready to commit. OffsetEntry{topic-partition=test_topic-10, fetchOffset=2775755, committedOffset=2777978, ackedMsgs=[{topic-partition=test_topic-10, offset=2777849,
> .......................................
> {topic-partition=test_topic-10, offset=2792108, numFails=0}]}
> So I assume the messages shown as uncommitted were actually processed by the topology
> and acked. After I started sending new messages to the Kafka topic, the topology started working again,
> but the spout lag kept increasing.
> Why could the spout stop committing to Kafka?
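The per-partition lag in the Storm UI table above (Latest Offset minus Spout Committed Offset) can be computed as shown below. The numbers are copied from the table, and SpoutLagTable is only an illustrative name. Partitions 3, 5 and 9 show a lag of 1. The other partitions are about 14,100 behind (28,273 for partition 0). For partition 10, the log lists acked offsets up to 2792108, just below that partition's latest offset of 2792109.

```java
/** Lag per partition from the Storm UI table: Latest Offset minus Spout Committed Offset. */
class SpoutLagTable {
    // Index = partition number; values copied from the table in the message above.
    static final long[] LATEST = {
        5591087, 2803256, 2801927, 2800627, 2799391, 2798126, 2796874, 2795669,
        2794419, 2793255, 2792109, 2790939, 2789783, 2788651, 2787521
    };
    static final long[] COMMITTED = {
        5562814, 2789090, 2787767, 2800626, 2785238, 2798125, 2782726, 2781528,
        2780280, 2793254, 2777978, 2776817, 2775665, 2774539, 2773412
    };

    static long lag(int partition) {
        return LATEST[partition] - COMMITTED[partition];
    }

    static long totalLag() {
        long total = 0;
        for (int p = 0; p < LATEST.length; p++) {
            total += lag(p);
        }
        return total;
    }

    public static void main(String[] args) {
        for (int p = 0; p < LATEST.length; p++) {
            System.out.printf("partition %2d lag %d%n", p, lag(p));
        }
        System.out.println("total lag " + totalLag()); // 183775
    }
}
```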
