kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ryanne Dolan <ryannedo...@gmail.com>
Subject Re: MM2 startup delay
Date Fri, 27 Dec 2019 18:11:02 GMT
Hey Peter.

> No Authorizer is configured on the broker

You'll need to disable ACL sync to avoid this error. It's harmless tho.

> Failed to fetch offsets

I have not encountered this. Is it possible one of the clusters is/was
unreachable? Are any of the clusters using Kerberos or SSL that may be
misconfigured?

> Plugin class loader...

I'll look into this.

Ryanne


On Fri, Dec 27, 2019, 7:44 AM Péter Sinóros-Szabó
<peter.sinoros-szabo@transferwise.com.invalid> wrote:

> Hey,
>
> > Do you see any timeouts in the logs?
> No. At least not exception related to timeout.
> I see exception
> like: InstanceAlreadyExistsException, SecurityDisabledException.
>
> But I see these issues below, it might be related... Is it expected to see
> ClassLoader exceptions?
>
> [2019-12-27 11:34:48,056] INFO Started MirrorCheckpointConnector with 871
> consumer groups.
> (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:69)
> [2019-12-27 11:34:48,060] INFO Finished creating connector
> MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> [2019-12-27 11:34:48,060] ERROR Plugin class loader for connector:
> 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found.
> Returning:
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2019-12-27 11:34:48,060] INFO SourceConnectorConfig values:
>         config.action.reload = restart
>         connector.class =
> org.apache.kafka.connect.mirror.MirrorCheckpointConnector
>         errors.log.enable = false
>         errors.log.include.messages = false
>         errors.retry.delay.max.ms = 60000
>         errors.retry.timeout = 0
>         errors.tolerance = none
>         header.converter = null
>         key.converter = null
>         name = MirrorCheckpointConnector
>         tasks.max = 1
>         transforms = []
>         value.converter = null
>  (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
> [2019-12-27 11:34:48,062] INFO EnrichedConnectorConfig values:
>         config.action.reload = restart
>         connector.class =
> org.apache.kafka.connect.mirror.MirrorCheckpointConnector
>         errors.log.enable = false
>         errors.log.include.messages = false
>         errors.retry.delay.max.ms = 60000
>         errors.retry.timeout = 0
>         errors.tolerance = none
>         header.converter = null
>         key.converter = null
>         name = MirrorCheckpointConnector
>         tasks.max = 1
>         transforms = []
>         value.converter = null
>
>  (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> [2019-12-27 11:34:48,063] ERROR Plugin class loader for connector:
> 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not found.
> Returning:
> org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> [2019-12-27 11:34:48,066] INFO [Worker clientId=connect-1,
> groupId=eucmain-mm2] Handling task config update by restarting tasks
> [MirrorCheckpointConnector-0, MirrorSourceConnector-0]
> (org.apache.kafka.connect.runtime.distributed.DistributedHerder:574)
> [2019-12-27 11:34:48,066] INFO Stopping task MirrorCheckpointConnector-0
> (org.apache.kafka.connect.runtime.Worker:704)
> [2019-12-27 11:34:48,611] INFO Stopping DistributedHerder-connect-1 took
> 544 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:99)
> [2019-12-27 11:34:48,611] INFO Stopping task MirrorSourceConnector-0
> (org.apache.kafka.connect.runtime.Worker:704)
> [2019-12-27 11:34:48,616] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:34:48,617] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:34:48,617] INFO [Producer
> clientId=connector-producer-MirrorCheckpointConnector-0] Closing the Kafka
> producer with timeoutMillis = 30000 ms.
> (org.apache.kafka.clients.producer.KafkaProducer:1183)
> [2019-12-27 11:34:50,587] INFO syncing topic configs took 2930 ms
> (org.apache.kafka.connect.mirror.Scheduler:95)
> [2019-12-27 11:34:53,612] ERROR Graceful stop of task
> MirrorSourceConnector-0 failed.
> (org.apache.kafka.connect.runtime.Worker:736)
> [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
> groupId=eucmain-mm2] Rebalance started
> (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
> groupId=eucmain-mm2] (Re-)joining group
> (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
> [2019-12-27 11:34:53,613] ERROR Failed to fetch offsets from namespace
> MirrorSourceConnector:
>  (org.apache.kafka.connect.storage.OffsetStorageReaderImpl:113)
> org.apache.kafka.connect.errors.ConnectException: Offset reader closed
> while attempting to read offsets. This is likely because the task was been
> scheduled to stop but has taken longer than the graceful shutdown period to
> do so.
>         at
>
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
>         at
>
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
>         at
> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>         at
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>         at
> java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
>         at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>         at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
>         at
>
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> [2019-12-27 11:34:53,616] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:34:53,617] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:34:53,617] ERROR
> WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
> org.apache.kafka.connect.errors.ConnectException: Failed to fetch offsets.
>         at
>
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
>         at
>
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
>         at
> java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>         at
> java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>         at
> java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
>         at
> java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>         at
>
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>         at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>         at
> java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at
> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
>         at
>
> org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
>         at
>
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
>         at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.ConnectException: Offset reader
> closed while attempting to read offsets. This is likely because the task
> was been scheduled to stop but has taken longer than the graceful shutdown
> period to do so.
>         at
>
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
>         ... 21 more
> [2019-12-27 11:34:53,621] ERROR
> WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and will
> not recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask:180)
> [2019-12-27 11:34:53,622] INFO [Producer clientId=producer-4] Closing the
> Kafka producer with timeoutMillis = 9223372036854775807 ms.
> (org.apache.kafka.clients.producer.KafkaProducer:1183)
>
>
> After awhile this exceptions stop and I only see these logs while nothing
> really happens (well, minimal traffic on the destination broker, probably
> only some cluster maintenance.)
>
> [2019-12-27 11:35:17,556] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:17,556] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:17,582] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 25 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:35:19,154] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:19,154] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:19,455] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 301 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:35:53,718] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:53,718] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:35:53,772] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:35:53,773] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:17,582] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:17,582] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:17,593] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 11 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:19,456] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:19,456] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:19,483] INFO
> WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> successfully in 27 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:53,719] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:53,719] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> [2019-12-27 11:36:53,911] INFO
> WorkerSourceTask{id=MirrorCheckpointConnector-0} Finished commitOffsets
> successfully in 192 ms
> (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> [2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> [2019-12-27 11:36:53,911] INFO WorkerSourceTask{id=MirrorSourceConnector-0}
> flushing 0 outstanding messages for offset commit
> (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
>
>
> then ...
>
> [2019-12-27 11:44:47,642] ERROR Scheduler for MirrorSourceConnector caught
> exception in scheduled task: syncing topic ACLs
> (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
> configured on the broker
>         at
>
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>
> and then the usual logs from WorkerSourceTask... for some minutes. And then
> another Security... exception:
>
> [2019-12-27 11:54:47,643] ERROR Scheduler for MirrorSourceConnector caught
> exception in scheduled task: syncing topic ACLs
> (org.apache.kafka.connect.mirror.Scheduler:102)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer is
> configured on the broker
>         at
>
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>
> and only after another minute, it starts to subscribe to the topics:
>
> [2019-12-27 11:55:11,298] INFO [Consumer clientId=consumer-7, groupId=null]
> Subscribed to partition(s): Ninja ... ... <listing a lot of topics here>
> [2019-12-27 11:55:11,318] INFO Starting with 2303 previously uncommitted
> partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
> [2019-12-27 11:55:11,319] INFO [Consumer clientId=consumer-7, groupId=null]
> Seeking to offset 0 for partition Ninja...-0
> (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
> ...
> [2019-12-27 11:55:11,719] INFO task-thread-MirrorSourceConnector-0
> replicating 2710 topic-partitions eucmain->euwbackup: [Ninja...
>
> And then I see some real traffic towards the destination cluster, I guess
> it is the time when it really starts the mirroring.
>
> Peter
>
> On Wed, 11 Dec 2019 at 20:26, Ryanne Dolan <ryannedolan@gmail.com> wrote:
>
> > Hey Peter. Do you see any timeouts in the logs? The internal scheduler
> will
> > timeout each task after 60 seconds by default, which might not be long
> > enough to finish some of the bootstrap tasks in your case. My team has
> > observed that behavior in super-flaky environments, e.g. when
> connectivity
> > drops during bootstrapping, in which case MirrorSourceConnector can get
> > into a funky state. This resolves when it refreshes its state after a
> > while. The default refresh interval of 10 minutes seems to jibe with your
> > observations.
> >
> > My team patched our internal MM2 build months ago to force bootstrapping
> to
> > complete correctly. I can share the patch, and if it helps we can raise a
> > PR.
> >
> > Ryanne
> >
> > On Mon, Dec 9, 2019 at 5:28 AM Péter Sinóros-Szabó
> > <peter.sinoros-szabo@transferwise.com.invalid> wrote:
> >
> > > Hi,
> > >
> > > I am experimenting with Mirror Make 2 in 2.4.0-rc3. It seems to start
> up
> > > fine, connects to both source and destination, creates new topics...
> > > But it does not start to actually mirror the messages until about 12
> > > minutes after MM2 was started. I would expect it to start mirroring in
> > some
> > > seconds after startup.
> > >
> > > Source cluster has about 2800 partitions, destination cluster is empty.
> > > Both clusters are in AWS but in different regions.
> > >
> > > What may cause the 12 minutes delay?
> > >
> > > Config is:
> > > ---
> > > clusters = eucmain, euwbackup
> > > eucmain.bootstrap.servers =
> > > test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> > > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> > > eucmain->euwbackup.enabled = true
> > > eucmain->euwbackup.topics = .*
> > > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> > > eucmain->euwbackup.rename.topics = false
> > > replication.policy.separator = __
> > > eucmain.client.id = mm2
> > >
> > > I do not see any serious errors in the logs that I would think of a
> cause
> > > of this.
> > >
> > > Thanks,
> > > Peter
> > >
> >
>
>
> --
>  - Sini
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message