spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Qiang Wang (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-28913) ArrayIndexOutOfBoundsException in ALS for datasets with 12 billion instances
Date Thu, 29 Aug 2019 11:59:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-28913?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]

Qiang Wang updated SPARK-28913:
-------------------------------
    Description: 
The stack trace is below:
{quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block
rdd_10916_493 could not be removed as it was not found on disk or in memory 19/08/28 07:00:41
ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage
347.1 (TID 325074) java.lang.ArrayIndexOutOfBoundsException: 6741 at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{quote}
This exception happened sometimes. 

Dataset capacity: ~12 billion ratings
 Here is the our code:
{code:java}
val hivedata = sc.sql(sqltext).select(id,dpid,score).coalesce(numPartitions)
val predataItem =  hivedata.rdd.map(r=>(r._1._1,(r._1._2,r._2.sum)))
  .groupByKey().zipWithIndex()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
val predataUser = predataItem.flatMap(r=>r._1._2.map(y=>(y._1,(r._2.toInt,y._2))))
  .aggregateByKey(zeroValueArr,numPartitions)((a,b)=> a += b,(a,b)=>a ++ b).map(r=>(r._1,r._2.toIterable))
  .zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK_SER)
//x._2 is the item_id, y._1 is the user_id, y._2 is the rating
val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
  .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)

case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
    val als = new ALS
    val paramMap = ParamMap(als.alpha -> 25000).
      put(als.checkpointInterval, 5).
      put(als.implicitPrefs, true).
      put(als.itemCol, "item").
      put(als.maxIter, 60).
      put(als.nonnegative, false).
      put(als.numItemBlocks, 600).
      put(als.numUserBlocks, 600).
      put(als.regParam, 4.5).
      put(als.rank, 25).
      put(als.userCol, "user")
    als.fit(ratingData, paramMap)
{code}

  was:
The stack trace is below:
{quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager: Block
rdd_10916_493 could not be removed as it was not found on disk or in memory 19/08/28 07:00:41
ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0 in stage
347.1 (TID 325074) java.lang.ArrayIndexOutOfBoundsException: 6741 at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
{quote}
This happened sometimes. Dataset capacity: ~12 billion ratings
Here is the our code:
{code:java}

val hivedata = sc.sql(sqltext).select(id,dpid,score).coalesce(numPartitions)
val predataItem =  hivedata.rdd.map(r=>(r._1._1,(r._1._2,r._2.sum)))
  .groupByKey().zipWithIndex()
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
val predataUser = predataItem.flatMap(r=>r._1._2.map(y=>(y._1,(r._2.toInt,y._2))))
  .aggregateByKey(zeroValueArr,numPartitions)((a,b)=> a += b,(a,b)=>a ++ b).map(r=>(r._1,r._2.toIterable))
  .zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK_SER)
//x._2 is the item_id, y._1 is the user_id, y._2 is the rating
val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
  .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)

case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
    val als = new ALS
    val paramMap = ParamMap(als.alpha -> 25000).
      put(als.checkpointInterval, 5).
      put(als.implicitPrefs, true).
      put(als.itemCol, "item").
      put(als.maxIter, 60).
      put(als.nonnegative, false).
      put(als.numItemBlocks, 600).
      put(als.numUserBlocks, 600).
      put(als.regParam, 4.5).
      put(als.rank, 25).
      put(als.userCol, "user")
    als.fit(ratingData, paramMap)
{code}


> ArrayIndexOutOfBoundsException in ALS for datasets  with 12 billion instances
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-28913
>                 URL: https://issues.apache.org/jira/browse/SPARK-28913
>             Project: Spark
>          Issue Type: Bug
>          Components: MLlib
>    Affects Versions: 2.2.1
>            Reporter: Qiang Wang
>            Assignee: Xiangrui Meng
>            Priority: Major
>
> The stack trace is below:
> {quote}19/08/28 07:00:40 WARN Executor task launch worker for task 325074 BlockManager:
Block rdd_10916_493 could not be removed as it was not found on disk or in memory 19/08/28
07:00:41 ERROR Executor task launch worker for task 325074 Executor: Exception in task 3.0
in stage 347.1 (TID 325074) java.lang.ArrayIndexOutOfBoundsException: 6741 at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1460)
at org.apache.spark.dpshade.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1440)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$40$$anonfun$apply$41.apply(PairRDDFunctions.scala:760)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:409) at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1041)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:972) at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1032)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:763) at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285) at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:141)
at org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:137) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:137) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323) at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:358)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
> {quote}
> This exception happened sometimes. 
> Dataset capacity: ~12 billion ratings
>  Here is the our code:
> {code:java}
> val hivedata = sc.sql(sqltext).select(id,dpid,score).coalesce(numPartitions)
> val predataItem =  hivedata.rdd.map(r=>(r._1._1,(r._1._2,r._2.sum)))
>   .groupByKey().zipWithIndex()
>   .persist(StorageLevel.MEMORY_AND_DISK_SER)
> val predataUser = predataItem.flatMap(r=>r._1._2.map(y=>(y._1,(r._2.toInt,y._2))))
>   .aggregateByKey(zeroValueArr,numPartitions)((a,b)=> a += b,(a,b)=>a ++ b).map(r=>(r._1,r._2.toIterable))
>   .zipWithIndex().persist(StorageLevel.MEMORY_AND_DISK_SER)
> //x._2 is the item_id, y._1 is the user_id, y._2 is the rating
> val trainData = predataUser.flatMap(x => x._1._2.map(y => (x._2.toInt, y._1, y._2.toFloat)))
>   .setName(trainDataName).persist(StorageLevel.MEMORY_AND_DISK_SER)
> case class ALSData(user:Int, item:Int, rating:Float) extends Serializable
> val ratingData = trainData.map(x => ALSData(x._1, x._2, x._3)).toDF()
>     val als = new ALS
>     val paramMap = ParamMap(als.alpha -> 25000).
>       put(als.checkpointInterval, 5).
>       put(als.implicitPrefs, true).
>       put(als.itemCol, "item").
>       put(als.maxIter, 60).
>       put(als.nonnegative, false).
>       put(als.numItemBlocks, 600).
>       put(als.numUserBlocks, 600).
>       put(als.regParam, 4.5).
>       put(als.rank, 25).
>       put(als.userCol, "user")
>     als.fit(ratingData, paramMap)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message