In this post, I introduce the concept of dynamic DAG creation and explain the significance of Python global variables for Airflow.
What do I mean by “dynamic DAG”?
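By “dynamic DAG” I mean a DAG that is generated programmatically (for example, inside a loop) rather than written out by hand in its own static DAG script. Dynamic DAG creation is important for scalable data pipeline applications.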
When confined to the realm of static DAG scripts, we find ourselves duplicating code in order to create pipelines.
This duplication is undesirable because it usually increases code-base complexity, making DAGs more difficult to update and increasing the chances of bugs appearing.
For example, updated DAGfile code must be copied across each replicated instance, while making sure to keep the intended diffs (e.g. params, custom logic) intact. In other words, a nightmare.
In order to dynamically create DAGs with Airflow, we need two things to happen:
- Run a function that instantiates an `airflow.DAG` object.
- Pass that object back to the global namespace of the DAGfile.
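The two steps above can be sketched in plain Python. To keep the snippet runnable without an Airflow install, `DAG` here is a minimal stand-in for `airflow.DAG`, and `create_dag` and its task ids are hypothetical names chosen for illustration, not part of Airflow's API.

```python
class DAG:
    """Stand-in for airflow.DAG (assumption): stores only an id and task ids."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []


def create_dag(dag_id, task_ids):
    """Hypothetical factory. Step 1: instantiate and return a DAG object."""
    dag = DAG(dag_id)
    dag.task_ids.extend(task_ids)  # a real DAGfile would attach operators here
    return dag


# Step 2: bind the returned object to a name in the DAGfile's global namespace.
example_dag = create_dag("example_dag", ["extract", "load"])
```

In a real DAGfile the factory would build operators and set a schedule; the part that matters for Airflow is the final module-level assignment.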
Sounds simple, and it is. But let’s see how it could go wrong.
Static DAG example
Let’s imagine I have a pipeline that gets the current price of bitcoin (BTC) and emails it to me:
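As a rough outline of the shape of such a static DAGfile, the sketch below hard-codes everything for BTC. `DAG` is a minimal stand-in for `airflow.DAG` so the snippet runs without Airflow, and the task ids are hypothetical placeholders for the price-fetching and emailing operators.

```python
class DAG:
    """Stand-in for airflow.DAG (assumption): stores only an id and task ids."""

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.task_ids = []


# Everything below is hard-coded for BTC; another coin would mean copying it.
dag = DAG("get_btc_price")
dag.task_ids.append("fetch_btc_price")  # hypothetical: call a price API
dag.task_ids.append("email_btc_price")  # hypothetical: email the result
```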
There’s nothing wrong here. Not yet anyway.
It renders the following DAG:
Dynamic DAG example
Now let’s imagine we wanted to get the price of some other cryptocurrencies as well; say, Ethereum (ETH), Litecoin (LTC) and Stellar (XLM).
We might try to accomplish that dynamically as follows:
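A sketch of that loop, using a minimal stand-in `DAG` class (an assumption, in place of `airflow.DAG`) and a hypothetical `create_dag` factory rather than real Airflow operators:

```python
class DAG:
    """Stand-in for airflow.DAG (assumption): stores only an id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(symbol):
    """Hypothetical factory: one price-fetching DAG per coin symbol."""
    return DAG(f"get_{symbol.lower()}_price")


for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    # Each iteration rebinds the same global name `dag`.
    dag = create_dag(symbol)
```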
Seems reasonable, right? I iterate over the coins and dynamically create a DAG for each.
I’m even making sure to pass the instantiated DAG object `dag` back to the global namespace of the DAGfile.
However, this will not work.
At least, not how we expect. It produces only one DAG (for XLM, the last element in the list):
We are missing the other DAGs:
Why it doesn’t work
To understand why the above code does not behave as intended, we have to consider Airflow’s core concept of DAG scope.
In particular: “Airflow will load any DAG object it can import from a DAGfile. Critically, that means the DAG must appear in globals()”
`globals()` is a built-in function that returns the dictionary of the current module’s global variables. In addition to reading variables, it can be used to set them, e.g.:
```python
>>> globals()["my_name"] = "Alex"
```
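Because `globals()` returns the namespace dictionary of the module in which the code is defined, even when it is called inside a function, a helper can register names at module level. The `register` helper below is hypothetical, for illustration only:

```python
def register(name, value):
    """Hypothetical helper: bind `value` to a module-level name."""
    # Inside a function, globals() still returns the module's namespace dict,
    # not the function's local variables.
    globals()[name] = value


register("my_name", "Alex")
print(my_name)  # prints: Alex
```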
So now we can see what’s happening: on each iteration of the loop (over the symbols BTC, ETH, LTC and XLM), the `dag` variable is rebound to a new DAG object.
Thus, by the time the DAGfile finishes executing, all DAGs except the last have lost their global variable reference.
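To see the effect, the sketch below roughly imitates how Airflow collects DAGs: it executes a DAGfile's source in a fresh namespace and keeps every value that is a `DAG` instance. This is a simplification of what Airflow's `DagBag` does when it imports a DAGfile; `DAG` (a stand-in for `airflow.DAG`), `collect_dags` and the DAGfile source are assumptions for illustration.

```python
class DAG:
    """Stand-in for airflow.DAG (assumption): stores only an id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


# Source of a hypothetical DAGfile using the naive loop.
NAIVE_DAGFILE = """
for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    dag = DAG(f"get_{symbol.lower()}_price")
"""


def collect_dags(source):
    """Run `source` as a module and return the ids of DAGs left in its globals."""
    namespace = {"DAG": DAG}  # make the stand-in class visible to the DAGfile
    exec(source, namespace)
    return sorted(v.dag_id for v in namespace.values() if isinstance(v, DAG))


print(collect_dags(NAIVE_DAGFILE))  # prints: ['get_xlm_price']
```

Only the object still bound to `dag` survives; the other three DAGs were created but are no longer reachable from the module's globals.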
How to make it work
Knowing about this core concept of Airflow, the solution is trivial: all we need to do is keep a separate global reference to each DAG created in the loop.
This can be accomplished as follows:
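A minimal sketch of that approach, assuming a stand-in `DAG` class in place of `airflow.DAG` and a hypothetical `create_dag` factory. The key line is the `globals()` assignment, which gives every DAG its own global name (here, its `dag_id`):

```python
class DAG:
    """Stand-in for airflow.DAG (assumption): stores only an id."""

    def __init__(self, dag_id):
        self.dag_id = dag_id


def create_dag(symbol):
    """Hypothetical factory: one price-fetching DAG per coin symbol."""
    return DAG(f"get_{symbol.lower()}_price")


for symbol in ["BTC", "ETH", "LTC", "XLM"]:
    dag = create_dag(symbol)
    # Keep a distinct global reference per DAG so none is lost on rebinding.
    globals()[dag.dag_id] = dag
```

Using `dag_id` as the global name is just one convenient choice; any name that is unique per DAG should work, since the requirement is only that each DAG object appears in the module's globals.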
Here I’m using `globals()` to add each DAG object to the global namespace as my loop executes.
This produces the expected DAGs in the Airflow dashboard.
We’ve seen how Python’s built-in `globals()` function can be useful when dynamically creating Airflow DAGs.
As always, thanks for reading. Now get back to your code! Your projects are missing you ;)