beam-commits mailing list archives

From "Amit Sela (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (BEAM-637) WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer instead of Kryo
Date Mon, 19 Sep 2016 08:43:20 GMT

    [ https://issues.apache.org/jira/browse/BEAM-637?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15502734#comment-15502734 ]

Amit Sela commented on BEAM-637:
--------------------------------

By default, the SparkRunner is hard-coded to use Kryo for serialization.
In many places though, especially where shuffling data is concerned, we use Beam coders to
encode-shuffle-decode.
It looks like this happens in the "triggering-count", which is used to trigger an action on
a branch of the DAG if none was applied (Spark is lazily evaluated and requires an action) -
since this is a trigger-only op, you could try applying "WindowingHelpers.<T>unwindowValueFunction()"
before the count; that leaves only T (the elements) obligated to be Serializable. The most
robust solution would be to encode them with "CoderHelpers.toByteArrays(windowedValues, windowCoder)",
so that you count byte[] elements.

> WindowedValue$ValueInGlobalWindow is not serializable when using JavaSerializer instead of Kryo
> ------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-637
>                 URL: https://issues.apache.org/jira/browse/BEAM-637
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-spark
>            Reporter: Stas Levin
>            Assignee: Amit Sela
>
> When using {{JavaSerializer}} instead of {{KryoSerializer}}, by configuring {{JavaSparkContext}} with {{"spark.serializer"}} set to {{JavaSerializer.class.getCanonicalName()}}, an exception is thrown:
> {noformat}
> object not serializable (class: org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> {noformat}
> {noformat}
> Serialization stack:
> 	- object not serializable (class: org.apache.beam.sdk.util.WindowedValue$ValueInGlobalWindow, value: ValueInGlobalWindow{value=hi there, pane=PaneInfo.NO_FIRING})
> 	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
> 	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> 	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
> 	at scala.Option.foreach(Option.scala:236)
> 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
> 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
> 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
> 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929)
> 	at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
> 	at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:440)
> 	at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:46)
> 	at org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:175)
> 	at org.apache.beam.runners.spark.translation.streaming.StreamingEvaluationContext$1.call(StreamingEvaluationContext.java:172)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$3.apply(JavaDStreamLike.scala:335)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
> 	at scala.util.Try$.apply(Try.scala:161)
> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This issue is reproducible with the {{SimpleStreamingWordCountTest#testRun}} test (given {{JavaSerializer}} is configured).
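
For reference, a minimal sketch of the serializer configuration the description refers to (the class name, master, and app name below are placeholders):

{noformat}
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.serializer.JavaSerializer;

public class JavaSerializerRepro { // hypothetical driver class
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setMaster("local[*]")        // placeholder master
        .setAppName("beam-637-repro") // placeholder app name
        // switch Spark from the default Kryo to plain Java serialization
        .set("spark.serializer", JavaSerializer.class.getCanonicalName());
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // ... run the pipeline that hits the triggering-count here ...
    jsc.stop();
  }
}
{noformat}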



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
