Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,417 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import junit.framework.Assert;
+
+import org.apache.avro.ipc.AvroRemoteException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.RMResourceTrackerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
+import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
+import org.apache.hadoop.yarn.AMRMProtocol;
+import org.apache.hadoop.yarn.AMResponse;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ApplicationMaster;
+import org.apache.hadoop.yarn.ApplicationStatus;
+import org.apache.hadoop.yarn.Container;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.NodeID;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.ResourceRequest;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.junit.Test;
+
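+/**
+ * Tests {@link RMContainerAllocator} against a local {@link FifoScheduler}:
+ * container requests are handed to the allocator directly and node heartbeats
+ * are simulated via nodeUpdate(), so no RPC or real ResourceManager is needed.
+ */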
+public class TestRMContainerAllocator {
+ private static final Log LOG = LogFactory.getLog(TestRMContainerAllocator.class);
+
+ @Test
+ public void testSimple() throws Exception {
+ FifoScheduler scheduler = createScheduler();
+ LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+ scheduler);
+
+ //add resources to scheduler
+ NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240);
+ NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+ NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+ //create the container request
+ ContainerRequestEvent event1 =
+ createReq(1, 1024, 1, new String[]{"h1"});
+ allocator.sendRequest(event1);
+
+ //send 1 more request for the same resource on a different host
+ ContainerRequestEvent event2 = createReq(2, 1024, 1, new String[]{"h2"});
+ allocator.sendRequest(event2);
+
+ //this tells the scheduler about the requests;
+ //no allocations yet, since no node has heartbeated
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //send another request, this time for a container on a third host
+ ContainerRequestEvent event3 = createReq(3, 1024, 1, new String[]{"h3"});
+ allocator.sendRequest(event3);
+
+ //this tells the scheduler about the requests;
+ //no allocations yet, since no node has heartbeated
+ assigned = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //update resources in scheduler
+ scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+
+ assigned = allocator.schedule();
+ checkAssignments(
+ new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
+ }
+
+ //TODO: The scheduler currently appears to have a bug that breaks applications
+ //asking for containers with different capabilities, so this test is disabled.
+ //@Test
+ public void testResource() throws Exception {
+ FifoScheduler scheduler = createScheduler();
+ LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+ scheduler);
+
+ //add resources to scheduler
+ NodeInfo nodeManager1 = addNode(scheduler, "h1", 10240);
+ NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+ NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+ //create the container request
+ ContainerRequestEvent event1 =
+ createReq(1, 1024, 1, new String[]{"h1"});
+ allocator.sendRequest(event1);
+
+ //send 1 more request with different resource req
+ ContainerRequestEvent event2 = createReq(2, 2048, 1, new String[]{"h2"});
+ allocator.sendRequest(event2);
+
+ //this tells the scheduler about the requests;
+ //no allocations yet, since no node has heartbeated
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //update resources in scheduler
+ scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+ assigned = allocator.schedule();
+ checkAssignments(
+ new ContainerRequestEvent[]{event1, event2}, assigned, false);
+ }
+
+ @Test
+ public void testPriority() throws Exception {
+ FifoScheduler scheduler = createScheduler();
+ LocalRMContainerAllocator allocator = new LocalRMContainerAllocator(
+ scheduler);
+
+ //add resources to scheduler
+ NodeInfo nodeManager1 = addNode(scheduler, "h1", 1024);
+ NodeInfo nodeManager2 = addNode(scheduler, "h2", 10240);
+ NodeInfo nodeManager3 = addNode(scheduler, "h3", 10240);
+
+ //create the container request
+ ContainerRequestEvent event1 =
+ createReq(1, 2048, 1, new String[]{"h1", "h2"});
+ allocator.sendRequest(event1);
+
+ //send 1 more request with different priority
+ ContainerRequestEvent event2 = createReq(2, 2048, 2, new String[]{"h1"});
+ allocator.sendRequest(event2);
+
+ //send 1 more request with different priority
+ ContainerRequestEvent event3 = createReq(3, 2048, 3, new String[]{"h3"});
+ allocator.sendRequest(event3);
+
+ //this tells the scheduler about the requests;
+ //no allocations yet, since no node has heartbeated
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //update resources in scheduler
+ scheduler.nodeUpdate(nodeManager1, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager2, null); // Node heartbeat
+ scheduler.nodeUpdate(nodeManager3, null); // Node heartbeat
+
+ assigned = allocator.schedule();
+ checkAssignments(
+ new ContainerRequestEvent[]{event1, event2, event3}, assigned, false);
+
+ //validate that no container is assigned to h1, as it does not have 2048 MB
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ Assert.assertFalse("Should not be assigned to h1 (insufficient memory)",
+ "h1".equals(assig.getContainerManagerAddress()));
+ }
+ }
+
+
+
+ private NodeInfo addNode(FifoScheduler scheduler,
+ String nodeName, int memory) {
+ NodeID nodeId = new NodeID();
+ nodeId.id = 0;
+ Resource resource = new Resource();
+ resource.memory = memory;
+ NodeInfo nodeManager = scheduler.addNode(nodeId, nodeName,
+ RMResourceTrackerImpl.resolve(nodeName), resource); // Node registration
+ return nodeManager;
+ }
+
+ private FifoScheduler createScheduler() throws AvroRemoteException {
+ FifoScheduler fsc = new FifoScheduler(new Configuration(),
+ new ContainerTokenSecretManager()) {
+ //override allocate() to deep-copy the requests; otherwise FifoScheduler
+ //mutates numContainers in the same ResourceRequest objects that the
+ //RMContainerAllocator still holds
+ @Override
+ public synchronized List<Container> allocate(ApplicationID applicationId,
+ List<ResourceRequest> ask, List<Container> release)
+ throws IOException {
+ List<ResourceRequest> askCopy = new ArrayList<ResourceRequest>();
+ for (ResourceRequest req : ask) {
+ ResourceRequest reqCopy = new ResourceRequest();
+ reqCopy.priority = req.priority;
+ reqCopy.hostName = req.hostName;
+ reqCopy.capability = req.capability;
+ reqCopy.numContainers = req.numContainers;
+ askCopy.add(reqCopy);
+ }
+ //no need to copy release
+ return super.allocate(applicationId, askCopy, release);
+ }
+ };
+ try {
+ fsc.addApplication(new ApplicationID(), "test", null, null);
+ } catch(IOException ie) {
+ LOG.info("add application failed with ", ie);
+ Assert.fail("add application failed: " + ie);
+ }
+ return fsc;
+ }
+
+ private ContainerRequestEvent createReq(
+ int attemptid, int memory, int priority, String[] hosts) {
+ TaskAttemptID attemptId = new TaskAttemptID();
+ attemptId.id = attemptid;
+ Resource containerNeed = new Resource();
+ containerNeed.memory = memory;
+ return new ContainerRequestEvent(attemptId,
+ containerNeed, priority,
+ hosts, new String[] {NetworkTopology.DEFAULT_RACK});
+ }
+
+ private void checkAssignments(ContainerRequestEvent[] requests,
+ List<TaskAttemptContainerAssignedEvent> assignments,
+ boolean checkHostMatch) {
+ Assert.assertNotNull("Container not assigned", assignments);
+ Assert.assertEquals("Assigned count not correct",
+ requests.length, assignments.size());
+
+ //check for uniqueness of containerIDs
+ Set<ContainerID> containerIds = new HashSet<ContainerID>();
+ for (TaskAttemptContainerAssignedEvent assigned : assignments) {
+ containerIds.add(assigned.getContainerID());
+ }
+ Assert.assertEquals("Assigned containers must be different",
+ assignments.size(), containerIds.size());
+
+ //check for all assignment
+ for (ContainerRequestEvent req : requests) {
+ TaskAttemptContainerAssignedEvent assigned = null;
+ for (TaskAttemptContainerAssignedEvent ass : assignments) {
+ if (ass.getTaskAttemptID().equals(req.getAttemptID())){
+ assigned = ass;
+ break;
+ }
+ }
+ checkAssignment(req, assigned, checkHostMatch);
+ }
+ }
+
+ private void checkAssignment(ContainerRequestEvent request,
+ TaskAttemptContainerAssignedEvent assigned, boolean checkHostMatch) {
+ Assert.assertNotNull("Nothing assigned to attempt " + request.getAttemptID(),
+ assigned);
+ Assert.assertEquals("assigned to wrong attempt", request.getAttemptID(),
+ assigned.getTaskAttemptID());
+ if (checkHostMatch) {
+ Assert.assertTrue("Not assigned to requested host", Arrays.asList
+ (request.getHosts()).contains(assigned.getContainerManagerAddress()));
+ }
+
+ }
+
+ //Mock RMContainerAllocator: instead of talking to a remote scheduler over
+ //RPC, it uses the local scheduler directly
+ public static class LocalRMContainerAllocator extends RMContainerAllocator {
+ private static final List<TaskAttemptContainerAssignedEvent> events =
+ new ArrayList<TaskAttemptContainerAssignedEvent>();
+
+ public static class AMRMProtocolImpl implements AMRMProtocol {
+
+ private ResourceScheduler resourceScheduler;
+
+ public AMRMProtocolImpl(ResourceScheduler resourceScheduler) {
+ this.resourceScheduler = resourceScheduler;
+ }
+
+ @Override
+ public Void registerApplicationMaster(
+ ApplicationMaster applicationMaster) throws AvroRemoteException {
+ return null;
+ }
+
+ @Override
+ public AMResponse allocate(ApplicationStatus status,
+ List<ResourceRequest> ask, List<Container> release)
+ throws AvroRemoteException {
+ try {
+ AMResponse response = new AMResponse();
+ response.containers = resourceScheduler.allocate(status.applicationId, ask, release);
+ return response;
+ } catch(IOException ie) {
+ throw RPCUtil.getRemoteException(ie);
+ }
+ }
+
+ @Override
+ public Void finishApplicationMaster(ApplicationMaster applicationMaster)
+ throws AvroRemoteException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ }
+
+ private ResourceScheduler scheduler;
+ LocalRMContainerAllocator(ResourceScheduler scheduler) {
+ super(null, new TestContext(events));
+ this.scheduler = scheduler;
+ super.init(new Configuration());
+ super.start();
+ }
+
+ protected AMRMProtocol createSchedulerProxy() {
+ return new AMRMProtocolImpl(scheduler);
+ }
+
+ @Override
+ protected void register() {}
+ @Override
+ protected void unregister() {}
+
+ public void sendRequest(ContainerRequestEvent req) {
+ sendRequests(Arrays.asList(new ContainerRequestEvent[]{req}));
+ }
+
+ public void sendRequests(List<ContainerRequestEvent> reqs) {
+ for (ContainerRequestEvent req : reqs) {
+ handle(req);
+ }
+ }
+
+ //API to be used by tests
+ public List<TaskAttemptContainerAssignedEvent> schedule() {
+ //run the scheduler
+ try {
+ allocate();
+ } catch (Exception e) {
+ LOG.error("error while scheduling", e);
+ }
+
+ List<TaskAttemptContainerAssignedEvent> result =
+ new ArrayList<TaskAttemptContainerAssignedEvent>(events);
+ events.clear();
+ return result;
+ }
+
+ protected void startAllocatorThread() {
+ //override to NOT start thread
+ }
+
+ static class TestContext implements AppContext {
+ private List<TaskAttemptContainerAssignedEvent> events;
+ TestContext(List<TaskAttemptContainerAssignedEvent> events) {
+ this.events = events;
+ }
+ @Override
+ public Map<JobID, Job> getAllJobs() {
+ return null;
+ }
+ @Override
+ public ApplicationID getApplicationID() {
+ return new ApplicationID();
+ }
+ @Override
+ public EventHandler getEventHandler() {
+ return new EventHandler() {
+ @Override
+ public void handle(Event event) {
+ events.add((TaskAttemptContainerAssignedEvent) event);
+ }
+ };
+ }
+ @Override
+ public Job getJob(JobID jobID) {
+ return null;
+ }
+
+ @Override
+ public CharSequence getUser() {
+ return null;
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ TestRMContainerAllocator t = new TestRMContainerAllocator();
+ t.testSimple();
+ //t.testResource();
+ t.testPriority();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,757 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.Clock;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.service.CompositeService;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+import org.junit.Assert;
+import org.junit.Test;
+
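+/**
+ * Exercises the speculator's task runtime estimators against a simulated job:
+ * a MockClock drives time, MyJobImpl/MyTaskImpl/MyTaskAttemptImpl fake task
+ * progress, and the test counts how many speculative attempts end up winning.
+ */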
+public class TestRuntimeEstimators {
+
+ private static final int INITIAL_NUMBER_FREE_SLOTS = 300;
+ private static final int MAP_SLOT_REQUIREMENT = 3;
+ // this has to be at least as much as map slot requirement
+ private static final int REDUCE_SLOT_REQUIREMENT = 4;
+ private static final int MAP_TASKS = 200;
+ private static final int REDUCE_TASKS = 150;
+
+ private Queue<TaskEvent> taskEvents;
+
+ Clock clock;
+
+ Job myJob;
+
+ AppContext myAppContext;
+
+ private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
+
+ private final AtomicInteger slotsInUse = new AtomicInteger(0);
+
+ Dispatcher dispatcher;
+
+ DefaultSpeculator speculator;
+
+ TaskRuntimeEstimator estimator;
+
+ // This is a huge kluge. The real implementations have a decent approach
+ private final AtomicInteger completedMaps = new AtomicInteger(0);
+ private final AtomicInteger completedReduces = new AtomicInteger(0);
+
+ private final AtomicInteger successfulSpeculations
+ = new AtomicInteger(0);
+ private final AtomicLong taskTimeSavedBySpeculation
+ = new AtomicLong(0L);
+
+ private void coreTestEstimator
+ (TaskRuntimeEstimator testedEstimator, int expectedSpeculations) {
+ estimator = testedEstimator;
+ taskEvents = new ConcurrentLinkedQueue<TaskEvent>();
+ myJob = null;
+ slotsInUse.set(0);
+ completedMaps.set(0);
+ completedReduces.set(0);
+ successfulSpeculations.set(0);
+ taskTimeSavedBySpeculation.set(0);
+
+ ((MockClock)clock).advanceTime(1000);
+
+ Configuration conf = new Configuration();
+
+ myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+ myJob = myAppContext.getAllJobs().values().iterator().next();
+
+ estimator.contextualize(conf, myAppContext);
+
+ speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
+
+ dispatcher.register(Speculator.EventType.class, speculator);
+
+ dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
+
+ ((AsyncDispatcher)dispatcher).init(conf);
+ ((AsyncDispatcher)dispatcher).start();
+
+
+
+ speculator.init(conf);
+ speculator.start();
+
+ // Now that the plumbing is hooked up, we do the following:
+ // do until all tasks are finished, ...
+ // 1: If we have spare capacity, assign as many map tasks as we can, then
+ // assign as many reduce tasks as we can. Note that an odd reduce
+ // task might be started while there are still map tasks, because
+ // map tasks take 3 slots and reduce tasks 4 slots.
+ // 2: Send a speculation event for every task attempt that's running;
+ // note that new attempts might be started by the speculator.
+
+ // discover undone tasks
+ int undoneMaps = MAP_TASKS;
+ int undoneReduces = REDUCE_TASKS;
+
+ // build a task sequence where all the maps precede any of the reduces
+ List<Task> allTasksSequence = new LinkedList<Task>();
+
+ allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
+ allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
+
+ while (undoneMaps + undoneReduces > 0) {
+ undoneMaps = 0; undoneReduces = 0;
+ // start all attempts that are NEW and for which there are enough free slots
+ for (Task task : allTasksSequence) {
+ if (!task.isFinished()) {
+ if (task.getType() == TaskType.MAP) {
+ ++undoneMaps;
+ } else {
+ ++undoneReduces;
+ }
+ }
+ for (TaskAttempt attempt : task.getAttempts().values()) {
+ if (attempt.getState() == TaskAttemptState.NEW
+ && INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
+ >= taskTypeSlots(task.getType())) {
+ MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
+ SpeculatorEvent event
+ = new SpeculatorEvent(attempt.getID(), false, clock.getTime());
+ speculator.handle(event);
+ attemptImpl.startUp();
+ } else {
+ // If a task attempt is in progress we should send the news to
+ // the Speculator.
+ TaskAttemptStatus status = new TaskAttemptStatus();
+ status.id = attempt.getID();
+ status.progress = attempt.getProgress();
+ status.stateString = attempt.getState().name();
+ SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
+ speculator.handle(event);
+ }
+ }
+ }
+
+ long startTime = System.currentTimeMillis();
+
+ // drain the speculator event queue, bailing out rather than hanging if it takes too long
+ while (!speculator.eventQueueEmpty()) {
+ Thread.yield();
+ if (System.currentTimeMillis() > startTime + 130000) {
+ return;
+ }
+ }
+
+ ((MockClock) clock).advanceTime(1000L);
+
+ if (clock.getTime() % 10000L == 0L) {
+ speculator.scanForSpeculations();
+ }
+ }
+
+ Assert.assertEquals("We got the wrong number of successful speculations.",
+ expectedSpeculations, successfulSpeculations.get());
+ }
+
+ @Test
+ public void testLegacyEstimator() throws Exception {
+ clock = new MockClock();
+ TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
+ Configuration conf = new Configuration();
+ dispatcher = new AsyncDispatcher();
+ myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+
+ coreTestEstimator(specificEstimator, 3);
+ }
+
+ @Test
+ public void testExponentialEstimator() throws Exception {
+ clock = new MockClock();
+ TaskRuntimeEstimator specificEstimator
+ = new ExponentiallySmoothedTaskRuntimeEstimator();
+ Configuration conf = new Configuration();
+ dispatcher = new AsyncDispatcher();
+ myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
+
+ coreTestEstimator(specificEstimator, 3);
+ }
+
+ int taskTypeSlots(TaskType type) {
+ return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
+ }
+
+ private boolean jobComplete() {
+ for (Task task : myJob.getTasks().values()) {
+ if (!task.isFinished()) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ private int slotsInUse(int mapSize, int reduceSize) {
+ return slotsInUse.get();
+ }
+
+ class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
+
+ @Override
+ public void handle(TaskEvent event) {
+ TaskID taskID = event.getTaskID();
+ Task task = myJob.getTask(taskID);
+
+ Assert.assertEquals
+ ("Wrong type event", TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
+
+ System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
+
+ addAttempt(task);
+ }
+ }
+
+ void addAttempt(Task task) {
+ MyTaskImpl myTask = (MyTaskImpl) task;
+
+ myTask.addAttempt();
+ }
+
+ class MyTaskImpl implements Task {
+ private final TaskID taskID;
+ private final Map<TaskAttemptID, TaskAttempt> attempts
+ = new HashMap<TaskAttemptID, TaskAttempt>(4);
+
+ MyTaskImpl(JobID jobID, int index, TaskType type) {
+ taskID = new TaskID();
+ taskID.id = index;
+ taskID.taskType = type;
+ taskID.jobID = jobID;
+ }
+
+ void addAttempt() {
+ TaskAttempt taskAttempt
+ = new MyTaskAttemptImpl(taskID, attempts.size(), clock);
+ TaskAttemptID taskAttemptID = taskAttempt.getID();
+
+ attempts.put(taskAttemptID, taskAttempt);
+
+ System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
+
+ SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
+ dispatcher.getEventHandler().handle(event);
+ }
+
+ @Override
+ public TaskID getID() {
+ return taskID;
+ }
+
+ @Override
+ public TaskReport getReport() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public Counters getCounters() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public float getProgress() {
+ float result = 0.0F;
+
+
+ for (TaskAttempt attempt : attempts.values()) {
+ result = Math.max(result, attempt.getProgress());
+ }
+
+ return result;
+ }
+
+ @Override
+ public TaskType getType() {
+ return taskID.taskType;
+ }
+
+ @Override
+ public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+ Map<TaskAttemptID, TaskAttempt> result
+ = new HashMap<TaskAttemptID, TaskAttempt>(attempts.size());
+ result.putAll(attempts);
+ return result;
+ }
+
+ @Override
+ public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+ return attempts.get(attemptID);
+ }
+
+ @Override
+ public boolean isFinished() {
+ for (TaskAttempt attempt : attempts.values()) {
+ if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public boolean canCommit(TaskAttemptID taskAttemptID) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public TaskState getState() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ }
+
+ class MyJobImpl implements Job {
+ private final JobID jobID;
+ private final Map<TaskID, Task> allTasks = new HashMap<TaskID, Task>();
+ private final Map<TaskID, Task> mapTasks = new HashMap<TaskID, Task>();
+ private final Map<TaskID, Task> reduceTasks = new HashMap<TaskID, Task>();
+
+ MyJobImpl(JobID jobID, int numMaps, int numReduces) {
+ this.jobID = jobID;
+ for (int i = 0; i < numMaps; ++i) {
+ Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
+ mapTasks.put(newTask.getID(), newTask);
+ allTasks.put(newTask.getID(), newTask);
+ }
+ for (int i = 0; i < numReduces; ++i) {
+ Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
+ reduceTasks.put(newTask.getID(), newTask);
+ allTasks.put(newTask.getID(), newTask);
+ }
+
+ // give every task an attempt
+ for (Task task : allTasks.values()) {
+ MyTaskImpl myTaskImpl = (MyTaskImpl) task;
+ myTaskImpl.addAttempt();
+ }
+ }
+
+ @Override
+ public JobID getID() {
+ return jobID;
+ }
+
+ @Override
+ public JobState getState() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public JobReport getReport() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public Counters getCounters() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public Map<TaskID, Task> getTasks() {
+ return allTasks;
+ }
+
+ @Override
+ public Map<TaskID, Task> getTasks(TaskType taskType) {
+ return taskType == TaskType.MAP ? mapTasks : reduceTasks;
+ }
+
+ @Override
+ public Task getTask(TaskID taskID) {
+ return allTasks.get(taskID);
+ }
+
+ @Override
+ public List<String> getDiagnostics() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public int getCompletedMaps() {
+ return completedMaps.get();
+ }
+
+ @Override
+ public int getCompletedReduces() {
+ return completedReduces.get();
+ }
+
+ @Override
+ public TaskAttemptCompletionEvent[]
+ getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public CharSequence getName() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public int getTotalMaps() {
+ return mapTasks.size();
+ }
+
+ @Override
+ public int getTotalReduces() {
+ return reduceTasks.size();
+ }
+ }
+
+ /*
+ * We follow the pattern of the real XxxImpl. We create a job and initialize
+ * it with a full suite of tasks which in turn have one attempt each in the
+ * NEW state. Attempts transition only from NEW to RUNNING to SUCCEEDED.
+ */
+ class MyTaskAttemptImpl implements TaskAttempt {
+ private final TaskAttemptID myAttemptID;
+
+ long startMockTime = Long.MIN_VALUE;
+
+ long shuffleCompletedTime = Long.MAX_VALUE;
+
+ TaskAttemptState overridingState = TaskAttemptState.NEW;
+
+ MyTaskAttemptImpl(TaskID taskID, int index, Clock clock) {
+ myAttemptID = new TaskAttemptID();
+ myAttemptID.id = index;
+ myAttemptID.taskID = taskID;
+ }
+
+ void startUp() {
+ startMockTime = clock.getTime();
+ overridingState = null;
+
+ slotsInUse.addAndGet(taskTypeSlots(myAttemptID.taskID.taskType));
+
+ System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
+
+ SpeculatorEvent event = new SpeculatorEvent(getID().taskID, -1);
+ dispatcher.getEventHandler().handle(event);
+ }
+
+ @Override
+ public TaskAttemptID getID() {
+ return myAttemptID;
+ }
+
+ @Override
+ public TaskAttemptReport getReport() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public List<CharSequence> getDiagnostics() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public Counters getCounters() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ private float getCodeRuntime() {
+ int taskIndex = myAttemptID.taskID.id;
+ int attemptIndex = myAttemptID.id;
+
+ float result = 200.0F;
+
+ switch (taskIndex % 4) {
+ case 0:
+ if (taskIndex % 40 == 0 && attemptIndex == 0) {
+ result = 600.0F;
+ break;
+ }
+
+ break;
+ case 2:
+ break;
+
+ case 1:
+ result = 150.0F;
+ break;
+
+ case 3:
+ result = 250.0F;
+ break;
+ }
+
+ return result;
+ }
+
+ private float getMapProgress() {
+ float runtime = getCodeRuntime();
+
+ return Math.min
+ ((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
+ }
+
+ private float getReduceProgress() {
+ Job job = myAppContext.getJob(myAttemptID.taskID.jobID);
+ float runtime = getCodeRuntime();
+
+ Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
+
+ int numberMaps = allMapTasks.size();
+ int numberDoneMaps = 0;
+
+ for (Task mapTask : allMapTasks) {
+ if (mapTask.isFinished()) {
+ ++numberDoneMaps;
+ }
+ }
+
+ if (numberMaps == numberDoneMaps) {
+ shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
+
+ return Math.min
+ ((float) (clock.getTime() - shuffleCompletedTime)
+ / (runtime * 2000.0F) + 0.5F,
+ 1.0F);
+ } else {
+ return ((float) numberDoneMaps) / numberMaps * 0.5F;
+ }
+ }
+
+ // we compute progress from time and an algorithm now
+ @Override
+ public float getProgress() {
+ if (overridingState == TaskAttemptState.NEW) {
+ return 0.0F;
+ }
+ return myAttemptID.taskID.taskType == TaskType.MAP ? getMapProgress() : getReduceProgress();
+ }
+
+ @Override
+ public TaskAttemptState getState() {
+ if (overridingState != null) {
+ return overridingState;
+ }
+ TaskAttemptState result
+ = getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
+
+ if (result == TaskAttemptState.SUCCEEDED) {
+ overridingState = TaskAttemptState.SUCCEEDED;
+
+ System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
+
+ slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType));
+
+ (myAttemptID.taskID.taskType == TaskType.MAP
+ ? completedMaps : completedReduces).getAndIncrement();
+
+ // check for a spectacularly successful speculation
+ TaskID taskID = myAttemptID.taskID;
+ Task undoneTask = null;
+
+ Task task = myJob.getTask(taskID);
+
+ for (TaskAttempt otherAttempt : task.getAttempts().values()) {
+ if (otherAttempt != this
+ && otherAttempt.getState() == TaskAttemptState.RUNNING) {
+ // we had two instances running. Try to determine how much
+ // we might have saved by speculation
+ if (getID().id > otherAttempt.getID().id) {
+ // the speculation won
+ successfulSpeculations.getAndIncrement();
+ float hisProgress = otherAttempt.getProgress();
+ long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
+ System.out.println("TLTRE:A speculation finished at time "
+ + clock.getTime()
+ + ". The stalled attempt is at " + (hisProgress * 100.0)
+ + "% progress, and it started at "
+ + hisStartTime + ", which is "
+ + (clock.getTime() - hisStartTime) + " ago.");
+ long originalTaskEndEstimate
+ = (hisStartTime
+ + estimator.estimatedRuntime(otherAttempt.getID()));
+ System.out.println(
+ "TLTRE: We would have expected the original attempt to take "
+ + estimator.estimatedRuntime(otherAttempt.getID())
+ + ", finishing at " + originalTaskEndEstimate);
+ long estimatedSavings = originalTaskEndEstimate - clock.getTime();
+ taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
+ System.out.println("TLTRE: The task is " + task.getID());
+ slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.taskID.taskType));
+ ((MyTaskAttemptImpl)otherAttempt).overridingState
+ = TaskAttemptState.KILLED;
+ } else {
+ System.out.println(
+ "TLTRE: The normal attempt beat the speculation in "
+ + task.getID());
+ }
+ }
+ }
+ }
+
+ return result;
+ }
+
+ @Override
+ public boolean isFinished() {
+ return getProgress() == 1.0F;
+ }
+
+ @Override
+ public ContainerID getAssignedContainerID() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public String getAssignedContainerMgrAddress() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+
+ @Override
+ public long getLaunchTime() {
+ return startMockTime;
+ }
+
+ @Override
+ public long getFinishTime() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ }
+
+ static class MockClock extends Clock {
+ private long currentTime = 0;
+
+ @Override
+ long getMeasuredTime() {
+ return currentTime;
+ }
+
+ void setMeasuredTime(long newTime) {
+ currentTime = newTime;
+ }
+
+ void advanceTime(long increment) {
+ currentTime += increment;
+ }
+ }
+
+ class MyAppMaster extends CompositeService {
+ final Clock clock;
+ public MyAppMaster(Clock clock) {
+ super(MyAppMaster.class.getName());
+ if (clock == null) {
+ clock = new Clock();
+ }
+ this.clock = clock;
+ LOG.info("Created MyAppMaster");
+ }
+ }
+
+ class MyAppContext implements AppContext {
+ // I'll be making Avro objects by hand. Please don't do that very often.
+
+ private final ApplicationID myApplicationID;
+ private final JobID myJobID;
+ private final Map<JobID, Job> allJobs;
+
+ MyAppContext(int numberMaps, int numberReduces) {
+ myApplicationID = new ApplicationID();
+ myApplicationID.clusterTimeStamp = clock.getTime();
+ myApplicationID.id = 1;
+
+ myJobID = new JobID();
+ myJobID.appID = myApplicationID;
+
+ Job myJob
+ = new MyJobImpl(myJobID, numberMaps, numberReduces);
+
+ allJobs = Collections.singletonMap(myJobID, myJob);
+ }
+
+ @Override
+ public ApplicationID getApplicationID() {
+ return myApplicationID;
+ }
+
+ @Override
+ public Job getJob(JobID jobID) {
+ return allJobs.get(jobID);
+ }
+
+ @Override
+ public Map<JobID, Job> getAllJobs() {
+ return allJobs;
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return dispatcher.getEventHandler();
+ }
+
+ @Override
+ public CharSequence getUser() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/webapp/TestAMWebApp.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,117 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.webapp;
+
+import com.google.inject.Injector;
+import java.util.Map;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MockJobs;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AppController;
+import org.apache.hadoop.mapreduce.v2.app.webapp.AppView;
+import org.apache.hadoop.mapreduce.v2.app.webapp.JobPage;
+import org.apache.hadoop.mapreduce.v2.app.webapp.TaskPage;
+import org.apache.hadoop.mapreduce.v2.app.webapp.TasksPage;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.test.WebAppTests;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
+
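+/**
+ * Renders the ApplicationMaster web pages (app, job, tasks, task views)
+ * against a mock AppContext populated by MockJobs, using WebAppTests.
+ */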
+public class TestAMWebApp {
+
+ static class TestAppContext implements AppContext {
+ final ApplicationID appID;
+ final String user = MockJobs.newUserName();
+ final Map<JobID, Job> jobs;
+
+ TestAppContext(int appid, int numJobs, int numTasks, int numAttempts) {
+ appID = MockJobs.newAppID(appid);
+ jobs = MockJobs.newJobs(appID, numJobs, numTasks, numAttempts);
+ }
+
+ TestAppContext() {
+ this(0, 1, 1, 1);
+ }
+
+ @Override
+ public ApplicationID getApplicationID() {
+ return appID;
+ }
+
+ @Override
+ public CharSequence getUser() {
+ return user;
+ }
+
+ @Override
+ public Job getJob(JobID jobID) {
+ return jobs.get(jobID);
+ }
+
+ @Override
+ public Map<JobID, Job> getAllJobs() {
+ return jobs; // OK
+ }
+
+ @Override
+ public EventHandler getEventHandler() {
+ return null;
+ }
+ }
+
+ @Test public void testAppControllerIndex() {
+ TestAppContext ctx = new TestAppContext();
+ Injector injector = WebAppTests.createMockInjector(AppContext.class, ctx);
+ AppController controller = injector.getInstance(AppController.class);
+ controller.index();
+ assertEquals(Apps.toString(ctx.appID), controller.get(APP_ID,""));
+ }
+
+ @Test public void testAppView() {
+ WebAppTests.testPage(AppView.class, AppContext.class, new TestAppContext());
+ }
+
+ @Test public void testJobView() {
+ WebAppTests.testPage(JobPage.class, AppContext.class, new TestAppContext());
+ }
+
+ @Test public void testTasksView() {
+ WebAppTests.testPage(TasksPage.class, AppContext.class,
+ new TestAppContext());
+ }
+
+ @Test public void testTaskView() {
+ WebAppTests.testPage(TaskPage.class, AppContext.class,
+ new TestAppContext());
+ }
+
+ public static void main(String[] args) {
+ WebApps.$for("yarn", AppContext.class, new TestAppContext(0, 8, 88, 4)).
+ at(58888).inDevMode().start(new AMWebApp()).joinThread();
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<project>
+ <parent>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>${yarn.version}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ <name>hadoop-mapreduce-client-common</name>
+ <version>${yarn.version}</version>
+ <url>http://maven.apache.org</url>
+
+ <dependencies>
+ <!-- begin MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn</artifactId>
+ <version>${yarn.version}</version>
+ <type>pom</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-api</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-server</artifactId>
+ <version>${yarn.version}</version>
+ <type>pom</type>
+ </dependency>
+ <!-- end MNG-4223 workaround -->
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>yarn-common</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <version>${yarn.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/avro/MRClientProtocol.genavro Thu Mar 17 20:21:13 2011
@@ -0,0 +1,151 @@
+@namespace("org.apache.hadoop.mapreduce.v2.api")
+protocol MRClientProtocol {
+
+ import idl "./yarn/yarn-api/src/main/avro/yarn-types.genavro";
+
+ enum TaskType {
+ MAP,
+ REDUCE
+ }
+
+ record JobID {
+ org.apache.hadoop.yarn.ApplicationID appID;
+ int id;
+ }
+
+ record TaskID {
+ JobID jobID;
+ TaskType taskType;
+ int id;
+ }
+
+ record TaskAttemptID {
+ TaskID taskID;
+ int id;
+ }
+
+ enum TaskState {
+ NEW,
+ SCHEDULED,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILL_WAIT,
+ KILLED
+ }
+
+ enum Phase {
+ STARTING,
+ MAP,
+ SHUFFLE,
+ SORT,
+ REDUCE,
+ CLEANUP
+ }
+
+ record Counter {
+ string name;
+ string displayName;
+ long value;
+ }
+
+ record CounterGroup {
+ string name;
+ string displayname;
+ map<Counter> counters;
+ }
+
+ record Counters {
+ map<CounterGroup> groups;
+ }
+
+ record TaskReport {
+ TaskID id;
+ TaskState state;
+ float progress;
+ long startTime;
+ long finishTime;
+ Counters counters;
+ array<TaskAttemptID> runningAttempts;
+ union{TaskAttemptID, null} successfulAttempt;
+ array<string> diagnostics;
+ }
+
+ enum TaskAttemptState {
+ NEW,
+ UNASSIGNED,
+ ASSIGNED,
+ RUNNING,
+ COMMIT_PENDING,
+ SUCCESS_CONTAINER_CLEANUP,
+ SUCCEEDED,
+ FAIL_CONTAINER_CLEANUP,
+ FAIL_TASK_CLEANUP,
+ FAILED,
+ KILL_CONTAINER_CLEANUP,
+ KILL_TASK_CLEANUP,
+ KILLED
+ }
+
+ record TaskAttemptReport {
+ TaskAttemptID id;
+ TaskAttemptState state;
+ float progress;
+ long startTime;
+ long finishTime;
+ Counters counters;
+ string diagnosticInfo;
+ string stateString;
+ Phase phase;
+ }
+
+ enum JobState {
+ NEW,
+ RUNNING,
+ SUCCEEDED,
+ FAILED,
+ KILL_WAIT,
+ KILLED,
+ ERROR
+ }
+
+ record JobReport {
+ JobID id;
+ JobState state;
+ float mapProgress;
+ float reduceProgress;
+ float cleanupProgress;
+ float setupProgress;
+ long startTime;
+ long finishTime;
+ }
+
+ enum TaskAttemptCompletionEventStatus {
+ FAILED,
+ KILLED,
+ SUCCEEDED,
+ OBSOLETE,
+ TIPFAILED
+ }
+
+ record TaskAttemptCompletionEvent {
+ TaskAttemptID attemptId;
+ TaskAttemptCompletionEventStatus status;
+ string mapOutputServerAddress;
+ int attemptRunTime;
+ int eventId;
+ }
+
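+ // read-only queries about a job and its tasks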
+ JobReport getJobReport(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ TaskReport getTaskReport(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ TaskAttemptReport getTaskAttemptReport(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ Counters getCounters(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ array<TaskAttemptCompletionEvent> getTaskAttemptCompletionEvents(JobID jobID, int fromEventId, int maxEvents) throws org.apache.hadoop.yarn.YarnRemoteException;
+ array<TaskReport> getTaskReports(JobID jobID, TaskType taskType) throws org.apache.hadoop.yarn.YarnRemoteException;
+ array<string> getDiagnostics(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+
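+ // operations that change job state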
+ void killJob(JobID jobID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ void killTask(TaskID taskID) throws org.apache.hadoop.yarn.YarnRemoteException;
+ void killTaskAttempt(TaskAttemptID taskAttemptID) throws org.apache.hadoop.yarn.YarnRemoteException;
+
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2;
+
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
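+/** Configuration key names for speculative execution and task runtime estimation. */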
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class YarnMRJobConfig {
+ public static final String SPECULATOR_CLASS
+ = "yarn.mapreduce.job.speculator.class";
+ public static final String TASK_RUNTIME_ESTIMATOR_CLASS
+ = "yarn.mapreduce.job.task.runtime.estimator.class";
+ public static final String TASK_ATTEMPT_PROGRESS_RUNTIME_LINEARIZER_CLASS
+ = "yarn.mapreduce.job.task.runtime.linearizer.class";
+ public static final String EXPONENTIAL_SMOOTHING_LAMBDA_MILLISECONDS
+ = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.lambda";
+ public static final String EXPONENTIAL_SMOOTHING_SMOOTH_RATE
+ = "yarn.mapreduce.job.task.runtime.estimator.exponential.smooth.smoothsrate";
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/lib/TypeConverter.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,308 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.lib;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.mapred.JobPriority;
+import org.apache.hadoop.mapred.TIPStatus;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
+import org.apache.hadoop.mapreduce.TaskReport;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.Counter;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.JobState;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
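+/**
+ * Converts between the MRv2/YARN Avro record types (toYarn) and the classic
+ * org.apache.hadoop.mapred / org.apache.hadoop.mapreduce types (fromYarn).
+ */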
+public class TypeConverter {
+
+ public static org.apache.hadoop.mapred.JobID fromYarn(JobID id) {
+ String identifier = fromClusterTimeStamp(id.appID.clusterTimeStamp);
+ return new org.apache.hadoop.mapred.JobID(identifier, id.id);
+ }
+
+ //currently there is 1-1 mapping between appid and jobid
+ public static org.apache.hadoop.mapreduce.JobID fromYarn(ApplicationID appID) {
+ String identifier = fromClusterTimeStamp(appID.clusterTimeStamp);
+ return new org.apache.hadoop.mapred.JobID(identifier, appID.id);
+ }
+
+ public static JobID toYarn(org.apache.hadoop.mapreduce.JobID id) {
+ JobID jobID = new JobID();
+ jobID.id = id.getId(); //currently there is 1-1 mapping between appid and jobid
+ jobID.appID = new ApplicationID();
+ jobID.appID.id = id.getId();
+ jobID.appID.clusterTimeStamp = toClusterTimeStamp(id.getJtIdentifier());
+ return jobID;
+ }
+
+ private static String fromClusterTimeStamp(long clusterTimeStamp) {
+ return Long.toString(clusterTimeStamp);
+ }
+
+ private static long toClusterTimeStamp(String identifier) {
+ return Long.parseLong(identifier);
+ }
+
+ public static org.apache.hadoop.mapreduce.TaskType fromYarn(
+ TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return org.apache.hadoop.mapreduce.TaskType.MAP;
+ case REDUCE:
+ return org.apache.hadoop.mapreduce.TaskType.REDUCE;
+ default:
+ throw new YarnException("Unrecognized task type: " + taskType);
+ }
+ }
+
+ public static TaskType
+ toYarn(org.apache.hadoop.mapreduce.TaskType taskType) {
+ switch (taskType) {
+ case MAP:
+ return TaskType.MAP;
+ case REDUCE:
+ return TaskType.REDUCE;
+ default:
+ throw new YarnException("Unrecognized task type: " + taskType);
+ }
+ }
+
+ public static org.apache.hadoop.mapred.TaskID fromYarn(TaskID id) {
+ return new org.apache.hadoop.mapred.TaskID(fromYarn(id.jobID), fromYarn(id.taskType),
+ id.id);
+ }
+
+ public static TaskID toYarn(org.apache.hadoop.mapreduce.TaskID id) {
+ TaskID taskID = new TaskID();
+ taskID.id = id.getId();
+ taskID.taskType = toYarn(id.getTaskType());
+ taskID.jobID = toYarn(id.getJobID());
+ return taskID;
+ }
+
+ public static Phase toYarn(org.apache.hadoop.mapred.TaskStatus.Phase phase) {
+ switch (phase) {
+ case STARTING:
+ return Phase.STARTING;
+ case MAP:
+ return Phase.MAP;
+ case SHUFFLE:
+ return Phase.SHUFFLE;
+ case SORT:
+ return Phase.SORT;
+ case REDUCE:
+ return Phase.REDUCE;
+ case CLEANUP:
+ return Phase.CLEANUP;
+ }
+ throw new YarnException("Unrecognized Phase: " + phase);
+ }
+
+ public static TaskCompletionEvent[] fromYarn(
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent[] newEvents) {
+ TaskCompletionEvent[] oldEvents =
+ new TaskCompletionEvent[newEvents.length];
+ int i = 0;
+ for (org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent
+ : newEvents) {
+ oldEvents[i++] = fromYarn(newEvent);
+ }
+ return oldEvents;
+ }
+
+ public static TaskCompletionEvent fromYarn(
+ org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent newEvent) {
+ return new TaskCompletionEvent(newEvent.eventId,
+ fromYarn(newEvent.attemptId), newEvent.attemptId.id,
+ newEvent.attemptId.taskID.taskType.equals(TaskType.MAP),
+ fromYarn(newEvent.status),
+ newEvent.mapOutputServerAddress.toString());
+ }
+
+ public static TaskCompletionEvent.Status fromYarn(
+ TaskAttemptCompletionEventStatus newStatus) {
+ switch (newStatus) {
+ case FAILED:
+ return TaskCompletionEvent.Status.FAILED;
+ case KILLED:
+ return TaskCompletionEvent.Status.KILLED;
+ case OBSOLETE:
+ return TaskCompletionEvent.Status.OBSOLETE;
+ case SUCCEEDED:
+ return TaskCompletionEvent.Status.SUCCEEDED;
+ case TIPFAILED:
+ return TaskCompletionEvent.Status.TIPFAILED;
+ }
+ throw new YarnException("Unrecognized status: " + newStatus);
+ }
+
+ public static org.apache.hadoop.mapred.TaskAttemptID fromYarn(
+ TaskAttemptID id) {
+ return new org.apache.hadoop.mapred.TaskAttemptID(fromYarn(id.taskID),
+ id.id);
+ }
+
+ public static TaskAttemptID toYarn(
+ org.apache.hadoop.mapred.TaskAttemptID id) {
+ TaskAttemptID taskAttemptID = new TaskAttemptID();
+ taskAttemptID.taskID = toYarn(id.getTaskID());
+ taskAttemptID.id = id.getId();
+ return taskAttemptID;
+ }
+
+ public static TaskAttemptID toYarn(
+ org.apache.hadoop.mapreduce.TaskAttemptID id) {
+ TaskAttemptID taskAttemptID = new TaskAttemptID();
+ taskAttemptID.taskID = toYarn(id.getTaskID());
+ taskAttemptID.id = id.getId();
+ return taskAttemptID;
+ }
+
+ public static org.apache.hadoop.mapreduce.Counters fromYarn(
+ Counters yCntrs) {
+ org.apache.hadoop.mapreduce.Counters counters =
+ new org.apache.hadoop.mapreduce.Counters();
+ for (CounterGroup yGrp : yCntrs.groups.values()) {
+ for (Counter yCntr : yGrp.counters.values()) {
+ org.apache.hadoop.mapreduce.Counter c =
+ counters.findCounter(yGrp.displayname.toString(),
+ yCntr.displayName.toString());
+ c.setValue(yCntr.value);
+ }
+ }
+ return counters;
+ }
+
+ public static Counters toYarn(org.apache.hadoop.mapred.Counters counters) {
+ Counters yCntrs = new Counters();
+ yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+ for (org.apache.hadoop.mapred.Counters.Group grp : counters) {
+ CounterGroup yGrp = new CounterGroup();
+ yGrp.name = grp.getName();
+ yGrp.displayname = grp.getDisplayName();
+ yGrp.counters = new HashMap<CharSequence, Counter>();
+ for (org.apache.hadoop.mapred.Counters.Counter cntr : grp) {
+ Counter yCntr = new Counter();
+ yCntr.name = cntr.getName();
+ yCntr.displayName = cntr.getDisplayName();
+ yCntr.value = cntr.getValue();
+ yGrp.counters.put(yCntr.name, yCntr);
+ }
+ yCntrs.groups.put(yGrp.name, yGrp);
+ }
+ return yCntrs;
+ }
+
+ public static Counters toYarn(org.apache.hadoop.mapreduce.Counters counters) {
+ Counters yCntrs = new Counters();
+ yCntrs.groups = new HashMap<CharSequence, CounterGroup>();
+ for (org.apache.hadoop.mapreduce.CounterGroup grp : counters) {
+ CounterGroup yGrp = new CounterGroup();
+ yGrp.name = grp.getName();
+ yGrp.displayname = grp.getDisplayName();
+ yGrp.counters = new HashMap<CharSequence, Counter>();
+ for (org.apache.hadoop.mapreduce.Counter cntr : grp) {
+ Counter yCntr = new Counter();
+ yCntr.name = cntr.getName();
+ yCntr.displayName = cntr.getDisplayName();
+ yCntr.value = cntr.getValue();
+ yGrp.counters.put(yCntr.name, yCntr);
+ }
+ yCntrs.groups.put(yGrp.name, yGrp);
+ }
+ return yCntrs;
+ }
+
+ public static org.apache.hadoop.mapred.JobStatus fromYarn(
+ JobReport jobreport, String jobFile, String trackingUrl) {
+ String user = null, jobName = null;
+ JobPriority jobPriority = JobPriority.NORMAL;
+ return new org.apache.hadoop.mapred.JobStatus(fromYarn(jobreport.id),
+ jobreport.setupProgress, jobreport.mapProgress,
+ jobreport.reduceProgress, jobreport.cleanupProgress,
+ fromYarn(jobreport.state),
+ jobPriority, user, jobName, jobFile, trackingUrl);
+ }
+
+ public static int fromYarn(JobState state) {
+ switch (state) {
+ case NEW:
+ return org.apache.hadoop.mapred.JobStatus.PREP;
+ case RUNNING:
+ return org.apache.hadoop.mapred.JobStatus.RUNNING;
+ case KILL_WAIT:
+ case KILLED:
+ return org.apache.hadoop.mapred.JobStatus.KILLED;
+ case SUCCEEDED:
+ return org.apache.hadoop.mapred.JobStatus.SUCCEEDED;
+ case FAILED:
+ case ERROR:
+ return org.apache.hadoop.mapred.JobStatus.FAILED;
+ }
+ throw new YarnException("Unrecognized job state: " + state);
+ }
+
+ public static org.apache.hadoop.mapred.TIPStatus fromYarn(
+ TaskState state) {
+ switch (state) {
+ case NEW:
+ case SCHEDULED:
+ return org.apache.hadoop.mapred.TIPStatus.PENDING;
+ case RUNNING:
+ return org.apache.hadoop.mapred.TIPStatus.RUNNING;
+ case KILL_WAIT:
+ case KILLED:
+ return org.apache.hadoop.mapred.TIPStatus.KILLED;
+ case SUCCEEDED:
+ return org.apache.hadoop.mapred.TIPStatus.COMPLETE;
+ case FAILED:
+ return org.apache.hadoop.mapred.TIPStatus.FAILED;
+ }
+ throw new YarnException("Unrecognized task state: " + state);
+ }
+
+ public static TaskReport fromYarn(org.apache.hadoop.mapreduce.v2.api.TaskReport report) {
+ return new TaskReport(fromYarn(report.id), report.progress, report.state.toString(),
+ (String[]) report.diagnostics.toArray(), fromYarn(report.state), report.startTime, report.finishTime,
+ fromYarn(report.counters));
+ }
+
+ public static List<TaskReport> fromYarn(
+ List<org.apache.hadoop.mapreduce.v2.api.TaskReport> taskReports) {
+ List<TaskReport> reports = new ArrayList<TaskReport>();
+ for (org.apache.hadoop.mapreduce.v2.api.TaskReport r : taskReports) {
+ reports.add(fromYarn(r));
+ }
+ return reports;
+ }
+}
+
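[Editor's note] A minimal usage sketch of the converter helpers added above. It is not part of the commit: the enclosing utility class is named outside this hunk, so the sketch assumes it is statically imported, and it further assumes the YARN record types sit in org.apache.hadoop.mapreduce.v2.api, as the imports elsewhere in this commit suggest; the driver class and the group/counter names are made up for illustration.

// Editor's sketch, not part of the patch: round-trips counters through the converters above.
// Assumes a static import of the converter utility (its class name lies outside this hunk).
public class CounterRoundTripSketch {                        // hypothetical driver class
  public static void main(String[] args) {
    org.apache.hadoop.mapred.Counters oldCounters =
        new org.apache.hadoop.mapred.Counters();
    oldCounters.incrCounter("example-group", "example-counter", 7);   // hypothetical names
    // old-API counters -> YARN avro counter records (toYarn above)
    org.apache.hadoop.mapreduce.v2.api.Counters yarnCounters = toYarn(oldCounters);
    // YARN avro counter records -> new-API counters (fromYarn above)
    org.apache.hadoop.mapreduce.Counters newApiCounters = fromYarn(yarnCounters);
    System.out.println(
        newApiCounters.findCounter("example-group", "example-counter").getValue()); // 7
  }
}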
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,165 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+import static org.apache.hadoop.yarn.util.StringHelper.*;
+
+/**
+ * Helper class for MR applications
+ */
+public class MRApps extends Apps {
+ public static final String JOB = "job";
+ public static final String TASK = "task";
+ public static final String ATTEMPT = "attempt";
+
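+ // Renders a JobID as "job_<clusterTimestamp>_<appId>_<jobId>" (see TestMRApps below).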
+ public static String toString(JobID jid) {
+ return _join(JOB, jid.appID.clusterTimeStamp, jid.appID.id, jid.id);
+ }
+
+ public static JobID toJobID(String jid) {
+ Iterator<String> it = _split(jid).iterator();
+ return toJobID(JOB, jid, it);
+ }
+
+ // Mostly useful for parsing task/attempt-ID-like strings, whose common prefix is the job ID.
+ public static JobID toJobID(String prefix, String s, Iterator<String> it) {
+ ApplicationID appID = toAppID(prefix, s, it);
+ shouldHaveNext(prefix, s, it);
+ JobID jobID = new JobID();
+ jobID.appID = appID;
+ jobID.id = Integer.parseInt(it.next());
+ return jobID;
+ }
+
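+ // Renders a TaskID as "task_<clusterTimestamp>_<appId>_<jobId>_<m|r>_<taskId>".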
+ public static String toString(TaskID tid) {
+ return _join("task", tid.jobID.appID.clusterTimeStamp, tid.jobID.appID.id,
+ tid.jobID.id, taskSymbol(tid.taskType), tid.id);
+ }
+
+ public static TaskID toTaskID(String tid) {
+ Iterator<String> it = _split(tid).iterator();
+ return toTaskID(TASK, tid, it);
+ }
+
+ public static TaskID toTaskID(String prefix, String s, Iterator<String> it) {
+ JobID jid = toJobID(prefix, s, it);
+ shouldHaveNext(prefix, s, it);
+ TaskID tid = new TaskID();
+ tid.jobID = jid;
+ tid.taskType = taskType(it.next());
+ shouldHaveNext(prefix, s, it);
+ tid.id = Integer.parseInt(it.next());
+ return tid;
+ }
+
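+ // Renders a TaskAttemptID as "attempt_<clusterTimestamp>_<appId>_<jobId>_<m|r>_<taskId>_<attemptId>".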
+ public static String toString(TaskAttemptID taid) {
+ return _join("attempt", taid.taskID.jobID.appID.clusterTimeStamp,
+ taid.taskID.jobID.appID.id, taid.taskID.jobID.id,
+ taskSymbol(taid.taskID.taskType), taid.taskID.id, taid.id);
+ }
+
+ public static TaskAttemptID toTaskAttemptID(String taid) {
+ Iterator<String> it = _split(taid).iterator();
+ TaskID tid = toTaskID(ATTEMPT, taid, it);
+ shouldHaveNext(ATTEMPT, taid, it);
+ TaskAttemptID taID = new TaskAttemptID();
+ taID.taskID = tid;
+ taID.id = Integer.parseInt(it.next());
+ return taID;
+ }
+
+ public static String taskSymbol(TaskType type) {
+ switch (type) {
+ case MAP: return "m";
+ case REDUCE: return "r";
+ }
+ throw new YarnException("Unknown task type: "+ type.toString());
+ }
+
+ public static TaskType taskType(String symbol) {
+ // A switch on strings would be cleaner, but that requires JDK 7.
+ if (symbol.equals("m")) return TaskType.MAP;
+ if (symbol.equals("r")) return TaskType.REDUCE;
+ throw new YarnException("Unknown task symbol: "+ symbol);
+ }
+
+ public static void setInitialClasspath(
+ Map<CharSequence, CharSequence> environment) throws IOException {
+
+ // Get the MR application classpath from the generated classpath file.
+ // This works when the compile-time environment matches the runtime one, e.g. in tests.
+ InputStream classpathFileStream =
+ Thread.currentThread().getContextClassLoader()
+ .getResourceAsStream("mrapp-generated-classpath");
+ if (classpathFileStream != null) {
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(classpathFileStream));
+ try {
+ addToClassPath(environment, reader.readLine().trim());
+ } finally {
+ reader.close();
+ }
+ }
+
+ // If the runtime environment differs, query the installed YARN for its classpath.
+ if (System.getenv().get("YARN_HOME") != null) {
+ ShellCommandExecutor exec =
+ new ShellCommandExecutor(new String[] {
+ System.getenv().get("YARN_HOME") + "/bin/yarn",
+ "classpath" });
+ exec.execute();
+ addToClassPath(environment, exec.getOutput().trim());
+ }
+
+ // Get the MapReduce classpath from the installed HADOOP_MAPRED_HOME.
+ if (System.getenv().get("HADOOP_MAPRED_HOME") != null) {
+ ShellCommandExecutor exec =
+ new ShellCommandExecutor(new String[] {
+ System.getenv().get("HADOOP_MAPRED_HOME") + "/bin/mapred",
+ "classpath" });
+ exec.execute();
+ addToClassPath(environment, exec.getOutput().trim());
+ }
+
+ // TODO: Remove duplicates.
+ }
+
+ public static void addToClassPath(
+ Map<CharSequence, CharSequence> environment, String fileName) {
+ CharSequence classpath = environment.get(CLASSPATH);
+ if (classpath == null) {
+ classpath = fileName;
+ } else {
+ classpath = classpath + ":" + fileName;
+ }
+ environment.put(CLASSPATH, classpath);
+ }
+
+ public static final String CLASSPATH = "CLASSPATH";
+}
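[Editor's note] A minimal sketch, not part of the commit, of driving the classpath helper just added: MRApps.addToClassPath and the CLASSPATH key come straight from the file above, while the driver class name and the jar/conf paths are hypothetical.

import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.mapreduce.v2.util.MRApps;

public class ClasspathSketch {                               // hypothetical driver class
  public static void main(String[] args) {
    Map<CharSequence, CharSequence> env = new HashMap<CharSequence, CharSequence>();
    // The first entry seeds CLASSPATH; later entries are appended with ':'.
    MRApps.addToClassPath(env, "/opt/example/lib/job.jar");  // hypothetical path
    MRApps.addToClassPath(env, "/opt/example/conf");         // hypothetical path
    System.out.println(env.get(MRApps.CLASSPATH));           // prints the joined classpath
  }
}

The same environment map is what setInitialClasspath above populates from the generated-classpath resource and from the YARN and MapReduce installations.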
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,102 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.util;
+
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.ApplicationID;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestMRApps {
+
+ @Test public void testJobIDtoString() {
+ JobID jid = new JobID();
+ jid.appID = new ApplicationID();
+ assertEquals("job_0_0_0", MRApps.toString(jid));
+ }
+
+ @Test public void testToJobID() {
+ JobID jid = MRApps.toJobID("job_1_1_1");
+ assertEquals(1, jid.appID.clusterTimeStamp);
+ assertEquals(1, jid.appID.id);
+ assertEquals(1, jid.id);
+ }
+
+ @Test(expected=YarnException.class) public void testJobIDShort() {
+ MRApps.toJobID("job_0_0");
+ }
+
+ @Test public void testTaskIDtoString() {
+ TaskID tid = new TaskID();
+ tid.jobID = new JobID();
+ tid.jobID.appID = new ApplicationID();
+ tid.taskType = TaskType.MAP;
+ assertEquals("task_0_0_0_m_0", MRApps.toString(tid));
+ tid.taskType = TaskType.REDUCE;
+ assertEquals("task_0_0_0_r_0", MRApps.toString(tid));
+ }
+
+ @Test public void testToTaskID() {
+ TaskID tid = MRApps.toTaskID("task_1_2_3_r_4");
+ assertEquals(1, tid.jobID.appID.clusterTimeStamp);
+ assertEquals(2, tid.jobID.appID.id);
+ assertEquals(3, tid.jobID.id);
+ assertEquals(TaskType.REDUCE, tid.taskType);
+ assertEquals(4, tid.id);
+
+ tid = MRApps.toTaskID("task_1_2_3_m_4");
+ assertEquals(TaskType.MAP, tid.taskType);
+ }
+
+ @Test(expected=YarnException.class) public void testTaskIDShort() {
+ MRApps.toTaskID("task_0_0_0_m");
+ }
+
+ @Test(expected=YarnException.class) public void testTaskIDBadType() {
+ MRApps.toTaskID("task_0_0_0_x_0");
+ }
+
+ @Test public void testTaskAttemptIDtoString() {
+ TaskAttemptID taid = new TaskAttemptID();
+ taid.taskID = new TaskID();
+ taid.taskID.taskType = TaskType.MAP;
+ taid.taskID.jobID = new JobID();
+ taid.taskID.jobID.appID = new ApplicationID();
+ assertEquals("attempt_0_0_0_m_0_0", MRApps.toString(taid));
+ }
+
+ @Test public void testToTaskAttemptID() {
+ TaskAttemptID taid = MRApps.toTaskAttemptID("attempt_0_1_2_m_3_4");
+ assertEquals(0, taid.taskID.jobID.appID.clusterTimeStamp);
+ assertEquals(1, taid.taskID.jobID.appID.id);
+ assertEquals(2, taid.taskID.jobID.id);
+ assertEquals(3, taid.taskID.id);
+ assertEquals(4, taid.id);
+ }
+
+ @Test(expected=YarnException.class) public void testTaskAttemptIDShort() {
+ MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/pom.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,36 @@
+<?xml version="1.0"?><project>
+ <parent>
+ <artifactId>hadoop-mapreduce-client</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <version>${yarn.version}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ <name>hadoop-mapreduce-client-core</name>
+ <version>${yarn.version}</version>
+ <url>http://maven.apache.org</url>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
+<build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>1.4.0-SNAPSHOT</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/jobhistory/Events.avpr:776175-785643
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/mapred-queues-default.xml Thu Mar 17 20:21:13 2011
@@ -0,0 +1,29 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- This is the default mapred-queues.xml file that is loaded in the case
+ that the user does not have such a file on their classpath. -->
+<queues>
+ <queue>
+ <name>default</name>
+ <properties>
+ </properties>
+ <state>running</state>
+ <acl-submit-job> </acl-submit-job>
+ <acl-administer-jobs> </acl-administer-jobs>
+ </queue>
+</queues>
\ No newline at end of file
Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java&r1=1082666&r2=1082677&rev=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/filecache/DistributedCache.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java Thu Mar 17 20:21:13 2011
@@ -24,9 +24,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.filecache.TaskDistributedCacheManager;
-import org.apache.hadoop.mapreduce.filecache.TrackerDistributedCacheManager;
-
/**
* Distribute application-specific large, read-only files efficiently.
*
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/DistributedCache.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/DistributedCache.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/DistributedCache.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/filecache/package-info.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/filecache/package-info.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/filecache/package-info.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/AuditLogger.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/AuditLogger.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/AuditLogger.java:776175-785643
Copied: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java (from r1082666, hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java)
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java?p2=hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java&p1=hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java&r1=1082666&r2=1082677&rev=1082677&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/BackupStore.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java Thu Mar 17 20:21:13 2011
@@ -560,7 +560,7 @@ public class BackupStore<K,V> {
private Writer<K,V> createSpillFile() throws IOException {
Path tmp =
- new Path(TaskTracker.OUTPUT + "/backup_" + tid.getId() + "_"
+ new Path(Constants.OUTPUT + "/backup_" + tid.getId() + "_"
+ (spillNumber++) + ".out");
LOG.info("Created file: " + tmp);
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BackupStore.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BackupStore.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BackupStore.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BasicTypeSorterBase.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BasicTypeSorterBase.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/BufferSorter.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/BufferSorter.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/BufferSorter.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/CleanupQueue.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/CleanupQueue.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java:776175-785643
Propchange: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Clock.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Mar 17 20:21:13 2011
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/java/org/apache/hadoop/mapred/Clock.java:713112
+/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/Clock.java:776175-785643
|