spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: How to recover in case user errors in streaming
Date Mon, 06 Jul 2015 21:14:33 GMT
1. onBatchError is not a bad idea.
2. It works for the Kafka Direct API and for files as well; they all have
batches. However, you will not get the number of records for the file
stream.
3. Mind giving an example of the exception you would like to see caught?

TD

On Wed, Jul 1, 2015 at 10:35 AM, Amit Assudani <aassudani@impetus.com>
wrote:

>  Hi TD,
>
>  Why don’t we have OnBatchError or similar method in StreamingListener ?
>
>  Also, is StreamingListener only for receiver based approach or does it
> work for Kafka Direct API / File Based Streaming as well ?
>
>  Regards,
> Amit
>
>   From: Tathagata Das <tdas@databricks.com>
> Date: Monday, June 29, 2015 at 5:24 PM
>
> To: amit assudani <aassudani@impetus.com>
> Cc: Cody Koeninger <cody@koeninger.org>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to recover in case user errors in streaming
>
>   I recommend writing with dstream.foreachRDD, and then calling
> rdd.saveAsNewAPIHadoopFile inside a try/catch. See the implementation of
> dstream.saveAsNewAPIHadoopFiles
>
>
> https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716
>
> On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani <aassudani@impetus.com>
> wrote:
>
>>  Also, how do you suggest catching exceptions while using a connector
>> API like saveAsNewAPIHadoopFiles ?
>>
>>   From: amit assudani <aassudani@impetus.com>
>> Date: Monday, June 29, 2015 at 9:55 AM
>> To: Tathagata Das <tdas@databricks.com>
>>
>> Cc: Cody Koeninger <cody@koeninger.org>, "user@spark.apache.org" <
>> user@spark.apache.org>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   Thanks TD, this helps.
>>
>>  Looking forward to some fix where framework handles the batch failures
>> by some callback methods. This will help not having to write try/catch in
>> every transformation / action.
>>
>>  Regards,
>> Amit
>>
>>   From: Tathagata Das <tdas@databricks.com>
>> Date: Saturday, June 27, 2015 at 5:14 AM
>> To: amit assudani <aassudani@impetus.com>
>> Cc: Cody Koeninger <cody@koeninger.org>, "user@spark.apache.org" <
>> user@spark.apache.org>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   I looked at the code and found that batch exceptions are indeed
>> ignored. This is worth fixing: batch exceptions should not be silently
>> ignored.
>>
>>  Also, you can catch failed batch jobs (irrespective of the number of
>> retries) by catching the exception in foreachRDD. Here is an example.
>>
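>>  dstream.foreachRDD { rdd =>
>>     try {
>>         // run the output action for this batch's RDD here
>>     } catch {
>>         case e: Exception =>
>>             // the job for this batch failed after all task retries; log or record it
>>     }
>> }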
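
The control flow of the job-level try/catch above can be tried without a cluster. What follows is a minimal plain-Java sketch, not Spark code: `BatchFailureDemo`, `runBatches`, and the list of lists standing in for a stream of batches are hypothetical names introduced only for illustration. It shows one thing: a batch whose action throws is recorded, and later batches still run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, Spark-free model of "try/catch around the per-batch action".
final class BatchFailureDemo {

    // Runs `action` once per batch. A batch whose action throws is recorded
    // by index instead of aborting the loop, so later batches still run.
    static List<Integer> runBatches(List<List<String>> batches,
                                    Consumer<List<String>> action) {
        List<Integer> failedBatches = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
            try {
                action.accept(batches.get(i));
            } catch (RuntimeException e) {
                // In a real job this is where the failure would be logged
                // or the batch handed to some recovery path.
                failedBatches.add(i);
            }
        }
        return failedBatches;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("a", "b"),
                Arrays.asList("bad"),
                Arrays.asList("c"));
        List<Integer> failed = runBatches(batches, batch -> {
            if (batch.contains("bad")) {
                throw new IllegalStateException("simulated storage failure");
            }
        });
        System.out.println("failed batch indexes: " + failed);
    }
}
```

Note that this granularity only tells you which batch failed, not which record caused the failure.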
>>
>>
>>  This will catch failures at the granularity of the job, after all the
>> max retries of a task have been exhausted. But it will be hard to filter
>> out the failed record(s) and push them somewhere. To do that, I would use
>> rdd.foreach or rdd.foreachPartition, inside which I would catch the
>> exception, push that record out to another Kafka topic, and continue
>> normal processing of the other records. This would prevent the task
>> processing the partition from failing (as you are catching the bad records).
>>
>>  dstream.foreachRDD { rdd =>
>>      rdd.foreachPartition { iterator =>
>>          // Create Kafka producer for bad records
>>          iterator.foreach { record =>
>>              try {
>>                  // process record
>>              } catch {
>>                  // ExpectedException is a placeholder for the exception type you expect
>>                  case e: ExpectedException =>
>>                      // publish bad record to error topic in Kafka using the above producer
>>              }
>>          }
>>      }
>> }
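
The per-record pattern above (catch inside the partition loop, divert the bad record, keep going) can be sketched in plain Java without Spark or Kafka. Everything here is an assumption made for illustration: `DeadLetterDemo`, `processPartition`, and `parse` are hypothetical names, an in-memory list stands in for the Kafka error topic, and `NumberFormatException` plays the role of the expected exception.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Spark-free model of "catch per record, divert bad records".
final class DeadLetterDemo {

    // Stand-in for real per-record processing; throws on malformed input.
    static int parse(String record) {
        return Integer.parseInt(record.trim());
    }

    // Processes every record of one partition. A record that fails to parse
    // is appended to `deadLetters` (the stand-in for an error topic) and the
    // loop continues, so one bad record does not fail the whole partition.
    static List<Integer> processPartition(Iterable<String> partition,
                                          List<String> deadLetters) {
        List<Integer> results = new ArrayList<>();
        for (String record : partition) {
            try {
                results.add(parse(record));
            } catch (NumberFormatException e) {
                deadLetters.add(record);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        List<Integer> ok = processPartition(
                Arrays.asList("1", "oops", "3"), deadLetters);
        System.out.println("processed: " + ok);
        System.out.println("dead letters: " + deadLetters);
    }
}
```

Catching only the expected exception type is deliberate: anything unexpected still propagates and fails the task, so genuine bugs are not silently routed to the error sink.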
>>
>>
>>  TD
>>
>>  PS: Apologies for the Scala examples, hope you get the idea :)
>>
>> On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassudani@impetus.com>
>> wrote:
>>
>>>  Also, I get TaskContext.get() null when used in foreach function below
>>> ( I get it when I use it in map, but the whole point here is to handle
>>> something that is breaking in action ). Please help. :(
>>>
>>>   From: amit assudani <aassudani@impetus.com>
>>> Date: Friday, June 26, 2015 at 11:41 AM
>>>
>>> To: Cody Koeninger <cody@koeninger.org>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>> tdas@databricks.com>
>>> Subject: Re: How to recover in case user errors in streaming
>>>
>>>   Hmm, not sure why, but when I run this code, it keeps on
>>> consuming from Kafka and proceeds, ignoring the previous failed batches.
>>>
>>>  Also, now that I get the attempt number from TaskContext and I know
>>> the max retries, I am supposed to handle it in the try/catch
>>> block. But does that mean I have to handle these kinds of exceptions /
>>> errors in every transformation step (map, reduce, transform, etc.)? Isn’t
>>> there any callback that says the batch has been retried the max number of
>>> times and, before it is ignored, gives you a handle to do whatever you
>>> want with the batch / message in hand?
>>>
>>>  Regards,
>>> Amit
>>>
>>>   From: Cody Koeninger <cody@koeninger.org>
>>> Date: Friday, June 26, 2015 at 11:32 AM
>>> To: amit assudani <aassudani@impetus.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>> tdas@databricks.com>
>>> Subject: Re: How to recover in case user errors in streaming
>>>
>>>   No, if you have a bad message that you are continually throwing
>>> exceptions on, your stream will not progress to future batches.
>>>
>>> On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassudani@impetus.com>
>>> wrote:
>>>
>>>>  Also, what I understand is, max failures doesn’t stop the entire
>>>> stream, it fails the job created for the specific batch, but the subsequent
>>>> batches still proceed, isn’t that right? And the question still remains:
>>>> how to keep track of those failed batches?
>>>>
>>>>   From: amit assudani <aassudani@impetus.com>
>>>> Date: Friday, June 26, 2015 at 11:21 AM
>>>> To: Cody Koeninger <cody@koeninger.org>
>>>>
>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>>> tdas@databricks.com>
>>>> Subject: Re: How to recover in case user errors in streaming
>>>>
>>>>   Thanks for quick response,
>>>>
>>>>  My question here is how do I know that the max retries are done
>>>> (because in my code I never know whether it is the failure of the first
>>>> try or the last try) and that I need to handle this message; is there any
>>>> callback?
>>>>
>>>>  Also, I know the limitation of checkpoints when upgrading the code, but
>>>> my main focus here is to mitigate connectivity issues to the persistent
>>>> store, which get resolved in a while. But how do I know which messages
>>>> failed and need rework?
>>>>
>>>>  Regards,
>>>> Amit
>>>>
>>>>   From: Cody Koeninger <cody@koeninger.org>
>>>> Date: Friday, June 26, 2015 at 11:16 AM
>>>> To: amit assudani <aassudani@impetus.com>
>>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>>> tdas@databricks.com>
>>>> Subject: Re: How to recover in case user errors in streaming
>>>>
>>>>   If you're consistently throwing exceptions and thus failing tasks,
>>>> once you reach max failures the whole stream will stop.
>>>>
>>>>  It's up to you to either catch those exceptions, or restart your
>>>> stream appropriately once it stops.
>>>>
>>>>  Keep in mind that if you're relying on checkpoints, and fixing the
>>>> error requires changing your code, you may not be able to recover the
>>>> checkpoint.
>>>>
>>>> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassudani@impetus.com>
>>>> wrote:
>>>>
>>>>>   *Problem: *how do we recover from user errors (connectivity issues
>>>>> / storage service down / etc.)?
>>>>>
>>>>> *Environment:* Spark streaming using Kafka Direct Streams
>>>>>
>>>>> *Code Snippet: *
>>>>>
>>>>>
>>>>>
>>>>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
>>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>>> kafkaParams.put("metadata.broker.list", "localhost:9092");
>>>>> kafkaParams.put("auto.offset.reset", "smallest");
>>>>>
>>>>> JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
>>>>>         jssc, String.class, String.class,
>>>>>         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>>>>
>>>>> JavaDStream<String> inputStream = messages.map(
>>>>>         new Function<Tuple2<String, String>, String>() {
>>>>>     @Override
>>>>>     public String call(Tuple2<String, String> tuple2) {
>>>>>         return tuple2._2();
>>>>>     }
>>>>> });
>>>>>
>>>>> inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>>     @Override
>>>>>     public Void call(JavaRDD<String> rdd) throws Exception {
>>>>>         if (!rdd.isEmpty()) {
>>>>>             rdd.foreach(new VoidFunction<String>() {
>>>>>                 @Override
>>>>>                 public void call(String arg0) throws Exception {
>>>>>                     System.out.println("------------------------rdd----------" + arg0);
>>>>>                     Thread.sleep(1000);
>>>>>                     throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
>>>>>                 }
>>>>>             });
>>>>>         }
>>>>>         return null;
>>>>>     }
>>>>> });
>>>>>
>>>>>
>>>>>
>>>>> *Detailed Description*: Using Spark Streaming, I read text
>>>>> messages from Kafka using the direct API. For the sake of simplicity, all
>>>>> I do in processing is print each message on the console and sleep for
>>>>> 1 sec. as a placeholder for actual processing. Assume we get a user error,
>>>>> maybe due to a bad record, a format error, service connectivity issues or,
>>>>> let’s say, persistent store downtime. I’ve represented that by throwing an
>>>>> Exception from the foreach block. I understand Spark retries this a
>>>>> configurable number of times and then proceeds ahead. The question is what
>>>>> happens to those failed messages: does Spark retry them (and if yes,
>>>>> when)? If not, does it have any callback method so that the user can log /
>>>>> dump them in an error queue and provision them for further analysis and /
>>>>> or manual retrials? Also, fyi, checkpoints are enabled and the above code
>>>>> is in the create context method to recover from Spark driver / worker
>>>>> failures.
>>>>>
>>>>> ------------------------------
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> NOTE: This message may contain information that is confidential,
>>>>> proprietary, privileged or otherwise protected by law. The message is
>>>>> intended solely for the named addressee. If received in error, please
>>>>> destroy and notify the sender. Any use of this email is prohibited when
>>>>> received in error. Impetus does not represent, warrant and/or guarantee,
>>>>> that the integrity of this communication has been maintained nor that the
>>>>> communication is free of errors, virus, interception or interference.
>>>>>
>>>>>
>>>>>
>>>>
>>>>
