[ https://issues.apache.org/jira/browse/CRUNCH-560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nithin Asokan updated CRUNCH-560: --------------------------------- Attachment: CRUNCH-560-001.patch Attaching a patch that creates the SparkRuntime with the Hadoop configuration from the SparkContext. I have tested this patch and it appears to work for me; I no longer see the ClassNotFoundException. One thing I don't quite follow is why {{yarn-client}} mode works while cluster mode seems to fail with this error. > SparkPipeline should honor Spark Hadoop configuration > ----------------------------------------------------- > > Key: CRUNCH-560 > URL: https://issues.apache.org/jira/browse/CRUNCH-560 > Project: Crunch > Issue Type: Bug > Components: Spark > Reporter: Nithin Asokan > Attachments: CRUNCH-560-001.patch > > > Executing a SparkPipeline using the {{SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf)}} constructor and {{yarn-cluster}} mode via an Oozie Spark action causes a ClassNotFoundException during job creation. The problem appears to be that Spark is not able to read the Crunch InputFormats from the Hadoop configuration. 
> {code} > 15/09/18 00:06:39 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 0 > java.lang.RuntimeException: readObject can't find class > at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:158) > at org.apache.crunch.io.FormatBundle.readFields(FormatBundle.java:133) > at org.apache.crunch.io.FormatBundle.fromSerialized(FormatBundle.java:62) > at org.apache.crunch.io.CrunchInputs.getFormatNodeMap(CrunchInputs.java:79) > at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:45) > at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at 
org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) > at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) > at scala.collection.immutable.List.foreach(List.scala:318) > at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:244) > at scala.collection.AbstractTraversable.map(Traversable.scala:105) > at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) > at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) > at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82) > at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80) > at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206) > at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204) > at scala.Option.getOrElse(Option.scala:120) > at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204) > at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298) > at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310) > at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244) > at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731) > at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362) > at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) > at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) > Caused by: java.lang.ClassNotFoundException: Class org.apache.crunch.types.avro.AvroInputFormat not found > at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018) > at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:156) > ... 84 more > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)