Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServices.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,151 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.service.FilterService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.ApplicationID;
+
+/**
+ * Service that instantiates and manages the auxiliary services named in the
+ * configuration, and routes {@link AuxServicesEvent}s to them by service id.
+ *
+ * <p>The service names are read from {@link #AUX_SERVICES}; the implementation
+ * class of each one is read from the key produced by
+ * {@link #AUX_SERVICE_CLASS_FMT}. Once started, this object listens for state
+ * changes of every managed service and stops all of them when any one reports
+ * a change.
+ */
+public class AuxServices extends AbstractService
+    implements ServiceStateChangeListener, EventHandler<AuxServicesEvent> {
+
+  private static final Log LOG = LogFactory.getLog(AuxServices.class);
+
+  /** Configuration key holding the collection of auxiliary service names. */
+  public static final String AUX_SERVICES = "nodemanager.auxiluary.services";
+
+  /**
+   * Format of the configuration key naming a service's implementation class;
+   * {@code %s} is replaced with the service name.
+   */
+  public static final String AUX_SERVICE_CLASS_FMT =
+      "nodemanager.aux.service.%s.class";
+
+  /**
+   * Managed services keyed by their configured name. This is a
+   * {@link Collections#synchronizedMap synchronized map}: single operations
+   * are thread-safe, but any iteration over it (or over its views) must hold
+   * the map's own monitor.
+   */
+  public final Map<String,AuxiliaryService> serviceMap;
+
+  /**
+   * Creates the manager with an empty service map; the services themselves
+   * are obtained from the configuration in {@link #init(Configuration)}.
+   */
+  public AuxServices() {
+    super(AuxServices.class.getName());
+    serviceMap =
+        Collections.synchronizedMap(new HashMap<String,AuxiliaryService>());
+  }
+
+  /**
+   * Records a service under the given name, replacing any previous entry
+   * with the same name.
+   *
+   * @param name key the service is looked up by in {@link #handle}
+   * @param service the service instance to manage
+   */
+  protected final synchronized void addService(String name,
+      AuxiliaryService service) {
+    serviceMap.put(name, service);
+  }
+
+  /**
+   * @return an unmodifiable live view of the managed services; callers that
+   *         iterate over it must synchronize on {@link #serviceMap}
+   */
+  Collection<AuxiliaryService> getServices() {
+    return Collections.unmodifiableCollection(serviceMap.values());
+  }
+
+  /**
+   * Instantiates and initializes every service listed under
+   * {@link #AUX_SERVICES}, then initializes this service.
+   *
+   * @param conf configuration supplying the service names and classes; it is
+   *        also passed to each service's own {@code init}
+   * @throws RuntimeException if a listed service has no class configured, or
+   *         if creating or initializing a service fails; the failure is
+   *         logged and rethrown unchanged
+   */
+  @Override
+  public void init(Configuration conf) {
+    Collection<String> auxNames = conf.getStringCollection(AUX_SERVICES);
+    for (final String sName : auxNames) {
+      try {
+        Class<? extends AuxiliaryService> sClass = conf.getClass(
+            String.format(AUX_SERVICE_CLASS_FMT, sName), null,
+            AuxiliaryService.class);
+        if (null == sClass) {
+          throw new RuntimeException("No class defiend for " + sName);
+        }
+        AuxiliaryService s = ReflectionUtils.newInstance(sClass, conf);
+        // TODO better to use s.getName()?
+        // The service is recorded before its init() runs, so a service whose
+        // init() fails stays in the map (stop() skips it: it is not STARTED).
+        addService(sName, s);
+        s.init(conf);
+      } catch (RuntimeException e) {
+        LOG.fatal("Failed to initialize " + sName, e);
+        throw e;
+      }
+    }
+    super.init(conf);
+  }
+
+  /**
+   * Starts every managed service, registers this object as its state-change
+   * listener, and then starts this service.
+   */
+  @Override
+  public void start() {
+    // TODO fork(?) services running as configured user
+    //      monitor for health, shutdown/restart(?) if any should die
+    // Iterating a synchronized map's views is only safe while holding the
+    // map's monitor; stop() takes the same lock before it walks and clears
+    // the map.
+    synchronized (serviceMap) {
+      for (Service service : serviceMap.values()) {
+        service.start();
+        // Register only after start() returns, so the listener is attached
+        // to a service that has already been started.
+        service.register(this);
+      }
+    }
+    super.start();
+  }
+
+  /**
+   * Stops every managed service that is currently STARTED (after removing
+   * this object as its listener), forgets all services, and then stops this
+   * service. This service is stopped even if stopping a managed service
+   * throws.
+   */
+  @Override
+  public void stop() {
+    try {
+      synchronized (serviceMap) {
+        for (Service service : serviceMap.values()) {
+          if (service.getServiceState() == Service.STATE.STARTED) {
+            // Unregister first so the stop below is not reported back to
+            // stateChanged().
+            service.unregister(this);
+            service.stop();
+          }
+        }
+        serviceMap.clear();
+      }
+    } finally {
+      super.stop();
+    }
+  }
+
+  /**
+   * Listener callback for the managed services: any reported state change is
+   * logged as fatal and causes all services to be stopped.
+   *
+   * @param service the service whose state changed
+   */
+  @Override
+  public void stateChanged(Service service) {
+    LOG.fatal("Service " + service.getName() + " changed state: " +
+        service.getServiceState());
+    stop();
+  }
+
+  /**
+   * Forwards an application event to the service named by the event's
+   * service id. Events naming an unknown service are ignored.
+   *
+   * @param event the event to deliver
+   * @throws RuntimeException if the event type is not one handled here
+   */
+  @Override
+  public void handle(AuxServicesEvent event) {
+    LOG.info("Got event " + event.getType() + " for service "
+        + event.getServiceID());
+    AuxiliaryService service = serviceMap.get(event.getServiceID());
+    if (null == service) {
+      // TODO kill all containers waiting on Application
+      // NOTE(review): events built without a service id (the two-argument
+      // AuxServicesEvent constructor passes null) also end up here and are
+      // dropped without reaching any service - confirm that is intended.
+      return;
+    }
+    switch (event.getType()) {
+    case APPLICATION_INIT:
+      service.initApp(event.getUser(), event.getApplicationID(),
+          event.getServiceData());
+      break;
+    case APPLICATION_STOP:
+      service.stopApp(event.getApplicationID());
+      break;
+    default:
+      throw new RuntimeException("Unknown type: " + event.getType());
+    }
+  }
+
+  /**
+   * Contract of a service managed by {@link AuxServices}: a regular
+   * {@link Service} that is additionally told when applications are
+   * initialized and stopped.
+   */
+  public interface AuxiliaryService extends Service {
+
+    /**
+     * Invoked for an APPLICATION_INIT event addressed to this service.
+     *
+     * @param user user carried by the event
+     * @param appId application carried by the event
+     * @param data service data carried by the event
+     */
+    void initApp(String user, ApplicationID appId, ByteBuffer data);
+
+    /**
+     * Invoked for an APPLICATION_STOP event addressed to this service.
+     *
+     * @param appId application carried by the event
+     */
+    void stopApp(ApplicationID appId);
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,64 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * Immutable event addressed to an auxiliary service. Besides its
+ * {@link AuxServicesEventType} it carries an application id and, optionally,
+ * the user, the target service id and an opaque service payload.
+ */
+public class AuxServicesEvent extends AbstractEvent<AuxServicesEventType> {
+
+  private final ApplicationID appId;
+  private final String serviceId;
+  private final String user;
+  private final ByteBuffer serviceData;
+
+  /**
+   * Creates a fully specified event.
+   *
+   * @param eventType type of the event
+   * @param user user associated with the event; may be null
+   * @param appId application the event refers to
+   * @param serviceId id of the service the event is meant for; may be null
+   * @param serviceData payload for the service; may be null
+   */
+  public AuxServicesEvent(AuxServicesEventType eventType, String user,
+      ApplicationID appId, String serviceId, ByteBuffer serviceData) {
+    super(eventType);
+    this.appId = appId;
+    this.serviceId = serviceId;
+    this.user = user;
+    this.serviceData = serviceData;
+  }
+
+  /**
+   * Creates an event that carries only a type and an application id; user,
+   * service id and service data are all null.
+   *
+   * @param eventType type of the event
+   * @param appId application the event refers to
+   */
+  public AuxServicesEvent(AuxServicesEventType eventType, ApplicationID appId) {
+    this(eventType, null, appId, null, null);
+  }
+
+  /** @return the application this event refers to */
+  public ApplicationID getApplicationID() {
+    return this.appId;
+  }
+
+  /** @return the id of the addressed service, or null if none was given */
+  public String getServiceID() {
+    return this.serviceId;
+  }
+
+  /** @return the user associated with the event, or null if none was given */
+  public String getUser() {
+    return this.user;
+  }
+
+  /** @return the service payload, or null if none was given */
+  public ByteBuffer getServiceData() {
+    return this.serviceData;
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/AuxServicesEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,24 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+/**
+ * Types of {@link AuxServicesEvent} dispatched to {@link AuxServices}.
+ */
+public enum AuxServicesEventType {
+ /** AuxServices reacts by calling initApp on the addressed service. */
+ APPLICATION_INIT,
+ /** AuxServices reacts by calling stopApp on the addressed service. */
+ APPLICATION_STOP
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,295 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.DEFAULT_NM_BIND_ADDRESS;
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.NM_BIND_ADDRESS;
+import static org.apache.hadoop.yarn.service.Service.STATE.STARTED;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+
+import org.apache.avro.ipc.Server;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityInfo;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerManagerSecurityInfo;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerManager;
+import org.apache.hadoop.yarn.ContainerStatus;
+import org.apache.hadoop.yarn.YarnRemoteException;
+
+/**
+ * NodeManager-side implementation of the {@link ContainerManager} protocol.
+ *
+ * <p>It is a {@link CompositeService} made of the resource localization
+ * service, the containers launcher, the auxiliary services, the containers
+ * monitor and an {@link AsyncDispatcher}. Requests received over RPC are
+ * turned into events on that dispatcher, which routes them by event type to
+ * the handlers registered in the constructor.
+ */
+public class ContainerManagerImpl extends CompositeService implements
+    ServiceStateChangeListener, ContainerManager, EventHandler<Event> {
+
+  private static final Log LOG = LogFactory.getLog(ContainerManagerImpl.class);
+
+  final Context context;
+  // RPC server for the ContainerManager protocol; created in start().
+  private Server server;
+  // Address the RPC server binds to; resolved in init().
+  private InetSocketAddress cmAddr;
+  private final ResourceLocalizationService rsrcLocalizationSrvc;
+  private final ContainersLauncher containersLauncher;
+  private final AuxServices auxiluaryServices;
+
+  private final NodeStatusUpdater nodeStatusUpdater;
+  // Only created when security is enabled; stays null otherwise.
+  private ContainerTokenSecretManager containerTokenSecretManager;
+
+  protected AsyncDispatcher dispatcher;
+
+  /**
+   * Builds the container manager and all of its child services, and
+   * registers the event handlers with the dispatcher.
+   *
+   * @param context shared NodeManager state (containers and applications)
+   * @param exec executor handed to the localization service and the launcher
+   * @param deletionContext deletion service handed to the localization
+   *        service
+   * @param nodeStatusUpdater source of the node name and shared secret, and
+   *        target of out-of-band heartbeats
+   */
+  public ContainerManagerImpl(Context context, ContainerExecutor exec,
+      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater) {
+    super(ContainerManagerImpl.class.getName());
+    this.context = context;
+    // Must be assigned before the create*() factories below, which use it.
+    dispatcher = new AsyncDispatcher();
+
+    rsrcLocalizationSrvc =
+        createResourceLocalizationService(exec, deletionContext);
+    addService(rsrcLocalizationSrvc);
+
+    containersLauncher = createContainersLauncher(context, exec);
+    addService(containersLauncher);
+
+    this.nodeStatusUpdater = nodeStatusUpdater;
+    // Create the secretManager if need be.
+    if (UserGroupInformation.isSecurityEnabled()) {
+      LOG.info("Security is enabled on NodeManager. "
+          + "Creating ContainerTokenSecretManager");
+      this.containerTokenSecretManager = new ContainerTokenSecretManager();
+    }
+
+    // Start configurable services
+    auxiluaryServices = new AuxServices();
+    auxiluaryServices.register(this);
+    addService(auxiluaryServices);
+
+    ContainersMonitor containersMonitor = new ContainersMonitor();
+    addService(containersMonitor);
+
+    dispatcher.register(ContainerEventType.class,
+        new ContainerEventDispatcher());
+    dispatcher.register(ApplicationEventType.class,
+        new ApplicationEventDispatcher());
+    dispatcher.register(LocalizerEventType.class, rsrcLocalizationSrvc);
+    dispatcher.register(AuxServicesEventType.class, auxiluaryServices);
+    dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
+    dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
+    addService(dispatcher);
+  }
+
+  /**
+   * Factory for the localization service. It is invoked from the
+   * constructor, so an override must not depend on state of the subclass;
+   * {@code this.dispatcher} is already assigned when it runs.
+   *
+   * @param exec container executor to hand to the service
+   * @param deletionContext deletion service to hand to the service
+   * @return the localization service to add to this composite
+   */
+  protected ResourceLocalizationService createResourceLocalizationService(
+      ContainerExecutor exec, DeletionService deletionContext) {
+    return new ResourceLocalizationService(this.dispatcher, exec,
+        deletionContext);
+  }
+
+  /**
+   * Factory for the containers launcher. It is invoked from the constructor,
+   * so an override must not depend on state of the subclass;
+   * {@code this.dispatcher} is already assigned when it runs.
+   *
+   * @param context shared NodeManager state
+   * @param exec container executor to hand to the launcher
+   * @return the launcher to add to this composite
+   */
+  protected ContainersLauncher createContainersLauncher(Context context,
+      ContainerExecutor exec) {
+    return new ContainersLauncher(context, this.dispatcher, exec);
+  }
+
+  /**
+   * Resolves the bind address and initializes the child services with a copy
+   * of the configuration in which the security-info class is set to
+   * {@link ContainerManagerSecurityInfo}.
+   *
+   * @param conf configuration to read the bind address from; it is copied,
+   *        not modified
+   */
+  @Override
+  public void init(Configuration conf) {
+    // TODO enqueue user dirs in deletion context
+    cmAddr = NetUtils.createSocketAddr(
+        conf.get(NM_BIND_ADDRESS, DEFAULT_NM_BIND_ADDRESS));
+    Configuration cmConf = new Configuration(conf);
+    cmConf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_INFO_CLASS_NAME,
+        ContainerManagerSecurityInfo.class, SecurityInfo.class);
+    super.init(cmConf);
+  }
+
+  /**
+   * Creates and starts the RPC server for the {@link ContainerManager}
+   * protocol, then starts the child services.
+   */
+  @Override
+  public void start() {
+    YarnRPC rpc = YarnRPC.create(getConfig());
+    if (UserGroupInformation.isSecurityEnabled()) {
+      // This is fine as status updater is started before ContainerManager and
+      // RM gives the shared secret in registration during
+      // NodeStatusUpdater#start() itself.
+      this.containerTokenSecretManager.setSecretKey(
+          this.nodeStatusUpdater.getNodeName(),
+          this.nodeStatusUpdater.getRMNMSharedSecret());
+    }
+    server =
+        rpc.getServer(ContainerManager.class, this, cmAddr, getConfig(),
+            this.containerTokenSecretManager);
+    LOG.info("ContainerManager started at " + cmAddr);
+    server.start();
+    super.start();
+  }
+
+  /**
+   * Detaches from the auxiliary services, closes the RPC server if one was
+   * created, and stops the child services. The child services are stopped
+   * even if detaching or closing the server throws.
+   */
+  @Override
+  public void stop() {
+    try {
+      if (auxiluaryServices.getServiceState() == STARTED) {
+        auxiluaryServices.unregister(this);
+      }
+      if (server != null) {
+        server.close();
+      }
+    } finally {
+      // Always give the child services a chance to shut down, mirroring
+      // AuxServices#stop().
+      super.stop();
+    }
+  }
+
+  /**
+   * Currently a no-op.
+   *
+   * @param containerID ignored
+   * @return always null
+   */
+  @Override
+  public Void cleanupContainer(ContainerID containerID)
+      throws YarnRemoteException {
+    // TODO Is this necessary?
+    return null;
+  }
+
+  /**
+   * Registers a new container (and, if not yet known, its application) in
+   * the shared context and dispatches an {@link ApplicationInitEvent} for
+   * the container.
+   *
+   * @param launchContext description of the container to start
+   * @return always null
+   * @throws YarnRemoteException if a container with the same id is already
+   *         registered on this node
+   */
+  @Override
+  public Void startContainer(ContainerLaunchContext launchContext)
+      throws YarnRemoteException {
+    Container container = new ContainerImpl(this.dispatcher, launchContext);
+    ContainerID containerID = launchContext.id;
+    ApplicationID applicationID = containerID.appID;
+    if (this.context.getContainers().putIfAbsent(containerID, container)
+        != null) {
+      throw RPCUtil.getRemoteException("Container " + containerID
+          + " already is running on this node!!");
+    }
+    // TODO log at debug level, guarded by LOG.isDebugEnabled()
+    LOG.info("CONTAINER: " + launchContext);
+
+    // Create the application; it is only kept if this is the first container
+    // seen for that application on this node.
+    Application application = new ApplicationImpl(this.dispatcher,
+        launchContext.user, applicationID,
+        launchContext.env, launchContext.resources,
+        launchContext.containerTokens);
+    if (this.context.getApplications().putIfAbsent(applicationID, application)
+        == null) {
+      LOG.info("Creating a new application reference for app "
+          + applicationID);
+    }
+
+    // TODO: Validate the request
+    dispatcher.getEventHandler().handle(
+        new ApplicationInitEvent(containerID));
+    return null;
+  }
+
+  /**
+   * Dispatches a KILL_CONTAINER event for a known container and triggers an
+   * out-of-band heartbeat. Stopping an unknown container only logs a
+   * warning.
+   *
+   * @param containerID container to stop
+   * @return always null
+   */
+  @Override
+  public Void stopContainer(ContainerID containerID)
+      throws YarnRemoteException {
+    Container container = this.context.getContainers().get(containerID);
+    if (container == null) {
+      // Deliberately lenient: an unknown container is not reported to the
+      // caller as a remote exception.
+      LOG.warn("Trying to stop unknown container " + containerID);
+      return null;
+    }
+    dispatcher.getEventHandler().handle(
+        new ContainerEvent(containerID, ContainerEventType.KILL_CONTAINER));
+
+    // TODO: Move this code to appropriate place once kill_container is
+    // implemented.
+    nodeStatusUpdater.sendOutofBandHeartBeat();
+
+    return null;
+  }
+
+  /**
+   * Looks up the status of a container registered on this node.
+   *
+   * @param containerID container to query
+   * @return the status reported by the container
+   * @throws YarnRemoteException if the container is not registered here
+   */
+  @Override
+  public ContainerStatus getContainerStatus(ContainerID containerID)
+      throws YarnRemoteException {
+    LOG.info("Getting container-status for " + containerID);
+    Container container = this.context.getContainers().get(containerID);
+    if (container == null) {
+      throw RPCUtil.getRemoteException("Container " + containerID
+          + " is not handled by this NodeManager");
+    }
+    ContainerStatus containerStatus = container.getContainerStatus();
+    LOG.info("Returning " + containerStatus);
+    return containerStatus;
+  }
+
+  /**
+   * Routes a container event to the container it names; events for
+   * containers that are not registered are logged and dropped.
+   */
+  class ContainerEventDispatcher implements EventHandler<ContainerEvent> {
+    @Override
+    public void handle(ContainerEvent event) {
+      Map<ContainerID,Container> containers =
+          ContainerManagerImpl.this.context.getContainers();
+      Container c = containers.get(event.getContainerID());
+      if (c != null) {
+        c.handle(event);
+      } else {
+        LOG.warn("Event " + event + " sent to absent container " +
+            event.getContainerID());
+      }
+    }
+  }
+
+  /**
+   * Routes an application event to the application it names; events for
+   * applications that are not registered are logged and dropped.
+   */
+  class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
+
+    @Override
+    public void handle(ApplicationEvent event) {
+      Application app =
+          ContainerManagerImpl.this.context.getApplications().get(
+              event.getApplicationID());
+      if (app != null) {
+        app.handle(event);
+      } else {
+        LOG.warn("Event " + event + " sent to absent application " +
+            event.getApplicationID());
+      }
+    }
+
+  }
+
+  /**
+   * Hands an event to the dispatcher's event handler.
+   *
+   * @param event event to enqueue
+   */
+  @Override
+  public void handle(Event event) {
+    dispatcher.getEventHandler().handle(event);
+  }
+
+  /**
+   * State-change callback for the services this object is registered with
+   * (the auxiliary services, see the constructor). Currently ignored.
+   *
+   * @param service the service whose state changed
+   */
+  @Override
+  public void stateChanged(Service service) {
+    // TODO react to state changes of the auxiliary services
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerMonitor.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+/**
+ * Empty interface: it declares no methods.
+ *
+ * NOTE(review): ContainerManagerImpl in this same change uses the
+ * ContainersMonitor class from the monitor package, and nothing visible here
+ * implements this interface - confirm whether it is still needed.
+ */
+public interface ContainerMonitor {
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.event.EventHandler;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+
+
+/**
+ * An application as tracked by the container manager. Being an
+ * {@link EventHandler}, an implementation also receives the
+ * {@link ApplicationEvent}s addressed to it.
+ */
+public interface Application extends EventHandler<ApplicationEvent> {
+
+ /**
+ * @return the application's environment as a map of CharSequence pairs;
+ * presumably variable name to value - confirm against the implementation
+ */
+ Map<CharSequence,CharSequence> getEnvironment();
+
+ /**
+ * @param vis visibility used to select resources
+ * @return resources keyed by a String; presumably only those with the given
+ * visibility, keyed by resource name - confirm against the implementation
+ */
+ Map<String,LocalResource> getResources(LocalResourceVisibility vis);
+
+ /**
+ * @return a map from Path to String; presumably the local path of each
+ * localized resource and the name it was requested under - confirm
+ */
+ Map<Path,String> getLocalizedResources();
+
+ /** @return the user associated with the application */
+ String getUser();
+
+ /** @return the id of the application */
+ ApplicationID getAppId();
+
+ /**
+ * @return the credentials of the application
+ * @throws IOException if the credentials cannot be produced; the exact
+ * cause depends on the implementation
+ */
+ Credentials getCredentials() throws IOException;
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ApplicationID;
+
+/**
+ * Event addressed to a single application: an {@link ApplicationEventType}
+ * together with the id of the application it concerns.
+ */
+public class ApplicationEvent extends AbstractEvent<ApplicationEventType> {
+
+  /** Id of the application this event is about; fixed at construction. */
+  private final ApplicationID applicationID;
+
+  /**
+   * @param appID id of the application the event is about
+   * @param appEventType type of the event
+   */
+  public ApplicationEvent(ApplicationID appID,
+      ApplicationEventType appEventType) {
+    super(appEventType);
+    applicationID = appID;
+  }
+
+  /** @return the id of the application the event is about */
+  public ApplicationID getApplicationID() {
+    return applicationID;
+  }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,27 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+/**
+ * Types of {@link ApplicationEvent}.
+ *
+ * NOTE(review): the two groups below look like requests (INIT_/FINISH_) and
+ * completion notifications (_INITED/_FINISHED) - confirm against the code
+ * that produces and consumes them.
+ */
+public enum ApplicationEventType {
+ INIT_APPLICATION,
+ FINISH_APPLICATION,
+
+ APPLICATION_INITED,
+ APPLICATION_FINISHED,
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,285 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ApplicationLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+
+/**
+ * State holder for one application, driven by {@link ApplicationEvent}s.
+ *
+ * <p>Stores the user, environment, declared resources and serialized
+ * container tokens passed to the constructor, and runs an
+ * {@link ApplicationState} state machine (NEW, INITING, INITED, FINISHING,
+ * FINISHED). Transitions emit container and localizer events through the
+ * supplied {@link Dispatcher}.
+ */
+public class ApplicationImpl implements Application {
+
+  final Dispatcher dispatcher;
+  final String user;
+  final ApplicationID appId;
+  final Map<CharSequence,CharSequence> env;
+  final Map<CharSequence,LocalResource> resources;
+  final ByteBuffer containerTokens;
+  // Set by AppInitedTransition; only handed out while the state is INITED.
+  Map<Path,String> localizedResources;
+
+  private static final Log LOG = LogFactory.getLog(Application.class);
+
+  // Containers that asked for app init while the app was NEW or INITING.
+  private final List<ContainerID> containersWaitingForAppInit =
+      new ArrayList<ContainerID>();
+
+  /**
+   * Selects the resources whose visibility equals {@code state}.
+   *
+   * @param resources resources keyed by name; keys are converted to String
+   * @param state visibility to keep
+   * @return a new mutable map holding only the matching entries
+   */
+  // TODO check for suitability of symlink name
+  static Map<String,LocalResource>
+      filterResources(Map<CharSequence,LocalResource> resources,
+      LocalResourceVisibility state) {
+    Map<String,LocalResource> ret =
+        new HashMap<String,LocalResource>();
+    for (Map.Entry<CharSequence,LocalResource> rsrc : resources.entrySet()) {
+      if (state.equals(rsrc.getValue().state)) {
+        ret.put(rsrc.getKey().toString(), rsrc.getValue());
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * Creates the application in state NEW.
+   *
+   * @param dispatcher used by the transitions to emit events
+   * @param user owning user; converted to a String
+   * @param appId id of the application
+   * @param env environment map, stored as given
+   * @param resources declared resources; {@code null} is replaced by an
+   *        empty map
+   * @param containerTokens serialized credentials, may be {@code null}
+   */
+  public ApplicationImpl(Dispatcher dispatcher,
+      CharSequence user,
+      ApplicationID appId,
+      Map<CharSequence,CharSequence> env,
+      Map<CharSequence,LocalResource> resources,
+      ByteBuffer containerTokens) {
+    this.dispatcher = dispatcher;
+    this.user = user.toString();
+    this.appId = appId;
+    this.env = env;
+    this.resources = null == resources
+        ? new HashMap<CharSequence,LocalResource>()
+        : resources;
+    this.containerTokens = containerTokens;
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  @Override
+  public ApplicationID getAppId() {
+    return appId;
+  }
+
+  @Override
+  public Map<CharSequence,CharSequence> getEnvironment() {
+    return env;
+  }
+
+  /**
+   * Returns an unmodifiable view of the resources for a visibility class.
+   * PUBLIC returns only PUBLIC resources; any other value returns the
+   * union of PRIVATE and APPLICATION resources.
+   */
+  @Override
+  public Map<String,LocalResource>
+      getResources(LocalResourceVisibility vis) {
+    final Map<String,LocalResource> ret;
+    if (LocalResourceVisibility.PUBLIC.equals(vis)) {
+      ret = filterResources(resources, LocalResourceVisibility.PUBLIC);
+    } else {
+      // TODO separate these
+      ret = filterResources(resources, LocalResourceVisibility.PRIVATE);
+      ret.putAll(filterResources(resources, LocalResourceVisibility.APPLICATION));
+    }
+    return Collections.unmodifiableMap(ret);
+  }
+
+  /**
+   * Returns the localized resources recorded when the app became INITED.
+   *
+   * @throws IllegalStateException if the current state is not INITED
+   */
+  @Override
+  public Map<Path,String> getLocalizedResources() {
+    if (ApplicationState.INITED.equals(stateMachine.getCurrentState())) {
+      return localizedResources;
+    }
+    throw new IllegalStateException(
+        "Invalid request for " + stateMachine.getCurrentState());
+  }
+
+  /**
+   * Deserializes the container tokens into a fresh {@link Credentials}.
+   *
+   * @return the parsed credentials, or empty credentials when no tokens
+   *         were supplied
+   * @throws IOException if the token storage stream cannot be read
+   */
+  @Override
+  public Credentials getCredentials() throws IOException {
+    Credentials ret = new Credentials();
+    if (containerTokens != null) {
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      // Parse from an independent view so that reading does not move the
+      // position of the shared buffer; otherwise a later call would start
+      // where this one stopped and fail to find the tokens.
+      buf.reset(containerTokens.duplicate());
+      ret.readTokenStorageStream(buf);
+      for (Token<? extends TokenIdentifier> tk : ret.getAllTokens()) {
+        LOG.info(" In Nodemanager , token " + tk);
+      }
+    }
+    return ret;
+  }
+
+  // Transition table shared by all instances; make(this) binds it to one app.
+  private static final StateMachineFactory
+      <ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent>
+      stateMachineFactory =
+      new StateMachineFactory
+      <ApplicationImpl, ApplicationState, ApplicationEventType, ApplicationEvent>
+      (ApplicationState.NEW)
+      .addTransition(ApplicationState.NEW, ApplicationState.INITING,
+          ApplicationEventType.INIT_APPLICATION, new AppInitTransition())
+      .addTransition(ApplicationState.INITING, ApplicationState.INITING,
+          ApplicationEventType.INIT_APPLICATION, new AppIsInitingTransition())
+      .addTransition(ApplicationState.INITING, ApplicationState.FINISHING,
+          ApplicationEventType.FINISH_APPLICATION, new AppFinishingTransition())
+      .addTransition(ApplicationState.INITING, ApplicationState.INITED,
+          ApplicationEventType.APPLICATION_INITED, new AppInitedTransition())
+      .addTransition(ApplicationState.INITED,
+          ApplicationState.INITED, ApplicationEventType.INIT_APPLICATION,
+          new AppHasInitedTransition())
+      .addTransition(ApplicationState.INITED,
+          ApplicationState.FINISHING, ApplicationEventType.FINISH_APPLICATION,
+          new AppFinishingTransition())
+      .addTransition(ApplicationState.FINISHING,
+          ApplicationState.FINISHED, ApplicationEventType.APPLICATION_FINISHED)
+      // TODO failure transitions are completely broken
+
+      // create the topology tables
+      .installTopology();
+
+  private final StateMachine<ApplicationState, ApplicationEventType, ApplicationEvent>
+      stateMachine;
+
+  /**
+   * NEW to INITING: remembers the requesting container, tells it to
+   * initialize, and asks the localizer to set up application resources.
+   */
+  static class AppInitTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID cId = initEvent.getContainerRequestingAppInit();
+      app.containersWaitingForAppInit.add(cId);
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+      app.dispatcher.getEventHandler().handle(
+          new ApplicationLocalizerEvent(
+              LocalizerEventType.INIT_APPLICATION_RESOURCES, app));
+    }
+  }
+
+  /**
+   * INITING to INITING: another container asks for init while the app is
+   * still initializing; it is queued and told to initialize.
+   */
+  static class AppIsInitingTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID cId = initEvent.getContainerRequestingAppInit();
+      app.containersWaitingForAppInit.add(cId);
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(cId, ContainerEventType.INIT_CONTAINER));
+    }
+  }
+
+  /**
+   * INITING to INITED: stores the localized resources and notifies every
+   * queued container that resources are localized.
+   */
+  static class AppInitedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      ApplicationInitedEvent initedEvent = (ApplicationInitedEvent) event;
+      app.localizedResources = initedEvent.getLocalizedResources();
+      // Start all the containers waiting for ApplicationInit
+      for (ContainerID containerID : app.containersWaitingForAppInit) {
+        app.dispatcher.getEventHandler().handle(
+            new ContainerEvent(containerID,
+                ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
+      }
+    }
+  }
+
+  /**
+   * INITED to INITED: a container asks for init after the app is ready;
+   * it gets INIT_CONTAINER followed by CONTAINER_RESOURCES_LOCALIZED.
+   * The container is not added to the waiting list.
+   */
+  static class AppHasInitedTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+      ApplicationInitEvent initEvent = (ApplicationInitEvent) event;
+      ContainerID containerID = initEvent.getContainerRequestingAppInit();
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(containerID, ContainerEventType.INIT_CONTAINER));
+      app.dispatcher.getEventHandler().handle(
+          new ContainerEvent(containerID,
+              ContainerEventType.CONTAINER_RESOURCES_LOCALIZED));
+    }
+  }
+
+  /**
+   * INITING or INITED to FINISHING: kills the recorded containers and asks
+   * the localizer to destroy application resources.
+   */
+  static class AppFinishingTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
+
+      // Send KILL_CONTAINER to every container recorded for this
+      // application.
+      // NOTE(review): only containers in containersWaitingForAppInit are
+      // covered; containers handled by AppHasInitedTransition are never
+      // added to that list. Confirm whether they need a kill event too.
+      for (ContainerID containerID : app.containersWaitingForAppInit) {
+        app.dispatcher.getEventHandler().handle(
+            new ContainerEvent(containerID,
+                ContainerEventType.KILL_CONTAINER));
+      }
+
+      // Delete Application level resources
+      app.dispatcher.getEventHandler().handle(
+          new ApplicationLocalizerEvent(
+              LocalizerEventType.DESTROY_APPLICATION_RESOURCES, app));
+
+      // TODO: Trigger the LogsManager
+    }
+  }
+
+  /**
+   * Feeds one event to the state machine. An event that is not valid in
+   * the current state is logged at WARN and otherwise ignored; a state
+   * change is logged at INFO.
+   */
+  @Override
+  public synchronized void handle(ApplicationEvent event) {
+
+    ApplicationID applicationID = event.getApplicationID();
+    LOG.info("Processing " + applicationID + " of type " + event.getType());
+
+    ApplicationState oldState = stateMachine.getCurrentState();
+    final ApplicationState newState;
+    try {
+      newState = stateMachine.doTransition(event.getType(), event);
+    } catch (InvalidStateTransitonException e) {
+      LOG.warn("Can't handle this event at current state", e);
+      // No transition happened, so do not report one
+      // (previously logged "transitioned from <state> to null").
+      return;
+    }
+    if (oldState != newState) {
+      LOG.info("Application " + applicationID + " transitioned from "
+          + oldState + " to " + newState);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return AvroUtil.toString(appId);
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * INIT_APPLICATION event raised on behalf of a single container. The
+ * application id is taken from the container id's {@code appID} field.
+ */
+public class ApplicationInitEvent extends ApplicationEvent {
+
+  /** Container whose start request triggered this event. */
+  private final ContainerID containerRequestingAppInit;
+
+  /**
+   * @param containerID container requesting application initialization
+   */
+  public ApplicationInitEvent(ContainerID containerID) {
+    super(containerID.appID, ApplicationEventType.INIT_APPLICATION);
+    containerRequestingAppInit = containerID;
+  }
+
+  /** @return the container passed to the constructor */
+  public ContainerID getContainerRequestingAppInit() {
+    return containerRequestingAppInit;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationInitedEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,45 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import java.util.Map;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.ApplicationID;
+
+/**
+ * APPLICATION_INITED event carrying the localized resources and the work
+ * directory supplied by the sender.
+ */
+public class ApplicationInitedEvent extends ApplicationEvent {
+
+  private final Map<Path,String> localizedResources;
+  private final Path workdir;
+
+  /**
+   * @param appID application the event is addressed to
+   * @param localizedResources map of localized resources, stored as given
+   * @param workdir work directory, stored as given
+   */
+  public ApplicationInitedEvent(ApplicationID appID,
+      Map<Path,String> localizedResources, Path workdir) {
+    super(appID, ApplicationEventType.APPLICATION_INITED);
+    this.localizedResources = localizedResources;
+    this.workdir = workdir;
+  }
+
+  /** @return the work directory passed to the constructor */
+  public Path getWorkDirectory() {
+    return this.workdir;
+  }
+
+  /** @return the resource map passed to the constructor (not copied) */
+  public Map<Path,String> getLocalizedResources() {
+    return this.localizedResources;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationState.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,23 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+/**
+ * States of the per-application state machine, in declaration order.
+ */
+public enum ApplicationState {
+  /** Initial state. */
+  NEW,
+  /** Initialization has been requested and is in progress. */
+  INITING,
+  /** Initialization has completed. */
+  INITED,
+  /** Finish has been requested. */
+  FINISHING,
+  /** Terminal state. */
+  FINISHED
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,35 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+/**
+ * A container as seen by the container manager; it also receives
+ * {@link ContainerEvent}s as an event handler.
+ */
+public interface Container extends EventHandler<ContainerEvent> {
+
+  /** @return a {@code org.apache.hadoop.yarn.Container} record for this container */
+  org.apache.hadoop.yarn.Container getContainer();
+
+  // TODO overly-general interface
+  /** @return the launch context of this container */
+  ContainerLaunchContext getLaunchContext();
+
+  /** @return a status record for this container */
+  ContainerStatus getContainerStatus();
+
+  /** @return a textual representation of this container */
+  String toString();
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.ContainerID;
+
+/**
+ * Event addressed to one container, identified by its {@link ContainerID}.
+ */
+public class ContainerEvent extends AbstractEvent<ContainerEventType> {
+
+  /** Id of the container this event is about. */
+  private final ContainerID containerID;
+
+  /**
+   * @param cID id of the target container
+   * @param eventType kind of event
+   */
+  public ContainerEvent(ContainerID cID, ContainerEventType eventType) {
+    super(eventType);
+    containerID = cID;
+  }
+
+  /** @return the container id passed to the constructor */
+  public ContainerID getContainerID() {
+    return this.containerID;
+  }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerEventType.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+public enum ContainerEventType {
+
+ // Producer: ContainerManager
+ INIT_CONTAINER,
+ KILL_CONTAINER,
+ CONTAINER_DONE,
+
+ // DownloadManager
+ CONTAINER_INITED,
+ CONTAINER_RESOURCES_LOCALIZED,
+ CONTAINER_RESOURCES_CLEANEDUP,
+
+ // Producer: ContainersLauncher
+ CONTAINER_LAUNCHED,
+ CONTAINER_EXITED_WITH_SUCCESS,
+ CONTAINER_EXITED_WITH_FAILURE,
+ CONTAINER_CLEANEDUP_AFTER_KILL,
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,315 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerStatus;
+
+public class ContainerImpl implements Container {
+
+ // Used by the transitions to emit events to other components.
+ private final Dispatcher dispatcher;
+ // Launch context supplied at construction; source of the id, resource,
+ // user and serviceData read elsewhere in this class.
+ private final ContainerLaunchContext launchContext;
+
+ // Note: the logger is registered under the Container interface, not ContainerImpl.
+ private static final Log LOG = LogFactory.getLog(Container.class);
+
+ /**
+  * Creates a container and binds a new state machine instance to it.
+  *
+  * @param dispatcher used by the transitions to emit events
+  * @param launchContext launch context describing the container
+  */
+ public ContainerImpl(Dispatcher dispatcher,
+     ContainerLaunchContext launchContext) {
+   this.launchContext = launchContext;
+   this.dispatcher = dispatcher;
+   stateMachine = stateMachineFactory.make(this);
+ }
+
+ // State Machine for each container.
+ // The factory holds the transition table shared by all containers; the
+ // constructor calls make(this) to obtain a per-container instance.
+ // Transitions registered without a transition object only change state.
+ private static StateMachineFactory
+ <ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>
+ stateMachineFactory =
+ new StateMachineFactory<ContainerImpl, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.NEW)
+ // From NEW State
+ .addTransition(ContainerState.NEW, ContainerState.LOCALIZING,
+ ContainerEventType.INIT_CONTAINER)
+ .addTransition(ContainerState.NEW, ContainerState.DONE,
+ ContainerEventType.KILL_CONTAINER)
+
+ // From LOCALIZING State
+ // NOTE(review): the kill arc below enters CONTAINER_RESOURCES_CLEANINGUP,
+ // but this table registers no transition out of that state.
+ .addTransition(ContainerState.LOCALIZING,
+ ContainerState.LOCALIZED,
+ ContainerEventType.CONTAINER_RESOURCES_LOCALIZED,
+ new LocalizedTransition())
+ .addTransition(ContainerState.LOCALIZING,
+ ContainerState.CONTAINER_RESOURCES_CLEANINGUP,
+ ContainerEventType.KILL_CONTAINER,
+ new KillDuringLocalizationTransition())
+
+ // From LOCALIZED State
+ // NOTE(review): no KILL_CONTAINER transition is registered for LOCALIZED.
+ .addTransition(ContainerState.LOCALIZED, ContainerState.RUNNING,
+ ContainerEventType.CONTAINER_LAUNCHED, new LaunchTransition())
+ .addTransition(ContainerState.LOCALIZED, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition())
+
+ // From RUNNING State
+ .addTransition(ContainerState.RUNNING,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition())
+ .addTransition(ContainerState.RUNNING,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition())
+ .addTransition(ContainerState.RUNNING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER, new KillTransition())
+
+ // From CONTAINER_EXITED_WITH_SUCCESS State
+ // A kill request after exit leaves the state unchanged.
+ .addTransition(ContainerState.EXITED_WITH_SUCCESS, ContainerState.DONE,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+ .addTransition(ContainerState.EXITED_WITH_SUCCESS,
+ ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.KILL_CONTAINER)
+
+ // From EXITED_WITH_FAILURE State
+ // A kill request after exit leaves the state unchanged.
+ .addTransition(ContainerState.EXITED_WITH_FAILURE, ContainerState.DONE,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+ .addTransition(ContainerState.EXITED_WITH_FAILURE,
+ ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.KILL_CONTAINER)
+
+ // From KILLING State.
+ // Exit events arriving while KILLING are still honoured and move the
+ // container to the matching EXITED_* state.
+ .addTransition(ContainerState.KILLING,
+ ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL,
+ new ContainerKilledTransition())
+ .addTransition(ContainerState.KILLING, ContainerState.KILLING,
+ ContainerEventType.KILL_CONTAINER)
+ .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_SUCCESS,
+ ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS,
+ new ExitedWithSuccessTransition())
+ .addTransition(ContainerState.KILLING, ContainerState.EXITED_WITH_FAILURE,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE,
+ new ExitedWithFailureTransition())
+
+ // From CONTAINER_CLEANEDUP_AFTER_KILL State.
+ .addTransition(ContainerState.CONTAINER_CLEANEDUP_AFTER_KILL,
+ ContainerState.DONE,
+ ContainerEventType.CONTAINER_RESOURCES_CLEANEDUP)
+
+ // From DONE
+ // Repeated kill requests in DONE are absorbed.
+ .addTransition(ContainerState.DONE, ContainerState.DONE,
+ ContainerEventType.KILL_CONTAINER)
+
+ // create the topology tables
+ .installTopology();
+
+ // Per-container state machine instance, created in the constructor.
+ private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
+ stateMachine;
+
+ private synchronized org.apache.hadoop.yarn.ContainerState getCurrentState() {
+ switch (stateMachine.getCurrentState()) {
+ case NEW:
+ case LOCALIZING:
+ case LOCALIZED:
+ return org.apache.hadoop.yarn.ContainerState.INTIALIZING;
+ case RUNNING:
+ case EXITED_WITH_SUCCESS:
+ case EXITED_WITH_FAILURE:
+ case KILLING:
+ case CONTAINER_CLEANEDUP_AFTER_KILL:
+ case CONTAINER_RESOURCES_CLEANINGUP:
+ return org.apache.hadoop.yarn.ContainerState.RUNNING;
+ case DONE:
+ default:
+ return org.apache.hadoop.yarn.ContainerState.COMPLETE;
+ }
+ }
+
+ /**
+  * Builds a new {@code org.apache.hadoop.yarn.Container} record from the
+  * launch context's id and resource plus the current externally visible
+  * state.
+  */
+ @Override
+ public synchronized org.apache.hadoop.yarn.Container getContainer() {
+   final org.apache.hadoop.yarn.Container snapshot =
+       new org.apache.hadoop.yarn.Container();
+   snapshot.state = getCurrentState();
+   snapshot.resource = launchContext.resource;
+   snapshot.id = launchContext.id;
+   return snapshot;
+ }
+
+ /** @return the launch context supplied to the constructor */
+ @Override
+ public ContainerLaunchContext getLaunchContext() {
+   return this.launchContext;
+ }
+
+ /**
+  * Builds a new status record holding the container id and the current
+  * externally visible state. The exit status is not filled in yet.
+  */
+ @Override
+ public ContainerStatus getContainerStatus() {
+   final ContainerStatus status = new ContainerStatus();
+   status.state = getCurrentState();
+   status.containerID = launchContext.id;
+   // TODO: Exit status.
+   return status;
+ }
+
+ /**
+  * Base class for container transitions. Its own transition does nothing;
+  * subclasses override it to emit events.
+  */
+ static class ContainerTransition implements
+     SingleArcTransition<ContainerImpl, ContainerEvent> {
+
+   /** No-op hook. */
+   @Override
+   public void transition(ContainerImpl container, ContainerEvent event) {
+     // Just drain the event and change the state.
+   }
+ }
+
+ /**
+  * Forwards each entry of the launch context's opaque service data to the
+  * auxiliary services as an APPLICATION_INIT event, then asks the containers
+  * launcher to launch the container.
+  */
+ static class LocalizedTransition extends ContainerTransition {
+   @Override
+   public void transition(ContainerImpl container, ContainerEvent event) {
+     // XXX This needs to be container-oriented
+     final ContainerLaunchContext launchCtx = container.getLaunchContext();
+     final Map<CharSequence,ByteBuffer> serviceData = launchCtx.serviceData;
+     if (serviceData != null) {
+       // TODO: Isn't this supposed to happen only once per Application?
+       for (Map.Entry<CharSequence,ByteBuffer> entry : serviceData.entrySet()) {
+         // One APPLICATION_INIT event per (service name, payload) pair.
+         final AuxServicesEvent appInit =
+             new AuxServicesEvent(AuxServicesEventType.APPLICATION_INIT,
+                 launchCtx.user.toString(), launchCtx.id.appID,
+                 entry.getKey().toString(), entry.getValue());
+         container.dispatcher.getEventHandler().handle(appInit);
+       }
+     }
+     // The launch request is sent whether or not service data was present.
+     final ContainersLauncherEvent launchRequest =
+         new ContainersLauncherEvent(container,
+             ContainersLauncherEventType.LAUNCH_CONTAINER);
+     container.dispatcher.getEventHandler().handle(launchRequest);
+   }
+ }
+
+ /**
+  * Tells the ContainersMonitor to start monitoring the container's resource
+  * usage by dispatching a START_MONITORING_CONTAINER event.
+  */
+ static class LaunchTransition extends ContainerTransition {
+   @Override
+   public void transition(ContainerImpl container, ContainerEvent event) {
+     final ContainersMonitorEvent startMonitoring =
+         new ContainersMonitorEvent(
+             ContainersMonitorEventType.START_MONITORING_CONTAINER);
+     container.dispatcher.getEventHandler().handle(startMonitoring);
+   }
+ }
+
+ /**
+  * Asks the localizer to release the container's resources by dispatching a
+  * CLEANUP_CONTAINER_RESOURCES event for it.
+  */
+ static class ExitedWithSuccessTransition extends ContainerTransition {
+   @Override
+   public void transition(ContainerImpl container, ContainerEvent event) {
+     // TODO: Add containerWorkDir to the deletion service.
+
+     // The localizer decrements reference counts and cleans up resources.
+     final ContainerLocalizerEvent releaseResources =
+         new ContainerLocalizerEvent(
+             LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container);
+     container.dispatcher.getEventHandler().handle(releaseResources);
+   }
+ }
+
+ /**
+  * Asks the localizer to release the container's resources by dispatching a
+  * CLEANUP_CONTAINER_RESOURCES event for it.
+  */
+ static class ExitedWithFailureTransition extends ContainerTransition {
+   @Override
+   public void transition(ContainerImpl container, ContainerEvent event) {
+     // TODO: Add containerWorkDir to the deletion service.
+     // TODO: Add containerOutputDir to the deletion service.
+
+     // The localizer decrements reference counts and cleans up resources.
+     final ContainerLocalizerEvent releaseResources =
+         new ContainerLocalizerEvent(
+             LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container);
+     container.dispatcher.getEventHandler().handle(releaseResources);
+   }
+ }
+
+ static class KillDuringLocalizationTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Inform the localizer to decrement reference counts and cleanup
+ // resources.
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizerEvent(
+ LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+
+ }
+ }
+
+ static class KillTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // Kill the process/process-grp
+ container.dispatcher.getEventHandler().handle(
+ new ContainersLauncherEvent(container,
+ ContainersLauncherEventType.CLEANUP_CONTAINER));
+ }
+ }
+
+ static class ContainerKilledTransition implements
+ SingleArcTransition<ContainerImpl, ContainerEvent> {
+ @Override
+ public void transition(ContainerImpl container, ContainerEvent event) {
+ // The process/process-grp is killed. Decrement reference counts and
+ // cleanup resources
+ container.dispatcher.getEventHandler().handle(
+ new ContainerLocalizerEvent(
+ LocalizerEventType.CLEANUP_CONTAINER_RESOURCES, container));
+ }
+ }
+
+ /**
+  * Feeds the event into this container's state machine and logs the
+  * resulting state change, if any.
+  *
+  * An event that the state machine rejects for the current state is logged
+  * as a warning and otherwise ignored; no state change is reported for it.
+  *
+  * @param event the container event to process
+  */
+ @Override
+ public synchronized void handle(ContainerEvent event) {
+
+   ContainerID containerID = event.getContainerID();
+   LOG.info("Processing " + containerID + " of type " + event.getType());
+
+   ContainerState oldState = stateMachine.getCurrentState();
+   ContainerState newState;
+   try {
+     newState =
+         stateMachine.doTransition(event.getType(), event);
+   } catch (InvalidStateTransitonException e) {
+     LOG.warn("Can't handle this event at current state", e);
+     // No new state was produced; returning here avoids logging a bogus
+     // "transitioned from <oldState> to null" message below.
+     return;
+   }
+   if (oldState != newState) {
+     LOG.info("Container " + containerID + " transitioned from " + oldState
+         + " to " + newState);
+   }
+ }
+
+ /**
+  * @return the container id from the launch context, rendered by
+  *         AvroUtil.toString
+  */
+ @Override
+ public String toString() {
+   return AvroUtil.toString(this.launchContext.id);
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerState.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,25 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;
+
+/**
+ * Internal states of the NodeManager's per-container state machine, listed
+ * one per line in their original declaration order.
+ */
+public enum ContainerState {
+  NEW,
+  LOCALIZING,
+  LOCALIZED,
+  RUNNING,
+  EXITED_WITH_SUCCESS,
+  EXITED_WITH_FAILURE,
+  KILLING,
+  CONTAINER_CLEANEDUP_AFTER_KILL,
+  CONTAINER_RESOURCES_CLEANINGUP,
+  DONE
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainerLaunch.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,136 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+
+import java.io.DataOutputStream;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+
+public class ContainerLaunch implements Callable<Integer> {
+
+ private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
+
+ public static final String CONTAINER_SCRIPT = "task.sh";
+
+ private final Dispatcher dispatcher;
+ private final ContainerExecutor exec;
+ private final Application app;
+ private final Container container;
+ private final Path sysDir;
+ private final List<Path> appDirs;
+
+ public ContainerLaunch(Dispatcher dispatcher, ContainerExecutor exec,
+ Application app, Container container, Path sysDir, List<Path> appDirs) {
+ this.app = app;
+ this.exec = exec;
+ this.sysDir = sysDir;
+ this.appDirs = appDirs;
+ this.container = container;
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public Integer call() {
+ final ContainerLaunchContext launchContext = container.getLaunchContext();
+ final Map<Path,String> localizedResources = app.getLocalizedResources();
+ final String user = launchContext.user.toString();
+ final Map<CharSequence,CharSequence> env = launchContext.env;
+ final List<CharSequence> command = launchContext.command;
+ int ret = -1;
+ try {
+ FileContext lfs = FileContext.getLocalFSFileContext();
+ Path launchSysDir = new Path(sysDir, container.toString());
+ lfs.mkdir(launchSysDir, null, false);
+ Path launchPath = new Path(launchSysDir, CONTAINER_SCRIPT);
+ Path tokensPath =
+ new Path(launchSysDir, ApplicationLocalizer.APPTOKEN_FILE);
+ DataOutputStream launchOut = null;
+ DataOutputStream tokensOut = null;
+
+ try {
+ launchOut = lfs.create(launchPath, EnumSet.of(CREATE, OVERWRITE));
+ ApplicationLocalizer.writeLaunchEnv(launchOut, env, localizedResources,
+ command, appDirs);
+
+ tokensOut = lfs.create(tokensPath, EnumSet.of(CREATE, OVERWRITE));
+ Credentials creds = new Credentials();
+ if (container.getLaunchContext().containerTokens != null) {
+ // TODO: Is the conditional the correct way of checking?
+ DataInputByteBuffer buf = new DataInputByteBuffer();
+ container.getLaunchContext().containerTokens.rewind();
+ buf.reset(container.getLaunchContext().containerTokens);
+ creds.readTokenStorageStream(buf);
+ for (Token<? extends TokenIdentifier> tk : creds.getAllTokens()) {
+ LOG.debug(tk.getService() + " = " + tk.toString());
+ }
+ }
+ creds.writeTokenStorageToStream(tokensOut);
+ } finally {
+ IOUtils.cleanup(LOG, launchOut, tokensOut);
+ if (launchOut != null) {
+ launchOut.close();
+ }
+ }
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ container.getLaunchContext().id,
+ ContainerEventType.CONTAINER_LAUNCHED));
+ ret =
+ exec.launchContainer(container, launchSysDir, user, app.toString(),
+ appDirs, null, null);
+ if (ret != 0) {
+ throw new ExitCodeException(ret, "Container failed");
+ }
+ } catch (Throwable e) {
+ LOG.warn("Failed to launch container", e);
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ launchContext.id,
+ ContainerEventType.CONTAINER_EXITED_WITH_FAILURE));
+ return ret;
+ }
+ LOG.info("Container " + container + " succeeded " + launchContext.id);
+ dispatcher.getEventHandler().handle(new ContainerEvent(
+ launchContext.id, ContainerEventType.CONTAINER_EXITED_WITH_SUCCESS));
+ return 0;
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ApplicationLocalizer;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+
+import static org.apache.hadoop.yarn.server.nodemanager.NMConfig.*;
+
+public class ContainersLauncher extends AbstractService
+ implements EventHandler<ContainersLauncherEvent> {
+
+ private final Context context;
+ private final ContainerExecutor exec;
+ private final Dispatcher dispatcher;
+ private final ExecutorService containerLauncher =
+ Executors.newCachedThreadPool();
+ private List<Path> logDirs;
+ private List<Path> localDirs;
+ private List<Path> sysDirs;
+ private final Map<ContainerID,Future<Integer>> running =
+ Collections.synchronizedMap(new HashMap<ContainerID,Future<Integer>>());
+
+ public ContainersLauncher(Context context, Dispatcher dispatcher,
+ ContainerExecutor exec) {
+ super("containers-launcher");
+ this.exec = exec;
+ this.context = context;
+ this.dispatcher = dispatcher;
+ }
+
+ @Override
+ public void init(Configuration conf) {
+ // TODO factor this out of Localizer
+ try {
+ FileContext lfs = FileContext.getLocalFSFileContext(conf);
+ String[] sLocalDirs = conf.getStrings(NM_LOCAL_DIR, DEFAULT_NM_LOCAL_DIR);
+
+ localDirs = new ArrayList<Path>(sLocalDirs.length);
+ logDirs = new ArrayList<Path>(sLocalDirs.length);
+ sysDirs = new ArrayList<Path>(sLocalDirs.length);
+ for (String sLocaldir : sLocalDirs) {
+ Path localdir = new Path(sLocaldir);
+ localDirs.add(localdir);
+ // $local/nmPrivate
+ Path sysdir = new Path(localdir, ResourceLocalizationService.NM_PRIVATE_DIR);
+ sysDirs.add(sysdir);
+ }
+ String[] sLogdirs = conf.getStrings(NM_LOG_DIR, DEFAULT_NM_LOG_DIR);
+ for (String sLogdir : sLogdirs) {
+ Path logdir = new Path(sLogdir);
+ logDirs.add(logdir);
+ }
+ } catch (IOException e) {
+ throw new YarnException("Failed to start ContainersLauncher", e);
+ }
+ localDirs = Collections.unmodifiableList(localDirs);
+ logDirs = Collections.unmodifiableList(logDirs);
+ sysDirs = Collections.unmodifiableList(sysDirs);
+ super.init(conf);
+ }
+
+ @Override
+ public void stop() {
+ containerLauncher.shutdownNow();
+ super.stop();
+ }
+
+ @Override
+ public void handle(ContainersLauncherEvent event) {
+ // TODO: ContainersLauncher launches containers one by one!!
+ Container container = event.getContainer();
+ ContainerID containerId = container.getLaunchContext().id;
+ switch (event.getType()) {
+ case LAUNCH_CONTAINER:
+ Application app =
+ context.getApplications().get(containerId.appID);
+ List<Path> appDirs = new ArrayList<Path>(localDirs.size());
+ for (Path p : localDirs) {
+ Path usersdir = new Path(p, ApplicationLocalizer.USERCACHE);
+ Path userdir = new Path(usersdir,
+ container.getLaunchContext().user.toString());
+ Path appsdir = new Path(userdir, ApplicationLocalizer.APPCACHE);
+ appDirs.add(new Path(appsdir, app.toString()));
+ }
+ Path appSysDir = new Path(sysDirs.get(0), app.toString());
+ // TODO set in Application
+ //Path appLogDir = new Path(logDirs.get(0), app.toString());
+ ContainerLaunch launch =
+ new ContainerLaunch(dispatcher, exec, app,
+ event.getContainer(), appSysDir, appDirs);
+ running.put(containerId, containerLauncher.submit(launch));
+ break;
+ case CLEANUP_CONTAINER:
+ Future<Integer> rContainer = running.remove(containerId);
+ if (rContainer != null) {
+ // TODO needs to kill the container
+ rContainer.cancel(false);
+ }
+ dispatcher.getEventHandler().handle(
+ new ContainerEvent(containerId,
+ ContainerEventType.CONTAINER_CLEANEDUP_AFTER_KILL));
+ break;
+ }
+ }
+
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncherEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,40 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+/**
+ * Event addressed to the containers launcher; pairs a
+ * ContainersLauncherEventType with the Container it concerns.
+ */
+public class ContainersLauncherEvent
+    extends AbstractEvent<ContainersLauncherEventType> {
+
+  /** Container this event refers to; set once at construction. */
+  private final Container container;
+
+  /**
+   * @param container container the event refers to
+   * @param eventType type of the event, passed to the AbstractEvent
+   *                  constructor
+   */
+  public ContainersLauncherEvent(Container container,
+      ContainersLauncherEventType eventType) {
+    super(eventType);
+    this.container = container;
+  }
+
+  /** @return the container supplied at construction time */
+  public Container getContainer() {
+    return this.container;
+  }
+
+}
|