kafka-users mailing list archives

From Bill Bejeck <b...@confluent.io>
Subject Re: Kafka Streams KGroupedTable.count() method returning negative values.
Date Fri, 08 Feb 2019 19:25:14 GMT
Hi Ankur,

I understand. Let's see if we can narrow things down a bit without any
logging.

Where exactly in the code above are you seeing the negative number?
Have you confirmed that the count is negative by observing the results of
count().toStream() before the mapValues call?


Thanks!
Bill

On Fri, Feb 8, 2019 at 1:31 PM Ankur Rana <ankur.rana@getfareye.com> wrote:

> Hi Bill,
>
> I will try to make that change, but since the negative values are really
> rare, it would be very difficult to capture the logs at that point. I am
> seeing this issue only in production. I will see what I can do.
>
> Also, the negative counts are not limited to small values; I have seen
> values as low as -300.
> I cannot get my head around this behavior.
>
>
> On Fri, Feb 8, 2019 at 5:56 PM Bill Bejeck <bill@confluent.io> wrote:
>
> > Hi Ankur,
> >
> > Could you add an additional peek() call that logs what is going into the
> > groupBy() call, and share the logs?
> >
> > Thanks,
> > Bill
> >
> > On Fri, Feb 8, 2019 at 7:48 AM Ankur Rana <ankur.rana@getfareye.com>
> > wrote:
> >
> > > Hi All,
> > >
> > > One of my Kafka Streams applications is returning negative values from
> > > the count() method.
> > > How is that possible?
> > >
> > >
> > > Is there any known issue? I cannot think of any reason that this count
> > > could be negative.
> > > Could the state store be corrupted?
> > >
> > > idAndJobTransaction
> > >         .filter((k,v) -> v!=null)
> > >         .mapValues(jobTransaction -> {
> > >             jobTransaction.setCount(0);
> > >             jobTransaction.setId(0L);
> > >             jobTransaction.setRunsheet_id(0L);
> > >             jobTransaction.setTimestamp(0L);
> > >             if(jobTransaction.getDelete_flag() == 1)
> > >                 return null;
> > >             else
> > >                 return jobTransaction;
> > >         } )
> > >         .groupBy((id, jobTransaction) -> new KeyValue<>(jobTransaction, jobTransaction),
> > >                 Serialized.with(jobTransactionSerde, jobTransactionSerde))
> > >         .count()
> > >         .toStream()
> > >         .mapValues((k,v)-> new JobSummary(k,v))
> > >         .peek((k,v)->{
> > >             log.info(k.toString());
> > >             log.info(v.toString());
> > >         })
> > >         .selectKey((k, v) -> v.getCompany_id())  // So that the count is consumed in order for each company
> > >         .to(JOB_SUMMARY,Produced.with(Serdes.Long(),jobSummarySerde));
> > >
> > >
> > > --
> > > Thanks,
> > >
> > > Ankur Rana
> > > Software Developer
> > > FarEye
> > >
> >
>
>
> --
> Thanks,
>
> Ankur Rana
> Software Developer
> FarEye
>
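
A note on how a count can drop below zero at all. As I understand the KGroupedTable.aggregate() Javadoc, each update to the source KTable is applied in two steps: a subtractor call with the record's old value, then an adder call with its new value. count() behaves like such an aggregation with an adder that adds 1 and a subtractor that subtracts 1. The Java sketch below is a simplified, hypothetical model of that bookkeeping. CountModel, update() and count() are invented names, and the model is not Kafka Streams code: it ignores partitioning, repartition topics and state stores. It assumes, purely for illustration, that the old value delivered for one update maps to a different group than the value that was originally counted. It then shows that this mismatch alone is enough to push a group's count negative.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of the adder/subtractor bookkeeping behind
 * KGroupedTable.count(). Not Kafka Streams code; names are illustrative only.
 */
public class CountModel {

    // Running count per grouping key, standing in for the count() state store.
    private final Map<String, Long> counts = new HashMap<>();

    /**
     * Applies one upstream update: the subtractor runs first for the old
     * value's group, then the adder runs for the new value's group.
     * A null oldGroup means an insert; a null newGroup means a delete.
     */
    public void update(String oldGroup, String newGroup) {
        if (oldGroup != null) {
            counts.merge(oldGroup, -1L, Long::sum); // subtractor: -1
        }
        if (newGroup != null) {
            counts.merge(newGroup, 1L, Long::sum);  // adder: +1
        }
    }

    /** Current count for a group; groups never seen report 0. */
    public long count(String group) {
        return counts.getOrDefault(group, 0L);
    }

    public static void main(String[] args) {
        CountModel model = new CountModel();

        // Two source rows are inserted: r1 lands in group "A", r2 in group "C".
        model.update(null, "A");
        model.update(null, "C");

        // r1 moves from "A" to "B", but (the assumed mismatch) its old value
        // is seen downstream as belonging to "C" instead of "A".
        model.update("C", "B");

        // r2 is deleted; its old value correctly maps to "C".
        model.update("C", null);

        System.out.println("A=" + model.count("A") + " B=" + model.count("B")
                + " C=" + model.count("C"));
    }
}
```

Running main prints A=1 B=1 C=-1: group C receives two decrements but only one increment, while group A keeps a stale count of 1. Repeated mismatches of this kind would keep accumulating. Whether anything like this is happening in the application above is not established in this thread.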
