Reading and Writing Text Files with Dask

This blog post explains how to read and write text files with Dask.

Dask is a great technology for parsing large text files because it divides the data into partitions that can be processed in parallel.  This lets Dask handle text files that are larger than memory and spread the work across many cores or machines, making it a fast, scalable way to work with text data.

Dask Bag: read text files with read_text

This section shows you how to read a text file into a Dask Bag and run some simple computations.

Suppose you have the following four lines of text in the animals.txt file.

dog
cat
snake
mouse

Here’s how to read this data into a Dask Bag with two partitions using read_text.

import dask.bag as db

b = db.read_text("animals.txt").map(lambda x: x.strip()).repartition(2)

You can collect the Dask Bag into a Python list to view the contents.

b.compute() # ['dog', 'cat', 'snake', 'mouse']

Dask Bags are like Python lists that separate data into partitions, which allows for parallel execution and scalable computing.
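
A Dask Bag exposes its partition count via npartitions, and operations like map run on each partition in parallel.  Here's a minimal sketch that reuses the bag created above:

b.npartitions # 2

b.map(lambda x: x.upper()).compute() # ['DOG', 'CAT', 'SNAKE', 'MOUSE']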

You can easily convert Dask Bags into Dask DataFrames, another Dask collection that partitions data and supports parallel execution.

Dask convert Bag to DataFrame with to_dataframe()

Let’s convert the Dask Bag to a Dask DataFrame, so we can process this text file like a tabular dataset.

ddf = b.to_dataframe(columns=["animal_type"])

Let’s inspect the contents of the Dask DataFrame.

ddf.compute()

 animal_type
0         dog
1         cat
0       snake
1       mouse

Notice that the index restarts within each partition, which is why the values 0 and 1 repeat.  Now let’s add a column to the Dask DataFrame that contains the length of the animal_type string.

ddf["len_animal_type"] = ddf["animal_type"].apply(
   lambda x: len(x), meta=("animal_type", "int64")
)//]]>

Let’s inspect the contents of the Dask DataFrame.

ddf.compute()

 animal_type  len_animal_type
0         dog                3
1         cat                3
0       snake                5
1       mouse                5

Dask write Bag to text files with to_textfiles

This section shows how to run some simple operations on the data that’s in the Dask Bag and then write it out to text files.

Let’s run an operation that’ll compute the length of each string in the bag.

animals_length = b.map(lambda x: str(len(x)))

Run compute() to collect the data in a Python list and view the result.

animals_length.compute() # ['3', '3', '5', '5']

Mapping over a Dask Bag is similar to mapping over a regular Python list.
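
For comparison, here's roughly the same operation with Python's built-in map over a plain list; the Dask version simply spreads the same work across partitions:

animals = ["dog", "cat", "snake", "mouse"]

list(map(lambda x: str(len(x)), animals)) # ['3', '3', '5', '5']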

Now write out the animals_length bag to text files.

animals_length.to_textfiles("animals_length")

Here are the files that get written out to disk.

animals_length/
 0.part
 1.part

Here are the contents of 0.part:

3
3

Here are the contents of 1.part:

5
5

Dask outputs data to multiple files so the write operation can be executed in parallel.  It’s faster and more scalable to write to multiple files simultaneously.
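
If you'd like more control over the output file names, to_textfiles also accepts a path template containing * plus a name_function that maps each partition index to a string.  A minimal sketch, reusing the animals_length bag from above:

animals_length.to_textfiles("animals_length/part-*.txt", name_function=lambda i: str(i))

# writes animals_length/part-0.txt and animals_length/part-1.txt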

Dask read JSON to Bag

JSON files contain text data that’s also convenient to read and process with the Dask Bag API.

The Dask Bag API makes it easy to process JSON data at scale with familiar Python functions.

Let’s look at how we can use Dask to parallelize the json.loads function, which parses JSON strings and converts them into Python dictionaries.

Consider the following pets1.json file with five rows of data.

{"name":"fluffy","age":5,"species":"cat"}
{"name":"fido","age":2,"species":"dog"}
{"name":"harvey","age":8,"species":"cat"}
{"name":"chunkers","age":9,"species":"cat"}
{"name":"lola","age":1,"species":"bird"}//]]>

Here’s how to read the JSON file into a Dask Bag and convert all the JSON strings to Python dictionaries.

import json

pets = db.read_text("pets1.json")

lines = pets.map(json.loads)

Here is how to fetch all the cats from the bag.

cats = lines.filter(lambda d: d["species"] == "cat")

You can grab all of the cat names from the bag and collect the results in a Python list.

cats.map(lambda d: d["name"]).compute() # ['fluffy', 'harvey', 'chunkers']

This code scales to larger-than-memory datasets and to data that’s stored across multiple JSON files.  Dask is powerful because it’s scalable.
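
For example, if the data were split across several files like pets1.json and pets2.json (hypothetical file names), you could read them all into a single bag with a glob pattern:

pets = db.read_text("pets*.json")

lines = pets.map(json.loads)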

Dask read_text with large dataset

The coiled-datasets bucket contains a large set of JSON data from the GitHub Archive project, specifically all the push events from 2015.

Let’s use Coiled to provision a Dask cluster and read all that data into a Dask Bag.

Start by spinning up a cluster:

import coiled
import dask.bag as db
from dask.distributed import Client
import json

cluster = coiled.Cluster(
    name="github-json",
    software="matthew-powers/my_env",
    n_workers=5,
)

client = Client(cluster)

Now read all the data into a bag:

rows = db.read_text(
    "s3://coiled-datasets/github-archive/github-archive-2015-json/*.json.gz",
    compression="gzip",
).map(json.loads)

Run a query to show the first few rows of data from the GitHub user named MrPowers (the author of this post):

rows.filter(lambda d: d["user"] == "MrPowers").compute()

[{'user': 'MrPowers',
 'repo': 'medivo/megaphone',
 'created_at': '2015-01-16T18:55:46Z',
 'message': "Delete Campfire rake tasks that are no longer needed because we're using Slack",
 'author': 'Matthew Powers'},
{'user': 'MrPowers',
 'repo': 'MrPowers/code_quizzer',
 'created_at': '2015-01-20T02:45:52Z',
 'message': 'Add a slug attribute to the Family model so additional pages can be added.  This change will let me add a separate learning Spanish page',
 'author': 'Matthew Powers'},
{'user': 'MrPowers',
 'repo': 'MrPowers/tic_tac_toe',
 'created_at': '2015-01-21T10:55:38Z',
 'message': 'fix #get_move spec description',
 'author': 'Dominik Stodolny'},
 …

Now run a query to show the top 5 repositories with the most GitHub push events in 2015:

rows.map(lambda d: d["repo"]).frequencies(sort=True).take(5)

(('sakai-mirror/melete', 1919848),
('sakai-mirror/ambrosia', 653320),
('sakai-mirror/mneme', 653281),
('KenanSulayman/heartbeat', 486214),
('JonathonSonesen/PrivacyChanges_Public', 311199),
…

Take a look at the repos where MrPowers had the most push events in 2015:

rows.filter(lambda d: d["user"] == "MrPowers").map(lambda d: d["repo"]).frequencies(sort=True).compute()

[('MrPowers/code_quizzer', 96),
('MrPowers/song_quizzer', 85),
('MrPowers/frontend-generators', 55),
('MrPowers/slack_trello', 54),
('medivo/slack-responder', 41),
…

Dask Bags make it easy to query large JSON datasets at scale.
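
Other Bag operations compose just as easily.  For example, counting the total number of push events in the dataset is a one-liner (a sketch reusing the rows bag from above):

rows.count().compute()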

Conclusion

Dask makes it easy to read and write text files in parallel.

For Python users, Dask is great because you can easily work with large text datasets and use Python libraries that you’re already familiar with, like json.  You don’t need to learn how to use a different JSON library that’s specialized for some big data framework.

Text files usually aren’t the most efficient format for big data analyses, so you may want to convert your text data to a file format that’s optimized for analytical queries, like Parquet.  See this blog post on converting JSON data to Parquet for a real-world example.
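
As a rough sketch, assuming the small pets bag from earlier and a Parquet engine such as pyarrow installed, you could flatten the JSON records into a Dask DataFrame and write them out as Parquet:

pets_ddf = lines.to_dataframe()

pets_ddf.to_parquet("pets_parquet")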

Level up your Dask using Coiled

Coiled makes it easy to scale Dask in the cloud.