spark-user mailing list archives

From vermaRaj90 <rajneesh.k1...@gmail.com>
Subject Spark checkpointing error when joining static dataset with DStream
Date Mon, 07 Sep 2015 10:53:20 GMT
I am writing a Spark Streaming application in Java. The application reads a
continuous feed from a Hadoop directory using textFileStream() with a
1-minute batch interval. I need to perform an aggregation (group by) on the
incoming DStream. After the aggregation, I join the aggregated DStream with
an RDD created from a static dataset read with textFile() from a Hadoop
directory.

The problem appears when I enable checkpointing. With an empty checkpoint
directory it runs fine. After 2-3 batches I stop it with Ctrl+C and run it
again. On the second run it immediately throws a Spark exception referencing
SPARK-5063:

"Exception in thread "main" org.apache.spark.SparkException: RDD
transformations and actions can only be invoked by the driver, not inside of
other transformations; for example, rdd1.map(x => rdd2.values.count() * x)
is invalid because the values transformation and count action cannot be
performed inside of the rdd1.map transformation. For more information, see
SPARK-5063"
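On recovery from a checkpoint this usually means the transform closure has
captured masterRDD, which was created on the driver before the restart; the
deserialized closure then invokes a join on a stale RDD handle, which Spark
rejects with SPARK-5063. A common workaround is to rebuild the static pair
RDD inside transformToPair from the batch RDD's own context, so the closure
captures nothing created before the restart. The following is only a sketch
(not tested against 1.4.1; MasterFile and the mapper functions are the names
from the code below, and the generic types are assumed from it):

    // Sketch of a workaround: rebuild the master RDD inside the transform
    // from the batch RDD's SparkContext instead of capturing a driver-side
    // RDD created before the restart.
    JavaPairDStream<String, Tuple2<String, Summary>> joinedStream =
        grpAvgRDD.transformToPair(
            new Function<JavaPairRDD<String, Summary>,
                         JavaPairRDD<String, Tuple2<String, Summary>>>() {
                public JavaPairRDD<String, Tuple2<String, Summary>> call(
                        JavaPairRDD<String, Summary> rdd) throws Exception {
                    JavaSparkContext jsc =
                        JavaSparkContext.fromSparkContext(rdd.context());
                    JavaPairRDD<String, String> master = jsc
                        .textFile(MasterFile)
                        .flatMap(EXTRACT_MASTER_LOGLINES)
                        .mapToPair(MASTER_KEY_VALUE_MAPPER);
                    return master.join(rdd);
                }
            });

Re-reading the master file on every batch is wasteful; if the master data is
small, collecting it into a map and broadcasting it per batch may be cheaper.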

*Here is the relevant block of code from the application:*

private void compute(JavaSparkContext sc, JavaStreamingContext ssc) {

    // Read the static master file
    JavaRDD<String> distFile = sc.textFile(MasterFile);
    JavaRDD<MasterParseLog> masterLogLines =
        distFile.flatMap(EXTRACT_MASTER_LOGLINES);
    final JavaPairRDD<String, String> masterRDD =
        masterLogLines.mapToPair(MASTER_KEY_VALUE_MAPPER);

    // Continuously streamed files
    JavaDStream<String> file = ssc.textFileStream(inputDir);
    JavaDStream<ParseLog> logLines = file.flatMap(EXTRACT_CKT_LOGLINES);

    // Sum the required field per key and average it (GROUP BY operation)
    JavaPairDStream<String, Summary> sumRDD =
        logLines.mapToPair(CKT_GRP_MAPPER);
    JavaPairDStream<String, Summary> grpSumRDD =
        sumRDD.reduceByKey(CKT_GRP_SUM);
    JavaPairDStream<String, Summary> grpAvgRDD =
        grpSumRDD.mapToPair(CKT_GRP_AVG);

    // Join the master RDD with the DStream.
    // This is the block causing the error (without it the code works fine).
    JavaPairDStream<String, Tuple2<String, Summary>> joinedStream =
        grpAvgRDD.transformToPair(
            new Function2<JavaPairRDD<String, Summary>, Time,
                          JavaPairRDD<String, Tuple2<String, Summary>>>() {

                private static final long serialVersionUID = 1L;

                public JavaPairRDD<String, Tuple2<String, Summary>> call(
                        JavaPairRDD<String, Summary> rdd, Time v2)
                        throws Exception {
                    return masterRDD.join(rdd);
                }
            });
    joinedStream.print(10);
}

public static void main(String[] args) {

    JavaStreamingContextFactory contextFactory =
        new JavaStreamingContextFactory() {
            public JavaStreamingContext create() {

                // Create the context with a 60-second batch interval
                SparkConf sparkConf = new SparkConf();
                final JavaSparkContext sc = new JavaSparkContext(sparkConf);
                JavaStreamingContext ssc1 =
                    new JavaStreamingContext(sc, Durations.seconds(duration));

                app.compute(sc, ssc1);

                ssc1.checkpoint(checkPointDir);
                return ssc1;
            }
        };

    JavaStreamingContext ssc =
        JavaStreamingContext.getOrCreate(checkPointDir, contextFactory);

    // start the streaming computation
    ssc.start();
    logger.info("Streaming server started...");

    // wait for the computation to finish
    ssc.awaitTermination();
    logger.info("Streaming server stopped...");
}

I know that the block of code joining the static dataset with the DStream is
causing the error, but it is taken from the Spark Streaming programming
guide on the Apache Spark website (the "stream-dataset joins" subsection
under "Join Operations"). I need a way to make it work, even if that means a
different approach; checkpointing must stay enabled in my streaming
application.
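If re-creating the master RDD on every batch is too expensive, the Spark
Streaming docs suggest a lazily-initialized singleton for state that cannot
survive checkpoint recovery (they show it for broadcast variables). Stripped
of Spark, the pattern is plain double-checked lazy initialization, so each
executor rebuilds the data on first use instead of relying on a driver-side
object captured before the restart. A minimal stand-alone sketch (class and
key names here are hypothetical, and the map contents stand in for parsing
the master file):

```java
import java.util.HashMap;
import java.util.Map;

// Lazily-initialized singleton holding the static lookup data, so it is
// (re)built on first use after a restart rather than captured in a closure.
class StaticDataHolder {
    private static volatile Map<String, String> instance;

    static Map<String, String> get() {
        if (instance == null) {
            synchronized (StaticDataHolder.class) {
                if (instance == null) {
                    Map<String, String> m = new HashMap<>();
                    // stand-in for reading and parsing the master file
                    m.put("cktA", "masterValA");
                    instance = m;
                }
            }
        }
        return instance;
    }
}

public class LazySingletonDemo {
    public static void main(String[] args) {
        // Same instance on every call; built only once.
        System.out.println(StaticDataHolder.get() == StaticDataHolder.get());
        System.out.println(StaticDataHolder.get().get("cktA"));
    }
}
```

The volatile field plus double-checked locking keeps initialization
thread-safe, which matters because multiple tasks on one executor may call
get() concurrently.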

*Environment Details:* CentOS 6.5 (2-node cluster), Java 1.8, Spark 1.4.1,
Hadoop 2.7.1

I have also posted the same question on Stack Overflow:
http://stackoverflow.com/questions/32378296/spark-checkpoining-error-when-joining-static-dataset-with-dstream

Please help me to get it working.

Regards,
Rajneesh.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-checkpoining-error-when-joining-static-dataset-with-DStream-tp24590.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org

