kafka-users mailing list archives

From Ryanne Dolan <ryannedo...@gmail.com>
Subject Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand
Date Tue, 05 Mar 2019 15:08:12 GMT
Tolya,

That is the expected behavior. Offsets are not consistent between mirrored
clusters.

Kafka allows duplicate records ("at least once"), which means the
downstream offsets will tend to creep higher than those in the source
partitions. For example, if a producer sends a record but doesn't receive
an ACK within the timeout, it may resend the record. But the broker may
have actually received the first attempt, so it now stores the same
record twice.
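
The resend scenario can be sketched with a toy in-memory log. This is a
minimal simulation, not Kafka client code: the Partition class, the
send_with_retry helper, and the ack_lost flag are all hypothetical names
made up for illustration.

```python
class Partition:
    """A toy append-only log; a record's offset is its list index."""

    def __init__(self):
        self.records = []

    def append(self, value):
        self.records.append(value)
        return len(self.records) - 1  # offset assigned by the toy "broker"


def send_with_retry(partition, value, ack_lost):
    """Append a record; if the ACK is lost, the producer sends it again."""
    offset = partition.append(value)
    if ack_lost:
        # The broker stored the record, but the producer never saw the ACK
        # and resends, so the same value is appended a second time.
        offset = partition.append(value)
    return offset


source = ["a", "b", "c"]  # upstream offsets 0, 1, 2
downstream = Partition()
for value in source:
    # Assumption for the demo: only the ACK for "b" is lost.
    send_with_retry(downstream, value, ack_lost=(value == "b"))

print(downstream.records)
print("upstream offset of 'c':", source.index("c"))
print("downstream offset of 'c':", len(downstream.records) - 1)
```

After a single lost ACK, every later record sits one offset higher
downstream than upstream, and the gap only grows with further retries.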

You can use an idempotent producer to prevent duplicates and transactions
for exactly-once replication, but even so, there is no guarantee the
offsets are consistent. For example, a source partition doesn't necessarily
start at offset zero when you start replicating it.
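
The non-zero start case can be shown with plain arithmetic. The numbers
below are hypothetical, and the sketch assumes a freshly created
downstream partition and no duplicates.

```python
# Hypothetical: retention already deleted upstream offsets 0..999
# before replication began.
source_log_start = 1000
records = ["r0", "r1", "r2", "r3"]

# Upstream offsets continue from the log start offset; a newly created
# downstream partition starts numbering at 0.
upstream = {source_log_start + i: r for i, r in enumerate(records)}
downstream = {i: r for i, r in enumerate(records)}

committed_upstream = 1002  # next record the consumer would read upstream

print(upstream[committed_upstream])
print(committed_upstream in downstream)  # offset 1002 does not exist downstream

# Without duplicates the shift is constant, so the same record can be found
# by subtracting the start offset. With duplicates the shift is not constant.
equivalent_downstream = committed_upstream - source_log_start
print(downstream[equivalent_downstream])
```

A committed offset copied verbatim from the source cluster therefore
points at the wrong position, or at no position at all, downstream.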

You are correct that failover will not work as you were expecting. I've
solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
implemented in a draft PR here:

https://github.com/apache/kafka/pull/6295

MM2 uses a sparse "offset sync" topic to keep track of the mapping between
upstream and downstream offsets, and emits checkpoints that consumers can
use for failover and failback. This can be automated, e.g. by resetting
consumer offsets based on the latest checkpoint from another cluster. The
tooling has not been released yet, but the logic is in RemoteClusterUtils.
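
The idea of translating offsets through a sparse mapping can be sketched
as follows. This is not the code in the PR or in RemoteClusterUtils; the
translate function, the (upstream, downstream) pair layout, and the sample
numbers are assumptions chosen for illustration. The sketch is
conservative: it returns the downstream offset of the nearest sync at or
before the requested upstream offset, so a consumer may re-read a few
records after failover but should not skip any.

```python
from bisect import bisect_right


def translate(syncs, upstream_offset):
    """Map an upstream offset to a safe downstream offset.

    syncs is a list of (upstream, downstream) pairs sorted by upstream
    offset. Returns None when no sync covers the requested offset.
    """
    upstream_points = [u for u, _ in syncs]
    i = bisect_right(upstream_points, upstream_offset) - 1
    if i < 0:
        return None  # offset is older than the first recorded sync
    return syncs[i][1]


# Hypothetical sparse syncs: only occasional offsets are recorded.
syncs = [(1000, 0), (1100, 103), (1200, 205)]

print(translate(syncs, 1150))  # falls back to the sync recorded at 1100
print(translate(syncs, 1200))  # exact match on a recorded sync
print(translate(syncs, 999))   # no sync available yet
```

Resetting a consumer group on the other cluster to the translated offset
is the step that the checkpoints described above are meant to automate.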

Ryanne

On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov <aksoldatov@avito.ru.invalid>
wrote:

> Hello, guys!
>
> I am not sure about offsets replicated by MirrorMaker.
>
> I am replicating data from one Kafka cluster (let's say cluster A,
> Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
> internal topics.
> MirrorMaker lag is somewhere between 1-2k events.
>
> I started replication after some time (some old events were removed
> because of retention).
> Topics on both clusters have a similar number of partitions.
> ConsumerGroupCommand shows different results on cluster A and cluster B,
> as below.
>
> cluster A (source):
> ~> kafka-consumer-groups \
>   --bootstrap-server clusterA:9092 \
>   --describe \
>   --group "some_group" | sort
>
> ------------------------------
> TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST      CLIENT-ID
> some_topic  0          560498          560498          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  1          560569          560571          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  2          560478          560480          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  3          560528          560530          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  4          560542          560543          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  5          560497          560498          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  6          560484          560484          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  7          560527          560527          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> some_topic  8          560539          560540          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
>
> cluster B (destination):
> ~> kafka-consumer-groups \
>   --bootstrap-server clusterB:9092 \
>   --describe \
>   --group "some_group" | sort
>
> ------------------------------
> Consumer group 'some_group' has no active members.
>
> TOPIC       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
> some_topic  0          373323          481950          108627  -            -     -
>
>
> However, offset metadata in __consumer_offsets topic is the same on both
> clusters.
>
> cluster A (source):
> ~> kafka-console-consumer \
> --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> --bootstrap-server clusterA:9092 \
> --topic __consumer_offsets | grep some_topic
>
> ------------------------------
> [some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime
> 1551782709369,ExpirationTime 1552387509369]
> ...
>
> cluster B (destination):
> ~> kafka-console-consumer \
> --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> --bootstrap-server clusterB:9092 \
> --topic __consumer_offsets | grep some_topic
>
> ------------------------------
> [some_group,some_topic,7]::OffsetAndMetadata(offset=561492,
> leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369,
> expireTimestamp=Some(1552387509369))
> ...
>
> Notice that the offsets match the output of ConsumerGroupCommand for
> cluster A, but not for cluster B.
>
> So, here are my questions:
> Why is ConsumerGroupCommand showing different results on cluster A and
> cluster B?
> Why is consumer lag so high on cluster B?
> Is ConsumerGroupCommand using info from somewhere other than the
> __consumer_offsets topic?
> Will all my consumers get stuck (because of incompatible offsets) in case
> of failover?
>
> Kind regards,
> Tolya
