spark-issues mailing list archives

From "Ian (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-19462) when spark.sql.adaptive.enabled is enabled, RDD is not resilient to node container failure
Date Sun, 05 Feb 2017 10:25:41 GMT

     [ https://issues.apache.org/jira/browse/SPARK-19462?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ian updated SPARK-19462:
------------------------
    Description: 
The property spark.sql.adaptive.enabled must be set to "true".

Steps to reproduce using spark-shell:
0. We use YARN as the cluster manager; spark-shell runs in client mode.
1. Launch spark-shell.
2. Run the following:
{code}
val df1 = sc.parallelize( 1 to 1000, 2).toDF("number")
df1.registerTempTable("test")

val data1 = sqlContext.sql("SELECT * FROM test WHERE number > 50")
data1.collect

val data2 = sqlContext.sql("SELECT number, count(*) cnt FROM test GROUP BY number")
data2.collect

// Everything works up to this point.
// Now manually kill both the AM and all the NMs of the spark-shell app.

// Re-running data1.collect returns the result successfully.
data1.collect

// But data2.collect fails.
data2.collect

// Stack trace:
Caused by: java.lang.RuntimeException: Exchange not implemented for UnknownPartitioning(1)
  at scala.sys.package$.error(package.scala:27)
  at org.apache.spark.sql.execution.Exchange.org$apache$spark$sql$execution$Exchange$$getPartitionKeyExtractor$1(Exchange.scala:198)
  at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:208)
  at org.apache.spark.sql.execution.Exchange$$anonfun$3.apply(Exchange.scala:207)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$21.apply(RDD.scala:728)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
  at org.apache.spark.scheduler.Task.run(Task.scala:89)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

{code}
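For reference, one way to satisfy the precondition stated at the top of the description is to set the property from inside the running shell. This is a minimal sketch, assuming a Spark 1.6 spark-shell where `sqlContext` is predefined; whether the report set the property this way or at launch time is not stated, so treat it as an illustration only.

```scala
// Assumption: run inside spark-shell (Spark 1.6), where sqlContext already exists.
// Enable adaptive execution for queries planned after this call.
sqlContext.setConf("spark.sql.adaptive.enabled", "true")

// Read the value back to confirm the session sees it;
// "false" is only the fallback used when the key is unset.
val adaptive = sqlContext.getConf("spark.sql.adaptive.enabled", "false")
println(s"spark.sql.adaptive.enabled = $adaptive")
```

Setting the property before defining data1 and data2 keeps the reproduction steps unchanged.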

The difference between data1 and data2 is whether a ShuffledRowRDD is present in the lineage.
When the RDD lineage contains a ShuffledRowRDD, the failure shown above occurs after a node failure or container loss.
{code}
scala> data2.rdd.toDebugString
res6: String =
(1) MapPartitionsRDD[20] at rdd at <console>:26 []
 |  MapPartitionsRDD[19] at rdd at <console>:26 []
 |  ShuffledRowRDD[8] at collect at <console>:26 []
 +-(2) MapPartitionsRDD[7] at collect at <console>:26 []
    |  MapPartitionsRDD[6] at collect at <console>:26 []
    |  MapPartitionsRDD[5] at collect at <console>:26 []
    |  MapPartitionsRDD[1] at intRddToDataFrameHolder at <console>:25 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:25 []

{code}
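To tell quickly which queries fall into the affected category, the lineage dump can be scanned for the shuffle RDD. The sketch below is plain Scala and does not need a cluster; `LineageCheck` and `containsShuffledRowRDD` are hypothetical names introduced here for illustration, not part of Spark.

```scala
// Hypothetical helper (not part of Spark): reports whether a lineage dump,
// as produced by RDD.toDebugString, mentions a ShuffledRowRDD.
object LineageCheck {
  def containsShuffledRowRDD(debugString: String): Boolean =
    debugString.split("\n").exists(_.contains("ShuffledRowRDD"))

  def main(args: Array[String]): Unit = {
    // Two lines copied from the lineage output above.
    val sample = Seq(
      " |  ShuffledRowRDD[8] at collect at <console>:26 []",
      " +-(2) MapPartitionsRDD[7] at collect at <console>:26 []"
    ).mkString("\n")

    println(containsShuffledRowRDD(sample))
  }
}
```

In spark-shell the same check could be applied as `LineageCheck.containsShuffledRowRDD(data2.rdd.toDebugString)`, and likewise for data1 to compare the two lineages.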



> when spark.sql.adaptive.enabled is enabled, RDD is not resilient to node container failure
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-19462
>                 URL: https://issues.apache.org/jira/browse/SPARK-19462
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.3
>            Reporter: Ian
>




