kafka-users mailing list archives

From Mahendra Kariya <mahendra.kar...@go-jek.com>
Subject Re: Debugging Kafka Streams Windowing
Date Thu, 04 May 2017 00:47:39 GMT
Hi Matthias,

Sure, we will look into this. In the meantime, we have run into another
issue. We have started getting this error rather frequently, and the
Streams app is unable to recover from it.

07:44:08.493 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 for group group-2.
07:44:08.494 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 for group group-2.
07:44:08.594 [StreamThread-10] INFO o.a.k.c.c.i.AbstractCoordinator - Marking the coordinator broker-05:6667 dead for group group-2
07:44:08.694 [StreamThread-9] INFO o.a.k.c.c.i.AbstractCoordinator - Discovered coordinator broker-05:6667 for group group-2.


On Thu, May 4, 2017 at 4:35 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> I would recommend double-checking the following:
>
>  - can you confirm that the filter does not remove all data for those
> time periods?
>  - I would also check input for your AggregatorFunction() -- does it
> receive everything?
>  - same for .mapValues()
>
> This would help to pinpoint in which part of the program the data gets
> lost.
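Matthias's checklist amounts to counting records per window at each stage of the topology. A minimal plain-Java sketch of that idea follows; it is not the Kafka Streams API, and `StageCounter`, `counting(...)`, and the toy records are hypothetical. In a real topology, the same tallying could live inside the predicate passed to `filter()` (and likewise inside the aggregator or value mapper), assuming the event timestamp can be read from the key or value there.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;

// Hypothetical debugging helper, not part of the Kafka Streams API: counts, per
// window start, how many records a stage saw and how many it let through.
class StageCounter {
    // Assumed 1-minute tumbling windows, matching TimeWindows.of(60000) in the thread.
    static final long WINDOW_MS = 60_000L;

    final Map<Long, Long> seen = new TreeMap<>();
    final Map<Long, Long> passed = new TreeMap<>();

    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Wraps a predicate so that every call is tallied against the record's window.
    <T> Predicate<T> counting(Predicate<T> inner, ToLongFunction<T> timestampOf) {
        return item -> {
            long window = windowStart(timestampOf.applyAsLong(item));
            seen.merge(window, 1L, Long::sum);
            boolean keep = inner.test(item);
            if (keep) {
                passed.merge(window, 1L, Long::sum);
            }
            return keep;
        };
    }

    // Windows that received records but where the predicate dropped all of them.
    Map<Long, Long> fullyFilteredWindows() {
        Map<Long, Long> result = new TreeMap<>();
        seen.forEach((window, count) -> {
            if (!passed.containsKey(window)) {
                result.put(window, count);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        StageCounter counter = new StageCounter();
        // Toy records as {timestampMs, value}; the filter keeps only positive values.
        long[][] records = {{0, 5}, {10_000, -1}, {60_000, -2}, {70_000, -3}, {120_000, 7}};
        Predicate<long[]> filter = counter.counting(r -> r[1] > 0, r -> r[0]);
        for (long[] r : records) {
            filter.test(r);
        }
        System.out.println("seen per window:   " + counter.seen);
        System.out.println("passed per window: " + counter.passed);
        System.out.println("fully filtered:    " + counter.fullyFilteredWindows());
    }
}
```

A window that shows up in `fullyFilteredWindows()` is one where the filter, not a later stage, removed all the data.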
>
>
> -Matthias
>
>
> On 5/2/17 11:09 PM, Mahendra Kariya wrote:
> > Hi Garrett,
> >
> > Thanks for these insights. But we are not consuming old data. We want the
> > Streams app to run in near real time. And that is how it is actually
> > running. The lag never increases beyond a certain limit. So I don't think
> > that's an issue.
> >
> > The configs you are mentioning are set to whatever Kafka offers by
> > default, so I guess that should be fine.
> >
> >
> >
> > On Tue, May 2, 2017 at 7:52 PM, Garrett Barton <garrett.barton@gmail.com>
> > wrote:
> >
> >> Mahendra,
> >>
> >> One possible thing I have seen that exhibits the same behavior of missing
> >> windows of data is the retention policy configured on the topics (both
> >> internal topics and your own). I was loading data that was fairly old
> >> (weeks) and using event time as the record timestamp (via a custom
> >> timestamp extractor), and the log cleanup was deleting segments nearly
> >> right after they were written. In my case the default cleanup ran every
> >> 5 minutes and the default retention was 7 days, so every 5 minutes I lost
> >> data. In my logs I saw a ton of warnings about 'offset not found' and
> >> Kafka skipping ahead to whatever the next available offset was. The end
> >> result was gaps all over my data. I don't have a good fix yet; I set the
> >> retention to something massive, which I think is causing me other
> >> problems.
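Garrett's scenario follows from how time-based retention works. Since Kafka 0.10.1 (KIP-33), a log segment becomes eligible for deletion once its largest record timestamp is older than the topic's retention (broker default `log.retention.hours=168`, i.e. 7 days), and the broker runs this check every `log.retention.check.interval.ms` (default 300000 ms, i.e. 5 minutes). When a custom extractor assigns weeks-old event times, the records Streams writes carry those old timestamps, so their segments qualify at the next check. The sketch below is a simplified model of that arithmetic only; `RetentionCheck` and `eligibleForDeletion` are hypothetical names, and the real broker logic is more involved.

```java
import java.time.Duration;

// Simplified model (not broker code): a segment qualifies for time-based deletion
// once its largest record timestamp is older than the retention period.
class RetentionCheck {
    // Broker default log.retention.hours=168 (7 days).
    static final long DEFAULT_RETENTION_MS = Duration.ofHours(168).toMillis();
    // Broker default log.retention.check.interval.ms=300000 (5 minutes).
    static final long DEFAULT_CHECK_INTERVAL_MS = Duration.ofMinutes(5).toMillis();

    static boolean eligibleForDeletion(long largestTimestampMs, long nowMs, long retentionMs) {
        return nowMs - largestTimestampMs > retentionMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // Event-time timestamps assigned by a custom extractor: data that is weeks old.
        long threeWeeksAgo = now - Duration.ofDays(21).toMillis();
        long oneHourAgo = now - Duration.ofHours(1).toMillis();

        System.out.println("3-week-old segment deletable: "
                + eligibleForDeletion(threeWeeksAgo, now, DEFAULT_RETENTION_MS));
        System.out.println("1-hour-old segment deletable: "
                + eligibleForDeletion(oneHourAgo, now, DEFAULT_RETENTION_MS));
        System.out.println("Checked again within (ms): " + DEFAULT_CHECK_INTERVAL_MS);
    }
}
```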
> >>
> >> Maybe that helps?
> >>
> >> On Tue, May 2, 2017 at 6:27 AM, Mahendra Kariya <
> >> mahendra.kariya@go-jek.com>
> >> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> What we did was read the data from the sink topic and print it to the
> >>> console. Here's the raw data from that topic (the counts are randomized).
> >>> As we can see, data is certainly missing for some time windows. For
> >>> instance, after 1493693760, the next timestamp for which data is present
> >>> is 1493694300. That's around 9 minutes of missing data.
> >>>
> >>> And this is just one instance. There are many such instances in this
> >>> file.
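The gap described above can be checked mechanically. Assuming tumbling 60-second windows (as in the `TimeWindows.of(60000)` topology quoted later in this thread) and window starts in epoch seconds, consecutive starts 1493693760 and 1493694300 are 540 seconds apart, so the 8 windows starting at 1493693820 through 1493694240 produced no output. A small sketch; `WindowGapFinder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Finds gaps in a sorted array of window-start timestamps (epoch seconds),
// assuming tumbling 60-second windows.
class WindowGapFinder {
    static final long WINDOW_SECONDS = 60L;

    // Each gap is {lastPresentWindow, nextPresentWindow, missingWindowCount}.
    static List<long[]> findGaps(long[] sortedWindowStarts) {
        List<long[]> gaps = new ArrayList<>();
        for (int i = 1; i < sortedWindowStarts.length; i++) {
            long prev = sortedWindowStarts[i - 1];
            long next = sortedWindowStarts[i];
            long missing = (next - prev) / WINDOW_SECONDS - 1;
            if (missing > 0) {
                gaps.add(new long[] {prev, next, missing});
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // The two consecutive window starts mentioned in the thread.
        long[] windows = {1493693760L, 1493694300L};
        for (long[] gap : findGaps(windows)) {
            System.out.printf("gap after %d, next window %d, %d windows missing%n",
                    gap[0], gap[1], gap[2]);
        }
    }
}
```

Run over the whole exported file, this lists every such gap instead of relying on spotting them by eye.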
> >>>
> >>>
> >>>
> >>> On Sun, Apr 30, 2017 at 11:23 AM, Mahendra Kariya <
> >>> mahendra.kariya@go-jek.com> wrote:
> >>>
> >>>> Thanks for the update, Matthias! And sorry for the delayed response.
> >>>>
> >>>> The reason we use .aggregate() is that we want to count the number of
> >>>> unique values of a particular field in the message. So we just add
> >>>> that field's value to a HashSet and then take the size of the
> >>>> HashSet.
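The HashSet approach can be modelled in memory to show why `.aggregate()` is used rather than `.count()`: `count()` counts records, while the set size counts distinct values. This sketch is plain Java for a single key, not the Kafka Streams API; `DistinctPerWindow` and `Event` are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// In-memory model of aggregate(HashSet::new, ...) followed by mapValues(HashSet::size),
// for one key and 60-second tumbling windows. Plain Java, not Kafka Streams code.
class DistinctPerWindow {
    static final long WINDOW_MS = 60_000L;

    // Hypothetical record shape: an event-time timestamp plus the field being counted.
    static final class Event {
        final long timestampMs;
        final String field;

        Event(long timestampMs, String field) {
            this.timestampMs = timestampMs;
            this.field = field;
        }
    }

    static Map<Long, Integer> distinctCounts(Iterable<Event> events) {
        Map<Long, Set<String>> sets = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestampMs - (e.timestampMs % WINDOW_MS);
            // Initializer (new HashSet) and aggregator (add the field value) in one step.
            sets.computeIfAbsent(windowStart, w -> new HashSet<>()).add(e.field);
        }
        Map<Long, Integer> counts = new TreeMap<>();
        // Equivalent of mapValues(HashSet::size).
        sets.forEach((window, set) -> counts.put(window, set.size()));
        return counts;
    }

    public static void main(String[] args) {
        Map<Long, Integer> counts = distinctCounts(Arrays.asList(
                new Event(0, "a"), new Event(1_000, "a"), new Event(2_000, "b"),
                new Event(61_000, "a")));
        // Window 0 saw three records but only two distinct values; count() would report 3.
        System.out.println(counts); // {0=2, 60000=1}
    }
}
```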
> >>>>
> >>>> On our side, we are also investigating, and it looks like there might
> >>>> be a bug somewhere in our codebase. If that's the case, then it's quite
> >>>> possible that there is no bug in Kafka Streams, except for the metric
> >>>> one.
> >>>>
> >>>> We will get back to you after confirming.
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> On Sun, Apr 30, 2017 at 10:39 AM, Matthias J. Sax <
> >>>> matthias@confluent.io> wrote:
> >>>>
> >>>>> Just a follow-up: we identified a bug in the "skipped records" metric.
> >>>>> The reported value is not correct.
> >>>>>
> >>>>>
> >>>>> On 4/28/17 9:12 PM, Matthias J. Sax wrote:
> >>>>>> Ok. That makes sense.
> >>>>>>
> >>>>>> Question: why do you use .aggregate() instead of .count()?
> >>>>>>
> >>>>>> Also, can you share the code of your AggregatorFunction()? Did you
> >>>>>> change any default settings in StreamsConfig?
> >>>>>>
> >>>>>> I still have no idea what could be going wrong. Maybe you can run with
> >>>>>> log level TRACE? Maybe we can get some insight from those logs.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>> On 4/27/17 11:41 PM, Mahendra Kariya wrote:
> >>>>>>> Oh, good point!
> >>>>>>>
> >>>>>>> The reason there is only one row per time window is that the file
> >>>>>>> only contains the latest value for each window. What we did was dump
> >>>>>>> the data present in the sink topic into a DB using an upsert query.
> >>>>>>> The primary key of the table was the time window. The file that I
> >>>>>>> attached is actually the data present in the DB. And we know that
> >>>>>>> there is no bug in our DB dump code because we have been using it in
> >>>>>>> production for a long time without any issues.
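A minimal sketch of why an upsert keyed on the time window leaves one row per window: each update for a window overwrites the previous row, so only the last value survives. A `TreeMap` stands in for the DB table, and `UpsertByWindow` and the update sequence are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Models the DB dump: each Streams update {windowStartMs, value} is upserted
// with the window as primary key, so later updates overwrite earlier ones.
class UpsertByWindow {
    static Map<Long, Long> upsertAll(List<long[]> updates) {
        Map<Long, Long> table = new TreeMap<>();
        for (long[] update : updates) {
            table.put(update[0], update[1]); // insert, or overwrite the row for this window
        }
        return table;
    }

    public static void main(String[] args) {
        // Hypothetical update sequence: three growing results for window 0, one for window 60000.
        List<long[]> updates = Arrays.asList(
                new long[] {0, 1}, new long[] {0, 2}, new long[] {0, 3}, new long[] {60_000, 1});
        System.out.println(upsertAll(updates)); // {0=3, 60000=1}
    }
}
```

The flip side is that the DB view cannot distinguish "this window got no updates" from "this window's updates were lost", so the raw sink topic is the better place to look for gaps.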
> >>>>>>>
> >>>>>>> The reason the count is zero for some time windows is that I
> >>>>>>> subtracted a random number from the actual values and rounded them
> >>>>>>> off to zero, for privacy reasons. The actual data doesn't have any
> >>>>>>> zero values. I should have mentioned this earlier. My bad!
> >>>>>>>
> >>>>>>> The stream topology code looks something like this.
> >>>>>>>
> >>>>>>> stream
> >>>>>>>     .filter()
> >>>>>>>     .map((key, value) -> new KeyValue<>(transform(key), value))
> >>>>>>>     .groupByKey()
> >>>>>>>     // collect the distinct field values per 1-minute window, kept for 1 hour
> >>>>>>>     .aggregate(HashSet::new, new AggregatorFunction(),
> >>>>>>>         TimeWindows.of(60000).until(3600000))
> >>>>>>>     // number of distinct values per window
> >>>>>>>     .mapValues(HashSet::size)
> >>>>>>>     .toStream()
> >>>>>>>     .map((key, value) -> convertToProtobufObject(key, value))
> >>>>>>>     .to()
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Fri, Apr 28, 2017 at 1:13 PM, Matthias J. Sax <
> >>>>>>> matthias@confluent.io> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the details (sorry, I forgot that you had already shared
> >>>>>>>> the output).
> >>>>>>>>
> >>>>>>>> Might be a dumb question, but what is the count for the missing
> >>>>>>>> windows in your second implementation?
> >>>>>>>>
> >>>>>>>> If there is no data for a window, Streams should not emit a record
> >>>>>>>> with count zero for it; it should emit nothing at all.
> >>>>>>>>
> >>>>>>>> Thus, looking at your output, I am wondering how it could contain
> >>>>>>>> lines like:
> >>>>>>>>
> >>>>>>>>> 2017-04-27T04:53:00 0
> >>>>>>>>
> >>>>>>>> I am also wondering why your output only contains a single value per
> >>>>>>>> window. As Streams emits multiple updates per window while the count
> >>>>>>>> is increasing, you should actually see multiple records per window.
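Matthias's two points can be illustrated with a simplified model of a windowed count in which every input record produces one downstream update. Windows that receive no records never appear in the output (so there is no zero-count record), and a busy window appears once per update. This is plain Java under that assumption, and `WindowedCountChangelog` is a hypothetical name; in Kafka Streams, record caching (`cache.max.bytes.buffering`) and the commit interval can merge consecutive updates, so fewer records per window may actually be emitted.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of a windowed count without caching: every input record
// produces one downstream update carrying its window's new count.
class WindowedCountChangelog {
    static final long WINDOW_MS = 60_000L;

    // Returns the emitted updates as {windowStartMs, count}, in emission order.
    static List<long[]> process(long[] timestampsMs) {
        Map<Long, Long> counts = new HashMap<>();
        List<long[]> emitted = new ArrayList<>();
        for (long ts : timestampsMs) {
            long window = ts - (ts % WINDOW_MS);
            long updated = counts.merge(window, 1L, Long::sum);
            emitted.add(new long[] {window, updated});
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Three records in window 0, none in window 60000, one in window 120000.
        for (long[] update : process(new long[] {0, 5_000, 9_000, 130_000})) {
            System.out.println(update[0] + " -> " + update[1]);
        }
    }
}
```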
> >>>>>>>>
> >>>>>>>> Your code is like this:
> >>>>>>>>
> >>>>>>>> stream.filter().groupByKey().count(TimeWindows.of(60000)).to();
> >>>>>>>>
> >>>>>>>> Or do you have something more complex?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 4/27/17 9:16 PM, Mahendra Kariya wrote:
> >>>>>>>>>> Can you somehow verify your output?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Do you mean the Kafka Streams output? In the Kafka Streams output, we
> >>>>>>>>> do see some missing values. I have attached the Kafka Streams output
> >>>>>>>>> (for a few hours) to the very first email of this thread for
> >>>>>>>>> reference.
> >>>>>>>>>
> >>>>>>>>> Let me also summarise what we have done so far.
> >>>>>>>>>
> >>>>>>>>> We took a dump of the raw data present in the source topic. We wrote
> >>>>>>>>> a script to read this data and do the exact same aggregations that we
> >>>>>>>>> do using Kafka Streams. Then we compared the output from Kafka
> >>>>>>>>> Streams with the output of our script.
> >>>>>>>>>
> >>>>>>>>> The difference we observed between the two outputs is that a few
> >>>>>>>>> rows (corresponding to some time windows) were missing from the
> >>>>>>>>> Streams output. For the time windows for which data was present, the
> >>>>>>>>> aggregated numbers matched exactly.
> >>>>>>>>>
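The comparison described above can be expressed as two checks over per-window results: windows present in the script's output but absent from the Streams output, and windows present in both with differing values. A sketch with hypothetical names (`OutputComparison`), assuming both outputs have already been loaded into maps keyed by window start:

```java
import java.util.Map;
import java.util.TreeMap;

// Compares per-window results from a reference script with the Streams output.
class OutputComparison {
    // Windows the script produced but Streams did not, with the script's value.
    static Map<Long, Long> missingInStreams(Map<Long, Long> script, Map<Long, Long> streams) {
        Map<Long, Long> missing = new TreeMap<>();
        script.forEach((window, value) -> {
            if (!streams.containsKey(window)) {
                missing.put(window, value);
            }
        });
        return missing;
    }

    // Windows present in both outputs whose values differ, as {scriptValue, streamsValue}.
    static Map<Long, long[]> mismatched(Map<Long, Long> script, Map<Long, Long> streams) {
        Map<Long, long[]> diff = new TreeMap<>();
        script.forEach((window, value) -> {
            Long other = streams.get(window);
            if (other != null && !other.equals(value)) {
                diff.put(window, new long[] {value, other});
            }
        });
        return diff;
    }

    public static void main(String[] args) {
        // Hypothetical results keyed by window start (epoch seconds).
        Map<Long, Long> script = new TreeMap<>();
        script.put(1493693760L, 12L);
        script.put(1493693820L, 9L);
        Map<Long, Long> streams = new TreeMap<>();
        streams.put(1493693760L, 12L);
        System.out.println("missing in Streams: " + missingInStreams(script, streams).keySet());
        System.out.println("mismatched windows: " + mismatched(script, streams).keySet());
    }
}
```

The pattern reported in the thread corresponds to a non-empty `missingInStreams` result together with an empty `mismatched` result.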
> >>>>>>>>> This means that either all the records for a particular time window
> >>>>>>>>> are being skipped, or none are. That seems highly unlikely. Maybe
> >>>>>>>>> there is a bug somewhere in the RocksDB state stores? Just
> >>>>>>>>> speculation, I'm not sure. And there could even be a bug in the
> >>>>>>>>> reported metric.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>>>
> >>>
> >>
> >
>
>
