kafka-users mailing list archives

From Dmitry Minkovsky <dminkov...@gmail.com>
Subject Re: Streams: Why is ForwardingCacheFlushListener internal? Can I replicate its functionality in my code?
Date Thu, 01 Feb 2018 10:10:34 GMT
Right, but I want to forward messages to downstream processor nodes only
when the store flushes. How does that happen automatically, when
KTableSourceProcessor sets it up explicitly with a TupleForwarder?

On Thu, Feb 1, 2018 at 2:59 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> If you build a store and enable caching, you get the KTable behavior out
> of the box. No need to write any custom code in the processor itself.
>
>
> StoreBuilder builder =
>     Stores.keyValueStoreBuilder(...).withCachingEnabled();
>
> topology.addStateStore(builder, ...);
>
>
>
> -Matthias
>
> On 1/31/18 6:19 PM, Dmitry Minkovsky wrote:
> > I am writing a processor and I want its stores to behave like KTables:
> > for consistency, I don't want to forward values until the stores have
> > been flushed.
> >
> > I am looking at `ForwardingCacheFlushListener` and see that it is using
> > `InternalProcessorContext` to change the current node, perform a forward,
> > and then set the node back.
> >
> > Now, I could do the same (`InternalProcessorContext` is public), but:
> > should I? I'm not well versed in the internals, so I am wondering what
> > the ramifications of this might be. Is this okay to do? Should I?
> >
>
>
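
The behavior discussed in this thread (buffer writes, forward downstream only when the store flushes) can be illustrated with a toy model in plain Java. This is only a sketch of the general write-back-cache idea, not Kafka Streams code: `CachedStoreSketch`, its `flushListener` callback, and the in-memory maps are hypothetical names made up for this example, and nothing here uses or reproduces `ForwardingCacheFlushListener`, `TupleForwarder`, or `InternalProcessorContext`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical toy model: a write-back cache in front of a store that
// notifies a listener only when flush() is called.
public class CachedStoreSketch {
    // Entries written since the last flush; a later put() for the same
    // key overwrites the earlier value, so only the latest one survives.
    private final Map<String, String> dirty = new LinkedHashMap<>();
    // Stand-in for the underlying (persistent) store.
    private final Map<String, String> store = new LinkedHashMap<>();
    // Invoked once per dirty key during flush(); in this sketch it plays
    // the role of "forward to downstream nodes".
    private final BiConsumer<String, String> flushListener;

    public CachedStoreSketch(BiConsumer<String, String> flushListener) {
        this.flushListener = flushListener;
    }

    // Buffer the write; nothing is forwarded yet.
    public void put(String key, String value) {
        dirty.put(key, value);
    }

    // Reads see buffered writes first, then the underlying store.
    public String get(String key) {
        return dirty.containsKey(key) ? dirty.get(key) : store.get(key);
    }

    // Write every dirty entry to the store and forward it, then clear the cache.
    public void flush() {
        for (Map.Entry<String, String> e : dirty.entrySet()) {
            store.put(e.getKey(), e.getValue());
            flushListener.accept(e.getKey(), e.getValue());
        }
        dirty.clear();
    }

    public static void main(String[] args) {
        List<String> forwarded = new ArrayList<>();
        CachedStoreSketch s =
            new CachedStoreSketch((k, v) -> forwarded.add(k + "=" + v));
        s.put("a", "1");
        s.put("a", "2"); // overwrites the buffered value for "a"
        s.put("b", "3");
        System.out.println(forwarded); // prints [] (nothing forwarded before flush)
        s.flush();
        System.out.println(forwarded); // prints [a=2, b=3]
    }
}
```

In this model the downstream side sees one record per key per flush, carrying the latest value, and never sees a value that has not yet been written to the store.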
