Author: acmurthy
Date: Tue Apr 19 21:00:46 2011
New Revision: 1095212
URL: http://svn.apache.org/viewvc?rev=1095212&view=rev
Log:
Added functionality to stop/start queues.
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Tue Apr 19 21:00:46 2011
@@ -4,6 +4,8 @@ Trunk (unreleased changes)
MAPREDUCE-279
+ Added functionality to stop/start queues. (acmurthy)
+
Added functionality to refresh queues at runtime via the 'bin/yarn
rmadmin' command. (acmurthy)
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueInfo.java
Tue Apr 19 21:00:46 2011
@@ -20,4 +20,7 @@ public interface QueueInfo {
List<Application> getApplications();
void setApplications(List<Application> applications);
+
+ QueueState getQueueState();
+ void setQueueState(QueueState queueState);
}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java?rev=1095212&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/QueueState.java
Tue Apr 19 21:00:46 2011
@@ -0,0 +1,9 @@
+package org.apache.hadoop.yarn.api.records;
+
+/**
+ * State of a Queue
+ */
+public enum QueueState {
+ STOPPED,
+ RUNNING
+}
\ No newline at end of file
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/QueueInfoPBImpl.java
Tue Apr 19 21:00:46 2011
@@ -7,9 +7,13 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.Application;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProto;
import org.apache.hadoop.yarn.proto.YarnProtos.QueueInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
+import org.apache.hadoop.yarn.util.ProtoUtils;
public class QueueInfoPBImpl extends ProtoBase<QueueInfoProto> implements
QueueInfo {
@@ -67,6 +71,15 @@ public class QueueInfoPBImpl extends Pro
}
@Override
+ public QueueState getQueueState() {
+ QueueInfoProtoOrBuilder p = viaProto ? proto : builder;
+ if (!p.hasState()) {
+ return null;
+ }
+ return convertFromProtoFormat(p.getState());
+ }
+
+ @Override
public void setApplications(List<Application> applications) {
if (applications == null) {
builder.clearApplications();
@@ -110,6 +123,16 @@ public class QueueInfoPBImpl extends Pro
}
@Override
+ public void setQueueState(QueueState queueState) {
+ maybeInitBuilder();
+ if (queueState == null) {
+ builder.clearState();
+ return;
+ }
+ builder.setState(convertToProtoFormat(queueState));
+ }
+
+ @Override
public QueueInfoProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
@@ -252,4 +275,12 @@ public class QueueInfoPBImpl extends Pro
return ((QueueInfoPBImpl)q).getProto();
}
+ private QueueState convertFromProtoFormat(QueueStateProto q) {
+ return ProtoUtils.convertFromProtoFormat(q);
+ }
+
+ private QueueStateProto convertToProtoFormat(QueueState queueState) {
+ return ProtoUtils.convertToProtoFormat(queueState);
+ }
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/java/org/apache/hadoop/yarn/util/ProtoUtils.java
Tue Apr 19 21:00:46 2011
@@ -6,11 +6,13 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.YarnContainerTags;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceTypeProto;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceVisibilityProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.QueueStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnContainerTagsProto;
import com.google.protobuf.ByteString;
@@ -89,4 +91,15 @@ public class ProtoUtils {
return bs;
}
+ /*
+ * QueueState
+ */
+ private static String QUEUE_STATE_PREFIX = "Q_";
+ public static QueueStateProto convertToProtoFormat(QueueState e) {
+ return QueueStateProto.valueOf(QUEUE_STATE_PREFIX + e.name());
+ }
+ public static QueueState convertFromProtoFormat(QueueStateProto e) {
+ return QueueState.valueOf(e.name().replace(QUEUE_STATE_PREFIX, ""));
+ }
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/src/main/proto/yarn_protos.proto Tue Apr 19 21:00:46 2011
@@ -162,13 +162,19 @@ message YarnClusterMetricsProto {
optional int32 num_node_managers = 1;
}
+enum QueueStateProto {
+ Q_STOPPED = 1;
+ Q_RUNNING = 2;
+}
+
message QueueInfoProto {
optional string queueName = 1;
optional float capacity = 2;
optional float maximumCapacity = 3;
optional float currentCapacity = 4;
- repeated QueueInfoProto childQueues = 5;
- repeated ApplicationProto applications = 6;
+ optional QueueStateProto state = 5;
+ repeated QueueInfoProto childQueues = 6;
+ repeated ApplicationProto applications = 7;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
Tue Apr 19 21:00:46 2011
@@ -23,6 +23,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
public class CapacitySchedulerConfiguration extends Configuration {
@@ -56,7 +57,10 @@ public class CapacitySchedulerConfigurat
@Private
public static final String USER_LIMIT_FACTOR = "user-limit-factor";
-
+
+ @Private
+ public static final String STATE = "state";
+
private static final int MINIMUM_MEMORY = 1024;
@Private
@@ -135,6 +139,11 @@ public class CapacitySchedulerConfigurat
setFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR, userLimitFactor);
}
+ public QueueState getState(String queue) {
+ String state = get(getQueuePrefix(queue) + STATE);
+ return (state != null) ? QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
+ }
+
public void setCapacity(String queue, int capacity) {
setInt(getQueuePrefix(queue) + CAPACITY, capacity);
LOG.info("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
@@ -160,4 +169,5 @@ public class CapacitySchedulerConfigurat
return org.apache.hadoop.yarn.server.resourcemanager.resource.Resource.
createResource(minimumMemory);
}
+
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
Tue Apr 19 21:00:46 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -85,6 +86,8 @@ public class LeafQueue implements Queue
private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application>
applicationInfos;
+ private QueueState state;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -123,10 +126,13 @@ public class LeafQueue implements Queue
new HashMap<ApplicationId,
org.apache.hadoop.yarn.api.records.Application>();
+ QueueState state = cs.getConfiguration().getState(getQueuePath());
+
setupQueueConfigs(capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity,
userLimit, userLimitFactor,
- maxApplications, maxApplicationsPerUser);
+ maxApplications, maxApplicationsPerUser,
+ state);
LOG.info("DEBUG --- LeafQueue:" +
" name=" + queueName +
@@ -139,7 +145,8 @@ public class LeafQueue implements Queue
float capacity, float absoluteCapacity,
float maxCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
- int maxApplications, int maxApplicationsPerUser)
+ int maxApplications, int maxApplicationsPerUser,
+ QueueState state)
{
this.capacity = capacity;
this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
@@ -152,9 +159,12 @@ public class LeafQueue implements Queue
this.maxApplications = maxApplications;
this.maxApplicationsPerUser = maxApplicationsPerUser;
+
+ this.state = state;
this.queueInfo.setCapacity(capacity);
this.queueInfo.setMaximumCapacity(maximumCapacity);
+ this.queueInfo.setQueueState(state);
LOG.info(queueName +
", capacity=" + capacity +
@@ -163,7 +173,8 @@ public class LeafQueue implements Queue
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", userLimit=" + userLimit + ", userLimitFactor=" + userLimitFactor +
", maxApplications=" + maxApplications +
- ", maxApplicationsPerUser=" + maxApplicationsPerUser);
+ ", maxApplicationsPerUser=" + maxApplicationsPerUser +
+ ", state=" + state);
}
@@ -244,6 +255,11 @@ public class LeafQueue implements Queue
}
@Override
+ public QueueState getState() {
+ return state;
+ }
+
+ @Override
public synchronized QueueInfo getQueueInfo(boolean includeApplications,
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
@@ -289,7 +305,8 @@ public class LeafQueue implements Queue
setupQueueConfigs(leafQueue.capacity, leafQueue.absoluteCapacity,
leafQueue.maximumCapacity, leafQueue.absoluteMaxCapacity,
leafQueue.userLimit, leafQueue.userLimitFactor,
- leafQueue.maxApplications, leafQueue.maxApplicationsPerUser);
+ leafQueue.maxApplications, leafQueue.maxApplicationsPerUser,
+ leafQueue.state);
update(clusterResource);
}
@@ -299,8 +316,16 @@ public class LeafQueue implements Queue
String queue, Priority priority)
throws AccessControlException {
// Careful! Locking order is important!
+ User user = null;
+
synchronized (this) {
+ if (state != QueueState.RUNNING) {
+ throw new AccessControlException("Queue " + getQueuePath() +
+ " is STOPPED. Cannot accept submission of application: " +
+ application.getApplicationId());
+ }
+
// Check submission limits for queues
if (getNumApplications() >= maxApplications) {
throw new AccessControlException("Queue " + getQueuePath() +
@@ -310,7 +335,7 @@ public class LeafQueue implements Queue
}
// Check submission limits for the user on this queue
- User user = getUser(userName);
+ user = getUser(userName);
if (user.getApplications() >= maxApplicationsPerUser) {
throw new AccessControlException("Queue " + getQueuePath() +
" already has " + user.getApplications() +
@@ -318,51 +343,67 @@ public class LeafQueue implements Queue
" cannot accept submission of application: " +
application.getApplicationId());
}
-
- // Accept
- user.submitApplication();
- applications.add(application);
- applicationInfos.put(application.getApplicationId(),
- application.getApplicationInfo());
-
- LOG.info("Application submission -" +
- " appId: " + application.getApplicationId() +
- " user: " + user + "," + " leaf-queue: " + getQueueName() +
- " #user-applications: " + user.getApplications() +
- " #queue-applications: " + getNumApplications());
+
+ // Add the application to our data-structures
+ addApplication(application, user);
}
// Inform the parent queue
- parent.submitApplication(application, userName, queue, priority);
+ try {
+ parent.submitApplication(application, userName, queue, priority);
+ } catch (AccessControlException ace) {
+ LOG.info("Failed to submit application to parent-queue: " +
+ parent.getQueuePath(), ace);
+ removeApplication(application, user);
+ throw ace;
+ }
}
+ private synchronized void addApplication(Application application, User user) {
+ // Accept
+ user.submitApplication();
+ applications.add(application);
+ applicationInfos.put(application.getApplicationId(),
+ application.getApplicationInfo());
+
+ LOG.info("Application added -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + user + "," + " leaf-queue: " + getQueueName() +
+ " #user-applications: " + user.getApplications() +
+ " #queue-applications: " + getNumApplications());
+
+ }
+
@Override
public void finishApplication(Application application, String queue)
throws AccessControlException {
// Careful! Locking order is important!
synchronized (this) {
- applications.remove(application);
-
- User user = getUser(application.getUser());
- user.finishApplication();
- if (user.getApplications() == 0) {
- users.remove(application.getUser());
- }
-
- applicationInfos.remove(application.getApplicationId());
-
- LOG.info("Application completion -" +
- " appId: " + application.getApplicationId() +
- " user: " + application.getUser() +
- " queue: " + getQueueName() +
- " #user-applications: " + user.getApplications() +
- " #queue-applications: " + getNumApplications());
+ removeApplication(application, getUser(application.getUser()));
}
// Inform the parent queue
parent.finishApplication(application, queue);
}
+ public synchronized void removeApplication(Application application, User user) {
+ applications.remove(application);
+
+ user.finishApplication();
+ if (user.getApplications() == 0) {
+ users.remove(application.getUser());
+ }
+
+ applicationInfos.remove(application.getApplicationId());
+
+ LOG.info("Application removed -" +
+ " appId: " + application.getApplicationId() +
+ " user: " + application.getUser() +
+ " queue: " + getQueueName() +
+ " #user-applications: " + user.getApplications() +
+ " #queue-applications: " + getNumApplications());
+ }
+
@Override
public synchronized Resource
assignContainers(Resource clusterResource, NodeManager node) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
Tue Apr 19 21:00:46 2011
@@ -38,12 +38,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
@Private
@Evolving
@@ -75,6 +77,8 @@ public class ParentQueue implements Queu
private volatile int numApplications;
private volatile int numContainers;
+ private QueueState state;
+
private QueueInfo queueInfo;
private Map<ApplicationId, org.apache.hadoop.yarn.api.records.Application>
applicationInfos;
@@ -103,12 +107,14 @@ public class ParentQueue implements Queu
(maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
Float.MAX_VALUE : (parentAbsoluteCapacity * maximumCapacity) / 100;
+ QueueState state = cs.getConfiguration().getState(getQueuePath());
+
this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
setupQueueConfigs(capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity);
+ maximumCapacity, absoluteMaxCapacity, state);
this.queueComparator = comparator;
this.childQueues = new TreeSet<Queue>(comparator);
@@ -125,21 +131,26 @@ public class ParentQueue implements Queu
private synchronized void setupQueueConfigs(
float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity
+ float maximumCapacity, float absoluteMaxCapacity,
+ QueueState state
) {
this.capacity = capacity;
this.absoluteCapacity = absoluteCapacity;
this.maximumCapacity = maximumCapacity;
this.absoluteMaxCapacity = absoluteMaxCapacity;
+ this.state = state;
+
this.queueInfo.setCapacity(capacity);
this.queueInfo.setMaximumCapacity(maximumCapacity);
+ this.queueInfo.setQueueState(state);
LOG.info(queueName +
", capacity=" + capacity +
", asboluteCapacity=" + absoluteCapacity +
", maxCapacity=" + maximumCapacity +
- ", asboluteMaxCapacity=" + absoluteMaxCapacity);
+ ", asboluteMaxCapacity=" + absoluteMaxCapacity +
+ ", state=" + state);
}
private static float PRECISION = 0.005f; // 0.05% precision
@@ -234,6 +245,11 @@ public class ParentQueue implements Queu
}
@Override
+ public QueueState getState() {
+ return state;
+ }
+
+ @Override
public synchronized QueueInfo getQueueInfo(boolean includeApplications,
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
@@ -303,7 +319,8 @@ public class ParentQueue implements Queu
// Set new configs
setupQueueConfigs(parentQueue.capacity, parentQueue.absoluteCapacity,
- parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity);
+ parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
+ parentQueue.state);
// Update
update(clusterResource);
@@ -321,53 +338,84 @@ public class ParentQueue implements Queu
public void submitApplication(Application application, String user,
String queue, Priority priority)
throws AccessControlException {
- // Sanity check
- if (queue.equals(queueName)) {
- throw new AccessControlException("Cannot submit application " +
- "to non-leaf queue: " + queueName);
+
+ synchronized (this) {
+ // Sanity check
+ if (queue.equals(queueName)) {
+ throw new AccessControlException("Cannot submit application " +
+ "to non-leaf queue: " + queueName);
+ }
+
+ if (state != QueueState.RUNNING) {
+ throw new AccessControlException("Queue " + getQueuePath() +
+ " is STOPPED. Cannot accept submission of application: " +
+ application.getApplicationId());
+ }
+
+ addApplication(application, user);
}
+ // Inform the parent queue
+ if (parent != null) {
+ try {
+ parent.submitApplication(application, user, queue, priority);
+ } catch (AccessControlException ace) {
+ LOG.info("Failed to submit application to parent-queue: " +
+ parent.getQueuePath(), ace);
+ removeApplication(application, user);
+ throw ace;
+ }
+ }
+ }
+
+ private synchronized void addApplication(Application application,
+ String user) {
+
++numApplications;
-
+
applicationInfos.put(application.getApplicationId(),
application.getApplicationInfo());
- LOG.info("Application submission -" +
- " appId: " + application.getApplicationId() +
+ LOG.info("Application added -" +
+ " appId: " + application.getApplicationId() +
" user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
+ }
+
+ @Override
+ public void finishApplication(Application application, String queue)
+ throws AccessControlException {
+
+ synchronized (this) {
+ // Sanity check
+ if (queue.equals(queueName)) {
+ throw new AccessControlException("Cannot finish application " +
+ "from non-leaf queue: " + queueName);
+ }
+ removeApplication(application, application.getUser());
+ }
+
// Inform the parent queue
if (parent != null) {
- parent.submitApplication(application, user, queue, priority);
+ parent.finishApplication(application, queue);
}
}
- @Override
- public void finishApplication(Application application, String queue)
- throws AccessControlException {
- // Sanity check
- if (queue.equals(queueName)) {
- throw new AccessControlException("Cannot finish application " +
- "from non-leaf queue: " + queueName);
- }
+ public synchronized void removeApplication(Application application,
+ String user) {
--numApplications;
applicationInfos.remove(application.getApplicationId());
- LOG.info("Application completion -" +
+ LOG.info("Application removed -" +
" appId: " + application.getApplicationId() +
- " user: " + application.getUser() +
+ " user: " + user +
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
-
- // Inform the parent queue
- if (parent != null) {
- parent.finishApplication(application, queue);
- }
}
-
+
synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java?rev=1095212&r1=1095211&r2=1095212&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
(original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/Queue.java
Tue Apr 19 21:00:46 2011
@@ -26,6 +26,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManager;
@@ -109,6 +110,12 @@ extends org.apache.hadoop.yarn.server.re
public float getUtilization();
/**
+ * Get the current run-state of the queue
+ * @return current run-state
+ */
+ public QueueState getState();
+
+ /**
* Get child queues
* @return child queues
*/
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml?rev=1095212&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
(added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/resources/capacity-scheduler.xml
Tue Apr 19 21:00:46 2011
@@ -0,0 +1,38 @@
+<configuration>
+
+ <property>
+ <name>yarn.capacity-scheduler.maximum-applications</name>
+ <value>10000</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.queues</name>
+ <value>default</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.capacity</name>
+ <value>100</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.default.capacity</name>
+ <value>100</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.default.user-limit-factor</name>
+ <value>1</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.default.maximum-capacity</name>
+ <value>-1</value>
+ </property>
+
+ <property>
+ <name>yarn.capacity-scheduler.root.default.state</name>
+ <value>RUNNING</value>
+ </property>
+
+</configuration>