<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Reading and Writing Data - Multiple Files

## Introduction

When reading files into R using Apache Arrow, you can read:

* a single file into memory as a data frame or an Arrow Table
* a single file that is too large to fit in memory as an Arrow Dataset
* multiple and partitioned files as an Arrow Dataset

This chapter contains recipes for using Apache Arrow to read and write
files that are too large for memory, as well as multiple or partitioned
files, as an Arrow Dataset. There are a number of circumstances in which
you may want to read in data as an Arrow Dataset:

* your single data file is too large to load into memory
* your data are partitioned among numerous files
* you want faster performance from your `dplyr` queries
* you want to be able to take advantage of Arrow's compute functions

You can read partitioned data in Parquet, Feather (also known as Arrow IPC),
CSV, or other text-delimited formats. If you are choosing a format for
partitioned, multi-file data, we recommend Parquet or Feather (Arrow IPC),
both of which can offer better performance than CSV thanks to their
metadata and compression capabilities.

## Write data to disk - Parquet

You want to write data to disk in a single Parquet file.

### Solution

```{r, write_dataset_basic}
write_dataset(dataset = airquality, path = "airquality_data")
```

```{r, test_write_dataset_basic, opts.label = "test"}
test_that("write_dataset_basic works as expected", {
  expect_true(file.exists("airquality_data"))
  expect_length(list.files("airquality_data"), 1)
})
```

### Discussion

The default format for `open_dataset()` and `write_dataset()` is Parquet.
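Because Parquet is the default, the file written above can be read back in
without specifying a format. A minimal sketch:

```{r, read_dataset_parquet_default}
# Read the single-file Parquet dataset back in; no format argument is
# needed because "parquet" is the default
open_dataset("airquality_data")
```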

## Write partitioned data - Parquet

You want to save multiple Parquet data files to disk in partitions based on columns in the data.

### Solution

```{r, write_dataset}
write_dataset(airquality, "airquality_partitioned", partitioning = c("Month"))
```

```{r, test_write_dataset, opts.label = "test"}
test_that("write_dataset chunk works as expected", {
  # Partition by month
  expect_identical(list.files("airquality_partitioned"), c("Month=5", "Month=6", "Month=7", "Month=8", "Month=9"))
  # We have enough files
  expect_equal(length(list.files("airquality_partitioned", recursive = TRUE)), 5)
})
```

As you can see, this has created folders based on the supplied partition variable `Month`.

```{r}
list.files("airquality_partitioned")
```

### Discussion

The data is written to separate folders based on the values in the `Month`
column. The default behaviour is to use Hive-style partitioning, i.e. folder
names of the form "col_name=value".

```{r}
# Take a look at the files in this directory
list.files("airquality_partitioned", recursive = TRUE)
```

You can specify multiple partitioning variables to add extra levels of partitioning.

```{r, write_dataset_partitioned_deeper}
write_dataset(airquality, "airquality_partitioned_deeper", partitioning = c("Month", "Day"))
list.files("airquality_partitioned_deeper")
```

```{r, test_write_dataset_partitioned_deeper, opts.label = "test"}
test_that("write_dataset_partitioned_deeper works as expected", {
  expect_true(file.exists("airquality_partitioned_deeper"))
  expect_length(list.files("airquality_partitioned_deeper", recursive = TRUE), 153)
})
```

If you take a look in one of these folders, you will see that the data is then partitioned by the second partition variable, `Day`.

```{r}
# Take a look at the files in this directory
list.files("airquality_partitioned_deeper/Month=5", recursive = TRUE)
```

There are two ways to specify the variables used for partitioning: via the
`partitioning` argument, as shown above, or by calling `dplyr::group_by()` on
your data first, in which case the grouping variables form the partitions.

```{r, write_dataset_partitioned_groupby}
write_dataset(dataset = group_by(airquality, Month, Day),
              path = "airquality_groupby")
```

```{r, test_write_dataset_partitioned_groupby, opts.label = "test"}
test_that("write_dataset_partitioned_groupby works as expected", {
  expect_true(file.exists("airquality_groupby"))
  expect_length(list.files("airquality_groupby", recursive = TRUE), 153)
})
```

```{r}
# Take a look at the files in this directory
list.files("airquality_groupby", recursive = TRUE)
```

Each of these folders contains one or more Parquet files containing the relevant partition of the data.

```{r}
list.files("airquality_groupby/Month=5/Day=10")
```

Note that rows with an `NA` value in the partition column are written to the
`col_name=__HIVE_DEFAULT_PARTITION__` directory.
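Here is a minimal sketch of this behaviour, written to a temporary directory
so that no extra files are left behind:

```{r, write_dataset_na_partition}
# Introduce an NA into the partition column, then write partitioned files
airquality_na <- airquality
airquality_na$Month[1] <- NA

td_na <- tempfile()
dir.create(td_na)
write_dataset(airquality_na, td_na, partitioning = "Month")

# Rows with a missing Month land in Month=__HIVE_DEFAULT_PARTITION__
list.files(td_na)
```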

## Read partitioned data

You want to read partitioned data files as an Arrow Dataset.

### Solution

```{r, open_dataset}
# Read data from directory
air_data <- open_dataset("airquality_partitioned_deeper")

# View data
air_data
```

```{r, test_open_dataset, opts.label = "test"}
test_that("open_dataset chunk works as expected", {
  expect_equal(nrow(air_data), 153)
  expect_equal(tibble::as_tibble(arrange(collect(air_data), Month, Day)), arrange(tibble::as_tibble(airquality), Month, Day))
})
```

### Discussion

Partitioning allows you to split data across multiple files and folders,
avoiding problems associated with storing all your data in a single file.
This can provide further advantages when using Arrow, as Arrow only reads
the partitioned files needed for a given analysis.
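For example, a `dplyr` filter on a partition column means Arrow can skip the
irrelevant files entirely. A minimal sketch using the dataset opened above:

```{r, open_dataset_filter_partition}
# Only the files under Month=5 need to be read for this query
air_data %>%
  filter(Month == 5) %>%
  collect()
```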

## Write data to disk - Feather/Arrow IPC format

You want to write data to disk in a single Feather/Arrow IPC file.

### Solution

```{r, write_dataset_feather}
write_dataset(dataset = airquality,
              path = "airquality_data_feather",
              format = "feather")
```

```{r, test_write_dataset_feather, opts.label = "test"}
test_that("write_dataset_feather works as expected", {
  expect_true(file.exists("airquality_data_feather"))
  expect_length(list.files("airquality_data_feather"), 1)
})
```

## Read in Feather/Arrow IPC data as an Arrow Dataset

You want to read in Feather/Arrow IPC data as an Arrow Dataset.

### Solution

```{r, read_arrow_dataset}
# write Arrow file to use in this example
write_dataset(dataset = airquality,
              path = "airquality_data_arrow",
              format = "arrow")

# read into R
open_dataset("airquality_data_arrow", format = "arrow")
```

```{r, test_read_arrow_dataset, opts.label = "test"}
test_that("read_arrow_dataset works as expected", {
  dataset <- open_dataset("airquality_data_arrow", format = "arrow")
  expect_s3_class(dataset, "FileSystemDataset")
  expect_identical(dim(dataset), c(153L, 6L))
})
```

## Write data to disk - CSV format

You want to write data to disk in a single CSV file.

### Solution

```{r, write_dataset_csv}
write_dataset(dataset = airquality,
              path = "airquality_data_csv",
              format = "csv")
```

```{r, test_write_dataset_csv, opts.label = "test"}
test_that("write_dataset_csv works as expected", {
  expect_true(file.exists("airquality_data_csv"))
  expect_length(list.files("airquality_data_csv"), 1)
})
```

## Read in CSV data as an Arrow Dataset

You want to read in CSV data as an Arrow Dataset.

### Solution

```{r, read_csv_dataset}
# write CSV file to use in this example
write_dataset(dataset = airquality,
              path = "airquality_data_csv",
              format = "csv")

# read into R
open_dataset("airquality_data_csv", format = "csv")
```

```{r, test_read_csv_dataset, opts.label = "test"}
test_that("read_csv_dataset works as expected", {
  dataset <- open_dataset("airquality_data_csv", format = "csv")
  expect_s3_class(dataset, "FileSystemDataset")
  expect_identical(dim(dataset), c(153L, 6L))
})
```

## Read in a CSV dataset (no headers)

You want to read in a dataset containing CSVs with no headers.

### Solution

```{r, read_headerless_csv_dataset}
# write CSV files to use in this example
dataset_1 <- airquality[1:40, c("Month", "Day", "Temp")]
dataset_2 <- airquality[41:80, c("Month", "Day", "Temp")]

dir.create("airquality")
write.table(dataset_1, "airquality/part-1.csv", sep = ",", row.names = FALSE, col.names = FALSE)
write.table(dataset_2, "airquality/part-2.csv", sep = ",", row.names = FALSE, col.names = FALSE)

# read into R
open_dataset("airquality", format = "csv", column_names = c("Month", "Day", "Temp"))
```

```{r, test_read_headerless_csv_dataset, opts.label = "test"}
test_that("read_headerless_csv_dataset works as expected", {
  data_in <- open_dataset("airquality", format = "csv", column_names = c("Month", "Day", "Temp"))
  expect_s3_class(data_in, "FileSystemDataset")
  expect_identical(dim(data_in), c(80L, 3L))
  expect_named(data_in, c("Month", "Day", "Temp"))
})
```

### Discussion

If your dataset is made up of headerless CSV files, you must supply the names
of each column. You can do this in one of two ways: via the `column_names`
parameter (as shown above) or via a schema:

```{r, read_headerless_csv_dataset_schema}
open_dataset("airquality",
  format = "csv",
  schema = schema("Month" = int32(), "Day" = int32(), "Temp" = int32())
)
```

```{r, test_read_headerless_csv_dataset_schema, opts.label = "test"}
test_that("read_headerless_csv_dataset_schema works as expected", {
  data_in <- open_dataset("airquality", format = "csv", schema = schema("Month" = int32(), "Day" = int32(), "Temp" = int32()))
  expect_s3_class(data_in, "FileSystemDataset")
  expect_identical(dim(data_in), c(80L, 3L))
  expect_named(data_in, c("Month", "Day", "Temp"))
  expect_equal(data_in$schema, schema("Month" = int32(), "Day" = int32(), "Temp" = int32()))
})
```

One additional advantage of using a schema is that you also control the data
types of the columns. If you provide both column names and a schema, the
values in `column_names` must match the `schema` field names.
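For example, a sketch that reads the same files but stores `Temp` as a double
rather than an integer:

```{r, read_headerless_csv_schema_types}
# The schema controls column types: Temp is read as float64 here
open_dataset("airquality",
  format = "csv",
  schema = schema("Month" = int32(), "Day" = int32(), "Temp" = float64())
)
```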

## Write compressed partitioned data

You want to save partitioned files, compressed with a specified compression algorithm.

### Solution

```{r, dataset_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)

# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")
```

```{r}
# View files in the directory
list.files(td, recursive = TRUE)
```

```{r, test_dataset_gzip, opts.label = "test"}
test_that("dataset_gzip", {
  expect_true(file.exists(file.path(td, "part-0.parquet")))
})
```

### Discussion

You can supply the `compression` argument to `write_dataset()` as long as the
compression algorithm is compatible with the chosen format. See
`?write_dataset` for more information on supported compression algorithms and
default settings.
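For example, a sketch using Zstandard instead of gzip; this assumes your Arrow
installation was built with zstd support:

```{r, dataset_zstd}
# Write the same dataset compressed with zstd instead of gzip
# (assumes zstd support in your Arrow build)
td_zstd <- tempfile()
dir.create(td_zstd)
write_dataset(iris, path = td_zstd, compression = "zstd")
list.files(td_zstd)
```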

## Read compressed data

You want to read in data which has been compressed.

### Solution

```{r, opendataset_compressed}
# Create a temporary directory
td <- tempfile()
dir.create(td)

# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")

# Read in data
ds <- open_dataset(td) %>%
  collect()

ds
```

```{r, test_opendataset_compressed, opts.label = "test"}
test_that("opendataset_compressed", {
  expect_s3_class(ds, "data.frame")
  expect_named(
    ds,
    c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
  )
})
```

### Discussion

Note that Arrow automatically detects the compression and you do not have to
supply it in the call to `open_dataset()` or the `read_*()` functions.
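The same applies when reading a single file directly; a minimal sketch,
assuming the `part-0.parquet` name produced by `write_dataset()`'s default
file naming:

```{r, read_parquet_compressed}
# No compression argument is needed; the codec is detected from the file
read_parquet(file.path(td, "part-0.parquet"))
```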

```{r cleanup_multifile, include = FALSE}
# cleanup
unlink("airquality", recursive = TRUE)
unlink("airquality_data_csv", recursive = TRUE)
unlink("airquality_data", recursive = TRUE)
unlink("airquality_data_arrow", recursive = TRUE)
unlink("airquality_data_feather", recursive = TRUE)
unlink("airquality_partitioned", recursive = TRUE)
unlink("airquality_groupby", recursive = TRUE)
unlink("airquality_partitioned_deeper", recursive = TRUE)
```