kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Nicolas Fouché <nfou...@onfocus.io>
Subject Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`
Date Wed, 18 Jan 2017 16:29:22 GMT
Ho my, I'm dumb. One can give multiple predicates to `KStream.branch()`.

2017-01-18 17:18 GMT+01:00 Nicolas Fouché <nfouche@onfocus.io>:

> The reason I would not use `KStream.transform()` is that I want to call
> `ProcessorContext.forward()` several times, to different children. These
> children are sinks.
> My use case: I need to route my beacons to different topics. Right now, I
> use a series of `KStream.branch()` calls [1]. But would it be more
> "elegant" to be able to add 5 sinks to a topology, and forward my records
> to them in a custom processor ?
>
> Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> have to give a parent processor. But the parent processor was generated by
> a high-level topologies. And names of processors created by
> `KStreamBuilder` are not accessible. (unless by inspecting the topology
> nodes I guess)
>
> [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
>
> Thanks.
> Nicolas
>
>
> 2017-01-18 15:56 GMT+01:00 Michael Noll <michael@confluent.io>:
>
>> Nicolas,
>>
>> if I understand your question correctly you'd like to add further
>> operations after having called `KStream#process()`, which -- as you report
>> -- doesn't work because `process()` returns void.
>>
>> If that's indeed the case, +1 to Damian's suggest to use
>> `KStream.transform()` instead of `KStream.process()`.
>>
>> -Michael
>>
>>
>>
>>
>> On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <damian.guy@gmail.com> wrote:
>>
>> > You could possibly also use KStream.transform(...)
>> >
>> > On Wed, 18 Jan 2017 at 14:22 Damian Guy <damian.guy@gmail.com> wrote:
>> >
>> > > Hi Nicolas,
>> > >
>> > > Good question! I'm not sure why it is a terminal operation, maybe one
>> of
>> > > the original authors can chip in. However, you could probably work
>> around
>> > > it by using TopologyBuilder.addProcessor(...) rather then
>> > KStream.process
>> > >
>> > > Thanks,
>> > > Damian
>> > >
>> > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nfouche@onfocus.io>
>> wrote:
>> > >
>> > > Hi,
>> > >
>> > > as far as I understand, calling `KStream.process` prevents the
>> developer
>> > > from adding further operations to a `KStreamBuilder` [1], because its
>> > > return type is `void`. Good.
>> > >
>> > > But it also prevents the developer from adding operations to its
>> > superclass
>> > > `TopologyBuilder`. In my case I wanted to add a sink, and the parent
>> of
>> > > this sink would be the name of the Processor that is created by
>> > > `KStream.process`. Is there any reason why this method does not return
>> > the
>> > > processor name [2] ? Is it because it would be a bad idea continuing
>> > > building my topology with the low-level API ?
>> > >
>> > > [1]
>> > >
>> > > https://github.com/confluentinc/examples/blob/3.
>> > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
>> > MixAndMatchLambdaIntegrationTest.java%23L56
>> > > [2]
>> > >
>> > > https://github.com/apache/kafka/blob/b6011918fbc36bfaa465bdcc750e24
>> > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
>> > kstream/internals/KStreamImpl.java#L391
>> > >
>> > >
>> > > Thanks.
>> > > Nicolas.
>> > >
>> > >
>> >
>>
>
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message