Python XGBoost: Parallel XGBoost For Faster Training with Dask in Python

Python XGBoost: Train XGBoost on 100GB in 4 Minutes

This post demonstrates how you can leverage the Dask XGBoost integration for faster machine learning in parallel, with only minor changes to your existing XGBoost Python code.

You will learn how to:

  1. Train a distributed XGBoost model locally with Dask on a subset of the data,
  2. Scale your distributed XGBoost training to the cloud with Coiled to train on all 100GB of data,
  3. Speed up your distributed training with pro tips from the Dask core team.

Here is the Python XGBoost example we will use if you want to jump right in. The code demonstrates training an XGBoost model on 100GB of training data in less than 4 minutes. This data is stored in a public bucket and you can use Coiled for free if you’d like to run these computations yourself.

Python XGBoost: Parallel Dask XGBoost Model Training with xgb.dask.train()

By default, XGBoost trains your model on a single machine, which has to hold the full training set in memory. This is fine for basic projects, but as your dataset and/or XGBoost model grows, you'll want to consider running XGBoost in distributed mode with Dask to speed up computations and reduce the burden on your local machine.

You'll know you've hit the memory limits of your machine when you get the following error message:

```
xgboost.core.XGBoostError: out of memory
```
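A quick back-of-the-envelope estimate can tell you whether you are likely to hit this error before you start training. A dense float64 feature matrix takes 8 bytes per value, so its raw footprint is rows × columns × 8 bytes, before any additional working memory XGBoost needs while it trains. The sketch below uses hypothetical helpers (not part of XGBoost), and the row and column counts in the example are made up rather than taken from this post's dataset.

```python
# Rough in-memory size of a dense numeric feature matrix.
# Assumption: every value is stored uncompressed as float64 (8 bytes).

BYTES_PER_GIB = 2**30


def dense_size_gib(n_rows: int, n_cols: int, bytes_per_value: int = 8) -> float:
    """Approximate size, in GiB, of a dense n_rows x n_cols matrix."""
    return n_rows * n_cols * bytes_per_value / BYTES_PER_GIB


def fits_in_memory(n_rows: int, n_cols: int, available_gib: float) -> bool:
    """True if the raw matrix alone fits in the given amount of memory."""
    return dense_size_gib(n_rows, n_cols) <= available_gib


# Hypothetical example: 100 million rows x 50 float64 columns on a 16 GiB laptop
size = dense_size_gib(100_000_000, 50)
print(f"{size:.1f} GiB")                                        # 37.3 GiB
print(fits_in_memory(100_000_000, 50, available_gib=16))         # False
```

The estimate is a lower bound: it ignores the index, any string columns, and the intermediate copies made while loading and training.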

XGBoost comes with a native Dask integration that makes it possible to train a single model in parallel across multiple Dask workers. Running XGBoost with the distributed Dask backend only requires two changes to your regular XGBoost code:

  1. substitute dtrain = xgb.DMatrix(X_train, y_train)
    with dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
  2. substitute xgb.train(params, dtrain, ...)
    with xgb.dask.train(client, params, dtrain, ...)

Let's see this in action with a hands-on data science project.

We'll be working with a synthetic 100GB dataset stored in a public Amazon S3 bucket which we'll load into a Dask DataFrame. Using your Coiled Cloud subscription, you can run the entire Python XGBoost example in this notebook yourself. You will need to create a local environment with the dependencies listed in the accompanying .yml file.

Python XGBoost: Run the XGBoost Dask integration locally

First, start a local Dask cluster (a scheduler plus worker processes on your machine) and connect a client to it. The scheduler will orchestrate training your model in parallel.

```python
from dask.distributed import Client, LocalCluster

# local Dask cluster with 4 worker processes
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client
```

Since this is synthetic data, we won't be doing any preprocessing. For an example with real-world data that does include preprocessing, check out this notebook that trains XGBoost using a Dask DataFrame containing a 20GB subset of the ARCOS dataset. When you're done preprocessing, you can create your train and test splits using the dask-ml library.

In this section, we are working with data_local, a subset of the entire dataset containing only the first 50 partitions. Take a look at the notebook if you want to know how to create the data_local subset.

```python
from dask_ml.model_selection import train_test_split

# Create the train-test split: every column except the last is a feature,
# and the "target" column is the label
X, y = data_local.iloc[:, :-1], data_local["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)
```

Now you're all set to train your XGBoost model.

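We'll keep the training parameters close to XGBoost's defaults for this example, setting only the objective and the tree method.

```python
import xgboost as xgb

# Training parameters. Assumption: the post doesn't list them; the dataset
# name (synth-reg) suggests a continuous target, so we use a regression
# objective with the histogram tree method and leave everything else at
# XGBoost's defaults.
params = {"objective": "reg:squarederror", "tree_method": "hist"}

# Create the XGBoost DaskDMatrices (the client is the first argument)
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# train the model; output is a dict with the trained "booster" and the eval "history"
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, "train")],
)
```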

You can then use your trained model together with your testing split to make predictions.

```python
# make predictions on the test split (returns a lazy Dask Array)
y_pred = xgb.dask.predict(client, output, dtest)
```

As you can see, not much has changed from your regular XGBoost code. We've used xgb.dask.DaskDMatrix instead of xgb.DMatrix and called xgb.dask.train() instead of xgb.train(), passing the Dask client to both. Note that the input to DaskDMatrix does not have to be a Dask DataFrame; it can also be a Dask Array.

If you were to train XGBoost on the entire Dask DataFrame data, however, Dask would throw a MemoryError because you are still running locally:

```
MemoryError

distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker.  Process memory: 1.49 GiB -- Worker memory limit: 1.86 GiB
```
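The numbers in that warning follow from Dask's defaults. With memory_limit="auto", LocalCluster gives each worker a share of system memory proportional to its threads, which works out to an even split when every worker has the same number of threads, and a worker pauses once its process memory reaches 80% of its limit (the distributed.worker.memory.pause setting). The sketch below reproduces the arithmetic with hypothetical helpers; the 7.45 GiB machine is an assumption picked to match the 1.86 GiB limit in the log, not a measured value.

```python
# Reproduce the memory numbers in the Dask worker warning above.
# Assumptions: LocalCluster's memory_limit="auto" splits system memory evenly
# across identical workers, and the pause threshold is the default 0.80.

PAUSE_FRACTION = 0.80  # default value of distributed.worker.memory.pause


def per_worker_limit_gib(total_memory_gib: float, n_workers: int) -> float:
    """Memory limit of each worker when system memory is split evenly."""
    return total_memory_gib / n_workers


def pause_threshold_gib(worker_limit_gib: float, pause_fraction: float = PAUSE_FRACTION) -> float:
    """Process memory at which a worker pauses and stops running new tasks."""
    return worker_limit_gib * pause_fraction


# Hypothetical machine with ~7.45 GiB of RAM and the 4 workers used earlier
limit = per_worker_limit_gib(7.45, 4)
print(f"limit {limit:.2f} GiB, pauses at {pause_threshold_gib(limit):.2f} GiB")
# limit 1.86 GiB, pauses at 1.49 GiB
```

Adding local workers doesn't help here: each one just gets a smaller slice of the same RAM, which is why the next step moves to the cloud.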

Python XGBoost: Scaling your Dask XGBoost Training to the Cloud

Let's now process the entire 100 GB dataset by scaling XGBoost to the cloud.

You’ll need to make 2 changes to the Python code we wrote above:

  1. Connect Dask to a Coiled cluster in the cloud instead of to our local CPU cores,
  2. Work with the entire 100 GB dataset instead of the local subset.

Let's jump in.

  1. Spin up your cluster

We'll start by launching a Coiled cluster in the cloud that can run your pipeline on the entire dataset. To run the Python code in this section, you'll need a Coiled Cloud account.

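```python
import coiled

# launch a cloud cluster of 50 workers with 16 GiB of memory each
cluster = coiled.Cluster(
    name="xgboost",
    software="coiled-examples/xgboost",
    n_workers=50,
    worker_memory="16GiB",
    shutdown_on_close=False,  # keep the cluster up when this Python session closes
)
```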

Note: if you're using the Coiled Free Tier, decrease the n_workers argument to 24. The Free Tier allows a maximum of 100 concurrent cores, each Dask worker uses 4 cores by default, and the Dask scheduler needs capacity of its own: 24 workers × 4 cores = 96 cores leaves room for the scheduler, while 25 workers would already use all 100.
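If you want to double-check the worker count for a different core quota, the arithmetic is easy to spell out. The helpers below are hypothetical and assume 4 cores per worker, as described above; because the note doesn't pin down how many cores the scheduler takes, that is a parameter, and the answer for a 100-core quota is 24 either way. The second helper totals the resources of the 50-worker, 16 GiB cluster defined above.

```python
# Largest number of workers that fits under a core quota, plus total cluster size.
# Assumptions: 4 cores per worker (the default mentioned above) and a
# configurable number of cores reserved for the scheduler.


def max_workers(core_quota: int, cores_per_worker: int = 4, scheduler_cores: int = 4) -> int:
    """Return how many workers fit alongside the scheduler within core_quota."""
    available = core_quota - scheduler_cores
    if available < 0:
        return 0
    return available // cores_per_worker


def cluster_capacity(n_workers: int, cores_per_worker: int = 4,
                     memory_per_worker_gib: int = 16) -> tuple[int, int]:
    """Total (cores, GiB of memory) across identical workers."""
    return n_workers * cores_per_worker, n_workers * memory_per_worker_gib


print(max_workers(100))                     # 24
print(max_workers(100, scheduler_cores=1))  # 24
print(cluster_capacity(50))                 # (200, 800)
```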

Finally, connect a Dask client to your Coiled cluster so that computations run there instead of on your local machine.

```python
# connect Dask to your Coiled cluster
from dask.distributed import Client

client = Client(cluster)
client
```

  2. Use the entire dataset

For this section, be sure to use the whole Dask DataFrame data containing 2750 partitions, rather than the data_local subset containing only the first 50 partitions we used above. 

```python
import dask.dataframe as dd

# lazily load the full dataset from the public S3 bucket
data = dd.read_parquet(
    "s3://coiled-datasets/synthetic-data/synth-reg-104GB.parquet/",
    compression="lz4",
    storage_options={"anon": True, "use_ssl": True},
)
```

  3. Train XGBoost in the cloud

You can now re-run the same code from above: the train-test split and the DaskDMatrix creation, this time on data instead of data_local, followed by training. All computations will run on the machines in your Coiled cluster rather than on your single local machine. With 50 workers of 4 cores and 16 GiB each, that's 200 cores and 800 GiB of memory at your disposal.

```python
%%time
# train the model on the full dataset
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=5,
    evals=[(dtrain, "train")],
)
```

```
CPU times: user 17.5 s, sys: 3.43 s, total: 20.9 s
Wall time: 3min 24s
```

Coiled Cloud also hosts the Dask dashboard, so you can follow your computations while they run. Read more about the Dask dashboard in the Pro tips below.

  4. Shut down your cluster

After training is done, we can save the model and close down the cluster, releasing its resources. If we forget to do so, Coiled automatically shuts down clusters after 20 minutes of inactivity to help avoid unnecessary costs.

```python
# shut down the cluster
client.cluster.close()
```

Python XGBoost: Pro Tips to speed up XGBoost training

We've collected a set of pro tips straight from the Dask core team to help you speed up your XGBoost training and get immediate value for your data science workflows:

  • Increase the number of Dask workers in your cluster. More workers give you more cores and memory to spread the computation over.
  • Re-cast columns to less memory-intensive dtypes. For example, downcast float64 to float32, or convert whole-number columns to int16, whenever the values allow it. This will reduce the memory load of your DataFrame and can speed up training.
  • Use the Dask Dashboard to spot bottlenecks and identify opportunities for increased performance in your code. Watch the original author of Dask, Matt Rocklin, explain how to get the most out of the Dask Dashboard here.
  • Pay attention to unmanaged memory. Read Dask core contributor Guido Imperiale’s blog on how to tackle the issue of unmanaged memory in Dask workers here.
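As a small illustration of the dtype tip, the sketch below uses pandas; the same astype call with a column-to-dtype mapping also works on a Dask DataFrame. The column names and values are hypothetical. Check the values before downcasting: a float column that only holds small whole numbers can safely become int16, while genuinely fractional values would be truncated.

```python
import numpy as np
import pandas as pd

# Hypothetical frame: one fractional column and one column of small whole numbers stored as float64.
df = pd.DataFrame({
    "price": np.linspace(0.0, 1.0, 1_000),
    "count": np.arange(1_000, dtype="float64") % 50,
})

before = df.memory_usage(deep=True).sum()

# Only downcast "count" to int16 if every value is a whole number within int16's range.
info = np.iinfo(np.int16)
counts = df["count"]
if not ((counts == counts.round()).all() and counts.between(info.min, info.max).all()):
    raise ValueError("'count' cannot be safely converted to int16")

slim = df.astype({"price": "float32", "count": "int16"})
after = slim.memory_usage(deep=True).sum()

print(slim.dtypes.to_dict())
print(f"{before} bytes -> {after} bytes")
```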

Python XGBoost: Summary

Let's recap what we've discussed in this post:

  • Out of the box, XGBoost cannot be trained on large datasets that exceed your local memory.
  • You can leverage the multiple cores in your single machine by connecting XGBoost to a local Dask cluster.
  • You can burst to the cloud for even higher performance gains with Coiled.
  • You can tweak your distributed XGBoost performance by inspecting the Dask Dashboard.

We'd love to see you apply distributed training of your XGBoost model to a dataset that's meaningful to you. If you'd like to try, swap your dataset into this Python XGBoost example notebook and see how well it does! Let us know how you get on in our Coiled Community Slack channel!

Thanks for reading! And if you’re a data scientist interested in trying out Coiled, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.

Level up your Dask using Coiled

Coiled makes it easy to scale Dask maturely in the cloud