spark-user mailing list archives

From Ilya Ganelin <ilgan...@gmail.com>
Subject Re: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0
Date Tue, 28 Oct 2014 15:50:18 GMT
Hi all - I've simplified the code so now I'm literally feeding in 200
million ratings directly to ALS.train. Nothing else is happening in the
program.
I've also tried both the regular (Java) serializer and the KryoSerializer.
With Kryo, I get the same ArrayIndexOutOfBoundsException as before.
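
For anyone trying to reproduce this: the original message quoted below states that the input has no negative indices and none that exceed the Int range. A minimal sketch of how such a pre-flight check could be written in plain Scala follows. The `user,product,rating` line layout and the names `RatingIdCheck`, `RawRating`, `parse`, `fitsInt` and `isValid` are assumptions for illustration only; the thread does not show the actual input format or the check that was run.

```scala
object RatingIdCheck {
  // Hypothetical raw record. IDs are held as Long so that values
  // outside the Int range can be detected instead of overflowing.
  final case class RawRating(user: Long, product: Long, rating: Double)

  // Parse one "user,product,rating" line (assumed layout).
  // Returns None when the line is malformed.
  def parse(line: String): Option[RawRating] =
    line.split(',').map(_.trim) match {
      case Array(u, p, r) =>
        try Some(RawRating(u.toLong, p.toLong, r.toDouble))
        catch { case _: NumberFormatException => None }
      case _ => None
    }

  // The two conditions named in the thread: non-negative, and no larger
  // than Int.MaxValue (MLlib's Rating stores user and product as Int).
  def fitsInt(id: Long): Boolean = id >= 0L && id <= Int.MaxValue.toLong

  def isValid(r: RawRating): Boolean = fitsInt(r.user) && fitsInt(r.product)
}
```

Applied to the raw text input before it is converted to Rating objects, counting the lines for which `parse` returns None or `isValid` is false would show whether any malformed or out-of-range IDs reach ALS.train. A count of zero would rule out the input IDs as the cause, but would not by itself explain the exception.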

With the regular serializer I get the following error stack:

14/10/28 10:43:14 WARN TaskSetManager: Lost task 119.0 in stage 10.0 (TID
2282, innovationdatanode07.cof.ds.capitalone.com):
java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1414016059040_0715/spark-local-20141028102246-dde7/06/shuffle_7_119_8
(No such file or directory)
        java.io.FileOutputStream.open(Native Method)
        java.io.FileOutputStream.<init>(FileOutputStream.java:221)
        org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)
        org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)
        org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:67)
        org.apache.spark.shuffle.hash.HashShuffleWriter$$anonfun$write$1.apply(HashShuffleWriter.scala:65)
        scala.collection.Iterator$class.foreach(Iterator.scala:727)
        scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        org.apache.spark.shuffle.hash.HashShuffleWriter.write(HashShuffleWriter.scala:65)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:180)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
14/10/28 10:43:14 INFO TaskSetManager: Starting task 119.1 in stage 10.0
(TID 2303, innovationdatanode07.cof.ds.capitalone.com, PROCESS_LOCAL, 5642
bytes)
14/10/28 10:43:14 WARN TaskSetManager: Lost task 119.1 in stage 10.0 (TID
2303, innovationdatanode07.cof.ds.capitalone.com):
java.io.FileNotFoundException:
/opt/cloudera/hadoop/1/yarn/nm/usercache/zjb238/appcache/application_1414016059040_0715/spark-local-20141028102246-dde7/23/shuffle_8_90_119
(No such file or directory)
        java.io.RandomAccessFile.open(Native Method)
        java.io.RandomAccessFile.<init>(RandomAccessFile.java:241)
        org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:93)
        org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:116)
        org.apache.spark.shuffle.FileShuffleBlockManager.getBytes(FileShuffleBlockManager.scala:190)
        org.apache.spark.storage.BlockManager.getLocalShuffleFromDisk(BlockManager.scala:361)
        org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1$$anonfun$apply$10.apply(BlockFetcherIterator.scala:208)
        org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1$$anonfun$apply$10.apply(BlockFetcherIterator.scala:208)
        org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:258)
        org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator.next(BlockFetcherIterator.scala:77)
.....

This is an issue I referenced in the past here:
https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/%3CCAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=Hw@mail.gmail.com%3E

-Ilya Ganelin

On Mon, Oct 27, 2014 at 6:12 PM, Xiangrui Meng <mengxr@gmail.com> wrote:

> Could you save the data before ALS and try to reproduce the problem?
> You might try reducing the number of partitions and not using Kryo
> serialization, just to narrow down the issue. -Xiangrui
>
> On Mon, Oct 27, 2014 at 1:29 PM, Ilya Ganelin <ilganeli@gmail.com> wrote:
> > Hi Burak.
> >
> > I always see this error. I'm running the CDH 5.2 version of Spark 1.1.0. I
> > load my data from HDFS. By the time it hits the recommender it has gone
> > through many Spark operations.
> >
> > On Oct 27, 2014 4:03 PM, "Burak Yavuz" <byavuz@stanford.edu> wrote:
> >>
> >> Hi,
> >>
> >> I've come across this multiple times, but not in a consistent manner. I
> >> found it hard to reproduce. I have a jira for it: SPARK-3080
> >>
> >> Do you observe this error every single time? Where do you load your data
> >> from? Which version of Spark are you running?
> >> Figuring out the similarities may help in pinpointing the bug.
> >>
> >> Thanks,
> >> Burak
> >>
> >> ----- Original Message -----
> >> From: "Ilya Ganelin" <ilganeli@gmail.com>
> >> To: "user" <user@spark.apache.org>
> >> Sent: Monday, October 27, 2014 11:36:46 AM
> >> Subject: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0
> >>
> >> Hello all - I am attempting to run MLLib's ALS algorithm on a substantial
> >> test vector - approx. 200 million records.
> >>
> >> I have resolved a few issues I've had with regard to garbage collection,
> >> Kryo serialization, and memory usage.
> >>
> >> I have not been able to get around the issue I see below, however:
> >>
> >>
> >> > java.lang.ArrayIndexOutOfBoundsException: 6106
> >> >         org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.scala:543)
> >> >         scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
> >> >         org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)
> >> >         org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)
> >> >         org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mllib$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)
> >> >         org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> >> >         org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValuesRDD.scala:31)
> >> >         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >         scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> >> >         org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:144)
> >> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:159)
> >> >         org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.scala:158)
> >> >         scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >> >         scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> >> >         scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> >> >         scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >> >         org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
> >> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >> >         org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
> >> >         org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
> >> >         org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
> >>
> >>
> >> I do not have any negative indices or indices that exceed Int.MaxValue.
> >>
> >> I have partitioned the input data into 300 partitions and my Spark config
> >> is below:
> >>
> >>       .set("spark.executor.memory", "14g")
> >>       .set("spark.storage.memoryFraction", "0.8")
> >>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> >>       .set("spark.kryo.registrator", "MyRegistrator")
> >>       .set("spark.core.connection.ack.wait.timeout", "600")
> >>       .set("spark.akka.frameSize", "50")
> >>       .set("spark.yarn.executor.memoryOverhead", "1024")
> >>
> >> Does anyone have any suggestions as to why I'm seeing the above error or
> >> how to get around it?
> >> It may be possible to upgrade to the latest version of Spark, but the
> >> mechanism for doing so in our environment isn't obvious yet.
> >>
> >> -Ilya Ganelin
> >>
> >
>
