spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "haitao .yao" <yao.e...@gmail.com>
Subject replace ConnectionManager#ackTimeoutMonitor with ScheduledExecutorService to avoid OOME under long timeout
Date Tue, 11 Nov 2014 09:50:42 GMT
​Hey, guys, after join big data in a small cluster, I find out the Executor
OOME, after analyze the heap dump, I think I found out the reason
heap dump screen shot


   - I found out it takes too long to transfer the shuffle file, because of
   low IO performance, so I increased the timeout value
   -

   After increased the timeout value, I got OOME. and I found out that the
   java.util.Timer’s queue is holding the reference of the timeoutTask, and
   the timeoutTask is holding the reference of message through message.id
   ```
   // ConnectionManager, line: 905
   val timeoutTask = new TimeTask {

    override def run(): Unit = {
      messageStatuses.synchronized {
       messageStatuses.remove(message.id).foreach ( s => { // here!!!
          val e = new IOException("sendMessageReliably failed because ack " +
            s"was not received within $ackTimeout sec")
          if (!promise.tryFailure(e)) {

   }


* java.util.Timer 's TimeTask did not remove the element from the task
query, so only when the timeout value reaches, the TimeTask will be
removed so that the message reference count decr to 0 to be GCed
here's the code of java.util.TimerTask

public boolean cancel() {
    synchronized(lock) {
        boolean result = (state == SCHEDULED);
        state = CANCELLED;
        return result;
    }
}

```

   - so I replaced the Timer with ScheduledExecutorService, use only
   messageId in the TimerTask.

Here’s the push request.

https://github.com/apache/spark/pull/3207

I’ve pushed the code into our production system, without changing any other
parameter, the same job succeeded.

Thanks.
​
-- 
haitao.yao

Mime
View raw message