You can run specific Apache Airflow tasks on a Dask cluster to leverage additional computing resources in the cloud whenever you need them. This post gives you 3 Apache Airflow examples in Python to get you started. We will walk you through 1 of them in detail.
Jump straight into the code with these Python scripts.
Apache Airflow, developed under the Apache Software Foundation, is one of the most popular tools for orchestrating data engineering, machine learning, and DevOps workflows. Out of the box, Airflow runs your computations locally, which means you can only process datasets that fit within the memory resources of your machine.
To use an Apache Airflow DAG (directed acyclic graph) for data pipelines with larger-than-memory datasets, you can scale out the specific Airflow tasks containing heavy data science workloads to a Dask cluster using Coiled. And since Coiled clusters are charged by the second, you'll only be paying for the extra resources when you're actually using them.
Airflow DAG examples can be hard to find. This blog will show you how to construct your own Apache Airflow DAG for larger-than-memory datasets with only minimal changes to your existing Python code.
We'll walk through one example in detail. You can find the other Airflow DAG examples in this dedicated repository.
Airflow workflows are defined using Tasks and DAGs, which are orchestrated by Executors. DAG stands for Directed Acyclic Graph and each DAG is a Python script that defines your Airflow workflow.
To delegate heavy workflows to Dask, we'll spin up a Coiled cluster within a Task that contains heavy computation and bring back the result, in this case a .value_counts() over a column of interest. Since this result will fit into memory easily, we can shut down the cluster immediately to limit cost and continue using the result in our next tasks locally.
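The DAG will contain the following 3 Tasks: one that spins up a Coiled cluster and runs the heavy computation over the full dataset, one that generates summary statistics over the result, and one that fetches the top 100 most active users.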
Let's start by defining an Airflow DAG using the @dag decorator, passing it the default_args defined earlier in the script as well as a number of other arguments you can tweak.
Let's define our first Task.
This spins up a Coiled cluster named "airflow-task" consisting of 20 Dask workers, each running a specified Coiled software environment to ensure that they have all the right dependencies.
We can then read our dataset stored in an Amazon S3 bucket into a Dask DataFrame and calculate the result we're interested in. Here, we load the GitHub Archive data for 2015 (subsetted to include only PushEvents) and calculate the number of PushEvents per user for the entire year using a call to .value_counts(). Dask also supports reading from Google Cloud, Microsoft Azure, and other cloud platforms.
Since we now have our result locally, we can shut the cluster down to limit our costs. This is really just a formality because Coiled will shut down your cluster automatically after 20 minutes of inactivity.
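As a rough sketch of what the body of this first Task could look like, the function below starts the cluster, runs the computation, and shuts the cluster down again. The S3 path, the Parquet format, the software environment name, and the "user" column are placeholders and assumptions, not the values used in the original script; the function name count_push_events is hypothetical too.

```python
def count_push_events(
    path: str = "s3://<your-bucket>/github-archive-2015.parquet",  # placeholder path
    software: str = "<your-account>/<your-environment>",  # placeholder Coiled environment
    column: str = "user",  # assumed name of the column of interest
):
    """Count PushEvents per user on a temporary Coiled cluster."""
    # Imported lazily so Airflow can parse the DAG file without loading Dask.
    import coiled
    import dask.dataframe as dd
    from distributed import Client

    cluster = coiled.Cluster(name="airflow-task", n_workers=20, software=software)
    client = Client(cluster)
    try:
        ddf = dd.read_parquet(path)
        # .compute() brings the (small) result back as a pandas Series.
        result = ddf[column].value_counts().compute()
    finally:
        # Shut down as soon as the result is local to stop paying for workers.
        client.close()
        cluster.close()
    return result
```

Wrapping the shutdown in try/finally is a design choice of this sketch: it makes sure the cluster is closed even if the computation fails.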
Going over to the Coiled Cloud dashboard we can see that the computation performed during this Task cost us 5 cents. And no, that’s not a typo ;) That means you could execute this DAG run up to 200 times a month for free using the Coiled free tier.
We've leveraged cloud resources to get the data we're interested in and now we can proceed with our following Tasks locally. Because the Airflow tasks themselves run on your own machine and only the Dask computation runs on the Coiled cluster, reading and writing to local disk is straightforward.
The next Task will generate summary statistics over the result pandas Series and save those to a CSV file:
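A minimal sketch of that step, assuming the result arrives as a pandas Series of counts indexed by username. The function name summarize_counts and the default file name are hypothetical.

```python
import pandas as pd


def summarize_counts(
    counts: pd.Series,
    path: str = "push_event_summary.csv",  # hypothetical output file
) -> pd.Series:
    """Compute summary statistics for the per-user counts and save them to CSV."""
    # describe() returns count, mean, std, min, quartiles, and max.
    summary = counts.describe()
    summary.to_csv(path)
    return summary
```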
And the final Task will fetch the usernames and number of PushEvents for the top 100 most active users:
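One way to sketch this, again assuming a pandas Series of counts indexed by username. The function name top_users and the output column names are hypothetical.

```python
import pandas as pd


def top_users(counts: pd.Series, n: int = 100) -> pd.DataFrame:
    """Return the n most active users with their PushEvent counts."""
    # nlargest() sorts in descending order and keeps the first n entries.
    top = counts.nlargest(n)
    # Turn the username index into a regular column next to the counts.
    return top.rename_axis("user").reset_index(name="push_events")
```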
Finally, we'll specify the order in which we want each Airflow Task to run and instantiate the DAG by calling the DAG function airflow_on_coiled, so that Airflow can discover it.
You're all set! You can now add your DAG file to your dags folder (by default: ~/airflow/dags) and execute each DAG run as needed using the Airflow UI. Read the Airflow documentation to see how you can set a schedule interval, tweak Airflow scheduler settings, and customise each DAG run.
Important: Airflow disables pickling by default. You will have to enable it in order to run Dask tasks. You can do this by editing your airflow.cfg file or by setting the corresponding environment variable using export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True (note that the shell does not allow spaces around the equals sign). Do this before launching your Airflow webserver.
If you’re working on an Apple M1 machine you may want to check out this blog on installing PyData libraries using conda. Specifically, make sure that neither blosc nor python-blosc libraries are installed on your local and cluster software environments.
In our dedicated airflow-with-coiled repository, you will find two more Airflow DAG examples using Dask. The examples include common ETL pipeline operations.
The Airflow DAG examples in the repo above launch Coiled clusters from within an Airflow task. You can also opt for a different architecture and run all of the tasks in an Airflow DAG on a Coiled cluster. You can then use Coiled’s adaptive scaling capabilities to scale the number of workers up and down depending on the workload.
To do this, switch from using Airflow’s default SequentialExecutor to the DaskExecutor. Using any Airflow executor other than the default SequentialExecutor also requires setting up a dedicated database backend where Airflow can store the metadata related to your workflow. Once that’s done, point the DaskExecutor to a Coiled cluster that is already running.
You can do this by making the following changes in your airflow.cfg file, by default stored in ~/airflow/.
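As a rough illustration, the snippet below renders the two settings involved. It assumes an Airflow 2.x release in which the DaskExecutor reads its scheduler address from the cluster_address key of the [dask] section. The helper name render_airflow_overrides and the placeholder address are hypothetical, and the database backend settings are not shown.

```python
import configparser
import io


def render_airflow_overrides(scheduler_address: str) -> str:
    """Render the airflow.cfg entries that point the DaskExecutor at a cluster."""
    cfg = configparser.ConfigParser()
    cfg["core"] = {"executor": "DaskExecutor"}
    cfg["dask"] = {"cluster_address": scheduler_address}
    buffer = io.StringIO()
    cfg.write(buffer)
    return buffer.getvalue()


# Replace the placeholder with the address of your running Coiled scheduler.
print(render_airflow_overrides("<scheduler-address>"))
```

The printed sections can be merged into the existing airflow.cfg by hand.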
You can then run the entire workflow on Coiled.
Including a cluster.adapt(minimum=1, maximum=20) in the script that spins up your Coiled cluster will ensure that the cluster adaptively scales between a set minimum and maximum number of workers (in this case between 1 and 20) depending on the workload.
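A minimal sketch of such a script, assuming the coiled package is installed and you are logged in to a Coiled account. The function name start_adaptive_cluster and the cluster name are hypothetical.

```python
def start_adaptive_cluster(
    name: str = "airflow-executor",  # hypothetical cluster name
    minimum: int = 1,
    maximum: int = 20,
) -> str:
    """Start a Coiled cluster that scales with the workload; return its scheduler address."""
    # Imported lazily: requires the coiled package and a Coiled account.
    import coiled

    cluster = coiled.Cluster(name=name, n_workers=minimum)
    # Scale between `minimum` and `maximum` workers depending on the workload.
    cluster.adapt(minimum=minimum, maximum=maximum)
    return cluster.scheduler_address
```

The returned address is what the DaskExecutor needs to be pointed at. The cluster keeps running after the function returns, so remember to shut it down once your workflows are finished.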
Note that this architecture means that your cluster is managed outside of your Airflow DAG data pipeline; i.e. either manually or through a deployment tool. We are currently exploring upgrading the DaskExecutor to improve performance and expand its capabilities, including cluster lifecycle management.
Thanks for reading! If you’re interested in trying out Coiled, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can get started for free today.