kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From João Peixoto <joao.harti...@gmail.com>
Subject Re: Large Kafka Streams deployment takes a long time to bootstrap
Date Tue, 09 May 2017 18:35:57 GMT
Guozhang thanks a lot for that info, that is exactly what I'm observing it
seems.

I'll keep an eye out.

JP

On Mon, May 8, 2017 at 3:17 PM Guozhang Wang <wangguoz@gmail.com> wrote:

> Hello,
>
> Just to adds a few more pointers that there is a few improvements we have
> added in trunk and are considering to also piggy-back to a 0.10.2 in case
> we can have a 0.10.2.2 release, and one of them that would help with this
> case:
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
>
> The key idea is that with many instances of the same app starting up at the
> same time, in your case 5 * 5 = 25 threads, we can consider 1) reduce the
> latency of a single rebalance, 2) reduce the number of consecutive
> rebalances until all instances are up and running, and the above one is
> aimed for the second case. So I'd suggest taking a look at the app's logs
> and see if there are multiple rebalances triggered during the starting up,
> and if yes the above fix may help the most.
>
>
> Guozhang
>
>
> On Mon, May 8, 2017 at 7:41 AM, João Peixoto <joao.hartimer@gmail.com>
> wrote:
>
> > Thanks for the feedback. Here is additional information:
> >
> > * The stream instances are deployed on kubernetes through deployments. I
> do
> > not know if they use emptyDir, hostPath or EBS
> > * The instances have 2 cores minimum
> >
> > Good advice on the state stores, I already had some of those
> > configurations, but for this issue in particular the state stores are
> > empty, since this happens when the kafka stream bootstraps for the first
> > time.
> >
> >
> >
> > On Sat, May 6, 2017 at 7:31 AM Eno Thereska <eno.thereska@gmail.com>
> > wrote:
> >
> > > Hi there,
> > >
> > > I wanted to add something: how many CPU cores does each of your
> > Kubernetes
> > > instance have? In 0.10.2.1 we noticed a regression in environments
> with 1
> > > core as described in https://issues.apache.org/jira/browse/KAFKA-5174
> <
> > > https://issues.apache.org/jira/browse/KAFKA-5174>.
> > >
> > > If you have 1 core, the workaround is to change a config as described
> > here:
> > >
> > > http://docs.confluent.io/current/streams/upgrade-guide.
> > html#known-issues-and-workarounds
> > > <
> > > http://docs.confluent.io/current/streams/upgrade-guide.
> > html#known-issues-and-workarounds
> > > >
> > >
> > > Thanks
> > > Eno
> > >
> > >
> > > > On May 6, 2017, at 9:48 AM, Sachin Mittal <sjmittal@gmail.com>
> wrote:
> > > >
> > > > Note on few things.
> > > > Set changelog topic delete retention time to as less as possible if
> the
> > > > previous values for same key are not needed and can be safely cleaned
> > up.
> > > > Set segment size and segment retention time also low so older
> segments
> > > can
> > > > be compacted and cleaned up.
> > > > Set delete ratio to be aggressive 0.01 so segments don't grow to big.
> > > >
> > > > This way state stores would be created much faster.
> > > >
> > > > Also when using Windows smaller window size helps.
> > > >
> > > > Try not running many stream threads on single machine unless you
> have a
> > > > great hardware.
> > > >
> > > > Make sure a thread is not reading from many partitions. Make sure
> ratio
> > > of
> > > > partions to total threads is low.
> > > >
> > > > Hope this helps.
> > > >
> > > > Sachin
> > > >
> > > > On 6 May 2017 13:28, "Shimi Kiviti" <shimi.k@gmail.com> wrote:
> > > >
> > > >> This is very similar to issues that we see.
> > > >>
> > > >> Did you check the status of the consumer group? In my case it will
> be
> > in
> > > >> rebalancing most of the time. Once in a while it will show consumers
> > and
> > > >> offsets but after a short time will go back to rebalancing.
> > > >>
> > > >> How much storage does your Kafka-streams use?
> > > >> Also, what is your k8s configuration?
> > > >> Deployment? Deployment with emptyDir, hostPath or EBS? Statefulset?
> > > >>
> > > >> Thanks,
> > > >> Shimi
> > > >> On Sat, 6 May 2017 at 2:34 João Peixoto <joao.hartimer@gmail.com>
> > > wrote:
> > > >>
> > > >>> After a while the instance started running.
> > > >>>
> > > >>> 2017-05-05 22:40:26.806  INFO 85 --- [ StreamThread-4]
> > > >>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>> Committing task StreamTask 1_62
> > > >>> (this is literally the next message)
> > > >>> 2017-05-05 23:13:27.820  INFO 85 --- [ StreamThread-4]
> > > >>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>> Committing all tasks because the commit interval 10000ms has
> elapsed
> > > >>>
> > > >>> On Fri, May 5, 2017 at 3:48 PM João Peixoto <
> joao.hartimer@gmail.com
> > >
> > > >>> wrote:
> > > >>>
> > > >>>> Warning, long message
> > > >>>>
> > > >>>> *Problem*: Initializing a Kafka Stream is taking a loooong time.
> > > >>>> Currently at the 40 minute mark
> > > >>>>
> > > >>>> *Setup*:
> > > >>>> 2 co-partition topics with 100 partitions.
> > > >>>> First topic contains a lot of messages in the order of hundreds of
> > > >>> millions
> > > >>>> Second topic is a KTable and contains ~30k records
> > > >>>>
> > > >>>> Kafka cluster with 6 brokers running 0.10.1
> > > >>>>
> > > >>>> Kafka streams running on 0.10.2.1. 5 instances with 5 threads
> each.
> > > >>>> The instances are running on Kubernetes
> > > >>>>
> > > >>>> *Stream Configuration*:
> > > >>>> Properties props = new Properties();
> > > >>>> props.put(StreamsConfig.APPLICATION_ID_CONFIG, streamName);
> > > >>>> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, ...);
> > > >>>> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
> > > >>>> Serdes.String().getClass().getName());
> > > >>>> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
> > > >>>> Serdes.ByteArray().getClass().getName());
> > > >>>> props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10000);
> > > >>>> props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
> > > >>>> props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 5);
> > > >>>> props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
> > > >>>> props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
> > > >>>>
> > > >>>> *The events*:
> > > >>>> I started 5 instances of my stream configuration at the same time.
> > > This
> > > >>> is
> > > >>>> the first
> > > >>>> time this configuration is running.
> > > >>>>
> > > >>>> 2017-05-05 21:23:03.283  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Creating producer client
> > > >>>> 2017-05-05 21:23:03.415  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Creating consumer client
> > > >>>> 2017-05-05 21:23:03.520  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Creating restore consumer client
> > > >>>> 2017-05-05 21:23:03.528  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> State transition from NOT_RUNNING to RUNNING.
> > > >>>> 2017-05-05 21:23:03.531  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Creating producer client
> > > >>>> 2017-05-05 21:23:03.564  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Creating consumer client
> > > >>>> 2017-05-05 21:23:03.569  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Creating restore consumer client
> > > >>>> 2017-05-05 21:23:03.615  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> State transition from NOT_RUNNING to RUNNING.
> > > >>>> 2017-05-05 21:23:03.617  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Creating producer client
> > > >>>> 2017-05-05 21:23:03.621  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Creating consumer client
> > > >>>> 2017-05-05 21:23:03.625  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Creating restore consumer client
> > > >>>> 2017-05-05 21:23:03.628  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> State transition from NOT_RUNNING to RUNNING.
> > > >>>> 2017-05-05 21:23:03.629  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Creating producer client
> > > >>>> 2017-05-05 21:23:03.632  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Creating consumer client
> > > >>>> 2017-05-05 21:23:03.635  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Creating restore consumer client
> > > >>>> 2017-05-05 21:23:03.638  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> State transition from NOT_RUNNING to RUNNING.
> > > >>>> 2017-05-05 21:23:03.639  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Creating producer client
> > > >>>> 2017-05-05 21:23:03.641  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Creating consumer client
> > > >>>> 2017-05-05 21:23:03.644  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Creating restore consumer client
> > > >>>> 2017-05-05 21:23:03.647  INFO 71 --- [           main]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> State transition from NOT_RUNNING to RUNNING.
> > > >>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Starting
> > > >>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Starting
> > > >>>> 2017-05-05 21:23:03.790  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Starting
> > > >>>> 2017-05-05 21:23:03.791  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Starting
> > > >>>> 2017-05-05 21:23:03.792  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Starting
> > > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> at state RUNNING: partitions [] revoked at the beginning of
> consumer
> > > >>>> rebalance.
> > > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> at state RUNNING: partitions [] revoked at the beginning of
> consumer
> > > >>>> rebalance.
> > > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> at state RUNNING: partitions [] revoked at the beginning of
> consumer
> > > >>>> rebalance.
> > > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:23:03.966  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> at state RUNNING: partitions [] revoked at the beginning of
> consumer
> > > >>>> rebalance.
> > > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> at state RUNNING: partitions [] revoked at the beginning of
> consumer
> > > >>>> rebalance.
> > > >>>> 2017-05-05 21:23:03.967  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:23:03.968  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Updating suspended tasks to contain active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Updating suspended tasks to contain active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Updating suspended tasks to contain active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Updating suspended tasks to contain active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Updating suspended tasks to contain active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Removing all active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Removing all active tasks []
> > > >>>> 2017-05-05 21:23:03.970  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Removing all active tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Removing all active tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Removing all active tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> Removing all standby tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Removing all standby tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> Removing all standby tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Removing all standby tasks []
> > > >>>> 2017-05-05 21:23:03.971  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> Removing all standby tasks []
> > > >>>> 2017-05-05 21:23:04.020  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Constructed client metadata
> > > >>>>
> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > > >>>> consumers=[<consumerlist>], state=[activeTasks: ([])
> assignedTasks:
> > > >> ([])
> > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 1.0 cost:
> > > >> 0.0]}}
> > > >>>> from the member subscriptions.
> > > >>>> 2017-05-05 21:23:04.218  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Completed validating internal topics in partition assignor
> > > >>>> 2017-05-05 21:23:04.591  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Completed validating internal topics in partition assignor
> > > >>>> 2017-05-05 21:23:04.726  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Assigned tasks to clients as
> > > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > > >>>> assignedTasks: ([<list>]) prevActiveTasks: ([]) prevAssignedTasks:
> > > ([])
> > > >>>> capacity: 1.0 cost: 100.0]}.
> > > >>>> 2017-05-05 21:23:04.742  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Constructed client metadata
> > > >>>>
> {18d6eae1-6fd5-4ccc-b535-49e392110253=ClientMetadata{hostInfo=null,
> > > >>>> consumers=[<consumerlist>], state=[activeTasks: ([])
> assignedTasks:
> > > >> ([])
> > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > > >> 0.0]}}
> > > >>>> from the member subscriptions.
> > > >>>> 2017-05-05 21:23:05.120  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Completed validating internal topics in partition assignor
> > > >>>> 2017-05-05 21:23:05.482  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Completed validating internal topics in partition assignor
> > > >>>> 2017-05-05 21:23:05.520  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Assigned tasks to clients as
> > > >>>> {18d6eae1-6fd5-4ccc-b535-49e392110253=[activeTasks: ([<list>])
> > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > > >> 50.0],
> > > >>>> da663a61-dada-478b-b060-78d77536530a=[activeTasks: ([<list>])
> > > >>>> prevActiveTasks: ([]) prevAssignedTasks: ([]) capacity: 5.0 cost:
> > > >> 50.0]}.
> > > >>>> 2017-05-05 21:23:05.553  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> at state PARTITIONS_REVOKED: new partitions [<partitionlist>]
> > assigned
> > > >> at
> > > >>>> the end of consumer rebalance.
> > > >>>> *// The above line is repeated for each thread*
> > > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-5]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-5]
> > > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > >>>> 2017-05-05 21:23:05.554  INFO 71 --- [ StreamThread-3]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-3]
> > > >>>> State transition from PARTITIONS_REVOKED to ASSIGNING_PARTITIONS.
> > > >>>> *// omitted*
> > > >>>> 2017-05-05 21:23:15.596  INFO 71 --- [ StreamThread-2]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-2]
> > > >>>> State transition from ASSIGNING_PARTITIONS to RUNNING.
> > > >>>> *// above message repeated for each thread*
> > > >>>>
> > > >>>> *Important*: At this point only StreamThread-4 is performing
> commits
> > > >>>> every 10 seconds. The other threads output no logs. Now the fun
> > begins
> > > >>>>
> > > >>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> 2017-05-05 21:29:21.310  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Closing task's topology ... // repeated multiple times
> > > >>>> 2017-05-05 21:29:21.387  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Flushing state stores of task ... // repeated multiple times
> > > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Committing consumer offsets of task ... // repeated multiple times
> > > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Updating suspended tasks to contain active tasks [<list>]
> > > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Removing all active tasks [<list>]
> > > >>>> 2017-05-05 21:29:21.388  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Removing all standby tasks []
> > > >>>>
> > > >>>>
> > > >>>> At this point there are no more log messages for 16 minutes!!
> During
> > > >> this
> > > >>>> time I perform several threaddumps, almost every minute.
> > > >>>> Thread dump below. Do notice that thread 4 is the only different
> > one.
> > > >>>>
> > > >>>> "StreamThread-5" - Thread t@57
> > > >>>>   java.lang.Thread.State: BLOCKED
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - waiting to lock <653f9d6> (a
> > > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1"
> > t@49
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <3fbbc5a8> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-4" - Thread t@55
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <1cacaaf> (a
> java.util.Collections$UnmodifiableSet)
> > > >>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <639d53ee> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-3" - Thread t@53
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - locked <653f9d6> (a org.apache.kafka.common.
> > metrics.Sensor)
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <7a09baf0> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-2" - Thread t@51
> > > >>>>   java.lang.Thread.State: BLOCKED
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - waiting to lock <653f9d6> (a
> > > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-1"
> > t@49
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <2dc188ac> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-1" - Thread t@49
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:172)
> > > >>>>        - locked <653f9d6> (a org.apache.kafka.common.
> > metrics.Sensor)
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <7dffc3aa> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>>
> > > >>>> (no logs omitted, ~16 minutes later)
> > > >>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Committing all tasks because the commit interval 10000ms has
> elapsed
> > > >>>> 2017-05-05 21:45:05.270  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> Committing task StreamTask .. // repeated multiple times
> > > >>>> 2017-05-05 21:45:05.379  INFO 71 --- [ StreamThread-1]
> > > >>>> o.a.k.s.p.internals.StreamThread         : stream-thread
> > > >> [StreamThread-1]
> > > >>>> State transition from RUNNING to PARTITIONS_REVOKED.
> > > >>>> *// The above is repeated for threads 2, 3 and 5*
> > > >>>> (no logs omitted!! This is really the next entry, ~10 minutes
> later)
> > > >>>> 2017-05-05 21:55:16.835  INFO 71 --- [ StreamThread-4]
> > > >>>> o.a.k.s.p.i.StreamPartitionAssignor      : stream-thread
> > > >> [StreamThread-4]
> > > >>>> Constructed client metadata ...
> > > >>>>
> > > >>>> During the above 10 minutes all threads show the following
> > > >>>> "StreamThread-5" - Thread t@57
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <4a60e0dd> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <5e54059f> (a java.util.Collections$
> > UnmodifiableSet)
> > > >>>>        - locked <60693986> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <5f1ae6c1> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-4" - Thread t@55
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <5709c085> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <1cacaaf> (a
> java.util.Collections$UnmodifiableSet)
> > > >>>>        - locked <26a408e> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <639d53ee> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-3" - Thread t@53
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <123193f7> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <6c3704d3> (a java.util.Collections$
> > UnmodifiableSet)
> > > >>>>        - locked <45bbb5da> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <4d9f6f42> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-2" - Thread t@51
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <532ff32d> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <76a6407> (a
> java.util.Collections$UnmodifiableSet)
> > > >>>>        - locked <1f670455> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <29b48d84> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-1" - Thread t@49
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
> > > >>>>        at
> sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:
> > > >> 269)
> > > >>>>        at
> > > >>> sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:93)
> > > >>>>        at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.
> > > >> java:86)
> > > >>>>        - locked <5aeb504> (a sun.nio.ch.Util$2)
> > > >>>>        - locked <5130a3ea> (a java.util.Collections$
> > UnmodifiableSet)
> > > >>>>        - locked <76d25035> (a sun.nio.ch.EPollSelectorImpl)
> > > >>>>        at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:97)
> > > >>>>        at
> > > >>>> org.apache.kafka.common.network.Selector.select(Selector.java:489)
> > > >>>>        at
> > > >>> org.apache.kafka.common.network.Selector.poll(Selector.java:298)
> > > >>>>        at
> > > >>>>
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:349)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:226)
> > > >>>>        - locked <7b072bc6> (a
> > > >>>> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerNetworkClient.poll(
> > > >> ConsumerNetworkClient.java:172)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> joinGroupIfNeeded(AbstractCoordinator.java:347)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.
> > > >> ensureActiveGroup(AbstractCoordinator.java:303)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.internals.
> > ConsumerCoordinator.poll(
> > > >> ConsumerCoordinator.java:290)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.
> > > >> pollOnce(KafkaConsumer.java:1029)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.clients.consumer.KafkaConsumer.poll(
> > > >> KafkaConsumer.java:995)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:592)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> Eventually they get assigned partitions again, then they are
> > revoked,
> > > >>>> another long time passes, threads 1, 2, 3 and 5 stuck on Sensor
> and
> > we
> > > >>> get
> > > >>>> into the same situation.
> > > >>>>
> > > >>>> Finally, I tried starting up only 1 instance (with 5 threads).
> > Current
> > > >>>> status:
> > > >>>>
> > > >>>> "StreamThread-5" - Thread t@56
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - locked <6a55b7c6> (a org.apache.kafka.common.
> > metrics.Sensor)
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <5bea9b1b> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-4" - Thread t@54
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - locked <6a55b7c6> (a org.apache.kafka.common.
> > metrics.Sensor)
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <5a06855> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-3" - Thread t@52
> > > >>>>   java.lang.Thread.State: BLOCKED
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - waiting to lock <6a55b7c6> (a
> > > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5"
> > t@56
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <d3a57bc> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-2" - Thread t@50
> > > >>>>   java.lang.Thread.State: RUNNABLE
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:175)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <480f4efc> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> "StreamThread-1" - Thread t@48
> > > >>>>   java.lang.Thread.State: BLOCKED
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:169)
> > > >>>>        - waiting to lock <6a55b7c6> (a
> > > >>>> org.apache.kafka.common.metrics.Sensor) owned by "StreamThread-5"
> > t@56
> > > >>>>        at org.apache.kafka.common.metrics.Sensor.record(Sensor.
> > > >> java:176)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread$
> > > >> StreamsMetricsThreadImpl.recordLatency(StreamThread.java:1184)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.
> > > >> measureLatencyNs(StreamsMetricsImpl.java:190)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > ProcessorNode.punctuate(
> > > >> ProcessorNode.java:139)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamTask.punctuate(
> > > >> StreamTask.java:268)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> PunctuationQueue.mayPunctuate(PunctuationQueue.java:45)
> > > >>>>        - locked <47b226a5> (a java.util.PriorityQueue)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > StreamTask.maybePunctuate(
> > > >> StreamTask.java:251)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.maybePunctuate(StreamThread.java:751)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(
> > > >> StreamThread.java:633)
> > > >>>>        at
> > > >>>>
> > > >>> org.apache.kafka.streams.processor.internals.
> > > >> StreamThread.run(StreamThread.java:361)
> > > >>>>
> > > >>>>   Locked ownable synchronizers:
> > > >>>>        - None
> > > >>>>
> > > >>>> This has been going on for over 40 minutes now and the cluster
> does
> > > not
> > > >>>> stabilize. Not sure what to do here, any help welcome.
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message