Author: schen
Date: Tue Oct 12 18:23:36 2010
New Revision: 1021873
URL: http://svn.apache.org/viewvc?rev=1021873&view=rev
Log:
MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
Vadali via schen)
Added:
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XORDecoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 12 18:23:36 2010
@@ -133,6 +133,9 @@ Trunk (unreleased changes)
MAPREDUCE-1517. Supports streaming job to run in the background. (Bochun Bai
via amareshwari)
+ MAPREDUCE-1819. RaidNode is now smarter in submitting Raid jobs. (Ramkumar
+ Vadali via schen)
+
OPTIMIZATIONS
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
@@ -325,7 +328,7 @@ Trunk (unreleased changes)
MAPREDUCE-2095. Fixes Gridmix to run from compressed traces. (Ranjit
Mathew via amareshwari)
- MAPREDUCE-1980. DistributedRaidFileSystem now handles ChecksumException
+ MAPREDUCE-1908. DistributedRaidFileSystem now handles ChecksumException
correctly. (Ramkumar Vadali via schen)
Release 0.21.1 - Unreleased
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Tue Oct 12 18:23:36 2010
@@ -35,7 +35,9 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.raid.Decoder;
import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.XORDecoder;
import org.apache.hadoop.hdfs.BlockMissingException;
import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -94,7 +96,7 @@ public class DistributedRaidFileSystem e
}
// find stripe length configured
- stripeLength = conf.getInt("hdfs.raid.stripeLength", RaidNode.DEFAULT_STRIPE_LENGTH);
+ stripeLength = RaidNode.getStripeLength(conf);
if (stripeLength == 0) {
LOG.info("dfs.raid.stripeLength is incorrectly defined to be " +
stripeLength + " Ignoring...");
@@ -343,9 +345,10 @@ public class DistributedRaidFileSystem e
clientConf.set("fs.hdfs.impl", clazz.getName());
// Disable caching so that a previously cached RaidDfs is not used.
clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- Path npath = RaidNode.unRaid(clientConf, path,
- alternates[idx], stripeLength,
- corruptOffset);
+ Decoder decoder =
+ new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
+ Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
+ decoder, stripeLength, corruptOffset);
FileSystem fs1 = getUnderlyingFileSystem(conf);
fs1.initialize(npath.toUri(), conf);
LOG.info("Opening alternate path " + npath + " at offset " + curpos);
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Tue Oct 12 18:23:36 2010
@@ -56,6 +56,10 @@ class ConfigManager {
public static final long HAR_PARTFILE_SIZE = 10 * 1024 * 1024 * 1024l;
+ public static final int DISTRAID_MAX_JOBS = 10;
+
+ public static final int DISTRAID_MAX_FILES = 10000;
+
/**
* Time to wait after the config file has been modified before reloading it
* (this is done to prevent loading a file that hasn't been fully written).
@@ -71,6 +75,9 @@ class ConfigManager {
private long reloadInterval = RELOAD_INTERVAL;
private long periodicity; // time between runs of all policies
private long harPartfileSize;
+ private int maxJobsPerPolicy; // Max no. of jobs running simultaneously for
+ // a policy.
+ private int maxFilesPerJob; // Max no. of files raided by a job.
// Reload the configuration
private boolean doReload;
@@ -88,6 +95,10 @@ class ConfigManager {
this.reloadInterval = conf.getLong("raid.config.reload.interval", RELOAD_INTERVAL);
this.periodicity = conf.getLong("raid.policy.rescan.interval", RESCAN_INTERVAL);
this.harPartfileSize = conf.getLong("raid.har.partfile.size", HAR_PARTFILE_SIZE);
+ this.maxJobsPerPolicy = conf.getInt("raid.distraid.max.jobs",
+ DISTRAID_MAX_JOBS);
+ this.maxFilesPerJob = conf.getInt("raid.distraid.max.files",
+ DISTRAID_MAX_FILES);
if (configFileName == null) {
String msg = "No raid.config.file given in conf - " +
"the Hadoop Raid utility cannot run. Aborting....";
@@ -306,6 +317,14 @@ class ConfigManager {
return harPartfileSize;
}
+ public synchronized int getMaxJobsPerPolicy() {
+ return maxJobsPerPolicy;
+ }
+
+ public synchronized int getMaxFilesPerJob() {
+ return maxFilesPerJob;
+ }
+
/**
* Get a collection of all policies
*/
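[For reference, a small sketch (not part of this commit) showing how the two new limits could be overridden; the values are arbitrary and the defaults are the DISTRAID_MAX_JOBS/DISTRAID_MAX_FILES constants above.]

import org.apache.hadoop.conf.Configuration;

public class RaidLimitsSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // At most 5 simultaneous Raid jobs per policy (default 10).
    conf.setInt("raid.distraid.max.jobs", 5);
    // At most 5000 files raided by a single job (default 10000).
    conf.setInt("raid.distraid.max.files", 5000);
  }
}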
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Decoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Represents a generic decoder that can be used to read a file with
+ * corrupt blocks by using the parity file.
+ * This is an abstract class, concrete subclasses need to implement
+ * fixErasedBlock.
+ */
+public abstract class Decoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.Decoder");
+ protected Configuration conf;
+ protected int stripeSize;
+ protected int paritySize;
+ protected Random rand;
+ protected int bufSize;
+ protected byte[][] readBufs;
+ protected byte[][] writeBufs;
+
+ Decoder(Configuration conf, int stripeSize, int paritySize) {
+ this.conf = conf;
+ this.stripeSize = stripeSize;
+ this.paritySize = paritySize;
+ this.rand = new Random();
+ this.bufSize = conf.getInt("raid.decoder.bufsize", 1024 * 1024);
+ this.readBufs = new byte[stripeSize + paritySize][];
+ this.writeBufs = new byte[paritySize][];
+ allocateBuffers();
+ }
+
+ private void allocateBuffers() {
+ for (int i = 0; i < stripeSize + paritySize; i++) {
+ readBufs[i] = new byte[bufSize];
+ }
+ for (int i = 0; i < paritySize; i++) {
+ writeBufs[i] = new byte[bufSize];
+ }
+ }
+
+ private void configureBuffers(long blockSize) {
+ if ((long)bufSize > blockSize) {
+ bufSize = (int)blockSize;
+ allocateBuffers();
+ } else if (blockSize % bufSize != 0) {
+ bufSize = (int)(blockSize / 256L); // heuristic.
+ if (bufSize == 0) {
+ bufSize = 1024;
+ }
+ bufSize = Math.min(bufSize, 1024 * 1024);
+ allocateBuffers();
+ }
+ }
+
+ /**
+ * The interface to generate a decoded file using the good portion of the
+ * source file and the parity file.
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from fs in case the parity file is part of a HAR archive.
+ * @param parityFile The parity file.
+ * @param errorOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param decodedFile The decoded file. This will have the exact same contents
+ * as the source file on success.
+ */
+ public void decodeFile(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long errorOffset, Path decodedFile) throws IOException {
+
+ LOG.info("Create " + decodedFile + " for error at " +
+ srcFile + ":" + errorOffset);
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ long blockSize = srcStat.getBlockSize();
+ configureBuffers(blockSize);
+ // Move the offset to the start of the block.
+ errorOffset = (errorOffset / blockSize) * blockSize;
+
+ // Create the decoded file.
+ FSDataOutputStream out = fs.create(
+ decodedFile, false, conf.getInt("io.file.buffer.size", 64 * 1024),
+ srcStat.getReplication(), srcStat.getBlockSize());
+
+ // Open the source file.
+ FSDataInputStream in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+
+ // Start copying data block-by-block.
+ for (long offset = 0; offset < srcStat.getLen(); offset += blockSize) {
+ long limit = Math.min(blockSize, srcStat.getLen() - offset);
+ long bytesAlreadyCopied = 0;
+ if (offset != errorOffset) {
+ try {
+ in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(offset);
+ RaidUtils.copyBytes(in, out, readBufs[0], limit);
+ assert(out.getPos() == offset + limit);
+ LOG.info("Copied till " + out.getPos() + " from " + srcFile);
+ continue;
+ } catch (BlockMissingException e) {
+ LOG.info("Encountered BME at " + srcFile + ":" + offset);
+ bytesAlreadyCopied = out.getPos() - offset;
+ } catch (ChecksumException e) {
+ LOG.info("Encountered CE at " + srcFile + ":" + offset);
+ bytesAlreadyCopied = out.getPos() - offset;
+ }
+ }
+ // If we are here, offset == errorOffset or we got an exception.
+ // Recover the block starting at offset.
+ fixErasedBlock(fs, srcFile, parityFs, parityFile, blockSize, offset,
+ bytesAlreadyCopied, limit, out);
+ }
+ out.close();
+
+ try {
+ fs.setOwner(decodedFile, srcStat.getOwner(), srcStat.getGroup());
+ fs.setPermission(decodedFile, srcStat.getPermission());
+ fs.setTimes(decodedFile, srcStat.getModificationTime(),
+ srcStat.getAccessTime());
+ } catch (Exception exc) {
+ LOG.info("Didn't manage to copy meta information because of " + exc +
+ " Ignoring...");
+ }
+
+ }
+
+ /**
+ * Recovers a corrupt block to local file.
+ *
+ * @param srcFs The filesystem containing the source file.
+ * @param srcPath The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from srcFs in case the parity file is part of a HAR archive.
+ * @param parityPath The parity file.
+ * @param blockSize The block size of the file.
+ * @param blockOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param localBlockFile The file to write the block to.
+ * @param limit The maximum number of bytes to be written out.
+ * This is to prevent writing beyond the end of the file.
+ */
+ public void recoverBlockToFile(
+ FileSystem srcFs, Path srcPath, FileSystem parityFs, Path parityPath,
+ long blockSize, long blockOffset, File localBlockFile, long limit)
+ throws IOException {
+ OutputStream out = new FileOutputStream(localBlockFile);
+ fixErasedBlock(srcFs, srcPath, parityFs, parityPath,
+ blockSize, blockOffset, 0, limit, out);
+ out.close();
+ }
+
+ /**
+ * Implementation-specific mechanism of writing a fixed block.
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The damaged source file.
+ * @param parityFs The filesystem containing the parity file. This could be
+ * different from fs in case the parity file is part of a HAR archive.
+ * @param parityFile The parity file.
+ * @param blockSize The maximum size of a block.
+ * @param errorOffset Known location of error in the source file. There could
+ * be additional errors in the source file that are discovered during
+ * the decode process.
+ * @param bytesToSkip After the block is generated, these many bytes should be
+ * skipped before writing to the output. This is needed because the
+ * output may have a portion of the block written from the source file
+ * before a new corruption is discovered in the block.
+ * @param limit The maximum number of bytes to be written out, including
+ * bytesToSkip. This is to prevent writing beyond the end of the file.
+ * @param out The output.
+ */
+ protected abstract void fixErasedBlock(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
+ long blockSize, long errorOffset, long bytesToSkip, long limit,
+ OutputStream out) throws IOException;
+}
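[For reference, a minimal sketch (not part of this commit) of driving Decoder.recoverBlockToFile with the XORDecoder added by this change; the helper class and the choice of parity path are assumptions for illustration.]

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.raid.Decoder;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.XORDecoder;

public class DecoderSketch {
  /** Recover the block of srcPath containing corruptOffset into localBlockFile. */
  public static void recoverOneBlock(Configuration conf, FileSystem fs,
      Path srcPath, Path parityPath, long corruptOffset, File localBlockFile)
      throws IOException {
    Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
    FileStatus stat = fs.getFileStatus(srcPath);
    long blockSize = stat.getBlockSize();
    // Align to the start of the corrupt block and cap the write at end-of-file.
    long blockStart = (corruptOffset / blockSize) * blockSize;
    long limit = Math.min(blockSize, stat.getLen() - blockStart);
    decoder.recoverBlockToFile(fs, srcPath, fs, parityPath,
        blockSize, blockStart, localBlockFile, limit);
  }
}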
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DirectoryTraversal.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Implements depth-first traversal using a Stack object. The traversal
+ * can be stopped at any time and the state of traversal is saved.
+ */
+public class DirectoryTraversal {
+ public static final Log LOG =
+ LogFactory.getLog("org.apache.hadoop.raid.DirectoryTraversal");
+
+ private FileSystem fs;
+ private List<FileStatus> paths;
+ private int pathIdx = 0; // Next path to process.
+ private Stack<Node> stack = new Stack<Node>();
+
+ /**
+ * Represents a directory node in directory traversal.
+ */
+ static class Node {
+ private FileStatus path; // Path that this node represents.
+ private FileStatus[] elements; // Elements in the node.
+ private int idx = 0;
+
+ public Node(FileStatus path, FileStatus[] elements) {
+ this.path = path;
+ this.elements = elements;
+ }
+
+ public boolean hasNext() {
+ return idx < elements.length;
+ }
+
+ public FileStatus next() {
+ return elements[idx++];
+ }
+
+ public FileStatus path() {
+ return this.path;
+ }
+ }
+
+ /**
+ * Constructor.
+ * @param fs The filesystem to use.
+ * @param startPaths A list of paths that need to be traversed
+ */
+ public DirectoryTraversal(FileSystem fs, List<FileStatus> startPaths) {
+ this.fs = fs;
+ paths = startPaths;
+ pathIdx = 0;
+ }
+
+ /**
+ * Choose some files to RAID.
+ * @param conf Configuration to use.
+ * @param targetRepl Desired replication of the source file once it is raided.
+ * @param raidDestPrefix Prefix of the path to RAID to.
+ * @param modTimePeriod Time gap before RAIDing.
+ * @param limit Limit on the number of files to choose.
+ * @return list of files to RAID.
+ * @throws IOException
+ */
+ public List<FileStatus> selectFilesToRaid(
+ Configuration conf, int targetRepl, Path raidDestPrefix,
+ long modTimePeriod, int limit) throws IOException {
+ List<FileStatus> selected = new LinkedList<FileStatus>();
+ int numSelected = 0;
+
+ long now = System.currentTimeMillis();
+ while (numSelected < limit) {
+ FileStatus next = getNextFile();
+ if (next == null) {
+ break;
+ }
+ // We have the next file, do we want to select it?
+ // If the source file has fewer than or equal to 2 blocks, then skip it.
+ long blockSize = next.getBlockSize();
+ if (2 * blockSize >= next.getLen()) {
+ continue;
+ }
+
+ boolean select = false;
+ try {
+ Object ppair = RaidNode.getParityFile(
+ raidDestPrefix, next.getPath(), conf);
+ // Is there a valid parity file?
+ if (ppair != null) {
+ // Is the source at the target replication?
+ if (next.getReplication() != targetRepl) {
+ // Select the file so that its replication can be set.
+ select = true;
+ } else {
+ // Nothing to do, don't select the file.
+ select = false;
+ }
+ } else if (next.getModificationTime() + modTimePeriod < now) {
+ // No valid parity file yet; select the file only if it has not been modified recently.
+ select = true;
+ }
+ } catch (java.io.FileNotFoundException e) {
+ select = true; // destination file does not exist
+ }
+ if (select) {
+ selected.add(next);
+ numSelected++;
+ }
+ }
+
+ return selected;
+ }
+
+ /**
+ * Return the next file.
+ * @throws IOException
+ */
+ public FileStatus getNextFile() throws IOException {
+ // Check if traversal is done.
+ while (!doneTraversal()) {
+ // If traversal is not done, check if the stack is not empty.
+ while (!stack.isEmpty()) {
+ // If the stack is not empty, look at the top node.
+ Node node = stack.peek();
+ // Check if the top node has an element.
+ if (node.hasNext()) {
+ FileStatus element = node.next();
+ // Is the next element a directory.
+ if (!element.isDir()) {
+ // It is a file, return it.
+ return element;
+ }
+ // Next element is a directory, push it on to the stack and
+ // continue
+ try {
+ pushNewNode(element);
+ } catch (FileNotFoundException e) {
+ // Ignore and move to the next element.
+ }
+ continue;
+ } else {
+ // Top node has no next element, pop it and continue.
+ stack.pop();
+ continue;
+ }
+ }
+ // If the stack is empty, do we have more paths?
+ while (!paths.isEmpty()) {
+ FileStatus next = paths.remove(0);
+ pathIdx++;
+ if (!next.isDir()) {
+ return next;
+ }
+ try {
+ pushNewNode(next);
+ } catch (FileNotFoundException e) {
+ continue;
+ }
+ break;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Gets the next directory in the tree. The algorithm returns deeper directories
+ * first.
+ * @return A FileStatus representing the directory.
+ * @throws IOException
+ */
+ public FileStatus getNextDirectory() throws IOException {
+ // Check if traversal is done.
+ while (!doneTraversal()) {
+ // If traversal is not done, check if the stack is not empty.
+ while (!stack.isEmpty()) {
+ // If the stack is not empty, look at the top node.
+ Node node = stack.peek();
+ // Check if the top node has an element.
+ if (node.hasNext()) {
+ FileStatus element = node.next();
+ // Is the next element a directory.
+ if (element.isDir()) {
+ // Next element is a directory, push it on to the stack and
+ // continue
+ try {
+ pushNewNode(element);
+ } catch (FileNotFoundException e) {
+ // Ignore and move to the next element.
+ }
+ continue;
+ }
+ } else {
+ stack.pop();
+ return node.path;
+ }
+ }
+ // If the stack is empty, do we have more paths?
+ while (!paths.isEmpty()) {
+ FileStatus next = paths.remove(0);
+ pathIdx++;
+ if (next.isDir()) {
+ try {
+ pushNewNode(next);
+ } catch (FileNotFoundException e) {
+ continue;
+ }
+ break;
+ }
+ }
+ }
+ return null;
+ }
+
+ private void pushNewNode(FileStatus stat) throws IOException {
+ if (!stat.isDir()) {
+ return;
+ }
+ Path p = stat.getPath();
+ LOG.info("Traversing to directory " + p);
+ FileStatus[] elements = fs.listStatus(p);
+ Node newNode = new Node(stat, (elements == null? new FileStatus[0]: elements));
+ stack.push(newNode);
+ }
+
+ public boolean doneTraversal() {
+ return paths.isEmpty() && stack.isEmpty();
+ }
+}
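[For reference, a minimal sketch (not part of this commit) of walking a tree with DirectoryTraversal; the start list must be mutable because the traversal consumes it.]

import java.io.IOException;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.raid.DirectoryTraversal;

public class TraversalSketch {
  /** Print every file under root, depth first. */
  public static void listFiles(Configuration conf, Path root) throws IOException {
    FileSystem fs = root.getFileSystem(conf);
    // Start from the immediate children of root; the traversal keeps its own
    // stack, so getNextFile() can be called until it returns null.
    List<FileStatus> start =
        new LinkedList<FileStatus>(Arrays.asList(fs.listStatus(root)));
    DirectoryTraversal dt = new DirectoryTraversal(fs, start);
    for (FileStatus f = dt.getNextFile(); f != null; f = dt.getNextFile()) {
      System.out.println(f.getPath());
    }
  }
}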
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1021873&r1=1021872&r2=1021873&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Tue Oct 12 18:23:36 2010
@@ -34,17 +34,21 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
+import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.raid.RaidNode.Statistics;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.util.StringUtils;
@@ -111,6 +115,11 @@ public class DistRaid {
List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
+ private JobClient jobClient;
+ private RunningJob runningJob;
+ private int jobEventCounter = 0;
+ private String lastReport = null;
+
/** Responsible for generating splits of the src file list. */
static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
/** Do nothing. */
@@ -184,6 +193,7 @@ public class DistRaid {
private int failcount = 0;
private int succeedcount = 0;
private Statistics st = null;
+ private Reporter reporter = null;
private String getCountString() {
return "Succeeded: " + succeedcount + " Failed: " + failcount;
@@ -200,6 +210,7 @@ public class DistRaid {
public void map(Text key, PolicyInfo policy,
OutputCollector<WritableComparable, Text> out, Reporter reporter)
throws IOException {
+ this.reporter = reporter;
try {
LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
Path p = new Path(key.toString());
@@ -268,29 +279,70 @@ public class DistRaid {
private static int getMapCount(int srcCount, int numNodes) {
int numMaps = (int) (srcCount / OP_PER_MAP);
numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
- return Math.max(numMaps, 1);
+ return Math.max(numMaps, MAX_MAPS_PER_NODE);
}
- /** invokes mapred job do parallel raiding */
- public void doDistRaid() throws IOException {
- if (raidPolicyPathPairList.size() == 0) {
- LOG.info("DistRaid has no paths to raid.");
- return;
- }
- try {
- if (setup()) {
- JobClient.runJob(jobconf);
- }
- } finally {
- // delete job directory
- final String jobdir = jobconf.get(JOB_DIR_LABEL);
- if (jobdir != null) {
- final Path jobpath = new Path(jobdir);
- jobpath.getFileSystem(jobconf).delete(jobpath, true);
- }
- }
- raidPolicyPathPairList.clear();
- }
+ /** Invokes a map-reduce job to do parallel raiding.
+ * @return true if the job was started, false otherwise
+ */
+ public boolean startDistRaid() throws IOException {
+ assert(raidPolicyPathPairList.size() > 0);
+ if (setup()) {
+ this.jobClient = new JobClient(jobconf);
+ this.runningJob = this.jobClient.submitJob(jobconf);
+ LOG.info("Job Started: " + runningJob.getID());
+ return true;
+ }
+ return false;
+ }
+
+ /** Checks if the map-reduce job has completed.
+ *
+ * @return true if the job completed, false otherwise.
+ * @throws IOException
+ */
+ public boolean checkComplete() throws IOException {
+ JobID jobID = runningJob.getID();
+ if (runningJob.isComplete()) {
+ // delete job directory
+ final String jobdir = jobconf.get(JOB_DIR_LABEL);
+ if (jobdir != null) {
+ final Path jobpath = new Path(jobdir);
+ jobpath.getFileSystem(jobconf).delete(jobpath, true);
+ }
+ if (runningJob.isSuccessful()) {
+ LOG.info("Job Complete(Succeeded): " + jobID);
+ } else {
+ LOG.info("Job Complete(Failed): " + jobID);
+ }
+ raidPolicyPathPairList.clear();
+ Counters ctrs = runningJob.getCounters();
+ long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
+ long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
+ return true;
+ } else {
+ String report = (" job " + jobID +
+ " map " + StringUtils.formatPercent(runningJob.mapProgress(), 0)+
+ " reduce " + StringUtils.formatPercent(runningJob.reduceProgress(), 0));
+ if (!report.equals(lastReport)) {
+ LOG.info(report);
+ lastReport = report;
+ }
+ TaskCompletionEvent[] events =
+ runningJob.getTaskCompletionEvents(jobEventCounter);
+ jobEventCounter += events.length;
+ for(TaskCompletionEvent event : events) {
+ if (event.getTaskStatus() == TaskCompletionEvent.Status.FAILED) {
+ LOG.info(" Job " + jobID + " " + event.toString());
+ }
+ }
+ return false;
+ }
+ }
+
+ public boolean successful() throws IOException {
+ return runningJob.isSuccessful();
+ }
/**
* set up input file which has the list of input files.
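[For reference, a minimal sketch (not part of this commit) of the submit-then-poll pattern that replaces the old blocking doDistRaid(); how the DistRaid instance is built and populated with policy/path pairs is unchanged and omitted here.]

import java.io.IOException;

import org.apache.hadoop.raid.DistRaid;

public class DistRaidPollSketch {
  public static boolean raidAndWait(DistRaid job, long pollMillis)
      throws IOException, InterruptedException {
    // Submit the map-reduce job; returns false if the job could not be set up.
    if (!job.startDistRaid()) {
      return false;
    }
    // Poll until completion; checkComplete() logs progress and failed tasks.
    while (!job.checkComplete()) {
      Thread.sleep(pollMillis);
    }
    return job.successful();
  }
}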
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Represents a generic encoder that can generate a parity file for a source
+ * file.
+ * This is an abstract class, concrete subclasses need to implement
+ * encodeFileImpl.
+ */
+public abstract class Encoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.Encoder");
+ protected Configuration conf;
+ protected int stripeSize;
+ protected int paritySize;
+ protected Random rand;
+ protected int bufSize;
+ protected byte[][] readBufs;
+ protected byte[][] writeBufs;
+
+ /**
+ * A class that acts as a sink for data, similar to /dev/null.
+ */
+ static class NullOutputStream extends OutputStream {
+ public void write(byte[] b) throws IOException {}
+ public void write(int b) throws IOException {}
+ public void write(byte[] b, int off, int len) throws IOException {}
+ }
+
+ Encoder(
+ Configuration conf, int stripeSize, int paritySize) {
+ this.conf = conf;
+ this.stripeSize = stripeSize;
+ this.paritySize = paritySize;
+ this.rand = new Random();
+ this.bufSize = conf.getInt("raid.encoder.bufsize", 1024 * 1024);
+ this.readBufs = new byte[stripeSize][];
+ this.writeBufs = new byte[paritySize][];
+ allocateBuffers();
+ }
+
+ private void allocateBuffers() {
+ for (int i = 0; i < stripeSize; i++) {
+ readBufs[i] = new byte[bufSize];
+ }
+ for (int i = 0; i < paritySize; i++) {
+ writeBufs[i] = new byte[bufSize];
+ }
+ }
+
+ private void configureBuffers(long blockSize) {
+ if ((long)bufSize > blockSize) {
+ bufSize = (int)blockSize;
+ allocateBuffers();
+ } else if (blockSize % bufSize != 0) {
+ bufSize = (int)(blockSize / 256L); // heuristic.
+ if (bufSize == 0) {
+ bufSize = 1024;
+ }
+ bufSize = Math.min(bufSize, 1024 * 1024);
+ allocateBuffers();
+ }
+ }
+
+ /**
+ * The interface to use to generate a parity file.
+ * This method can be called multiple times with the same Encoder object,
+ * thus allowing reuse of the buffers allocated by the Encoder object.
+ *
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The source file.
+ * @param parityFile The parity file to be generated.
+ */
+ public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+ short parityRepl, Progressable reporter) throws IOException {
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ long srcSize = srcStat.getLen();
+ long blockSize = srcStat.getBlockSize();
+
+ configureBuffers(blockSize);
+
+ // Create a tmp file to which we will write first.
+ Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
+ parityFile.toUri().getPath() +
+ "." + rand.nextLong() + ".tmp");
+ FSDataOutputStream out = fs.create(
+ parityTmp,
+ true,
+ conf.getInt("io.file.buffer.size", 64 * 1024),
+ parityRepl,
+ blockSize);
+
+ try {
+ encodeFileToStream(fs, srcFile, srcSize, blockSize, out, reporter);
+ out.close();
+ out = null;
+ LOG.info("Wrote temp parity file " + parityTmp);
+
+ // delete destination if exists
+ if (fs.exists(parityFile)){
+ fs.delete(parityFile, false);
+ }
+ fs.mkdirs(parityFile.getParent());
+ if (!fs.rename(parityTmp, parityFile)) {
+ String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
+ throw new IOException (msg);
+ }
+ LOG.info("Wrote parity file " + parityFile);
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ fs.delete(parityTmp, false);
+ }
+ }
+
+ /**
+ * Recovers a corrupt block in a parity file to a local file.
+ *
+ * The encoder generates paritySize parity blocks for a source file stripe.
+ * Since we want only one of the parity blocks, this function creates
+ * null outputs for the blocks to be discarded.
+ *
+ * @param fs The filesystem in which both srcFile and parityFile reside.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param corruptOffset The location of corruption in the parity file.
+ * @param localBlockFile The destination for the recovered block.
+ */
+ public void recoverParityBlockToFile(
+ FileSystem fs,
+ Path srcFile, long srcSize, long blockSize,
+ Path parityFile, long corruptOffset,
+ File localBlockFile) throws IOException {
+ OutputStream out = new FileOutputStream(localBlockFile);
+ try {
+ recoverParityBlockToStream(fs, srcFile, srcSize, blockSize, parityFile,
+ corruptOffset, out);
+ } finally {
+ out.close();
+ }
+ }
+
+ /**
+ * Recovers a corrupt block in a parity file to an output stream.
+ *
+ * The encoder generates paritySize parity blocks for a source file stripe.
+ * Since we want only one of the parity blocks, this function creates
+ * null outputs for the blocks to be discarded.
+ *
+ * @param fs The filesystem in which both srcFile and parityFile reside.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param corruptOffset The location of corruption in the parity file.
+ * @param out The destination for the recovered block.
+ */
+ public void recoverParityBlockToStream(
+ FileSystem fs,
+ Path srcFile, long srcSize, long blockSize,
+ Path parityFile, long corruptOffset,
+ OutputStream out) throws IOException {
+ LOG.info("Recovering parity block" + parityFile + ":" + corruptOffset);
+ // Get the start offset of the corrupt block.
+ corruptOffset = (corruptOffset / blockSize) * blockSize;
+ // Output streams to each block in the parity file stripe.
+ OutputStream[] outs = new OutputStream[paritySize];
+ long indexOfCorruptBlockInParityStripe =
+ (corruptOffset / blockSize) % paritySize;
+ LOG.info("Index of corrupt block in parity stripe: " +
+ indexOfCorruptBlockInParityStripe);
+ // Create a real output stream for the block we want to recover,
+ // and create null streams for the rest.
+ for (int i = 0; i < paritySize; i++) {
+ if (indexOfCorruptBlockInParityStripe == i) {
+ outs[i] = out;
+ } else {
+ outs[i] = new NullOutputStream();
+ }
+ }
+ // Get the stripe index and start offset of stripe.
+ long stripeIdx = corruptOffset / (paritySize * blockSize);
+ long stripeStart = stripeIdx * blockSize * stripeSize;
+
+ // Get input streams to each block in the source file stripe.
+ InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+ srcSize, blockSize);
+ LOG.info("Starting recovery by using source stripe " +
+ srcFile + ":" + stripeStart);
+ // Read the data from the blocks and write to the parity file.
+ encodeStripe(blocks, stripeStart, blockSize, outs, Reporter.NULL);
+ }
+
+ /**
+ * Generates the parity data for a source file and writes it to an
+ * output stream.
+ *
+ * The encoder generates paritySize parity blocks for a source file stripe.
+ * Since there is only one output provided, some blocks are written out to
+ * temporary local files before being copied to the output.
+ *
+ * @param fs The filesystem containing the source file.
+ * @param srcFile The source file.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source/parity files.
+ * @param out The destination for the parity data.
+ * @param reporter Progress indicator.
+ */
+ private void encodeFileToStream(FileSystem fs, Path srcFile, long srcSize,
+ long blockSize, OutputStream out, Progressable reporter) throws IOException {
+ OutputStream[] tmpOuts = new OutputStream[paritySize];
+ // One parity block can be written directly to out, rest to local files.
+ tmpOuts[0] = out;
+ File[] tmpFiles = new File[paritySize - 1];
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpFiles[i] = File.createTempFile("parity", "_" + i);
+ LOG.info("Created tmp file " + tmpFiles[i]);
+ tmpFiles[i].deleteOnExit();
+ }
+ try {
+ // Loop over stripes in the file.
+ for (long stripeStart = 0; stripeStart < srcSize;
+ stripeStart += blockSize * stripeSize) {
+ reporter.progress();
+ LOG.info("Starting encoding of stripe " + srcFile + ":" + stripeStart);
+ // Create input streams for blocks in the stripe.
+ InputStream[] blocks = stripeInputs(fs, srcFile, stripeStart,
+ srcSize, blockSize);
+ // Create output streams to the temp files.
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpOuts[i + 1] = new FileOutputStream(tmpFiles[i]);
+ }
+ // Call the implementation of encoding.
+ encodeStripe(blocks, stripeStart, blockSize, tmpOuts, reporter);
+ // Close output streams to the temp files and write the temp files
+ // to the output provided.
+ for (int i = 0; i < paritySize - 1; i++) {
+ tmpOuts[i + 1].close();
+ tmpOuts[i + 1] = null;
+ InputStream in = new FileInputStream(tmpFiles[i]);
+ RaidUtils.copyBytes(in, out, writeBufs[i], blockSize);
+ reporter.progress();
+ }
+ }
+ } finally {
+ for (int i = 0; i < paritySize - 1; i++) {
+ if (tmpOuts[i + 1] != null) {
+ tmpOuts[i + 1].close();
+ }
+ tmpFiles[i].delete();
+ LOG.info("Deleted tmp file " + tmpFiles[i]);
+ }
+ }
+ }
+
+ /**
+ * Return input streams for each block in a source file's stripe.
+ * @param fs The filesystem where the file resides.
+ * @param srcFile The source file.
+ * @param stripeStartOffset The start offset of the stripe.
+ * @param srcSize The size of the source file.
+ * @param blockSize The block size for the source file.
+ */
+ protected InputStream[] stripeInputs(
+ FileSystem fs,
+ Path srcFile,
+ long stripeStartOffset,
+ long srcSize,
+ long blockSize
+ ) throws IOException {
+ InputStream[] blocks = new InputStream[stripeSize];
+ for (int i = 0; i < stripeSize; i++) {
+ long seekOffset = stripeStartOffset + i * blockSize;
+ if (seekOffset < srcSize) {
+ FSDataInputStream in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(seekOffset);
+ LOG.info("Opening stream at " + srcFile + ":" + seekOffset);
+ blocks[i] = in;
+ } else {
+ LOG.info("Using zeros at offset " + seekOffset);
+ // We have no src data at this offset.
+ blocks[i] = new RaidUtils.ZeroInputStream(
+ seekOffset + blockSize);
+ }
+ }
+ return blocks;
+ }
+
+ /**
+ * The implementation of generating parity data for a stripe.
+ *
+ * @param blocks The streams to blocks in the stripe.
+ * @param srcFile The source file.
+ * @param stripeStartOffset The start offset of the stripe
+ * @param blockSize The maximum size of a block.
+ * @param outs output streams to the parity blocks.
+ * @param reporter progress indicator.
+ */
+ protected abstract void encodeStripe(
+ InputStream[] blocks,
+ long stripeStartOffset,
+ long blockSize,
+ OutputStream[] outs,
+ Progressable reporter) throws IOException;
+}
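[For reference, a sketch (not part of this commit) of what a concrete Encoder looks like. The XOREncoder added by this change is not shown in this message, so the class below only illustrates the encodeStripe contract, with paritySize == 1; it lives in org.apache.hadoop.raid because the Encoder constructor is package-private.]

package org.apache.hadoop.raid;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Progressable;

class SketchXorEncoder extends Encoder {
  SketchXorEncoder(Configuration conf, int stripeSize) {
    super(conf, stripeSize, 1); // one parity block per stripe
  }

  @Override
  protected void encodeStripe(InputStream[] blocks, long stripeStartOffset,
      long blockSize, OutputStream[] outs, Progressable reporter)
      throws IOException {
    byte[] acc = new byte[bufSize];
    byte[] tmp = new byte[bufSize];
    for (long done = 0; done < blockSize; done += bufSize) {
      int len = (int) Math.min(bufSize, blockSize - done);
      readFully(blocks[0], acc, len);
      for (int i = 1; i < blocks.length; i++) {
        readFully(blocks[i], tmp, len);
        for (int j = 0; j < len; j++) {
          acc[j] ^= tmp[j]; // XOR the i-th block into the running parity
        }
      }
      outs[0].write(acc, 0, len);
      if (reporter != null) {
        reporter.progress();
      }
    }
  }

  // Read exactly len bytes, zero-filling if the stream ends early
  // (short trailing blocks at the end of the file).
  private static void readFully(InputStream in, byte[] b, int len)
      throws IOException {
    int off = 0;
    while (off < len) {
      int n = in.read(b, off, len - off);
      if (n < 0) {
        java.util.Arrays.fill(b, off, len, (byte) 0);
        return;
      }
      off += n;
    }
  }
}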
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/JobMonitor.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Periodically monitors the status of jobs registered with it.
+ *
+ * Jobs that are submitted for the same policy name are kept in the same list,
+ * and the list itself is kept in a map that has the policy name as the key and
+ * the list as value.
+ */
+class JobMonitor implements Runnable {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.JobMonitor");
+
+ volatile boolean running = true;
+
+ private Map<String, List<DistRaid>> jobs;
+ private long jobMonitorInterval;
+ private volatile long jobsMonitored = 0;
+ private volatile long jobsSucceeded = 0;
+
+ public JobMonitor(Configuration conf) {
+ jobMonitorInterval = conf.getLong("raid.jobmonitor.interval", 60000);
+ jobs = new java.util.HashMap<String, List<DistRaid>>();
+ }
+
+ public void run() {
+ while (running) {
+ try {
+ LOG.info("JobMonitor thread continuing to run...");
+ doMonitor();
+ } catch (Throwable e) {
+ LOG.error("JobMonitor encountered exception " +
+ StringUtils.stringifyException(e));
+ // All expected exceptions are caught by doMonitor(). It is better
+ // to exit now; this will prevent RaidNode from submitting more jobs
+ // since the number of running jobs will never decrease.
+ return;
+ }
+ }
+ }
+
+ /**
+ * Periodically checks status of running map-reduce jobs.
+ */
+ public void doMonitor() {
+ while (running) {
+ String[] keys = null;
+ // Make a copy of the names of the current jobs.
+ synchronized(jobs) {
+ keys = jobs.keySet().toArray(new String[0]);
+ }
+
+ // Check all the jobs. We do not want to block access to `jobs`
+ // because that will prevent new jobs from being added.
+ // This is safe because JobMonitor.run is the only code that can
+ // remove a job from `jobs`. Thus all elements in `keys` will have
+ // valid values.
+ Map<String, List<DistRaid>> finishedJobs =
+ new HashMap<String, List<DistRaid>>();
+
+ for (String key: keys) {
+ // For each policy being monitored, get the list of jobs running.
+ DistRaid[] jobListCopy = null;
+ synchronized(jobs) {
+ List<DistRaid> jobList = jobs.get(key);
+ synchronized(jobList) {
+ jobListCopy = jobList.toArray(new DistRaid[jobList.size()]);
+ }
+ }
+ // The code that actually contacts the JobTracker is not synchronized,
+ // it uses copies of the list of jobs.
+ for (DistRaid job: jobListCopy) {
+ // Check each running job.
+ try {
+ boolean complete = job.checkComplete();
+ if (complete) {
+ addJob(finishedJobs, key, job);
+ if (job.successful()) {
+ jobsSucceeded++;
+ }
+ }
+ } catch (IOException ioe) {
+ // If there was an error, consider the job finished.
+ addJob(finishedJobs, key, job);
+ }
+ }
+ }
+
+ if (finishedJobs.size() > 0) {
+ for (String key: finishedJobs.keySet()) {
+ List<DistRaid> finishedJobList = finishedJobs.get(key);
+ // Iterate through finished jobs and remove from jobs.
+ // removeJob takes care of locking.
+ for (DistRaid job: finishedJobList) {
+ removeJob(jobs, key, job);
+ }
+ }
+ }
+
+ try {
+ Thread.sleep(jobMonitorInterval);
+ } catch (InterruptedException ie) {
+ }
+ }
+ }
+
+ public int runningJobsCount(String key) {
+ int count = 0;
+ synchronized(jobs) {
+ if (jobs.containsKey(key)) {
+ List<DistRaid> jobList = jobs.get(key);
+ synchronized(jobList) {
+ count = jobList.size();
+ }
+ }
+ }
+ return count;
+ }
+
+ public void monitorJob(String key, DistRaid job) {
+ addJob(jobs, key, job);
+ jobsMonitored++;
+ }
+
+ public long jobsMonitored() {
+ return this.jobsMonitored;
+ }
+
+ public long jobsSucceeded() {
+ return this.jobsSucceeded;
+ }
+
+ private static void addJob(Map<String, List<DistRaid>> jobsMap,
+ String jobName, DistRaid job) {
+ synchronized(jobsMap) {
+ List<DistRaid> list = null;
+ if (jobsMap.containsKey(jobName)) {
+ list = jobsMap.get(jobName);
+ } else {
+ list = new LinkedList<DistRaid>();
+ jobsMap.put(jobName, list);
+ }
+ synchronized(list) {
+ list.add(job);
+ }
+ }
+ }
+
+ private static void removeJob(Map<String, List<DistRaid>> jobsMap,
+ String jobName, DistRaid job) {
+ synchronized(jobsMap) {
+ if (jobsMap.containsKey(jobName)) {
+ List<DistRaid> list = jobsMap.get(jobName);
+ synchronized(list) {
+ for (Iterator<DistRaid> it = list.iterator(); it.hasNext(); ) {
+ DistRaid val = it.next();
+ if (val == job) {
+ it.remove();
+ }
+ }
+ if (list.size() == 0) {
+ jobsMap.remove(jobName);
+ }
+ }
+ }
+ }
+ }
+}
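[For reference, a minimal sketch (not part of this commit) of running the monitor on its own thread and registering jobs per policy name; JobMonitor is package-private, so the caller lives in org.apache.hadoop.raid, as RaidNode does.]

package org.apache.hadoop.raid;

import org.apache.hadoop.conf.Configuration;

public class JobMonitorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    JobMonitor monitor = new JobMonitor(conf);
    // Run the monitor loop on a daemon thread.
    Thread monitorThread = new Thread(monitor, "JobMonitor");
    monitorThread.setDaemon(true);
    monitorThread.start();

    // After submitting a job (DistRaid job = ...; job.startDistRaid();)
    // register it so its completion is tracked:
    //   monitor.monitorJob("my-policy", job);

    // The per-policy running count is what lets the caller throttle submissions.
    System.out.println("running jobs for my-policy: "
        + monitor.runningJobsCount("my-policy"));
  }
}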
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java?rev=1021873&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ParityInputStream.java Tue Oct 12 18:23:36 2010
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * Wraps over multiple input streams and provides an input stream that is
+ * an XOR of the streams.
+ */
+class ParityInputStream extends InputStream {
+ private static final int DEFAULT_BUFSIZE = 5*1024*1024;
+ private InputStream[] streams;
+ private byte[] xor;
+ private byte[] buf;
+ private int bufSize;
+ private long remaining;
+ private int available = 0;
+ private int readPos = 0;
+
+ public ParityInputStream(
+ InputStream[] streams, long parityBlockSize, byte[] buf, byte[] xor) {
+ assert buf.length == xor.length;
+ bufSize = buf.length;
+ this.streams = streams;
+ remaining = parityBlockSize;
+ this.buf = buf;
+ this.xor = xor;
+ }
+
+ @Override
+ public int read() throws IOException {
+ makeAvailable();
+ if (available == 0) {
+ return -1;
+ }
+ int ret = xor[readPos] & 0xff; // mask so byte values >= 0x80 are not returned as negative (mistaken for EOF)
+ readPos++;
+ available--;
+ return ret;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ makeAvailable();
+ if (available == 0) {
+ return -1;
+ }
+ int ret = Math.min(len, available);
+ for (int i = 0; i < ret; ++i) {
+ b[off+i] = xor[readPos+i];
+ }
+ readPos += ret;
+ available -= ret;
+ return ret;
+ }
+
+ public void close() throws IOException {
+ for (InputStream i: streams) {
+ i.close();
+ }
+ }
+
+ /**
+ * Send the contents of the stream to the sink.
+ * @param sink
+ * @param reporter
+ * @throws IOException
+ */
+ public void drain(OutputStream sink, Progressable reporter)
+ throws IOException {
+
+ while (true) {
+ makeAvailable();
+ if (available == 0) {
+ break;
+ }
+ sink.write(xor, readPos, available);
+ available = 0;
+ if (reporter != null) {
+ reporter.progress();
+ }
+ }
+ }
+
+ /**
+ * Make some bytes available for reading in the internal buffer.
+ * @throws IOException
+ */
+ private void makeAvailable() throws IOException {
+ if (available > 0 || remaining <= 0) {
+ return;
+ }
+ // Read some bytes from the first stream.
+ int xorlen = (int)Math.min(remaining, bufSize);
+ readExact(streams[0], xor, xorlen);
+
+ // Read bytes from all the other streams and xor them.
+ for (int i = 1; i < streams.length; i++) {
+ readExact(streams[i], buf, xorlen);
+
+ for (int j = 0; j < xorlen; j++) {
+ xor[j] ^= buf[j];
+ }
+ }
+
+ remaining -= xorlen;
+ available = xorlen;
+ readPos = 0;
+ }
+
+ private static void readExact(InputStream in, byte[] bufs, int toRead)
+ throws IOException {
+ int tread = 0;
+ while (tread < toRead) {
+ int read = in.read(bufs, tread, toRead - tread);
+ if (read == -1) {
+ // If the stream ends, fill in zeros.
+ Arrays.fill(bufs, tread, toRead, (byte)0);
+ tread = toRead;
+ } else {
+ tread += read;
+ }
+ }
+ assert tread == toRead;
+ }
+
+}
+
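[For reference, a small sketch (not part of this commit) that XORs three in-memory blocks through ParityInputStream; the class is package-private, so the sketch sits in org.apache.hadoop.raid.]

package org.apache.hadoop.raid;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;

public class ParitySketch {
  public static void main(String[] args) throws Exception {
    byte[] b0 = {1, 2, 3, 4};
    byte[] b1 = {5, 6, 7};       // short block, zero-padded by readExact
    byte[] b2 = {9, 9, 9, 9};
    InputStream[] streams = {
        new ByteArrayInputStream(b0),
        new ByteArrayInputStream(b1),
        new ByteArrayInputStream(b2),
    };
    int bufSize = 1024;
    ParityInputStream parity =
        new ParityInputStream(streams, 4, new byte[bufSize], new byte[bufSize]);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    parity.drain(out, null);     // drain() tolerates a null Progressable
    parity.close();
    // Expected bytes: {1^5^9, 2^6^9, 3^7^9, 4^0^9}
    for (byte b : out.toByteArray()) {
      System.out.print(b + " ");
    }
    System.out.println();
  }
}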