hadoop-mapreduce-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Chris Riccomini <criccom...@linkedin.com>
Subject RE: ApplicationMaster Memory Usage
Date Wed, 21 Sep 2011 17:51:51 GMT
Hey Vinod,

I svn up'd, and rebuilt. My application's task (container) now runs!

Unfortunately, my application master eventually gets killed by the NodeManager anyway, and
I'm still not clear as to why. The AM is just running a loop, asking for a container, and
executing a command in the container. It keeps doing this over and over again. After a few
iterations, it gets killed with something like:

2011-09-21 10:42:40,869 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(402))
- Memory usage of ProcessTree 21666 for container-id container_1316626117280_0002_01_000001
: Virtual 2260938752 bytes, limit : 2147483648 bytes; Physical 77398016 bytes, limit -1 bytes
2011-09-21 10:42:40,869 WARN  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:isProcessTreeOverLimit(289))
- Process tree for container: container_1316626117280_0002_01_000001 has processes older than
1 iteration running over the configured limit. Limit=2147483648, current usage = 2260938752
2011-09-21 10:42:40,870 WARN  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(453))
- Container [pid=21666,containerID=container_1316626117280_0002_01_000001] is running beyond
memory-limits. Current usage : 2260938752bytes. Limit : 2147483648bytes. Killing container.

Dump of the process-tree for container_1316626117280_0002_01_000001 : 
	|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES)
