Airflow Dynamic DAGs — Python Globals
In this post, I introduce the concept of dynamic DAG creation and explain the significance of Python global variables for Airflow.
What do I mean by “dynamic DAG”?
Dynamic DAG creation is important for scalable data pipeline applications.
When confined to the realm of static DAG scripts, we find ourselves duplicating code in order to create pipelines.
This duplication is undesirable because it (usually) increases code-base complexity, making DAGs more difficult to update and more likely to develop bugs.
For example, updated DAGfile code must be copied across each replicated instance, while making sure to keep the intended diffs (e.g. params, custom logic) intact. In other words, a nightmare.
In order to dynamically create DAGs with Airflow, we need two things to happen:
- Run a function that instantiates an airflow.DAG object.
- Pass that object back to the global namespace of the DAGfile.
Sounds simple, and it is. But let’s see how it could go wrong.
Static DAG Example
Let’s imagine I have a pipeline that gets the current price of Bitcoin (BTC) and emails it to me:
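Here’s a minimal sketch of such a DAGfile, assuming Airflow 2.x-style imports; the CoinGecko price endpoint and the print-based “email” step are stand-ins for whatever price source and email logic you actually use:

```python
from datetime import datetime

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator


def get_btc_price(**context):
    # Illustrative only: fetch the current BTC price in USD.
    resp = requests.get(
        "https://api.coingecko.com/api/v3/simple/price",
        params={"ids": "bitcoin", "vs_currencies": "usd"},
    )
    resp.raise_for_status()
    return resp.json()["bitcoin"]["usd"]


def email_price(**context):
    # Stand-in for real email logic; pulls the price from the upstream task.
    price = context["ti"].xcom_pull(task_ids="get_btc_price")
    print(f"Emailing: BTC is currently ${price}")


dag = DAG(
    dag_id="email_btc_price",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)

get_price = PythonOperator(
    task_id="get_btc_price",
    python_callable=get_btc_price,
    dag=dag,
)

send_email = PythonOperator(
    task_id="email_price",
    python_callable=email_price,
    dag=dag,
)

get_price >> send_email
```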
There’s nothing wrong here. Not yet anyway.
It renders a single DAG, email_btc_price, in the Airflow dashboard.
Dynamic DAG Example
Now let’s imagine we wanted to get the price of some other cryptocurrencies as well; say, Ethereum (ETH), Litecoin (LTC) and Stellar (XLM).
We might try to accomplish that dynamically as follows:
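A sketch of that attempt, using the same Airflow 2.x-style imports as above and a placeholder print_message callable standing in for the real fetch-and-email tasks:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

COINS = ["btc", "eth", "ltc", "xlm"]


def print_message(message):
    # Placeholder for the real fetch-and-email logic.
    print(message)


for coin in COINS:
    # `dag` is a module-level name, so each DAG object appears to land in globals().
    dag = DAG(
        dag_id=f"email_{coin}_price",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    )

    get_price = PythonOperator(
        task_id=f"get_{coin}_price",
        python_callable=print_message,
        op_args=[f"getting the {coin.upper()} price"],
        dag=dag,
    )

    send_email = PythonOperator(
        task_id=f"email_{coin}_price",
        python_callable=print_message,
        op_args=[f"emailing the {coin.upper()} price"],
        dag=dag,
    )

    get_price >> send_email
```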
Seems reasonable, right? I iterate over the coins and dynamically create a DAG for each.
I’m even making sure the instantiated DAG object dag sits in the global namespace of the DAGfile, since it is assigned at module level inside the loop.
However, this will not work.
At least, not how we expect. It produces only one DAG, for XLM, the last element in the list.
We are missing the other DAGs: email_btc_price, email_eth_price and email_ltc_price.
Why it doesn’t work
In order to understand why the above code does not behave as we expect, we have to consider Airflow’s core concept of DAG scope.
In particular: “Airflow will load any DAG object it can import from a DAGfile. Critically, that means the DAG must appear in globals()”
In Python, globals is a built-in function that returns a dictionary of global variables. In addition to getting variables, it can be used to set them. E.g.
>>> globals()["my_name"] = "Alex"
>>> print(my_name)
Alex
So we can now see what’s happening: on each iteration of the loop (over the symbols BTC, ETH, LTC and XLM), the dag variable is rebound to a new DAG object.
Thus, all DAGs except the last lose their global variable reference and never get picked up by Airflow.
How to make it work
Once we know about this core concept of Airflow, the solution is trivial. All we need to do is maintain a separate global reference to each DAG created in the loop.
This can be accomplished as follows:
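Here’s a sketch of the fix, reusing the placeholder tasks from the previous example and wrapping DAG creation in a hypothetical create_dag helper so each iteration returns a fresh object that we register via globals():

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

COINS = ["btc", "eth", "ltc", "xlm"]


def print_message(message):
    # Placeholder for the real fetch-and-email logic.
    print(message)


def create_dag(coin):
    # Build and return a self-contained DAG for a single coin.
    dag = DAG(
        dag_id=f"email_{coin}_price",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    )

    get_price = PythonOperator(
        task_id=f"get_{coin}_price",
        python_callable=print_message,
        op_args=[f"getting the {coin.upper()} price"],
        dag=dag,
    )

    send_email = PythonOperator(
        task_id=f"email_{coin}_price",
        python_callable=print_message,
        op_args=[f"emailing the {coin.upper()} price"],
        dag=dag,
    )

    get_price >> send_email
    return dag


for coin in COINS:
    # Register each DAG under its own global name, so every object keeps a
    # reference that Airflow's DAG loader can discover.
    globals()[f"email_{coin}_price"] = create_dag(coin)
```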
Here I’m using globals() to add each DAG object to the module’s global namespace, under its own unique key, as the loop executes.
This produces all four expected DAGs in the Airflow dashboard.
Conclusion
We’ve seen how Python’s built-in globals function can be used to dynamically create Airflow DAGs.
As always, thanks for reading. Now get back to your code! Your projects are missing you ;)