Airflow Dynamic DAGs — Python Globals

In this post, I introduce the concept of dynamic DAG creation and explain the significance of Python global variables for Airflow.

What do I mean by “dynamic DAG”?

Dynamic DAG creation is important for scalable data pipeline applications.

When confined to the realm of static DAG scripts, we find ourselves duplicating code in order to create pipelines.

This duplication is undesirable because it usually increases code-base complexity, making DAGs harder to update and increasing the chances of bugs appearing.

For example, updated DAGfile code must be copied across each replicated instance, while making sure to keep the intended diffs (e.g. params, custom logic) intact. In other words, a nightmare.

In order to dynamically create DAGs with Airflow, we need two things to happen:

  1. Run a function that instantiates an airflow.DAG object.
  2. Pass that object back to the global namespace of the DAGfile.
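The two steps above can be illustrated without an Airflow installation. The sketch below is a hedged approximation only: FakeDAG is a hypothetical stand-in for airflow.DAG, create_dag is a hypothetical factory, and discovered_dag_ids only roughly mimics how Airflow looks for DAG objects among a DAGfile’s top-level (global) names.

```python
# FakeDAG is a hypothetical stand-in for airflow.DAG so this runs without Airflow.
class FakeDAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(dag_id):
    """Step 1: a hypothetical factory that instantiates and returns a DAG object."""
    return FakeDAG(dag_id)


# Step 2: pass the object back to the module's global namespace
# (a plain module-level assignment binds a global name).
dag = create_dag("email_btc_price")


def discovered_dag_ids(namespace):
    """Rough approximation of Airflow's discovery: collect the dag_id of every
    DAG object bound to a top-level name in the DAGfile."""
    return sorted(obj.dag_id for obj in namespace.values() if isinstance(obj, FakeDAG))


print(discovered_dag_ids(globals()))  # ['email_btc_price']
```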

Sounds simple, and it is. But let’s see how it could go wrong.

Static DAG Example

Let’s imagine I have a pipeline that gets the current price of bitcoin (BTC) and emails it to me:

There’s nothing wrong here. Not yet anyway.

It renders the following DAG:

Dynamic DAG example

Now let’s imagine we wanted to get the price of some other cryptocurrencies as well; say, Ethereum (ETH), Litecoin (LTC) and Stellar (XLM).

We might try to accomplish that dynamically as follows:
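A minimal sketch of such a loop, assuming a hypothetical FakeDAG class standing in for airflow.DAG, a hypothetical create_dag factory, and a discovered_dag_ids helper that only roughly mimics Airflow’s search for DAG objects in a DAGfile’s globals (none of these are Airflow APIs). Each iteration assigns a new DAG to the same global name, dag.

```python
# FakeDAG is a hypothetical stand-in for airflow.DAG.
class FakeDAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(symbol):
    """Hypothetical factory: one email-price DAG per coin symbol."""
    return FakeDAG(f"email_{symbol.lower()}_price")


def discovered_dag_ids(namespace):
    """Rough approximation of Airflow's discovery: DAG objects bound to top-level names."""
    return sorted(obj.dag_id for obj in namespace.values() if isinstance(obj, FakeDAG))


for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    # The same global name, dag, is rebound on every iteration.
    dag = create_dag(symbol)

print(discovered_dag_ids(globals()))
```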

Seems reasonable, right? I iterate over the coins and dynamically create a DAG for each.

I’m even making sure to pass the instantiated DAG object, dag, back to the global namespace of the DAGfile.

However, this will not work.

At least, not how we expect. It produces only one DAG (for XLM, the last element in the list):

We are missing the other DAGs: email_btc_price, email_eth_price and email_ltc_price.

Why it doesn’t work

In order to understand why the above code does not behave as we need it to, we have to consider Airflow’s core concept of DAG scope.

In particular: “Airflow will load any DAG object it can import from a DAGfile. Critically, that means the DAG must appear in globals()”

In Python, globals is a built-in function that returns the dictionary holding the current module’s global variables. Because that dictionary is the live namespace rather than a copy, it can be used to set variables as well as read them. E.g.

>>> globals()["my_name"] = "Alex"
>>> print(my_name)
Alex

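So we can now see what’s happening: on each iteration of the loop (over the symbols BTC, ETH, LTC and XLM), the global name dag is rebound to a newly created DAG object.

Thus, all DAGs except the last lose their global variable reference, and only the XLM DAG is left in globals() for Airflow to find.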
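To see that the earlier DAGs really do become unreachable, here is a small sketch using only the standard library. FakeDAG is a hypothetical stand-in for airflow.DAG. The sketch keeps a weak reference to each object created in the loop; weak references do not keep objects alive, so after garbage collection only the DAG still bound to dag survives.

```python
import gc
import weakref


class FakeDAG:
    """Hypothetical stand-in for airflow.DAG."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


refs = []
for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    dag = FakeDAG(f"email_{symbol.lower()}_price")
    # A weak reference lets us observe the object without keeping it alive.
    refs.append(weakref.ref(dag))

gc.collect()  # reclaim any object that no longer has a strong reference
alive = [r().dag_id for r in refs if r() is not None]
print(alive)  # ['email_xlm_price'] on CPython
```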

How to make it work

Knowing about this core concept of Airflow, the solution is trivial. All we need to do is keep a separate global reference to each DAG created in the loop.

This can be accomplished as follows:

Here I’m using globals() to bind each DAG object to its own name in the global namespace as my loop executes.
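A minimal sketch of that fix, assuming a hypothetical FakeDAG stand-in for airflow.DAG, a hypothetical create_dag factory, and a discovered_dag_ids helper that only roughly approximates how Airflow finds DAG objects in a DAGfile’s globals. Each DAG is bound under its own dag_id, so no reference is overwritten.

```python
# FakeDAG is a hypothetical stand-in for airflow.DAG.
class FakeDAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(dag_id):
    """Hypothetical factory that returns a DAG object for the given id."""
    return FakeDAG(dag_id)


def discovered_dag_ids(namespace):
    """Rough approximation of Airflow's discovery: DAG objects bound to top-level names."""
    return sorted(obj.dag_id for obj in namespace.values() if isinstance(obj, FakeDAG))


for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    dag_id = f"email_{symbol.lower()}_price"
    # Bind each DAG to its own global name instead of reusing one variable.
    globals()[dag_id] = create_dag(dag_id)

print(discovered_dag_ids(globals()))
# ['email_btc_price', 'email_eth_price', 'email_ltc_price', 'email_xlm_price']
```

Using the dag_id as the global name is a convenient choice rather than a requirement; any unique name per DAG keeps every object reachable.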

This produces the expected DAGs in the Airflow dashboard.

Conclusion

We’ve seen how using Python’s built-in globals function can be useful when dynamically creating Airflow DAGs.

As always, thanks for reading. Now get back to your code! Your projects are missing you ;)

https://alexgalea.ca/

Python Data Engineer, MSc. Physics
