Hi All,
I am trying to create a class that wraps some functionality I need; some
of these functions require access to the SparkContext, which I would like to
pass in. I know that the SparkContext is not serializable, and I am not
planning on passing it to the worker nodes or anything; I just want to wrap
some functionality that needs the SparkContext's API. As a preface, I am
basically using the spark shell to test my code at the moment, so I am not
sure whether that plays into any of the issues I am having.
Here is my current class:
class MyClass(sparkContext: SparkContext) {
  import org.apache.spark.sql._
  import org.apache.spark.rdd._

  val sqlContext = new SQLContext(sparkContext)

  val DATA_TYPE_MAPPING = Map(
    "int"    -> IntegerType,
    "double" -> DoubleType,
    "float"  -> FloatType,
    "long"   -> LongType,
    "short"  -> ShortType,
    "binary" -> BinaryType,
    "bool"   -> BooleanType,
    "byte"   -> ByteType,
    "string" -> StringType)

  // removes the header line of a text file
  // (the header line is the first line of the first partition)
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] = {
    if (partitionIdx == 0) fileItr.drop(1) else fileItr
  }

  // returns a StructType for the schema
  def getSchema(rawSchema: Array[String]): StructType = {
    // returns a StructField built from a "name type" pair
    def getSchemaFieldHelper(schemaField: String): StructField = {
      val schemaParts = schemaField.split(' ')
      StructField(schemaParts(0), DATA_TYPE_MAPPING(schemaParts(1)), true)
    }
    val structFields = rawSchema.map(column => getSchemaFieldHelper(column))
    StructType(structFields)
  }

  // turns a comma-separated line into a Row of trimmed string fields
  def getRow(strRow: String): Row = {
    val spRow = strRow.split(',')
    val tRow = spRow.map(_.trim)
    Row(tRow: _*)
  }

  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schemaFile: String): SchemaRDD = {
    // apply the schema to an RDD[Row] to create a SchemaRDD
    def createSchemaRDD(csvRDD: RDD[Row], schemaStruct: StructType): SchemaRDD = {
      sqlContext.applySchema(csvRDD, schemaStruct)
    }

    val rawSchema = sparkContext.textFile(schemaFile).collect()
    val schema = getSchema(rawSchema)
    val rawCsvData = sparkContext.textFile(csvFile)

    // if we want to keep the header from the csv file
    if (includeHeader) {
      val rowRDD = rawCsvData.map(getRow)
      return createSchemaRDD(rowRDD, schema)
    }

    val csvData = rawCsvData.mapPartitionsWithIndex(removeHeader)
    val rowRDD = csvData.map(getRow)
    createSchemaRDD(rowRDD, schema)
  }
}
So in the spark shell I am basically creating an instance of this class and
calling applySchemaToCsv like so:
val test = new MyClass(sc)
test.applySchemaToCsv("/tmp/myFile.csv", false, "/tmp/schema.txt")
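In case it is relevant, the schema file is just plain text with one
space-separated "name type" pair per line (that is what the split(' ') in
getSchemaFieldHelper expects), where the type has to be one of the keys in
DATA_TYPE_MAPPING. Something along these lines (made-up columns, just to show
the shape, not my real data):

id int
name string
price double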
What I am getting is a "Task not serializable" exception:
15/03/04 09:40:56 INFO SparkContext: Created broadcast 2 from textFile at <console>:62
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:615)
  ...
Caused by: java.io.NotSerializableException:
If I remove the class wrapper and reference sc directly, everything works. I
am basically wondering what is causing the serialization issue and whether I
can wrap a class around these functions at all.
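My current guess (and I may well be wrong) is that because removeHeader and
getRow are instance methods, the closures I pass to mapPartitionsWithIndex
and map capture the enclosing "this", so Spark ends up trying to serialize
the whole MyClass instance, which holds the SparkContext and SQLContext. If
that is what is happening, would moving the per-record helpers out into a
standalone object, roughly like the untested sketch below, be a reasonable
way to keep a class wrapper around the rest? (The names here are just
placeholders, and the schema handling is trimmed down to keep it short.)

import org.apache.spark.SparkContext
import org.apache.spark.sql._

// Per-record helpers live in a standalone object, so the closures shipped to
// the executors only reference this object and never try to drag the
// enclosing class (and its SparkContext) along with them.
object CsvHelpers extends Serializable {
  def removeHeader(partitionIdx: Int, fileItr: Iterator[String]): Iterator[String] =
    if (partitionIdx == 0) fileItr.drop(1) else fileItr

  def getRow(strRow: String): Row =
    Row(strRow.split(',').map(_.trim): _*)
}

// The wrapper class keeps the SparkContext but only touches it on the driver.
class CsvLoader(sparkContext: SparkContext) {
  val sqlContext = new SQLContext(sparkContext)

  // takes an already-built StructType just to keep the sketch short
  def applySchemaToCsv(csvFile: String, includeHeader: Boolean, schema: StructType): SchemaRDD = {
    val rawCsvData = sparkContext.textFile(csvFile)
    val csvData =
      if (includeHeader) rawCsvData
      else rawCsvData.mapPartitionsWithIndex(CsvHelpers.removeHeader)
    // these closures only capture CsvHelpers, not CsvLoader
    sqlContext.applySchema(csvData.map(CsvHelpers.getRow), schema)
  }
}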