tez-dev mailing list archives

From Jeff Hurt <Jeff.H...@clickfox.com>
Subject RE: Tez DAG question
Date Mon, 16 Jun 2014 18:13:48 GMT
I first got this error:


2014-06-16 12:43:50,992 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler:
[HISTORY][DAG:dag_1401404900565_0722_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex,
taskAttemptId=attempt_1401404900565_0722_1_02_000000_3, startTime=1402937029418, finishTime=1402937030991,
timeTaken=1573, status=FAILED, diagnostics=Error: java.io.IOException: org.apache.tez.dag.api.TezException:
Only MRInputLegacy supported. Input: class org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        at org.apache.tez.mapreduce.processor.map.MapProcessor.run(MapProcessor.java:111)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:307)
        at org.apache.hadoop.mapred.YarnTezDagChild$5.run(YarnTezDagChild.java:564)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
        at org.apache.hadoop.mapred.YarnTezDagChild.main(YarnTezDagChild.java:553)
Caused by: org.apache.tez.dag.api.TezException: Only MRInputLegacy supported. Input: class
org.apache.tez.runtime.library.input.ShuffledMergedInputLegacy
        ... 7 more

I assumed it was because of this:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

(In particular, the use of "ShuffledMergedInputLegacy" in the InputDescriptor constructor.)
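For what it's worth, the stack trace points at a type check inside MapProcessor (MapProcessor.java:111): the processor refuses any logical input that is not an MRInputLegacy. A minimal sketch of that kind of check, using stand-in classes (the nested types below only mirror the names of the real Tez classes; this is an illustration, not the Tez source):

```java
public class InputCheckSketch {
    // Stand-ins for the Tez logical input types; names mirror the real
    // classes but these are illustrations, not the actual Tez interfaces.
    interface LogicalInput {}
    static class MRInputLegacy implements LogicalInput {}
    static class ShuffledMergedInputLegacy implements LogicalInput {}

    // Mirrors the validation suggested by the trace: anything other than
    // an MRInputLegacy is rejected with the error message seen above.
    public static String validate(LogicalInput in) {
        if (!(in instanceof MRInputLegacy)) {
            return "Only MRInputLegacy supported. Input: "
                + in.getClass().getSimpleName();
        }
        return "OK";
    }

    public static void main(String[] args) {
        System.out.println(validate(new MRInputLegacy()));             // OK
        System.out.println(validate(new ShuffledMergedInputLegacy())); // rejected
    }
}
```

If MapProcessor really does enforce this, then wiring a shuffle edge into a MapProcessor vertex would trip the error regardless of how the edge itself is configured.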

So, I changed this line to read:

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(MRInputLegacy.class.getName()))));


I then ran again, and got this error:


2014-06-16 13:27:51,507 INFO [AsyncDispatcher event handler] org.apache.tez.dag.history.HistoryEventHandler:
[HISTORY][DAG:dag_1401404900565_0727_1][Event:TASK_ATTEMPT_FINISHED]: vertexName=map2vertex,
taskAttemptId=attempt_1401404900565_0727_1_02_000000_3, startTime=1402939670074, finishTime=1402939671506,
timeTaken=1432, status=FAILED, diagnostics=Error: com.google.protobuf.InvalidProtocolBufferException:
Protocol message tag had invalid wire type.
        at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:99)
        at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:498)
        at com.google.protobuf.GeneratedMessage.parseUnknownField(GeneratedMessage.java:193)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1658)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.<init>(MRRuntimeProtos.java:1616)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1717)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto$1.parsePartialFrom(MRRuntimeProtos.java:1712)
        at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193)
        at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49)
        at org.apache.tez.mapreduce.protos.MRRuntimeProtos$MRInputUserPayloadProto.parseFrom(MRRuntimeProtos.java:1936)
        at org.apache.tez.mapreduce.hadoop.MRHelpers.parseMRInputPayload(MRHelpers.java:726)
        at org.apache.tez.mapreduce.input.MRInput.initialize(MRInput.java:122)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:368)
        at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$InitializeInputCallable.call(LogicalIOProcessorRuntimeTask.java:344)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)

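For context on this second trace: MRInput.initialize parses the vertex's user payload as an MRInputUserPayloadProto (see MRHelpers.parseMRInputPayload in the trace), and protobuf rejects the bytes because a field tag decodes to an undefined wire type. A rough sketch of that tag check, assuming standard protobuf wire encoding (tag byte = fieldNumber << 3 | wireType, with wire types 6 and 7 undefined):

```java
public class WireTypeSketch {
    // Protobuf encodes each field tag as (fieldNumber << 3) | wireType.
    // Wire types 0-5 are defined; 6 and 7 are not, which is what produces
    // "Protocol message tag had invalid wire type" when bytes that are not
    // a serialized proto reach the parser.
    public static boolean hasValidWireType(int tagByte) {
        return (tagByte & 0x7) <= 5;
    }

    public static void main(String[] args) {
        System.out.println(hasValidWireType(0x0A)); // wire type 2 (length-delimited): true
        System.out.println(hasValidWireType(0x07)); // wire type 7 (undefined): false
    }
}
```

This only sketches the failure mode, not a fix; it suggests the payload handed to MRInputLegacy on that vertex is not an MRInput payload at all.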
What should be done here?
-Jeff
________________________________
From: Vinod Kumar Vavilapalli [vinodkv@hortonworks.com]
Sent: Monday, June 16, 2014 11:30 AM
To: user@tez.incubator.apache.org
Cc: dev@tez.incubator.apache.org
Subject: Re: Tez DAG question

Can you post the exceptions/error logs?

+Vinod
Hortonworks Inc.
http://hortonworks.com/


On Mon, Jun 16, 2014 at 10:15 AM, Jeff Hurt <Jeff.Hurt@clickfox.com> wrote:
All,

I have a scenario with three Map-Reduce jobs that I would like to build as a Tez DAG.  The
basic design is that the first Map-Reduce job (M1,R1) should feed both the second Map-Reduce
job (M2,R2) and the third, map-only job (M3).  In addition, each job should write its results
to a file in HDFS.

Graphically, a layout would look like this:

          M1
          |
          R1    (R1 writes output to HDFS)
          |
      M2 --- M3  (M3 has no reducer, writes output to HDFS)
      |
      R2  (R2 writes output to HDFS)

The results of R1 would be written out to HDFS, and would also be used as the inputs to both
M2 and M3.

But we have not been able to get this to work: errors show up whenever our DAG contains
more than just the first Map-Reduce job.

Here is the pseudocode:

        final byte[] map1Payload = MRHelpers.createUserPayloadFromConf(map1Conf);
        final byte[] map1InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map1Payload,
            Text.class.getName());

        final Vertex map1Vertex = new Vertex("M1",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map1Payload),
            -1, MRHelpers.getMapResource(map1Conf));
        map1Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map1Conf));

        final Map<String, String> map1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map1Conf, map1Env, true);
        map1Vertex.setTaskEnvironment(map1Env);

        final Class<? extends TezRootInputInitializer> initializerClazz = MRInputAMSplitGenerator.class;
        MRHelpers.addMRInput(map1Vertex, map1InputPayload, initializerClazz);

        final byte[] reduce1Payload = MRHelpers.createUserPayloadFromConf(reduce1Conf);
        final Vertex reduce1Vertex = new Vertex("R1",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce1Payload),
            1, MRHelpers.getReduceResource(reduce1Conf));
        reduce1Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce1Conf));

        final Map<String, String> reduce1Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce1Conf, reduce1Env, false);
        reduce1Vertex.setTaskEnvironment(reduce1Env);

        MRHelpers.addMROutputLegacy(reduce1Vertex, reduce1Payload);

        final byte[] map2Payload = MRHelpers.createUserPayloadFromConf(map2Conf);
        final byte[] map2InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map2Payload,
            Text.class.getName());

        final Vertex map2Vertex = new Vertex("M2",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map2Payload),
            -1, MRHelpers.getMapResource(map2Conf));
        map2Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map2Conf));

        final Map<String, String> map2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map2Conf, map2Env, true);
        map2Vertex.setTaskEnvironment(map2Env);

        final byte[] reduce2Payload = MRHelpers.createUserPayloadFromConf(reduce2Conf);
        final Vertex reduce2Vertex = new Vertex("R2",
            new ProcessorDescriptor(ReduceProcessor.class.getName()).setUserPayload(reduce2Payload),
            1, MRHelpers.getReduceResource(reduce2Conf));
        reduce2Vertex.setJavaOpts(MRHelpers.getReduceJavaOpts(reduce2Conf));

        final Map<String, String> reduce2Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(reduce2Conf, reduce2Env, false);
        reduce2Vertex.setTaskEnvironment(reduce2Env);

        MRHelpers.addMROutputLegacy(reduce2Vertex, reduce2Payload);

        final byte[] map3Payload = MRHelpers.createUserPayloadFromConf(map3Conf);
        final byte[] map3InputPayload = MRHelpers.createMRInputPayloadWithGrouping(map3Payload,
            Text.class.getName());

        final Vertex map3Vertex = new Vertex("M3",
            new ProcessorDescriptor(MapProcessor.class.getName()).setUserPayload(map3Payload),
            -1, MRHelpers.getMapResource(map3Conf));
        map3Vertex.setJavaOpts(MRHelpers.getMapJavaOpts(map3Conf));

        final Map<String, String> map3Env = new HashMap<String, String>();
        MRHelpers.updateEnvironmentForMRTasks(map3Conf, map3Env, true);
        map3Vertex.setTaskEnvironment(map3Env);

        MRHelpers.addMROutputLegacy(map3Vertex, map3Payload);

        final DAG dag = new DAG("dag");
        dag.addVertex(map1Vertex);
        dag.addVertex(reduce1Vertex);
        dag.addVertex(map2Vertex);
        dag.addVertex(reduce2Vertex);
        dag.addVertex(map3Vertex);
        dag.addEdge(new Edge(map1Vertex, reduce1Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(reduce1Vertex, map3Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));

        dag.addEdge(new Edge(map2Vertex, reduce2Vertex, new EdgeProperty(DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL, new OutputDescriptor(OnFileSortedOutput.class
                .getName()), new InputDescriptor(ShuffledMergedInputLegacy.class.getName()))));




(Note the use of "MRHelpers.addMROutputLegacy": it is applied to each vertex that writes
output to HDFS, i.e. R1, R2, and M3.)

We have noticed that whenever we ONLY run Map-Reduce 1 (M1,R1), everything works fine.  But
when we add Map-Reduce 2 or Map 3, we start to get errors.

Is there a way to have multiple vertices write output to HDFS in the same DAG?  Are there
code examples of doing this?

FYI:  We are using HDP 2.1, with Tez 0.4.0-incubating.

Thanks in advance,
Jeff Hurt


