kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Large Kafka Streams deployment takes a long time to bootstrap
Date Mon, 08 May 2017 22:17:11 GMT
Hello,

Just to add a few more pointers: there are a few improvements that we have
added in trunk and are considering backporting to the 0.10.2 branch, in case
we can have a 0.10.2.2 release. One of them would help with this case:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance

The key idea is that with many instances of the same app starting up at the
same time (in your case 5 * 5 = 25 threads), we can consider 1) reducing the
latency of a single rebalance, and 2) reducing the number of consecutive
rebalances until all instances are up and running. The KIP above is aimed at
the second case. So I'd suggest taking a look at the app's logs to see
whether multiple rebalances are triggered during startup; if so, the above
fix may help the most.
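
As a rough way to do that check, here is a minimal Java sketch that counts,
per stream thread, how often a rebalance started in a log file. It assumes
that each log entry sits on a single unwrapped line and that the "State
transition from RUNNING to PARTITIONS_REVOKED" message (as printed by
0.10.2.1 in this thread) marks the start of a rebalance. The class and
method names are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Kafka: counts rebalance starts per thread.
class RebalanceCounter {
    // Captures the thread name from lines such as
    // "... : stream-thread [StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED."
    private static final Pattern REVOKED = Pattern.compile(
        "stream-thread \\[(StreamThread-\\d+)\\]\\s+"
            + "State transition from RUNNING to PARTITIONS_REVOKED");

    static Map<String, Integer> countRebalances(List<String> logLines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = REVOKED.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Read a log file if a path is given, otherwise use two sample lines
        // shaped like the entries quoted in this thread.
        List<String> lines = args.length > 0
            ? Files.readAllLines(Paths.get(args[0]))
            : Arrays.asList(
                "2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4] "
                    + "o.a.k.s.p.internals.StreamThread         : stream-thread "
                    + "[StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.",
                "2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4] "
                    + "o.a.k.s.p.internals.StreamThread         : stream-thread "
                    + "[StreamThread-4] State transition from RUNNING to PARTITIONS_REVOKED.");
        System.out.println(countRebalances(lines));
    }
}
```

The first join of each thread also logs this transition, so a count of 1 is
expected; counts above 1 during startup would point at consecutive
rebalances, which is the case the KIP targets.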


Guozhang


On Mon, May 8, 2017 at 7:41 AM, João Peixoto <joao.hartimer@gmail.com>
wrote:

> Thanks for the feedback. Here is additional information:
>
> * The stream instances are deployed on kubernetes through deployments. I do
> not know if they use emptyDir, hostPath or EBS
> * The instances have 2 cores minimum
>
> Good advice on the state stores, I already had some of those
> configurations, but for this issue in particular the state stores are
> empty, since this happens when the kafka stream bootstraps for the first
> time.
>
>
>
> On Sat, May 6, 2017 at 7:31 AM Eno Thereska <eno.thereska@gmail.com>
> wrote:
>
> > Hi there,
> >
> > I wanted to add something: how many CPU cores does each of your Kubernetes
> > instances have? In 0.10.2.1 we noticed a regression in environments with 1
> > core, as described in https://issues.apache.org/jira/browse/KAFKA-5174.
> >
> > If you have 1 core, the workaround is to change a config as described here:
> >
> > http://docs.confluent.io/current/streams/upgrade-guide.html#known-issues-and-workarounds
> >
> > Thanks
> > Eno
> >
> >
> > > On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmittal@gmail.com> wrote:
> > >
> > > A few things to note.
> > > Set the changelog topic's delete retention time as low as possible if the
> > > previous values for the same key are not needed and can be safely cleaned
> > > up.
> > > Set the segment size and segment retention time low as well, so that older
> > > segments can be compacted and cleaned up.
> > > Set the delete ratio aggressively (0.01) so segments don't grow too big.
> > >
> > > This way state stores would be created much faster.
> > >
> > > Also, when using windows, a smaller window size helps.
> > >
> > > Try not to run many stream threads on a single machine unless you have
> > > great hardware.
> > >
> > > Make sure a thread is not reading from many partitions, i.e. that the ratio
> > > of partitions to total threads is low.
> > >
> > > Hope this helps.
> > >
> > > Sachin
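
The changelog-topic advice above could be collected into a map of
topic-level overrides, sketched below in Java. The keys are standard Kafka
topic configs, but mapping "delete ratio" to min.cleanable.dirty.ratio is an
assumption about what was meant, and the retention and segment values are
placeholders rather than recommendations. The class and method names are
hypothetical. How the map is applied to a store's changelog depends on the
Streams version and is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Kafka: builds topic-level overrides
// for a changelog topic along the lines of the advice above.
class ChangelogTopicSettings {
    static Map<String, String> aggressiveCleanup(long deleteRetentionMs,
                                                 long segmentMs,
                                                 int segmentBytes) {
        Map<String, String> cfg = new LinkedHashMap<>();
        // How long delete markers (tombstones) are kept after compaction.
        cfg.put("delete.retention.ms", Long.toString(deleteRetentionMs));
        // Roll segments early (by time and by size) so the cleaner can reach them.
        cfg.put("segment.ms", Long.toString(segmentMs));
        cfg.put("segment.bytes", Integer.toString(segmentBytes));
        // Assumed to be the "delete ratio" from the advice: compact once
        // 1% of the log is dirty.
        cfg.put("min.cleanable.dirty.ratio", "0.01");
        return cfg;
    }

    public static void main(String[] args) {
        // Placeholder values: 1 hour retention, 10 minute segments, 64 MiB.
        Map<String, String> cfg = aggressiveCleanup(3_600_000L, 600_000L, 64 * 1024 * 1024);
        cfg.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```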
> > >
> > > On 6 May 2017 13:28, "Shimi Kiviti" <shimi.k@gmail.com> wrote:
> > >
> > >> This is very similar to issues that we see.
> > >>
> > >> Did you check the status of the consumer group? In my case it is
> > >> rebalancing most of the time. Once in a while it shows consumers and
> > >> offsets, but after a short time it goes back to rebalancing.
> > >>
> > >> How much storage does your Kafka-streams use?
> > >> Also, what is your k8s configuration?
> > >> Deployment? Deployment with emptyDir, hostPath or EBS? Statefulset?
> > >>
> > >> Thanks,
> > >> Shimi
> > >> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.hartimer@gmail.com> wrote:
> > >>
> > >>> After a while the instance started running.
> > >>>
> > >>> 2017-05-05 22:40:26.806  INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Committing task StreamTask 1_62
> > >>> (this is literally the next message)
> > >>> 2017-05-05 23:13:27.820  INFO 85 --- [ StreamThread-4] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-4] Committing all tasks because the commit interval 10000ms has elapsed
> > >>>
> > >>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <joao.hartimer@gmail.com>
> > >>> wrote:
> > >>>
> > >>>> Warning, long message
> > >>>>
> > >>>> *Problem*: Initializing a Kafka Stream is taking a loooong time.
> > >>>> Currently at the 40 minute mark
> > >>>>
> > >>>> *Setup*:
> > >>>> 2 co-partitioned topics with 100 partitions.
> > >>>> The first topic contains a lot of messages, on the order of hundreds of
> > >>>> millions.
> > >>>> The second topic is a KTable and contains ~30k records.
> > >>>>
> > >>>> Kafka cluster with 6 brokers running 0.10.1
> > >>>>
> > >>>> Kafka streams running on 0.10.2.1. 5 instances with 5 threads each.
> > >>>> The instances are running on Kubernetes
> > >>>>
> > >>>> *Stream Configuration*:
> > >>>> Properties props = new Properties();
> > >>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > >>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> > >>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > >>>> Serdes.String().getClass().getName());
> > >>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > >>>> Serdes.ByteArray().getClass().getName());
> > >>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> > >>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > >>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> > >>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > >>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> > >>>>
> > >>>> *The events*:
> > >>>> I started 5 instances of my stream configuration at the same time. This is
> > >>>> the first time this configuration is running.
> > >>>>
> > >>>> 2017-05-05 21:23:03.283  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Creating producer client
> > >>>> 2017-05-05 21:23:03.415  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Creating consumer client
> > >>>> 2017-05-05 21:23:03.520  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Creating restore consumer client
> > >>>> 2017-05-05 21:23:03.528  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> State transition from NOT_RUNNING to RUNNING.
> > >>>> 2017-05-05 21:23:03.531  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Creating producer client
> > >>>> 2017-05-05 21:23:03.564  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Creating consumer client
> > >>>> 2017-05-05 21:23:03.569  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Creating restore consumer client
> > >>>> 2017-05-05 21:23:03.615  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> State transition from NOT_RUNNING to RUNNING.
> > >>>> 2017-05-05 21:23:03.617  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Creating producer client
> > >>>> 2017-05-05 21:23:03.621  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Creating consumer client
> > >>>> 2017-05-05 21:23:03.625  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Creating restore consumer client
> > >>>> 2017-05-05 21:23:03.628  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> State transition from NOT_RUNNING to RUNNING.
> > >>>> 2017-05-05 21:23:03.629  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Creating producer client
> > >>>> 2017-05-05 21:23:03.632  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Creating consumer client
> > >>>> 2017-05-05 21:23:03.635  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Creating restore consumer client
> > >>>> 2017-05-05 21:23:03.638  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> State transition from NOT_RUNNING to RUNNING.
> > >>>> 2017-05-05 21:23:03.639  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Creating producer client
> > >>>> 2017-05-05 21:23:03.641  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Creating consumer client
> > >>>> 2017-05-05 21:23:03.644  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Creating restore consumer client
> > >>>> 2017-05-05 21:23:03.647  INFO 71 --- [           main]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> State transition from NOT_RUNNING to RUNNING.
> > >>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Starting
> > >>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Starting
> > >>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Starting
> > >>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Starting
> > >>>> 2017-05-05 21:23:03.792  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Starting
> > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> at state RUNNING: partitions [] revoked at the beginning of consumer
> > >>>> rebalance.
> > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> at state RUNNING: partitions [] revoked at the beginning of consumer
> > >>>> rebalance.
> > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> at state RUNNING: partitions [] revoked at the beginning of consumer
> > >>>> rebalance.
> > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> at state RUNNING: partitions [] revoked at the beginning of consumer
> > >>>> rebalance.
> > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> at state RUNNING: partitions [] revoked at the beginning of consumer
> > >>>> rebalance.
> > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Updating suspended tasks to contain active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Updating suspended tasks to contain active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Updating suspended tasks to contain active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Updating suspended tasks to contain active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Updating suspended tasks to contain active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Removing all active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Removing all active tasks []
> > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Removing all active tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Removing all active tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Removing all active tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> Removing all standby tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Removing all standby tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> Removing all standby tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> Removing all standby tasks []
> > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> Removing all standby tasks []
> > >>>> 2017-05-05 21:23:04.020  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Constructed client metadata
> > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks:
> > >> ([])
> > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost:
> > >> 0.0]}}
> > >>>> from the member subscriptions.
> > >>>> 2017-05-05 21:23:04.218  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Completed validating internal topics in partition assignor
> > >>>> 2017-05-05 21:23:04.591  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Completed validating internal topics in partition assignor
> > >>>> 2017-05-05 21:23:04.726  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Assigned tasks to clients as
> > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > >>>> assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks:
> > ([])
> > >>>> capacity: 1.0 cost: 100.0]}.
> > >>>> 2017-05-05 21:23:04.742  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Constructed client metadata
> > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > >>>> consumers=[<consumerlist>], state=[activeTasks: ([]) assignedTasks:
> > >> ([])
> > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > >> 0.0]}}
> > >>>> from the member subscriptions.
> > >>>> 2017-05-05 21:23:05.120  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Completed validating internal topics in partition assignor
> > >>>> 2017-05-05 21:23:05.482  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Completed validating internal topics in partition assignor
> > >>>> 2017-05-05 21:23:05.520  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > >> [StreamThread-4]
> > >>>> Assigned tasks to clients as
> > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > >> 50.0],
> > >>>> da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>])
> > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > >> 50.0]}.
> > >>>> 2017-05-05 21:23:05.553  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> at state PARTITIONS_REVOKED: new partitions [<partitionlist>] assigned at
> > >>>> the end of consumer rebalance.
> > >>>> *// The above line is repeated for each thread*
> > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-5]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-5]
> > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-1]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-1]
> > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-3]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-3]
> > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > >>>> *// omitted*
> > >>>> 2017-05-05 21:23:15.596  INFO 71 --- [ StreamThread-2]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-2]
> > >>>> State transition from ASSIGNING_PARTITIONS to RUNNING.
> > >>>> *// above message repeated for each thread*
> > >>>>
> > >>>> *Important*: At this point only StreamThread-4 is performing commits
> > >>>> every 10 seconds. The other threads output no logs. Now the fun begins.
> > >>>>
> > >>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Closing task's topology ... // repeated multiple times
> > >>>> 2017-05-05 21:29:21.387  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Flushing state stores of task ... // repeated multiple times
> > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Committing consumer offsets of task ... // repeated multiple times
> > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Updating suspended tasks to contain active tasks [<list>]
> > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Removing all active tasks [<list>]
> > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > >> [StreamThread-4]
> > >>>> Removing all standby tasks []
> > >>>>
> > >>>>
> > >>>> At this point there are no more log messages for 16 minutes!! During this
> > >>>> time I took several thread dumps, almost one every minute.
> > >>>> Thread dump below. Do notice that thread 4 is the only one that differs.
> > >>>>
> > >>>> "StreamThread-5" - Thread t@57
> > >>>>   java.lang.Thread.State: BLOCKED
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:169)
> > >>>>        - waiting to lock <653f9d6> (a
> > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1"
> t@49
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:176)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> ProcessorNode.punctuate(
> > >> ProcessorNode.java:139)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > >> StreamTask.java:268)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <3fbbc5a8> (a java.util.PriorityQueue)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(
> > >> StreamTask.java:251)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >> StreamThread.java:633)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-4" - Thread t@55
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > >> 269)
> > >>>>        at
> > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > >> java:86)
> > >>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
> > >>>>        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at
> > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at
> > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at
> > >>>> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > >> ConsumerNetworkClient.java:226)
> > >>>>        - locked <639d53ee> (a
> > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.internals.
> ConsumerNetworkClient.poll(
> > >> ConsumerNetworkClient.java:172)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.internals.
> ConsumerCoordinator.poll(
> > >> ConsumerCoordinator.java:290)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > >> pollOnce(KafkaConsumer.java:1029)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > >> KafkaConsumer.java:995)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >> StreamThread.java:592)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-3" - Thread t@53
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:169)
> > >>>>        - locked <653f9d6> (a org.apache.kafka.common.
> metrics.Sensor)
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:176)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> ProcessorNode.punctuate(
> > >> ProcessorNode.java:139)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > >> StreamTask.java:268)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <7a09baf0> (a java.util.PriorityQueue)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(
> > >> StreamTask.java:251)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >> StreamThread.java:633)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-2" - Thread t@51
> > >>>>   java.lang.Thread.State: BLOCKED
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:169)
> > >>>>        - waiting to lock <653f9d6> (a
> > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1"
> t@49
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > >> java:176)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> ProcessorNode.punctuate(
> > >> ProcessorNode.java:139)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > >> StreamTask.java:268)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <2dc188ac> (a java.util.PriorityQueue)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> StreamTask.maybePunctuate(
> > >> StreamTask.java:251)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > >> StreamThread.java:633)
> > >>>>        at
> > >>>>
> > >>> org.apache.kafka.streams.processor.internals.
> > >> StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-1" - Thread t@49
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:172)
> > >>>>        - locked <653f9d6> (a org.apache.kafka.common.metrics.Sensor)
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <7dffc3aa> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>>
> > >>>> (no logs omitted, ~16 minutes later)
> > >>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing all tasks because the commit interval 10000ms has elapsed
> > >>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] Committing task StreamTask .. // repeated multiple times
> > >>>> 2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [StreamThread-1] State transition from RUNNING to PARTITIONS_REVOKED.
> > >>>> *// The above is repeated for threads 2, 3 and 5*
> > >>>> (no logs omitted!! This is really the next entry, ~10 minutes later)
> > >>>> 2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4] o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread [StreamThread-4] Constructed client metadata ...
> > >>>>
> > >>>> During the above 10 minutes, all threads show the following:
> > >>>> "StreamThread-5" - Thread t@57
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> > >>>>        - locked <4a60e0dd> (a sun.nio.ch.Util$2)
> > >>>>        - locked <5e54059f> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> > >>>>        - locked <5f1ae6c1> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-4" - Thread t@55
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> > >>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
> > >>>>        - locked <1cacaaf> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> > >>>>        - locked <639d53ee> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-3" - Thread t@53
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> > >>>>        - locked <123193f7> (a sun.nio.ch.Util$2)
> > >>>>        - locked <6c3704d3> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> > >>>>        - locked <4d9f6f42> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-2" - Thread t@51
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> > >>>>        - locked <532ff32d> (a sun.nio.ch.Util$2)
> > >>>>        - locked <76a6407> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> > >>>>        - locked <29b48d84> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-1" - Thread t@49
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > >>>>        at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:269)
> > >>>>        at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:86)
> > >>>>        - locked <5aeb504> (a sun.nio.ch.Util$2)
> > >>>>        - locked <5130a3ea> (a java.util.Collections$UnmodifiableSet)
> > >>>>        - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
> > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > >>>>        at org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > >>>>        at org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > >>>>        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:226)
> > >>>>        - locked <7b072bc6> (a org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:347)
> > >>>>        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
> > >>>>        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
> > >>>>        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> Eventually they get assigned partitions again, then the partitions are
> > >>>> revoked, another long time passes, threads 1, 2, 3 and 5 get stuck on
> > >>>> Sensor, and we end up in the same situation.
> > >>>>
> > >>>> Finally, I tried starting up only 1 instance (with 5 threads). Current
> > >>>> status:
> > >>>>
> > >>>> "StreamThread-5" - Thread t@56
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> > >>>>        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <5bea9b1b> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-4" - Thread t@54
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> > >>>>        - locked <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor)
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <5a06855> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-3" - Thread t@52
> > >>>>   java.lang.Thread.State: BLOCKED
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> > >>>>        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <d3a57bc> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-2" - Thread t@50
> > >>>>   java.lang.Thread.State: RUNNABLE
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:175)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <480f4efc> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> "StreamThread-1" - Thread t@48
> > >>>>   java.lang.Thread.State: BLOCKED
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:169)
> > >>>>        - waiting to lock <6a55b7c6> (a org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5" t@56
> > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.java:176)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread$StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:190)
> > >>>>        at org.apache.kafka.streams.processor.internals.ProcessorNode.punctuate(ProcessorNode.java:139)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.punctuate(StreamTask.java:268)
> > >>>>        at org.apache.kafka.streams.processor.internals.PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > >>>>        - locked <47b226a5> (a java.util.PriorityQueue)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamTask.maybePunctuate(StreamTask.java:251)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.maybePunctuate(StreamThread.java:751)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:633)
> > >>>>        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)
> > >>>>
> > >>>>   Locked ownable synchronizers:
> > >>>>        - None
> > >>>>
> > >>>> This has been going on for over 40 minutes now and the cluster does not
> > >>>> stabilize. Not sure what to do here; any help is welcome.
> > >>>>
> > >>>>
> > >>>
> > >>
> >
> >
>
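For anyone wanting to watch for the BLOCKED pattern in the quoted dumps
(threads "waiting to lock" a monitor "owned by" another thread) without
taking full thread dumps by hand, here is a minimal sketch using the JDK's
standard ThreadMXBean. It is not Kafka code: the class name, the thread
names "demo-owner" and "demo-waiter", and the plain Object standing in for
a shared synchronized metrics object are all made up for illustration.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class BlockedThreadReport {

    /** Returns one line per BLOCKED thread, naming the monitor and its owner. */
    static List<String> blockedThreads() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> report = new ArrayList<>();
        for (ThreadInfo info : mx.dumpAllThreads(false, false)) {
            if (info != null && info.getThreadState() == Thread.State.BLOCKED) {
                report.add(String.format("\"%s\" waiting to lock %s owned by \"%s\"",
                        info.getThreadName(), info.getLockName(), info.getLockOwnerName()));
            }
        }
        return report;
    }

    /** Reproduces the contention with two hypothetical threads and returns the report. */
    static List<String> demo() {
        final Object sensor = new Object(); // stand-in for a shared synchronized object
        CountDownLatch held = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        Thread owner = new Thread(() -> {
            synchronized (sensor) {
                held.countDown(); // signal that the monitor is now held
                try {
                    release.await(); // keep holding it until the report is taken
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "demo-owner");
        Thread waiter = new Thread(() -> {
            synchronized (sensor) {
                // nothing to do; this thread only needs to contend for the monitor
            }
        }, "demo-waiter");

        try {
            owner.start();
            held.await();
            waiter.start();
            // Wait until the second thread is actually parked on the monitor.
            while (waiter.getState() != Thread.State.BLOCKED) {
                Thread.sleep(10);
            }
            List<String> report = blockedThreads();
            release.countDown();
            owner.join();
            waiter.join();
            return report;
        } catch (InterruptedException e) {
            release.countDown();
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In a real application only blockedThreads() would be of interest, called
periodically or from a diagnostic endpoint. Note that a thread shown as
RUNNABLE while holding the monitor, as in the dumps above, will not appear
in this report; only the threads waiting on it do.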



-- 
-- Guozhang
