spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gustavo Enrique Salazar Torres <gsala...@ime.usp.br>
Subject Re: LBGFS optimizer performace
Date Tue, 03 Mar 2015 22:27:27 GMT
Yeah, I can call count before that and it works. Also I was over caching
tables but I removed those. Now there is no caching but it gets really slow
since it calculates my table RDD many times.
Also hacked the LBFGS code to pass the number of examples which I
calculated outside in a Spark SQL query but just moved the location of the
problem.

The query I'm running looks like this:

s"SELECT $mappedFields, tableB.id as b_id FROM tableA LEFT JOIN tableB  ON
tableA.id=tableB.table_a_id WHERE tableA.status='' OR tableB.status='' "

mappedFields contains a list of fields which I'm interested in. The result
of that query goes through (including sampling) some transformations before
being input to LBFGS.

My dataset has 180GB just for feature selection, I'm planning to use 450GB
to train the final model and I'm using 16 c3.2xlarge EC2 instances, that
means I have 240GB of RAM available.

Any suggestion? I'm starting to check the algorithm because I don't
understand why it needs to count the dataset.

Thanks

Gustavo

On Tue, Mar 3, 2015 at 6:08 PM, Joseph Bradley <joseph@databricks.com>
wrote:

> Is that error actually occurring in LBFGS?  It looks like it might be
> happening before the data even gets to LBFGS.  (Perhaps the outer join
> you're trying to do is making the dataset size explode a bit.)  Are you
> able to call count() (or any RDD action) on the data before you pass it to
> LBFGS?
>
> On Tue, Mar 3, 2015 at 8:55 AM, Gustavo Enrique Salazar Torres <
> gsalazar@ime.usp.br> wrote:
>
>> Just did with the same error.
>> I think the problem is the "data.count()" call in LBFGS because for huge
>> datasets that's naive to do.
>> I was thinking to write my version of LBFGS but instead of doing
>> data.count() I will pass that parameter which I will calculate from a Spark
>> SQL query.
>>
>> I will let you know.
>>
>> Thanks
>>
>>
>> On Tue, Mar 3, 2015 at 3:25 AM, Akhil Das <akhil@sigmoidanalytics.com>
>> wrote:
>>
>>> Can you try increasing your driver memory, reducing the executors and
>>> increasing the executor memory?
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres <
>>> gsalazar@ime.usp.br> wrote:
>>>
>>>> Hi there:
>>>>
>>>> I'm using LBFGS optimizer to train a logistic regression model. The
>>>> code I implemented follows the pattern showed in
>>>> https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but
>>>> training data is obtained from a Spark SQL RDD.
>>>> The problem I'm having is that LBFGS tries to count the elements in my
>>>> RDD and that results in a OOM exception since my dataset is huge.
>>>> I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on Hadoop
>>>> YARN. My dataset is about 150 GB but I sample (I take only 1% of the data)
>>>> it in order to scale logistic regression.
>>>> The exception I'm getting is this:
>>>>
>>>> 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in
>>>> stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal):
>>>> java.lang.OutOfMemoryError: Java heap space
>>>>         at java.util.Arrays.copyOfRange(Arrays.java:2694)
>>>>         at java.lang.String.<init>(String.java:203)
>>>>         at com.esotericsoftware.kryo.io.Input.readString(Input.java:448)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146)
>>>>         at
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>>>>         at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>>>>         at
>>>> com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>>>>         at
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>         at
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42)
>>>>         at
>>>> com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33)
>>>>         at
>>>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732)
>>>>         at
>>>> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144)
>>>>         at
>>>> org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>>>>         at
>>>> org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>>>>         at
>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>         at
>>>> scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
>>>>         at
>>>> org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
>>>>         at
>>>> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>>>>         at
>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>         at
>>>> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>>>>         at org.apache.spark.sql.execution.joins.HashOuterJoin.org
>>>> $apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179)
>>>>         at
>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199)
>>>>         at
>>>> org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196)
>>>>         at
>>>> org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>         at
>>>> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>>         at
>>>> org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>
>>>> I'm using this parameters at runtime:
>>>> --num-executors 128 --executor-memory 1G --driver-memory 4G
>>>> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
>>>> --conf spark.storage.memoryFraction=0.2
>>>>
>>>> I also persist my dataset using MEMORY_AND_DISK_SER but get the same
>>>> error.
>>>> I will appreciate any help on this problem. I have been trying to solve
>>>> it for days and I'm running out of time and hair.
>>>>
>>>> Thanks
>>>> Gustavo
>>>>
>>>
>>>
>>
>

Mime
View raw message