spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jerry Vinokurov <grapesmo...@gmail.com>
Subject Re: intermittent Kryo serialization failures in Spark
Date Tue, 17 Sep 2019 14:37:44 GMT
Hi folks,

Posted this some time ago but the problem continues to bedevil us. I'm
including a (slightly edited) stack trace that results from this error. If
anyone can shed any light on what exactly is happening here and what we can
do to avoid it, that would be much appreciated.

org.apache.spark.SparkException: Failed to register classes with Kryo
> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>         [our code that writes data to CSV]	
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> 	at java.lang.Class.forName0(Native Method)
> 	at java.lang.Class.forName(Class.java:348)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
> 	... 132 more
>
>
On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <grapesmoker@gmail.com>
wrote:

> Hi all,
>
> I am experiencing a strange intermittent failure of my Spark job that
> results from serialization issues in Kryo. Here is the stack trace:
>
> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> 	at java.lang.Class.forName0(Native Method)
>> 	at java.lang.Class.forName(Class.java:348)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>> 	... 204 more
>>
>> (I've edited the company and model name since this is proprietary code)
>
> This error does not surface every time the job is run; I would say it
> probably shows up once in every 10 runs or so, and there isn't anything
> about the input data that triggers this, as I've been able to
> (nondeterministically) reproduce the error by simply rerunning the job with
> the same inputs over and over again. The model itself is just a plain Scala
> case class whose fields are strings and integers, so there's no custom
> serialization logic or anything like that. As I understand, this is seems
> related to an issue previously documented here
> <https://issues.apache.org/jira/browse/SPARK-21928>but allegedly this was
> fixed long ago. I'm running this job on an AWS EMR cluster and have
> confirmed that the version of Spark running there is 2.4.0, with the patch
> that is linked in the above issue being part of the code.
>
> A suggested solution has been to set the extraClasspath config settings on
> the driver and executor, but that has not fixed the problem. I'm out of
> ideas for how to tackle this and would love to hear if anyone has any
> suggestions or strategies for fixing this.
>
> thanks,
> Jerry
>
> --
> http://www.google.com/profiles/grapesmoker
>


-- 
http://www.google.com/profiles/grapesmoker

Mime
View raw message