Author: bobby
Date: Fri Jun 1 22:01:09 2012
New Revision: 1345366
URL: http://svn.apache.org/viewvc?rev=1345366&view=rev
Log:
svn merge -c 1345362. FIXES: MAPREDUCE-4302. NM goes down if error encountered during log
aggregation (Daryn Sharp via bobby)
Added:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java
- copied unchanged from r1345362, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Jun 1 22:01:09
2012
@@ -217,6 +217,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4297. Usersmap file in gridmix should not fail on empty lines
(Ravi Prakash via bobby)
+ MAPREDUCE-4302. NM goes down if error encountered during log aggregation
+ (Daryn Sharp via bobby)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
Fri Jun 1 22:01:09 2012
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -524,8 +525,8 @@ public class ContainerManagerImpl extend
(CMgrCompletedAppsEvent) event;
for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
this.dispatcher.getEventHandler().handle(
- new ApplicationEvent(appID,
- ApplicationEventType.FINISH_APPLICATION));
+ new ApplicationFinishEvent(appID,
+ "Application Killed by ResourceManager"));
}
break;
case FINISH_CONTAINERS:
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
Fri Jun 1 22:01:09 2012
@@ -23,7 +23,7 @@ public enum ApplicationEventType {
// Source: ContainerManager
INIT_APPLICATION,
INIT_CONTAINER,
- FINISH_APPLICATION,
+ FINISH_APPLICATION, // Source: LogAggregationService if init fails
// Source: ResourceLocalizationService
APPLICATION_INITED,
@@ -33,5 +33,6 @@ public enum ApplicationEventType {
APPLICATION_CONTAINER_FINISHED,
// Source: Log Handler
+ APPLICATION_LOG_HANDLING_INITED,
APPLICATION_LOG_HANDLING_FINISHED
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
Fri Jun 1 22:01:09 2012
@@ -141,6 +141,9 @@ public class ApplicationImpl implements
ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
ApplicationEventType.FINISH_APPLICATION,
new AppFinishTriggeredTransition())
+ .addTransition(ApplicationState.INITING, ApplicationState.INITING,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+ new AppLogInitDoneTransition())
.addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
ApplicationEventType.APPLICATION_INITED,
new AppInitDoneTransition())
@@ -192,8 +195,7 @@ public class ApplicationImpl implements
/**
* Notify services of new application.
*
- * In particular, this requests that the {@link ResourceLocalizationService}
- * localize the application-scoped resources.
+ * In particular, this initializes the {@link LogAggregationService}
*/
@SuppressWarnings("unchecked")
static class AppInitTransition implements
@@ -203,6 +205,27 @@ public class ApplicationImpl implements
ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
app.applicationACLs = initEvent.getApplicationACLs();
app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
+ // Inform the logAggregator
+ app.dispatcher.getEventHandler().handle(
+ new LogHandlerAppStartedEvent(app.appId, app.user,
+ app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+ app.applicationACLs));
+ }
+ }
+
+ /**
+ * Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after
+ * {@link LogAggregationService} has created the directories for the app
+ * and started the aggregation thread for the app.
+ *
+ * In particular, this requests that the {@link ResourceLocalizationService}
+ * localize the application-scoped resources.
+ */
+ @SuppressWarnings("unchecked")
+ static class AppLogInitDoneTransition implements
+ SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+ @Override
+ public void transition(ApplicationImpl app, ApplicationEvent event) {
app.dispatcher.getEventHandler().handle(
new ApplicationLocalizationEvent(
LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
@@ -248,13 +271,6 @@ public class ApplicationImpl implements
SingleArcTransition<ApplicationImpl, ApplicationEvent> {
@Override
public void transition(ApplicationImpl app, ApplicationEvent event) {
-
- // Inform the logAggregator
- app.dispatcher.getEventHandler().handle(
- new LogHandlerAppStartedEvent(app.appId, app.user,
- app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
- app.applicationACLs));
-
// Start all the containers waiting for ApplicationInit
for (Container container : app.containers.values()) {
app.dispatcher.getEventHandler().handle(new ContainerInitEvent(
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
Fri Jun 1 22:01:09 2012
@@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.logaggrega
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
import org.apache.hadoop.yarn.service.AbstractService;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
public class LogAggregationService extends AbstractService implements
@@ -146,13 +150,13 @@ public class LogAggregationService exten
try {
remoteFS = FileSystem.get(conf);
} catch (IOException e) {
- throw new YarnException("Unable to get Remote FileSystem isntance", e);
+ throw new YarnException("Unable to get Remote FileSystem instance", e);
}
boolean remoteExists = false;
try {
remoteExists = remoteFS.exists(this.remoteRootLogDir);
} catch (IOException e) {
- throw new YarnException("Failed to check for existance of remoteLogDir ["
+ throw new YarnException("Failed to check for existence of remoteLogDir ["
+ this.remoteRootLogDir + "]");
}
if (remoteExists) {
@@ -266,9 +270,26 @@ public class LogAggregationService exten
}
}
+ @SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
Map<ApplicationAccessType, String> appAcls) {
+ ApplicationEvent eventResponse;
+ try {
+ initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
+ eventResponse = new ApplicationEvent(appId,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
+ } catch (YarnException e) {
+ eventResponse = new ApplicationFinishEvent(appId,
+ "Application failed to init aggregation: " + e.getMessage());
+ }
+ this.dispatcher.getEventHandler().handle(eventResponse);
+ }
+
+ @VisibleForTesting
+ public void initAppAggregator(final ApplicationId appId, String user,
+ Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
+ Map<ApplicationAccessType, String> appAcls) {
// Get user's FileSystem credentials
UserGroupInformation userUgi =
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
Fri Jun 1 22:01:09 2012
@@ -93,6 +93,7 @@ public class NonAggregatingLogHandler ex
super.stop();
}
+ @SuppressWarnings("unchecked")
@Override
public void handle(LogHandlerEvent event) {
switch (event.getType()) {
@@ -101,6 +102,9 @@ public class NonAggregatingLogHandler ex
(LogHandlerAppStartedEvent) event;
this.appOwners.put(appStartedEvent.getApplicationId(),
appStartedEvent.getUser());
+ this.dispatcher.getEventHandler().handle(
+ new ApplicationEvent(appStartedEvent.getApplicationId(),
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED));
break;
case CONTAINER_FINISHED:
// Ignore
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java?rev=1345366&r1=1345365&r2=1345366&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
(original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
Fri Jun 1 22:01:09 2012
@@ -18,10 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
@@ -32,6 +29,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
+import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -47,6 +45,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.util.Build
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mortbay.util.MultiException;
//@Ignore
@@ -112,7 +115,7 @@ public class TestLogAggregationService e
@Test
@SuppressWarnings("unchecked")
- public void testLocalFileDeletionAfterUpload() throws IOException {
+ public void testLocalFileDeletionAfterUpload() throws Exception {
this.delSrvc = new DeletionService(createContainerExecutor());
this.delSrvc.init(conf);
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
@@ -170,19 +173,23 @@ public class TestLogAggregationService e
logFilePath.toUri().getPath()).exists());
dispatcher.await();
- ArgumentCaptor<ApplicationEvent> eventCaptor =
- ArgumentCaptor.forClass(ApplicationEvent.class);
- verify(appEventHandler).handle(eventCaptor.capture());
- assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
- eventCaptor.getValue().getType());
- assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
- .getApplicationID());
+ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+ new ApplicationEvent(
+ appAttemptId.getApplicationId(),
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+ new ApplicationEvent(
+ appAttemptId.getApplicationId(),
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+ };
+
+ checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+ dispatcher.stop();
}
@Test
@SuppressWarnings("unchecked")
- public void testNoContainerOnNode() {
+ public void testNoContainerOnNode() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
@@ -218,19 +225,22 @@ public class TestLogAggregationService e
.exists());
dispatcher.await();
- ArgumentCaptor<ApplicationEvent> eventCaptor =
- ArgumentCaptor.forClass(ApplicationEvent.class);
- verify(appEventHandler).handle(eventCaptor.capture());
- assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
- eventCaptor.getValue().getType());
- verify(appEventHandler).handle(eventCaptor.capture());
- assertEquals(application1, eventCaptor.getValue()
- .getApplicationID());
+
+ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+ new ApplicationEvent(
+ application1,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+ new ApplicationEvent(
+ application1,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+ };
+ checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+ dispatcher.stop();
}
@Test
@SuppressWarnings("unchecked")
- public void testMultipleAppsLogAggregation() throws IOException {
+ public void testMultipleAppsLogAggregation() throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -299,9 +309,22 @@ public class TestLogAggregationService e
app3LogDir.mkdir();
logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
this.user, null,
- ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
-
+ ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
+ ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
+ new ApplicationEvent(
+ application1,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+ new ApplicationEvent(
+ application2,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+ new ApplicationEvent(
+ application3,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
+ };
+ checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
+ reset(appEventHandler);
+
ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
writeContainerLogs(app3LogDir, container31);
logAggregationService.handle(
@@ -339,22 +362,59 @@ public class TestLogAggregationService e
new ContainerId[] { container31, container32 });
dispatcher.await();
- ArgumentCaptor<ApplicationEvent> eventCaptor =
- ArgumentCaptor.forClass(ApplicationEvent.class);
-
- verify(appEventHandler, times(3)).handle(eventCaptor.capture());
- List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
- Set<ApplicationId> appIds = new HashSet<ApplicationId>();
- for (ApplicationEvent cap : capturedEvents) {
- assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
- eventCaptor.getValue().getType());
- appIds.add(cap.getApplicationID());
- }
- assertTrue(appIds.contains(application1));
- assertTrue(appIds.contains(application2));
- assertTrue(appIds.contains(application3));
+
+ ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{
+ new ApplicationEvent(
+ application1,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
+ new ApplicationEvent(
+ application2,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
+ new ApplicationEvent(
+ application3,
+ ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+ };
+ checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
+ dispatcher.stop();
}
-
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+
+ this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+ this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+ this.remoteRootLogDir.getAbsolutePath());
+
+ DrainDispatcher dispatcher = createDispatcher();
+ EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+ dispatcher.register(ApplicationEventType.class, appEventHandler);
+
+ LogAggregationService logAggregationService = spy(
+ new LogAggregationService(dispatcher, this.context, this.delSrvc,
+ super.dirsHandler));
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ApplicationId appId = BuilderUtils.newApplicationId(
+ System.currentTimeMillis(), (int)Math.random());
+ doThrow(new YarnException("KABOOM!"))
+ .when(logAggregationService).initAppAggregator(
+ eq(appId), eq(user), any(Credentials.class),
+ any(ContainerLogsRetentionPolicy.class), anyMap());
+
+ logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+ this.user, null,
+ ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
+
+ dispatcher.await();
+ ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+ new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
+ };
+ checkEvents(appEventHandler, expectedEvents, false,
+ "getType", "getApplicationID", "getDiagnostic");
+ }
+
private void writeContainerLogs(File appLogDir, ContainerId containerId)
throws IOException {
// ContainerLogDir should be created
@@ -599,4 +659,77 @@ public class TestLogAggregationService e
Assert.assertEquals("Log aggregator failed to cleanup!", 0,
logAggregationService.getNumAggregators());
}
+
+ @SuppressWarnings("unchecked")
+ private static <T extends Event<?>>
+ void checkEvents(EventHandler<T> eventHandler,
+ T expectedEvents[], boolean inOrder,
+ String... methods) throws Exception {
+ Class<T> genericClass = (Class<T>)expectedEvents.getClass().getComponentType();
+ ArgumentCaptor<T> eventCaptor = ArgumentCaptor.forClass(genericClass);
+ // captor work work unless used via a verify
+ verify(eventHandler, atLeast(0)).handle(eventCaptor.capture());
+ List<T> actualEvents = eventCaptor.getAllValues();
+
+ // batch up exceptions so junit presents them as one
+ MultiException failures = new MultiException();
+ try {
+ assertEquals("expected events", expectedEvents.length, actualEvents.size());
+ } catch (Throwable e) {
+ failures.add(e);
+ }
+ if (inOrder) {
+ // sequentially verify the events
+ int len = Math.max(expectedEvents.length, actualEvents.size());
+ for (int n=0; n < len; n++) {
+ try {
+ String expect = (n < expectedEvents.length)
+ ? eventToString(expectedEvents[n], methods) : null;
+ String actual = (n < actualEvents.size())
+ ? eventToString(actualEvents.get(n), methods) : null;
+ assertEquals("event#"+n, expect, actual);
+ } catch (Throwable e) {
+ failures.add(e);
+ }
+ }
+ } else {
+ // verify the actual events were expected
+ // verify no expected events were not seen
+ Set<String> expectedSet = new HashSet<String>();
+ for (T expectedEvent : expectedEvents) {
+ expectedSet.add(eventToString(expectedEvent, methods));
+ }
+ for (T actualEvent : actualEvents) {
+ try {
+ String actual = eventToString(actualEvent, methods);
+ assertTrue("unexpected event: "+actual, expectedSet.remove(actual));
+ } catch (Throwable e) {
+ failures.add(e);
+ }
+ }
+ for (String expected : expectedSet) {
+ try {
+ Assert.fail("missing event: "+expected);
+ } catch (Throwable e) {
+ failures.add(e);
+ }
+ }
+ }
+ failures.ifExceptionThrow();
+ }
+
+ private static String eventToString(Event<?> event, String[] methods) throws Exception
{
+ StringBuilder sb = new StringBuilder("[ ");
+ for (String m : methods) {
+ try {
+ Method method = event.getClass().getMethod(m);
+ String value = method.invoke(event).toString();
+ sb.append(method.getName()).append("=").append(value).append(" ");
+ } catch (Exception e) {
+ // ignore, actual event may not implement the method...
+ }
+ }
+ sb.append("]");
+ return sb.toString();
+ }
}
|