spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: How to recover in case user errors in streaming
Date Fri, 26 Jun 2015 15:32:03 GMT
No, if you have a bad message that you are continually throwing exceptions
on, your stream will not progress to future batches.
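
A minimal sketch of the "catch those exceptions" option from the quoted reply below: retry each record a bounded number of times inside your own code and, once the attempts are used up, hand the record to an error sink instead of letting the exception fail the task. The names SafeProcessor, RecordAction and ErrorSink are hypothetical and are not part of Spark; the sink could be a log, a Kafka error topic, or a table, depending on your setup.

```java
/** Hypothetical per-record step that may throw, e.g. a write to a persistent store. */
interface RecordAction<T> {
    void apply(T record) throws Exception;
}

/** Hypothetical destination for records that still fail after all attempts. */
interface ErrorSink<T> {
    void accept(T record, Exception lastError);
}

/** Retries an action a bounded number of times, then routes the record to an error sink. */
final class SafeProcessor<T> {
    private final int maxAttempts;
    private final RecordAction<T> action;
    private final ErrorSink<T> errorSink;

    SafeProcessor(int maxAttempts, RecordAction<T> action, ErrorSink<T> errorSink) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
        this.action = action;
        this.errorSink = errorSink;
    }

    /** Returns true if the record was processed, false if it went to the error sink. */
    boolean process(T record) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                action.apply(record);
                return true;
            } catch (Exception e) {
                // Remember the failure and try again until attempts run out.
                last = e;
            }
        }
        // All attempts failed: record it for later rework instead of rethrowing,
        // so the caller (and the batch it belongs to) can continue.
        errorSink.accept(record, last);
        return false;
    }
}
```

Inside the rdd.foreach body you would call process(arg0) rather than throwing. Because the exception never leaves your function, Spark does not see a task failure and the batch completes; tracking failed messages becomes the job of the error sink. When used in a Spark closure, the action and sink would have to be serializable or be created on the executor.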

On Fri, Jun 26, 2015 at 10:28 AM, Amit Assudani <aassudani@impetus.com>
wrote:

>  Also, my understanding is that reaching max failures doesn’t stop the
> entire stream; it fails the job created for the specific batch, but the
> subsequent batches still proceed. Is that right? And the question still
> remains: how do I keep track of those failed batches?
>
>   From: amit assudani <aassudani@impetus.com>
> Date: Friday, June 26, 2015 at 11:21 AM
> To: Cody Koeninger <cody@koeninger.org>
>
> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
> tdas@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
>   Thanks for the quick response.
>
>  My question here is: how do I know that the max retries are exhausted
> (in my code I never know whether a failure is on the first try or the
> last try) so that I can handle the message? Is there any callback?
>
>  Also, I know the limitation of checkpoints when upgrading the code, but my
> main focus here is to mitigate connectivity issues to the persistent store,
> which get resolved after a while. How do I know which messages failed
> and need rework?
>
>  Regards,
> Amit
>
>   From: Cody Koeninger <cody@koeninger.org>
> Date: Friday, June 26, 2015 at 11:16 AM
> To: amit assudani <aassudani@impetus.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
> tdas@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
>   If you're consistently throwing exceptions and thus failing tasks, once
> you reach max failures the whole stream will stop.
>
>  It's up to you to either catch those exceptions, or restart your stream
> appropriately once it stops.
>
>  Keep in mind that if you're relying on checkpoints, and fixing the error
> requires changing your code, you may not be able to recover the checkpoint.
>
> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassudani@impetus.com>
> wrote:
>
>>   *Problem: *how do we recover from user errors (connectivity issues /
>> storage service down / etc.)?
>>
>> *Environment:* Spark streaming using Kafka Direct Streams
>>
>> *Code Snippet: *
>>
>>
>>
>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", "localhost:9092");
>> kafkaParams.put("auto.offset.reset", "smallest");
>>
>> JavaPairInputDStream<String, String> messages = KafkaUtils
>>         .createDirectStream(jssc, String.class, String.class,
>>                 StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>
>> JavaDStream<String> inputStream = messages
>>         .map(new Function<Tuple2<String, String>, String>() {
>>             @Override
>>             public String call(Tuple2<String, String> tuple2) {
>>                 return tuple2._2();
>>             }
>>         });
>>
>> inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>     @Override
>>     public Void call(JavaRDD<String> rdd) throws Exception {
>>         if (!rdd.isEmpty()) {
>>             rdd.foreach(new VoidFunction<String>() {
>>                 @Override
>>                 public void call(String arg0) throws Exception {
>>                     System.out.println("------------------------rdd----------" + arg0);
>>                     Thread.sleep(1000);
>>                     throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
>>                 }
>>             });
>>         }
>>         return null;
>>     }
>> });
>>
>>
>>
>> *Detailed Description*: Using Spark Streaming, I read text messages
>> from Kafka using the direct API. For the sake of simplicity, all the
>> processing does is print each message to the console and sleep for 1 second
>> as a placeholder for actual processing. Assume we get a user error, perhaps
>> due to a bad record, a format error, service connectivity issues or, let’s
>> say, persistent store downtime. I’ve represented that by throwing an
>> Exception from the foreach block. I understand Spark retries this a
>> configurable number of times and then proceeds. The question is what happens
>> to those failed messages: does Spark retry them, and if so, when? If not,
>> does it have any callback method so that the user can log them or dump them
>> in an error queue and provision them for further analysis and/or manual
>> retries? Also, FYI, checkpoints are enabled and the above code is in the
>> create-context method to recover from Spark driver / worker failures.
>>
>> ------------------------------
>>
>>
>>
>>
>>
>>
>> NOTE: This message may contain information that is confidential,
>> proprietary, privileged or otherwise protected by law. The message is
>> intended solely for the named addressee. If received in error, please
>> destroy and notify the sender. Any use of this email is prohibited when
>> received in error. Impetus does not represent, warrant and/or guarantee,
>> that the integrity of this communication has been maintained nor that the
>> communication is free of errors, virus, interception or interference.
>>
>
>
