kafka-users mailing list archives

From Nan Xu <nanxu1...@gmail.com>
Subject Re: kstream transform forward to different topics
Date Thu, 07 Feb 2019 16:51:41 GMT
Hmm, but my DSL logic at the beginning involves some joins between different
streams, so I feel it would be quite complex to write everything in the PAPI.
What if I do this: in the transform, I return all 3 class instances as a tuple,
then map 3 times on the same stream, like this:
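transformer {
  // the value carries all three outputs for one input record
  return (class1Instance, class2Instance, class3Instance)
}
val kstream = inputStream.transform(transformer)
// each mapValues picks one element of the tuple; the key is left unchanged
kstream.mapValues { case (r1, _, _) => r1 }.to("topic1")
kstream.mapValues { case (_, r2, _) => r2 }.to("topic2")
kstream.mapValues { case (_, _, r3) => r3 }.to("topic3")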
But I don't know if this is the recommended way.
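
To make the shape of the idea concrete, here is a minimal plain-Java sketch with no Kafka dependency at all. Everything in it is hypothetical and only for illustration: the Triple holder, the mapTo helper, and the in-memory lists that stand in for topic1, topic2 and topic3. It only models "transform once, then project three times", not the real Kafka Streams API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class SplitByMapSketch {

    // Stand-in for the transformer: one input record yields three values of different types.
    static Triple<String, Integer, Boolean> transform(String input) {
        return new Triple<>(input.toUpperCase(), input.length(), input.isEmpty());
    }

    // Stand-in for "map, then to(topic)": project one element and append it to the named list.
    static <T, R> void mapTo(List<T> stream, Function<T, R> projection,
                             String topic, Map<String, List<Object>> topics) {
        List<R> projected = stream.stream().map(projection).collect(Collectors.toList());
        topics.computeIfAbsent(topic, name -> new ArrayList<>()).addAll(projected);
    }

    // Runs the transform once per input, then splits the result into three "topics".
    static Map<String, List<Object>> run(List<String> inputs) {
        List<Triple<String, Integer, Boolean>> transformed =
                inputs.stream().map(SplitByMapSketch::transform).collect(Collectors.toList());
        Map<String, List<Object>> topics = new LinkedHashMap<>();
        mapTo(transformed, t -> t.first, "topic1", topics);
        mapTo(transformed, t -> t.second, "topic2", topics);
        mapTo(transformed, t -> t.third, "topic3", topics);
        return topics;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("ab", "xyz")));
    }
}

// Hypothetical holder for the three outputs produced from one input record.
final class Triple<A, B, C> {
    final A first;
    final B second;
    final C third;

    Triple(A first, B second, C third) {
        this.first = first;
        this.second = second;
        this.third = third;
    }
}
```

In this sketch the transform runs once per input record and the three projections all read the same transformed list, which is the property I am hoping to keep in the real topology.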

Thanks,
Nan

On Thu, Feb 7, 2019 at 10:12 AM Bill Bejeck <bill@confluent.io> wrote:

> Hi Nan,
>
> I wanted to follow up some more.
>
> Since you need your Transformer to forward to 3 output topics (or, more
> generally, any time you want a processor to forward to multiple child nodes
> or to specific nodes in the topology), you can best achieve this kind of
> control and flexibility using the PAPI.
>
> Thanks,
> Bill
>
> On Thu, Feb 7, 2019 at 10:47 AM Bill Bejeck <bill@confluent.io> wrote:
>
> > Hi Nan,
> >
> > What I'm suggesting is to do the entire topology in the PAPI; sorry if I
> > didn't make this clear before.
> >
> > Thanks,
> > Bill
> >
> > On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1980@gmail.com> wrote:
> >
> >> Thanks. Just to make sure I understand this correctly:
> >>
> >> I have some processing logic using the DSL. After that processing, I have a
> >> kstream, and from this kstream I need to do a transform and put the results
> >> into different topics. To use the Processor API, I need to write this kstream
> >> to a topic and then use topology.addSource("source-node", "input-topic"),
> >> something like
> >>
> >>  val streamBuilder = new StreamsBuilder()
> >>  val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
> >>
> >> //some DSL processing....
> >> val resultKStream = inputStream.map(xxxx).filter..............
> >>
> >> resultKStream.to("inter_topic")
> >>
> >> final Topology topology = new Topology();
> >> topology.addSource("source-node", "inter_topic");
> >> topology.addProcessor("transformer", () -> new MyTransfomer(), "source-node");
> >>
> >> So if I have to put my intermediate result into inter_topic, is there any
> >> performance implication? I'm not sure if I am right, but it sounds to me
> >> like this will cause one more hop from the client (the streams app) to the
> >> Kafka brokers. The initial DSL processing happens on the client side; the
> >> result then has to be written back to the broker and read back by the
> >> client in order to use the Processor API.
> >>
> >> Thanks,
> >> Nan
> >>
> >>
> >>
> >>
> >> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <bill@confluent.io> wrote:
> >>
> >> > Hi Nan,
> >> >
> >> > To forward to the 3 different topics it will probably be easier to do
> >> > this in the Processor API. Based on what you stated in your question,
> >> > the topology will look something like this:
> >> >
> >> > final Topology topology = new Topology();
> >> > topology.addSource("source-node", "input-topic");
> >> > topology.addProcessor("transformer", () -> new MyTransfomer(), "source-node");
> >> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> >> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> >> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> >> >
> >> > As you can see, the "transformer" is the parent node of all 3 sink
> >> > nodes. Then in your Transformer, you can forward the key-value pairs by
> >> > using one of two approaches.
> >> >
> >> > Sending to all child nodes with this call:
> >> >
> >> > context().forward(key, value, To.all());
> >> >
> >> > Or by listing each child node individually, like so:
> >> >
> >> > context().forward(key, value, To.child("sink-node-1"));
> >> > context().forward(key, value, To.child("sink-node-2"));
> >> > context().forward(key, value, To.child("sink-node-3"));
> >> >
> >> > HTH,
> >> >
> >> > Bill
> >> >
> >> >
> >> >
> >> >
> >> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1980@gmail.com> wrote:
> >> >
> >> > > When I do the transform, for a single input record I need to output 3
> >> > > different records, and those 3 records are of different classes. I want
> >> > > to send each type of record to a separate topic. My understanding is
> >> > > that I should use
> >> > >
> >> > > context.forward inside the transformer, like
> >> > >
> >> > > Transformer{..
> >> > > context.forward(key, record1, To.child("topic1"))
> >> > > context.forward(key, value1, To.child("topic2"))
> >> > > }
> >> > > But how do I define those processors? I can create them in the topology,
> >> > > but which node should be their parent, and what is the name of that
> >> > > parent?
> >> > >
> >> > > stream.transform(transformer) doesn't give me a way to specify the
> >> > > processor name.
> >> > >
> >> > > Thanks,
> >> > > Nan
> >> > >
> >> >
> >>
> >
>
