tez-dev mailing list archives

From Adrian Nicoara <adn...@microsoft.com.INVALID>
Subject Full cross join of root inputs
Date Fri, 07 Dec 2018 01:57:50 GMT
Hello Tez devs,

I will start with an example of a full cross join of vertex outputs, and then circle back to the
root input case:

V1[2]       V2[3]
 |           |
DME1(2)     DME2(3)
    \      /
     V3[6]

Assume we have:
1. Producer vertex V1, with 2 tasks, in which every task generates one physical output.
2. Once the tasks of V1 complete, they each raise a DataMovementEvent, for a total of 2 DMEs,
called DME1.
3. Producer vertex V2, with 3 tasks, in which every task generates one physical output.
4. Once the tasks of V2 complete, they each raise a DataMovementEvent, for a total of 3 DMEs,
called DME2.
5. Consumer vertex V3, with 6 tasks (2x3), one for each of the V1xV2 output combinations.
6. The EdgeManager on the edge V1->V3 is responsible for taking the DME1 events {0} and {1}
and broadcasting them to the tasks {0, *} and {1, *}, respectively, in V3 (viewing V3 as a
2D array).
7. The EdgeManager on the edge V2->V3 is responsible for taking the DME2 events {0}, {1}, and
{2} and broadcasting them to the tasks {*, 0}, {*, 1}, and {*, 2}, respectively, in V3 (again
viewing V3 as a 2D array).
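The routing in items 6 and 7 can be sketched as a small model. This is a toy Python illustration, not the Tez EdgeManagerPlugin API: it assumes V3's tasks are laid out row-major, so the 2D task {i, j} has the flat index i * cols + j, and the function names are hypothetical.

```python
def targets_from_v1(i, rows, cols):
    """V3 tasks {i, *} that receive DME1 event {i}, as row-major flat indices.
    `rows` is unused; it is kept so both helpers share one signature."""
    return [i * cols + j for j in range(cols)]


def targets_from_v2(j, rows, cols):
    """V3 tasks {*, j} that receive DME2 event {j}."""
    return [i * cols + j for i in range(rows)]


def route(rows, cols):
    """Map each flat V3 task index to the (DME1, DME2) pair it consumes."""
    inputs = {t: [None, None] for t in range(rows * cols)}
    for i in range(rows):
        for t in targets_from_v1(i, rows, cols):
            inputs[t][0] = i
    for j in range(cols):
        for t in targets_from_v2(j, rows, cols):
            inputs[t][1] = j
    return {t: tuple(pair) for t, pair in inputs.items()}


print(targets_from_v1(1, 2, 3))  # [3, 4, 5]
print(targets_from_v2(2, 2, 3))  # [2, 5]
print(route(2, 3))  # {0: (0, 0), 1: (0, 1), 2: (0, 2), 3: (1, 0), 4: (1, 1), 5: (1, 2)}
```

Only 2 + 3 = 5 events are stored; the fan-out to the 6 consumer tasks happens at routing time.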

Now, consider the example of a full cross join of two root inputs:

R1[2]       R2[3]
 |           |
IDIE1(?)    IDIE2(?)
     \     /
      V3[6]

Then we have:
1. Root input R1, with 2 physical partitions.
2. Root input R2, with 3 physical partitions.
3. A full cross join, with each task set up to process one physical partition from each input,
would again result in V3 having 6 tasks.

Problem 1 - configuring the number of tasks in V3, and how many InputDataInformationEvents
to send:

At first glance, each InputInitializer would have to produce 6 InputDataInformationEvents,
one for each task of V3.
This is in contrast with the DataMovementEvent model, where the number of **generated/stored**
(as opposed to transmitted downstream) events is proportional to the number of physical outputs.
Furthermore, configuring the vertex V3 requires global information (all of the inputs to the
vertex), while an InputInitializer works with local information (its own root input).
R1 then takes a dependency on R2, and R2 takes a dependency on R1.
The delay the InputInitializers experience in obtaining the global information can grow
further: for example, adding another input from a vertex V4 delays configuration until V4
is configured.
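A back-of-the-envelope count for the R1[2] x R2[3] example, assuming the naive approach above in which every InputInitializer emits one InputDataInformationEvent per V3 task. The function names and the 4-task V4 in the last line are hypothetical.

```python
import math


def consumer_tasks(partition_counts):
    """V3 task count for a full cross join: one task per partition combination."""
    return math.prod(partition_counts)


def stored_dme_events(partition_counts):
    """DataMovementEvent model: one stored event per physical output."""
    return sum(partition_counts)


def naive_idie_events(partition_counts):
    """Naive root-input model: each InputInitializer emits one event per V3 task.
    Every entry depends on *all* partition counts, i.e. on global information."""
    tasks = consumer_tasks(partition_counts)
    return [tasks for _ in partition_counts]


print(consumer_tasks([2, 3]))     # 6
print(stored_dme_events([2, 3]))  # 5
print(naive_idie_events([2, 3]))  # [6, 6], i.e. 12 stored events
print(consumer_tasks([2, 3, 4]))  # 24 once a hypothetical 4-task V4 input is added
```

Neither initializer can compute its own count from its own input alone, which is the mutual dependency described above.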

Problem 2 - the VertexManagerPlugin doesn't get a proper chance to mutate the InputInitializer's events:

While VertexManagerPluginContext#addRootInputEvents allows a VertexManagerPlugin to add root
input events, that API can only be used within a single function call, because the queued events
get processed by the VertexManagerRootInputInitializedCallback, which runs independently of the
calls the VertexManagerPlugin makes to VertexManagerPluginContext#addRootInputEvents.

So the only chance to act on and mutate the events is during that one function call, at
which point the VertexManagerPlugin might not yet have all the global information required
to actually perform the mutation.
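The timing problem can be pictured with a toy model. This is a simplified reading of the description above, not Tez source: ToyPluginContext and root_input_initialized are hypothetical stand-ins for VertexManagerPluginContext and the VertexManagerRootInputInitializedCallback path.

```python
class ToyPluginContext:
    """Hypothetical stand-in for VertexManagerPluginContext: root input
    events added by the plugin are only queued here."""

    def __init__(self):
        self.queued = []
        self.delivered = []

    def add_root_input_events(self, events):
        self.queued.extend(events)


def root_input_initialized(ctx, plugin_callback, events):
    """Hypothetical stand-in for the callback path: the plugin sees the
    initializer's events during one call, and only what it queued during
    that call is flushed afterwards."""
    plugin_callback(ctx, events)
    ctx.delivered.extend(ctx.queued)
    ctx.queued.clear()


ctx = ToyPluginContext()
# The plugin can only forward (or mutate) events inside this one call...
root_input_initialized(ctx, lambda c, ev: c.add_root_input_events(ev), ["R1-p0", "R1-p1"])
# ...so events it adds later, once global information is known, are never flushed in this model.
ctx.add_root_input_events(["R1-p0-for-task-3"])
print(ctx.delivered)  # ['R1-p0', 'R1-p1']
print(ctx.queued)     # ['R1-p0-for-task-3']
```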

Solution 1:
Make VertexManagerPluginContext#addRootInputEvents depend on the VertexManagerPluginContext#vertexReconfigurationPlanned
and VertexManagerPluginContext#doneReconfiguringVertex calls, and allow it to send a VertexEventInputDataInformation
event to the VertexImpl, similar to how the VertexManagerPluginContext#reconfigureVertex calls
work; alternatively, overload #reconfigureVertex to also take root input events.

Pros:
1. Small, contained change.
2. Delegates responsibility for configuration to the user code.

Cons:
1. Routing, which is usually delegated to and implemented in the EdgeManagerPlugin layer, must
now be done in the VertexManagerPlugin, for root inputs only.
2. Duplicate events must be stored.
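A toy model of Solution 1 and its two drawbacks. All names here (GatedContext, route_in_plugin and their methods) are hypothetical Python stand-ins, not Tez APIs: events added between "reconfiguration planned" and "done reconfiguring" are held and then sent as one batch (standing in for the VertexEventInputDataInformation), while the plugin itself must do the cross-product routing and emit one event per target task.

```python
class GatedContext:
    """Hypothetical model: root input events are held while a
    reconfiguration is planned, then sent as one batch when it is done."""

    def __init__(self):
        self.planned = False
        self.pending = []
        self.sent_batches = []

    def vertex_reconfiguration_planned(self):
        self.planned = True

    def add_root_input_events(self, events):
        if self.planned:
            self.pending.extend(events)  # wait for done_reconfiguring_vertex()
        else:
            self.sent_batches.append(list(events))  # sent immediately in this model

    def done_reconfiguring_vertex(self):
        self.planned = False
        if self.pending:
            self.sent_batches.append(self.pending)
            self.pending = []


def route_in_plugin(input_side, partitions, rows, cols):
    """Routing done in the plugin: each partition's event is duplicated once
    per matching V3 task (row-major layout), giving (partition, task) pairs."""
    events = []
    for p in range(partitions):
        if input_side == 0:  # first root input fixes the row
            targets = [p * cols + j for j in range(cols)]
        else:  # second root input fixes the column
            targets = [i * cols + p for i in range(rows)]
        events.extend((p, t) for t in targets)
    return events


ctx = GatedContext()
ctx.vertex_reconfiguration_planned()
ctx.add_root_input_events(route_in_plugin(0, 2, 2, 3))  # R1: 2 partitions -> 6 events
ctx.add_root_input_events(route_in_plugin(1, 3, 2, 3))  # R2: 3 partitions -> 6 events
ctx.done_reconfiguring_vertex()
print(len(ctx.sent_batches), len(ctx.sent_batches[0]))  # 1 12
```

The 12 events sent here, versus 5 physical partitions, are the duplicate events the second drawback refers to.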

Solution 2:
Treat data sources as properties of the graph rather than as properties of a Vertex. In that
sense, a data source is similar to a Vertex whose tasks all complete as soon as its
InputInitializer completes.

Pros:
1. The same event movement model is used for root inputs and vertex inputs, which makes the
code more uniform.
2. The number of stored events is proportional to the number of "physical partitions" that
the data source represents, rather than to the number of times those partitions/events have
to be replicated.

Cons:
1. Fundamental change to the design of the Tez graph.
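A toy model of Solution 2, assuming a data source is represented as a pseudo-vertex that emits one completion event per partition once its InputInitializer finishes. Source and cross_route are hypothetical names, not Tez classes; the point is that one routing function serves root inputs and vertex outputs alike, and stored events scale with partitions.

```python
from dataclasses import dataclass
from itertools import product


@dataclass
class Source:
    """A vertex output or a root input treated as a pseudo-vertex."""
    name: str
    num_partitions: int
    is_root_input: bool = False  # never consulted by cross_route, which is the point

    def completion_events(self):
        # One stored event per partition, emitted when the source "completes".
        return [(self.name, p) for p in range(self.num_partitions)]


def cross_route(sources):
    """Shared cross-product routing: flat V3 task index -> the
    (source, partition) events it consumes, in row-major order."""
    per_source = [s.completion_events() for s in sources]
    return dict(enumerate(product(*per_source)))


sources = [Source("R1", 2, True), Source("R2", 3, True)]
print(sum(len(s.completion_events()) for s in sources))  # 5 stored events
routes = cross_route(sources)
print(len(routes), routes[4])  # 6 (('R1', 1), ('R2', 1))
```

Mixing in a regular vertex (say a hypothetical 4-task V4) goes through the same code path and yields 2 x 3 x 4 = 24 tasks.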

Appreciate any suggestions.

