spark-issues mailing list archives

From Piotr Kołaczkowski (JIRA) <j...@apache.org>
Subject [jira] [Commented] (SPARK-1712) ParallelCollectionRDD operations hanging forever without any error messages
Date Mon, 05 May 2014 19:31:18 GMT

    [ https://issues.apache.org/jira/browse/SPARK-1712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13989876#comment-13989876 ]

Piotr Kołaczkowski commented on SPARK-1712:
-------------------------------------------

This is the log from the shell:

{noformat}
scala> val rdd = sc.parallelize(collection)
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse("       val rdd = sc.parallelize(collection)
") Some(List(val rdd = sc.parallelize(collection)))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter:   11: ValDef
  11: TypeTree
  31: Apply
  20: Select
  17: Ident
  32: Ident

14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse("
class $read extends Serializable {
  class $iwC extends Serializable {
val $VAL2 = $line3.$read.INSTANCE;
import $VAL2.$iw.$iw.`sc`;
class $iwC extends Serializable {
import org.apache.spark.SparkContext._
class $iwC extends Serializable {
class $iwC extends Serializable {
import com.datastax.bdp.spark.CassandraFunctions._
class $iwC extends Serializable {
import com.datastax.bdp.spark.context.CassandraContext
class $iwC extends Serializable {
import com.tuplejump.calliope.Implicits._
class $iwC extends Serializable {
val $VAL3 = $line17.$read.INSTANCE;
import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`collection`;
class $iwC extends Serializable {
       val rdd = sc.parallelize(collection)

      

}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;

}
object $read {
  val INSTANCE = new $read();
}

") Some(List(class $read extends Serializable {
  def <init>() = {
    super.<init>();
    ()
  };
  class $iwC extends Serializable {
    def <init>() = {
      super.<init>();
      ()
    };
    val $VAL2 = $line3.$read.INSTANCE;
    import $VAL2.$iw.$iw.sc;
    class $iwC extends Serializable {
      def <init>() = {
        super.<init>();
        ()
      };
      import org.apache.spark.SparkContext._;
      class $iwC extends Serializable {
        def <init>() = {
          super.<init>();
          ()
        };
        class $iwC extends Serializable {
          def <init>() = {
            super.<init>();
            ()
          };
          import com.datastax.bdp.spark.CassandraFunctions._;
          class $iwC extends Serializable {
            def <init>() = {
              super.<init>();
              ()
            };
            import com.datastax.bdp.spark.context.CassandraContext;
            class $iwC extends Serializable {
              def <init>() = {
                super.<init>();
                ()
              };
              import com.tuplejump.calliope.Implicits._;
              class $iwC extends Serializable {
                def <init>() = {
                  super.<init>();
                  ()
                };
                val $VAL3 = $line17.$read.INSTANCE;
                import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
                class $iwC extends Serializable {
                  def <init>() = {
                    super.<init>();
                    ()
                  };
                  val rdd = sc.parallelize(collection)
                };
                val $iw = new $iwC()
              };
              val $iw = new $iwC()
            };
            val $iw = new $iwC()
          };
          val $iw = new $iwC()
        };
        val $iw = new $iwC()
      };
      val $iw = new $iwC()
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}, object $read extends scala.AnyRef {
  def <init>() = {
    super.<init>();
    ()
  };
  val INSTANCE = new $read()
}))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: class $read extends Serializable
{
  def <init>() = {
    super.<init>;
    ()
  };
  class $iwC extends Serializable {
    def <init>() = {
      super.<init>;
      ()
    };
    val $VAL2 = $line3.$read.INSTANCE;
    import $VAL2.$iw.$iw.sc;
    class $iwC extends Serializable {
      def <init>() = {
        super.<init>;
        ()
      };
      import org.apache.spark.SparkContext._;
      class $iwC extends Serializable {
        def <init>() = {
          super.<init>;
          ()
        };
        class $iwC extends Serializable {
          def <init>() = {
            super.<init>;
            ()
          };
          import com.datastax.bdp.spark.CassandraFunctions._;
          class $iwC extends Serializable {
            def <init>() = {
              super.<init>;
              ()
            };
            import com.datastax.bdp.spark.context.CassandraContext;
            class $iwC extends Serializable {
              def <init>() = {
                super.<init>;
                ()
              };
              import com.tuplejump.calliope.Implicits._;
              class $iwC extends Serializable {
                def <init>() = {
                  super.<init>;
                  ()
                };
                val $VAL3 = $line17.$read.INSTANCE;
                import $VAL3.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
                class $iwC extends Serializable {
                  def <init>() = {
                    super.<init>;
                    ()
                  };
                  val rdd = sc parallelize collection
                };
                val $iw = new $iwC.<init>
              };
              val $iw = new $iwC.<init>
            };
            val $iw = new $iwC.<init>
          };
          val $iw = new $iwC.<init>
        };
        val $iw = new $iwC.<init>
      };
      val $iw = new $iwC.<init>
    };
    val $iw = new $iwC.<init>
  };
  val $iw = new $iwC.<init>
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: object $read extends scala.AnyRef
{
  def <init>() = {
    super.<init>;
    ()
  };
  val INSTANCE = new $read.<init>
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: Set symbol of rdd to val rdd():
org.apache.spark.rdd.RDD
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: parse("
object $eval {
  lazy val $result = $line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`
  val $print: String =  {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw
    (""
      
 + "rdd: org.apache.spark.rdd.RDD[(String, Int)] = " + scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`,
1000)

    )
  }
}
      
") Some(List(object $eval extends scala.AnyRef {
  def <init>() = {
    super.<init>();
    ()
  };
  lazy val $result = $line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
  val $print: String = {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
    "".$plus("rdd: org.apache.spark.rdd.RDD[(String, Int)] = ").$plus(scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd,
1000))
  }
}))
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: object $eval extends scala.AnyRef
{
  def <init>() = {
    super.<init>;
    ()
  };
  lazy val $result = $line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
  val $print: String = {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
    "".+("rdd: org.apache.spark.rdd.RDD[(String, Int)] = ").+(scala.runtime.ScalaRunTime.replStringOf($line18.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd,
1000))
  }
}
14/05/05 21:23:16 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String
$line18.$eval.$print()
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at
<console>:21

scala> rdd.map(_._2).sum
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("       rdd.map(_._2).sum
") Some(List(rdd.map(((x$1) => x$1._2)).sum))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter:   21: Select
  14: Apply
  11: Select
  7: Ident
  17: Function
  15: ValDef
  15: TypeTree
  -1: EmptyTree
  17: Select
  15: Ident

14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("       val res3 =
              rdd.map(_._2).sum
") Some(List(val res3 = rdd.map(((x$1) => x$1._2)).sum))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter:   11: ValDef
  11: TypeTree
  46: Select
  39: Apply
  36: Select
  32: Ident
  42: Function
  40: ValDef
  40: TypeTree
  -1: EmptyTree
  42: Select
  40: Ident

14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("
class $read extends Serializable {
  class $iwC extends Serializable {
val $VAL4 = $line3.$read.INSTANCE;
import $VAL4.$iw.$iw.`sc`;
class $iwC extends Serializable {
import org.apache.spark.SparkContext._
class $iwC extends Serializable {
class $iwC extends Serializable {
import com.datastax.bdp.spark.CassandraFunctions._
class $iwC extends Serializable {
import com.datastax.bdp.spark.context.CassandraContext
class $iwC extends Serializable {
import com.tuplejump.calliope.Implicits._
class $iwC extends Serializable {
val $VAL5 = $line17.$read.INSTANCE;
import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`collection`;
val $VAL6 = $line18.$read.INSTANCE;
import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`rdd`;
class $iwC extends Serializable {
       val res3 =
              rdd.map(_._2).sum

      

}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;

}
object $read {
  val INSTANCE = new $read();
}

") Some(List(class $read extends Serializable {
  def <init>() = {
    super.<init>();
    ()
  };
  class $iwC extends Serializable {
    def <init>() = {
      super.<init>();
      ()
    };
    val $VAL4 = $line3.$read.INSTANCE;
    import $VAL4.$iw.$iw.sc;
    class $iwC extends Serializable {
      def <init>() = {
        super.<init>();
        ()
      };
      import org.apache.spark.SparkContext._;
      class $iwC extends Serializable {
        def <init>() = {
          super.<init>();
          ()
        };
        class $iwC extends Serializable {
          def <init>() = {
            super.<init>();
            ()
          };
          import com.datastax.bdp.spark.CassandraFunctions._;
          class $iwC extends Serializable {
            def <init>() = {
              super.<init>();
              ()
            };
            import com.datastax.bdp.spark.context.CassandraContext;
            class $iwC extends Serializable {
              def <init>() = {
                super.<init>();
                ()
              };
              import com.tuplejump.calliope.Implicits._;
              class $iwC extends Serializable {
                def <init>() = {
                  super.<init>();
                  ()
                };
                val $VAL5 = $line17.$read.INSTANCE;
                import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
                val $VAL6 = $line18.$read.INSTANCE;
                import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
                class $iwC extends Serializable {
                  def <init>() = {
                    super.<init>();
                    ()
                  };
                  val res3 = rdd.map(((x$1) => x$1._2)).sum
                };
                val $iw = new $iwC()
              };
              val $iw = new $iwC()
            };
            val $iw = new $iwC()
          };
          val $iw = new $iwC()
        };
        val $iw = new $iwC()
      };
      val $iw = new $iwC()
    };
    val $iw = new $iwC()
  };
  val $iw = new $iwC()
}, object $read extends scala.AnyRef {
  def <init>() = {
    super.<init>();
    ()
  };
  val INSTANCE = new $read()
}))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: class $read extends Serializable
{
  def <init>() = {
    super.<init>;
    ()
  };
  class $iwC extends Serializable {
    def <init>() = {
      super.<init>;
      ()
    };
    val $VAL4 = $line3.$read.INSTANCE;
    import $VAL4.$iw.$iw.sc;
    class $iwC extends Serializable {
      def <init>() = {
        super.<init>;
        ()
      };
      import org.apache.spark.SparkContext._;
      class $iwC extends Serializable {
        def <init>() = {
          super.<init>;
          ()
        };
        class $iwC extends Serializable {
          def <init>() = {
            super.<init>;
            ()
          };
          import com.datastax.bdp.spark.CassandraFunctions._;
          class $iwC extends Serializable {
            def <init>() = {
              super.<init>;
              ()
            };
            import com.datastax.bdp.spark.context.CassandraContext;
            class $iwC extends Serializable {
              def <init>() = {
                super.<init>;
                ()
              };
              import com.tuplejump.calliope.Implicits._;
              class $iwC extends Serializable {
                def <init>() = {
                  super.<init>;
                  ()
                };
                val $VAL5 = $line17.$read.INSTANCE;
                import $VAL5.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.collection;
                val $VAL6 = $line18.$read.INSTANCE;
                import $VAL6.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.rdd;
                class $iwC extends Serializable {
                  def <init>() = {
                    super.<init>;
                    ()
                  };
                  val res3 = rdd.map(((x$1) => x$1._2)).sum
                };
                val $iw = new $iwC.<init>
              };
              val $iw = new $iwC.<init>
            };
            val $iw = new $iwC.<init>
          };
          val $iw = new $iwC.<init>
        };
        val $iw = new $iwC.<init>
      };
      val $iw = new $iwC.<init>
    };
    val $iw = new $iwC.<init>
  };
  val $iw = new $iwC.<init>
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: object $read extends scala.AnyRef
{
  def <init>() = {
    super.<init>;
    ()
  };
  val INSTANCE = new $read.<init>
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: Set symbol of res3 to val res3():
Double
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: parse("
object $eval {
  lazy val $result = $line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`res3`
  val $print: String =  {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw
    (""
      
 + "res3: Double = " + scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.`res3`,
1000)

    )
  }
}
      
") Some(List(object $eval extends scala.AnyRef {
  def <init>() = {
    super.<init>();
    ()
  };
  lazy val $result = $line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3;
  val $print: String = {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
    "".$plus("res3: Double = ").$plus(scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3,
1000))
  }
}))
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: object $eval extends scala.AnyRef
{
  def <init>() = {
    super.<init>;
    ()
  };
  lazy val $result = $line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3;
  val $print: String = {
    $read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw;
    "".+("res3: Double = ").+(scala.runtime.ScalaRunTime.replStringOf($line19.$read.INSTANCE.$iw.$iw.$iw.$iw.$iw.$iw.$iw.$iw.res3,
1000))
  }
}
14/05/05 21:23:45 DEBUG SparkILoop$SparkILoopInterpreter: Invoking: public static java.lang.String
$line19.$eval.$print()
14/05/05 21:23:46 INFO SharkContext: Starting job: sum at <console>:24
14/05/05 21:23:46 INFO DAGScheduler: Got job 0 (sum at <console>:24) with 2 output partitions
(allowLocal=false)
14/05/05 21:23:46 INFO DAGScheduler: Final stage: Stage 0 (sum at <console>:24)
14/05/05 21:23:46 INFO DAGScheduler: Parents of final stage: List()
14/05/05 21:23:46 INFO DAGScheduler: Missing parents: List()
14/05/05 21:23:46 DEBUG DAGScheduler: submitStage(Stage 0)
14/05/05 21:23:46 DEBUG DAGScheduler: missing: List()
14/05/05 21:23:46 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at numericRDDToDoubleRDDFunctions
at <console>:24), which has no missing parents
14/05/05 21:23:46 DEBUG DAGScheduler: submitMissingTasks(Stage 0)
14/05/05 21:23:46 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[2]
at numericRDDToDoubleRDDFunctions at <console>:24)
14/05/05 21:23:46 DEBUG DAGScheduler: New pending tasks: Set(ResultTask(0, 0), ResultTask(0,
1))
14/05/05 21:23:46 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/05/05 21:23:46 DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
14/05/05 21:23:46 DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: ANY
14/05/05 21:23:46 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
14/05/05 21:23:46 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 0
14/05/05 21:23:46 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor 0: 127.0.0.1
(PROCESS_LOCAL)
14/05/05 21:23:47 INFO TaskSetManager: Serialized task 0.0:0 as 13890654 bytes in 617 ms
14/05/05 21:23:47 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:47 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:48 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:48 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:49 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:49 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:50 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:50 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
14/05/05 21:23:51 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_0, runningTasks: 1
{noformat}
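
One thing that could be checked directly from the shell (just a rough sketch, using plain Java serialization rather than whatever serializer Spark is actually configured with) is how big the collection itself is once serialized, since the log above reports each task being serialized as roughly 13 MB (13890654 bytes):

{noformat}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Approximate size of the parallelized collection when Java-serialized;
// presumably each ParallelCollectionRDD task carries a slice of it.
val bytes = new ByteArrayOutputStream()
val out = new ObjectOutputStream(bytes)
out.writeObject(collection)
out.close()
println("serialized collection: " + bytes.size + " bytes")
{noformat}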

How do I enable logging from the executor? It doesn't seem to log anything anywhere, but maybe something is misconfigured on my side.
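
My assumption is that executors in standalone mode pick up conf/log4j.properties from each worker's Spark directory (that location is only an assumption on my part; conf/log4j.properties.template looks like the intended starting point), so something along these lines ought to turn executor output up to DEBUG, but I'm not sure this is the right mechanism here:

{noformat}
# conf/log4j.properties on each worker node (assumed location)
# Log everything at DEBUG to the console; the standalone worker should
# redirect this into the executor's stderr file under its work/ directory.
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
{noformat}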

> ParallelCollectionRDD operations hanging forever without any error messages 
> ----------------------------------------------------------------------------
>
>                 Key: SPARK-1712
>                 URL: https://issues.apache.org/jira/browse/SPARK-1712
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.0
>         Environment: Linux Ubuntu 14.04, a single spark node; standalone mode.
>            Reporter: Piotr Kołaczkowski
>         Attachments: executor.jstack.txt, master.jstack.txt, repl.jstack.txt, spark-hang.png, worker.jstack.txt
>
>
> {noformat}
> scala> val collection = (1 to 1000000).map(i => ("foo" + i, i)).toVector
> collection: Vector[(String, Int)] = Vector((foo1,1), (foo2,2), (foo3,3), (foo4,4), (foo5,5),
(foo6,6), (foo7,7), (foo8,8), (foo9,9), (foo10,10), (foo11,11), (foo12,12), (foo13,13), (foo14,14),
(foo15,15), (foo16,16), (foo17,17), (foo18,18), (foo19,19), (foo20,20), (foo21,21), (foo22,22),
(foo23,23), (foo24,24), (foo25,25), (foo26,26), (foo27,27), (foo28,28), (foo29,29), (foo30,30),
(foo31,31), (foo32,32), (foo33,33), (foo34,34), (foo35,35), (foo36,36), (foo37,37), (foo38,38),
(foo39,39), (foo40,40), (foo41,41), (foo42,42), (foo43,43), (foo44,44), (foo45,45), (foo46,46),
(foo47,47), (foo48,48), (foo49,49), (foo50,50), (foo51,51), (foo52,52), (foo53,53), (foo54,54),
(foo55,55), (foo56,56), (foo57,57), (foo58,58), (foo59,59), (foo60,60), (foo61,61), (foo62,62),
(foo63,63), (foo64,64), (foo...
> scala> val rdd = sc.parallelize(collection)
> rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize
at <console>:24
> scala> rdd.first
> res4: (String, Int) = (foo1,1)
> scala> rdd.map(_._2).sum
> // nothing happens
> {noformat}
> CPU and I/O idle. 
> Memory usage reported by JVM, after manually triggered GC:
> repl: 216 MB / 2 GB
> executor: 67 MB / 2 GB
> worker: 6 MB / 128 MB
> master: 6 MB / 128 MB
> No errors found in worker's stderr/stdout. 
> It works fine with 700,000 elements, taking about 1 second to process the request and calculate the sum. With 700,000 items the Spark executor memory doesn't even exceed 300 MB out of the 2 GB available. It fails with 800,000 items.
> Multiple parallelized collections of 700,000 items each, used at the same time in the same session, work fine.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
