kafka-users mailing list archives

From Jan Filipiak <Jan.Filip...@trivago.com>
Subject Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date Mon, 05 Jun 2017 17:19:42 GMT

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:
> Hi there,
> Sorry for the late reply, I was out this past week. Looks like good progress was made
> with the discussions either way. Let me recap a couple of points I saw into one big reply:
> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in Kafka,
> before Kafka Streams gets a chance to inspect anything, I'd like to hear the opinion of more
> Kafka folks like Ismael or Jason on this one. Currently the documentation is not great with
> what to do once a CRC check has failed. From looking at the code, it looks like the client
> gets a KafkaException (bubbled up from the fetcher) and currently we in streams catch this
> as part of poll() and fail. It might be advantageous to treat CRC handling in a similar way
> to serialisation handling (e.g., have the option to fail/skip). Let's see what the other folks
> say. Worst-case we can do a separate KIP for that if it proved too hard to do in one go.
There is no reasonable way to "skip" a CRC error. How can you know the
length you read was anything reasonable? You might be completely lost
inside your response.
> 2. Damian has convinced me that the KIP should just be for deserialisation from the network,
> not from local state store DBs. For the latter we'll follow the current way of failing since
> the DB is likely corrupt.
> 3. Dead letter queue option. There was never any intention here to do anything super
> clever like attempt to re-inject the failed records from the dead letter queue back into the
> system. Reasoning about when that'd be useful in light of all sorts of semantic breakings
> would be hard (arguably impossible). The idea was to just have a place to have all these dead
> records to help with subsequent debugging. We could also just log a whole bunch of info for
> a poison pill record and not have a dead letter queue at all. Perhaps that's a better, simpler,
> starting point.
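For what it's worth, the "log a whole bunch of info" variant and the dead letter queue variant need the same ingredients: a poison pill only stays debuggable if it carries its original coordinates. A minimal sketch of such an entry (the class and field names here are mine, invented for illustration, not from the KIP):

```java
// Hypothetical sketch of what a dead-letter entry could carry: enough
// metadata (topic, partition, offset) to locate the poison pill in the
// source topic later, plus the raw bytes that failed to deserialize.
import java.util.Base64;

public class DeadLetterEntry {
    final String sourceTopic;
    final int partition;
    final long offset;
    final byte[] rawValue;

    DeadLetterEntry(String sourceTopic, int partition, long offset, byte[] rawValue) {
        this.sourceTopic = sourceTopic;
        this.partition = partition;
        this.offset = offset;
        this.rawValue = rawValue;
    }

    // Encode as a simple line, e.g. to log it or to produce it to a
    // quarantine topic.
    String encode() {
        return sourceTopic + "/" + partition + "@" + offset + ":"
                + Base64.getEncoder().encodeToString(rawValue);
    }
}
```

Whether that line ends up in a log file or a quarantine topic, it preserves enough context to find the original record again during subsequent debugging.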
> 4. Agree with Jay on style, a DefaultHandler with some config options. Will add options
> to KIP. Also as part of this let's remove the threshold logger since it gets complex and arguably
> the ROI is low.
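A DefaultHandler with config options might look roughly like the following; the class name, the enum, and the `deserialization.exception.response` config key are placeholders I invented for illustration, not the KIP's actual API:

```java
import java.util.Map;

// Sketch of a configurable default handler: one implementation, with the
// fail/continue decision driven by a config option rather than requiring
// users to write code. All names here are hypothetical.
public class DefaultExceptionHandler {
    public enum Response { FAIL, CONTINUE }

    private Response response = Response.FAIL; // fail-fast by default

    // Mirrors the Configurable-style pattern: read behavior from app config.
    public void configure(Map<String, ?> configs) {
        Object v = configs.get("deserialization.exception.response");
        if (v != null && v.toString().equalsIgnoreCase("continue")) {
            response = Response.CONTINUE;
        }
    }

    // Called for each record that failed to deserialize.
    public Response handle(String topic, byte[] rawValue, Exception cause) {
        System.err.println("Failed to deserialize record from " + topic + ": " + cause);
        return response;
    }
}
```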
> 5. Jay's JSON example, where serialisation passes but the JSON message doesn't have the
> expected fields, is an interesting one. It's a bit complicated to handle this in the middle
> of processing. For example, some operators in the DAG might actually find the needed JSON
> fields and make progress, but other operators, for the same record, might not find their fields
> and will throw an exception.
> At a minimum, handling this type of exception will need to involve the exactly-once (EoS)
> logic. We'd still allow the option of failing or skipping, but EoS would need to clean up
> by rolling back all the side effects from the processing so far. Matthias, how does this sound?
EoS will not help; the record might be 5 or 6 repartitions down into the
topology. I haven't followed closely, but I pray you made EoS optional! We
don't need this and we don't want this, and we will turn it off if it
comes. So I wouldn't recommend relying on it. The option to turn it off is
better than forcing it and still being unable to roll back bad pills (as
explained before).
> 6. Will add an end-to-end example as Michael suggested.
> Thanks
> Eno
>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matthias@confluent.io> wrote:
>> What I don't understand is this:
>>> From there on it's the easiest way forward: fix, redeploy, start => done
>> If you have many producers that work fine and a new "bad" producer
>> starts up and writes bad data into your input topic, your Streams app
>> dies but all your producers, including the bad one, keep writing.
>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>> from the topic? It might take some time to identify the root cause and
>> stop the bad producer. Up to this point you get good and bad data into
>> your Streams input topic. If the Streams app is not able to skip over those
>> bad records, how would you get all the good data from the topic? Not
>> saying it's not possible, but it's extra work copying the data with a
>> new non-Streams consumer-producer-app into a new topic and then feed
>> your Streams app from this new topic -- you also need to update all your
>> upstream producers to write to the new topic.
>> Thus, if you want to fail fast, you can still do this. And after you
>> detected and fixed the bad producer you might just reconfigure your app
>> to skip bad records until it reaches the good part of the data.
>> Afterwards, you could redeploy with fail-fast again.
>> Thus, for this pattern, I actually don't see any reason to stop the
>> Streams app at all. If you have a callback, and use the callback to
>> raise an alert (and maybe get the bad data into a bad record queue), it
>> will not take longer to identify and stop the "bad" producer. But for
>> this case, you have zero downtime for your Streams app.
>> This seems to be much simpler. Or do I miss anything?
>> Having said this, I agree that the "threshold based callback" might be
>> questionable. But as you argue for strict "fail-fast", I want to argue
>> that this need not always be the best pattern to apply and that the
>> overall KIP idea is super useful from my point of view.
>> -Matthias
>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>> Could not agree more!
>>> But then I think the easiest is still: print exception and die.
>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>> All the other ways to recover a pipeline that was processing partially
>>> all the time and suddenly went over an "I can't take it anymore"
>>> threshold are not straightforward IMO.
>>> How to find the offset when it became too bad, when it is not the latest
>>> committed one?
>>> How to reset there? With some reasonable stuff in your RocksDB instances?
>>> If one would do the following: the continuing handler would measure
>>> against a threshold and would terminate after a certain threshold has
>>> been passed (per task). Then one can use offset commit / flush intervals
>>> to make reasonable assumptions of how much is slipping by + you get an
>>> easy recovery when it gets too bad
>>> + you could also account for "in processing" records.
>>> Setting this threshold to zero would cover all cases with 1
>>> implementation. It is still beneficial to have it pluggable
>>> Again CRC-Errors are the only bad pills we saw in production for now.
>>> Best Jan
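Jan's threshold idea could be sketched like this (all names are hypothetical, and real task ids / metrics wiring is omitted); note that a threshold of zero degenerates to plain fail-fast, so one implementation would indeed cover both camps:

```java
// Sketch of the proposal above: a continuing handler that counts
// deserialization failures per task and fails once a threshold is
// exceeded. A threshold of 0 reproduces fail-fast behavior.
import java.util.HashMap;
import java.util.Map;

public class ThresholdExceptionHandler {
    public enum Response { FAIL, CONTINUE }

    private final int maxFailuresPerTask;
    private final Map<String, Integer> failures = new HashMap<>();

    public ThresholdExceptionHandler(int maxFailuresPerTask) {
        this.maxFailuresPerTask = maxFailuresPerTask;
    }

    public Response handle(String taskId, Exception cause) {
        int count = failures.merge(taskId, 1, Integer::sum);
        // Fail the task once more than maxFailuresPerTask records were skipped.
        return count > maxFailuresPerTask ? Response.FAIL : Response.CONTINUE;
    }
}
```

Combined with the commit interval, the counter bounds how many records can slip by before the task stops, which is the "easy recovery when it gets too bad" property Jan describes.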
>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>> Jan, I agree with you philosophically. I think one practical challenge has
>>>> to do with data formats. Many people use untyped events, so there is simply
>>>> no guarantee on the form of the input. E.g. many companies use JSON without
>>>> any kind of schema so it becomes very hard to assert anything about the
>>>> input, which makes these programs very fragile to the "one accidental
>>>> message publication that creates an unsolvable problem".
>>>> For that reason I do wonder if limiting to just serialization actually gets
>>>> you a useful solution. For JSON it will help with the problem of
>>>> non-parseable JSON, but sounds like it won't help in the case where the
>>>> JSON is well-formed but does not have any of the fields you expect and
>>>> depend on for your processing. I expect the reason for limiting the scope
>>>> is it is pretty hard to reason about correctness for anything that stops in
>>>> the middle of processing an operator DAG?
>>>> -Jay
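Independent of the KIP, one way to blunt Jay's well-formed-but-wrong JSON case is to validate required fields immediately after deserialization and divert records that lack them, before they fan out to operators deeper in the DAG. A rough sketch, with a plain Map standing in for a parsed JSON object and invented field names:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch: check required fields right after deserialization, so a record
// that is valid JSON but lacks expected fields never reaches operators
// deeper in the DAG. The field names are made up for illustration.
public class FieldGuard {
    static final List<String> REQUIRED = List.of("userId", "amount");

    // Returns the record if all required fields are present, empty otherwise;
    // in a Streams app this would back a filter()/branch() right after the
    // source, routing rejects to logging or a quarantine topic.
    static Optional<Map<String, Object>> validate(Map<String, Object> json) {
        for (String field : REQUIRED) {
            if (!json.containsKey(field)) {
                return Optional.empty();
            }
        }
        return Optional.of(json);
    }
}
```

This does not make the missing-field case a deserialization error; it just moves the check to a single, early place where a skip/fail decision is still cheap to make.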
>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>> wrote:
>>>>> IMHO you're doing it wrong then. + building too much support into the
>>>>> Kafka ecosystem is very counterproductive in fostering a happy userbase
>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>> Jan, you have a choice to Fail fast if you want. This is about giving
>>>>>> people options and there are times when you don't want to fail fast.
>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>> wrote:
>>>>>>> Hi
>>>>>>> 1.
>>>>>>> That greatly complicates monitoring. Fail fast gives you that if you
>>>>>>> monitor only the lag of all your apps, you are completely covered. With
>>>>>>> that sort of new application, monitoring is very much more complicated,
>>>>>>> as you now need to monitor the fail % of some special apps as well. In
>>>>>>> my opinion that is a huge downside already.
>>>>>>> 2.
>>>>>>> Using a schema registry like the Avro stuff, it might not even be the
>>>>>>> record that is broken; it might be just your app being unable to fetch
>>>>>>> a schema it now needs. Maybe you got partitioned away from that
>>>>>>> registry.
>>>>>>> 3. When you get alerted because of too high a fail percentage, what
>>>>>>> are the steps you gonna do?
>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much time to
>>>>>>> find a good reprocess offset.
>>>>>>> Your timewindows are in bad shape anyways, and you pretty much
>>>>>>> This routine is nonsense.
>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>> toolkit that I can think of. It just doesn't fit the architecture,
>>>>>>> where having clients falling behind is a valid option.
>>>>>>> Further, I mentioned already: the only bad pill I've seen so far is
>>>>>>> CRC errors. Any plans for those?
>>>>>>> Best Jan
>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There are
>>>>>>>> plenty of times when you don't want to fail-fast and must attempt to
>>>>>>>> progress. The dead-letter queue is exactly for these circumstances. Of
>>>>>>>> course if every record is failing, then you probably do want to give up.
>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matthias@confluent.io>
>>>>>>>> wrote:
>>>>>>>>> First a meta comment. KIP discussion should take place on the dev list
>>>>>>>>> -- if user list is cc'ed please make sure to reply to both lists.
>>>>>>>>> Thanks.
>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to
>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a
>>>>>>>>> task and wipe out the store to repair it via recreation from the
>>>>>>>>> changelog? That's of course a quite advanced pattern, but I want to
>>>>>>>>> bring it up to design the first step in a way such that we can get
>>>>>>>>> there (if we think it's a reasonable idea).
>>>>>>>>> I also want to comment about fail fast vs making progress. I think that
>>>>>>>>> fail-fast need not always be the best option. The scenario I have in
>>>>>>>>> mind is like this: you got a bunch of producers that feed the Streams
>>>>>>>>> input topic. Most producers work fine, but maybe one producer misbehaves
>>>>>>>>> and the data it writes is corrupted. You might not even be able to
>>>>>>>>> recover this lost data at any point -- thus, there is no reason to stop
>>>>>>>>> processing but you just skip over those records. Of course, you need to
>>>>>>>>> fix the root cause, and thus you need to alert (via logs or the
>>>>>>>>> exception handler directly) and you need to start investigating to find
>>>>>>>>> the bad producer, shut it down and fix it.
>>>>>>>>> Here the dead letter queue comes into place. From my understanding, the
>>>>>>>>> purpose of this feature is solely to enable subsequent debugging. I
>>>>>>>>> don't think those records would be fed back at any point in time (so I
>>>>>>>>> don't see any ordering issue -- a skipped record, with this regard, is
>>>>>>>>> just "fully processed"). Thus, the dead letter queue should actually
>>>>>>>>> encode the original record's metadata (topic, partition, offset, etc.)
>>>>>>>>> to enable such debugging. I guess this might also be possible if you
>>>>>>>>> just log the bad records, but it would be harder to access (you first
>>>>>>>>> must find the Streams instance that did write the log and extract the
>>>>>>>>> records from there). Reading it from a topic is much simpler.
>>>>>>>>> I also want to mention the following. Assume you have such a topic with
>>>>>>>>> some bad records and some good records. If we always fail-fast, it's
>>>>>>>>> going to be super hard to process the good data. You would need to
>>>>>>>>> write an extra app that copies the data into a new topic, filtering out
>>>>>>>>> the bad records (or apply the map() workaround within streams). So I
>>>>>>>>> don't think that failing fast is necessarily the best option in
>>>>>>>>> production.
>>>>>>>>> Or do you think there are scenarios for which you can recover the
>>>>>>>>> corrupted records successfully? And even if this is possible, it might
>>>>>>>>> be a case for reprocessing instead of failing the whole application.
>>>>>>>>> Also, if you think you can "repair" a corrupted record, should the
>>>>>>>>> handler allow to return a "fixed" record? This would solve the
>>>>>>>>> ordering problem.
>>>>>>>>> -Matthias
>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end code
>>>>>>>>>> example that demonstrates, with the DSL and with the Processor API, how
>>>>>>>>>> the user would write a simple application that would then be augmented
>>>>>>>>>> with the proposed KIP changes to handle exceptions.  It should become
>>>>>>>>>> much clearer then that e.g. the KIP would lead to different paths for
>>>>>>>>>> the happy case and any failure scenarios.
>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>> decisions on what to do next?  For example, do we know in which part of
>>>>>>>>>> the topology the record failed? `ConsumerRecord` gives us access to
>>>>>>>>>> topic, partition, offset, timestamp, etc., but what about
>>>>>>>>>> topology-related information (e.g. what is the associated state store,
>>>>>>>>>> if any)?
>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about
>>>>>>>>>> the bigger picture: This KIP would give users the option to send
>>>>>>>>>> corrupted records to a dead letter queue (quarantine topic).  But what
>>>>>>>>>> pattern would we advocate to process such a dead letter queue then,
>>>>>>>>>> e.g. how to allow for retries with backoff ("If the first record in the
>>>>>>>>>> dead letter queue fails again, then try the second record for the time
>>>>>>>>>> being and go back to the first record at a later time").  Jay and Jan
>>>>>>>>>> already alluded to ordering problems that will be caused by dead letter
>>>>>>>>>> queues. As I see it, retries might be out of scope but perhaps the
>>>>>>>>>> implications should be considered if possible?
>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>> conversation that this KIP's scope will be limited to exceptions in the
>>>>>>>>>> category of poison pills / deserialization errors.  But since Jay
>>>>>>>>>> brought up user code errors again, I decided to include it again.
>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>> A meta comment: I am not sure about this split between the code for
>>>>>>>>>> the happy path (e.g. map/filter/... in the DSL) from the failure path
>>>>>>>>>> (using exception handlers).  In Scala, for example, we can do:
>>>>>>>>>>
>>>>>>>>>>     scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>     computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>
>>>>>>>>>>     scala> computation.getOrElse(42)
>>>>>>>>>>     res2: Int = 42
>>>>>>>>>>
>>>>>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>
>>>>>>>>>>     computation match {
>>>>>>>>>>       case scala.util.Success(x) => x * 5
>>>>>>>>>>       case scala.util.Failure(_) => 42
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala, but
>>>>>>>>>> that's not the point I'm trying to make here.)
>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an API
>>>>>>>>>> where you code "the happy path", and then have a different code path
>>>>>>>>>> for failures (using exceptions and handlers); or should we treat both
>>>>>>>>>> Success and Failure in the same way?
>>>>>>>>>> I think the failure/exception handling approach (as proposed in this
>>>>>>>>>> KIP) is well-suited for errors in the category of deserialization
>>>>>>>>>> errors aka poison pills, partly because the (default) serdes are
>>>>>>>>>> defined through configuration (explicit serdes however are defined
>>>>>>>>>> through API calls).
>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling
>>>>>>>>>> approach is the best idea for user code exceptions, e.g. if you fail
>>>>>>>>>> to guard against NPE in your lambdas or divide a number by zero:
>>>>>>>>>>     scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>     stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>
>>>>>>>>>>     // Here: Fallback to a sane default when encountering failed records
>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>     res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>
>>>>>>>>>>     // Here: Skip over failed records
>>>>>>>>>>     scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>     res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>> The above is more natural to me than using error handlers to define
>>>>>>>>>> how to deal with failed records (here, the value `3` causes an
>>>>>>>>>> arithmetic exception).  Again, it might help the KIP if we added an
>>>>>>>>>> end-to-end example for such user code errors.
>>>>>>>>>> ----------------------------snip----------------------------
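Michael's Try-based "same code path for Success and Failure" style is not Scala-specific; as a thought experiment, here is a rough Java imitation of it (the `Result` type is mine, purely illustrative):

```java
import java.util.function.IntSupplier;

// Java imitation of the scala.util.Try examples quoted above: wrap a
// computation, then handle success and failure through the same code path
// instead of a separate exception handler.
public class Result {
    final Integer value;   // non-null on success
    final Exception error; // non-null on failure

    private Result(Integer value, Exception error) {
        this.value = value;
        this.error = error;
    }

    // Run the computation, capturing any exception instead of throwing.
    static Result of(IntSupplier computation) {
        try {
            return new Result(computation.getAsInt(), null);
        } catch (Exception e) {
            return new Result(null, e);
        }
    }

    // Fallback to a sane default on failure, like Try#getOrElse.
    int getOrElse(int fallback) {
        return value != null ? value : fallback;
    }
}
```

`Result.of(() -> 1 / 0).getOrElse(42)` yields 42, mirroring the `scala>` session quoted above.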
>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>>>>> wrote:
>>>>>>>>>>> Hi Jay,
>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>> ConsumerRecord deserialisation.
>>>>>>>>>>> I am working with database changelogs only. I would really not like
>>>>>>>>>>> to see a dead letter queue or something similar. How am I expected to
>>>>>>>>>>> get these back in order? Just grind to a halt and call me on the
>>>>>>>>>>> weekend. I'll fix it then in a few minutes rather than spend 2 weeks
>>>>>>>>>>> ordering dead letters. (where reprocessing might be even the faster
>>>>>>>>>>> fix)
>>>>>>>>>>> Best Jan
>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>    - I think we should hold off on retries unless we have worked out
>>>>>>>>>>>> the full usage pattern, people can always implement their own. I
>>>>>>>>>>>> think the idea is that you send the message to some kind of dead
>>>>>>>>>>>> letter queue and then replay these later. This obviously destroys
>>>>>>>>>>>> all semantic guarantees we are working hard to provide right now,
>>>>>>>>>>>> which may be okay.
