spark-user mailing list archives

From Chanh Le <giaosu...@gmail.com>
Subject Re: Anyone else having trouble with replicated off heap RDD persistence?
Date Wed, 17 Aug 2016 00:05:01 GMT
Hi Michael,

You should use Alluxio instead:
http://www.alluxio.org/docs/master/en/Running-Spark-on-Alluxio.html
It should be easier.
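
For example, persisting an RDD through Alluxio looks roughly like this (a minimal sketch based on
that guide; the master host/port and paths are placeholders, and the Alluxio client jar needs to
be on Spark's classpath):

    // Write the data set out to Alluxio once...
    val data = sc.textFile("hdfs://namenode:9000/input/data.txt")
    data.saveAsTextFile("alluxio://alluxio-master:19998/spark/data")

    // ...then read it back in later stages or in retried jobs.
    val cached = sc.textFile("alluxio://alluxio-master:19998/spark/data")
    cached.count()

Since the cached data lives in Alluxio's storage tiers rather than in executor memory, an executor
crash doesn't take the cached blocks down with it.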


Regards,
Chanh



> On Aug 17, 2016, at 5:45 AM, Michael Allman <michael@videoamp.com> wrote:
> 
> Hello,
> 
> A coworker was having a problem with a big Spark job failing after several hours when one of
> the executors would segfault. That problem aside, I speculated that her job would be more robust
> against these kinds of executor crashes if she used replicated RDD storage. She's using off-heap
> storage (for good reason), so I asked her to try running her job with the following storage level:
> `StorageLevel(useDisk = true, useMemory = true, useOffHeap = true, deserialized = false, replication = 2)`.
> The job would immediately fail with a rather suspicious-looking exception. For example:
> 
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 9086
> 	at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
> 	at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> or
> 
> java.lang.IndexOutOfBoundsException: Index: 6, Size: 0
> 	at java.util.ArrayList.rangeCheck(ArrayList.java:653)
> 	at java.util.ArrayList.get(ArrayList.java:429)
> 	at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:60)
> 	at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:834)
> 	at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:788)
> 	at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:229)
> 	at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
> 	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:461)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> or
> 
> java.lang.NullPointerException
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:141)
> 	at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec$$anonfun$doExecute$1$$anonfun$6.apply(InMemoryTableScanExec.scala:140)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificColumnarIterator.hasNext(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
> 	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
> 	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
> 	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
> 	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> I tried switching to Java serialization. I got a different exception:
> 
> java.io.StreamCorruptedException: invalid stream header: 780000D0
> 	at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:808)
> 	at java.io.ObjectInputStream.<init>(ObjectInputStream.java:301)
> 	at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.<init>(JavaSerializer.scala:63)
> 	at org.apache.spark.serializer.JavaDeserializationStream.<init>(JavaSerializer.scala:63)
> 	at org.apache.spark.serializer.JavaSerializerInstance.deserializeStream(JavaSerializer.scala:122)
> 	at org.apache.spark.serializer.SerializerManager.dataDeserializeStream(SerializerManager.scala:146)
> 	at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:433)
> 	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
> 	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:85)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> All of this suggests some kind of memory corruption. Has anyone else had a problem like this
> using off-heap storage with a replication factor of 2?
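> 
> For reference, the persist call looks roughly like this (a minimal sketch rather than her actual
> job code; the dataset name and the off-heap size are illustrative):
> 
> 	import org.apache.spark.storage.StorageLevel
> 
> 	// Off-heap storage also requires off-heap memory to be enabled, e.g.:
> 	//   spark.memory.offHeap.enabled=true
> 	//   spark.memory.offHeap.size=16g   (the size here is illustrative)
> 	val level = StorageLevel(useDisk = true, useMemory = true,
> 	  useOffHeap = true, deserialized = false, replication = 2)
> 
> 	df.persist(level)  // df: the DataFrame being cached by the job
> 	df.count()         // materialize the cache before the stage that fails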
> 
> Thanks,
> 
> Michael
> ---------------------------------------------------------------------
> To unsubscribe e-mail: user-unsubscribe@spark.apache.org
> 

