This post teaches you how to union Dask DataFrames vertically with concat, along with the important related technical details. Vertical concatenation combines DataFrames the way the SQL UNION operator combines tables, which is common when joining datasets for reporting and machine learning. It’s useful whenever you have two tables with identical schemas that you’d like to combine into a single DataFrame.
The tactics outlined in this post will help you combine two DataFrames with the same or similar schemas into a single DataFrame. It’s a useful design pattern to have in your toolkit.
Horizontal concatenations can also use concat, but require an entirely different set of considerations, and that’s why they’re discussed in a separate post.
Here’s how this post on vertical concatenations is organized: we’ll start with a simple example on DataFrames with identical schemas, look at how divisions behave and how interleave_partitions preserves them, concatenate DataFrames with different schemas, and finish by concatenating two large DataFrames on a Coiled cluster.
Create two Dask DataFrames with identical schemas.
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf1 = dd.from_pandas(df, npartitions=2)

df = pd.DataFrame({"nums": [88, 99], "letters": ["xx", "yy"]})
ddf2 = dd.from_pandas(df, npartitions=1)
Now concatenate both DataFrames into a single DataFrame.
ddf3 = dd.concat([ddf1, ddf2])
Print the contents of ddf3 to verify it contains all the rows from ddf1 and ddf2.
print(ddf3.compute())

   nums letters
0     1       a
1     2       b
2     3       c
3     4       d
4     5       e
5     6       f
0    88      xx
1    99      yy
ddf1 has two partitions and ddf2 has one partition. ddf1 and ddf2 are combined into ddf3, which has three partitions in total.
ddf3.npartitions  # 3
Dask can use information about divisions to speed up certain queries. The creation of ddf3 above wiped out the DataFrame divisions. Let’s see how we can interleave partitions when concatenating DataFrames to avoid losing the divisions information.
Let’s revisit our example with a focus on DataFrame divisions to illustrate how concat wipes out the DataFrame divisions by default.
Recreate the ddf1 DataFrame and look at its divisions.
df = pd.DataFrame(
    {"nums": [1, 2, 3, 4, 5, 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf1 = dd.from_pandas(df, npartitions=2)
ddf1.divisions  # (0, 3, 5)
Here’s how to interpret this divisions output: the first partition holds index values from 0 up to (but not including) 3, and the second partition holds index values from 3 through 5. The final division is inclusive, marking the largest index value in the DataFrame.
Let’s print every partition of the DataFrame to visualize the actual data and reason about the division values.
def print_partitions(ddf):
    for i in range(ddf.npartitions):
        print(ddf.partitions[i].compute())

print_partitions(ddf1)

   nums letters
0     1       a
1     2       b
2     3       c
   nums letters
3     4       d
4     5       e
5     6       f
Let’s recreate ddf2 and view its divisions too.
df = pd.DataFrame({"nums": [88, 99], "letters": ["xx", "yy"]})
ddf2 = dd.from_pandas(df, npartitions=1)
ddf2.divisions  # (0, 1)
ddf2 has a single partition with index values between zero and one.
print_partitions(ddf2)

   nums letters
0    88      xx
1    99      yy
Let’s concatenate the DataFrames and see what happens with the divisions.
ddf3 = dd.concat([ddf1, ddf2])
ddf3.divisions  # (None, None, None, None)
Dask has lost all information about divisions for ddf3 and won’t be able to use divisions-related optimizations for subsequent computations.
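You can confirm this with the known_divisions attribute, which reports whether Dask is currently tracking divisions for a DataFrame.

ddf1.known_divisions  # True
ddf3.known_divisions  # False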
You can set interleave_partitions to True when concatenating DataFrames to avoid losing the divisions information.
ddf3_interleave = dd.concat([ddf1, ddf2], interleave_partitions=True)
ddf3_interleave.divisions  # (0, 1, 3, 5)
Take a look at how the data is distributed across partitions in ddf3_interleave.
print_partitions(ddf3_interleave)

   nums letters
0     1       a
0    88      xx
   nums letters
1     2       b
2     3       c
1    99      yy
   nums letters
3     4       d
4     5       e
5     6       f
Dask can optimize certain computations when divisions exist. Set interleave_partitions to True if you’d like to take advantage of these optimizations after concatenating DataFrames.
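As a quick sketch of the kind of optimization divisions enable: label-based selection with .loc can use the divisions to work out which partitions could contain the requested index range and skip the rest.

# With divisions (0, 1, 3, 5), index labels 3 through 5 can only live in the
# last partition, so .loc narrows the work down to that single partition.
subset = ddf3_interleave.loc[3:5]
subset.npartitions  # 1
print(subset.compute())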
You can also concatenate DataFrames with different schemas. Let’s create two DataFrames with different schemas, concatenate them, and see how Dask behaves.
Start by creating the two DataFrames.
df = pd.DataFrame(
    {
        "animal": ["cat", "dolphin", "shark", "starfish"],
        "is_mammal": [True, True, False, False],
    }
)
ddf1 = dd.from_pandas(df, npartitions=2)

df = pd.DataFrame({"animal": ["hippo", "lion"], "likes_water": [True, False]})
ddf2 = dd.from_pandas(df, npartitions=1)
Concatenate the DataFrames and print the result.
ddf3 = dd.concat([ddf1, ddf2])
print(ddf3.compute())

     animal is_mammal likes_water
0       cat      True         NaN
1   dolphin      True         NaN
2     shark     False         NaN
3  starfish     False         NaN
0     hippo       NaN        True
1      lion       NaN       False
Dask fills in the missing values with NaN to make the concatenation possible.
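If you’d rather keep only the columns the DataFrames share instead of filling the gaps with NaN, concat also accepts a join argument, mirroring pandas. Here’s a minimal sketch using the animal DataFrames from above:

# join="inner" keeps only the shared "animal" column and drops the rest.
ddf3_inner = dd.concat([ddf1, ddf2], join="inner")
print(ddf3_inner.compute())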
Let's create a Dask cluster and concatenate two 31 million row DataFrames to confirm that concat can scale to multi-node workflows.
Create a 5-node Coiled cluster and read a Parquet dataset into a DataFrame.
import coiled
import dask.distributed

cluster = coiled.Cluster(name="concat-cluster", n_workers=5)
client = dask.distributed.Client(cluster)

ddf2000 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/7d/parquet/2000",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)
Run ddf2000.head() to visually inspect the contents of the DataFrame.
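The call itself is a one-liner; the output is omitted here since it simply shows the first few rows of the dataset.

# head() only needs to read from the first partition by default.
ddf2000.head()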
Let’s run some analytical queries on ddf2000 to better understand the data it contains.
len(ddf2000)  # 31,449,600
ddf2000.npartitions  # 52
Now let’s read another Parquet dataset into a separate DataFrame and run some analytical queries.
ddf2001 = dd.read_parquet(
    "s3://coiled-datasets/timeseries/7d/parquet/2001",
    storage_options={"anon": True, "use_ssl": True},
    engine="pyarrow",
)

len(ddf2001)  # 31,449,600
ddf2001.npartitions  # 52
Concatenate the two DataFrames and inspect the contents of the resulting DataFrame.
ddf = dd.concat([ddf2000, ddf2001])

len(ddf)  # 62,899,200
ddf.npartitions  # 104

ddf.divisions
(Timestamp('2000-01-01 00:00:00'),
 Timestamp('2000-01-08 00:00:00'),
 Timestamp('2000-01-15 00:00:00'),
 …
 Timestamp('2001-12-17 00:00:00'),
 Timestamp('2001-12-24 00:00:00'),
 Timestamp('2001-12-30 23:59:59'))
These DataFrames were concatenated without interleave_partitions=True, yet the divisions metadata was not lost like it was in the earlier example.
The DataFrames in this example don’t have any overlapping divisions, so you don’t need to set interleave_partitions=True.
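If you’re not sure whether two DataFrames overlap, you can compare their divisions before concatenating. A small sketch of that check for the DataFrames above:

# The last division of the 2000 data comes before the first division of the
# 2001 data, so the divisions don't overlap and concat can preserve them.
ddf2000.divisions[-1] < ddf2001.divisions[0]  # True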
Dask makes it easy to vertically concatenate DataFrames.
Dask does not interleave partitions when concatenating by default, but will if you set interleave_partitions=True. Make sure to use this flag if your downstream queries will benefit from having divisions metadata.
Thanks for reading! If you’re interested in trying out Coiled Cloud, which provides hosted Dask clusters, docker-less managed software, and one-click deployments, you can do so for free today when you click below.