spark-user mailing list archives

From Akhil Das <ak...@sigmoidanalytics.com>
Subject Re: How to catch error during Spark job?
Date Tue, 03 Nov 2015 06:37:02 GMT
If you expect the job state to show up as killed or errored, then you can
kill the executor instance (the worker process on the worker machine). We
keep track of our Spark jobs in production with custom-built monitoring (a
combination of Nagios and scripts we wrote ourselves); apart from that,
using high-availability clusters and launching the jobs as a Mesos
framework makes sure the job doesn't get killed.
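On the exit-code half of the question: as a rough sketch (not our actual
production code), you can map the driver's success or failure onto the
process exit status, which spark-submit in client mode passes through to
whatever launched it, so a wrapper script or scheduler can alert on a
nonzero code. `runJob` below is a hypothetical placeholder for the real
Spark driver logic:

```scala
// Minimal sketch: translate job success/failure into a process exit code.
// `runJob` is a hypothetical placeholder for the actual Spark driver work.
object JobRunner {
  def exitCodeFor(job: () => Unit): Int =
    try { job(); 0 }
    catch {
      case e: Exception =>
        System.err.println(s"Job failed: ${e.getMessage}")
        1 // nonzero so external monitoring can alert on it
    }

  def main(args: Array[String]): Unit = {
    val runJob: () => Unit = () => () // replace with the real Spark actions
    sys.exit(exitCodeFor(runJob))
  }
}
```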

Thanks
Best Regards
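P.S. The driver-side catch this thread circles around can be sketched
without a cluster. When an action runs, the final task failure reaches the
driver wrapped in org.apache.spark.SparkException; the class below is only
a local stand-in simulating that, so the sketch compiles and runs without
Spark:

```scala
// Sketch only: a local stand-in for org.apache.spark.SparkException, used
// to simulate how an executor-side failure surfaces in the driver once an
// action such as rdd.count runs.
class SparkException(msg: String) extends Exception(msg)

object DriverCatch {
  // `runAction` stands in for a Spark action like rdd.count.
  def statusOf(runAction: () => Long): String =
    try {
      val n = runAction()
      s"SUCCEEDED: $n"
    } catch {
      case e: SparkException => s"FAILED: ${e.getMessage}"
    }

  // Simulated action whose tasks kept throwing "Real failure!":
  val failingAction: () => Long = () =>
    throw new SparkException(
      "Job aborted due to stage failure: ... java.lang.Exception: Real failure!")
}
```

With this shape the driver gets a status string it can log or turn into an
exit code, instead of the exception killing the process right after
sc.stop.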

On Tue, Nov 3, 2015 at 3:51 AM, Isabelle Phan <nliphan@gmail.com> wrote:

> Hi Akhil,
>
> Thanks a lot for your reply.
>
> I ran your code on our cluster, and the status is still finished. 😅
>
> [Screenshot: the Spark UI still shows the application status as FINISHED]
>
> The spark-submit trace does show the error message though:
> 15/11/03 06:32:54 INFO
> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
> OutputCommitCoordinator stopped!
> Exception in thread "main" org.apache.spark.SparkException: Job aborted
> due to stage failure: Task 2 in stage 0.0 failed 4 times, most recent
> failure: Lost task 2.3 in stage 0.0 (TID 20): java.lang.Exception: Real
> failure!
>
>
> How do you usually keep track of your Spark jobs' status?
>
>
> Regards,
>
> Isabelle
>
>
>
>
> On Mon, Nov 2, 2015 at 1:18 AM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>> Usually you add exception handling within the transformations; in your
>> case you added it in the driver code, so that approach can't catch the
>> exceptions that happen inside the executors.
>>
>> eg:
>>
>> try {
>>   val rdd = sc.parallelize(1 to 100)
>>
>>   rdd.foreach(x => throw new Exception("Real failure!")) // could be rdd.map etc.
>>
>>   val count = rdd.count
>>
>>   println(s"Count: $count")
>>
>>   throw new Exception("Fail!")
>> } finally {
>>   sc.stop
>> }
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Oct 28, 2015 at 7:10 AM, Isabelle Phan <nliphan@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> I had a question about error handling in a Spark job: if an exception
>>> occurs during the job, what is the best way to get notified of the
>>> failure? Can Spark jobs return different exit codes?
>>>
>>> For example, I wrote a dummy Spark job that just throws an exception,
>>> as follows:
>>> import org.apache.spark.SparkContext
>>> import org.apache.spark.SparkContext._
>>> import org.apache.spark.SparkConf
>>>
>>> object ExampleJob {
>>>   def main(args: Array[String]): Unit = {
>>>     val conf = new SparkConf().setAppName("Test Job")
>>>     val sc = new SparkContext(conf)
>>>     try {
>>>       val count = sc.parallelize(1 to 100).count
>>>       println(s"Count: $count")
>>>
>>>       throw new Exception("Fail!")
>>>
>>>     } finally {
>>>       sc.stop
>>>     }
>>>   }
>>>
>>> }
>>>
>>> The spark-submit execution trace shows the error:
>>> spark-submit --class com.test.ExampleJob test.jar
>>> 15/10/03 03:13:16 INFO SparkContext: Running Spark version 1.4.0
>>> 15/10/03 03:13:19 WARN SparkConf: In Spark 1.0 and later spark.local.dir
>>> will be overridden by the value set by the cluster manager (via
>>> SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
>>> 15/10/03 03:13:19 WARN SparkConf:
>>> ...
>>> 15/10/03 03:13:59 INFO DAGScheduler: Job 0 finished: count at
>>> ExampleJob.scala:12, took 18.879104 s
>>> Count: 100
>>> 15/10/03 03:13:59 INFO SparkUI: Stopped Spark web UI at []
>>> 15/10/03 03:13:59 INFO DAGScheduler: Stopping DAGScheduler
>>> 15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Shutting down all
>>> executors
>>> 15/10/03 03:13:59 INFO SparkDeploySchedulerBackend: Asking each executor
>>> to shut down
>>> 15/10/03 03:13:59 INFO MapOutputTrackerMasterEndpoint:
>>> MapOutputTrackerMasterEndpoint stopped!
>>> 15/10/03 03:13:59 INFO Utils: path =
>>> /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596/blockmgr-d8e40805-3b8c-45f4-97b3-b89874158796,
>>> already present as root for deletion.
>>> 15/10/03 03:13:59 INFO MemoryStore: MemoryStore cleared
>>> 15/10/03 03:13:59 INFO BlockManager: BlockManager stopped
>>> 15/10/03 03:13:59 INFO BlockManagerMaster: BlockManagerMaster stopped
>>> 15/10/03 03:13:59 INFO
>>> OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:
>>> OutputCommitCoordinator stopped!
>>> 15/10/03 03:13:59 INFO SparkContext: Successfully stopped SparkContext
>>> Exception in thread "main" java.lang.Exception: Fail!
>>> at com.test.ExampleJob$.main(ExampleJob.scala:14)
>>> at com.test.ExampleJob.main(ExampleJob.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>> at
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> 15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator:
>>> Shutting down remote daemon.
>>> 15/10/03 03:13:59 INFO RemoteActorRefProvider$RemotingTerminator: Remote
>>> daemon shut down; proceeding with flushing remote transports.
>>> 15/10/03 03:13:59 INFO Utils: Shutdown hook called
>>> 15/10/03 03:13:59 INFO Utils: Deleting directory
>>> /data1/spark/tmp/spark-d8c0a18f-6e45-46c8-a208-1e1ad36ae596
>>> 15/10/03 03:14:00 INFO RemoteActorRefProvider$RemotingTerminator:
>>> Remoting shut down.
>>>
>>>
>>> However, the Spark UI just shows the status as "FINISHED". Is this a
>>> configuration error on my side?
>>> [Screenshot: the Spark UI shows the application status as FINISHED]
>>>
>>>
>>> Thanks,
>>>
>>> Isabelle
>>>
>>
>>
>
