---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---
Apache Arrow lets you work efficiently with large, multi-file datasets.
The `arrow` R package provides a `dplyr` interface to Arrow Datasets,
as well as other tools for interactive exploration of Arrow data.
This vignette introduces Datasets and shows how to use `dplyr` to analyze them.
It describes both what is possible to do with Arrow now
and what is on the immediate development roadmap.

## Example: NYC taxi data

The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about 10 years of the trip data in a public S3 bucket.
The total file size is around 37 gigabytes, even in the efficient Parquet file format.
That's bigger than memory on most people's computers,
so we can't just read it all in and stack it into a single data frame.
In a future release, you'll be able to point your R session at S3 and query
the dataset from there. For now, datasets need to be on your local file system.
To download the files,
```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
dir.create("nyc-taxi")
for (year in 2009:2019) {
  dir.create(file.path("nyc-taxi", year))
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in months) {
    if (month < 10) {
      month <- paste0("0", month)
    }
    dir.create(file.path("nyc-taxi", year, month))
    download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    )
  }
}
```
Note that the vignette will not execute that code chunk: if you want to run
with live data, you'll have to download the files yourself separately.
Given the size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.
If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether we're running with live data:
```{r}
dir.exists("nyc-taxi")
```

## Getting started

Because `dplyr` is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
we need to load both `arrow` and `dplyr`.
```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```
The first step is to create our Dataset object, pointing at the directory of data.
```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```
The default file format for `open_dataset()` is Parquet; if we had a directory
of Arrow format files, we could include `format = "arrow"` in the call.
Future versions will support more file formats, including CSV/delimited text data
and JSON.
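
For illustration, here is a minimal sketch of such a call; the `nyc-taxi-arrow` directory is hypothetical and assumes the same files had been saved in the Arrow format:
```{r, eval = FALSE}
# Hypothetical: same directory layout as above, but with Arrow-format files
ds_arrow <- open_dataset(
  "nyc-taxi-arrow",
  partitioning = c("year", "month"),
  format = "arrow"
)
```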
The `partitioning` argument lets us specify how the file paths provide information
about how the dataset is chunked into different files. Our files in this example
have file paths like
```
2009/01/data.parquet
2009/02/data.parquet
...
```
By providing a character vector to `partitioning`, we're saying that the first
path segment gives the value for "year" and the second segment is "month".
Every row in `2009/01/data.parquet` has a value of 2009 for "year"
and 1 for "month", even though those columns may not actually be present in the file.
Indeed, when we look at the dataset, we see that in addition to the columns present
in every file, there are also columns "year" and "month".
```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
## See $metadata for additional Schema metadata
")
```
The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If we had saved our files in paths like
```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```
we would not have had to provide the names in `partitioning`:
we could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.
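
As a sketch (the `nyc-taxi-hive` path is hypothetical), both variants look like this; `hive_partition()` optionally declares the partition types rather than relying on detection:
```{r, eval = FALSE}
# Partition names and values are read from the path segments themselves
ds_hive <- open_dataset("nyc-taxi-hive")

# Optionally, declare the partition column types explicitly
ds_hive <- open_dataset(
  "nyc-taxi-hive",
  partitioning = hive_partition(year = int32(), month = int32())
)
```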

## Querying the dataset

Up to this point, we haven't loaded any data: we have walked directories to find
files, we've parsed file paths to identify partitions, and we've read the
headers of the Parquet files to inspect their schemas so that we can make sure
they all line up.
In the current release, `arrow` supports methods for selecting a window of data:
`select()`, `rename()`, and `filter()`. Aggregation is not yet supported,
nor is deriving or projecting new columns, so before you call `summarize()` or
`mutate()`, you'll need to `collect()` the data first,
which pulls your selected window of data into an in-memory R data frame.
While we could have made those methods `collect()` the data they needed
automatically and invisibly to the end user,
we thought it best to make it explicit when you're pulling data into memory
so that you can construct your queries most efficiently
and not be surprised when some query consumes way more resources than expected.
Here's an example. Suppose I was curious about tipping behavior among the
longest taxi rides. Let's find the median tip percentage for rides with
fares greater than $100 in 2015, broken down by the number of passengers:
```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
filter(total_amount > 100, year == 2015) %>%
select(tip_amount, total_amount, passenger_count) %>%
group_by(passenger_count) %>%
collect() %>%
summarize(
tip_pct = median(100 * tip_amount / total_amount),
n = n()
) %>%
print())
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
##
##    user  system elapsed
##   4.436   1.012   1.402
")
```
We just selected a window out of a dataset with around 2 billion rows
and aggregated on it in under 2 seconds on my laptop. How does this work?
First, `select()`/`rename()`, `filter()`, and `group_by()`
record their actions but don't evaluate on the data until you run `collect()`.
```{r, eval = file.exists("nyc-taxi")}
ds %>%
filter(total_amount > 100, year == 2015) %>%
select(tip_amount, total_amount, passenger_count) %>%
group_by(passenger_count)
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
##
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object
")
```
This returns instantly and shows the window selection you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small window without generating
intermediate datasets that would potentially be large.
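
For instance (a sketch reusing the query above; the object name is illustrative), you can hold onto the lazy query object and only trigger reading when you `collect()`:
```{r, eval = FALSE}
# Only the query is recorded here; no files are read yet
expensive_rides_2015 <- ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count)

# Data is read only when we collect() into an R data frame
expensive_rides_2015 %>% collect()
```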
Second, all work is pushed down to the individual data files,
and, depending on the file format, to chunks of data within the files. As a result,
we can select a window of data from a much larger dataset by collecting the
smaller slices from each file--we don't have to load the whole dataset in memory
in order to slice from it.
Third, because of partitioning, we can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: we don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data we can
avoid scanning because they have no rows where `total_amount > 100`.
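
For example (a sketch, not run here), a filter on the partition columns alone narrows the scan to a single file before any Parquet data is read:
```{r, eval = FALSE}
# Only nyc-taxi/2015/06/data.parquet needs to be scanned for this query
ds %>%
  filter(year == 2015, month == 6) %>%
  select(pickup_at, total_amount) %>%
  collect() %>%
  nrow()
```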

## Going farther

There are a few ways you can control the Dataset creation to adapt to special use cases.
For one, you can specify a `schema` argument to declare the columns and their data types.
This is useful if you have data files that have different storage schemas
(for example, a column could be `int32` in one and `int8` in another)
and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences like these.
The schema specification just lets you declare what you want the result to be.
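
A hypothetical sketch of what that could look like; only a few of the taxi columns are listed here for brevity, whereas in practice you would declare the full set you want:
```{r, eval = FALSE}
# Declare the types you want the resulting Dataset to report
ds <- open_dataset(
  "nyc-taxi",
  partitioning = c("year", "month"),
  schema = schema(
    vendor_id = string(),
    passenger_count = int32(),  # e.g. stored as int8 in some files
    total_amount = float64()
  )
)
```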
Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in our taxi dataset example, if you wanted to keep
"month" as a string instead of an integer for some reason.
Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
In the future, when there is support for cloud storage and other file formats,
this would mean you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
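
A sketch, where the second directory (`nyc-taxi-more`) is hypothetical and stands in for any other compatible source:
```{r, eval = FALSE}
ds1 <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds2 <- open_dataset("nyc-taxi-more")  # hypothetical unpartitioned directory
big_dataset <- c(ds1, ds2)
```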