airflow-dev mailing list archives

From: Brian Greene
Subject: Re: Task middleware or task hooks as a way to increase observability
Date: Sun, 31 Mar 2019 21:11:50 GMT

My answer is like his.

It still requires every dev writing dags to use the same mechanism, but it’s really easy
(assuming you’re not already using those hooks for something else).

We have a single python utility function that is set as both “on_success_callback” and
“on_failure_callback”.  Set it in “default_args” once for the dag and the entire thing is
“instrumented” reasonably well without any other code change.
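
Roughly, the wiring looks like the sketch below. This is not our actual code: the
function name "report_task_event", the dag id, and what the callback prints are made up for
illustration. The only things it relies on are that airflow calls the callback with the
task context dict and that “default_args” is applied to every task in the dag.

```python
def report_task_event(context):
    """Hypothetical shared callback; airflow passes in the task context dict."""
    ti = context["task_instance"]
    print("task %s.%s finished in state %s" % (ti.dag_id, ti.task_id, ti.state))


# The same function handles both outcomes, so every task is covered once.
default_args = {
    "owner": "airflow",
    "on_success_callback": report_task_event,
    "on_failure_callback": report_task_event,
}

# In a real dag file the dict is handed to the DAG (left commented so the
# sketch runs without airflow installed; the dag id is a placeholder):
# from airflow import DAG
# dag = DAG("example_dag", default_args=default_args, schedule_interval=None)
```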

It grabs the task metadata and dagrun.conf (super useful if you run a lot of triggered dags,
where the dag run conf is your “param” structure).
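
A minimal sketch of that step, assuming the usual context keys ("task_instance" and
"dag_run"). The helper name "build_payload" and the choice of fields are illustrative, not
what we actually ship.

```python
def build_payload(context):
    """Collect task metadata and the dag run conf into a plain dict.

    Hypothetical helper: the field selection here is an assumption.
    """
    ti = context["task_instance"]
    dag_run = context.get("dag_run")  # may be missing in some contexts
    return {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "state": ti.state,
        "try_number": ti.try_number,
        "start_date": str(ti.start_date) if ti.start_date else None,
        "run_id": getattr(dag_run, "run_id", None),
        # conf is None for runs that were not triggered with parameters
        "conf": getattr(dag_run, "conf", None) or {},
    }
```

Everything in the result is a plain string, number, or dict, so it can be serialized and
shipped off without the receiver needing to know anything about airflow objects.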

We took the path of leaving airflow completely for the rest of the work: that payload gets
sent to a single API endpoint.  Said endpoint has a lib that does “things” with the messages:
it logs them in dynamo for starters, some carry on to slack notifications, and most get passed
to the logging/eventing service after some cleanup.
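
The sending side can be as small as the sketch below, which uses only the python standard
library. The URL is a placeholder and the function names are made up; the one design point
worth copying is that a delivery failure gets swallowed, so a monitoring hiccup never turns
into an error inside the task callback.

```python
import json
import urllib.request

# Placeholder URL: substitute whatever endpoint receives your events.
ENDPOINT_URL = "https://observability.example.com/airflow-events"


def make_request(payload, url=ENDPOINT_URL):
    """Build a JSON POST request; default=str covers datetimes and the like."""
    body = json.dumps(payload, default=str).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_payload(payload, url=ENDPOINT_URL, timeout=5):
    """Post the payload and return the HTTP status, or None if delivery failed."""
    try:
        with urllib.request.urlopen(make_request(payload, url), timeout=timeout) as resp:
            return resp.status
    except OSError as exc:  # URLError and HTTPError are OSError subclasses
        print("could not deliver task event: %s" % exc)
        return None
```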

You can write a whole little handler thingy to handle all the messages and get whatever you
want done, with almost no code in airflow or the dags.
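
For a sense of scale, the receiving side can start out like the sketch below. The two lists
are in-memory stand-ins for the dynamo table and the slack channel, and the decorator and
handler names are invented for the example.

```python
HANDLERS = []  # (predicate, function) pairs, checked in registration order


def handler(predicate):
    """Register a function to run for every message the predicate accepts."""
    def register(func):
        HANDLERS.append((predicate, func))
        return func
    return register


audit_log = []  # stand-in for a durable store such as a dynamo table
alerts = []     # stand-in for a slack notifier


@handler(lambda msg: True)
def record(msg):
    audit_log.append(msg)


@handler(lambda msg: msg.get("state") == "failed")
def alert(msg):
    alerts.append("Task %s.%s failed" % (msg["dag_id"], msg["task_id"]))


def dispatch(msg):
    """Run every matching handler and return the names of those that ran."""
    ran = []
    for predicate, func in HANDLERS:
        if predicate(msg):
            func(msg)
            ran.append(func.__name__)
    return ran
```

Adding a new behavior is then one more decorated function on the endpoint side, with no
change to any dag.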


Sent from a device with less than stellar autocorrect

> On Mar 31, 2019, at 3:29 PM, Stachurski, Stephan wrote:
> Hi-
> I'm pretty new to airflow, and I'm trying to work on getting
> visibility/observability into what airflow is doing.
> I would like to be able to observe things about dag runs and task
> instances. I would like to be able to send metrics to time series databases
> (possibly extending the existing airflow metrics exported in statsd
> format). I would like to be able to label these data points with things
> like the dag run id. Let's say that every time a task succeeded, was
> retried, or failed, it sent a datapoint where the value was the duration of
> the task, and it was labeled with dag run id. Then I could plot these
> datapoints in a stacked bar graph, where each stack includes the total
> duration of all tasks in a dag run, the height is the total duration, and I
> could analyze whether my dag runs are getting faster or slower over time.
> I would like information from airflow state to be able to label metric
> datapoints. I would like to be able to apply my own labels (perhaps via
> callback that could be executed at the time the metric is published, and
> given context parameter(s)).
> I believe I can do this today, but it requires quite a bit of work on my
> end, and all developers working on my team/cluster will sort of need to be
> onboarded with a standard way of doing things, otherwise not everything
> running in the cluster will be observed the same way.
> I could try adding callbacks to all my operators. But there are some
> problems here. When the on_success_callback runs, I know when the task
> started, but I have to kind of infer the total duration by taking the
> difference between the task start time and "now" in the middle of the
> callback. Also it's kind of tedious and error-prone to make sure that these
> callbacks are actually used everywhere.
> I could use custom operators, which is slightly less tedious than
> augmenting every operator instance, but still not as elegant to me as the
> idea of airflow offering task, operator, and/or dag middleware extension
> points.
> An alternative way of approaching the problem would be if airflow fired
> webhooks, corresponding to events like dag runs starting, ending, being
> cleared, or tasks starting, being retried, etc. Think of github firing
> webhooks when branches are updated, PRs opened, updated, closed, etc, and
> CI/CD systems loosely coupled to github webhooks that run builds and deploy
> code. If you could configure airflow to make a request to a webhook
> endpoint whenever something happened, you could include a bunch of relevant
> state in the payload. The listener could use the info in the payload to
> build the metrics I need, and more. The listener could also make further
> requests to the airflow api if necessary.
> I haven't really explored how the perspective on the problem changes if I
> wanted to use pull style metrics instead of push style. For example, if I
> wanted to get the same graph I described above, except from prometheus
> scraping airflow, then maybe I wouldn't need middleware or hooks. If
> airflow fires webhooks, then I need a webhook listener to interpret these
> hooks and get data into my monitoring system. If airflow provides
> middleware extension points, then I need to stick my custom code directly
> within airflow, which sort of introduces a coupling between airflow and the
> monitoring system. If I provide a metrics endpoint for something like
> prometheus to scrape, then additional logic or actions I would have stuck
> on my webhook listener now has to plug into or monitor prometheus instead.
> I'm putting this out to the list for two reasons:
> 1. Maybe someone can suggest a simple solution where I can write a small
>    amount of code in one place to glue things together, and not depend on devs
>    on my team always reaching for the correct custom operator if they're used
>    to using the standard ones.
> 2. If there is no satisfactory solution, then what's the right way to evolve
>    airflow so that there is a satisfactory solution?
> For additional context I asked about this on slack already:
> I have to admit that I don't fully understand what @bosnjak was getting at
> here, maybe this is really the answer to all my problems:
>> bosnjak   [10 days ago]
>> if you want to handle them differently depending on the task, you should
>> have a single callback handler that can route the calls depending on the
>> context, which is passed to the handler by default. You can access task
>> instance and other stuff from there.
