tez-dev mailing list archives

From Hui Zheng <huzh...@yahoo-corp.jp>
Subject Re: Tez can't execute MRR jobs as a tez job when intermediate reduce has different input/output types
Date Mon, 26 Jan 2015 02:50:05 GMT
Hi,

We already have many MapReduce jobs running in our production
environment, and we want an easy way to convert MRR jobs into a single
Tez job so that the intermediate data is not written to and read back
from HDFS.
I found that we may only need to change the configuration (without using
the MultiStageMRConfigUtil class directly) to achieve this. Is this
approach obsolete or not recommended?
(We use tez-0.5.2.)

The approach is to set the "mrr.intermediate.num-stages" property for the
job and to use the "mrr.intermediate.stage.STAGE_NUM" prefix for each
intermediate reduce of the job, as follows.

<property>
     <name>mrr.intermediate.num-stages</name>
     <value>1</value>
</property>

<property>
     <name>mrr.intermediate.stage.1.mapreduce.job.reduce.class</name>
     <value>org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer</value>
</property>

<property>
     <name>mrr.intermediate.stage.1.mapreduce.map.output.key.class</name>
     <value>org.apache.hadoop.io.IntWritable</value>
</property>

<property>
     <name>mrr.intermediate.stage.1.mapreduce.map.output.value.class</name>
     <value>org.apache.hadoop.io.Text</value>
</property>
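
For illustration, the same property names can be generated programmatically. This is only a minimal sketch: it uses a plain java.util.Map so that it runs without Hadoop on the classpath, and the class and helper names (MrrStageProps, stageKey, intermediateStage) are hypothetical. In a real job the values would presumably be set on the job's Configuration/JobConf instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class MrrStageProps {
    // Builds "mrr.intermediate.stage.<N>.<property>", the prefix pattern used in this message.
    static String stageKey(int stage, String property) {
        return "mrr.intermediate.stage." + stage + "." + property;
    }

    // Collects the three per-stage settings shown in the XML above (hypothetical helper).
    static Map<String, String> intermediateStage(int stage, String reducerClass,
                                                 String outputKeyClass, String outputValueClass) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put(stageKey(stage, "mapreduce.job.reduce.class"), reducerClass);
        props.put(stageKey(stage, "mapreduce.map.output.key.class"), outputKeyClass);
        props.put(stageKey(stage, "mapreduce.map.output.value.class"), outputValueClass);
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("mrr.intermediate.num-stages", "1");
        props.putAll(intermediateStage(1,
                "org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer",
                "org.apache.hadoop.io.IntWritable",
                "org.apache.hadoop.io.Text"));
        // Print the settings in insertion order as "name = value".
        props.forEach((name, value) -> System.out.println(name + " = " + value));
    }
}
```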

Thanks


-Hui



On 2015/01/24 6:32, "Siddharth Seth" <sseth@apache.org> wrote:

>Adding to that, MultiStageMRConfigUtil is not meant to be used by external
>projects. Support of this mechanism for multi-stage jobs is supposed to be
>removed - no one's gotten around to doing this yet, but there's a JIRA open
>to remove it.
>
>Using the DAG API to set this up should be possible.
>
>On Fri, Jan 23, 2015 at 9:20 AM, Hitesh Shah <hitesh@apache.org> wrote:
>
>> Hello Hui,
>>
>> Thanks for catching and reporting this issue. Before we go about looking
>> at a fix for this, I will provide some background.
>>
>> In the early days of Tez, we tried to change existing MR jobs to support
>> a chain of MRR and ended up using Config/JobConf as a way to specify
>> intermediate stages and also to configure them correctly. We realized
>> that writing MRR pipelines using the JobClient API was quite unwieldy
>> and hard to understand, as it relied on setting a bunch of configs. At
>> some point, we started cleaning up the Tez API to make it easier to
>> write MRR jobs in a more maintainable manner. In that respect, have you
>> had a chance to look at the latest OrderedWordCount code in
>> tez-examples? It shows you how to write an MRR job in Tez by using Tez
>> native APIs (DAG, Edge, Vertex) instead of messing with config
>> properties in JobConf. This might be an easier approach if you are
>> considering using Tez for MRR+ pipelines.
>>
>> In any case, for the issue that you have seen, would you mind filing a
>> JIRA for this (please mention what version of Tez you are using) and
>> possibly helping us by submitting a patch for the fix? There was a
>> function aptly named doJobClientMagic() (removed in recent times) that
>> did a second pass over the configs and set things up correctly for the
>> case that you describe. I am not sure if removing that somehow
>> introduced this bug.
>>
>> thanks
>> -- Hitesh
>>
>>
>> On Jan 22, 2015, at 9:13 PM, Hui Zheng <huzheng@yahoo-corp.jp> wrote:
>>
>> > Hi,
>> >
>> > We want to use Tez's "MultiStageMRConfigUtil" to convert MRR jobs to
>> > one Tez job, but it doesn't work when the intermediate reduce has
>> > different input/output types. Please see the details below.
>> >
>> > Suppose that we have two MapReduce jobs that implement an
>> > ordered-wordcount job, which counts the number of occurrences of each
>> > word and sorts the words by count.
>> >
>> > Job1 is a traditional wordcount job, except that its output is a
>> > <count,word> pair. We call its mapper "Mapper1" and its reducer
>> > "Reducer1".
>> >
>> > Job2 sorts the words by number of occurrences. We call its mapper
>> > "Mapper2", which contains no logic, and its reducer "Reducer2".
>> >
>> > As MapReduce jobs we have:
>> > Mapper1 --(shuffle)--> Reducer1 --(hdfs)--> Mapper2 --(shuffle)--> Reducer2
>> >
>> > With "MultiStageMRConfigUtil" we want to convert this to a Tez job such as:
>> > Mapper1 --(shuffle)--> Reducer1 --(shuffle)--> Reducer2
>> >
>> > Here Reducer1 is the intermediate reduce; its input type is
>> > <Text,IntWritable> but its output type is <IntWritable,Text>.
>> >
>> > It didn't work, because the following error occurred.
>> >
>> > 5/01/15 18:13:36 INFO mapreduce.Job: Job job_1416985127132_3432630 failed with state FAILED due to: Vertex failed, vertexName=ivertex1, vertexId=vertex_1416985127132_3432630_1_01, diagnostics=[Task failed, taskId=task_1416985127132_3432630_1_01_000000, diagnostics=[TaskAttempt 0 failed, info=[Error: Failure while running task:java.lang.ClassCastException: org.apache.hadoop.io.IntWritable cannot be cast to org.apache.hadoop.io.Text
>> >        at org.apache.hadoop.examples.ConfigableWordCount$ConfigableIntSumReducer.reduce(ConfigableWordCount.java:71)
>> >        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
>> >        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.runNewReducer(ReduceProcessor.java:331)
>> >        at org.apache.tez.mapreduce.processor.reduce.ReduceProcessor.run(ReduceProcessor.java:143)
>> >        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
>> >        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
>> >        at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
>> >        at java.security.AccessController.doPrivileged(Native Method)
>> >        at javax.security.auth.Subject.doAs(Subject.java:415)
>> >
>> >
>> > I found that the YARNRunner class uses stageConfs[i-1] to determine
>> > Reducer1's input when it creates the DAG edge, while the
>> > ReduceProcessor class uses stageConfs[i] to determine its (Reducer1's)
>> > input.
>> >
>> > But in fact stageConfs[i] describes Reducer1's output, not its input.
>> > ReduceProcessor should use the settings from stageConfs[i-1], as
>> > YARNRunner does. (In this case 'i' is 1.)
>> >
>> > -------------------------------------------------------------------
>> >
>> > // in createDAG() of org.apache.tez.mapreduce.client.YARNRunner.java
>> >
>> > for (int i = 0; i < stageConfs.length; i++) {
>> >
>> >    // stageConfs[i] is used to create the vertex (in our case a ReduceProcessor).
>> >    // The ReduceProcessor then also determines its input types from stageConfs[i]:
>> >    //   Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>> >    //     (this is TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS in ReduceProcessor.java)
>> >    //   Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
>> >    //     (this is TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS in ReduceProcessor.java)
>> >
>> >    vertices[i] = createVertexForStage(stageConfs[i], jobLocalResources,
>> >        i == 0 ? mapInputLocations : reduceInputLocations, i, stageConfs.length);
>> > }
>> >
>> > ...
>> > // stageConfs[i-1] is used to create the edge and its key/value types, which
>> > // should match the reduce's input. But the reduce's input is taken from
>> > // stageConfs[i] as shown above, so the two may be incompatible.
>> > OrderedPartitionedKVEdgeConfig edgeConf =
>> >    OrderedPartitionedKVEdgeConfig.newBuilder(
>> >        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS),
>> >        stageConfs[i - 1].get(TezRuntimeConfiguration.TEZ_RUNTIME_VALUE_CLASS),
>> >        MRPartitioner.class.getName(), partitionerConf)
>> >    .configureInput().useLegacyInput().done()
>> >    .setFromConfiguration(stageConfs[i - 1]).build();
>> > Edge edge = Edge.create(vertices[i - 1], vertices[i],
>> >    edgeConf.createDefaultEdgeProperty());
>> > dag.addEdge(edge);
>> >
>> > -------------------------------------------------------------------
>> >
>> > ReduceProcessor can't read stageConfs[i-1], so I simply added two
>> > settings for ReduceProcessor to read. With that it works well (but I
>> > think the best way is to let ReduceProcessor read stageConfs[i-1]).
>> >
>> > -------------------------------------------------------------------
>> >
>> > // "mapreduce.reduce.input.key.class" and "mapreduce.reduce.input.value.class" are the new settings added by us.
>> > diff src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java ReduceProcessor.java.OLD
>> > 112,113c112,113
>> > <     Class keyClass = jobConf.getClass("mapreduce.reduce.input.key.class",null,Object.class);
>> > <     Class valueClass = jobConf.getClass("mapreduce.reduce.input.value.class",null,Object.class);
>> > ---
>> > >     Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
>> > >     Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
>> >
>> > -------------------------------------------------------------------
>> >
>> >
>> > Thanks
>> >
>> > - Hui
>>
>>
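
The index mismatch described in the quoted report can be modelled without Tez. The sketch below is an assumption-laden toy, not Tez code: each stage conf is a plain Map, "key.class" stands in for TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, and the class and method names are hypothetical. It only shows that reading stageConfs[i] on the processor side and stageConfs[i-1] on the edge side yields different key types for the intermediate reduce.

```java
import java.util.List;
import java.util.Map;

final class StageConfMismatch {
    // Stand-in for TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS (assumption, not the real constant).
    static final String KEY_CLASS = "key.class";

    // Edge into stage i: typed by the producer's output, i.e. stageConfs[i - 1].
    static String edgeKeyClass(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i - 1).get(KEY_CLASS);
    }

    // Processor behaviour as described in the report: reads its own stage conf, stageConfs[i].
    static String processorKeyClassAsReported(List<Map<String, String>> stageConfs, int i) {
        return stageConfs.get(i).get(KEY_CLASS);
    }

    // Sample confs for the ordered-wordcount case: each entry holds that stage's OUTPUT key type.
    static List<Map<String, String>> orderedWordCountConfs() {
        return List.of(
                Map.of(KEY_CLASS, "org.apache.hadoop.io.Text"),         // stage 0: Mapper1 output
                Map.of(KEY_CLASS, "org.apache.hadoop.io.IntWritable")); // stage 1: Reducer1 output
    }

    public static void main(String[] args) {
        List<Map<String, String>> confs = orderedWordCountConfs();
        int i = 1; // the intermediate reduce (Reducer1)
        String onEdge = edgeKeyClass(confs, i);
        String inProcessor = processorKeyClassAsReported(confs, i);
        System.out.println("edge key class:      " + onEdge);
        System.out.println("processor key class: " + inProcessor);
        System.out.println("consistent:          " + onEdge.equals(inProcessor));
    }
}
```

If the processor looked up stage i - 1 instead, both sides would agree in this toy model, which matches the fix suggested in the report.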

