kafka-users mailing list archives

From Nicolas Fouché <nfou...@onfocus.io>
Subject Re: Kafka Streams: how can I get the name of the Processor when calling `KStream.process`
Date Thu, 19 Jan 2017 09:21:07 GMT
No problem with that. It's perfectly explained in
https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test/java/io/confluent/examples/streams/StateStoresInTheDSLIntegrationTest.java
.

2017-01-18 19:41 GMT+01:00 Michael Noll <michael@confluent.io>:

> Nicolas,
>
> here's some information I shared on StackOverflow (perhaps a bit outdated
> by now, was back in Aug 2016) about how you can add a state store when
> using KStreamBuilder: http://stackoverflow.com/a/39086805/1743580
>
> -Michael
>
>
>
>
> On Wed, Jan 18, 2017 at 5:18 PM, Nicolas Fouché <nfouche@onfocus.io>
> wrote:
>
> > The reason I would not use `KStream.transform()` is that I want to call
> > `ProcessorContext.forward()` several times, to different children. These
> > children are sinks.
> > My use case: I need to route my beacons to different topics. Right now, I
> > use a series of `KStream.branch()` calls [1]. But would it be more
> > "elegant" to be able to add 5 sinks to a topology, and forward my records
> > to them in a custom processor ?
> >
> > Damian: About `TopologyBuilder.addProcessor(...)`, as far as I know, I
> > have to give a parent processor. But the parent processor was generated
> > by the high-level API, and the names of processors created by
> > `KStreamBuilder` are not accessible (unless by inspecting the topology
> > nodes, I guess).
> >
> > [1] https://gist.github.com/nfo/c4936a24601352db23b18653a8ccc352
> >
> > Thanks.
> > Nicolas
> >
> >
> > 2017-01-18 15:56 GMT+01:00 Michael Noll <michael@confluent.io>:
> >
> > > Nicolas,
> > >
> > > if I understand your question correctly you'd like to add further
> > > operations after having called `KStream#process()`, which -- as you
> > > report -- doesn't work because `process()` returns void.
> > >
> > > If that's indeed the case, +1 to Damian's suggestion to use
> > > `KStream.transform()` instead of `KStream.process()`.
> > >
> > > -Michael
> > >
> > >
> > >
> > >
> > > > On Wed, Jan 18, 2017 at 3:31 PM, Damian Guy <damian.guy@gmail.com> wrote:
> > >
> > > > You could possibly also use KStream.transform(...)
> > > >
> > > > > On Wed, 18 Jan 2017 at 14:22 Damian Guy <damian.guy@gmail.com> wrote:
> > > >
> > > > > Hi Nicolas,
> > > > >
> > > > > > Good question! I'm not sure why it is a terminal operation, maybe
> > > > > > one of the original authors can chip in. However, you could
> > > > > > probably work around it by using `TopologyBuilder.addProcessor(...)`
> > > > > > rather than `KStream.process`.
> > > > >
> > > > > Thanks,
> > > > > Damian
> > > > >
> > > > > > On Wed, 18 Jan 2017 at 13:48 Nicolas Fouché <nfouche@onfocus.io> wrote:
> > > > >
> > > > > Hi,
> > > > >
> > > > > > as far as I understand, calling `KStream.process` prevents the
> > > > > > developer from adding further operations to a `KStreamBuilder` [1],
> > > > > > because its return type is `void`. Good.
> > > > >
> > > > > > But it also prevents the developer from adding operations to its
> > > > > > superclass `TopologyBuilder`. In my case I wanted to add a sink, and
> > > > > > the parent of this sink would be the name of the Processor that is
> > > > > > created by `KStream.process`. Is there any reason why this method
> > > > > > does not return the processor name [2]? Is it because it would be a
> > > > > > bad idea to continue building my topology with the low-level API?
> > > > >
> > > > > [1]
> > > > >
> > > > > https://github.com/confluentinc/examples/blob/3.
> > > > 1.x/kafka-streams/src/test/java/io/confluent/examples/streams/
> > > > MixAndMatchLambdaIntegrationTest.java%23L56
> > > > > [2]
> > > > >
> > > > > https://github.com/apache/kafka/blob/
> b6011918fbc36bfaa465bdcc750e24
> > > > 35985d9101/streams/src/main/java/org/apache/kafka/streams/
> > > > kstream/internals/KStreamImpl.java#L391
> > > > >
> > > > >
> > > > > Thanks.
> > > > > Nicolas.
> > > > >
> > > > >
> > > >
> > >
> >
>
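
The routing idea discussed in the thread (one custom processor that forwards each record to one or more named child sinks, instead of a chain of `KStream.branch()` calls) can be sketched without any Kafka dependency. This is only a plain-Java model under stated assumptions, not the Kafka Streams API: `BeaconRouter`, `Sink`, the sink names, and the routing rules are all hypothetical and made up for illustration. In a real topology the equivalent pieces would presumably be a processor added with `TopologyBuilder.addProcessor(...)` under a name you choose yourself, plus sinks that list that name as their parent.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Dependency-free model of "forward to a named child". NOT the Kafka Streams API:
// every class, method, and sink name below is hypothetical.
final class BeaconRouter {

    // Stand-in for a sink node: collects the records forwarded to one topic.
    static final class Sink {
        final String topic;
        final List<String> records = new ArrayList<>();

        Sink(String topic) {
            this.topic = topic;
        }
    }

    // Child sinks keyed by node name, the way a processor addresses its children.
    private final Map<String, Sink> children = new LinkedHashMap<>();

    // Registers a named child sink that has this router as its parent.
    BeaconRouter addSink(String name, String topic) {
        children.put(name, new Sink(topic));
        return this;
    }

    // Forwards a record to exactly one child, selected by name.
    private void forward(String key, String value, String childName) {
        Sink sink = children.get(childName);
        if (sink == null) {
            throw new IllegalArgumentException("Unknown child: " + childName);
        }
        sink.records.add(key + "=" + value);
    }

    // Routes one beacon; a single input record may go to several children.
    void process(String key, String value) {
        if (value.startsWith("click")) {
            forward(key, value, "clicks-sink");
        }
        if (value.startsWith("view")) {
            forward(key, value, "views-sink");
        }
        // Made-up rule: every beacon is also copied to an audit sink.
        forward(key, value, "audit-sink");
    }

    // Returns the records collected by the named child sink.
    List<String> recordsOf(String childName) {
        return children.get(childName).records;
    }
}
```

The point of the sketch is that the processor's children are addressed by names chosen by the developer, which is exactly what is not possible when the parent node is created (and named) internally by `KStream.process`.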
