Hi, mailing list:

I am trying to understand how FileInputFormat works. I did the following, but it does not work and I cannot see why; I hope someone can shed light on it.

I took the wordcount code and added one line to the run() method, where MyTextInputFormat is just a copy of TextInputFormat:

job.setInputFormatClass(MyTextInputFormat.class);

I also modified MyTextInputFormat.java so that it extends MyFileInputFormat, which is another copy, this time of FileInputFormat, so that I can change it freely and watch what it does:

public class MyTextInputFormat extends FileInputFormat<LongWritable, Text>
=> public class MyTextInputFormat extends MyFileInputFormat<LongWritable, Text>

Finally I compiled and ran the program, but I get the following error and I do not know why:
# hadoop yarndemo/BmAlphaToNum /alex/messages /alex/output8
13/12/24 09:10:15 WARN conf.Configuration: session.id is deprecated. Instead, use dfs.metrics.session-id
13/12/24 09:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
13/12/24 09:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
13/12/24 09:10:15 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
13/12/24 09:10:15 INFO yarndemo.MyFileInputFormat: in getsplits
13/12/24 09:10:15 INFO yarndemo.MyFileInputFormat: jumpppppppppp into liststatus!!
13/12/24 09:10:15 INFO mapred.JobClient: Cleaning up the staging area file:/data/temp/mapred/staging/root866865048/.staging/job_local866865048_0001
13/12/24 09:10:15 ERROR security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:java.io.IOException: No input paths specified in job
Exception in thread "main" java.io.IOException: No input paths specified in job
        at yarndemo.MyFileInputFormat.listStatus(MyFileInputFormat.java:214)
        at yarndemo.MyFileInputFormat.getSplits(MyFileInputFormat.java:284)
        at org.apache.hadoop.mapred.JobClient.writeNewSplits(JobClient.java:1063)
        at org.apache.hadoop.mapred.JobClient.writeSplits(JobClient.java:1080)
        at org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:992)
        at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:945)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1408)
        at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:945)
        at org.apache.hadoop.mapreduce.Job.submit(Job.java:566)
        at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:596)
        at yarndemo.BmAlphaToNum.run(BmAlphaToNum.java:74)
        at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
        at yarndemo.BmAlphaToNum.main(BmAlphaToNum.java:78)
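
From the stack trace, the exception is thrown from my listStatus() because getInputPaths() returns an empty array, i.e. nothing comes back under the configuration key my copy reads. To narrow it down I am thinking of printing that key right after the FileInputFormat.addInputPath() call in run(); this is just a debugging sketch, not tested yet:

// debugging sketch for run(), right after FileInputFormat.addInputPath(job, new Path(args[0])):
// dump what was actually stored under the key that MyFileInputFormat.getInputPaths() reads back
System.out.println("key   = " + MyFileInputFormat.INPUT_DIR);
System.out.println("value = " + job.getConfiguration().get(MyFileInputFormat.INPUT_DIR));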
Here is my Java code. First, the driver, BmAlphaToNum.java:
package yarndemo;
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class BmAlphaToNum extends Configured implements Tool {

  public static class TokenizerMapper extends
      Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer extends
      Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }
  private static void usage() throws IOException {
    System.err.println("BmAlphaToNum <input dir> <output dir>");
  }
  public int run(String[] args) throws IOException, InterruptedException,
      ClassNotFoundException {
    Job job = Job.getInstance(getConf());
    if (args.length != 2) {
      usage();
      return 2;
    }
    job.setJobName("wordcount");
    job.setJarByClass(BmAlphaToNum.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    job.setInputFormatClass(MyTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new BmAlphaToNum(), args);
    System.exit(res);
  }
}
Next, MyTextInputFormat.java:

package yarndemo;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.LineRecordReader;
import com.google.common.base.Charsets;
/**
 * An {@link InputFormat} for plain text files. Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line. Keys are
 * the position in the file, and values are the line of text.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MyTextInputFormat extends MyFileInputFormat<LongWritable, Text> {

  private static final Log LOG = LogFactory.getLog(MyTextInputFormat.class);

  @Override
  public RecordReader<LongWritable, Text> createRecordReader(
      InputSplit split, TaskAttemptContext context) {
    String delimiter = context.getConfiguration().get(
        "textinputformat.record.delimiter");
    byte[] recordDelimiterBytes = null;
    if (null != delimiter)
      recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
    return new LineRecordReader(recordDelimiterBytes);
  }

  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec = new CompressionCodecFactory(
        context.getConfiguration()).getCodec(file);
    if (null == codec) {
      return true;
    }
    LOG.info("i split readddddddddd record");
    return codec instanceof SplittableCompressionCodec;
  }
}
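
For what it is worth, I understand createRecordReader() above reads the record delimiter from the configuration exactly like the stock TextInputFormat, so I assume the driver could override it with something like the line below, though I have not tried it:

// untested: would go in run() before submission; read back by createRecordReader() above
job.getConfiguration().set("textinputformat.record.delimiter", "\n");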
And finally MyFileInputFormat.java (the copy of FileInputFormat with my log statements added):

package yarndemo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.InvalidInputException;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
/**
 * A base class for file-based {@link InputFormat}s.
 *
 * <p>
 * <code>FileInputFormat</code> is the base class for all file-based
 * <code>InputFormat</code>s. This provides a generic implementation of
 * {@link #getSplits(JobContext)}. Subclasses of <code>FileInputFormat</code>
 * can also override the {@link #isSplitable(JobContext, Path)} method to
 * ensure input-files are not split-up and are processed as a whole by
 * {@link Mapper}s.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MyFileInputFormat<K, V> extends InputFormat<K, V> {

  public static final String INPUT_DIR =
      "mapreduce.input.fileinputformat.inputdir";
  public static final String SPLIT_MAXSIZE =
      "mapreduce.input.fileinputformat.split.maxsize";
  public static final String SPLIT_MINSIZE =
      "mapreduce.input.fileinputformat.split.minsize";
  public static final String PATHFILTER_CLASS =
      "mapreduce.input.pathFilter.class";
  public static final String NUM_INPUT_FILES =
      "mapreduce.input.fileinputformat.numinputfiles";

  private static final Log LOG = LogFactory.getLog(MyFileInputFormat.class);

  private static final double SPLIT_SLOP = 1.1; // 10% slop

  private static final PathFilter hiddenFileFilter = new PathFilter() {
    public boolean accept(Path p) {
      String name = p.getName();
      return !name.startsWith("_") && !name.startsWith(".");
    }
  };
/**
* Proxy PathFilter that accepts a path only if all filters given in the
* constructor do. Used by the listPaths() to apply the built-in
* hiddenFileFilter together with a user provided one (if any).
*/
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
/**
* Get the lower bound on split size imposed by the format.
*
* @return the number of bytes of the minimal split for this format
*/
protected long getFormatMinSplitSize() {
return 1;
}
  /**
   * Is the given filename splitable? Usually, true, but if the file is stream
   * compressed, it will not be.
   *
   * <code>FileInputFormat</code> implementations can override this and return
   * <code>false</code> to ensure that individual input files are never
   * split-up so that {@link Mapper}s process entire files.
   *
   * @param context the job context
   * @param filename the file name to check
   * @return is this file splitable?
   */
  protected boolean isSplitable(JobContext context, Path filename) {
    return true;
  }
  /**
   * Set a PathFilter to be applied to the input paths for the map-reduce job.
   *
   * @param job the job to modify
   * @param filter the PathFilter class use for filtering the input paths.
   */
  public static void setInputPathFilter(Job job,
      Class<? extends PathFilter> filter) {
    job.getConfiguration().setClass(PATHFILTER_CLASS, filter,
        PathFilter.class);
  }
/**
* Set the minimum input split size
*
* @param job
* the job to modify
* @param size
* the minimum size
*/
public static void setMinInputSplitSize(Job job, long size) {
job.getConfiguration().setLong(SPLIT_MINSIZE, size);
}
/**
* Get the minimum split size
*
* @param job
* the job
* @return the minimum number of bytes that can be in a split
*/
public static long getMinSplitSize(JobContext job) {
return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
}
/**
* Set the maximum split size
*
* @param job
* the job to modify
* @param size
* the maximum split size
*/
public static void setMaxInputSplitSize(Job job, long size) {
job.getConfiguration().setLong(SPLIT_MAXSIZE, size);
}
/**
* Get the maximum split size.
*
* @param context
* the job to look at.
* @return the maximum number of bytes a split can include
*/
public static long getMaxSplitSize(JobContext context) {
return context.getConfiguration()
.getLong(SPLIT_MAXSIZE, Long.MAX_VALUE);
}
  /**
   * Get a PathFilter instance of the filter set for the input paths.
   *
   * @return the PathFilter instance set for the job, NULL if none has been set.
   */
  public static PathFilter getInputPathFilter(JobContext context) {
    Configuration conf = context.getConfiguration();
    Class<?> filterClass = conf.getClass(PATHFILTER_CLASS, null,
        PathFilter.class);
    return (filterClass != null) ?
        (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
  }
/** List input directories.
* Subclasses may override to, e.g., select only files matching a regular
* expression.
*
* @param job the job to list input paths for
* @return array of FileStatus objects
* @throws IOException if zero items.
*/
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    List<FileStatus> result = new ArrayList<FileStatus>();
    Path[] dirs = getInputPaths(job);
    LOG.info("jumpppppppppp into liststatus!!");
    if (dirs.length == 0) {
      throw new IOException("No input paths specified in job");
    }

    // get tokens for all the required FileSystems..
    TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs,
        job.getConfiguration());

    List<IOException> errors = new ArrayList<IOException>();

    // creates a MultiPathFilter with the hiddenFileFilter and the
    // user provided one (if any).
    List<PathFilter> filters = new ArrayList<PathFilter>();
    filters.add(hiddenFileFilter);
    PathFilter jobFilter = getInputPathFilter(job);
    if (jobFilter != null) {
      filters.add(jobFilter);
    }
    PathFilter inputFilter = new MultiPathFilter(filters);

    for (int i = 0; i < dirs.length; ++i) {
      Path p = dirs[i];
      FileSystem fs = p.getFileSystem(job.getConfiguration());
      FileStatus[] matches = fs.globStatus(p, inputFilter);
      if (matches == null) {
        errors.add(new IOException("Input path does not exist: " + p));
      } else if (matches.length == 0) {
        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
      } else {
        for (FileStatus globStat : matches) {
          if (globStat.isDirectory()) {
            for (FileStatus stat : fs.listStatus(globStat.getPath(),
                inputFilter)) {
              result.add(stat);
            }
          } else {
            result.add(globStat);
          }
        }
      }
    }

    if (!errors.isEmpty()) {
      throw new InvalidInputException(errors);
    }
    LOG.info("tttttTotal input paths to process : " + result.size());
    return result;
  }
/**
* A factory that makes the split for this class. It can be overridden
* by sub-classes to make sub-types
*/
protected FileSplit makeSplit(Path file, long start, long length,
String[] hosts) {
return new FileSplit(file, start, length, hosts);
}
/**
* Generate the list of files and make them into FileSplits.
*
* @param job
* the job context
* @throws IOException
*/
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);
    LOG.info("in getsplits");

    // generate splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    List<FileStatus> files = listStatus(job);
    for (FileStatus file : files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job.getConfiguration());
        BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
        if (isSplitable(job, path)) {
          long blockSize = file.getBlockSize();
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          long bytesRemaining = length;
          while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, splitSize,
                blkLocations[blkIndex].getHosts()));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                blkLocations[blkIndex].getHosts()));
          }
        } else { // not splitable
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    LOG.debug("Total # of splits: " + splits.size());
    return splits;
  }
  protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

  protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
    for (int i = 0; i < blkLocations.length; i++) {
      // is the offset inside this block?
      if ((blkLocations[i].getOffset() <= offset) &&
          (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) {
        return i;
      }
    }
    BlockLocation last = blkLocations[blkLocations.length - 1];
    long fileLength = last.getOffset() + last.getLength() - 1;
    throw new IllegalArgumentException("Offset " + offset +
        " is outside of file (0.." + fileLength + ")");
  }
  /**
   * Sets the given comma separated paths as the list of inputs for the
   * map-reduce job.
   *
   * @param job the job
   * @param commaSeparatedPaths Comma separated paths to be set as the list of
   *          inputs for the map-reduce job.
   */
  public static void setInputPaths(Job job, String commaSeparatedPaths)
      throws IOException {
    setInputPaths(job,
        StringUtils.stringToPath(getPathStrings(commaSeparatedPaths)));
  }
  /**
   * Add the given comma separated paths to the list of inputs for the
   * map-reduce job.
   *
   * @param job The job to modify
   * @param commaSeparatedPaths Comma separated paths to be added to the list
   *          of inputs for the map-reduce job.
   */
  public static void addInputPaths(Job job, String commaSeparatedPaths)
      throws IOException {
    for (String str : getPathStrings(commaSeparatedPaths)) {
      addInputPath(job, new Path(str));
    }
  }
  /**
   * Set the array of {@link Path}s as the list of inputs for the map-reduce
   * job.
   *
   * @param job The job to modify
   * @param inputPaths the {@link Path}s of the input directories/files for
   *          the map-reduce job.
   */
  public static void setInputPaths(Job job, Path... inputPaths)
      throws IOException {
    Configuration conf = job.getConfiguration();
    Path path = inputPaths[0].getFileSystem(conf).makeQualified(inputPaths[0]);
    StringBuffer str =
        new StringBuffer(StringUtils.escapeString(path.toString()));
    for (int i = 1; i < inputPaths.length; i++) {
      str.append(StringUtils.COMMA_STR);
      path = inputPaths[i].getFileSystem(conf).makeQualified(inputPaths[i]);
      str.append(StringUtils.escapeString(path.toString()));
    }
    conf.set(INPUT_DIR, str.toString());
  }
  /**
   * Add a {@link Path} to the list of inputs for the map-reduce job.
   *
   * @param job The {@link Job} to modify
   * @param path {@link Path} to be added to the list of inputs for the
   *          map-reduce job.
   */
  public static void addInputPath(Job job, Path path) throws IOException {
    Configuration conf = job.getConfiguration();
    path = path.getFileSystem(conf).makeQualified(path);
    String dirStr = StringUtils.escapeString(path.toString());
    String dirs = conf.get(INPUT_DIR);
    conf.set(INPUT_DIR, dirs == null ? dirStr : dirs + "," + dirStr);
  }
// This method escapes commas in the glob pattern of the given paths.
private static String[] getPathStrings(String commaSeparatedPaths) {
int length = commaSeparatedPaths.length();
int curlyOpen = 0;
int pathStart = 0;
boolean globPattern = false;
List<String> pathStrings = new ArrayList<String>();
for (int i=0; i<length; i++) {
char ch = commaSeparatedPaths.charAt(i);
switch(ch) {
case '{' : {
curlyOpen++;
if (!globPattern) {
globPattern = true;
}
break;
}
case '}' : {
curlyOpen--;
if (curlyOpen == 0 && globPattern) {
globPattern = false;
}
break;
}
case ',' : {
if (!globPattern) {
pathStrings.add(commaSeparatedPaths.substring(pathStart, i));
pathStart = i + 1 ;
}
break;
}
default:
continue; // nothing special to do for this character
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));
return pathStrings.toArray(new String[0]);
}
/**
* Get the list of input {@link Path}s for the map-reduce job.
*
* @param context
* The job
* @return the list of input {@link Path}s for the map-reduce job.
*/
public static Path[] getInputPaths(JobContext context) {
String dirs = context.getConfiguration().get(INPUT_DIR, "");
String [] list = StringUtils.split(dirs);
Path[] result = new Path[list.length];
for (int i = 0; i < list.length; i++) {
result[i] = new Path(StringUtils.unEscapeString(list[i]));
}
return result;
}
}
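
Also, to rule out a problem in my copy itself, I thought of checking in isolation whether a path added through MyFileInputFormat comes back out of getInputPaths(). Something like this quick standalone check (class name made up, not tested):

package yarndemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;

// quick standalone check (not part of the job): does a path added via
// MyFileInputFormat.addInputPath() come back from MyFileInputFormat.getInputPaths()?
public class InputPathRoundTrip {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    MyFileInputFormat.addInputPath(job, new Path("/alex/messages"));
    // raw configuration value written by addInputPath()
    System.out.println(job.getConfiguration().get(MyFileInputFormat.INPUT_DIR));
    // paths as parsed back by getInputPaths()
    for (Path p : MyFileInputFormat.getInputPaths(job)) {
      System.out.println(p);
    }
  }
}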