spark-user mailing list archives

From kpeng1 <kpe...@gmail.com>
Subject Passing around SparkContext with in the Driver
Date Wed, 04 Mar 2015 18:09:11 GMT
Hi All,

I am trying to create a class that wraps some functionality I need; some
of these functions require access to the SparkContext, which I would like to
pass in.  I know that the SparkContext is not serializable, and I am not
planning on passing it to worker nodes; I just want to wrap some
functionality that requires the SparkContext API.  As a preface, I am
using the spark shell to test my code at the moment, so I am not sure
whether that plays into any of the issues I am having.
Here is my current class:

class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int" -> IntegerType,
    "double" -> DoubleType,
    "float" -> FloatType,
    "long" -> LongType,
    "short" -> ShortType,
    "binary" -> BinaryType,
    "bool" -> BooleanType,
    "byte" -> ByteType,
    "string" -> StringType)

  //removes the first line of a text file
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    //header line is first line in first partition;
    //return the iterator produced by drop rather than reusing the original
    if (partitionIdx == 0) fileItr.drop(1) else fileItr
  }

  //returns a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType ={
    //returns a StructField built from one "name type" schema line
    def getSchemaFieldHelper(schemaField: String): StructField ={
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }

    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  def getRow(strRow: String): Row ={
    val spRow = strRow.split(',')
    val tRow = spRow.map(_.trim)
    Row(tRow:_*)
  }
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    //apply schema to rdd to create schemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      val schemaRDD = sqlContext.applySchema(csvRDD, schemaStruct)
      schemaRDD
    }
          
    val rawSchema = sparkContext.textFile(schemaFile).collect
    val schema = getSchema(rawSchema)
          
    val rawCsvData = sparkContext.textFile(csvFile)
      
    //if we want to keep header from csv file
    if(includeHeader){
      val rowRDD = rawCsvData.map(getRow) 
      val schemaRDD = createSchemaRDD(rowRDD, schema)
      return schemaRDD
    }
     
    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    val schemaRDD = createSchemaRDD(rowRDD, schema)
    schemaRDD
  }

}

So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")

What I get is a "Task not serializable" exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at <console>:62
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
	at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
	at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
       .
       .
       .
Caused by: java.io.NotSerializableException:


If I remove the class wrapper and reference sc directly, everything
works.  I am wondering what is causing the serialization issue and
whether I can wrap a class around these functions.
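
A minimal, Spark-free sketch of one likely cause, offered as an assumption
rather than a confirmed diagnosis: the stack trace points at
mapPartitionsWithIndex, which is handed removeHeader, an instance method of
MyClass. A function built from an instance method keeps a reference to the
enclosing instance, so serializing the function means serializing the whole
object. The names DriverOnlyResource, Wrapper, CsvHelpers and ClosureDemo
are hypothetical stand-ins, and plain Java serialization stands in for
Spark's closure check.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a driver-only, non-serializable resource
// such as SparkContext.
class DriverOnlyResource

// Helpers on a standalone object: calling them does not require
// a reference to any wrapper instance.
object CsvHelpers {
  def removeHeader(partitionIdx: Int, lines: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) lines.drop(1) else lines
}

// Hypothetical wrapper in the style of MyClass; it is not Serializable.
class Wrapper(val resource: DriverOnlyResource) {
  def removeHeader(partitionIdx: Int, lines: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) lines.drop(1) else lines

  // The lambda calls an instance method, so it holds a reference to `this`.
  def capturingFn: (Int, Iterator[String]) => Iterator[String] =
    (idx, lines) => removeHeader(idx, lines)
}

object ClosureDemo {
  // Tries Java serialization and reports whether it succeeded.
  def isSerializable(value: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(value)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The lambda only refers to the CsvHelpers object, not to a Wrapper.
  def standaloneFn: (Int, Iterator[String]) => Iterator[String] =
    (idx, lines) => CsvHelpers.removeHeader(idx, lines)

  def main(args: Array[String]): Unit = {
    val wrapper = new Wrapper(new DriverOnlyResource)
    // Drags the non-serializable Wrapper along with the function.
    println(s"instance-method closure serializable: ${isSerializable(wrapper.capturingFn)}")
    // Carries no reference to the Wrapper.
    println(s"object-method closure serializable: ${isSerializable(standaloneFn)}")
  }
}
```

Under that assumption, moving helpers such as removeHeader and getRow into
a standalone object would let the wrapper class, and the SparkContext it
holds, stay on the driver.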



