[ https://issues.apache.org/jira/browse/MRUNIT-165?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yoni Ben-Meshulam updated MRUNIT-165:
-------------------------------------

Description:

MapReduceDriver calls the Mapper#run method for each input, causing the Mapper#cleanup method to be called multiple times.

I believe this is a bug: the MapReduce contract is that, for a single Mapper instance, Mapper#cleanup is called exactly once, after all inputs to that mapper have been processed. I might be mistaken in this assumption.

This would not be an issue were it not for the fact that MapReduceDriver holds only a single Mapper instance.

One solution might be to pass the Mapper _class_ into MapReduceDriver and create a new instance for each input (a rough sketch follows the reproduction below). Another might be to call MapDriver with multiple inputs (which, as far as I know, is not possible).

----

To reproduce, create a MapReduce job with a stateful mapper:

{code}
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class StatefulMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  public static final Text KEY = new Text("SomeKey");

  private int someState = 0;

  /**
   * Increment someState for each input.
   *
   * @param context the Hadoop job Map context
   * @throws java.io.IOException
   */
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    this.someState += 1;
  }

  /**
   * Runs once after all maps have occurred. Dumps the accumulated state to the output.
   *
   * @param context the Hadoop job Map context
   */
  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    context.write(KEY, new IntWritable(this.someState));
  }
}
{code}
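For concreteness, a minimal driver run that exhibits the behavior might look like the following. This is my sketch, not part of the original report: the class name StatefulMapperBugDemo, the identity Reducer, and the sample inputs are illustrative, and it assumes MRUnit 0.9.0's new-API MapReduceDriver:

{code}
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.apache.hadoop.mrunit.types.Pair;

public class StatefulMapperBugDemo {

  public static void main(String[] args) throws Exception {
    // The base Reducer acts as an identity reducer: it writes every
    // (key, value) pair it receives straight through.
    Reducer<Text, IntWritable, Text, IntWritable> identity =
        new Reducer<Text, IntWritable, Text, IntWritable>();

    MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver =
        new MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable>(
            new StatefulMapper(), identity);

    // Two inputs pushed through a single StatefulMapper instance.
    List<Pair<Text, IntWritable>> output = driver
        .withInput(new LongWritable(1), new Text("first"))
        .withInput(new LongWritable(2), new Text("second"))
        .run();

    // Per the MapReduce contract, cleanup() should run once, yielding
    // [(SomeKey, 2)]. With this bug, Mapper#run (and hence cleanup())
    // executes once per input on the same instance, yielding
    // [(SomeKey, 1), (SomeKey, 2)].
    System.out.println(output);
  }
}
{code}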
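And a rough sketch of the first suggestion above, using Hadoop's ReflectionUtils (the helper FreshMapperFactory and its signature are hypothetical, not an existing MRUnit API):

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.ReflectionUtils;

public class FreshMapperFactory {

  // Hypothetical: if the driver accepted a Class rather than a Mapper
  // instance, it could construct a fresh Mapper for each input, so each
  // Mapper#run (setup -> map -> cleanup) executes against its own instance.
  static Mapper<LongWritable, Text, Text, IntWritable> newMapper(
      Class<? extends Mapper<LongWritable, Text, Text, IntWritable>> mapperClass,
      Configuration conf) {
    return ReflectionUtils.newInstance(mapperClass, conf);
  }
}
{code}

Each input would then get its own instance, so a once-per-run cleanup would honor the per-instance contract; the trade-off is that a mapper could no longer accumulate state across all inputs of a test.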
was:
MapReduceDriver calls the Mapper#run method for each input, causing the Mapper#cleanup method to be called multiple times.

I believe this is a bug: the MapReduce contract is that, for a single Mapper instance, Mapper#cleanup is called exactly once, after all inputs to that mapper have been processed. I might be mistaken in this assumption.

This would not be an issue were it not for the fact that MapReduceDriver holds only a single Mapper instance.

One solution might be to pass the Mapper _class_ into MapReduceDriver and create a new instance for each input. Another might be to call MapDriver with multiple inputs (which, as far as I know, is not possible).

----

To reproduce, create a MapReduce job with a stateful mapper:

{code}
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ClosedFormRegressionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

  public static final Text KEY = new Text("SomeKey");

  private int someState = 0;

  /**
   * Increment someState for each input.
   *
   * @param context the Hadoop job Map context
   * @throws java.io.IOException
   */
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    this.someState += 1;
  }

  /**
   * Runs once after all maps have occurred. Dumps the accumulated state to the output.
   *
   * @param context the Hadoop job Map context
   */
  @Override
  protected void cleanup(Context context)
      throws IOException, InterruptedException {
    context.write(KEY, new IntWritable(this.someState));
  }
}
{code}

> MapReduceDriver calls Mapper#cleanup for each input instead of once
> -------------------------------------------------------------------
>
>                 Key: MRUNIT-165
>                 URL: https://issues.apache.org/jira/browse/MRUNIT-165
>             Project: MRUnit
>          Issue Type: Bug
>    Affects Versions: 0.9.0
>            Reporter: Yoni Ben-Meshulam
>

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators.
For more information on JIRA, see: http://www.atlassian.com/software/jira