spark-user mailing list archives

From Michael Quinlan <mq0...@gmail.com>
Subject Re: Spark SequenceFile Java API Repeat Key Values
Date Wed, 15 Jan 2014 01:50:23 GMT
Matei and Andrew,

Thank you both for your prompt responses. Matei is correct in that I am
attempting to cache a large RDD for repeated queries.

I was able to implement your suggestion in a Scala version of the code,
which I've copied below. I should point out two minor details:
LongWritable has no accessible clone() method (Object.clone() is protected),
and both the key and the value need to be "cloned" in order for the data to
be cached correctly.
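The pitfall can be illustrated without Hadoop at all: if a reader hands back
the same mutable object for every record and you cache the references, every
cached entry ends up holding the last record's value. The Holder class below
is hypothetical, standing in for Hadoop's reused Writable instances.

```java
import java.util.ArrayList;
import java.util.List;

public class ReuseDemo {
    // Hypothetical mutable holder, standing in for a reused LongWritable.
    static class Holder {
        long value;
        Holder set(long v) { value = v; return this; }
    }

    public static void main(String[] args) {
        Holder reused = new Holder();            // one object, reused per "record"
        List<Holder> cachedRefs = new ArrayList<>();
        List<Holder> cachedCopies = new ArrayList<>();
        for (long v = 1; v <= 3; v++) {
            reused.set(v);
            cachedRefs.add(reused);              // caches the same object three times
            cachedCopies.add(new Holder().set(v)); // caches a fresh copy each time
        }
        // Every cached reference reflects the final mutation of the one object.
        System.out.println(cachedRefs.get(0).value);   // prints 3
        System.out.println(cachedCopies.get(0).value); // prints 1
    }
}
```

This is why the copy has to happen before cache(): once the references are
materialized in memory, there is no record of the intermediate values.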

My attempt at a Java version wasn't as successful. If you don't mind, could
you please suggest a better way if it currently exists? This is mostly
educational since I already have a working version in Scala. I'm new to
both.

Regards,

Mike

Java:

public class App {
    public static void main(String[] args) throws Exception {
        if (args.length < 3) {
            System.err.println("Usage: SynthesisService <master> <input file> <jar file>");
            System.exit(1);
        }

        System.setProperty("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer");
        System.setProperty("spark.kryo.registrator",
            "spark.synthesis.service.Init.MyRegistrator");

        JavaSparkContext ctx = new JavaSparkContext(args[0],
            "SynthesisService",
            "~/spark-0.8.0-incubating", args[2]);

        // Load DataCube via Spark sequenceFile
        JavaPairRDD<LongWritable, LongWritable> temp_DataCube =
            ctx.sequenceFile(args[1], LongWritable.class, LongWritable.class);

        JavaRDD<Tuple2<LongWritable, LongWritable>> DataCube;
        DataCube = temp_DataCube.map(
            new Function2<LongWritable, LongWritable,
                          Tuple2<LongWritable, LongWritable>>() {
                @Override
                public Tuple2<LongWritable, LongWritable>
                call(LongWritable key, LongWritable value) {
                    return (new Tuple2(new LongWritable(key.get()), value));
                }
            });

-----
COMPILATION ERROR : 
-------------------------------------------------------------
spark/synthesis/service/Init/App.java:[51,32] error: no suitable method
found for map(<anonymous
Function2<LongWritable,LongWritable,Tuple2<LongWritable,LongWritable>>>)
1 error
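(For comparison, here is a sketch of what I believe the compiler is asking
for, assuming the 0.8 Java API: JavaPairRDD.map takes a Function over the
whole element, i.e. the Tuple2, rather than a Function2 over the key and
value separately, and both halves would need copying as in the Scala
version. I haven't confirmed this compiles.)

```java
// Sketch only: map on a JavaPairRDD<K, V> expects a Function whose input
// type is the pair's Tuple2<K, V>, not a Function2 over K and V.
JavaRDD<Tuple2<LongWritable, LongWritable>> DataCube = temp_DataCube.map(
    new Function<Tuple2<LongWritable, LongWritable>,
                 Tuple2<LongWritable, LongWritable>>() {
        @Override
        public Tuple2<LongWritable, LongWritable> call(
                Tuple2<LongWritable, LongWritable> pair) {
            // Copy both key and value, since Hadoop's record reader
            // reuses the same Writable instances for every record.
            return new Tuple2<LongWritable, LongWritable>(
                new LongWritable(pair._1.get()),
                new LongWritable(pair._2.get()));
        }
    });
```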

Scala:

package testspark

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.serializer.KryoRegistrator

import org.apache.hadoop.io.LongWritable

import com.esotericsoftware.kryo.Kryo

class MyRegistrator extends KryoRegistrator{
    def registerClasses(kryo: Kryo){
        kryo.register(classOf[LongWritable]);
        kryo.register(classOf[Tuple2[LongWritable,LongWritable]]);
    }
}

object ScalaSynthesisServer {

    def pseudoClone(x: LongWritable, y: LongWritable): (LongWritable, LongWritable) = {
        (new LongWritable(x.get()), new LongWritable(y.get()))
    }

    def main(args: Array[String]) {
        if (args.length < 3) {
            System.err.println("Usage: ScalaSynthesisServer <master> <input file> <jar file>")
            System.exit(1)
        }

        System.setProperty("spark.serializer",
            "org.apache.spark.serializer.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "testspark.MyRegistrator")

        val sc = new SparkContext(args(0), "ScalaSynthesisServer",
            "~/spark-0.8.0-incubating", List(args(2)))

        val DataCube = sc.sequenceFile(args(1), classOf[LongWritable],
            classOf[LongWritable]).map(a => pseudoClone(a._1, a._2))

        DataCube.cache()

        val list = DataCube.collect()

        for (x <- list) {
            println("Key= " + x._1 + " Value= " + x._2)
        }
    }
}




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SequenceFile-Java-API-Repeat-Key-Values-tp353p552.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
