From: jlowe@apache.org
To: mapreduce-commits@hadoop.apache.org
Date: Wed, 13 Feb 2013 02:11:37 -0000
Subject: svn commit: r1445456 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/...

Author: jlowe
Date: Wed Feb 13 02:11:36 2013
New Revision: 1445456

URL: http://svn.apache.org/r1445456
Log:
MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
speculative attempts. Contributed by Robert Parker

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Feb 13 02:11:36 2013
@@ -712,6 +712,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
     (Robert Parker via jeagles)
 
+    MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
+    speculative attempts (Robert Parker via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Wed Feb 13 02:11:36 2013
@@ -21,9 +21,12 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -108,7 +112,7 @@ public class RecoveryService extends Com
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
       new HashMap<TaskId, TaskInfo>();
-  
+
   private final List<TaskEvent> pendingTaskScheduleEvents =
       new ArrayList<TaskEvent>();
@@ -193,6 +197,14 @@ public class RecoveryService extends Com
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
       if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator =
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
         completedTasks
             .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
         LOG.info("Read from history task "
@@ -215,6 +227,7 @@ public class RecoveryService extends Com
         JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    LOG.info("Trying file " + histDirPath.toString());
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
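The pruning loop added to RecoveryService above is the heart of the fix: a task recovered as SUCCEEDED from the history file can still carry attempts that never logged a finish event, such as a speculative attempt that was still running when the AM died, and recovery would otherwise wait on those attempts forever. Below is a minimal, self-contained sketch of that pruning step; the class name AttemptPruneSketch and the plain String ids are illustrative stand-ins, not the real TaskAttemptID/TaskAttemptInfo types.

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class AttemptPruneSketch {
        public static void main(String[] args) {
            // Every attempt the recovered task ever launched, keyed by attempt id.
            Map<String, String> allAttempts = new HashMap<String, String>();
            allAttempts.put("attempt_0", "SUCCEEDED");
            allAttempts.put("attempt_1", "RUNNING"); // speculative, never finished

            // Attempts for which the history file recorded a finish event.
            Map<String, String> completedAttempts = new HashMap<String, String>();
            completedAttempts.put("attempt_0", "SUCCEEDED");

            // Same pattern as the RecoveryService change: drop any attempt the
            // history never recorded as finished, so replay does not wait on an
            // event that will never arrive.
            Iterator<Map.Entry<String, String>> it =
                allAttempts.entrySet().iterator();
            while (it.hasNext()) {
                if (!completedAttempts.containsKey(it.next().getKey())) {
                    it.remove();
                }
            }
            System.out.println(allAttempts); // prints {attempt_0=SUCCEEDED}
        }
    }

Note the use of Iterator.remove() rather than removing from the map directly inside the loop; mutating the map while iterating its entry set would throw ConcurrentModificationException, which is presumably why the patch introduces the explicit iterator.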
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Wed Feb 13 02:11:36 2013
@@ -50,11 +50,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 
 import org.junit.Test;
@@ -734,12 +738,173 @@ public class TestRecovery {
     app.verifyCompleted();
     validateOutput();
   }
-  
+
+  /**
+   * AM with 2 maps and 1 reduce. A speculative attempt is launched for the
+   * first map, and the original attempt succeeds. The AM crashes after the
+   * first task finishes, then recovers completely and succeeds in the
+   * second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testSpeculative() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    // Launch a speculative attempt for the first map task
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
+    TaskAttempt task1Attempt1 = t1it.next();
+    TaskAttempt task1Attempt2 = t1it.next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
+
+    LOG.info(t1a2contId.toString());
+    LOG.info(task1Attempt1.getID().toString());
+    LOG.info(task1Attempt2.getID().toString());
+
+    // Launch container for speculative attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
+
+    //before sending the TA_DONE event, make sure the attempts have come to
+    //RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // the reduce must be running
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to map 1 attempt 1
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
+
+    //wait for the first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+    //stop the app
+    app.stop();
+
+    //rerun
+    //in the rerun the 1st map will be recovered from the previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+        3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // the first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait to get it completed
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for the reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // Verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
       TypeConverter.fromYarn(attempt.getID()));
-  
+
   TextOutputFormat theOutputFormat = new TextOutputFormat();
   RecordWriter theRecordWriter = theOutputFormat
       .getRecordWriter(tContext);
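Reading the two halves of the patch together suggests the failure mode: recovery cannot finish replaying a SUCCEEDED task while its TaskInfo still references an attempt with no recorded outcome, and a speculative attempt that was still running at the crash never produced a finish event. A toy illustration of that stall condition, again with plain strings standing in for the real event and attempt types:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class RecoveryStallSketch {
        public static void main(String[] args) {
            // Attempts the recovered TaskInfo still references.
            Set<String> awaiting = new HashSet<String>(
                Arrays.asList("attempt_0", "attempt_1")); // attempt_1: speculative

            // Finish events actually present in the history file.
            List<String> finishEvents = Arrays.asList("attempt_0");

            // Replay: each finish event settles one attempt.
            for (String event : finishEvents) {
                awaiting.remove(event);
            }

            // Before MAPREDUCE-4992 the AM was, in effect, stuck waiting on
            // attempt_1 forever; the pruning step empties this set up front.
            System.out.println("still awaiting: " + awaiting); // [attempt_1]
        }
    }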
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1445456&r1=1445455&r2=1445456&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Feb 13 02:11:36 2013
@@ -246,6 +246,7 @@ public class JobHistoryParser implements
     attemptInfo.state = StringInterner.weakIntern(event.getState());
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleReduceAttemptFinishedEvent
@@ -262,6 +263,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -276,6 +278,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -306,6 +309,7 @@ public class JobHistoryParser implements
         taskInfo.successfulAttemptId = null;
       }
     }
+    info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -443,6 +447,7 @@ public class JobHistoryParser implements
     Map<JobACL, AccessControlList> jobACLs;
     
     Map<TaskID, TaskInfo> tasksMap;
+    Map<TaskAttemptID, TaskAttemptInfo> completedTaskAttemptsMap;
     List<AMInfo> amInfos;
     AMInfo latestAmInfo;
     boolean uberized;
@@ -456,6 +461,7 @@ public class JobHistoryParser implements
       finishedMaps = finishedReduces = 0;
       username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
+      completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
      jobACLs = new HashMap<JobACL, AccessControlList>();
       priority = JobPriority.NORMAL;
     }
@@ -530,6 +536,8 @@ public class JobHistoryParser implements
     public Counters getReduceCounters() { return reduceCounters; }
     /** @return the map of all tasks in this job */
     public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+    /** @return the map of all completed task attempts in this job */
+    public Map<TaskAttemptID, TaskAttemptInfo> getAllCompletedTaskAttempts() { return completedTaskAttemptsMap; }
     /** @return the priority of this job */
     public String getPriority() { return priority.toString(); }
     public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
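The JobHistoryParser change is the bookkeeping half of the fix: each attempt-terminal handler (map finished, reduce finished, generic finished, and failed/killed) now also records the attempt in completedTaskAttemptsMap, and getAllCompletedTaskAttempts() exposes that map for the containsKey() check in RecoveryService. A compact sketch of the pattern; ParserSketch and AttemptRecord are illustrative names, not Hadoop classes:

    import java.util.HashMap;
    import java.util.Map;

    public class ParserSketch {
        static final class AttemptRecord {
            final String id;
            final String state;
            AttemptRecord(String id, String state) {
                this.id = id;
                this.state = state;
            }
        }

        // Mirrors JobInfo.completedTaskAttemptsMap: only attempts that produced
        // a terminal event (finished, failed, or killed) are ever entered here.
        private final Map<String, AttemptRecord> completedAttempts =
            new HashMap<String, AttemptRecord>();

        void handleAttemptFinished(String attemptId, String state) {
            completedAttempts.put(attemptId, new AttemptRecord(attemptId, state));
        }

        // Mirrors getAllCompletedTaskAttempts(): the recovery side consults
        // this map to decide which attempts are safe to replay.
        Map<String, AttemptRecord> getAllCompletedAttempts() {
            return completedAttempts;
        }

        public static void main(String[] args) {
            ParserSketch parser = new ParserSketch();
            parser.handleAttemptFinished("attempt_0", "SUCCEEDED");
            // A speculative attempt still running at the crash produces no
            // terminal event, so it never appears in the map.
            System.out.println(parser.getAllCompletedAttempts().keySet());
        }
    }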