spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: How to recover in case user errors in streaming
Date Fri, 26 Jun 2015 15:27:34 GMT
TaskContext has an attemptNumber method on it.
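
For example, a minimal sketch (untested; assumes the default spark.task.maxFailures of 4, and process / sendToErrorQueue are placeholders for your own logic):

import org.apache.spark.TaskContext;
import org.apache.spark.api.java.function.VoidFunction;

rdd.foreach(new VoidFunction<String>() {
       @Override
       public void call(String msg) throws Exception {
              try {
                     process(msg); // your actual processing
              } catch (Exception e) {
                     // attemptNumber() starts at 0, so maxFailures - 1 is the final try
                     if (TaskContext.get().attemptNumber() >= 3) {
                            sendToErrorQueue(msg, e); // divert instead of killing the stream
                     } else {
                            throw e; // rethrow so Spark retries the task
                     }
              }
       }
});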

If you want to know which messages failed, you have access to the offsets,
and can do whatever you need to with them.
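
With the direct stream, each RDD carries the Kafka offset ranges it was built from; a sketch along the lines of the documented HasOffsetRanges pattern:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.OffsetRange;

messages.foreachRDD(new Function<JavaPairRDD<String, String>, Void>() {
       @Override
       public Void call(JavaPairRDD<String, String> rdd) throws Exception {
              // The cast must be done on the RDD produced directly by the
              // direct stream, before any shuffle or repartition
              OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
              for (OffsetRange r : ranges) {
                     // Log these alongside any failure so you know what to replay
                     System.out.println(r.topic() + " partition " + r.partition()
                            + " offsets [" + r.fromOffset() + ", " + r.untilOffset() + ")");
              }
              return null;
       }
});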

On Fri, Jun 26, 2015 at 10:21 AM, Amit Assudani <aassudani@impetus.com>
wrote:

>  Thanks for the quick response.
>
>  My question is: how do I know that the max retries have been exhausted
> (in my code I can't tell whether a failure is the first try or the last)
> and that I need to handle the message myself? Is there any callback?
>
>  Also, I know about the checkpoint limitation when upgrading code, but my
> main focus here is mitigating connectivity issues to the persistent store,
> which resolve after a while. How do I know which messages failed and need
> rework?
>
>  Regards,
> Amit
>
>   From: Cody Koeninger <cody@koeninger.org>
> Date: Friday, June 26, 2015 at 11:16 AM
> To: amit assudani <aassudani@impetus.com>
> Cc: "user@spark.apache.org" <user@spark.apache.org>, Tathagata Das <
> tdas@databricks.com>
> Subject: Re: How to recover in case user errors in streaming
>
>   If you're consistently throwing exceptions and thus failing tasks, once
> you reach max failures the whole stream will stop.
>
>  It's up to you to either catch those exceptions, or restart your stream
> appropriately once it stops.
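>
>  For the restart case, a driver-side loop along these lines can work
> (illustrative sketch; shouldRun and createContext are your own names, and
> createContext must build a fresh context since a stopped one cannot be
> restarted):
>
> while (shouldRun) {
>        JavaStreamingContext jssc = createContext(); // rebuild the streaming graph
>        jssc.start();
>        try {
>               jssc.awaitTermination(); // job failures propagate here as exceptions
>        } catch (Exception e) {
>               System.err.println("Stream stopped: " + e + "; restarting");
>        }
>        jssc.stop();
> }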
>
>  Keep in mind that if you're relying on checkpoints, and fixing the error
> requires changing your code, you may not be able to recover the checkpoint.
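>
>  For reference, checkpoint recovery is usually wired up via getOrCreate,
> along these lines (Spark 1.x Java API; checkpointDir is illustrative):
>
> JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir,
>        new JavaStreamingContextFactory() {
>               @Override
>               public JavaStreamingContext create() {
>                      return createContext(); // rebuilds the full streaming graph
>               }
>        });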
>
> On Fri, Jun 26, 2015 at 9:05 AM, Amit Assudani <aassudani@impetus.com>
> wrote:
>
>> *Problem:* how do we recover from user errors (connectivity issues /
>> storage service down / etc.)?
>>
>> *Environment:* Spark streaming using Kafka Direct Streams
>>
>> *Code Snippet:*
>>
>>
>>
>> // Direct Kafka stream (Spark Streaming 1.x Java API)
>> HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
>>
>> HashMap<String, String> kafkaParams = new HashMap<String, String>();
>> kafkaParams.put("metadata.broker.list", "localhost:9092");
>> kafkaParams.put("auto.offset.reset", "smallest");
>>
>> JavaPairInputDStream<String, String> messages = KafkaUtils
>>        .createDirectStream(jssc, String.class, String.class,
>>               StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);
>>
>> // Keep only the message value from each (key, value) pair
>> JavaDStream<String> inputStream = messages
>>        .map(new Function<Tuple2<String, String>, String>() {
>>               @Override
>>               public String call(Tuple2<String, String> tuple2) {
>>                      return tuple2._2();
>>               }
>>        });
>>
>> inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
>>        @Override
>>        public Void call(JavaRDD<String> rdd) throws Exception {
>>               if (!rdd.isEmpty()) {
>>                      rdd.foreach(new VoidFunction<String>() {
>>                             @Override
>>                             public void call(String arg0) throws Exception {
>>                                    System.out.println("------------------------rdd----------" + arg0);
>>                                    Thread.sleep(1000); // placeholder for real processing
>>                                    // Simulated user / service failure
>>                                    throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::" + arg0);
>>                             }
>>                      });
>>               }
>>               return null;
>>        }
>> });
>>
>>
>>
>> *Detailed Description:* Using Spark Streaming, I read text messages from
>> Kafka via the direct API. For simplicity, the processing just prints each
>> message to the console and sleeps for 1 second as a placeholder for real
>> work. Suppose we hit a user error, perhaps due to a bad record, a format
>> error, a service connectivity issue, or, say, downtime of the persistent
>> store; I've represented that by throwing an Exception from the foreach
>> block. I understand Spark retries the task a configurable number of times
>> and then proceeds. The question is what happens to those failed messages:
>> does Spark retry them, and if so, when? If not, is there a callback so the
>> user can log them or dump them to an error queue for further analysis
>> and/or manual retries? Also, FYI, checkpoints are enabled and the above
>> code lives in the create-context method, to recover from Spark driver /
>> worker failures.
