airflow-dev mailing list archives

From Jacob Ferriero <jferri...@google.com.INVALID>
Subject Re: [AIP-34] Rewrite SubDagOperator
Date Wed, 05 Aug 2020 18:07:56 GMT
I really like this idea of a TaskGroup container as I think this will be
much easier to use than SubDag.

I'd like to propose an optional behavior for special retry mechanics via a
TaskGroup.retry_all property.
This way I could use TaskGroup to replace my favorite use of SubDag:
atomically retrying tasks that follow the pattern "act on external state,
then poll in reschedule mode until the desired state is reached".

The motivating use case I have for a SubDag is a very simple two-task group:
[SubmitLongRunningJobTask >> PollJobStatusSensor].
I use SubDag because it gives me an easy way to retry SubmitLongRunningJobTask
if something about PollJobStatusSensor fails.
This pattern would be really nice for jobs that are expected to run for a
long time (because the sensor can use reschedule mode, freeing up worker
slots) but might fail for a retryable reason.
However, using SubDag to meet this use case defeats the purpose, because
SubDag infamously
<https://medium.com/@team_24989/fixing-subdagoperator-deadlock-in-airflow-6c64312ebb10>
blocks a "controller" slot for the entire duration.
This may feel like cyclic behavior, but in reality it is very common for a
single operator to submit a job and wait until it is done.
We could use this pattern to refactor many operators (e.g. BQ, Dataproc,
Dataflow) so that they are implemented as TaskGroup[SubmitTask >> PollTask],
with an optional reschedule mode if the user knows the job may take a long
time.

I'd be happy to do the development work on adding this specific retry
behavior to TaskGroup once the base concept is implemented, if others in the
community would find this a useful feature.
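The following is a minimal pure-Python sketch of the proposed `retry_all` semantics. It makes several assumptions. `Task`, `TaskGroup`, `retries` and `retry_all` are hypothetical toy names, not Airflow's API. Tasks run sequentially in-process. With `retry_all=True`, a failure anywhere in the group restarts the group from its first task (resubmit, then poll again). With `retry_all=False`, only the failed task is retried.

```python
# Hypothetical toy model of the proposed TaskGroup.retry_all behavior.
# None of these classes are Airflow APIs; tasks run sequentially in-process.
from typing import Callable, List


class Task:
    def __init__(self, task_id: str, fn: Callable[[], None]):
        self.task_id = task_id
        self.fn = fn


class TaskGroup:
    def __init__(self, tasks: List[Task], retries: int = 2,
                 retry_all: bool = False):
        self.tasks = tasks          # run in order, e.g. [submit, poll]
        self.retries = retries      # retries allowed after the first failure
        self.retry_all = retry_all  # True: any failure restarts the group

    def run(self) -> List[str]:
        """Run the group; return the ids of every task attempt, in order."""
        attempts: List[str] = []
        failures = 0
        start = 0
        while True:
            try:
                for i in range(start, len(self.tasks)):
                    start = i  # on failure, this is the task that failed
                    attempts.append(self.tasks[i].task_id)
                    self.tasks[i].fn()
                return attempts
            except Exception:
                failures += 1
                if failures > self.retries:
                    raise
                if self.retry_all:
                    start = 0  # atomic retry: resubmit, then poll again


state = {"polls": 0}


def submit() -> None:
    pass  # would submit the long-running job


def poll() -> None:
    state["polls"] += 1
    if state["polls"] == 1:
        raise RuntimeError("job failed for a retryable reason")


group = TaskGroup([Task("submit", submit), Task("poll", poll)], retry_all=True)
print(group.run())  # ['submit', 'poll', 'submit', 'poll']
```

In real Airflow, the sensor would release its slot between pokes in reschedule mode. This toy only models which tasks are re-run after a failure.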

Cheers,
Jake

On Tue, Aug 4, 2020 at 10:07 AM Jarek Potiuk <Jarek.Potiuk@polidea.com>
wrote:

> All for it :). I think we are getting closer to having regular planning, a
> more structured approach to 2.0 and a task force for it soon, so I think this
> should be perfectly fine to discuss, and we can even start implementing what's
> beyond 2.0 as soon as we make sure that we are prioritizing 2.0 work.
>
> J,
>
>
> On Tue, Aug 4, 2020 at 12:09 PM Yu Qian <yuqian1990@gmail.com> wrote:
>
> > Hi Jarek,
> >
> > I agree we should not change the behaviour of the existing SubDagOperator
> > until Airflow 2.1. Is it okay to continue the discussion about TaskGroup as
> > a brand new concept/feature independent of the existing SubDagOperator?
> > In other words, shall we add TaskGroup as a UI grouping concept like Ash
> > suggested, and not touch SubDagOperator at all? Whenever TaskGroup is
> > ready, we can then deprecate SubDagOperator in Airflow 2.1.
> >
> > I really like Ash's idea of simplifying SubDagOperator into a simple UI
> > grouping concept. I think Xinbin's idea of "reattaching all the tasks to
> > the root DAG" is the way to go. And I see James pointed out that we need
> > some helper functions to simplify setting dependencies on a TaskGroup.
> > Xinbin put up a pretty elegant example in his PR
> > <https://github.com/apache/airflow/pull/9243>. I think having TaskGroup as
> > a UI concept should be a relatively small change, and we can simplify
> > Xinbin's PR further. So I put up this alternative proposal here:
> > https://github.com/apache/airflow/pull/10153
> >
> > I have not made any UI changes due to my lack of experience with web UI.
> > If anyone is interested, please take a look at the PR.
> >
> > Qian
> >
> > On Mon, Jun 22, 2020 at 5:15 AM Jarek Potiuk <Jarek.Potiuk@polidea.com>
> > wrote:
> >
> > > Similar point here to the other ideas that are popping up. Maybe we should
> > > just focus on completing 2.0 and move all discussions about further
> > > improvements to 2.1? While those are important discussions (and we should
> > > continue them in the near future!), I think at this point delivering 2.0
> > > in its current shape should be our focus.
> > >
> > > J.
> > >
> > > On Thu, Jun 18, 2020 at 6:35 PM Xinbin Huang <bin.huangxb@gmail.com>
> > > wrote:
> > >
> > > > Hi Daniel
> > > >
> > > > I agree that the TaskGroup should have the same API as a DAG object with
> > > > respect to task dependencies, but it will not have anything related to
> > > > actual execution or scheduling.
> > > > I will update the AIP accordingly over the weekend.
> > > >
> > > > > We could even make a “DAGTemplate” object s.t. when you import the
> > > > > object you can import it with parameters to determine the shape of
> > > > > the DAG.
> > > >
> > > > Can you elaborate a bit more on this? Does it serve a similar purpose as
> > > > a DAG factory function?
> > > >
> > > >
> > > >
> > > > On Thu, Jun 18, 2020 at 9:13 AM Daniel Imberman <daniel.imberman@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Bin,
> > > > >
> > > > > Why not give the TaskGroup the same API as a DAG object (e.g. the
> > > > > bitshift operators for task dependencies)? We could even make a
> > > > > “DAGTemplate” object s.t. when you import the object you can import it
> > > > > with parameters to determine the shape of the DAG.
> > > > >
> > > > >
> > > > > On Wed, Jun 17, 2020 at 8:54 PM, Xinbin Huang <bin.huangxb@gmail.com>
> > > > > wrote:
> > > > > The TaskGroup will not take a schedule interval as a parameter itself;
> > > > > it depends on the DAG it is attached to. In my opinion, the TaskGroup
> > > > > will only contain a group of tasks with interdependencies, and the
> > > > > TaskGroup behaves like a task. It doesn't contain any
> > > > > execution/scheduling logic (e.g. schedule_interval, concurrency,
> > > > > max_active_runs etc.) like a DAG does.
> > > > >
> > > > > > For example, there is the scenario that the schedule interval of DAG
> > > > > > is 1 hour and the schedule interval of TaskGroup is 20 min.
> > > > >
> > > > > I am curious why you ask this. Is this a use case that you want to
> > > > > achieve?
> > > > >
> > > > > Bin
> > > > >
> > > > > On Wed, Jun 17, 2020 at 7:59 PM 蒋晓峰 <thanosxnicholas@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Bin,
> > > > > > When using TaskGroup, is the schedule interval of the TaskGroup the
> > > > > > same as that of the parent DAG? My main concern is whether the
> > > > > > schedule interval of a TaskGroup could be different from that of the
> > > > > > DAG. For example, there is the scenario that the schedule interval of
> > > > > > DAG is 1 hour and the schedule interval of TaskGroup is 20 min.
> > > > > >
> > > > > > Cheers,
> > > > > > Nicholas
> > > > > >
> > > > > > On Thu, Jun 18, 2020 at 10:30 AM Xinbin Huang <bin.huangxb@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Nicholas,
> > > > > > >
> > > > > > > I am not sure about the old behavior of SubDagOperator; maybe it
> > > > > > > will throw an error? But in the original proposal, the subdag's
> > > > > > > schedule_interval will be ignored. Or, if we decide to use TaskGroup
> > > > > > > to replace SubDag, there will be no subdag schedule_interval.
> > > > > > >
> > > > > > > Bin
> > > > > > >
> > > > > > > On Wed, Jun 17, 2020 at 6:21 PM 蒋晓峰 <thanosxnicholas@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Bin,
> > > > > > > > Thanks for your good proposal. I was unsure whether the schedule
> > > > > > > > interval of a SubDAG can be different from that of the parent
> > > > > > > > DAG. I have discussed the schedule interval of SubDAGs with Jiajie
> > > > > > > > Zhong. If the SubDagOperator has a different schedule interval,
> > > > > > > > what will happen when the scheduler schedules the parent DAG?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Nicholas Jiang
> > > > > > > >
> > > > > > > > On Thu, Jun 18, 2020 at 8:04 AM Xinbin Huang <bin.huangxb@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thank you, Max, Kaxil, and everyone for the feedback!
> > > > > > > > >
> > > > > > > > > I have rethought the concept of subdags and task groups. I think
> > > > > > > > > the better way to approach this is to remove subdags entirely
> > > > > > > > > and introduce the concept of TaskGroup, which is a container of
> > > > > > > > > tasks along with their dependencies *without the
> > > > > > > > > execution/scheduling logic of a DAG*. Its only purpose is to
> > > > > > > > > group a list of tasks, but you still need to add it to a DAG for
> > > > > > > > > execution.
> > > > > > > > >
> > > > > > > > > Here is a small code snippet.
> > > > > > > > >
> > > > > > > > > ```
> > > > > > > > > from airflow import DAG
> > > > > > > > > from airflow.operators.dummy_operator import DummyOperator
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > class TaskGroup:
> > > > > > > > >     """
> > > > > > > > >     A TaskGroup contains a group of tasks.
> > > > > > > > >
> > > > > > > > >     If default_args is missing, it will take default args from the DAG.
> > > > > > > > >     """
> > > > > > > > >     def __init__(self, group_id, default_args=None):
> > > > > > > > >         # Proposed API (not implemented in this stub): add_task(),
> > > > > > > > >         # use as a context manager, and >> / << with tasks and groups.
> > > > > > > > >         pass
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > """
> > > > > > > > > You can add tasks to a task group similar to adding tasks to a DAG
> > > > > > > > >
> > > > > > > > > This can be declared in a separate file from the dag file
> > > > > > > > > """
> > > > > > > > > # task1, task2 and default_args are assumed to be defined elsewhere.
> > > > > > > > > download_group = TaskGroup(group_id='download', default_args=default_args)
> > > > > > > > > download_group.add_task(task1)
> > > > > > > > > task2.dag = download_group
> > > > > > > > >
> > > > > > > > > with download_group:
> > > > > > > > >     task3 = DummyOperator(task_id='task3')
> > > > > > > > >
> > > > > > > > > [task1, task2] >> task3
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > """Add it to a DAG for execution"""
> > > > > > > > > with DAG(dag_id='start_download_dag', default_args=default_args,
> > > > > > > > >          schedule_interval='@daily', ...) as dag:
> > > > > > > > >     start = DummyOperator(task_id='start')
> > > > > > > > >     start >> download_group
> > > > > > > > >     # this is equivalent to
> > > > > > > > >     # start >> [task1, task2] >> task3
> > > > > > > > > ```
> > > > > > > > >
> > > > > > > > > With this, we can still reuse a group of tasks and set
> > > > > > > > > dependencies between them; it avoids the boilerplate code of
> > > > > > > > > SubDagOperator, and we can declare dependencies as
> > > > > > > > > `task >> task_group >> task`.
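The following is a minimal pure-Python sketch of the dependency semantics described above. `start >> download_group` links `start` to the group's root tasks, and `download_group >> end` links the group's leaf tasks to `end`. The `Task`, `TaskGroup` and `_link` names are hypothetical toys assumed for illustration. This is not Airflow's implementation.

```python
# Hypothetical toy model of TaskGroup dependency wiring; not Airflow's API.
from typing import List, Set


class Task:
    def __init__(self, task_id: str):
        self.task_id = task_id
        self.upstream: Set["Task"] = set()
        self.downstream: Set["Task"] = set()

    # A single task is both its own root and its own leaf.
    def roots(self) -> List["Task"]:
        return [self]

    def leaves(self) -> List["Task"]:
        return [self]

    def __rshift__(self, other):
        return _link(self, other)

    def __rrshift__(self, other):
        # Handles `[task1, task2] >> task3` (a list on the left).
        for item in other:
            _link(item, self)
        return self


class TaskGroup:
    def __init__(self, group_id: str, tasks: List[Task]):
        self.group_id = group_id
        self.tasks = tasks

    def roots(self) -> List[Task]:
        # Tasks with no upstream dependency inside the group.
        members = set(self.tasks)
        return [t for t in self.tasks if not t.upstream & members]

    def leaves(self) -> List[Task]:
        # Tasks with no downstream dependency inside the group.
        members = set(self.tasks)
        return [t for t in self.tasks if not t.downstream & members]

    def __rshift__(self, other):
        return _link(self, other)

    def __rrshift__(self, other):
        for item in other:
            _link(item, self)
        return self


def _link(left, right):
    """Wire every leaf of `left` to every root of `right`; return `right`."""
    targets = right if isinstance(right, list) else [right]
    for target in targets:
        for up in left.leaves():
            for down in target.roots():
                up.downstream.add(down)
                down.upstream.add(up)
    return right


task1, task2, task3 = Task("task1"), Task("task2"), Task("task3")
[task1, task2] >> task3
download_group = TaskGroup("download", [task1, task2, task3])

start, end = Task("start"), Task("end")
start >> download_group >> end
print(sorted(t.task_id for t in start.downstream))  # ['task1', 'task2']
print(sorted(t.task_id for t in end.upstream))      # ['task3']
```

Computing roots and leaves from in-group edges keeps the group free of any scheduling logic. The group is only a shortcut for fanning dependencies in and out.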
> > > > > > > > >
> > > > > > > > > Migration-wise, we can introduce it before Airflow 2.0 and allow
> > > > > > > > > a gradual transition. Then we can decide whether we still want
> > > > > > > > > to keep SubDagOperator or simply remove it.
> > > > > > > > >
> > > > > > > > > Any thoughts?
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Bin
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jun 17, 2020 at 7:37 AM Maxime Beauchemin <
> > > > > > > > > maximebeauchemin@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > > > +1, proposal looks good.
> > > > > > > > > >
> > > > > > > > > > The original intention was really to have task groups and a
> > > > > > > > > > zoom-in/out in the UI. The original reasoning was to reuse the
> > > > > > > > > > DAG object since it is a group of tasks, but as highlighted
> > > > > > > > > > here it does create underlying confusion since a DAG is much
> > > > > > > > > > more than just a group of tasks.
> > > > > > > > > >
> > > > > > > > > > Max
> > > > > > > > > >
> > > > > > > > > > On Mon, Jun 15, 2020 at 2:43 AM Poornima Joshi <joshipoornima06@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Thank you for your email.
> > > > > > > > > > >
> > > > > > > > > > > On Sat, Jun 13, 2020 at 12:18 AM Xinbin Huang <bin.huangxb@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > > > - *Unpack SubDags during dag parsing*: This rewrites the *DagBag.bag_dag*
> > > > > > > > > > > > > >   method to unpack subdag while parsing, and it will give a flat structure
> > > > > > > > > > > > > >   at the task level
> > > > > > > > > > > > >
> > > > > > > > > > > > > The serialized_dag representation already does this I think. At least if
> > > > > > > > > > > > > I've understood your idea here correctly.
> > > > > > > > > > > >
> > > > > > > > > > > > I am not sure about the serialized_dag representation, but won't it at
> > > > > > > > > > > > least still keep the subdag entry in the DAG table? In my proposal, as also
> > > > > > > > > > > > in the draft PR, the idea is to *extract the tasks from the subdag and add
> > > > > > > > > > > > them back to the root_dag.* So the runtime DAG graph will look exactly the
> > > > > > > > > > > > same as without a subdag, but with metadata attached to those sections.
> > > > > > > > > > > > This metadata will later be used for rendering in the UI. So after parsing
> > > > > > > > > > > > (*DagBag.process_file()*), it will just output the *root_dag* instead of
> > > > > > > > > > > > *root_dag + subdag + subdag + nested subdag* etc.
> > > > > > > > > > > >
> > > > > > > > > > > > - e.g. section-1-* will have metadata current_group=section-1,
> > > > > > > > > > > >   parent_group=<the-root-dag-id> (naming suggestions welcome). The reason
> > > > > > > > > > > >   for parent_group is that we can have nested groups and still be able to
> > > > > > > > > > > >   capture the dependency.
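The following is a minimal sketch of two steps. First, nested groups are flattened into plain tasks that carry only `current_group`/`parent_group` metadata. Second, a UI rebuilds the group hierarchy from that metadata alone. The nested-tuple spec format and the `flatten`/`group_tree` helpers are hypothetical. `example_dag` stands in for `<the-root-dag-id>`.

```python
# Hypothetical sketch: flatten nested groups into tasks that carry
# current_group / parent_group metadata, then rebuild the hierarchy.
from collections import defaultdict
from typing import Dict, List, Set, Tuple

# A group spec is (group_id, members); a member is a task id or a nested spec.
GroupSpec = Tuple[str, list]


def flatten(spec: GroupSpec, parent: str) -> List[Dict[str, str]]:
    """Return one metadata record per task, recursing into nested groups."""
    group_id, members = spec
    records: List[Dict[str, str]] = []
    for member in members:
        if isinstance(member, tuple):  # a nested group
            records.extend(flatten(member, parent=group_id))
        else:
            records.append({"task_id": member,
                            "current_group": group_id,
                            "parent_group": parent})
    return records


def group_tree(records: List[Dict[str, str]]) -> Dict[str, List[str]]:
    """Map each group (or the root DAG) to its direct child groups."""
    children: Dict[str, Set[str]] = defaultdict(set)
    for record in records:
        children[record["parent_group"]].add(record["current_group"])
    return {group: sorted(kids) for group, kids in children.items()}


spec = ("section-1", ["section-1-a", "section-1-b",
                      ("section-1-inner", ["section-1-inner-x"])])
records = flatten(spec, parent="example_dag")
print(group_tree(records))
# {'example_dag': ['section-1'], 'section-1': ['section-1-inner']}
```

This sketch assumes that every group has at least one direct task. A group containing only nested groups would need its own metadata record for its parent link to survive.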
> > > > > > > > > > > >
> > > > > > > > > > > > Runtime DAG:
> > > > > > > > > > > > [image: image.png]
> > > > > > > > > > > >
> > > > > > > > > > > > While in the UI, by utilizing the metadata, what we see would be something
> > > > > > > > > > > > like this, and then we can expand or zoom into it in some way.
> > > > > > > > > > > > [image: image.png]
> > > > > > > > > > > >
> > > > > > > > > > > > The benefits I can see are:
> > > > > > > > > > > > 1. We don't need to deal with the extra complexity of SubDag for execution
> > > > > > > > > > > >    and scheduling. It will be the same as not using SubDag.
> > > > > > > > > > > > 2. We still have the benefits of modularized and reusable dag code and can
> > > > > > > > > > > >    declare dependencies between them. And with the new SubDagOperator (see
> > > > > > > > > > > >    AIP or draft PR), we can use the same dag_factory function for generating
> > > > > > > > > > > >    1 dag, a lot of dynamic dags, or a SubDag (in this case, it will just
> > > > > > > > > > > >    extract all underlying tasks and append them to the root dag).
> > > > > > > > > > > >
> > > > > > > > > > > > - Then it gets to Ash's idea of replacing subdag with a simpler concept:
> > > > > > > > > > > >   the proposed change basically drains out the contents of a SubDag and
> > > > > > > > > > > >   makes it more like ExtractSubdagTasksAndAppendToRootdagOperator (forgive
> > > > > > > > > > > >   me for the crazy name..). In this case, is it still necessary to keep
> > > > > > > > > > > >   the concept of subdag, as it is nothing more than a name?
> > > > > > > > > > > >
> > > > > > > > > > > > That's why the TaskGroup idea comes up. Thanks to Chris Palmer for helping
> > > > > > > > > > > > conceptualize the functionality of TaskGroup; I will just paste it here.
> > > > > > > > > > > >
> > > > > > > > > > > > > - Tasks can be added to a TaskGroup
> > > > > > > > > > > > > - You *can* have dependencies between Tasks in the same TaskGroup, but
> > > > > > > > > > > > >   *cannot* have dependencies between a Task in a TaskGroup and either a
> > > > > > > > > > > > >   Task in a different TaskGroup or a Task not in any group
> > > > > > > > > > > > > - You *can* have dependencies between a TaskGroup and either other
> > > > > > > > > > > > >   TaskGroups or Tasks not in any group
> > > > > > > > > > > > > - The UI will by default render a TaskGroup as a single "object", which
> > > > > > > > > > > > >   you can expand or zoom into in some way
> > > > > > > > > > > > > - You'd need some way to determine what the "status" of a TaskGroup was
> > > > > > > > > > > > >   at least for UI display purposes
> > > > > > > > > > > >
> > > > > > > > > > > > I agree with Chris:
> > > > > > > > > > > > - From the backend's view (scheduler & executor), I think TaskGroup should
> > > > > > > > > > > >   be ignored during execution (unless we decide to implement some metadata
> > > > > > > > > > > >   operations that allow starting/stopping a group of tasks etc.)
> > > > > > > > > > > > - From the UI's view, it should be able to pick up the individual tasks'
> > > > > > > > > > > >   statuses and then determine the TaskGroup's status
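The following minimal sketch shows one possible way to derive a TaskGroup's display status from its tasks' states. The precedence order below is an assumption made for illustration. It is neither agreed-upon nor existing Airflow behavior. The state strings follow Airflow's task-state names, and `None` stands for a task with no state yet.

```python
# Hypothetical roll-up of task states into one TaskGroup status for display.
# The precedence order is an assumption, not existing Airflow behavior.
from typing import Iterable, Optional

# Checked in order; the first state present wins.
PRECEDENCE = ("failed", "upstream_failed", "up_for_retry",
              "running", "queued", "scheduled")


def group_status(task_states: Iterable[Optional[str]]) -> Optional[str]:
    states = set(task_states)
    if not states:
        return None  # empty group: nothing to show
    if states <= {"success", "skipped"}:
        # Every task finished; call it skipped only if nothing succeeded.
        return "success" if "success" in states else "skipped"
    for state in PRECEDENCE:
        if state in states:
            return state
    return None  # e.g. some tasks have no state yet


print(group_status(["success", "running", None]))      # running
print(group_status(["success", "failed", "running"]))  # failed
```

Because the roll-up only reads task states, the scheduler can keep ignoring the group entirely, as suggested above.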
> > > > > > > > > > > >
> > > > > > > > > > > > Bin
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Jun 12, 2020 at 10:28 AM Daniel Imberman <
> > > > > > > > > > > > daniel.imberman@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > >> I hadn’t thought about using the `>>` operator to tie dags together but I
> > > > > > > > > > > > >> think that sounds pretty great! I wonder if we could essentially write in
> > > > > > > > > > > > >> the ability to set dependencies to all starter-tasks for that DAG.
> > > > > > > > > > > > >>
> > > > > > > > > > > > >> I’m personally ok with SubDag being a mostly UI concept. It doesn’t need to
> > > > > > > > > > > > >> execute separately; you’re just adding more tasks to the queue that will be
> > > > > > > > > > > > >> executed when there are resources available.
> > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Jun 12, 2020 at 9:45 AM, Chris Palmer <chris@crpalmer.com>
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > > >> I agree that SubDAGs are an overly complex abstraction. I think what is
> > > > > > > > > > > > >> needed/useful is a TaskGroup concept. On a high level, I think you want
> > > > > > > > > > > > >> this functionality:
> > > > > > > > > > > >>
> > > > > > > > > > > > >> - Tasks can be added to a TaskGroup
> > > > > > > > > > > > >> - You *can* have dependencies between Tasks in the same TaskGroup, but
> > > > > > > > > > > > >>   *cannot* have dependencies between a Task in a TaskGroup and either a
> > > > > > > > > > > > >>   Task in a different TaskGroup or a Task not in any group
> > > > > > > > > > > > >> - You *can* have dependencies between a TaskGroup and either other
> > > > > > > > > > > > >>   TaskGroups or Tasks not in any group
> > > > > > > > > > > > >> - The UI will by default render a TaskGroup as a single "object", which
> > > > > > > > > > > > >>   you can expand or zoom into in some way
> > > > > > > > > > > > >> - You'd need some way to determine what the "status" of a TaskGroup was
> > > > > > > > > > > > >>   at least for UI display purposes
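The dependency rules in the list above can be checked mechanically. The following minimal sketch assumes a hypothetical `group_of` mapping from task id to group id, with `None` for ungrouped tasks. A task-to-task edge is allowed only when both ends share the same group, which includes both being ungrouped. Group-level edges such as `group >> task` are assumed to be declared separately and are not checked here.

```python
# Hypothetical check of the task-to-task dependency rules for TaskGroups.
# Not Airflow code: group_of maps task_id -> group_id (None if ungrouped).
from typing import Dict, List, Optional, Tuple

Edge = Tuple[str, str]


def invalid_edges(edges: List[Edge],
                  group_of: Dict[str, Optional[str]]) -> List[Edge]:
    """Return the edges that cross a TaskGroup boundary."""
    bad = []
    for up, down in edges:
        # Allowed: same group, or both ungrouped (None == None).
        # Task ids missing from group_of are treated as ungrouped.
        if group_of.get(up) != group_of.get(down):
            bad.append((up, down))
    return bad


group_of = {"a": "g1", "b": "g1", "c": "g2", "d": None}
edges = [("a", "b"), ("b", "c"), ("d", "a"), ("d", "e")]
print(invalid_edges(edges, group_of))  # [('b', 'c'), ('d', 'a')]
```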
> > > > > > > > > > > >>
> > > > > > > > > > > > >> Not sure if it would need to be a top-level object with its own database
> > > > > > > > > > > > >> table and model or just another attribute on tasks. I think you could build
> > > > > > > > > > > > >> it in a way such that, from the scheduler's point of view, a DAG with
> > > > > > > > > > > > >> TaskGroups doesn't get treated any differently. So it really just becomes a
> > > > > > > > > > > > >> shortcut for setting dependencies between sets of Tasks, and allows the UI
> > > > > > > > > > > > >> to simplify the rendering of the DAG structure.
> > > > > > > > > > > >>
> > > > > > > > > > > >> Chris
> > > > > > > > > > > >>
> > > > > > > > > > > > >> On Fri, Jun 12, 2020 at 12:12 PM Dan Davydov <ddavydov@twitter.com.invalid>
> > > > > > > > > > > > >> wrote:
> > > > > > > > > > > >>
> > > > > > > > > > > > >> > Agree with James (and I think it's actually the more important issue to
> > > > > > > > > > > > >> > fix), but I am still convinced Ash's idea is the right way forward (it
> > > > > > > > > > > > >> > just might require a bit more work to deprecate than adding visual
> > > > > > > > > > > > >> > grouping in the UI).
> > > > > > > > > > > > >> >
> > > > > > > > > > > > >> > There was a previous thread about this FYI with more context on why
> > > > > > > > > > > > >> > subdags are bad and potential solutions:
> > > > > > > > > > > > >> > https://www.mail-archive.com/dev@airflow.apache.org/msg01202.html . A
> > > > > > > > > > > > >> > solution I outline there to James's problem is e.g. enabling the >> operator
> > > > > > > > > > > > >> > for Airflow operators to work with DAGs as well. I see this as separate
> > > > > > > > > > > > >> > from Ash's solution for DAG grouping in the UI, but as one of the two
> > > > > > > > > > > > >> > items required to replace all existing subdag functionality.
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > I've been working with subdags for 3 years and they are always a giant
> > > > > > > > > > > > >> > pain to use. They are a constant source of user confusion and breakages
> > > > > > > > > > > > >> > during upgrades. Would love to see them gone :).
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > On Fri, Jun 12, 2020 at 11:11 AM James Coder <jcoder01@gmail.com>
> > > > > > > > > > > > >> > wrote:
> > > > > > > > > > > >> >
> > > > > > > > > > > > >> > > I'm not sure I totally agree it's just a UI concept. I use the subdag
> > > > > > > > > > > > >> > > operator to simplify dependencies too. If you have a group of tasks that
> > > > > > > > > > > > >> > > need to finish before another group of tasks starts, using a subdag is a
> > > > > > > > > > > > >> > > pretty quick way to set those dependencies, and I think it also makes the
> > > > > > > > > > > > >> > > dag code easier to follow.
> > > > > > > > > > > >> > >
> > > > > > > > > > > > >> > > On Fri, Jun 12, 2020 at 9:53 AM Kyle Hamlin <hamlin.kn@gmail.com>
> > > > > > > > > > > > >> > > wrote:
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> > > > I second Ash’s grouping concept.
> > > > > > > > > > > >> > > >
> > > > > > > > > > > > >> > > > On Fri, Jun 12, 2020 at 5:10 AM Ash Berlin-Taylor <ash@apache.org>
> > > > > > > > > > > > >> > > > wrote:
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > > > > Question:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Do we even need the SubDagOperator anymore?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > Would removing it entirely and just replacing it with a UI grouping
> > > > > > > > > > > > >> > > > > concept be conceptually simpler, less to get wrong, and closer to what
> > > > > > > > > > > > >> > > > > users actually want to achieve with subdags?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > With your proposed change, tasks in subdags could start running in
> > > > > > > > > > > > >> > > > > parallel (a good change) -- so should we not also just _entirely_ remove
> > > > > > > > > > > > >> > > > > the concept of a sub dag and replace it with something simpler?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > Problems with subdags (I think. I haven't used them extensively so may
> > > > > > > > > > > > >> > > > > be wrong on some of these):
> > > > > > > > > > > > >> > > > > - They need their own dag_id, but it has(?) to be of the form
> > > > > > > > > > > > >> > > > >   `parent_dag_id.subdag_id`.
> > > > > > > > > > > > >> > > > > - They need their own schedule_interval, but it has to match the parent
> > > > > > > > > > > > >> > > > >   dag
> > > > > > > > > > > > >> > > > > - Sub dags can be paused on their own. (Does it make sense to do this?
> > > > > > > > > > > > >> > > > >   Pausing just a sub dag would mean the sub dag would never execute, so
> > > > > > > > > > > > >> > > > >   the SubDagOperator would fail too.)
> > > > > > > > > > > > >> > > > > - You had to choose the executor to operate a subdag with -- always a
> > > > > > > > > > > > >> > > > >   bit of a kludge.
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > Thoughts?
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > > -ash
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > On Jun 12 2020, at 12:01 pm, Ash Berlin-Taylor <ash@apache.org>
> > > > > > > > > > > > >> > > > > wrote:
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > > >> > > > > > Work on sub-dags is much needed, I'm excited to see how this progresses.
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > >> - *Unpack SubDags during dag parsing*: This rewrites the *DagBag.bag_dag*
> > > > > > > > > > > > >> > > > > >>   method to unpack subdag while parsing, and it will give a flat structure
> > > > > > > > > > > > >> > > > > >>   at the task level
> > > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > The serialized_dag representation already does this I think. At least if
> > > > > > > > > > > > >> > > > > > I've understood your idea here correctly.
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > > -ash
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > > >> > > > > > On Jun 12 2020, at 9:51 am, Xinbin Huang <bin.huangxb@gmail.com>
> > > > > > > > > > > > >> > > > > > wrote:
> > > > > > > > > > > >> > > > > >
> > > > > > > > > > > >> > > > > >> Hi everyone,
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > > >> > > > > >> Sending a message to everyone to collect feedback on AIP-34 on rewriting
> > > > > > > > > > > > >> > > > > >> SubDagOperator. This was briefly mentioned previously in the discussion
> > > > > > > > > > > > >> > > > > >> about what needs to be done for Airflow 2.0, and one of the ideas is to
> > > > > > > > > > > > >> > > > > >> make SubDagOperator attach tasks back to the root DAG.
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > > >> > > > > >> AIP-34 focuses on solving SubDagOperator-related issues by reattaching
> > > > > > > > > > > > >> > > > > >> all tasks back to the root dag while respecting dependencies during
> > > > > > > > > > > > >> > > > > >> parsing. The original grouping effect in the UI will be achieved by
> > > > > > > > > > > > >> > > > > >> grouping related tasks through metadata.
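The following minimal sketch illustrates the unpacking step described above. It operates on plain edge lists rather than real DAG objects. The subdag's tasks are reattached to the root DAG under a prefixed id. Edges into the subdag node are redirected to the subdag's root tasks, and edges out of it now start from the subdag's leaf tasks. The `unpack` helper and the `<node>.<task>` id scheme are assumptions for illustration, not the draft PR's code.

```python
# Hypothetical sketch of "unpack SubDags during parsing" on plain edge lists.
# Not Airflow code: DAGs are modelled as sets of (upstream, downstream) ids.
from typing import List, Set, Tuple

Edge = Tuple[str, str]


def unpack(root_edges: List[Edge], node: str,
           sub_tasks: List[str], sub_edges: List[Edge]) -> Set[Edge]:
    """Replace the placeholder `node` in root_edges with the subdag's tasks."""

    def prefixed(task_id: str) -> str:
        # Assumed naming scheme, echoing `parent_dag_id.subdag_id`.
        return f"{node}.{task_id}"

    has_upstream = {down for _, down in sub_edges}
    has_downstream = {up for up, _ in sub_edges}
    roots = [prefixed(t) for t in sub_tasks if t not in has_upstream]
    leaves = [prefixed(t) for t in sub_tasks if t not in has_downstream]

    # Keep the subdag's internal dependencies, now inside the root DAG.
    edges = {(prefixed(u), prefixed(d)) for u, d in sub_edges}
    for up, down in root_edges:
        if down == node:    # upstream -> subdag becomes upstream -> roots
            edges.update((up, r) for r in roots)
        elif up == node:    # subdag -> downstream becomes leaves -> downstream
            edges.update((leaf, down) for leaf in leaves)
        else:
            edges.add((up, down))
    return edges


flat = unpack(
    root_edges=[("start", "section_1"), ("section_1", "end")],
    node="section_1",
    sub_tasks=["a", "b", "c"],
    sub_edges=[("a", "c"), ("b", "c")],
)
for edge in sorted(flat):
    print(edge)
```

The resulting graph contains no subdag node, so the scheduler sees an ordinary flat DAG. The UI grouping would then come from metadata attached to the prefixed tasks.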
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > > >> > > > > >> This also makes the dag_factory function more reusable because you don't
> > > > > > > > > > > > >> > > > > >> need to have parent_dag_name and child_dag_name in the function signature
> > > > > > > > > > > > >> > > > > >> anymore.
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> Changes proposed:
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> - *Unpack SubDags during DAG parsing*: This rewrites the *DagBag.bag_dag*
> > > > > > > > > > > >> > > > > >> method to unpack subdags while parsing, giving a flat structure at the
> > > > > > > > > > > >> > > > > >> task level.
> > > > > > > > > > > >> > > > > >> - *Simplify SubDagOperator*: The new SubDagOperator acts like a container
> > > > > > > > > > > >> > > > > >> and most of the original methods are removed. The signature is also
> > > > > > > > > > > >> > > > > >> changed to *subdag_factory* with *subdag_args* and *subdag_kwargs*.
> > > > > > > > > > > >> > > > > >> This is similar to the PythonOperator signature.
> > > > > > > > > > > >> > > > > >> - *Add a TaskGroup model and add current_group & parent_group attributes
> > > > > > > > > > > >> > > > > >> to BaseOperator*: This metadata is used to group tasks for rendering at
> > > > > > > > > > > >> > > > > >> the UI level. It may later be extended to group arbitrary tasks outside
> > > > > > > > > > > >> > > > > >> the context of a subdag, allowing group-level operations (e.g.
> > > > > > > > > > > >> > > > > >> stopping/triggering a group of tasks within the DAG).
> > > > > > > > > > > >> > > > > >> - *Webserver UI for SubDag*: Proposed UI modification to allow
> > > > > > > > > > > >> > > > > >> (un)collapsing a group of tasks in a flat structure, to pair with the
> > > > > > > > > > > >> > > > > >> first change instead of the original hierarchical structure.
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> Please see related documents and PRs for details:
> > > > > > > > > > > >> > > > > >> AIP: https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-34+Rewrite+SubDagOperator
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> Original Issue: https://github.com/apache/airflow/issues/8078
> > > > > > > > > > > >> > > > > >> Draft PR: https://github.com/apache/airflow/pull/9243
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> Please let me know if there are any aspects that you agree/disagree with
> > > > > > > > > > > >> > > > > >> or that need more clarification (especially the third change regarding
> > > > > > > > > > > >> > > > > >> TaskGroup). Any comments are welcome and I am looking forward to it!
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > > >> Cheers
> > > > > > > > > > > >> > > > > >> Bin
> > > > > > > > > > > >> > > > > >>
> > > > > > > > > > > >> > > > >
> > > > > > > > > > > >> > > > --
> > > > > > > > > > > >> > > > Kyle Hamlin
> > > > > > > > > > > >> > > >
> > > > > > > > > > > >> > >
> > > > > > > > > > > >> >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Thanks & Regards
> > > > > > > > > > > Poornima
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> > >
> > > --
> > >
> > > Jarek Potiuk
> > > Polidea <https://www.polidea.com/> | Principal Software Engineer
> > >
> > > M: +48 660 796 129
> > >
> >
>
>
>
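To make the three mechanisms in the quoted proposal concrete (flattening nested groups into the root DAG while keeping dependencies, building a group from a factory called PythonOperator-style with positional and keyword arguments, and using group-path metadata to draw a collapsed view), here is a minimal plain-Python sketch. It does not use or reproduce Airflow's API: Group, FlatTask, from_factory, flatten, collapsed_edges and submit_and_poll are hypothetical names, current_group/parent_group only mirror the attribute names from the proposal, and it assumes task names never contain a dot.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class Group:
    """Hypothetical container: local task names, local edges, nested groups."""
    group_id: str
    tasks: list[str] = field(default_factory=list)
    # (upstream, downstream) names relative to this group; "child.task" reaches into a child
    edges: list[tuple[str, str]] = field(default_factory=list)
    children: list[Group] = field(default_factory=list)


@dataclass
class FlatTask:
    task_id: str               # fully qualified id, e.g. "job.submit"
    current_group: str | None  # dotted path of the owning group (None = root DAG)
    parent_group: str | None   # dotted path of that group's parent (None = root DAG)


def from_factory(group_id: str, factory: Callable[..., Group],
                 factory_args: tuple = (),
                 factory_kwargs: dict[str, Any] | None = None) -> Group:
    """Build a group the way PythonOperator calls its callable: factory(*args, **kwargs)."""
    group = factory(*factory_args, **(factory_kwargs or {}))
    group.group_id = group_id  # the container, not the factory, owns the name
    return group


def flatten(root: Group) -> tuple[list[FlatTask], list[tuple[str, str]]]:
    """Unpack nested groups into one flat task list and edge list (assumes no dots in task names)."""
    tasks: list[FlatTask] = []
    edges: list[tuple[str, str]] = []

    def visit(group: Group, path: str, parent: str | None) -> None:
        def qualify(name: str) -> str:
            return f"{path}.{name}" if path else name

        for name in group.tasks:
            tasks.append(FlatTask(qualify(name), path or None, parent))
        for upstream, downstream in group.edges:
            edges.append((qualify(upstream), qualify(downstream)))
        for child in group.children:
            visit(child, qualify(child.group_id), path or None)

    visit(root, "", None)  # root tasks keep their plain ids
    return tasks, edges


def collapsed_edges(edges: list[tuple[str, str]], collapsed: set[str]) -> set[tuple[str, str]]:
    """Edges as a UI would draw them when the given group paths are collapsed."""
    def display(task_id: str) -> str:
        parts = task_id.split(".")
        for i in range(1, len(parts)):  # enclosing group paths, outermost first
            prefix = ".".join(parts[:i])
            if prefix in collapsed:
                return prefix
        return task_id

    shown = {(display(u), display(d)) for u, d in edges}
    return {(u, d) for u, d in shown if u != d}  # edges inside a collapsed group disappear


def submit_and_poll(poll_task: str = "poll") -> Group:
    # Reusable factory: no parent_dag_name / child_dag_name parameters needed.
    return Group("unnamed", tasks=["submit", poll_task], edges=[("submit", poll_task)])


dag = Group(
    "example_dag",
    tasks=["start", "end"],
    edges=[("start", "job.submit"), ("job.poll", "end")],
    children=[from_factory("job", submit_and_poll, factory_kwargs={"poll_task": "poll"})],
)
tasks, edges = flatten(dag)
print([t.task_id for t in tasks])  # ['start', 'end', 'job.submit', 'job.poll']
print(collapsed_edges(edges, {"job"}))
```

Collapsing "job" replaces its two tasks with a single node and drops the edge internal to the group, which is the kind of (un)collapse view the fourth change describes; expanding it again amounts to calling collapsed_edges with an empty set. Keeping the group path as plain metadata on each flat task, rather than nesting DAG objects, is what lets the scheduler see one flat graph while the UI can still regroup it.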


-- 

*Jacob Ferriero*

Strategic Cloud Engineer: Data Engineering

jferriero@google.com

617-714-2509
