hadoop-mapreduce-dev mailing list archives

From "Arsen Zahray (Created) (JIRA)" <j...@apache.org>
Subject [jira] [Created] (MAPREDUCE-3106) Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key from map"
Date Tue, 27 Sep 2011 07:29:13 GMT
Replacing Mapper with MultithreadedMapper causes the job to crash with "Type mismatch in key
from map" 
-------------------------------------------------------------------------------------------------------

                 Key: MAPREDUCE-3106
                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-3106
             Project: Hadoop Map/Reduce
          Issue Type: Bug
    Affects Versions: 0.20.203.0
            Reporter: Arsen Zahray
            Priority: Blocker


I have a Hadoop job which works perfectly well when run with a class extending Mapper.
When I replace Mapper with MultithreadedMapper, the job crashes with the following message:

java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.IntWritable, recieved org.apache.hadoop.io.LongWritable
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:862)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.write(MapTask.java:549)
	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$SubMapRecordWriter.write(MultithreadedMapper.java:211)
	at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
	at org.apache.hadoop.mapreduce.Mapper.map(Mapper.java:124)
	at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
	at org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper$MapRunner.run(MultithreadedMapper.java:264)

Here is the relevant source code:

public class MapReduceMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		try {
			if (args.length != 2) {
				System.err.println("Usage: MapReduceMain <input path> <output path>");
				System.exit(123);
			}
			Job job = new Job();
			job.setJarByClass(MapReduceMain.class);
			job.setInputFormatClass(TextInputFormat.class);
			FileSystem fs = FileSystem.get(URI.create(args[0]), job.getConfiguration());
			FileStatus[] files = fs.listStatus(new Path(args[0]));
			for(FileStatus sfs:files){
				FileInputFormat.addInputPath(job, sfs.getPath());
			}
			FileOutputFormat.setOutputPath(job, new Path(args[1]));
			
			job.setMapperClass(MyMultithreadMapper.class);
			job.setReducerClass(MyReducer.class);
			MultithreadedMapper.setNumberOfThreads(job, MyMultithreadMapper.nThreads);

			job.setOutputKeyClass(IntWritable.class); 
			job.setOutputValueClass(MyPage.class);
			
			job.setOutputFormatClass(SequenceFileOutputFormat.class);
			
			System.exit(job.waitForCompletion(true) ? 0 : 1);
		} catch (Exception e) {
			e.printStackTrace();
		}

	}
}


public class MyMultithreadMapper extends MultithreadedMapper<LongWritable, Text, IntWritable, MyPage> {

	ConcurrentLinkedQueue<MyScraper>	scrapers	= new ConcurrentLinkedQueue<MyScraper>();

	public static final int				nThreads	= 5;

	public MyMultithreadMapper() {
		for (int i = 0; i < nThreads; i++) {
			scrapers.add(new MyScraper());
		}
	}

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		MyScraper scraper = scrapers.poll();

		MyPage result = null;
		for (int i = 0; i < 10; i++) {
			try {
				result = scraper.scrapPage(value.toString(), true);
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		if (result == null) {
			result = new MyPage();
			result.setUrl(key.toString());
		}

		context.write(new IntWritable(result.getUrl().hashCode()), result);

		scrapers.add(scraper);
	}
}
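The mapper above pools its MyScraper instances in a ConcurrentLinkedQueue so that each of the nThreads worker threads can borrow one, use it, and put it back. A JDK-only sketch of that borrow/return pattern (the ScraperPool and withResource names, and the String stand-in for MyScraper, are illustrative and not part of the job code):

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.UnaryOperator;

public class ScraperPool {
    private final ConcurrentLinkedQueue<String> pool = new ConcurrentLinkedQueue<>();

    public ScraperPool(int n) {
        // Pre-populate one pooled object per worker thread; poll() returns
        // null when the queue is empty, so n must cover the thread count.
        for (int i = 0; i < n; i++) {
            pool.add("scraper-" + i);
        }
    }

    // Borrow a resource, run the work, and always return the resource.
    public String withResource(UnaryOperator<String> work) {
        String r = pool.poll();
        if (r == null) {
            throw new IllegalStateException("pool exhausted: more threads than pooled objects");
        }
        try {
            return work.apply(r);
        } finally {
            pool.add(r); // return the resource even if the work throws
        }
    }

    public int size() {
        return pool.size();
    }

    public static void main(String[] args) {
        ScraperPool p = new ScraperPool(2);
        System.out.println(p.withResource(s -> s + ":used"));
    }
}
```

Note that the map() above returns the scraper only on the normal path, so a throw from context.write() would leak it; a finally block as in this sketch avoids that.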


And here is the code for the working mapper class, for comparison:

public class MyMapper extends Mapper<LongWritable, Text, IntWritable, MyPage> {
	MyScraper scr = new MyScraper();

	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		MyPage result = null;
		for (int i = 0; i < 10; i++) {
			try {
				result = scr.scrapPage(value.toString(), true);
				break;
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		if (result == null) {
			result = new MyPage();
			result.setUrl(key.toString());
		}

		context.write(new IntWritable(result.getUrl().hashCode()), result);
	}
}
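Both mappers share the same retry shape: call scrapPage up to ten times, keep the first success, and fall back to an empty MyPage when every attempt fails. That loop can be factored into a small generic helper (a sketch only; RetryLoop and attempt are hypothetical names, not part of the reported code):

```java
import java.util.concurrent.Callable;

public class RetryLoop {
    // Run op up to maxTries times; return the first successful result,
    // or null if every attempt threw (mirroring the mappers' fallback).
    public static <T> T attempt(Callable<T> op, int maxTries) {
        for (int i = 0; i < maxTries; i++) {
            try {
                return op.call();
            } catch (Exception e) {
                // swallow and retry, as the mappers' catch blocks do
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // An operation that always succeeds returns on the first try.
        System.out.println(attempt(() -> "scraped", 10));
    }
}
```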


This appears to be a Hadoop bug.

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa
For more information on JIRA, see: http://www.atlassian.com/software/jira

        
