spark-user mailing list archives

From Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com>
Subject Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
Date Thu, 03 Dec 2015 05:38:02 GMT
There are other ways to deal with the problem than shutting down the streaming
job. You can monitor the lag in your consumer to see if the consumer is falling
behind. If the lag gets high enough that OffsetOutOfRange can happen, you can
either increase the retention period or increase the consumer rate, or do both.
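
For the direct stream, the kind of per-batch offset bookkeeping I have in mind looks
roughly like the sketch below (Spark 1.x direct stream API; broker and topic names are
placeholders). Comparing each partition's untilOffset against the broker's latest offset
for that partition (from your monitoring system, or Kafka's GetOffsetShell tool) gives
the lag, so you can alert before it reaches the retention boundary.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

val conf = new SparkConf().setAppName("offset-lag-logger")
val ssc = new StreamingContext(conf, Seconds(30))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("test_stream"))

stream.foreachRDD { rdd =>
  // Offsets this batch actually covered, per partition.
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    println(s"topic=${r.topic} partition=${r.partition} from=${r.fromOffset} " +
      s"until=${r.untilOffset} messages=${r.untilOffset - r.fromOffset}")
  }
}
ssc.start()
ssc.awaitTermination()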

What I am trying to say is that a streaming job should not fail in any case.

Dibyendu

On Thu, Dec 3, 2015 at 9:40 AM, Cody Koeninger <cody@koeninger.org> wrote:

> I believe that what differentiates reliable systems is that individual
> components fail fast when their preconditions aren't met, and that other
> components are responsible for monitoring them.
>
> If a user of the direct stream thinks that your approach of restarting and
> ignoring data loss is the right thing to do, they can monitor the job
> (which they should be doing in any case) and restart.
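>
> As a rough illustration of that split (a sketch, not a prescription: createContext
> stands in for your usual stream setup, and the supervisor that resubmits the
> driver, e.g. cron or Marathon, is outside this snippet):
>
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object DirectStreamDriver {
>   // Placeholder: build the context, the direct stream, and output operations here.
>   def createContext(): StreamingContext =
>     new StreamingContext(new SparkConf().setAppName("direct-stream-job"), Seconds(30))
>
>   def main(args: Array[String]): Unit = {
>     val ssc = createContext()
>     ssc.start()
>     try {
>       ssc.awaitTermination()  // rethrows the error that stopped the context
>     } catch {
>       case e: Throwable =>
>         // Make the failure (and any data loss) visible: log it, alert on it.
>         e.printStackTrace()
>         // Exit nonzero so the external supervisor restarts the job deliberately.
>         sys.exit(1)
>     }
>   }
> }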
>
> If a user of your library thinks that my approach of failing (so they KNOW
> there was data loss and can adjust their system) is the right thing to do,
> how do they do that?
>
> On Wed, Dec 2, 2015 at 9:49 PM, Dibyendu Bhattacharya <
> dibyendu.bhattachary@gmail.com> wrote:
>
>> Well, even if you set retention correctly and increase the consuming speed,
>> OffsetOutOfRange can still occur, depending on how your downstream processing
>> behaves. And if that happens, there is no other way to recover the old
>> messages. So the best bet here, from the streaming job's point of view, is to
>> start from the earliest offset rather than bring down the streaming job. In
>> many cases the goal for a streaming job is not to shut down and exit on any
>> failure. I believe that is what differentiates an always-running streaming job.
>>
>> Dibyendu
>>
>> On Thu, Dec 3, 2015 at 8:26 AM, Cody Koeninger <cody@koeninger.org>
>> wrote:
>>
>>> No, silently restarting from the earliest offset in the case of offset
>>> out of range exceptions during a streaming job is not the "correct way of
>>> recovery".
>>>
>>> If you do that, your users will be losing data without knowing why.
>>> It's more like a "way of ignoring the problem without actually addressing
>>> it".
>>>
>>> The only really correct way to deal with that situation is to recognize
>>> why it's happening, and either increase your Kafka retention or increase
>>> the speed at which you are consuming.
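>>>
>>> Concretely, retention is a Kafka-side setting (e.g. the retention.ms topic
>>> config), and consuming faster generally means more executor resources and a
>>> batch time that stays under the batch interval. On the Spark side these are
>>> the rate-related knobs most often involved with the direct stream (a sketch;
>>> values are placeholders):
>>>
>>> import org.apache.spark.SparkConf
>>>
>>> val conf = new SparkConf()
>>>   .setAppName("direct-stream-job")
>>>   // Upper bound on messages read per partition per second by the direct stream.
>>>   .set("spark.streaming.kafka.maxRatePerPartition", "10000")
>>>   // Spark 1.5+: adjust the ingestion rate automatically from batch delays.
>>>   .set("spark.streaming.backpressure.enabled", "true")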
>>>
>>> On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
>>> dibyendu.bhattachary@gmail.com> wrote:
>>>
>>>> This consumer which I mentioned does not silently throw away data. If the
>>>> offset is out of range, it starts from the earliest offset, and that is the
>>>> correct way of recovering from this error.
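>>>>
>>>> (For comparison, the nearest knob on the direct stream is the standard Kafka
>>>> consumer property below, but it only controls where a brand new stream starts
>>>> when no explicit offsets are passed in; it does not change how a running
>>>> KafkaRDD reacts to an out-of-range fetch. Sketch, Spark 1.x / Kafka 0.8
>>>> property names, placeholder brokers:)
>>>>
>>>> val kafkaParams = Map(
>>>>   "metadata.broker.list" -> "broker1:9092,broker2:9092",
>>>>   // "smallest" makes a fresh direct stream begin at the earliest retained offset
>>>>   "auto.offset.reset"    -> "smallest")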
>>>>
>>>> Dibyendu
>>>> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <cody@koeninger.org> wrote:
>>>>
>>>>> Again, just to be clear, silently throwing away data because your
>>>>> system isn't working right is not the same as "recover from any Kafka
>>>>> leader changes and offset out of ranges issue".
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>>>>> dibyendu.bhattachary@gmail.com> wrote:
>>>>>
>>>>>> Hi, if you use the Receiver-based consumer which is available in spark-packages
>>>>>> (http://spark-packages.org/package/dibbhatt/kafka-spark-consumer),
>>>>>> it has all the failure recovery built in, and it can recover from any Kafka
>>>>>> leader change and offset-out-of-range issue.
>>>>>>
>>>>>> Here is the package on GitHub:
>>>>>> https://github.com/dibbhatt/kafka-spark-consumer
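>>>>>>
>>>>>> Typical usage is roughly the sketch below. The import path, the
>>>>>> ReceiverLauncher.launch signature and the property keys are recalled from the
>>>>>> project's documentation and may differ between versions; treat them as
>>>>>> assumptions and check the README in the repo:
>>>>>>
>>>>>> import java.util.Properties
>>>>>> import org.apache.spark.SparkConf
>>>>>> import org.apache.spark.storage.StorageLevel
>>>>>> import org.apache.spark.streaming.{Seconds, StreamingContext}
>>>>>> import consumer.kafka.ReceiverLauncher  // assumed package; verify against the repo
>>>>>>
>>>>>> val ssc = new StreamingContext(new SparkConf().setAppName("receiver-consumer"), Seconds(30))
>>>>>>
>>>>>> // Property keys are assumptions based on the project's README; verify before use.
>>>>>> val props = new Properties()
>>>>>> props.put("zookeeper.hosts", "zk1,zk2")
>>>>>> props.put("zookeeper.port", "2181")
>>>>>> props.put("kafka.topic", "test_stream")
>>>>>> props.put("kafka.consumer.id", "my-consumer-group")
>>>>>>
>>>>>> val numberOfReceivers = 3
>>>>>> val stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY)
>>>>>> stream.foreachRDD(rdd => println("received " + rdd.count + " messages"))
>>>>>> ssc.start()
>>>>>> ssc.awaitTermination()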
>>>>>>
>>>>>>
>>>>>> Dibyendu
>>>>>>
>>>>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>>>>> swethakasireddy@gmail.com> wrote:
>>>>>>
>>>>>>> How can we avoid those errors with the receiver-based approach? Suppose we
>>>>>>> are OK with at-least-once processing and use the receiver-based approach,
>>>>>>> which uses ZooKeeper rather than querying Kafka directly; would these errors
>>>>>>> (Couldn't find leader offsets for Set([test_stream,5])) be avoided?
>>>>>>>
>>>>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <cody@koeninger.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> KafkaRDD.scala, handleFetchErr
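>>>>>>>>
>>>>>>>> Roughly, the decision it makes looks like the sketch below (a simplified
>>>>>>>> paraphrase from memory, not the actual source; check KafkaRDD.scala in your
>>>>>>>> Spark version):
>>>>>>>>
>>>>>>>> import kafka.common.ErrorMapping
>>>>>>>>
>>>>>>>> def handleFetchErr(errorCode: Short, refreshLeaderBackoffMs: Long): Unit = {
>>>>>>>>   if (errorCode != ErrorMapping.NoError) {
>>>>>>>>     if (errorCode == ErrorMapping.LeaderNotAvailableCode ||
>>>>>>>>         errorCode == ErrorMapping.NotLeaderForPartitionCode) {
>>>>>>>>       // Leader moved: back off, then rethrow so the normal task retry reconnects.
>>>>>>>>       Thread.sleep(refreshLeaderBackoffMs)
>>>>>>>>     }
>>>>>>>>     // Everything else, including OffsetOutOfRangeCode, becomes the matching
>>>>>>>>     // kafka.common exception, which is why the task (and eventually the job) fails.
>>>>>>>>     throw ErrorMapping.exceptionFor(errorCode)
>>>>>>>>   }
>>>>>>>> }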
>>>>>>>>
>>>>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>>>>> swethakasireddy@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Cody,
>>>>>>>>>
>>>>>>>>> How would we go about Option 2 (see below)? Which portion of the code in
>>>>>>>>> Spark Kafka Direct should we look at to handle this issue for our
>>>>>>>>> requirements?
>>>>>>>>>
>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>> partition. And how would it handle the offsets already calculated in the
>>>>>>>>> backlog (if there is one)?
>>>>>>>>>
>>>>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <cody@koeninger.org> wrote:
>>>>>>>>>
>>>>>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>>>>>> probably because messages are getting deleted before you've processed them.
>>>>>>>>>>
>>>>>>>>>> The only real way to deal with this is to give Kafka more retention,
>>>>>>>>>> consume faster, or both.
>>>>>>>>>>
>>>>>>>>>> If you're just looking for a quick "fix" for an infrequent issue, option 4
>>>>>>>>>> is probably easiest. I wouldn't do that automatically / silently, because
>>>>>>>>>> you're losing data.
>>>>>>>>>>
>>>>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasireddy@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> So, our Streaming Job fails with the following errors. If you see the
>>>>>>>>>>> errors below, they are all related to Kafka losing offsets and
>>>>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>>>>
>>>>>>>>>>> What are the options we have other than fixing Kafka? We would like to do
>>>>>>>>>>> something like the following. How can we achieve 1 and 2 with Spark Kafka
>>>>>>>>>>> Direct?
>>>>>>>>>>>
>>>>>>>>>>> 1. Need to see a way to skip some offsets if they are not available after
>>>>>>>>>>> the max retries are reached; in that case there might be data loss.
>>>>>>>>>>>
>>>>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>>>>> partition. And how would it handle the offsets already calculated in the
>>>>>>>>>>> backlog (if there is one)?
>>>>>>>>>>>
>>>>>>>>>>> 3. Track the offsets separately, and restart the job by providing the
>>>>>>>>>>> offsets (see the sketch right after this list).
>>>>>>>>>>>
>>>>>>>>>>> 4. Or a straightforward approach would be to monitor the log for this
>>>>>>>>>>> error, and if it occurs more than X times, kill the job, remove the
>>>>>>>>>>> checkpoint directory, and restart.
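>>>>>>>>>>>
>>>>>>>>>>> For option 3, roughly what we have in mind is the sketch below, assuming
>>>>>>>>>>> the Spark 1.x createDirectStream overload that takes explicit fromOffsets;
>>>>>>>>>>> loadSavedOffsets and the printed bookkeeping are placeholders:
>>>>>>>>>>>
>>>>>>>>>>> import kafka.common.TopicAndPartition
>>>>>>>>>>> import kafka.message.MessageAndMetadata
>>>>>>>>>>> import kafka.serializer.StringDecoder
>>>>>>>>>>> import org.apache.spark.streaming.StreamingContext
>>>>>>>>>>> import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}
>>>>>>>>>>>
>>>>>>>>>>> // Placeholder: offsets previously saved by the job (e.g. ZooKeeper or a DB).
>>>>>>>>>>> def loadSavedOffsets(): Map[TopicAndPartition, Long] =
>>>>>>>>>>>   Map(TopicAndPartition("test_stream", 5) -> 12345L)
>>>>>>>>>>>
>>>>>>>>>>> def createStream(ssc: StreamingContext, kafkaParams: Map[String, String]) = {
>>>>>>>>>>>   val fromOffsets = loadSavedOffsets()
>>>>>>>>>>>   val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
>>>>>>>>>>>       StringDecoder, (String, String)](
>>>>>>>>>>>     ssc, kafkaParams, fromOffsets,
>>>>>>>>>>>     (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))
>>>>>>>>>>>   stream.foreachRDD { rdd =>
>>>>>>>>>>>     // Persist these back to the offset store after processing, so a restart
>>>>>>>>>>>     // can resume from (or deliberately skip past) them.
>>>>>>>>>>>     val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>>>>>>>>>>     ranges.foreach(r => println(s"processed ${r.topic}-${r.partition} up to ${r.untilOffset}"))
>>>>>>>>>>>   }
>>>>>>>>>>>   stream
>>>>>>>>>>> }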
>>>>>>>>>>>
>>>>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>>>>> Set([test_stream,5]))
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.NotLeaderForPartitionException
>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>>>>> [Terminated, pool size = 0, active threads = 0, queued tasks = 0,
>>>>>>>>>>> completed tasks = 12112]
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>> Task 10 in stage 52.0 failed 4 times, most recent failure:
>>>>>>>>>>> Lost task 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>>>>> java.lang.InterruptedException
>>>>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>>>>> Task 7 in stage 33.0 failed 4 times, most recent failure:
>>>>>>>>>>> Lost task 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>> java.lang.ClassNotFoundException: kafka.common.OffsetOutOfRangeException
>>>>>>>>>>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>
>>
>
