spark-issues mailing list archives

From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5316) DAGScheduler may make shuffleToMapStage leak if getParentStages fails
Date Mon, 19 Jan 2015 11:27:34 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14282416#comment-14282416 ]

Apache Spark commented on SPARK-5316:
-------------------------------------

User 'YanTangZhai' has created a pull request for this issue:
https://github.com/apache/spark/pull/4105

> DAGScheduler may make shuffleToMapStage leak if getParentStages fails
> ---------------------------------------------------------------------
>
>                 Key: SPARK-5316
>                 URL: https://issues.apache.org/jira/browse/SPARK-5316
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: YanTang Zhai
>            Priority: Minor
>
> DAGScheduler may leak entries in shuffleToMapStage if getParentStages fails.
> If getParentStages throws an exception (for example, because an input path does not exist), DAGScheduler
> fails to handle the job submission, but getParentStages may already have added entries to shuffleToMapStage.
> These entries in shuffleToMapStage are never cleaned up.
> A simple job that reproduces the problem:
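> {code:java}
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.SparkContext._ // implicit pair-RDD conversions for reduceByKey (Spark 1.x)
> import org.apache.spark.rdd.PairRDDFunctions
>
> val inputFile1 = ... // Input path does not exist when this job is submitted
> val inputFile2 = ...
> val outputFile = ...
> val conf = new SparkConf()
> val sc = new SparkContext(conf)
> val rdd1 = sc.textFile(inputFile1)
>   .flatMap(line => line.split(" "))
>   .map(word => (word, 1))
>   .reduceByKey(_ + _, 1)
> val rdd2 = sc.textFile(inputFile2)
>   .flatMap(line => line.split(","))
>   .map(word => (word, 1))
>   .reduceByKey(_ + _, 1)
> try {
>   val rdd3 = new PairRDDFunctions(rdd1).join(rdd2, 1)
>   // saveAsTextFile submits the job; submission fails in getParentStages because inputFile1 is missing
>   rdd3.saveAsTextFile(outputFile)
> } catch {
>   case e: Exception =>
>     // logError(msg, throwable) assumes the enclosing class mixes in org.apache.spark.Logging
>     logError("Job submission failed", e)
> }
> // Print DAGScheduler's shuffleToMapStage to check
> // whether it still holds uncleaned entries.
> ...
> {code}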
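
The mechanism described in the issue (entries are registered in shuffleToMapStage while getParentStages runs, and nothing removes them when an exception escapes) can be illustrated with a small self-contained model. This is a hypothetical sketch, not Spark's DAGScheduler and not the change proposed in pull request 4105: MiniScheduler, buildParents, submitLeaky and submitWithRollback are invented names, and the rollback strategy (snapshot the keys before submission, remove the new ones on failure) is only one assumed way to clean up.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bookkeeping described in SPARK-5316.
// None of these names are Spark's real API.
class MiniScheduler {
  // Stand-in for DAGScheduler.shuffleToMapStage: shuffle id -> stage id.
  val shuffleToMapStage = mutable.HashMap.empty[Int, Int]
  private var nextStageId = 0

  // Registers a map stage for each shuffle id, then throws if `inputMissing`
  // is true, mimicking getParentStages failing after partial registration.
  private def buildParents(shuffleIds: Seq[Int], inputMissing: Boolean): Seq[Int] = {
    val stages = shuffleIds.map { id =>
      val stageId = nextStageId
      nextStageId += 1
      shuffleToMapStage(id) = stageId
      stageId
    }
    if (inputMissing) throw new java.io.FileNotFoundException("input path does not exist")
    stages
  }

  // Leaky version: entries added before the exception stay in the map.
  def submitLeaky(shuffleIds: Seq[Int], inputMissing: Boolean): Boolean =
    try { buildParents(shuffleIds, inputMissing); true }
    catch { case _: Exception => false }

  // Rollback version: remember which keys existed and, on failure,
  // remove only the keys that were added during this submission.
  def submitWithRollback(shuffleIds: Seq[Int], inputMissing: Boolean): Boolean = {
    val before = shuffleToMapStage.keySet.toSet
    try { buildParents(shuffleIds, inputMissing); true }
    catch {
      case _: Exception =>
        shuffleToMapStage --= (shuffleToMapStage.keySet.toSet -- before)
        false
    }
  }
}
```

In this model a failed submitLeaky call leaves both entries behind, while submitWithRollback leaves the map as it was. Snapshotting the key set costs time proportional to the map size on every submission; tracking the keys added by the failing call directly would avoid that.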



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


