spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Benjamin Kim <bbuil...@gmail.com>
Subject Spark 1.6 Streaming with Checkpointing
Date Fri, 26 Aug 2016 20:54:34 GMT
I am trying to implement checkpointing in my streaming application, but I am getting a
java.io.NotSerializableException. Has anyone encountered this? I am deploying this job in YARN cluster mode.

Here is a snippet of the main parts of the code.

/**
 * Streams CSV files that land in an S3 bucket and ingests them into a
 * partitioned table, with checkpoint-based driver recovery.
 */
object S3EventIngestion {

    /**
     * Creates and fully wires up a new StreamingContext.
     *
     * All DStream setup must happen inside this function so that
     * StreamingContext.getOrCreate can rebuild the identical stream graph
     * when recovering from checkpoint data.
     *
     * @param batchInterval         batch duration, in seconds
     * @param checkpointDirectory   fault-tolerant directory (HDFS/S3) for checkpoint data
     * @param awsS3BucketName       bucket monitored for newly arrived CSV files
     * @param databaseName          target database for ingestion
     * @param tableName             target table for ingestion
     * @param partitionByColumnName column used to partition the target table
     * @return a StreamingContext ready to be started
     */
    def createContext(
        batchInterval: Integer,
        checkpointDirectory: String,
        awsS3BucketName: String,
        databaseName: String,
        tableName: String,
        partitionByColumnName: String
    ): StreamingContext = {

        println("Creating new context")
        val sparkConf = new SparkConf().setAppName("S3EventIngestion")
        val sc = new SparkContext(sparkConf)

        // Create the streaming context with the requested batch interval.
        val ssc = new StreamingContext(sc, Seconds(batchInterval))

        // Watch the bucket root for newly arrived text (CSV) files.
        val csv = ssc.textFileStream("s3a://" + awsS3BucketName + "/")

        csv.foreachRDD { rdd =>
            if (!rdd.partitions.isEmpty) {
                // NOTE(review): do NOT capture a SQLContext (or the SparkContext)
                // created outside this closure — with checkpointing enabled, the
                // DStream graph and its closures are serialized, and a captured
                // SQLContext is the usual cause of NotSerializableException.
                // Re-obtain a singleton instance from the RDD instead:
                val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
                // process data using sqlContext / databaseName / tableName / partitionByColumnName
            }
        }

        // Enable metadata checkpointing; must be configured before start().
        ssc.checkpoint(checkpointDirectory)
        ssc
    }

    /**
     * Entry point: validates and binds the command-line arguments, then
     * recovers the StreamingContext from checkpoint data or creates a new one.
     */
    def main(args: Array[String]): Unit = {
        if (args.length != 6) {
            // BUG FIX: the original string literal spanned a line break, which
            // does not compile; concatenate the pieces instead.
            System.err.println(
                "Usage: S3EventIngestion <batch-interval> <checkpoint-directory> " +
                "<s3-bucket-name> <database-name> <table-name> <partition-by-column-name>")
            System.exit(1)
        }

        // BUG FIX: the original referenced six undefined vals (interval,
        // checkpoint, bucket, ...). Bind them from the validated arguments.
        val Array(interval, checkpoint, bucket, database, table, partitionBy) = args

        // Recover the context (and its DStream graph) from checkpoint data if
        // present; otherwise build a fresh one via createContext. Any code that
        // must run on every start belongs inside createContext, not here.
        val context = StreamingContext.getOrCreate(checkpoint,
            () => createContext(interval.toInt, checkpoint, bucket, database, table, partitionBy))

        // Start streaming and block until termination or failure.
        context.start()
        context.awaitTermination()
    }
}

Can someone help please?

Thanks,
Ben
---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Mime
View raw message