This post explains how to convert Dask DataFrame object columns to floating-point columns with to_numeric() and why it’s more flexible than astype() in certain situations. This design pattern is especially useful when you’re working with text-based file formats like CSV. Because of messy data, you’ll often have to read numeric columns stored in CSV files as object columns and then convert them to floating-point values to null out the bad entries. You need to convert object columns to numeric values before you can perform numerical operations like addition and subtraction. You can use the tactics outlined in this post for data munging in an extract, transform, & load (ETL) pipeline.
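Here’s a minimal sketch of that CSV workflow, assuming a hypothetical sales.csv file with a price column that contains a few unparseable values:

```
import dask.dataframe as dd

# Hypothetical CSV file: the "price" column holds mostly numbers plus a few
# bad strings, so we read it in as an object column.
ddf = dd.read_csv("sales.csv", dtype={"price": "object"})

# Convert the object column to floats; values that can't be parsed become NaN.
ddf["price"] = dd.to_numeric(ddf["price"], errors="coerce")
```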
Cleaning data is often the first step of a data project. Luckily Dask has great helper methods like to_numeric() that make it easy to clean the data and properly type the columns in your DataFrames.
Let’s look at a simple example with a DataFrame that contains an invalid string value in a column that should only contain numbers. Let’s start by creating a DataFrame with nums and letters columns.
```
import dask.dataframe as dd
import pandas as pd

df = pd.DataFrame(
    {"nums": [1, 2.8, 3, 4, "hi", 6], "letters": ["a", "b", "c", "d", "e", "f"]}
)
ddf = dd.from_pandas(df, npartitions=2)
```
Now let’s print the contents of the DataFrame so it’s easy to visualize.
```
print(ddf.compute())

  nums letters
0    1       a
1  2.8       b
2    3       c
3    4       d
4   hi       e
5    6       f
```
Notice that row 4 in the nums column has the value “hi”. That’s a string value that Python cannot convert into a numerical value.
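For reference, plain Python raises the same kind of error when you attempt the conversion directly:

```
float("hi")  # ValueError: could not convert string to float: 'hi'
```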
Let’s look at the data types of the columns and see that Dask is treating both nums and letters as object type columns.
```
ddf.dtypes

nums       object
letters    object
dtype: object
```
Let’s convert the nums column to be a number column with to_numeric.
//ddf["nums"] = dd.to_numeric(ddf["nums"], errors="coerce")
ddf.dtypes
nums int64
letters object
print(ddf.compute())
nums letters
0 1.0 a
1 2.8 b
2 3.0 c
3 4.0 d
4 NaN e
5 6.0 f//]]>
Dask has conveniently nulled out the “hi” value in row 4 to be NaN. Note that ddf.dtypes still reports int64 because Dask infers column types lazily from metadata; once computed, the NaN forces the column to hold float64 values. Nulling out values that cannot be easily converted to numbers is often what you’ll want. Alternatively, you can set errors="raise" to raise an error when a value can’t be cast to a numeric dtype.
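As a quick sketch (rebuilding the original object column, since nums has already been coerced above), errors="raise" surfaces the bad value at compute time instead of nulling it out:

```
import dask.dataframe as dd
import pandas as pd

# Rebuild the original object column and convert with errors="raise",
# which is also to_numeric's default behavior.
raw = dd.from_pandas(pd.DataFrame({"nums": [1, 2.8, 3, 4, "hi", 6]}), npartitions=2)
nums = dd.to_numeric(raw["nums"], errors="raise")

nums.compute()  # raises ValueError: Unable to parse string "hi"
```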
Many beginning Dask users tend to use the astype method to convert object columns to numeric columns. This has important limitations.
Let’s create another DataFrame to see when astype can be used to convert from object columns to numeric columns and when it falls short.
```
df2 = pd.DataFrame(
    {"n1": ["bye", 2.8, 3], "n2": ["7.7", "8", 9.2]}
)
ddf2 = dd.from_pandas(df2, npartitions=2)

print(ddf2.compute())

    n1   n2
0  bye  7.7
1  2.8    8
2    3  9.2
```
You can run ddf2.dtypes to see that both n1 and n2 are object columns.
```
n1    object
n2    object
dtype: object
```
n2 is an object type column because it contains string and float values.
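If you want to confirm this, one way (a quick sketch) is to inspect the Python type of each value in the computed column:

```
print(ddf2["n2"].compute().map(type))

0      <class 'str'>
1      <class 'str'>
2    <class 'float'>
dtype: object
```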
Let’s convert n2 to be a float64 column using astype.
//ddf2["n2"] = ddf2["n2"].astype("float64")
ddf2.dtypes
n1 object
n2 float64
dtype: object
print(ddf2.compute())
n1 n2
0 bye 7.7
1 2.8 8.0
2 3 9.2//]]>
astype can convert n2 to be a float column without issue.
Now let’s try to convert n1 to be a float column with astype.
//ddf2["n1"] = ddf2["n1"].astype("float64")
print(ddf2.compute())//]]>
This errors out with the following long error message.
```
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
Input In [37], in <module>
----> 1 print(ddf2.compute())
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
264 def compute(self, **kwargs):
265 """Compute this dask collection
266
267 This turns a lazy Dask collection into its in-memory equivalent.
(...)
286 dask.base.compute
287 """
--> 288 (result,) = compute(self, traverse=False, **kwargs)
289 return result
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
568 keys.append(x.__dask_keys__())
569 postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/threaded.py:79, in get(dsk, result, cache, num_workers, pool, **kwargs)
76 elif isinstance(pool, multiprocessing.pool.Pool):
77 pool = MultiprocessingPoolExecutor(pool)
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
82 dsk,
83 result,
84 cache=cache,
85 get_id=_thread_get_id,
86 pack_exception=pack_exception,
87 **kwargs,
88 )
90 # Cleanup pools associated to dead threads
91 with pools_lock:
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:507, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
505 _execute_task(task, data) # Re-execute locally
506 else:
--> 507 raise_exception(exc, tb)
508 res, worker_id = loads(res_info)
509 state["cache"][key] = res
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:315, in reraise(exc, tb)
313 if exc.__traceback__ is not tb:
314 raise exc.with_traceback(tb)
--> 315 raise exc
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
218 try:
219 task, data = loads(task_info)
--> 220 result = _execute_task(task, data)
221 id = get_id()
222 result = dumps((result, id))
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/optimization.py:969, in SubgraphCallable.__call__(self, *args)
967 if not len(args) == len(self.inkeys):
968 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:149, in get(dsk, out, cache)
147 for key in toposort(dsk):
148 task = dsk[key]
--> 149 result = _execute_task(task, cache)
150 cache[key] = result
151 result = _execute_task(out, cache)
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in <genexpr>(.0)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/utils.py:37, in apply(func, args, kwargs)
35 def apply(func, args, kwargs=None):
36 if kwargs:
---> 37 return func(*args, **kwargs)
38 else:
39 return func(*args)
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/dask/utils.py:1021, in methodcaller.__call__(self, _methodcaller__obj, *args, **kwargs)
1020 def __call__(self, __obj, *args, **kwargs):
-> 1021 return getattr(__obj, self.method)(*args, **kwargs)
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/generic.py:5815, in NDFrame.astype(self, dtype, copy, errors)
5808 results = [
5809 self.iloc[:, i].astype(dtype, copy=copy)
5810 for i in range(len(self.columns))
5811 ]
5813 else:
5814 # else, only a single dtype is given
-> 5815 new_data = self._mgr.astype(dtype=dtype, copy=copy, errors=errors)
5816 return self._constructor(new_data).__finalize__(self, method="astype")
5818 # GH 33113: handle empty frame or series
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/managers.py:418, in BaseBlockManager.astype(self, dtype, copy, errors)
417 def astype(self: T, dtype, copy: bool = False, errors: str = "raise") -> T:
--> 418 return self.apply("astype", dtype=dtype, copy=copy, errors=errors)
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/managers.py:327, in BaseBlockManager.apply(self, f, align_keys, ignore_failures, **kwargs)
325 applied = b.apply(f, **kwargs)
326 else:
--> 327 applied = getattr(b, f)(**kwargs)
328 except (TypeError, NotImplementedError):
329 if not ignore_failures:
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/internals/blocks.py:591, in Block.astype(self, dtype, copy, errors)
573 """
574 Coerce to the new dtype.
575
(...)
587 Block
588 """
589 values = self.values
--> 591 new_values = astype_array_safe(values, dtype, copy=copy, errors=errors)
593 new_values = maybe_coerce_values(new_values)
594 newb = self.make_block(new_values)
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1309, in astype_array_safe(values, dtype, copy, errors)
1306 dtype = pandas_dtype(dtype)
1308 try:
-> 1309 new_values = astype_array(values, dtype, copy=copy)
1310 except (ValueError, TypeError):
1311 # e.g. astype_nansafe can fail on object-dtype of strings
1312 # trying to convert to float
1313 if errors == "ignore":
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1257, in astype_array(values, dtype, copy)
1254 values = values.astype(dtype, copy=copy)
1256 else:
-> 1257 values = astype_nansafe(values, dtype, copy=copy)
1259 # in pandas we don't store numpy str dtypes, so convert to object
1260 if isinstance(dtype, np.dtype) and issubclass(values.dtype.type, str):
File ~/opt/miniconda3/envs/standard-coiled/lib/python3.9/site-packages/pandas/core/dtypes/cast.py:1201, in astype_nansafe(arr, dtype, copy, skipna)
1197 raise ValueError(msg)
1199 if copy or is_object_dtype(arr.dtype) or is_object_dtype(dtype):
1200 # Explicit copy, or required since NumPy can't view from / to object.
-> 1201 return arr.astype(dtype, copy=True)
1203 return arr.astype(dtype, copy=copy)
ValueError: could not convert string to float: 'bye'
```
astype raises errors when columns contain string values that cannot be converted to numbers. It doesn’t coerce string values to NaN.
to_numeric has the same default behavior (errors="raise"), so the following code will error out as well.
//ddf2["n1"] = dd.to_numeric(ddf["n1"])
print(ddf2.compute())//]]>
Here’s the error message: “ValueError: Unable to parse string "bye" at position 0”.
You need to set errors="coerce" to successfully invoke to_numeric.
//ddf2["n1"] = dd.to_numeric(ddf["n1"], errors="coerce")
print(ddf2.compute())
n1 n2
0 NaN 7.7
1 2.8 8.0
2 3.0 9.2//]]>
See this blog post for more information about astype.
Dask makes it easy to convert object columns into number columns with to_numeric.
to_numeric lets you customize what happens when values cannot be converted to numbers: you can coerce them to NaN, raise an error, or ignore them. Choose the behavior that works best for your application.
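Here’s a small sketch of the three error modes side by side (note that errors="ignore", which hands back the input unchanged, is deprecated in recent pandas releases):

```
import dask.dataframe as dd
import pandas as pd

s = dd.from_pandas(pd.Series(["1.5", "oops", "3"]), npartitions=1)

print(dd.to_numeric(s, errors="coerce").compute())  # "oops" becomes NaN
print(dd.to_numeric(s, errors="ignore").compute())  # input returned unchanged (object dtype)
# dd.to_numeric(s, errors="raise").compute()        # would raise a ValueError
```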
The Parquet file format is generally better than CSV for data analysis because it stores a schema and doesn’t allow non-numeric data to be written into number columns in the first place. Messy data like this is usually the CSV file format’s fault. Whenever possible, convert your CSV data to Parquet files so you don’t have to deal with these kinds of data inconsistencies. See here for benchmarks on why Parquet is better than CSV.
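For example, here’s a rough sketch (with a hypothetical file path) of persisting the cleaned DataFrame to Parquet and reading it back with the proper types already in place:

```
# Hypothetical path: write the cleaned DataFrame to Parquet, then read it back.
# Parquet stores the column types in its schema, so nums comes back as float64.
ddf.to_parquet("clean_data.parquet")

ddf_clean = dd.read_parquet("clean_data.parquet")
print(ddf_clean.dtypes)
```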