spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Mayur Rustagi <mayur.rust...@gmail.com>
Subject Re: Running actions in loops
Date Fri, 07 Mar 2014 19:29:16 GMT
Mostly the job you are executing is not serializable, this typically
happens when you have a library that is not serializable.. are you using
any library like jodatime etc ?

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Thu, Mar 6, 2014 at 9:50 PM, Ognen Duzlevski <ognen@plainvanillagames.com
> wrote:

> It looks like the problem is in the filter task - is there anything
> special about filter()?
>
> I have removed the filter line from the loops just to see if things will
> work and they do.
>
> Anyone has any ideas?
>
> Thanks!
> Ognen
>
>
> On 3/6/14, 9:39 PM, Ognen Duzlevski wrote:
>
>> Hello,
>>
>> What is the general approach people take when trying to do analysis
>> across multiple large files where the data to be extracted from a
>> successive file depends on the data extracted from a previous file or set
>> of files?
>>
>> For example:
>> I have the following: a group of HDFS files each 20+GB in size. I need to
>> extract event1 on day 1 from first file and extract event2 from all
>> remaining files in a period of successive dates, then do a calculation on
>> the two events.
>> I then need to move on to day2, extract event1 (with certain properties),
>> take all following days, extract event2 and run a calculation against
>> previous day for all days in period. So on and so on.
>>
>> I have verified that the following (very naive approach doesn't work):
>>
>> def calcSimpleRetention(start:String,end:String,event1:
>> String,event2:String):Map[String,List[Double]] = {
>>     val epd = new PipelineDate(end)
>>     val result = for {
>>       dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
>>       val f1 = sc.textFile(dt1.toJsonHdfsFileName)
>>       val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"","")
>> == event1).map(line => (line.split(",")(2).split(":")
>> (1).replace("\"",""),0)).cache
>>       val c = e1.count.toDouble
>>
>>       val intres = for {
>>         dt2 <- PipelineDate.getPeriod(dt1+1,epd)
>>         val f2 = sc.textFile(dt2.toJsonHdfsFileName)
>>         val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"","")
>> == event2).map(line => (line.split(",")(2).split(":")
>> (1).replace("\"",""),1))
>>         val e1e2 = e1.union(e2)
>>         val r = e1e2.groupByKey().filter(e => e._2.length > 1 &&
>> e._2.filter(_==0).length>0).count.toDouble
>>       } yield (c/r) // get the retention rate
>>     } yield (dt1.toString->intres)
>>     Map(result:_*)
>>   }
>>
>> I am getting the following errors:
>> 14/03/07 03:22:25 INFO SparkContext: Starting job: count at
>> CountActor.scala:33
>> 14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at
>> CountActor.scala:33) with 140 output partitions (allowLocal=false)
>> 14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at
>> CountActor.scala:33)
>> 14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
>> 14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
>> 14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at
>> map at CountActor.scala:32), which has no missing parents
>> 14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at
>> CountActor.scala:33
>> 14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not
>> serializable: java.io.NotSerializableException:
>> com.github.ognenpv.pipeline.CountActor
>> org.apache.spark.SparkException: Job aborted: Task not serializable:
>> java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$
>> apache$spark$scheduler$DAGScheduler$$abortStage$1.
>> apply(DAGScheduler.scala:1028)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$
>> apache$spark$scheduler$DAGScheduler$$abortStage$1.
>> apply(DAGScheduler.scala:1026)
>>     at scala.collection.mutable.ResizableArray$class.foreach(
>> ResizableArray.scala:59)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$
>> scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>>     at org.apache.spark.scheduler.DAGScheduler.processEvent(
>> DAGScheduler.scala:569)
>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$
>> $anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
>> AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(
>> ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.
>> runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(
>> ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(
>> ForkJoinWorkerThread.java:107)
>>
>> I should mention that this code is fired off from an Akka actor (which is
>> controlled by a Scalatra servlet).
>>
>> Any ideas, recommendations etc.? I am fairly new to Scala and M/R
>> principles in general, it is fair to say that at this point I am still
>> thinking from a point of view of an imperative programmer trying to fit a
>> square peg through a round hole ;)
>> Ognen
>>
>
> --
> Some people, when confronted with a problem, think "I know, I'll use
> regular expressions." Now they have two problems.
> -- Jamie Zawinski
>
>

Mime
View raw message