kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: kstream transform forward to different topics
Date Thu, 07 Feb 2019 15:47:07 GMT
Hi Nan,

What I'm suggesting is to do the entire topology in the PAPI (Processor API);
sorry if I didn't make this clear before.

Thanks,
Bill
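
A rough way to see the cost Nan asks about in the quoted message, and why doing everything in one Processor API topology avoids it, is to count broker reads and writes for both layouts. This is a toy model in plain Java, not Kafka code: `BrokerTraffic`, `HopComparison` and the filter predicate are hypothetical. It counts records only and ignores batching, replication, serialization and latency.

```java
import java.util.List;
import java.util.function.Predicate;

// Toy cost model for the two layouts discussed in this thread.
// Broker traffic is reduced to counters; nothing here talks to Kafka.
final class BrokerTraffic {
    int produced; // records written to any topic
    int fetched;  // records read from any topic
}

final class HopComparison {
    static final int OUTPUT_TOPICS = 3;

    // Layout 1: DSL stage -> to("inter_topic") -> addSource("inter_topic")
    // -> transformer -> 3 sinks.
    static BrokerTraffic viaIntermediateTopic(List<String> input, Predicate<String> keep) {
        BrokerTraffic t = new BrokerTraffic();
        for (String record : input) {
            t.fetched++;                 // read from the input topic
            if (!keep.test(record)) continue;
            t.produced++;                // write to inter_topic
            t.fetched++;                 // read it back from inter_topic
            t.produced += OUTPUT_TOPICS; // one write per output topic
        }
        return t;
    }

    // Layout 2: the whole pipeline in one Processor API topology; the
    // filter step and the transformer are chained in-process.
    static BrokerTraffic singleTopology(List<String> input, Predicate<String> keep) {
        BrokerTraffic t = new BrokerTraffic();
        for (String record : input) {
            t.fetched++;                 // read from the input topic
            if (!keep.test(record)) continue;
            t.produced += OUTPUT_TOPICS; // one write per output topic
        }
        return t;
    }
}
```

With three input records of which two pass the filter, the first layout does 5 fetches and 8 produces against 3 and 6 for the second: each surviving record costs one extra write to and one extra read from `inter_topic`.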

On Thu, Feb 7, 2019 at 10:41 AM Nan Xu <nanxu1980@gmail.com> wrote:

> Thanks, just to make sure I understand this correctly.
>
> I have some processing logic in the DSL. After that processing, I have a
> KStream, and from this KStream I need to do a transform and put the results
> into different topics. To use the Processor API, I need to write this KStream
> to a topic and then use topology.addSource("source-node", "input-topic"),
> something like:
>
> val streamBuilder = new StreamsBuilder()
> val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)
>
> // some DSL processing....
> val resultKStream = inputStream.map(xxxx).filter..............
>
> resultKStream.to("inter_topic")
>
> final Topology topology = new Topology();
> topology.addSource("source-node", "inter_topic");
> topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
>
> So if I have to write my intermediate result to inter_topic, is there any
> performance implication? I'm not sure if I'm right, but it sounds to me like
> that will cause one more hop from the client (the Streams app) to the Kafka
> brokers: the initial DSL processing happens on the client side, then the
> result has to be written back to the broker and read back into the client
> to use the Processor API.
>
> Thanks,
> Nan
>
>
>
>
> On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <bill@confluent.io> wrote:
>
> > Hi Nan,
> >
> > To forward to the 3 different topics, it will probably be easier to do
> > this in the Processor API. Based on what you stated in your question, the
> > topology will look something like this:
> >
> > final Topology topology = new Topology();
> > topology.addSource("source-node", "input-topic");
> > topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
> > topology.addSink("sink-node-1", "output-topic-1", "transformer");
> > topology.addSink("sink-node-2", "output-topic-2", "transformer");
> > topology.addSink("sink-node-3", "output-topic-3", "transformer");
> >
> > As you can see, the "transformer" is the parent node of all 3 sink nodes.
> > Then in your Transformer, you can forward the key-value pairs using one
> > of two approaches.
> >
> > Sending to all child nodes with this call:
> >
> > context().forward(key, value, To.all());
> >
> > Or by naming each child node individually, like so:
> >
> > context().forward(key, value, To.child("sink-node-1"));
> > context().forward(key, value, To.child("sink-node-2"));
> > context().forward(key, value, To.child("sink-node-3"));
> >
> > HTH,
> >
> > Bill
> >
> >
> >
> >
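
The parent/child wiring and the two `forward` variants can be modelled without Kafka to show the key point: `To.child(...)` takes the name of a child node, such as `"sink-node-1"`, and that sink node decides which topic is written. The code is a hypothetical toy (`ToyTopology` and its methods are not Kafka Streams API); in the real library the counterparts are `Topology.addSink(name, topic, parentNames...)` and `ProcessorContext.forward(key, value, To)`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy, dependency-free model of parent-to-sink routing. This is NOT the
// Kafka Streams API: ToyTopology and its methods are hypothetical names.
class ToyTopology {
    // Child (sink) node name -> topic that sink writes to, mirroring
    // topology.addSink("sink-node-1", "output-topic-1", "transformer").
    private final Map<String, String> sinkTopics = new LinkedHashMap<>();
    // Records "written" to each topic, in arrival order.
    final Map<String, List<String>> topics = new LinkedHashMap<>();

    void addSink(String sinkName, String topic) {
        sinkTopics.put(sinkName, topic);
        topics.putIfAbsent(topic, new ArrayList<>());
    }

    // Analogue of context().forward(key, value, To.all()):
    // every child node of the parent receives the record.
    void forwardToAll(String key, String value) {
        for (String sinkName : sinkTopics.keySet()) {
            forwardToChild(key, value, sinkName);
        }
    }

    // Analogue of context().forward(key, value, To.child(sinkName)):
    // the argument is a child NODE name, which is resolved to a topic.
    void forwardToChild(String key, String value, String sinkName) {
        String topic = sinkTopics.get(sinkName);
        if (topic == null) {
            throw new IllegalArgumentException("Unknown child node: " + sinkName);
        }
        topics.get(topic).add(key + "=" + value);
    }
}
```

Passing a topic name such as `"output-topic-1"` where a node name is expected is rejected by the toy, which is the mismatch in the `To.child("topic1")` pseudocode of the original question.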
> > On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1980@gmail.com> wrote:
> >
> > > When I do the transform, for a single input record I need to output 3
> > > different records, and those 3 records are of different classes. I want
> > > to send each type of record to a separate topic. My understanding is that
> > > I should use
> > >
> > > context.forward inside the transformer, like
> > >
> > > Transformer{..
> > > context.forward(key, record1, To.child("topic1"))
> > > context.forward(key, value1, To.child("topic2"))
> > > }
> > > but how do I define those processors? I can create them in the topology,
> > > but who should be their parent? What's the name of the parent?
> > >
> > > stream.transform(transformer) doesn't give me a way to specify the
> > > processor name.
> > >
> > > Thanks,
> > > Nan
> > >
> >
>
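
Because the three records in the original question are of different classes, each sink also needs a serializer for its own value type; in Kafka Streams that can be supplied per sink through the `Topology.addSink(name, topic, keySerializer, valueSerializer, parentNames...)` overload. The toy below is plain Java with hypothetical classes `TypeA`, `TypeB`, `TypeC`, `ToySink` and `SplittingProcessor` and an assumed `id,name,amount` input format. It shows one input record fanning out into three typed records, each written by the sink that knows how to serialize it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for the three output classes in the question.
final class TypeA {
    final int id;
    TypeA(int id) { this.id = id; }
}

final class TypeB {
    final String name;
    TypeB(String name) { this.name = name; }
}

final class TypeC {
    final double amount;
    TypeC(double amount) { this.amount = amount; }
}

// Toy sink: one topic plus a serializer for exactly one value class,
// mirroring a sink registered with its own value serializer.
final class ToySink<V> {
    final String topic;
    private final Function<V, String> serializer;
    final List<String> written = new ArrayList<>();

    ToySink(String topic, Function<V, String> serializer) {
        this.topic = topic;
        this.serializer = serializer;
    }

    void write(String key, V value) {
        written.add(key + "=" + serializer.apply(value));
    }
}

// Toy processor: one input record fans out into three records of
// different classes, each sent to the sink that can serialize it.
final class SplittingProcessor {
    final ToySink<TypeA> sinkA = new ToySink<>("topic-a", a -> "A#" + a.id);
    final ToySink<TypeB> sinkB = new ToySink<>("topic-b", b -> "B:" + b.name);
    final ToySink<TypeC> sinkC = new ToySink<>("topic-c", c -> "C:" + c.amount);

    // The "id,name,amount" input format is an assumption for this sketch.
    void process(String key, String input) {
        String[] parts = input.split(",");
        sinkA.write(key, new TypeA(Integer.parseInt(parts[0])));
        sinkB.write(key, new TypeB(parts[1]));
        sinkC.write(key, new TypeC(Double.parseDouble(parts[2])));
    }
}
```

Calling `process("k1", "7,bob,2.5")` leaves `k1=A#7`, `k1=B:bob` and `k1=C:2.5` in the three sinks. Unlike this toy, `context.forward(...)` in the real API is not type-checked against the target sink, so a record routed to the wrong child typically fails at serialization time rather than at compile time.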
