Prefect is a popular open-source Python library for automated workflow orchestration. Out of the box, orchestration of your workflow is handled in Prefect Cloud, while the actual computation of your code runs locally on your machine. This means your Prefect Cloud workflows are limited by the resources of that machine.
This blog post will show you how to scale your Prefect workflows with Dask, both on your local machine and on a cluster in the cloud with Coiled. You can also jump straight into the code with this Prefect Dask Python script.
Prefect is an open-source Python library for automated data workflow orchestration that provides native Dask integration. A Prefect workflow consists of Flow and Tasks, which are orchestrated by an Executor. Prefect’s default Executor runs tasks sequentially. This is fine for simple workflows but means your computations may be running slower than they need to because they are underutilising available resources.
As a data scientist or engineer, you’ll want to consider optimising performance by switching to the DaskExecutor. This leverages the multiple cores of your local machine, speeding up compute-heavy tasks like joins, shuffles, and machine learning jobs.
```python
# create a temporary local Dask Executor
from prefect.executors import DaskExecutor

executor = DaskExecutor()
```
When you hit the memory limits of your machine, you can connect the DaskExecutor to cloud-computing resources to distribute work across multiple machines. One way to do so is by spinning up a Coiled Cluster and running your Prefect Flows there.
There are three different approaches you can take to set this up. You can:
- run your entire Prefect data workflow on Coiled by using a Coiled cluster as your DaskExecutor,
- run only specific Prefect tasks on Coiled by spinning up a Coiled cluster within specific compute-heavy tasks,
- run specific Prefect tasks on Coiled only when the size of the dataset passes a certain threshold, by using a Coiled cluster within a Prefect ResourceManager object.
The links to the Coiled documentation above include examples of the first two methods; the section below will show you how to code a Prefect script that delegates computations to a Dask cluster when the size of your dataset passes a certain threshold.
The code example below converts a GitHub Archive dataset from JSON to Parquet and writes it to cloud storage, leveraging the compute resources of Coiled whenever the dataset becomes too large to handle locally. This means you can convert the entire 75GB dataset in the cloud without ever having to download it to your machine. You can find the complete Python script here.
The ETL workflow will look something like this:
Let’s begin by defining the tasks we want to run. If you’re comfortable defining a Prefect Task, feel free to scroll down to the next section to connect your Prefect workflow to Coiled.
Let’s define the tasks in the order they will run. We’ll start with the Prefect Task that creates the list of filenames we want to fetch from the GitHub Archive. For more context on how we’re building this code, have a look at this Jupyter Notebook.
```python
import datetime

from prefect import task

@task
def create_list(start_date, end_date, format="%d-%m-%Y"):
    start = datetime.datetime.strptime(start_date, format)
    end = datetime.datetime.strptime(end_date, format)
    date_generated = [
        start + datetime.timedelta(days=x) for x in range(0, (end - start).days)
    ]
    prefix = "https://data.gharchive.org/"
    filenames = []
    for date in date_generated:
        for hour in range(1, 24):
            filenames.append(prefix + date.strftime("%Y-%m-%d") + "-" + str(hour) + ".json.gz")
    return filenames
```
Next, let’s define the Task that will determine the type of cluster our Flow will spin up. We’ll use len(filenames) as a proxy for the dataset size; you could also use other methods to estimate the size of the data in memory.

```python
@task
def determine_cluster_type(filenames):
    # use the number of files as a rough proxy for dataset size
    if len(filenames) > 100:
        return "coiled"
    return "local"
```
We’ll also need a Task that will fetch the data as specified in the filenames list...
```python
import dask.bag as db
import ujson

@task
def get_github_data(filenames):
    # lazily read the compressed JSON records and keep only PushEvents
    records = db.read_text(filenames).map(ujson.loads)
    push_events = records.filter(lambda record: record["type"] == "PushEvent")
    return push_events
```
...a Task to transform the raw JSON data into tabular DataFrame format...
```python
@task
def to_dataframe(push_events):
    def process(record):
        # the exact fields extracted here are illustrative
        try:
            for commit in record["payload"]["commits"]:
                yield {
                    "user": commit["author"]["name"],
                    "message": commit["message"],
                    "date": record["created_at"],
                }
        except KeyError:
            pass

    processed = push_events.map(process)
    df = processed.flatten().to_dataframe()
    return df
```
...and a Task to write the flattened DataFrame to our Amazon S3 bucket as a Parquet file.
```python
@task
def to_parquet(df, path):
    # write the flattened DataFrame out as Parquet
    df.to_parquet(path, engine="pyarrow")
```
Note: when running Dask commands within Prefect Tasks, you’ll need to ensure that you have fewer active Prefect tasks than n_workers * n_threads, or wrap your compute calls in a with worker_client() context manager. This avoids Prefect tasks deadlocking while they wait on work that can’t start because no threads are free to run it.
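A minimal sketch of the worker_client pattern outside Prefect, using plain Dask with a local threaded cluster: a function running on a worker secedes from the worker’s thread pool while it waits on subtasks, so those subtasks always have a thread to run on.

```python
from dask.distributed import Client, worker_client

def summed_squares(xs):
    # runs on a Dask worker; worker_client() secedes from the worker's
    # thread pool while waiting on subtasks, avoiding deadlock
    with worker_client() as client:
        futures = client.map(lambda x: x * x, xs)
        return sum(client.gather(futures))

with Client(processes=False) as client:
    result = client.submit(summed_squares, [1, 2, 3]).result()
    print(result)  # 14
```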
Fantastic, we’ve defined all of the Tasks in our Flow. The next step is to define the two cluster types our Flow can use: local for when the dataset is small enough to process locally and coiled otherwise.
When using temporary cloud compute resources, you need to make sure that these are properly instantiated, used, and cleaned up in order to avoid errors and unnecessary costs. We’ll use the Prefect ResourceManager object to do this by defining the required __init__, setup, and cleanup blocks. Make sure to include any keyword arguments you’ll want to pass to the cluster in the __init__ definition. The setup block creates the Dask cluster, including the Dask scheduler, and connects it to your local client.
```python
# Define a ResourceManager object
import coiled
from dask.distributed import Client
from prefect import resource_manager

@resource_manager
class DaskCluster:
    def __init__(self, cluster_type="local", n_workers=None, software=None, account=None, name=None):
        self.cluster_type = cluster_type
        self.n_workers = n_workers
        self.software = software
        self.account = account
        self.name = name

    def setup(self):
        # create the Dask cluster (and scheduler) and connect a client to it
        if self.cluster_type == "local":
            return Client(processes=False)
        elif self.cluster_type == "coiled":
            cluster = coiled.Cluster(
                name=self.name,
                software=self.software,
                n_workers=self.n_workers,
                account=self.account,
            )
            return Client(cluster)

    def cleanup(self, client):
        # shut down the client and, for Coiled, the cluster itself
        client.close()
        if self.cluster_type == "coiled":
            client.cluster.close()
```
Now that you’ve defined your Tasks and ResourceManager, the next step is to tell Prefect how these tasks relate to each other and how you’ll need them to be run. We’ll also define a number of Parameters that can be tweaked per Flow run by the user.
```python
# Build Prefect Flow
from prefect import Flow, Parameter

with Flow(name="Github ETL Test") as flow:
    # define parameters
    n_workers = Parameter("n_workers", default=4)
    software = Parameter("software", default="coiled-examples/prefect")
    account = Parameter("account", default=None)
    name = Parameter("name", default="cluster-name")
    start_date = Parameter("start_date", default="01-01-2015")
    end_date = Parameter("end_date", default="31-12-2015")

    # build flow
    filenames = create_list(start_date=start_date, end_date=end_date)
    cluster_type = determine_cluster_type(filenames)

    # use ResourceManager object
    with DaskCluster(
        cluster_type=cluster_type,
        n_workers=n_workers,
        software=software,
        account=account,
        name=name,
    ) as client:
        push_events = get_github_data(filenames)
        df = to_dataframe(push_events)
        to_parquet(df, path="s3://my-bucket/github-archive/")  # replace with your own S3 path
```
Great job! You’re now ready to run your Prefect Flow. Go ahead and experiment with various values for the end_date parameter to see the conditional Coiled cluster spin-up in action: setting end_date to anything after “06-01-2015” will delegate your computations to a cluster.
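As a quick sanity check on that threshold, mirroring the logic of the tasks above: each day contributes 23 hourly files (the task loops over range(1, 24)), so the file count crosses 100 once the date range covers five or more days.

```python
import datetime

def n_files(start_date, end_date, fmt="%d-%m-%Y"):
    # one GH Archive file per hour (1-23) for each day in [start, end),
    # mirroring create_list above
    start = datetime.datetime.strptime(start_date, fmt)
    end = datetime.datetime.strptime(end_date, fmt)
    return (end - start).days * 23

print(n_files("01-01-2015", "05-01-2015"))  # 92 files  -> local cluster
print(n_files("01-01-2015", "06-01-2015"))  # 115 files -> exceeds 100, Coiled cluster
```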
You can also customize your Coiled cluster by passing alternative Parameter values to flow.run(), such as n_workers and cluster name. If you want to tweak other features of your Dask cluster such as Dask worker and Dask scheduler memory or the idle timeout, you’ll have to include the corresponding keyword arguments in the __init__ block of DaskCluster.
```python
# Run flow with parameters (values here are illustrative)
flow.run(
    parameters=dict(
        end_date="02-02-2015",
        n_workers=15,
        name="prefect-dask-etl",
    )
)
```
Prefect Cloud is built as a hybrid model in which only the metadata about your data workflow is communicated to Prefect. The actual code you’re running never leaves your machine. To match these security standards, Coiled provides end-to-end network security by the use of both cloud networking policies and SSL/TLS encryption. For additional control, Coiled can be deployed within your own cloud account where you can specify and manage data access controls directly. You can refer to our documentation on Security & Privacy for more information.
In this post, we discussed when and how to use Dask and Coiled to leverage parallelism in your Prefect workflows, either locally or in the cloud. We then built a Prefect data engineering workflow that delegates computation to a Coiled cluster whenever the size of the dataset exceeds a certain threshold.
We’d love to see any Prefect Dask workflows you've built! Drop us a line at firstname.lastname@example.org or in our Coiled Community Slack channel to get in touch.
Thanks for reading! If you’re interested in taking Coiled Cloud for a spin, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.