spark-user mailing list archives

From Saurabh Agrawal <saurabh.agra...@markit.com>
Subject RE: Writing collection to file error
Date Mon, 24 Nov 2014 15:02:15 GMT

Thanks for your help, Akhil. However, this creates an output folder and stores the result
set in multiple files. Also, the record count in the result set seems to have multiplied!
Is there any other way to achieve this?
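
Both points can be checked fairly directly (a sketch, not verified against this data): saveAsTextFile
writes one part-* file per partition of the RDD, so coalescing to a single partition first yields a
single part file, and comparing the counts of ratings and ratesAndPreds shows whether the join itself
is duplicating rows (which can happen if a (user, item) pair appears more than once in the input).

// Sketch only: collapse to one partition so the output directory holds a
// single part-00000 file (fine for small results, slow for very large ones).
ratesAndPreds.coalesce(1).saveAsTextFile("/path/CFOutput")

// Quick check on the "multiplied" record count: the join can only produce more
// rows than ratings if (user, item) keys are duplicated in the input.
println(s"ratings: ${ratings.count()}, joined: ${ratesAndPreds.count()}")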

Thanks!!

Regards,
Saurabh Agrawal
Vice President

Markit

Green Boulevard
B-9A, Tower C
3rd Floor, Sector - 62,
Noida 201301, India
+91 120 611 8274 Office

From: Akhil Das [mailto:akhil@sigmoidanalytics.com]
Sent: Monday, November 24, 2014 5:55 PM
To: Saurabh Agrawal
Cc: user@spark.apache.org
Subject: Re: Writing collection to file error

Hi Saurabh,

Here your ratesAndPreds is an RDD of type [((Int, Int), (Double, Double))], not an Array. If
you want to save it to disk, you can simply call saveAsTextFile and provide the output
location.

So change your last line from this:

ratesAndPreds.foreach(pw.println)

to this:

ratesAndPreds.saveAsTextFile("/path/CFOutput")
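
If the raw tuple rendering (lines like ((12,34),(3.0,2.87))) is awkward to consume downstream, the
pairs can be flattened to delimited text before saving; for example (a sketch, with an assumed
tab-separated layout):

// Sketch: format each ((user, product), (actual, predicted)) record as a
// tab-separated line so the saved files are easy to parse later.
ratesAndPreds.map { case ((user, product), (actual, predicted)) =>
  s"$user\t$product\t$actual\t$predicted"
}.saveAsTextFile("/path/CFOutput")

Note that saveAsTextFile will not overwrite an existing output directory, so the target path should
not already exist.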




Thanks
Best Regards

On Mon, Nov 24, 2014 at 5:05 PM, Saurabh Agrawal <saurabh.agrawal@markit.com> wrote:
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import java.io.{File, PrintWriter}

// Load and parse the data
val data = sc.textFile("/path/CFReady.txt")
val ratings = data.map(_.split('\t') match { case Array(user, item, rate) =>
    Rating(user.toInt, item.toInt, rate.toDouble)
  })

// Build the recommendation model using ALS
val rank = 50
val numIterations = 100
val model = ALS.train(ratings, rank, numIterations, 0.10)
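// (In MLlib's ALS.train(ratings, rank, iterations, lambda), the fourth
// argument, 0.10 here, is the regularization parameter lambda.)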

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

val pw = new PrintWriter(new File("/path/CFOutput.txt"))

ratesAndPreds.foreach(pw.println)


Hi,

Consider the highlighted code: I am trying to write the output of the ratesAndPreds array to
disk, but I get this error:

Task not serializable
                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
                at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
                at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
                at org.apache.spark.rdd.RDD.foreach(RDD.scala:758)
                at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
                at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
                at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
                at $iwC$$iwC$$iwC.<init>(<console>:45)
                at $iwC$$iwC.<init>(<console>:47)
                at $iwC.<init>(<console>:49)
                at <init>(<console>:51)
                at .<init>(<console>:55)
                at .<clinit>(<console>)
                at .<init>(<console>:7)
                at .<clinit>(<console>)
                at $print(<console>)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:606)
                at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
                at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
                at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
                at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
                at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
                at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
                at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
                at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
                at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
                at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
                at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
                at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
                at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
                at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
                at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
                at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
                at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
                at org.apache.spark.repl.Main$.main(Main.scala:31)
                at org.apache.spark.repl.Main.main(Main.scala)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:606)
                at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
                at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
                at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: java.io.PrintWriter
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
                at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
                at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
                at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
                at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
                at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
                at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
                at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
                ... 45 more

I am very new to Spark and Scala; can somebody please guide me here?

Thanks,
Saurabh
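
The root cause buried in the trace is "Caused by: java.io.NotSerializableException: java.io.PrintWriter":
the function passed to RDD.foreach is serialized and shipped to the executors, and it closes over the
driver-side PrintWriter, which is not serializable. Besides saveAsTextFile, a driver-side alternative
(a sketch, assuming the joined result is small enough to collect into driver memory) is:

// Sketch: bring the (small) result back to the driver and write it there,
// so no PrintWriter ever has to be serialized into an RDD closure.
import java.io.{File, PrintWriter}

val pw = new PrintWriter(new File("/path/CFOutput.txt"))
try {
  ratesAndPreds.collect().foreach(pw.println)
} finally {
  pw.close()
}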

________________________________
This e-mail, including accompanying communications and attachments, is strictly confidential
and only for the intended recipient. Any retention, use or disclosure not expressly authorised
by Markit is prohibited. This email is subject to all waivers and other terms at the following
link: http://www.markit.com/en/about/legal/email-disclaimer.page

Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information
on our offices worldwide.

MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker
Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority
with registration number 207294


________________________________
This e-mail, including accompanying communications and attachments, is strictly confidential
and only for the intended recipient. Any retention, use or disclosure not expressly authorised
by Markit is prohibited. This email is subject to all waivers and other terms at the following
link: http://www.markit.com/en/about/legal/email-disclaimer.page

Please visit http://www.markit.com/en/about/contact/contact-us.page? for contact information
on our offices worldwide.

MarkitSERV Limited has its registered office located at Level 4, Ropemaker Place, 25 Ropemaker
Street, London, EC2Y 9LY and is authorized and regulated by the Financial Conduct Authority
with registration number 207294