kafka-users mailing list archives

From Eno Thereska <eno.there...@gmail.com>
Subject Re: session window bug not fixed in 0.10.2.1?
Date Tue, 02 May 2017 19:55:59 GMT
Hi Ara,

The PR https://github.com/apache/kafka/pull/2645 has gone into both trunk and
0.10.2.1; I just checked. What error are you seeing? Could you give us an
update?

Thanks
Eno

On Fri, Apr 28, 2017 at 7:10 PM, Ara Ebrahimi <ara.ebrahimi@argyledata.com>
wrote:

> Hi,
>
> I upgraded to 0.10.2.1 yesterday, enabled caching for session windows and
> tested again. It doesn’t seem to be fixed?
>
> Ara.
>
> > On Mar 27, 2017, at 2:10 PM, Damian Guy <damian.guy@gmail.com> wrote:
> >
> > Hi Ara,
> >
> > There is a performance issue in the 0.10.2 release of session windows. It
> > is fixed with this PR: https://github.com/apache/kafka/pull/2645
> > You can work around this on 0.10.2 by calling the aggregate(..), reduce(..),
> > etc. methods and supplying a StateStoreSupplier<SessionStore> with caching
> > disabled, i.e., by doing something like:
> >
> > // Caching stays disabled because enableCaching() is not called here.
> > final StateStoreSupplier<SessionStore> sessionStore =
> >     Stores.create("session-store-name")
> >         .withKeys(Serdes.String())
> >         .withValues(Serdes.String())
> >         .persistent()
> >         .sessionWindowed(TimeUnit.MINUTES.toMillis(7))
> >         .build();
> >
> >
> > The fix has also been cherry-picked to the 0.10.2 branch, so you could
> > build from source and not have to create the StateStoreSupplier.
> >
> > Thanks,
> > Damian
> >
> > On Mon, 27 Mar 2017 at 21:56 Ara Ebrahimi <ara.ebrahimi@argyledata.com>
> > wrote:
> >
> > Thanks for the response, Matthias!
> >
> > The reason we want this exact task assignment to happen is that a critical
> > part of our pipeline involves grouping relevant records together (that’s
> > what the aggregate function in the topology is for). For hot keys this can
> > sometimes lead to hundreds of records being grouped together. Even worse,
> > these records are session-bound; we use session windows. Hence we see lots
> > of activity around the store backing the aggregate function, and even though
> > we use SSD drives we’re not seeing the kind of performance we want. It seems
> > the aggregate function leads to lots of updates to these hot keys, which in
> > turn lead to lots of RocksDB activity.
> >
> > Now there are many ways to fix this problem:
> > - Just don’t aggregate: create an algorithm which is not reliant on
> >   grouping/aggregating records. Not what we can do with our tight schedule
> >   right now.
> > - Do grouping/aggregating but employ n instances and rely on uniform
> >   distribution of these tasks. This is the easiest solution and what we
> >   expected to work, but it didn’t, as you can tell from this thread. We threw
> >   4 instances at it but only 2 got used.
> > - Tune RocksDB? I actually tried this but it didn’t really help us much,
> >   aside from the fact that tuning RocksDB is very tricky.
> > - Use an in-memory store instead? Unfortunately we have to use session
> >   windows for this aggregate function and apparently there’s no in-memory
> >   session store impl? I tried to create one but soon realized it’s too much
> >   work :) I looked at the default PartitionAssignor code too, but that ain’t
> >   trivial either.
> >
> > So I’m a bit hopeless :(
> >
> > Ara.
> >
> > On Mar 27, 2017, at 1:35 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> >
> >
> >
> > ________________________________
> >
> > This message is for the designated recipient only and may contain
> > privileged, proprietary, or otherwise confidential information. If you
> have
> > received it in error, please notify the sender immediately and delete the
> > original. Any other use of the e-mail by you is prohibited. Thank you in
> > advance for your cooperation.
> >
> > ________________________________
> >
> > From: "Matthias J. Sax" <matthias@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 27, 2017 at 1:35:30 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Ara,
> >
> > thanks for the detailed information.
> >
> > If I parse this correctly, both instances run the same number of tasks
> > (12 each). That is all Streams promises.
> >
> > To come back to your initial question:
> >
> > Is there a way to tell kafka streams to uniformly assign partitions across
> > instances? If I have n kafka streams instances running, I want each to
> > handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> > logic. Just dumb 1/n assignment.
> >
> > That is exactly what you get: each of your two instances gets 24/2 = 12
> > tasks assigned. That is dumb 1/n assignment, isn't it? So my original
> > response was correct.
> >
> > However, I now understand better what you actually mean by your
> > question. Note that Streams does not distinguish "types" of tasks -- it
> > only sees 24 tasks and assigns those in a balanced way.
> >
> > Thus, currently there is no easy way to get the assignment you want,
> > unless you implement your own `PartitionAssignor`.
> >
> > This is the current implementation for 0.10.2:
> > https://github.com/apache/kafka/blob/0.10.2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
> >
> > You can, if you wish, write your own assignor and set it via
> > StreamsConfig. However, be aware that this might be tricky to get right
> > and might also have runtime implications with regard to rebalancing and
> > state store recovery. We recently improved the current implementation to
> > avoid costly task movements:
> > https://issues.apache.org/jira/browse/KAFKA-4677
> >
> > Thus, I would not recommend implementing your own `PartitionAssignor`.
> >
> >
> > However, the root question is: why do you need this exact assignment
> > in the first place? Why is it "bad" if "types" of tasks
> > are not distinguished? I would like to understand your requirement
> > better -- it might be worth improving Streams here.
> >
> >
> > -Matthias
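
The "dumb 1/n" assignment asked about in this thread, applied per sub-topology instead of over the pooled list of 24 tasks, could look roughly like the sketch below. It is plain Java for illustration only: `PerGroupRoundRobin` and `assign` are hypothetical names, the class does not implement Kafka's `PartitionAssignor` interface, and it ignores the rebalancing and state store recovery concerns raised in the message above. The numbers (3 sub-topologies, 8 partitions, 2 instances) are taken from the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class PerGroupRoundRobin {
    // Returns, for each instance, the task IDs it would own if every
    // sub-topology (task "type") were spread round-robin on its own.
    // Task IDs use the "<subTopology>_<partition>" form seen in the dumps.
    static List<List<String>> assign(int groups, int partitions, int instances) {
        List<List<String>> owned = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            owned.add(new ArrayList<>());
        }
        for (int g = 0; g < groups; g++) {
            for (int p = 0; p < partitions; p++) {
                // Partition p of every sub-topology goes to instance p mod n.
                owned.get(p % instances).add(g + "_" + p);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        List<List<String>> owned = assign(3, 8, 2);
        for (int i = 0; i < owned.size(); i++) {
            System.out.println("instance " + (i + 1) + ": " + owned.get(i));
        }
    }
}
```

With these inputs each instance ends up with 12 tasks, 4 from each sub-topology. A real assignor would also have to cope with instances joining and leaving, which this sketch does not attempt.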
> >
> >
> > On 3/27/17 12:57 PM, Ara Ebrahimi wrote:
> > Hi,
> >
> > So, I simplified the topology by making sure we have only 1 source topic.
> > Now I have 1 source topic, 8 partitions, 2 instances. And here’s what the
> > topology looks like:
> >
> > instance 1:
> >
> > KafkaStreams processID: 48b58bc0-f600-4ec8-bc92-8cb3ea081aac
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-1
> > Active tasks:
> > StreamsTask taskId: 0_3
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-3]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-2
> > Active tasks:
> > StreamsTask taskId: 1_2
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-2]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-3
> > Active tasks:
> > StreamsTask taskId: 1_1
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-1]
> > StreamsTask taskId: 2_7
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-7]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-4
> > Active tasks:
> > StreamsTask taskId: 2_0
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-0]
> > StreamsTask taskId: 2_6
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-6]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-5
> > Active tasks:
> > StreamsTask taskId: 0_0
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-0]
> > StreamsTask taskId: 1_6
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-6]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-6
> > Active tasks:
> > StreamsTask taskId: 1_0
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-0]
> > StreamsTask taskId: 0_7
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-7]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-7
> > Active tasks:
> > StreamsTask taskId: 2_4
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-4]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-8
> > Active tasks:
> > StreamsTask taskId: 1_3
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-3]
> > Standby tasks:
> >
> >
> > instance 2:
> >
> > KafkaStreams processID: 092072f8-87be-4989-a94f-0ed544f5ca44
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-1
> > Active tasks:
> > StreamsTask taskId: 2_1
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-1]
> > StreamsTask taskId: 2_5
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-5]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-2
> > Active tasks:
> > StreamsTask taskId: 0_4
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-4]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-3
> > Active tasks:
> > StreamsTask taskId: 2_2
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-2]
> > StreamsTask taskId: 1_7
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-7]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-4
> > Active tasks:
> > StreamsTask taskId: 2_3
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000019:
> > topics: [ml-features-avro-or]
> > Partitions [ml-features-avro-or-3]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-5
> > Active tasks:
> > StreamsTask taskId: 0_1
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-1]
> > StreamsTask taskId: 1_5
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-5]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-6
> > Active tasks:
> > StreamsTask taskId: 1_4
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000012:
> > topics: [activities-by-phone-store-or-repartition]
> > children: [KSTREAM-AGGREGATE-0000000009]
> > KSTREAM-AGGREGATE-0000000009:
> > states: [activities-by-phone-store-or]
> > children: [KTABLE-TOSTREAM-0000000013]
> > KTABLE-TOSTREAM-0000000013:
> > children: [KSTREAM-FILTER-0000000014]
> > KSTREAM-FILTER-0000000014:
> > children: [KSTREAM-FILTER-0000000015]
> > KSTREAM-FILTER-0000000015:
> > children: [KSTREAM-MAP-0000000016]
> > KSTREAM-MAP-0000000016:
> > children: [KSTREAM-MAP-0000000017]
> > KSTREAM-MAP-0000000017:
> > children: [KSTREAM-SINK-0000000018]
> > KSTREAM-SINK-0000000018:
> > topic: ml-features-avro-or
> > Partitions [activities-by-phone-store-or-repartition-4]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-7
> > Active tasks:
> > StreamsTask taskId: 0_2
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-2]
> > StreamsTask taskId: 0_6
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-6]
> > Standby tasks:
> >
> > StreamsThread appId: mar-23-modular
> > StreamsThread clientId: mar-23-modular
> > StreamsThread threadId: StreamThread-8
> > Active tasks:
> > StreamsTask taskId: 0_5
> > ProcessorTopology:
> > KSTREAM-SOURCE-0000000000:
> > topics: [activities-avro-or]
> > children: [KSTREAM-FILTER-0000000001]
> > KSTREAM-FILTER-0000000001:
> > children: [KSTREAM-MAP-0000000002]
> > KSTREAM-MAP-0000000002:
> > children: [KSTREAM-BRANCH-0000000003]
> > KSTREAM-BRANCH-0000000003:
> > children: [KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005]
> > KSTREAM-BRANCHCHILD-0000000004:
> > children: [KSTREAM-MAPVALUES-0000000006]
> > KSTREAM-MAPVALUES-0000000006:
> > children: [KSTREAM-FLATMAPVALUES-0000000007]
> > KSTREAM-FLATMAPVALUES-0000000007:
> > children: [KSTREAM-MAP-0000000008]
> > KSTREAM-MAP-0000000008:
> > children: [KSTREAM-FILTER-0000000011]
> > KSTREAM-FILTER-0000000011:
> > children: [KSTREAM-SINK-0000000010]
> > KSTREAM-SINK-0000000010:
> > topic: activities-by-phone-store-or-repartition
> > KSTREAM-BRANCHCHILD-0000000005:
> > Partitions [activities-avro-or-5]
> > Standby tasks:
> >
> >
> > activities-avro-or is the input topic. ml-features-avro-or is the output
> > topic. In the middle we have an aggregate
> > (activities-by-phone-store-or-repartition).
> >
> > On instance 1 I see 3 tasks for activities-avro-or and on instance 2 I see
> > 5. Bad.
> >
> > On instance 1 I see 4 tasks for ml-features-avro-or. And 4 on instance 2.
> > Good.
> >
> > On instance 1 I see 5 tasks for activities-by-phone-store-or-repartition.
> > And 3 on instance 2. Bad.
> >
> > As I said, in terms of offsets for all these partitions I see uniform
> > distribution, so we’re not dealing with a bad key scenario.
> >
> > Ara.
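
The per-topic counts in the message above can be reproduced by tallying the task IDs from the two dumps. The sketch below is a small standalone Java helper, not part of Kafka; the class name `TaskTally` is hypothetical, and it assumes a task ID has the form `<subTopology>_<partition>` (as the `taskId` / `Partitions` pairs in the dumps suggest), with 0 reading activities-avro-or, 1 the repartition topic, and 2 ml-features-avro-or.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class TaskTally {
    // Active task IDs copied from the two instance dumps in the thread.
    static final List<String> INSTANCE_1 = Arrays.asList(
        "0_3", "1_2", "1_1", "2_7", "2_0", "2_6",
        "0_0", "1_6", "1_0", "0_7", "2_4", "1_3");
    static final List<String> INSTANCE_2 = Arrays.asList(
        "2_1", "2_5", "0_4", "2_2", "1_7", "2_3",
        "0_1", "1_5", "1_4", "0_2", "0_6", "0_5");

    // Counts tasks per sub-topology, keyed by the number before the underscore.
    static Map<Integer, Integer> tally(List<String> taskIds) {
        Map<Integer, Integer> counts = new TreeMap<>();
        for (String id : taskIds) {
            int group = Integer.parseInt(id.substring(0, id.indexOf('_')));
            counts.merge(group, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println("instance 1: " + tally(INSTANCE_1)); // {0=3, 1=5, 2=4}
        System.out.println("instance 2: " + tally(INSTANCE_2)); // {0=5, 1=3, 2=4}
    }
}
```

Both instances hold 12 tasks in total, so the overall load is balanced; the skew only shows up once tasks are grouped by sub-topology.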
> >
> > On Mar 25, 2017, at 6:43 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matthias@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 6:43:12 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Please share the rest of your topology code (without any UDFs / business
> > logic). Otherwise, I cannot give further advice.
> >
> > -Matthias
> >
> >
> > On 3/25/17 6:08 PM, Ara Ebrahimi wrote:
> > Via:
> >
> > builder.stream("topic1");
> > builder.stream("topic2");
> > builder.stream("topic3");
> >
> > These are different kinds of topics consuming different avro objects.
> >
> > Ara.
> >
> > On Mar 25, 2017, at 6:04 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matthias@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 6:04:30 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Ara,
> >
> > How do you consume your topics? Via
> >
> > builder.stream("topic1", "topic2", "topic3");
> >
> > or via
> >
> > builder.stream("topic1");
> > builder.stream("topic2");
> > builder.stream("topic3");
> >
> > Both are handled differently with regard to creating tasks (partition to
> > task assignment also depends on your downstream code though).
> >
> > If this does not help, can you maybe share the structure of processing?
> > To dig deeper, we would need to know the topology DAG.
> >
> >
> > -Matthias
> >
> >
> > On 3/25/17 5:56 PM, Ara Ebrahimi wrote:
> > Matthias,
> >
> > This apparently happens because we have more than 1 source topic. We have 3
> > source topics in the same application. So it seems like the task assignment
> > algorithm creates topologies not for one specific topic at a time but for
> > the total partitions across all source topics consumed in an application
> > instance. Because we have some code dependencies between these 3 source
> > topics we can’t separate them into 3 applications at this time. Hence the
> > reason I want to get the task assignment algorithm to basically do a uniform
> > and simple task assignment PER source topic.
> >
> > Ara.
> >
> > On Mar 25, 2017, at 5:21 PM, Matthias J. Sax <matthias@confluent.io> wrote:
> >
> > From: "Matthias J. Sax" <matthias@confluent.io>
> > Subject: Re: more uniform task assignment across kafka stream nodes
> > Date: March 25, 2017 at 5:21:47 PM PDT
> > To: users@kafka.apache.org
> > Reply-To: <users@kafka.apache.org>
> >
> >
> > Hi,
> >
> > I am wondering why this happens in the first place. Streams
> > load-balances over all running instances, and each instance should have
> > the same number of tasks (and thus partitions) assigned.
> >
> > What is the overall assignment? Do you have standby tasks configured?
> > What version do you use?
> >
> >
> > -Matthias
> >
> >
> > On 3/24/17 8:09 PM, Ara Ebrahimi wrote:
> > Hi,
> >
> > Is there a way to tell kafka streams to uniformly assign partitions across
> > instances? If I have n kafka streams instances running, I want each to
> > handle EXACTLY 1/nth number of partitions. No dynamic task assignment
> > logic. Just dumb 1/n assignment.
> >
> > Here’s our scenario. Let’s say we have a "source" topic with 8 partitions.
> > We also have 2 kafka streams instances. Each instance gets assigned to
> > handle 4 "source" topic partitions. BUT then we do a few maps and an
> > aggregate. So data gets shuffled around. The map function uniformly
> > distributes these across all partitions (I can verify that by looking at
> > the partition offsets). After the map, what I notice by looking at the
> > topology is that one kafka streams instance gets assigned to handle, say, 2
> > aggregate repartition topics and the other one gets assigned 6. Even worse,
> > on bigger clusters (say 4 instances) we see, say, 2 nodes get assigned
> > downstream aggregate repartition topics and 2 other nodes get assigned
> > NOTHING to handle.
> >
> > Ara.
