spark-user mailing list archives

From Jerry Vinokurov <grapesmo...@gmail.com>
Subject Re: intermittent Kryo serialization failures in Spark
Date Thu, 26 Sep 2019 02:32:49 GMT
Hi Julien,

Thanks for the suggestion. Dropping the broadcast would presumably hurt the
performance of the job, since the model that is failing to broadcast is
something we need shared across the cluster. But that may be worth it if the
alternative is a job that doesn't run reliably. Vadim's suggestions did not
make a difference for me (I'm still hitting this error several times a day),
but I'll try disabling the broadcast and see if that helps.
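
For reference, a minimal sketch of what "disabling broadcast" could look
like, assuming the broadcast in question is one that Spark SQL plans
automatically for a join. The object name, app name, and local master are
made up for illustration. Note that the trace quoted below shows the failing
broadcast being issued from CSVFileFormat.buildReader rather than from a
join, so this setting may not touch that code path at all.

```scala
import org.apache.spark.sql.SparkSession

object NoAutoBroadcastSketch {
  // Builds a local session with automatic broadcast joins turned off.
  // The app name and local master are placeholders for illustration only.
  def buildSession(): SparkSession =
    SparkSession.builder()
      .appName("no-auto-broadcast-sketch")
      .master("local[*]")
      // -1 disables the size threshold under which Spark SQL broadcasts a join side.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    // Print the effective value so the setting can be confirmed at runtime.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```

Explicit sc.broadcast(...) calls in user code are not affected by this
setting and would have to be removed by hand.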

thanks,
Jerry

On Fri, Sep 20, 2019 at 10:00 AM Julien Laurenceau <
julien.laurenceau@pepitedata.com> wrote:

> Hi,
> Did you try without the broadcast ?
> Regards
> JL
>
> On Thu, Sep 19, 2019 at 06:41, Vadim Semenov <vadim@datadoghq.com.invalid>
> wrote:
>
>> Pre-register your classes:
>>
>> ```
>> import com.esotericsoftware.kryo.Kryo
>> import org.apache.spark.serializer.KryoRegistrator
>>
>> class MyKryoRegistrator extends KryoRegistrator {
>>   override def registerClasses(kryo: Kryo): Unit = {
>>     kryo.register(Class.forName("[[B")) // byte[][]
>>     kryo.register(classOf[java.lang.Class[_]])
>>   }
>> }
>> ```
>>
>> then run with
>>
>> 'spark.kryo.referenceTracking': 'false',
>> 'spark.kryo.registrationRequired': 'false',
>> 'spark.kryo.registrator': 'com.datadog.spark.MyKryoRegistrator',
>> 'spark.kryo.unsafe': 'false',
>> 'spark.kryoserializer.buffer.max': '256m',
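>>
>> A minimal sketch of how the settings above might be applied from Scala
>> rather than passed as a dictionary. The object name KryoConfSketch is
>> hypothetical, and the spark.serializer line is an assumption: it is not
>> in the list above, but the Kryo settings only matter when Kryo is the
>> configured serializer.
>>
>> ```scala
>> import org.apache.spark.SparkConf
>>
>> object KryoConfSketch {
>>   // Returns a SparkConf carrying the Kryo settings listed above.
>>   def kryoConf(): SparkConf =
>>     new SparkConf(false) // false: do not load spark.* system properties
>>       // Assumption: select Kryo explicitly (not shown in the list above).
>>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>       .set("spark.kryo.referenceTracking", "false")
>>       .set("spark.kryo.registrationRequired", "false")
>>       .set("spark.kryo.registrator", "com.datadog.spark.MyKryoRegistrator")
>>       .set("spark.kryo.unsafe", "false")
>>       .set("spark.kryoserializer.buffer.max", "256m")
>> }
>> ```
>>
>> The registrator class has to be on the classpath of both the driver and
>> the executors, since it is loaded by name.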
>>
>> On Tue, Sep 17, 2019 at 10:38 AM Jerry Vinokurov <grapesmoker@gmail.com>
>> wrote:
>>
>>> Hi folks,
>>>
>>> Posted this some time ago but the problem continues to bedevil us. I'm
>>> including a (slightly edited) stack trace that results from this error. If
>>> anyone can shed any light on what exactly is happening here and what we can
>>> do to avoid it, that would be much appreciated.
>>>
>>> org.apache.spark.SparkException: Failed to register classes with Kryo
>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:140)
>>>> 	at org.apache.spark.serializer.KryoSerializerInstance.borrowKryo(KryoSerializer.scala:324)
>>>> 	at org.apache.spark.serializer.KryoSerializerInstance.<init>(KryoSerializer.scala:309)
>>>> 	at org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:218)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:288)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:127)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
>>>> 	at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
>>>> 	at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62)
>>>> 	at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1489)
>>>> 	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.buildReader(CSVFileFormat.scala:103)
>>>> 	at org.apache.spark.sql.execution.datasources.FileFormat$class.buildReaderWithPartitionValues(FileFormat.scala:129)
>>>> 	at org.apache.spark.sql.execution.datasources.TextBasedFileFormat.buildReaderWithPartitionValues(FileFormat.scala:165)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD$lzycompute(DataSourceScanExec.scala:309)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDD(DataSourceScanExec.scala:305)
>>>> 	at org.apache.spark.sql.execution.FileSourceScanExec.inputRDDs(DataSourceScanExec.scala:327)
>>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:128)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$$anonfun$doExecute$1.apply(ShuffleExchangeExec.scala:119)
>>>> 	at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>>>> 	at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.SortExec.inputRDDs(SortExec.scala:121)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.window.WindowExec.doExecute(WindowExec.scala:302)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.buildBuffers(InMemoryRelation.scala:83)
>>>> 	at org.apache.spark.sql.execution.columnar.CachedRDDBuilder.cachedColumnBuffers(InMemoryRelation.scala:59)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.filteredCachedBatches(InMemoryTableScanExec.scala:276)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD$lzycompute(InMemoryTableScanExec.scala:105)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.inputRDD(InMemoryTableScanExec.scala:104)
>>>> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doExecute(InMemoryTableScanExec.scala:310)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:391)
>>>> 	at org.apache.spark.sql.execution.FilterExec.inputRDDs(basicPhysicalOperators.scala:121)
>>>> 	at org.apache.spark.sql.execution.ProjectExec.inputRDDs(basicPhysicalOperators.scala:41)
>>>> 	at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:627)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:143)
>>>> 	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
>>>> 	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>>> 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
>>>> 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
>>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
>>>> 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
>>>> 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
>>>> 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
>>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
>>>> 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
>>>> 	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
>>>>         [our code that writes data to CSV]	
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> 	at java.lang.Class.forName0(Native Method)
>>>> 	at java.lang.Class.forName(Class.java:348)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>> 	... 132 more
>>>>
>>>>
>>> On Wed, Jul 10, 2019 at 12:50 PM Jerry Vinokurov <grapesmoker@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am experiencing a strange intermittent failure of my Spark job that
>>>> results from serialization issues in Kryo. Here is the stack trace:
>>>>
>>>> Caused by: java.lang.ClassNotFoundException: com.mycompany.models.MyModel
>>>>> 	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> 	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
>>>>> 	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>> 	at java.lang.Class.forName0(Native Method)
>>>>> 	at java.lang.Class.forName(Class.java:348)
>>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>> 	at org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$4.apply(KryoSerializer.scala:132)
>>>>> 	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>> 	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>>>>> 	at org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:132)
>>>>> 	... 204 more
>>>>>
>>>>> (I've edited the company and model name since this is proprietary code)
>>>>
>>>> This error does not surface every time the job is run; I would say it
>>>> probably shows up once in every 10 runs or so, and there isn't anything
>>>> about the input data that triggers this, as I've been able to
>>>> (nondeterministically) reproduce the error by simply rerunning the job with
>>>> the same inputs over and over again. The model itself is just a plain Scala
>>>> case class whose fields are strings and integers, so there's no custom
>>>> serialization logic or anything like that. As I understand it, this seems
>>>> related to an issue previously documented here
>>>> <https://issues.apache.org/jira/browse/SPARK-21928>, but allegedly this
>>>> was fixed long ago. I'm running this job on an AWS EMR cluster and have
>>>> confirmed that the version of Spark running there is 2.4.0, with the patch
>>>> that is linked in the above issue being part of the code.
>>>>
>>>> A suggested solution has been to set the extraClassPath config settings
>>>> (spark.driver.extraClassPath and spark.executor.extraClassPath), but that
>>>> has not fixed the problem. I'm out of ideas for how to tackle this and
>>>> would love to hear if anyone has any suggestions or strategies for fixing
>>>> it.
>>>>
>>>> thanks,
>>>> Jerry
>>>>
>>>> --
>>>> http://www.google.com/profiles/grapesmoker
>>>>
>>>
>>>
>>> --
>>> http://www.google.com/profiles/grapesmoker
>>>
>>
>>
>> --
>> Sent from my iPhone
>>
>

-- 
http://www.google.com/profiles/grapesmoker
