spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: How to recover in case user errors in streaming
Date Mon, 29 Jun 2015 21:24:18 GMT
I recommend writing the output with dstream.foreachRDD, calling
rdd.saveAsNewAPIHadoopFile inside a try/catch. See the implementation of
dstream.saveAsNewAPIHadoopFiles:

https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala#L716
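
A minimal sketch of that pattern (the Text key/value classes and the
"prefix" output path are placeholders, and it assumes dstream is a pair
DStream):

import org.apache.hadoop.io.Text
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

dstream.foreachRDD { (rdd, time) =>
    try {
        // same per-batch naming scheme that saveAsNewAPIHadoopFiles uses
        rdd.saveAsNewAPIHadoopFile(
            "prefix-" + time.milliseconds,
            classOf[Text], classOf[Text],
            classOf[TextOutputFormat[Text, Text]])
    } catch {
        case e: Exception =>
            // the batch job failed after all task retries; record the
            // batch time somewhere durable so it can be reprocessed later
            println(s"Batch at $time failed to save: $e")
    }
}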

On Mon, Jun 29, 2015 at 8:44 AM, Amit Assudani <aassudani@impetus.com>
wrote:

>  Also, how do you suggest catching exceptions while using connector
> APIs like saveAsNewAPIHadoopFiles?
>
>   From: amit assudani <aassudani@impetus.com>
> Date: Monday, June 29, 2015 at 9:55 AM
> To: Tathagata Das <tdas@databricks.com>
>
> Cc: Cody Koeninger <cody@koeninger.org>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to recover in case user errors in streaming
>
>   Thanks TD, this helps.
>
>  Looking forward to a fix where the framework handles batch failures
> via callback methods. That would avoid having to write a try/catch in
> every transformation / action.
>
>  Regards,
> Amit
>
>   From: Tathagata Das <tdas@databricks.com>
> Date: Saturday, June 27, 2015 at 5:14 AM
> To: amit assudani <aassudani@impetus.com>
> Cc: Cody Koeninger <cody@koeninger.org>, "user@spark.apache.org" <
> user@spark.apache.org>
> Subject: Re: How to recover in case user errors in streaming
>
>   I looked at the code and found that batch exceptions are indeed
> ignored. This is worth fixing: batch exceptions should not be silently
> ignored.
>
>  Also, you can catch failed batch jobs (irrespective of the number of
> retries) by catching the exception in foreachRDD. Here is an example.
>
>  dstream.foreachRDD { rdd =>
>     try {
>        // run jobs on the rdd here, e.g. rdd.saveAsNewAPIHadoopFile(...)
>     } catch {
>        case e: Exception =>
>           // reached only after the batch job has failed
>     }
> }
>
>
>  This will catch failures at the granularity of the job, after all the
> max retries of a task have been exhausted. But at that granularity it is
> hard to filter out the failed record(s) and push them somewhere. To do
> that, I would use rdd.foreach or rdd.foreachPartition, inside which I
> would catch the exception, push the bad record out to another Kafka
> topic, and continue normal processing of the other records. This
> prevents the task processing the partition from failing (as you are
> catching the bad records).
>
>  dstream.foreachRDD { rdd =>
>     rdd.foreachPartition { iterator =>
>         // create Kafka producer for bad records
>         iterator.foreach { record =>
>             try {
>                 // process record
>             } catch {
>                 case e: ExpectedException =>
>                     // publish bad record to error topic in Kafka
>                     // using the above producer
>             }
>         }
>     }
> }
>
>
>  TD
>
>  PS: Apologies for the Scala examples, hope you get the idea :)
>
> On Fri, Jun 26, 2015 at 9:56 AM, Amit Assudani <aassudani@impetus.com>
> wrote:
>
>>  Also, I get TaskContext.get() returning null when used in the foreach
>> function below (I get it when I use it in map, but the whole point here
>> is to handle something that is breaking in the action). Please help. :(
>>
>>   From: amit assudani <aassudani@impetus.com>
>> Date: Friday, June 26, 2015 at 11:41 AM
>>
>> To: Cody Koeninger <cody@koeninger.org>
>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>> tdas@databricks.com>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   Hmm, not sure why, but when I run this code, it keeps consuming from
>> Kafka and proceeds ahead, ignoring the previously failed batches.
>>
>>  Also, now that I get the attempt number from TaskContext and I have
>> the max-retries information, I am supposed to handle it in the
>> try/catch block. But does that mean I have to handle these kinds of
>> exceptions / errors in every transformation step (map, reduce,
>> transform, etc.)? Isn't there any callback that says the batch has been
>> retried the max number of times, and that gives you a handle to do
>> whatever you want with the batch / message before it is ignored?
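>>
>>  Roughly what I am attempting, as a Scala sketch (process() is a
>> placeholder for the real work; spark.task.maxFailures defaults to 4):
>>
>> import org.apache.spark.TaskContext
>>
>> dstream.foreachRDD { rdd =>
>>     val maxFailures =
>>         rdd.sparkContext.getConf.getInt("spark.task.maxFailures", 4)
>>     rdd.foreach { record =>
>>         try {
>>             process(record)
>>         } catch {
>>             case e: Exception =>
>>                 // TaskContext.get() is what comes back null for me here
>>                 val ctx = TaskContext.get()
>>                 if (ctx != null && ctx.attemptNumber() == maxFailures - 1) {
>>                     // last attempt: divert the record before it is lost
>>                 }
>>                 throw e
>>         }
>>     }
>> }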
>>
>>  Regards,
>> Amit
>>
>>   From: Cody Koeninger <cody@koeninger.org>
>> Date: Friday, June 26, 2015 at 11:32 AM
>> To: amit assudani <aassudani@impetus.com>
>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>> tdas@databricks.com>
>> Subject: Re: How to recover in case user errors in streaming
>>
>>   No, if you have a bad message that you are continually throwing
>> exceptions on, your stream will not progress to future batches.
>>
>> On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassudani@impetus.com>
>> wrote:
>>
>>>  Also, what I understand is that max failures doesn't stop the entire
>>> stream; it fails the job created for the specific batch, but the
>>> subsequent batches still proceed. Isn't that right? And the question
>>> still remains: how to keep track of those failed batches?
>>>
>>>   From: amit assudani <aassudani@impetus.com>
>>> Date: Friday, June 26, 2015 at 11:21 AM
>>> To: Cody Koeninger <cody@koeninger.org>
>>>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>> tdas@databricks.com>
>>> Subject: Re: How to recover in case user errors in streaming
>>>
>>>   Thanks for the quick response,
>>>
>>>  My question here is how do I know that the max retries are done
>>> (because in my code I never know whether it is the failure of the
>>> first try or the last try) and that I need to handle this message. Is
>>> there any callback?
>>>
>>>  Also, I know the limitation of checkpoints when upgrading the code,
>>> but my main focus here is to mitigate connectivity issues to the
>>> persistent store, which get resolved in a while. How do I know which
>>> messages failed and need rework?
>>>
>>>  Regards,
>>> Amit
>>>
>>>   From: Cody Koeninger <cody@koeninger.org>
>>> Date: Friday, June 26, 2015 at 11:16 AM
>>> To: amit assudani <aassudani@impetus.com>
>>> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
>>> tdas@databricks.com>
>>> Subject: Re: How to recover in case user errors in streaming
>>>
>>>   If you're consistently throwing exceptions and thus failing tasks,
>>> once you reach max failures the whole stream will stop.
>>>
>>>  It's up to you to either catch those exceptions, or restart your
>>> stream appropriately once it stops.
>>>
>>>  Keep in mind that if you're relying on checkpoints, and fixing the
>>> error requires changing your code, you may not be able to recover the
>>> checkpoint.
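>>>
>>>  A rough sketch of the restart approach (assuming the failure surfaces
>>> through awaitTermination, and that createContext and checkpointDir are
>>> your own context factory and checkpoint path):
>>>
>>> var stopped = false
>>> while (!stopped) {
>>>     val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>>>     ssc.start()
>>>     try {
>>>         ssc.awaitTermination()  // rethrows the error that stopped the stream
>>>         stopped = true          // clean shutdown, exit the loop
>>>     } catch {
>>>         case e: Exception =>
>>>             ssc.stop()  // make sure the old context is fully shut down
>>>             // log e, repair or skip the bad state, then loop to restart
>>>     }
>>> }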
>>>
>>> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassudani@impetus.com>
>>> wrote:
>>>
>>>>   *Problem:* how do we recover from user errors (connectivity issues /
>>>> storage service down / etc.)?
>>>>
>>>> *Environment:* Spark Streaming using Kafka direct streams
>>>>
>>>> *Code Snippet:*
>>>>
>>>>
>>>>
>>>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
>>>>
>>>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>>>> kafkaParams.put("metadata.broker.list", "localhost:9092");
>>>> kafkaParams.put("auto.offset.reset", "smallest");
>>>>
>>>> JavaPairInputDStream<String, String> messages = KafkaUtils
>>>>     .createDirectStream(jssc, String.class, String.class,
>>>>         StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>>>
>>>> JavaDStream<String> inputStream = messages
>>>>     .map(new Function<Tuple2<String, String>, String>() {
>>>>         @Override
>>>>         public String call(Tuple2<String, String> tuple2) {
>>>>             return tuple2._2();
>>>>         }
>>>>     });
>>>>
>>>> inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>>>     @Override
>>>>     public Void call(JavaRDD<String> rdd) throws Exception {
>>>>         if (!rdd.isEmpty()) {
>>>>             rdd.foreach(new VoidFunction<String>() {
>>>>                 @Override
>>>>                 public void call(String arg0) throws Exception {
>>>>                     System.out.println("------------------------rdd----------" + arg0);
>>>>                     Thread.sleep(1000); // placeholder for actual processing
>>>>                     throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
>>>>                 }
>>>>             });
>>>>         }
>>>>         return null;
>>>>     }
>>>> });
>>>>
>>>>
>>>>
>>>> *Detailed Description:* Using Spark Streaming, I read text messages
>>>> from Kafka using the direct API. For the sake of simplicity, all I do
>>>> in processing is print each message to the console and sleep for 1
>>>> sec. as a placeholder for actual processing. Assume we get a user
>>>> error, maybe due to a bad record, a format error, service
>>>> connectivity issues, or, say, persistent-store downtime; I've
>>>> represented that by throwing an Exception from the foreach block. I
>>>> understand Spark retries this a configurable number of times and
>>>> proceeds ahead. The question is what happens to those failed
>>>> messages: does Spark retry them (and if yes, when)? If not, does it
>>>> have any callback method so the user can log / dump them in an error
>>>> queue and provision them for further analysis and / or manual
>>>> retries? Also, FYI, checkpoints are enabled and the above code is in
>>>> the create-context method, to recover from Spark driver / worker
>>>> failures.
>>>>
