kafka-users mailing list archives

From Sachin Mittal <sjmit...@gmail.com>
Subject Re: Fixing two critical bugs in kafka streams
Date Mon, 06 Mar 2017 15:16:44 GMT
Please find the JIRA https://issues.apache.org/jira/browse/KAFKA-4848


On Mon, Mar 6, 2017 at 5:20 PM, Damian Guy <damian.guy@gmail.com> wrote:

> Hi Sachin,
>
> If it is a bug then please file a JIRA for it, too.
>
> Thanks,
> Damian
>
> On Mon, 6 Mar 2017 at 11:23 Sachin Mittal <sjmittal@gmail.com> wrote:
>
> > Ok that's great.
> > So you have already fixed that issue.
> >
> > I have modified my PR to remove that change (which was done keeping
> > 0.10.2.0 in mind).
> >
> > However the other issue is still valid.
> >
> > Please review that change. https://github.com/apache/kafka/pull/2642
> >
> >
> > Thanks
> > Sachin
> >
> >
> > On Mon, Mar 6, 2017 at 3:56 PM, Damian Guy <damian.guy@gmail.com> wrote:
> >
> > > On trunk the CommitFailedException isn't thrown anymore. The commitOffsets
> > > method doesn't throw an exception. It returns one if it was thrown. We used
> > > to throw this exception during suspendTasksAndState, but we don't anymore.
> > >
> > > On Mon, 6 Mar 2017 at 05:04 Sachin Mittal <sjmittal@gmail.com> wrote:
> > >
> > > > Hi
> > > > If a CommitFailedException is thrown at onPartitionsRevoked, it gets
> > > > assigned to rebalanceException.
> > > > This causes the stream thread to shut down. I am not sure how we can
> > > > resume the thread.
> > > >
> > > > Note that the thread is not in an invalid state, because it has already
> > > > been assigned new partitions, and this exception happens when trying to
> > > > revoke old partitions which have been moved to some other thread. So we
> > > > need to swallow this exception on the StreamThread side too, just like we
> > > > swallow it in ConsumerCoordinator.java.
> > > >
> > > > Also, I fixed this against the 0.10.2.0 code base, and the difference
> > > > between that and the trunk code is these lines.
> > > > 0.10.2.0:
> > > >         if (firstException.get() == null) {
> > > >             firstException.set(commitOffsets());
> > > >         }
> > > > vs trunk:
> > > >         if (firstException.get() == null) {
> > > >             // TODO: currently commit failures will not be thrown to users
> > > >             // while suspending tasks; this need to be re-visit after KIP-98
> > > >             commitOffsets();
> > > >         }
> > > > I am again not sure, since this part is still a TODO, but looking at the
> > > > code I see that commitOffsets can still throw the CommitFailedException,
> > > > which needs to be handled at onPartitionsRevoked.
> > > >
> > > > Hope this makes sense.
> > > >
> > > > On the second issue: the deadlock is not caused by the
> > > > CommitFailedException, but after fixing the deadlock we need to make sure
> > > > the thread does not die due to an unhandled CommitFailedException at
> > > > onPartitionsRevoked.
> > > > The deadlock issue is like this.
> > > > If a thread has two partitions and processing partition one takes more
> > > > than MAX_POLL_INTERVAL_MS_CONFIG, then this thread is evicted from the
> > > > group and both partitions are migrated to some other thread.
> > > > Now when it tries to process partition two, it tries to get the RocksDB
> > > > lock. It won't get the lock, since that partition has moved to some other
> > > > thread. So it keeps increasing backoffTimeMs and keeps trying to get the
> > > > lock forever, thus reaching a deadlock.
> > > > To fix this we need an upper bound on how long it tries to get that lock.
> > > > That upper bound has to be MAX_POLL_INTERVAL_MS_CONFIG, because if it has
> > > > not got the lock by then, we can conclude that this thread was evicted
> > > > from the group and needs to rejoin to get new partitions.
> > > >
> > > > On the JIRA issue: I can create one and attach the part of the logs where
> > > > it keeps trying to get the lock with increasing backoffTimeMs.
> > > >
> > > > Let me know if this makes sense. Right now this is the best way we could
> > > > come up with to handle stream thread failures.
> > > >
> > > > Also, on a side note, I feel we need more resilient streams. If we have,
> > > > say, configured our streams application with 4 threads and for whatever
> > > > reason a thread dies, then the application should itself (or via some
> > > > exposed hooks) allow starting a new thread (because in Java I guess the
> > > > same thread cannot be restarted), so that the number of threads always
> > > > stays at what one has configured.
> > > > I think exposed hooks would be the better option for this.
> > > >
> > > > Thanks
> > > > Sachin
> > > >
> > > >
> > > >
> > > >
> > > > On Mon, Mar 6, 2017 at 5:40 AM, Matthias J. Sax <matthias@confluent.io>
> > > > wrote:
> > > >
> > > > > Sachin,
> > > > >
> > > > > thanks a lot for contributing!
> > > > >
> > > > > Right now, I am not sure if I understand the change. On
> > > > > CommitFailedException, why can we just resume the thread? To me, it
> > > > > seems that the thread will be in an invalid state and thus it's not
> > > > > safe to just swallow the exception and keep going. Can you shed some
> > > > > light?
> > > > >
> > > > > And from my understanding, the deadlock is "caused" by the change from
> > > > > above, right? So if it is safe to swallow the exception, we should do
> > > > > some "clean up" to avoid the deadlock in the first place, instead of
> > > > > applying an additional timeout.
> > > > >
> > > > > Also, if this is a bug, we should have a JIRA.
> > > > >
> > > > > -Matthias
> > > > >
> > > > >
> > > > > On 3/5/17 4:11 AM, Sachin Mittal wrote:
> > > > > > Hi,
> > > > > > Please find the new PR
> > > > > > https://github.com/apache/kafka/pull/2642/
> > > > > >
> > > > > > I see that in trunk there has been a change which is different from
> > > > > > 0.10.2.0.
> > > > > >
> > > > > > 0.10.2.0:
> > > > > >         if (firstException.get() == null) {
> > > > > >             firstException.set(commitOffsets());
> > > > > >         }
> > > > > > vs trunk:
> > > > > >         if (firstException.get() == null) {
> > > > > >             // TODO: currently commit failures will not be thrown to users
> > > > > >             // while suspending tasks; this need to be re-visit after KIP-98
> > > > > >             commitOffsets();
> > > > > >         }
> > > > > > In view of this, I am not sure whether my part of the fix is still
> > > > > > valid. It looks like it is still valid.
> > > > > >
> > > > > > Also, on a side note, what is the policy for closing a branch that has
> > > > > > just been released?
> > > > > >
> > > > > > Since you have released 0.10.2.0, we are using that, and that is why we
> > > > > > made our changes in that branch, so that our changes modify only the
> > > > > > needed code and we don't mess up the other released code.
> > > > > >
> > > > > > Is the new release made off the 0.10.2.0 branch? If yes, then you
> > > > > > should not close it, as there can be patch fixes on it.
> > > > > >
> > > > > > Or is the release always made off trunk? In that case, how can we pick
> > > > > > up the code from which the release binaries were created, so that when
> > > > > > we build the binary we have exactly the same code as the released one,
> > > > > > plus any changes we (or someone else) make on it?
> > > > > >
> > > > > > Also, if a branch is closed, then perhaps we should delete it or mark
> > > > > > it closed or something.
> > > > > >
> > > > > > Please let us know how releases get created (off what codebase), so we
> > > > > > can be more exact about where we apply our changes.
> > > > > >
> > > > > > Thanks
> > > > > > Sachin
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Sun, Mar 5, 2017 at 3:40 PM, Eno Thereska <eno.thereska@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > >> Thanks Sachin, one thing before the review, 0.10.2 is closed now, this
> > > > > >> needs to target trunk.
> > > > > >>
> > > > > >> Thanks
> > > > > >> Eno
> > > > > >>> On 5 Mar 2017, at 09:10, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > >>>
> > > > > >>> Please review the PR and let me know if this makes sense.
> > > > > >>>
> > > > > >>> https://github.com/apache/kafka/pull/2640
> > > > > >>>
> > > > > >>> Thanks
> > > > > >>> Sachin
> > > > > >>>
> > > > > >>>
> > > > > > >>> On Sun, Mar 5, 2017 at 1:49 PM, Eno Thereska <eno.thereska@gmail.com>
> > > > > > >>> wrote:
> > > > > > >>>
> > > > > > >>>> Thanks Sachin for your contribution. Could you create a pull request
> > > > > > >>>> out of the commit (so we can add comments, and also so you are
> > > > > > >>>> acknowledged properly for your contribution)?
> > > > > > >>>>
> > > > > > >>>> Thanks
> > > > > > >>>> Eno
> > > > > > >>>>> On 5 Mar 2017, at 07:34, Sachin Mittal <sjmittal@gmail.com> wrote:
> > > > > >>>>>
> > > > > > >>>>> Hi,
> > > > > > >>>>> So far in our experiment we have encountered 2 critical bugs.
> > > > > > >>>>> 1. If a thread takes more than MAX_POLL_INTERVAL_MS_CONFIG to compute
> > > > > > >>>>> a cycle, it gets evicted from the group, a rebalance takes place, and
> > > > > > >>>>> it gets a new assignment.
> > > > > > >>>>> However, when this thread tries to commit offsets for the revoked
> > > > > > >>>>> partitions in onPartitionsRevoked, it will again throw the
> > > > > > >>>>> CommitFailedException.
> > > > > > >>>>>
> > > > > > >>>>> This gets handled by ConsumerCoordinator, so there is no point in
> > > > > > >>>>> assigning this exception to rebalanceException in StreamThread and
> > > > > > >>>>> stopping it. It has already been assigned new partitions and it can
> > > > > > >>>>> continue.
> > > > > > >>>>>
> > > > > > >>>>> So, as a fix, in case of CommitFailedException I am not killing the
> > > > > > >>>>> StreamThread.
> > > > > >>>>>
> > > > > > >>>>> 2. Next we see a deadlock state when processing a task takes longer
> > > > > > >>>>> than MAX_POLL_INTERVAL_MS_CONFIG. Then this thread's partitions are
> > > > > > >>>>> assigned to some other thread, including the RocksDB lock. When it
> > > > > > >>>>> tries to process the next task, it cannot get the RocksDB lock and
> > > > > > >>>>> simply keeps waiting for that lock forever.
> > > > > > >>>>>
> > > > > > >>>>> In retryWithBackoff for AbstractTaskCreator we have backoffTimeMs =
> > > > > > >>>>> 50L. If it does not get the lock, we simply increase the time by 10x
> > > > > > >>>>> and keep trying inside the while true loop.
> > > > > > >>>>>
> > > > > > >>>>> We need to have an upper bound for this backoffTimeMs. If the time is
> > > > > > >>>>> greater than MAX_POLL_INTERVAL_MS_CONFIG and it still hasn't got the
> > > > > > >>>>> lock, it means this thread's partitions have moved somewhere else and
> > > > > > >>>>> it may not get the lock again.
> > > > > > >>>>>
> > > > > > >>>>> So I have added an upper bound check in that while loop.
> > > > > >>>>>
> > > > > > >>>>> The commits are here:
> > > > > > >>>>> https://github.com/sjmittal/kafka/commit/6f04327c890c58cab9b1ae108af4ce5c4e3b89a1
> > > > > > >>>>>
> > > > > > >>>>> Please review, and if you feel they make sense, please merge them to
> > > > > > >>>>> the main branch.
> > > > > >>>>>
> > > > > >>>>> Thanks
> > > > > >>>>> Sachin
> > > > > >>>>
> > > > > >>>>
> > > > > >>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>
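
The bounded retry discussed in the quoted thread can be sketched in isolation. What follows is only a minimal illustration in Java, not the code from the PR or from Kafka Streams. The class BoundedBackoff, the method retryWithBoundedBackoff and the tryAcquire callback are hypothetical names. The bound is applied here to the total time spent waiting, which is one possible reading of the proposal. The 10x growth mirrors what the thread describes for retryWithBackoff, and maxWaitMs stands in for the max.poll.interval.ms setting.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, for illustration only; not part of Kafka Streams.
final class BoundedBackoff {

    private BoundedBackoff() { }

    /**
     * Calls tryAcquire until it returns true or until the total time slept
     * reaches maxWaitMs. The sleep between attempts grows 10x each round.
     * Returns true if the resource was acquired, false if we gave up.
     */
    static boolean retryWithBoundedBackoff(BooleanSupplier tryAcquire,
                                           long initialBackoffMs,
                                           long maxWaitMs) {
        if (initialBackoffMs <= 0 || maxWaitMs < 0) {
            throw new IllegalArgumentException("backoff must be > 0 and maxWaitMs >= 0");
        }
        long backoffMs = initialBackoffMs;
        long waitedMs = 0L;
        while (true) {
            if (tryAcquire.getAsBoolean()) {
                return true;                 // got the lock
            }
            if (waitedMs >= maxWaitMs) {
                return false;                // upper bound reached: stop retrying
            }
            // Never sleep past the overall bound.
            long sleepMs = Math.min(backoffMs, maxWaitMs - waitedMs);
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return false;
            }
            waitedMs += sleepMs;
            // Grow 10x, capped at maxWaitMs so the multiplication cannot overflow.
            backoffMs = backoffMs > maxWaitMs / 10 ? maxWaitMs : backoffMs * 10;
        }
    }
}
```

A caller would pass its own lock attempt as the callback, for example BoundedBackoff.retryWithBoundedBackoff(lockAttempt, 50L, maxPollIntervalMs), and treat a false result as the signal to abandon the task and rejoin the group rather than wait indefinitely.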
