tez-dev mailing list archives

From Hitesh Shah <hit...@apache.org>
Subject Re: Tez DAG question
Date Mon, 16 Jun 2014 20:04:20 GMT
Hi Jeff,

A few questions to clarify your use-case:
   - You mentioned that each stage needs to write to HDFS, but I wanted to confirm: is this a
strict requirement, or something done only for faster recovery in case of failures?
   - Would I be correct in saying that you want to send the data generated by R1 to both M2
and M3, and also write a replica of that data set to HDFS?
   - Does the data sent from R1 also need to be sorted and partitioned before being sent to
the downstream vertices?
   - How much of your logic inside the Mappers and Reducers is tied to MapReduce? If you are
willing to write your own processor instead of using a Mapper/Reducer, you will probably be
able to gain more of the performance benefits. For example, the logic in M2 and M3 could possibly
be combined into a single vertex/processor. The single processor could write the required
output to HDFS that M3 would have generated and likewise generate the required intermediate
data needed by R2.
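
To make the combined-vertex idea concrete, a minimal sketch against the Tez 0.4 LogicalIOProcessor API might look like the following. This is only a sketch: the input/output names ("R1", "HDFS", "R2") and the reader/writer casts are assumptions that depend on how the edges and outputs are actually configured, and the real M2/M3 logic would replace the pass-through writes.

```java
import java.util.List;
import java.util.Map;

import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.library.api.KeyValueWriter;
import org.apache.tez.runtime.library.api.KeyValuesReader;

// Sketch only: a custom processor that does the work of both M2 and M3
// in one vertex. Input/output names here are hypothetical - they must
// match the edge and output names used when building the DAG.
public class CombinedM2M3Processor implements LogicalIOProcessor {

  private TezProcessorContext context;

  @Override
  public void initialize(TezProcessorContext processorContext) throws Exception {
    this.context = processorContext;
  }

  @Override
  public void run(Map<String, LogicalInput> inputs,
      Map<String, LogicalOutput> outputs) throws Exception {
    // One logical input: the shuffled, merged data coming from R1.
    KeyValuesReader reader = (KeyValuesReader) inputs.get("R1").getReader();
    // Two logical outputs: the HDFS file M3 would have written, and the
    // intermediate data destined for R2.
    KeyValueWriter hdfsWriter = (KeyValueWriter) outputs.get("HDFS").getWriter();
    KeyValueWriter shuffleWriter = (KeyValueWriter) outputs.get("R2").getWriter();

    while (reader.next()) {
      Object key = reader.getCurrentKey();
      for (Object value : reader.getCurrentValues()) {
        hdfsWriter.write(key, value);     // M3's transformation would go here
        shuffleWriter.write(key, value);  // M2's transformation would go here
      }
    }
  }

  @Override
  public void handleEvents(List<Event> processorEvents) {
    // No event handling needed for this sketch.
  }

  @Override
  public void close() throws Exception {
  }
}
```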

From a Tez point of view, the MapProcessor and ReduceProcessor were written pretty much to
provide MR compatibility when used strictly in an M->R or M->R->R…->R
( straight-line ) DAG. They handle neither multiple inputs nor multiple outputs.

That said, I believe your use-case should be something that can be addressed in Tez. However,
there are a couple of things still lacking:
   - Support for something called a “shared edge”. This effectively means a vertex generating
data on a given Output and that same data being sent downstream to multiple different Inputs
( edges ). Today, it is a strict 1:1 relationship.
   - An edge that uses HDFS to transfer data has not been built yet. This would allow R1 to
write its data to HDFS and have M2 and M3 read it from HDFS. Today, in your use-case, one would
need to generate the data twice - once for the shuffle edge and once for HDFS - with the
shuffle-edge copy being sent downstream. But that is not supported by the Map/Reduce Processors.
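
For reference, the wiring for that generate-twice workaround would look roughly like the sketch below ( Tez 0.4 API; variable names follow the code quoted later in this thread ). The DAG itself accepts this shape; the missing piece is a processor on R1 that can drive both the MROutput and the shuffle edges.

```java
// Sketch only: R1 writes a replica to HDFS via MROutput *and* feeds two
// scatter-gather edges. ReduceProcessor cannot drive multiple outputs, so
// this wiring would need a custom processor on R1 to actually run.
MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload); // HDFS replica

EdgeProperty shuffle = new EdgeProperty(
    DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
    SchedulingType.SEQUENTIAL,
    new OutputDescriptor(OnFileSortedOutput.class.getName()),
    new InputDescriptor(ShuffledMergedInputLegacy.class.getName()));

dag.addEdge(new Edge(reduce1Vertex, map2Vertex, shuffle)); // copy 1 of the data
dag.addEdge(new Edge(reduce1Vertex, map3Vertex, shuffle)); // copy 2 of the data
```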

thanks
— Hitesh


On Jun 16, 2014, at 10:15 AM, Jeff Hurt <Jeff.Hurt@clickfox.com> wrote:

> All,
> 
> I have a scenario where I have three Map-Reduce jobs, and I would like to build this
> as a Tez DAG.  The basic design is that the first Map-Reduce job (M1,R1) should invoke both
> the second Map-Reduce job (M2,R2) and the third Map (M3) job.  In addition, each job should
> write out its results to a file in HDFS.
> 
> Graphically, a layout would look like this:
> 
>          M1
>          |
>          R1    (R1 writes output to HDFS)
>          |
>      M2 --- M3  (M3 has no reducer, writes output to HDFS)
>      |
>      R2  (R2 writes output to HDFS)
> 
> The results of R1 would be written out to HDFS, and would also be used as the inputs
> to both M2 and M3.
> 
> But, we have not been able to get this functionality to work.  Errors show up whenever
> our DAG contains more than just the first Map-Reduce job.
> 
> Here is the pseudocode:
> 
>        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
>        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
>            Text.class.getName());
> 
>        final Vertex map1Vertex = new Vertex("M1",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload), -1,
>            MRHelpers.getMapResource(map1Conf));
>        map1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));
> 
>        final Map<String, String> map1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
>        map1Vertex.setTaskEnvironment(map1Env);
> 
>        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
>        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);
> 
>        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
>        final Vertex reduce1Vertex = new Vertex("R1",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload), 1,
>            MRHelpers.getReduceResource(reduce1Conf));
>        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));
> 
>        final Map<String, String> reduce1Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
>        reduce1Vertex.setTaskEnvironment(reduce1Env);
> 
>        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);
> 
>        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
>        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
>            Text.class.getName());
> 
>        final Vertex map2Vertex = new Vertex("M2",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload), -1,
>            MRHelpers.getMapResource(map2Conf));
>        map2Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));
> 
>        final Map<String, String> map2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
>        map2Vertex.setTaskEnvironment(map2Env);
> 
>        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
>        final Vertex reduce2Vertex = new Vertex("R2",
>            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload), 1,
>            MRHelpers.getReduceResource(reduce2Conf));
>        reduce2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));
> 
>        final Map<String, String> reduce2Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
>        reduce2Vertex.setTaskEnvironment(reduce2Env);
> 
>        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);
> 
>        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
>        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
>            Text.class.getName());
> 
>        final Vertex map3Vertex = new Vertex("M3",
>            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload), -1,
>            MRHelpers.getMapResource(map3Conf));
>        map3Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));
> 
>        final Map<String, String> map3Env = new HashMap<String, String>();
>        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
>        map3Vertex.setTaskEnvironment(map3Env);
>        MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);
> 
>        // Vertices must be added to the DAG before the edges that connect them
>        final DAG dag = new DAG("MRR-DAG");
>        dag.addVertex(map1Vertex);
>        dag.addVertex(reduce1Vertex);
>        dag.addVertex(map2Vertex);
>        dag.addVertex(reduce2Vertex);
>        dag.addVertex(map3Vertex);
>        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
>        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
>            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
>                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));
> 
> (Note the use of "MRHelpers.addMROutputLegacy" - it is placed on each vertex that writes
> output to HDFS)
> 
> We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1), everything works fine.
> But when we add Map-Reduce 2 or Map 3, we start to get errors.
> 
> Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there
> code examples of doing this?
> 
> FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.
> 
> Thanks in advance,
> Jeff Hurt

