kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams: context.forward() with downstream name
Date Tue, 05 Apr 2016 17:23:20 GMT
Hi Josh,

I think there are a few issues that we want to resolve here, which could be
orthogonal to each other.

1) a one-to-many mapping in the transform() function that generates a
single stream (i.e. key-value pairs of a single type).

Since transform() already enforces type-safe return values, one thing we
can do is to change the punctuate() function's return value from "null" to
"R" as well. For a one-to-many mapping, one can then define R as
Array<MyType> and write:

stream.transform().flatMap(/* from Array<MyType> to MyType */)
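
For illustration, a minimal sketch of this shape, assuming the current
Transformer signatures (Event, its explode() helper, and the input stream
are made-up names for the example):

```java
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

// Assumed one-to-many transformer: R = KeyValue<String, List<Event>>,
// i.e. one input record carries its many outputs as a single List value.
class ExplodeTransformer implements Transformer<String, Event, KeyValue<String, List<Event>>> {
    @Override
    public void init(ProcessorContext context) {}

    @Override
    public KeyValue<String, List<Event>> transform(String key, Event value) {
        return KeyValue.pair(key, value.explode()); // explode() is an assumed helper
    }

    @Override
    public KeyValue<String, List<Event>> punctuate(long timestamp) {
        return null; // nothing to emit on the time-driven path
    }

    @Override
    public void close() {}
}

// flatMapValues() then flattens each List<Event> back into a single
// typed stream of Event records.
KStream<String, Event> flattened =
        input.transform(ExplodeTransformer::new)
             .flatMapValues(list -> list);
```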

2) having a new function that takes one stream and generates multiple
streams with different key-value types.

This is a good-to-have operator in the Streams DSL, and I think this is
your proposed new API in the previous email? I am not sure I understand
the "telescoping" arity completely though, so let me know if I'm wrong.

3) having a data-driven emission policy (this will be the building block of
session windows) as well as a time-driven emission policy.

I am thinking about how to support this as well. One thing we can do is
use the underlying process() function for data-driven emission: for
example, if there is a session-start / end flag, then create the
corresponding session record in state and only emit upon the session-end
flag. The underlying punctuate() function could then handle time-driven
emission (we probably need to first refactor it to be triggered by record
timestamps instead of wall-clock time).
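
To sketch the data-driven half: a Processor along these lines could buffer
per-session state and emit only on the end flag. Everything here besides
the Processor interface itself is assumed for illustration (SessionEvent,
its flags, the merge() helper, and the "open-sessions" store):

```java
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class SessionEmitProcessor implements Processor<String, SessionEvent> {
    private ProcessorContext context;
    private KeyValueStore<String, SessionEvent> openSessions;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        this.openSessions =
                (KeyValueStore<String, SessionEvent>) context.getStateStore("open-sessions");
    }

    @Override
    public void process(String key, SessionEvent value) {
        if (value.isSessionStart()) {
            openSessions.put(key, value);                  // create the session record in state
        } else if (value.isSessionEnd()) {
            context.forward(key, openSessions.get(key));   // emit only upon session-end
            openSessions.delete(key);
        } else {
            openSessions.put(key, openSessions.get(key).merge(value)); // accumulate
        }
    }

    @Override
    public void punctuate(long timestamp) {
        // Time-driven emission would go here, ideally triggered by record
        // timestamps rather than wall-clock time, as noted above.
    }

    @Override
    public void close() {}
}
```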


Guozhang

On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <joshgr@gmail.com> wrote:

> Hi all,
>
> Just chiming in with Yuto: I think the custom Processor becomes attractive
> in scenarios where a node in the graph may emit to a variety of downstream
> paths, possibly after some delay, depending on logic. This can probably
> often be achieved with the existing DSL using some combination of
> predicates and intermediate representations, but this involves contortions
> that feel cumbersome, and probably leads to less intelligible code. I'm
> also not sure the current DSL can model scenarios where the transformation
> may be one-to-many, as in the last part of Yuto's example, or where the
> emission-delay is data-driven, as in my earlier "sessionization" example.
>
> One idea I'd offer is to provide a mechanism for wiring in Processors with
> "telescoping" arity (e.g., support Processor1<I, O1>, Processor2<I, O1, O2>,
> etc.), and providing each arity with type-safe forwarding interfaces for
> each output stream (e.g., Forwarder<T>). This assigns each output stream a
> clear ordinal, and suggests a corresponding type-safe return type for the
> DSL (e.g., KStreamTuple2<O1, O2>).
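>
> To make that concrete, here is a rough sketch of the hypothetical
> interfaces (none of these exist in Kafka Streams; keys are simplified
> to String):
>
> ```java
> // Hypothetical: one type-safe forwarder per output stream.
> interface Forwarder<T> {
>     void forward(String key, T value);
> }
>
> // "Telescoping" arities: each output stream gets its own typed forwarder.
> interface Processor1<I, O1> {
>     void process(String key, I value, Forwarder<O1> out1);
> }
>
> interface Processor2<I, O1, O2> {
>     void process(String key, I value, Forwarder<O1> out1, Forwarder<O2> out2);
> }
>
> // The DSL could then return a typed tuple of output streams, e.g.:
> // KStreamTuple2<O1, O2> outs = stream.transform2(processor2Supplier);
> ```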
>
> I think this pattern could provide a unification of the 'Transformer' and
> 'Processor' APIs.
> This was what I had in mind for a PR we discussed earlier (for modifying
> the Transformer API), but the scope expanded beyond what I felt comfortable
> submitting without discussion, and I had to prioritize other efforts.
> Regardless, I could get a WIP branch pushed to github later today to
> illustrate if you'd like to see it.
>
> HTH,
> -josh
>
> On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Thanks Yuto for your code snippet. Since you need to access a customized
> > external storage for metadata, that indeed cannot be wrapped in any
> > built-in operators in the Streams DSL yet, and your code example in the
> > previous email would be close to the best you can do with the high-level
> > DSL now.
> >
> > One minor improvement over your above code, though, is that instead of
> > calling map(... -> process()) you can call transform(), which still
> > allows you to provide a customized transformer function while also
> > giving you strong typing, assuming all three kinds of records are of
> > the same key / value types.
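> >
> > As a rough sketch (reusing the KeyValueWithDestination type, predicates,
> > and processor from your snippet, and the current Transformer signatures),
> > it would look something like:
> >
> > ```java
> > // transform() declares its output type, so the KeyValueWithDestination
> > // values stay strongly typed all the way into branch().
> > KStream<String, KeyValueWithDestination>[] branches =
> >         builder.<String, Event>stream("foo")
> >                .transform(() -> new Transformer<String, Event,
> >                        KeyValue<String, KeyValueWithDestination>>() {
> >                    @Override
> >                    public void init(ProcessorContext context) {}
> >
> >                    @Override
> >                    public KeyValue<String, KeyValueWithDestination> transform(
> >                            String key, Event value) {
> >                        return KeyValue.pair(key, processor.process(key, value));
> >                    }
> >
> >                    @Override
> >                    public KeyValue<String, KeyValueWithDestination> punctuate(
> >                            long timestamp) {
> >                        return null; // no time-driven output needed here
> >                    }
> >
> >                    @Override
> >                    public void close() {}
> >                })
> >                .branch(predicates);
> > ```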
> >
> > Guozhang
> >
> > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dadada@gmail.com>
> > wrote:
> >
> > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > Hi Yuto,
> > > >
> > > > Is the destination topic embedded as part of the value in the original
> > > > "foo" topic? If yes, could you just access that field directly instead
> > > > of mapping to a (key, value, destination) triplet?
> > > >
> > >
> > > Nope. KeyValueWithDestination is just an example of the output from the
> > > first Processor and is not included in the actual messages that the topic
> > > foo receives.
> > > Let me explain a bit more realistic use-case. How can we write a
> > > Processor like the one below cleanly in the high-level DSL?
> > >
> > > ```java
> > > public class EventProcessor implements Processor<String, Event> {
> > > ...
> > >   @Override
> > >   public void process(String key, Event value) {
> > >       EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
> > >
> > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> > >           // This event is corrupted! Let's evacuate it to the grave
> > >           // topic for further investigation.
> > >           context.forward(key, value, "CorruptedEventSink");
> > >       }
> > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> > >           // Another case of corruption, but maybe recoverable.
> > >           context.forward(key, value, "CorruptedEventRecoveryProcessor");
> > >       }
> > >
> > >       for (Foo foo : value.getFoos()) {
> > >           context.forward(key, buildMessage(meta, foo), "FooProcessor");
> > >       }
> > >   }
> > > ...
> > > }
> > > ```
> > >
> > >
> > > > Guozhang
> > > >
> > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com>
> > > > wrote:
> > > >
> > > >> Hi Guozhang,
> > > >>
> > > >>
> > > >>
> > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > >> > Hi Yuto,
> > > >> >
> > > >> > That is a good suggestion; the child index is not very intuitive
> > > >> > from a programmer's view, and we can even consider replacing it
> > > >> > with the processor name instead of overloading it. Could you file
> > > >> > a JIRA?
> > > >> >
> > > >>
> > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > >>
> > > >> > Also I am wondering if you have looked at the higher-level Streams
> > > >> > DSL, and if yes, could you let me know what the limitations are in
> > > >> > using those APIs in your case?
> > > >> >
> > > >>
> > > >> Well, I read through the high-level DSL interface but couldn't find an
> > > >> easy way to handle output from Processors which could issue multiple
> > > >> messages to arbitrarily different destinations.
> > > >> Maybe it could be done by doing something like the below, but it
> > > >> doesn't look good. Please let me know if you have any idea how to do
> > > >> this in an easier way.
> > > >>
> > > >> ```java
> > > >> class KeyValueWithDestination {
> > > >>     K key;
> > > >>     V value;
> > > >>     String destination;
> > > >> }
> > > >>
> > > >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> > > >>     String destination;
> > > >>
> > > >>     DestinationPredicate(String destination) {
> > > >>         this.destination = destination;
> > > >>     }
> > > >>
> > > >>     @Override
> > > >>     public boolean test(K key, KeyValueWithDestination value) {
> > > >>         return value.destination.equals(destination);
> > > >>     }
> > > >> }
> > > >>
> > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > >>
> > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > >>         Arrays.stream(destTopics)
> > > >>               .map(DestinationPredicate::new)
> > > >>               .toArray(DestinationPredicate[]::new);
> > > >>
> > > >> KStream<K, KeyValueWithDestination>[] branches =
> > > >>         builder.stream("foo")
> > > >>                .map((key, value) -> processor.process(key, value)
> > > >>                     /* => KeyValueWithDestination */)
> > > >>                .branch(predicates);
> > > >>
> > > >> for (int i = 0; i < branches.length; i++) {
> > > >>     branches[i].to(destTopics[i]);
> > > >> }
> > > >> ```
> > > >>
> > > >>
> > > >> > Guozhang
> > > >> >
> > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com>
> > > >> > wrote:
> > > >> >
> > > >> >> When I tried to implement a task which does a kind of dispatching
> > > >> >> to downstream processors or sinks, it looks like relying on
> > > >> >> context.forward(K, V, int childIndex) is the only way now.
> > > >> >> I have a question: why is this method implemented using childIndex
> > > >> >> (which is just an index into the children List, based on the order
> > > >> >> of builder.addProcessor() calls) instead of the child name (the
> > > >> >> first argument to add{Processor,Sink})?
> > > >> >> I want to ask what the concrete use case of forward(K, V, int
> > > >> >> childIndex) is, and whether it makes sense to introduce another
> > > >> >> overload, forward(K, V, String childName), for handier use.
> > > >> >> Currently I have a use-case like this in mind:
> > > >> >> ```
> > > >> >> builder.addProcessor("DispatchProcess", new
> > > >> >> DispatchProcessorSupplier(), "Source");
> > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> > > >> >> "DispatchProcess");
> > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> > > >> >> "DispatchProcess");
> > > >> >>
> > > >> >> // in process(key, value)
> > > >> >> if ("key-for-A".equals(key)) {
> > > >> >>     context.forward(key, value, "Process-A");
> > > >> >> } else if ("key-for-B".equals(key)) {
> > > >> >>     context.forward(key, value, "Process-B");
> > > >> >> }
> > > >> >> ```
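> > > >> >>
> > > >> >> With the current index-based API, the same dispatch would instead
> > > >> >> have to reference the order of the addProcessor() calls above:
> > > >> >> ```
> > > >> >> // "Process-A" was added first (child index 0), "Process-B" second (1).
> > > >> >> if ("key-for-A".equals(key)) {
> > > >> >>     context.forward(key, value, 0);
> > > >> >> } else if ("key-for-B".equals(key)) {
> > > >> >>     context.forward(key, value, 1);
> > > >> >> }
> > > >> >> ```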
> > > >> >>
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > -- Guozhang
> > > >>
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
