Author: schen
Date: Tue Feb 1 23:36:05 2011
New Revision: 1066273
URL: http://svn.apache.org/viewvc?rev=1066273&view=rev
Log:
MAPREDUCE-1783. Delay task initialization until a job can be run.
Modified:
hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified: hadoop/mapreduce/branches/branch-0.22/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/CHANGES.txt?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.22/CHANGES.txt Tue Feb 1 23:36:05 2011
@@ -182,6 +182,8 @@ Release 0.22.0 - Unreleased
MAPREDUCE-2260. Remove auto-generated native build files. (rvs via eli)
+ MAPREDUCE-1783. Delay task initialization until a job can be run.
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Feb 1 23:36:05 2011
@@ -28,12 +28,16 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.util.ReflectionUtils;
@@ -49,18 +53,18 @@ public class FairScheduler extends TaskS
// How often to dump scheduler state to the event log
protected long dumpInterval = 10000;
-
+
// How often tasks are preempted (must be longer than a couple
// of heartbeats to give task-kill commands a chance to act).
protected long preemptionInterval = 15000;
-
+
// Used to iterate through map and reduce task types
- private static final TaskType[] MAP_AND_REDUCE =
+ private static final TaskType[] MAP_AND_REDUCE =
new TaskType[] {TaskType.MAP, TaskType.REDUCE};
-
+
// Maximum locality delay when auto-computing locality delays
private static final long MAX_AUTOCOMPUTED_LOCALITY_DELAY = 15000;
-
+
protected PoolManager poolMgr;
protected LoadManager loadMgr;
protected TaskSelector taskSelector;
@@ -82,15 +86,15 @@ public class FairScheduler extends TaskS
protected boolean preemptionEnabled;
protected boolean onlyLogPreemption; // Only log when tasks should be killed
private Clock clock;
- private EagerTaskInitializationListener eagerInitListener;
private JobListener jobListener;
+ private JobInitializer jobInitializer;
private boolean mockMode; // Used for unit tests; disables background updates
// and scheduler event log
private FairSchedulerEventLog eventLog;
protected long lastDumpTime; // Time when we last dumped state to log
- protected long lastHeartbeatTime; // Time we last ran assignTasks
+ protected long lastHeartbeatTime; // Time we last ran assignTasks
private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
-
+
/**
* A class for holding per-job scheduler variables. These always contain the
* values of the variables at the last update(), and are used along with a
@@ -98,6 +102,8 @@ public class FairScheduler extends TaskS
*/
static class JobInfo {
boolean runnable = false; // Can the job run given user/pool limits?
+ // Does this job need to be initialized?
+ volatile boolean needsInitializing = true;
public JobSchedulable mapSchedulable;
public JobSchedulable reduceSchedulable;
// Variables used for delay scheduling
@@ -111,11 +117,11 @@ public class FairScheduler extends TaskS
this.lastMapLocalityLevel = LocalityLevel.NODE;
}
}
-
+
public FairScheduler() {
this(new Clock(), false);
}
-
+
/**
* Constructor used for tests, which can change the clock and disable updates.
*/
@@ -141,23 +147,18 @@ public class FairScheduler extends TaskS
eventLog.init(conf, hostname);
}
// Initialize other pieces of the scheduler
+ jobInitializer = new JobInitializer(conf, taskTrackerManager);
taskTrackerManager.addJobInProgressListener(jobListener);
- if (!mockMode) {
- eagerInitListener = new EagerTaskInitializationListener(conf);
- eagerInitListener.setTaskTrackerManager(taskTrackerManager);
- eagerInitListener.start();
- taskTrackerManager.addJobInProgressListener(eagerInitListener);
- }
poolMgr = new PoolManager(this);
poolMgr.initialize();
loadMgr = (LoadManager) ReflectionUtils.newInstance(
- conf.getClass("mapred.fairscheduler.loadmanager",
+ conf.getClass("mapred.fairscheduler.loadmanager",
CapBasedLoadManager.class, LoadManager.class), conf);
loadMgr.setTaskTrackerManager(taskTrackerManager);
loadMgr.setEventLog(eventLog);
loadMgr.start();
taskSelector = (TaskSelector) ReflectionUtils.newInstance(
- conf.getClass("mapred.fairscheduler.taskselector",
+ conf.getClass("mapred.fairscheduler.taskselector",
DefaultTaskSelector.class, TaskSelector.class), conf);
taskSelector.setTaskTrackerManager(taskTrackerManager);
taskSelector.start();
@@ -191,7 +192,7 @@ public class FairScheduler extends TaskS
"mapred.fairscheduler.locality.delay.node", defaultDelay);
rackLocalityDelay = conf.getLong(
"mapred.fairscheduler.locality.delay.rack", defaultDelay);
- if (defaultDelay == -1 &&
+ if (defaultDelay == -1 &&
(nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
autoComputeLocalityDelay = true; // Compute from heartbeat interval
}
@@ -231,14 +232,59 @@ public class FairScheduler extends TaskS
if (eventLog != null)
eventLog.log("SHUTDOWN");
running = false;
+ jobInitializer.terminate();
if (jobListener != null)
taskTrackerManager.removeJobInProgressListener(jobListener);
- if (eagerInitListener != null)
- taskTrackerManager.removeJobInProgressListener(eagerInitListener);
if (eventLog != null)
eventLog.shutdown();
}
-
+
+ private class JobInitializer {
+ private final int DEFAULT_NUM_THREADS = 1;
+ private ExecutorService threadPool;
+ private TaskTrackerManager ttm;
+
+
+ public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
+ int numThreads = conf.getInt("mapred.jobinit.threads",
+ DEFAULT_NUM_THREADS);
+ threadPool = Executors.newFixedThreadPool(numThreads);
+ this.ttm = ttm;
+ }
+
+ public void initJob(JobInfo jobInfo, JobInProgress job) {
+ if (!mockMode) {
+ threadPool.execute(new InitJob(jobInfo, job));
+ } else {
+ new InitJob(jobInfo, job).run();
+ }
+ }
+
+ class InitJob implements Runnable {
+ private JobInfo jobInfo;
+ private JobInProgress job;
+
+ public InitJob(JobInfo jobInfo, JobInProgress job) {
+ this.jobInfo = jobInfo;
+ this.job = job;
+ }
+
+ public void run() {
+ ttm.initJob(job);
+ }
+ }
+
+ void terminate() {
+ LOG.info("Shutting down thread pool");
+ threadPool.shutdownNow();
+ try {
+ threadPool.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ // Ignore, we are in shutdown anyway.
+ }
+ }
+ }
+
/**
* Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
*/
@@ -254,7 +300,7 @@ public class FairScheduler extends TaskS
update();
}
}
-
+
@Override
public void jobRemoved(JobInProgress job) {
synchronized (FairScheduler.this) {
@@ -263,7 +309,7 @@ public class FairScheduler extends TaskS
infos.remove(job);
}
}
-
+
@Override
public void jobUpdated(JobChangeEvent event) {
eventLog.log("JOB_UPDATED", event.getJobInProgress().getJobID());
@@ -292,7 +338,7 @@ public class FairScheduler extends TaskS
}
}
}
-
+
@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
throws IOException {
@@ -301,7 +347,7 @@ public class FairScheduler extends TaskS
String trackerName = tracker.getTrackerName();
eventLog.log("HEARTBEAT", trackerName);
long currentTime = clock.getTime();
-
+
// Compute total runnable maps and reduces, and currently running ones
int runnableMaps = 0;
int runningMaps = 0;
@@ -316,17 +362,17 @@ public class FairScheduler extends TaskS
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
// Compute total map/reduce slots
- // In the future we can precompute this if the Scheduler becomes a
+ // In the future we can precompute this if the Scheduler becomes a
// listener of tracker join/leave events.
int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
-
- eventLog.log("RUNNABLE_TASKS",
+
+ eventLog.log("RUNNABLE_TASKS",
runnableMaps, runningMaps, runnableReduces, runningReduces);
// Update time waited for local maps for jobs skipped on last heartbeat
updateLocalityWaitTimes(currentTime);
-
+
TaskTrackerStatus tts = tracker.getStatus();
int mapsAssigned = 0; // loop counter for map in the below while loop
@@ -398,8 +444,8 @@ public class FairScheduler extends TaskS
for (Schedulable sched: scheds) { // This loop will assign only one task
eventLog.log("INFO", "Checking for " + taskType +
" task in " + sched.getName());
- Task task = taskType == TaskType.MAP ?
- sched.assignTask(tts, currentTime, visitedForMap) :
+ Task task = taskType == TaskType.MAP ?
+ sched.assignTask(tts, currentTime, visitedForMap) :
sched.assignTask(tts, currentTime, visitedForReduce);
if (task != null) {
foundTask = true;
@@ -439,7 +485,7 @@ public class FairScheduler extends TaskS
infos.get(job).skippedAtLastHeartbeat = true;
}
}
-
+
// If no tasks were found, return null
return tasks.isEmpty() ? null : tasks;
}
@@ -464,7 +510,7 @@ public class FairScheduler extends TaskS
* Update locality wait times for jobs that were skipped at last heartbeat.
*/
private void updateLocalityWaitTimes(long currentTime) {
- long timeSinceLastHeartbeat =
+ long timeSinceLastHeartbeat =
(lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
lastHeartbeatTime = currentTime;
for (JobInfo info: infos.values()) {
@@ -476,7 +522,7 @@ public class FairScheduler extends TaskS
}
/**
- * Update a job's locality level and locality wait variables given that that
+ * Update a job's locality level and locality wait variables given that
* it has just launched a map task on a given task tracker.
*/
private void updateLastMapLocalityLevel(JobInProgress job,
@@ -494,7 +540,7 @@ public class FairScheduler extends TaskS
* launch tasks, based on how long it has been waiting for local tasks.
* This is used to implement the "delay scheduling" feature of the Fair
* Scheduler for optimizing data locality.
- * If the job has no locality information (e.g. it does not use HDFS), this
+ * If the job has no locality information (e.g. it does not use HDFS), this
* method returns LocalityLevel.ANY, allowing tasks at any level.
* Otherwise, the job can only launch tasks at its current locality level
* or lower, unless it has waited at least nodeLocalityDelay or
@@ -543,17 +589,17 @@ public class FairScheduler extends TaskS
return LocalityLevel.ANY;
}
}
-
+
/**
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and numbers of running
- * and needed tasks of each type.
+ * and needed tasks of each type.
*/
protected void update() {
- // Making more granular locking so that clusterStatus can be fetched
+ // Making more granular locking so that clusterStatus can be fetched
// from Jobtracker without locking the scheduler.
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-
+
// Recompute locality delay from JobTracker heartbeat interval if enabled.
// This will also lock the JT, so do it outside of a fair scheduler lock.
if (autoComputeLocalityDelay) {
@@ -562,15 +608,15 @@ public class FairScheduler extends TaskS
(long) (1.5 * jobTracker.getNextHeartbeatInterval()));
rackLocalityDelay = nodeLocalityDelay;
}
-
+
// Got clusterStatus hence acquiring scheduler lock now.
synchronized (this) {
// Reload allocations file if it hasn't been loaded in a while
poolMgr.reloadAllocsIfNecessary();
-
+
// Remove any jobs that have stopped running
List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
- for (JobInProgress job: infos.keySet()) {
+ for (JobInProgress job: infos.keySet()) {
int runState = job.getStatus().getRunState();
if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
|| runState == JobStatus.KILLED) {
@@ -581,15 +627,15 @@ public class FairScheduler extends TaskS
infos.remove(job);
poolMgr.removeJob(job);
}
-
- updateRunnability(); // Set job runnability based on user/pool limits
-
+
+ updateRunnability(); // Set job runnability based on user/pool limits
+
// Update demands of jobs and pools
for (Pool pool: poolMgr.getPools()) {
pool.getMapSchedulable().updateDemand();
pool.getReduceSchedulable().updateDemand();
}
-
+
// Compute fair shares based on updated demands
List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
@@ -597,18 +643,18 @@ public class FairScheduler extends TaskS
mapScheds, clusterStatus.getMaxMapTasks());
SchedulingAlgorithms.computeFairShares(
reduceScheds, clusterStatus.getMaxReduceTasks());
-
+
// Use the computed shares to assign shares within each pool
for (Pool pool: poolMgr.getPools()) {
pool.getMapSchedulable().redistributeShare();
pool.getReduceSchedulable().redistributeShare();
}
-
+
if (preemptionEnabled)
updatePreemptionVariables();
}
}
-
+
public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
for (Pool pool: poolMgr.getPools()) {
@@ -616,7 +662,7 @@ public class FairScheduler extends TaskS
}
return scheds;
}
-
+
private void updateRunnability() {
// Start by marking everything as not runnable
for (JobInfo info: infos.values()) {
@@ -630,16 +676,27 @@ public class FairScheduler extends TaskS
Map<String, Integer> userJobs = new HashMap<String, Integer>();
Map<String, Integer> poolJobs = new HashMap<String, Integer>();
for (JobInProgress job: jobs) {
- if (job.getStatus().getRunState() == JobStatus.RUNNING) {
- String user = job.getJobConf().getUser();
- String pool = poolMgr.getPoolName(job);
- int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
- int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
- if (userCount < poolMgr.getUserMaxJobs(user) &&
- poolCount < poolMgr.getPoolMaxJobs(pool)) {
- infos.get(job).runnable = true;
+ String user = job.getJobConf().getUser();
+ String pool = poolMgr.getPoolName(job);
+ int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+ int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+ if (userCount < poolMgr.getUserMaxJobs(user) &&
+ poolCount < poolMgr.getPoolMaxJobs(pool)) {
+ if (job.getStatus().getRunState() == JobStatus.RUNNING ||
+ job.getStatus().getRunState() == JobStatus.PREP) {
userJobs.put(user, userCount + 1);
poolJobs.put(pool, poolCount + 1);
+ JobInfo jobInfo = infos.get(job);
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ jobInfo.runnable = true;
+ } else {
+ // The job is in the PREP state. Give it to the job initializer
+ // for initialization if we have not already done it.
+ if (jobInfo.needsInitializing) {
+ jobInfo.needsInitializing = false;
+ jobInitializer.initJob(jobInfo, job);
+ }
+ }
}
}
}
@@ -655,7 +712,7 @@ public class FairScheduler extends TaskS
// Set weight based on runnable tasks
JobInfo info = infos.get(job);
int runnableTasks = (taskType == TaskType.MAP) ?
- info.mapSchedulable.getDemand() :
+ info.mapSchedulable.getDemand() :
info.reduceSchedulable.getDemand();
weight = Math.log1p(runnableTasks) / Math.log(2);
}
@@ -677,7 +734,7 @@ public class FairScheduler extends TaskS
default: return 0.25; // priority = VERY_LOW
}
}
-
+
public PoolManager getPoolManager() {
return poolMgr;
}
@@ -716,7 +773,7 @@ public class FairScheduler extends TaskS
int desiredShare = Math.min(sched.getMinShare(), sched.getDemand());
return (sched.getRunningTasks() < desiredShare);
}
-
+
/**
* Is a pool being starved for fair share for the given task type?
* This is defined as being below half its fair share.
@@ -733,19 +790,19 @@ public class FairScheduler extends TaskS
* have been below half their fair share for the fairSharePreemptionTimeout.
* If such pools exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks.
- *
+ *
* This method computes and logs the number of tasks we want to preempt even
* if preemption is disabled, for debugging purposes.
*/
protected void preemptTasksIfNecessary() {
if (!preemptionEnabled)
return;
-
+
long curTime = clock.getTime();
if (curTime - lastPreemptCheckTime < preemptionInterval)
return;
lastPreemptCheckTime = curTime;
-
+
// Acquire locks on both the JobTracker (task tracker manager) and this
// because we might need to call some JobTracker methods (killTask).
synchronized (taskTrackerManager) {
@@ -768,9 +825,9 @@ public class FairScheduler extends TaskS
}
/**
- * Preempt a given number of tasks from a list of PoolSchedulables.
- * The policy for this is to pick tasks from pools that are over their fair
- * share, but make sure that no pool is placed below its fair share in the
+ * Preempt a given number of tasks from a list of PoolSchedulables.
+ * The policy for this is to pick tasks from pools that are over their fair
+ * share, but make sure that no pool is placed below its fair share in the
* process. Furthermore, we want to minimize the amount of computation
* wasted by preemption, so out of the tasks in over-scheduled pools, we
* prefer to preempt tasks that started most recently.
@@ -778,9 +835,9 @@ public class FairScheduler extends TaskS
private void preemptTasks(List<PoolSchedulable> scheds, int tasksToPreempt) {
if (scheds.isEmpty() || tasksToPreempt == 0)
return;
-
+
TaskType taskType = scheds.get(0).getTaskType();
-
+
// Collect running tasks of our type from over-scheduled pools
List<TaskStatus> runningTasks = new ArrayList<TaskStatus>();
for (PoolSchedulable sched: scheds) {
@@ -789,7 +846,7 @@ public class FairScheduler extends TaskS
runningTasks.addAll(getRunningTasks(js.getJob(), taskType));
}
}
-
+
// Sort tasks into reverse order of start time
Collections.sort(runningTasks, new Comparator<TaskStatus>() {
public int compare(TaskStatus t1, TaskStatus t2) {
@@ -801,15 +858,15 @@ public class FairScheduler extends TaskS
return -1;
}
});
-
+
// Maintain a count of tasks left in each pool; this is a bit
// faster than calling runningTasks() on the pool repeatedly
// because the latter must scan through jobs in the pool
- HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>();
+ HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>();
for (Pool p: poolMgr.getPools()) {
tasksLeft.put(p, p.getSchedulable(taskType).getRunningTasks());
}
-
+
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any pool
for (TaskStatus status: runningTasks) {
@@ -862,8 +919,8 @@ public class FairScheduler extends TaskS
}
int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
if (tasksToPreempt > 0) {
- String message = "Should preempt " + tasksToPreempt + " "
- + sched.getTaskType() + " tasks for pool " + sched.getName()
+ String message = "Should preempt " + tasksToPreempt + " "
+ + sched.getTaskType() + " tasks for pool " + sched.getName()
+ ": tasksDueToMinShare = " + tasksDueToMinShare
+ ", tasksDueToFairShare = " + tasksDueToFairShare;
eventLog.log("INFO", message);
@@ -930,7 +987,7 @@ public class FairScheduler extends TaskS
synchronized (eventLog) {
eventLog.log("BEGIN_DUMP");
// List jobs in order of submit time
- ArrayList<JobInProgress> jobs =
+ ArrayList<JobInProgress> jobs =
new ArrayList<JobInProgress>(infos.keySet());
Collections.sort(jobs, new Comparator<JobInProgress>() {
public int compare(JobInProgress j1, JobInProgress j2) {
@@ -986,7 +1043,7 @@ public class FairScheduler extends TaskS
public Clock getClock() {
return clock;
}
-
+
public FairSchedulerEventLog getEventLog() {
return eventLog;
}
Modified: hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1066273&r1=1066272&r2=1066273&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.22/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Feb 1 23:36:05 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.net.Node;
@@ -63,6 +64,7 @@ public class TestFairScheduler extends T
private int mapCounter = 0;
private int reduceCounter = 0;
private final String[][] mapInputLocations; // Array of hosts for each map
+ private boolean initialized;
public FakeJobInProgress(JobConf jobConf,
FakeTaskTrackerManager taskTrackerManager,
@@ -79,7 +81,7 @@ public class TestFairScheduler extends T
this.nonRunningReduces = new LinkedList<TaskInProgress>();
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.jobHistory = new FakeJobHistory();
- initTasks();
+ this.initialized = false;
}
@Override
@@ -130,6 +132,12 @@ public class TestFairScheduler extends T
reduces[i] = new FakeTaskInProgress(getJobID(), i,
getJobConf(), this);
}
+
+ initialized = true;
+ }
+
+ public boolean isInitialized() {
+ return initialized;
}
@Override
@@ -412,7 +420,12 @@ public class TestFairScheduler extends T
}
public void initJob (JobInProgress job) {
- // do nothing
+      // Initialize the job's tasks, then mark it RUNNING. Failures are
+      // rethrown rather than swallowed so a test never proceeds with a
+      // half-initialized job that has been marked RUNNING.
+      try {
+        job.initTasks();
+      } catch (KillInterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException("initTasks interrupted for "
+            + job.getJobID(), e);
+      } catch (IOException e) {
+        throw new RuntimeException("initTasks failed for "
+            + job.getJobID(), e);
+      }
+      job.status.setRunState(JobStatus.RUNNING);
}
public void failJob (JobInProgress job) {
@@ -525,18 +538,23 @@ public class TestFairScheduler extends T
}
}
+  /**
+   * Submits a job in the given run state without first passing it through
+   * taskTrackerManager.initJob, so its tasks stay uninitialized until
+   * something else initializes them.
+   */
+  private JobInProgress submitJobNotInitialized(int state, int maps,
+      int reduces) throws IOException {
+    String noPool = null;
+    String[][] noInputLocations = null;
+    return submitJob(state, maps, reduces, noPool, noInputLocations, false);
+  }
+
private JobInProgress submitJob(int state, int maps, int reduces)
throws IOException {
- return submitJob(state, maps, reduces, null, null);
+ return submitJob(state, maps, reduces, null, null, true);
}
private JobInProgress submitJob(int state, int maps, int reduces, String pool)
throws IOException {
- return submitJob(state, maps, reduces, pool, null);
+ return submitJob(state, maps, reduces, pool, null, true);
}
private JobInProgress submitJob(int state, int maps, int reduces, String pool,
- String[][] mapInputLocations) throws IOException {
+ String[][] mapInputLocations, boolean initializeJob) throws IOException {
JobConf jobConf = new JobConf(conf);
jobConf.setNumMapTasks(maps);
jobConf.setNumReduceTasks(reduces);
@@ -544,6 +562,9 @@ public class TestFairScheduler extends T
jobConf.set(POOL_PROPERTY, pool);
JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
mapInputLocations, UtilsForTests.getJobTracker());
+ if (initializeJob) {
+ taskTrackerManager.initJob(job);
+ }
job.getStatus().setRunState(state);
taskTrackerManager.submitJob(job);
job.startTime = clock.time;
@@ -641,7 +662,6 @@ public class TestFairScheduler extends T
}
public void testNonRunningJobsAreIgnored() throws IOException {
- submitJobs(1, JobStatus.PREP, 10, 10);
submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
submitJobs(1, JobStatus.FAILED, 10, 10);
submitJobs(1, JobStatus.KILLED, 10, 10);
@@ -1345,18 +1365,24 @@ public class TestFairScheduler extends T
// Submit jobs, advancing time in-between to make sure that they are
// all submitted at distinct times.
- JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+ JobInProgress job1 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
JobInfo info1 = scheduler.infos.get(job1);
advanceTime(10);
- JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+ JobInProgress job2 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
JobInfo info2 = scheduler.infos.get(job2);
advanceTime(10);
- JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+ JobInProgress job3 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
JobInfo info3 = scheduler.infos.get(job3);
advanceTime(10);
- JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+ JobInProgress job4 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
JobInfo info4 = scheduler.infos.get(job4);
-
+
+ // Only two of the jobs should be initialized.
+ assertTrue(((FakeJobInProgress)job1).isInitialized());
+ assertTrue(((FakeJobInProgress)job2).isInitialized());
+ assertFalse(((FakeJobInProgress)job3).isInitialized());
+ assertFalse(((FakeJobInProgress)job4).isInitialized());
+
// Check scheduler variables
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(2.0, info1.reduceSchedulable.getFairShare());
@@ -2253,7 +2279,7 @@ public class TestFairScheduler extends T
JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
new String[][] {
{"rack2.node2"}
- });
+ }, true);
JobInfo info1 = scheduler.infos.get(job1);
// Advance time before submitting another job j2, to make j1 be ahead
@@ -2301,7 +2327,7 @@ public class TestFairScheduler extends T
JobInProgress job1 = submitJob(JobStatus.RUNNING, 4, 0, "pool1",
new String[][] {
{"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}
- });
+ }, true);
JobInfo info1 = scheduler.infos.get(job1);
// Advance time before submitting another job j2, to make j1 be ahead
@@ -2384,7 +2410,7 @@ public class TestFairScheduler extends T
new String[][] {
{"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
{"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"}, {"rack2.node2"},
- });
+ }, true);
JobInfo info1 = scheduler.infos.get(job1);
advanceTime(100);
@@ -2672,6 +2698,7 @@ public class TestFairScheduler extends T
jobConf.set(EXPLICIT_POOL_PROPERTY, "poolA");
JobInProgress job3 = new FakeJobInProgress(jobConf, taskTrackerManager,
null, UtilsForTests.getJobTracker());
+ job3.initTasks();
job3.getStatus().setRunState(JobStatus.RUNNING);
taskTrackerManager.submitJob(job3);
@@ -2687,6 +2714,7 @@ public class TestFairScheduler extends T
jobConf2.set(POOL_PROPERTY, "poolA");
JobInProgress job4 = new FakeJobInProgress(jobConf2, taskTrackerManager,
null, UtilsForTests.getJobTracker());
+ job4.initTasks();
job4.getStatus().setRunState(JobStatus.RUNNING);
taskTrackerManager.submitJob(job4);
@@ -2708,10 +2736,10 @@ public class TestFairScheduler extends T
protected void checkAssignment(String taskTrackerName,
String... expectedTasks) throws IOException {
List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+ assertNotNull(tasks);
System.out.println("Assigned tasks:");
for (int i = 0; i < tasks.size(); i++)
System.out.println("- " + tasks.get(i));
- assertNotNull(tasks);
assertEquals(expectedTasks.length, tasks.size());
for (int i = 0; i < tasks.size(); i++)
assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString());
|