spark-user mailing list archives

From Nan Zhu <zhunanmcg...@gmail.com>
Subject Re: ship MatrixFactorizationModel with each partition?
Date Wed, 08 Jan 2014 06:13:12 GMT
great  

thank you Matei  

--  
Nan Zhu



On Wednesday, January 8, 2014 at 12:33 AM, Matei Zaharia wrote:

> Sorry, you actually can’t call predict() on the cluster because the model
> contains some RDDs. There was a recent patch that added a parallel predict
> method, here: https://github.com/apache/incubator-spark/pull/328/files.
> You can grab the code from that method (which does a join) and call it
> yourself in Spark 0.8.x.
>  
> Matei
>  
> On Jan 7, 2014, at 10:23 PM, Nan Zhu <zhunanmcgill@gmail.com> wrote:
> > Hi, all  
> >  
> > I’m trying the ALS in mllib
> >  
> > the following is my code:
> >  
> > val result = als.run(ratingRDD)
> > val allMovies = ratingRDD.map(rating => rating.product).distinct()
> > val allUsers = ratingRDD.map(rating => rating.user).distinct()
> > val allUserMoviePair = allUsers.cartesian(allMovies)
> > val resultRDD = allUserMoviePair.map(userMoviePair => {
> >   var str = ""
> >   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
> >     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
> >   str
> > })
> >  
> >  
> >  
> > every time, result.predict throws an exception like
> >  
> > scala.MatchError: null
> >   at org.apache.spark.rdd.PairRDDFunctions.lookup(PairRDDFunctions.scala:507)
> >   at org.apache.spark.mllib.recommendation.MatrixFactorizationModel.predict(MatrixFactorizationModel.scala:42)
> >   at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:72)
> >   at algorithm.SparkALS$$anonfun$23.apply(SparkALS.scala:69)
> >   at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
> >   at scala.collection.Iterator$$anon$19.next(Iterator.scala:401)
> >   at org.apache.spark.rdd.PairRDDFunctions.writeToFile$1(PairRDDFunctions.scala:677)
> >   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
> >   at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$2.apply(PairRDDFunctions.scala:686)
> >   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:107)
> >   at org.apache.spark.scheduler.Task.run(Task.scala:53)
> >   at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:215)
> >   at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:50)
> >   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:182)
> >   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> >   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> >   at java.lang.Thread.run(Thread.java:724)
> >  
> > if I change the code to pull the partitions into an array in the driver program, it works
> >  
> > val resultRDD = allUserMoviePair.collect().map(userMoviePair => {
> >   var str = ""
> >   str += (userMoviePair._1 + "," + userMoviePair._2 + "," +
> >     result.predict(userMoviePair._1, userMoviePair._2)) + "\n"
> >   str
> > })
> >  
> >  
> > so the exception seems to be related to how to share the MatrixFactorizationModel with each partition?
> >  
> > can anyone give me a hint?
> >  
> > Thank you very much!
> >  
> > --  
> > Nan Zhu
> >  
>  
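For anyone landing on this thread later: below is a minimal sketch of the join-based bulk predict that Matei describes, modeled on (not copied from) pull request 328 and written against the Spark 0.8.x API. The standalone predictAll helper is hypothetical, and it assumes the model's userFeatures and productFeatures RDDs of (Int, Array[Double]) factor vectors are accessible. Because the scoring is done with ordinary RDD joins, no predict() call runs inside a task closure, which is what triggers the MatchError above.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._ // implicit join on pair RDDs
    import org.apache.spark.mllib.recommendation.Rating

    // Hypothetical helper: score every requested (user, product) pair by
    // joining in the factor vectors instead of calling model.predict
    // inside a transformation.
    def predictAll(
        userFeatures: RDD[(Int, Array[Double])],
        productFeatures: RDD[(Int, Array[Double])],
        usersProducts: RDD[(Int, Int)]): RDD[Rating] = {
      // Key the requests by user and join in the user factors, then
      // re-key by product and join in the product factors.
      val withUserFeatures = usersProducts.join(userFeatures).map {
        case (user, (product, uFeatures)) => (product, (user, uFeatures))
      }
      withUserFeatures.join(productFeatures).map {
        case (product, ((user, uFeatures), pFeatures)) =>
          // The predicted rating is the dot product of the factor vectors.
          val score = uFeatures.zip(pFeatures).map { case (x, y) => x * y }.sum
          Rating(user, product, score)
      }
    }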
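With a helper like that, the cartesian product from the original code can be scored on the cluster instead of being collect()ed to the driver. A hypothetical usage, assuming the two feature RDDs are reachable on the returned model:

    val predictions = predictAll(result.userFeatures, result.productFeatures,
      allUsers.cartesian(allMovies))
    val resultRDD = predictions.map(r => r.user + "," + r.product + "," + r.rating)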

