---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
Apache Arrow lets you work efficiently with large, multi-file datasets.
The `arrow` R package provides a `dplyr` interface to Arrow Datasets,
as well as other tools for interactive exploration of Arrow data.
This vignette introduces Datasets and shows how to use `dplyr` to analyze them.
It describes both what is possible to do with Arrow now
and what is on the immediate development roadmap.

## Example: NYC taxi data

The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about 10 years of the trip data in a public AWS S3 bucket.
The total file size is around 37 gigabytes, even in the efficient Parquet file format.
That's bigger than memory on most people's computers,
so we can't just read it all in and stack it into a single data frame.
In Windows and macOS binary packages, S3 support is included.
On Linux when installing from source, S3 support is not enabled by default,
and it has additional system requirements.
See `vignette("install", package = "arrow")` for details.
To see if your `arrow` installation has S3 support, run
```{r}
arrow::arrow_with_s3()
```
Even with S3 support enabled, network speed will be a bottleneck unless your
machine is located in the same AWS region as the data. So, for this vignette,
we assume that the NYC taxi dataset has been downloaded locally in a "nyc-taxi"
directory.
If your `arrow` build has S3 support, you can sync the data locally with:
```{r, eval = FALSE}
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
```
If your `arrow` build doesn't have S3 support, you can download the files
with some additional code:
```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for (year in 2009:2019) {
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in sprintf("%02d", months)) {
    dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
    try(download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    ), silent = TRUE)
  }
}
```
Note that these download steps in the vignette are not executed: if you want to run
with live data, you'll have to do it yourself separately.
Given the size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.
If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether we're running with live data:
```{r}
dir.exists("nyc-taxi")
```

## Getting started

Because `dplyr` is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
we need to load both `arrow` and `dplyr`.
```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```
The first step is to create our Dataset object, pointing at the directory of data.
```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```
The default file format for `open_dataset()` is Parquet; if we had a directory
of Arrow format files, we could include `format = "arrow"` in the call.
Other supported formats include: "feather" (an alias for "arrow", as Feather v2
is the Arrow file format), "csv", "tsv" (for tab-delimited), and "text" for
generic text-delimited files. For text files, you can pass any parsing options
("delim", "quote", etc.) to `open_dataset()` that you would otherwise pass to
`read_csv_arrow()`.
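As a minimal sketch of the text-format options, the following writes a toy semicolon-delimited file to a temporary directory and opens it as a Dataset. The data, file name, and directory are made up for illustration and are not part of the taxi example.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical toy data, written as a semicolon-delimited text file
text_dir <- tempfile("toy-text-")
dir.create(text_dir)
write.table(
  data.frame(id = 1:3, fare = c(5.5, 12, 101.25)),
  file.path(text_dir, "part-0.txt"),
  sep = ";", row.names = FALSE, quote = FALSE
)

# `delim` is handed to the parser, just as it would be for read_csv_arrow()
text_ds <- open_dataset(text_dir, format = "text", delim = ";")
names(text_ds)
```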
The `partitioning` argument lets us specify how the file paths provide information
about how the dataset is chunked into different files. Our files in this example
have file paths like
```
2009/01/data.parquet
2009/02/data.parquet
...
```
By providing a character vector to `partitioning`, we're saying that the first
path segment gives the value for "year" and the second segment is "month".
Every row in `2009/01/data.parquet` has a value of 2009 for "year"
and 1 for "month", even though those columns may not actually be present in the file.
Indeed, when we look at the dataset, we see that in addition to the columns present
in every file, there are also columns "year" and "month".
```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset with 125 Parquet files
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: string
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
improvement_surcharge: float
pickup_location_id: int32
dropoff_location_id: int32
congestion_surcharge: float
year: int32
month: int32
See $metadata for additional Schema metadata
")
```
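To try the same idea without downloading the taxi data, here is a small sketch that builds a toy dataset with bare directory segments and then reads it back. The `toy` data frame and the temporary directory are assumptions made for illustration only.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical toy data with one row per year/month combination
toy <- data.frame(
  year = c(2009L, 2009L, 2010L),
  month = c(1L, 2L, 1L),
  fare = c(8.5, 12, 30.25)
)

# Write bare path segments (e.g. 2009/1/...), so the partition values
# live only in the directory names
toy_dir <- tempfile("toy-taxi-")
write_dataset(toy, toy_dir, partitioning = c("year", "month"), hive_style = FALSE)

# Tell open_dataset() what each path segment means
toy_ds <- open_dataset(toy_dir, partitioning = c("year", "month"))
names(toy_ds)
```

The resulting Dataset should list "year" and "month" alongside "fare", even though the files themselves only store "fare".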
The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If we had saved our files in paths like
```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```
we would not have had to provide the names in `partitioning`:
we could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.
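A short sketch of the Hive-style case, again on made-up toy data in a temporary directory: `write_dataset()` produces `key=value` path segments by default, so `open_dataset()` needs no `partitioning` argument when reading the result back.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical toy data (not the taxi dataset)
toy <- data.frame(
  year = c(2009L, 2009L, 2010L),
  month = c(1L, 2L, 1L),
  fare = c(8.5, 12, 30.25)
)

# The default is Hive-style: directories named year=2009/month=1, and so on
hive_dir <- tempfile("toy-hive-")
write_dataset(toy, hive_dir, partitioning = c("year", "month"))
hive_files <- list.files(hive_dir, recursive = TRUE)

# No partitioning argument: names and values are parsed from the paths
hive_ds <- open_dataset(hive_dir)
names(hive_ds)
```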

## Querying the dataset

Up to this point, we haven't loaded any data: we have walked directories to find
files, we've parsed file paths to identify partitions, and we've read the
headers of the Parquet files to inspect their schemas so that we can make sure
they all line up.
In the current release, `arrow` supports methods for selecting a window of data:
`select()`, `rename()`, and `filter()`. Aggregation is not yet supported,
nor is deriving or projecting new columns, so before you call `summarize()` or
`mutate()`, you'll need to `collect()` the data first,
which pulls your selected window of data into an in-memory R data frame.
While we could have made those methods `collect()` the data they needed
automatically and invisibly to the end user,
we thought it best to make it explicit when you're pulling data into memory
so that you can construct your queries most efficiently
and not be surprised when some query consumes way more resources than expected.
Here's an example. Suppose I were curious about tipping behavior among the
most expensive taxi rides. Let's find the median tip percentage for rides with
a total amount greater than $100 in 2015, broken down by the number of passengers:
```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
# A tibble: 10 x 3
   passenger_count tip_pct      n
             <int>   <dbl>  <int>
 1               0    9.84    380
 2               1   16.7  143087
 3               2   16.6   34418
 4               3   14.4    8922
 5               4   11.4    4771
 6               5   16.7    5806
 7               6   16.7    3338
 8               7   16.7      11
 9               8   16.7      32
10               9   16.7      42
   user  system elapsed
  4.436   1.012   1.402
")
```
We just selected a window out of a dataset with around 2 billion rows
and aggregated on it in under 2 seconds on my laptop. How does this work?
First, `select()`/`rename()`, `filter()`, and `group_by()`
record their actions but don't evaluate on the data until you run `collect()`.
```{r, eval = file.exists("nyc-taxi")}
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset (query)
tip_amount: float
total_amount: float
passenger_count: int8
* Filter: ((total_amount > 100:double) and (year == 2015:double))
* Grouped by passenger_count
See $.data for the source Arrow object
")
```
This returns instantly and shows the window selection you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small window without generating
intermediate datasets that would potentially be large.
Second, all work is pushed down to the individual data files,
and depending on the file format, chunks of data within the files. As a result,
we can select a window of data from a much larger dataset by collecting the
smaller slices from each file--we don't have to load the whole dataset in memory
in order to slice from it.
Third, because of partitioning, we can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: we don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data we can
avoid scanning because they have no rows where `total_amount > 100`.

## More dataset options

There are a few ways you can control the Dataset creation to adapt to special use cases.
For one, you can specify a `schema` argument to declare the columns and their data types.
This is useful if you have data files that have different storage schema
(for example, a column could be `int32` in one and `int8` in another)
and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences like these.
The schema specification just lets you declare what you want the result to be.
Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in our taxi dataset example, if you wanted to keep
"month" as a string instead of an integer for some reason.
Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
Or, you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
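A minimal sketch of a multi-source dataset, assuming two toy Parquet directories with identical columns (both created here in temporary locations purely for illustration):

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Two hypothetical sources that share the same columns
dir_a <- tempfile("source-a-")
dir_b <- tempfile("source-b-")
write_dataset(data.frame(id = 1:3, fare = c(5, 10, 15)), dir_a)
write_dataset(data.frame(id = 4:5, fare = c(20, 25)), dir_b)

ds1 <- open_dataset(dir_a)
ds2 <- open_dataset(dir_b)

# Pass a list of datasets instead of a path; c(ds1, ds2) is the shorthand
big_dataset <- open_dataset(list(ds1, ds2))
n_rows <- big_dataset %>% collect() %>% nrow()
```

Queries against `big_dataset` then scan both sources, so `n_rows` counts the rows from both directories together.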

## Writing datasets

As you can see, querying a large dataset can be made quite fast by storage in an
efficient binary columnar format like Parquet or Feather and partitioning based on
columns commonly used for filtering. However, we don't always get our data delivered
to us that way. Sometimes we start with one giant CSV. Our first step in analyzing data
is cleaning it up and reshaping it into a more usable form.
The `write_dataset()` function allows you to take a Dataset or other tabular data object---an Arrow `Table` or `RecordBatch`, or an R `data.frame`---and write it to a different file format, partitioned into multiple files.
Assume we have a version of the NYC Taxi data as CSV:
```r
ds <- open_dataset("nyc-taxi/csv/", format = "csv")
```
We can write it to a new location and translate the files to the Feather format
by calling `write_dataset()` on it:
```r
write_dataset(ds, "nyc-taxi/feather", format = "feather")
```
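If you don't have a CSV copy of the taxi data, the same conversion can be tried end to end on a toy file. This is only a sketch: the CSV contents and temporary directories are made up for illustration.

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical one-file CSV "dataset"
csv_dir <- tempfile("toy-csv-")
dir.create(csv_dir)
write.csv(
  data.frame(payment_type = c(1L, 2L, 3L), fare = c(8.5, 12, 30.25)),
  file.path(csv_dir, "part-0.csv"),
  row.names = FALSE
)

# Open the CSV directory and rewrite it in the Feather format
csv_ds <- open_dataset(csv_dir, format = "csv")
feather_dir <- tempfile("toy-feather-")
write_dataset(csv_ds, feather_dir, format = "feather")

# Reopen the converted copy to confirm the columns survived
feather_ds <- open_dataset(feather_dir, format = "feather")
names(feather_ds)
```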
Next, let's imagine that the "payment_type" column is something we often filter on,
so we want to partition the data by that variable. By doing so we ensure that a filter like
`payment_type == 3` will touch only a subset of files where payment_type is always 3.
One natural way to express the columns you want to partition on is to use the
`group_by()` method:
```r
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
This will write files to a directory tree that looks like this:
```r
system("tree nyc-taxi/feather")
# feather
# ├── payment_type=1
# │   └── part-5.feather
# ├── payment_type=2
# │   └── part-0.feather
# ...
# └── payment_type=5
#     └── part-2.feather
#
# 5 directories, 25 files
```
Note that the directory names are `payment_type=1` and similar:
this is the Hive-style partitioning described above. This means that when
we call `open_dataset()` on this directory, we don't have to declare what the
partitions are because they can be read from the file paths.
(To instead write bare values for partition segments,
i.e. `1` rather than `payment_type=1`, call `write_dataset()` with `hive_style = FALSE`.)
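The two naming styles can be compared side by side on a toy data frame. In this sketch the data and the temporary output directories are assumptions for illustration; only the top-level directory names are inspected.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical toy data with three payment types
toy <- data.frame(
  payment_type = c(1L, 1L, 2L, 3L),
  fare = c(8.5, 12, 30.25, 4)
)

# Default: Hive-style segments such as payment_type=1
hive_dir <- tempfile("toy-by-type-")
toy %>%
  group_by(payment_type) %>%
  write_dataset(hive_dir, format = "feather")
hive_names <- sort(list.files(hive_dir))

# hive_style = FALSE: bare values such as 1
bare_dir <- tempfile("toy-bare-")
toy %>%
  group_by(payment_type) %>%
  write_dataset(bare_dir, format = "feather", hive_style = FALSE)
bare_names <- sort(list.files(bare_dir))
```

With bare segments, remember that reading the data back requires naming the partition in `open_dataset()`, as in the taxi example at the start of this vignette.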
Perhaps, though, `payment_type == 3` is the only data we ever care about,
and we just want to drop the rest and have a smaller working set.
For this, we can `filter()` them out when writing:
```r
ds %>%
  filter(payment_type == 3) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
The other thing we can do when writing datasets is select a subset of and/or reorder
columns. Suppose we never care about `vendor_id`, and being a string column,
it can take up a lot of space when we read it in, so let's drop it:
```r
ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```
Note that while you can select a subset of columns,
you cannot currently rename columns when writing.
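To check what actually lands on disk, a filtered and column-trimmed write can be read back and inspected. The sketch below does this on hypothetical toy data in temporary directories; it combines the `filter()` and `select()` steps shown above.

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical source dataset with a string column we want to drop
src_dir <- tempfile("toy-src-")
write_dataset(
  data.frame(
    vendor_id = c("A", "B", "A", "B"),
    payment_type = c(1L, 3L, 3L, 2L),
    fare = c(8.5, 12, 30.25, 4),
    stringsAsFactors = FALSE
  ),
  src_dir
)

# Keep only payment_type 3 and drop vendor_id while writing
out_dir <- tempfile("toy-out-")
open_dataset(src_dir) %>%
  filter(payment_type == 3) %>%
  select(-vendor_id) %>%
  write_dataset(out_dir, format = "feather")

# Read the smaller working set back into memory
result <- open_dataset(out_dir, format = "feather") %>% collect()
```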