tez-dev mailing list archives

From Hitesh Shah <hit...@apache.org>
Subject Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types
Date Fri, 23 Jan 2015 17:20:15 GMT
Hello Hui, 

Thanks for catching and reporting this issue. Before we go about looking at a fix for this,
I will provide some background.

In the early days of Tez, we tried to change existing MR jobs to support a chain of MRR stages and
ended up using Config/JobConf as the way to specify the intermediate stages and also to configure
them correctly. We realized that writing MRR pipelines through the JobClient API was quite unwieldy
and hard to understand, as it relied on setting a large number of configs. At some point, we started
cleaning up the Tez API to make MRR jobs easier to write and easier to maintain. In that respect,
have you had a chance to look at the latest OrderedWordCount code
in tez-examples? It shows how to write an MRR job in Tez using the Tez native APIs ( DAG,
Edge, Vertex ) instead of manipulating config properties in a JobConf. This might be an easier
approach if you are considering using Tez for MRR+ pipelines.
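To give a rough idea of what that native-API style looks like, here is a sketch of an ordered-wordcount DAG loosely modeled on the tez-examples OrderedWordCount. The processor class names ( com.example.* ) are placeholders for your own SimpleProcessor implementations, and this needs the tez and hadoop jars on the classpath; it is an illustration, not the exact example code.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;

public class OrderedWordCountSketch {
  static DAG buildDag() {
    // Placeholder processor class names; in practice these are your
    // SimpleProcessor subclasses (tokenize, sum, sort).
    Vertex tokenizer = Vertex.create("Tokenizer",
        ProcessorDescriptor.create("com.example.TokenProcessor"), 2);
    Vertex summation = Vertex.create("Summation",
        ProcessorDescriptor.create("com.example.SumProcessor"), 2);
    Vertex sorter = Vertex.create("Sorter",
        ProcessorDescriptor.create("com.example.SorterProcessor"), 1);

    // Each edge declares its own key/value types, so the switch from
    // <Text,IntWritable> to <IntWritable,Text> between stages is explicit
    // instead of being inferred from per-stage JobConfs.
    OrderedPartitionedKVEdgeConfig countEdge = OrderedPartitionedKVEdgeConfig
        .newBuilder(Text.class.getName(), IntWritable.class.getName(),
            HashPartitioner.class.getName()).build();
    OrderedPartitionedKVEdgeConfig sortEdge = OrderedPartitionedKVEdgeConfig
        .newBuilder(IntWritable.class.getName(), Text.class.getName(),
            HashPartitioner.class.getName()).build();

    return DAG.create("OrderedWordCount")
        .addVertex(tokenizer).addVertex(summation).addVertex(sorter)
        .addEdge(Edge.create(tokenizer, summation,
            countEdge.createDefaultEdgeProperty()))
        .addEdge(Edge.create(summation, sorter,
            sortEdge.createDefaultEdgeProperty()));
  }
}
```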

In any case, for the issue that you have seen, would you mind filing a jira ( please
mention which version of Tez you are using ) and possibly helping us by submitting
a patch for the fix? There used to be a function aptly named doJobClientMagic() ( removed
recently ) that did a second pass over the configs and set things up correctly for the case that
you describe. I am not sure whether removing it introduced this bug.

― Hitesh 

On Jan 22, 2015, at 9:13 PM, Hui Zheng <huzheng@yahoo-corp.jp> wrote:

> Hi,
> We want to use "MultiStageMRConfigUtil" of Tez to convert MRR jobs into one Tez job, but
it doesn't work when the intermediate reduce has different input/output types. Please see
the details below.
> Suppose we have two MapReduce jobs that implement an ordered-wordcount job, which
counts the number of occurrences of each word and sorts the words by count.
> Job1 is a traditional wordcount job, except that its output is a <counts,word> pair. We
call the mapper "Mapper1" and the reducer "Reducer1".
> Job2 sorts the words by their number of occurrences. We call the mapper "Mapper2", which has
no logic of its own, and the reducer "Reducer2".
> With MapReduce jobs we have: Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2
> With "MultiStageMRConfigUtil" we want to convert it to a Tez job such as: Mapper1 --(shuffle)--> Reducer1
--(shuffle)--> Reducer2
> Here Reducer1 is the intermediate reduce; its input type is <Text,IntWritable>
but its output is <IntWritable,Text>.
> It didn't work because the following error happened.
> 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state
FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01,
diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt
0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable
cannot be cast to org.apache.hadoop.io.Text
>        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
>        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
>        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
>        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
>        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
>        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:415)
> I found that the YARNRunner class uses stageConfs[i-1] to determine Reducer1's input when
it creates the edge of the DAG, while the ReduceProcessor class uses stageConfs[i] to determine
its (Reducer1's) input.
> But in fact stageConfs[i] describes Reducer1's output, not its input. ReduceProcessor
should have used stageConfs[i-1]'s settings, as YARNRunner does. ( In this case 'i' is 1 )
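To make the off-by-one concrete, here is a small self-contained model of how the per-stage configs line up for Mapper1 --> Reducer1 --> Reducer2. Class names are plain strings and the confs are plain maps; no Tez or Hadoop code is involved, only the indexing logic described above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StageConfDemo {
    // Each stage conf holds the class names that the stage WRITES to its
    // shuffle output (what TEZ_RUNTIME_KEY_CLASS / TEZ_RUNTIME_VALUE_CLASS
    // end up being for that stage).
    static Map<String, String> conf(String keyClass, String valueClass) {
        Map<String, String> c = new HashMap<>();
        c.put("key.class", keyClass);
        c.put("value.class", valueClass);
        return c;
    }

    public static void main(String[] args) {
        List<Map<String, String>> stageConfs = Arrays.asList(
            conf("org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable"),  // Mapper1's output
            conf("org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text"),  // Reducer1's output
            conf("org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text")); // Reducer2's output

        int i = 1; // Reducer1's stage

        // YARNRunner wires the incoming edge from stageConfs[i-1] (correct):
        String edgeKey = stageConfs.get(i - 1).get("key.class");
        // ReduceProcessor reads its input types from stageConfs[i] (the bug):
        String procKey = stageConfs.get(i).get("key.class");

        System.out.println("edge delivers: " + edgeKey);
        System.out.println("processor expects: " + procKey);
        System.out.println("match: " + edgeKey.equals(procKey));
    }
}
```

The mismatch in the last line is exactly the IntWritable-cannot-be-cast-to-Text failure from the stack trace: the edge delivers Text keys while the processor is configured for IntWritable.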
> -------------------------------------------------------------------
> // in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
> for (int i = 0; i < stageConfs.length; i++) {
>    // use stageConfs[i] to create the vertex (in our case a ReduceProcessor);
>    // the ReduceProcessor's input is then also determined by stageConfs[i]:
>    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>    //     (reads TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java)
>    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
>    //     (reads TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java)
>    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
>        i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
> }
> ...
> // stageConfs[i-1] is used to create the edge, whose input should match the reduce's,
> // but the reduce's input uses stageConfs[i] as above, so they may be incompatible.
> OrderedPartitionedKVEdgeConfig edgeConf =
>    OrderedPartitionedKVEdgeConfig.newBuilder(stageConfs[i - 1].get(
>        TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
>    stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
>    MRPartitioner.class.getName(), partitionerConf)
>    .configureInput().useLegacyInput().done()
>    .setFromConfiguration(stageConfs[i - 1]).build();
> Edge edge = Edge.create(vertices[i - 1], vertices[i], edgeConf.createDefaultEdgeProperty());
> dag.addEdge(edge);
> -------------------------------------------------------------------
> ReduceProcessor cannot read stageConfs[i-1], so I simply added two settings for
ReduceProcessor to read. With that it works well ( but I think the best fix is to let ReduceProcessor
read stageConfs[i-1] ).
> -------------------------------------------------------------------
> // "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are
new settings added by us.
> diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
> 112,113c112,113
> <     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
> <     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
> ---
>>    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>>    Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
> -------------------------------------------------------------------
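A second pass over the stage confs along the lines Hui describes could propagate the types forward on the client side. The sketch below uses plain maps instead of JobConf, and the "mapreduce.reduce.input.*" property names are the ones Hui introduced in the diff above, not existing Hadoop settings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfFixup {
    static Map<String, String> conf(String keyClass, String valueClass) {
        Map<String, String> c = new HashMap<>();
        c.put("key.class", keyClass);
        c.put("value.class", valueClass);
        return c;
    }

    // Second pass: copy each stage's shuffle-output classes forward into the
    // NEXT stage's reduce-input settings, so a reduce stage can find its true
    // input types in its own conf (stageConfs[i]) instead of stageConfs[i-1].
    static void propagateInputTypes(List<Map<String, String>> stageConfs) {
        for (int i = 1; i < stageConfs.size(); i++) {
            Map<String, String> prev = stageConfs.get(i - 1);
            stageConfs.get(i).put("mapreduce.reduce.input.key.class", prev.get("key.class"));
            stageConfs.get(i).put("mapreduce.reduce.input.value.class", prev.get("value.class"));
        }
    }

    public static void main(String[] args) {
        List<Map<String, String>> stageConfs = Arrays.asList(
            conf("org.apache.hadoop.io.Text", "org.apache.hadoop.io.IntWritable"),  // Mapper1's output
            conf("org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text"),  // Reducer1's output
            conf("org.apache.hadoop.io.IntWritable", "org.apache.hadoop.io.Text")); // Reducer2's output
        propagateInputTypes(stageConfs);
        // Reducer1 (stage 1) now sees its real input key class:
        System.out.println(stageConfs.get(1).get("mapreduce.reduce.input.key.class"));
    }
}
```

This mirrors what the removed doJobClientMagic() pass reportedly did: reconcile adjacent stages' configs before the DAG is built.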
> Thanks
> - Hui
