kafka-users mailing list archives

From Becket Qin <becket....@gmail.com>
Subject Re: Client uses high CPU because DelayedFetch operations return immediately
Date Sun, 09 Oct 2016 04:39:26 GMT
Can you check whether the code you are running includes KAFKA-3003?

On Sat, Oct 8, 2016 at 12:52 AM, Kafka <kafkausr@126.com> wrote:

> Hi all,
>         We found that our consumers have a high CPU load in our production
> environment. As we know, fetch.min.bytes and fetch.wait.max.ms affect how
> often the broker returns a fetch response to the consumer, so we set both
> to very large values that the broker would find very hard to satisfy.
>         That did not solve the problem, so we looked at Kafka's code and
> found that the tryComplete() function in DelayedFetch contains this code:
>
>     if (endOffset.messageOffset != fetchOffset.messageOffset) {
>       if (endOffset.onOlderSegment(fetchOffset)) {
>         // Case C, this can happen when the new fetch operation is on a truncated leader
>         debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicAndPartition))
>         return forceComplete()
>       } else if (fetchOffset.onOlderSegment(endOffset)) {
>         // Case C, this can happen when the fetch operation is falling behind the current segment
>         // or the partition has just rolled a new segment
>         debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
>         return forceComplete()
>       } else if (fetchOffset.messageOffset < endOffset.messageOffset) {
>         // we need take the partition fetch size as upper bound when accumulating the bytes
>         accumulatedSize += math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.fetchSize)
>       }
>     }
>
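To make the branch logic above easier to reason about, here is a minimal, self-contained Scala model of it. `OffsetMeta`, `Outcome` and `DelayedFetchSketch.classify` are hypothetical names, not Kafka classes. `OffsetMeta` keeps only the three fields the quoted code reads, and the sketch assumes `onOlderSegment` compares segment base offsets and `positionDiff` subtracts in-segment byte positions. It models only the quoted if/else chain for one partition, not the whole `tryComplete()`.

```scala
// Hypothetical, simplified stand-in for Kafka's LogOffsetMetadata:
// only the three fields the quoted branch logic reads.
final case class OffsetMeta(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int) {
  // Assumed semantics: true if this offset lies on an earlier segment than `that`.
  def onOlderSegment(that: OffsetMeta): Boolean =
    segmentBaseOffset < that.segmentBaseOffset

  // Assumed semantics: byte distance between two positions on the same segment.
  def positionDiff(that: OffsetMeta): Int =
    relativePositionInSegment - that.relativePositionInSegment
}

// What the quoted code decides for one partition.
sealed trait Outcome
case object CompleteNow extends Outcome                 // the forceComplete() branches
final case class Accumulate(bytes: Int) extends Outcome // bytes counted towards fetch.min.bytes
case object NothingNew extends Outcome                  // no branch taken, keep waiting

object DelayedFetchSketch {
  def classify(fetchOffset: OffsetMeta, endOffset: OffsetMeta, partitionFetchSize: Int): Outcome =
    if (endOffset.messageOffset == fetchOffset.messageOffset) NothingNew
    else if (endOffset.onOlderSegment(fetchOffset)) CompleteNow // "fetching later segments"
    else if (fetchOffset.onOlderSegment(endOffset)) CompleteNow // "fetching older segments"
    else if (fetchOffset.messageOffset < endOffset.messageOffset)
      // Cap the counted bytes at the partition fetch size, as the quoted code does.
      Accumulate(math.min(endOffset.positionDiff(fetchOffset), partitionFetchSize))
    else NothingNew
}
```

For example, with a fetch position of `OffsetMeta(480L, 0L, 4096)` (still on segment 0) and a log end of `OffsetMeta(520L, 500L, 1024)` (on segment 500), `classify` returns `CompleteNow` from the "fetching older segments" branch, no matter how large fetch.min.bytes is.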
> From this we concluded that our fetchOffset's segmentBaseOffset differs
> from endOffset's segmentBaseOffset. We then checked the segments of our
> topic-partition and found that all the data in them had been deleted by
> Kafka under log.retention.
> We suspect the cause is that fetchOffset's segmentBaseOffset is smaller
> than endOffset's segmentBaseOffset.
>
> My question is: should we change the code as follows so that the client
> uses less CPU?
>     if (endOffset.messageOffset != fetchOffset.messageOffset) {
>       if (endOffset.onOlderSegment(fetchOffset)) {
>         return false
>       } else if (fetchOffset.onOlderSegment(endOffset)) {
>         return false
>       }
>     }
>
> With this change, the broker would respond after fetch.wait.max.ms in this
> scenario instead of returning immediately.
>
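The effect of that change on client CPU can be estimated with simple arithmetic. The sketch below is a back-of-the-envelope model, not Kafka code: it assumes a consumer that sends the next fetch as soon as the previous response arrives, an assumed network round trip `rttMs`, and a partition that never accumulates fetch.min.bytes, so under the proposed change every request is held for the full fetch.wait.max.ms. `FetchRateSketch`, `holdMs` and `fetchesPerSecond` are hypothetical names.

```scala
object FetchRateSketch {
  // How long the broker holds a fetch whose start offset is on an older
  // segment than the log end (assuming fetch.min.bytes is never reached).
  //   current code:  forceComplete() -> answered at once (0 ms hold)
  //   proposed code: tryComplete() returns false -> held until fetch.wait.max.ms
  def holdMs(proposed: Boolean, fetchWaitMaxMs: Long): Long =
    if (proposed) fetchWaitMaxMs else 0L

  // Fetch requests per second from one consumer that re-fetches immediately
  // after each response; rttMs is an assumed network round trip.
  def fetchesPerSecond(rttMs: Double, holdMs: Long): Double =
    1000.0 / (rttMs + holdMs)
}
```

With an assumed 1 ms round trip and fetch.wait.max.ms = 500, the model gives 1000 fetches per second for the current behavior versus about 2 per second with the proposed change; a tight fetch loop like the former is one plausible source of the high client CPU described above.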
> Feedback is greatly appreciated. Thanks.
