kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From josh gruenberg <jos...@gmail.com>
Subject Re: Kafka Streams: context.forward() with downstream name
Date Tue, 05 Apr 2016 21:30:13 GMT
Hi Guozhang,

I'll reply to your points in-line below:

On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangguoz@gmail.com> wrote:

> Hi Josh,
>
> I think there are a few issues that we want to resolve here, which could be
> orthogonal to each other.
>
> 1) one-to-many mapping in transform() function that generates a single
> stream (i.e. single typed key-value pairs).
>
> Since transform() already enforces to make type-safe return values, one
> thing we can do is to change the punctuate() function return value from
> "null" to "R" as well. And then for one-to-many mapping one can then define
> R as Array<MyType>
>
> stream.transform().flatMap(/* from Array<MyType> to MyType*/)
>
>
Interesting, it hadn't thought of returning an Iterable from a Transformer
to achieve the one-to-many case. Regardless, my initial reaction is that
this seems natural when you're working in the declarative DSL (where we can
move the flatMap to the left of the one-to-many lambda), but seems
cumbersome when writing a lower-level imperative Transformer/Processor
(which then requires a subsequent flatMap to explode the arrays). I think
I'd prefer a more imperative, side-effecting style of emission in this
case; see my reply to #2 below.


> 2) having a new function that takes one stream, and generate multiple
> streams with different key-value types.
>
> This is a good-to-have operator in the Streams DSL, and I think this is
> your proposed new API in the previous email? I am not sure I
> understand the "telescoping"
> arity completely though, so let me know if I'm wrong.
>

Yes, sorry, I wrote my previous mail in a hurry this morning. By
"telescoping", I meant defining processor/transformer interfaces for each
supported output arity, with the corresponding output-types expressed as
generics:

   - Processor1<K, V, K1, V1>
   - Processor2<K, V, K1, V1, K2, V2>
   - Processor3<K, V, K1, V1, K2, V2, K3, V3>

(Yes, the number of generics here is getting unwieldy, but I don't
immediately see a good way to avoid that while preserving type-safety. Some
sort of cosmetic improvement would be nice!)

Given this, the framework could inject type-safe "emitters" into the
Processors for each output-stream:

interface Processor2<K, V, K1, V1, K2, V2> {
  void init(ProcessorContext context, Forwarder<K1, V1> output1,
Forwarder<K2, V2> output2);

  // these can use the forwarders provided to init() to emit any number of
values
  void process(I input);
  void punctuate(long timestamp);
  // ...
}

interface Forwarder<K, V> {
  void forward(K key, V value);
}

... then, in KStream<K,V>:

<K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(Processor2<K, V,
K1, V1, K2, V2> processor, String... stateStores);

I haven't worked through all of the details, but I'm optimistic that this
could work nicely to unify the Transformer and Processor APIs, and address
all of the described use-cases (up to some arbitrarily-chosen number of
supported output-streams).


> 3) having data-driven emission policy (this will be the building block of
> session windows) as well as time-drive emission policy.
>
> I am thinking about how to support this as well, one thing is that we can
> use the underlying process() function for data-driven emission, for
> example, if there is a session-start / end flag then create the
> corresponding session record in state, and only emit upon session-end flag;
> and the underlying punctuate() function for time-drive emission (we
> probably need to first refactor it to be triggered by record timestamp
> instead of wallclock time).
>

Yes, I agree: data-driven emission could work just fine with process(), and
delayed emission works nicely with punctuate(). I've also been meaning to
mention the clear need for event-time punctuation, so I'm glad to hear
that's on your radar! Watermarking will be important for session-windowing.

Thoughts?

-josh


