Author: schen
Date: Fri Oct 29 01:29:29 2010
New Revision: 1028581
URL: http://svn.apache.org/viewvc?rev=1028581&view=rev
Log:
MAPREDUCE-2099. RaidNode recreates outdated parity HARs. (Ramkumar Vadali via
schen)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Oct 29 01:29:29 2010
@@ -359,6 +359,9 @@ Trunk (unreleased changes)
MAPREDUCE-2150. RaidNode periodically fixes corrupt blocks. (Ramkumar Vadali via
schen)
+ MAPREDUCE-2099. RaidNode recreates outdated parity HARs. (Ramkumar Vadali
+ via schen)
+
Release 0.21.1 - Unreleased
NEW FEATURES
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Fri Oct 29 01:29:29 2010
@@ -105,6 +105,7 @@ public class RaidNode implements RaidPro
Daemon triggerThread = null;
/** Daemon thread to delete obsolete parity files */
+ PurgeMonitor purgeMonitor = null;
Daemon purgeThread = null;
/** Daemon thread to har raid directories */
@@ -289,7 +290,8 @@ public class RaidNode implements RaidPro
this.triggerThread.start();
// start the thread that deletes obsolete parity files
- this.purgeThread = new Daemon(new PurgeMonitor());
+ this.purgeMonitor = new PurgeMonitor();
+ this.purgeThread = new Daemon(purgeMonitor);
this.purgeThread.start();
// start the thread that creates HAR files
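The hunk above splits construction of the purge worker from its thread wrapper. A minimal sketch of the pattern, using the class names from the diff, presumably so the monitor's state stays reachable while the daemon thread runs:

    // Keep the Runnable in a field instead of constructing it inline,
    // so other code (e.g. tests) can reach it while the wrapper runs.
    PurgeMonitor purgeMonitor = new PurgeMonitor(); // the purge logic itself
    Daemon purgeThread = new Daemon(purgeMonitor);  // daemon-thread wrapper
    purgeThread.start();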
@@ -899,12 +901,22 @@ public class RaidNode implements RaidPro
LOG.debug("Checking " + destPath + " prefix " + destPrefix);
// Verify if it is a har file
- if (destStr.endsWith(HAR_SUFFIX)) {
- String destParentStr = destPath.getParent().toUri().getPath();
- String src = destParentStr.replaceFirst(destPrefix, "");
- Path srcPath = new Path(src);
- if (!srcFs.exists(srcPath)) {
- destFs.delete(destPath, true);
+ if (dest.isDirectory() && destStr.endsWith(HAR_SUFFIX)) {
+ try {
+ int harUsedPercent =
+ usefulHar(srcFs, destFs, destPath, destPrefix, conf);
+ LOG.info("Useful percentage of " + destStr + " " + harUsedPercent);
+ // Delete the har if its usefulness falls to or below the threshold.
+ if (harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0)) {
+ LOG.info("Purging " + destStr + " at usage " + harUsedPercent);
+ boolean done = destFs.delete(destPath, true);
+ if (!done) {
+ LOG.error("Could not purge " + destPath);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Error during purging " + destStr + " " +
+ StringUtils.stringifyException(e));
}
return;
}
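For reference, the purge decision added above in isolation: the HAR is deleted once its useful percentage falls to or below raid.har.usage.threshold, which defaults to 0. A minimal sketch, assuming a Configuration conf and a percentage already computed by usefulHar():

    int threshold = conf.getInt("raid.har.usage.threshold", 0);
    if (harUsedPercent <= threshold) {
      // With the default of 0, a HAR is purged only when no file in it
      // is up to date; raising the threshold purges partially-stale HARs.
      boolean done = destFs.delete(destPath, true);
    }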
@@ -919,14 +931,9 @@ public class RaidNode implements RaidPro
if (dest.isDirectory()) {
FileStatus[] files = null;
files = destFs.listStatus(destPath);
- if (files != null) {
- for (FileStatus one:files) {
- recursePurge(srcFs, destFs, destPrefix, one);
- }
- }
- files = destFs.listStatus(destPath);
if (files == null || files.length == 0){
- boolean done = destFs.delete(destPath,false);
+ boolean done = destFs.delete(destPath, true); // ideally false, but
+ // DFSClient deletes directories only when the delete is recursive
if (done) {
LOG.info("Purged directory " + destPath );
}
@@ -934,6 +941,13 @@ public class RaidNode implements RaidPro
LOG.info("Unable to purge directory " + destPath);
}
}
+ if (files != null) {
+ for (FileStatus one:files) {
+ recursePurge(srcFs, destFs, destPrefix, one);
+ }
+ }
+ // If the directory is empty now, it will be purged the next time this
+ // thread runs.
return; // the code below does the file checking
}
@@ -955,7 +969,53 @@ public class RaidNode implements RaidPro
}
}
-
+ /**
+  * Returns the number of up-to-date files in the har as a percentage of the
+  * total number of files in the har.
+  */
+ protected static int usefulHar(
+ FileSystem srcFs, FileSystem destFs,
+ Path harPath, String destPrefix, Configuration conf) throws IOException {
+
+ FileSystem fsHar = new HarFileSystem(destFs);
+ String harURIPath = harPath.toUri().getPath();
+ Path qualifiedPath = new Path("har://", harURIPath +
+ Path.SEPARATOR + harPath.getParent().toUri().getPath());
+ fsHar.initialize(qualifiedPath.toUri(), conf);
+ FileStatus[] filesInHar = fsHar.listStatus(qualifiedPath);
+ if (filesInHar.length == 0) {
+ return 0;
+ }
+ int numUseless = 0;
+ for (FileStatus one: filesInHar) {
+ Path parityPath = one.getPath();
+ String parityStr = parityPath.toUri().getPath();
+ if (parityStr.startsWith("har:/")) {
+ LOG.error("Unexpected prefix har:/ for " + parityStr);
+ continue;
+ }
+ String prefixToReplace = harURIPath + destPrefix;
+ if (!parityStr.startsWith(prefixToReplace)) {
+ continue;
+ }
+ String src = parityStr.substring(prefixToReplace.length());
+ try {
+ FileStatus srcStatus = srcFs.getFileStatus(new Path(src));
+ if (srcStatus == null) {
+ numUseless++;
+ } else if (one.getModificationTime() !=
+ srcStatus.getModificationTime()) {
+ numUseless++;
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("File not found: " + e);
+ numUseless++;
+ }
+ }
+ int uselessPercent = numUseless * 100 / filesInHar.length;
+ return 100 - uselessPercent;
+ }
+
private void doHar() throws IOException, InterruptedException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
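A worked example of the integer arithmetic in usefulHar(): suppose the HAR holds 8 parity files and 3 of them are useless (source file missing, or modification times no longer match):

    int uselessPercent = 3 * 100 / 8;          // = 37 (integer division)
    int harUsedPercent = 100 - uselessPercent; // = 63
    // At the default threshold of 0 this HAR is kept; it is purged only
    // once every file in it is useless (harUsedPercent == 0).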
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java?rev=1028581&r1=1028580&r2=1028581&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java Fri Oct 29 01:29:29 2010
@@ -47,6 +47,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.hdfs.TestRaidDfs;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
@@ -107,7 +108,14 @@ public class TestRaidPurge extends TestC
* create raid.xml file for RaidNode
*/
private void mySetup(String srcPath, long targetReplication,
- long metaReplication, long stripeLength) throws Exception {
+ long metaReplication, long stripeLength)
+ throws Exception {
+ mySetup(srcPath, targetReplication, metaReplication, stripeLength, 1);
+ }
+
+ private void mySetup(String srcPath, long targetReplication,
+ long metaReplication, long stripeLength, int harDelay)
+ throws Exception {
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("<?xml version=\"1.0\"?>\n");
String str = "<configuration> " +
@@ -139,6 +147,12 @@ public class TestRaidPurge extends TestC
"a candidate for RAIDing " +
"</description> " +
"</property> " +
+ "<property> " +
+ "<name>time_before_har</name> " +
+ "<value> " + harDelay + "</value> " +
+ "<description> time before har'ing parity files" +
+ "</description> " +
+ "</property> " +
"</policy>" +
"</srcPath>" +
"</configuration>";
@@ -260,4 +274,74 @@ public class TestRaidPurge extends TestC
LOG.info("doTestPurge completed:" + " blockSize=" + blockSize +
" stripeLength=" + stripeLength);
}
+
+ /**
+ * Create a file and wait for its parity file to get HARed. Then modify the
+ * file and wait for the HAR to get purged.
+ */
+ public void testPurgeHar() throws Exception {
+ LOG.info("testPurgeHar started");
+ int harDelay = 0;
+ createClusters(true);
+ mySetup("/user/dhruba/raidtest", 1, 1, 5, harDelay);
+ Path dir = new Path("/user/dhruba/raidtest/");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ Path file1 = new Path(dir + "/file");
+ RaidNode cnode = null;
+ try {
+ TestRaidNode.createOldFile(fileSys, file1, 1, 8, 8192L);
+ LOG.info("testPurgeHar created test files");
+
+ // create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ // Wait till har is created.
+ while (true) {
+ try {
+ FileStatus[] listPaths = fileSys.listStatus(destPath);
+ if (listPaths != null && listPaths.length == 1) {
+ FileStatus s = listPaths[0];
+ LOG.info("testPurgeHar found path " + s.getPath());
+ if (s.getPath().toString().endsWith(".har")) {
+ break;
+ }
+ }
+ } catch (FileNotFoundException e) {
+ //ignore
+ }
+ Thread.sleep(1000); // keep waiting
+ }
+
+ // Set an old timestamp.
+ fileSys.setTimes(file1, 0, 0);
+
+ boolean found = false;
+ FileStatus[] listPaths = null;
+ while (!found || listPaths == null || listPaths.length > 1) {
+ listPaths = fileSys.listStatus(destPath);
+ if (listPaths != null) {
+ for (FileStatus s: listPaths) {
+ LOG.info("testPurgeHar waiting for parity file to be recreated" +
+ " and har to be deleted found " + s.getPath());
+ if (s.getPath().toString().endsWith("file") &&
+ s.getModificationTime() == 0) {
+ found = true;
+ }
+ }
+ }
+ Thread.sleep(1000);
+ }
+ } catch (Exception e) {
+ LOG.info("testPurgeHar Exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ fileSys.delete(dir, true);
+ fileSys.delete(destPath, true);
+ stopClusters();
+ }
+ }
}
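One caveat on the test: both wait loops poll with no upper bound, so a regression hangs the run instead of failing it. A hedged variant of the first loop with a deadline (the two-minute cap is illustrative):

    long deadline = System.currentTimeMillis() + 120 * 1000;
    while (System.currentTimeMillis() < deadline) {
      try {
        FileStatus[] listPaths = fileSys.listStatus(destPath);
        if (listPaths != null && listPaths.length == 1 &&
            listPaths[0].getPath().toString().endsWith(".har")) {
          break;
        }
      } catch (FileNotFoundException e) {
        // ignore: destPath may not exist yet
      }
      Thread.sleep(1000); // keep waiting
    }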