kafka-users mailing list archives

From Ryanne Dolan <ryannedo...@gmail.com>
Subject Re: Odd behaviour of MirrorMaker and ConsumerGroupCommand
Date Tue, 05 Mar 2019 17:05:35 GMT
Tolya,

You mentioned that you are replicating "with internal topics", so I'd
expect the __consumer_offsets topic in the target cluster to include (at
least) the same records as the source cluster. MirrorMaker does not
translate offsets, so the downstream commits will be wrong if you try to
replicate __consumer_offsets like that.
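To make this concrete, here is a minimal Python sketch. It assumes the destination partition was empty when mirroring started, and that the source partition's earliest retained offset was already above zero because retention had removed older records, as described later in this thread. The function name and the start offset are hypothetical. The committed offset 561492 is the one shown in the `__consumer_offsets` output below.

```python
# Minimal model (hypothetical names): the destination partition was empty when
# mirroring began, and the source partition's earliest retained offset was
# already `source_start` because retention had removed older records.

def destination_offset(source_offset: int, source_start: int,
                       duplicates_before: int = 0) -> int:
    """Downstream offset corresponding to an upstream offset.

    `duplicates_before` counts extra copies written downstream (for example by
    producer retries) before this position, which shifts offsets further.
    """
    if source_offset < source_start:
        raise ValueError("offset was removed by retention before mirroring began")
    return source_offset - source_start + duplicates_before


if __name__ == "__main__":
    committed = 561492    # committed offset seen in the thread's output
    source_start = 80000  # hypothetical earliest source offset at mirroring start
    # Legacy MirrorMaker copies the commit verbatim, so the downstream group
    # would resume at `committed`; the matching downstream position is lower.
    print("copied verbatim:", committed)
    print("matching downstream offset:", destination_offset(committed, source_start))
```

Any gap (retention) or duplicate (retries) before the committed position makes a verbatim copy point at the wrong record, or past the end of the downstream log.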

Re why kafka-consumer-groups is reporting different information, I suspect
that the downstream __consumer_offsets topic does not have the correct
number of partitions. If __consumer_offsets was created by MirrorMaker
during replication, it would have been created with the broker's default
number of partitions (num.partitions), which is generally not the same as
offsets.topic.num.partitions (50 by default). In that case the semantic
partitioning (which group lives in which __consumer_offsets partition) will
be broken, and kafka-consumer-groups (or indeed KafkaConsumer) will be
confused.
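The group-to-partition mapping can be sketched in Python. This assumes the broker assigns a group to partition `abs(groupId.hashCode()) % <partition count of __consumer_offsets>`, which, as far as I know, is how Kafka's GroupMetadataManager computes it; the helper names are hypothetical. By contrast, the producer's default partitioner hashes the serialized record key with murmur2, so records re-produced by a mirroring producer that leaves the partition unset are not guaranteed to land where the group coordinator expects them.

```python
# Sketch of how a consumer group is mapped to a __consumer_offsets partition.
# Helper names are hypothetical; the formula follows Kafka's
# abs(groupId.hashCode()) % partitionCount.

def java_string_hash(s: str) -> int:
    """Reproduce java.lang.String.hashCode(): 31-based hash over UTF-16 code
    units, wrapped to a signed 32-bit int."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        code_unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + code_unit) & 0xFFFFFFFF  # emulate 32-bit overflow
    return h - (1 << 32) if h & 0x80000000 else h  # reinterpret as signed


def kafka_abs(n: int) -> int:
    """Absolute value where Integer.MIN_VALUE maps to 0 (as Kafka's Utils.abs does)."""
    return 0 if n == -(1 << 31) else abs(n)


def offsets_partition_for(group_id: str, partition_count: int) -> int:
    """Partition of __consumer_offsets that should hold this group's commits."""
    return kafka_abs(java_string_hash(group_id)) % partition_count


if __name__ == "__main__":
    # The same group can land on a different partition when the partition
    # count of __consumer_offsets differs between clusters.
    for count in (50, 9, 1):
        print(count, offsets_partition_for("some_group", count))
```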

Re why kafka-console-consumer is showing slightly different records, I
believe there must be a string serde somewhere in your pipeline. It appears
that the offset record value has been toString'd.
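As an illustration of why a string serde is dangerous for binary internal-topic records, this Python sketch decodes bytes as UTF-8 text and encodes them back, roughly what a string deserializer followed by a string serializer does (Java's `new String(bytes, UTF_8)` likewise substitutes U+FFFD for malformed input). The function name and payload bytes are made up for the example.

```python
def through_string_serde(value: bytes) -> bytes:
    """Decode bytes as UTF-8 text (invalid sequences become U+FFFD) and re-encode."""
    return value.decode("utf-8", errors="replace").encode("utf-8")


if __name__ == "__main__":
    binary_value = bytes([0x00, 0x03, 0x9F, 0xFF])  # made-up binary payload
    text_value = "NO_METADATA".encode("utf-8")
    print(through_string_serde(binary_value) == binary_value)  # False: bytes altered
    print(through_string_serde(text_value) == text_value)      # True: valid UTF-8 survives
```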

> let consumers read from specific time (not offset number). Should it work?

Yes, that is the current best practice, though there are many reasons why
this is less than ideal.
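Seeking by time maps to `KafkaConsumer.offsetsForTimes()`, which returns, per partition, the earliest offset whose timestamp is at or after the requested time; the consumer then calls `seek()`. The Python sketch below models only that lookup for one partition's (hypothetical) list of record timestamps. It assumes timestamps are non-decreasing and that mirrored records keep their source timestamps; the real broker answers from its time index.

```python
from bisect import bisect_left
from typing import List, Optional


def offset_for_time(timestamps: List[int], base_offset: int,
                    target_ms: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= target_ms, or None if no such record.

    `timestamps[i]` is the timestamp of the record at offset base_offset + i.
    Assumes non-decreasing timestamps; the broker's time index is more general.
    """
    i = bisect_left(timestamps, target_ms)
    return None if i == len(timestamps) else base_offset + i


if __name__ == "__main__":
    # Hypothetical destination partition whose first retained offset is 1000.
    ts = [100, 200, 200, 300]
    print(offset_for_time(ts, 1000, 150))  # 1001: first record at or after 150 ms
```

Picking a target time slightly before the last known commit is the safe choice, at the cost of reprocessing the records between that time and the failure.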

Ryanne


On Tue, Mar 5, 2019 at 10:07 AM Anatoliy Soldatov
<aksoldatov@avito.ru.invalid> wrote:

> Hello Ryanne, and thank you for your answer!
>
> I am using idempotent producers. And you are right: I started replication
> after a few days, and some of the source data had already been deleted
> (because of retention) at that moment.
>
> Still, I can't understand the logic behind kafka-consumer-groups. With the
> console consumer I can see that the __consumer_offsets topic on both the
> source and destination clusters contains the same data (more or less).
> OffsetMetadata and commit time are identical for some randomly picked
> events from this topic.
>
> However, kafka-consumer-groups shows really different output for the source
> and destination clusters for the same consumer group. As in my previous
> message, it shows a current offset of about 560000 and an end offset of
> about 560000 on the source cluster, but a current offset of about 370000
> and an end offset of about 480000 on the destination cluster. Yet the
> console consumer shows that the __consumer_offsets topic on both source and
> destination has events with offsets of about 560000 for this group. Also,
> kafka-consumer-groups shows only one partition for the group on the
> destination cluster, although there are 9 partitions on both clusters.
>
> Also, as far as I know, the partitioner should be different for typical
> topics and the __consumer_offsets topic (different hash keys). Is that
> correct? If so, how does the MirrorMaker producer handle it?
>
> I have an idea for failover – let consumers read from specific time (not
> offset number). Should it work?
>
> Also, I think MM2 is a nice idea, and I'm looking forward to it!
>
> Tolya
>
> > On 5 March 2019, at 18:08, Ryanne Dolan <ryannedolan@gmail.com> wrote:
> >
> > Tolya,
> >
> > That is the expected behavior. Offsets are not consistent between mirrored
> > clusters.
> >
> > Kafka allows duplicate records ("at least once"), which means the
> > downstream offsets will tend to creep higher than those in the source
> > partitions. For example, if a producer sends a record but doesn't receive
> > an ACK within the timeout, it may resend the record. But the
> > record may have actually been received by the broker, so now the broker
> > sees the same record twice.
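The retry scenario can be simulated in a few lines of Python. This is a simplified sketch (hypothetical function name, one retry per lost ACK, no reordering) of a non-idempotent producer writing to a destination partition.

```python
from typing import List, Set


def append_with_retries(records: List[str], lost_acks: Set[int]) -> List[str]:
    """Destination log after a non-idempotent producer sends `records`.

    `lost_acks` holds indexes of records the broker stored but whose ACK never
    reached the producer; the producer resends each of those once.
    Simplified: the retry follows immediately and nothing is reordered.
    """
    log: List[str] = []
    for i, record in enumerate(records):
        log.append(record)        # broker stores the record
        if i in lost_acks:        # producer times out waiting for the ACK...
            log.append(record)    # ...resends, and the broker stores a duplicate
    return log


if __name__ == "__main__":
    source = ["a", "b", "c", "d"]
    dest = append_with_retries(source, lost_acks={1})
    # Every record after the duplicate sits one offset higher downstream.
    print(source.index("c"), dest.index("c"))  # 2 3
```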
> >
> > You can use an idempotent producer to prevent duplicates and transactions
> > for exactly-once replication, but even so, there is no guarantee the
> > offsets are consistent. For example, a source partition doesn't necessarily
> > start at offset zero when you start replicating it.
> >
> > You are correct that failover will not work as you were expecting. I've
> > solved this problem in KIP-382 with "MirrorMaker 2.0", which is currently
> > implemented in a draft PR here:
> >
> >
> > https://github.com/apache/kafka/pull/6295
> >
> > MM2 uses a sparse "offset sync" topic to keep track of the mapping between
> > upstream and downstream offsets, and emits checkpoints that consumers can
> > use for failover and failback. This can be automated, e.g. by resetting
> > consumer offsets based on the latest checkpoint from another cluster. The
> > tooling has not been released yet, but the logic is in RemoteClusterUtils.
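The idea behind a sparse offset-sync mapping can be sketched in Python. This is not MM2's code (that lives in the draft PR and may differ); `OffsetSync` and `translate` are hypothetical names, and the linear extrapolation from the latest sync is only exact when no duplicates or gaps occurred after that sync.

```python
from bisect import bisect_right
from typing import List, NamedTuple, Optional


class OffsetSync(NamedTuple):
    upstream: int    # offset in the source partition
    downstream: int  # offset of the same record in the target partition


def translate(syncs: List[OffsetSync], upstream_offset: int) -> Optional[int]:
    """Map an upstream committed offset to a downstream one.

    `syncs` must be sorted by upstream offset. Uses the latest sync at or
    below `upstream_offset` and extrapolates linearly from it.
    """
    keys = [s.upstream for s in syncs]
    i = bisect_right(keys, upstream_offset)
    if i == 0:
        return None  # no sync old enough: the offset cannot be translated
    sync = syncs[i - 1]
    return sync.downstream + (upstream_offset - sync.upstream)


if __name__ == "__main__":
    syncs = [OffsetSync(0, 0), OffsetSync(100, 120), OffsetSync(200, 230)]
    print(translate(syncs, 150))  # 170
```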
> >
> > Ryanne
> >
> > On Tue, Mar 5, 2019, 5:06 AM Anatoliy Soldatov <aksoldatov@avito.ru.invalid>
> > wrote:
> >
> >> Hello, guys!
> >>
> >> I am not sure about offsets replicated by MirrorMaker.
> >>
> >> I am replicating data from one Kafka cluster (let's say cluster A,
> >> Confluent Kafka 2.0) to another (cluster B, Confluent Kafka 2.1) with
> >> internal topics.
> >> MirrorMaker lag is somewhere between 1-2k events.
> >>
> >> I started replication after some time (some old events were removed
> >> because of retention).
> >> Topics on both clusters have a similar number of partitions.
> >> ConsumerGroupCommand shows different results on cluster A and cluster B,
> >> as below.
> >>
> >> cluster A (source):
> >> ~> kafka-consumer-groups \
> >>  --bootstrap-server clusterA:9092 \
> >>  --describe \
> >>  --group "some_group" | sort
> >>
> >> ------------------------------
> >> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST      CLIENT-ID
> >> some_topic             0          560498          560498          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             1          560569          560571          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             2          560478          560480          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             3          560528          560530          2    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             4          560542          560543          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             5          560497          560498          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             6          560484          560484          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             7          560527          560527          0    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >> some_topic             8          560539          560540          1    sarama-be2f868d-82ec-4aa8-9412-977d8de9a42e  /x.x.x.x  sarama
> >>
> >> cluster B (destination):
> >> ~> kafka-consumer-groups \
> >>  --bootstrap-server clusterB:9092 \
> >>  --describe \
> >>  --group "some_group" | sort
> >>
> >> ------------------------------
> >> Consumer group 'some_group' has no active members.
> >>
> >> TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG     CONSUMER-ID  HOST  CLIENT-ID
> >> some_topic             0          373323          481950          108627  -            -     -
> >>
> >>
> >> However, offset metadata in __consumer_offsets topic is the same on both
> >> clusters.
> >>
> >> cluster A (source):
> >> ~> kafka-console-consumer \
> >>  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> >>  --bootstrap-server clusterA:9092 \
> >>  --topic __consumer_offsets | grep some_topic
> >>
> >> ------------------------------
> >> [some_group,some_topic,7]::[OffsetMetadata[561492,NO_METADATA],CommitTime 1551782709369,ExpirationTime 1552387509369]
> >> ...
> >>
> >> cluster B (destination):
> >> ~> kafka-console-consumer \
> >>  --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
> >>  --bootstrap-server clusterB:9092 \
> >>  --topic __consumer_offsets | grep some_topic
> >>
> >> ------------------------------
> >> [some_group,some_topic,7]::OffsetAndMetadata(offset=561492, leaderEpoch=Optional.empty, metadata=, commitTimestamp=1551782709369, expireTimestamp=Some(1552387509369))
> >> ...
> >>
> >> Notice that the offsets match the ConsumerGroupCommand output for cluster A,
> >> but not for cluster B.
> >>
> >> So, here are my questions:
> >> Why does ConsumerGroupCommand show different results on cluster A and
> >> cluster B?
> >> Why is the consumer lag so high on cluster B?
> >> Does ConsumerGroupCommand use information from somewhere other than the
> >> __consumer_offsets topic?
> >> Will all my consumers get stuck (because of incompatible offsets) in case
> >> of failover?
> >>
> >> Kind regards,
> >> Tolya
> >>
> >>
> >>
>
>
>