RSSMEM_USAGE(PAGES) FULL_CMD_LINE
	|- 21669 21666 21666 21666 (java) 105 4 2152300544 18593 java -Xmx512M -cp ./package/* kafka.yarn.ApplicationMaster
/home/criccomi/git/kafka-yarn/dist/kafka-streamer.tgz 2 1 1316626117280 com.linkedin.TODO
1 
	|- 21666 20570 21666 21666 (bash) 0 0 108638208 303 /bin/bash -c java -Xmx512M -cp './package/*'
kafka.yarn.ApplicationMaster /home/criccomi/git/kafka-yarn/dist/kafka-streamer.tgz 2 1 1316626117280
com.linkedin.TODO 1 1>/tmp/logs/application_1316626117280_0002/container_1316626117280_0002_01_000001/stdout
2>/tmp/logs/application_1316626117280_0002/container_1316626117280_0002_01_000001/stderr
 

2011-09-21 10:42:40,870 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(463))
- Removed ProcessTree with root 21666

I don't think that my AM is leaking memory. Full code paste after the break

1. Do I need to release a container in my AM even if the AM receives it as a finished container
in the resource request response?
2. Do I need to free any other resources after a resource request (e.g. ResourceRequest, AllocateRequest,
etc)?

Cheers,
Chris


  def main(args: Array[String]) {
    // YARN will always give our ApplicationMaster
    // the first four parameters as: <package> <app id> <attempt id> <timestamp>
    val packagePath = args(0)
    val appId = args(1).toInt
    val attemptId = args(2).toInt
    val timestamp = args(3).toLong

    // these are our application master's parameters
    val streamerClass = args(4)
    val tasks = args(5).toInt

    // TODO log params here

    // start the application master helper
    val conf = new Configuration
    val applicationMasterHelper = new ApplicationMasterHelper(appId, attemptId, timestamp,
conf)
      .registerWithResourceManager

    // start and manage the slaves
    val noReleases = List[ContainerId]()
    var runningContainers = 0

    // keep going forever
    while (true) {
      val nonRunningTasks = tasks - runningContainers
      val response = applicationMasterHelper.sendResourceRequest(nonRunningTasks, noReleases)

      response.getAllocatedContainers.foreach(container => {
        new ContainerExecutor(packagePath, container)
          .addCommand("java -Xmx256M -cp './package/*' kafka.yarn.StreamingTask " + streamerClass
+ " "
            + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
            + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr").execute(conf)
      })

      runningContainers += response.getAllocatedContainers.length
      runningContainers -= response.getCompletedContainersStatuses.length

      Thread.sleep(1000)
    }

    applicationMasterHelper.unregisterWithResourceManager("SUCCESS")
  }


class ApplicationMasterHelper(iAppId: Int, iAppAttemptId: Int, lTimestamp: Long, conf: Configuration)
{
  val rpc = YarnRPC.create(conf)
  val appId = Records.newRecord(classOf[ApplicationId])
  val appAttemptId = Records.newRecord(classOf[ApplicationAttemptId])
  val rmAddress = NetUtils.createSocketAddr(conf.get(YarnConfiguration.RM_SCHEDULER_ADDRESS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS))
  val resourceManager = rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol]
  var requestId = 0

  appId.setClusterTimestamp(lTimestamp)
  appId.setId(iAppId)
  appAttemptId.setApplicationId(appId)
  appAttemptId.setAttemptId(iAppAttemptId)

  def registerWithResourceManager(): ApplicationMasterHelper = {
    val req = Records.newRecord(classOf[RegisterApplicationMasterRequest])
    req.setApplicationAttemptId(appAttemptId)
    // TODO not sure why these are blank- This is how spark does it
    req.setHost("")
    req.setRpcPort(1)
    req.setTrackingUrl("")
    resourceManager.registerApplicationMaster(req)
    this
  }

  def unregisterWithResourceManager(state: String): ApplicationMasterHelper = {
    val finReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
    finReq.setAppAttemptId(appAttemptId)
    finReq.setFinalState(state)
    resourceManager.finishApplicationMaster(finReq)
    this
  }

  def sendResourceRequest(containers: Int, release: List[ContainerId]): AMResponse = {
    // TODO will need to make this more flexible for hostname requests, etc
    val request = Records.newRecord(classOf[ResourceRequest])
    val pri = Records.newRecord(classOf[Priority])
    val capability = Records.newRecord(classOf[Resource])
    val req = Records.newRecord(classOf[AllocateRequest])
    request.setHostName("*")
    request.setNumContainers(containers)
    pri.setPriority(1)
    request.setPriority(pri)
    capability.setMemory(128)
    request.setCapability(capability)
    req.setResponseId(requestId)
    req.setApplicationAttemptId(appAttemptId)
    req.addAllAsks(Lists.newArrayList(request))
    req.addAllReleases(release)
    requestId += 1
    // TODO we might want to return a list of container executors here instead of AMResponses
    resourceManager.allocate(req).getAMResponse
  }
}


________________________________________
From: Vinod Kumar Vavilapalli [vinodkv@hortonworks.com]
Sent: Wednesday, September 21, 2011 10:08 AM
To: mapreduce-dev@hadoop.apache.org
Subject: Re: ApplicationMaster Memory Usage

Yes, the process-dump clearly tells that this is MAPREDUCE-2998.

+Vinod
(With a smirk to see his container-memory-monitoring code in action)


On Wed, Sep 21, 2011 at 10:26 PM, Arun C Murthy <acm@hortonworks.com> wrote:

> I'll bet you are hitting MR-2998.
>
> From the changelog:
>
>    MAPREDUCE-2998. Fixed a bug in TaskAttemptImpl which caused it to fork
> bin/mapred too many times. Contributed by Vinod K V.
>
> Arun
>
> On Sep 21, 2011, at 9:52 AM, Chris Riccomini wrote:
>
> > Hey Guys,
> >
> > My ApplicationMaster is being killed by the NodeManager because of memory
> consumption, and I don't understand why. I'm using -Xmx512M, and setting my
> resource request to 2048.
> >
> >
> >    .addCommand("java -Xmx512M -cp './package/*'
> kafka.yarn.ApplicationMaster " ...
> >
> >    ...
> >
> >    private var memory = 2048
> >
> >    resource.setMemory(memory)
> >    containerCtx.setResource(resource)
> >    containerCtx.setCommands(cmds.toList)
> >    containerCtx.setLocalResources(Collections.singletonMap("package",
> packageResource))
> >    appCtx.setApplicationId(appId)
> >    appCtx.setUser(user.getShortUserName)
> >    appCtx.setAMContainerSpec(containerCtx)
> >    request.setApplicationSubmissionContext(appCtx)
> >    applicationsManager.submitApplication(request)
> >
> > When this runs, I see (in my NodeManager's logs):
> >
> >
> > 2011-09-21 09:35:19,112 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(402)) - Memory usage of ProcessTree 28134
> for container-id container_1316559026783_0003_01_000001 : Virtual 2260938752
> bytes, limit : 2147483648 bytes; Physical 71540736 bytes, limit -1 bytes
> > 2011-09-21 09:35:19,112 WARN  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:isProcessTreeOverLimit(289)) - Process tree for
> container: container_1316559026783_0003_01_000001 has processes older than 1
> iteration running over the configured limit. Limit=2147483648, current usage
> = 2260938752
> > 2011-09-21 09:35:19,113 WARN  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(453)) - Container
> [pid=28134,containerID=container_1316559026783_0003_01_000001] is running
> beyond memory-limits. Current usage : 2260938752bytes. Limit :
> 2147483648bytes. Killing container.
> > Dump of the process-tree for container_1316559026783_0003_01_000001 :
> >       |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
> SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
> >       |- 28134 25886 28134 28134 (bash) 0 0 108638208 303 /bin/bash -c
> java -Xmx512M -cp './package/*' kafka.yarn.ApplicationMaster 3 1
> 1316559026783 com.linkedin.TODO 1
> 1>/tmp/logs/application_1316559026783_0003/container_1316559026783_0003_01_000001/stdout
> 2>/tmp/logs/application_1316559026783_0003/container_1316559026783_0003_01_000001/stderr
> >       |- 28137 28134 28134 28134 (java) 92 3 2152300544 17163 java
> -Xmx512M -cp ./package/* kafka.yarn.ApplicationMaster 3 1 1316559026783
> com.linkedin.TODO 1
> >
> > 2011-09-21 09:35:19,113 INFO  monitor.ContainersMonitorImpl
> (ContainersMonitorImpl.java:run(463)) - Removed ProcessTree with root 28134
> >
> > It appears that YARN is honoring my 2048 command, yet my process is
> somehow taking 2260938752 bytes. I don't think that I'm using nearly that
> much in permgen, and my heap is limited to 512. I don't have any JNI stuff
> running (that I know of), so it's unclear to me what's going on here. The
> only thing that I can think of is that Java's Runtime exec is forking, and
> copying its entire JVM memory footprint for the fork.
> >
> > Has anyone seen this? Am I doing something dumb?
> >
> > Thanks!
> > Chris
>
>

Mime
View raw message