kafka-users mailing list archives

From Nan Xu <nanxu1...@gmail.com>
Subject Re: kstream transform forward to different topics
Date Thu, 07 Feb 2019 15:40:58 GMT
Thanks, just to make sure I understand this correctly:

I have some processing logic written with the DSL. After that processing I have a
KStream, and from this KStream I need to do a transform and send the results to
different topics. To use the Processor API, I need to write this KStream to a
topic and then use topology.addSource("source-node", "input-topic"),
something like:

val streamBuilder = new StreamsBuilder()
val inputStream = streamBuilder.stream[String, StoreInput](begin_topic)

// some DSL processing....
val resultKStream = inputStream.map(xxxx).filter..............

resultKStream.to("inter_topic")

final Topology topology = new Topology();
topology.addSource("source-node", "inter_topic");
topology.addProcessor("transformer", () -> new MyTransfomer(),
    "source-node");

So if I have to write my intermediate result to inter_topic, is there any
performance implication? I'm not sure if I'm right, but it sounds to me like
that will cause one more hop from the client (the streams app) to the Kafka
brokers: the initial DSL processing happens on the client side, then the
result has to be written back to the broker and read back into the client to
use the Processor API.
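A hedged sketch, not part of the original thread: writing to inter_topic and consuming it again does add a produce and a consume through the brokers. When each output record can be derived from the DSL result on its own, one alternative that avoids the intermediate topic is to stay in the DSL and branch the same KStream once per output topic, giving each branch its own value serde. The sketch is Java and assumes String keys and values, the Kafka Streams DSL (`mapValues`, `to(String, Produced)`), and hypothetical topic names and derivations standing in for the three record classes.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

final class FanOutDsl {

    // Builds a DSL-only topology: every record read from "begin_topic" yields
    // three derived records of different types, each written to its own topic.
    // Topic names and derivations are hypothetical placeholders.
    static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> input =
            builder.stream("begin_topic", Consumed.with(Serdes.String(), Serdes.String()));

        // Stand-in for the "some DSL processing" step from the question.
        KStream<String, String> result = input.filter((key, value) -> value != null);

        // Branch 1: a String derived from the value.
        result.mapValues(value -> value.toUpperCase())
              .to("output-topic-1", Produced.with(Serdes.String(), Serdes.String()));

        // Branch 2: an Integer (character count), with its own serde.
        result.mapValues(value -> value.length())
              .to("output-topic-2", Produced.with(Serdes.String(), Serdes.Integer()));

        // Branch 3: a Long (word count), with its own serde.
        result.mapValues(value -> (long) value.trim().split("\\s+").length)
              .to("output-topic-3", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}
```

Each `mapValues(...).to(...)` chain is a separate child of `result`, so every input record produces one record on each of the three topics, all inside one Streams application. If the three outputs share an expensive computation, a variation would be to `mapValues` once into a holder object and branch from that.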

Thanks,
Nan




On Thu, Feb 7, 2019 at 9:18 AM Bill Bejeck <bill@confluent.io> wrote:

> Hi Nan,
>
> To forward to the 3 different topics it will probably be easier to do this
> in the Processor API.  Based on what you stated in your question, the
> topology will look something like this:
>
> final Topology topology = new Topology();
> topology.addSource("source-node", "input-topic");
> topology.addProcessor("transformer", () -> new MyTransfomer(),
> "source-node");
> topology.addSink("sink-node-1", "output-topic-1", "transformer");
> topology.addSink("sink-node-2", "output-topic-2", "transformer");
> topology.addSink("sink-node-3", "output-topic-3", "transformer");
>
> As you can see, the "transformer" is the parent node of all 3 sink nodes.
> Then in your Transformer, you can forward the key-value pairs by using one
> of two approaches.
>
> Sending to all child nodes with this call:
>
> context().forward(key, value, To.all());
>
> Or by listing each child node individually, like so:
>
> context().forward(key, value, To.child("sink-node-1"));
> context().forward(key, value, To.child("sink-node-2"));
> context().forward(key, value, To.child("sink-node-3"));
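To make the outline above concrete, here is a minimal Java sketch of the whole Processor API topology, assuming the `org.apache.kafka.streams.processor` API used in this thread (`AbstractProcessor`, `ProcessorContext#forward(key, value, To)`), which is deprecated in later Kafka releases. `RoutingProcessor` is a hypothetical stand-in for `MyTransfomer`, and the three value types (String, Integer, Long) are hypothetical stand-ins for the three record classes. Note that `To.child(...)` takes a sink node name such as "sink-node-1", not a topic name, and that each sink here is given a serializer matching the values forwarded to it.

```java
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.To;

final class FanOutProcessorApi {

    // Hypothetical processor: turns one input record into three records of
    // different types and routes each one to a specific child sink node.
    static final class RoutingProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (value == null) {
                return; // nothing to derive from a null value
            }
            // The names given to To.child(...) are sink NODE names, not topic names.
            context().forward(key, value.toUpperCase(), To.child("sink-node-1"));
            context().forward(key, value.length(), To.child("sink-node-2"));
            context().forward(key, (long) value.trim().split("\\s+").length, To.child("sink-node-3"));
        }
    }

    static Topology build() {
        // A typed supplier variable keeps addProcessor(...) overload resolution unambiguous.
        ProcessorSupplier<String, String> supplier = RoutingProcessor::new;

        Topology topology = new Topology();
        topology.addSource("source-node", new StringDeserializer(), new StringDeserializer(), "input-topic");
        topology.addProcessor("transformer", supplier, "source-node");
        // "transformer" is the parent of all three sinks; each sink serializes one value type.
        topology.addSink("sink-node-1", "output-topic-1", new StringSerializer(), new StringSerializer(), "transformer");
        topology.addSink("sink-node-2", "output-topic-2", new StringSerializer(), new IntegerSerializer(), "transformer");
        topology.addSink("sink-node-3", "output-topic-3", new StringSerializer(), new LongSerializer(), "transformer");
        return topology;
    }
}
```

Forwarding with `To.all()` instead would send the same key and value to all three sinks, which only works if one serializer pair suits every sink; naming each child is what lets the value type differ per topic.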
>
> HTH,
>
> Bill
>
>
>
>
> On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1980@gmail.com> wrote:
>
> > When I do the transform, for a single input record I need to output 3
> > different records, and those 3 records are of different classes. I want to
> > send each type of record to a separate topic. My understanding is that I
> > should use
> >
> > context.forward inside the transformer, like:
> >
> > Transformer{..
> > context.forward(key, record1, To.child("topic1"))
> > context.forward(key, value1, To.child("topic2"))
> > }
> > But how do I define those processors? I can create them in the topology,
> > but who should be their parent? What's the name of the parent?
> >
> > stream.transform(transformer) doesn't give me a way to specify the processor name.
> >
> > Thanks,
> > Nan
> >
>
