Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Oct 12 18:23:36 2010
@@ -21,10 +21,12 @@ package org.apache.hadoop.raid;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.LinkedList;
import java.util.Iterator;
import java.util.Arrays;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.HashSet;
@@ -73,7 +75,9 @@ public class RaidNode implements RaidPro
public static final long SLEEP_TIME = 10000L; // 10 seconds
public static final int DEFAULT_PORT = 60000;
public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+ public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
public static final String DEFAULT_RAID_LOCATION = "/raid";
+ public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
public static final String HAR_SUFFIX = "_raid.har";
/** RPC server */
@@ -101,6 +105,10 @@ public class RaidNode implements RaidPro
  /** Daemon thread to har raid directories */
Daemon harThread = null;
+ /** Daemon thread to monitor distributed raid job progress */
+ JobMonitor jobMonitor = null;
+ Daemon jobMonitorThread = null;
+
  /** Whether to do raiding locally instead of via a distributed job */
boolean isRaidLocal = false;
@@ -168,6 +176,7 @@ public class RaidNode implements RaidPro
try {
initialize(conf);
} catch (IOException e) {
+ LOG.error(StringUtils.stringifyException(e));
this.stop();
throw e;
} catch (Exception e) {
@@ -193,6 +202,7 @@ public class RaidNode implements RaidPro
try {
if (server != null) server.join();
if (triggerThread != null) triggerThread.join();
+ if (jobMonitorThread != null) jobMonitorThread.join();
if (purgeThread != null) purgeThread.join();
} catch (InterruptedException ie) {
// do nothing
@@ -210,6 +220,8 @@ public class RaidNode implements RaidPro
running = false;
if (server != null) server.stop();
if (triggerThread != null) triggerThread.interrupt();
+ if (jobMonitor != null) jobMonitor.running = false;
+ if (jobMonitorThread != null) jobMonitorThread.interrupt();
if (purgeThread != null) purgeThread.interrupt();
}
@@ -252,6 +264,10 @@ public class RaidNode implements RaidPro
running = true;
this.server.start(); // start RPC server
+ this.jobMonitor = new JobMonitor(conf);
+ this.jobMonitorThread = new Daemon(this.jobMonitor);
+ this.jobMonitorThread.start();
+
    // start the daemon thread to fire policies appropriately
this.triggerThread = new Daemon(new TriggerMonitor());
this.triggerThread.start();
@@ -282,22 +298,15 @@ public class RaidNode implements RaidPro
LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
Path inputPath = new Path(inStr);
Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
- PolicyInfo info = findMatchingPolicy(srcPath);
- if (info != null) {
-
- // find stripe length from config
- int stripeLength = getStripeLength(conf, info);
+ // find stripe length from config
+ int stripeLength = getStripeLength(conf);
- // create destination path prefix
- String destPrefix = getDestinationPath(conf, info);
- Path destPath = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(destPath.toUri(), conf);
- destPath = destPath.makeQualified(fs);
-
- Path unraided = unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
- if (unraided != null) {
- return unraided.toString();
- }
+ Path destPref = getDestinationPath(conf);
+ Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
+ Path unraided = unRaid(conf, srcPath, destPref, decoder,
+ stripeLength, corruptOffset);
+ if (unraided != null) {
+ return unraided.toString();
}
return null;
}
@@ -306,6 +315,11 @@ public class RaidNode implements RaidPro
* Periodically checks to see which policies should be fired.
*/
class TriggerMonitor implements Runnable {
+
+ private Map<String, Long> scanTimes = new HashMap<String, Long>();
+ private Map<String, DirectoryTraversal> scanState =
+ new HashMap<String, DirectoryTraversal>();
+
/**
*/
public void run() {
@@ -320,6 +334,109 @@ public class RaidNode implements RaidPro
}
}
+ /**
+   * Should we select more files for a policy?
+ */
+ private boolean shouldSelectFiles(PolicyInfo info) {
+ String policyName = info.getName();
+ int runningJobsCount = jobMonitor.runningJobsCount(policyName);
+ // Is there a scan in progress for this policy?
+ if (scanState.containsKey(policyName)) {
+ int maxJobsPerPolicy = configMgr.getMaxJobsPerPolicy();
+
+ // If there is a scan in progress for this policy, we can have
+      // up to maxJobsPerPolicy running jobs.
+ return (runningJobsCount < maxJobsPerPolicy);
+ } else {
+ // If there isn't a scan in progress for this policy, we don't
+ // want to start a fresh scan if there is even one running job.
+ if (runningJobsCount >= 1) {
+ return false;
+ }
+ // Check the time of the last full traversal before starting a fresh
+ // traversal.
+ if (scanTimes.containsKey(policyName)) {
+ long lastScan = scanTimes.get(policyName);
+ return (now() > lastScan + configMgr.getPeriodicity());
+ } else {
+ return true;
+ }
+ }
+ }
+
+ /**
+   * Returns a list of pathnames that need raiding.
+ * The list of paths could be obtained by resuming a previously suspended
+ * traversal.
+ * The number of paths returned is limited by raid.distraid.max.jobs.
+ */
+ private List<FileStatus> selectFiles(PolicyInfo info) throws IOException {
+ Path destPrefix = getDestinationPath(conf);
+ String policyName = info.getName();
+ Path srcPath = info.getSrcPath();
+ long modTimePeriod = 0;
+ String str = info.getProperty("modTimePeriod");
+ if (str != null) {
+ modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
+ }
+ short srcReplication = 0;
+ str = info.getProperty("srcReplication");
+ if (str != null) {
+ srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+ }
+
+ // Max number of files returned.
+ int selectLimit = configMgr.getMaxFilesPerJob();
+ int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+
+ // If we have a pending traversal, resume it.
+ if (scanState.containsKey(policyName)) {
+ DirectoryTraversal dt = scanState.get(policyName);
+ LOG.info("Resuming traversal for policy " + policyName);
+ List<FileStatus> returnSet = dt.selectFilesToRaid(
+ conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ if (dt.doneTraversal()) {
+ scanState.remove(policyName);
+ }
+ return returnSet;
+ }
+
+ // Expand destination prefix path.
+ String destpstr = destPrefix.toString();
+ if (!destpstr.endsWith(Path.SEPARATOR)) {
+ destpstr += Path.SEPARATOR;
+ }
+
+ List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+ FileSystem fs = srcPath.getFileSystem(conf);
+ FileStatus[] gpaths = fs.globStatus(srcPath);
+ if (gpaths != null) {
+ List<FileStatus> selectedPaths = new LinkedList<FileStatus>();
+ for (FileStatus onepath: gpaths) {
+ String pathstr = onepath.getPath().makeQualified(fs).toString();
+ if (!pathstr.endsWith(Path.SEPARATOR)) {
+ pathstr += Path.SEPARATOR;
+ }
+ if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
+ LOG.info("Skipping source " + pathstr +
+ " because it conflicts with raid directory " + destpstr);
+ } else {
+ selectedPaths.add(onepath);
+ }
+ }
+
+ // Set the time for a new traversal.
+ scanTimes.put(policyName, now());
+ DirectoryTraversal dt = new DirectoryTraversal(fs, selectedPaths);
+ returnSet = dt.selectFilesToRaid(
+ conf, targetRepl, destPrefix, modTimePeriod, selectLimit);
+ if (!dt.doneTraversal()) {
+ scanState.put(policyName, dt);
+ }
+ }
+ return returnSet;
+ }
/**
* Keep processing policies.
@@ -328,18 +445,11 @@ public class RaidNode implements RaidPro
private void doProcess() throws IOException, InterruptedException {
PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
- long prevExec = 0;
- DistRaid dr = null;
while (running) {
+ Thread.sleep(SLEEP_TIME);
- boolean reload = configMgr.reloadConfigsIfNecessary();
- while(!reload && now() < prevExec + configMgr.getPeriodicity()){
- Thread.sleep(SLEEP_TIME);
- reload = configMgr.reloadConfigsIfNecessary();
- }
+ configMgr.reloadConfigsIfNecessary();
- prevExec = now();
-
// activate all categories
Collection<PolicyList> all = configMgr.getAllPolicies();
@@ -348,35 +458,18 @@ public class RaidNode implements RaidPro
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
- if (!isRaidLocal) {
- dr = new DistRaid(conf);
- }
- // paths we have processed so far
- List<String> processed = new LinkedList<String>();
-
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
- long modTimePeriod = 0;
- short srcReplication = 0;
- String str = info.getProperty("modTimePeriod");
- if (str != null) {
- modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
- }
- str = info.getProperty("srcReplication");
- if (str != null) {
- srcReplication = Short.parseShort(info.getProperty("srcReplication"));
+ if (!shouldSelectFiles(info)) {
+ continue;
}
LOG.info("Triggering Policy Filter " + info.getName() +
" " + info.getSrcPath());
List<FileStatus> filteredPaths = null;
- try {
- filteredPaths = selectFiles(conf, info.getSrcPath(),
- getDestinationPath(conf, info),
- modTimePeriod,
- srcReplication,
- prevExec);
+ try {
+ filteredPaths = selectFiles(info);
} catch (Exception e) {
LOG.info("Exception while invoking filter on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
@@ -389,95 +482,41 @@ public class RaidNode implements RaidPro
continue;
}
- // If any of the filtered path has already been accepted
- // by a previous policy, then skip it.
- for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
- String fs = iter.next().getPath().toString() + "/";
- for (String p : processed) {
- if (p.startsWith(fs)) {
- iter.remove();
- break;
- }
- }
- }
-
// Apply the action on accepted paths
- LOG.info("Triggering Policy Action " + info.getName());
+ LOG.info("Triggering Policy Action " + info.getName() +
+ " " + info.getSrcPath());
try {
- if (isRaidLocal){
- doRaid(conf, info, filteredPaths);
- }
- else{
- //add paths for distributed raiding
- dr.addRaidPaths(info, filteredPaths);
- }
+ if (isRaidLocal){
+ doRaid(conf, info, filteredPaths);
+ }
+ else{
+ // We already checked that no job for this policy is running
+ // So we can start a new job.
+ DistRaid dr = new DistRaid(conf);
+ //add paths for distributed raiding
+ dr.addRaidPaths(info, filteredPaths);
+ boolean started = dr.startDistRaid();
+ if (started) {
+ jobMonitor.monitorJob(info.getName(), dr);
+ }
+ }
} catch (Exception e) {
LOG.info("Exception while invoking action on policy " + info.getName() +
" srcPath " + info.getSrcPath() +
" exception " + StringUtils.stringifyException(e));
continue;
}
-
- // add these paths to processed paths
- for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
- String p = iter.next().getPath().toString() + "/";
- processed.add(p);
- }
}
}
- processed.clear(); // free up memory references before yielding
-
- //do the distributed raiding
- if (!isRaidLocal) {
- dr.doDistRaid();
- }
}
}
}
- /**
- * Returns the policy that matches the specified path.
- * The method below finds the first policy that matches an input path. Since different
- * policies with different purposes and destinations might be associated with the same input
- * path, we should be skeptical about the places using the method and we should try to change
- * the code to avoid it.
- */
- private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
- PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
- Collection<PolicyList> all = configMgr.getAllPolicies();
-
- // sort all policies by reverse lexicographical order. This is needed
- // to make the nearest policy take precedence.
- PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
- Arrays.sort(sorted, lexi);
-
- // loop through all categories of policies.
- for (PolicyList category : sorted) {
- PolicyInfo first = category.getAll().iterator().next();
- if (first != null) {
- Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
- if (srcPaths == null) {
- continue;
- }
-
- for (Path src: srcPaths) {
- if (inpath.toString().startsWith(src.toString())) {
- // if the srcpath is a prefix of the specified path
- // we have a match!
- return first;
- }
- }
- }
- }
- return null; // no matching policies
- }
-
-
static private Path getOriginalParityFile(Path destPathPrefix, Path srcPath) {
return new Path(destPathPrefix, makeRelative(srcPath));
}
- private static class ParityFilePair {
+ static class ParityFilePair {
private Path path;
private FileSystem fs;
@@ -506,11 +545,19 @@ public class RaidNode implements RaidPro
* @return Path object representing the parity file of the source
* @throws IOException
*/
- static private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
+ static ParityFilePair getParityFile(Path destPathPrefix, Path srcPath, Configuration conf) throws IOException {
Path srcParent = srcPath.getParent();
FileSystem fsDest = destPathPrefix.getFileSystem(conf);
-
+ FileSystem fsSrc = srcPath.getFileSystem(conf);
+
+ FileStatus srcStatus = null;
+ try {
+ srcStatus = fsSrc.getFileStatus(srcPath);
+ } catch (java.io.FileNotFoundException e) {
+ return null;
+ }
+
Path outDir = destPathPrefix;
if (srcParent != null) {
if (srcParent.getParent() == null) {
@@ -520,36 +567,36 @@ public class RaidNode implements RaidPro
}
}
+
+    //CASE 1: CHECK HAR - Must be checked first because the har is created after
+    // the parity file, and returning the parity file could result in an error
+    // while reading it.
+ Path outPath = getOriginalParityFile(destPathPrefix, srcPath);
String harDirName = srcParent.getName() + HAR_SUFFIX;
Path HarPath = new Path(outDir,harDirName);
- Path outPath = getOriginalParityFile(destPathPrefix, srcPath);
-
- if (!fsDest.exists(HarPath)) { // case 1: no HAR file
- return new ParityFilePair(outPath,fsDest);
- }
-
- URI HarPathUri = HarPath.toUri();
- Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
- FileSystem fsHar = new HarFileSystem(fsDest);
- fsHar.initialize(inHarPath.toUri(), conf);
-
- if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
- return new ParityFilePair(outPath,fsDest);
- }
-
- if (! fsDest.exists(outPath)) { // case 3: only inside HAR
- return new ParityFilePair(inHarPath,fsHar);
+ if (fsDest.exists(HarPath)) {
+ URI HarPathUri = HarPath.toUri();
+ Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+ FileSystem fsHar = new HarFileSystem(fsDest);
+ fsHar.initialize(inHarPath.toUri(), conf);
+ if (fsHar.exists(inHarPath)) {
+ FileStatus inHar = fsHar.getFileStatus(inHarPath);
+ if (inHar.getModificationTime() == srcStatus.getModificationTime()) {
+ return new ParityFilePair(inHarPath,fsHar);
+ }
+ }
}
-
- // both inside and outside HAR. Should return most recent
- FileStatus inHar = fsHar.getFileStatus(inHarPath);
- FileStatus outHar = fsDest.getFileStatus(outPath);
-
- if (inHar.getModificationTime() >= outHar.getModificationTime()) {
- return new ParityFilePair(inHarPath,fsHar);
+
+ //CASE 2: CHECK PARITY
+ try {
+ FileStatus outHar = fsDest.getFileStatus(outPath);
+ if (outHar.getModificationTime() == srcStatus.getModificationTime()) {
+ return new ParityFilePair(outPath,fsDest);
+ }
+ } catch (java.io.FileNotFoundException e) {
}
- return new ParityFilePair(outPath,fsDest);
+ return null; // NULL if no parity file
}
private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
@@ -558,108 +605,6 @@ public class RaidNode implements RaidPro
}
- /**
- * Returns a list of pathnames that needs raiding.
- */
- List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
- long modTimePeriod, short srcReplication, long now) throws IOException {
-
- List<FileStatus> returnSet = new LinkedList<FileStatus>();
-
- // expand destination prefix path
- Path destp = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(destp.toUri(), conf);
- destp = destp.makeQualified(fs);
-
- // Expand destination prefix path.
- String destpstr = destp.toString();
- if (!destpstr.endsWith(Path.SEPARATOR)) {
- destpstr += Path.SEPARATOR;
- }
-
- fs = p.getFileSystem(conf);
- FileStatus[] gpaths = fs.globStatus(p);
- if (gpaths != null) {
- for (FileStatus onepath: gpaths) {
- String pathstr = onepath.getPath().makeQualified(fs).toString();
- if (!pathstr.endsWith(Path.SEPARATOR)) {
- pathstr += Path.SEPARATOR;
- }
- if (pathstr.startsWith(destpstr) || destpstr.startsWith(pathstr)) {
- LOG.info("Skipping source " + pathstr +
- " because it conflicts with raid directory " + destpstr);
- } else {
- recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
- }
- }
- }
- return returnSet;
- }
-
- /**
- * Pick files that need to be RAIDed.
- */
- private void recurse(FileSystem srcFs,
- Configuration conf,
- Path destPathPrefix,
- FileStatus src,
- List<FileStatus> accept,
- long modTimePeriod,
- short srcReplication,
- long now) throws IOException {
- Path path = src.getPath();
- FileStatus[] files = null;
- try {
- files = srcFs.listStatus(path);
- } catch (java.io.FileNotFoundException e) {
- // ignore error because the file could have been deleted by an user
- LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
- } catch (IOException e) {
- throw e;
- }
-
- // If the modTime of the raid file is later than the modtime of the
- // src file and the src file has not been modified
- // recently, then that file is a candidate for RAID.
-
- if (src.isFile()) {
-
- // if the source file has fewer than or equal to 2 blocks, then no need to RAID
- long blockSize = src.getBlockSize();
- if (2 * blockSize >= src.getLen()) {
- return;
- }
-
- // check if destination path already exists. If it does and it's modification time
- // does not match the modTime of the source file, then recalculate RAID
- boolean add = false;
- try {
- ParityFilePair ppair = getParityFile(destPathPrefix, path);
- Path outpath = ppair.getPath();
- FileSystem outFs = ppair.getFileSystem();
- FileStatus ostat = outFs.getFileStatus(outpath);
- if (ostat.getModificationTime() != src.getModificationTime() &&
- src.getModificationTime() + modTimePeriod < now) {
- add = true;
- }
- } catch (java.io.FileNotFoundException e) {
- add = true; // destination file does not exist
- }
-
- if (add) {
- accept.add(src);
- }
- return;
-
- } else if (files != null) {
- for (FileStatus one:files) {
- if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
- recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
- }
- }
- }
- }
-
/**
* RAID a list of files.
@@ -668,8 +613,8 @@ public class RaidNode implements RaidPro
throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
- int stripeLength = getStripeLength(conf, info);
- String destPrefix = getDestinationPath(conf, info);
+ int stripeLength = getStripeLength(conf);
+ Path destPref = getDestinationPath(conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
@@ -677,13 +622,9 @@ public class RaidNode implements RaidPro
Statistics statistics = new Statistics();
int count = 0;
- Path p = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- p = p.makeQualified(fs);
-
for (FileStatus s : paths) {
- doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
- stripeLength);
+ doRaid(conf, s, destPref, statistics, Reporter.NULL, doSimulate, targetRepl,
+ metaRepl, stripeLength);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
@@ -701,23 +642,16 @@ public class RaidNode implements RaidPro
FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
- int stripeLength = getStripeLength(conf, info);
- String destPrefix = getDestinationPath(conf, info);
+ int stripeLength = getStripeLength(conf);
+ Path destPref = getDestinationPath(conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
- int count = 0;
-
- Path p = new Path(destPrefix.trim());
- FileSystem fs = FileSystem.get(p.toUri(), conf);
- p = p.makeQualified(fs);
-
- doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
- stripeLength);
+ doRaid(conf, src, destPref, statistics, reporter, doSimulate,
+ targetRepl, metaRepl, stripeLength);
}
-
-
+
/**
* RAID an individual file
*/
@@ -784,25 +718,11 @@ public class RaidNode implements RaidPro
Path destPathPrefix, BlockLocation[] locations,
int metaRepl, int stripeLength) throws IOException {
- // two buffers for generating parity
- Random rand = new Random();
- int bufSize = 5 * 1024 * 1024; // 5 MB
- byte[] bufs = new byte[bufSize];
- byte[] xor = new byte[bufSize];
-
Path inpath = stat.getPath();
- long blockSize = stat.getBlockSize();
- long fileSize = stat.getLen();
-
- // create output tmp path
Path outpath = getOriginalParityFile(destPathPrefix, inpath);
FileSystem outFs = outpath.getFileSystem(conf);
-
- Path tmppath = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
- outpath.toUri().getPath() + "." +
- rand.nextLong() + ".tmp");
- // if the parity file is already upto-date, then nothing to do
+    // If the parity file is already up-to-date, then nothing to do
try {
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime()) {
@@ -812,66 +732,11 @@ public class RaidNode implements RaidPro
}
} catch (IOException e) {
// ignore errors because the raid file might not exist yet.
- }
-
- LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
- FSDataOutputStream out = outFs.create(tmppath,
- true,
- conf.getInt("io.file.buffer.size", 64 * 1024),
- (short)metaRepl,
- blockSize);
-
- try {
-
- // loop once for every stripe length
- for (int startBlock = 0; startBlock < locations.length;) {
-
- // report progress to Map-reduce framework
- if (reporter != null) {
- reporter.progress();
- }
- int blocksLeft = locations.length - startBlock;
- int stripe = Math.min(stripeLength, blocksLeft);
- LOG.info(" startBlock " + startBlock + " stripe " + stripe);
-
- // open a new file descriptor for each block in this stripe.
- // make each fd point to the beginning of each block in this stripe.
- FSDataInputStream[] ins = new FSDataInputStream[stripe];
- for (int i = 0; i < stripe; i++) {
- ins[i] = inFs.open(inpath, bufSize);
- ins[i].seek(blockSize * (startBlock + i));
- }
-
- generateParity(ins,out,blockSize,bufs,xor, reporter);
-
- // close input file handles
- for (int i = 0; i < ins.length; i++) {
- ins[i].close();
- }
-
- // increment startBlock to point to the first block to be processed
- // in the next iteration
- startBlock += stripe;
- }
- out.close();
- out = null;
-
- // delete destination if exists
- if (outFs.exists(outpath)){
- outFs.delete(outpath, false);
- }
- // rename tmppath to the real parity filename
- outFs.mkdirs(outpath.getParent());
- if (!outFs.rename(tmppath, outpath)) {
- String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
- LOG.warn(msg);
- throw new IOException (msg);
- }
- } finally {
- // remove the tmp file if it still exists
- outFs.delete(tmppath, false);
}
+ XOREncoder encoder = new XOREncoder(conf, stripeLength);
+ encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+
// set the modification time of the RAID file. This is done so that the modTime of the
    // RAID file reflects the contents of the source file that it has RAIDed. This should
// also work for files that are being appended to. This is necessary because the time on
@@ -880,255 +745,39 @@ public class RaidNode implements RaidPro
outFs.setTimes(outpath, stat.getModificationTime(), -1);
FileStatus outstat = outFs.getFileStatus(outpath);
- LOG.info("Source file " + inpath + " of size " + fileSize +
+ FileStatus inStat = inFs.getFileStatus(inpath);
+ LOG.info("Source file " + inpath + " of size " + inStat.getLen() +
" Parity file " + outpath + " of size " + outstat.getLen() +
" src mtime " + stat.getModificationTime() +
" parity mtime " + outstat.getModificationTime());
}
- private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead)
- throws IOException {
-
- int tread = 0;
-
- while (tread < toRead) {
- int read = ins.read(bufs, tread, toRead - tread);
- if (read == -1) {
- return tread;
- } else {
- tread += read;
- }
- }
-
- return tread;
- }
-
- private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout,
- long parityBlockSize, byte[] bufs, byte[] xor, Reporter reporter) throws IOException {
-
- int bufSize;
- if ((bufs == null) || (bufs.length == 0)){
- bufSize = 5 * 1024 * 1024; // 5 MB
- bufs = new byte[bufSize];
- } else {
- bufSize = bufs.length;
- }
- if ((xor == null) || (xor.length != bufs.length)){
- xor = new byte[bufSize];
- }
-
- int xorlen = 0;
-
- // this loop processes all good blocks in selected stripe
- long remaining = parityBlockSize;
-
- while (remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
-
- if (ins.length > 0) {
- xorlen = readInputUntilEnd(ins[0], xor, toRead);
- }
-
- // read all remaining blocks and xor them into the buffer
- for (int i = 1; i < ins.length; i++) {
-
- // report progress to Map-reduce framework
- if (reporter != null) {
- reporter.progress();
- }
-
- int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
-
- int j;
- int xorlimit = (int) Math.min(xorlen,actualRead);
- for (j = 0; j < xorlimit; j++) {
- xor[j] ^= bufs[j];
- }
- if ( actualRead > xorlen ){
- for (; j < actualRead; j++) {
- xor[j] = bufs[j];
- }
- xorlen = actualRead;
- }
-
- }
-
- if (xorlen < toRead) {
- Arrays.fill(bufs, xorlen, toRead, (byte) 0);
- }
-
- // write this to the tmp file
- fout.write(xor, 0, toRead);
- remaining -= toRead;
- }
-
- }
-
/**
- * Extract a good block from the parity block. This assumes that the corruption
- * is in the main file and the parity file is always good.
+ * Extract a good block from the parity block. This assumes that the
+ * corruption is in the main file and the parity file is always good.
*/
- public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix,
- int stripeLength, long corruptOffset) throws IOException {
-
- // extract block locations, size etc from source file
- Random rand = new Random();
- FileSystem srcFs = srcPath.getFileSystem(conf);
- FileStatus srcStat = srcFs.getFileStatus(srcPath);
- long blockSize = srcStat.getBlockSize();
- long fileSize = srcStat.getLen();
-
- // find the stripe number where the corrupted offset lies
- long snum = corruptOffset / (stripeLength * blockSize);
- long startOffset = snum * stripeLength * blockSize;
- long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
- long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
-
- LOG.info("Start offset of relevent stripe = " + startOffset +
- " corruptBlockInStripe " + corruptBlockInStripe);
-
- // open file descriptors to read all good blocks of the file
- FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
- int numLength = 0;
- for (int i = 0; i < stripeLength; i++) {
- if (i == corruptBlockInStripe) {
- continue; // do not open corrupt block
- }
- if (startOffset + i * blockSize >= fileSize) {
- LOG.info("Stop offset of relevent stripe = " +
- startOffset + i * blockSize);
- break;
- }
- instmp[numLength] = srcFs.open(srcPath);
- instmp[numLength].seek(startOffset + i * blockSize);
- numLength++;
- }
-
- // create array of inputstream, allocate one extra slot for
- // parity file. numLength could be smaller than stripeLength
- // if we are processing the last partial stripe on a file.
- numLength += 1;
- FSDataInputStream[] ins = new FSDataInputStream[numLength];
- for (int i = 0; i < numLength-1; i++) {
- ins[i] = instmp[i];
- }
- LOG.info("Decompose a total of " + numLength + " blocks.");
-
- // open and seek to the appropriate offset in parity file.
- ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
- Path parityFile = ppair.getPath();
- FileSystem parityFs = ppair.getFileSystem();
- LOG.info("Parity file for " + srcPath + " is " + parityFile);
- ins[numLength-1] = parityFs.open(parityFile);
- ins[numLength-1].seek(snum * blockSize);
- LOG.info("Parity file " + parityFile +
- " seeking to relevent block at offset " +
- ins[numLength-1].getPos());
-
- // create a temporary filename in the source filesystem
- // do not overwrite an existing tmp file. Make it fail for now.
- // We need to generate a unique name for this tmp file later on.
- Path tmpFile = null;
- FSDataOutputStream fout = null;
- FileSystem destFs = destPathPrefix.getFileSystem(conf);
- int retry = 5;
- try {
- tmpFile = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") + "/" +
- rand.nextInt());
- fout = destFs.create(tmpFile, false);
- } catch (IOException e) {
- if (retry-- <= 0) {
- LOG.info("Unable to create temporary file " + tmpFile +
- " Aborting....");
- throw e;
- }
- LOG.info("Unable to create temporary file " + tmpFile +
- "Retrying....");
- }
- LOG.info("Created recovered block file " + tmpFile);
-
- // buffers for generating parity bits
- int bufSize = 5 * 1024 * 1024; // 5 MB
- byte[] bufs = new byte[bufSize];
- byte[] xor = new byte[bufSize];
-
- generateParity(ins,fout,corruptBlockSize,bufs,xor,null);
-
- // close all files
- fout.close();
- for (int i = 0; i < ins.length; i++) {
- ins[i].close();
+ public static Path unRaid(Configuration conf, Path srcPath,
+ Path destPathPrefix, Decoder decoder, int stripeLength,
+ long corruptOffset) throws IOException {
+
+ // Test if parity file exists
+ ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+ if (ppair == null) {
+ return null;
}
- // Now, reopen the source file and the recovered block file
- // and copy all relevant data to new file
final Path recoveryDestination =
new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+ FileSystem destFs = recoveryDestination.getFileSystem(conf);
final Path recoveredPrefix =
destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
final Path recoveredPath =
- new Path(recoveredPrefix + "." + rand.nextLong() + ".recovered");
+ new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
LOG.info("Creating recovered file " + recoveredPath);
- FSDataInputStream sin = srcFs.open(srcPath);
- FSDataOutputStream out = destFs.create(recoveredPath, false,
- conf.getInt("io.file.buffer.size", 64 * 1024),
- srcStat.getReplication(),
- srcStat.getBlockSize());
-
- FSDataInputStream bin = destFs.open(tmpFile);
- long recoveredSize = 0;
-
- // copy all the good blocks (upto the corruption)
- // from source file to output file
- long remaining = corruptOffset / blockSize * blockSize;
- while (remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
- sin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- remaining -= toRead;
- recoveredSize += toRead;
- }
- LOG.info("Copied upto " + recoveredSize + " from src file. ");
-
- // copy recovered block to output file
- remaining = corruptBlockSize;
- while (recoveredSize < fileSize &&
- remaining > 0) {
- int toRead = (int)Math.min(remaining, bufSize);
- bin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- remaining -= toRead;
- recoveredSize += toRead;
- }
- LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
-
- // skip bad block in src file
- if (recoveredSize < fileSize) {
- sin.seek(sin.getPos() + corruptBlockSize);
- }
-
- // copy remaining good data from src file to output file
- while (recoveredSize < fileSize) {
- int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
- sin.readFully(bufs, 0, toRead);
- out.write(bufs, 0, toRead);
- recoveredSize += toRead;
- }
- out.close();
- LOG.info("Completed writing " + recoveredSize + " bytes into " +
- recoveredPath);
-
- sin.close();
- bin.close();
-
- // delete the temporary block file that was created.
- destFs.delete(tmpFile, false);
- LOG.info("Deleted temporary file " + tmpFile);
-
- // copy the meta information from source path to the newly created
- // recovered path
- copyMetaInformation(destFs, srcStat, recoveredPath);
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ decoder.decodeFile(srcFs, srcPath, ppair.getFileSystem(),
+ ppair.getPath(), corruptOffset, recoveredPath);
return recoveredPath;
}
@@ -1179,35 +828,22 @@ public class RaidNode implements RaidPro
PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
Arrays.sort(sorted, lexi);
- // paths we have processed so far
- Set<Path> processed = new HashSet<Path>();
-
for (PolicyList category : sorted) {
for (PolicyInfo info: category.getAll()) {
try {
// expand destination prefix path
- String destinationPrefix = getDestinationPath(conf, info);
- Path destPref = new Path(destinationPrefix.trim());
- FileSystem destFs = FileSystem.get(destPref.toUri(), conf);
- destPref = destFs.makeQualified(destPref);
+ Path destPref = getDestinationPath(conf);
+ FileSystem destFs = destPref.getFileSystem(conf);
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
- if ( srcPaths != null ){
+ if (srcPaths != null) {
for (Path srcPath: srcPaths) {
// expand destination prefix
Path destPath = getOriginalParityFile(destPref, srcPath);
- // if this destination path has already been processed as part
- // of another policy, then nothing more to do
- if (processed.contains(destPath)) {
- LOG.info("Obsolete parity files for policy " +
- info.getName() + " has already been procesed.");
- continue;
- }
-
FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
FileStatus stat = null;
try {
@@ -1221,12 +857,8 @@ public class RaidNode implements RaidPro
recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
}
- // this destination path has already been processed
- processed.add(destPath);
-
}
}
-
} catch (Exception e) {
LOG.warn("Ignoring Exception while processing policy " +
info.getName() + " " +
@@ -1342,10 +974,8 @@ public class RaidNode implements RaidPro
try {
long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
- String destinationPrefix = getDestinationPath(conf, info);
- Path destPref = new Path(destinationPrefix.trim());
+ Path destPref = getDestinationPath(conf);
FileSystem destFs = destPref.getFileSystem(conf);
- destPref = destFs.makeQualified(destPref);
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
@@ -1407,7 +1037,11 @@ public class RaidNode implements RaidPro
recurseHar(info, destFs, one, destPrefix, srcFs, cutoff, tmpHarPath);
shouldHar = false;
} else if (one.getModificationTime() > cutoff ) {
- shouldHar = false;
+ if (shouldHar) {
+ LOG.info("Cannot archive " + destPath +
+ " because " + one.getPath() + " was modified after cutoff");
+ shouldHar = false;
+ }
}
}
@@ -1433,6 +1067,7 @@ public class RaidNode implements RaidPro
}
if ( shouldHar ) {
+ LOG.info("Archiving " + dest.getPath() + " to " + tmpHarPath );
singleHar(destFs, dest, tmpHarPath);
}
}
@@ -1493,56 +1128,25 @@ public class RaidNode implements RaidPro
LOG.info("Leaving Har thread.");
}
-
- }
-
- /**
- * If the config file has an entry for hdfs.raid.locations, then that overrides
- * destination path specified in the raid policy file
- */
- static private String getDestinationPath(Configuration conf, PolicyInfo info) {
- String locs = conf.get("hdfs.raid.locations");
- if (locs != null) {
- return locs;
- }
- locs = info.getDestinationPath();
- if (locs == null) {
- return DEFAULT_RAID_LOCATION;
- }
- return locs;
}
/**
- * If the config file has an entry for hdfs.raid.stripeLength, then use that
- * specified in the raid policy file
+ * Return the path prefix that stores the parity files
*/
- static private int getStripeLength(Configuration conf, PolicyInfo info)
- throws IOException {
- int len = conf.getInt("hdfs.raid.stripeLength", 0);
- if (len != 0) {
- return len;
- }
- String str = info.getProperty("stripeLength");
- if (str == null) {
- String msg = "hdfs.raid.stripeLength is not defined." +
- " Using a default " + DEFAULT_STRIPE_LENGTH;
- LOG.info(msg);
- return DEFAULT_STRIPE_LENGTH;
- }
- return Integer.parseInt(str);
+ static Path getDestinationPath(Configuration conf)
+ throws IOException {
+ String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+ Path p = new Path(loc.trim());
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ p = p.makeQualified(fs);
+ return p;
}
/**
- * Copy the file owner, modtime, etc from srcPath to the recovered Path.
- * It is possiible that we might have to retrieve file persmissions,
- * quotas, etc too in future.
+ * Obtain stripe length from configuration
*/
- static private void copyMetaInformation(FileSystem fs, FileStatus stat,
- Path recoveredPath)
- throws IOException {
- fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
- fs.setPermission(recoveredPath, stat.getPermission());
- fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+ public static int getStripeLength(Configuration conf) {
+ return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
}
/**
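For illustration only, a minimal, hypothetical sketch (not part of this commit) of how the new configuration-driven accessors might be exercised. The key names and defaults come from the constants added to RaidNode above; the example class itself is assumed and would need to live in the org.apache.hadoop.raid package because getDestinationPath is package-private.

package org.apache.hadoop.raid;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

public class RaidConfigSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Override the defaults ("/raid" and 5) via the keys introduced in this change.
    conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");      // hdfs.raid.locations
    conf.setInt(RaidNode.STRIPE_LENGTH_KEY, 10);            // hdfs.raid.stripeLength

    // Both accessors now read the Configuration directly instead of a PolicyInfo.
    Path parityPrefix = RaidNode.getDestinationPath(conf);  // qualified parity path prefix
    int stripeLength = RaidNode.getStripeLength(conf);
    System.out.println("Parity prefix " + parityPrefix + ", stripe length " + stripeLength);
  }
}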
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.io.IOUtils;
+
+public class RaidUtils {
+ public static void readTillEnd(InputStream in, byte[] buf, boolean eofOK)
+ throws IOException {
+ int toRead = buf.length;
+ int numRead = 0;
+ while (numRead < toRead) {
+ int nread = in.read(buf, numRead, toRead - numRead);
+ if (nread < 0) {
+ if (eofOK) {
+ // EOF hit, fill with zeros
+ Arrays.fill(buf, numRead, toRead, (byte)0);
+ numRead = toRead;
+ } else {
+ // EOF hit, throw.
+ throw new IOException("Premature EOF");
+ }
+ } else {
+ numRead += nread;
+ }
+ }
+ }
+
+ public static void copyBytes(
+ InputStream in, OutputStream out, byte[] buf, long count)
+ throws IOException {
+ for (long bytesRead = 0; bytesRead < count; ) {
+      int toRead = (int) Math.min((long) buf.length, count - bytesRead);
+ IOUtils.readFully(in, buf, 0, toRead);
+ bytesRead += toRead;
+ out.write(buf, 0, toRead);
+ }
+ }
+
+ public static class ZeroInputStream extends InputStream
+ implements Seekable, PositionedReadable {
+ private long endOffset;
+ private long pos;
+
+ public ZeroInputStream(long endOffset) {
+ this.endOffset = endOffset;
+ this.pos = 0;
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (pos < endOffset) {
+ pos++;
+ return 0;
+ }
+ return -1;
+ }
+
+ @Override
+ public int available() throws IOException {
+ return (int)(endOffset - pos);
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return pos;
+ }
+
+ @Override
+ public void seek(long seekOffset) throws IOException {
+ if (seekOffset < endOffset) {
+ pos = seekOffset;
+ } else {
+        throw new IOException("Illegal offset " + seekOffset);
+ }
+ }
+
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int read(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ int count = 0;
+ for (; position < endOffset && count < length; position++) {
+ buffer[offset + count] = 0;
+ count++;
+ }
+ return count;
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer, int offset, int length)
+ throws IOException {
+ int count = 0;
+ for (; position < endOffset && count < length; position++) {
+ buffer[offset + count] = 0;
+ count++;
+ }
+ if (count < length) {
+ throw new IOException("Premature EOF");
+ }
+ }
+
+ @Override
+ public void readFully(long position, byte[] buffer) throws IOException {
+ readFully(position, buffer, 0, buffer.length);
+ }
+ }
+}
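For illustration only (not part of this commit), a small, self-contained sketch of how the RaidUtils helpers added above could be driven; the example class and input values are hypothetical, while the method signatures are the ones shown in the file.

package org.apache.hadoop.raid;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

public class RaidUtilsSketch {
  public static void main(String[] args) throws Exception {
    byte[] buf = new byte[8];

    // readTillEnd with eofOK=true zero-fills the tail of the buffer
    // when the stream ends early instead of throwing.
    RaidUtils.readTillEnd(new ByteArrayInputStream(new byte[]{1, 2, 3}), buf, true);

    // copyBytes moves exactly 'count' bytes through the supplied buffer.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    RaidUtils.copyBytes(new ByteArrayInputStream(new byte[16]), out, buf, 16L);

    // ZeroInputStream stands in for missing data: it reads as zeros
    // up to the given end offset.
    RaidUtils.ZeroInputStream zeros = new RaidUtils.ZeroInputStream(4096);
    System.out.println("zeros available: " + zeros.available());  // 4096
  }
}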
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+
+public class XORDecoder extends Decoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.XORDecoder");
+
+ public XORDecoder(
+ Configuration conf, int stripeSize) {
+ super(conf, stripeSize, 1);
+ }
+
+ @Override
+ protected void fixErasedBlock(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long blockSize, long errorOffset, long bytesToSkip, long limit,
+ OutputStream out) throws IOException {
+ LOG.info("Fixing block at " + srcFile + ":" + errorOffset +
+ ", skipping " + bytesToSkip + ", limit " + limit);
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ ArrayList<FSDataInputStream> xorinputs = new ArrayList<FSDataInputStream>();
+
+ FSDataInputStream parityFileIn = parityFs.open(parityFile);
+ parityFileIn.seek(parityOffset(errorOffset, blockSize));
+ xorinputs.add(parityFileIn);
+
+ long errorBlockOffset = (errorOffset / blockSize) * blockSize;
+ long[] srcOffsets = stripeOffsets(errorOffset, blockSize);
+ for (int i = 0; i < srcOffsets.length; i++) {
+ if (srcOffsets[i] == errorBlockOffset) {
+ LOG.info("Skipping block at " + srcFile + ":" + errorBlockOffset);
+ continue;
+ }
+ if (srcOffsets[i] < srcStat.getLen()) {
+ FSDataInputStream in = fs.open(srcFile);
+ in.seek(srcOffsets[i]);
+ xorinputs.add(in);
+ }
+ }
+ FSDataInputStream[] inputs = xorinputs.toArray(
+ new FSDataInputStream[]{null});
+ ParityInputStream recovered =
+ new ParityInputStream(inputs, limit, readBufs[0], writeBufs[0]);
+ recovered.skip(bytesToSkip);
+ recovered.drain(out, null);
+ }
+
+ protected long[] stripeOffsets(long errorOffset, long blockSize) {
+ long[] offsets = new long[stripeSize];
+ long stripeIdx = errorOffset / (blockSize * stripeSize);
+ long startOffsetOfStripe = stripeIdx * stripeSize * blockSize;
+ for (int i = 0; i < stripeSize; i++) {
+ offsets[i] = startOffsetOfStripe + i * blockSize;
+ }
+ return offsets;
+ }
+
+ protected long parityOffset(long errorOffset, long blockSize) {
+ long stripeIdx = errorOffset / (blockSize * stripeSize);
+ return stripeIdx * blockSize;
+ }
+
+}
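To make the offset arithmetic in stripeOffsets and parityOffset concrete, a hypothetical walk-through with illustrative numbers: with blockSize = 8192 and stripeSize = 5, an error at offset 50000 gives stripeIdx = 50000 / (8192 * 5) = 1, so stripeOffsets returns the block starts {40960, 49152, 57344, 65536, 73728} and parityOffset returns 1 * 8192 = 8192. The erased block itself starts at (50000 / 8192) * 8192 = 49152 and is skipped by fixErasedBlock, which XORs the parity block with the remaining source blocks to rebuild it.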
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class XOREncoder extends Encoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.XOREncoder");
+ public XOREncoder(
+ Configuration conf, int stripeSize) {
+ super(conf, stripeSize, 1);
+ }
+
+ @Override
+ protected void encodeStripe(
+ InputStream[] blocks,
+ long stripeStartOffset,
+ long blockSize,
+ OutputStream[] outs,
+ Progressable reporter) throws IOException {
+    LOG.info("Performing XOR");
+ ParityInputStream parityIn =
+ new ParityInputStream(blocks, blockSize, readBufs[0], writeBufs[0]);
+ try {
+ parityIn.drain(outs[0], reporter);
+ } finally {
+ parityIn.close();
+ }
+ }
+}
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Oct 12 18:23:36 2010
@@ -47,11 +47,14 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.protocol.PolicyInfo;
public class TestRaidDfs extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -59,8 +62,7 @@ public class TestRaidDfs extends TestCas
final static String CONFIG_FILE = new File(TEST_DIR,
"test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
- final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidNode");
- final Random rand = new Random();
+ final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
final static int NUM_DATANODES = 3;
Configuration conf;
@@ -83,6 +85,9 @@ public class TestRaidDfs extends TestCas
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
// do not use map-reduce cluster for Raiding
conf.setBoolean("fs.raidnode.local", true);
conf.set("raid.server.address", "localhost:0");
@@ -133,80 +138,148 @@ public class TestRaidDfs extends TestCas
if (cnode != null) { cnode.stop(); cnode.join(); }
if (dfs != null) { dfs.shutdown(); }
}
+
+ private LocatedBlocks getBlockLocations(Path file, long length)
+ throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+ return dfs.getClient().namenode.getBlockLocations(file.toString(), 0, length);
+ }
- /**
- * Test DFS Raid
- */
- public void testRaidDfs() throws Exception {
- LOG.info("Test testRaidDfs started.");
- long blockSize = 8192L;
- int stripeLength = 3;
- mySetup();
- Path file1 = new Path("/user/dhruba/raidtest/file1");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
- long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
- LOG.info("Test testPathFilter created test files");
+ private LocatedBlocks getBlockLocations(Path file)
+ throws IOException {
+ FileStatus stat = fileSys.getFileStatus(file);
+ return getBlockLocations(file, stat.getLen());
+ }
- // create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
-
- try {
- FileStatus[] listPaths = null;
+ private DistributedRaidFileSystem getRaidFS() throws IOException {
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+ Configuration clientConf = new Configuration(conf);
+ clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
+ clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
+ clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
+ URI dfsUri = dfs.getUri();
+ return (DistributedRaidFileSystem)FileSystem.get(dfsUri, clientConf);
+ }
- // wait till file is raided
- while (listPaths == null || listPaths.length != 1) {
- LOG.info("Test testPathFilter waiting for files to be raided.");
- try {
- listPaths = fileSys.listStatus(destPath);
- } catch (FileNotFoundException e) {
- //ignore
+ public static void waitForFileRaided(
+ Log logger, FileSystem fileSys, Path file, Path destPath)
+ throws IOException, InterruptedException {
+ FileStatus parityStat = null;
+ String fileName = file.getName().toString();
+ // wait till file is raided
+ while (parityStat == null) {
+ logger.info("Waiting for files to be raided.");
+ try {
+ FileStatus[] listPaths = fileSys.listStatus(destPath);
+ if (listPaths != null) {
+ for (FileStatus f : listPaths) {
+ logger.info("File raided so far : " + f.getPath());
+ String found = f.getPath().getName().toString();
+ if (fileName.equals(found)) {
+ parityStat = f;
+ break;
+ }
+ }
}
- Thread.sleep(1000); // keep waiting
+ } catch (FileNotFoundException e) {
+ //ignore
}
- assertEquals(listPaths.length, 1); // all files raided
- LOG.info("Files raided so far : " + listPaths[0].getPath());
+ Thread.sleep(1000); // keep waiting
+ }
- // extract block locations from File system. Wait till file is closed.
+ while (true) {
LocatedBlocks locations = null;
DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
- while (true) {
- locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
- 0, listPaths[0].getLen());
- if (!locations.isUnderConstruction()) {
- break;
- }
- Thread.sleep(1000);
+ locations = dfs.getClient().namenode.getBlockLocations(
+ file.toString(), 0, parityStat.getLen());
+ if (!locations.isUnderConstruction()) {
+ break;
}
+ Thread.sleep(1000);
+ }
- // filter all filesystem calls from client
- Configuration clientConf = new Configuration(conf);
- clientConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
- clientConf.set("fs.raid.underlyingfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
- URI dfsUri = dfs.getUri();
- FileSystem.closeAll();
- FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
-
- assertTrue("raidfs not an instance of DistributedRaidFileSystem",raidfs instanceof DistributedRaidFileSystem);
-
- LOG.info("Corrupt first block of file");
- corruptBlock(file1, locations.get(0).getBlock(), NUM_DATANODES, false);
- validateFile(raidfs, file1, file1, crc1);
+ while (true) {
+ FileStatus stat = fileSys.getFileStatus(file);
+ if (stat.getReplication() == 1) break;
+ Thread.sleep(1000);
+ }
+ }
+
+ private void corruptBlockAndValidate(Path srcFile, Path destPath,
+ int[] listBlockNumToCorrupt, long blockSize, int numBlocks)
+ throws IOException, InterruptedException {
+ int repl = 1;
+ long crc = createTestFilePartialLastBlock(fileSys, srcFile, repl,
+ numBlocks, blockSize);
+ long length = fileSys.getFileStatus(srcFile).getLen();
+
+ waitForFileRaided(LOG, fileSys, srcFile, destPath);
+
+    // Corrupt the blocks specified in listBlockNumToCorrupt
+ for (int blockNumToCorrupt : listBlockNumToCorrupt) {
+ LOG.info("Corrupt block " + blockNumToCorrupt + " of file " + srcFile);
+ LocatedBlocks locations = getBlockLocations(srcFile);
+ corruptBlock(srcFile, locations.get(blockNumToCorrupt).getBlock(),
+ NUM_DATANODES, true);
+ }
+
+ // Validate
+ DistributedRaidFileSystem raidfs = getRaidFS();
+ assertTrue(validateFile(raidfs, srcFile, length, crc));
+ }
+
+ /**
+ * Create a file, corrupt a block in it and ensure that the file can be
+ * read through DistributedRaidFileSystem.
+ */
+ public void testRaidDfs() throws Exception {
+ LOG.info("Test testRaidDfs started.");
+
+ long blockSize = 8192L;
+ int numBlocks = 8;
+ int repl = 1;
+ mySetup();
+
+ // Create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ Path file = new Path("/user/dhruba/raidtest/file");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+    int[][] corrupt = {{0}, {4}, {7}}; // first, middle and last block
+ try {
+ long crc = createTestFilePartialLastBlock(fileSys, file, repl,
+ numBlocks, blockSize);
+ long length = fileSys.getFileStatus(file).getLen();
+ waitForFileRaided(LOG, fileSys, file, destPath);
+ LocatedBlocks locations = getBlockLocations(file);
+
+ for (int i = 0; i < corrupt.length; i++) {
+ int blockNumToCorrupt = corrupt[i][0];
+ LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
+ corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
+ NUM_DATANODES, false);
+ validateFile(getRaidFS(), file, length, crc);
+ }
// Corrupt one more block. This is expected to fail.
- LOG.info("Corrupt second block of file");
- corruptBlock(file1, locations.get(1).getBlock(), NUM_DATANODES, false);
+ LOG.info("Corrupt one more block of file");
+ corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
try {
- validateFile(raidfs, file1, file1, crc1);
+ validateFile(getRaidFS(), file, length, crc);
fail("Expected exception ChecksumException not thrown!");
} catch (org.apache.hadoop.fs.ChecksumException e) {
}
} catch (Exception e) {
- LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+ LOG.info("testRaidDfs Exception " + e +
+ StringUtils.stringifyException(e));
throw e;
} finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
myTearDown();
}
- LOG.info("Test testPathFilter completed.");
+ LOG.info("Test testRaidDfs completed.");
}
/**
@@ -217,7 +290,7 @@ public class TestRaidDfs extends TestCas
try {
Path file = new Path("/user/raid/raidtest/file1");
- createOldFile(fileSys, file, 1, 7, 8192L);
+ createTestFile(fileSys, file, 1, 7, 8192L);
// filter all filesystem calls from client
Configuration clientConf = new Configuration(conf);
@@ -242,13 +315,15 @@ public class TestRaidDfs extends TestCas
myTearDown();
}
}
-
+
//
// creates a file and populate it with random data. Returns its crc.
//
- private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+ public static long createTestFile(FileSystem fileSys, Path name, int repl,
+ int numBlocks, long blocksize)
throws IOException {
CRC32 crc = new CRC32();
+ Random rand = new Random();
FSDataOutputStream stm = fileSys.create(name, true,
fileSys.getConf().getInt("io.file.buffer.size", 4096),
(short)repl, blocksize);
@@ -264,19 +339,43 @@ public class TestRaidDfs extends TestCas
}
//
- // validates that file matches the crc.
+ // Creates a file with a partially full last block, populates it with random
+ // data, and returns its crc.
//
- private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc)
+ public static long createTestFilePartialLastBlock(
+ FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
throws IOException {
+ CRC32 crc = new CRC32();
+ Random rand = new Random();
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, blocksize);
+ // Write whole blocks.
+ byte[] b = new byte[(int)blocksize];
+ for (int i = 1; i < numBlocks; i++) {
+ rand.nextBytes(b);
+ stm.write(b);
+ crc.update(b);
+ }
+ // Write partial block.
+ b = new byte[(int)blocksize/2 - 1];
+ rand.nextBytes(b);
+ stm.write(b);
+ crc.update(b);
- FileStatus stat1 = fileSys.getFileStatus(name1);
- FileStatus stat2 = fileSys.getFileStatus(name2);
- assertTrue(" Length of file " + name1 + " is " + stat1.getLen() +
- " is different from length of file " + name1 + " " + stat2.getLen(),
- stat1.getLen() == stat2.getLen());
+ stm.close();
+ return crc.getValue();
+ }
+ //
+ // validates that file matches the crc.
+ //
+ public static boolean validateFile(FileSystem fileSys, Path name, long length,
+ long crc)
+ throws IOException {
+ long numRead = 0;
CRC32 newcrc = new CRC32();
- FSDataInputStream stm = fileSys.open(name2);
+ FSDataInputStream stm = fileSys.open(name);
final byte[] b = new byte[4192];
int num = 0;
while (num >= 0) {
@@ -284,19 +383,28 @@ public class TestRaidDfs extends TestCas
if (num < 0) {
break;
}
+ numRead += num;
newcrc.update(b, 0, num);
}
stm.close();
+
+ if (numRead != length) {
+ LOG.info("Number of bytes read " + numRead +
+ " does not match file size " + length);
+ return false;
+ }
+
LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
if (newcrc.getValue() != crc) {
- fail("CRC mismatch of files " + name1 + " with file " + name2);
+ LOG.info("CRC mismatch of file " + name + ": " + newcrc + " vs. " + crc);
}
+ return true;
}
/*
* The Data directories for a datanode
*/
- static private File[] getDataNodeDirs(int i) throws IOException {
+ private static File[] getDataNodeDirs(int i) throws IOException {
File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
File data_dir = new File(base_dir, "data");
File dir1 = new File(data_dir, "data"+(2*i+1));
@@ -353,10 +461,32 @@ public class TestRaidDfs extends TestCas
(numCorrupted + numDeleted) > 0);
}
- //
- // Corrupt specified block of file
- //
- void corruptBlock(Path file, Block blockNum) throws IOException {
- corruptBlock(file, blockNum, NUM_DATANODES, true);
+ public static void corruptBlock(Path file, Block blockNum,
+ int numDataNodes, long offset) throws IOException {
+ long id = blockNum.getBlockId();
+
+ // Now deliberately remove/truncate data blocks from the block.
+ //
+ for (int i = 0; i < numDataNodes; i++) {
+ File[] dirs = getDataNodeDirs(i);
+
+ for (int j = 0; j < dirs.length; j++) {
+ File[] blocks = dirs[j].listFiles();
+ assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+ for (int idx = 0; idx < blocks.length; idx++) {
+ if (blocks[idx].getName().startsWith("blk_" + id) &&
+ !blocks[idx].getName().endsWith(".meta")) {
+ // Corrupt
+ File f = blocks[idx];
+ RandomAccessFile raf = new RandomAccessFile(f, "rw");
+ raf.seek(offset);
+ int data = raf.readInt();
+ raf.seek(offset);
+ raf.writeInt(data+1);
+ LOG.info("Corrupted block " + blocks[idx]);
+ }
+ }
+ }
+ }
}
}
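
Two helpers referenced in the hunks above, getRaidFS() and getBlockLocations(), have bodies that fall outside this diff. For readers following the test flow, here is a minimal sketch of what they presumably look like, reconstructed from the inline code this change removes (the fs.hdfs.impl / fs.raid.underlyingfs.impl client wiring and the namenode.getBlockLocations() call). It is an illustration, not the committed source, and it assumes the test class's fileSys and conf fields.

    // Hypothetical reconstruction; assumes the test-class fields fileSys
    // (the MiniDFSCluster FileSystem) and conf (its Configuration), plus
    // imports of java.net.URI and the org.apache.hadoop.hdfs classes used.
    private DistributedRaidFileSystem getRaidFS() throws IOException {
      DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
      // Route client reads through DistributedRaidFileSystem, backed by HDFS.
      Configuration clientConf = new Configuration(conf);
      clientConf.set("fs.hdfs.impl",
          "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
      clientConf.set("fs.raid.underlyingfs.impl",
          "org.apache.hadoop.hdfs.DistributedFileSystem");
      URI dfsUri = dfs.getUri();
      FileSystem.closeAll();           // drop the cached plain-HDFS instance
      return (DistributedRaidFileSystem) FileSystem.get(dfsUri, clientConf);
    }

    private LocatedBlocks getBlockLocations(Path file) throws IOException {
      DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
      FileStatus stat = fileSys.getFileStatus(file);
      // Same call the waiting loop above uses to inspect block state.
      return dfs.getClient().namenode.getBlockLocations(
          file.toString(), 0, stat.getLen());
    }
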
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+
+public class TestDirectoryTraversal extends TestCase {
+ final static Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.TestDirectoryTraversal");
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+
+ MiniDFSCluster dfs = null;
+ FileSystem fs = null;
+ Configuration conf = null;
+
+ /**
+ * Test basic enumeration.
+ */
+ public void testEnumeration() throws IOException {
+ mySetup();
+
+ try {
+ Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+ createTestTree(topDir);
+
+ LOG.info("Enumerating files");
+ List<FileStatus> startPaths = new LinkedList<FileStatus>();
+ startPaths.add(fs.getFileStatus(topDir));
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+ List<FileStatus> selected = new LinkedList<FileStatus>();
+ while (true) {
+ FileStatus f = dt.getNextFile();
+ if (f == null) break;
+ assertEquals(false, f.isDir());
+ LOG.info(f.getPath());
+ selected.add(f);
+ }
+ assertEquals(5, selected.size());
+
+ LOG.info("Enumerating directories");
+ startPaths.clear();
+ startPaths.add(fs.getFileStatus(topDir));
+ dt = new DirectoryTraversal(fs, startPaths);
+ selected.clear();
+ while (true) {
+ FileStatus dir = dt.getNextDirectory();
+ if (dir == null) break;
+ assertEquals(true, dir.isDir());
+ LOG.info(dir.getPath());
+ selected.add(dir);
+ }
+ assertEquals(4, selected.size());
+ } finally {
+ myTearDown();
+ }
+ }
+
+ public void testSuspension() throws IOException {
+ mySetup();
+
+ try {
+ Path topDir = new Path(TEST_DIR + "/testenumeration");
+
+ createTestTree(topDir);
+
+ String top = topDir.toString();
+ List<FileStatus> startPaths = new LinkedList<FileStatus>();
+ startPaths.add(fs.getFileStatus(new Path(top + "/a")));
+ startPaths.add(fs.getFileStatus(new Path(top + "/b")));
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+
+ int limit = 2;
+ short targetRepl = 1;
+ Path raid = new Path("/raid");
+ List<FileStatus> selected = dt.selectFilesToRaid(conf, targetRepl, raid,
+ 0, limit);
+ for (FileStatus f: selected) {
+ LOG.info(f.getPath());
+ }
+ assertEquals(limit, selected.size());
+
+ selected = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
+ for (FileStatus f: selected) {
+ LOG.info(f.getPath());
+ }
+ assertEquals(limit, selected.size());
+ } finally {
+ myTearDown();
+ }
+ }
+
+ /**
+ * Creates a test directory tree.
+ *            top
+ *           / | \
+ *          /  |  f5
+ *         a   b___
+ *        / \  | \  \
+ *       f1  f2 f3 f4 c
+ */
+ private void createTestTree(Path topDir) throws IOException {
+ String top = topDir.toString();
+ fs.delete(topDir, true);
+
+ fs.mkdirs(topDir);
+ fs.create(new Path(top + "/f5")).close();
+
+ fs.mkdirs(new Path(top + "/a"));
+ createTestFile(new Path(top + "/a/f1"));
+ createTestFile(new Path(top + "/a/f2"));
+
+ fs.mkdirs(new Path(top + "/b"));
+ fs.mkdirs(new Path(top + "/b/c"));
+ createTestFile(new Path(top + "/b/f3"));
+ createTestFile(new Path(top + "/b/f4"));
+ }
+
+ private void createTestFile(Path file) throws IOException {
+ long blockSize = 8192;
+ byte[] bytes = new byte[(int)blockSize];
+ FSDataOutputStream stm = fs.create(file, false, 4096, (short)1, blockSize);
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.write(bytes);
+ stm.close();
+ FileStatus stat = fs.getFileStatus(file);
+ assertEquals(blockSize, stat.getBlockSize());
+ }
+
+ private void mySetup() throws IOException {
+ conf = new Configuration();
+ dfs = new MiniDFSCluster(conf, 6, true, null);
+ dfs.waitActive();
+ fs = dfs.getFileSystem();
+ }
+
+ private void myTearDown() {
+ if (dfs != null) { dfs.shutdown(); }
+ }
+}
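
The testSuspension case above relies on DirectoryTraversal.selectFilesToRaid() returning at most `limit` files per call and resuming where the previous call stopped: two consecutive calls over the four files under /a and /b each return exactly two. A minimal caller sketch built on that behaviour is below; it additionally assumes (not verified by this test) that an empty batch signals the traversal is exhausted.

    // Hypothetical caller, not part of this commit: drain the traversal in
    // batches of `limit`, using the same arguments as testSuspension above.
    DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
    List<FileStatus> batch;
    do {
      batch = dt.selectFilesToRaid(conf, targetRepl, raid, 0, limit);
      for (FileStatus f : batch) {
        LOG.info("selected for raiding: " + f.getPath());
      }
    } while (batch.size() == limit);
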
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Oct 12 18:23:36 2010
@@ -82,6 +82,7 @@ public class TestRaidHar extends TestCas
conf.setBoolean("fs.raidnode.local", local);
conf.set("raid.server.address", "localhost:0");
+ conf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
// create a dfs and map-reduce cluster
final int taskTrackers = 4;
@@ -101,12 +102,12 @@ public class TestRaidHar extends TestCas
/**
* create raid.xml file for RaidNode
*/
- private void mySetup(String srcPath, long targetReplication,
- long metaReplication, long stripeLength ) throws Exception {
+ private void mySetup(long targetReplication,
+ long metaReplication, long stripeLength) throws Exception {
FileWriter fileWriter = new FileWriter(CONFIG_FILE);
fileWriter.write("<?xml version=\"1.0\"?>\n");
String str = "<configuration> " +
- "<srcPath prefix=\"" + srcPath + "\"> " +
+ "<srcPath prefix=\"/user/test/raidtest\"> " +
"<policy name = \"RaidTest1\"> " +
"<destPath> /destraid</destPath> " +
"<property> " +
@@ -162,7 +163,6 @@ public class TestRaidHar extends TestCas
public void testRaidHar() throws Exception {
LOG.info("Test testRaidHar started.");
- String srcPaths [] = { "/user/test/raidtest", "/user/test/raid*" };
long blockSizes [] = {1024L};
long stripeLengths [] = {5};
long targetReplication = 1;
@@ -172,13 +172,11 @@ public class TestRaidHar extends TestCas
createClusters(true);
try {
- for (String srcPath : srcPaths) {
- for (long blockSize : blockSizes) {
- for (long stripeLength : stripeLengths) {
- doTestHar(iter, srcPath, targetReplication, metaReplication,
- stripeLength, blockSize, numBlock);
- iter++;
- }
+ for (long blockSize : blockSizes) {
+ for (long stripeLength : stripeLengths) {
+ doTestHar(iter, targetReplication, metaReplication,
+ stripeLength, blockSize, numBlock);
+ iter++;
}
}
} finally {
@@ -191,14 +189,14 @@ public class TestRaidHar extends TestCas
* Create parity file, delete original file and then validate that
* parity file is automatically deleted.
*/
- private void doTestHar(int iter, String srcPath, long targetReplication,
+ private void doTestHar(int iter, long targetReplication,
long metaReplication, long stripeLength,
long blockSize, int numBlock) throws Exception {
LOG.info("doTestHar started---------------------------:" + " iter " + iter +
" blockSize=" + blockSize + " stripeLength=" + stripeLength);
- mySetup(srcPath, targetReplication, metaReplication, stripeLength);
- RaidShell shell = null;
+ mySetup(targetReplication, metaReplication, stripeLength);
Path dir = new Path("/user/test/raidtest/subdir/");
+ Path file1 = new Path(dir + "/file" + iter);
RaidNode cnode = null;
try {
Path destPath = new Path("/destraid/user/test/raidtest/subdir");
@@ -211,21 +209,9 @@ public class TestRaidHar extends TestCas
LOG.info("doTestHar created test files for iteration " + iter);
// create an instance of the RaidNode
- cnode = RaidNode.createRaidNode(null, conf);
- int times = 10;
-
- while (times-- > 0) {
- try {
- shell = new RaidShell(conf, cnode.getListenerAddress());
- } catch (Exception e) {
- LOG.info("doTestHar unable to connect to " +
- cnode.getListenerAddress() + " retrying....");
- Thread.sleep(1000);
- continue;
- }
- break;
- }
- LOG.info("doTestHar created RaidShell.");
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
FileStatus[] listPaths = null;
int maxFilesFound = 0;
@@ -234,6 +220,7 @@ public class TestRaidHar extends TestCas
try {
listPaths = fileSys.listStatus(destPath);
int count = 0;
+ Path harPath = null;
int filesFound = 0;
if (listPaths != null) {
for (FileStatus s : listPaths) {
@@ -250,6 +237,7 @@ public class TestRaidHar extends TestCas
// files since some parity files might get deleted by the
// purge thread.
assertEquals(10, maxFilesFound);
+ harPath = s.getPath();
count++;
}
}
@@ -260,11 +248,12 @@ public class TestRaidHar extends TestCas
} catch (FileNotFoundException e) {
//ignore
}
- LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
+ LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
(listPaths == null ? "none" : listPaths.length));
Thread.sleep(1000); // keep waiting
+
}
-
+
fileSys.delete(dir, true);
// wait till raid file is deleted
int count = 1;
@@ -291,7 +280,6 @@ public class TestRaidHar extends TestCas
StringUtils.stringifyException(e));
throw e;
} finally {
- shell.close();
if (cnode != null) { cnode.stop(); cnode.join(); }
}
LOG.info("doTestHar completed:" + " blockSize=" + blockSize +