---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

Apache Arrow lets you work efficiently with large, multi-file datasets.
The `arrow` R package provides a `dplyr` interface to Arrow Datasets,
as well as other tools for interactive exploration of Arrow data.

This vignette introduces Datasets and shows how to use `dplyr` to analyze them.
It describes both what is possible to do with Arrow now
and what is on the immediate development roadmap.

## Example: NYC taxi data

The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about 10 years of the trip data in a public S3 bucket.

The total file size is around 37 gigabytes, even in the efficient Parquet file format.
That's bigger than memory on most people's computers,
so we can't just read it all in and stack it into a single data frame.

In a future release, you'll be able to point your R session at S3 and query
the dataset from there. For now, datasets need to be on your local file system.
To download the files:

```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
dir.create("nyc-taxi")
for (year in 2009:2019) {
  dir.create(file.path("nyc-taxi", year))
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in months) {
    if (month < 10) {
      month <- paste0("0", month)
    }
    dir.create(file.path("nyc-taxi", year, month))
    download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    )
  }
}
```

Note that this vignette does not execute that code chunk: if you want to run
with live data, you'll have to download the files yourself.
Given the total size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.

If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether we're running with live data:

```{r}
dir.exists("nyc-taxi")
```

## Getting started

Because `dplyr` is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
we need to load both `arrow` and `dplyr`.

```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```

The first step is to create our Dataset object, pointing at the directory of data.

```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```

The default file format for `open_dataset()` is Parquet; if we had a directory
of Arrow format files, we could include `format = "arrow"` in the call.
Future versions will support more file formats, including CSV/delimited text data
and JSON.
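
For illustration, here is a minimal sketch of what that would look like
(the directory name is hypothetical):

```{r, eval = FALSE}
# A sketch, assuming a hypothetical "feather-data" directory of Arrow-format files
ds_arrow <- open_dataset("feather-data", format = "arrow")
```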

The `partitioning` argument lets us specify how the file paths provide information
about how the dataset is chunked into different files. Our files in this example
have file paths like

```
2009/01/data.parquet
2009/02/data.parquet
...
```

By providing a character vector to `partitioning`, we're saying that the first
path segment gives the value for "year" and the second segment is "month".
Every row in `2009/01/data.parquet` has a value of 2009 for "year"
and 1 for "month", even though those columns may not actually be present in the file.

Indeed, when we look at the dataset, we see that in addition to the columns present
in every file, there are also columns "year" and "month".

```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
##
## See $metadata for additional Schema metadata
")
```

The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If we had saved our files in paths like

```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```

we would not have had to provide the names in `partitioning`:
we could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.
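
That is, a minimal sketch of opening a Hive-partitioned copy of the data
(the directory name is hypothetical):

```{r, eval = FALSE}
# A sketch, assuming a hypothetical "nyc-taxi-hive" directory with paths like
# year=2009/month=01/data.parquet: the partition names and values are read
# from the path segments, so no partitioning argument is needed
ds_hive <- open_dataset("nyc-taxi-hive")
```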

## Querying the dataset

Up to this point, we haven't loaded any data: we have walked directories to find
files, we've parsed file paths to identify partitions, and we've read the
metadata of the Parquet files to inspect their schemas so that we can make sure
they all line up.

In the current release, `arrow` supports methods for selecting a window of data:
`select()`, `rename()`, and `filter()`. Aggregation is not yet supported,
nor is deriving or projecting new columns, so before you call `summarize()` or
`mutate()`, you'll need to `collect()` the data, which pulls your selected
window into an in-memory R data frame. We could have made those methods
`collect()` the data they needed automatically and invisibly to the end user,
but we thought it best to make it explicit when you're pulling data into memory,
so that you can construct your queries efficiently and aren't surprised when
a query consumes far more resources than expected.

Here's an example. Suppose I'm curious about tipping behavior among the
most expensive taxi rides. Let's find the median tip percentage for rides
that cost more than $100 in 2015, broken down by the number of passengers:

```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
##
##    user  system elapsed
##   4.436   1.012   1.402
")
```

We just selected a window out of a dataset with around 2 billion rows
and aggregated it in under 2 seconds on my laptop. How does this work?

First, `select()`/`rename()`, `filter()`, and `group_by()`
record their actions but don't evaluate anything until you run `collect()`.

```{r, eval = file.exists("nyc-taxi")}
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
##
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object
")
```

This returns instantly and shows the window selection you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small window without generating
intermediate datasets that would potentially be large.

Second, all work is pushed down to the individual data files and, depending on
the file format, to chunks of data within the files. As a result, we can select
a window of data from a much larger dataset by collecting the smaller slices
from each file; we don't have to load the whole dataset into memory
in order to slice from it.

Third, because of partitioning, we can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: we don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data we can
avoid scanning because they have no rows where `total_amount > 100`.

## Going farther

There are a few ways you can control the Dataset creation to adapt to special use cases.
For one, you can specify a `schema` argument to declare the columns and their data types.
This is useful if you have data files that have different storage schemas
(for example, a column could be `int32` in one and `int8` in another)
and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences like these.
The schema specification just lets you declare what you want the result to be.
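
For illustration, here is a minimal sketch of what that might look like,
assuming a hypothetical "mixed-types" directory whose files store a column
with inconsistent integer types; the columns listed are illustrative only:

```{r, eval = FALSE}
# A sketch: declare the types we want so that a column stored as int8 in some
# files and int32 in others comes out as int32 in the resulting Dataset.
# "mixed-types" and the column names here are hypothetical.
ds_declared <- open_dataset(
  "mixed-types",
  schema = schema(
    passenger_count = int32(),
    fare_amount = float32()
  )
)
```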

Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in our taxi dataset example, if you wanted to keep
"month" as a string instead of an integer for some reason.

Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
In the future, when there is support for cloud storage and other file formats,
this would mean you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
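
As a minimal sketch, assuming two hypothetical local directories of Parquet files:

```{r, eval = FALSE}
# A sketch, assuming a second, hypothetical "other-taxi-data" directory
ds1 <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds2 <- open_dataset("other-taxi-data")
# Combine them and query the result as a single Dataset
big_dataset <- c(ds1, ds2)
```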