kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: more uniform task assignment across kafka stream nodes
Date Mon, 27 Mar 2017 23:15:07 GMT
Great!

So overall, the issue is not related to task assignment. Also, the
description below does not indicate that a different task assignment
would change anything.


-Matthias

On 3/27/17 3:08 PM, Ara Ebrahimi wrote:
> Let me clarify, because I think we’re using different terminology:
> 
> - message key is phone number, reversed
> - all call records for a phone number land on the same partition
> - then we apply a session window on them and aggregate+reduce
> - so we end up with a group of records for a phone number. This group is literally an
avro object with an array of records in it for the session.
> - since records arrive chronologically and since we have a session window, then all call
records for a phone number end up in the same partition and session (intended behavior). We
can easily have many such phone call record groups with 100s of call records in them. The
aggregate object (the avro object with array of records in it) can get updated 100s of times
for the same phone number in the course of an hour or so.
> - we process billions of such call records a day
> - we can’t expect our clients to install massive clusters of 10s or 100s of nodes.
We do need to make sure this processing is as efficient as possible. The session window bug
was killing us. Much better with the fix Damian provided!
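
A minimal sketch of the grouping described in the list above, in plain Java rather than the Kafka Streams API: records for one key are split into sessions whenever the gap between consecutive timestamps exceeds an inactivity gap. The class name `SessionGroupingSketch`, the 5-second gap, and the sample timestamps are illustrative assumptions, and the exact boundary handling may differ from what Streams does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of session grouping for ONE key (e.g. one reversed phone number).
// Hypothetical names and values; this is not the Kafka Streams implementation.
final class SessionGroupingSketch {

    // Splits ascending timestamps into sessions: a new session starts whenever
    // the gap to the previous record is larger than gapMs.
    static List<List<Long>> sessions(List<Long> sortedTimestamps, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = null;
        long previous = 0L;
        for (long ts : sortedTimestamps) {
            if (current == null || ts - previous > gapMs) {
                current = new ArrayList<>();
                result.add(current);
            }
            current.add(ts); // each arriving record updates the session aggregate once
            previous = ts;
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed sample data: five call records for a single phone number.
        List<Long> callTimes = Arrays.asList(0L, 1_000L, 2_000L, 10_000L, 10_500L);
        System.out.println(sessions(callTimes, 5_000L));
    }
}
```

With these sample values the first three records form one session and the last two form another. Every record appended to a session corresponds to one update of that session's aggregate, which is why a hot key with 100s of records translates into 100s of store updates.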
> 
> Ara.
> 
> On Mar 27, 2017, at 2:41 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> 
> 
> 
> ________________________________
> 
> This message is for the designated recipient only and may contain privileged, proprietary,
or otherwise confidential information. If you have received it in error, please notify the
sender immediately and delete the original. Any other use of the e-mail by you is prohibited.
Thank you in advance for your cooperation.
> 
> ________________________________
> 
> From: "Matthias J. Sax" <matthias@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 2:41:06 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> I assume your performance issue is most likely related to the fix Damian
> pointed out already.
> 
> A couple of follow-up comments:
> 
> > critical part of our pipeline involves grouping relevant records together
> 
> Can you explain this a little better? The abstraction of a task does
> group data together already. Furthermore, if you get multiple tasks,
> those are independent units with regard to grouping as consecutive tasks
> are "connected" via repartitioning steps. Thus, even if we apply the
> task assignment you ask for, I am not sure this would change
> anything. Maybe you can give a small data example of the
> behavior/data co-location you need versus what Streams provides for you.
> 
> > And for hot keys this can lead to sometimes 100s of records to get grouped together
> 
> This is independent of task placement -- it is a partitioning issue. If you
> want to work on that, you can provide a custom `Partitioner` for the used
> Kafka producers (note, your external Producer writing to your Streams
> input topic might already generate "hot" keys, so you might need to use
> a custom partitioner there, too)
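
As a rough illustration of what a custom partitioning rule could look like, here is a sketch in plain Java. It is not an implementation of Kafka's `Partitioner` interface; the class `HotKeyPartitionerSketch`, the pinned-key table, and the sample keys are hypothetical, and `String.hashCode()` merely stands in for whatever key hash the producer really uses. The idea is to keep hash partitioning for ordinary keys while letting known hot keys be placed on explicitly chosen partitions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical partition chooser; NOT the Kafka Partitioner interface itself.
final class HotKeyPartitionerSketch {
    private final int numPartitions;
    private final Map<String, Integer> pinned; // known hot keys -> chosen partition

    HotKeyPartitionerSketch(int numPartitions, Map<String, Integer> pinned) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        for (int p : pinned.values()) {
            if (p < 0 || p >= numPartitions) {
                throw new IllegalArgumentException("pinned partition out of range: " + p);
            }
        }
        this.numPartitions = numPartitions;
        this.pinned = new HashMap<>(pinned);
    }

    // The same key always maps to the same partition, so per-key grouping still works.
    int partition(String key) {
        Integer override = pinned.get(key);
        if (override != null) {
            return override;
        }
        // Assumption: String.hashCode() stands in for the producer's real key hash.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        Map<String, Integer> hot = new HashMap<>();
        hot.put("0001234555", 7); // assumed hot key, pinned to partition 7
        HotKeyPartitionerSketch sketch = new HotKeyPartitionerSketch(8, hot);
        System.out.println(sketch.partition("0001234555"));
        System.out.println(sketch.partition("0009876555"));
    }
}
```

Because every record for a key still lands on one partition, the session grouping is unaffected; only the placement of the hot keys changes. The same rule would have to be used by any external producer writing to the input topic.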
> 
> Also "100s of records" does not sound like much to me. Streams can process
> multiple hundreds of thousands of records per thread. That is
> why I think that the fix Damian pointed out will most likely fix
> your problem.
> 
> 
> 
> -Matthias
> 
> 
> On 3/27/17 1:56 PM, Ara Ebrahimi wrote:
> Thanks for the response Matthias!
> 
> The reason we want this exact task assignment to happen is that a critical part of our
pipeline involves grouping relevant records together (that’s what the aggregate function
in the topology is for). And for hot keys this can lead to sometimes 100s of records to get
grouped together. Even worse, these records are session bound, we use session windows. Hence
we see lots of activity around the store backing the aggregate function and even though we
use SSD drives we’re not seeing the kind of performance we want to see. It seems like the
aggregate function leads to lots of updates to these hot keys which lead to lots of rocksdb
activity.
> 
> Now there are many ways to fix this problem:
> - just don’t aggregate, create an algorithm which is not reliant on grouping/aggregating
records. Not what we can do with our tight schedule right now.
> - do grouping/aggregating but employ n instances and rely on uniform distribution of
these tasks. This is the easiest solution and what we expected to work but didn’t work as
you can tell from this thread. We threw 4 instances at it but only 2 got used.
> - tune rocksdb? I tried this actually but it didn’t really help us much, aside from
the fact that tuning rocksdb is very tricky.
> - use in-memory store instead? Unfortunately we have to use session windows for this
aggregate function and apparently there’s no in-memory session store impl? I tried to create
one but soon realized it’s too much work :) I looked at default PartitionAssignor code too,
but that ain’t trivial either.
> 
> So I’m a bit hopeless :(
> 
> Ara.
> 
> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> From: "Matthias J. Sax" <matthias@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 27, 2017 at 1:35:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> thanks for the detailed information.
> 
> If I parse this correctly, both instances run the same number of tasks
> (12 each). That is all Streams promises.
> 
> To come back to your initial question:
> 
> > Is there a way to tell kafka streams to uniformly assign partitions across instances?
> > If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of
> > partitions. No dynamic task assignment logic. Just dumb 1/n assignment.
> 
> That is exactly what you get: each of your two instances gets 24/2 = 12
> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> response was correct.
> 
> However, I now understand better what you actually mean by your
> question. Note that Streams does not distinguish "type" of tasks -- it
> only sees 24 tasks and assigns those in a balanced way.
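
To make the "balanced in total, uneven per type" observation concrete, the following sketch tallies the task IDs listed in the two instance dumps quoted further down in this thread. It assumes the part of a task ID before the underscore identifies the sub-topology and the part after it the partition; the class name `TaskTallySketch` is hypothetical, and the code only counts the reported assignment rather than reproducing how Streams computes it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Counts tasks per sub-topology for each instance. Illustrative only.
final class TaskTallySketch {
    // Task IDs copied from the two instance dumps quoted in this thread.
    static final List<String> INSTANCE_1 = Arrays.asList(
        "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
        "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
    static final List<String> INSTANCE_2 = Arrays.asList(
        "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
        "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    // Groups task IDs by the number before the underscore (assumed sub-topology id).
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + INSTANCE_1.size() + " tasks "
            + countBySubTopology(INSTANCE_1));
        System.out.println("instance 2: " + INSTANCE_2.size() + " tasks "
            + countBySubTopology(INSTANCE_2));
    }
}
```

Both instances hold 12 tasks, but the split per sub-topology is 3/5/4 on instance 1 and 5/3/4 on instance 2, which matches the counts Ara reports for the input, repartition, and output topics.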
> 
> Thus, currently there is no easy way to get the assignment you want to
> have, unless you implement your own `PartitionAssignor`.
> 
> This is the current implementation for 0.10.2
> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> 
> You can, if you wish, write your own assignor and set it via
> StreamsConfig. However, be aware that this might be tricky to get right
> and also might have runtime implications with regard to rebalancing and
> state store recovery. We recently improved the current implementation to
> avoid costly task movements:
> https://issues.apache.org/jira/browse/KAFKA-4677
> 
> Thus, I would not recommend implementing your own `PartitionAssignor`.
> 
> 
> However, the root question is, why do you need this exact assignment you
> are looking for in the first place? Why is it "bad" if "types" of tasks
> are not distinguished? I would like to understand your requirement
> better -- it might be worth improving Streams here.
> 
> 
> -Matthias
> 
> 
> On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> Hi,
> 
> So, I simplified the topology by making sure we have only 1 source topic. Now I have
1 source topic, 8 partitions, 2 instances. And here’s what the topology looks like:
> 
> instance 1:
> 
> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-1
> Active tasks:
> StreamsTask taskId: 0_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-3]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-2
> Active tasks:
> StreamsTask taskId: 1_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-2]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-3
> Active tasks:
> StreamsTask taskId: 1_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-1]
> StreamsTask taskId: 2_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-4
> Active tasks:
> StreamsTask taskId: 2_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-0]
> StreamsTask taskId: 2_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-5
> Active tasks:
> StreamsTask taskId: 0_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-0]
> StreamsTask taskId: 1_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-6
> Active tasks:
> StreamsTask taskId: 1_0
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-0]
> StreamsTask taskId: 0_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-7
> Active tasks:
> StreamsTask taskId: 2_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-8
> Active tasks:
> StreamsTask taskId: 1_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-3]
> Standby tasks:
> 
> 
> instance 2:
> 
> KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-1
> Active tasks:
> StreamsTask taskId: 2_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-1]
> StreamsTask taskId: 2_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-5]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-2
> Active tasks:
> StreamsTask taskId: 0_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-3
> Active tasks:
> StreamsTask taskId: 2_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-2]
> StreamsTask taskId: 1_7
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-7]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-4
> Active tasks:
> StreamsTask taskId: 2_3
> ProcessorTopology:
> KSTREAM-SOURCE-0000000019:
> topics: [ml-features-avro-or]
> Partitions [ml-features-avro-or-3]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-5
> Active tasks:
> StreamsTask taskId: 0_1
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-1]
> StreamsTask taskId: 1_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-5]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-6
> Active tasks:
> StreamsTask taskId: 1_4
> ProcessorTopology:
> KSTREAM-SOURCE-0000000012:
> topics: [activities-by-phone-store-or-repartition]
> children: [KSTREAM-AGGREGATE-0000000009]
> KSTREAM-AGGREGATE-0000000009:
> states: [activities-by-phone-store-or]
> children: [KTABLE-TOSTREAM-0000000013]
> KTABLE-TOSTREAM-0000000013:
> children: [KSTREAM-FILTER-0000000014]
> KSTREAM-FILTER-0000000014:
> children: [KSTREAM-FILTER-0000000015]
> KSTREAM-FILTER-0000000015:
> children: [KSTREAM-MAP-0000000016]
> KSTREAM-MAP-0000000016:
> children: [KSTREAM-MAP-0000000017]
> KSTREAM-MAP-0000000017:
> children: [KSTREAM-SINK-0000000018]
> KSTREAM-SINK-0000000018:
> topic: ml-features-avro-or
> Partitions [activities-by-phone-store-or-repartition-4]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-7
> Active tasks:
> StreamsTask taskId: 0_2
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-2]
> StreamsTask taskId: 0_6
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-6]
> Standby tasks:
> 
> StreamsThread appId: mar-23-modular
> StreamsThread clientId: mar-23-modular
> StreamsThread threadId: StreamThread-8
> Active tasks:
> StreamsTask taskId: 0_5
> ProcessorTopology:
> KSTREAM-SOURCE-0000000000:
> topics: [activities-avro-or]
> children: [KSTREAM-FILTER-0000000001]
> KSTREAM-FILTER-0000000001:
> children: [KSTREAM-MAP-0000000002]
> KSTREAM-MAP-0000000002:
> children: [KSTREAM-BRANCH-0000000003]
> KSTREAM-BRANCH-0000000003:
> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> KSTREAM-BRANCHCHILD-0000000004:
> children: [KSTREAM-MAPVALUES-0000000006]
> KSTREAM-MAPVALUES-0000000006:
> children: [KSTREAM-FLATMAPVALUES-0000000007]
> KSTREAM-FLATMAPVALUES-0000000007:
> children: [KSTREAM-MAP-0000000008]
> KSTREAM-MAP-0000000008:
> children: [KSTREAM-FILTER-0000000011]
> KSTREAM-FILTER-0000000011:
> children: [KSTREAM-SINK-0000000010]
> KSTREAM-SINK-0000000010:
> topic: activities-by-phone-store-or-repartition
> KSTREAM-BRANCHCHILD-0000000005:
> Partitions [activities-avro-or-5]
> Standby tasks:
> 
> 
> activities-avro-or is input topic. ml-features-avro-or is output topic. In the middle
we have an aggregate (activities-by-phone-store-or-repartition).
> 
> On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see 5. Bad.
> 
> On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2. Good.
> 
> On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition. And 3 on instance
2. Bad.
> 
> As I said in terms of offsets for all these partitions I see uniform distribution, so
we’re not dealing with a bad key scenario.
> 
> Ara.
> 
> On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> From: "Matthias J. Sax" <matthias@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 6:43:12 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Please share the rest of your topology code (without any UDFs / business
> logic). Otherwise, I cannot give further advice.
> 
> -Matthias
> 
> 
> On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> Via:
> 
> builder.stream("topic1");
> builder.stream("topic2");
> builder.stream("topic3");
> 
> These are different kinds of topics consuming different avro objects.
> 
> Ara.
> 
> On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> From: "Matthias J. Sax" <matthias@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 6:04:30 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Ara,
> 
> How do you consume your topics? Via
> 
> builder.stream("topic1", "topic2", "topic3");
> 
> or via
> 
> builder.stream("topic1");
> builder.stream("topic2");
> builder.stream("topic3");
> 
> Both are handled differently with regard to creating tasks (partition to
> task assignment also depends on your downstream code though).
> 
> If this does not help, can you maybe share the structure of processing?
> To dig deeper, we would need to know the topology DAG.
> 
> 
> -Matthias
> 
> 
> On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> Matthias,
> 
> This apparently happens because we have more than 1 source topic. We have 3 source topics
in the same application. So it seems like the task assignment algorithm creates topologies
not for one specific topic at a time but the total partitions across all source topics consumed
in an application instance. Because we have some code dependencies between these 3 source
topics we can’t separate them into 3 applications at this time. That is why I want
the task assignment algorithm to do a uniform and simple task assignment PER
source topic.
> 
> Ara.
> 
> On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> 
> From: "Matthias J. Sax" <matthias@confluent.io>
> Subject: Re: more uniform task assignment across kafka stream nodes
> Date: March 25, 2017 at 5:21:47 PM PDT
> To: users@kafka.apache.org
> Reply-To: <users@kafka.apache.org>
> 
> 
> Hi,
> 
> I am wondering why this happens in the first place. Streams
> load-balances over all running instances, and each instance should have
> the same number of tasks (and thus partitions) assigned.
> 
> What is the overall assignment? Do you have standby tasks configured?
> What version do you use?
> 
> 
> -Matthias
> 
> 
> On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> Hi,
> 
> Is there a way to tell kafka streams to uniformly assign partitions across instances?
If I have n kafka streams instances running, I want each to handle EXACTLY 1/nth number of
partitions. No dynamic task assignment logic. Just dumb 1/n assignment.
> 
> Here’s our scenario. Let’s say we have a "source" topic with 8 partitions. We also
have 2 kafka streams instances. Each instance gets assigned 4 "source" topic partitions.
BUT then we do a few maps and an aggregate. So data gets shuffled around. The map function
uniformly distributes these across all partitions (I can verify that by looking at the partition
offsets). After the map what I notice by looking at the topology is that one kafka streams
instance gets assigned to handle say 2 aggregate repartition topics and the other one gets
assigned 6. Even worse, on bigger clusters (say 4 instances) we see say 2 nodes get assigned
downstream aggregate repartition topics and 2 other nodes get assigned NOTHING to handle.
> 
> Ara.
> 

