Author: vinodkv
Date: Wed Aug 3 11:48:31 2011
New Revision: 1153447
URL: http://svn.apache.org/viewvc?rev=1153447&view=rev
Log:
Completing the scheduler-dispatch cycle.
Modified:
hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Modified: hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java?rev=1153447&r1=1153446&r2=1153447&view=diff
==============================================================================
--- hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
(original)
+++ hadoop/common/branches/MR-279/mapreduce/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
Wed Aug 3 11:48:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -32,7 +34,6 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
@@ -43,8 +44,8 @@ import org.apache.hadoop.yarn.security.c
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -53,17 +54,16 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.webapp.WebApp;
@@ -97,6 +97,7 @@ public class ResourceManager extends Com
private ContainerAllocationExpirer containerAllocationExpirer;
protected NMLivelinessMonitor nmLivelinessMonitor;
protected NodesListManager nodesListManager;
+ private SchedulerEventDispatcher schedulerDispatcher;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private WebApp webApp;
@@ -136,7 +137,10 @@ public class ResourceManager extends Com
this.conf = new YarnConfiguration(conf);
// Initialize the scheduler
this.scheduler = createScheduler();
- this.rmDispatcher.register(SchedulerEventType.class, scheduler);
+ this.schedulerDispatcher = new SchedulerEventDispatcher(this.scheduler);
+ addService(this.schedulerDispatcher);
+ this.rmDispatcher.register(SchedulerEventType.class,
+ this.schedulerDispatcher);
// Register event handler for RmAppEvents
this.rmDispatcher.register(RMAppEventType.class,
@@ -212,6 +216,73 @@ public class ResourceManager extends Com
}
@Private
+  public static final class SchedulerEventDispatcher extends AbstractService
+      implements EventHandler<SchedulerEvent> {
+
+    // Decouples event producers from the scheduler: handle() only enqueues,
+    // and one dedicated thread drains the queue into scheduler.handle().
+
+    /** Receiver of every queued event; only invoked from the processor thread. */
+    private final ResourceScheduler scheduler;
+
+    /** Hand-off buffer between handle() callers and the processor thread. */
+    private final BlockingQueue<SchedulerEvent> eventQueue =
+      new LinkedBlockingQueue<SchedulerEvent>();
+
+    /** Worker that runs {@link EventProcessor}; started in start(). */
+    private final Thread eventProcessor;
+
+    /**
+     * Creates the dispatcher. The processor thread is created here but is
+     * not started until {@link #start()} is called.
+     *
+     * @param scheduler the scheduler that queued events are delivered to
+     */
+    public SchedulerEventDispatcher(ResourceScheduler scheduler) {
+      super(SchedulerEventDispatcher.class.getName());
+      this.scheduler = scheduler;
+      this.eventProcessor = new Thread(new EventProcessor());
+      // Give the worker a recognizable name for thread dumps.
+      this.eventProcessor.setName("SchedulerEventDispatcher Event Processor");
+    }
+
+    /** Starts the processor thread, then marks the service as started. */
+    @Override
+    public synchronized void start() {
+      this.eventProcessor.start();
+      super.start();
+    }
+
+    /**
+     * Loop executed by the processor thread: takes one event at a time from
+     * the queue and passes it to the scheduler. The loop ends when the
+     * thread is interrupted or when the scheduler throws.
+     */
+    private final class EventProcessor implements Runnable {
+      @Override
+      public void run() {
+
+        SchedulerEvent event;
+
+        while (!Thread.currentThread().isInterrupted()) {
+          try {
+            event = eventQueue.take();
+          } catch (InterruptedException e) {
+            LOG.error("Returning, interrupted : " + e);
+            // take() cleared the flag; restore it so the interrupt stays
+            // observable on this thread.
+            Thread.currentThread().interrupt();
+            return; // TODO: Kill RM.
+          }
+
+          try {
+            scheduler.handle(event);
+          } catch (Throwable t) {
+            LOG.error("Error in handling event type " + event.getType()
+                + " to the scheduler", t);
+            return; // TODO: Kill RM.
+          }
+        }
+      }
+    }
+
+    /**
+     * Interrupts the processor thread, waits for it to exit and then stops
+     * the service. Events still sitting in the queue are not delivered.
+     *
+     * @throws YarnException if the calling thread is interrupted while
+     *         waiting for the processor thread to exit
+     */
+    @Override
+    public synchronized void stop() {
+      this.eventProcessor.interrupt();
+      try {
+        this.eventProcessor.join();
+      } catch (InterruptedException e) {
+        // Keep the caller's interrupt visible after wrapping it.
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      } finally {
+        // The worker has been told to exit either way, so always record
+        // the service as stopped, even when join() was cut short.
+        super.stop();
+      }
+    }
+
+    /**
+     * Enqueues the event for delivery on the processor thread; the scheduler
+     * is never called on the caller's thread.
+     *
+     * @param event the scheduler event to queue
+     * @throws YarnException if the calling thread is interrupted while
+     *         enqueueing
+     */
+    @Override
+    public void handle(SchedulerEvent event) {
+      try {
+        this.eventQueue.put(event);
+      } catch (InterruptedException e) {
+        // Restore the flag so callers further up can still see the interrupt.
+        Thread.currentThread().interrupt();
+        throw new YarnException(e);
+      }
+    }
+  }
+
+ @Private
public static final class ApplicationEventDispatcher implements
EventHandler<RMAppEvent> {
|