mahout-user mailing list archives

From Rohit Jain <rohitkjai...@gmail.com>
Subject RowSimilarity : NotSerializableException
Date Sat, 07 May 2016 10:05:14 GMT
Hello everyone,

I want to run the Spark RowSimilarity recommender on data obtained from
MongoDB. For this purpose, I've written the code below, which reads input
from Mongo and converts it to an RDD of (String, String) pairs. This is
passed to IndexedDatasetSpark, which is then passed to
SimilarityAnalysis.rowSimilarityIDS.

import org.apache.hadoop.conf.Configuration
import org.apache.mahout.math.cf.SimilarityAnalysis
import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bson.BSONObject
import com.mongodb.hadoop.MongoInputFormat

object SparkExample extends App {
  val mongoConfig = new Configuration()
  mongoConfig.set("mongo.input.uri", "mongodb://my_mongo_ip:27017/db.collection")

  val sparkConf = new SparkConf()
  val sc = new SparkContext("local", "SparkExample", sparkConf)

  // Read the Mongo collection as (ObjectId, BSONObject) pairs.
  val documents: RDD[(Object, BSONObject)] = sc.newAPIHadoopRDD(
    mongoConfig,
    classOf[MongoInputFormat],
    classOf[Object],
    classOf[BSONObject]
  )

  // Map each document to (product_id, space-separated attribute tokens).
  val new_doc: RDD[(String, String)] = documents.map { doc1 =>
    (
      doc1._2.get("product_id").toString,
      doc1._2.get("product_attribute_value").toString
        .replace("[ \"", "")
        .replace("\"]", "")
        .split("\" , \"")
        .map(value => value.toLowerCase.replace(" ", "-"))
        .mkString(" ")
    )
  }

  val myIDs = IndexedDatasetSpark(new_doc)(sc)

  SimilarityAnalysis.rowSimilarityIDS(myIDs)
    .dfsWrite("hdfs://myhadoop:9000/myfile", readWriteSchema)
}
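For what it's worth, the attribute cleanup in the map above can be checked in isolation, without Spark or Mongo. The `[ "A" , "B"]` input shape is an assumption based on my replace/split calls:

```scala
// Standalone sanity check of the attribute-string cleanup used in the map
// above. Input format assumed: a BSON array rendered as [ "A B" , "C D"].
object AttrCleanup {
  def clean(raw: String): String =
    raw.replace("[ \"", "")       // strip leading bracket and quote
      .replace("\"]", "")         // strip trailing quote and bracket
      .split("\" , \"")           // split into individual attribute values
      .map(value => value.toLowerCase.replace(" ", "-"))
      .mkString(" ")              // join as space-separated tokens
}
```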

After running the code, I am getting this error:
java.io.NotSerializableException: org.apache.mahout.math.DenseVector
Serialization stack:
  - object not serializable (class: org.apache.mahout.math.DenseVector, value: {3:1.0,8:1.0,10:1.0})
  - field (class: scala.Some, name: x, type: class java.lang.Object)
  - object (class scala.Some, Some({3:1.0,8:1.0,10:1.0}))
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:240)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
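From searching around, it looks like Mahout's Spark bindings expect Kryo serialization to be enabled rather than plain Java serialization. Here is a sketch of what I could try (the registrator class name is assumed from the sparkbindings package, so please correct me if I have it wrong):

```scala
// Sketch: enable Kryo so Mahout vector classes (e.g. DenseVector) can be
// serialized and shipped to executors. Class names are my assumption from
// the mahout-spark sparkbindings package.
val sparkConf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator",
       "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")
val sc = new SparkContext("local", "SparkExample", sparkConf)
```

Alternatively, I believe the helper in the sparkbindings package object (mahoutSparkContext) sets this configuration up automatically, but I have not verified that.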

Please help me with this.


-- 
Thanks & Regards,

*Rohit Jain*
Web developer | Consultant
Mob +91 8097283931
