I am trying to implement checkpointing in my streaming application, but I am getting a "not serializable" error. Has anyone encountered this? I am deploying the job in YARN cluster mode.
Here is a snippet of the main parts of the code:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

object S3EventIngestion {

  // Create and set up the streaming context
  def createContext(
      batchInterval: Integer, checkpointDirectory: String, awsS3BucketName: String,
      databaseName: String, tableName: String, partitionByColumnName: String
  ): StreamingContext = {
    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("S3EventIngestion")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    // Create the streaming context with the given batch interval
    val ssc = new StreamingContext(sc, Seconds(batchInterval))

    // Create a text file stream on an S3 bucket
    val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // process data
      }
    })

    ssc.checkpoint(checkpointDirectory)
    ssc
  }
  def main(args: Array[String]) {
    if (args.length != 6) {
      System.err.println("Usage: S3EventIngestion <batch-interval> <checkpoint-directory> " +
        "<s3-bucket-name> <database-name> <table-name> <partition-by-column-name>")
      System.exit(1)
    }

    // Parse the command-line arguments
    val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

    // Get the streaming context from checkpoint data, or create a new one
    val context = StreamingContext.getOrCreate(checkpoint,
      () => createContext(interval.toInt, checkpoint, bucket, database, table, partitionBy))

    // Start the streaming context
    context.start()
    context.awaitTermination()
  }
}
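For context, the elided "process data" step is along these lines. This is a heavily simplified sketch, not my exact code; the CSV parsing and the "id"/"value" column names here are made-up placeholders:

    csv.foreachRDD(rdd => {
      if (!rdd.partitions.isEmpty) {
        // Re-obtain the SQLContext from the RDD's SparkContext inside the
        // closure rather than capturing one from the enclosing scope
        val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
        import sqlContext.implicits._

        // Placeholder parsing: the real schema and column names differ
        val df = rdd.map(_.split(","))
          .map(fields => (fields(0), fields(1)))
          .toDF("id", "value")

        // Write out partitioned by the configured column
        // (assumes partitionByColumnName is one of the DataFrame's columns)
        df.write
          .mode("append")
          .partitionBy(partitionByColumnName)
          .saveAsTable(databaseName + "." + tableName)
      }
    })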
Can someone help please?
Thanks,
Ben