kafka-users mailing list archives

From josh gruenberg <jos...@gmail.com>
Subject Re: Kafka Streams: context.forward() with downstream name
Date Tue, 05 Apr 2016 15:24:30 GMT
Hi all,

Just chiming in to agree with Yuto: I think a custom Processor becomes
attractive when a node in the graph may emit to a variety of downstream
paths, possibly after some delay, depending on its logic. This can probably
often be achieved with the existing DSL using some combination of
predicates and intermediate representations, but that involves contortions
that feel cumbersome and probably lead to less intelligible code. I'm
also not sure the current DSL can model scenarios where the transformation
may be one-to-many, as in the last part of Yuto's example, or where the
emission delay is data-driven, as in my earlier "sessionization" example.

One idea I'd offer is to provide a mechanism for wiring in Processors with
"telescoping" arity (e.g., support Processor1<I, O1>, Processor2<I, O1, O2>,
etc.), and to give each arity a type-safe forwarding interface for each
output stream (e.g., Forwarder<T>). This assigns each output stream a
clear ordinal, and suggests a corresponding type-safe return type for the
DSL (e.g., KStreamTuple2<O1, O2>).
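
To make the shape of this concrete, here is a rough, self-contained sketch of
the arity-2 case. Everything in it is hypothetical: Forwarder, Processor2, and
ParseInts are illustrative names only, none of this is existing Kafka Streams
API, and the key type is fixed to String purely to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical: a type-safe handle for emitting records to one output stream. */
interface Forwarder<T> {
    void forward(String key, T value);
}

/** Hypothetical arity-2 processor: one input type, two separately typed outputs. */
interface Processor2<I, O1, O2> {
    void process(String key, I value, Forwarder<O1> out1, Forwarder<O2> out2);
}

public class TelescopingSketch {

    /**
     * One-to-many example: splits a comma-separated line and sends each
     * parseable integer to output 1 and each unparseable token to output 2.
     */
    static final class ParseInts implements Processor2<String, Integer, String> {
        @Override
        public void process(String key, String value,
                            Forwarder<Integer> parsed, Forwarder<String> rejected) {
            for (String token : value.split(",")) {
                String trimmed = token.trim();
                try {
                    parsed.forward(key, Integer.parseInt(trimmed));
                } catch (NumberFormatException e) {
                    rejected.forward(key, trimmed);
                }
            }
        }
    }

    public static void main(String[] args) {
        // In a real topology the forwarders would be wired to downstream
        // nodes; here they simply collect into lists.
        List<Integer> parsed = new ArrayList<>();
        List<String> rejected = new ArrayList<>();
        new ParseInts().process("k", "1, 2, x, 4",
                (k, v) -> parsed.add(v),
                (k, v) -> rejected.add(v));
        System.out.println(parsed + " " + rejected); // prints "[1, 2, 4] [x]"
    }
}
```

The point of the sketch is that each output has its own static type and a
fixed position in the signature, so forwarding an Integer to the String
output is a compile error rather than a runtime surprise.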

I think this pattern could unify the 'Transformer' and 'Processor' APIs.
This is what I had in mind for the PR we discussed earlier (for modifying
the Transformer API), but the scope expanded beyond what I felt comfortable
submitting without discussion, and I had to prioritize other efforts.
Regardless, I could push a WIP branch to GitHub later today to illustrate,
if you'd like to see it.

HTH,
-josh

On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangguoz@gmail.com> wrote:

> Thanks Yuto for your code snippet. Since you need to access a customized
> external storage for metadata, that indeed cannot be wrapped in any
> built-in operators in the Streams DSL yet, and your code example in the
> previous email would be close to the best you can do with the high-level
> DSL now.
>
> One minor improvement to your code above, though: instead of calling
> map(... -> process()) you can call transform(), which still lets you
> provide a customized transformer function but also gives you strong
> typing, assuming all three kinds of records have the same key / value
> types.
>
> Guozhang
>
>
>
>
> On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
>
> > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > Hi Yuto,
> > >
> > > Is the destination topic embedded as part of the value in the original
> > > "foo" topic? If yes could you just access that field directly instead of
> > > mapping to a (key, value, destination) triplet?
> > >
> >
> > Nope. KeyValueWithDestination is just an example of output from the
> > first Processor and is not included in actual messages that the topic
> > foo received.
> > Let me explain a slightly more realistic use case. How can we cleanly
> > write a Processor like the one below in the high-level DSL?
> >
> > ```java
> > public class EventProcessor implements Processor<String, Event> {
> > ...
> >   @Override
> >   public void process(String key, Event value) {
> >       EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
> >
> >       if (isFieldACorrupted(meta, value.getFieldA())) {
> >           // This event is corrupted! Let's evacuate it to the grave topic
> >           // for further investigation.
> >           context.forward(key, value, "CorruptedEventSink");
> >       }
> >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> >           // Another case of corruption, but maybe recoverable.
> >           context.forward(key, value, "CorruptedEventRecoveryProcessor");
> >       }
> >
> >       // One-to-many: emit one message per Foo carried by the event.
> >       for (Foo foo : value.getFoos()) {
> >           context.forward(key, buildMessage(meta, foo), "FooProcessor");
> >       }
> >   }
> > ...
> > }
> > ```
> >
> >
> > > Guozhang
> > >
> > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> > >
> > >> Hi Guozhang,
> > >>
> > >>
> > >>
> > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > >> > Hi Yuto,
> > >> >
> > >> > That is a good suggestion, the child index is not very intuitive from
> > >> > a programmer's view and we can even consider replacing it with the
> > >> > processor name instead of overloading it. Could you file a JIRA?
> > >> >
> > >>
> > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > >>
> > >> > Also I am wondering if you have looked at the higher-level Streams DSL,
> > >> > and if yes, could you let me know what the limitations of using that
> > >> > API are in your case?
> > >> >
> > >>
> > >> Well, I read through the high-level DSL interface but couldn't find an
> > >> easy way to handle output from Processors which could issue multiple
> > >> messages to arbitrary different destinations.
> > >> Maybe it could be done with something like the code below, but it
> > >> doesn't look good. Please let me know if you have any idea how to do
> > >> this in an easier way.
> > >>
> > >> ```java
> > >> class KeyValueWithDestination {
> > >>     K key;
> > >>     V value;
> > >>     String destination;
> > >> }
> > >>
> > >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> > >>     String destination;
> > >>     DestinationPredicate(String destination) {
> > >>         this.destination = destination;
> > >>     }
> > >>     @Override
> > >>     public boolean test(K key, KeyValueWithDestination value) {
> > >>         return value.destination.equals(destination);
> > >>     }
> > >> }
> > >>
> > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > >>
> > >> // Generic array creation is not allowed in Java, so build a raw array.
> > >> @SuppressWarnings("unchecked")
> > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> > >>                                  .toArray(Predicate[]::new);
> > >>
> > >> branches = builder.stream("foo")
> > >>                   .map((key, value) -> processor.process(key, value) /* => KeyValueWithDestination */)
> > >>                   .branch(predicates);
> > >>
> > >> for (int i = 0; i < branches.length; i++) {
> > >>     branches[i].to(destTopics[i]);
> > >> }
> > >> ```
> > >>
> > >>
> > >> > Guozhang
> > >> >
> > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> > >> >
> > >> >> When I tried to implement a task that dispatches records to
> > >> >> downstream processors or sinks, it looked like relying on
> > >> >> context.forward(K, V, int childIndex) is the only way for now.
> > >> >> Why is this method implemented using childIndex (which is just an
> > >> >> index into the children "List", based on the order of
> > >> >> builder.addProcessor() calls) instead of the child name (the first
> > >> >> argument to add{Processor,Sink})?
> > >> >> What is the concrete use case of forward(K, V, int childIndex), and
> > >> >> would it make sense to introduce another overload,
> > >> >> forward(K, V, String childName), for handier use?
> > >> >> Currently I have a use case like this in mind:
> > >> >> ```
> > >> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
> > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
> > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
> > >> >>
> > >> >> // in process(key, value)
> > >> >> if ("key-for-A".equals(key)) {
> > >> >>     context.forward(key, value, "Process-A");
> > >> >> } else if ("key-for-B".equals(key)) {
> > >> >>     context.forward(key, value, "Process-B");
> > >> >> }
> > >> >> ```
> > >> >>
> > >> >
> > >> >
> > >> >
> > >> > --
> > >> > -- Guozhang
> > >>
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> >
>
>
>
> --
> -- Guozhang
>
