[ https://issues.apache.org/jira/browse/CRUNCH-560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14904633#comment-14904633 ]
Nithin Asokan commented on CRUNCH-560:
--------------------------------------
If I understood correctly, we need a single Hadoop configuration, and it should
carry the proper classloader.
First attempt:
Copy {{sparkHadoopConfiguration}} into the Crunch pipeline {{conf}} and pass {{conf}} to SparkRuntime.
This approach results in the ClassNotFoundException mentioned in the JIRA description.
{code}
Configuration sparkHadoopConfiguration = sparkContext.hadoopConfiguration();
copyConfiguration(sparkHadoopConfiguration, conf);
setConfiguration(conf);
SparkRuntime runtime = new SparkRuntime(this, sparkContext, conf, outputTargets,
    toMaterialize, cachedCollections, allPipelineCallables);
{code}
Second attempt:
Copy the Crunch pipeline {{conf}} into {{sparkHadoopConfiguration}} and pass {{sparkHadoopConfiguration}}
to SparkRuntime. The pipeline ran successfully in {{yarn-cluster}} mode from an Oozie Spark action.
{code}
Configuration sparkHadoopConfiguration = sparkContext.hadoopConfiguration();
copyConfiguration(conf, sparkHadoopConfiguration);
setConfiguration(sparkHadoopConfiguration);
SparkRuntime runtime = new SparkRuntime(this, sparkContext, sparkHadoopConfiguration,
    outputTargets, toMaterialize, cachedCollections, allPipelineCallables);
{code}
Please let me know if I'm doing anything differently from what is expected.
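To illustrate why the copy direction in the two attempts above might matter, here is a minimal, self-contained sketch. It does not use Hadoop or Crunch: {{MiniConf}} is a hypothetical stand-in for a configuration object that holds properties plus a classloader, and {{copyConfiguration}} here is an assumed helper that copies properties only. The sketch assumes the pipeline-side object carries a loader that cannot see application classes while the Spark-side object carries one that can; the thread does not confirm which loader each real object holds.

```java
import java.util.HashMap;
import java.util.Map;

public class ConfCopyDemo {

    static final String KEY = "input.format.class"; // hypothetical property name

    /** Hypothetical stand-in for a configuration: properties plus a class loader. */
    static final class MiniConf {
        private final Map<String, String> props = new HashMap<>();
        private final ClassLoader classLoader;

        MiniConf(ClassLoader classLoader) {
            this.classLoader = classLoader;
        }

        void set(String key, String value) {
            props.put(key, value);
        }

        /** Resolves the class named by a property through this object's own loader. */
        Class<?> getClassByKey(String key) throws ClassNotFoundException {
            return Class.forName(props.get(key), false, classLoader);
        }
    }

    /** Copies properties from source to target; the target keeps its own loader. */
    static void copyConfiguration(MiniConf source, MiniConf target) {
        target.props.putAll(source.props);
    }

    static boolean canLoad(MiniConf conf, String key) {
        try {
            conf.getClassByKey(key);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    /** Pipeline-side conf: names an application class but has a loader that cannot see it. */
    static MiniConf newPipelineConf() {
        // A loader with a null parent delegates only to the bootstrap loader.
        MiniConf conf = new MiniConf(new ClassLoader(null) {});
        conf.set(KEY, ConfCopyDemo.class.getName());
        return conf;
    }

    /** Spark-side conf: starts empty but has a loader that sees application classes. */
    static MiniConf newSparkConf() {
        return new MiniConf(ConfCopyDemo.class.getClassLoader());
    }

    /** Mirrors the first attempt: copy Spark-side into pipeline-side, use pipeline-side. */
    static boolean firstAttemptLoads() {
        MiniConf pipelineConf = newPipelineConf();
        copyConfiguration(newSparkConf(), pipelineConf);
        return canLoad(pipelineConf, KEY);
    }

    /** Mirrors the second attempt: copy pipeline-side into Spark-side, use Spark-side. */
    static boolean secondAttemptLoads() {
        MiniConf sparkConf = newSparkConf();
        copyConfiguration(newPipelineConf(), sparkConf);
        return canLoad(sparkConf, KEY);
    }

    public static void main(String[] args) {
        System.out.println("first attempt resolves class: " + firstAttemptLoads());
        System.out.println("second attempt resolves class: " + secondAttemptLoads());
    }
}
```

Under these assumptions the first attempt fails to resolve the class and the second succeeds, because copying properties never changes which loader the surviving object uses to resolve class names.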
> SparkPipeline should honor Spark Hadoop configuration
> -----------------------------------------------------
>
> Key: CRUNCH-560
> URL: https://issues.apache.org/jira/browse/CRUNCH-560
> Project: Crunch
> Issue Type: Bug
> Components: Spark
> Reporter: Nithin Asokan
> Attachments: CRUNCH-560-001.patch
>
>
> Executing a SparkPipeline using the {{SparkPipeline(String sparkConnect, String appName, Class<?> jarClass, Configuration conf)}} constructor in {{yarn-cluster}} mode via an Oozie Spark action causes a ClassNotFoundException during job creation. The problem appears to be that Spark cannot read the Crunch InputFormats from the Hadoop configuration.
> {code}
> 15/09/18 00:06:39 WARN scheduler.DAGScheduler: Creating new stage failed due to exception - job: 0
> java.lang.RuntimeException: readObject can't find class
> at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:158)
> at org.apache.crunch.io.FormatBundle.readFields(FormatBundle.java:133)
> at org.apache.crunch.io.FormatBundle.fromSerialized(FormatBundle.java:62)
> at org.apache.crunch.io.CrunchInputs.getFormatNodeMap(CrunchInputs.java:79)
> at org.apache.crunch.impl.mr.run.CrunchInputFormat.getSplits(CrunchInputFormat.java:45)
> at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:95)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> at org.apache.spark.rdd.UnionRDD$$anonfun$1.apply(UnionRDD.scala:66)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> at org.apache.spark.rdd.UnionRDD.getPartitions(UnionRDD.scala:66)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:32)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:217)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:82)
> at org.apache.spark.rdd.ShuffledRDD.getDependencies(ShuffledRDD.scala:80)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:206)
> at org.apache.spark.rdd.RDD$$anonfun$dependencies$2.apply(RDD.scala:204)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.rdd.RDD.dependencies(RDD.scala:204)
> at org.apache.spark.scheduler.DAGScheduler.visit$1(DAGScheduler.scala:298)
> at org.apache.spark.scheduler.DAGScheduler.getParentStages(DAGScheduler.scala:310)
> at org.apache.spark.scheduler.DAGScheduler.newStage(DAGScheduler.scala:244)
> at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:731)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1362)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> Caused by: java.lang.ClassNotFoundException: Class org.apache.crunch.types.avro.AvroInputFormat not found
> at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2018)
> at org.apache.crunch.io.FormatBundle.readClass(FormatBundle.java:156)
> ... 84 more
> {code}