Author: sharad
Date: Wed Apr 13 08:18:49 2011
New Revision: 1091696
URL: http://svn.apache.org/viewvc?rev=1091696&view=rev
Log:
Add HistoryCleanerService to Job History server. Contributed by Krishna Ramachandran.
Added:
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
Modified:
hadoop/mapreduce/branches/MR-279/CHANGES.txt
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Wed Apr 13 08:18:49 2011
@@ -7,6 +7,9 @@ Trunk (unreleased changes)
Implement 'bin/mapred job -list' and 'bin/mapred job
-list-active-trackers'. (acmurthy)
+ Add HistoryCleanerService to Job History server. (Krishna Ramachandran
+ via sharad)
+
INCOMPATIBLE CHANGES
MAPREDUCE-1866. Removes deprecated class
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/YarnMRJobConfig.java
Wed Apr 13 08:18:49 2011
@@ -47,4 +47,10 @@ public class YarnMRJobConfig {
public static final String HISTORY_DONE_DIR_KEY =
"yarn.historyfile.doneDir";
+ public static final String HISTORY_MAXAGE =
+ "yarn.historyfile.maxage";
+ public static final String HS_WEBAPP_BIND_ADDRESS = HS_PREFIX +
+ "address.webapp";
+ public static final String DEFAULT_HS_WEBAPP_BIND_ADDRESS =
+ "0.0.0.0:19888";
}
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java?rev=1091696&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
(added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryCleanerService.java
Wed Apr 13 08:18:49 2011
@@ -0,0 +1,142 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.hs;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.v2.YarnMRJobConfig;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * Service that runs a background daemon thread which periodically deletes
+ * job history files older than a configurable maximum age from the job
+ * history "done" directory.
+ *
+ * The maximum age is read from {@link YarnMRJobConfig#HISTORY_MAXAGE} when
+ * the service is started and defaults to {@link #DEFAULT_HISTORY_MAX_AGE}.
+ */
+public class HistoryCleanerService extends AbstractService {
+
+  private static final Log LOG =
+      LogFactory.getLog(HistoryCleanerService.class);
+
+  /** Default maximum age of a history file, in milliseconds (7 days). */
+  static final long DEFAULT_HISTORY_MAX_AGE = 7 * 24 * 60 * 60 * 1000L;
+
+  private final Configuration conf;
+  private HistoryCleaner historyCleanerThread = null;
+
+  /**
+   * @param conf configuration from which the maximum age and the history
+   *             done-directory location are read
+   */
+  public HistoryCleanerService(Configuration conf) {
+    super("HistoryCleanerService");
+    this.conf = conf;
+  }
+
+  /** Reads the configured maximum age and launches the cleaner thread. */
+  @Override
+  public void start() {
+    long maxAgeOfHistoryFiles = conf.getLong(
+        YarnMRJobConfig.HISTORY_MAXAGE, DEFAULT_HISTORY_MAX_AGE);
+    historyCleanerThread = new HistoryCleaner(maxAgeOfHistoryFiles);
+    historyCleanerThread.start();
+    super.start();
+  }
+
+  /**
+   * Interrupts the cleaner thread, waits for it to exit, and then stops
+   * this service.
+   */
+  @Override
+  public void stop() {
+    // The thread only exists once start() has run; guard against a stop()
+    // on a service that was never started.
+    if (historyCleanerThread != null) {
+      LOG.info("Interrupting History Cleaner");
+      historyCleanerThread.interrupt();
+      try {
+        historyCleanerThread.join();
+      } catch (InterruptedException e) {
+        LOG.info("Error with shutting down history cleaner thread");
+        // Preserve the interrupt for whoever is driving the shutdown.
+        Thread.currentThread().interrupt();
+      }
+    }
+    super.stop();
+  }
+
+  /**
+   * Delete history files older than a specified time duration.
+   */
+  class HistoryCleaner extends Thread {
+    static final long ONE_DAY_IN_MS = 24 * 60 * 60 * 1000L;
+    private final long cleanupFrequency;
+    private final long maxAgeOfHistoryFiles;
+
+    /**
+     * @param maxAge age in milliseconds beyond which a history file is
+     *               deleted; the cleanup interval is the smaller of this
+     *               value and one day
+     */
+    public HistoryCleaner(long maxAge) {
+      setName("Thread for cleaning up History files");
+      setDaemon(true);
+      this.maxAgeOfHistoryFiles = maxAge;
+      // NOTE(review): a non-positive maxAge yields a non-positive sleep
+      // interval, which would make run() spin - confirm callers validate.
+      cleanupFrequency = Math.min(ONE_DAY_IN_MS, maxAgeOfHistoryFiles);
+      LOG.info("Job History Cleaner Thread started." +
+          " MaxAge is " +
+          maxAge + " ms(" + ((float) maxAge) / ONE_DAY_IN_MS + " days)," +
+          " Cleanup Frequency is " +
+          cleanupFrequency + " ms (" +
+          ((float) cleanupFrequency) / ONE_DAY_IN_MS + " days)");
+    }
+
+    /**
+     * Alternates between a cleanup pass and a sleep of
+     * {@code cleanupFrequency} ms until the thread is interrupted.
+     */
+    @Override
+    public void run() {
+      while (true) {
+        try {
+          doCleanup();
+          Thread.sleep(cleanupFrequency);
+        } catch (InterruptedException e) {
+          LOG.info("History Cleaner thread exiting");
+          return;
+        } catch (Throwable t) {
+          // Keep the cleaner alive across unexpected failures in one pass.
+          LOG.warn("History cleaner thread threw an exception", t);
+        }
+      }
+    }
+
+    /**
+     * Recursively lists the history done directory and deletes every file
+     * whose modification time is more than {@code maxAgeOfHistoryFiles} ms
+     * in the past. I/O failures are logged and end the current pass.
+     */
+    private void doCleanup() {
+      long now = System.currentTimeMillis();
+      try {
+        // The done directory defaults to <apps staging dir>/history/done
+        // unless HISTORY_DONE_DIR_KEY is set explicitly.
+        String defaultDoneDir = conf.get(
+            YARNApplicationConstants.APPS_STAGING_DIR_KEY) + "/history/done";
+        String jobhistoryDir =
+            conf.get(YarnMRJobConfig.HISTORY_DONE_DIR_KEY, defaultDoneDir);
+        Path done = FileContext.getFileContext(conf).makeQualified(
+            new Path(jobhistoryDir));
+        FileContext doneDirFc =
+            FileContext.getFileContext(done.toUri(), conf);
+        RemoteIterator<LocatedFileStatus> historyFiles =
+            doneDirFc.util().listFiles(done, true);
+        if (historyFiles != null) {
+          while (historyFiles.hasNext()) {
+            FileStatus f = historyFiles.next();
+            if (now - f.getModificationTime() > maxAgeOfHistoryFiles) {
+              LOG.info("Deleting old history file : " + f.getPath());
+              doneDirFc.delete(f.getPath(), true);
+            }
+          }
+        }
+      } catch (IOException ie) {
+        LOG.warn("Error cleaning up history directory", ie);
+      }
+    }
+  }
+}
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java?rev=1091696&r1=1091695&r2=1091696&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
Wed Apr 13 08:18:49 2011
@@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.service.Co
*****************************************************************/
public class JobHistoryServer extends CompositeService {
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
+ private HistoryClientService clientService;
+ private HistoryCleanerService cleanerService;
static{
Configuration.addDefaultResource("mapred-default.xml");
@@ -46,15 +48,18 @@ public class JobHistoryServer extends Co
public synchronized void init(Configuration conf) {
Configuration config = new YarnConfiguration(conf);
HistoryContext history = new JobHistory(conf);
- addService(new HistoryClientService(history));
- //TODO: add HistoryCleaner service
+ clientService = new HistoryClientService(history);
+ cleanerService = new HistoryCleanerService(config);
+ addService(clientService);
+ addService(cleanerService);
super.init(config);
}
public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
+ JobHistoryServer server = null;
try {
- JobHistoryServer server = new JobHistoryServer();
+ server = new JobHistoryServer();
YarnConfiguration conf = new YarnConfiguration(new JobConf());
server.init(conf);
server.start();
|