---
title: "Working with Arrow Datasets and dplyr"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Working with Arrow Datasets and dplyr}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

Apache Arrow lets you work efficiently with large, multi-file datasets.
The `arrow` R package provides a `dplyr` interface to Arrow Datasets,
as well as other tools for interactive exploration of Arrow data.

This vignette introduces Datasets and shows how to use `dplyr` to analyze them.
It describes both what is possible to do with Arrow now
and what is on the immediate development roadmap.

## Example: NYC taxi data

The [New York City taxi trip record data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)
is widely used in big data exercises and competitions.
For demonstration purposes, we have hosted a Parquet-formatted version
of about 10 years of the trip data in a public Amazon S3 bucket.

The total file size is around 37 gigabytes, even in the efficient Parquet file
format. That's bigger than memory on most people's computers, so we can't just
read it all in and stack it into a single data frame.

S3 support is included in the Windows and macOS binary packages.
On Linux, when installing from source, S3 support is not enabled by default,
and it has additional system requirements.
See `vignette("install", package = "arrow")` for details.
To see if your `arrow` installation has S3 support, run

```{r}
arrow::arrow_with_s3()
```

Even with S3 support enabled, network speed will be a bottleneck unless your
machine is located in the same AWS region as the data. So, for this vignette,
we assume that the NYC taxi dataset has been downloaded locally into a "nyc-taxi"
directory.

If your `arrow` build has S3 support, you can sync the data locally with:

```{r, eval = FALSE}
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
```

If your `arrow` build doesn't have S3 support, you can download the files
with some additional code:

```{r, eval = FALSE}
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
for (year in 2009:2019) {
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in sprintf("%02d", months)) {
    dir.create(file.path("nyc-taxi", year, month), recursive = TRUE)
    try(download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = "wb"
    ), silent = TRUE)
  }
}
```

Note that these download steps in the vignette are not executed: if you want to run
with live data, you'll have to do it yourself separately.
Given the size, if you're running this locally and don't have a fast connection,
feel free to grab only a year or two of data.

If you don't have the taxi data downloaded, the vignette will still run and will
yield previously cached output for reference. To be explicit about which version
is running, let's check whether we're running with live data:

```{r}
dir.exists("nyc-taxi")
```

## Getting started

Because `dplyr` is not necessary for many Arrow workflows,
it is an optional (`Suggests`) dependency. So, to work with Datasets,
we need to load both `arrow` and `dplyr`.

```{r}
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```

The first step is to create our Dataset object, pointing at the directory of data.

```{r, eval = file.exists("nyc-taxi")}
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
```

The default file format for `open_dataset()` is Parquet; if we had a directory
of Arrow format files, we could include `format = "arrow"` in the call.
Other supported formats include: `"feather"` (an alias for `"arrow"`, as Feather
v2 is the Arrow file format), `"csv"`, `"tsv"` (for tab-delimited), and `"text"`
for generic text-delimited files. For text files, you can pass any parsing
options (`delim`, `quote`, etc.) to `open_dataset()` that you would otherwise
pass to `read_csv_arrow()`.
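
For instance, here's a minimal sketch of that pass-through (the
"some-text-data" directory and the pipe delimiter are hypothetical):

```r
# Hypothetical directory of pipe-delimited text files; `delim` is a
# parsing option forwarded to the delimited-text reader
ds_txt <- open_dataset("some-text-data", format = "text", delim = "|")
```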

The `partitioning` argument lets us specify how the file paths provide information
about how the dataset is chunked into different files. Our files in this example
have file paths like

```
2009/01/data.parquet
2009/02/data.parquet
...
```

By providing a character vector to `partitioning`, we're saying that the first
path segment gives the value for `year` and the second segment is `month`.
Every row in `2009/01/data.parquet` has a value of 2009 for `year`
and 1 for `month`, even though those columns may not actually be present in the file.

Indeed, when we look at the dataset, we see that in addition to the columns present
in every file, there are also columns `year` and `month`.

```{r, eval = file.exists("nyc-taxi")}
ds
```
```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset with 125 Parquet files
vendor_id: string
pickup_at: timestamp[us]
dropoff_at: timestamp[us]
passenger_count: int8
trip_distance: float
pickup_longitude: float
pickup_latitude: float
rate_code_id: null
store_and_fwd_flag: string
dropoff_longitude: float
dropoff_latitude: float
payment_type: string
fare_amount: float
extra: float
mta_tax: float
tip_amount: float
tolls_amount: float
total_amount: float
year: int32
month: int32

See $metadata for additional Schema metadata
")
```

The other form of partitioning currently supported is [Hive](https://hive.apache.org/)-style,
in which the partition variable names are included in the path segments.
If we had saved our files in paths like

```
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
```

we would not have had to provide the names in `partitioning`:
we could have just called `ds <- open_dataset("nyc-taxi")` and the partitions
would have been detected automatically.

## Querying the dataset

Up to this point, we haven't loaded any data: we have walked directories to find
files, we've parsed file paths to identify partitions, and we've read the
headers of the Parquet files to inspect their schemas so that we can make sure
they all line up.

In the current release, `arrow` supports the dplyr verbs `mutate()`,
`transmute()`, `select()`, `rename()`, `relocate()`, `filter()`, and
`arrange()`. Aggregation is not yet supported, so before you call `summarise()`
or other verbs with aggregate functions, use `collect()` to pull the selected
subset of the data into an in-memory R data frame.

If you attempt to call unsupported `dplyr` verbs or unimplemented functions in
your query on an Arrow Dataset, the `arrow` package raises an error. However,
for `dplyr` queries on `Table` objects (which are typically smaller in size) the
package automatically calls `collect()` before processing that `dplyr` verb.

Here's an example. Suppose I was curious about tipping behavior among the
longest taxi rides. Let's find the median tip percentage for rides with
fares greater than $100 in 2015, broken down by the number of passengers:

```{r, eval = file.exists("nyc-taxi")}
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarise(
    median_tip_pct = median(tip_pct),
    n = n()
  ) %>%
  print())
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
# A tibble: 10 x 3
   passenger_count median_tip_pct      n
             <int>          <dbl>  <int>
 1               0           9.84    380
 2               1          16.7  143087
 3               2          16.6   34418
 4               3          14.4    8922
 5               4          11.4    4771
 6               5          16.7    5806
 7               6          16.7    3338
 8               7          16.7      11
 9               8          16.7      32
10               9          16.7      42

   user  system elapsed
  4.436   1.012   1.402
")
```

We just selected a subset out of a dataset with around 2 billion rows, computed
a new column, and aggregated on it in under 2 seconds on my laptop. How does
this work?

First,
`mutate()`/`transmute()`, `select()`/`rename()`/`relocate()`, `filter()`,
`group_by()`, and `arrange()` record their actions but don't evaluate on the
data until you run `collect()`.

```{r, eval = file.exists("nyc-taxi")}
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  mutate(tip_pct = 100 * tip_amount / total_amount) %>%
  group_by(passenger_count)
```

```{r, echo = FALSE, eval = !file.exists("nyc-taxi")}
cat("
FileSystemDataset (query)
tip_amount: float
total_amount: float
passenger_count: int8
tip_pct: expr

* Filter: ((total_amount > 100) and (year == 2015))
* Grouped by passenger_count
See $.data for the source Arrow object
")
```

This returns instantly and shows the manipulations you've made, without
loading data from the files. Because the evaluation of these queries is deferred,
you can build up a query that selects down to a small subset without generating
intermediate datasets that would potentially be large.
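
To make that concrete, here is a small sketch: the intermediate `query`
object below (a name chosen just for illustration) records the operations
but reads no data until `collect()` is called.

```r
# Build up a deferred query; nothing is read from disk yet
query <- ds %>%
  filter(year == 2015, passenger_count > 1) %>%
  select(pickup_at, total_amount)

# Only now are the matching files scanned and rows pulled into R
result <- query %>% collect()
```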

Second, all work is pushed down to the individual data files,
and depending on the file format, chunks of data within the files. As a result,
we can select a subset of data from a much larger dataset by collecting the
smaller slices from each file--we don't have to load the whole dataset in memory
in order to slice from it.

Third, because of partitioning, we can ignore some files entirely.
In this example, by filtering `year == 2015`, all files corresponding to other years
are immediately excluded: we don't have to load them in order to find that no
rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data we can
avoid scanning because they have no rows where `total_amount > 100`.

## More dataset options

There are a few ways you can control the Dataset creation to adapt to special use cases.
For one, if you are working with a single file or a set of files that are not
all in the same directory, you can provide a file path or a vector of multiple
file paths to `open_dataset()`. This is useful if, for example, you have a
single CSV file that is too big to read into memory. You could pass the file
path to `open_dataset()`, use `group_by()` to partition the Dataset into
manageable chunks, then use `write_dataset()` to write each chunk to a separate
Parquet file---all without needing to read the full CSV file into R.
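
A rough sketch of that workflow (the "huge-file.csv" path and the choice of
`year` as the grouping column are hypothetical):

```r
# Point at a single CSV that is too big to read into memory
big_csv <- open_dataset("huge-file.csv", format = "csv")

# Partition on a column and write one Parquet file per group,
# without materializing the full CSV in R
big_csv %>%
  group_by(year) %>%
  write_dataset("huge-file-parquet", format = "parquet")
```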

You can specify a `schema` argument to `open_dataset()` to declare the columns
and their data types. This is useful if you have data files that have different
storage schemas (for example, a column could be `int32` in one and `int8` in another)
and you want to ensure that the resulting Dataset has a specific type.
To be clear, it's not necessary to specify a schema, even in this example of
mixed integer types, because the Dataset constructor will reconcile differences like these.
The schema specification just lets you declare what you want the result to be.
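
For instance, a minimal sketch, assuming a hypothetical "mixed-files"
directory of files whose columns `x` and `y` have inconsistent storage types:

```r
# Declare the types we want the Dataset to have; schema(), int32(),
# and utf8() are arrow's type constructors
ds <- open_dataset(
  "mixed-files",
  schema = schema(x = int32(), y = utf8())
)
```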

Similarly, you can provide a Schema in the `partitioning` argument of `open_dataset()`
in order to declare the types of the virtual columns that define the partitions.
This would be useful, in our taxi dataset example, if you wanted to keep
`month` as a string instead of an integer for some reason.
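
That declaration would look something like this sketch:

```r
# Partition segments are parsed according to the declared Schema,
# so month comes through as a string column here
ds <- open_dataset(
  "nyc-taxi",
  partitioning = schema(year = int32(), month = utf8())
)
```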

Another feature of Datasets is that they can be composed of multiple data sources.
That is, you may have a directory of partitioned Parquet files in one location,
and in another directory, files that haven't been partitioned.
Or, you could point to an S3 bucket of Parquet data and a directory
of CSVs on the local file system and query them together as a single dataset.
To create a multi-source dataset, provide a list of datasets to `open_dataset()`
instead of a file path, or simply concatenate them like `big_dataset <- c(ds1, ds2)`.
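
As a sketch (both paths here are hypothetical):

```r
# One partitioned Parquet directory plus one directory of CSVs,
# queried together as a single Dataset
ds1 <- open_dataset("data/parquet-partitioned")
ds2 <- open_dataset("data/csv-dump", format = "csv")
big_dataset <- c(ds1, ds2)
```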

## Writing datasets

As you can see, querying a large dataset can be made quite fast by storage in an
efficient binary columnar format like Parquet or Feather and partitioning based on
columns commonly used for filtering. However, we don't always get our data delivered
to us that way. Sometimes we start with one giant CSV. Our first step in analyzing data
is cleaning it up and reshaping it into a more usable form.

The `write_dataset()` function allows you to take a Dataset or other tabular data object---an Arrow `Table` or `RecordBatch`, or an R `data.frame`---and write it to a different file format, partitioned into multiple files.

Assume we have a version of the NYC Taxi data as CSV:

```r
ds <- open_dataset("nyc-taxi/csv/", format = "csv")
```

We can write it to a new location and translate the files to the Feather format
by calling `write_dataset()` on it:

```r
write_dataset(ds, "nyc-taxi/feather", format = "feather")
```

Next, let's imagine that the `payment_type` column is something we often filter
on, so we want to partition the data by that variable. By doing so we ensure
that a filter like `payment_type == "Cash"` will touch only a subset of files
where `payment_type` is always `"Cash"`.

One natural way to express the columns you want to partition on is to use the
`group_by()` method:

```r
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

This will write files to a directory tree that looks like this:

```r
system("tree nyc-taxi/feather")
```

```
## feather
## ├── payment_type=1
## │   └── part-18.feather
## ├── payment_type=2
## │   └── part-19.feather
## ...
## └── payment_type=UNK
##     └── part-17.feather
##
## 18 directories, 23 files
```

Note that the directory names are `payment_type=1` and similar:
this is the Hive-style partitioning described above. This means that when
we call `open_dataset()` on this directory, we don't have to declare what the
partitions are because they can be read from the file paths.
(To instead write bare values for partition segments, i.e. `1` rather than
`payment_type=1`, call `write_dataset()` with `hive_style = FALSE`.)
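
That call would look like this sketch (note that with bare partition values,
you would later need to supply the partition names to `open_dataset()` yourself):

```r
# Write partition directories as bare values ("1", "2", ...) rather than
# Hive-style "payment_type=1"
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather", hive_style = FALSE)
```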

Perhaps, though, `payment_type == "Cash"` is the only data we ever care about,
and we just want to drop the rest and have a smaller working set.
For this, we can `filter()` them out when writing:

```r
ds %>%
  filter(payment_type == "Cash") %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

The other thing we can do when writing datasets is select a subset of columns
and/or reorder them. Suppose we never care about `vendor_id`; as a string column,
it can take up a lot of space when we read it in, so let's drop it:

```r
ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
```

Note that while you can select a subset of columns,
you cannot currently rename columns when writing a dataset.