kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Kafka Streams: context.forward() with downstream name
Date Tue, 05 Apr 2016 23:13:30 GMT
Hi Josh,

Re 1): transform() is only for use in the higher-level DSL, while in the
lower-level APIs people are expected to work with Processor only, which for
now uses context.forward() to send records to the downstream processors.

Re 2): I have a few questions about your proposal: with differently typed
key-value pairs, are they supposed to be forwarded to different child
processors? That is, you would need to call
"forward(K, V, String /* child name */)" rather than "forward(K, V)", which
forwards to all the child processors one by one. In this case, the topology
builder also needs to be refactored, since we need to make sure the child
processors exist when the processors are defined, which could be a bit
tricky to implement.

Also, is this change aimed at making the lower-level Processor type-safe as
well? The main motivation for not supporting strongly typed Processors is to
allow users to flexibly connect processors in the topology without worrying
about the data types of each processor. As I mentioned above, making specific
forwards to downstream processors would likely defeat this purpose.
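To make the difference between the two forwarding styles concrete, here is a minimal in-memory sketch. It is not the Kafka Streams API: `NamedForwardingSketch`, `Topology`, `Node`, and `Context` are hypothetical stand-ins. `forward(key, value)` delivers to every child in turn, while `forward(key, value, childName)` delivers to one named child. The builder only lets a node attach to parents that already exist, but a bad child name is still only caught when a record is forwarded, which hints at why stricter build-time validation would need extra builder support.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, minimal model of a processor topology; not the Kafka Streams API.
class NamedForwardingSketch {

    // A node records what it receives so the routing can be observed.
    static final class Node {
        final String name;
        final List<String> received = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    // Builder-side check: a node can only be wired to parents that already exist.
    static final class Topology {
        final Map<String, Node> nodes = new LinkedHashMap<>();

        Topology addNode(String name, String... parents) {
            Node node = new Node(name);
            for (String parent : parents) {
                Node p = nodes.get(parent);
                if (p == null) {
                    throw new IllegalArgumentException("unknown parent: " + parent);
                }
                p.children.add(node);
            }
            nodes.put(name, node);
            return this;
        }
    }

    // Forwarding as seen from inside one processor.
    static final class Context {
        private final Node current;

        Context(Node current) {
            this.current = current;
        }

        // forward(K, V): every child receives the record, one by one.
        void forward(String key, String value) {
            for (Node child : current.children) {
                child.received.add(key + "=" + value);
            }
        }

        // forward(K, V, childName): only the named child receives the record.
        void forward(String key, String value, String childName) {
            for (Node child : current.children) {
                if (child.name.equals(childName)) {
                    child.received.add(key + "=" + value);
                    return;
                }
            }
            throw new IllegalArgumentException("not a child of " + current.name + ": " + childName);
        }
    }

    public static void main(String[] args) {
        Topology t = new Topology()
                .addNode("Source")
                .addNode("Dispatch", "Source")
                .addNode("Process-A", "Dispatch")
                .addNode("Process-B", "Dispatch");
        Context ctx = new Context(t.nodes.get("Dispatch"));
        ctx.forward("k1", "v1");              // broadcast to both children
        ctx.forward("k2", "v2", "Process-B"); // targeted
        System.out.println(t.nodes.get("Process-A").received);
        System.out.println(t.nodes.get("Process-B").received);
    }
}
```

Running `main` should print `[k1=v1]` for Process-A and `[k1=v1, k2=v2]` for Process-B, since only the first record was broadcast.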


Guozhang


On Tue, Apr 5, 2016 at 2:30 PM, josh gruenberg <joshgr@gmail.com> wrote:

> Hi Guozhang,
>
> I'll reply to your points in-line below:
>
> On Tue, Apr 5, 2016 at 10:23 AM Guozhang Wang <wangguoz@gmail.com> wrote:
>
> > Hi Josh,
> >
> > I think there are a few issues that we want to resolve here, which could be
> > orthogonal to each other.
> >
> > 1) One-to-many mapping in a transform() function that generates a single
> > stream (i.e. single-typed key-value pairs).
> >
> > Since transform() already enforces type-safe return values, one thing we
> > can do is change the return value of the punctuate() function from "null"
> > to "R" as well. Then, for one-to-many mapping, one can define R as
> > Array<MyType>:
> >
> > stream.transform().flatMap(/* from Array<MyType> to MyType*/)
> >
> >
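A minimal sketch of the two-step shape described in 1), using plain Java collections rather than KStream: the hypothetical `transform` helper returns one `List` per input record (playing the role of R = Array<MyType>), and `flatMap` explodes those lists into individual records. The comma-splitting transformer is only an illustrative assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical model: a one-to-many "transformer" returns a List per input,
// and a separate flatMap step explodes the lists into individual records.
class TransformThenFlatMapSketch {

    // Stand-in for a transform() step whose return type R is List<String>.
    static List<List<String>> transform(List<String> input, Function<String, List<String>> transformer) {
        List<List<String>> out = new ArrayList<>();
        for (String record : input) {
            out.add(transformer.apply(record));
        }
        return out;
    }

    // Stand-in for flatMap(): one output record per list element.
    static List<String> flatMap(List<List<String>> batches) {
        List<String> out = new ArrayList<>();
        for (List<String> batch : batches) {
            out.addAll(batch);
        }
        return out;
    }

    // Example transformer: split a comma-separated value into its parts.
    static List<String> splitOnComma(String value) {
        return Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a,b", "c", "d,e,f");
        List<String> exploded = flatMap(transform(input, TransformThenFlatMapSketch::splitOnComma));
        System.out.println(exploded); // [a, b, c, d, e, f]
    }
}
```

The extra `flatMap` pass is the step Josh describes below as cumbersome when writing imperative Transformer/Processor code.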
> Interesting, I hadn't thought of returning an Iterable from a Transformer
> to achieve the one-to-many case. Regardless, my initial reaction is that
> this seems natural when you're working in the declarative DSL (where we can
> move the flatMap to the left of the one-to-many lambda), but seems
> cumbersome when writing a lower-level imperative Transformer/Processor
> (which then requires a subsequent flatMap to explode the arrays). I think
> I'd prefer a more imperative, side-effecting style of emission in this
> case; see my reply to #2 below.
>
>
> > 2) Having a new function that takes one stream and generates multiple
> > streams with different key-value types.
> >
> > This is a good-to-have operator in the Streams DSL, and I think this is
> > your proposed new API in the previous email? I am not sure I understand the
> > "telescoping" arity completely though, so let me know if I'm wrong.
> >
>
> Yes, sorry, I wrote my previous mail in a hurry this morning. By
> "telescoping", I meant defining processor/transformer interfaces for each
> supported output arity, with the corresponding output-types expressed as
> generics:
>
>    - Processor1<K, V, K1, V1>
>    - Processor2<K, V, K1, V1, K2, V2>
>    - Processor3<K, V, K1, V1, K2, V2, K3, V3>
>
> (Yes, the number of generics here is getting unwieldy, but I don't
> immediately see a good way to avoid that while preserving type-safety. Some
> sort of cosmetic improvement would be nice!)
>
> Given this, the framework could inject type-safe "emitters" into the
> Processors for each output-stream:
>
> interface Processor2<K, V, K1, V1, K2, V2> {
>   void init(ProcessorContext context, Forwarder<K1, V1> output1,
>             Forwarder<K2, V2> output2);
>
>   // these can use the forwarders provided to init() to emit any number of
>   // values
>   void process(K key, V value);
>   void punctuate(long timestamp);
>   // ...
> }
>
> interface Forwarder<K, V> {
>   void forward(K key, V value);
> }
>
> ... then, in KStream<K,V>:
>
> <K1, V1, K2, V2> KStreamTuple2<K1, V1, K2, V2> process2(
>     Processor2<K, V, K1, V1, K2, V2> processor, String... stateStores);
>
> I haven't worked through all of the details, but I'm optimistic that this
> could work nicely to unify the Transformer and Processor APIs, and address
> all of the described use-cases (up to some arbitrarily-chosen number of
> supported output-streams).
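The following is a self-contained sketch of the proposed `Processor2`/`Forwarder` shape, under simplifying assumptions: the `ProcessorContext` parameter and `punctuate()` are left out, the interfaces are nested in a hypothetical `TelescopingSketch` class, and `CollectingForwarder` merely records what it receives in place of a real downstream stream. Nothing here exists in Kafka Streams; `WordSplitter` is an invented example processor.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the "telescoping" proposal; not part of Kafka Streams.
class TelescopingSketch {

    // One typed emitter per output stream.
    interface Forwarder<K, V> {
        void forward(K key, V value);
    }

    // Two outputs, each with independent key/value types.
    interface Processor2<K, V, K1, V1, K2, V2> {
        void init(Forwarder<K1, V1> output1, Forwarder<K2, V2> output2);

        void process(K key, V value); // may emit any number of records to either output
    }

    // Stands in for a downstream stream: records everything forwarded to it.
    static final class CollectingForwarder<K, V> implements Forwarder<K, V> {
        final List<Map.Entry<K, V>> records = new ArrayList<>();

        @Override
        public void forward(K key, V value) {
            records.add(new SimpleEntry<>(key, value));
        }
    }

    // Example processor: each word goes to output 1, the word count to output 2.
    static final class WordSplitter implements Processor2<String, String, String, String, String, Integer> {
        private Forwarder<String, String> words;
        private Forwarder<String, Integer> counts;

        @Override
        public void init(Forwarder<String, String> output1, Forwarder<String, Integer> output2) {
            this.words = output1;
            this.counts = output2;
        }

        @Override
        public void process(String key, String value) {
            String trimmed = value.trim();
            String[] parts = trimmed.isEmpty() ? new String[0] : trimmed.split("\\s+");
            for (String part : parts) {
                words.forward(key, part); // one-to-many on output 1
            }
            counts.forward(key, parts.length); // exactly one record on output 2
        }
    }

    public static void main(String[] args) {
        CollectingForwarder<String, String> words = new CollectingForwarder<>();
        CollectingForwarder<String, Integer> counts = new CollectingForwarder<>();
        WordSplitter splitter = new WordSplitter();
        splitter.init(words, counts);
        splitter.process("doc-1", "hello kafka streams");
        System.out.println(words.records);  // [doc-1=hello, doc-1=kafka, doc-1=streams]
        System.out.println(counts.records); // [doc-1=3]
    }
}
```

Because each output carries its own generic types, passing a `Forwarder<String, String>` as the second argument to `init` is a compile-time error, which is the kind of type safety the proposal is after.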
>
>
> > 3) Having a data-driven emission policy (this will be the building block
> > of session windows) as well as a time-driven emission policy.
> >
> > I am thinking about how to support this as well. One option is to use the
> > underlying process() function for data-driven emission: for example, if
> > there is a session-start / session-end flag, create the corresponding
> > session record in state and only emit it upon the session-end flag. The
> > underlying punctuate() function can then handle time-driven emission (we
> > probably need to first refactor it to be triggered by record timestamps
> > instead of wallclock time).
> >
>
> Yes, I agree: data-driven emission could work just fine with process(), and
> delayed emission works nicely with punctuate(). I've also been meaning to
> mention the clear need for event-time punctuation, so I'm glad to hear
> that's on your radar! Watermarking will be important for session-windowing.
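A minimal sketch of the two emission policies, assuming a hypothetical `SessionSketch` class rather than the real `Processor` interface: `process()` emits a session as soon as a record carries an end-of-session flag (data-driven), and `punctuate()` closes sessions that have been idle longer than a configurable gap (time-driven). Here the caller passes an event-time value to `punctuate()`, in the spirit of the record-timestamp triggering discussed above; the gap and the output format are arbitrary illustrative choices.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical session tracker; method names echo Processor, but this is not the Kafka Streams API.
class SessionSketch {

    // Per-key session state kept between records.
    static final class Session {
        final String key;
        long start;
        long lastSeen;
        int count;

        Session(String key, long timestamp) {
            this.key = key;
            this.start = timestamp;
            this.lastSeen = timestamp;
        }
    }

    private final long inactivityGapMs;
    // TreeMap gives a deterministic emission order when several sessions close at once.
    private final Map<String, Session> open = new TreeMap<>();
    private final List<String> emitted = new ArrayList<>();

    SessionSketch(long inactivityGapMs) {
        this.inactivityGapMs = inactivityGapMs;
    }

    // Data-driven emission: update state per record, emit only when the end flag arrives.
    void process(String key, long timestamp, boolean endOfSession) {
        Session s = open.computeIfAbsent(key, k -> new Session(k, timestamp));
        s.start = Math.min(s.start, timestamp);
        s.lastSeen = Math.max(s.lastSeen, timestamp);
        s.count++;
        if (endOfSession) {
            emit(s);
            open.remove(key);
        }
    }

    // Time-driven emission: close every session idle for longer than the gap at this event time.
    void punctuate(long streamTime) {
        Iterator<Session> it = open.values().iterator();
        while (it.hasNext()) {
            Session s = it.next();
            if (streamTime - s.lastSeen > inactivityGapMs) {
                emit(s);
                it.remove();
            }
        }
    }

    private void emit(Session s) {
        emitted.add(s.key + ":" + s.start + "-" + s.lastSeen + ":" + s.count);
    }

    List<String> emitted() {
        return emitted;
    }

    public static void main(String[] args) {
        SessionSketch p = new SessionSketch(30_000);
        p.process("user-1", 1_000, false);
        p.process("user-1", 5_000, true); // end flag: emitted immediately
        p.process("user-2", 2_000, false);
        p.punctuate(10_000);              // user-2 idle for 8s: still open
        p.punctuate(40_000);              // user-2 idle for 38s: closed
        System.out.println(p.emitted());  // [user-1:1000-5000:2, user-2:2000-2000:1]
    }
}
```

Here user-1 closes on its end flag, while user-2 is only closed once the supplied stream time passes its inactivity gap.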
>
> Thoughts?
>
> -josh
>
>
> >
> > On Tue, Apr 5, 2016 at 8:24 AM, josh gruenberg <joshgr@gmail.com> wrote:
> >
> > > Hi all,
> > >
> > > Just chiming in with Yuto: I think the custom Processor becomes attractive
> > > in scenarios where a node in the graph may emit to a variety of downstream
> > > paths, possibly after some delay, depending on logic. This can probably
> > > often be achieved with the existing DSL using some combination of
> > > predicates and intermediate representations, but this involves contortions
> > > that feel cumbersome, and probably leads to less intelligible code. I'm
> > > also not sure the current DSL can model scenarios where the transformation
> > > may be one-to-many, as in the last part of Yuto's example, or where the
> > > emission-delay is data-driven, as in my earlier "sessionization" example.
> > >
> > > One idea I'd offer is to provide a mechanism for wiring in Processors with
> > > "telescoping" arity (eg, support Processor1<I, O1>, Processor2<I, O1, O2>,
> > > etc), and to provide each arity with type-safe forwarding interfaces for
> > > each output stream (eg, Forwarder<T>). This assigns each output-stream a
> > > clear ordinal, and suggests a corresponding type-safe return-type for the
> > > DSL (eg, KStreamTuple2<O1, O2>).
> > >
> > > I think this pattern could provide a unification of the 'Transformer' and
> > > 'Processor' APIs.
> > > This was what I had in mind for a PR we discussed earlier (for modifying
> > > the Transformer API), but the scope expanded beyond what I felt
> > > comfortable submitting without discussion, and I had to prioritize other
> > > efforts. Regardless, I could get a WIP branch pushed to github later today
> > > to illustrate if you'd like to see it.
> > >
> > > HTH,
> > > -josh
> > >
> > > On Mon, Apr 4, 2016, 9:14 PM Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Thanks Yuto for your code snippet. Since you need to access a customized
> > > > external storage for metadata, that indeed cannot be wrapped in any
> > > > built-in operators in the Streams DSL yet, and your code example in the
> > > > previous email would be close to the best you can do with the high-level
> > > > DSL now.
> > > >
> > > > One minor improvement to your code above, though: instead of calling
> > > > map(... -> process()), you can call transform(), which still lets you
> > > > provide a customized transformer function while giving you strong typing,
> > > > assuming all three kinds of records have the same key / value types.
> > > >
> > > > Guozhang
> > > >
> > > >
> > > >
> > > >
> > > > On Sun, Apr 3, 2016 at 10:48 PM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> > > >
> > > > > 2016-04-04 7:20 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > > > Hi Yuto,
> > > > > >
> > > > > > Is the destination topic embedded as part of the value in the original
> > > > > > "foo" topic? If so, could you just access that field directly instead
> > > > > > of mapping to a (key, value, destination) triplet?
> > > > > >
> > > > >
> > > > > Nope. KeyValueWithDestination is just an example of output from the
> > > > > first Processor and is not included in the actual messages received on
> > > > > topic foo.
> > > > > Let me explain a more realistic use case. How could we write a
> > > > > Processor like the one below cleanly in the high-level DSL?
> > > > >
> > > > > ```java
> > > > > public class EventProcessor implements Processor<String, Event> {
> > > > > ...
> > > > >   @Override
> > > > >   public void process(String key, Event value) {
> > > > >       EventMetadata meta = getEventMetadataFromExternalStorage(value.getId());
> > > > >
> > > > >       // The forwards below use the proposed forward(K, V, String childName) overload.
> > > > >       if (isFieldACorrupted(meta, value.getFieldA())) {
> > > > >           // This event is corrupted! Evacuate it to the grave topic
> > > > >           // for further investigation.
> > > > >           context.forward(key, value, "CorruptedEventSink");
> > > > >       }
> > > > >       if (isFieldBCorrupted(meta, value.getFieldB())) {
> > > > >           // Another kind of corruption, but maybe recoverable.
> > > > >           context.forward(key, value, "CorruptedEventRecoveryProcessor");
> > > > >       }
> > > > >
> > > > >       // One-to-many: one downstream record per Foo in the event.
> > > > >       for (Foo foo : value.getFoos()) {
> > > > >           context.forward(key, buildMessage(meta, foo), "FooProcessor");
> > > > >       }
> > > > >   }
> > > > > ...
> > > > > }
> > > > > ```
> > > > >
> > > > >
> > > > > > Guozhang
> > > > > >
> > > > > > On Sun, Apr 3, 2016 at 9:29 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> > > > > >
> > > > > >> Hi Guozhang,
> > > > > >>
> > > > > >>
> > > > > >>
> > > > > >> 2016-04-02 3:29 GMT+09:00 Guozhang Wang <wangguoz@gmail.com>:
> > > > > >> > Hi Yuto,
> > > > > >> >
> > > > > >> > That is a good suggestion: the child index is not very intuitive
> > > > > >> > from a programmer's point of view, and we could even consider
> > > > > >> > replacing it with the processor name instead of adding an
> > > > > >> > overload. Could you file a JIRA?
> > > > > >> >
> > > > > >>
> > > > > >> Yep :) https://issues.apache.org/jira/browse/KAFKA-3497
> > > > > >>
> > > > > >> > Also, I am wondering whether you have looked at the higher-level
> > > > > >> > Streams DSL, and if so, could you let me know what the limitations
> > > > > >> > of using those APIs are in your case?
> > > > > >> >
> > > > > >>
> > > > > >> Well, I read through the high-level DSL interface but couldn't find an
> > > > > >> easy way to handle output from Processors that may issue multiple
> > > > > >> messages to arbitrary destinations.
> > > > > >> Maybe it could be done with something like the code below, but it
> > > > > >> doesn't look good. Please let me know if you have any idea how to do
> > > > > >> this more easily.
> > > > > >>
> > > > > >> ```java
> > > > > >> // K and V stand for the concrete key/value types of topic "foo".
> > > > > >> class KeyValueWithDestination {
> > > > > >>     K key;
> > > > > >>     V value;
> > > > > >>     String destination;
> > > > > >> }
> > > > > >>
> > > > > >> class DestinationPredicate implements Predicate<K, KeyValueWithDestination> {
> > > > > >>     final String destination;
> > > > > >>
> > > > > >>     DestinationPredicate(String destination) {
> > > > > >>         this.destination = destination;
> > > > > >>     }
> > > > > >>
> > > > > >>     @Override
> > > > > >>     public boolean test(K key, KeyValueWithDestination value) {
> > > > > >>         return value.destination.equals(destination);
> > > > > >>     }
> > > > > >> }
> > > > > >>
> > > > > >> String[] destTopics = {"topicA", "topicB", "topicC"};
> > > > > >>
> > > > > >> // Generic arrays cannot be created directly, so build a raw array.
> > > > > >> @SuppressWarnings("unchecked")
> > > > > >> Predicate<K, KeyValueWithDestination>[] predicates =
> > > > > >>         Arrays.stream(destTopics).map(DestinationPredicate::new)
> > > > > >>                                  .toArray(Predicate[]::new);
> > > > > >>
> > > > > >> KStream<K, V> source = builder.stream("foo");
> > > > > >> KStream<K, KeyValueWithDestination>[] branches = source
> > > > > >>         .map((key, value) -> new KeyValue<>(key, processor.process(key, value))
> > > > > >>              /* processor.process() => KeyValueWithDestination */)
> > > > > >>         .branch(predicates);
> > > > > >>
> > > > > >> for (int i = 0; i < branches.length; i++) {
> > > > > >>     branches[i].to(destTopics[i]);
> > > > > >> }
> > > > > >> ```
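The `branch()` approach routes each record to the first predicate that matches it, and records matching no predicate are dropped, per the `KStream#branch` documentation. The sketch below reproduces that routing over an in-memory list so it can run without Kafka; `BranchByDestinationSketch`, `Routed`, and the helpers are hypothetical. It also sidesteps the generic-array issue by keeping the predicates in a `List` instead of a `Predicate[]`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical in-memory model of branch() routing; no Kafka dependency.
class BranchByDestinationSketch {

    // Output of the first processing step: a record tagged with its destination topic.
    static final class Routed {
        final String key;
        final String value;
        final String destination;

        Routed(String key, String value, String destination) {
            this.key = key;
            this.value = value;
            this.destination = destination;
        }

        @Override
        public String toString() {
            return key + "=" + value;
        }
    }

    // One predicate per destination; a List avoids creating a generic array.
    static List<Predicate<Routed>> destinationPredicates(List<String> destTopics) {
        List<Predicate<Routed>> predicates = new ArrayList<>();
        for (String dest : destTopics) {
            predicates.add(r -> r.destination.equals(dest));
        }
        return predicates;
    }

    // Each record goes to the branch of the first matching predicate; unmatched records are dropped.
    static <T> List<List<T>> branch(List<T> records, List<Predicate<T>> predicates) {
        List<List<T>> branches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            branches.add(new ArrayList<>());
        }
        for (T record : records) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(record)) {
                    branches.get(i).add(record);
                    break;
                }
            }
        }
        return branches;
    }

    public static void main(String[] args) {
        List<String> destTopics = Arrays.asList("topicA", "topicB", "topicC");
        List<Routed> records = Arrays.asList(
                new Routed("k1", "v1", "topicB"),
                new Routed("k2", "v2", "topicA"),
                new Routed("k3", "v3", "topicZ")); // matches no predicate, so it is dropped
        List<List<Routed>> branches = branch(records, destinationPredicates(destTopics));
        for (int i = 0; i < branches.size(); i++) {
            // In Kafka Streams this is where branches[i].to(destTopics[i]) would go.
            System.out.println(destTopics.get(i) + " <- " + branches.get(i));
        }
    }
}
```

Running `main` should print `topicA <- [k2=v2]`, `topicB <- [k1=v1]`, and `topicC <- []`; the `topicZ` record is dropped.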
> > > > > >>
> > > > > >>
> > > > > >> > Guozhang
> > > > > >> >
> > > > > >> > On Fri, Apr 1, 2016 at 1:20 AM, Yuto KAWAMURA <kawamuray.dadada@gmail.com> wrote:
> > > > > >> >
> > > > > >> >> When I tried to implement a task that does a kind of dispatching to
> > > > > >> >> downstream processors or sinks, it looked like relying on
> > > > > >> >> context.forward(K, V, int childIndex) is the only way right now.
> > > > > >> >> Why is this method implemented using childIndex (which is just an
> > > > > >> >> index into the children "List", based on the order of
> > > > > >> >> builder.addProcessor() calls) instead of the child name (the first
> > > > > >> >> argument to add{Processor,Sink})?
> > > > > >> >> What is the concrete use case for forward(K, V, int childIndex), and
> > > > > >> >> does it make sense to introduce another overload,
> > > > > >> >> forward(K, V, String childName), which would be much handier?
> > > > > >> >> Currently I have a use case like this in mind:
> > > > > >> >> ```java
> > > > > >> >> builder.addProcessor("DispatchProcess", new DispatchProcessorSupplier(), "Source");
> > > > > >> >> builder.addProcessor("Process-A", new ProcessorASupplier(), "DispatchProcess");
> > > > > >> >> builder.addProcessor("Process-B", new ProcessorBSupplier(), "DispatchProcess");
> > > > > >> >>
> > > > > >> >> // in process(key, value)
> > > > > >> >> if ("key-for-A".equals(key)) {
> > > > > >> >>     context.forward(key, value, "Process-A");
> > > > > >> >> } else if ("key-for-B".equals(key)) {
> > > > > >> >>     context.forward(key, value, "Process-B");
> > > > > >> >> }
> > > > > >> >> ```
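As Yuto notes, the index-based overload depends on the order in which children were wired up. A small hypothetical model (`ChildIndexVsNameSketch` and `Dispatcher` are not part of the Kafka Streams API) shows the consequence: two topologies that differ only in `addProcessor()` order route `forward(..., 0)` to different children, while a name-based forward is unaffected.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a dispatching node; not the Kafka Streams API.
class ChildIndexVsNameSketch {

    static final class Dispatcher {
        // Children in the order they were attached, like the children "List" described above.
        private final List<String> children = new ArrayList<>();
        final List<String> deliveries = new ArrayList<>();

        void addChild(String name) {
            children.add(name);
        }

        // Index-based: what index 0 means depends on wiring order.
        void forward(String key, String value, int childIndex) {
            deliveries.add(children.get(childIndex) + " <- " + key + "=" + value);
        }

        // Name-based: independent of wiring order; unknown names fail fast.
        void forward(String key, String value, String childName) {
            if (!children.contains(childName)) {
                throw new IllegalArgumentException("unknown child: " + childName);
            }
            deliveries.add(childName + " <- " + key + "=" + value);
        }
    }

    // Builds a dispatcher whose children are attached in the given order.
    static Dispatcher wiredInOrder(String... childNames) {
        Dispatcher d = new Dispatcher();
        for (String name : childNames) {
            d.addChild(name);
        }
        return d;
    }

    public static void main(String[] args) {
        Dispatcher ab = wiredInOrder("Process-A", "Process-B");
        Dispatcher ba = wiredInOrder("Process-B", "Process-A"); // same topology, other addProcessor() order
        for (Dispatcher d : new Dispatcher[] {ab, ba}) {
            d.forward("key-for-A", "v", 0);           // index-based
            d.forward("key-for-A", "v", "Process-A"); // name-based
        }
        System.out.println(ab.deliveries); // [Process-A <- key-for-A=v, Process-A <- key-for-A=v]
        System.out.println(ba.deliveries); // [Process-B <- key-for-A=v, Process-A <- key-for-A=v]
    }
}
```

With the reversed wiring, index 0 silently points at Process-B, while the name-based call still reaches Process-A.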
> > > > > >> >>
> > > > > >> >
> > > > > >> >
> > > > > >> >
> > > > > >> > --
> > > > > >> > -- Guozhang
> > > > > >>
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
