spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Recovery for Spark Streaming Kafka Direct in case of issues with Kafka
Date Thu, 03 Dec 2015 02:56:35 GMT
No, silently restarting from the earliest offset in the case of offset out
of range exceptions during a streaming job is not the "correct way of
recovery".

If you do that, your users will be losing data without knowing why.  It's
more like a "way of ignoring the problem without actually addressing it".

The only really correct way to deal with that situation is to recognize why
it's happening, and either increase your Kafka retention or increase the
speed at which you are consuming.
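
To make the two knobs concrete: on the Kafka side, retention is a broker/topic setting (for example retention.ms in the topic config), not something Spark controls; on the Spark side, the configs usually used to keep a direct-stream job from building a backlog larger than the retention window are the per-partition rate cap and backpressure. A minimal sketch against Spark 1.x follows; the app name, batch interval and rate value are placeholders, not recommendations.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Minimal sketch: bound how much each batch reads per Kafka partition and let
    // Spark adapt the ingestion rate to the actual processing speed, so scheduling
    // delay (and the risk of offsets expiring mid-backlog) stays bounded.
    val conf = new SparkConf()
      .setAppName("kafka-direct-tuned")                           // placeholder
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")  // msgs/sec/partition, tune for your cluster
      .set("spark.streaming.backpressure.enabled", "true")

    val ssc = new StreamingContext(conf, Seconds(10))             // batch interval is a placeholder

Neither setting substitutes for enough retention; they only keep the consumer from falling further behind than the retention window allows.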

On Wed, Dec 2, 2015 at 7:13 PM, Dibyendu Bhattacharya <
dibyendu.bhattachary@gmail.com> wrote:

> The consumer I mentioned does not silently throw away data. If the offset is
> out of range, it starts from the earliest offset, and that is the correct way of
> recovery from this error.
>
> Dibyendu
> On Dec 2, 2015 9:56 PM, "Cody Koeninger" <cody@koeninger.org> wrote:
>
>> Again, just to be clear, silently throwing away data because your system
>> isn't working right is not the same as "recover from any Kafka leader
>> changes and offset out of range issues".
>>
>>
>>
>> On Tue, Dec 1, 2015 at 11:27 PM, Dibyendu Bhattacharya <
>> dibyendu.bhattachary@gmail.com> wrote:
>>
>>> Hi, if you use the receiver-based consumer which is available in
>>> spark-packages (
>>> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer), it
>>> has all the built-in failure recovery and can recover from any Kafka leader
>>> changes and offset out of range issues.
>>>
>>> Here is the package from GitHub:
>>> https://github.com/dibbhatt/kafka-spark-consumer
>>>
>>>
>>> Dibyendu
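
For readers comparing approaches: below is a minimal sketch of the receiver-based API that ships with Spark itself (KafkaUtils.createStream, which lets the Kafka high-level consumer track offsets in ZooKeeper). It is not the spark-packages consumer linked above, whose API differs; the ZooKeeper quorum, group id and topic map are placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-receiver-example")  // placeholder
    val ssc = new StreamingContext(conf, Seconds(10))

    // Receiver-based stream: the Kafka high-level consumer manages offsets in
    // ZooKeeper, so the job never queries leader offsets directly itself.
    val stream = KafkaUtils.createStream(
      ssc,
      "zk1:2181,zk2:2181",        // ZooKeeper quorum (placeholder)
      "my-consumer-group",        // consumer group id (placeholder)
      Map("test_stream" -> 1),    // topic -> number of receiver threads
      StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()

Note this trades the direct stream's offset-range bookkeeping for the high-level consumer's ZooKeeper commits; for at-least-once delivery with receivers you would generally also enable the write-ahead log (spark.streaming.receiver.writeAheadLog.enable).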
>>>
>>> On Wed, Dec 2, 2015 at 5:28 AM, swetha kasireddy <
>>> swethakasireddy@gmail.com> wrote:
>>>
>>>> How do we avoid those errors with the receiver-based approach? Suppose we are
>>>> OK with at-least-once processing and use the receiver-based approach, which uses
>>>> ZooKeeper rather than querying Kafka directly; would these errors (Couldn't
>>>> find leader offsets for
>>>> Set([test_stream,5])) be avoided?
>>>>
>>>> On Tue, Dec 1, 2015 at 3:40 PM, Cody Koeninger <cody@koeninger.org>
>>>> wrote:
>>>>
>>>>> KafkaRDD.scala, handleFetchErr
>>>>>
>>>>> On Tue, Dec 1, 2015 at 3:39 PM, swetha kasireddy <
>>>>> swethakasireddy@gmail.com> wrote:
>>>>>
>>>>>> Hi Cody,
>>>>>>
>>>>>> How should we look at Option 2 (see the following)? Which portion of the code
>>>>>> in Spark Kafka Direct should we look at to handle this issue specific to our
>>>>>> requirements?
>>>>>>
>>>>>>
>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>> partition. And how would it handle the offsets already calculated in
>>>>>> the backlog (if there is one)?
>>>>>>
>>>>>> On Tue, Dec 1, 2015 at 6:51 AM, Cody Koeninger <cody@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> If you're consistently getting offset out of range exceptions, it's
>>>>>>> probably because messages are getting deleted before you've processed them.
>>>>>>>
>>>>>>> The only real way to deal with this is to give Kafka more retention,
>>>>>>> consume faster, or both.
>>>>>>>
>>>>>>> If you're just looking for a quick "fix" for an infrequent issue,
>>>>>>> option 4 is probably easiest. I wouldn't do that automatically / silently,
>>>>>>> because you're losing data.
>>>>>>>
>>>>>>> On Mon, Nov 30, 2015 at 6:22 PM, SRK <swethakasireddy@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> So, our Streaming Job fails with the following errors. If you see
>>>>>>>> the errors below, they are all related to Kafka losing offsets and
>>>>>>>> OffsetOutOfRangeException.
>>>>>>>>
>>>>>>>> What are the options we have other than fixing Kafka? We would like
>>>>>>>> to do something like the following. How can we achieve 1 and 2 with
>>>>>>>> Spark Kafka Direct?
>>>>>>>>
>>>>>>>> 1. Need to see a way to skip some offsets if they are not available
>>>>>>>> after the max retries are reached; in that case there might be data
>>>>>>>> loss.
>>>>>>>>
>>>>>>>> 2. Catch that exception and somehow force things to "reset" for that
>>>>>>>> partition. And how would it handle the offsets already calculated in
>>>>>>>> the backlog (if there is one)?
>>>>>>>>
>>>>>>>> 3. Track the offsets separately, restart the job by providing the
>>>>>>>> offsets (see the sketch after this list).
>>>>>>>>
>>>>>>>> 4. Or a straightforward approach would be to monitor the log for
>>>>>>>> this error, and if it occurs more than X times, kill the job, remove
>>>>>>>> the checkpoint directory, and restart.
>>>>>>>>
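
To make options 2 and 3 above concrete, here is a minimal sketch against the Spark 1.x direct API: record the offset ranges of each batch (option 3), and on restart clamp any stored offset that has fallen below the earliest offset Kafka still retains before passing the map to createDirectStream (the "reset" of option 2). The myStore helper and the earliest-offsets map are hypothetical placeholders; how you persist offsets and query the earliest retained ones is deployment-specific. Note the clamping step skips the deleted messages, i.e. it accepts the data loss discussed in this thread.

    import kafka.common.TopicAndPartition
    import kafka.message.MessageAndMetadata
    import kafka.serializer.StringDecoder
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

    // Hypothetical offset store; replace with ZooKeeper, a database, etc.
    object myStore { def save(topic: String, partition: Int, offset: Long): Unit = () }

    // Option 3: after each batch, persist the ranges that were actually processed.
    def saveOffsets(rdd: RDD[_]): Unit =
      rdd.asInstanceOf[HasOffsetRanges].offsetRanges.foreach { r =>
        myStore.save(r.topic, r.partition, r.untilOffset)
      }

    // Option 2: on restart, "reset" partitions whose stored offset is no longer retained.
    def clampToEarliest(
        stored: Map[TopicAndPartition, Long],
        earliest: Map[TopicAndPartition, Long]): Map[TopicAndPartition, Long] =
      stored.map { case (tp, off) => tp -> math.max(off, earliest(tp)) }

    // Restart the stream from explicit offsets instead of relying on the checkpoint.
    def createStreamFrom(
        ssc: StreamingContext,
        kafkaParams: Map[String, String],
        fromOffsets: Map[TopicAndPartition, Long]) =
      KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
        ssc, kafkaParams, fromOffsets,
        (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))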
>>>>>>>> ERROR DirectKafkaInputDStream:
>>>>>>>> ArrayBuffer(kafka.common.UnknownException,
>>>>>>>> org.apache.spark.SparkException: Couldn't find leader offsets for
>>>>>>>> Set([test_stream,5]))
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.NotLeaderForPartitionException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> java.util.concurrent.RejectedExecutionException: Task
>>>>>>>> org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler@a48c5a8
>>>>>>>> rejected from java.util.concurrent.ThreadPoolExecutor@543258e0
>>>>>>>> [Terminated, pool size = 0, active threads = 0, queued tasks = 0,
>>>>>>>> completed tasks = 12112]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 10 in stage 52.0 failed 4 times, most recent failure: Lost task
>>>>>>>> 10.3 in stage 52.0 (TID 255, 172.16.97.97): UnknownReason
>>>>>>>>
>>>>>>>> Exception in thread "streaming-job-executor-0" java.lang.Error:
>>>>>>>> java.lang.InterruptedException
>>>>>>>>
>>>>>>>> Caused by: java.lang.InterruptedException
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 7 in stage 33.0 failed 4 times, most recent failure: Lost task
>>>>>>>> 7.3 in stage 33.0 (TID 283, 172.16.97.103): UnknownReason
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>> java.lang.ClassNotFoundException:
>>>>>>>> kafka.common.OffsetOutOfRangeException
>>>>>>>>
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> View this message in context:
>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Recovery-for-Spark-Streaming-Kafka-Direct-in-case-of-issues-with-Kafka-tp25524.html
>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>> Nabble.com.
>>>>>>>>
>>>>>>>>
>>>>>>>> ---------------------------------------------------------------------
>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
