spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Amit Assudani <aassud...@impetus.com>
Subject How to recover in case user errors in streaming
Date Fri, 26 Jun 2015 14:05:00 GMT
Problem: how do we recover from user errors (connectivity issues / storage service down / etc.)?
Environment: Spark streaming using Kafka Direct Streams
Code Snippet:

HashSet<String> topicsSet = new HashSet<String>(Arrays.asList("kafkaTopic1"));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "smallest");


JavaPairInputDStream<String, String> messages = KafkaUtils
.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topicsSet);

JavaDStream<String> inputStream = messages
       .map(new Function<Tuple2<String, String>, String>() {
       @Override
       public String call(Tuple2<String, String> tuple2) {
              return tuple2._2();
       }});

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {

       @Override
       public Void call(JavaRDD<String> rdd) throws Exception {
              if(!rdd.isEmpty())
              {
rdd.foreach(new VoidFunction<String>(){
@Override
                      public void call(String arg0) throws Exception {
System.out.println("------------------------rdd----------"+arg0);
Thread.sleep(1000);

throw new Exception(" :::::::::::::::user and/or service exception::::::::::::::"+arg0);

                      }});

              }
              return null;
       }
});

Detailed Description: Using spark streaming I read the text messages from kafka using direct
API. For sake of simplicity, all I do in processing is printing each message on console and
sleep of 1 sec. as a placeholder for actual processing. Assuming we get a user error may be
due to bad record, format error or the service connectivity issues or let's say the persistent
store downtime. I've represented that with throwing an Exception from foreach block. I understand
spark retries this configurable number of times and  proceeds ahead. The question is what
happens to those failed messages, does ( if yes when ) spark re-tries those ? If not, does
it have any callback method so as user can log / dump it in error queue and provision it for
further analysis and / or retrials manually. Also, fyi, checkpoints are enabled and above
code is in create context method to recover from spark driver / worker failures.

________________________________






NOTE: This message may contain information that is confidential, proprietary, privileged or
otherwise protected by law. The message is intended solely for the named addressee. If received
in error, please destroy and notify the sender. Any use of this email is prohibited when received
in error. Impetus does not represent, warrant and/or guarantee, that the integrity of this
communication has been maintained nor that the communication is free of errors, virus, interception
or interference.

Mime
View raw message