kafka-users mailing list archives

From João Peixoto <joao.harti...@gmail.com>
Subject Large Kafka Streams deployment takes a long time to bootstrap
Date Fri, 05 May 2017 22:48:59 GMT
Warning: long message

*Problem*: Initializing a Kafka Streams application is taking a very long time. It is currently
at the 40-minute mark.

*Setup*:
2 co-partitioned topics, each with 100 partitions.
The first topic contains a lot of messages, on the order of hundreds of millions.
The second topic is used as a KTable and contains ~30k records.

Kafka cluster with 6 brokers running 0.10.1

Kafka Streams running on 0.10.2.1: 5 instances with 5 threads each.
The instances are running on Kubernetes.
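For scale: assuming both co-partitioned topics feed a single sub-topology, Kafka Streams creates one task per partition, so 100 tasks shared by 25 stream threads (5 instances with 5 threads each). The assignor lines further down report a cost of 100.0 for the first assignment and 50.0 per client in the second, which looks consistent with 100 tasks, though that reading is an assumption. The sketch below is only arithmetic with a hypothetical round-robin spread (class `TaskSpread` is made up); it is not the real Kafka Streams task assignor:

```java
import java.util.Arrays;

public class TaskSpread {
    // Spread numTasks task ids round-robin over numThreads threads and return how
    // many tasks each thread ends up with. Hypothetical illustration only; this is
    // not Kafka Streams' actual task assignment algorithm.
    static int[] spread(int numTasks, int numThreads) {
        int[] counts = new int[numThreads];
        for (int task = 0; task < numTasks; task++) {
            counts[task % numThreads]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int partitions = 100;   // both co-partitioned topics have 100 partitions
        int tasks = partitions; // assumption: one task per partition (single sub-topology)
        int threads = 5 * 5;    // 5 instances x 5 stream threads
        int[] perThread = spread(tasks, threads);
        System.out.println(threads + " threads: " + Arrays.toString(perThread));
    }
}
```

With only the single 5-thread instance from the last experiment, the same arithmetic gives 20 tasks per thread.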

*Stream Configuration*:
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
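With COMMIT_INTERVAL_MS_CONFIG set to 10000, a commit becomes due once at least 10 s have passed since the previous one, but the check can only run when the stream thread's loop gets around to it. The sketch below is a simplified, hypothetical model (class and method names are made up, and it assumes exactly one commit check per loop iteration) of why a thread whose loop iteration is stuck for a long time commits nothing until that iteration finishes:

```java
public class CommitIntervalModel {
    private final long commitIntervalMs;
    private long lastCommitMs;
    private int commits;

    CommitIntervalModel(long commitIntervalMs, long startMs) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = startMs;
    }

    // Hypothetical per-iteration check: commit only if the interval has elapsed.
    boolean commitIfDue(long nowMs) {
        if (nowMs - lastCommitMs >= commitIntervalMs) {
            commits++;
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }

    int commits() {
        return commits;
    }

    // Simulate a thread whose loop iterations each take iterationMs, running for totalMs,
    // with a 10 s commit interval as in the configuration above.
    static int simulate(long iterationMs, long totalMs) {
        CommitIntervalModel model = new CommitIntervalModel(10_000, 0);
        for (long now = iterationMs; now <= totalMs; now += iterationMs) {
            model.commitIfDue(now);
        }
        return model.commits();
    }

    public static void main(String[] args) {
        // Healthy thread: 100 ms per iteration for one minute.
        System.out.println("healthy: " + simulate(100, 60_000));
        // Stuck thread: a single iteration that lasts ~16 minutes.
        System.out.println("stuck:   " + simulate(16 * 60_000, 16 * 60_000));
    }
}
```

In this model the healthy thread commits 6 times in a minute, while the stuck one commits once, right when its long iteration returns.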

*The events*:
I started 5 instances with this configuration at the same time. This is the
first time this configuration has run.

2017-05-05 21:23:03.283  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Creating producer client
2017-05-05 21:23:03.415  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Creating consumer client
2017-05-05 21:23:03.520  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Creating restore consumer client
2017-05-05 21:23:03.528  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.531  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Creating producer client
2017-05-05 21:23:03.564  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Creating consumer client
2017-05-05 21:23:03.569  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Creating restore consumer client
2017-05-05 21:23:03.615  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.617  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Creating producer client
2017-05-05 21:23:03.621  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Creating consumer client
2017-05-05 21:23:03.625  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Creating restore consumer client
2017-05-05 21:23:03.628  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.629  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Creating producer client
2017-05-05 21:23:03.632  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Creating consumer client
2017-05-05 21:23:03.635  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Creating restore consumer client
2017-05-05 21:23:03.638  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.639  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Creating producer client
2017-05-05 21:23:03.641  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Creating consumer client
2017-05-05 21:23:03.644  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Creating restore consumer client
2017-05-05 21:23:03.647  INFO 71 --- [           main]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
State transition from NOT_RUNNING to RUNNING.
2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Starting
2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Starting
2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Starting
2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Starting
2017-05-05 21:23:03.792  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Starting
2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
at state RUNNING: partitions [] revoked at the beginning of consumer
rebalance.
2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
at state RUNNING: partitions [] revoked at the beginning of consumer
rebalance.
2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
at state RUNNING: partitions [] revoked at the beginning of consumer
rebalance.
2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
at state RUNNING: partitions [] revoked at the beginning of consumer
rebalance.
2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
at state RUNNING: partitions [] revoked at the beginning of consumer
rebalance.
2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Updating suspended tasks to contain active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Updating suspended tasks to contain active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Updating suspended tasks to contain active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Updating suspended tasks to contain active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Updating suspended tasks to contain active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Removing all active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Removing all active tasks []
2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Removing all active tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Removing all active tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Removing all active tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
Removing all standby tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Removing all standby tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
Removing all standby tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Removing all standby tasks []
2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
Removing all standby tasks []
2017-05-05 21:23:04.020  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Constructed client metadata
{18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([])
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost: 0.0]}}
from the member subscriptions.
2017-05-05 21:23:04.218  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Completed validating internal topics in partition assignor
2017-05-05 21:23:04.591  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Completed validating internal topics in partition assignor
2017-05-05 21:23:04.726  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Assigned tasks to clients as
{18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks: ([])
capacity: 1.0 cost: 100.0]}.
2017-05-05 21:23:04.742  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Constructed client metadata
{18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks: ([])
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 0.0]}}
from the member subscriptions.
2017-05-05 21:23:05.120  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Completed validating internal topics in partition assignor
2017-05-05 21:23:05.482  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Completed validating internal topics in partition assignor
2017-05-05 21:23:05.520  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Assigned tasks to clients as
{18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0],
da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>])
prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost: 50.0]}.
2017-05-05 21:23:05.553  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at
the end of consumer rebalance.
*// The above line is repeated for each thread*
2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-5]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-5]
State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-3]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-3]
State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
*// omitted*
2017-05-05 21:23:15.596  INFO 71 --- [ StreamThread-2]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-2]
State transition from ASSIGNING_PARTITIONS to RUNNING.
*// above message repeated for each thread*

*Important*: At this point only StreamThread-4 is committing every
10 seconds; the other threads log nothing. Now the fun begins:

2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
State transition from RUNNING to PARTITIONS_REVOKED.
2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Closing task's topology ... // repeated multiple times
2017-05-05 21:29:21.387  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Flushing state stores of task ... // repeated multiple times
2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Committing consumer offsets of task ... // repeated multiple times
2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Updating suspended tasks to contain active tasks [<list>]
2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Removing all active tasks [<list>]
2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]
Removing all standby tasks []


After this there are no log messages at all for 16 minutes! During that
time I took thread dumps almost every minute.
One of them is below. Notice that StreamThread-4 is the only thread doing something different.

"StreamThread-5" - Thread t@57
   java.lang.Thread.State: BLOCKED
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - waiting to lock <653f9d6> (a
org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <3fbbc5a8> (a java.util.PriorityQueue)
        at
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-4" - Thread t@55
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <5709c085> (a sun.nio.ch.Util$2)
        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at
org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <639d53ee> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-3" - Thread t@53
   java.lang.Thread.State: RUNNABLE
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <7a09baf0> (a java.util.PriorityQueue)
        at
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-2" - Thread t@51
   java.lang.Thread.State: BLOCKED
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - waiting to lock <653f9d6> (a
org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1" t@49
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <2dc188ac> (a java.util.PriorityQueue)
        at
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-1" - Thread t@49
   java.lang.Thread.State: RUNNABLE
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172)
        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at
org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at
org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at
org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at
org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at
org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <7dffc3aa> (a java.util.PriorityQueue)
        at
org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at
org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None
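With five threads and a dump taken every minute, condensing each dump to one line per thread makes the pattern easier to compare across dumps. The sketch below (hypothetical class `DumpSummary`) only understands the dump format pasted above: a `"name" - Thread t@N` header, a `java.lang.Thread.State:` line, and an optional `owned by "..."` that may sit on a wrapped line. It is not a general thread-dump parser:

```java
import java.util.Arrays;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DumpSummary {
    private static final Pattern HEADER = Pattern.compile("\"([^\"]+)\" - Thread t@\\d+");
    private static final Pattern STATE = Pattern.compile("java\\.lang\\.Thread\\.State: (\\w+)");
    private static final Pattern OWNER = Pattern.compile("owned by \"([^\"]+)\"");

    // One entry per thread: its state, plus " on <owner>" when the dump names
    // the thread that holds the monitor this thread is waiting for.
    static SortedMap<String, String> summarize(List<String> lines) {
        SortedMap<String, String> summary = new TreeMap<>();
        String current = null;
        for (String line : lines) {
            Matcher header = HEADER.matcher(line);
            if (header.find()) {
                current = header.group(1);
                continue;
            }
            if (current == null) continue; // text before the first thread header
            Matcher state = STATE.matcher(line);
            if (state.find()) {
                summary.put(current, state.group(1));
                continue;
            }
            Matcher owner = OWNER.matcher(line);
            if (owner.find()) {
                summary.put(current, summary.getOrDefault(current, "?") + " on " + owner.group(1));
            }
        }
        return summary;
    }

    public static void main(String[] args) {
        List<String> dump = Arrays.asList(
                "\"StreamThread-5\" - Thread t@57",
                "   java.lang.Thread.State: BLOCKED",
                "        - waiting to lock <653f9d6> (a",
                "org.apache.kafka.common.metrics.Sensor) owned by \"StreamThread-1\" t@49",
                "\"StreamThread-4\" - Thread t@55",
                "   java.lang.Thread.State: RUNNABLE",
                "\"StreamThread-1\" - Thread t@49",
                "   java.lang.Thread.State: RUNNABLE",
                "        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)");
        summarize(dump).forEach((thread, state) -> System.out.println(thread + ": " + state));
    }
}
```

Applied to the first dump, this reduces it to StreamThread-2 and StreamThread-5 "BLOCKED on StreamThread-1" and the other three RUNNABLE.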


(no logs omitted, ~16 minutes later)
2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Committing all tasks because the commit interval 10000ms has elapsed
2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
Committing task StreamTask .. // repeated multiple times
2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1]
o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1]
State transition from RUNNING to PARTITIONS_REVOKED.
*// The above is repeated for threads 2, 3 and 5*
(no logs omitted!! This is really the next entry, ~10 minutes later)
2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4]
o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4]
Constructed client metadata ...
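The two silent periods can be measured exactly from the timestamps quoted above. A minimal sketch (hypothetical class `LogGaps`), assuming every log entry starts with a `yyyy-MM-dd HH:mm:ss.SSS` timestamp and the wrapped continuation lines do not; it reports every gap between consecutive entries that is at least a given threshold:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogGaps {
    // Timestamp prefix used by the log entries above, e.g. "2017-05-05 21:23:03.283".
    private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
    private static final int TS_LEN = "2017-05-05 21:23:03.283".length();

    // Gaps of at least `threshold` between consecutive timestamped lines.
    // Wrapped continuation lines (no leading timestamp) are skipped.
    static List<Duration> gaps(List<String> lines, Duration threshold) {
        List<Duration> result = new ArrayList<>();
        LocalDateTime previous = null;
        for (String line : lines) {
            if (line.length() < TS_LEN) continue;
            LocalDateTime ts;
            try {
                ts = LocalDateTime.parse(line.substring(0, TS_LEN), TS);
            } catch (DateTimeParseException e) {
                continue; // not the first line of a log entry
            }
            if (previous != null) {
                Duration gap = Duration.between(previous, ts);
                if (gap.compareTo(threshold) >= 0) result.add(gap);
            }
            previous = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]",
                "o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4]",
                "2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]",
                "2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1]",
                "2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4]");
        gaps(lines, Duration.ofMinutes(1)).forEach(System.out::println);
    }
}
```

On these entries it prints PT15M43.882S (21:29:21.388 to 21:45:05.270) and PT10M11.456S (21:45:05.379 to 21:55:16.835).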

During those 10 minutes, all threads show the following:
"StreamThread-5" - Thread t@57
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <4a60e0dd> (a sun.nio.ch.Util$2)
        - locked <5e54059f> (a java.util.Collections$UnmodifiableSet)
        - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at
org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <5f1ae6c1> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-4" - Thread t@55
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <5709c085> (a sun.nio.ch.Util$2)
        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at
org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <639d53ee> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-3" - Thread t@53
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <123193f7> (a sun.nio.ch.Util$2)
        - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
        - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at
org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <4d9f6f42> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-2" - Thread t@51
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <532ff32d> (a sun.nio.ch.Util$2)
        - locked <76a6407> (a java.util.Collections$UnmodifiableSet)
        - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at
org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at
org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <29b48d84> (a
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-1" - Thread t@49
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
        - locked <5aeb504> (a sun.nio.ch.Util$2)
        - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet)
        - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
        - locked <7b072bc6> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

Eventually the threads are assigned partitions again, then the partitions are
revoked, another long period passes, threads 1, 2, 3 and 5 get stuck on
Sensor, and we end up in the same situation.
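Grouping live threads by their top stack frame and thread state gives a compact view of the same information the dumps carry, for example several stream threads BLOCKED in `Sensor.record` at once. Below is a minimal sketch using only the JDK's `ThreadMXBean`, assuming the check runs inside the same JVM as the Streams application. `ThreadSummary` and `byTopFrame` are hypothetical names, and the `StreamThread-` prefix simply matches the thread names in these dumps.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

public class ThreadSummary {

    /**
     * Hypothetical helper: maps "Class.method (STATE)" of each matching
     * thread's top stack frame to the names of the threads found there.
     */
    static Map<String, SortedSet<String>> byTopFrame(String namePrefix) {
        Map<String, SortedSet<String>> summary = new TreeMap<>();
        // false, false: we only need stacks and states, not lock lists.
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            StackTraceElement[] stack = info.getStackTrace();
            if (!info.getThreadName().startsWith(namePrefix) || stack.length == 0) {
                continue; // skip unrelated threads and threads with no Java frames
            }
            String key = stack[0].getClassName() + "." + stack[0].getMethodName()
                    + " (" + info.getThreadState() + ")";
            summary.computeIfAbsent(key, k -> new TreeSet<>()).add(info.getThreadName());
        }
        return summary;
    }

    public static void main(String[] args) {
        // Prefix assumed from the thread names in the dumps ("StreamThread-1" ... "StreamThread-5").
        byTopFrame("StreamThread-").forEach(
                (frame, threads) -> System.out.println(frame + ": " + threads));
    }
}
```

Calling this periodically from the application (or a debug endpoint) would show whether the same set of threads keeps landing on the same frame across rebalances.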

Finally, I tried starting up only 1 instance (with 5 threads). Current
status:

"StreamThread-5" - Thread t@56
   java.lang.Thread.State: RUNNABLE
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <5bea9b1b> (a java.util.PriorityQueue)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-4" - Thread t@54
   java.lang.Thread.State: RUNNABLE
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <5a06855> (a java.util.PriorityQueue)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-3" - Thread t@52
   java.lang.Thread.State: BLOCKED
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <d3a57bc> (a java.util.PriorityQueue)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-2" - Thread t@50
   java.lang.Thread.State: RUNNABLE
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <480f4efc> (a java.util.PriorityQueue)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

"StreamThread-1" - Thread t@48
   java.lang.Thread.State: BLOCKED
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
        - locked <47b226a5> (a java.util.PriorityQueue)
        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

   Locked ownable synchronizers:
        - None

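In the single-instance dump, StreamThread-1 and StreamThread-3 are BLOCKED waiting to lock `<6a55b7c6>`, a single shared `Sensor` reported as owned by StreamThread-5. The self-contained sketch below reproduces that pattern with plain JDK classes and assumes nothing about Kafka internals. `SharedSensor` is a hypothetical stand-in whose `record` method is `synchronized`; one thread holds its monitor, several workers pile up behind it, and `ThreadInfo.getLockOwnerName()` reports which thread they are waiting on, just as the dump does.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SharedSensorContention {

    /** Hypothetical stand-in for one metrics sensor shared by every worker thread. */
    static final class SharedSensor {
        private double total;

        // Every caller must acquire this object's monitor.
        synchronized void record(double value) { total += value; }

        synchronized double total() { return total; }
    }

    /** Returns sorted "blockedThread -> ownerThread" entries for BLOCKED threads. */
    static List<String> blockedOnMonitor(String namePrefix) {
        List<String> result = new ArrayList<>();
        for (ThreadInfo info : ManagementFactory.getThreadMXBean().dumpAllThreads(false, false)) {
            if (info.getThreadName().startsWith(namePrefix)
                    && info.getThreadState() == Thread.State.BLOCKED) {
                result.add(info.getThreadName() + " -> " + info.getLockOwnerName());
            }
        }
        Collections.sort(result);
        return result;
    }

    static List<String> runDemo(int waiterCount) throws InterruptedException {
        SharedSensor sensor = new SharedSensor();
        CountDownLatch ownerInside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // The owner grabs the sensor's monitor and keeps it until released,
        // playing the role StreamThread-5 plays in the dump.
        Thread owner = new Thread(() -> {
            synchronized (sensor) {
                ownerInside.countDown();
                try {
                    release.await(); // parks while still holding the monitor
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Owner");
        owner.start();
        ownerInside.await();

        // Workers call record(), which needs the same monitor, so they go BLOCKED.
        List<Thread> workers = new ArrayList<>();
        for (int i = 1; i <= waiterCount; i++) {
            Thread t = new Thread(() -> sensor.record(1.0), "Worker-" + i);
            workers.add(t);
            t.start();
        }
        for (Thread t : workers) {
            while (t.getState() != Thread.State.BLOCKED) {
                Thread.sleep(5);
            }
        }

        List<String> report = blockedOnMonitor("Worker-");

        // Let everyone finish; each worker records exactly once.
        release.countDown();
        owner.join();
        for (Thread t : workers) {
            t.join();
        }
        if (sensor.total() != waiterCount) {
            throw new IllegalStateException("unexpected total " + sensor.total());
        }
        return report;
    }

    public static void main(String[] args) throws InterruptedException {
        runDemo(3).forEach(System.out::println);
    }
}
```

Running `main` prints one line per blocked worker, such as `Worker-1 -> Owner`. In this toy the contention clears as soon as the owner releases the monitor; the dumps above suggest the stream threads keep hitting the same shared sensor during punctuation.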
This has now been going on for over 40 minutes and the cluster has not
stabilized. I'm not sure what to do here; any help is welcome.
