Author: jlowe
Date: Mon Jul 21 18:22:45 2014
New Revision: 1612371
URL: http://svn.apache.org/r1612371
Log:
svn merge -c 1612358 FIXES: MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
enabled if custom output format/committer is used. Contributed by Sangjin Lee
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1612371&r1=1612370&r2=1612371&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Jul 21 18:22:45
2014
@@ -23,6 +23,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
attempt is the last retry. (Wangda Tan via zjshen)
+ MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
+ enabled if custom output format/committer is used (Sangjin Lee via jlowe)
+
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1612371&r1=1612370&r2=1612371&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
Mon Jul 21 18:22:45 2014
@@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
@@ -198,6 +197,7 @@ public class MRAppMaster extends Composi
new JobTokenSecretManager();
private JobId jobId;
private boolean newApiCommitter;
+ private ClassLoader jobClassLoader;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
private JobHistoryEventHandler jobHistoryEventHandler;
@@ -247,6 +247,9 @@ public class MRAppMaster extends Composi
@Override
protected void serviceInit(final Configuration conf) throws Exception {
+ // create the job classloader if enabled
+ createJobClassLoader(conf);
+
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
initJobCredentialsAndUGI(conf);
@@ -446,33 +449,37 @@ public class MRAppMaster extends Composi
}
private OutputCommitter createOutputCommitter(Configuration conf) {
- OutputCommitter committer = null;
-
- LOG.info("OutputCommitter set in config "
- + conf.get("mapred.output.committer.class"));
-
- if (newApiCommitter) {
- org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID = MRBuilderUtils
- .newTaskId(jobId, 0, TaskType.MAP);
- org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID = MRBuilderUtils
- .newTaskAttemptId(taskID, 0);
- TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
- TypeConverter.fromYarn(attemptID));
- OutputFormat outputFormat;
- try {
- outputFormat = ReflectionUtils.newInstance(taskContext
- .getOutputFormatClass(), conf);
- committer = outputFormat.getOutputCommitter(taskContext);
- } catch (Exception e) {
- throw new YarnRuntimeException(e);
+ return callWithJobClassLoader(conf, new Action<OutputCommitter>() {
+ public OutputCommitter call(Configuration conf) {
+ OutputCommitter committer = null;
+
+ LOG.info("OutputCommitter set in config "
+ + conf.get("mapred.output.committer.class"));
+
+ if (newApiCommitter) {
+ org.apache.hadoop.mapreduce.v2.api.records.TaskId taskID =
+ MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId attemptID =
+ MRBuilderUtils.newTaskAttemptId(taskID, 0);
+ TaskAttemptContext taskContext = new TaskAttemptContextImpl(conf,
+ TypeConverter.fromYarn(attemptID));
+ OutputFormat outputFormat;
+ try {
+ outputFormat = ReflectionUtils.newInstance(taskContext
+ .getOutputFormatClass(), conf);
+ committer = outputFormat.getOutputCommitter(taskContext);
+ } catch (Exception e) {
+ throw new YarnRuntimeException(e);
+ }
+ } else {
+ committer = ReflectionUtils.newInstance(conf.getClass(
+ "mapred.output.committer.class", FileOutputCommitter.class,
+ org.apache.hadoop.mapred.OutputCommitter.class), conf);
+ }
+ LOG.info("OutputCommitter is " + committer.getClass().getName());
+ return committer;
}
- } else {
- committer = ReflectionUtils.newInstance(conf.getClass(
- "mapred.output.committer.class", FileOutputCommitter.class,
- org.apache.hadoop.mapred.OutputCommitter.class), conf);
- }
- LOG.info("OutputCommitter is " + committer.getClass().getName());
- return committer;
+ });
}
protected boolean keepJobFiles(JobConf conf) {
@@ -654,38 +661,42 @@ public class MRAppMaster extends Composi
return new StagingDirCleaningService();
}
- protected Speculator createSpeculator(Configuration conf, AppContext context) {
- Class<? extends Speculator> speculatorClass;
-
- try {
- speculatorClass
- // "yarn.mapreduce.job.speculator.class"
- = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
- DefaultSpeculator.class,
- Speculator.class);
- Constructor<? extends Speculator> speculatorConstructor
- = speculatorClass.getConstructor
- (Configuration.class, AppContext.class);
- Speculator result = speculatorConstructor.newInstance(conf, context);
-
- return result;
- } catch (InstantiationException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (IllegalAccessException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (InvocationTargetException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- } catch (NoSuchMethodException ex) {
- LOG.error("Can't make a speculator -- check "
- + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
- throw new YarnRuntimeException(ex);
- }
+ protected Speculator createSpeculator(Configuration conf,
+ final AppContext context) {
+ return callWithJobClassLoader(conf, new Action<Speculator>() {
+ public Speculator call(Configuration conf) {
+ Class<? extends Speculator> speculatorClass;
+ try {
+ speculatorClass
+ // "yarn.mapreduce.job.speculator.class"
+ = conf.getClass(MRJobConfig.MR_AM_JOB_SPECULATOR,
+ DefaultSpeculator.class,
+ Speculator.class);
+ Constructor<? extends Speculator> speculatorConstructor
+ = speculatorClass.getConstructor
+ (Configuration.class, AppContext.class);
+ Speculator result = speculatorConstructor.newInstance(conf, context);
+
+ return result;
+ } catch (InstantiationException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (InvocationTargetException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Can't make a speculator -- check "
+ + MRJobConfig.MR_AM_JOB_SPECULATOR, ex);
+ throw new YarnRuntimeException(ex);
+ }
+ }
+ });
}
protected TaskAttemptListener createTaskAttemptListener(AppContext context) {
@@ -698,7 +709,7 @@ public class MRAppMaster extends Composi
protected EventHandler<CommitterEvent> createCommitterEventHandler(
AppContext context, OutputCommitter committer) {
return new CommitterEventHandler(context, committer,
- getRMHeartbeatHandler());
+ getRMHeartbeatHandler(), jobClassLoader);
}
protected ContainerAllocator createContainerAllocator(
@@ -1069,8 +1080,8 @@ public class MRAppMaster extends Composi
//start all the components
super.serviceStart();
- // set job classloader if configured
- MRApps.setJobClassLoader(getConfig());
+ // finally set the job classloader
+ MRApps.setClassLoader(jobClassLoader, getConfig());
if (initFailed) {
JobEvent initFailedEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT_FAILED);
@@ -1087,19 +1098,24 @@ public class MRAppMaster extends Composi
TaskLog.syncLogsShutdown(logSyncer);
}
- private boolean isRecoverySupported(OutputCommitter committer2)
- throws IOException {
+ private boolean isRecoverySupported() throws IOException {
boolean isSupported = false;
- JobContext _jobContext;
+ Configuration conf = getConfig();
if (committer != null) {
+ final JobContext _jobContext;
if (newApiCommitter) {
_jobContext = new JobContextImpl(
- getConfig(), TypeConverter.fromYarn(getJobId()));
+ conf, TypeConverter.fromYarn(getJobId()));
} else {
_jobContext = new org.apache.hadoop.mapred.JobContextImpl(
- new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+ new JobConf(conf), TypeConverter.fromYarn(getJobId()));
}
- isSupported = committer.isRecoverySupported(_jobContext);
+ isSupported = callWithJobClassLoader(conf,
+ new ExceptionAction<Boolean>() {
+ public Boolean call(Configuration conf) throws IOException {
+ return committer.isRecoverySupported(_jobContext);
+ }
+ });
}
return isSupported;
}
@@ -1113,7 +1129,7 @@ public class MRAppMaster extends Composi
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
- boolean recoverySupportedByCommitter = isRecoverySupported(committer);
+ boolean recoverySupportedByCommitter = isRecoverySupported();
// If a shuffle secret was not provided by the job client then this app
// attempt will generate one. However that disables recovery if there
@@ -1298,7 +1314,7 @@ public class MRAppMaster extends Composi
this.conf = config;
}
@Override
- public void handle(SpeculatorEvent event) {
+ public void handle(final SpeculatorEvent event) {
if (disabled) {
return;
}
@@ -1325,7 +1341,12 @@ public class MRAppMaster extends Composi
if ( (shouldMapSpec && (tType == null || tType == TaskType.MAP))
|| (shouldReduceSpec && (tType == null || tType == TaskType.REDUCE))) {
// Speculator IS enabled, direct the event to there.
- speculator.handle(event);
+ callWithJobClassLoader(conf, new Action<Void>() {
+ public Void call(Configuration conf) {
+ speculator.handle(event);
+ return null;
+ }
+ });
}
}
@@ -1479,6 +1500,102 @@ public class MRAppMaster extends Composi
});
}
+ /**
+ * Creates a job classloader based on the configuration if the job classloader
+ * is enabled. It is a no-op if the job classloader is not enabled.
+ */
+ private void createJobClassLoader(Configuration conf) throws IOException {
+ jobClassLoader = MRApps.createJobClassLoader(conf);
+ }
+
+ /**
+ * Executes the given action with the job classloader set as the configuration
+ * classloader as well as the thread context class loader if the job
+ * classloader is enabled. After the call, the original classloader is
+ * restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ */
+ <T> T callWithJobClassLoader(Configuration conf, Action<T> action) {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Executes the given action that can throw a checked exception with the job
+ * classloader set as the configuration classloader as well as the thread
+ * context class loader if the job classloader is enabled. After the call, the
+ * original classloader is restored.
+ *
+ * If the job classloader is enabled and the code needs to load user-supplied
+ * classes via configuration or thread context classloader, this method should
+ * be used in order to load them.
+ *
+ * @param conf the configuration on which the classloader will be set
+ * @param action the callable action to be executed
+ * @throws IOException if the underlying action throws an IOException
+ * @throws YarnRuntimeException if the underlying action throws an exception
+ * other than an IOException
+ */
+ <T> T callWithJobClassLoader(Configuration conf, ExceptionAction<T> action)
+ throws IOException {
+ // if the job classloader is enabled, we may need it to load the (custom)
+ // classes; we make the job classloader available and unset it once it is
+ // done
+ ClassLoader currentClassLoader = conf.getClassLoader();
+ boolean setJobClassLoader =
+ jobClassLoader != null && currentClassLoader != jobClassLoader;
+ if (setJobClassLoader) {
+ MRApps.setClassLoader(jobClassLoader, conf);
+ }
+ try {
+ return action.call(conf);
+ } catch (IOException e) {
+ throw e;
+ } catch (YarnRuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ // wrap it with a YarnRuntimeException
+ throw new YarnRuntimeException(e);
+ } finally {
+ if (setJobClassLoader) {
+ // restore the original classloader
+ MRApps.setClassLoader(currentClassLoader, conf);
+ }
+ }
+ }
+
+ /**
+ * Action to be wrapped with setting and unsetting the job classloader
+ */
+ private static interface Action<T> {
+ T call(Configuration conf);
+ }
+
+ private static interface ExceptionAction<T> {
+ T call(Configuration conf) throws Exception;
+ }
+
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java?rev=1612371&r1=1612370&r2=1612371&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
Mon Jul 21 18:22:45 2014
@@ -68,6 +68,7 @@ public class CommitterEventHandler exten
private BlockingQueue<CommitterEvent> eventQueue =
new LinkedBlockingQueue<CommitterEvent>();
private final AtomicBoolean stopped;
+ private final ClassLoader jobClassLoader;
private Thread jobCommitThread = null;
private int commitThreadCancelTimeoutMs;
private long commitWindowMs;
@@ -79,11 +80,17 @@ public class CommitterEventHandler exten
public CommitterEventHandler(AppContext context, OutputCommitter committer,
RMHeartbeatHandler rmHeartbeatHandler) {
+ this(context, committer, rmHeartbeatHandler, null);
+ }
+
+ public CommitterEventHandler(AppContext context, OutputCommitter committer,
+ RMHeartbeatHandler rmHeartbeatHandler, ClassLoader jobClassLoader) {
super("CommitterEventHandler");
this.context = context;
this.committer = committer;
this.rmHeartbeatHandler = rmHeartbeatHandler;
this.stopped = new AtomicBoolean(false);
+ this.jobClassLoader = jobClassLoader;
}
@Override
@@ -109,9 +116,23 @@ public class CommitterEventHandler exten
@Override
protected void serviceStart() throws Exception {
- ThreadFactory tf = new ThreadFactoryBuilder()
- .setNameFormat("CommitterEvent Processor #%d")
- .build();
+ ThreadFactoryBuilder tfBuilder = new ThreadFactoryBuilder()
+ .setNameFormat("CommitterEvent Processor #%d");
+ if (jobClassLoader != null) {
+ // if the job classloader is enabled, we need to use the job classloader
+ // as the thread context classloader (TCCL) of these threads in case the
+ // committer needs to load another class via TCCL
+ ThreadFactory backingTf = new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r);
+ thread.setContextClassLoader(jobClassLoader);
+ return thread;
+ }
+ };
+ tfBuilder.setThreadFactory(backingTf);
+ }
+ ThreadFactory tf = tfBuilder.build();
launcherPool = new ThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java?rev=1612371&r1=1612370&r2=1612371&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/util/MRApps.java
Mon Jul 21 18:22:45 2014
@@ -327,8 +327,8 @@ public class MRApps extends Apps {
}
/**
- * Sets a {@link ApplicationClassLoader} on the given configuration and as
- * the context classloader, if
+ * Creates and sets a {@link ApplicationClassLoader} on the given
+ * configuration and as the thread context classloader, if
* {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
* the APP_CLASSPATH environment variable is set.
* @param conf
@@ -336,24 +336,52 @@ public class MRApps extends Apps {
*/
public static void setJobClassLoader(Configuration conf)
throws IOException {
+ setClassLoader(createJobClassLoader(conf), conf);
+ }
+
+ /**
+ * Creates a {@link ApplicationClassLoader} if
+ * {@link MRJobConfig#MAPREDUCE_JOB_CLASSLOADER} is set to true, and
+ * the APP_CLASSPATH environment variable is set.
+ * @param conf
+ * @returns the created job classloader, or null if the job classloader is not
+ * enabled or the APP_CLASSPATH environment variable is not set
+ * @throws IOException
+ */
+ public static ClassLoader createJobClassLoader(Configuration conf)
+ throws IOException {
+ ClassLoader jobClassLoader = null;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, false)) {
String appClasspath = System.getenv(Environment.APP_CLASSPATH.key());
if (appClasspath == null) {
- LOG.warn("Not using job classloader since APP_CLASSPATH is not set.");
+ LOG.warn("Not creating job classloader since APP_CLASSPATH is not set.");
} else {
- LOG.info("Using job classloader");
+ LOG.info("Creating job classloader");
if (LOG.isDebugEnabled()) {
LOG.debug("APP_CLASSPATH=" + appClasspath);
}
String[] systemClasses = getSystemClasses(conf);
- ClassLoader jobClassLoader = createJobClassLoader(appClasspath,
+ jobClassLoader = createJobClassLoader(appClasspath,
systemClasses);
- if (jobClassLoader != null) {
- conf.setClassLoader(jobClassLoader);
- Thread.currentThread().setContextClassLoader(jobClassLoader);
- }
}
}
+ return jobClassLoader;
+ }
+
+ /**
+ * Sets the provided classloader on the given configuration and as the thread
+ * context classloader if the classloader is not null.
+ * @param classLoader
+ * @param conf
+ */
+ public static void setClassLoader(ClassLoader classLoader,
+ Configuration conf) {
+ if (classLoader != null) {
+ LOG.info("Setting classloader " + classLoader.getClass().getName() +
+ " on the configuration and as the thread context classloader");
+ conf.setClassLoader(classLoader);
+ Thread.currentThread().setContextClassLoader(classLoader);
+ }
}
@VisibleForTesting
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java?rev=1612371&r1=1612370&r2=1612371&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
(original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
Mon Jul 21 18:22:45 2014
@@ -33,8 +33,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.FailingMapper;
@@ -77,6 +77,10 @@ import org.apache.hadoop.mapreduce.lib.o
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.MRAppMaster;
+import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
+import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
@@ -86,6 +90,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.util.ApplicationClassLoader;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.Level;
import org.junit.AfterClass;
@@ -210,7 +215,19 @@ public class TestMRJobs {
@Test(timeout = 300000)
public void testJobClassloader() throws IOException, InterruptedException,
ClassNotFoundException {
- LOG.info("\n\n\nStarting testJobClassloader().");
+ testJobClassloader(false);
+ }
+
+ @Test(timeout = 300000)
+ public void testJobClassloaderWithCustomClasses() throws IOException,
+ InterruptedException, ClassNotFoundException {
+ testJobClassloader(true);
+ }
+
+ private void testJobClassloader(boolean useCustomClasses) throws IOException,
+ InterruptedException, ClassNotFoundException {
+ LOG.info("\n\n\nStarting testJobClassloader()"
+ + " useCustomClasses=" + useCustomClasses);
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
@@ -221,6 +238,19 @@ public class TestMRJobs {
// set master address to local to test that local mode applied iff framework == local
sleepConf.set(MRConfig.MASTER_ADDRESS, "local");
sleepConf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
+ if (useCustomClasses) {
+ // to test AM loading user classes such as output format class, we want
+ // to blacklist them from the system classes (they need to be prepended
+ // as the first match wins)
+ String systemClasses =
+ sleepConf.get(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES);
+ // exclude the custom classes from system classes
+ systemClasses = "-" + CustomOutputFormat.class.getName() + ",-" +
+ CustomSpeculator.class.getName() + "," +
+ systemClasses;
+ sleepConf.set(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER_SYSTEM_CLASSES,
+ systemClasses);
+ }
sleepConf.set(MRJobConfig.IO_SORT_MB, TEST_IO_SORT_MB);
sleepConf.set(MRJobConfig.MR_AM_LOG_LEVEL, Level.ALL.toString());
sleepConf.set(MRJobConfig.MAP_LOG_LEVEL, Level.ALL.toString());
@@ -233,12 +263,66 @@ public class TestMRJobs {
job.addFileToClassPath(APP_JAR); // The AppMaster jar itself.
job.setJarByClass(SleepJob.class);
job.setMaxMapAttempts(1); // speed up failures
+ if (useCustomClasses) {
+ // set custom output format class and speculator class
+ job.setOutputFormatClass(CustomOutputFormat.class);
+ final Configuration jobConf = job.getConfiguration();
+ jobConf.setClass(MRJobConfig.MR_AM_JOB_SPECULATOR, CustomSpeculator.class,
+ Speculator.class);
+ // speculation needs to be enabled for the speculator to be loaded
+ jobConf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
+ }
job.submit();
boolean succeeded = job.waitForCompletion(true);
Assert.assertTrue("Job status: " + job.getStatus().getFailureInfo(),
succeeded);
}
+ public static class CustomOutputFormat<K,V> extends NullOutputFormat<K,V> {
+ public CustomOutputFormat() {
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+ // inspect the stack trace and determine a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
+ public static class CustomSpeculator extends DefaultSpeculator {
+ public CustomSpeculator(Configuration conf, AppContext context) {
+ super(conf, context);
+ verifyClassLoader(getClass());
+ }
+
+ /**
+ * Verifies that the class was loaded by the job classloader if it is in the
+ * context of the MRAppMaster, and if not throws an exception to fail the
+ * job.
+ */
+ private void verifyClassLoader(Class<?> cls) {
+ // to detect that it is instantiated in the context of the MRAppMaster, we
+ // inspect the stack trace and determine a caller is MRAppMaster
+ for (StackTraceElement e: new Throwable().getStackTrace()) {
+ if (e.getClassName().equals(MRAppMaster.class.getName()) &&
+ !(cls.getClassLoader() instanceof ApplicationClassLoader)) {
+ throw new ExceptionInInitializerError("incorrect classloader used");
+ }
+ }
+ }
+ }
+
protected void verifySleepJobCounters(Job job) throws InterruptedException,
IOException {
Counters counters = job.getCounters();
|