Try allocating some more resources to your application.
You seem to be using 512 MB for your worker node (you can verify that from the master UI).

Try putting the following settings into your code and see if that helps -

System.setProperty("spark.executor.memory","15g")   // Will allocate more memory
System.setProperty("spark.akka.frameSize","2000")
System.setProperty("spark.akka.threads","16")           // Dependent upon number of cores with your worker machine


On Fri, Dec 6, 2013 at 1:06 AM, learner1014 all <learner1014@gmail.com> wrote:
Hi,

I am trying to do a join operation on an RDD. My input is pipe-delimited data in two files: one file is 24MB and the other is 285MB.
The setup being used is a single-node (server) setup, with SPARK_MEM set to 512m.

Master
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080

Worker
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077


App
/pkg/java/jdk1.7.0_11/bin/java -cp :/spark-0.8.0-incubating-bin-cdh4/conf:/spark-0.8.0-incubating-bin-cdh4/assembly/target/scala-2.9.3/spark-assembly-0.8.0-incubating-hadoop1.2.1.jar:/spark-0.8.0-incubating-bin-cdh4/core/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/repl/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/mllib/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/bagel/target/scala-2.9.3/test-classes:/spark-0.8.0-incubating-bin-cdh4/streaming/target/scala-2.9.3/test-classes -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Dspark.boundedMemoryCache.memoryFraction=0.4 -Dspark.cache.class=spark.DiskSpillingCache -XX:+UseConcMarkSweepGC  -Xms512M -Xmx512M org.apache.spark.executor.StandaloneExecutorBackend akka://spark@localhost:33024/user/StandaloneScheduler 1 localhost 4


Here is the code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.storage.StorageLevel

object SimpleApp {

      def main (args: Array[String]) {

      System.setProperty("spark.local.dir","/spark-0.8.0-incubating-bin-cdh4/tmp");
      System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      System.setProperty("spark.akka.timeout", "30")  //in seconds

      val dataFile2 = "/tmp_data/data1.txt"
      val dataFile1 = "/tmp_data/data2.txt"
      val sc = new SparkContext("spark://localhost:7077", "Simple App", "/spark-0.8.0-incubating-bin-cdh4",
      List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))

      val data10 = sc.textFile(dataFile1, 128)
      val data11 = data10.map(x => x.split("|"))
      val data12 = data11.map( x  =>  (x(1).toInt -> x) )


      val data20 = sc.textFile(dataFile2, 128)
      val data21 = data20.map(x => x.split("|"))
      val data22 = data21.map(x => (x(1).toInt -> x))


      val data3 = data12.join(data22, 128)
      val data4 = data3.distinct(4)
      val numAs = data10.count()
      val numBs = data20.count()
      val numCs = data3.count()
      val numDs = data4.count()
      println("Lines in 1: %s, Lines in 2: %s Lines in 3: %s Lines in 4: %s".format(numAs, numBs, numCs, numDs))
      data4.foreach(println)
      }
}

I see the following errors
13/12/04 10:53:55 WARN storage.BlockManagerMaster: Error sending message to BlockManagerMaster in 1 attempts
java.util.concurrent.TimeoutException: Futures timed out after [10000] milliseconds
        at akka.dispatch.DefaultPromise.ready(Future.scala:870)
        at akka.dispatch.DefaultPromise.result(Future.scala:874)
        at akka.dispatch.Await$.result(Future.scala:74)

and
13/12/04 10:53:55 ERROR executor.Executor: Exception in task ID 517
java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:282)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$StringArraySerializer.read(DefaultArraySerializers.java:262)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:106)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:101)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)

Lots of them, actually...


To give some additional information: I tried with just a single column in both files, passed them through this program, and hit the same issue.
Out of memory and other errors.

What did work was removal of the following lines:

      val data21 = data20.map(x => x.split("|"))
      val data22 = data21.map(x => (x(1).toInt -> x))

which were replaced by:
      val data22 = data20.map(x => (x.toInt -> x))

However, as soon as I add additional columns this is of course not going to work.
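Would splitting on a literal pipe be the right way to go back to multiple columns? My understanding is that split() takes a regular expression, so "|" on its own splits between every character; something like this is what I had in mind (just a sketch, not tested):

      val data21 = data20.map(x => x.split('|'))      // Char overload: splits on a literal '|', no regex involved
      val data22 = data21.map(x => (x(1).toInt -> x)) // key on the second column, as in the original code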
So can someone explain this? Any suggestions are most welcome.
Any help is appreciated.
Thanks