hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1082677 [29/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapredu...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.FilterService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.ApplicationID;
+
+public class AuxServices extends AbstractService
+    implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AuxServices.class);
+
+  public static final String AUX_SERVICES = "nodemanager.auxiluary.services";
+  public static final String AUX_SERVICE_CLASS_FMT =
+    "nodemanager.aux.service.%s.class";
+  public final Map<String,AuxiliaryService> serviceMap;
+
+  public AuxServices() {
+    super(AuxServices.class.getName());
+    serviceMap =
+      Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
+    // Obtain services from configuration in init()
+  }
+
+  protected final synchronized void addService(String name,
+      AuxiliaryService service) {
+    serviceMap.put(name, service);
+  }
+
+  Collection<AuxiliaryService> getServices() {
+    return Collections.unmodifiableCollection(serviceMap.values());
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
+    for (final String sName : auxNames) {
+      try {
+        Class<? extends AuxiliaryService> sClass = conf.getClass(
+              String.format(AUX_SERVICE_CLASS_FMT, sName), null,
+              AuxiliaryService.class);
+        if (null == sClass) {
+          throw new RuntimeException("No class defiend for " + sName);
+        }
+        AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
+        // TODO better use use s.getName()?
+        addService(sName, s);
+        s.init(conf);
+      } catch (RuntimeException e) {
+        LOG.fatal("Failed to initialize " + sName, e);
+        throw e;
+      }
+    }
+    super.init(conf);
+  }
+
+  @Override
+  public void start() {
+    // TODO fork(?) services running as configured user
+    //      monitor for health, shutdown/restart(?) if any should die
+    for (Service service : serviceMap.values()) {
+      service.start();
+      service.register(this);
+    }
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    try {
+      synchronized (serviceMap) {
+        for (Service service : serviceMap.values()) {
+          if (service.getServiceState() == Service.STATE.STARTED) {
+            service.unregister(this);
+            service.stop();
+          }
+        }
+        serviceMap.clear();
+      }
+    } finally {
+      super.stop();
+    }
+  }
+
+  @Override
+  public void stateChanged(Service service) {
+    LOG.fatal("Service " + service.getName() + " changed state: " +
+        service.getServiceState());
+    stop();
+  }
+
+  @Override
+  public void handle(AuxServicesEvent event) {
+    LOG.info("Got event " + event.getType() + " for service "
+        + event.getServiceID());
+    AuxiliaryService service = serviceMap.get(event.getServiceID());
+    if (null == service) {
+      // TODO kill all containers waiting on Application
+      return;
+    }
+    switch (event.getType()) {
+    case APPLICATION_INIT:
+      service.initApp(event.getUser(), event.getApplicationID(),
+          event.getServiceData());
+      break;
+    case APPLICATION_STOP:
+      service.stopApp(event.getApplicationID());
+      break;
+    default:
+      throw new RuntimeException("Unknown type: " + event.getType());
+    }
+  }
+
+  public interface AuxiliaryService extends Service {
+    void initApp(String user, ApplicationID appId, ByteBuffer data);
+    void stopApp(ApplicationID appId);
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+
+public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
+
+  private final String user;
+  private final String serviceId;
+  private final ByteBuffer serviceData;
+  private final ApplicationID appId;
+
+  public AuxServicesEvent(AuxServicesEventType eventType, ApplicationID appId) {
+    this(eventType, null, appId, null, null);
+  }
+
+  public AuxServicesEvent(AuxServicesEventType eventType, String user,
+      ApplicationID appId, String serviceId, ByteBuffer serviceData) {
+    super(eventType);
+    this.user = user;
+    this.appId = appId;
+    this.serviceId = serviceId;
+    this.serviceData = serviceData;
+  }
+
+  public String getServiceID() {
+    return serviceId;
+  }
+
+  public ByteBuffer getServiceData() {
+    return serviceData;
+  }
+
+  public String getUser() {
+    return user;
+  }
+
+  public ApplicationID getApplicationID() {
+    return appId;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+public enum AuxServicesEventType {
+  APPLICATION_INIT,
+  APPLICATION_STOP
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
+import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerStatus;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+public class ContainerManagerImpl extends CompositeService implements
+    ServiceStateChangeListener, ContainerManager, EventHandler<Event> {
+
+  private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
+
+  final Context context;
+  private Server server;
+  private InetSocketAddress cmAddr;
+  private final ResourceLocalizationService rsrcLocalizationSrvc;
+  private final ContainersLauncher containersLauncher;
+  private final AuxServices auxiluaryServices;
+
+  private final NodeStatusUpdater nodeStatusUpdater;
+  private ContainerTokenSecretManager containerTokenSecretManager;
+
+  protected AsyncDispatcher dispatcher;
+
+  public ContainerManagerImpl(Context context, ContainerExecutor exec,
+      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater) {
+    super(ContainerManagerImpl.class.getName());
+    this.context = context;
+    dispatcher = new AsyncDispatcher();
+
+    rsrcLocalizationSrvc =
+        createResourceLocalizationService(exec, deletionContext);
+    addService(rsrcLocalizationSrvc);
+
+    containersLauncher = createContainersLauncher(context, exec);
+    addService(containersLauncher);
+
+    this.nodeStatusUpdater = nodeStatusUpdater;
+    // Create the secretManager if need be.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Security is enabled on NodeManager. "
+          + "Creating ContainerTokenSecretManager");
+      this.containerTokenSecretManager = new ContainerTokenSecretManager();
+    }
+
+    // Start configurable services
+    auxiluaryServices = new AuxServices();
+    auxiluaryServices.register(this);
+    addService(auxiluaryServices);
+
+    ContainersMonitor containersMonitor = new ContainersMonitor();
+    addService(containersMonitor);
+
+    dispatcher.register(ContainerEventType.class,
+        new ContainerEventDispatcher());
+    dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventDispatcher());
+    dispatcher.register(LocalizerEventType.class, rsrcLocalizationSrvc);
+    dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
+    dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
+    dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
+    addService(dispatcher);
+  }
+
+  protected ResourceLocalizationService createResourceLocalizationService(
+      ContainerExecutor exec, DeletionService deletionContext) {
+    return new ResourceLocalizationService(this.dispatcher, exec,
+        deletionContext);
+  }
+
+  protected ContainersLauncher createContainersLauncher(Context context,
+      ContainerExecutor exec) {
+    return new ContainersLauncher(context, this.dispatcher, exec);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    // TODO enqueue user dirs in deletion context
+    cmAddr = NetUtils.createSocketAddr(
+        conf.get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS));
+    Configuration cmConf = new Configuration(conf);
+    cmConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    super.init(cmConf);
+  }
+
+  @Override
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // This is fine as status updater is started before ContainerManager and
+      // RM gives the shared secret in registration during StatusUpdter#start()
+      // itself.
+      this.containerTokenSecretManager.setSecretKey(
+          this.nodeStatusUpdater.getNodeName(),
+          this.nodeStatusUpdater.getRMNMSharedSecret());
+    }
+    server =
+        rpc.getServer(ContainerManager.class, this, cmAddr, getConfig(),
+            this.containerTokenSecretManager);
+    LOG.info("ContainerManager started at " + cmAddr);
+    server.start();
+    super.start();
+  }
+
+  @Override
+  public void stop() {
+    if (auxiluaryServices.getServiceState() == STARTED) {
+      auxiluaryServices.unregister(this);
+    }
+    if (server != null) {
+      server.close();
+    }
+    super.stop();
+  }
+
+  @Override
+  public Void cleanupContainer(ContainerID containerID)
+      throws YarnRemoteException {
+    // TODO Is this necessary?
+    return null;
+  }
+
+  @Override
+  public Void startContainer(ContainerLaunchContext launchContext)
+      throws YarnRemoteException {
+    Container container = new ContainerImpl(this.dispatcher, launchContext);
+    ContainerID containerID = launchContext.id;
+    ApplicationID applicationID = containerID.appID;
+    if (this.context.getContainers().putIfAbsent(containerID, container) != null) {
+      throw RPCUtil.getRemoteException("Container " + containerID
+          + " already is running on this node!!");
+    }
+//    if (LOG.isDebugEnabled()) { TODO
+      LOG.info("CONTAINER: " + launchContext);
+//    }
+
+    // Create the application
+    Application application = new ApplicationImpl(this.dispatcher,
+        launchContext.user, launchContext.id.appID,
+        launchContext.env, launchContext.resources,
+        launchContext.containerTokens);
+    if (this.context.getApplications().putIfAbsent(applicationID, application) == null) {
+      LOG.info("Creating a new application reference for app "
+          + applicationID);
+    }
+
+    // TODO: Validate the request
+    dispatcher.getEventHandler().handle(
+        new ApplicationInitEvent(containerID));
+    return null;
+  }
+
+  @Override
+  public Void stopContainer(ContainerID containerID)
+      throws YarnRemoteException {
+    Container container = this.context.getContainers().get(containerID);
+    if (container == null) {
+      LOG.warn("Trying to stop unknown container " + containerID);
+      return null;
+      //throw RPCUtil.getRemoteException("Trying to stop unknown container "
+      //    + containerID + " on this NodeManager!!");
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER));
+
+    // TODO: Move this code to appropriate place once kill_container is
+    // implemented.
+    nodeStatusUpdater.sendOutofBandHeartBeat();
+
+    return null;
+  }
+
+  @Override
+  public ContainerStatus getContainerStatus(ContainerID containerID)
+      throws YarnRemoteException {
+    LOG.info("Getting container-status for " + containerID);
+    Container container = this.context.getContainers().get(containerID);
+    if (container != null) {
+      ContainerStatus containerStatus = container.getContainerStatus();
+      LOG.info("Returning " + containerStatus);
+      return containerStatus;
+    } else {
+      throw RPCUtil.getRemoteException("Container " + containerID
+          + " is not handled by this NodeManager");
+    }
+  }
+
+  class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
+    @Override
+    public void handle(ContainerEvent event) {
+      Map<ContainerID,Container> containers =
+        ContainerManagerImpl.this.context.getContainers();
+      Container c = containers.get(event.getContainerID());
+      if (c != null) {
+        c.handle(event);
+      } else {
+        LOG.warn("Event " + event + " sent to absent container " +
+            event.getContainerID());
+      }
+    }
+  }
+
+  class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      Application app = 
+      ContainerManagerImpl.this.context.getApplications().get(
+          event.getApplicationID());
+      if (app != null) {
+        app.handle(event);
+      } else {
+        LOG.warn("Event " + event + " sent to absent application " +
+            event.getApplicationID());
+      }
+    }
+    
+  }
+
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  @Override
+  public void stateChanged(Service service) {
+    // TODO Auto-generated method stub
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+public interface ContainerMonitor {
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+
+
+public interface Application extends EventHandler<ApplicationEvent> {
+
+  Map<CharSequence,CharSequence> getEnvironment();
+
+  Map<String,LocalResource> getResources(LocalResourceVisibility vis);
+
+  Map<Path,String> getLocalizedResources();
+
+  String getUser();
+
+  ApplicationID getAppId();
+
+  Credentials getCredentials() throws IOException;
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ApplicationID;
+
+public class ApplicationEvent extends AbstractEvent<ApplicationEventType> {
+
+  private final ApplicationID applicationID;
+
+  public ApplicationEvent(ApplicationID appID,
+      ApplicationEventType appEventType) {
+    super(appEventType);
+    this.applicationID = appID;
+  }
+
+  public ApplicationID getApplicationID() {
+    return this.applicationID;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+public enum ApplicationEventType {
+  INIT_APPLICATION,
+  FINISH_APPLICATION,
+
+  APPLICATION_INITED,
+  APPLICATION_FINISHED,
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,285 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+
+public class ApplicationImpl implements Application {
+
+  final Dispatcher dispatcher;
+  final String user;
+  final ApplicationID appId;
+  final Map<CharSequence,CharSequence> env;
+  final Map<CharSequence,LocalResource> resources;
+  final ByteBuffer containerTokens;
+  Map<Path,String> localizedResources;
+
+  private static final Log LOG = LogFactory.getLog(Application.class);
+
+  private List<ContainerID> containersWaitingForAppInit =
+      new ArrayList<ContainerID>();
+
+  // TODO check for suitability of symlink name
+  static Map<String,LocalResource>
+    filterResources(Map<CharSequence,LocalResource> resources,
+        LocalResourceVisibility state) {
+    Map<String,LocalResource> ret =
+      new HashMap<String,LocalResource>();
+    for (Map.Entry<CharSequence,LocalResource> rsrc : resources.entrySet()) {
+      if (state.equals(rsrc.getValue().state)) {
+        ret.put(rsrc.getKey().toString(), rsrc.getValue());
+      }
+    }
+    return ret;
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher,
+      CharSequence user,
+      ApplicationID appId,
+      Map<CharSequence,CharSequence> env,
+      Map<CharSequence,LocalResource> resources,
+      ByteBuffer containerTokens) {
+    this.dispatcher = dispatcher;
+    this.user = user.toString();
+    this.appId = appId;
+    this.env = env;
+    this.resources = null == resources
+      ? new HashMap<CharSequence,LocalResource>()
+      : resources;
+    this.containerTokens = containerTokens;
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  @Override
+  public String getUser() {
+    return user.toString();
+  }
+
+  @Override
+  public ApplicationID getAppId() {
+    return appId;
+  }
+
+  @Override
+  public Map<CharSequence,CharSequence> getEnvironment() {
+    return env;
+  }
+
+  @Override
+  public Map<String,LocalResource>
+      getResources(LocalResourceVisibility vis) {
+    final Map<String,LocalResource> ret;
+    if (LocalResourceVisibility.PUBLIC.equals(vis)) {
+      ret = filterResources(resources, LocalResourceVisibility.PUBLIC);
+    } else {
+      // TODO separate these
+      ret = filterResources(resources, LocalResourceVisibility.PRIVATE);
+      ret.putAll(filterResources(resources, LocalResourceVisibility.APPLICATION));
+    }
+    return Collections.unmodifiableMap(ret);
+  }
+
+  @Override
+  public Map<Path,String> getLocalizedResources() {
+    if (ApplicationState.INITED.equals(stateMachine.getCurrentState())) {
+      return localizedResources;
+    }
+    throw new IllegalStateException(
+        "Invalid request for " + stateMachine.getCurrentState());
+  }
+
+  @Override
+  public Credentials getCredentials() throws IOException {
+    Credentials ret = new Credentials();
+    if (containerTokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      buf.reset(containerTokens);
+      ret.readTokenStorageStream(buf);
+      for (Token<? extends TokenIdentifier> tk : ret.getAllTokens()) {
+        LOG.info(" In Nodemanager , token " + tk);
+      }
+    }
+    return ret;
+  }
+
+  private static StateMachineFactory
+         <ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent>
+       stateMachineFactory =
+      new StateMachineFactory
+         <ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent>
+       (ApplicationState.NEW)
+    .addTransition(ApplicationState.NEW, ApplicationState.INITING,
+        ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
+    .addTransition(ApplicationState.INITING, ApplicationState.INITING,
+        ApplicationEventType.INIT_APPLICATION, new AppIsInitingTransition())
+    .addTransition(ApplicationState.INITING, ApplicationState.FINISHING,
+        ApplicationEventType.FINISH_APPLICATION, new AppFinishingTransition())
+    .addTransition(ApplicationState.INITING, ApplicationState.INITED,
+        ApplicationEventType.APPLICATION_INITED, new AppInitedTransition())
+    .addTransition(ApplicationState.INITED,
+        ApplicationState.INITED, ApplicationEventType.INIT_APPLICATION,
+        new AppHasInitedTransition())
+    .addTransition(ApplicationState.INITED,
+        ApplicationState.FINISHING, ApplicationEventType.FINISH_APPLICATION,
+        new AppFinishingTransition())
+    .addTransition(ApplicationState.FINISHING,
+        ApplicationState.FINISHED, ApplicationEventType.APPLICATION_FINISHED)
+    // TODO failure transitions are completely broken
+
+    // create the topology tables
+    .installTopology();
+
+  private final StateMachine<ApplicationState, ApplicationEventType, ApplicationEvent>
+    stateMachine;
+
+  static class AppInitTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID cId = initEvent.getContainerRequestingAppInit();
+      app.containersWaitingForAppInit.add(cId);
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+      app.dispatcher.getEventHandler().handle(
+          new ApplicationLocalizerEvent(
+            LocalizerEventType.INIT_APPLICATION_RESOURCES, app));
+    }
+  }
+
+  static class AppIsInitingTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID cId = initEvent.getContainerRequestingAppInit();
+      app.containersWaitingForAppInit.add(cId);
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+    }
+  }
+
+  static class AppInitedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      ApplicationInitedEvent initedEvent = (ApplicationInitedEvent) event;
+      app.localizedResources = initedEvent.getLocalizedResources();
+      // Start all the containers waiting for ApplicationInit
+      for (ContainerID containerID : app.containersWaitingForAppInit) {
+        app.dispatcher.getEventHandler().handle(
+            new ContainerEvent(containerID,
+              ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
+      }
+    }
+  }
+
+  static class AppHasInitedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID containerID = initEvent.getContainerRequestingAppInit();
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(containerID, ContainerEventType.INIT_CONTAINER));
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(containerID,
+            ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
+    }
+  }
+
+  static class AppFinishingTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      // Send event to ContainersLauncher to finish all the containers of this
+      // application.
+      for (ContainerID containerID : app.containersWaitingForAppInit) {
+        app.dispatcher.getEventHandler().handle(
+            new ContainerEvent(containerID,
+              ContainerEventType.KILL_CONTAINER));
+      }
+
+      // Delete Application level resources
+      app.dispatcher.getEventHandler().handle(
+          new ApplicationLocalizerEvent(
+            LocalizerEventType.DESTROY_APPLICATION_RESOURCES, app));
+
+      // TODO: Trigger the LogsManager
+    }
+  }
+
+  @Override
+  public synchronized void handle(ApplicationEvent event) {
+
+    ApplicationID applicationID = event.getApplicationID();
+    LOG.info("Processing " + applicationID + " of type " + event.getType());
+
+    ApplicationState oldState = stateMachine.getCurrentState();
+    ApplicationState newState = null;
+    try {
+      // queue event requesting init of the same app
+      newState = stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.warn("Can't handle this event at current state", e);
+    }
+    if (oldState != newState) {
+      LOG.info("Application " + applicationID + " transitioned from "
+          + oldState + " to " + newState);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return AvroUtil.toString(appId);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import org.apache.hadoop.yarn.ContainerID;
+
+public class ApplicationInitEvent extends ApplicationEvent {
+
+  private final ContainerID containerRequestingAppInit;
+
+  public ApplicationInitEvent(ContainerID containerID) {
+    super(containerID.appID, ApplicationEventType.INIT_APPLICATION);
+    this.containerRequestingAppInit = containerID;
+  }
+
+  public ContainerID getContainerRequestingAppInit() {
+    return this.containerRequestingAppInit;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.ApplicationID;
+
+public class ApplicationInitedEvent extends ApplicationEvent {
+
+  private final Path workdir;
+  private final Map<Path,String> localizedResources;
+
+  public ApplicationInitedEvent(ApplicationID appID,
+      Map<Path,String> localizedResources, Path workdir) {
+    super(appID, ApplicationEventType.APPLICATION_INITED);
+    this.workdir = workdir;
+    this.localizedResources = localizedResources;
+  }
+
+  public Map<Path,String> getLocalizedResources() {
+    return localizedResources;
+  }
+
+  public Path getWorkDirectory() {
+    return workdir;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+public enum ApplicationState {
+  NEW, INITING, INITED, FINISHING, FINISHED 
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+public interface Container extends EventHandler<ContainerEvent> {
+
+  org.apache.hadoop.yarn.Container getContainer();
+
+  // TODO overly-general interface
+  ContainerLaunchContext getLaunchContext();
+
+  ContainerStatus getContainerStatus();
+
+  String toString();
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ContainerID;
+
+public class ContainerEvent extends AbstractEvent<ContainerEventType> {
+
+  private final ContainerID containerID;
+
+  public ContainerEvent(ContainerID cID, ContainerEventType eventType) {
+    super(eventType);
+    this.containerID = cID;
+  }
+
+  public ContainerID getContainerID() {
+    return containerID;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+public enum ContainerEventType {
+
+  // Producer: ContainerManager
+  INIT_CONTAINER,
+  KILL_CONTAINER,
+  CONTAINER_DONE,
+
+  // DownloadManager
+  CONTAINER_INITED,
+  CONTAINER_RESOURCES_LOCALIZED,
+  CONTAINER_RESOURCES_CLEANEDUP,
+
+  // Producer: ContainersLauncher
+  CONTAINER_LAUNCHED,
+  CONTAINER_EXITED_WITH_SUCCESS,
+  CONTAINER_EXITED_WITH_FAILURE,
+  CONTAINER_CLEANEDUP_AFTER_KILL,
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+public class ContainerImpl implements Container {
+
+  private final Dispatcher dispatcher;
+  private final ContainerLaunchContext launchContext;
+
+  private static final Log LOG = LogFactory.getLog(Container.class);
+
+  public ContainerImpl(Dispatcher dispatcher,
+      ContainerLaunchContext launchContext) {
+    this.dispatcher = dispatcher;
+    this.launchContext = launchContext;
+
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  // State Machine for each container.
+  private static StateMachineFactory
+           <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
+        stateMachineFactory =
+      new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
+    // From NEW State
+    .addTransition(ContainerState.NEW, ContainerState.LOCALIZING,
+        ContainerEventType.INIT_CONTAINER)
+    .addTransition(ContainerState.NEW, ContainerState.DONE,
+        ContainerEventType.KILL_CONTAINER)
+
+    // From LOCALIZING State
+    .addTransition(ContainerState.LOCALIZING,
+        ContainerState.LOCALIZED,
+        ContainerEventType.CONTAINER_RESOURCES_LOCALIZED,
+        new LocalizedTransition())
+    .addTransition(ContainerState.LOCALIZING,
+        ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
+        ContainerEventType.KILL_CONTAINER,
+        new KillDuringLocalizationTransition())
+
+    // From LOCALIZED State
+    .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
+        ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
+    .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition())
+
+    // From RUNNING State
+    .addTransition(ContainerState.RUNNING,
+        ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition())
+    .addTransition(ContainerState.RUNNING,
+        ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition())
+    .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER, new KillTransition())
+
+    // From CONTAINER_EXITED_WITH_SUCCESS State
+    .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
+            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+    .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+                   ContainerState.EXITED_WITH_SUCCESS,
+                   ContainerEventType.KILL_CONTAINER)
+
+    // From EXITED_WITH_FAILURE State
+    .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
+            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+    .addTransition(ContainerState.EXITED_WITH_FAILURE,
+                   ContainerState.EXITED_WITH_FAILURE,
+                   ContainerEventType.KILL_CONTAINER)
+
+    // From KILLING State.
+    .addTransition(ContainerState.KILLING,
+        ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+        ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL,
+        new ContainerKilledTransition())
+    .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+        ContainerEventType.KILL_CONTAINER)
+    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
+        ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+        new ExitedWithSuccessTransition())
+    .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
+        ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+        new ExitedWithFailureTransition())
+
+    // From CONTAINER_CLEANEDUP_AFTER_KILL State.
+    .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+            ContainerState.DONE,
+            ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+
+    // From DONE
+    .addTransition(ContainerState.DONE, ContainerState.DONE,
+        ContainerEventType.KILL_CONTAINER)
+
+    // create the topology tables
+    .installTopology();
+
+  private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
+    stateMachine;
+
+  private synchronized org.apache.hadoop.yarn.ContainerState getCurrentState() {
+    switch (stateMachine.getCurrentState()) {
+    case NEW:
+    case LOCALIZING:
+    case LOCALIZED:
+      return org.apache.hadoop.yarn.ContainerState.INTIALIZING;
+    case RUNNING:
+    case EXITED_WITH_SUCCESS:
+    case EXITED_WITH_FAILURE:
+    case KILLING:
+    case CONTAINER_CLEANEDUP_AFTER_KILL:
+    case CONTAINER_RESOURCES_CLEANINGUP:
+      return org.apache.hadoop.yarn.ContainerState.RUNNING;
+    case DONE:
+    default:
+      return org.apache.hadoop.yarn.ContainerState.COMPLETE;
+    }
+  }
+
+  @Override
+  public synchronized org.apache.hadoop.yarn.Container getContainer() {
+    org.apache.hadoop.yarn.Container c = new org.apache.hadoop.yarn.Container();
+    c.id = this.launchContext.id;
+    c.resource = this.launchContext.resource;
+    c.state = getCurrentState();
+    return c;
+  }
+
+  @Override
+  public ContainerLaunchContext getLaunchContext() {
+    return launchContext;
+  }
+
+  @Override
+  public ContainerStatus getContainerStatus() {
+    ContainerStatus containerStatus = new ContainerStatus();
+    containerStatus.state = getCurrentState();
+    containerStatus.containerID = this.launchContext.id;
+    // TODO: Exit status.
+    return containerStatus;
+  }
+
+  static class ContainerTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Just drain the event and change the state.
+    }
+
+  }
+
+  static class LocalizedTransition extends ContainerTransition {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // XXX This needs to be container-oriented
+      // Inform the AuxServices about the opaque serviceData
+      ContainerLaunchContext ctxt = container.getLaunchContext();
+      Map<CharSequence,ByteBuffer> csd = ctxt.serviceData;
+      if (csd != null) {
+        // TODO: Isn't this supposed to happen only once per Application?
+        for (Map.Entry<CharSequence,ByteBuffer> service : csd.entrySet()) {
+          container.dispatcher.getEventHandler().handle(
+              new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
+                ctxt.user.toString(), ctxt.id.appID,
+                service.getKey().toString(), service.getValue()));
+        }
+      }
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.LAUNCH_CONTAINER));
+    }
+  }
+
+  static class LaunchTransition extends ContainerTransition {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Inform the ContainersMonitor to start monitoring the container's
+      // resource usage.
+      container.dispatcher.getEventHandler().handle(
+          new ContainersMonitorEvent(
+              ContainersMonitorEventType.START_MONITORING_CONTAINER));
+    }
+  }
+
+  static class ExitedWithSuccessTransition extends ContainerTransition {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // TODO: Add containerWorkDir to the deletion service.
+
+      // Inform the localizer to decrement reference counts and cleanup
+      // resources.
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizerEvent(
+            LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+    }
+  }
+
+  static class ExitedWithFailureTransition extends ContainerTransition {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // TODO: Add containerWorkDir to the deletion service.
+      // TODO: Add containerOuputDir to the deletion service.
+
+      // Inform the localizer to decrement reference counts and cleanup
+      // resources.
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizerEvent(
+            LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+    }
+  }
+
+  static class KillDuringLocalizationTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Inform the localizer to decrement reference counts and cleanup
+      // resources.
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizerEvent(
+            LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+
+    }
+  }
+
+  static class KillTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // Kill the process/process-grp
+      container.dispatcher.getEventHandler().handle(
+          new ContainersLauncherEvent(container,
+              ContainersLauncherEventType.CLEANUP_CONTAINER));
+    }
+  }
+
+  static class ContainerKilledTransition implements
+      SingleArcTransition<ContainerImpl, ContainerEvent> {
+    @Override
+    public void transition(ContainerImpl container, ContainerEvent event) {
+      // The process/process-grp is killed. Decrement reference counts and
+      // cleanup resources
+      container.dispatcher.getEventHandler().handle(
+          new ContainerLocalizerEvent(
+            LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+    }
+  }
+
+  @Override
+  public synchronized void handle(ContainerEvent event) {
+
+    ContainerID containerID = event.getContainerID();
+    LOG.info("Processing " + containerID + " of type " + event.getType());
+
+    ContainerState oldState = stateMachine.getCurrentState();
+    ContainerState newState = null;
+    try {
+      newState =
+          stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.warn("Can't handle this event at current state", e);
+    }
+    if (oldState != newState) {
+      LOG.info("Container " + containerID + " transitioned from " + oldState
+          + " to " + newState);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return AvroUtil.toString(launchContext.id);
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+public enum ContainerState {
+  NEW, LOCALIZING, LOCALIZED, RUNNING, EXITED_WITH_SUCCESS,
+  EXITED_WITH_FAILURE, KILLING, CONTAINER_CLEANEDUP_AFTER_KILL,
+  CONTAINER_RESOURCES_CLEANINGUP, DONE
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import java.io.DataOutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+
+public class ContainerLaunch implements Callable<Integer> {
+
+  private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
+
+  public static final String CONTAINER_SCRIPT = "task.sh";
+
+  private final Dispatcher dispatcher;
+  private final ContainerExecutor exec;
+  private final Application app;
+  private final Container container;
+  private final Path sysDir;
+  private final List<Path> appDirs;
+
+  public ContainerLaunch(Dispatcher dispatcher, ContainerExecutor exec,
+      Application app, Container container, Path sysDir, List<Path> appDirs) {
+    this.app = app;
+    this.exec = exec;
+    this.sysDir = sysDir;
+    this.appDirs = appDirs;
+    this.container = container;
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public Integer call() {
+    final ContainerLaunchContext launchContext = container.getLaunchContext();
+    final Map<Path,String> localizedResources = app.getLocalizedResources();
+    final String user = launchContext.user.toString();
+    final Map<CharSequence,CharSequence> env = launchContext.env;
+    final List<CharSequence> command = launchContext.command;
+    int ret = -1;
+    try {
+      FileContext lfs = FileContext.getLocalFSFileContext();
+      Path launchSysDir = new Path(sysDir, container.toString());
+      lfs.mkdir(launchSysDir, null, false);
+      Path launchPath = new Path(launchSysDir, CONTAINER_SCRIPT);
+      Path tokensPath =
+        new Path(launchSysDir, ApplicationLocalizer.APPTOKEN_FILE);
+      DataOutputStream launchOut = null;
+      DataOutputStream tokensOut = null;
+      
+      try {
+        launchOut = lfs.create(launchPath, EnumSet.of(CREATE, OVERWRITE));
+        ApplicationLocalizer.writeLaunchEnv(launchOut, env, localizedResources,
+            command, appDirs);
+        
+        tokensOut = lfs.create(tokensPath, EnumSet.of(CREATE, OVERWRITE));
+        Credentials creds = new Credentials();
+        if (container.getLaunchContext().containerTokens != null) {
+          // TODO: Is the conditional the correct way of checking?
+          DataInputByteBuffer buf = new DataInputByteBuffer();
+          container.getLaunchContext().containerTokens.rewind();
+          buf.reset(container.getLaunchContext().containerTokens);
+          creds.readTokenStorageStream(buf);
+          for (Token<? extends TokenIdentifier> tk : creds.getAllTokens()) {
+            LOG.debug(tk.getService() + " = " + tk.toString());
+          }
+        }
+        creds.writeTokenStorageToStream(tokensOut);
+      } finally {
+        IOUtils.cleanup(LOG, launchOut, tokensOut);
+        if (launchOut != null) {
+          launchOut.close();
+        }
+      }
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+            container.getLaunchContext().id,
+            ContainerEventType.CONTAINER_LAUNCHED));
+      ret =
+        exec.launchContainer(container, launchSysDir, user, app.toString(),
+            appDirs, null, null);
+      if (ret != 0) {
+        throw new ExitCodeException(ret, "Container failed");
+      }
+    } catch (Throwable e) {
+      LOG.warn("Failed to launch container", e);
+      dispatcher.getEventHandler().handle(new ContainerEvent(
+            launchContext.id,
+            ContainerEventType.CONTAINER_EXITED_WITH_FAILURE));
+      return ret;
+    }
+    LOG.info("Container " + container + " succeeded " + launchContext.id);
+    dispatcher.getEventHandler().handle(new ContainerEvent(
+          launchContext.id, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+    return 0;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
+
+public class ContainersLauncher extends AbstractService
+    implements EventHandler<ContainersLauncherEvent> {
+
+  private final Context context;
+  private final ContainerExecutor exec;
+  private final Dispatcher dispatcher;
+  private final ExecutorService containerLauncher =
+    Executors.newCachedThreadPool();
+  private List<Path> logDirs;
+  private List<Path> localDirs;
+  private List<Path> sysDirs;
+  private final Map<ContainerID,Future<Integer>> running =
+    Collections.synchronizedMap(new HashMap<ContainerID,Future<Integer>>());
+
+  public ContainersLauncher(Context context, Dispatcher dispatcher,
+      ContainerExecutor exec) {
+    super("containers-launcher");
+    this.exec = exec;
+    this.context = context;
+    this.dispatcher = dispatcher;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    // TODO factor this out of Localizer
+    try {
+      FileContext lfs = FileContext.getLocalFSFileContext(conf);
+      String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+
+      localDirs = new ArrayList<Path>(sLocalDirs.length);
+      logDirs = new ArrayList<Path>(sLocalDirs.length);
+      sysDirs = new ArrayList<Path>(sLocalDirs.length);
+      for (String sLocaldir : sLocalDirs) {
+        Path localdir = new Path(sLocaldir);
+        localDirs.add(localdir);
+        // $local/nmPrivate
+        Path sysdir = new Path(localdir, ResourceLocalizationService.NM_PRIVATE_DIR);
+        sysDirs.add(sysdir);
+      }
+      String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
+      for (String sLogdir : sLogdirs) {
+        Path logdir = new Path(sLogdir);
+        logDirs.add(logdir);
+      }
+    } catch (IOException e) {
+      throw new YarnException("Failed to start ContainersLauncher", e);
+    }
+    localDirs = Collections.unmodifiableList(localDirs);
+    logDirs = Collections.unmodifiableList(logDirs);
+    sysDirs = Collections.unmodifiableList(sysDirs);
+    super.init(conf);
+  }
+
+  @Override
+  public void stop() {
+    containerLauncher.shutdownNow();
+    super.stop();
+  }
+
+  @Override
+  public void handle(ContainersLauncherEvent event) {
+    // TODO: ContainersLauncher launches containers one by one!!
+    Container container = event.getContainer();
+    ContainerID containerId = container.getLaunchContext().id;
+    switch (event.getType()) {
+      case LAUNCH_CONTAINER:
+        Application app =
+          context.getApplications().get(containerId.appID);
+        List<Path> appDirs = new ArrayList<Path>(localDirs.size());
+        for (Path p : localDirs) {
+          Path usersdir = new Path(p, ApplicationLocalizer.USERCACHE);
+          Path userdir = new Path(usersdir,
+              container.getLaunchContext().user.toString());
+          Path appsdir = new Path(userdir, ApplicationLocalizer.APPCACHE);
+          appDirs.add(new Path(appsdir, app.toString()));
+        }
+        Path appSysDir = new Path(sysDirs.get(0), app.toString());
+        // TODO set in Application
+        //Path appLogDir = new Path(logDirs.get(0), app.toString());
+        ContainerLaunch launch =
+          new ContainerLaunch(dispatcher, exec, app,
+              event.getContainer(), appSysDir, appDirs);
+        running.put(containerId, containerLauncher.submit(launch));
+        break;
+      case CLEANUP_CONTAINER:
+        Future<Integer> rContainer = running.remove(containerId);
+        if (rContainer != null) {
+          // TODO needs to kill the container
+          rContainer.cancel(false);
+        }
+      dispatcher.getEventHandler().handle(
+          new ContainerEvent(containerId,
+              ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL));
+        break;
+    }
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+public class ContainersLauncherEvent
+    extends AbstractEvent<ContainersLauncherEventType>{
+
+  private final Container container;
+
+  public ContainersLauncherEvent(Container container,
+      ContainersLauncherEventType eventType) {
+    super(eventType);
+    this.container = container;
+  }
+
+  public Container getContainer() {
+    return container;
+  }
+
+}



Mime
View raw message