kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: No referential transparency with transform() ?
Date Mon, 24 Sep 2018 16:11:59 GMT
Hello Stéphane,

As the Javadoc of TransformerSupplier#get() states:


Return a new {@link Transformer} instance.


Streams is designed for single-threaded isolation per task, so each task
must get its own Transformer instance. If users ignore the instruction and
reuse the same Transformer instance, it will result in undefined behavior,
most likely a runtime exception (RTE).
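
To make the difference concrete, here is a minimal sketch in plain Java. It
deliberately does not use the Kafka Streams API: `SketchTransformer`,
`SupplierSketch` and `createPerTask` are hypothetical stand-ins, and the
assumption is only that the runtime calls the supplier once per task and then
initializes whatever it gets back with task-specific state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-in for a stateful Transformer; NOT the Kafka Streams interface.
final class SketchTransformer {
    // Per-task state set by init(), comparable to a stored processor context.
    private String taskId;

    void init(String taskId) {
        this.taskId = taskId;
    }

    String transform(String value) {
        return taskId + ":" + value;
    }
}

final class SupplierSketch {
    // Assumed runtime behavior: ask the supplier once per task, then init the result.
    static List<SketchTransformer> createPerTask(Supplier<SketchTransformer> supplier,
                                                 String... taskIds) {
        List<SketchTransformer> perTask = new ArrayList<>();
        for (String taskId : taskIds) {
            SketchTransformer transformer = supplier.get();
            transformer.init(taskId);
            perTask.add(transformer);
        }
        return perTask;
    }

    public static void main(String[] args) {
        // Shared instance: the second init() overwrites the state set for the first task.
        SketchTransformer shared = new SketchTransformer();
        List<SketchTransformer> reused = createPerTask(() -> shared, "0_0", "0_1");
        System.out.println(reused.get(0).transform("a")); // prints 0_1:a

        // New instance per call: each task keeps its own state.
        List<SketchTransformer> fresh = createPerTask(SketchTransformer::new, "0_0", "0_1");
        System.out.println(fresh.get(0).transform("a")); // prints 0_0:a
    }
}
```

With the shared instance, the transformer handed to task 0_0 ends up holding
the state of task 0_1. In this sketch that only produces a wrong prefix; in
Streams, a comparable mix-up of per-task state would plausibly surface as a
runtime error.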


Guozhang

On Mon, Sep 24, 2018 at 8:17 AM, Damian Guy <damian.guy@gmail.com> wrote:

> The return value from the `TransformerSupplier` should always be a `new
> YourTransformer(..)` as there will be one for each task and they are
> potentially processed on multiple threads.
>
> On Mon, 24 Sep 2018 at 16:07 Stéphane. D. <derste@gmail.com> wrote:
>
> > Hi,
> >
> > We just stumbled upon an issue with KStream.transform() where we had a
> > runtime error with this code:
> >
> > ```
> > DeduplicationTransformer<X, Y, , > transformer = new
> > DeduplicationTransformer<>(...);
> > stream.transform(() -> transformer, ...)
> > ```
> >
> > The error is:
> > Failed to process stream task 0_0 due to the following error:
> > java.lang.IllegalStateException: This should not happen as timestamp()
> > should only be called while a record is processed
> >
> > Whereas simply inlining the creation of the Transformer works:
> >
> > ```
> > stream.transform(() -> new DeduplicationTransformer<>(...), ...)
> > ```
> >
> > Is this behavior expected?
> >
> >
> > I guess that's why transform() takes a wrapper, to construct it when
> > needed?
> >
> > Why does this happen? Is there some kind of global reference used
> > internally (only constructed during execution)?
> >
> >
> > Thanks,
> >
> > Stéphane
> >
>



-- 
-- Guozhang
