kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: In Kafka Streaming, Serdes should use Optionals
Date Wed, 12 Oct 2016 17:04:31 GMT
Haha, I feel your pain, man.

On Tue, Oct 11, 2016 at 8:59 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:

> Thanks. That filter() method is a good solution. But whenever I look at it,
> I feel an empty spot in my heart which can only be filled by:
> filter(Optional::isPresent)
>
> On Wed, Oct 12, 2016 at 12:15 AM, Guozhang Wang <wangguoz@gmail.com>
> wrote:
>
> > Ali,
> >
> > We are working on moving from Java 7 to Java 8 in Apache Kafka, and the
> > Streams client is one of the motivations for doing so. Stay tuned to the
> > mailing list for when that happens.
> >
> > Currently Streams won't automatically filter out null values for you,
> > since in some other cases they may have semantic meaning and cannot
> > simply be ignored; you can, though, apply a simple filter such as
> > "filter((key, value) -> value != null)" before your processor lambda
> > operator, if it looks clearer in your code.
> >
> > Guozhang
> >
> >
> > On Sun, Oct 9, 2016 at 3:14 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> >
> > > It isn't a fatal error. It should be logged as a warning, and then the
> > > stream should be continued w/ the next message.
> > >
> > > Checking for null is 'ok', in the sense that it gets the job done, but
> > > after Java 8's release, we really should be using Optionals.
> > >
> > > Hopefully we can break compatibility w/ the bad old days soon and move
> > > into the future.
> > >
> > > (If there's a way to do the null check automatically, i.e. before
> > > calling the lambda, please let me know).
> > >
> > > On Sun, Oct 9, 2016 at 11:14 PM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > >
> > > > Ali,
> > > >
> > > > In your scenario, if the serde fails to parse the bytes, should that
> > > > be treated as a fatal failure, or is it expected?
> > > >
> > > > In the former case, instead of returning a null I think it is better
> > > > to throw a runtime exception so that the whole client stops and
> > > > reports the error; in the latter case, returning and checking for
> > > > null looks fine to me.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Fri, Oct 7, 2016 at 3:12 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> > > >
> > > > > Hey G,
> > > > >
> > > > > Looks like the only difference is a valueSerde parameter.
> > > > >
> > > > > How does that prevent having to look for nulls in the consumer?
> > > > >
> > > > > E.g., I wrote a custom Serde which converts the messages (which are
> > > > > JSON strings) into a Java class using Jackson.
> > > > >
> > > > > If the JSON parse fails, it sends back a null.
> > > > >
> > > > > When I'm reading this stream, in my callback, how would I avoid
> > > > > having to check whether the deserialized value is null?
> > > > >
> > > > > On Sat, Oct 8, 2016 at 1:07 AM, Guozhang Wang <wangguoz@gmail.com> wrote:
> > > > >
> > > > > > Hello Ali,
> > > > > >
> > > > > > We do have corresponding overloaded functions for most of the
> > > > > > KStream / KTable operators, so that users are not forced to pass
> > > > > > "null"; in these cases the default serdes specified in the configs
> > > > > > are used. For example:
> > > > > >
> > > > > > <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >                            Aggregator<K, V, T> adder,
> > > > > >                            Aggregator<K, V, T> subtractor,
> > > > > >                            Serde<T> aggValueSerde,
> > > > > >                            String storeName);
> > > > > >
> > > > > > /**
> > > > > >  * .. using default serializers and deserializers.
> > > > > >  */
> > > > > > <T> KTable<K, T> aggregate(Initializer<T> initializer,
> > > > > >                            Aggregator<K, V, T> adder,
> > > > > >                            Aggregator<K, V, T> subtractor,
> > > > > >                            String storeName);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Guozhang
> > > > > >
> > > > > >
> > > > > > On Fri, Oct 7, 2016 at 9:20 AM, Michael Noll <michael@confluent.io> wrote:
> > > > > >
> > > > > > > Ali, the Apache Kafka project still targets Java 7, which means we
> > > > > > > can't use Java 8 features just yet.
> > > > > > >
> > > > > > > FYI: There's an ongoing conversation about when Kafka would move
> > > > > > > from Java 7 to Java 8.
> > > > > > >
> > > > > > > On Fri, Oct 7, 2016 at 6:14 PM, Ali Akhtar <ali.rac200@gmail.com> wrote:
> > > > > > >
> > > > > > > > Since we're using Java 8 in most cases anyway, Serdes /
> > > > > > > > Serializers should use Optionals, to avoid having to deal with
> > > > > > > > the lovely nulls.
> > > > > > > >
> > > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > --
> > > > > > -- Guozhang
> > > > > >
> > > > >
> > > >
> > > >
> > > >
> > > > --
> > > > -- Guozhang
> > > >
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>



-- 
-- Guozhang
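
A minimal plain-JDK sketch of the two ideas discussed in this thread: choosing between a fatal and an expected parse failure, and dropping nulls before the processing lambda. It uses java.util.stream in place of the Kafka Streams API, and every name in it (NullHandlingSketch, parseLenient, parseStrict, keepParsed, keepParsedWithOptional, bytes) is hypothetical, as is the integer-as-text payload that stands in for the JSON messages.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

// Plain-JDK illustration only; none of these names come from the Kafka API.
public class NullHandlingSketch {

    // "Expected failure" policy: a missing or malformed payload becomes null.
    static Integer parseLenient(byte[] payload) {
        if (payload == null) {
            return null;
        }
        try {
            return Integer.valueOf(new String(payload, StandardCharsets.UTF_8).trim());
        } catch (NumberFormatException e) {
            return null;
        }
    }

    // "Fatal failure" policy: a malformed payload raises a runtime exception.
    static Integer parseStrict(byte[] payload) {
        Integer value = parseLenient(payload);
        if (value == null) {
            throw new IllegalStateException("unparseable payload");
        }
        return value;
    }

    // Null filter applied before downstream logic, analogous in spirit to
    // filter((key, value) -> value != null) in the thread.
    static List<Integer> keepParsed(List<byte[]> payloads) {
        return payloads.stream()
                .map(NullHandlingSketch::parseLenient)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    // Same pipeline, with absence expressed through Optional instead of null.
    static List<Integer> keepParsedWithOptional(List<byte[]> payloads) {
        return payloads.stream()
                .map(p -> Optional.ofNullable(parseLenient(p)))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(Collectors.toList());
    }

    // Helper that encodes a string as UTF-8 bytes for the demo input.
    static byte[] bytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        List<byte[]> payloads = Arrays.asList(bytes("1"), bytes("oops"), bytes("3"));
        System.out.println(keepParsed(payloads));             // prints [1, 3]
        System.out.println(keepParsedWithOptional(payloads)); // prints [1, 3]
    }
}
```

Both pipelines yield the same list; the Optional variant only changes how absence is expressed, which is the filter(Optional::isPresent) style Ali asks for. parseStrict shows the alternative policy for the case where a parse failure should stop processing.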
