From vermaRaj90 <>
Subject Spark checkpoining error when joining static dataset with DStream
Date Mon, 07 Sep 2015 10:53:20 GMT
I am trying to use spark streaming application in java. My spark application
reads continuous feed from hadoop directory using textFileStream() at
interval of each 1 Min. I need to perform spark aggregation(group by)
operation on incoming DStream. After aggregation, I am joining aggregated
DStream with RDD with RDD created from static dataset read by textFile()
from hadoop directory.

Problem comes when I enable check pointing. With empty checkpoint directory,
it runs fine. After running 2-3 batches I close it using ctrl+c and run it
again. On second run it throws spark exception immediately: "SPARK-5063"

/"Exception in thread "main" org.apache.spark.SparkException: RDD
transformations and actions can only be invoked by the driver, not inside of
other transformations; for example, => rdd2.values.count() * x)
is invalid because the values transformation and count action cannot be
performed inside of the transformation. For more information, see

*Following is the Block of Code of spark application:*

private void compute(JavaSparkContext sc, JavaStreamingContext ssc) {

   JavaRDD<String> distFile = sc.textFile(MasterFile);      
   JavaDStream<String> file = ssc.textFileStream(inputDir);             

   // Read Master file
   JavaRDD<MasterParseLog> masterLogLines =
   final JavaPairRDD<String, String> masterRDD =

   // Continuous Streaming file
   JavaDStream<ParseLog> logLines = file.flatMap(EXTRACT_CKT_LOGLINES);

   // calculate the sum of required field and generate group sum RDD
   JavaPairDStream<String, Summary> sumRDD =
   JavaPairDStream<String, Summary> grpSumRDD =

   //GROUP BY Operation
   JavaPairDStream<String, Summary> grpAvgRDD =

   // Join Master RDD with the DStream  //This is the block causing error
(without it code is working fine)
   JavaPairDStream<String, Tuple2&lt;String, String>> joinedStream =

       new Function2<JavaPairRDD&lt;String, String>, Time,
JavaPairRDD<String, Tuple2&lt;String, String>>>() {

           private static final long serialVersionUID = 1L;

           public JavaPairRDD<String, Tuple2&lt;String, String>> call(
               JavaPairRDD<String, String> rdd, Time v2) throws Exception {
               return masterRDD.value().join(rdd);

public static void main(String[] args) {

   JavaStreamingContextFactory contextFactory = new
JavaStreamingContextFactory() {
        public JavaStreamingContext create() {

           // Create the context with a 60 second batch size
           SparkConf sparkConf = new SparkConf();
           final JavaSparkContext sc = new JavaSparkContext(sparkConf);
           JavaStreamingContext ssc1 = new JavaStreamingContext(sc,

           app.compute(sc, ssc1);

           return ssc1;

   JavaStreamingContext ssc =
JavaStreamingContext.getOrCreate(checkPointDir, contextFactory);

   // start the streaming server
   ssc.start();"Streaming server started...");

   // wait for the computations to finish
   ssc.awaitTermination();"Streaming server stopped...");

I know that block of code which joins static dataset with DStream is causing
error, But that is taken from spark-streaming page of Apache spark website
(sub heading "stream-dataset join" under "Join Operations"). Please help me
to get it working even if there is different way of doing it. I need to
enable checkpointing in my streaming application.

*Environment Details:* Centos6.5 :2 node Cluster Java :1.8 Spark :1.4.1
Hadoop :2.7.1

I have also posted same question on stackoverflow. you can have a look

Please help me to get it working.


