spark-user mailing list archives

From Kürşat Kurt <kur...@kursatkurt.com>
Subject RE: Out of memory at 60GB free memory.
Date Mon, 07 Nov 2016 07:57:41 GMT
Hi Daniel,

 

I started with 10g/10g; when the OOM occurred I increased it to these sizes.

I don't think I should need that much (the CSV file has 300,000 lines), but I don't know what causes this...
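
A quick way to confirm how much heap the driver JVM actually received is to print the runtime maximum at startup. This is only a diagnostic sketch (it is not part of the job above); in local mode the whole job runs inside that single JVM, so this is the heap that matters:

    // Minimal diagnostic sketch (illustrative, not part of the original job):
    // print the heap the driver JVM actually received. In local mode the driver
    // and the executors share this one JVM.
    val maxHeapGb = Runtime.getRuntime.maxMemory.toDouble / (1024L * 1024 * 1024)
    println(f"Driver JVM max heap: $maxHeapGb%.1f GB")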

 

From: Daniel van der Ende [mailto:daniel.vanderende@gmail.com] 
Sent: Monday, November 7, 2016 10:03 AM
To: Kürşat Kurt <kursat@kursatkurt.com>
Cc: user@spark.apache.org
Subject: Re: Out of memory at 60GB free memory.

 

Looks to me like you are actually running out of memory. You're setting driver memory to 60G
and executor memory to 30G. With only 64G available in total, and running in local mode, you
won't have any memory left. Do you really need such a large amount of driver memory for this
job? 

Cheers, 
Daniel
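
In local mode the driver and the executors share a single JVM, so spark.executor.memory is effectively ignored and only the driver memory sizes the heap. A minimal sketch of a more conservative setup for a 64 GB box might look like the following; the 40g and 4g figures are assumptions, not tested values, and with spark-submit the driver size has to be passed as --driver-memory on the command line because the JVM is already running by the time this config object is read:

    // Sketch of a leaner local-mode configuration (40g / 4g are assumed values, not tested).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .appName("Test")
      .master("local[8]")
      .config("spark.driver.memory", "40g")        // only effective if the JVM is not already started;
                                                   // with spark-submit, pass --driver-memory 40g instead
      .config("spark.driver.maxResultSize", "4g")  // 20g of collected results alone is a large heap demand
      .getOrCreate()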

 

On 7 Nov 2016 6:35 a.m., "Kürşat Kurt" <kursat@kursatkurt.com> wrote:

Hi;

I am trying to use Naive Bayes for multi-class classification.

I am getting an OOM at the “pipeline.fit(train)” line. When I submit the code, everything is fine up to the stage “collect at NaiveBayes.scala:400”.

At this stage, the first ~375 tasks start very quickly and then progress slows down. The task count never reaches 500; the OOM hits around the 380th-390th task.
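
“collect at NaiveBayes.scala:400” appears to be the point where the per-class feature sums are pulled back to the driver, so their size grows roughly with the CountVectorizer vocabulary times the number of classes. One hedged way to shrink that, assuming the vocabulary is the culprit, is to cap it; the values below are only illustrative guesses:

    // Hedged sketch: cap feature dimensionality so the per-class sums collected by NaiveBayes
    // stay small. The vocabSize and minDF values are illustrative guesses, not recommendations.
    import org.apache.spark.ml.feature.CountVectorizer

    val countVectorizer = new CountVectorizer()
      .setInputCol("words")
      .setOutputCol("features")
      .setVocabSize(65536)  // limit the vocabulary (assumed value)
      .setMinDF(2)          // drop terms seen in fewer than 2 rows (assumed value)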

 

Spark-submit parameters: 

./spark-submit --class main.scala.Test1 --master local[8]  --driver-memory 60g  /projects/simple-project_2.11-1.0.jar

 

System properties:

Mode: client

Free Mem: 60GB (Total 64GB)

OS: Ubuntu 14.04

Cores: 8

Java: 1.8

 

Code:

 

    // Imports needed by the code below (plus the import for the Resha stemmer from the original project).
    import java.util.Locale
    import scala.collection.mutable.ListBuffer

    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{SaveMode, SparkSession}
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}

    object Test {

      var num = 50
      var savePath = "hdfs://localhost:54310/SparkWork/SparkModel/"
      var stemmer = Resha.Instance

      var STOP_WORDS: Set[String] = Set()

      // Strip parenthesised text, dashes, " tr. " markers, commas and extra whitespace.
      def cropSentence(s: String) = {
        s.replaceAll("\\([^\\)]*\\)", "")
          .replaceAll(" - ", " ")
          .replaceAll("-", " ")
          .replaceAll(" tr. ", " ")
          .replaceAll("  +", " ")
          .replaceAll(",", " ").trim()
      }

      def main(args: Array[String]): Unit = {

        val start1 = System.currentTimeMillis()

        val sc = new SparkConf().setAppName("Test")
          .set("spark.hadoop.validateOutputSpecs", "false")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryoserializer.buffer.max", "1g")
          .set("spark.driver.maxResultSize", "20g")
          .set("spark.executor.memory", "30g")
          .set("spark.executor.cores", "6")
          .set("spark.speculation", "true")
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.files.overwrite", "true")
          .set("spark.executor.heartbeatInterval", "25s")
          .set("spark.sql.shuffle.partitions", "2")
          .set("spark.sql.warehouse.dir", savePath + "wh")

        val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
        import spark.implicits._

        // Read the ß-delimited file, lower-case and stem the product name (Turkish locale),
        // and build a (className, productName) DataFrame.
        val mainDataset = spark.sparkContext.textFile(savePath + "classifications.csv")
          .map(_.split("ß"))
          .map(tokens => {
            var list = new ListBuffer[String]()
            var token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
            token0.split("\\s+").foreach { w => list += stemmer.stem(w) }
            (tokens(1), list.toList.mkString(" "))
          }).toDF("className", "productName")

        // Index the class names into numeric labels and persist the indexer.
        val classIndexer = new StringIndexer()
          .setInputCol("className")
          .setOutputCol("label")

        val classIndexerModel = classIndexer.fit(mainDataset)
        var mainDS = classIndexerModel.transform(mainDataset)
        classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

        mainDS.write.mode(SaveMode.Overwrite).parquet(savePath + "processed")
        mainDS = spark.sqlContext.read.load(savePath + "processed")

        // Tokenizer
        val tokenizer = new Tokenizer()
          .setInputCol("productName")
          .setOutputCol("words_nonfiltered")

        // StopWords
        val remover = new StopWordsRemover()
          .setInputCol("words_nonfiltered")
          .setOutputCol("words")
          .setStopWords(Array[String]("word1", "word2", "-", "//"))

        // CountVectorizer
        val countVectorizer = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")

        val nb = new NaiveBayes()
          .setSmoothing(0.1)
          .setModelType("multinomial")

        val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, nb))

        val train = mainDS.repartition(500)
        val model = pipeline.fit(train)
        model.write.overwrite.save(savePath + "RandomForestClassifier")
      }
    }

 

 

 

 

 

 





 

Log:

16/11/07 02:07:28 INFO Executor: Finished task 116.0 in stage 15.0 (TID 2433). 1025857381
bytes result sent via BlockManager)

16/11/07 02:07:28 INFO TaskSetManager: Finished task 44.0 in stage 15.0 (TID 2415) in 175757
ms on localhost (384/500)

16/11/07 02:07:28 INFO TaskSetManager: Starting task 140.0 in stage 15.0 (TID 2439, localhost,
partition 140, ANY, 5254 bytes)

16/11/07 02:07:28 INFO Executor: Running task 140.0 in stage 15.0 (TID 2439)

16/11/07 02:07:28 INFO TaskSetManager: Starting task 144.0 in stage 15.0 (TID 2440, localhost,
partition 144, ANY, 5254 bytes)

16/11/07 02:07:28 INFO Executor: Running task 144.0 in stage 15.0 (TID 2440)

16/11/07 02:07:28 INFO TaskSetManager: Starting task 148.0 in stage 15.0 (TID 2441, localhost,
partition 148, ANY, 5254 bytes)

16/11/07 02:07:28 INFO Executor: Running task 148.0 in stage 15.0 (TID 2441)

16/11/07 02:07:28 INFO TaskSetManager: Starting task 152.0 in stage 15.0 (TID 2442, localhost,
partition 152, ANY, 5254 bytes)

16/11/07 02:07:28 INFO Executor: Running task 152.0 in stage 15.0 (TID 2442)

16/11/07 02:07:28 INFO TaskSetManager: Starting task 156.0 in stage 15.0 (TID 2443, localhost,
partition 156, ANY, 5254 bytes)

16/11/07 02:07:28 INFO Executor: Running task 156.0 in stage 15.0 (TID 2443)

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 494 non-empty blocks out of 500
blocks

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 490 non-empty blocks out of 500
blocks

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 497 non-empty blocks out of 500
blocks

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 496 non-empty blocks out of 500
blocks

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 495 non-empty blocks out of 500
blocks

16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms

16/11/07 02:07:43 INFO BlockManagerInfo: Removed taskresult_2412 on 89.163.242.124:49610 in memory (size: 979.2 MB, free: 8.1 GB)

16/11/07 02:08:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver,
89.163.242.124, 49610))] in 1 attempts

org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout
is controlled by spark.executor.heartbeatInterval

        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)

        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)

        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)

        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)

        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)

        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)

        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)

        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [25 seconds]

        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

        at scala.concurrent.Await$.result(package.scala:190)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)

        ... 14 more

16/11/07 02:09:04 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver,
89.163.242.124, 49610))] in 2 attempts

org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout
is controlled by spark.executor.heartbeatInterval

        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)

        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)

        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)

        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)

        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)

        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)

        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)

        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)

        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)

        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)

        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)

        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [25 seconds]

        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)

        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)

        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)

        at scala.concurrent.Await$.result(package.scala:190)

        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)

        ... 14 more

16/11/07 02:09:18 WARN TaskMemoryManager: leak 1097.3 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@53cf605d

16/11/07 02:09:31 ERROR Executor: Exception in task 140.0 in stage 15.0 (TID 2439)

java.lang.OutOfMemoryError: GC overhead limit exceeded

        at net.jpountz.xxhash.StreamingXXHash32.asChecksum(StreamingXXHash32.java:81)

        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:94)

        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:104)

        at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:118)

        at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:116)

        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:56)

        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:55)

        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)

        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)

        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)

        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)

        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)

        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)

        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)

        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)

        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)

        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)

        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)

        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)

        at org.apache.spark.scheduler.Task.run(Task.scala:86)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

16/11/07 02:09:38 ERROR Executor: Exception in task 128.0 in stage 15.0 (TID 2436)

java.lang.OutOfMemoryError: Java heap space

        at com.esotericsoftware.kryo.io.Output.require(Output.java:168)

        at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:519)

        at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:648)

        at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:729)

        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:216)

        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:205)

        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)

        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)

        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)

        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)

        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)

        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)

        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)

        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)

        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)

        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)

        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:297)

        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)

        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

        at java.lang.Thread.run(Thread.java:745)

 

