spark-user mailing list archives

From Yong Zhang <java8...@hotmail.com>
Subject Re: spark streaming exectors memory increasing and executor killed by yarn
Date Fri, 17 Mar 2017 02:38:07 GMT
For this kind of question, you should always tell us the Spark version.


Yong


________________________________
From: darin <lidaling@foxmail.com>
Sent: Thursday, March 16, 2017 9:59 PM
To: user@spark.apache.org
Subject: spark streaming exectors memory increasing and executor killed by yarn

Hi,
I got this exception after my streaming program had been running for a few hours.

```
User class threw exception: org.apache.spark.SparkException: Job aborted
due to stage failure: Task 21 in stage 1194.0 failed 4 times, most recent
failure: Lost task 21.3 in stage 1194.0 (TID 2475, 2.dev3, executor 66):
ExecutorLostFailure (executor 66 exited caused by one of the running tasks)
Reason: Container killed by YARN for exceeding memory limits. 3.5 GB of 3.5
GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.
```

I have found some suggested solutions online, such as disabling the YARN
memory monitor or increasing executor memory, but I don't think either of
them is the right fix.


And this is the submit script:
```
spark-submit --master yarn-cluster \
  --driver-cores 1 --driver-memory 1G \
  --num-executors 6 --executor-cores 3 --executor-memory 3G \
  --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/javadump.hprof" \
  --conf "spark.kryoserializer.buffer.max=512m" \
  --class com.dtise.data.streaming.ad.DTStreamingStatistics \
  hdfs://nameservice1/user/yanghb/spark-streaming-1.0.jar
```
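For reference, here is a rough sketch of how a 3.5 GB container limit can come out of `--executor-memory 3G`. It assumes the Spark 1.6/2.x default used when `spark.yarn.executor.memoryOverhead` is not set (overhead = max(10% of executor memory, 384 MB)), and it assumes YARN rounds the request up to a 512 MB scheduler increment; the real increment is cluster configuration, and the object and method names are only illustrative. The container limit covers the whole executor process, not just the heap, so off-heap usage (thread stacks, direct buffers, JVM metadata) has to fit inside the overhead.

```scala
object ContainerSizeEstimate {
  // Assumed Spark 1.6/2.x defaults, used when spark.yarn.executor.memoryOverhead is not set
  val OverheadFactor: Double = 0.10
  val OverheadMinMb: Long = 384L

  // Off-heap headroom Spark requests on top of the executor heap
  def overheadMb(executorMemoryMb: Long): Long =
    math.max((OverheadFactor * executorMemoryMb).toLong, OverheadMinMb)

  // YARN rounds each container request up to a multiple of the scheduler increment
  def roundUp(mb: Long, incrementMb: Long): Long =
    ((mb + incrementMb - 1) / incrementMb) * incrementMb

  // Total memory YARN allocates (and enforces) for one executor container
  def containerMb(executorMemoryMb: Long, incrementMb: Long): Long =
    roundUp(executorMemoryMb + overheadMb(executorMemoryMb), incrementMb)

  def main(args: Array[String]): Unit = {
    val heapMb = 3L * 1024   // --executor-memory 3G
    val incrementMb = 512L   // assumption: depends on the cluster's YARN scheduler settings
    println(s"heap      = $heapMb MB")
    println(s"overhead  = ${overheadMb(heapMb)} MB")
    println(s"container = ${containerMb(heapMb, incrementMb)} MB")
  }
}
```

Under these assumptions it reports a 384 MB overhead and a 3584 MB (3.5 GB) container, which is consistent with the limit in the log. With a 1024 MB increment the same request would round up to 4096 MB instead.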

And this is the main code:

```
import java.util.Calendar
import org.apache.spark.streaming.StateSpec

val originalStream = ssc.textFileStream(rawDataPath)
originalStream
  .repartition(10)
  .mapPartitions(parseAdLog)
  .reduceByKey(_ ++ _)
  // Stateful aggregation across batches; the state is kept in executor memory
  .mapWithState(StateSpec.function(countAdLogWithState _))
  .foreachRDD(rdd => {
    if (!rdd.isEmpty()) {
      val batchTime = Calendar.getInstance.getTimeInMillis
      // Reduce the per-batch aggregates to the driver and write them to Redis
      val dimensionSumMap = rdd.map(_._1).reduce(_ ++ _)
      val nameList = rdd.map(_._2).reduce(_ ++ _).toList
      val jedis = RedisUtils.jedis()
      jedis.hmset(joinString("t_ad_dimension_sum", batchTime), dimensionSumMap)
      jedis.lpush(joinString("t_ad_name", batchTime), nameList: _*)
      jedis.set(joinString("t_ad", batchTime.toString), "OK")
      jedis.close()

      // Send each log line to Kafka, creating one producer per partition
      rdd.flatMap(_._3).foreachPartition(logInfoList => {
        val producter = new StringProducter
        for (logInfo <- logInfoList) {
          val logInfoArr = logInfo.split("\t", -1)
          val kafkaKey = "ad/" + logInfoArr(campaignIdIdx) + "/" + logInfoArr(logDateIdx)
          producter.send("cookedLog", kafkaKey, logInfo)
        }
        producter.close()
      })
    }
  })
```

These are the results of analyzing the JVM heap dump with MAT (Eclipse Memory Analyzer):

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095238%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095254%402x.png>
<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28500/QQ20170317-095331%402x.png>

Does anybody have any advice about this?

Thanks





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-exectors-memory-increasing-and-executor-killed-by-yarn-tp28500.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.



