Is that error actually occurring in LBFGS?  It looks like it might be happening before the data even gets to LBFGS.  (Perhaps the outer join you're trying to do is making the dataset size explode a bit.)  Are you able to call count() (or any RDD action) on the data before you pass it to LBFGS?
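For example, something along these lines (the helper name is hypothetical) would tell you where the failure actually happens:

import org.apache.spark.rdd.RDD

// Run this on the RDD right before handing it to LBFGS. If the OOM fires here,
// the problem is in producing the data (e.g. the outer join), not in LBFGS itself.
def checkMaterializes[T](data: RDD[T]): Long = {
  val n = data.count()
  println(s"training set materialized: $n examples")
  n
}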

On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <gsalazar@ime.usp.br> wrote:
Just did, with the same error.
I think the problem is the data.count() call inside LBFGS, since doing a full count over a huge dataset is too expensive.
I'm thinking of writing my own version of LBFGS that, instead of calling data.count(), takes the example count as a parameter, which I will compute from a Spark SQL query.
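Roughly what I have in mind (runLBFGSWithCount is just a placeholder name for a modified copy of MLlib's LBFGS.runLBFGS that would take the precomputed count instead of calling data.count()):

import org.apache.spark.sql.SQLContext

// Compute the example count with a Spark SQL query instead of letting the
// optimizer count the joined RDD. The table name here is a placeholder.
def exampleCount(sqlContext: SQLContext, table: String): Long =
  sqlContext.sql(s"SELECT COUNT(*) FROM $table").first().getLong(0)

// val n = exampleCount(sqlContext, "training_samples")
// val (weights, loss) = runLBFGSWithCount(data, new LogisticGradient(), new SquaredL2Updater(),
//   numCorrections, convergenceTol, maxNumIterations, regParam, initialWeights, n)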

I will let you know.

Thanks


On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
Can you try increasing your driver memory, reducing the number of executors, and increasing the memory per executor?
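For example (the numbers are only illustrative, you will need to tune them for your cluster), something like:

--num-executors 32 --executor-memory 4G --driver-memory 8G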

Thanks
Best Regards

On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <gsalazar@ime.usp.br> wrote:
Hi there:

I'm using the LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern shown in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html, but the training data comes from a Spark SQL RDD.
The problem I'm having is that LBFGS tries to count the elements in my RDD, and that results in an OOM exception since my dataset is huge.
I'm running on an AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB, but I sample it (taking only 1% of the data) so that logistic regression can scale.
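Roughly, the training code looks like this (the table and column names are simplified placeholders; the real query is a large outer join):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}

// sqlContext is the SQLContext built from the SparkContext; query simplified.
val rows = sqlContext.sql("SELECT label, f1, f2 FROM samples")
val data = rows
  .map(r => (r.getDouble(0), Vectors.dense(r.getDouble(1), r.getDouble(2))))
  .sample(withReplacement = false, fraction = 0.01)   // keep only 1% of the data
  .cache()

val (weights, loss) = LBFGS.runLBFGS(
  data,
  new LogisticGradient(),
  new SquaredL2Updater(),
  10,     // numCorrections
  1e-4,   // convergenceTol
  100,    // maxNumIterations
  0.1,    // regParam
  Vectors.zeros(2))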
The exception I'm getting is this:

15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOfRange(Arrays.java:2694)
        at java.lang.String.<init>(String.java:203)
        at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
        at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
        at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
        at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
        at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.sql.execution.joins.HashOuterJoin.org$apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
        at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
        at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)

I'm using these parameters at runtime:
--num-executors 128 --executor-memory 1G --driver-memory 4G 
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.storage.memoryFraction=0.2 

I also persist my dataset using MEMORY_AND_DISK_SER but get the same error.
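That is, something like:

import org.apache.spark.storage.StorageLevel
data.persist(StorageLevel.MEMORY_AND_DISK_SER)   // instead of cache()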
I would appreciate any help with this problem; I have been trying to solve it for days and I'm running out of time and hair.

Thanks
Gustavo