kafka-users mailing list archives

From James Olsen <ja...@inaseq.com>
Subject Re: Problems when Consuming from multiple Partitions
Date Sun, 08 Mar 2020 21:48:59 GMT
Thanks for your response.  Yes, the second issue can be mitigated by reducing fetch.max.wait.ms,
although reducing it too far creates excessive CPU load on the Brokers.  However, I've done
some further testing and found what looks like the underlying cause.

In the scenario below the Consumer is consuming from 2 Partitions (MyTopic-0 and MyTopic-1).
 There is a cycle of messages being fetched and ignored.  In each cycle the subsequent fetch
to get them again does not occur until a complete fetch.max.wait.ms has expired - note the
~500ms gap between the ignored fetch at 09:46:43,093 and the re-fetch at 09:46:43,597 in the
log below.  I suspect this is due initially to the fact that MyTopic-0 has never had any
messages and hence has no epoch, and is subsequently being fetched on its own - but being
empty it results in the delay.  Someone who knows more about the meaning of "toSend=(),
toForget=(MyTopic-1), implied=(MyTopic-0)" might be able to shed further light.

I can post a more complete log of this if anyone wants to take a look.

I'm going to try Kafka 2.3 Brokers to see if the "Skipping validation …" behaviour has any
impact.

2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch READ_UNCOMMITTED
at offset 40 for partition MyTopic-1 returned fetch data (error=NONE, highWaterMark=41, lastStableOffset
= 41, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=573)

2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring fetched records
for partition MyTopic-1 since it no longer has valid position

2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED
fetch request for partition MyTopic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node
localhost:9093 (id: 1001 rack: null)

2020-03-09 09:46:43,093 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending READ_UNCOMMITTED
IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1), implied=(MyTopic-0)) to broker localhost:9093
(id: 1001 rack: null)

2020-03-09 09:46:43,095 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping validation
of fetch offsets for partitions [MyTopic-1] since the broker does not support the required
protocol version (introduced in Kafka 2.3)

2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED
fetch request for partition MyTopic-0 at position FetchPosition{offset=0, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node
localhost:9093 (id: 1001 rack: null)

2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added READ_UNCOMMITTED
fetch request for partition MyTopic-1 at position FetchPosition{offset=40, offsetEpoch=Optional[0],
currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), epoch=-1}} to node
localhost:9093 (id: 1001 rack: null)

2020-03-09 09:46:43,597 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending READ_UNCOMMITTED
IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(), implied=(MyTopic-0)) to broker localhost:9093
(id: 1001 rack: null)

2020-03-09 09:46:43,599 DEBUG [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2'
[Consumer clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch READ_UNCOMMITTED
at offset 40 for partition MyTopic-1 returned fetch data (error=NONE, highWaterMark=41, lastStableOffset
= 41, logStartOffset = 0, preferredReadReplica = absent, abortedTransactions = null, recordsSizeInBytes=573)
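
The trace above comes from the org.apache.kafka.clients.consumer.internals.Fetcher logger at
DEBUG.  Assuming a log4j 1.x properties configuration (adjust for your logging backend), it
can be enabled with:

log4j.logger.org.apache.kafka.clients.consumer.internals.Fetcher=DEBUG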


On 5/03/2020, at 11:45 PM, M. Manna <manmedia@gmail.com> wrote:

Hi James,

3 Consumers in a group means you have 20 partitions per consumer (given your 60 partitions
and single ConsumerGroup), and 5 means 12.  There's nothing special about these numbers, as
you also noticed.
Have you tried setting fetch.max.wait.ms = 0 to see whether that makes a difference for you?

Thanks,


On Thu, 5 Mar 2020 at 03:43, James Olsen <james@inaseq.com> wrote:

I'm seeing behaviour that I don't understand when I have Consumers
fetching from multiple Partitions of the same Topic.  There are two
different conditions arising:

1. A subset of the Partitions allocated to a given Consumer is not being
consumed at all.  The Consumer appears healthy: the Thread is running,
logging activity and successfully processing records from some of the
Partitions it has been assigned.  I don't think this is due to the first
Partition fetched filling a Batch (KIP-387).  The problem does not occur
with a particular number of Consumers (3 in this case), but it has occurred
with a range of larger values.  I don't think there is anything special
about 3 - it just happens to work OK with that value, although it is the
same as the Broker and Replica count.  When we tried 6, 5 Consumers were
fine but 1 exhibited this issue.

2. Up to a half-second delay between the Producer sending and the Consumer
receiving a message.  This looks suspiciously like fetch.max.wait.ms=500,
but we also have fetch.min.bytes=1, so we should get messages as soon as
something is available.  The only explanation I can think of is that
fetch.max.wait.ms is applied in full to the first Partition checked while
it remains empty for the duration; only then does it move on to a
subsequent non-empty Partition and deliver messages from there.
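
For reference, these are effectively the fetch settings in play (a minimal
sketch - the class and method names are just for illustration; bootstrap
servers, group id, deserializers etc. omitted):

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;

public class FetchSettings {

    static Properties fetchProps() {
        Properties props = new Properties();
        // The broker may hold the fetch for up to 500ms (the default)...
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        // ...but should respond as soon as at least one byte is available.
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
        return props;
    }
}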

Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.

All environments appear healthy and under light load, e.g. clients
operating at only 1-2% CPU and Brokers (3) at 5-10% CPU.  No swapping, no
crashes, no dead threads etc.

Typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
ConsumerGroup with 5 Consumers.  The Partitioning is for semantic purposes,
with the intention of adding more Consumers as the business grows and load
increases.  Some of the Partitions are always empty due to our use of short
string keys with the default Partitioner - in the near future we will
probably implement a custom Partitioner, along the lines of the sketch
below, to achieve better distribution.
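
If we do, a first cut might look something like the following (illustrative
only - the class name and the salting scheme are placeholders, not a final
design; an explicit key-to-partition table is the other obvious option):

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class SpreadingPartitioner implements Partitioner {

    // Salting re-rolls the key hash, which can spread a small set of short
    // keys differently (not necessarily better) than the default murmur2
    // mapping on the serialized key bytes.
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        byte[] salted = ("spread:" + key).getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(salted)) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

It would be registered via the partitioner.class producer property.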

I don't have access to the detailed JMX metrics yet but am working on that
in the hope they will help with diagnosis.
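
In the meantime I can at least dump the fetch metrics the consumer already
exposes in-process (a rough sketch - the class and method names are mine,
and the consumer argument is our existing KafkaConsumer):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

public class FetchMetricsDump {

    // Print the fetch-manager metrics (fetch latency, records lag etc.)
    // the client collects in-process, as a stopgap until JMX is available.
    static void dumpFetchMetrics(KafkaConsumer<?, ?> consumer) {
        for (Map.Entry<MetricName, ? extends Metric> entry : consumer.metrics().entrySet()) {
            MetricName name = entry.getKey();
            if ("consumer-fetch-manager-metrics".equals(name.group())) {
                System.out.println(name.name() + " " + name.tags() + " = "
                        + entry.getValue().metricValue());
            }
        }
    }
}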

Thoughts and advice appreciated!
