kafka-users mailing list archives

From "Matthias J. Sax" <matth...@confluent.io>
Subject Re: [DISCUSS]: KIP-161: streams record processing exception handlers
Date Mon, 05 Jun 2017 18:26:24 GMT
@Jan: EOS will be turned off by default in 0.11. I assume we might
enable it by default in a later release, but there will always be a config
to disable it.
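
For reference, a minimal sketch of that switch. In 0.11 the setting is `processing.guarantee`, with `at_least_once` as the default and `exactly_once` to opt in. The sketch uses plain `java.util.Properties` so it runs without Kafka on the classpath; the object name, application id and bootstrap address are placeholders, not anything from a real deployment:

```scala
import java.util.Properties

object EosConfigSketch {
  // Builds a Streams-style configuration. Only "processing.guarantee" matters here;
  // the application id and bootstrap servers are placeholder values.
  def streamsProps(exactlyOnce: Boolean): Properties = {
    val props = new Properties()
    props.put("application.id", "my-streams-app")    // hypothetical
    props.put("bootstrap.servers", "localhost:9092") // hypothetical
    props.put("processing.guarantee", if (exactlyOnce) "exactly_once" else "at_least_once")
    props
  }
}
```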


On 6/5/17 10:19 AM, Jan Filipiak wrote:
> Hi
> just my few thoughts
> On 05.06.2017 11:44, Eno Thereska wrote:
>> Hi there,
>> Sorry for the late reply, I was out this past week. Looks like good
>> progress was made with the discussions either way. Let me recap a
>> couple of points I saw into one big reply:
>> 1. Jan mentioned CRC errors. I think this is a good point. As these
>> happen in Kafka, before Kafka Streams gets a chance to inspect
>> anything, I'd like to hear the opinion of more Kafka folks like Ismael
>> or Jason on this one. Currently the documentation is not great with
>> what to do once a CRC check has failed. From looking at the code, it
>> looks like the client gets a KafkaException (bubbled up from the
>> fetcher) and currently we in streams catch this as part of poll() and
>> fail. It might be advantageous to treat CRC handling in a similar way
>> to serialisation handling (e.g., have the option to fail/skip). Let's
>> see what the other folks say. Worst case, we can do a separate KIP for
>> that if it proves too hard to do in one go.
> There is no reasonable way to "skip" a CRC error. How can you know the
> length you read was anything reasonable? You might be completely lost
> inside your response.
>> 2. Damian has convinced me that the KIP should just be for
>> deserialisation from the network, not from local state store DBs. For
>> the latter we'll follow the current way of failing since the DB is
>> likely corrupt.
>> 3. Dead letter queue option. There was never any intention here to do
>> anything super clever like attempt to re-inject the failed records
>> from the dead letter queue back into the system. Reasoning about when
>> that'd be useful in light of all sorts of semantic breakage would be
>> hard (arguably impossible). The idea was to just have a place to have
>> all these dead records to help with subsequent debugging. We could
>> also just log a whole bunch of info for a poison pill record and not
>> have a dead letter queue at all. Perhaps that's a better, simpler,
>> starting point.
> +1
>> 4. Agree with Jay on style, a DefaultHandler with some config options.
>> Will add options to KIP. Also as part of this let's remove the
>> threshold logger since it gets complex and arguably the ROI is low.
>> 5. Jay's JSON example, where serialisation passes but the JSON message
>> doesn't have the expected fields, is an interesting one. It's a bit
>> complicated to handle this in the middle of processing. For example,
>> some operators in the DAG might actually find the needed JSON fields
>> and make progress, but other operators, for the same record, might not
>> find their fields and will throw an exception.
>> At a minimum, handling this type of exception will need to involve the
>> exactly-once (EoS) logic. We'd still allow the option of failing or
>> skipping, but EoS would need to clean up by rolling back all the side
>> effects from the processing so far. Matthias, how does this sound?
> EoS will not help; the record might be 5 or 6 repartitions down into the
> topology. I haven't followed, but I pray you made EoS optional! We don't
> need this and we don't want this, and we will turn it off if it comes. So
> I wouldn't recommend relying on it. The option to turn it off is better
> than forcing it and still being unable to roll back bad pills (as
> explained before).
>> 6. Will add an end-to-end example as Michael suggested.
>> Thanks
>> Eno
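
On item 1, a small sketch of why a CRC failure is hard to skip. It assumes a toy framing of `[length][crc][payload]`, which is an illustration only and not Kafka's actual wire format; `CrcFramingSketch`, `frame` and `readFrame` are hypothetical names. Once the checksum fails, the length field is as suspect as the payload, so the reader can no longer trust where the next record starts:

```scala
import java.nio.ByteBuffer
import java.util.zip.CRC32

object CrcFramingSketch {
  // CRC32 of the payload, truncated to the low 32 bits.
  def crcOf(bytes: Array[Byte]): Int = {
    val crc = new CRC32()
    crc.update(bytes)
    crc.getValue.toInt
  }

  // Toy frame layout (an assumption for this sketch): [length: Int][crc: Int][payload].
  def frame(payload: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(8 + payload.length)
    buf.putInt(payload.length).putInt(crcOf(payload)).put(payload)
    buf.array()
  }

  // Reads one frame at `offset` and returns (crcOk, nextOffset).
  // nextOffset is computed from the length field, so it is only as
  // trustworthy as that field.
  def readFrame(data: Array[Byte], offset: Int): (Boolean, Int) = {
    val buf = ByteBuffer.wrap(data)
    val length = buf.getInt(offset)
    val expectedCrc = buf.getInt(offset + 4)
    val start = offset + 8
    if (length < 0 || length > data.length - start) (false, start + length)
    else (crcOf(data.slice(start, start + length)) == expectedCrc, start + length)
  }
}
```

With two frames back to back, flipping a single bit in the first length field makes `readFrame` report a failed check together with a next offset that no longer matches the real frame boundary.
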
>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matthias@confluent.io> wrote:
>>> What I don't understand is this:
>>>> From there on it's the easiest way forward: fix, redeploy, start =>
>>>> done
>>> If you have many producers that work fine and a new "bad" producer
>>> starts up and writes bad data into your input topic, your Streams app
>>> dies but all your producers, including the bad one, keep writing.
>>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>>> from the topic? It might take some time to identify the root cause and
>>> stop the bad producer. Up to this point you get good and bad data into
>>> your Streams input topic. If the Streams app is not able to skip over
>>> those bad records, how would you get all the good data from the topic?
>>> Not saying it's not possible, but it's extra work copying the data with a
>>> new non-Streams consumer-producer app into a new topic and then feeding
>>> your Streams app from this new topic -- you also need to update all your
>>> upstream producers to write to the new topic.
>>> Thus, if you want to fail fast, you can still do this. And after you
>>> detected and fixed the bad producer you might just reconfigure your app
>>> to skip bad records until it reaches the good part of the data.
>>> Afterwards, you could redeploy with fail-fast again.
>>> Thus, for this pattern, I actually don't see any reason to stop the
>>> Streams app at all. If you have a callback, and use the callback to
>>> raise an alert (and maybe get the bad data into a bad record queue), it
>>> will not take longer to identify and stop the "bad" producer. But for
>>> this case, you have zero downtime for your Streams app.
>>> This seems to be much simpler. Or am I missing anything?
>>> Having said this, I agree that the "threshold based callback" might be
>>> questionable. But as you argue for strict "fail-fast", I want to argue
>>> that this need not always be the best pattern to apply and that the
>>> overall KIP idea is super useful from my point of view.
>>> -Matthias
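
A rough sketch of the pattern described in this message, under assumed names: `Handler`, `Continue`, `Fail`, `skipAndAlert` and `process` are hypothetical and are not the API proposed in the KIP. It only shows that fail-fast and skip-with-alert can be two settings of the same hook, so switching between them is a configuration change rather than a code change:

```scala
import scala.util.{Failure, Success, Try}

object SkipOrFailSketch {
  sealed trait Response
  case object Continue extends Response // skip the bad record and keep going
  case object Fail extends Response     // stop processing (fail-fast)

  // Hypothetical handler shape: receives the raw bytes and the error.
  type Handler = (Array[Byte], Throwable) => Response

  val failFast: Handler = (_, _) => Fail

  // Skips bad records but reports each one through the supplied alert callback.
  def skipAndAlert(alert: String => Unit): Handler = (raw, error) => {
    alert(s"bad record (${raw.length} bytes): ${error.getMessage}")
    Continue
  }

  // Deserializes records in order and stops at the first record for which
  // the handler answers Fail. Returns everything deserialized up to that point.
  def process[A](raw: Seq[Array[Byte]], deserialize: Array[Byte] => A, handler: Handler): Seq[A] = {
    val out = Seq.newBuilder[A]
    val it = raw.iterator
    var running = true
    while (running && it.hasNext) {
      val bytes = it.next()
      Try(deserialize(bytes)) match {
        case Success(value) => out += value
        case Failure(error) => if (handler(bytes, error) == Fail) running = false
      }
    }
    out.result()
  }
}
```
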
>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>> Could not agree more!
>>>> But then I think the easiest is still: print exception and die.
>>>> From there on it's the easiest way forward: fix, redeploy, start =>
>>>> done.
>>>> All the other ways to recover a pipeline that was processing partially
>>>> all the time and suddenly went over an "I can't take it anymore"
>>>> threshold are not straightforward IMO.
>>>> How do you find the offset where it became too bad, when it is not the
>>>> latest committed one?
>>>> How do you reset there, with some reasonable state in your RocksDB
>>>> stores?
>>>> One could do the following: the continuing handler would measure
>>>> against a threshold and would terminate after that threshold has been
>>>> passed (per task). Then one can use offset commit / flush intervals
>>>> to make a reasonable assumption of how much is slipping by
>>>> + you get an easy recovery when it gets too bad
>>>> + you could also account for "in processing" records.
>>>> Setting this threshold to zero would cover all cases with one
>>>> implementation. It is still beneficial to have it pluggable.
>>>> Again, CRC errors are the only bad pills we saw in production for now.
>>>> Best Jan
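
The threshold idea above could look roughly like this. It is a sketch under assumptions: `ThresholdHandler`, `onBadRecord` and the string task ids are hypothetical names, and the count is kept in memory only, so it resets when the task restarts:

```scala
object ThresholdHandlerSketch {
  sealed trait Response
  case object Continue extends Response
  case object Fail extends Response

  // Tolerates up to maxFailures bad records per task, then asks the caller to stop.
  // A threshold of zero degenerates to fail-fast.
  final class ThresholdHandler(maxFailures: Int) {
    private val failuresPerTask =
      scala.collection.mutable.Map.empty[String, Int].withDefaultValue(0)

    def onBadRecord(taskId: String): Response = {
      failuresPerTask(taskId) += 1
      if (failuresPerTask(taskId) > maxFailures) Fail else Continue
    }
  }
}
```

With `maxFailures = 0` the first bad record already yields `Fail`, which matches the remark that one implementation covers both behaviours.
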
>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>> Jan, I agree with you philosophically. I think one practical challenge
>>>>> has to do with data formats. Many people use untyped events, so there
>>>>> is simply no guarantee on the form of the input. E.g. many companies
>>>>> use JSON without any kind of schema so it becomes very hard to assert
>>>>> anything about the input which makes these programs very fragile to
>>>>> the "one accidental message publication that creates an unsolvable
>>>>> problem".
>>>>> For that reason I do wonder if limiting to just serialization actually
>>>>> gets you a useful solution. For JSON it will help with the problem of
>>>>> non-parseable JSON, but sounds like it won't help in the case where
>>>>> the JSON is well-formed but does not have any of the fields you expect
>>>>> and depend on for your processing. I expect the reason for limiting
>>>>> the scope is it is pretty hard to reason about correctness for
>>>>> anything that stops in the middle of processing an operator DAG?
>>>>> -Jay
>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>> wrote:
>>>>>> IMHO you're doing it wrong then. + building too much support into the
>>>>>> Kafka ecosystem is very counterproductive in fostering a happy userbase.
>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>> Jan, you have a choice to fail fast if you want. This is about giving
>>>>>>> people options and there are times when you don't want to fail.
>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <Jan.Filipiak@trivago.com>
>>>>>>> wrote:
>>>>>>>> Hi
>>>>>>>> 1.
>>>>>>>> That greatly complicates monitoring. Fail fast gives you that when
>>>>>>>> you monitor only the lag of all your apps you are completely
>>>>>>>> covered. With that sort of new application, monitoring is very
>>>>>>>> much more complicated as you now need to monitor the fail % of
>>>>>>>> some special apps as well. In my opinion that is a huge downside
>>>>>>>> already.
>>>>>>>> 2.
>>>>>>>> Using a schema registry like Avro stuff, it might not even be the
>>>>>>>> record that is broken; it might be just your app unable to fetch a
>>>>>>>> schema it now needs to know. Maybe you got partitioned away from
>>>>>>>> that registry.
>>>>>>>> 3. When you get alerted because of a too high fail percentage, what
>>>>>>>> are the steps you're gonna do?
>>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much time
>>>>>>>> to find a good reprocess offset.
>>>>>>>> Your time windows are in bad shape anyway, and you're pretty much
>>>>>>>> lost.
>>>>>>>> This routine is nonsense.
>>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>>> Kafka toolkit that I can think of. It just doesn't fit the
>>>>>>>> architecture, in which having clients falling behind is a valid
>>>>>>>> option.
>>>>>>>> Further, I mentioned already that the only bad pills I've seen so
>>>>>>>> far are CRC errors. Any plans for those?
>>>>>>>> Best Jan
>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are
>>>>>>>>> plenty of times when you don't want to fail fast and must attempt
>>>>>>>>> to make progress. The dead-letter queue is exactly for these
>>>>>>>>> circumstances. Of course if every record is failing, then you
>>>>>>>>> probably do want to give up.
>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matthias@confluent.io>
>>>>>>>>> wrote:
>>>>>>>>>> First a meta comment. KIP discussion should take place on the dev
>>>>>>>>>> list -- if user list is cc'ed please make sure to reply to both
>>>>>>>>>> lists. Thanks.
>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
>>>>>>>>>> sense to focus on deserialization exceptions for now.
>>>>>>>>>> With regard to corrupted state stores, would it make sense to
>>>>>>>>>> fail a task and wipe out the store to repair it via recreation
>>>>>>>>>> from the changelog? That's of course a quite advanced pattern,
>>>>>>>>>> but I want to bring it up to design the first step in a way such
>>>>>>>>>> that we can get there (if we think it's a reasonable idea).
>>>>>>>>>> I also want to comment about fail fast vs making progress. I
>>>>>>>>>> think that fail-fast need not always be the best option. The
>>>>>>>>>> scenario I have in mind is like this: you have a bunch of
>>>>>>>>>> producers that feed the Streams input topic. Most producers work
>>>>>>>>>> fine, but maybe one producer misbehaves and the data it writes is
>>>>>>>>>> corrupted. You might not even be able to recover this lost data
>>>>>>>>>> at any point -- thus, there is no reason to stop processing but
>>>>>>>>>> you just skip over those records. Of course, you need to fix the
>>>>>>>>>> root cause, and thus you need to alert (either via logs or the
>>>>>>>>>> exception handler directly) and you need to start to investigate
>>>>>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>>>>> Here the dead letter queue comes into play. From my
>>>>>>>>>> understanding, the purpose of this feature is solely to enable
>>>>>>>>>> debugging after the fact. I don't think those records would be
>>>>>>>>>> fed back at any point in time (so I don't see any ordering issue
>>>>>>>>>> -- a skipped record, in this regard, is just "fully processed").
>>>>>>>>>> Thus, the dead letter queue should actually encode the original
>>>>>>>>>> record's metadata (topic, partition, offset, etc.) to enable such
>>>>>>>>>> debugging. I guess this might also be possible if you just log
>>>>>>>>>> the bad records, but it would be harder to access (you first
>>>>>>>>>> must find the Streams instance that did write the log and extract
>>>>>>>>>> the information from there). Reading it from a topic is much
>>>>>>>>>> simpler.
>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>> topic with some bad records and some good records. If we always
>>>>>>>>>> fail-fast, it's going to be super hard to process the good data.
>>>>>>>>>> You would need to write an extra app that copies the data into a
>>>>>>>>>> new topic, filtering out the bad records (or apply the map()
>>>>>>>>>> workaround within Streams). So I don't think that "failing fast
>>>>>>>>>> is most likely the best option in production" is necessarily
>>>>>>>>>> true.
>>>>>>>>>> Or do you think there are scenarios for which you can recover
>>>>>>>>>> the corrupted records successfully? And even if this is
>>>>>>>>>> possible, it might be a case for reprocessing instead of failing
>>>>>>>>>> the application?
>>>>>>>>>> Also, if you think you can "repair" a corrupted record, should
>>>>>>>>>> the handler allow returning a "fixed" record? This would solve
>>>>>>>>>> the ordering problem.
>>>>>>>>>> -Matthias
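
To make the metadata point concrete, a minimal sketch of what a dead-letter entry might carry. `DeadLetter` and `describe` are hypothetical names and the field set is an assumption based on the "topic, partition, offset" list above, not a format defined by the KIP:

```scala
object DeadLetterSketch {
  // Hypothetical envelope: the undecodable bytes plus where they came from.
  final case class DeadLetter(
      topic: String,
      partition: Int,
      offset: Long,
      rawValue: Array[Byte],
      error: String)

  // Renders the metadata as one line, usable as a log entry or as the key
  // of a record written to a quarantine topic.
  def describe(d: DeadLetter): String =
    s"${d.topic}-${d.partition}@${d.offset}: ${d.error} (${d.rawValue.length} bytes)"
}
```

Either sink (log line or quarantine topic) gets the same information; the difference is only how easy it is to find afterwards.
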
>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated.
>>>>>>>>>>> - I think it would help to improve the KIP by adding an
>>>>>>>>>>> end-to-end code example that demonstrates, with the DSL and with
>>>>>>>>>>> the Processor API, how the user would write a simple application
>>>>>>>>>>> that would then be augmented with the proposed KIP changes to
>>>>>>>>>>> handle exceptions.  It should also become much clearer then that
>>>>>>>>>>> e.g. the KIP would lead to different code paths for the happy
>>>>>>>>>>> case and any failure scenarios.
>>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>>> decisions on what to do next?  For example, do we know in which
>>>>>>>>>>> part of the topology the record failed? `ConsumerRecord` gives us
>>>>>>>>>>> access to topic, partition, offset, timestamp, etc., but what
>>>>>>>>>>> about topology-related information (e.g. what is the associated
>>>>>>>>>>> state store, if any)?
>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is
>>>>>>>>>>> about the bigger picture: This KIP would give users the option to
>>>>>>>>>>> send corrupted records to dead letter queue (quarantine topic).
>>>>>>>>>>> But, what pattern would we advocate to process such a dead letter
>>>>>>>>>>> queue then, e.g. how to allow for retries with backoff ("If the
>>>>>>>>>>> first record in the dead letter queue fails again, then try the
>>>>>>>>>>> second record for the time being and go back to the first record
>>>>>>>>>>> at a later time").  Jay and Jan already alluded to ordering
>>>>>>>>>>> problems that will be caused by dead letter queues. As I said,
>>>>>>>>>>> retries might be out of scope but perhaps the implications should
>>>>>>>>>>> be considered if possible?
>>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>>> conversation that this KIP's scope will be limited to exceptions
>>>>>>>>>>> in the category of poison pills / deserialization errors.  But
>>>>>>>>>>> since Jay brought up user code errors again, I decided to include
>>>>>>>>>>> it again.
>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>> A meta comment: I am not sure about this split between the code
>>>>>>>>>>> for the happy path (e.g. map/filter/... in the DSL) from the
>>>>>>>>>>> failure path (using exception handlers).  In Scala, for example,
>>>>>>>>>>> we can do:
>>>>>>>>>>>        scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>        computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>        scala> computation.getOrElse(42)
>>>>>>>>>>>        res2: Int = 42
>>>>>>>>>>> Another example with Scala's pattern matching, which is similar
>>>>>>>>>>> to `KStream#branch()`:
>>>>>>>>>>>        computation match {
>>>>>>>>>>>          case scala.util.Success(x) => x *
>>>>>>>>>>>          case scala.util.Failure(_) => 42
>>>>>>>>>>>        }
>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala,
>>>>>>>>>>> but that's not the point I'm trying to make here.)
>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an API
>>>>>>>>>>> where you code "the happy path", and then have a different code
>>>>>>>>>>> path for failures (using exceptions and handlers);  or should we
>>>>>>>>>>> treat both Success and Failure in the same way?
>>>>>>>>>>> I think the failure/exception handling approach (as proposed in
>>>>>>>>>>> this KIP) is well-suited for errors in the category of
>>>>>>>>>>> deserialization problems aka poison pills, partly because the
>>>>>>>>>>> (default) serdes are defined through configuration (explicit
>>>>>>>>>>> serdes however are defined through API calls).
>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
>>>>>>>>>>> handling approach is the best idea for user code exceptions, e.g.
>>>>>>>>>>> if you fail to guard against NPE in your lambdas or divide a
>>>>>>>>>>> number by zero.
>>>>>>>>>>>        scala> import scala.util.{Success, Try}
>>>>>>>>>>>        scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>        stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>        // Here: Fallback to a sane default when encountering failed records
>>>>>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>        res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>        // Here: Skip over failed records
>>>>>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>        res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>> The above is more natural to me than using error handlers to
>>>>>>>>>>> define how to deal with failed records (here, the value `3`
>>>>>>>>>>> causes an arithmetic exception).  Again, it might help the KIP if
>>>>>>>>>>> we added an end-to-end example for such user code errors.
>>>>>>>>>>> ----------------------------snip----------------------------
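
One way to read the "treat Success and Failure in the same way" question, sketched with plain collections rather than any Streams API. `BranchSketch`, `parseAll` and `branch` are hypothetical names; the point is only that failures stay ordinary values that can be routed like any other record, in the spirit of the `KStream#branch()` comparison above:

```scala
import scala.util.Try

object BranchSketch {
  // Parses every input and keeps failures as data: Left(raw input) for
  // records that could not be parsed, Right(value) for records that could.
  def parseAll(inputs: Seq[String]): Seq[Either[String, Int]] =
    inputs.map(s => Try(s.trim.toInt).toOption.toRight(s))

  // Splits the results into (bad, good), preserving order within each side.
  def branch(results: Seq[Either[String, Int]]): (Seq[String], Seq[Int]) =
    (results.collect { case Left(raw) => raw }, results.collect { case Right(v) => v })
}
```

The bad side could then be logged or written to a quarantine topic, and the good side processed as usual, without a separate exception-handler code path.
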
>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak
>>>>>>>>>>> <Jan.Filipiak@trivago.com> wrote:
>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>> ConsumerRecord deserialisation.
>>>>>>>>>>>> I am working with database changelogs only. I would really not
>>>>>>>>>>>> like to see a dead letter queue or something similar. How am I
>>>>>>>>>>>> expected to get these back in order? Just grind to a halt and
>>>>>>>>>>>> call me on the weekend. I'll fix it then in a few minutes
>>>>>>>>>>>> rather than spend 2 weeks ordering dead letters (where
>>>>>>>>>>>> reprocessing might even be the faster fix).
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>        - I think we should hold off on retries unless we have
>>>>>>>>>>>>>        worked out the full usage pattern, people can always
>>>>>>>>>>>>>        implement their own. I think the idea is that you send
>>>>>>>>>>>>>        the message to some kind of dead letter queue and then
>>>>>>>>>>>>>        replay these later. This obviously destroys all semantic
>>>>>>>>>>>>>        guarantees we are working hard to provide right now,
>>>>>>>>>>>>>        which may be okay.
