spark-issues mailing list archives

From "Xiangrui Meng (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-3990) kryo.KryoException caused by ALS.trainImplicit in pyspark
Date Fri, 17 Oct 2014 22:59:33 GMT

    [ https://issues.apache.org/jira/browse/SPARK-3990?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14175671#comment-14175671 ]

Xiangrui Meng commented on SPARK-3990:
--------------------------------------

In PySpark 1.1, we switched to Kryo serialization by default. However, the ALS code requires
special class registration with Kryo in order to work. The error happens when there is not
enough memory and ALS needs to spill ratings or in/out link blocks to disk. I will work on this
issue. For now, the workaround is to use a cluster with enough memory.
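
A minimal sketch of that workaround, assuming a standalone PySpark 1.1 application: give the
executors enough memory that the ALS blocks are never spilled to disk, or fall back to Java
serialization on the JVM side so the unregistered-class Kryo path is never taken.
{{spark.executor.memory}} and {{spark.serializer}} are standard Spark configuration keys, but the
"4g" value is purely illustrative and the serializer fallback has not been verified against 1.1.

{code:title=workaround.py|borderStyle=solid}
from pyspark import SparkConf, SparkContext
from pyspark.mllib.recommendation import ALS

conf = (SparkConf()
        .setAppName("als-implicit-workaround")
        # Option 1: enough executor memory so ALS never spills blocks to disk.
        .set("spark.executor.memory", "4g")
        # Option 2: revert the JVM-side serializer to Java serialization,
        # avoiding the unregistered-class Kryo path entirely.
        .set("spark.serializer",
             "org.apache.spark.serializer.JavaSerializer"))
sc = SparkContext(conf=conf)

ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
model = ALS.trainImplicit(ratings, 1)  # default iterations = 5
{code}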

> kryo.KryoException caused by ALS.trainImplicit in pyspark
> ---------------------------------------------------------
>
>                 Key: SPARK-3990
>                 URL: https://issues.apache.org/jira/browse/SPARK-3990
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib, PySpark
>    Affects Versions: 1.1.0
>         Environment: 5 slaves cluster(m3.large) in AWS launched by spark-ec2
> Linux
> Python 2.6.8
>            Reporter: Gen TANG
>              Labels: test
>
> When we tried ALS.trainImplicit() in a PySpark environment, it only works for iterations = 1.
> Stranger still, the same code works fine in Scala (in several tests so far, ALS.trainImplicit
> in Scala has worked).
> For example, the following code:
> {code:title=test.py|borderStyle=solid}
>   from pyspark.mllib.recommendation import ALS
>
>   r1 = (1, 1, 1.0)
>   r2 = (1, 2, 2.0)
>   r3 = (2, 1, 2.0)
>   ratings = sc.parallelize([r1, r2, r3])
>   model = ALS.trainImplicit(ratings, 1)
>   # iterations defaults to 5; an explicit call such as
>   # ALS.trainImplicit(ratings, 1, 2) fails the same way
> {code}
> It causes the stage at count at ALS.scala:314 to fail, with the following info:
> {code:title=error information provided by ganglia}
> Job aborted due to stage failure: Task 6 in stage 90.0 failed 4 times, most recent failure:
Lost task 6.3 in stage 90.0 (TID 484, ip-172-31-35-238.ec2.internal): com.esotericsoftware.kryo.KryoException:
java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
>         com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>         com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
>         com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
>         com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>         org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
>         org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
>         org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
>         org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
>         scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
>         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
>         scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
>         org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
>         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
>         org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         java.lang.Thread.run(Thread.java:745)
> Driver stacktrace:
> {code}
> The log of the slave that failed the task contains:
> {code:title=error information in the log of slave}
> 14/10/17 13:20:54 ERROR executor.Executor: Exception in task 6.0 in stage 90.0 (TID 465)
> com.esotericsoftware.kryo.KryoException: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> Serialization trace:
> shouldSend (org.apache.spark.mllib.recommendation.OutLinkBlock)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:43)
> 	at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:34)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:137)
> 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> 	at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
> 	at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> 	at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.FlatMappedValuesRDD.compute(FlatMappedValuesRDD.scala:31)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> 	at org.apache.spark.rdd.FlatMappedRDD.compute(FlatMappedRDD.scala:33)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> 	at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:54)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ArrayStoreException: scala.collection.mutable.HashSet
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> 	at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> 	at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
> 	at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> 	... 36 more
> {code}





