This post demonstrates how you can leverage the Dask XGBoost integration for faster machine learning in parallel, with only minor changes to your existing XGBoost Python code.
You will learn how to:
- Train an XGBoost model in parallel using Dask's native XGBoost integration
- Recognize when you've outgrown your local machine, and scale your training out to a cluster in the cloud with Coiled
Here is the Python XGBoost example we will use if you want to jump right in. The code demonstrates training an XGBoost model on 100GB of training data in less than 4 minutes. This data is stored in a public bucket and you can use Coiled for free if you’d like to run these computations yourself.
By default, XGBoost trains your model on a single machine. This is fine for basic projects, but as your dataset and/or XGBoost model grow, you'll want to consider running XGBoost in distributed mode with Dask to speed up computations and reduce the burden on your local machine.
You'll know when you've hit the memory limits of your machine when you get the following error message:
xgboost.core.XGBoostError: out of memory
XGBoost comes with a native Dask integration that makes it possible to train a model on many machines in parallel. Running an XGBoost model with the distributed Dask backend only requires two changes to your regular XGBoost code:
- replace xgb.DMatrix with xgb.dask.DaskDMatrix
- replace xgb.train() with xgb.dask.train()
Let's see this in action with a hands-on data science project.
We'll be working with a synthetic 100GB dataset stored in a public Amazon S3 bucket which we'll load into a Dask DataFrame. Using your Coiled Cloud subscription, you can run the entire Python XGBoost example in this notebook yourself. You will need to create a local environment with the dependencies listed in the accompanying .yml file.
First, instantiate a local version of the Dask distributed scheduler, which will orchestrate training your model in parallel.
from dask.distributed import Client, LocalCluster

# Start a local Dask cluster and connect a client to it
cluster = LocalCluster(n_workers=4)
client = Client(cluster)
client
Since this is synthetic data, we won't be doing any preprocessing. For an example on real-world data that does include preprocessing, check out this notebook that trains XGBoost on a Dask DataFrame containing a 20GB subset of the ARCOS dataset. When you're done preprocessing, you can create your train and test splits using the dask-ml library.
In this section, we are working with data_local, a subset of the entire dataset containing only the first 50 partitions. Take a look at the notebook if you want to know how to create the data_local subset.
from dask_ml.model_selection import train_test_split

# Create the train-test split
X, y = data_local.iloc[:, :-1], data_local["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)
Now you're all set to train your XGBoost model.
Let’s use the default parameters for this example. The training call below expects a params dictionary, so we define one that only sets a regression objective to match our synthetic regression dataset.
import xgboost as xgb

# Training parameters: defaults, plus a regression objective for this dataset
params = {"objective": "reg:squarederror"}

# Create the XGBoost DMatrices
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# Train the model
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)
You can then use your trained model together with your testing split to make predictions.
# Make predictions on the test set
y_pred = xgb.dask.predict(client, output, dtest)
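If you'd like a quick sanity check of how well the model fits, here is a minimal sketch of computing the RMSE on the test set. The names y_true and rmse are our own additions for illustration, not part of the original notebook.

import numpy as np

# Pull the (single-column) test target and the predictions back to the
# local machine, then compute the root mean squared error
y_true = y_test.compute().to_numpy()
rmse = np.sqrt(np.mean((y_true - y_pred.compute()) ** 2))
print(f"RMSE: {rmse:.4f}")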
As you can see, not much has changed from your regular XGBoost code. We’ve just used xgb.dask.DaskDMatrix instead of xgb.DMatrix and called xgb.dask.train() instead of xgb.train(). Note that the input to DaskDMatrix does not have to be a Dask DataFrame; it can also be a Dask Array.
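For illustration, here is a small sketch showing DaskDMatrix being built from Dask Arrays instead of a DataFrame. The arrays X_arr and y_arr are hypothetical random data, not part of the example dataset.

import dask.array as da

# Hypothetical Dask Arrays standing in for your features and target
X_arr = da.random.random((100_000, 20), chunks=(10_000, 20))
y_arr = da.random.random(100_000, chunks=10_000)

# DaskDMatrix accepts Dask Arrays just as it does Dask DataFrames
dtrain_array = xgb.dask.DaskDMatrix(client, X_arr, y_arr)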
If you were to train XGBoost on the entire Dask DataFrame data, however, Dask would throw a MemoryError, because you are still running on your local machine:
MemoryError
distributed.worker - WARNING - Worker is at 80% memory usage. Pausing worker. Process memory: 1.49 GiB -- Worker memory limit: 1.86 GiB
Let's now process the entire 100 GB dataset by scaling XGBoost to the cloud.
You’ll need to make two changes to the Python code we wrote above:
- launch a Coiled cluster in the cloud instead of a LocalCluster
- point the Dask client at that Coiled cluster
Let's jump in.
We'll start by launching a Coiled cluster in the cloud that can run your pipeline on the entire dataset. To run the Python code in this section, you'll need a Coiled Cloud account.
import coiled

# Launch a Coiled cluster in the cloud
cluster = coiled.Cluster(
    name="xgboost",
    software="coiled-examples/xgboost",
    n_workers=50,
    worker_memory="16GiB",
    shutdown_on_close=False,
)
Note that if you’re using the Coiled Free Tier, you’ll have to decrease the n_workers argument to 24, as this tier allows a maximum of 100 concurrent cores and the default setting is 4 cores per Dask worker plus 1 for the Dask scheduler (24 workers x 4 cores + 1 = 97 cores, which keeps you under the limit).
Finally, instruct the Dask scheduler to run computations on your Coiled cluster.
# Connect Dask to your Coiled cluster
from dask.distributed import Client

client = Client(cluster)
client
For this section, be sure to use the whole Dask DataFrame data containing 2750 partitions, rather than the data_local subset containing only the first 50 partitions we used above.
import dask.dataframe as dd

# Load the full dataset from S3 into a Dask DataFrame
data = dd.read_parquet(
    "s3://coiled-datasets/synthetic-data/synth-reg-104GB.parquet/",
    compression="lz4",
    storage_options={"anon": True, "use_ssl": True},
)
You can now re-run all the same code from above. All computations will run on the many machines in your Coiled cluster in the cloud rather than on your single local machine, which means you’ll have orders of magnitude more compute at your disposal.
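Concretely, re-running the split and DMatrix creation is the same code as before, only with the full data DataFrame in place of data_local, sketched here for completeness:

# Same steps as before, now on the full dataset
X, y = data.iloc[:, :-1], data["target"]
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.3, shuffle=True, random_state=2
)
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)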
%%time
# Train the model on the Coiled cluster
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=5,
    evals=[(dtrain, 'train')]
)
CPU times: user 17.5 s, sys: 3.43 s, total: 20.9 s
Wall time: 3min 24s
Coiled Cloud also hosts the Dask dashboard, so you can follow along with your running computations. Read more about the Dask dashboard in the Pro tips below.
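If you want to open the dashboard straight from your notebook, the link is also available on the Dask client object:

# Print the URL of the Dask dashboard for this cluster
print(client.dashboard_link)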
After training is done, we can save the model and shut down the cluster, releasing its resources. And if we forget to do so for whatever reason, Coiled automatically shuts down clusters after 20 minutes of inactivity to help avoid unnecessary costs.
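As a sketch of what saving could look like: the output returned by xgb.dask.train contains a regular XGBoost Booster, which can be written to disk before the cluster goes away. The filename below is just a placeholder.

# The "booster" entry of the training output is a plain xgboost.Booster
booster = output["booster"]
booster.save_model("xgboost-synthetic-100GB.json")  # placeholder filename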
# Shut down the cluster
client.cluster.close()
We've collected a set of pro tips straight from the Dask core team to help you speed up your XGBoost training and get immediate value for your data science workflows:
Let's recap what we've discussed in this post:
- XGBoost's native Dask integration lets you train a model in parallel with only minor changes to your code: xgb.dask.DaskDMatrix and xgb.dask.train() in place of their single-machine counterparts
- The full 100GB dataset doesn't fit in local memory, so we scaled training out to a 50-worker Coiled cluster in the cloud
- On that cluster, training finished in under 4 minutes
We'd love to see you apply distributed training of your XGBoost model to a dataset that's meaningful to you. If you'd like to try, swap your dataset into this Python XGBoost example notebook and see how well it does! Let us know how you get on in our Coiled Community Slack channel!
Thanks for reading! And if you’re a data scientist interested in trying out Coiled, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.