<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Reading and Writing Data - Multiple Files
## Introduction
When reading files into R using Apache Arrow, you can read:
* a single file into memory as a data frame or an Arrow Table
* a single file that is too large to fit in memory as an Arrow Dataset
* multiple and partitioned files as an Arrow Dataset
This chapter contains recipes related to using Apache Arrow to read and
write files too large for memory and multiple or partitioned files as an
Arrow Dataset. There are a number of
circumstances in which you may want to read in the data as an Arrow Dataset:
* your single data file is too large to load into memory
* your data are partitioned among numerous files
* you want faster performance from your `dplyr` queries
* you want to be able to take advantage of Arrow's compute functions
It is possible to read in partitioned data in Parquet, Feather (also known as Arrow IPC), CSV, or
other text-delimited formats. When choosing a format for partitioned, multi-file data, we
recommend Parquet or Feather (Arrow IPC), both of which can offer improved performance
compared to CSV due to their capabilities around metadata and compression.
## Write data to disk - Parquet
You want to write data to disk in a single Parquet file.
### Solution
```{r, write_dataset_basic}
write_dataset(dataset = airquality, path = "airquality_data")
```
```{r, test_write_dataset_basic, opts.label = "test"}
test_that("write_dataset_basic works as expected", {
expect_true(file.exists("airquality_data"))
expect_length(list.files("airquality_data"), 1)
})
```
### Discussion
The default format for `open_dataset()` and `write_dataset()` is Parquet.
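If you want to be explicit, you can spell the format out; the following minimal sketch should be equivalent to the call above (it writes to a throwaway temporary directory so it does not interfere with the recipe's output):
```{r}
# A minimal sketch: the same write with the Parquet format stated explicitly,
# sent to a temporary directory purely for illustration
td_parquet <- tempfile()
dir.create(td_parquet)
write_dataset(dataset = airquality, path = td_parquet, format = "parquet")
list.files(td_parquet)
```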
## Write partitioned data - Parquet
You want to save multiple Parquet data files to disk in partitions based on columns in the data.
### Solution
```{r, write_dataset}
write_dataset(airquality, "airquality_partitioned", partitioning = c("Month"))
```
```{r, test_write_dataset, opts.label = "test"}
test_that("write_dataset chunk works as expected", {
# Partition by month
expect_identical(list.files("airquality_partitioned"), c("Month=5", "Month=6", "Month=7", "Month=8", "Month=9"))
# We have enough files
expect_equal(length(list.files("airquality_partitioned", recursive = TRUE)), 5)
})
```
As you can see, this has created folders based on the supplied partition variable `Month`.
```{r}
list.files("airquality_partitioned")
```
### Discussion
The data is written to separate folders based on the values in the `Month`
column. The default behaviour is to use Hive-style (i.e. "col_name=value" folder names)
partitions.
```{r}
# Take a look at the files in this directory
list.files("airquality_partitioned", recursive = TRUE)
```
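If you would rather have bare value folder names (e.g. `5` instead of `Month=5`), `write_dataset()` also accepts a `hive_style` argument; a minimal sketch, written to a temporary directory for illustration:
```{r}
# A sketch: bare-value folder names instead of Hive-style "Month=5" ones
td_bare <- tempfile()
dir.create(td_bare)
write_dataset(airquality, td_bare, partitioning = "Month", hive_style = FALSE)
list.files(td_bare)
```
Note that when reading such data back, the folder names no longer carry the column name, so you would need to tell `open_dataset()` about the partitioning explicitly (e.g. `partitioning = "Month"`).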
You can specify multiple partitioning variables to add extra levels of partitioning.
```{r, write_dataset_partitioned_deeper}
write_dataset(airquality, "airquality_partitioned_deeper", partitioning = c("Month", "Day"))
list.files("airquality_partitioned_deeper")
```
```{r, test_write_dataset_partitioned_deeper, opts.label = "test"}
test_that("write_dataset_partitioned_deeper works as expected", {
expect_true(file.exists("airquality_partitioned_deeper"))
expect_length(list.files("airquality_partitioned_deeper", recursive = TRUE), 153)
})
```
If you take a look in one of these folders, you will see that the data is then partitioned by the second partition variable, `Day`.
```{r}
# Take a look at the files in this directory
list.files("airquality_partitioned_deeper/Month=5", recursive = TRUE)
```
There are two ways to specify the variables used for partitioning -
either via the `partitioning` argument, as above, or by calling `dplyr::group_by()` on your data before writing: the grouping variables then form the partitions.
```{r, write_dataset_partitioned_groupby}
write_dataset(dataset = group_by(airquality, Month, Day),
              path = "airquality_groupby")
```
```{r, test_write_dataset_partitioned_groupby, opts.label = "test"}
test_that("write_dataset_partitioned_groupby works as expected", {
expect_true(file.exists("airquality_groupby"))
expect_length(list.files("airquality_groupby", recursive = TRUE), 153)
})
```
```{r}
# Take a look at the files in this directory
list.files("airquality_groupby", recursive = TRUE)
```
Each of these folders contains 1 or more Parquet files containing the relevant partition of the data.
```{r}
list.files("airquality_groupby/Month=5/Day=10")
```
Note that if there are `NA` values in the partition column,
those rows are written to the `col_name=__HIVE_DEFAULT_PARTITION__`
directory.
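To make this concrete, here is a minimal sketch that introduces an `NA` into the partition column and then inspects the resulting folder names (using a temporary directory):
```{r}
# A sketch: rows with NA in the partition column are written to a
# special __HIVE_DEFAULT_PARTITION__ folder
airquality_with_na <- airquality
airquality_with_na$Month[1] <- NA
td_na <- tempfile()
dir.create(td_na)
write_dataset(airquality_with_na, td_na, partitioning = "Month")
list.files(td_na)
```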
## Read partitioned data
You want to read partitioned data files as an Arrow Dataset.
### Solution
```{r, open_dataset}
# Read data from directory
air_data <- open_dataset("airquality_partitioned_deeper")
# View data
air_data
```
```{r, test_open_dataset, opts.label = "test"}
test_that("open_dataset chunk works as expected", {
expect_equal(nrow(air_data), 153)
expect_equal(tibble::as_tibble(arrange(collect(air_data), Month, Day)), arrange(tibble::as_tibble(airquality), Month, Day))
})
```
### Discussion
Partitioning allows you to split data across
multiple files and folders, avoiding problems associated with storing all your data
in a single file. This can provide further advantages when using Arrow, as Arrow only
reads in the partitioned files needed for any given analysis.
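For example, a `dplyr` query that filters on a partition column only needs to read the matching folders; a minimal sketch using the dataset opened above:
```{r}
# A sketch: filtering on the partition column Month means only the
# Month=7 folders have to be read from disk
air_data %>%
  filter(Month == 7) %>%
  select(Month, Day, Temp) %>%
  collect()
```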
## Write data to disk - Feather/Arrow IPC format
You want to write data to disk in a single Feather/Arrow IPC file.
### Solution
```{r, write_dataset_feather}
write_dataset(dataset = airquality,
              path = "airquality_data_feather",
              format = "feather")
```
```{r, test_write_dataset_feather, opts.label = "test"}
test_that("write_dataset_feather works as expected", {
expect_true(file.exists("airquality_data_feather"))
expect_length(list.files("airquality_data_feather"), 1)
})
```
## Read in Feather/Arrow IPC data as an Arrow Dataset
You want to read in Feather/Arrow IPC data as an Arrow Dataset.
### Solution
```{r, read_arrow_dataset}
# write Arrow file to use in this example
write_dataset(dataset = airquality,
              path = "airquality_data_arrow",
              format = "arrow")
# read into R
open_dataset("airquality_data_arrow", format = "arrow")
```
```{r, test_read_arrow_dataset, opts.label = "test"}
test_that("read_arrow_dataset works as expected", {
  dataset <- open_dataset("airquality_data_arrow", format = "arrow")
  expect_s3_class(dataset, "FileSystemDataset")
  expect_identical(dim(dataset), c(153L, 6L))
})
```
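As an aside, as far as I'm aware the format names `"feather"`, `"arrow"`, and `"ipc"` all refer to the same underlying file format here, so the Feather dataset written in the previous recipe can equally be opened with `format = "feather"`:
```{r}
# A sketch: open the Feather dataset written earlier using format = "feather"
open_dataset("airquality_data_feather", format = "feather")
```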
## Write data to disk - CSV format
You want to write data to disk in a single CSV file.
### Solution
```{r, write_dataset_csv}
write_dataset(dataset = airquality,
              path = "airquality_data_csv",
              format = "csv")
```
```{r, test_write_dataset_csv, opts.label = "test"}
test_that("write_dataset_csv works as expected", {
expect_true(file.exists("airquality_data_csv"))
expect_length(list.files("airquality_data_csv"), 1)
})
```
## Read in CSV data as an Arrow Dataset
You want to read in CSV data as an Arrow Dataset.
### Solution
```{r, read_csv_dataset}
# write CSV file to use in this example
write_dataset(dataset = airquality,
              path = "airquality_data_csv",
              format = "csv")
# read into R
open_dataset("airquality_data_csv", format = "csv")
```
```{r, test_read_csv_dataset, opts.label = "test"}
test_that("read_csv_dataset works as expected", {
  dataset <- open_dataset("airquality_data_csv", format = "csv")
  expect_s3_class(dataset, "FileSystemDataset")
  expect_identical(dim(dataset), c(153L, 6L))
})
```
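### Discussion
CSV datasets can also be partitioned in the same way as the Parquet examples earlier in this chapter; a minimal sketch (the directory name is just for illustration and is removed at the end of the chunk):
```{r}
# A sketch: write a Hive-style partitioned CSV dataset, read it back, then tidy up
write_dataset(airquality, "airquality_partitioned_csv",
              format = "csv", partitioning = "Month")
open_dataset("airquality_partitioned_csv", format = "csv")
unlink("airquality_partitioned_csv", recursive = TRUE)
```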
## Read in a CSV dataset (no headers)
You want to read in a dataset containing CSVs with no headers.
### Solution
```{r, read_headerless_csv_dataset}
# write CSV file to use in this example
dataset_1 <- airquality[1:40, c("Month", "Day", "Temp")]
dataset_2 <- airquality[41:80, c("Month", "Day", "Temp")]
dir.create("airquality")
write.table(dataset_1, "airquality/part-1.csv", sep = ",", row.names = FALSE, col.names = FALSE)
write.table(dataset_2, "airquality/part-2.csv", sep = ",", row.names = FALSE, col.names = FALSE)
# read into R
open_dataset("airquality", format = "csv", column_names = c("Month", "Day", "Temp"))
```
```{r, test_read_headerless_csv_dataset, opts.label = "test"}
test_that("read_headerless_csv_dataset works as expected", {
  data_in <- open_dataset("airquality", format = "csv", column_names = c("Month", "Day", "Temp"))
  expect_s3_class(data_in, "FileSystemDataset")
  expect_identical(dim(data_in), c(80L, 3L))
  expect_named(data_in, c("Month", "Day", "Temp"))
})
```
### Discussion
If your dataset is made up of headerless CSV files, you must supply the names of
each column. You can do this in one of two ways - either via the `column_names`
parameter (as shown above) or via a schema:
```{r, read_headerless_csv_dataset_schema}
open_dataset("airquality", format = "csv", schema = schema("Month" = int32(), "Day" = int32(), "Temp" = int32()))
```
```{r, test_read_headerless_csv_dataset_schema, opts.label = "test"}
test_that("read_headerless_csv_dataset_schema works as expected", {
  data_in <- open_dataset("airquality", format = "csv", schema = schema("Month" = int32(), "Day" = int32(), "Temp" = int32()))
  expect_s3_class(data_in, "FileSystemDataset")
  expect_identical(dim(data_in), c(80L, 3L))
  expect_named(data_in, c("Month", "Day", "Temp"))
  expect_equal(data_in$schema, schema("Month" = int32(), "Day" = int32(), "Temp" = int32()))
})
```
One additional advantage of using a schema is that you also have control over the
data types of the columns. If you provide both column names and a schema, the values
in `column_names` must match the `schema` field names.
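For example, you could read `Temp` in as a double rather than an integer by changing its type in the schema; a minimal sketch:
```{r}
# A sketch: the schema also controls column types, e.g. reading Temp as a double
open_dataset("airquality", format = "csv",
             schema = schema("Month" = int32(), "Day" = int32(), "Temp" = float64()))
```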
## Write compressed partitioned data
You want to save partitioned files, compressed with a specified compression algorithm.
### Solution
```{r, dataset_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")
```
```{r}
# View files in the directory
list.files(td, recursive = TRUE)
```
```{r, test_dataset_gzip, opts.label = "test"}
test_that("dataset_gzip", {
expect_true(file.exists(file.path(td, "part-0.parquet")))
})
```
### Discussion
You can supply the `compression` argument to `write_dataset()` as long as
the compression algorithm is compatible with the chosen format. See `?write_dataset()`
for more information on supported compression algorithms and default settings.
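For example, with the default Parquet format you can pick a different codec and tune its level via the `compression` and `compression_level` arguments; a sketch, assuming your Arrow build includes the zstd codec:
```{r}
# A sketch: Parquet compressed with zstd at a chosen level
# (assumes the zstd codec is available in your Arrow installation)
td_zstd <- tempfile()
dir.create(td_zstd)
write_dataset(iris, path = td_zstd, compression = "zstd", compression_level = 5)
list.files(td_zstd, recursive = TRUE)
```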
## Read compressed data
You want to read in data which has been compressed.
### Solution
```{r, opendataset_compressed}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")
# Read in data
ds <- open_dataset(td) %>%
  collect()
ds
```
```{r, test_opendataset_compressed, opts.label = "test"}
test_that("opendataset_compressed", {
expect_s3_class(ds, "data.frame")
expect_named(
ds,
c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
)
})
```
### Discussion
Note that Arrow automatically detects the compression and you do not have to
supply it in the call to `open_dataset()` or the `read_*()` functions.
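The same applies when reading a single file directly; a sketch that picks one of the compressed Parquet files written above:
```{r}
# A sketch: read one compressed Parquet file directly - the codec is
# detected from the file itself
parquet_file <- list.files(td, pattern = "\\.parquet$",
                           full.names = TRUE, recursive = TRUE)[1]
head(read_parquet(parquet_file))
```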
```{r cleanup_multifile, include = FALSE}
#cleanup
unlink("airquality", recursive = TRUE)
unlink("airquality_data_csv", recursive = TRUE)
unlink("airquality_data", recursive = TRUE)
unlink("airquality_data_arrow", recursive = TRUE)
unlink("airquality_data_feather", recursive = TRUE)
unlink("airquality_partitioned", recursive = TRUE)
unlink("airquality_groupby", recursive = TRUE)
unlink("airquality_partitioned_deeper", recursive = TRUE)
```