kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Fixing two critical bugs in kafka streams
Date Mon, 06 Mar 2017 05:04:10 GMT
Hi
Regarding the CommitFailedException: if it is thrown from onPartitionsRevoked,
it gets assigned to rebalanceException.
This causes the stream thread to shut down, and I am not sure how we can
resume the thread afterwards.

Note that the thread is not in an invalid state, because it has already been
assigned new partitions and this exception happens when trying to revoke
old partitions which have been moved to some other thread. So we need to
swallow this exception on the StreamThread side too, just like we swallow
it in ConsumerCoordinator.java.
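As a rough illustration of the idea (this is a sketch, not the actual StreamThread code: CommitFailedException is stubbed here, and the field and method names are simplified stand-ins for the real internals):

```java
// Sketch only: CommitFailedException is stubbed; in Kafka it lives in
// org.apache.kafka.clients.consumer. The names below are hypothetical,
// simplified stand-ins for the StreamThread internals.
public class RevokeSketch {
    static class CommitFailedException extends RuntimeException {}

    // stands in for StreamThread#commitOffsets(); here it always fails,
    // as it would when the partitions were already reassigned elsewhere
    static void commitOffsets() {
        throw new CommitFailedException();
    }

    static RuntimeException rebalanceException = null;

    // the proposed behavior: swallow the commit failure instead of
    // assigning it to rebalanceException (which shuts the thread down)
    static void onPartitionsRevoked() {
        try {
            commitOffsets();
        } catch (CommitFailedException e) {
            // partitions already moved to another thread; the commit is
            // expected to fail, so log and continue
        }
    }

    public static void main(String[] args) {
        onPartitionsRevoked();
        System.out.println(rebalanceException == null); // prints true
    }
}
```

With the exception swallowed, rebalanceException stays null and the thread can keep running with its new assignment.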

Also, I fixed this against the 0.10.2.0 code base, and the difference between
that and trunk is these lines.
0.10.2.0
       if (firstException.get() == null) {
            firstException.set(commitOffsets());
       }
 vs trunk
        if (firstException.get() == null) {
            // TODO: currently commit failures will not be thrown to users
            // while suspending tasks; this need to be re-visit after KIP-98
            commitOffsets();
        }
I am again not sure, since this part is still a TODO, but looking at the code I
see that commitOffsets can still throw the CommitFailedException, which
needs to be handled at onPartitionsRevoked.

Hope this makes sense.

On the second issue: the deadlock is not caused by CommitFailedException, but
after fixing the deadlock we need to make sure the thread does not die due to
an unhandled CommitFailedException at onPartitionsRevoked.
The deadlock issue is like this.
If a thread has two partitions and processing partition one takes
more than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted
from the group and both partitions are migrated to some other thread.
When it then tries to process partition two, it tries to acquire the
RocksDB lock. It will never get the lock, since that partition has been
moved to another thread, so it keeps increasing backoffTimeMs and keeps
trying to acquire the lock forever. This effectively becomes a deadlock.
To fix this we need some upper bound on how long it keeps trying to acquire
that lock. That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG,
because if it has not acquired the lock by then, we can conclude that this
thread was evicted from the group and needs to rejoin to get new
partitions.
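A minimal sketch of the bounded retry (tryLock is a hypothetical stand-in for acquiring the RocksDB state lock; the real loop lives in AbstractTaskCreator#retryWithBackoff, and the sleep is omitted so the sketch runs instantly):

```java
// Sketch of the proposed upper bound. tryLock() stands in for acquiring
// the RocksDB state lock; here it always fails, as it would when the
// partition has migrated to another thread.
public class BoundedBackoff {
    static boolean tryLock() {
        return false; // lock held by the thread that now owns the partition
    }

    // returns true if the lock was acquired, false once the time spent
    // exceeds maxPollIntervalMs (the proposed upper bound)
    static boolean retryWithBackoff(long maxPollIntervalMs) {
        long backoffTimeMs = 50L;
        long waitedMs = 0L;
        while (true) {
            if (tryLock()) {
                return true;
            }
            if (waitedMs >= maxPollIntervalMs) {
                // most likely evicted from the group: give up so the
                // thread can rejoin and pick up its new assignment
                return false;
            }
            // real code would Thread.sleep(backoffTimeMs) here
            waitedMs += backoffTimeMs;
            backoffTimeMs *= 10; // same 10x growth as the current loop
        }
    }

    public static void main(String[] args) {
        // with e.g. max.poll.interval.ms = 300000 the loop now terminates
        System.out.println(retryWithBackoff(300_000L)); // prints false
    }
}
```

Without the waitedMs check, the while (true) loop never exits; with it, the thread gets a chance to rejoin the group.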

On the JIRA issue: I can create one and attach the part of the logs where it
keeps trying to acquire the lock with increasing backoffTimeMs.

Let me know if this makes sense. Right now this is the best way we could
come up with to handle stream thread failures.

Also, on a side note, I feel we need more resilient streams. If we have, say,
configured our streams application with 4 threads and for whatever reason a
thread dies, then the application should itself (or via some exposed hooks)
allow starting a new thread (because in Java the same thread cannot
be restarted), so that the number of threads always stays at what one has
configured.
I think exposed hooks would be the better option for this.
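Such a hook does not exist as a Streams API; as a purely hypothetical sketch in plain Java, a replacement thread could be spawned from an UncaughtExceptionHandler, since a terminated Thread object cannot be start()ed again:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical sketch, not an existing Kafka Streams API: when a worker
// thread dies from an unhandled exception, its uncaught-exception handler
// starts a fresh Thread so the configured thread count is maintained.
public class RestartHook {
    static final CountDownLatch replacementRan = new CountDownLatch(1);

    static Thread newWorker(Runnable body) {
        Thread t = new Thread(body, "stream-worker");
        t.setUncaughtExceptionHandler((thread, err) -> {
            // a terminated Thread cannot be restarted, so create a new one
            new Thread(replacementRan::countDown, "stream-worker-replacement").start();
        });
        return t;
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = newWorker(() -> { throw new RuntimeException("thread died"); });
        worker.start();
        replacementRan.await(); // blocks until the replacement has run
        System.out.println("replacement thread ran");
    }
}
```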

Thanks
Sachin




On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matthias@confluent.io>
wrote:

> Sachin,
>
> thanks a lot for contributing!
>
> Right now, I am not sure if I understand the change. On
> CommitFailedException, why can we just resume the thread? To me, it
> seems that the thread will be in an invalid state and thus it's not safe
> to just swallow the exception and keep going. Can you shed some light?
>
> And from my understanding, the deadlock is "caused" by the change from
> above, right? So if it is safe to swallow the exception, we should do
> some "clean up" to avoid the deadlock in the first place, instead of
> applying an additional timeout.
>
> Also, if this is a bug, we should have a JIRA.
>
> -Matthias
>
>
> On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > Hi,
> > Please find the new PR
> > https://github.com/apache/kafka/pull/2642/
> >
> > I see that in trunk there has been a change which is different from
> > 0.10.2.0
> >
> > 10.2.0
> >        if (firstException.get() == null) {
> >             firstException.set(commitOffsets());
> >        }
> >  vs trunk
> >         if (firstException.get() == null) {
> >             // TODO: currently commit failures will not be thrown to users
> >             // while suspending tasks; this need to be re-visit after KIP-98
> >             commitOffsets();
> >         }
> > I am not sure, in view of this, whether my part of the fix is still valid.
> > Looks like it is still valid.
> >
> > Also, on a side note, what is the policy on closing a branch that has just
> > been released?
> >
> > Since you have released 0.10.2.0 we are using that, and that is why we have
> > made changes in that branch, so that our changes just modify the needed
> > code and we don't mess up the other released code.
> >
> > Is the new release made off the branch 0.10.2.0? If yes, then you should
> > not close it, as there can be patch fixes on it.
> >
> > Or is the release always made off trunk? In that case, how can we
> > pick up the code from which the release binaries were created, so that
> > when we build the binary we have exactly the same code as the released
> > one, plus any changes (we or someone else) make on it?
> >
> > Also, if a branch is closed, then perhaps we should delete it or mark it
> > closed or something.
> >
> > Please let us know how releases get created (off what codebase), so we
> > are more exact in applying our changes.
> >
> > Thanks
> > Sachin
> >
> >
> >
> > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.thereska@gmail.com>
> wrote:
> >
> >> Thanks Sachin, one thing before the review, 0.10.2 is closed now, this
> >> needs to target trunk.
> >>
> >> Thanks
> >> Eno
> >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmittal@gmail.com> wrote:
> >>>
> >>> Please review the PR and let me know if this makes sense.
> >>>
> >>> https://github.com/apache/kafka/pull/2640
> >>>
> >>> Thanks
> >>> Sachin
> >>>
> >>>
> >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.thereska@gmail.com>
> >> wrote:
> >>>
> >>>> Thanks Sachin for your contribution. Could you create a pull request
> out
> >>>> of the commit (so we can add comments, and also so you are
> acknowledged
> >>>> properly for your contribution)?
> >>>>
> >>>> Thanks
> >>>> Eno
> >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmittal@gmail.com> wrote:
> >>>>>
> >>>>> Hi,
> >>>>> So far in our experiment we have encountered 2 critical bugs.
> >>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute
> >>>>> a cycle, it gets evicted from the group, a rebalance takes place, and
> >>>>> it gets a new assignment.
> >>>>> However, when this thread tries to commit offsets for the revoked
> >>>>> partitions in onPartitionsRevoked, it will again throw the
> >>>>> CommitFailedException.
> >>>>>
> >>>>> This gets handled by ConsumerCoordinator, so there is no point in
> >>>>> assigning this exception to rebalanceException in StreamThread and
> >>>>> stopping it. It has already been assigned new partitions and it can
> >>>>> continue.
> >>>>>
> >>>>> So as a fix, in the case of CommitFailedException I am not killing the
> >>>>> StreamThread.
> >>>>>
> >>>>> 2. Next we see a deadlock state when processing a task takes longer
> >>>>> than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's partitions are
> >>>>> assigned to some other thread, including the RocksDB lock. When it
> >>>>> tries to process the next task it cannot get the RocksDB lock and
> >>>>> simply keeps waiting for that lock forever.
> >>>>>
> >>>>> In retryWithBackoff for AbstractTaskCreator we have a backoffTimeMs =
> >>>>> 50L. If it does not get the lock, we simply increase the time by 10x
> >>>>> and keep trying inside the while (true) loop.
> >>>>>
> >>>>> We need to have an upper bound for this backoffTimeMs. If the time is
> >>>>> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the
> >>>>> lock, it means this thread's partitions have been moved somewhere else
> >>>>> and it may not get the lock again.
> >>>>>
> >>>>> So I have added an upper bound check in that while loop.
> >>>>>
> >>>>> The commits are here:
> >>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> >>>>>
> >>>>> Please review, and if you feel they make sense, please merge them to
> >>>>> the main branch.
> >>>>>
> >>>>> Thanks
> >>>>> Sachin
> >>>>
> >>>>
> >>
> >>
> >
>
>
