kafka-users mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date Sat, 03 Jun 2017 18:57:37 GMT
Could not agree more!

But then I think the easiest is still: print exception and die.
From there on, the easiest way forward is: fix, redeploy, start => done.

All the other ways to recover a pipeline that was processing only partially 
all the time and suddenly went over an "I can't take it anymore" threshold 
are not straightforward IMO.

How do you find the offset where it became too bad, when it is not the latest 
committed one? How do you reset to there, with some reasonable state still in 
your RocksDB stores?

One could do the following: the continuing handler would count failures and 
would terminate after a certain threshold has been passed (per task). Then 
one can use offset commit / flush intervals to make a reasonable assumption 
about how much is slipping by, plus you get an easy recovery when it gets too 
bad, plus you could also account for "in processing" records.

Setting this threshold to zero would cover all cases with one implementation. 
It is still beneficial to have it pluggable.

Again, CRC errors are the only bad pills we have seen in production so far.

Best Jan


On 02.06.2017 17:37, Jay Kreps wrote:
> Jan, I agree with you philosophically. I think one practical challenge has
> to do with data formats. Many people use untyped events, so there is simply
> no guarantee on the form of the input. E.g. many companies use JSON without
> any kind of schema, so it becomes very hard to assert anything about the
> input, which makes these programs very fragile to the "one accidental
> message publication that creates an unsolvable problem".
>
> For that reason I do wonder if limiting to just serialization actually gets
> you a useful solution. For JSON it will help with the problem of
> non-parseable JSON, but it sounds like it won't help in the case where the
> JSON is well-formed but does not have any of the fields you expect and
> depend on for your processing. I expect the reason for limiting the scope
> is that it is pretty hard to reason about correctness for anything that
> stops in the middle of processing an operator DAG?
>
> -Jay
>
> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
> wrote:
>
>> IMHO you're doing it wrong then. Plus, building too much support into the
>> Kafka ecosystem is very counterproductive in fostering a happy user base.
>>
>>
>>
>> On 02.06.2017 13:15, Damian Guy wrote:
>>
>>> Jan, you have the choice to fail fast if you want. This is about giving
>>> people options, and there are times when you don't want to fail fast.
>>>
>>>
>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <Jan.Filipiak@trivago.com>
>>> wrote:
>>>
>>>> Hi
>>>> 1.
>>>> That greatly complicates monitoring. Fail fast gives you this: when you
>>>> monitor only the lag of all your apps, you are completely covered. With
>>>> that sort of new application, monitoring is much more complicated, as you
>>>> now need to monitor the fail % of some special apps as well. In my
>>>> opinion that is a huge downside already.
>>>>
>>>> 2.
>>>> Using a schema registry with Avro and the like, it might not even be the
>>>> record that is broken; it might just be your app being unable to fetch a
>>>> schema it now needs. Maybe you got partitioned away from that registry.
>>>>
>>>> 3. When you get alerted because of a too-high fail percentage, what are
>>>> the steps you are going to take? Shut it down to buy time. Fix the
>>>> problem. Spend way too much time finding a good offset to reprocess from.
>>>> Your time windows are in bad shape anyway, and you have pretty much lost.
>>>> This routine is nonsense.
>>>>
>>>> Dead letter queues would be the worst possible addition to the Kafka
>>>> toolkit that I can think of. It just doesn't fit an architecture in which
>>>> clients falling behind is a valid option.
>>>>
>>>> Further, I already mentioned that the only bad pill I've seen so far is
>>>> CRC errors. Any plans for those?
>>>>
>>>> Best Jan
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>
>>>>> I agree with what Matthias has said w.r.t. failing fast. There are plenty
>>>>> of times when you don't want to fail-fast and must attempt to make
>>>>> progress. The dead-letter queue is exactly for these circumstances. Of
>>>>> course if every record is failing, then you probably do want to give up.
>>>>>
>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matthias@confluent.io> wrote:
>>>>>
>>>>>> First a meta comment. KIP discussion should take place on the dev list
>>>>>> -- if user list is cc'ed please make sure to reply to both lists.
>>>>>> Thanks.
>>>>>>
>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to
>>>>>> focus on deserialization exceptions for now.
>>>>>>
>>>>>> With regard to corrupted state stores, would it make sense to fail a
>>>>>> task and wipe out the store to repair it via recreation from the
>>>>>> changelog? That's of course quite an advanced pattern, but I want to
>>>>>> bring it up to design the first step in a way such that we can get
>>>>>> there (if we think it's a reasonable idea).
>>>>>>
>>>>>> I also want to comment about fail fast vs. making progress. I think
>>>>>> that fail-fast need not always be the best option. The scenario I have
>>>>>> in mind is like this: you have a bunch of producers that feed the
>>>>>> Streams input topic. Most producers work fine, but maybe one producer
>>>>>> misbehaves and the data it writes is corrupted. You might not even be
>>>>>> able to recover this lost data at any point -- thus, there is no reason
>>>>>> to stop processing; you just skip over those records. Of course, you
>>>>>> need to fix the root cause, and thus you need to alert (either via logs
>>>>>> or the exception handler directly) and you need to start investigating
>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>
>>>>>> Here the dead letter queue comes into play. From my understanding, the
>>>>>> purpose of this feature is solely to enable post-mortem debugging. I
>>>>>> don't think those records would be fed back at any point in time (so I
>>>>>> don't see any ordering issue -- a skipped record, in this regard, is
>>>>>> just "fully processed"). Thus, the dead letter queue should actually
>>>>>> encode the original record's metadata (topic, partition, offset, etc.)
>>>>>> to enable such debugging. I guess this might also be possible if you
>>>>>> just log the bad records, but it would be harder to access (you first
>>>>>> must find the Streams instance that wrote the log and extract the
>>>>>> information from there). Reading it from a topic is much simpler.
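
As an illustration of the kind of dead-letter record described here, a minimal 
sketch of a publisher that forwards the raw bytes of a failed record to a 
quarantine topic and attaches the original topic/partition/offset/timestamp 
for later debugging. The topic name and header keys are made up, and the use 
of record headers assumes Kafka 0.11+ clients and brokers; on older versions 
the metadata would have to be wrapped into the value instead. This is not part 
of the KIP, only a sketch of the pattern.

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;

    public class DeadLetterPublisher implements AutoCloseable {

        private final Producer<byte[], byte[]> producer;
        private final String deadLetterTopic;   // e.g. "my-app-quarantine" (illustrative)

        public DeadLetterPublisher(final Properties producerConfig, final String deadLetterTopic) {
            producerConfig.put("key.serializer", ByteArraySerializer.class.getName());
            producerConfig.put("value.serializer", ByteArraySerializer.class.getName());
            this.producer = new KafkaProducer<>(producerConfig);
            this.deadLetterTopic = deadLetterTopic;
        }

        // Forward the record bytes unmodified and carry the source coordinates in
        // headers, so the bad record can be located and inspected later.
        public void publish(final ConsumerRecord<byte[], byte[]> failed, final Exception cause) {
            final ProducerRecord<byte[], byte[]> out =
                    new ProducerRecord<>(deadLetterTopic, failed.key(), failed.value());
            out.headers().add("source.topic", failed.topic().getBytes(StandardCharsets.UTF_8));
            out.headers().add("source.partition",
                    Integer.toString(failed.partition()).getBytes(StandardCharsets.UTF_8));
            out.headers().add("source.offset",
                    Long.toString(failed.offset()).getBytes(StandardCharsets.UTF_8));
            out.headers().add("source.timestamp",
                    Long.toString(failed.timestamp()).getBytes(StandardCharsets.UTF_8));
            out.headers().add("error.class", cause.getClass().getName().getBytes(StandardCharsets.UTF_8));
            producer.send(out);
        }

        @Override
        public void close() {
            producer.close();
        }
    }

A consumer of the quarantine topic can then read the headers to trace each bad 
record back to its source partition and offset.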
>>>>>>
>>>>>> I also want to mention the following. Assume you have such a topic with
>>>>>> some bad records and some good records. If we always fail fast, it's
>>>>>> going to be super hard to process the good data. You would need to
>>>>>> write an extra app that copies the data into a new topic, filtering out
>>>>>> the bad records (or apply the map() workaround within the stream). So I
>>>>>> don't think it is necessarily true that failing fast is the best option
>>>>>> in production.
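
For reference, the "map() workaround" mentioned above usually looks something 
like the following sketch: read the input as raw bytes so deserialization 
cannot kill the task, and drop records that fail to parse inside the topology. 
The topic names and the parse() placeholder are made up, and the DSL calls 
(StreamsBuilder, Consumed, Produced) follow the later 1.0 API rather than the 
one available at the time of this discussion.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    import java.nio.charset.StandardCharsets;
    import java.util.Collections;

    public class SkipBadRecordsTopology {

        public static StreamsBuilder build() {
            final StreamsBuilder builder = new StreamsBuilder();

            // Read the input as raw bytes so a malformed message cannot fail deserialization.
            final KStream<byte[], byte[]> raw =
                    builder.stream("json-input", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()));

            // Deserialize inside the topology; drop records that fail to parse.
            final KStream<byte[], String> good = raw.flatMapValues(value -> {
                try {
                    return Collections.singletonList(parse(value)); // happy path: one output record
                } catch (final RuntimeException e) {
                    return Collections.<String>emptyList();         // poison pill: skip it
                }
            });

            good.to("json-output", Produced.with(Serdes.ByteArray(), Serdes.String()));
            return builder;
        }

        // Placeholder for the application's own deserialization / validation logic.
        private static String parse(final byte[] value) {
            final String text = new String(value, StandardCharsets.UTF_8);
            if (!text.trim().startsWith("{")) {
                throw new IllegalArgumentException("not a JSON object");
            }
            return text;
        }
    }

The downside, as discussed in this thread, is that the skip/continue policy 
then lives in application code instead of a pluggable handler.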
>>>>>>
>>>>>> Or do you think there are scenarios for which you can recover the
>>>>>> corrupted records successfully? And even if this is possible, might it
>>>>>> be a case for reprocessing instead of failing the whole application?
>>>>>> Also, if you think you can "repair" a corrupted record, should the
>>>>>> handler be allowed to return a "fixed" record? This would solve the
>>>>>> ordering problem.
>>>>>>
>>>>>>
>>>>>>
>>>>>> -Matthias
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>
>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>
>>>>>>> - I think it would help to improve the KIP by adding an end-to-end
>>>>>>> code example that demonstrates, with the DSL and with the Processor
>>>>>>> API, how the user would write a simple application that would then be
>>>>>>> augmented with the proposed KIP changes to handle exceptions.  It
>>>>>>> should also become much clearer then that e.g. the KIP would lead to
>>>>>>> different code paths for the happy case and any failure scenarios.
>>>>>>>
>>>>>>> - Do we have sufficient information available to make informed
>>>>>>> decisions on what to do next?  For example, do we know in which part
>>>>>>> of the topology the record failed? `ConsumerRecord` gives us access to
>>>>>>> topic, partition, offset, timestamp, etc., but what about
>>>>>>> topology-related information (e.g. what is the associated state store,
>>>>>>> if any)?
>>>>>>>
>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about
>>>>>>> the bigger picture: This KIP would give users the option to send
>>>>>>> corrupted records to a dead letter queue (quarantine topic).  But what
>>>>>>> pattern would we advocate to process such a dead letter queue then,
>>>>>>> e.g. how to allow for retries with backoff ("If the first record in
>>>>>>> the dead letter queue fails again, then try the second record for the
>>>>>>> time being and go back to the first record at a later time")?  Jay and
>>>>>>> Jan already alluded to ordering problems that will be caused by dead
>>>>>>> letter queues. As I said, retries might be out of scope but perhaps
>>>>>>> the implications should be considered if possible?
>>>>>>>
>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>> conversation that this KIP's scope will be limited to exceptions in
>>>>>>> the category of poison pills / deserialization errors.  But since Jay
>>>>>>> brought up user code errors again, I decided to include it again.
>>>>>>>
>>>>>>> ----------------------------snip----------------------------
>>>>>>> A meta comment: I am not sure about this split between the code for
>>>>>>> the happy path (e.g. map/filter/... in the DSL) and the failure path
>>>>>>> (using exception handlers).  In Scala, for example, we can do:
>>>>>>>        scala> val computation = scala.util.Try(1 / 0)
>>>>>>>        computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>
>>>>>>>        scala> computation.getOrElse(42)
>>>>>>>        res2: Int = 42
>>>>>>>
>>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>>> `KStream#branch()`:
>>>>>>>
>>>>>>>        computation match {
>>>>>>>          case scala.util.Success(x) => x * 5
>>>>>>>          case scala.util.Failure(_) => 42
>>>>>>>        }
>>>>>>>
>>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but
>>>>>>> that's not the point I'm trying to make here.)
>>>>>>>
>>>>>>> Hence the question I'm raising here is: Do we want to have an API
>>>>>>> where you code "the happy path", and then have a different code path
>>>>>>> for failures (using exceptions and handlers); or should we treat both
>>>>>>> Success and Failure in the same way?
>>>>>>>
>>>>>>> I think the failure/exception handling approach (as proposed in this
>>>>>>> KIP) is well-suited for errors in the category of deserialization
>>>>>>> problems aka poison pills, partly because the (default) serdes are
>>>>>>> defined through configuration (explicit serdes however are defined
>>>>>>> through API calls).
>>>>>>>
>>>>>>> However, I'm not yet convinced that the failure/exception handling
>>>>>>> approach is the best idea for user code exceptions, e.g. if you fail
>>>>>>> to guard against NPE in your lambdas or divide a number by zero.
>>>>>>>
>>>>>>>        scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>        stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>
>>>>>>>        // Here: Fallback to a sane default when encountering failed records
>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>        res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>
>>>>>>>        // Here: Skip over failed records
>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>        res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>
>>>>>>> The above is more natural to me than using error handlers to define
>>>>>>> how to deal with failed records (here, the value `3` causes an
>>>>>>> arithmetic exception).  Again, it might help the KIP if we added an
>>>>>>> end-to-end example for such user code errors.
>>>>>>> ----------------------------snip----------------------------
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>> wrote:
>>>>>>>> Hi Jay,
>>>>>>>>
>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>> ConsumerRecord deserialisation.
>>>>>>>>
>>>>>>>> I am working with database changelogs only. I would really not like
>>>>>>>> to see a dead letter queue or something similar. How am I expected to
>>>>>>>> get these back in order? Just grind to a halt and call me on the
>>>>>>>> weekend; I'll fix it then in a few minutes rather than spend 2 weeks
>>>>>>>> ordering dead letters (where reprocessing might even be the faster
>>>>>>>> fix).
>>>>>>>> Best Jan
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>
>>>>>>>>>        - I think we should hold off on retries unless we have worked
>>>>>>>>>        out the full usage pattern, people can always implement their
>>>>>>>>>        own. I think the idea is that you send the message to some
>>>>>>>>>        kind of dead letter queue and then replay these later. This
>>>>>>>>>        obviously destroys all semantic guarantees we are working
>>>>>>>>>        hard to provide right now, which may be okay.
>>>>>>>>>

