spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Imran Rashid <im...@therashids.com>
Subject Re: continuing processing when errors occur
Date Thu, 24 Jul 2014 19:58:47 GMT
Hi Art,

I have some advice that isn't spark-specific at all, so it doesn't
*exactly* address your questions, but you might still find helpful.  I
think using an implicit to add your retyring behavior might be useful.  I
can think of two options:

1. enriching RDD itself, eg. to add a .retryForeach, which would have the
desired behavior.

2. enriching Function to create a variant with retry behavior.

I prefer option 2, because it could be useful outside of spark, and even
within spark, you might realize you want to do something similar for more
than just foreach.

Here's an example.  (probably there is a more functional way to do this, to
avoid the while loop, but my brain isn't working and that's not the point
of this anyway)

Lets say we have this function:

def tenDiv(x:Int) = println(10 / x)

and we try applying it to a normal old Range:

scala> (-10 to 10).foreach{tenDiv}
-1
-1
-1
-1
-1
-2
-2
-3
-5
-10
java.lang.ArithmeticException: / by zero
    at .tenDiv(<console>:7)


We can create enrich Function to add some retry behavior:

class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
{
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while(!success && tries < nTries) {
      tries += 1
      try {
        f(a)
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
}



We "activate" this behavior by calling .retryable(nTries) on our method.
Like so:

scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
-5
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
10
5

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-5
-5
-5
-10
-10
-10
failed on try 1 with java.lang.ArithmeticException: / by zero
failed on try 2 with java.lang.ArithmeticException: / by zero
failed on try 3 with java.lang.ArithmeticException: / by zero
10
10
10
5
5
5


You could do the same thing on closures you pass to RDD.foreach.

I should add, that I'm often very hesitant to use implicits because in can
make it harder to follow what's going on in the code.  I think this version
is OK, though, b/c somebody coming along later and looking at the code at
least can see the call to "retryable" as a clue.  (I really dislike
implicit conversions that happen without any hints in the actual code.)
Hopefully that's enough of a hint for others to figure out what is going
on.  Eg., intellij will know where that method came from and jump to it,
and also if you make the name unique enough, you can probably find it with
plain text search / c-tags.  But, its definitely worth considering for
yourself.

hope this helps,
Imran



On Thu, Jul 24, 2014 at 1:12 PM, Art Peel <foundart@gmail.com> wrote:

> Our system works with RDDs generated from Hadoop files. It processes each
> record in a Hadoop file and for a subset of those records generates output
> that is written to an external system via RDD.foreach. There are no
> dependencies between the records that are processed.
>
> If writing to the external system fails (due to a detail of what is being
> written) and throws an exception, I see the following behavior:
>
> 1. Spark retries the entire partition (thus wasting time and effort),
> reaches the problem record and fails again.
> 2. It repeats step 1 up to the default 4 tries and then gives up. As a
> result, the rest of records from that Hadoop file are not processed.
> 3. The executor where the 4th failure occurred is marked as failed and
> told to shut down and thus I lose a core for processing the remaining
> Hadoop files, thus slowing down the entire process.
>
>
> For this particular problem, I know how to prevent the underlying
> exception, but I'd still like to get a handle on error handling for future
> situations that I haven't yet encountered.
>
> My goal is this:
> Retry the problem record only (rather than starting over at the beginning
> of the partition) up to N times, then give up and move on to process the
> rest of the partition.
>
> As far as I can tell, I need to supply my own retry behavior and if I want
> to process records after the problem record I have to swallow exceptions
> inside the foreach block.
>
> My 2 questions are:
> 1. Is there anything I can do to prevent the executor from being shut down
> when a failure occurs?
>
>
> 2. Are there ways Spark can help me get closer to my goal of retrying only
> the problem record without writing my own re-try code and swallowing
> exceptions?
>
> Regards,
> Art
>
>

Mime
View raw message