spark-user mailing list archives

From Yadid Ayzenberg <ya...@media.mit.edu>
Subject Re: java.io.NotSerializableException on RDD count() in Java
Date Tue, 05 Nov 2013 13:04:48 GMT
Ah, I see. Thanks very much for your assistance, Patrick and Reynold.
As a workaround for now, I made the SC field transient and it's
working fine.
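
In case it helps anyone else, here is a rough sketch of the two workarounds
Reynold describes below, reusing the class and API names from my code further
down the thread (ExtractFirstValues is just a name made up for this example,
and the RDD setup is elided since it is unchanged):

public class AnalyticsEngine implements Serializable {

    final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);

    // Workaround 1: a transient field is skipped by Java serialization, so
    // pulling the enclosing AnalyticsEngine into a closure no longer tries
    // to serialize the JavaSparkContext.
    private transient JavaSparkContext sc;

    public void getData() {
        JavaPairRDD<Object, BSONObject> rdd = ...;  // same newAPIHadoopRDD call as below

        // Workaround 2: a static nested class has no hidden this$0 reference
        // to the outer AnalyticsEngine instance, so nothing extra gets
        // dragged into the task closure.
        JavaDoubleRDD rdd2 = rdd.flatMap(new ExtractFirstValues());
        logger.info("Count: {}", rdd2.count());
    }

    static class ExtractFirstValues extends DoubleFlatMapFunction<Tuple2<Object, BSONObject>> {
        @Override
        public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
            BasicDBList vals = (BasicDBList) e._2().get("data");
            List<Double> results = new ArrayList<Double>();
            for (int i = 0; i < vals.size(); i++)
                results.add((Double) ((BasicDBList) vals.get(i)).get(0));
            return results;
        }
    }
}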

Yadid


On 11/3/13 9:05 PM, Reynold Xin wrote:
> Yea, so every inner class actually contains a field referencing the 
> outer class. In your case, the anonymous DoubleFlatMapFunction has a 
> this$0 field referencing the outer class AnalyticsEngine, which is why 
> Spark is trying to serialize AnalyticsEngine.
>
> In the Scala API, a closure (which is really just implemented as an 
> anonymous class) has a field called "$outer", and Spark uses a 
> "closure cleaner" that goes into the anonymous class and removes the 
> $outer field if it is not used in the closure itself. In Java, the 
> compiler generates a field called "this$0" instead, so the closure 
> cleaner doesn't find it and can't "clean" the closure properly.
>
> I will work on a fix so the closure cleaner handles this case as well. 
> In the meantime, you can work around it by either defining your 
> anonymous class as a static nested class, or by marking the 
> JavaSparkContext field as transient.
>
>
>
> On Sun, Nov 3, 2013 at 7:32 PM, Patrick Wendell <pwendell@gmail.com> wrote:
>
>     Hm, I think you are triggering a bug in the Java API where closures
>     may not be properly cleaned. I think @rxin has reproduced this,
>     deferring to him.
>
>     - Patrick
>
>     On Sun, Nov 3, 2013 at 5:25 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>     > Code is below. In the code, rdd.count() works, but rdd2.count() fails.
>     >
>     > public class AnalyticsEngine implements Serializable {
>     >
>     >     private static AnalyticsEngine engine = null;
>     >     private JavaSparkContext sc;
>     >
>     >     final Logger logger = LoggerFactory.getLogger(AnalyticsEngine.class);
>     >     private Properties prop;
>     >
>     >     String db_host;
>     >
>     >     private AnalyticsEngine()
>     >     {
>     >         System.setProperty("spark.serializer",
>     >                 "org.apache.spark.serializer.KryoSerializer");
>     >         System.setProperty("spark.kryo.registrator",
>     >                 "edu.mit.bsense.MyRegistrator");
>     >         sc = new JavaSparkContext("local[4]", "TestSpark");
>     >         Properties prop = new Properties();
>     >         try {
>     >             prop.load(new FileInputStream("config.properties"));
>     >
>     >             db_host = prop.getProperty("database_host1");
>     >             logger.info("Database host: {}", db_host);
>     >         } catch (FileNotFoundException ex) {
>     >             logger.info("Could not read config.properties: " + ex.toString());
>     >         } catch (IOException ex) {
>     >             logger.info("Could not read config.properties: " + ex.toString());
>     >         }
>     >     }
>     >
>     >     public void getData()
>     >     {
>     >         Configuration conf = new Configuration();
>     >
>     >         String conf_url = "mongodb://" + db_host + "/test.data1"; // this is the data partition
>     >         conf.set("mongo.input.uri", conf_url);
>     >
>     >         conf.set("mongo.input.query", "{\"streamId\":\""+"13"+"\"},{\"data\":1}");
>     >         conf.set("mongo.input.split_size", "64");
>     >
>     >         JavaPairRDD<Object, BSONObject> rdd = sc.newAPIHadoopRDD(conf,
>     >                 MongoInputFormat.class, Object.class, BSONObject.class);
>     >
>     >         rdd.cache();
>     >
>     >         logger.info("Count of rdd: {}", rdd.count());
>     >         logger.info("==========================================================================");
>     >
>     >         JavaDoubleRDD rdd2 = rdd.flatMap( new DoubleFlatMapFunction<Tuple2<Object, BSONObject>>() {
>     >
>     >             @Override
>     >             public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>     >                 BSONObject doc = e._2();
>     >                 BasicDBList vals = (BasicDBList) doc.get("data");
>     >
>     >                 List<Double> results = new ArrayList<Double>();
>     >                 for (int i = 0; i < vals.size(); i++)
>     >                     results.add((Double) ((BasicDBList) vals.get(i)).get(0));
>     >
>     >                 return results;
>     >             }
>     >         });
>     >
>     >         logger.info("Take: {}", rdd2.take(100));
>     >         logger.info("Count: {}", rdd2.count());
>     >     }
>     > }
>     >
>     >
>     > On 11/3/13 8:19 PM, Patrick Wendell wrote:
>     >>
>     >> Thanks that would help. This would be consistent with there being a
>     >> reference to the SparkContext itself inside of the closure.
>     Just want
>     >> to make sure that's not the case.
>     >>
>     >> On Sun, Nov 3, 2013 at 5:13 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>     >>>
>     >>> I'm running in local[4] mode - so there are no slave machines. Full
>     >>> stack trace:
>     >>>
>     >>>
>     >>> (run-main) org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>> org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>> [debug]     Thread run-main exited.
>     >>> [debug] Interrupting remaining threads (should be all daemons).
>     >>> [debug] Sandboxed run complete..
>     >>> java.lang.RuntimeException: Nonzero exit code: 1
>     >>>      at scala.sys.package$.error(package.scala:27)
>     >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at scala.Option.foreach(Option.scala:236)
>     >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>     >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     >>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>     >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     >>>      at sbt.Execute.work(Execute.scala:244)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     >>>      at java.lang.Thread.run(Thread.java:695)
>     >>>
>     >>> when I add implements Serializable to my class, I get the
>     following stack
>     >>> trace:
>     >>>
>     >>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException:
>     >>> org.apache.spark.api.java.JavaSparkContext
>     >>> org.apache.spark.SparkException: Job failed:
>     >>> java.io.NotSerializableException:
>     >>> org.apache.spark.api.java.JavaSparkContext
>     >>>
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>      at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>      at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>      at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>      at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>> [debug]     Thread run-main exited.
>     >>> [debug] Interrupting remaining threads (should be all daemons).
>     >>> [debug] Sandboxed run complete..
>     >>> java.lang.RuntimeException: Nonzero exit code: 1
>     >>>      at scala.sys.package$.error(package.scala:27)
>     >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at sbt.BuildCommon$$anonfun$toError$1.apply(Defaults.scala:1628)
>     >>>      at scala.Option.foreach(Option.scala:236)
>     >>>      at sbt.BuildCommon$class.toError(Defaults.scala:1628)
>     >>>      at sbt.Defaults$.toError(Defaults.scala:34)
>     >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:647)
>     >>>      at sbt.Defaults$$anonfun$runTask$1$$anonfun$apply$36$$anonfun$apply$37.apply(Defaults.scala:645)
>     >>>      at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
>     >>>      at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:42)
>     >>>      at sbt.std.Transform$$anon$4.work(System.scala:64)
>     >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:237)
>     >>>      at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:18)
>     >>>      at sbt.Execute.work(Execute.scala:244)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:237)
>     >>>      at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:160)
>     >>>      at sbt.CompletionService$$anon$2.call(CompletionService.scala:30)
>     >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439)
>     >>>      at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
>     >>>      at java.util.concurrent.FutureTask.run(FutureTask.java:138)
>     >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
>     >>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
>     >>>      at java.lang.Thread.run(Thread.java:695)
>     >>>
>     >>> I can post my code if that helps
>     >>>
>     >>>
>     >>>
>     >>> On 11/3/13 8:05 PM, Patrick Wendell wrote:
>     >>>>
>     >>>> If you look in the UI, are there failures on any of the
>     slaves that
>     >>>> you can give a stack trace for? That would narrow down where the
>     >>>> serialization error is happening.
>     >>>>
>     >>>> Unfortunately this code path doesn't print a full stack trace
>     which
>     >>>> makes it harder to debug where the serialization error comes
>     from.
>     >>>>
>     >>>> Could you post all of your code?
>     >>>>
>     >>>> Also, just wondering, what happens if you just go ahead and add
>     >>>> "extends Serializable" to AnalyticsEngine class? It's
>     possible this is
>     >>>> happening during closure serialization, which will use the
>     closure
>     >>>> serializer (which is by default Java).
>     >>>>
>     >>>> - Patrick
>     >>>>
>     >>>> On Sun, Nov 3, 2013 at 4:49 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>     >>>>>
>     >>>>> Yes, I tried that as well (it is currently registered with Kryo) -
>     >>>>> although it doesn't make sense to me (and doesn't solve the problem).
>     >>>>> I also made sure my registration was running:
>     >>>>> DEBUG org.apache.spark.serializer.KryoSerializer  - Running user
>     >>>>> registrator: edu.mit.bsense.MyRegistrator
>     >>>>> 7841 [spark-akka.actor.default-dispatcher-3] DEBUG
>     >>>>> org.apache.spark.serializer.KryoSerializer  - Running user registrator:
>     >>>>> edu.mit.bsense.MyRegistrator
>     >>>>>
>     >>>>> edu.mit.bsense.AnalyticsEngine is the class containing the SC, which
>     >>>>> instantiates the RDDs and runs the map() and count().
>     >>>>> Can you explain why it needs to be serialized?
>     >>>>>
>     >>>>> Also, when running count() on my original RDD (pre-map) I get the right
>     >>>>> answer - this means the classes of data in the RDD are serializable.
>     >>>>> It's only when I run map() and then count() on a new RDD that I get
>     >>>>> this exception. My map does not introduce any new classes - it just
>     >>>>> iterates over the existing data.
>     >>>>>
>     >>>>> Any ideas?
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>>> On 11/3/13 7:43 PM, Patrick Wendell wrote:
>     >>>>>>
>     >>>>>> edu.mit.bsense.AnalyticsEngine
>     >>>>>>
>     >>>>>> Look at the exception. Basically, you'll need to register
>     every class
>     >>>>>> type that is recursively used by BSONObject.
>     >>>>>>
>     >>>>>> On Sun, Nov 3, 2013 at 4:27 PM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>     >>>>>>>
>     >>>>>>> Hi Patrick,
>     >>>>>>>
>     >>>>>>> I am in fact using Kryo and I'm registering BSONObject.class (which is
>     >>>>>>> the class holding the data) in my KryoRegistrator.
>     >>>>>>> I'm not sure what other classes I should be registering.
>     >>>>>>>
>     >>>>>>> Thanks,
>     >>>>>>>
>     >>>>>>> Yadid
>     >>>>>>>
>     >>>>>>>
>     >>>>>>>
>     >>>>>>> On 11/3/13 7:23 PM, Patrick Wendell wrote:
>     >>>>>>>>
>     >>>>>>>> The problem is you are referencing a class that does not "extend
>     >>>>>>>> serializable" in the data that you shuffle. Spark needs to send all
>     >>>>>>>> shuffle data over the network, so it needs to know how to serialize
>     >>>>>>>> them.
>     >>>>>>>>
>     >>>>>>>> One option is to use Kryo for network serialization as described here
>     >>>>>>>> - you'll have to register all the classes that get serialized though.
>     >>>>>>>>
>     >>>>>>>> http://spark.incubator.apache.org/docs/latest/tuning.html
>     >>>>>>>>
>     >>>>>>>> Another option is to write a wrapper class that "extends
>     >>>>>>>> externalizable" and write the serialization yourself.
>     >>>>>>>>
>     >>>>>>>> - Patrick
>     >>>>>>>>
>     >>>>>>>> On Sun, Nov 3, 2013 at 10:33 AM, Yadid Ayzenberg <yadid@media.mit.edu> wrote:
>     >>>>>>>>>
>     >>>>>>>>> Hi All,
>     >>>>>>>>>
>     >>>>>>>>> My original RDD contains arrays of doubles. When applying a count()
>     >>>>>>>>> operator to the original RDD I get the result as expected.
>     >>>>>>>>> However, when I run a map on the original RDD in order to generate a
>     >>>>>>>>> new RDD with only the first element of each array, and try to apply
>     >>>>>>>>> count() to the newly generated RDD, I get the following exception:
>     >>>>>>>>>
>     >>>>>>>>> 19829 [run-main] INFO  org.apache.spark.scheduler.DAGScheduler  -
>     >>>>>>>>> Failed to run count at AnalyticsEngine.java:133
>     >>>>>>>>> [error] (run-main) org.apache.spark.SparkException: Job failed:
>     >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>>>>>>>> org.apache.spark.SparkException: Job failed:
>     >>>>>>>>> java.io.NotSerializableException: edu.mit.bsense.AnalyticsEngine
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:760)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:758)
>     >>>>>>>>>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
>     >>>>>>>>>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:758)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:556)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:503)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:361)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$run(DAGScheduler.scala:441)
>     >>>>>>>>>         at org.apache.spark.scheduler.DAGScheduler$$anon$1.run(DAGScheduler.scala:149)
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> If I run a take() operation on the new RDD I receive the results as
>     >>>>>>>>> expected. Here is my code:
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> JavaRDD<Double> rdd2 = rdd.flatMap( new FlatMapFunction<Tuple2<Object,
>     >>>>>>>>> BSONObject>, Double>() {
>     >>>>>>>>>     @Override
>     >>>>>>>>>     public Iterable<Double> call(Tuple2<Object, BSONObject> e) {
>     >>>>>>>>>         BSONObject doc = e._2();
>     >>>>>>>>>         List<List<Double>> vals = (List<List<Double>>) doc.get("data");
>     >>>>>>>>>         List<Double> results = new ArrayList<Double>();
>     >>>>>>>>>         for (int i = 0; i < vals.size(); i++)
>     >>>>>>>>>             results.add((Double) vals.get(i).get(0));
>     >>>>>>>>>         return results;
>     >>>>>>>>>     }
>     >>>>>>>>> });
>     >>>>>>>>>
>     >>>>>>>>> logger.info("Take: {}", rdd2.take(100));
>     >>>>>>>>> logger.info("Count: {}", rdd2.count());
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> Any ideas on what I am doing wrong?
>     >>>>>>>>>
>     >>>>>>>>> Thanks,
>     >>>>>>>>>
>     >>>>>>>>> Yadid
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>> --
>     >>>>>>>>> Yadid Ayzenberg
>     >>>>>>>>> Graduate Student and Research Assistant
>     >>>>>>>>> Affective Computing
>     >>>>>>>>> Phone: 617-866-7226
>     >>>>>>>>> Room: E14-274G
>     >>>>>>>>> MIT Media Lab
>     >>>>>>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>>>>
>     >>>>>>> --
>     >>>>>>> Yadid Ayzenberg
>     >>>>>>> Graduate Student and Research Assistant
>     >>>>>>> Affective Computing
>     >>>>>>> Phone: 617-866-7226
>     >>>>>>> Room: E14-274G
>     >>>>>>> MIT Media Lab
>     >>>>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>>>
>     >>>>>>>
>     >>>>>>>
>     >>>>> --
>     >>>>> Yadid Ayzenberg
>     >>>>> Graduate Student and Research Assistant
>     >>>>> Affective Computing
>     >>>>> Phone: 617-866-7226
>     >>>>> Room: E14-274G
>     >>>>> MIT Media Lab
>     >>>>> 75 Amherst st, Cambridge, MA, 02139
>     >>>>>
>     >>>>>
>     >>>>>
>     >>>
>     >>> --
>     >>> Yadid Ayzenberg
>     >>> Graduate Student and Research Assistant
>     >>> Affective Computing
>     >>> Phone: 617-866-7226
>     >>> Room: E14-274G
>     >>> MIT Media Lab
>     >>> 75 Amherst st, Cambridge, MA, 02139
>     >>>
>     >>>
>     >>>
>     >
>     >
>     > --
>     > Yadid Ayzenberg
>     > Graduate Student and Research Assistant
>     > Affective Computing
>     > Phone: 617-866-7226
>     > Room: E14-274G
>     > MIT Media Lab
>     > 75 Amherst st, Cambridge, MA, 02139
>     >
>     >
>     >
>
>
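
P.S. On the Kryo side of the discussion above, my registrator looks roughly
like the sketch below. Only the BSONObject.class registration is something I
actually have; the BasicDBObject/BasicDBList lines are a guess at the
"recursively used" classes Patrick mentions, not something confirmed here.

import com.esotericsoftware.kryo.Kryo;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import org.apache.spark.serializer.KryoRegistrator;
import org.bson.BSONObject;

public class MyRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(BSONObject.class);     // the record type held in the RDD
        kryo.register(BasicDBObject.class);  // guess: concrete type behind BSONObject
        kryo.register(BasicDBList.class);    // guess: type of the "data" field used above
    }
}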


-- 
Yadid Ayzenberg
Graduate Student and Research Assistant
Affective Computing
Phone: 617-866-7226
Room: E14-274G
MIT Media Lab
75 Amherst st, Cambridge, MA, 02139



