kafka-users mailing list archives

From Péter Sinóros-Szabó <peter.sinoros-sz...@transferwise.com.INVALID>
Subject Re: MM2 startup delay
Date Mon, 30 Dec 2019 09:09:48 GMT
Hi Ryanne,

> > Failed to fetch offsets
> I have not encountered this. Is it possible one of the clusters is/was
> unreachable? Are any of the clusters using Kerberos or SSL that may be
> misconfigured?

The clusters were reachable and working fine according to our monitoring. It
might be some network issue between the AWS regions, but that is not likely.
I will keep monitoring this and let you know if it happens again.
No Kerberos or SSL is used, just plaintext, with an almost-default
configuration on both clusters.

> > Plugin class loader...
> I'll look into this.
We use an old JVM for this: openjdk version "1.8.0_222"; just guessing, but
that may be the reason.

Peter



On Fri, 27 Dec 2019 at 19:11, Ryanne Dolan <ryannedolan@gmail.com> wrote:

> Hey Peter.
>
> > No Authorizer is configured on the broker
>
> You'll need to disable ACL sync to avoid this error. It's harmless, though.
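For reference, ACL sync can be switched off in the MM2 properties file; this is a sketch using the property name documented for MirrorMaker 2 (KIP-382):

```properties
# Stop MirrorSourceConnector from trying to sync topic ACLs, which fails
# with SecurityDisabledException when the broker has no Authorizer configured.
sync.topic.acls.enabled = false
```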
>
> > Failed to fetch offsets
>
> I have not encountered this. Is it possible one of the clusters is/was
> unreachable? Are any of the clusters using Kerberos or SSL that may be
> misconfigured?
>
> > Plugin class loader...
>
> I'll look into this.
>
> Ryanne
>
>
> On Fri, Dec 27, 2019, 7:44 AM Péter Sinóros-Szabó
> <peter.sinoros-szabo@transferwise.com.invalid> wrote:
>
> > Hey,
> >
> > > Do you see any timeouts in the logs?
> > No, at least no exceptions related to timeouts.
> > I see exceptions like InstanceAlreadyExistsException and
> > SecurityDisabledException.
> >
> > But I see the issues below; they might be related... Is it expected to
> > see ClassLoader exceptions?
> >
> > [2019-12-27 11:34:48,056] INFO Started MirrorCheckpointConnector with 871
> > consumer groups.
> > (org.apache.kafka.connect.mirror.MirrorCheckpointConnector:69)
> > [2019-12-27 11:34:48,060] INFO Finished creating connector
> > MirrorCheckpointConnector (org.apache.kafka.connect.runtime.Worker:273)
> > [2019-12-27 11:34:48,060] ERROR Plugin class loader for connector:
> > 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not
> found.
> > Returning:
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
> > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> > [2019-12-27 11:34:48,060] INFO SourceConnectorConfig values:
> >         config.action.reload = restart
> >         connector.class =
> > org.apache.kafka.connect.mirror.MirrorCheckpointConnector
> >         errors.log.enable = false
> >         errors.log.include.messages = false
> >         errors.retry.delay.max.ms = 60000
> >         errors.retry.timeout = 0
> >         errors.tolerance = none
> >         header.converter = null
> >         key.converter = null
> >         name = MirrorCheckpointConnector
> >         tasks.max = 1
> >         transforms = []
> >         value.converter = null
> >  (org.apache.kafka.connect.runtime.SourceConnectorConfig:347)
> > [2019-12-27 11:34:48,062] INFO EnrichedConnectorConfig values:
> >         config.action.reload = restart
> >         connector.class =
> > org.apache.kafka.connect.mirror.MirrorCheckpointConnector
> >         errors.log.enable = false
> >         errors.log.include.messages = false
> >         errors.retry.delay.max.ms = 60000
> >         errors.retry.timeout = 0
> >         errors.tolerance = none
> >         header.converter = null
> >         key.converter = null
> >         name = MirrorCheckpointConnector
> >         tasks.max = 1
> >         transforms = []
> >         value.converter = null
> >
> >
> (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:347)
> > [2019-12-27 11:34:48,063] ERROR Plugin class loader for connector:
> > 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector' was not
> found.
> > Returning:
> > org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader@2eced48b
> > (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:165)
> > [2019-12-27 11:34:48,066] INFO [Worker clientId=connect-1,
> > groupId=eucmain-mm2] Handling task config update by restarting tasks
> > [MirrorCheckpointConnector-0, MirrorSourceConnector-0]
> > (org.apache.kafka.connect.runtime.distributed.DistributedHerder:574)
> > [2019-12-27 11:34:48,066] INFO Stopping task MirrorCheckpointConnector-0
> > (org.apache.kafka.connect.runtime.Worker:704)
> > [2019-12-27 11:34:48,611] INFO Stopping DistributedHerder-connect-1 took
> > 544 ms. (org.apache.kafka.connect.mirror.MirrorCheckpointTask:99)
> > [2019-12-27 11:34:48,611] INFO Stopping task MirrorSourceConnector-0
> > (org.apache.kafka.connect.runtime.Worker:704)
> > [2019-12-27 11:34:48,616] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:34:48,617] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:34:48,617] INFO [Producer
> > clientId=connector-producer-MirrorCheckpointConnector-0] Closing the
> Kafka
> > producer with timeoutMillis = 30000 ms.
> > (org.apache.kafka.clients.producer.KafkaProducer:1183)
> > [2019-12-27 11:34:50,587] INFO syncing topic configs took 2930 ms
> > (org.apache.kafka.connect.mirror.Scheduler:95)
> > [2019-12-27 11:34:53,612] ERROR Graceful stop of task
> > MirrorSourceConnector-0 failed.
> > (org.apache.kafka.connect.runtime.Worker:736)
> > [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
> > groupId=eucmain-mm2] Rebalance started
> > (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator:222)
> > [2019-12-27 11:34:53,613] INFO [Worker clientId=connect-1,
> > groupId=eucmain-mm2] (Re-)joining group
> > (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:533)
> > [2019-12-27 11:34:53,613] ERROR Failed to fetch offsets from namespace
> > MirrorSourceConnector:
> >  (org.apache.kafka.connect.storage.OffsetStorageReaderImpl:113)
> > org.apache.kafka.connect.errors.ConnectException: Offset reader closed
> > while attempting to read offsets. This is likely because the task was
> been
> > scheduled to stop but has taken longer than the graceful shutdown period
> to
> > do so.
> >         at
> >
> >
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> >         at
> >
> >
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
> >         at
> > java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> >         at
> > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> >         at
> > java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> >         at
> > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> >         at
> >
> >
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> >         at
> >
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> >         at
> > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >         at
> > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
> >         at
> >
> >
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
> >         at
> > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
> >         at
> > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
> >         at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> > [2019-12-27 11:34:53,616] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:34:53,617] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > flushing 0 outstanding messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:34:53,617] ERROR
> > WorkerSourceTask{id=MirrorSourceConnector-0} Task threw an uncaught and
> > unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
> > org.apache.kafka.connect.errors.ConnectException: Failed to fetch
> offsets.
> >         at
> >
> >
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:114)
> >         at
> >
> >
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offset(OffsetStorageReaderImpl.java:63)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffset(MirrorSourceTask.java:227)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.lambda$loadOffsets$4(MirrorSourceTask.java:222)
> >         at
> > java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> >         at
> > java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> >         at
> > java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1556)
> >         at
> > java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
> >         at
> >
> >
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
> >         at
> >
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> >         at
> > java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> >         at
> > java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.loadOffsets(MirrorSourceTask.java:222)
> >         at
> >
> >
> org.apache.kafka.connect.mirror.MirrorSourceTask.start(MirrorSourceTask.java:92)
> >         at
> >
> >
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
> >         at
> > org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
> >         at
> > org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
> >         at
> > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at
> >
> >
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at
> >
> >
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.kafka.connect.errors.ConnectException: Offset
> reader
> > closed while attempting to read offsets. This is likely because the task
> > was been scheduled to stop but has taken longer than the graceful
> shutdown
> > period to do so.
> >         at
> >
> >
> org.apache.kafka.connect.storage.OffsetStorageReaderImpl.offsets(OffsetStorageReaderImpl.java:103)
> >         ... 21 more
> > [2019-12-27 11:34:53,621] ERROR
> > WorkerSourceTask{id=MirrorSourceConnector-0} Task is being killed and
> will
> > not recover until manually restarted
> > (org.apache.kafka.connect.runtime.WorkerTask:180)
> > [2019-12-27 11:34:53,622] INFO [Producer clientId=producer-4] Closing the
> > Kafka producer with timeoutMillis = 9223372036854775807 ms.
> > (org.apache.kafka.clients.producer.KafkaProducer:1183)
> >
> >
> > After a while these exceptions stop and I only see the logs below, while
> > nothing really happens (well, minimal traffic on the destination broker,
> > probably only some cluster maintenance).
> >
> > [2019-12-27 11:35:17,556] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:35:17,556] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:35:17,582] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> > successfully in 25 ms
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> > [2019-12-27 11:35:19,154] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:35:19,154] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:35:19,455] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> > successfully in 301 ms
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> > [2019-12-27 11:35:53,718] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:35:53,718] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:35:53,772] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:35:53,773] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > flushing 0 outstanding messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:36:17,582] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:36:17,582] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:36:17,593] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> > successfully in 11 ms
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> > [2019-12-27 11:36:19,456] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:36:19,456] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:36:19,483] INFO
> > WorkerSourceTask{id=MirrorHeartbeatConnector-0} Finished commitOffsets
> > successfully in 27 ms
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> > [2019-12-27 11:36:53,719] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} Committing offsets
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:36:53,719] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} flushing 0 outstanding
> > messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> > [2019-12-27 11:36:53,911] INFO
> > WorkerSourceTask{id=MirrorCheckpointConnector-0} Finished commitOffsets
> > successfully in 192 ms
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
> > [2019-12-27 11:36:53,911] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > Committing offsets
> (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
> > [2019-12-27 11:36:53,911] INFO
> WorkerSourceTask{id=MirrorSourceConnector-0}
> > flushing 0 outstanding messages for offset commit
> > (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
> >
> >
> > then ...
> >
> > [2019-12-27 11:44:47,642] ERROR Scheduler for MirrorSourceConnector
> caught
> > exception in scheduled task: syncing topic ACLs
> > (org.apache.kafka.connect.mirror.Scheduler:102)
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer
> is
> > configured on the broker
> >         at
> >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> >
> > and then the usual logs from WorkerSourceTask... for some minutes. And
> > then another Security... exception:
> >
> > [2019-12-27 11:54:47,643] ERROR Scheduler for MirrorSourceConnector
> caught
> > exception in scheduled task: syncing topic ACLs
> > (org.apache.kafka.connect.mirror.Scheduler:102)
> > java.util.concurrent.ExecutionException:
> > org.apache.kafka.common.errors.SecurityDisabledException: No Authorizer
> is
> > configured on the broker
> >         at
> >
> >
> org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
> >
> > and only after another minute does it start to subscribe to the topics:
> >
> > [2019-12-27 11:55:11,298] INFO [Consumer clientId=consumer-7,
> groupId=null]
> > Subscribed to partition(s): Ninja ... ... <listing a lot of topics here>
> > [2019-12-27 11:55:11,318] INFO Starting with 2303 previously uncommitted
> > partitions. (org.apache.kafka.connect.mirror.MirrorSourceTask:94)
> > [2019-12-27 11:55:11,319] INFO [Consumer clientId=consumer-7,
> groupId=null]
> > Seeking to offset 0 for partition Ninja...-0
> > (org.apache.kafka.clients.consumer.KafkaConsumer:1564)
> > ...
> > [2019-12-27 11:55:11,719] INFO task-thread-MirrorSourceConnector-0
> > replicating 2710 topic-partitions eucmain->euwbackup: [Ninja...
> >
> > And then I see some real traffic towards the destination cluster; I guess
> > that is when the mirroring really starts.
> >
> > Peter
> >
> > On Wed, 11 Dec 2019 at 20:26, Ryanne Dolan <ryannedolan@gmail.com>
> wrote:
> >
> > > Hey Peter. Do you see any timeouts in the logs? The internal scheduler
> > > will time out each task after 60 seconds by default, which might not be
> > > long enough to finish some of the bootstrap tasks in your case. My team
> > > has observed that behavior in super-flaky environments, e.g. when
> > > connectivity drops during bootstrapping, in which case
> > > MirrorSourceConnector can get into a funky state. This resolves when it
> > > refreshes its state after a while. The default refresh interval of 10
> > > minutes seems to jibe with your observations.
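The refresh interval mentioned here is configurable; a tuning sketch using the MirrorMaker 2 property names from KIP-382 (the values are illustrative, not recommendations):

```properties
# Default is 600 seconds (the ~10 minutes mentioned above). Shortening it
# lets MM2 recover from a bad bootstrap state sooner, at the cost of more
# metadata traffic against both clusters.
eucmain->euwbackup.refresh.topics.interval.seconds = 60
eucmain->euwbackup.refresh.groups.interval.seconds = 60
```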
> > >
> > > My team patched our internal MM2 build months ago to force
> > > bootstrapping to complete correctly. I can share the patch, and if it
> > > helps we can raise a PR.
> > >
> > > Ryanne
> > >
> > > On Mon, Dec 9, 2019 at 5:28 AM Péter Sinóros-Szabó
> > > <peter.sinoros-szabo@transferwise.com.invalid> wrote:
> > >
> > > > Hi,
> > > >
> > > > I am experimenting with MirrorMaker 2 in 2.4.0-rc3. It seems to start
> > > > up fine, connects to both source and destination, and creates new
> > > > topics... But it does not start to actually mirror messages until
> > > > about 12 minutes after MM2 was started. I would expect it to start
> > > > mirroring within seconds of startup.
> > > >
> > > > The source cluster has about 2800 partitions; the destination cluster
> > > > is empty. Both clusters are in AWS but in different regions.
> > > >
> > > > What may cause the 12 minutes delay?
> > > >
> > > > Config is:
> > > > ---
> > > > clusters = eucmain, euwbackup
> > > > eucmain.bootstrap.servers =
> > > > test-kafka-main-fra01.xx:9092,test-kafka-main-fra02.xx:9092
> > > > euwbackup.bootstrap.servers = 172.30.197.203:9092,172.30.213.104:9092
> > > > eucmain->euwbackup.enabled = true
> > > > eucmain->euwbackup.topics = .*
> > > > eucmain->euwbackup.topics.blacklist = ^(kafka|kmf|__).*
> > > > eucmain->euwbackup.rename.topics = false
> > > > replication.policy.separator = __
> > > > eucmain.client.id = mm2
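As a sanity check on the topics and topics.blacklist patterns above, their combined effect can be sketched with hypothetical topic names (MM2 evaluates these as Java regexes, which behave the same as Python's for these particular patterns):

```python
import re

# Patterns from the config above.
topics = re.compile(r".*")                    # eucmain->euwbackup.topics
blacklist = re.compile(r"^(kafka|kmf|__).*")  # eucmain->euwbackup.topics.blacklist

# Hypothetical topic names, just for illustration.
candidates = ["payments", "kafka.metrics", "kmf-monitor-topic", "__consumer_offsets"]

# A topic is replicated if it matches the whitelist and not the blacklist.
replicated = [t for t in candidates
              if topics.match(t) and not blacklist.match(t)]
print(replicated)  # → ['payments']; internal kafka/kmf/__ topics are filtered out
```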
> > > >
> > > > I do not see any serious errors in the logs that I would consider a
> > > > cause of this.
> > > >
> > > > Thanks,
> > > > Peter
> > > >
> > >
> >
> >
> > --
> >  - Sini
> >
>


-- 
 - Sini
