kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams: context.forward() with downstream name
Date Thu, 14 Apr 2016 20:36:48 GMT
Hi Josh,

As we chatted offline, would you like to summarize your proposed Transform
APIs in a separate JIRA so we can continue our discussions there?

Guozhang

On Tue, Apr 5, 2016 at 4:13 PM, Guozhang Wang <wangguoz@gmail.com> wrote:

> HI Josh,
>
> Re 1): transform() is only for use in the higher-level DSL, while in the
> lower-level API people are expected to work with Processor only, which for
> now uses context.forward() to send records to the downstream processors.
>
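> For reference, a minimal sketch of such a lower-level Processor (the class
> name and the record handling are made up purely for illustration):
>
> ```java
> public class UpperCaseProcessor implements Processor<String, String> {
>     private ProcessorContext context;
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>     }
>
>     @Override
>     public void process(String key, String value) {
>         // forward the (possibly transformed) record to all downstream
>         // processors attached to this node in the topology
>         context.forward(key, value.toUpperCase());
>     }
>
>     @Override
>     public void punctuate(long timestamp) {}
>
>     @Override
>     public void close() {}
> }
> ```
>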
> Re 2): I have a few questions about your proposal: with differently typed
> key-value pairs, are they supposed to be forwarded to different child
> processors? I.e. you'd better call "forward(K, V, String /* child name */)"
> rather than "forward(K, V)", which forwards to all of the child processors,
> one by one. In this case the topology builder also needs to be refactored,
> since we need to make sure the child processors exist when the processors
> are defined, which could be a bit tricky in implementation.
>
> Also, is this change aimed at making the lower-level Processor type-safe
> as well? The main motivation for not supporting strongly typed Processors
> is to allow users to flexibly connect processors in the topology without
> worrying about the data types of each processor. As I mentioned above,
> making type-specific forwards to downstream processors would likely defeat
> this purpose.
>
>
> Guozhang
>
>
> On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <joshgr@gmail.com> wrote:
>
>> Hi Guozhang,
>>
>> I'll reply to your points in-line below:
>>
>> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>>
>> > Hi Josh,
>> >
>> > I think there are a few issues that we want to resolve here, which
>> could be
>> > orthogonal to each other.
>> >
>> > 1) one-to-many mapping in the transform() function that generates a
>> > single stream (i.e. single-typed key-value pairs).
>> >
>> > Since transform() already enforces type-safe return values, one thing
>> > we can do is to change the punctuate() function return value from
>> > "null" to "R" as well. Then for one-to-many mapping one can define R as
>> > Array<MyType>:
>> >
>> > stream.transform().flatMap(/* from Array<MyType> to MyType */)
>> >
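>> > For example, roughly (the stream and element type names here are
>> > hypothetical, with the Transformer returning a KeyValue whose value is
>> > the collection):
>> >
>> > ```java
>> > // transform() emits one List<MyType> per input record; flatMapValues()
>> > // then explodes it into one output record per element
>> > KStream<String, List<MyType>> lists =
>> >     input.transform(myTransformerSupplier);
>> > KStream<String, MyType> exploded =
>> >     lists.<MyType>flatMapValues(list -> list);
>> > ```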
>> >
>> Interesting, I hadn't thought of returning an Iterable from a Transformer
>> to achieve the one-to-many case. Regardless, my initial reaction is that
>> this seems natural when you're working in the declarative DSL (where we
>> can
>> move the flatMap to the left of the one-to-many lambda), but seems
>> cumbersome when writing a lower-level imperative Transformer/Processor
>> (which then requires a subsequent flatMap to explode the arrays). I think
>> I'd prefer a more imperative, side-effecting style of emission in this
>> case; see my reply to #2 below.
>>
>>
>> > 2) having a new function that takes one stream, and generates multiple
>> > streams with different key-value types.
>> >
>> > This is a good-to-have operator in the Streams DSL, and I think this is
>> > your proposed new API in the previous email? I am not sure I
>> > understand the "telescoping"
>> > arity completely though, so let me know if I'm wrong.
>> >
>>
>> Yes, sorry, I wrote my previous mail in a hurry this morning. By
>> "telescoping", I meant defining processor/transformer interfaces for each
>> supported output arity, with the corresponding output-types expressed as
>> generics:
>>
>>    - Processor1<K, V, K1, V1>
>>    - Processor2<K, V, K1, V1, K2, V2>
>>    - Processor3<K, V, K1, V1, K2, V2, K3, V3>
>>
>> (Yes, the number of generics here is getting unwieldy, but I don't
>> immediately see a good way to avoid that while preserving type-safety.
>> Some
>> sort of cosmetic improvement would be nice!)
>>
>> Given this, the framework could inject type-safe "emitters" into the
>> Processors for each output-stream:
>>
>> interface Processor2<K, V, K1, V1, K2, V2> {
>>   void init(ProcessorContext context,
>>             Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);
>>
>>   // these can use the forwarders provided to init() to emit any number
>>   // of values
>>   void process(K key, V value);
>>   void punctuate(long timestamp);
>>   // ...
>> }
>>
>> interface Forwarder<K, V> {
>>   void forward(K key, V value);
>> }
>>
>> ... then, in KStream<K,V>:
>>
>> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(
>>     Processor2<K, V, K1, V1, K2, V2> processor, String... stateStores);
>>
>> I haven't worked through all of the details, but I'm optimistic that this
>> could work nicely to unify the Transformer and Processor APIs, and address
>> all of the described use-cases (up to some arbitrarily-chosen number of
>> supported output-streams).
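>>
>> As a rough usage sketch under this proposal (all names and the API shape
>> here are hypothetical, just to show the idea):
>>
>> ```java
>> // split one event stream into a "valid" stream and an "error" stream,
>> // each with its own key/value types
>> KStreamTuple2<String, ValidEvent, String, ErrorEvent> outputs =
>>     events.process2(new Processor2<String, Event,
>>                                    String, ValidEvent,
>>                                    String, ErrorEvent>() {
>>         private Forwarder<String, ValidEvent> valid;
>>         private Forwarder<String, ErrorEvent> errors;
>>
>>         public void init(ProcessorContext context,
>>                          Forwarder<String, ValidEvent> output1,
>>                          Forwarder<String, ErrorEvent> output2) {
>>             this.valid = output1;
>>             this.errors = output2;
>>         }
>>
>>         public void process(String key, Event value) {
>>             if (value.isValid()) {
>>                 valid.forward(key, ValidEvent.from(value));
>>             } else {
>>                 errors.forward(key, ErrorEvent.from(value));
>>             }
>>         }
>>
>>         public void punctuate(long timestamp) {}
>>     });
>>
>> outputs.stream1().to("valid-events");
>> outputs.stream2().to("error-events");
>> ```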
>>
>>
>> > 3) having a data-driven emission policy (this will be the building
>> > block of session windows) as well as a time-driven emission policy.
>> >
>> > I am thinking about how to support this as well. One option is to use
>> > the underlying process() function for data-driven emission: for example,
>> > if there is a session-start / session-end flag, create the corresponding
>> > session record in state and only emit upon the session-end flag; and to
>> > use the underlying punctuate() function for time-driven emission (we
>> > probably need to first refactor it to be triggered by record timestamp
>> > instead of wallclock time).
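>> >
>> > A minimal sketch of the data-driven side in a Processor (the store and
>> > record types are made up; the "sessions" store would be registered on
>> > the topology separately):
>> >
>> > ```java
>> > private ProcessorContext context;
>> > private KeyValueStore<String, Session> sessions;
>> >
>> > @Override
>> > @SuppressWarnings("unchecked")
>> > public void init(ProcessorContext context) {
>> >     this.context = context;
>> >     this.sessions =
>> >         (KeyValueStore<String, Session>) context.getStateStore("sessions");
>> > }
>> >
>> > @Override
>> > public void process(String key, Event value) {
>> >     Session session = sessions.get(key);
>> >     if (session == null) session = new Session();
>> >     session.add(value);
>> >     if (value.isSessionEnd()) {
>> >         context.forward(key, session);   // data-driven emission
>> >         sessions.delete(key);
>> >     } else {
>> >         sessions.put(key, session);
>> >     }
>> > }
>> > ```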
>> >
>>
>> Yes, I agree: data-driven emission could work just fine with process(),
>> and
>> delayed emission works nicely with punctuate(). I've also been meaning to
>> mention the clear need for event-time punctuation, so I'm glad to hear
>> that's on your radar! Watermarking will be important for
>> session-windowing.
>>
>> Thoughts?
>>
>> -josh
>>
>>
>> >
>> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <joshgr@gmail.com>
>> > wrote:
>> >
>> > > Hi all,
>> > >
>> > > Just chiming in with Yuto: I think the custom Processor becomes
>> > > attractive in scenarios where a node in the graph may emit to a
>> > > variety of downstream paths, possibly after some delay, depending on
>> > > logic. This can probably often be achieved with the existing DSL using
>> > > some combination of predicates and intermediate representations, but
>> > > this involves contortions that feel cumbersome, and probably leads to
>> > > less intelligible code. I'm also not sure the current DSL can model
>> > > scenarios where the transformation may be one-to-many, as in the last
>> > > part of Yuto's example, or where the emission-delay is data-driven, as
>> > > in my earlier "sessionization" example.
>> > >
>> > > One idea I'd offer is to provide a mechanism for wiring in Processors
>> > > with "telescoping" arity (eg, support Processor1<I, O1>,
>> > > Processor2<I, O1, O2>, etc), and providing each arity with type-safe
>> > > forwarding interfaces for each output stream (eg, Forwarder<T>). This
>> > > assigns each output-stream a clear ordinal, and suggests a
>> > > corresponding type-safe return-type for the DSL (eg,
>> > > KStreamTuple2<O1, O2>).
>> > >
>> > > I think this pattern could provide a unification of the 'Transformer'
>> > > and 'Processor' APIs. This was what I had in mind for a PR we
>> > > discussed earlier (for modifying the Transformer API), but the scope
>> > > expanded beyond what I felt comfortable submitting without discussion,
>> > > and I had to prioritize other efforts. Regardless, I could get a WIP
>> > > branch pushed to github later today to illustrate if you'd like to
>> > > see it.
>> > >
>> > > HTH,
>> > > -josh
>> > >
>> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangguoz@gmail.com>
>> > > wrote:
>> > >
>> > > > Thanks Yuto for your code snippet. Since you need to access a
>> > > > customized external storage for metadata, that indeed cannot be
>> > > > wrapped in any built-in operators in the Streams DSL yet, and your
>> > > > code example in the previous email would be close to the best you
>> > > > can do with the high-level DSL now.
>> > > >
>> > > > One minor improvement over your above code, though, is that instead
>> > > > of calling map(... -> process()) you can actually call transform(),
>> > > > which still allows you to provide a customized transformer function
>> > > > while giving you strong typing, assuming all three kinds of records
>> > > > are of the same key / value types.
>> > > >
>> > > > Guozhang
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA
>> > > > <kawamuray.dadada@gmail.com> wrote:
>> > > >
>> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
>> > > > > > Hi Yuto,
>> > > > > >
>> > > > > > Is the destination topic embedded as part of the value in the
>> > > > > > original "foo" topic? If yes, could you just access that field
>> > > > > > directly instead of mapping to a (key, value, destination)
>> > > > > > triplet?
>> > > > > >
>> > > > >
>> > > > > Nope. KeyValueWithDestination is just an example of output from
>> > > > > the first Processor and is not included in the actual messages
>> > > > > that the topic foo receives.
>> > > > > Let me explain a bit more realistic use-case. How can we write a
>> > > > > Processor like the one below cleanly in the high-level DSL?
>> > > > >
>> > > > > ```java
>> > > > > public class EventProcessor implements Processor<String, Event> {
>> > > > > ...
>> > > > >   @Override
>> > > > >   public void process(String key, Event value) {
>> > > > >       EventMetadata meta =
>> > > > >           getEventMetadataFromExternalStorage(value.getId());
>> > > > >
>> > > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
>> > > > >           // This event is corrupted! Let's evacuate it to the
>> > > > >           // grave topic for further investigation.
>> > > > >           context.forward(key, value, "CorruptedEventSink");
>> > > > >       }
>> > > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
>> > > > >           // Another case of corruption, but maybe recoverable.
>> > > > >           context.forward(key, value,
>> > > > >               "CorruptedEventRecoveryProcessor");
>> > > > >       }
>> > > > >
>> > > > >       for (Foo foo : value.getFoos()) {
>> > > > >           context.forward(key, buildMessage(meta, foo),
>> > > > >               "FooProcessor");
>> > > > >       }
>> > > > >   }
>> > > > > ...
>> > > > > }
>> > > > > ```
>> > > > >
>> > > > >
>> > > > > > Guozhang
>> > > > > >
>> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA
>> > > > > > <kawamuray.dadada@gmail.com> wrote:
>> > > > > >
>> > > > > >> Hi Guozhang,
>> > > > > >>
>> > > > > >>
>> > > > > >>
>> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
>> > > > > >> > Hi Yuto,
>> > > > > >> >
>> > > > > >> > That is a good suggestion; the child index is not very
>> > > > > >> > intuitive from a programmer's view, and we can even consider
>> > > > > >> > replacing it with the processor name instead of overloading
>> > > > > >> > it. Could you file a JIRA?
>> > > > > >> >
>> > > > > >>
>> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
>> > > > > >>
>> > > > > >> > Also I am wondering if you have looked at the higher-level
>> > > > > >> > Streams DSL, and if yes, could you let me know what the
>> > > > > >> > limitations are in using those APIs in your case?
>> > > > > >> >
>> > > > > >>
>> > > > > >> Well, I read through the high-level DSL interface but couldn't
>> > > > > >> find an easy way to handle output from Processors which could
>> > > > > >> issue multiple messages to arbitrary, different destinations.
>> > > > > >> Maybe it could be done by doing something like the below, but
>> > > > > >> it doesn't look good. Please let me know if you have any idea
>> > > > > >> how to do this in an easier way.
>> > > > > >>
>> > > > > >> ```java
>> > > > > >> class KeyValueWithDestination {
>> > > > > >>     K key;
>> > > > > >>     V value;
>> > > > > >>     String destination;
>> > > > > >> }
>> > > > > >>
>> > > > > >> class DestinationPredicate
>> > > > > >>         implements Predicate<K, KeyValueWithDestination> {
>> > > > > >>     String destination;
>> > > > > >>
>> > > > > >>     DestinationPredicate(String destination) {
>> > > > > >>         this.destination = destination;
>> > > > > >>     }
>> > > > > >>
>> > > > > >>     @Override
>> > > > > >>     public boolean test(K key, KeyValueWithDestination value) {
>> > > > > >>         return value.destination.equals(destination);
>> > > > > >>     }
>> > > > > >> }
>> > > > > >>
>> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
>> > > > > >>
>> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
>> > > > > >>     Arrays.stream(destTopics)
>> > > > > >>           .map(DestinationPredicate::new)
>> > > > > >>           .toArray(Predicate[]::new);
>> > > > > >>
>> > > > > >> branches = builder.stream("foo")
>> > > > > >>                   .map((key, value) -> processor.process(key, value)
>> > > > > >>                        /* => KeyValueWithDestination */)
>> > > > > >>                   .branch(predicates);
>> > > > > >>
>> > > > > >> for (int i = 0; i < branches.length; i++) {
>> > > > > >>     branches[i].to(destTopics[i]);
>> > > > > >> }
>> > > > > >> ```
>> > > > > >>
>> > > > > >>
>> > > > > >> > Guozhang
>> > > > > >> >
>> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA
>> > > > > >> > <kawamuray.dadada@gmail.com> wrote:
>> > > > > >> >
>> > > > > >> >> When I tried to implement a task which does a kind of
>> > > > > >> >> dispatching to downstream processors or sinks, it looks like
>> > > > > >> >> relying on context.forward(K, V, int childIndex) is the only
>> > > > > >> >> way now. I have a question: why is this method implemented
>> > > > > >> >> using childIndex (which is just an index into the children
>> > > > > >> >> "List", based on the order of builder.addProcessor() calls)
>> > > > > >> >> instead of the child name (the first argument to
>> > > > > >> >> add{Processor,Sink})?
>> > > > > >> >> I want to ask what the concrete use case of forward(K, V,
>> > > > > >> >> int childIndex) is, and whether it makes sense to introduce
>> > > > > >> >> another overload, forward(K, V, String childName), for
>> > > > > >> >> handier use.
>> > > > > >> >> Currently I have a use-case like this in mind:
>> > > > > >> >> ```
>> > > > > >> >> builder.addProcessor("DispatchProcess", new
>> > > > > >> >> DispatchProcessorSupplier(), "Source");
>> > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
>> > > > > >> >> "DispatchProcess");
>> > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
>> > > > > >> >> "DispatchProcess");
>> > > > > >> >>
>> > > > > >> >> // in process(key, value)
>> > > > > >> >> if ("key-for-A".equals(key)) {
>> > > > > >> >>     context.forward(key, value, "Process-A");
>> > > > > >> >> } else if ("key-for-B".equals(key)) {
>> > > > > >> >>     context.forward(key, value, "Process-B");
>> > > > > >> >> }
>> > > > > >> >> ```
>> > > > > >> >>
>> > > > > >> >
>> > > > > >> >
>> > > > > >> >
>> > > > > >> > --
>> > > > > >> > -- Guozhang
>> > > > > >>
>> > > > > >
>> > > > > >
>> > > > > >
>> > > > > > --
>> > > > > > -- Guozhang
>> > > > >
>> > > >
>> > > >
>> > > >
>> > > > --
>> > > > -- Guozhang
>> > > >
>> > >
>> >
>> >
>> >
>> > --
>> > -- Guozhang
>> >
>>
>
>
>
> --
> -- Guozhang
>



-- 
-- Guozhang
