tez-dev mailing list archives

From Jonathan Eagles <jeag...@gmail.com>
Subject Re: Full cross join of root inputs
Date Wed, 19 Dec 2018 20:34:39 GMT

Sorry for the late reply. I'm out until the second week of January.
Since I'm not completely familiar with how the full cross product edge
was implemented as part of TEZ-2104, I would want to compare the
solutions you present and weigh the pros and cons. I can definitely see
how your two solutions differ, especially in messaging and amount of
effort.


On Thu, Dec 6, 2018 at 7:58 PM Adrian Nicoara
<adnico@microsoft.com.invalid> wrote:
> Hello Tez devs,
> I will start with an example of vertex output full cross join, and then
> circle back to the root case:
> V1[2]       V2[3]
>  |           |
> DME1(2)     DME2(3)
>     \      /
>      V3[2x3]
> Assume we have:
> 1. Producer vertex V1, with 2 tasks, in which every task generates one
>    physical output.
> 2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for
>    a total of 2 DMEs, called DME1.
> 3. Producer vertex V2, with 3 tasks, in which every task generates one
>    physical output.
> 4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for
>    a total of 3 DMEs, called DME2.
> 5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2
>    output combinations.
> 6. It is the responsibility of the EdgeManager on the edge V1->V3, to take
>    the DME1 events {0} and {1}, and broadcast them to the tasks {0, *} and
>    {1, *} respectively in V3 (assuming we view V3 as a 2D array).
> 7. It is the responsibility of the EdgeManager on the edge V2->V3, to take
>    the DME2 events {0}, {1}, and {2} and broadcast them to the tasks
>    {*, 0}, {*, 1} and {*, 2} respectively in V3 (assuming we view V3 as a
>    2D array).
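
The routing in steps 6 and 7 can be sketched in plain Java. This is only an
illustration, not Tez's EdgeManager code: the class and method names are
hypothetical, and it assumes V3's tasks are numbered in row-major order, so
that task {i, j} has the flat index i * numV2Tasks + j.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of cross-product routing; not part of the Tez API. */
public class CrossProductRouting {

    /** V3 task indices that receive the event from V1 task i, i.e. the row {i, *}. */
    public static List<Integer> routeFromV1(int i, int numV1Tasks, int numV2Tasks) {
        List<Integer> destinations = new ArrayList<>();
        for (int j = 0; j < numV2Tasks; j++) {
            destinations.add(i * numV2Tasks + j); // assumed row-major layout
        }
        return destinations;
    }

    /** V3 task indices that receive the event from V2 task j, i.e. the column {*, j}. */
    public static List<Integer> routeFromV2(int j, int numV1Tasks, int numV2Tasks) {
        List<Integer> destinations = new ArrayList<>();
        for (int i = 0; i < numV1Tasks; i++) {
            destinations.add(i * numV2Tasks + j); // assumed row-major layout
        }
        return destinations;
    }

    public static void main(String[] args) {
        // V1 has 2 tasks and V2 has 3 tasks, as in the example above.
        System.out.println(routeFromV1(1, 2, 3)); // [3, 4, 5]
        System.out.println(routeFromV2(2, 2, 3)); // [2, 5]
    }
}
```

Each producer keeps one event per physical output; the fan-out to 3 or 2
consumer tasks happens only at routing time.
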
> Now, consider the example of a full cross join of two root inputs:
> R1[2]       R2[3]
>  |           |
> IDIE1(?)    IDIE2(?)
>      \     /
>       V3[2x3]
> Then we have:
> 1. Root input R1, with 2 physical partitions.
> 2. Root input R2, with 3 physical partitions.
> 3. A full cross join, with a task set up to process 1 physical partition
>    from each input, would result in V3 having again 6 tasks.
> Problem 1 - configuring the number of tasks in V3, and how many
> InputDataInformationEvents to send:
> At first glance, each InputInitializer would have to result in 6
> InputDataInformationEvents, one for each task.
> This is in contrast with the DataMovementEvent model, where the number of
> **generated/stored** (as opposed to transmitted downstream) events is
> relative to the number of physical outputs.
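
For the 2 x 3 example, the difference in stored events can be counted with a
small sketch. The class and method names are hypothetical, and the per-task
count assumes the naive approach described above, in which every
InputInitializer stores one event for each task of V3.

```java
/** Hypothetical event-count comparison for a full cross join; not Tez code. */
public class EventCounts {

    /** Stored events when each input keeps one event per physical partition. */
    public static int perPartition(int... partitionsPerInput) {
        int total = 0;
        for (int partitions : partitionsPerInput) {
            total += partitions;
        }
        return total;
    }

    /** Stored events when each input keeps one event per consumer task. */
    public static int perTask(int... partitionsPerInput) {
        int consumerTasks = 1;
        for (int partitions : partitionsPerInput) {
            consumerTasks *= partitions; // full cross join: product of partition counts
        }
        return consumerTasks * partitionsPerInput.length;
    }

    public static void main(String[] args) {
        System.out.println(perPartition(2, 3)); // 5
        System.out.println(perTask(2, 3));      // 12
    }
}
```
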
> Furthermore, configuring the vertex V3 requires global information (all of
> the inputs to the vertex), while an InputInitializer functions on local
> information (the respective root
> R1 then takes a dependency on R2, and R2 takes a dependency on R1.
> The delay that the InputInitializers experience in obtaining the global
> information can be made larger, by adding another input from a vertex V4,
> and delaying configuration up until V4 is configured.
> Problem 2 - the VertexManagerPlugin doesn't have a proper chance of mutating the InputInitializer
> While there is VertexManagerPluginContext#addRootInputEvents:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPluginContext.java#L259-L272
> that allows a VertexManagerPlugin to add root input events, that API can
> only be used within VertexManagerPlugin#onRootVertexInitialized:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-api/src/main/java/org/apache/tez/dag/api/VertexManagerPlugin.java#L125-L133
> This is because the queued events get processed by the
> VertexManagerRootInputInitializedCallback:
> https://github.com/apache/tez/blob/282bb0a3fddf20260d71b0a6cd798fa5479e7038/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexManager.java#L589-L591
> which runs independently of the calls the VertexManagerPlugin makes to
> VertexManagerPluginContext#addRootInputEvents.
> So the only chance of acting and mutating the events is during that one
> function call, at which point the VertexManagerPlugin might not have all
> the required global information to actually do the mutation.
> Solution 1:
> Modify VertexManagerPluginContext#addRootInputEvents to be dependent on the
> VertexManagerPluginContext#vertexReconfigurationPlanned and
> VertexManagerPluginContext#doneReconfiguringVertex calls, and be able to
> send a VertexEventInputDataInformation event to the VertexImpl, similar to
> how the VertexManagerPluginContext#reconfigureVertex calls work; or
> overload a #reconfigureVertex function to take root input events also.
> Benefits:
> 1. Small contained change.
> 2. Delegates responsibility of configuration to the user code.
> Drawbacks:
> 1. Routing, which is usually delegated/implemented in the EdgeManagerPlugin
>    layer, must now be done in the VertexManagerPlugin for root inputs only.
> 2. Duplicate events must be stored.
> Solution 2:
> Treat data sources as properties of the graph rather than as properties of
> a Vertex. To that end, they are similar to a Vertex that directly completes
> everything, once InputInitializer completion happens.
> Benefits:
> 1. The same event movement model is used across root inputs/vertex inputs.
>    This makes for more uniform code.
> 2. The amount of stored events is proportional to the number of "physical
>    partitions" that the data source represents, rather than the number of
>    times these partitions/events have to be replicated.
> Drawbacks:
> 1. Fundamental change to the design of the Tez graph.
> Appreciate any suggestions.
> Thanks,
> Adrian
