kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Péter Sinóros-Szabó <peter.sinoros-sz...@transferwise.com.INVALID>
Subject Re: MM2 startup delay
Date Fri, 27 Dec 2019 13:44:23 GMT
Hey,

> Do you see any timeouts in the logs?
No. At least not exception related to timeout.
I see exception
like: InstanceAlreadyExistsException, SecurityDisabledException.

But I see these issues below, it might be related... Is it expected to see
ClassLoader exceptions?

[2019-12-27 11:34:48,056] INFO Started MirrorCheckpointConnector with 871
consumer groups.
(org.apache.kafka.connect.mirror.MirrorCheckpointConnector:69)
[2019-12-27 11:34:48,060] INFO Finished creating connector
MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
[2019-12-27 11:34:48,060] ERROR Plugin class loader for connector:
'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found.
Returning:
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
[2019-12-27 11:34:48,060] INFO SourceConnectorConfig values:
        config.action.reload = restart
        connector.class =
org.apache.kafka.connect.mirror.MirrorCheckpointConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = MirrorCheckpointConnector
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
[2019-12-27 11:34:48,062] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class =
org.apache.kafka.connect.mirror.MirrorCheckpointConnector
        errors.log.enable = false
        errors.log.include.messages = false
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = MirrorCheckpointConnector
        tasks.max = 1
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
[2019-12-27 11:34:48,063] ERROR Plugin class loader for connector:
'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found.
Returning:
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
(org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
[2019-12-27 11:34:48,066] INFO [Worker clientId=connect-1,
groupId=eucmain-mm2] Handling task config update by restarting tasks
[MirrorCheckpointConnector-0, MirrorSourceConnector-0]
(org.apache.kafka.connect.runtime.distributed.DistributedHerder:574)
[2019-12-27 11:34:48,066] INFO Stopping task MirrorCheckpointConnector-0
(org.apache.kafka.connect.runtime.Worker:704)
[2019-12-27 11:34:48,611] INFO Stopping DistributedHerder-connect-1 took
544 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:99)
[2019-12-27 11:34:48,611] INFO Stopping task MirrorSourceConnector-0
(org.apache.kafka.connect.runtime.Worker:704)
[2019-12-27 11:34:48,616] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:34:48,617] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:34:48,617] INFO [Producer
clientId=connector-producer-MirrorCheckpointConnector-0] Closing the Kafka
producer with timeoutMillis = 30000 ms.
(org.apache.kafka.clients.producer.KafkaProducer:1183)
[2019-12-27 11:34:50,587] INFO syncing topic configs took 2930 ms
(org.apache.kafka.connect.mirror.Scheduler:95)
[2019-12-27 11:34:53,612] ERROR Graceful stop of task
MirrorSourceConnector-0 failed.
(org.apache.kafka.connect.runtime.Worker:736)
[2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
groupId=eucmain-mm2] Rebalance started
(org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
[2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
groupId=eucmain-mm2] (Re-)joining group
(org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
[2019-12-27 11:34:53,613] ERROR Failed to fetch offsets from namespace
MirrorSourceConnector:
 (org.apache.kafka.connect.storage.OffsetStorageReaderImpl:113)
org.apache.kafka.connect.errors.ConnectException: Offset reader closed
while attempting to read offsets. This is likely because the task was been
scheduled to stop but has taken longer than the graceful shutdown period to
do so.
        at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
        at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
        at
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at
java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
        at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
        at
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2019-12-27 11:34:53,616] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:34:53,617] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:34:53,617] ERROR
WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and
unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
        at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
        at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
        at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
        at
java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
        at
java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
        at
java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
        at
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
        at
java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at
java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
        at
org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
        at
org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
        at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader
closed while attempting to read offsets. This is likely because the task
was been scheduled to stop but has taken longer than the graceful shutdown
period to do so.
        at
org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
        ... 21 more
[2019-12-27 11:34:53,621] ERROR
WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and will
not recover until manually restarted
(org.apache.kafka.connect.runtime.WorkerTask:180)
[2019-12-27 11:34:53,622] INFO [Producer clientId=producer-4] Closing the
Kafka producer with timeoutMillis = 9223372036854775807 ms.
(org.apache.kafka.clients.producer.KafkaProducer:1183)


