kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: session window bug not fixed in 0.10.2.1?
Date Thu, 04 May 2017 07:31:28 GMT
Ara,

That is a bit weird. I double-checked and agree with Eno that this commit
is in both trunk and 0.10.2, so I suspect the same issue still persists in
trunk; hence there might be another issue that is not fixed by PR 2645. Could
you help verify whether that is the case? If so, we can re-open
https://issues.apache.org/jira/browse/KAFKA-4851 and investigate further.


Guozhang


On Tue, May 2, 2017 at 1:02 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com>
wrote:

> No errors. But if I enable caching I see performance drop considerably.
> The workaround was to disable caching. The same thing is still true in
> 0.10.2.1.
>
> Ara.
>
> > On May 2, 2017, at 12:55 PM, Eno Thereska <eno.thereska@gmail.com> wrote:
> >
> > Hi Ara,
> >
> > The PR https://github.com/apache/kafka/pull/2645 has gone to both trunk
> > and 0.10.2.1, I just checked. What error are you seeing? Could you give
> > us an update?
> >
> > Thanks
> > Eno
> >
> > On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:
> >
> >> Hi,
> >>
> >> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows
> >> and tested again. It doesn’t seem to be fixed?
> >>
> >> Ara.
> >>
> >>> On Mar 27, 2017, at 2:10 PM, Damian Guy <damian.guy@gmail.com> wrote:
> >>>
> >>> Hi Ara,
> >>>
> >>> There is a performance issue in the 0.10.2 release of session windows.
> >>> It is fixed with this PR: https://github.com/apache/kafka/pull/2645
> >>> You can work around this on 0.10.2 by calling the aggregate(..),
> >>> reduce(..) etc. methods and supplying a StateStoreSupplier<SessionStore>
> >>> with caching disabled, i.e., by doing something like:
> >>>
> >>> // Caching is not enabled on this supplier.
> >>> final StateStoreSupplier<SessionStore> sessionStore =
> >>> Stores.create("session-store-name")
> >>>   .withKeys(Serdes.String())
> >>>   .withValues(Serdes.String())
> >>>   .persistent()
> >>>   .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >>>   .build();
> >>>
> >>>
> >>> The fix has also been cherry-picked to the 0.10.2 branch, so you could
> >>> build from source and not have to create the StateStoreSupplier.
> >>>
> >>> Thanks,
> >>> Damian
> >>>
> >>> On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrahimi@argyledata.com> wrote:
> >>>
> >>> Thanks for the response, Matthias!
> >>>
> >>> The reason we want this exact task assignment is that a critical part
> >>> of our pipeline involves grouping relevant records together (that’s
> >>> what the aggregate function in the topology is for). For hot keys this
> >>> can sometimes lead to 100s of records getting grouped together. Even
> >>> worse, these records are session bound; we use session windows. Hence
> >>> we see lots of activity around the store backing the aggregate function,
> >>> and even though we use SSD drives we’re not seeing the kind of
> >>> performance we want. It seems like the aggregate function leads to lots
> >>> of updates to these hot keys, which leads to lots of rocksdb activity.
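The effect described here, many updates to one hot key each hitting the store, is what a write-back cache in front of the store is generally meant to dampen. The following is a generic sketch of that idea only. It is not the Kafka Streams cache, every class and method name in it is hypothetical, and in this thread enabling caching on 0.10.2 reportedly made things slower rather than faster.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Generic write-back cache sketch; all names are hypothetical, none are Kafka Streams classes.
final class CoalescingCacheSketch {

    // Stand-in for the persistent store; counts how many writes reach it.
    static final class CountingStore {
        final Map<String, Long> data = new HashMap<>();
        int writes = 0;

        void put(String key, long value) {
            data.put(key, value);
            writes++;
        }

        long get(String key) {
            return data.getOrDefault(key, 0L);
        }
    }

    // Keeps only the latest value per key in memory until flush() is called.
    static final class WriteBackCache {
        private final Map<String, Long> dirty = new LinkedHashMap<>();
        private final CountingStore store;

        WriteBackCache(CountingStore store) {
            this.store = store;
        }

        long get(String key) {
            Long cached = dirty.get(key);
            return cached != null ? cached : store.get(key);
        }

        void put(String key, long value) {
            dirty.put(key, value);
        }

        void flush() {
            for (Map.Entry<String, Long> e : dirty.entrySet()) {
                store.put(e.getKey(), e.getValue());
            }
            dirty.clear();
        }
    }

    // Applies `updates` increments to one hot key, writing to the store every time.
    static int storeWritesWithoutCache(int updates) {
        CountingStore store = new CountingStore();
        for (int i = 0; i < updates; i++) {
            store.put("hot-key", store.get("hot-key") + 1);
        }
        return store.writes;
    }

    // Same workload, buffered in the cache and flushed once at the end.
    static int storeWritesWithCache(int updates) {
        CountingStore store = new CountingStore();
        WriteBackCache cache = new WriteBackCache(store);
        for (int i = 0; i < updates; i++) {
            cache.put("hot-key", cache.get("hot-key") + 1);
        }
        cache.flush();
        return store.writes;
    }

    public static void main(String[] args) {
        System.out.println(storeWritesWithoutCache(300)); // prints 300
        System.out.println(storeWritesWithCache(300));    // prints 1
    }
}
```

How often the cache is flushed decides how much is saved; a single flush at the end, as here, is the best case.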
> >>>
> >>> Now there are many ways to fix this problem:
> >>> - just don’t aggregate; create an algorithm which is not reliant on
> >>> grouping/aggregating records. Not something we can do with our tight
> >>> schedule right now.
> >>> - do grouping/aggregating but employ n instances and rely on uniform
> >>> distribution of these tasks. This is the easiest solution and what we
> >>> expected to work, but it didn’t, as you can tell from this thread. We
> >>> threw 4 instances at it but only 2 got used.
> >>> - tune rocksdb? I actually tried this, but it didn’t really help us
> >>> much, aside from the fact that tuning rocksdb is very tricky.
> >>> - use an in-memory store instead? Unfortunately we have to use session
> >>> windows for this aggregate function and apparently there’s no in-memory
> >>> session store impl? I tried to create one but soon realized it’s too
> >>> much work :) I looked at the default PartitionAssignor code too, but
> >>> that ain’t trivial either.
> >>>
> >>> So I’m a bit hopeless :(
> >>>
> >>> Ara.
> >>>
> >>> On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>>
> >>>
> >>>
> >>> ________________________________
> >>>
> >>> This message is for the designated recipient only and may contain
> >>> privileged, proprietary, or otherwise confidential information. If you
> >>> have received it in error, please notify the sender immediately and
> >>> delete the original. Any other use of the e-mail by you is prohibited.
> >>> Thank you in advance for your cooperation.
> >>>
> >>> ________________________________
> >>>
> >>> From: "Matthias J. Sax" <matthias@confluent.io>
> >>> Subject: Re: more uniform task assignment across kafka stream nodes
> >>> Date: March 27, 2017 at 1:35:30 PM PDT
> >>> To: users@kafka.apache.org
> >>> Reply-To: <users@kafka.apache.org>
> >>>
> >>>
> >>> Ara,
> >>>
> >>> thanks for the detailed information.
> >>>
> >>> If I parse this correctly, both instances run the same number of tasks
> >>> (12 each). That is all Streams promises.
> >>>
> >>> To come back to your initial question:
> >>>
> >>> Is there a way to tell kafka streams to uniformly assign partitions
> >>> across instances? If I have n kafka streams instances running, I want
> >>> each to handle EXACTLY 1/nth number of partitions. No dynamic task
> >>> assignment logic. Just dumb 1/n assignment.
> >>>
> >>> That is exactly what you get: each of your two instances gets 24/2 = 12
> >>> tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> >>> response was correct.
> >>>
> >>> However, I now understand better what you are actually meaning by your
> >>> question. Note that Streams does not distinguish "type" of tasks -- it
> >>> only sees 24 tasks and assigns those in a balanced way.
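The distinction drawn here, balanced by task count but blind to task "type", can be illustrated with a small self-contained sketch. It is not the StreamPartitionAssignor algorithm: the class `AssignmentSketch`, both strategies, and the contiguous-chunk ordering are hypothetical, chosen only to show that 12 tasks per instance does not imply an even split per sub-topology. Task IDs follow the `<subTopology>_<partition>` form used in the dumps in this thread, assuming 3 sub-topologies with 8 partitions each.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; this is NOT how StreamPartitionAssignor works.
final class AssignmentSketch {

    // Task IDs look like "<subTopology>_<partition>", e.g. "1_6".
    static int subTopologyOf(String taskId) {
        return Integer.parseInt(taskId.substring(0, taskId.indexOf('_')));
    }

    // Builds every task ID, ordered by sub-topology and then by partition.
    static List<String> allTasks(int subTopologies, int partitions) {
        List<String> tasks = new ArrayList<>();
        for (int s = 0; s < subTopologies; s++) {
            for (int p = 0; p < partitions; p++) {
                tasks.add(s + "_" + p);
            }
        }
        return tasks;
    }

    private static List<List<String>> emptyAssignment(int instances) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            out.add(new ArrayList<>());
        }
        return out;
    }

    // Type-blind strategy: cut the task list into equally sized contiguous chunks.
    static List<List<String>> assignByCountOnly(List<String> tasks, int instances) {
        List<List<String>> out = emptyAssignment(instances);
        int chunk = (tasks.size() + instances - 1) / instances; // ceiling division
        for (int i = 0; i < tasks.size(); i++) {
            out.get(i / chunk).add(tasks.get(i));
        }
        return out;
    }

    // Type-aware strategy: deal out each sub-topology's tasks round-robin on its own.
    static List<List<String>> assignPerSubTopology(List<String> tasks, int instances) {
        Map<Integer, List<String>> groups = new TreeMap<>();
        for (String t : tasks) {
            groups.computeIfAbsent(subTopologyOf(t), k -> new ArrayList<>()).add(t);
        }
        List<List<String>> out = emptyAssignment(instances);
        for (List<String> group : groups.values()) {
            for (int i = 0; i < group.size(); i++) {
                out.get(i % instances).add(group.get(i));
            }
        }
        return out;
    }

    // Number of tasks per sub-topology within one instance's assignment.
    static Map<Integer, Integer> countBySubTopology(List<String> tasks) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String t : tasks) {
            counts.merge(subTopologyOf(t), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> tasks = allTasks(3, 8); // 24 tasks, as in this thread
        for (List<String> a : assignByCountOnly(tasks, 2)) {
            System.out.println("count only:       " + a.size() + " tasks " + countBySubTopology(a));
        }
        for (List<String> a : assignPerSubTopology(tasks, 2)) {
            System.out.println("per sub-topology: " + a.size() + " tasks " + countBySubTopology(a));
        }
    }
}
```

Both strategies hand each instance 12 tasks. The count-only split gives one instance all 8 tasks of sub-topology 0 and the other all 8 of sub-topology 2, while the per-sub-topology split gives each instance 4 of every kind.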
> >>>
> >>> Thus, currently there is no easy way to get the assignment you want,
> >>> unless you implement your own `PartitionAssignor`.
> >>>
> >>> This is the current implementation for 0.10.2:
> >>> https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> >>>
> >>> You can, if you wish, write your own assignor and set it via
> >>> StreamsConfig. However, be aware that this might be tricky to get right
> >>> and might also have runtime implications with regard to rebalancing and
> >>> state store recovery. We recently improved the current implementation to
> >>> avoid costly task movements:
> >>> https://issues.apache.org/jira/browse/KAFKA-4677
> >>>
> >>> Thus, I would not recommend implementing your own `PartitionAssignor`.
> >>>
> >>>
> >>> However, the root question is: why do you need this exact assignment
> >>> in the first place? Why is it "bad" if "types" of tasks are not
> >>> distinguished? I would like to understand your requirement better; it
> >>> might be worth improving Streams here.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> >>> Hi,
> >>>
> >>> So, I simplified the topology by making sure we have only 1 source
> >>> topic. Now I have 1 source topic, 8 partitions, 2 instances. And here’s
> >>> what the topology looks like:
> >>>
> >>> instance 1:
> >>>
> >>> KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-1
> >>> Active tasks:
> >>> StreamsTask taskId: 0_3
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-3]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-2
> >>> Active tasks:
> >>> StreamsTask taskId: 1_2
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-2]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-3
> >>> Active tasks:
> >>> StreamsTask taskId: 1_1
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-1]
> >>> StreamsTask taskId: 2_7
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-7]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-4
> >>> Active tasks:
> >>> StreamsTask taskId: 2_0
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-0]
> >>> StreamsTask taskId: 2_6
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-6]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-5
> >>> Active tasks:
> >>> StreamsTask taskId: 0_0
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-0]
> >>> StreamsTask taskId: 1_6
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-6]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-6
> >>> Active tasks:
> >>> StreamsTask taskId: 1_0
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-0]
> >>> StreamsTask taskId: 0_7
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-7]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-7
> >>> Active tasks:
> >>> StreamsTask taskId: 2_4
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-4]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-8
> >>> Active tasks:
> >>> StreamsTask taskId: 1_3
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-3]
> >>> Standby tasks:
> >>>
> >>>
> >>> instance 2:
> >>>
> >>> KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-1
> >>> Active tasks:
> >>> StreamsTask taskId: 2_1
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-1]
> >>> StreamsTask taskId: 2_5
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-5]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-2
> >>> Active tasks:
> >>> StreamsTask taskId: 0_4
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-4]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-3
> >>> Active tasks:
> >>> StreamsTask taskId: 2_2
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-2]
> >>> StreamsTask taskId: 1_7
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-7]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-4
> >>> Active tasks:
> >>> StreamsTask taskId: 2_3
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000019:
> >>> topics: [ml-features-avro-or]
> >>> Partitions [ml-features-avro-or-3]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-5
> >>> Active tasks:
> >>> StreamsTask taskId: 0_1
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-1]
> >>> StreamsTask taskId: 1_5
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-5]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-6
> >>> Active tasks:
> >>> StreamsTask taskId: 1_4
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000012:
> >>> topics: [activities-by-phone-store-or-repartition]
> >>> children: [KSTREAM-AGGREGATE-0000000009]
> >>> KSTREAM-AGGREGATE-0000000009:
> >>> states: [activities-by-phone-store-or]
> >>> children: [KTABLE-TOSTREAM-0000000013]
> >>> KTABLE-TOSTREAM-0000000013:
> >>> children: [KSTREAM-FILTER-0000000014]
> >>> KSTREAM-FILTER-0000000014:
> >>> children: [KSTREAM-FILTER-0000000015]
> >>> KSTREAM-FILTER-0000000015:
> >>> children: [KSTREAM-MAP-0000000016]
> >>> KSTREAM-MAP-0000000016:
> >>> children: [KSTREAM-MAP-0000000017]
> >>> KSTREAM-MAP-0000000017:
> >>> children: [KSTREAM-SINK-0000000018]
> >>> KSTREAM-SINK-0000000018:
> >>> topic: ml-features-avro-or
> >>> Partitions [activities-by-phone-store-or-repartition-4]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-7
> >>> Active tasks:
> >>> StreamsTask taskId: 0_2
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-2]
> >>> StreamsTask taskId: 0_6
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-6]
> >>> Standby tasks:
> >>>
> >>> StreamsThread appId: mar-23-modular
> >>> StreamsThread clientId: mar-23-modular
> >>> StreamsThread threadId: StreamThread-8
> >>> Active tasks:
> >>> StreamsTask taskId: 0_5
> >>> ProcessorTopology:
> >>> KSTREAM-SOURCE-0000000000:
> >>> topics: [activities-avro-or]
> >>> children: [KSTREAM-FILTER-0000000001]
> >>> KSTREAM-FILTER-0000000001:
> >>> children: [KSTREAM-MAP-0000000002]
> >>> KSTREAM-MAP-0000000002:
> >>> children: [KSTREAM-BRANCH-0000000003]
> >>> KSTREAM-BRANCH-0000000003:
> >>> children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> >>> KSTREAM-BRANCHCHILD-0000000004:
> >>> children: [KSTREAM-MAPVALUES-0000000006]
> >>> KSTREAM-MAPVALUES-0000000006:
> >>> children: [KSTREAM-FLATMAPVALUES-0000000007]
> >>> KSTREAM-FLATMAPVALUES-0000000007:
> >>> children: [KSTREAM-MAP-0000000008]
> >>> KSTREAM-MAP-0000000008:
> >>> children: [KSTREAM-FILTER-0000000011]
> >>> KSTREAM-FILTER-0000000011:
> >>> children: [KSTREAM-SINK-0000000010]
> >>> KSTREAM-SINK-0000000010:
> >>> topic: activities-by-phone-store-or-repartition
> >>> KSTREAM-BRANCHCHILD-0000000005:
> >>> Partitions [activities-avro-or-5]
> >>> Standby tasks:
> >>>
> >>>
> >>> activities-avro-or is the input topic and ml-features-avro-or is the
> >>> output topic. In the middle we have an aggregate
> >>> (activities-by-phone-store-or-repartition).
> >>>
> >>> On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I
> >>> see 5. Bad.
> >>>
> >>> On instance 1 I see 4 tasks for ml-features-avro-or, and 4 on instance 2.
> >>> Good.
> >>>
> >>> On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition,
> >>> and 3 on instance 2. Bad.
> >>>
> >>> As I said, in terms of offsets I see a uniform distribution across all
> >>> these partitions, so we’re not dealing with a bad key scenario.
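These per-instance counts can be reproduced mechanically from the two dumps. A minimal sketch, assuming the part of a task ID before the underscore identifies the sub-topology (0 reads activities-avro-or, 1 reads the repartition topic, 2 reads ml-features-avro-or, as the dumps show). The class and method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative helper; the names are hypothetical and not part of Kafka Streams.
final class TaskTally {

    // Active task IDs copied from the instance 1 dump.
    static final List<String> INSTANCE_1 = Arrays.asList(
        "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
        "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");

    // Active task IDs copied from the instance 2 dump.
    static final List<String> INSTANCE_2 = Arrays.asList(
        "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
        "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    // Counts tasks per sub-topology; a task ID has the form "<subTopology>_<partition>".
    static Map<Integer, Integer> countBySubTopology(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int subTopology = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(subTopology, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + countBySubTopology(INSTANCE_1)); // {0=3, 1=5, 2=4}
        System.out.println("instance 2: " + countBySubTopology(INSTANCE_2)); // {0=5, 1=3, 2=4}
    }
}
```

Each instance holds 12 tasks in total, yet the split within sub-topologies 0 and 1 is 3/5 and 5/3.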
> >>>
> >>> Ara.
> >>>
> >>> On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>> From: "Matthias J. Sax" <matthias@confluent.io>
> >>> Subject: Re: more uniform task assignment across kafka stream nodes
> >>> Date: March 25, 2017 at 6:43:12 PM PDT
> >>> To: users@kafka.apache.org
> >>> Reply-To: <users@kafka.apache.org>
> >>>
> >>>
> >>> Please share the rest of your topology code (without any UDFs /
> >>> business logic). Otherwise, I cannot give further advice.
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> >>> Via:
> >>>
> >>> builder.stream("topic1");
> >>> builder.stream("topic2");
> >>> builder.stream("topic3");
> >>>
> >>> These are different kinds of topics consuming different avro objects.
> >>>
> >>> Ara.
> >>>
> >>> On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>> From: "Matthias J. Sax" <matthias@confluent.io>
> >>> Subject: Re: more uniform task assignment across kafka stream nodes
> >>> Date: March 25, 2017 at 6:04:30 PM PDT
> >>> To: users@kafka.apache.org
> >>> Reply-To: <users@kafka.apache.org>
> >>>
> >>>
> >>> Ara,
> >>>
> >>> How do you consume your topics? Via
> >>>
> >>> builder.stream("topic1", "topic2", "topic3");
> >>>
> >>> or via
> >>>
> >>> builder.stream("topic1");
> >>> builder.stream("topic2");
> >>> builder.stream("topic3");
> >>>
> >>> Both are handled differently with regard to creating tasks (partition
> >>> to task assignment also depends on your downstream code though).
> >>>
> >>> If this does not help, can you maybe share the structure of processing?
> >>> To dig deeper, we would need to know the topology DAG.
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> >>> Matthias,
> >>>
> >>> This apparently happens because we have more than 1 source topic. We
> >>> have 3 source topics in the same application. So it seems like the task
> >>> assignment algorithm creates topologies not for one specific topic at a
> >>> time but for the total partitions across all source topics consumed in
> >>> an application instance. Because we have some code dependencies between
> >>> these 3 source topics we can’t separate them into 3 applications at this
> >>> time. That is why I want the task assignment algorithm to do a simple,
> >>> uniform task assignment PER source topic.
> >>>
> >>> Ara.
> >>>
> >>> On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >>>
> >>> From: "Matthias J. Sax" <matthias@confluent.io>
> >>> Subject: Re: more uniform task assignment across kafka stream nodes
> >>> Date: March 25, 2017 at 5:21:47 PM PDT
> >>> To: users@kafka.apache.org
> >>> Reply-To: <users@kafka.apache.org>
> >>>
> >>>
> >>> Hi,
> >>>
> >>> I am wondering why this happens in the first place. Streams
> >>> load-balances over all running instances, and each instance should have
> >>> the same number of tasks (and thus partitions) assigned.
> >>>
> >>> What is the overall assignment? Do you have StandBy tasks configured?
> >>> What version do you use?
> >>>
> >>>
> >>> -Matthias
> >>>
> >>>
> >>> On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> >>> Hi,
> >>>
> >>> Is there a way to tell kafka streams to uniformly assign partitions across
> >>> instances? If I have n kafka streams instances running, I want each to
> >>> handle EXACTLY 1/nth of the partitions. No dynamic task assignment
> >>> logic. Just dumb 1/n assignment.
> >>>
> >>> Here’s our scenario. Let’s say we have a “source” topic with 8 partitions.
> >>> We also have 2 kafka streams instances. Each instance gets assigned to
> >>> handle 4 “source” topic partitions. BUT then we do a few maps and an
> >>> aggregate. So data gets shuffled around. The map function uniformly
> >>> distributes these across all partitions (I can verify that by looking at
> >>> the partition offsets). After the map, what I notice by looking at the
> >>> topology is that one kafka streams instance gets assigned to handle say 2
> >>> aggregate repartition topics and the other one gets assigned 6. Even worse,
> >>> on bigger clusters (say 4 instances) we see say 2 nodes get assigned
> >>> downstream aggregate repartition topics and 2 other nodes assigned NOTHING
> >>> to handle.
> >>>
> >>> Ara.



-- 
-- Guozhang
