tez-dev mailing list archives

From "Yingda Chen (JIRA)" <j...@apache.org>
Subject [jira] [Created] (TEZ-3997) Enable CONCURRENT edge
Date Fri, 05 Oct 2018 19:27:00 GMT
Yingda Chen created TEZ-3997:

             Summary: Enable CONCURRENT edge
                 Key: TEZ-3997
                 URL: https://issues.apache.org/jira/browse/TEZ-3997
             Project: Apache Tez
          Issue Type: New Feature
            Reporter: Yingda Chen

A better formatted (and commentable) google doc with figures can be found at [google doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing].

h1. *Enable CONCURRENT edge in Tez*

h2. *Motivation*

Tez was designed, from day one, to be a unifying framework for building data processing applications [1],
with the promise to support different workloads. Yet the focus of Tez has largely been placed on
supporting batch data processing, such as Hive/Pig. For those applications, the edge SchedulingType
is usually modeled as SEQUENTIAL, with the between-vertices shuffle implemented on top of
Tez APIs. We believe there are legitimate needs to fully enable the CONCURRENT SchedulingType,
which breaks away from the assumption that a destination vertex can only be scheduled after (part
of) the tasks in the source vertex have completed.

There are various scenarios where the CONCURRENT scheduling type can be helpful, such as gang
scheduling of the whole DAG, or a refined version that gang-schedules a sub-graph of a DAG,
known as “bubble scheduling” [2]. In addition, we have found that for Tez to truly unify
workloads other than conventional MR or SQL-like applications, the need for CONCURRENT scheduling
becomes more pressing. For example, a parameter-server application [3] can be modeled as the DAG
below, where *PS* denotes the vertex that hosts parameter servers, and *W* denotes the vertex
that hosts workers responsible for the heavy-lifting data processing. There are two fundamental
assumptions that must be satisfied for a parameter server to work:

see [google doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing] for Fig. 1, Parameter-server modeled as a DAG with concurrent edge

 # All servers (i.e., all tasks in the PS vertex) must be up and running before any worker (task
in the W vertex) can make meaningful progress
 # All servers must run concurrently with the workers throughout the lifetime of the job
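The two invariants above can be captured as a simple scheduling gate: worker tasks are admitted only once every parameter-server task reports running. The following is an illustrative sketch in plain Java; the class and method names (PsGate, canScheduleWorkers) are invented for this example and are not Tez APIs.

```java
import java.util.List;

// Illustrative only (not Tez code): models the PS/W scheduling invariant that
// workers may make progress only once every parameter-server task is running.
public class PsGate {
    enum TaskState { NEW, SCHEDULED, RUNNING, COMPLETED }

    /** True only when every parameter-server task reports RUNNING. */
    static boolean canScheduleWorkers(List<TaskState> psTasks) {
        return !psTasks.isEmpty()
            && psTasks.stream().allMatch(s -> s == TaskState.RUNNING);
    }

    public static void main(String[] args) {
        // One server still pending: workers must not be scheduled yet.
        System.out.println(canScheduleWorkers(List.of(TaskState.RUNNING, TaskState.SCHEDULED))); // false
        // All servers up: workers may now proceed.
        System.out.println(canScheduleWorkers(List.of(TaskState.RUNNING, TaskState.RUNNING)));   // true
    }
}
```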


Note that one salient common trait shared by the above example is that the EPHEMERAL data
source type comes hand-in-hand with the CONCURRENT scheduling type. While this is what we
have found to be true in many practical scenarios, the original design of Tez, which keeps
DataSourceType and SchedulingType orthogonal, remains more descriptive, and the proposed changes
here would keep that intact.


Overall, we believe that the fundamental design of the Tez framework, such as the pluggable Edge/Vertex
managers and versatile edge types, provides the customizability needed to enable the various
scenarios described above, and we propose to make the following changes.

h2. *Proposed Changes*

To address the above issues, we propose the following changes:
 # (No API change) Allow the CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType.
 # (Some API change/addition) Extend the VertexManagerPlugin interface to allow for relevant event notifications.
 # Enable a downstream vertex connected to an EPHEMERAL data source to reason about the network
connections of upstream tasks.
 # Allow a mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex.


The details for the proposed changes are provided in the following sections.
 * *Allow CONCURRENT edge property in DAG construction and introduce ConcurrentSchedulingType*

|Note: There is no API change in this proposed change. The majority of this change will be
lifting some existing constraints against the CONCURRENT edge type, plus the addition of a VertexManagerPlugin
implementation.|

This includes enabling the CONCURRENT SchedulingType as a valid edge property, by removing
all the sanity checks against CONCURRENT during DAG construction/execution. A new VertexManagerPlugin
(namely VertexManagerWithConcurrentInput) will be implemented for vertices with incoming concurrent
edges.

In addition, we will assume in this change that
 * A vertex *cannot* have both SEQUENTIAL and CONCURRENT incoming edges
 * No shuffle or data movement is handled by the Tez framework when two vertices are connected
through a CONCURRENT edge. Instead, the application runtime should be responsible for handling all
the data-plane communications (as proposed in [1]).

Note that the above assumptions are common for scenarios such as whole-DAG or sub-graph gang
scheduling, but they may be relaxed in a later implementation, which may allow a mixture of SEQUENTIAL
and CONCURRENT edges on the same vertex.


Most of the (meaningful) scheduling decisions today in Tez are made based on the notion of
(or an extended version of) source task completion. This will no longer be true in the presence
of a CONCURRENT edge. Instead, events such as a source vertex being configured, or a source task
running, become more relevant when making scheduling decisions for two vertices connected via a
CONCURRENT edge. We therefore introduce a new enum *ConcurrentSchedulingType* to describe
the “scheduling timing” for the downstream vertex in such scenarios.
{code:java}
public enum ConcurrentSchedulingType {
  /** Trigger downstream vertex task scheduling by the "configured" event of upstream vertices. */
  SOURCE_VERTEX_CONFIGURED,
  /** Trigger downstream vertex task scheduling by the "running" event of upstream tasks. */
  SOURCE_TASK_RUNNING
}
{code}


Note that in this change, we will only use SOURCE_VERTEX_CONFIGURED as the scheduling type,
which suffices for scenarios of whole-DAG or sub-graph gang scheduling, where we want (all
the tasks in) the downstream vertex to be scheduled together with (all the tasks in) the upstream
vertex. In this case, we can leverage the existing onVertexStateUpdated() interface of VertexManagerPlugin
to collect relevant information to assist the scheduling decision, and *there is no additional
API change necessary*. However, in more subtle cases such as the parameter-server example described
in Fig. 1, other scheduling types would be more relevant; therefore the placeholder for *ConcurrentSchedulingType*
will be introduced in this change as part of the infrastructure work.
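The gang-scheduling decision driven by SOURCE_VERTEX_CONFIGURED can be sketched as follows. This is an illustrative model, not the actual Tez plugin API: the tracker fires exactly once, after every upstream vertex has reported configured, which is where a real VertexManagerWithConcurrentInput would schedule all downstream tasks together.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch (not the actual Tez plugin API) of the gang-scheduling decision a
// vertex manager could make from onVertexStateUpdated() notifications:
// schedule all downstream tasks once every upstream vertex is CONFIGURED.
public class ConcurrentInputTracker {
    private final Set<String> pendingUpstream;
    private boolean scheduled = false;

    public ConcurrentInputTracker(Set<String> upstreamVertices) {
        this.pendingUpstream = new HashSet<>(upstreamVertices);
    }

    /** Returns true exactly once, when the last upstream vertex is configured. */
    public boolean onUpstreamConfigured(String vertexName) {
        pendingUpstream.remove(vertexName);
        if (pendingUpstream.isEmpty() && !scheduled) {
            scheduled = true; // in Tez, this is where downstream tasks would be gang-scheduled
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ConcurrentInputTracker t = new ConcurrentInputTracker(Set.of("V1", "V2"));
        System.out.println(t.onUpstreamConfigured("V1")); // false: one vertex still pending
        System.out.println(t.onUpstreamConfigured("V2")); // true: gang-schedule now
    }
}
```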


Finally, since we assume that all communications between two vertices connected via a CONCURRENT
edge are handled by the application runtime, a CONCURRENT edge will be assigned a DummyEdgeManager
that basically mutes all DME/VME handling.
 #  *Extend VertexManagerPlugin interface to allow for relevant events notification*

For a concurrent connection, the downstream and upstream vertices would be running concurrently,
and in some cases they would be scheduled at the same time as well, such as in (sub-graph) gang
scheduling. However, *this is not always true*. In the example in Fig. 1, tasks in the PS vertex
should be running before tasks in the W vertex are scheduled, since otherwise, if the resource
requests for PS cannot be fulfilled first, W will be spinning in vain. In other examples,
as long as part of the tasks in the upstream vertex are running, we can start scheduling downstream
tasks.

In other words, if we put this into the context of the existing interface/implementation of VertexManagerPlugin,
we can see a strong duality of “onConcurrentSourceTaskRunning” for a concurrent connection vs. the “onSourceTaskCompleted”
for an (existing) sequential connection. Therefore, we propose the addition of an “_onConcurrentSourceTaskRunning(TaskAttemptIdentifier
attempt)_” interface to the VertexManagerPlugin, with the default implementation being a no-op.
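The shape of such an extension might look like the sketch below. TaskAttemptId stands in for Tez's TaskAttemptIdentifier, and ConcurrentSourceListener for the plugin interface; the key point is that the default body is empty, so existing plugins stay source-compatible without overriding the new method.

```java
// Illustrative sketch, not actual Tez code: a default (no-op) hook added to a
// plugin-style interface, plus one implementation that overrides it.
public class ConcurrentHookDemo {
    record TaskAttemptId(String vertexName, int taskIndex) {}

    interface ConcurrentSourceListener {
        /** Called when a task of a CONCURRENT upstream vertex starts running. */
        default void onConcurrentSourceTaskRunning(TaskAttemptId attempt) {
            // no-op by default: existing plugins need no changes
        }
    }

    /** A plugin that does override the hook, e.g. to count running sources. */
    static class CountingListener implements ConcurrentSourceListener {
        int running = 0;
        @Override
        public void onConcurrentSourceTaskRunning(TaskAttemptId attempt) {
            running++;
        }
    }

    public static void main(String[] args) {
        CountingListener l = new CountingListener();
        l.onConcurrentSourceTaskRunning(new TaskAttemptId("PS", 0));
        System.out.println(l.running); // 1
    }
}
```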


This change will also include the logic to add a source task running event and to send such
events to downstream vertices. To reduce unnecessary event traffic, we will limit the sending
of such events to CONCURRENT edges, and to when the ConcurrentSchedulingType is specified to be
triggered by source tasks running.
 #  *Enable a downstream vertex connected to an EPHEMERAL data source to reason about the network
connections of upstream tasks*

Another property that is usually shared with CONCURRENT on the same edge is an EPHEMERAL data
source. When two vertices are running concurrently, direct communication between tasks in
those vertices becomes possible, and oftentimes necessary, throughout the lifetime of the running
tasks. This can be articulated by an EPHEMERAL data source, and this change aims to support
such scenarios, which are readily found in real-time applications (such as interactive query)
and/or customized applications that would like to control their own data communications (such
as parameter-server).


This change will allow Tez to be the central orchestrator that gathers the necessary network information
from all upstream tasks, compiles it together, and sends it to downstream tasks. In particular,
the following changes are planned:
 # For two vertices connected via an edge with both the CONCURRENT scheduling type and the EPHEMERAL
data source type, each task in the upstream vertex will open a network port and send a VertexManagerEvent
(VME) immediately upon running. The payload of the VME includes the information necessary to communicate
with this task through direct network communication (such as IP and port). The vertex manager
of the downstream vertex, typed VertexManagerWithConcurrentInputs, will receive these VMEs,
and is responsible for aggregating (including de-duping if necessary) all the information together
in onVertexManagerEventReceived().
 # Once all VMEs have been received, a CustomProcessorEvent will be constructed with a payload
that includes the aggregated information, and be routed to downstream tasks.
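The aggregation step can be sketched as below. This is an illustrative model, not Tez's actual payload types: each upstream task reports a "host:port" string once running, duplicates (e.g. from re-sent events) are dropped, and a single combined payload is emitted once all expected reports are in.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch of the planned endpoint aggregation in the downstream vertex manager.
// Names are illustrative; real Tez would carry these in VME/CustomProcessorEvent payloads.
public class EndpointAggregator {
    private final int expected;
    private final Set<String> endpoints = new LinkedHashSet<>(); // de-dups repeated reports

    public EndpointAggregator(int expectedUpstreamTasks) {
        this.expected = expectedUpstreamTasks;
    }

    /** Mirrors onVertexManagerEventReceived(): returns the aggregate once complete, else null. */
    public List<String> onEndpointReported(String hostAndPort) {
        endpoints.add(hostAndPort);
        return endpoints.size() >= expected ? new ArrayList<>(endpoints) : null;
    }

    public static void main(String[] args) {
        EndpointAggregator agg = new EndpointAggregator(2);
        System.out.println(agg.onEndpointReported("10.0.0.1:4242")); // null: still waiting
        System.out.println(agg.onEndpointReported("10.0.0.1:4242")); // null: duplicate de-duped
        System.out.println(agg.onEndpointReported("10.0.0.2:4242")); // [10.0.0.1:4242, 10.0.0.2:4242]
    }
}
```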

The change will introduce additional optional entries in VertexManagerEventPayload and a new
custom payload that will be embedded into CustomProcessorEvent. 


Upon completion of the functional feature in this change, additional features, such as handling
of failover on a CONCURRENT/EPHEMERAL edge, will be addressed in future umbrella JIRAs.

 #  *Allow mixture of CONCURRENT/SEQUENTIAL incoming edges on the same vertex*

In the above two changes, we assume that a vertex’s incoming edges have the same edge property
in terms of scheduling type, i.e., they are either all SEQUENTIAL or all CONCURRENT.
We shall extend beyond this assumption in this change to allow a mixture of different incoming
edge types, as exemplified in Fig. 2.

see [google doc|https://docs.google.com/document/d/17WRi2hwQGb-ms0-yHMp4OJf6yjmbh5aBy98N6z-tUgY/edit?usp=sharing] for Fig. 2, Vertex with a mixture of input edges

This change will mainly focus on enriching the VertexManagerPlugin implementation that we
introduced in our first change, namely the VertexManagerWithConcurrentInputs. No API change
is expected with this change.
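A mixed-edge vertex then needs a readiness check that combines both conditions: all SEQUENTIAL source tasks completed, and all CONCURRENT source vertices configured. The sketch below models this with plain counters; the class and method names are invented for illustration and do not reflect Tez's internal bookkeeping.

```java
// Illustrative readiness check for a vertex with mixed incoming edges:
// sequential inputs gate on task completion, concurrent inputs gate on
// vertex configuration, and scheduling waits for both. Not Tez code.
public class MixedInputGate {
    private int pendingSequentialTasks;
    private int pendingConcurrentVertices;

    public MixedInputGate(int sequentialSourceTasks, int concurrentSourceVertices) {
        this.pendingSequentialTasks = sequentialSourceTasks;
        this.pendingConcurrentVertices = concurrentSourceVertices;
    }

    public boolean onSourceTaskCompleted() {
        pendingSequentialTasks--;
        return readyToSchedule();
    }

    public boolean onSourceVertexConfigured() {
        pendingConcurrentVertices--;
        return readyToSchedule();
    }

    public boolean readyToSchedule() {
        return pendingSequentialTasks <= 0 && pendingConcurrentVertices <= 0;
    }

    public static void main(String[] args) {
        MixedInputGate gate = new MixedInputGate(2, 1);
        System.out.println(gate.onSourceTaskCompleted());    // false: one task + one vertex pending
        System.out.println(gate.onSourceVertexConfigured()); // false: one sequential task remains
        System.out.println(gate.onSourceTaskCompleted());    // true: all inputs satisfied
    }
}
```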

h2. Reference

[1] Apache Tez: A Unifying Framework for Modeling and Building Data Processing Applications. SIGMOD '15. [http://dl.acm.org/authorize?N97131]

[2] Bubble Execution: Resource-aware Reliable Analytics at Cloud Scale. VLDB 2018. [http://www.vldb.org/pvldb/vol11/p746-yin.pdf]

[3] [https://www.tensorflow.org/deploy/distributed]


This message was sent by Atlassian JIRA
