spark-issues mailing list archives

From "Aaron Davidson (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-4868) Twitter DStream.map() throws "Task not serializable"
Date Wed, 17 Dec 2014 06:06:13 GMT

    [ https://issues.apache.org/jira/browse/SPARK-4868?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14249525#comment-14249525 ]

Aaron Davidson commented on SPARK-4868:
---------------------------------------

The REPL has some funny namespacing going on. What happens if you put your no-op function inside an object, like this:
{code}
object Test {
  def noop(a: Any) = {}
}
{code}
and then invoke it statically (e.g. {{Test.noop(t)}})? Also make sure to restart the REPL between attempts to avoid a polluted namespace.
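A minimal plain-Scala sketch of why moving the function into an object can help, with no Spark required. It assumes Scala 2.12 or later (where lambdas are serializable) and models the REPL's {{$iwC}} line wrapper, visible in the stack trace, as an ordinary class holding a non-serializable field, the way the shell's wrapper holds {{ssc}}. {{FakeContext}}, {{LineWrapper}}, {{StaticWrapper}} and {{isSerializable}} are hypothetical names; {{isSerializable}} only approximates the Java-serialization check that {{ClosureCleaner.ensureSerializable}} performs. The sketch uses a top-level object as in compiled code; in the shell, {{Test}} is itself defined inside a wrapper, so results there may differ.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for a REPL line wrapper such as $iwC: it is Serializable,
// but it holds a non-serializable field, like the wrapper that holds `ssc`.
class LineWrapper extends Serializable {
  val ctx = new FakeContext
  def noop(a: Any): Any = a
  // Calls an instance method, so the closure captures `this` (and with it, ctx).
  def closure: String => Any = t => noop(t)
}

// The suggested workaround: the function lives in an object and is called statically.
object Test {
  def noop(a: Any): Any = a
}

class StaticWrapper extends Serializable {
  val ctx = new FakeContext // still present, but the closure below never touches `this`
  def closure: String => Any = t => Test.noop(t)
}

object SerializationCheck {
  // True if Java serialization succeeds, false on NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new LineWrapper().closure))   // drags FakeContext along
    println(isSerializable(new StaticWrapper().closure)) // captures nothing
  }
}
```

The difference is only in what the lambda body references: {{noop(t)}} means {{this.noop(t)}}, while {{Test.noop(t)}} goes through the object's singleton instance and needs no reference to the enclosing wrapper.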

If that doesn't work, you can always mark the StreamingContext as {{@transient}} (e.g. {{@transient val ssc = new StreamingContext(sc, Seconds(5))}}), but that never feels quite right.
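A similar plain-Scala sketch of the {{@transient}} alternative, under the same assumptions (Scala 2.12+, plain Java serialization, and hypothetical {{FakeContext}}, {{TransientWrapper}} and {{isSerializable}} names). The closure still captures the wrapper, but the transient field is skipped when the wrapper is serialized.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for StreamingContext: deliberately NOT Serializable.
class FakeContext

// Hypothetical stand-in for the REPL line wrapper that holds `ssc`.
class TransientWrapper extends Serializable {
  // @transient: Java serialization skips this field entirely.
  @transient val ctx = new FakeContext
  def noop(a: Any): Any = a
  // Still captures `this`, but `this` no longer serializes its context.
  def closure: String => Any = t => noop(t)
}

object TransientCheck {
  // True if Java serialization succeeds, false on NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit =
    println(isSerializable(new TransientWrapper().closure))
}
```

One downside, which may be part of why this never feels quite right: a {{@transient val}} is not restored on deserialization, so on an executor {{ctx}} would be {{null}}, and any code that dereferenced it there would throw a {{NullPointerException}}.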

> Twitter DStream.map() throws "Task not serializable"
> ----------------------------------------------------
>
>                 Key: SPARK-4868
>                 URL: https://issues.apache.org/jira/browse/SPARK-4868
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Shell, Streaming
>    Affects Versions: 1.1.1
>         Environment: * Spark 1.1.1
> * EC2 cluster with 1 slave spun up using {{spark-ec2}}
> * twitter4j 3.0.3
> * {{spark-shell}} called with the {{--jars}} argument to load {{spark-streaming-twitter_2.10-1.0.0.jar}} as well as all the twitter4j jars.
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> _(Continuing the discussion [started here on the Spark user list|http://apache-spark-user-list.1001560.n3.nabble.com/NotSerializableException-in-Spark-Streaming-td5725.html].)_
> The following Spark Streaming code throws a serialization exception I do not understand.
> {code}
> import twitter4j.auth.{Authorization, OAuthAuthorization}
> import twitter4j.conf.ConfigurationBuilder 
> import org.apache.spark.streaming.{Seconds, Minutes, StreamingContext}
> import org.apache.spark.streaming.twitter.TwitterUtils
> def getAuth(): Option[Authorization] = {
>   System.setProperty("twitter4j.oauth.consumerKey", "consumerKey")
>   System.setProperty("twitter4j.oauth.consumerSecret", "consumerSecret")
>   System.setProperty("twitter4j.oauth.accessToken", "accessToken") 
>   System.setProperty("twitter4j.oauth.accessTokenSecret", "accessTokenSecret")
>   Some(new OAuthAuthorization(new ConfigurationBuilder().build()))
> } 
> def noop(a: Any): Any = {
>   a
> }
> val ssc = new StreamingContext(sc, Seconds(5))
> val liveTweetObjects = TwitterUtils.createStream(ssc, getAuth())
> val liveTweets = liveTweetObjects.map(_.getText)
> liveTweets.map(t => noop(t)).print()  // exception here
> ssc.start()
> {code}
> So before I even start the StreamingContext, I get the following stack trace:
> {code}
> scala> liveTweets.map(t => noop(t)).print()
> org.apache.spark.SparkException: Task not serializable
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
> 	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
> 	at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
> 	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
> 	at $iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
> 	at $iwC$$iwC$$iwC.<init>(<console>:32)
> 	at $iwC$$iwC.<init>(<console>:34)
> 	at $iwC.<init>(<console>:36)
> 	at <init>(<console>:38)
> 	at .<init>(<console>:42)
> 	at .<clinit>(<console>)
> 	at .<init>(<console>:7)
> 	at .<clinit>(<console>)
> 	at $print(<console>)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
> 	at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
> 	at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
> 	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
> 	at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
> 	at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:823)
> 	at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:868)
> 	at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:780)
> 	at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:625)
> 	at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:633)
> 	at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:638)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:963)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
> 	at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:911)
> 	at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
> 	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:911)
> 	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1006)
> 	at org.apache.spark.repl.Main$.main(Main.scala:31)
> 	at org.apache.spark.repl.Main.main(Main.scala)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:329)
> 	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> 	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> 	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> 	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> 	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> 	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> 	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> 	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> 	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
> 	... 43 more
> {code}
> What I'm really trying to do is use Spark Streaming via the interactive shell to filter Tweets using a trained KMeans model. I got errors trying that, and I boiled it down to this repro.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