>
> On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <joshgr@gmail.com> wrote:
>
> > Hi all,
> >
> > Just chiming in with Yuto: I think the custom Processor becomes
> attractive
> > in scenarios where a node in the graph may emit to a variety of
> downstream
> > paths, possibly after some delay, depending on logic. This can probably
> > often be achieved with the existing DSL using some combination of
> > predicates and intermediate representations, but this involves
> contortions
> > that feel cumbersome, and probably leads to less intelligible code. I'm
> > also not sure the current DSL can model scenarios where the
> transformation
> > may be one-to-many, as in the last part of Yuto's example, or where the
> > emission-delay is data-driven, as in my earlier "sessionization" example.
> >
> > One idea I'd offer is to provide a mechanism for wiring in Processors
> with
> > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1,
> O2>,
> > etc), and providing each arity with type-safe forwarding interfaces for
> > each output stream (eg, Forwarder<T>). This assigns each output-stream a
> > clear ordinal, and suggests a corresponding type-safe return-type for the
> > DSL (eg, KStreamTuple2<O1, O2>).
> >
> > I think this pattern could provide a unification of the 'Transformer' and
> > 'Processor' APIs.
> > This was what I had in mind for a PR we discussed earlier (for modifying
> > the Transformer API), but the scope expanded beyond what I felt
> comfortable
> > submitting without discussion, and I had to prioritize other efforts.
> > Regardless, I could get a WIP branch pushed to github later today to
> > illustrate if you'd like to see it.
> >
> > HTH,
> > -josh
> >
> > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> >
> > > Thanks Yuto for your code snippet. Since you need to access a
> customized
> > > external storage for metadata, that indeed cannot be wrapped in any
> > > built-in operators in the Streams DSL yet, and your code example in the
> > > previous email would be close to the best you can do with the
> high-level
> > > DSL now.
> > >
> > > One minor improvement from your above code, though, is that instead of
> > > calling map(... -> process()) you can actually call transform(), which
> > > still allows you to provide a customized transformer function, but it
> > still
> > > gives you strong typing assuming all these three kinds of records are
> of
> > > the same key / value types.
> > >
> > > Guozhang
> > >
> > >
> > >
> > >
> > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <
> > kawamuray.dadada@gmail.com
> > > >
> > > wrote:
> > >
> > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > > Hi Yuto,
> > > > >
> > > > > Is the destination topic embedded as part of the value in the
> > original
> > > > > "foo" topic? If yes could you just access that field directly
> instead
> > > of
> > > > > mapping to a (key, value, destination) triplet?
> > > > >
> > > >
> > > > Nope. KeyValueWithDestination is just an example of output from the
> > > > first Processor and is not included in actual messages that the topic
> > > > foo received.
> > > > Let me explain bit more realistic use-case. How can we write a
> > > > Processor like below in High-level DSL cleanly?
> > > >
> > > > ```java
> > > > public class EventProcessor implements Processor<String, Event>
{
> > > > ...
> > > >   @Override
> > > >   public void process(String key, Event value) {
> > > >       EventMetadata meta =
> > > > getEventMetadataFromExternalStorage(value.getId());
> > > >
> > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> > > >           // This event is corrupted! let's evacuate it once to the
> > > > grave topic for further investigation.
> > > >           context.forward(key, value, "CorruptedEventSink");
> > > >       }
> > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> > > >           // Antoher case of corruption, but maybe recoverable.
> > > >           context.forward(key, value,
> > "CorruptedEventRecoveryProcessor");
> > > >       }
> > > >
> > > >       for (Foo foo : event.getFoos()) {
> > > >           context.forward(key, buildMessage(meta, foo),
> > "FooProcessor");
> > > >       }
> > > >   }
> > > > ...
> > > > }
> > > > ```
> > > >
> > > >
> > > > > Guozhang
> > > > >
> > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <
> > > > kawamuray.dadada@gmail.com>
> > > > > wrote:
> > > > >
> > > > >> Hi Guozhang,
> > > > >>
> > > > >>
> > > > >>
> > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > >> > Hi Yuto,
> > > > >> >
> > > > >> > That is a good suggestion, the child index is not very intuitive
> > > from
> > > > >> > programmer's view and we can even consider replacing it
with the
> > > > >> processor
> > > > >> > name instead of overloading it. Could you file a JIRA?
> > > > >> >
> > > > >>
> > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > > >>
> > > > >> > Also I am wondering if you have looked at the higher-level
> Streams
> > > > DSL,
> > > > >> and
> > > > >> > if yes could let me know what are the limitations from using
> that
> > > > APIs in
> > > > >> > your case?
> > > > >> >
> > > > >>
> > > > >> Well, I read though high-level DSL interface but couldn't find
an
> > easy
> > > > >> way to handle output from Processors which could issue multiple
> > > > >> messages to arbitrary different destinations.
> > > > >> Maybe it could be done by doing something like below but it
> doesn't
> > > > >> look good. Please let me know if you have any idea to do this
in
> > > > >> easier way.
> > > > >>
> > > > >> ```java
> > > > >> class KeyValueWithDestination {
> > > > >>     K key;
> > > > >>     V value;
> > > > >>     String destination;
> > > > >> }
> > > > >>
> > > > >> class DestinationPredicate implements Predicate<K,
> > > > >> KeyValueWithDestination> {
> > > > >>     String destination;
> > > > >>     @Override
> > > > >>     public boolean test(K key, KeyValueWithDestination value)
{
> > > > >>         return value.destination.equals(destination);
> > > > >>     }
> > > > >> }
> > > > >>
> > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > > >>
> > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > > >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> > > > >>                                  .toArray(Predicate<K,
> > > > >> KeyValueWithDestination>::new);
> > > > >>
> > > > >> branches = builder.stream("foo")
> > > > >>                   .map((key, value) -> processor.process(key,
> value)
> > > > >> /* => KeyValueWithDestination */)
> > > > >>                   .branch(predicates);
> > > > >>
> > > > >> for (int i = 0; i < branches.length; i++) {
> > > > >>     branches[i].to(destTopics[i]);
> > > > >> }
> > > > >> ```
> > > > >>
> > > > >>
> > > > >> > Guozhang
> > > > >> >
> > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <
> > > > >> kawamuray.dadada@gmail.com>
> > > > >> > wrote:
> > > > >> >
> > > > >> >> When I tried to implement a task which does kinda dispatching
> to
> > > > >> >> downstream processors or sinks, looks like relying on
> > > > >> >> context.forward(K, V, int childIndex) is the only way
now.
> > > > >> >> I have a question why this method implemented using
> > > childIndex(which
> > > > >> >> is just an index of children "List" that based on order
of
> > > > >> >> builder.addProcessor() call) instead of child name(first
> argument
> > > to
> > > > >> >> add{Processor,Sink}).
> > > > >> >> I wanna ask what is the concrete use case of forward(K,
V, int
> > > > >> >> childIndex) and is it makes sense to introduce another
> overload:
> > > > >> >> forward(K, V, String childName) for much handy use.
> > > > >> >> Currently I have a use-case like this in my mind:
> > > > >> >> ```
> > > > >> >> builder.addProcessor("DispatchProcess", new
> > > > >> >> DispatchProcessorSupplier(), "Source");
> > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(),
> > > > >> >> "DispatchProcess");
> > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(),
> > > > >> >> "DispatchProcess");
> > > > >> >>
> > > > >> >> // in process(key, value)
> > > > >> >> if ("key-for-A".equals(key)) {
> > > > >> >>     context.forward(key, value, "Process-A");
> > > > >> >> } else if ("key-for-B".equals(key)) {
> > > > >> >>     context.forward(key, value, "Process-B");
> > > > >> >> }
> > > > >> >> ```
> > > > >> >>
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > -- Guozhang
> > > > >>
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > >
> > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
>
> --
> -- Guozhang
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message