From kpeng1 <>
Subject Passing around SparkContext with in the Driver
Date Wed, 04 Mar 2015 18:09:11 GMT
Hi All,

I am trying to create a class that wraps some functionality I need; some
of these functions require access to the SparkContext, which I would like to
pass in.  I know that the SparkContext is not serializable, and I am not
planning on passing it to worker nodes or anything; I just want to wrap some
functionality that requires the SparkContext API.  As a preface, I am
using the spark shell to test my code at the moment, so I am not sure
whether that plays into any of the issues I am having.
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    //header line is first line in first partition
    if (partitionIdx == 0) {
      fileItr.drop(1)
    } else {
      fileItr
    }
  }

  //returns back a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType = {
    //returns a StructField for one "name type" line of the schema file
    def getSchemaFieldHelper(schemaField: String): StructField = {
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }

    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  def getRow(strRow: String): Row ={
    val spRow = strRow.split(',')
    val tRow =
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    //apply schema to rdd to create schemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
      schemaRDD
    }

    val rawSchema = sparkContext.textFile(schemaFile).collect
    val schema = getSchema(rawSchema)
    val rawCsvData = sparkContext.textFile(csvFile)
    //if we want to keep header from csv file
    if (includeHeader) {
      val rowRDD = rawCsvData.map(getRow)
      val schemaRDD = createSchemaRDD(rowRDD, schema)
      return schemaRDD
    }

    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    val schemaRDD = createSchemaRDD(rowRDD, schema)
    schemaRDD
  }
}


So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I get is a "Task not serializable" exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
Caused by:

If I remove the class wrapper and reference sc directly, everything
works.  I am wondering what is causing the serialization issue
and whether I can wrap a class around these functions.
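
One way to probe the question above without a cluster is the minimal sketch
below.  It assumes the failure comes from passing a method of the wrapper
class (removeHeader) to mapPartitionsWithIndex: turning a method into a
function value captures the enclosing instance, which here holds the
context.  FakeContext, Wrapper, CsvHelpers and ClosureCheck are
hypothetical names used only for illustration; the check uses plain Java
serialization, which is what Spark applies to closures.  It is not a
diagnosis of the actual spark-shell session.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for SparkContext: a driver-only object that is not Serializable.
class FakeContext

// Helper kept in a standalone object: using it does not capture any wrapper instance.
object CsvHelpers {
  def removeHeader(partitionIdx: Int, lines: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) lines.drop(1) else lines
}

// Mirrors the shape of MyClass: holds the context and defines the helper as a method.
class Wrapper(val context: FakeContext) {
  def removeHeader(partitionIdx: Int, lines: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) lines.drop(1) else lines

  // Converting the method to a function value captures `this`, and with it `context`.
  val viaMethod: (Int, Iterator[String]) => Iterator[String] = removeHeader(_, _)
}

object ClosureCheck {
  // Returns true if Java serialization accepts the value, false if it is rejected.
  def isSerializable(value: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val wrapper = new Wrapper(new FakeContext)
    // This function value refers only to the standalone object.
    val viaObject: (Int, Iterator[String]) => Iterator[String] =
      CsvHelpers.removeHeader(_, _)

    println(s"method on wrapper serializable: ${ClosureCheck.isSerializable(wrapper.viaMethod)}")
    println(s"method on object serializable:  ${ClosureCheck.isSerializable(viaObject)}")
  }
}
```

If the same thing is happening in MyClass, two options that may be worth
trying are moving removeHeader and getRow into a standalone object, or
making the class extend Serializable and marking the sparkContext and
sqlContext fields @transient so they are skipped when the instance is
shipped with a task.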

