Hui Zheng
Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types
Fri, 23 Jan 2015 05:13:50 GMT

We want to use Tez's "MultiStageMRConfigUtil" to convert MRR jobs into a single Tez job, but it
doesn't work when the intermediate reduce has different input and output types. Details below.

Suppose we have two MapReduce jobs that implement an ordered wordcount, which counts the
occurrences of each word and sorts the words by that count.

Job1 is a traditional wordcount job, except that its output is a <count, word> pair. We call
its mapper "Mapper1" and its reducer "Reducer1".

Job2 sorts the words by number of occurrences. We call its mapper "Mapper2", which contains no
logic of its own, and its reducer "Reducer2".

As MapReduce jobs we have: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2

With "MultiStageMRConfigUtil" we want to convert this into a single Tez job: Mapper1 --(shuffle)--> Reducer1
--(shuffle)--> Reducer2
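To make the types at each stage explicit, here is a plain-Java model of the chain. It is only a sketch, not Hadoop or Tez code: String stands in for Text, Integer for IntWritable, and the class and method names are made up for illustration. Reducer1 is the stage whose input and output types differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model of Mapper1 -> Reducer1 -> Reducer2.
// String stands in for Text and Integer for IntWritable; no Hadoop/Tez classes are used.
public class OrderedWordCountModel {

    // Mapper1: emits <word, 1> pairs, i.e. <Text, IntWritable>.
    static List<Map.Entry<String, Integer>> mapper1(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(Map.entry(word, 1));
            }
        }
        return out;
    }

    // Shuffle + Reducer1: input <Text, IntWritable>, output <IntWritable, Text>.
    // This is the intermediate reduce whose input and output types differ.
    static List<Map.Entry<Integer, String>> reducer1(List<Map.Entry<String, Integer>> pairs) {
        Map<String, Integer> sums = new TreeMap<>(); // grouped and sorted by word, like the shuffle
        for (Map.Entry<String, Integer> p : pairs) {
            sums.merge(p.getKey(), p.getValue(), Integer::sum);
        }
        List<Map.Entry<Integer, String>> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : sums.entrySet()) {
            out.add(Map.entry(e.getValue(), e.getKey())); // <count, word>
        }
        return out;
    }

    // Shuffle + Reducer2: input <IntWritable, Text>; the shuffle sorts by count
    // and Reducer2 simply emits the pairs in that order.
    static List<Map.Entry<Integer, String>> reducer2(List<Map.Entry<Integer, String>> pairs) {
        List<Map.Entry<Integer, String>> sorted = new ArrayList<>(pairs);
        sorted.sort(Map.Entry.comparingByKey());
        return sorted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> mapped = mapper1("b a c b a b");
        System.out.println(reducer2(reducer1(mapped))); // prints [1=c, 2=a, 3=b]
    }
}
```

In the MRR version the hand-off between reducer1 and reducer2 goes through HDFS and Mapper2; in the Tez version it is a direct shuffle edge, which is where the type information has to be right.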

Here Reducer1 is the intermediate reduce: its input type is <Text, IntWritable> but its output
type is <IntWritable, Text>.

The job fails with the following error:

5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text

        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)

I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's input types when it
creates the DAG edge, while the ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's)
input types.

But the settings in stageConfs[i] describe Reducer1's output, not its input. ReduceProcessor
should use the settings in stageConfs[i-1], as YARNRunner does. (In this case, i is 1.)
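The indexing mismatch can be modeled in a few lines of plain Java. This is a hypothetical sketch: each stage config is a Map rather than a Configuration, KEY_CLASS and VALUE_CLASS are stand-ins for the TezRuntimeConfiguration key/value class properties, and, as described above, stage i's entries hold the types that stage i emits. Only the first two stages are modeled.

```java
import java.util.List;
import java.util.Map;

// Hypothetical model of how the stage configs are indexed (not Tez code).
// KEY_CLASS / VALUE_CLASS stand in for TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS /
// TEZ_RUNTIME_VALUE_CLASS; stage i's entries describe the types stage i emits.
public class StageConfIndexing {
    static final String KEY_CLASS = "key.class";
    static final String VALUE_CLASS = "value.class";

    static final List<Map<String, String>> STAGE_CONFS = List.of(
        // stage 0: Mapper1 emits <Text, IntWritable>
        Map.of(KEY_CLASS, "org.apache.hadoop.io.Text",
               VALUE_CLASS, "org.apache.hadoop.io.IntWritable"),
        // stage 1: Reducer1 emits <IntWritable, Text>
        Map.of(KEY_CLASS, "org.apache.hadoop.io.IntWritable",
               VALUE_CLASS, "org.apache.hadoop.io.Text"));

    // What YARNRunner uses for the edge into stage i: stageConfs[i - 1].
    static String edgeKeyClass(int i) {
        return STAGE_CONFS.get(i - 1).get(KEY_CLASS);
    }

    // What ReduceProcessor uses as its own input key type at stage i: stageConfs[i].
    static String reduceProcessorKeyClass(int i) {
        return STAGE_CONFS.get(i).get(KEY_CLASS);
    }

    // The suggested change: read the input type from stageConfs[i - 1], exactly as the edge does.
    static String proposedKeyClass(int i) {
        return edgeKeyClass(i);
    }

    public static void main(String[] args) {
        int i = 1; // Reducer1
        System.out.println("edge sends key:        " + edgeKeyClass(i));
        System.out.println("ReduceProcessor reads: " + reduceProcessorKeyClass(i));
        System.out.println("proposed reads:        " + proposedKeyClass(i));
    }
}
```

For i = 1 the edge carries Text keys while ReduceProcessor expects IntWritable, which matches the ClassCastException in the log.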


// in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
// (excerpt; the construction of partitionerConf is omitted)

for (int i = 0; i < stageConfs.length; i++) {

    // Use stageConfs[i] to create the vertex (in our case a ReduceProcessor).
    // The ReduceProcessor then also determines its input types from stageConfs[i]:
    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
    //     (reads TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java)
    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
    //     (reads TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java)
    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
        i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);

    if (i > 0) {
        // Use stageConfs[i-1] to create the edge; its key/value types should match the
        // reducer's input, but the reducer reads stageConfs[i] (see above), so the two
        // may be incompatible.
        OrderedPartitionedKVEdgeConfig edgeConf =
            OrderedPartitionedKVEdgeConfig.newBuilder(
                stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
                stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
                MRPartitioner.class.getName(), partitionerConf)
            .setFromConfiguration(stageConfs[i - 1]).build();
        Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
    }
}


ReduceProcessor has no access to stageConfs[i-1], so as a workaround I added two new settings for
ReduceProcessor to read its input types from. With that change the job works, though I think the
proper fix is to let ReduceProcessor read stageConfs[i-1].


// "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are the new
// settings we added.
diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
<     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
<     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
---
>     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
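The patch only shows the read side. Where the two new settings get their values is not stated, so the following sketch assumes, hypothetically, that they are copied from stageConfs[i-1] for every reduce stage, which matches the idea of reading stageConfs[i-1]. Plain Maps stand in for Configuration objects; copyInputTypes() and reduceInputKeyClass() are illustrative names, and KEY_CLASS / VALUE_CLASS are stand-ins for the TezRuntimeConfiguration properties.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the workaround with plain Maps in place of Configuration objects.
// The two "mapreduce.reduce.input.*" names come from the patch; everything else is hypothetical.
public class ReduceInputTypeWorkaround {
    static final String KEY_CLASS = "key.class";      // stand-in for TEZ_RUNTIME_KEY_CLASS
    static final String VALUE_CLASS = "value.class";  // stand-in for TEZ_RUNTIME_VALUE_CLASS
    static final String REDUCE_INPUT_KEY = "mapreduce.reduce.input.key.class";
    static final String REDUCE_INPUT_VALUE = "mapreduce.reduce.input.value.class";

    // Hypothetical: for every reduce stage i >= 1, record the types emitted by the
    // previous stage (stageConfs[i - 1]) as this stage's reduce input types.
    static void copyInputTypes(List<Map<String, String>> stageConfs) {
        for (int i = 1; i < stageConfs.size(); i++) {
            Map<String, String> prev = stageConfs.get(i - 1);
            stageConfs.get(i).put(REDUCE_INPUT_KEY, prev.get(KEY_CLASS));
            stageConfs.get(i).put(REDUCE_INPUT_VALUE, prev.get(VALUE_CLASS));
        }
    }

    // Mirrors the patched lookup in spirit: null when the setting is absent,
    // like jobConf.getClass(name, null, Object.class).
    static String reduceInputKeyClass(Map<String, String> conf) {
        return conf.get(REDUCE_INPUT_KEY);
    }

    public static void main(String[] args) {
        List<Map<String, String>> stageConfs = new ArrayList<>();
        stageConfs.add(new HashMap<>(Map.of(KEY_CLASS, "org.apache.hadoop.io.Text",
                                            VALUE_CLASS, "org.apache.hadoop.io.IntWritable")));
        stageConfs.add(new HashMap<>(Map.of(KEY_CLASS, "org.apache.hadoop.io.IntWritable",
                                            VALUE_CLASS, "org.apache.hadoop.io.Text")));
        copyInputTypes(stageConfs);
        System.out.println(reduceInputKeyClass(stageConfs.get(1))); // prints org.apache.hadoop.io.Text
    }
}
```

Keeping the existing TEZ_RUNTIME_* entries untouched means the edge and the downstream stage still see Reducer1's output types, while Reducer1 itself now reads the correct input types.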



- Hui

