kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: kstream transform forward to different topics
Date Thu, 07 Feb 2019 15:18:30 GMT
Hi Nan,

To forward to the three different topics, it will probably be easier to do
this with the Processor API.  Based on what you stated in your question, the
topology will look something like this:

final Topology topology = new Topology();
topology.addSource("source-node", "input-topic");
topology.addProcessor("transformer", () -> new MyTransformer(), "source-node");
topology.addSink("sink-node-1", "output-topic-1", "transformer");
topology.addSink("sink-node-2", "output-topic-2", "transformer");
topology.addSink("sink-node-3", "output-topic-3", "transformer");

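As you can see, the "transformer" node is the parent of all three sink nodes.
Note that addProcessor expects a Processor rather than a Transformer, so the
logic from your Transformer moves into a Processor's process() method.  From
there, you can forward the key-value pairs using one of two approaches.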

Sending to all child nodes with this call:

context().forward(key, value, To.all());

Or by listing each child node individually, like so:

context().forward(key, value, To.child("sink-node-1"));
context().forward(key, value, To.child("sink-node-2"));
context().forward(key, value, To.child("sink-node-3"));
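
To make the routing concrete, here is a small toy model in plain Java with no
Kafka dependency.  FanOutSketch and its methods are hypothetical stand-ins, not
Kafka Streams classes: addSink mimics topology.addSink, forwardToChild mimics
forward(..., To.child(name)), and forwardToAll mimics forward(..., To.all()).
What it is meant to show is that To.child takes the sink node name (for
example "sink-node-1"), not the topic name, so each of your three record types
can go to its own sink:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these classes or methods are Kafka Streams APIs.
class FanOutSketch {
    // Maps each sink node name to the topic it writes to.
    private final Map<String, String> sinkToTopic = new LinkedHashMap<>();
    // Records "written" to each topic, in arrival order.
    private final Map<String, List<String>> topics = new LinkedHashMap<>();

    // Stand-in for topology.addSink(nodeName, topic, parentName).
    void addSink(String nodeName, String topic) {
        sinkToTopic.put(nodeName, topic);
        topics.put(topic, new ArrayList<>());
    }

    // Stand-in for context().forward(key, value, To.child(nodeName)).
    void forwardToChild(String nodeName, String key, String value) {
        String topic = sinkToTopic.get(nodeName);
        if (topic == null) {
            // The child is looked up by node name, so a topic name is rejected.
            throw new IllegalArgumentException("Unknown child node: " + nodeName);
        }
        topics.get(topic).add(key + "=" + value);
    }

    // Stand-in for context().forward(key, value, To.all()).
    void forwardToAll(String key, String value) {
        for (String nodeName : sinkToTopic.keySet()) {
            forwardToChild(nodeName, key, value);
        }
    }

    // Returns the records written so far to a registered topic.
    List<String> recordsIn(String topic) {
        return topics.get(topic);
    }

    public static void main(String[] args) {
        FanOutSketch router = new FanOutSketch();
        router.addSink("sink-node-1", "output-topic-1");
        router.addSink("sink-node-2", "output-topic-2");
        router.addSink("sink-node-3", "output-topic-3");

        // One input record produces three different output records,
        // each sent to its own sink node.
        router.forwardToChild("sink-node-1", "key", "record1");
        router.forwardToChild("sink-node-2", "key", "record2");
        router.forwardToChild("sink-node-3", "key", "record3");

        // A broadcast reaches every sink.
        router.forwardToAll("key", "broadcast");

        System.out.println(router.recordsIn("output-topic-1")); // [key=record1, key=broadcast]
    }
}
```

In the real topology, since your three records are of different classes, each
sink would most likely also need its own value serializer; addSink has an
overload that accepts key and value serializers for this.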

HTH,

Bill




On Thu, Feb 7, 2019 at 12:13 AM Nan Xu <nanxu1980@gmail.com> wrote:

> When I do the transform, for a single input record I need to output 3
> different records, and those 3 records are of different classes.  I want to
> send each type of record to a separate topic. My understanding is that I
> should use
>
> context.forward inside the transformer  like
>
> Transformer{..
> context.forward(key, record1, To.child("topic1"))
> context.forward(key, value1, To.child("topic2"))
> }
> But how do I define those processors? I can create them in the topology, but
> which node should be their parent, and what is the parent's name?
>
> stream.transform(transformer) doesn't give me a way to specify a processor name.
>
> Thanks,
> Nan
>