After awhile this exceptions stop and I only see these logs while nothing
really happens (well, minimal traffic on the destination broker, probably
only some cluster maintenance.)

[2019-12-27 11:35:17,556] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:17,556] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:17,582] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 25 ms
(org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:35:19,154] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:19,154] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:19,455] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 301 ms
(org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:35:53,718] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:53,718] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:35:53,772] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:35:53,773] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:17,582] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:17,582] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:17,593] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 11 ms
(org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:19,456] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:19,456] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:19,483] INFO
WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
successfully in 27 ms
(org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:53,719] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
(org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:53,719] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2019-12-27 11:36:53,911] INFO
WorkerSourceTask{id=MirrorCheckpointConnector-0} Finished commitOffsets
successfully in 192 ms
(org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
flushing 0 outstanding messages for offset commit
(org.apache.kafka.connect.runtime.WorkerSourceTask:433)


then ...

[2019-12-27 11:44:47,642] ERROR Scheduler for MirrorSourceConnector caught
exception in scheduled task: syncing topic ACLs
(org.apache.kafka.connect.mirror.Scheduler:102)
java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
configured on the broker
        at
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

and then the usual logs from WorkerSourceTask... for some minutes. And then
another Security... exception:

[2019-12-27 11:54:47,643] ERROR Scheduler for MirrorSourceConnector caught
exception in scheduled task: syncing topic ACLs
(org.apache.kafka.connect.mirror.Scheduler:102)
java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
configured on the broker
        at
org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

and only after another minute, it starts to subscribe to the topics:

[2019-12-27 11:55:11,298] INFO [Consumer clientId=consumer-7, groupId=null]
Subscribed to partition(s): Ninja ... ... <listing a lot of topics here>
[2019-12-27 11:55:11,318] INFO Starting with 2303 previously uncommitted
partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
[2019-12-27 11:55:11,319] INFO [Consumer clientId=consumer-7, groupId=null]
Seeking to offset 0 for partition Ninja...-0
(org.apache.kafka.clients.consumer.KafkaConsumer:1564)
...
[2019-12-27 11:55:11,719] INFO task-thread-MirrorSourceConnector-0
replicating 2710 topic-partitions eucmain->euwbackup: [Ninja...

And then I see some real traffic towards the destination cluster, I guess
it is the time when it really starts the mirroring.

Peter

On Wed, 11 Dec 2019 at 20:26, Ryanne Dolan <ryannedolan@gmail.com> wrote:

> Hey Peter. Do you see any timeouts in the logs? The internal scheduler will
> timeout each task after 60 seconds by default, which might not be long
> enough to finish some of the bootstrap tasks in your case. My team has
> observed that behavior in super-flaky environments, e.g. when connectivity
> drops during bootstrapping, in which case MirrorSourceConnector can get
> into a funky state. This resolves when it refreshes its state after a
> while. The default refresh interval of 10 minutes seems to jibe with your
> observations.
>
> My team patched our internal MM2 build months ago to force bootstrapping to
> complete correctly. I can share the patch, and if it helps we can raise a
> PR.
>
> Ryanne
>
> On Mon, Dec 9, 2019 at 5:28 AM Péter Sinóros-Szabó
> <peter.sinoros-szabo@transferwise.com.invalid> wrote:
>
> > Hi,
> >
> > I am experimenting with Mirror Make 2 in 2.4.0-rc3. It seems to start up
> > fine, connects to both source and destination, creates new topics...
> > But it does not start to actually mirror the messages until about 12
> > minutes after MM2 was started. I would expect it to start mirroring in
> some
> > seconds after startup.
> >
> > Source cluster has about 2800 partitions, destination cluster is empty.
> > Both clusters are in AWS but in different regions.
> >
> > What may cause the 12 minutes delay?
> >
> > Config is:
> > ---
> > clusters = eucmain, euwbackup
> > eucmain.bootstrap.servers =
> > test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> > eucmain->euwbackup.enabled = true
> > eucmain->euwbackup.topics = .*
> > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> > eucmain->euwbackup.rename.topics = false
> > replication.policy.separator = __
> > eucmain.client.id = mm2
> >
> > I do not see any serious errors in the logs that I would think of a cause
> > of this.
> >
> > Thanks,
> > Peter
> >
>


-- 
 - Sini

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message