spark-user mailing list archives

From Kürşat Kurt <kur...@kursatkurt.com>
Subject Out of memory at 60GB free memory.
Date Mon, 07 Nov 2016 05:32:28 GMT
Hi,

I am trying to use Naive Bayes for multi-class classification.

I am getting an OOM at the "pipeline.fit(train)" line. When I submit the job,
everything is fine up to the stage "collect at NaiveBayes.scala:400".

At that stage, the first ~375 tasks start very quickly, then progress slows
down. The task count never reaches 500; the OOM hits around task 380-390.
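
In case it is relevant: my rough understanding of that stage is that NaiveBayes
builds one (count, per-feature sum) aggregate per label on the executors and
then collects all of them to the driver. A simplified sketch of the pattern as
I understand it (the names and code are mine, not Spark's; with CountVectorizer
features each sum array is as wide as the vocabulary):

    import org.apache.spark.rdd.RDD

    // Sketch only: per-label counts and feature sums are aggregated on the
    // executors, then every per-label aggregate is shipped to the driver.
    def sumPerLabel(data: RDD[(Double, Array[Double])], numFeatures: Int)
        : Array[(Double, (Long, Array[Double]))] = {
      data.aggregateByKey((0L, new Array[Double](numFeatures)))(
        (acc, v) => {                 // seqOp: fold one vector into the running sum
          var i = 0
          while (i < v.length) { acc._2(i) += v(i); i += 1 }
          (acc._1 + 1, acc._2)
        },
        (a, b) => {                   // combOp: merge two partial sums
          var i = 0
          while (i < b._2.length) { a._2(i) += b._2(i); i += 1 }
          (a._1 + b._1, a._2)
        }
      ).collect()                     // <- the "collect at NaiveBayes.scala:400" step
    }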

 

Spark-submit parameters: 

./spark-submit --class main.scala.Test1 --master local[8] --driver-memory 60g /projects/simple-project_2.11-1.0.jar
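
Since the master is local[8], I assume the whole job runs inside the single
driver JVM, so --driver-memory 60g is the only heap that matters (and the
executor settings in the code below would be ignored). This is the trivial
check I use from inside the job to confirm the effective heap:

    // Sanity check (assumption: local mode means one JVM, so the max heap
    // should reflect --driver-memory).
    val maxHeapGb = Runtime.getRuntime.maxMemory / math.pow(1024, 3)
    println(f"effective max heap: $maxHeapGb%.1f GB")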

 

System properties:

Mode: client
Free Mem: 60 GB (total 64 GB)
OS: Ubuntu 14.04
Cores: 8
Java: 1.8

 

Code:

 

    import java.util.Locale

    import scala.collection.mutable.ListBuffer

    import org.apache.spark.SparkConf
    import org.apache.spark.ml.Pipeline
    import org.apache.spark.ml.classification.NaiveBayes
    import org.apache.spark.ml.feature.{CountVectorizer, StopWordsRemover, StringIndexer, Tokenizer}
    import org.apache.spark.sql.{SaveMode, SparkSession}

    object Test {

      var num = 50
      var savePath = "hdfs://localhost:54310/SparkWork/SparkModel/"
      var stemmer = Resha.Instance   // Turkish stemmer

      var STOP_WORDS: Set[String] = Set()

      // Strip parenthesised text, dashes, " tr. " tokens and commas,
      // then collapse runs of spaces.
      def cropSentence(s: String) = {
        s.replaceAll("\\([^\\)]*\\)", "")
          .replaceAll(" - ", " ")
          .replaceAll("-", " ")
          .replaceAll(" tr. ", " ")
          .replaceAll("  +", " ")
          .replaceAll(",", " ")
          .trim()
      }

      def main(args: Array[String]): Unit = {

        val start1 = System.currentTimeMillis()

        val sc = new SparkConf().setAppName("Test")
          .set("spark.hadoop.validateOutputSpecs", "false")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .set("spark.kryoserializer.buffer.max", "1g")
          .set("spark.driver.maxResultSize", "20g")
          .set("spark.executor.memory", "30g")
          .set("spark.executor.cores", "6")
          .set("spark.speculation", "true")
          .set("spark.dynamicAllocation.enabled", "true")
          .set("spark.files.overwrite", "true")
          .set("spark.executor.heartbeatInterval", "25s")
          .set("spark.sql.shuffle.partitions", "2")
          .set("spark.sql.warehouse.dir", savePath + "wh")

        val spark = SparkSession.builder.appName("Java Spark").config(sc).getOrCreate()
        import spark.implicits._

        // Each input line is "productNameßclassName"; clean, lower-case
        // (Turkish locale) and stem the product name, keep the class name.
        val mainDataset = spark.sparkContext.textFile(savePath + "classifications.csv")
          .map(_.split("ß"))
          .map { tokens =>
            val list = new ListBuffer[String]()
            val token0 = cropSentence(tokens(0).toLowerCase(Locale.forLanguageTag("TR-tr")))
            token0.split("\\s+").foreach(w => list += stemmer.stem(w))
            (tokens(1), list.toList.mkString(" "))
          }.toDF("className", "productName")

        val classIndexer = new StringIndexer()
          .setInputCol("className")
          .setOutputCol("label")

        val classIndexerModel = classIndexer.fit(mainDataset)
        var mainDS = classIndexerModel.transform(mainDataset)
        classIndexerModel.write.overwrite.save(savePath + "ClassIndexer")

        mainDS.write.mode(SaveMode.Overwrite).parquet(savePath + "processed")
        mainDS = spark.sqlContext.read.load(savePath + "processed")

        // Tokenizer
        val tokenizer = new Tokenizer()
          .setInputCol("productName")
          .setOutputCol("words_nonfiltered")

        // StopWords
        val remover = new StopWordsRemover()
          .setInputCol("words_nonfiltered")
          .setOutputCol("words")
          .setStopWords(Array[String]("word1", "word2", "-", "//"))

        // CountVectorizer
        val countVectorizer = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")

        val nb = new NaiveBayes()
          .setSmoothing(0.1)
          .setModelType("multinomial")

        val pipeline = new Pipeline().setStages(Array(tokenizer, remover, countVectorizer, nb))

        val train = mainDS.repartition(500)
        val model = pipeline.fit(train)   // <-- OOM occurs here

        model.write.overwrite.save(savePath + "RandomForestClassifier")
      }
    }
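
One thing I am unsure about: the CountVectorizer above is unbounded, so the
feature vectors are as wide as the whole vocabulary. If that is part of the
problem, a capped variant would look like this (untested sketch; the numbers
are placeholders I have not tuned, using the same imports as the listing):

        // Sketch: limiting vocabulary size to bound the feature-vector width.
        val cappedVectorizer = new CountVectorizer()
          .setInputCol("words")
          .setOutputCol("features")
          .setVocabSize(50000)   // keep only the 50k most frequent terms
          .setMinDF(2)           // ignore terms seen in fewer than 2 rows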

Log:

16/11/07 02:07:28 INFO Executor: Finished task 116.0 in stage 15.0 (TID 2433). 1025857381 bytes result sent via BlockManager)
16/11/07 02:07:28 INFO TaskSetManager: Finished task 44.0 in stage 15.0 (TID 2415) in 175757 ms on localhost (384/500)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 140.0 in stage 15.0 (TID 2439, localhost, partition 140, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 140.0 in stage 15.0 (TID 2439)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 144.0 in stage 15.0 (TID 2440, localhost, partition 144, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 144.0 in stage 15.0 (TID 2440)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 148.0 in stage 15.0 (TID 2441, localhost, partition 148, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 148.0 in stage 15.0 (TID 2441)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 152.0 in stage 15.0 (TID 2442, localhost, partition 152, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 152.0 in stage 15.0 (TID 2442)
16/11/07 02:07:28 INFO TaskSetManager: Starting task 156.0 in stage 15.0 (TID 2443, localhost, partition 156, ANY, 5254 bytes)
16/11/07 02:07:28 INFO Executor: Running task 156.0 in stage 15.0 (TID 2443)
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 494 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 490 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 497 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 496 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Getting 495 non-empty blocks out of 500 blocks
16/11/07 02:07:28 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
16/11/07 02:07:43 INFO BlockManagerInfo: Removed taskresult_2412 on 89.163.242.124:49610 in memory (size: 979.2 MB, free: 8.1 GB)
16/11/07 02:08:32 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver, 89.163.242.124, 49610))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
        at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
        at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:518)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:547)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1877)
        at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:547)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [25 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
        at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:81)
        ... 14 more

16/11/07 02:09:04 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@544bf77,BlockManagerId(driver, 89.163.242.124, 49610))] in 2 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [25 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
        [stack trace identical to the first attempt above, trimmed]

16/11/07 02:09:18 WARN TaskMemoryManager: leak 1097.3 MB memory from org.apache.spark.util.collection.ExternalAppendOnlyMap@53cf605d
16/11/07 02:09:31 ERROR Executor: Exception in task 140.0 in stage 15.0 (TID 2439)
java.lang.OutOfMemoryError: GC overhead limit exceeded
        at net.jpountz.xxhash.StreamingXXHash32.asChecksum(StreamingXXHash32.java:81)
        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:94)
        at org.apache.spark.io.LZ4BlockInputStream.<init>(LZ4BlockInputStream.java:104)
        at org.apache.spark.io.LZ4CompressionCodec.compressedInputStream(CompressionCodec.scala:118)
        at org.apache.spark.serializer.SerializerManager.wrapForCompression(SerializerManager.scala:116)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:56)
        at org.apache.spark.shuffle.BlockStoreShuffleReader$$anonfun$2.apply(BlockStoreShuffleReader.scala:55)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:154)
        at org.apache.spark.Aggregator.combineCombinersByKey(Aggregator.scala:50)
        at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:85)
        at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:109)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/11/07 02:09:38 ERROR Executor: Exception in task 128.0 in stage 15.0 (TID 2436)
java.lang.OutOfMemoryError: Java heap space
        at com.esotericsoftware.kryo.io.Output.require(Output.java:168)
        at com.esotericsoftware.kryo.io.Output.writeLong(Output.java:519)
        at com.esotericsoftware.kryo.io.Output.writeDouble(Output.java:648)
        at com.esotericsoftware.kryo.io.Output.writeDoubles(Output.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:216)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$DoubleArraySerializer.write(DefaultArraySerializers.java:205)
        at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
        at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
        at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
        at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
        at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:297)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
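
One more observation on the log: the task results are huge, about 1 GB each
("1025857381 bytes result sent via BlockManager"). If most of the 500 tasks
return results of that size to the driver's collect, the totals would dwarf
the 60 GB heap. Rough arithmetic (a sketch only; it assumes every task result
is comparable in size and all are held at once, which may not be exactly what
Spark does):

    // Back-of-envelope from the log line above.
    val bytesPerResult = 1025857381L                          // from the log
    val gbPerResult    = bytesPerResult / math.pow(1024, 3)   // ~0.96 GB
    val totalGbFor500  = gbPerResult * 500                    // ~478 GB if all 500 results are this big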

