kafka-users mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS] Streams DSL/StateStore Refactoring
Date Tue, 04 Jul 2017 07:53:57 GMT
Hi Damian,

I do see your point that something needs to change. But I fully agree with
Guozhang when he says:
---

But since this is a incompatibility change, and we are going to remove the
compatibility annotations soon it means we only have one chance and we
really have to make it right.
----

I fear none of the suggestions go far enough to become something that will last for very
long.
I am currently working on KAFKA-3705 and trying to find the easiest way for the user to give
me all the required functionality. The simplest interface I have come up with so far can be
seen here:

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L622

And it's already horribly complicated. I am currently unable to find the right abstraction
level at which everything falls into place naturally. To be honest, I already think introducing

https://github.com/Kaiserchen/kafka/blob/3da2b8f787a5d30dee2de71cf0f125ab3e57d89b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L493

was not ideal and makes everything a mess. The JoinType:Whatever is also not really flexible.
Two things come to mind:

1. I don't think we should rule out config-based decisions, say configs like
	streams.$applicationID.joins.$joinname.conf = value
This would allow for tremendous changes without a single API change, and IMO it has not been
considered enough yet.

2. Push logic from the DSL to the Callback classes. A ValueJoiner for example can be used
to implement different join types as the user wishes.
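
To illustrate point 2, here is a minimal sketch (not the KAFKA-3705 code) of how a single
callback could encode inner, left, or outer semantics. It assumes, hypothetically, that the
DSL always calls the joiner with null for a missing side and treats a null result as "emit
nothing". The ValueJoiner interface below is a local stand-in with the same shape as the
Kafka Streams one; JoinerSketch and its methods are made-up names.

```java
public class JoinerSketch {

    /** Local stand-in with the same shape as Kafka Streams' ValueJoiner. */
    @FunctionalInterface
    public interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 left, V2 right);
    }

    /** Inner semantics: produce a result only when both sides are present. */
    public static ValueJoiner<String, String, String> inner() {
        return (l, r) -> (l == null || r == null) ? null : l + "|" + r;
    }

    /** Left semantics: the left side is required, the right side may be missing. */
    public static ValueJoiner<String, String, String> left() {
        return (l, r) -> (l == null) ? null : l + "|" + (r == null ? "" : r);
    }

    /** Outer semantics: produce a result when at least one side is present. */
    public static ValueJoiner<String, String, String> outer() {
        return (l, r) -> (l == null && r == null)
                ? null
                : (l == null ? "" : l) + "|" + (r == null ? "" : r);
    }

    public static void main(String[] args) {
        // Assumption: a null result means "emit nothing" for this key.
        System.out.println(inner().apply("a", null)); // null, so nothing would be emitted
        System.out.println(left().apply("a", null));  // left value with an empty right side
        System.out.println(outer().apply(null, "b")); // empty left side with the right value
    }
}
```

With this shape the join type lives in user code, so the DSL would need only one join method
instead of one per join type.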

As Guozhang said: we need to stop breaking users, and that is very important, especially with
these changes plus all the plans I sadly only have in my head; hopefully the first link
gives a glimpse.

Thanks for preparing the examples; they made it much clearer to me what exactly we are talking
about. I would argue for going a bit slower and more carefully on this one. At some point we
need to get it right. Look at the Hadoop folks with their huge user base: config files really
work well for them.
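
As a rough sketch of how the key scheme from point 1 could be read, assuming plain
java.util.Properties and a hypothetical setting name "type" standing in for the conf
placeholder (JoinConfigSketch and its methods are made up for illustration, not an existing
Kafka API):

```java
import java.util.Properties;

public class JoinConfigSketch {

    /** Builds a key of the form streams.<applicationId>.joins.<joinName>.<setting>. */
    public static String joinConfKey(String applicationId, String joinName, String setting) {
        return "streams." + applicationId + ".joins." + joinName + "." + setting;
    }

    /** Looks up a per-join setting, falling back to a default when the key is absent. */
    public static String joinConf(Properties props, String applicationId,
                                  String joinName, String setting, String defaultValue) {
        return props.getProperty(joinConfKey(applicationId, joinName, setting), defaultValue);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical entry; "type" and "left" are illustrative names only.
        props.setProperty("streams.my-app.joins.orders-customers.type", "left");

        // Configured join: the value comes from the properties.
        System.out.println(joinConf(props, "my-app", "orders-customers", "type", "inner"));
        // Unconfigured join: the default is used.
        System.out.println(joinConf(props, "my-app", "other-join", "type", "inner"));
    }
}
```

New per-join knobs would then be new keys rather than new overloads.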

Best Jan





On 30.06.2017 09:31, Damian Guy wrote:
> Thanks Matthias
>
> On Fri, 30 Jun 2017 at 08:05 Matthias J. Sax <matthias@confluent.io> wrote:
>
>> I am just catching up on this thread, so sorry for the long email in
>> advance... Also, it's to some extent a dump of thoughts and not always a
>> clear proposal. Still need to think about this in more detail. But maybe
>> it helps others to get new ideas :)
>>
>>
>>>> However, I don't understand your argument about putting aggregate()
>>>> after the withXX() -- all the calls to withXX() set optional parameters
>>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
>>>> be quite confusing for developers.
>>>>
>>>>
>>> I see what you are saying, but the grouped stream is effectively a no-op
>>> until you call one of the aggregate/count/reduce etc functions. So the
>>> optional params are ones that are applicable to any of the operations you
>>> can perform on this grouped stream. Then the final
>>> count()/reduce()/aggregate() call has any of the params that are
>>> required/specific to that function.
>>>
>> I understand your argument, but I don't share the conclusion. If we
>> need a "final/terminal" call, the better way might be
>>
>> .groupBy().count().withXX().build()
>>
>> (with a better name for build() though)
>>
>>
> The point is that all the other calls, i.e., withBlah, windowed, etc. apply
> to all the aggregate functions. The terminal call being the actual type of
> aggregation you want to do. I personally find this more natural than
> groupBy().count().withBlah().build()
>
>
>>> groupedStream.count(/** non windowed count**/)
>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>
>> I like this. However, I don't see a reason to have windowed() and
>> sessionWindowed(). We should have one top-level `Windows` interface that
>> both `TimeWindows` and `SessionWindows` implement and just have a single
>> windowed() method that accepts all `Windows`. (I did not like the
>> separation of `SessionWindows` in the first place, and this seems to be
>> an opportunity to clean this up. It was hard to change when we
>> introduced session windows)
>>
> Yes - true we should look into that.
>
>
>> Btw: we do use the imperative groupBy() and groupByKey(), and thus we
>> might also want to use windowBy() (instead of windowed()). Not sure how
>> important this is, but it seems to be inconsistent otherwise.
>>
>>
> Makes sense
>
>
>> About joins:  I don't like .withJoinType(JoinType.LEFT) at all. I think,
>> defining an inner/left/outer join is not an optional argument but a
>> first class concept and should have a proper representation in the API
>> (like the current methods join(), leftJoin, outerJoin()).
>>
>>
> Yep, i did originally have it as a required param and maybe that is what we
> go with. It could have a default, but maybe that is confusing.
>
>
>
>> About the two join API proposals, the second one has too much boiler
>> plate code for my taste. Also, the actual join() operator has only one
>> argument, which is weird to me, as in my thinking process, the main
>> operator call should have one parameter per mandatory argument, but your
>> proposal puts the mandatory arguments into the Joins.streamStreamJoin() call.
>> This is far from intuitive IMHO.
>>
>>
> This is the builder pattern, you only need one param as the builder has
> captured all of the required and optional arguments.
>
>
>> The first join proposal also seems to align better with the pattern
>> suggested for aggregations and having the same pattern for all operators
>> is important (as you stated already).
>>
>>
> This is why i offered two alternatives as i started out with. 1 is the
> builder pattern, the other is the more fluent pattern.
>
>
>>
>> Coming back to the config vs optional parameter. What about having a
>> method withConfig[s](...) that allow to put in the configuration?
>>
>>
> Sure, it is currently called withLogConfig() as that is the only thing that
> is really config.
>
>
>> This also raises the question if until() is a windows property?
>> Actually, until() seems to be a configuration parameter and thus, should
>> not have its own method.
>>
>>
> Hmmm, i don't agree. Until is a property of the window. It is going to be
> potentially different for every window operation you do in a streams app.
>
>
>>
>> Browsing through your example DSL branch, I also saw this one:
>>
>>> final KTable<Windowed<String>, Long> windowed =
>>>     groupedStream.counting()
>>>                   .windowed(TimeWindows.of(10L).until(10))
>>>                   .table();
>> This is an interesting idea, and it reminds me of some feedback about "I
>> wanted to count a stream, but there was no count() method -- I first
>> needed to figure out, that I need to group the stream first to be able
>> to count it. It does make sense in hindsight but was not obvious in the
>> beginning". Thus, carrying out this thought, we could also do the
>> following:
>>
>> stream.count().groupedBy().windowedBy().table();
>>
>> -> Note, I use "grouped" and "windowed" instead of imperative here, as
>> it comes after the count()
>>
>> This would be more consistent than your proposal (that has grouping
>> before but windowing after count()). It might even allow us to enrich
>> the API with some syntactic sugar like `stream.count().table()` to get
>> the overall count of all records (this would obviously not scale, but we
>> could support it -- if not now, maybe later).
>>
>>
> I guess i'd prefer
> stream.groupBy().windowBy().count()
> stream.groupBy().windowBy().reduce()
> stream.groupBy().count()
>
> As i said above, everything that happens before the final aggregate call
> can be applied to any of them. So it makes sense to me to do those things
> ahead of the final aggregate call.
>
>
>> Last about builder pattern. I am convinced that we need some "terminal"
>> operator/method that tells us when to add the processor to the topology.
>> But I don't see the need for a plain builder pattern that feels alien to
>> me (see my argument about the second join proposal). Using .stream() /
>> .table() as used in many examples might work. But maybe a more generic
>> name that we can use in all places like build() or apply() might also be
>> an option.
>>
>>
> Sure, a generic name might be ok.
>
>
>
>
>> -Matthias
>>
>>
>>
>> On 6/29/17 7:37 AM, Damian Guy wrote:
>>> Thanks Kyle.
>>>
>>> On Thu, 29 Jun 2017 at 15:11 Kyle Winkelman <winkelman.kyle@gmail.com>
>>> wrote:
>>>
>>>> Hi Damian,
>>>>
>>>>>>>> When trying to program in the fluent API that has been discussed most it
>>>>>>>> feels difficult to know when you will actually get an object you can reuse.
>>>>>>>> What if I make one KGroupedStream that I want to reuse, is it legal to
>>>>>>>> reuse it or does this approach expect you to call grouped each time?
>>>>>> I'd anticipate that once you have a KGroupedStream you can re-use it as you
>>>>>> can today.
>>>> You said it yourself in another post that the grouped stream is
>>>> effectively a no-op until a count, reduce, or aggregate. The way I see it
>>>> you wouldn’t be able to reuse anything except KStreams and KTables, because
>>>> most of this fluent api would continue returning this (this being the
>>>> builder object currently being manipulated).
>>>> So, if you ever store a reference to anything but KStreams and KTables and
>>>> you use it in two different ways then it's possible you make conflicting
>>>> withXXX() calls on the same builder.
>>>>
>>>>
>>> Not necessarily true. It could return a new instance of the builder, i.e.,
>>> the builders being immutable. So if you held a reference to the builder it
>>> would always be the same as it was when it was created.
>>>
>>>
>>>> GroupedStream<K,V> groupedStreamWithDefaultSerdes = kStream.grouped();
>>>> GroupedStream<K,V> groupedStreamWithDeclaredSerdes =
>>>> groupedStreamWithDefaultSerdes.withKeySerde(…).withValueSerde(…);
>>>>
>>>> I’ll admit that this shouldn’t happen but some user is going to do it
>>>> eventually…
>>>> Depending on implementation uses of groupedStreamWithDefaultSerdes would
>>>> most likely be equivalent to the version withDeclaredSerdes. One work
>>>> around would be to always make copies of the config objects you are
>>>> building, but this approach has its own problem because now we have to
>>>> identify which configs are equivalent so we don’t create repeated
>>>> processors.
>>>>
>>>> The point of this long winded example is that we always have to be
>>>> thinking about all of the possible ways it could be misused by a user
>>>> (causing them to see hard to diagnose problems).
>>>>
>>> Exactly! That is the point of the discussion really.
>>>
>>>
>>>> In my attempt at a couple methods with builders I feel that I could
>>>> confidently say the user couldn’t really mess it up.
>>>>> // Count
>>>>> KTable<String, Long> count =
>>>>> kGroupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>> The kGroupedStream is reusable and if they attempted to reuse the Count
>>>> for some reason it would throw an error message saying that a store named
>>>> “my-store” already exists.
>>>>
>>>>
>>> Yes i agree and i think using builders is my preferred pattern.
>>>
>>> Cheers,
>>> Damian
>>>
>>>
>>>> Thanks,
>>>> Kyle
>>>>
>>>> From: Damian Guy
>>>> Sent: Thursday, June 29, 2017 3:59 AM
>>>> To: dev@kafka.apache.org
>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>
>>>> Hi Kyle,
>>>>
>>>> Thanks for your input. Really appreciated.
>>>>
>>>> On Thu, 29 Jun 2017 at 06:09 Kyle Winkelman <winkelman.kyle@gmail.com>
>>>> wrote:
>>>>
>>>>> I like more of a builder pattern even though others have voiced against
>>>>> it. The reason I like it is because it makes it clear to the user that a
>>>>> call to KGroupedStream#count will return a KTable not some intermediate
>>>>> class that I need to understand.
>>>>>
>>>> Yes, that makes sense.
>>>>
>>>>
>>>>> When trying to program in the fluent API that has been discussed most it
>>>>> feels difficult to know when you will actually get an object you can reuse.
>>>>> What if I make one KGroupedStream that I want to reuse, is it legal to
>>>>> reuse it or does this approach expect you to call grouped each time?
>>>>
>>>> I'd anticipate that once you have a KGroupedStream you can re-use it as you
>>>> can today.
>>>>
>>>>
>>>>> This question doesn’t pop into my head at all in the builder pattern; I
>>>>> assume I can reuse everything.
>>>>> Finally, I like .groupByKey and .groupBy(KeyValueMapper) not a big fan of
>>>>> the grouped.
>>>>>
>>>> Yes, grouped() was more for demonstration and because groupBy() and
>>>> groupByKey() were taken! So i'd imagine the api would actually want to be
>>>> groupByKey(/** no required args***/).withOptionalArg() and
>>>> groupBy(KeyValueMapper m).withOptionalArg(...)  of course this all depends
>>>> on maintaining backward compatibility.
>>>>
>>>>
>>>>> Unfortunately, the below approach would require at least 2 (probably 3)
>>>>> overloads (one for returning a KTable and one for returning a KTable with
>>>>> Windowed Key, probably would want to split windowed and sessionwindowed for
>>>>> ease of implementation) of each count, reduce, and aggregate.
>>>>> Obviously not exhaustive but enough for you to get the picture. Count,
>>>>> Reduce, and Aggregate supply 3 static methods to initialize the builder:
>>>>> // Count
>>>>> KTable<String, Long> count =
>>>>> groupedStream.count(Count.count().withQueryableStoreName("my-store"));
>>>>>
>>>>> // Windowed Count
>>>>> KTable<Windowed<String>, Long> windowedCount =
>>>>> groupedStream.count(Count.windowed(TimeWindows.of(10L).until(10)).withQueryableStoreName("my-windowed-store"));
>>>>>
>>>>> // Session Count
>>>>> KTable<Windowed<String>, Long> sessionCount =
>>>>> groupedStream.count(Count.sessionWindowed(SessionWindows.with(10L)).withQueryableStoreName("my-session-windowed-store"));
>>>>>
>>>> Above and below, i think i'd prefer it to be:
>>>> groupedStream.count(/** non windowed count**/)
>>>> groupedStream.windowed(TimeWindows.of(10L)).count(...)
>>>> groupedStream.sessionWindowed(SessionWindows.of(10L)).count(...)
>>>>
>>>>
>>>>
>>>>
>>>>> // Reduce
>>>>> Reducer<Long> reducer;
>>>>> KTable<String, Long> reduce = groupedStream.reduce(reducer,
>>>>> Reduce.reduce().withQueryableStoreName("my-store"));
>>>>>
>>>>> // Aggregate Windowed with Custom Store
>>>>> Initializer<String> initializer;
>>>>> Aggregator<String, Long, String> aggregator;
>>>>> KTable<Windowed<String>, String> aggregate =
>>>>> groupedStream.aggregate(initializer, aggregator,
>>>>> Aggregate.windowed(TimeWindows.of(10L).until(10)).withStateStoreSupplier(stateStoreSupplier));
>>>>>
>>>>> // Cogroup SessionWindowed
>>>>> KTable<String, String> cogrouped = groupedStream1.cogroup(aggregator1)
>>>>>          .cogroup(groupedStream2, aggregator2)
>>>>>          .aggregate(initializer, aggregator,
>>>>> Aggregate.sessionWindowed(SessionWindows.with(10L),
>>>>> sessionMerger).withQueryableStoreName("my-store"));
>>>>>
>>>>>
>>>>>
>>>>> public class Count {
>>>>>
>>>>>      public static class Windowed extends Count {
>>>>>          private Windows windows;
>>>>>      }
>>>>>      public static class SessionWindowed extends Count {
>>>>>          private SessionWindows sessionWindows;
>>>>>      }
>>>>>
>>>>>      public static Count count();
>>>>>      public static Windowed windowed(Windows windows);
>>>>>      public static SessionWindowed sessionWindowed(SessionWindows
>>>>> sessionWindows);
>>>>>
>>>>>      // All withXXX(...) methods.
>>>>> }
>>>>>
>>>>> public class KGroupedStream {
>>>>>      public KTable<K, Long> count(Count count);
>>>>>      public KTable<Windowed<K>, Long> count(Count.Windowed count);
>>>>>      public KTable<Windowed<K>, Long> count(Count.SessionWindowed count);
>>>>> …
>>>>> }
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Kyle
>>>>>
>>>>> From: Guozhang Wang
>>>>> Sent: Wednesday, June 28, 2017 7:45 PM
>>>>> To: dev@kafka.apache.org
>>>>> Subject: Re: [DISCUSS] Streams DSL/StateStore Refactoring
>>>>>
>>>>> I played with the current proposal a bit, using
>>>>> https://github.com/dguy/kafka/tree/dsl-experiment,
>>>>> and here are my observations:
>>>>>
>>>>> 1. Personally I prefer
>>>>>
>>>>>      "stream.group(mapper) / stream.groupByKey()"
>>>>>
>>>>> than
>>>>>
>>>>>      "stream.group().withKeyMapper(mapper) / stream.group()"
>>>>>
>>>>> Since 1) withKeyMapper is not enforced programmatically though it is not
>>>>> "really" optional like others, 2) syntax-wise it reads more natural.
>>>>>
>>>>> I think it is okay to add the APIs in
>>>>> (https://github.com/dguy/kafka/blob/dsl-experiment/streams/src/main/java/org/apache/kafka/streams/kstream/GroupedStream.java)
>>>>> in KGroupedStream.
>>>>>
>>>>>
>>>>> 2. For the "withStateStoreSupplier" API, is the user supposed to pass in
>>>>> the innermost state store supplier (e.g. the one whose get() returns
>>>>> RocksDBStore), or is it supposed to be the outermost supplier with
>>>>> logging / metrics / etc? I think it would be more useful to only require
>>>>> users to pass in the inner state store supplier while specifying caching /
>>>>> logging through other APIs.
>>>>>
>>>>> In addition, the "GroupedWithCustomStore" is a bit suspicious to me: we are
>>>>> allowing users to call other APIs like "withQueryableName" multiple times,
>>>>> but to call "withStateStoreSupplier" only once at the end. Why is that?
>>>>>
>>>>> 3. The current DSL seems to be only for aggregations, what about joins?
>>>>>
>>>>>
>>>>> 4. I think it is okay to keep the "withLogConfig": for the
>>>>> StateStoreSupplier it will still be user code specifying the topology, so I
>>>>> do not see that there is a big difference.
>>>>>
>>>>>
>>>>> 5. "WindowedGroupedStream" 's withStateStoreSupplier should take the
>>>>> windowed state store supplier to enforce typing?
>>>>>
>>>>>
>>>>> Below are minor ones:
>>>>>
>>>>> 6. "withQueryableName": maybe better "withQueryableStateName"?
>>>>>
>>>>> 7. "withLogConfig": maybe better "withLoggingTopicConfig()"?
>>>>>
>>>>>
>>>>>
>>>>> Guozhang
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jun 28, 2017 at 3:59 PM, Matthias J. Sax <
>> matthias@confluent.io>
>>>>> wrote:
>>>>>
>>>>>> I see your point about "when to add the processor to the topology". That
>>>>>> is indeed an issue. Not sure if we could allow "updates" to the topology...
>>>>>> I don't see any problem with having all the withXX() in KTable interface
>>>>>> -- but this might be subjective.
>>>>>>
>>>>>>
>>>>>> However, I don't understand your argument about putting aggregate()
>>>>>> after the withXX() -- all the calls to withXX() set optional parameters
>>>>>> for aggregate() and not for groupBy() -- but a groupBy().withXX()
>>>>>> indicates that the withXX() belongs to the groupBy(). IMHO, this might
>>>>>> be quite confusing for developers.
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>> On 6/28/17 2:55 AM, Damian Guy wrote:
>>>>>>>> I also think that mixing optional parameters with configs
is a bad
>>>>> idea.
>>>>>>>> Have not proposal for this atm but just wanted to mention
it. Hope
>>>> to
>>>>>>>> find some time to come up with something.
>>>>>>>>
>>>>>>>>
>>>>>>> Yes, i don't like the mix of config either. But the only real
config
>>>>> here
>>>>>>> is the logging config - which we don't really need as it can
already
>>>> be
>>>>>>> done via a custom StateStoreSupplier.
>>>>>>>
>>>>>>>
>>>>>>>> What I don't like in the current proposal is the
>>>>>>>> .grouped().withKeyMapper() -- the current solution with
>>>> .groupBy(...)
>>>>>>>> and .groupByKey() seems better. For clarity, we could rename
to
>>>>>>>> .groupByNewKey(...) and .groupByCurrentKey() (even if we
should find
>>>>>>>> some better names).
>>>>>>>>
>>>>>>>>
>>>>>>> it could be groupByKey(), groupBy() or something different bt
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> The proposed pattern "chains" grouping and aggregation too
close
>>>>>>>> together. I would rather separate both more than less, ie,
do into
>>>> the
>>>>>>>> opposite direction.
>>>>>>>>
>>>>>>>> I am also wondering, if we could so something more "fluent".
The
>>>>> initial
>>>>>>>> proposal was like:
>>>>>>>>
>>>>>>>>>> groupedStream.count()
>>>>>>>>>>     .withStoreName("name")
>>>>>>>>>>     .withCachingEnabled(false)
>>>>>>>>>>     .withLoggingEnabled(config)
>>>>>>>>>>     .table()
>>>>>>>> The .table() statement in the end was kinda alien.
>>>>>>>>
>>>>>>> I agree, but then all of the withXXX methods need to be on KTable
>>>> which
>>>>>> is
>>>>>>> worse in my opinion. You also need something that is going to
"build"
>>>>> the
>>>>>>> internal processors and add them to the topology.
>>>>>>>
>>>>>>>
>>>>>>>> The current proposal put the count() into the end -- ie,
the
>>>> optional
>>>>>>>> parameter for count() have to specified on the .grouped()
call --
>>>> this
>>>>>>>> does not seems to be the best way either.
>>>>>>>>
>>>>>>>>
>>>>>>> I actually prefer this method as you are building a grouped stream
>>>> that
>>>>>> you
>>>>>>> will aggregate. So
>>>> table.grouped(...).withOptionalStuff().aggregate(..)
>>>>>> etc
>>>>>>> seems natural to me.
>>>>>>>
>>>>>>>
>>>>>>>> I did not think this through in detail, but can't we just
do the
>>>>> initial
>>>>>>>> proposal with the .table() ?
>>>>>>>>
>>>>>>>> groupedStream.count().withStoreName("name").mapValues(...)
>>>>>>>>
>>>>>>>> Each .withXXX(...) return the current KTable and all the
.withXXX()
>>>>> are
>>>>>>>> just added to the KTable interface. Or do I miss anything
why this
>>>>> wont'
>>>>>>>> work or any obvious disadvantage?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>> See above.
>>>>>>>
>>>>>>>
>>>>>>>> -Matthias
>>>>>>>>
>>>>>>>> On 6/22/17 4:06 AM, Damian Guy wrote:
>>>>>>>>> Thanks everyone. My latest attempt is below. It builds
on the
>>>> fluent
>>>>>>>>> approach, but i think it is slightly nicer.
>>>>>>>>> I agree with some of what Eno said about mixing configy
stuff in
>>>> the
>>>>>> DSL,
>>>>>>>>> but i think that enabling caching and enabling logging
are things
>>>>> that
>>>>>>>>> aren't actually config. I'd probably not add withLogConfig(...)
>>>> (even
>>>>>>>>> though it is below) as this is actually config and we
already have
>>>> a
>>>>>> way
>>>>>>>> of
>>>>>>>>> doing that, via the StateStoreSupplier. Arguably we could
use the
>>>>>>>>> StateStoreSupplier for disabling caching etc, but as
it stands that
>>>>> is
>>>>>> a
>>>>>>>>> bit of a tedious process for someone that just wants
to use the
>>>>> default
>>>>>>>>> storage engine, but not have caching enabled.
>>>>>>>>>
>>>>>>>>> There is also an orthogonal concern that Guozhang alluded
to.... If
>>>>> you
>>>>>>>>> want to plug in a custom storage engine and you want
it to be
>>>> logged
>>>>>> etc,
>>>>>>>>> you would currently need to implement that yourself.
Ideally we can
>>>>>>>> provide
>>>>>>>>> a way where we will wrap the custom store with logging,
metrics,
>>>>> etc. I
>>>>>>>>> need to think about where this fits, it is probably more
>>>> appropriate
>>>>> on
>>>>>>>> the
>>>>>>>>> Stores API.
>>>>>>>>>
>>>>>>>>> final KeyValueMapper<String, String, Long> keyMapper = null;
>>>>>>>>> // count with mapped key
>>>>>>>>> final KTable<Long, Long> count = stream.grouped()
>>>>>>>>>          .withKeyMapper(keyMapper)
>>>>>>>>>          .withKeySerde(Serdes.Long())
>>>>>>>>>          .withValueSerde(Serdes.String())
>>>>>>>>>          .withQueryableName("my-store")
>>>>>>>>>          .count();
>>>>>>>>>
>>>>>>>>> // windowed count
>>>>>>>>> final KTable<Windowed<String>, Long> windowedCount = stream.grouped()
>>>>>>>>>          .withQueryableName("my-window-store")
>>>>>>>>>          .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>          .count();
>>>>>>>>>
>>>>>>>>> // windowed reduce
>>>>>>>>> final Reducer<String> windowedReducer = null;
>>>>>>>>> final KTable<Windowed<String>, String> windowedReduce = stream.grouped()
>>>>>>>>>          .withQueryableName("my-window-store")
>>>>>>>>>          .windowed(TimeWindows.of(10L).until(10))
>>>>>>>>>          .reduce(windowedReducer);
>>>>>>>>>
>>>>>>>>> final Aggregator<String, String, Long> aggregator = null;
>>>>>>>>> final Initializer<Long> init = null;
>>>>>>>>>
>>>>>>>>> // aggregate
>>>>>>>>> final KTable<String, Long> aggregate = stream.grouped()
>>>>>>>>>          .withQueryableName("my-aggregate-store")
>>>>>>>>>          .aggregate(aggregator, init, Serdes.Long());
>>>>>>>>>
>>>>>>>>> final StateStoreSupplier<KeyValueStore<String, Long>> stateStoreSupplier = null;
>>>>>>>>> // aggregate with custom store
>>>>>>>>> final KTable<String, Long> aggWithCustomStore = stream.grouped()
>>>>>>>>>          .withStateStoreSupplier(stateStoreSupplier)
>>>>>>>>>          .aggregate(aggregator, init);
>>>>>>>>>
>>>>>>>>> // disable caching
>>>>>>>>> stream.grouped()
>>>>>>>>>          .withQueryableName("name")
>>>>>>>>>          .withCachingEnabled(false)
>>>>>>>>>          .count();
>>>>>>>>>
>>>>>>>>> // disable logging
>>>>>>>>> stream.grouped()
>>>>>>>>>          .withQueryableName("q")
>>>>>>>>>          .withLoggingEnabled(false)
>>>>>>>>>          .count();
>>>>>>>>>
>>>>>>>>> // override log config
>>>>>>>>> final Reducer<String> reducer = null;
>>>>>>>>> stream.grouped()
>>>>>>>>>          .withLogConfig(Collections.singletonMap("segment.size", "10"))
>>>>>>>>>          .reduce(reducer);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> If anyone wants to play around with this you can find
the code
>>>> here:
>>>>>>>>> https://github.com/dguy/kafka/tree/dsl-experiment
>>>>>>>>>
>>>>>>>>> Note: It won't actually work as most of the methods just
return
>>>> null.
>>>>>>>>> Thanks,
>>>>>>>>> Damian
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, 22 Jun 2017 at 11:18 Ismael Juma <ismael@juma.me.uk>
>>>> wrote:
>>>>>>>>>> Thanks Damian. I think both options have pros and
cons. And both
>>>> are
>>>>>>>> better
>>>>>>>>>> than overload abuse.
>>>>>>>>>>
>>>>>>>>>> The fluent API approach reads better, no mention
of builder or
>>>> build
>>>>>>>>>> anywhere. The main downside is that the method signatures
are a
>>>>> little
>>>>>>>> less
>>>>>>>>>> clear. By reading the method signature, one doesn't
necessarily
>>>>> knows
>>>>>>>> what
>>>>>>>>>> it returns. Also, one needs to figure out the special
method
>>>>>> (`table()`
>>>>>>>> in
>>>>>>>>>> this case) that gives you what you actually care
about (`KTable`
>>>> in
>>>>>> this
>>>>>>>>>> case). Not major issues, but worth mentioning while
doing the
>>>>>>>> comparison.
>>>>>>>>>> The builder approach avoids the issues mentioned
above, but it
>>>>> doesn't
>>>>>>>> read
>>>>>>>>>> as well.
>>>>>>>>>>
>>>>>>>>>> Ismael
>>>>>>>>>>
>>>>>>>>>> On Wed, Jun 21, 2017 at 3:37 PM, Damian Guy <damian.guy@gmail.com
>>>>>>>> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I'd like to get a discussion going around some of the API choices we've
>>>>>>>>>>> made in the DSL. In particular those that relate to stateful operations
>>>>>>>>>>> (though this could expand).
>>>>>>>>>>> As it stands we lean heavily on overloaded methods
in the API,
>>>> i.e,
>>>>>>>> there
>>>>>>>>>>> are 9 overloads for KGroupedStream.count(..)!
It is becoming
>>>> noisy
>>>>>> and
>>>>>>>> i
>>>>>>>>>>> feel it is only going to get worse as we add
more optional
>>>> params.
>>>>> In
>>>>>>>>>>> particular we've had some requests to be able
to turn caching
>>>> off,
>>>>> or
>>>>>>>>>>> change log configs,  on a per operator basis
(note this can be
>>>> done
>>>>>> now
>>>>>>>>>> if
>>>>>>>>>>> you pass in a StateStoreSupplier, but this can
be a bit
>>>>> cumbersome).
>>>>>>>>>>> So this is a bit of an open question. How can
we change the DSL
>>>>>>>> overloads
>>>>>>>>>>> so that it flows, is simple to use and understand,
and is easily
>>>>>>>> extended
>>>>>>>>>>> in the future?
>>>>>>>>>>>
>>>>>>>>>>> One option would be to use a fluent API approach
for providing
>>>> the
>>>>>>>>>> optional
>>>>>>>>>>> params, so something like this:
>>>>>>>>>>>
>>>>>>>>>>> groupedStream.count()
>>>>>>>>>>>     .withStoreName("name")
>>>>>>>>>>>     .withCachingEnabled(false)
>>>>>>>>>>>     .withLoggingEnabled(config)
>>>>>>>>>>>     .table()
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Another option would be to provide a Builder
to the count method,
>>>>> so
>>>>>> it
>>>>>>>>>>> would look something like this:
>>>>>>>>>>> groupedStream.count(new
>>>>>>>>>>> CountBuilder("storeName").withCachingEnabled(false).build())
>>>>>>>>>>>
>>>>>>>>>>> Another option is to say: Hey we don't need this,
what are you on
>>>>>>>> about!
>>>>>>>>>>> The above has focussed on state store related
overloads, but the
>>>>> same
>>>>>>>>>> ideas
>>>>>>>>>>> could  be applied to joins etc, where we presently
have many join
>>>>>>>> methods
>>>>>>>>>>> and many overloads.
>>>>>>>>>>>
>>>>>>>>>>> Anyway, i look forward to hearing your opinions.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Damian
>>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> -- Guozhang
>>>>>
>>>>>
>>>>
>>

