spark-user mailing list archives

From Nan Zhu <zhunanmcg...@gmail.com>
Subject ship MatrixFactorizationModel with each partition?
Date Wed, 08 Jan 2014 03:23:48 GMT
Hi, all  

I'm trying the ALS implementation in MLlib.

The following is my code:

val result = als.run(ratingRDD)
val allMovies = ratingRDD.map(rating => rating.product).distinct()
val allUsers = ratingRDD.map(rating => rating.user).distinct()
val allUserMoviePair = allUsers.cartesian(allMovies)
val resultRDD = allUserMoviePair.map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})



Every time, result.predict throws an exception like:

scala.MatchError: null
	at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
	at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
	at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
	at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
	at org.apache.spark.scheduler.Task.run(Task.scala:53)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:724)

If I change the code to collect the pairs into an array in the driver program, it works:

val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
  var str = ""
  str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
    result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
  str
})


So the exception seems to be related to how the MatrixFactorizationModel is shared with each partition?
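One workaround I am considering (just a sketch, and I am not sure it is idiomatic: it assumes the model's userFeatures and productFeatures fields are RDD[(Int, Array[Double])] and that both factor matrices fit in driver memory) is to collect the factors to the driver, broadcast them, and compute the dot product myself inside the map, so that no RDD lookup happens inside another RDD's transformation:

```scala
// Sketch only: collect the factor matrices (assumed small enough for the
// driver) and broadcast them, so each task can score a (user, movie) pair
// without touching the model's internal RDDs.
val userFeatures = sc.broadcast(result.userFeatures.collect().toMap)
val movieFeatures = sc.broadcast(result.productFeatures.collect().toMap)

// predicted rating = dot product of the user's and movie's latent factors
def dot(a: Array[Double], b: Array[Double]): Double =
  a.zip(b).map(p => p._1 * p._2).sum

val resultRDD = allUserMoviePair.map(userMoviePair => {
  val score = dot(userFeatures.value(userMoviePair._1),
                  movieFeatures.value(userMoviePair._2))
  userMoviePair._1 + "," + userMoviePair._2 + "," + score + "\n"
})
```

Would that be a reasonable approach, or is there a better way?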

Can anyone give me a hint?

Thank you very much!

--  
Nan Zhu

