# Reading and Writing Data
## Introduction
This chapter contains recipes related to reading and writing data using Apache
Arrow. When reading files into R using Apache Arrow, you can choose to read in
your file as either a data frame or as an Arrow Table object.
There are a number of circumstances in which you may want to read in the data as an Arrow Table:
* your dataset is large and loading it entirely into memory may cause performance issues
* you want faster performance from your `dplyr` queries (see the sketch after this list)
* you want to be able to take advantage of Arrow's compute functions
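For example, a `dplyr` query can be run directly against an Arrow Table, with the result only pulled into R when you call `collect()`. A minimal sketch (not evaluated here), assuming the `arrow` and `dplyr` packages are loaded:
```{r, table_dplyr_sketch, eval = FALSE}
library(arrow)
library(dplyr)
# Run the query against an Arrow Table; collect() pulls the result into R
Table$create(airquality) %>%
  filter(Temp > 80) %>%
  select(Month, Day, Temp) %>%
  collect()
```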
## Convert from a data frame to an Arrow Table
You want to convert an existing `data.frame` or `tibble` object into an Arrow Table.
### Solution
```{r, table_create_from_df}
air_table <- Table$create(airquality)
air_table
```
```{r, test_table_create_from_df, opts.label = "test"}
test_that("table_create_from_df chunk works as expected", {
expect_s3_class(air_table, "Table")
})
```
## Convert data from an Arrow Table to a data frame
You want to convert an Arrow Table to a data frame to view the data or work with it
in your usual analytics pipeline.
### Solution
```{r, asdf_table}
air_df <- as.data.frame(air_table)
air_df
```
```{r, test_asdf_table, opts.label = "test"}
test_that("asdf_table chunk works as expected", {
expect_identical(air_df, airquality)
})
```
### Discussion
You can use either `as.data.frame()` or `dplyr::collect()` to do this.
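For example, `dplyr::collect()` retrieves the Table into R (as a tibble). A minimal sketch (not evaluated here), assuming `dplyr` is loaded and `air_table` exists as above:
```{r, collect_table_sketch, eval = FALSE}
library(dplyr)
# collect() converts the Arrow Table into an R data frame (a tibble)
air_table %>% collect()
```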
## Write a Parquet file
You want to write Parquet files to disk.
### Solution
```{r, write_parquet}
# Create table
my_table <- Table$create(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
# Write to Parquet
write_parquet(my_table, "my_table.parquet")
```
```{r, test_write_parquet, opts.label = "test"}
test_that("write_parquet chunk works as expected", {
expect_true(file.exists("my_table.parquet"))
})
```
## Read a Parquet file
You want to read a Parquet file.
### Solution
```{r, read_parquet}
parquet_tbl <- read_parquet("my_table.parquet")
parquet_tbl
```
```{r, test_read_parquet, opts.label = "test"}
test_that("read_parquet works as expected", {
expect_identical(parquet_tbl, data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
})
```
As the argument `as_data_frame` was left set to its default value of `TRUE`, the file was read in as a `data.frame` object.
```{r, read_parquet_2}
class(parquet_tbl)
```
```{r, test_read_parquet_2, opts.label = "test"}
test_that("read_parquet_2 works as expected", {
expect_s3_class(parquet_tbl, "data.frame")
})
```
### Discussion
If you set `as_data_frame` to `FALSE`, the file will be read in as an Arrow Table.
```{r, read_parquet_table}
my_table_arrow <- read_parquet("my_table.parquet", as_data_frame = FALSE)
my_table_arrow
```
```{r, read_parquet_table_class}
class(my_table_arrow)
```
```{r, test_read_parquet_table_class, opts.label = "test"}
test_that("read_parquet_table_class works as expected", {
expect_s3_class(my_table_arrow, "Table")
})
```
## Read a Parquet file from S3
You want to read a Parquet file from S3.
### Solution
```{r, read_parquet_s3, eval = FALSE}
df <- read_parquet(file = "s3://ursa-labs-taxi-data/2019/06/data.parquet")
```
### See also
For more in-depth instructions, including how to work with S3 buckets that require authentication, see the guide to reading from and writing to S3 buckets: https://arrow.apache.org/docs/r/articles/fs.html.
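As a sketch of the approach described in that guide, you can also create a reference to the bucket with `s3_bucket()` and read a file from a path inside it (not evaluated here, as it requires network access):
```{r, read_parquet_s3_bucket, eval = FALSE}
# Create a reference to the public bucket used in the solution above,
# then read a single Parquet file from a path within it
bucket <- s3_bucket("ursa-labs-taxi-data")
df <- read_parquet(bucket$path("2019/06/data.parquet"))
```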
## Filter columns while reading a Parquet file
You want to specify which columns to include when reading in a Parquet file.
### Solution
```{r, read_parquet_filter}
# Create table to read back in
dist_time <- Table$create(data.frame(distance = c(12.2, 15.7, 14.2), time = c(43, 44, 40)))
# Write to Parquet
write_parquet(dist_time, "dist_time.parquet")
# Read in only the "time" column
time_only <- read_parquet("dist_time.parquet", col_select = "time")
time_only
```
```{r, test_read_parquet_filter, opts.label = "test"}
test_that("read_parquet_filter works as expected", {
expect_identical(time_only, data.frame(time = c(43, 44, 40)))
})
```
## Write an IPC/Feather V2 file
You want to write data in the IPC/Feather V2 format.
### Solution
```{r, write_feather}
my_table <- Table$create(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
write_feather(my_table, "my_table.arrow")
```
```{r, test_write_feather, opts.label = "test"}
test_that("write_feather chunk works as expected", {
expect_true(file.exists("my_table.arrow"))
})
```
### Discussion
For legacy support, you can write data in the original Feather format by setting the `version` parameter to `1`.
```{r, write_feather1}
# Create table
my_table <- Table$create(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
# Write to Feather format V1
write_feather(my_table, "my_table.feather", version = 1)
```
```{r, test_write_feather1, opts.label = "test"}
test_that("write_feather1 chunk works as expected", {
expect_true(file.exists("my_table.feather"))
})
unlink("my_table.feather")
```
## Read a Feather file
You want to read a Feather file.
### Solution
```{r, read_feather}
my_feather_tbl <- read_feather("my_table.arrow")
```
```{r, test_read_feather, opts.label = "test"}
test_that("read_feather chunk works as expected", {
expect_identical(as.data.frame(my_feather_tbl), data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
})
unlink("my_table.arrow")
```
## Write streaming IPC files
You want to write to the IPC stream format.
### Solution
```{r, write_ipc_stream}
# Create table
my_table <- Table$create(
data.frame(
group = c("A", "B", "C"),
score = c(99, 97, 99)
)
)
# Write to IPC stream format
write_ipc_stream(my_table, "my_table.arrows")
```
```{r, test_write_ipc_stream, opts.label = "test"}
test_that("write_ipc_stream chunk works as expected", {
expect_true(file.exists("my_table.arrows"))
})
```
## Read streaming IPC files
You want to read from the IPC stream format.
### Solution
```{r, read_ipc_stream}
my_ipc_stream <- arrow::read_ipc_stream("my_table.arrows")
```
```{r, test_read_ipc_stream, opts.label = "test"}
test_that("read_ipc_stream chunk works as expected", {
expect_equal(
my_ipc_stream,
data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))
)
})
unlink("my_table.arrows")
```
## Write CSV files
You want to write Arrow data to a CSV file.
### Solution
```{r, write_csv_arrow}
write_csv_arrow(cars, "cars.csv")
```
```{r, test_write_csv_arrow, opts.label = "test"}
test_that("write_csv_arrow chunk works as expected", {
expect_true(file.exists("cars.csv"))
})
```
## Read CSV files
You want to read a CSV file.
### Solution
```{r, read_csv_arrow}
my_csv <- read_csv_arrow("cars.csv", as_data_frame = FALSE)
```
```{r, test_read_csv_arrow, opts.label = "test"}
test_that("read_csv_arrow chunk works as expected", {
expect_equivalent(as.data.frame(my_csv), cars)
})
unlink("cars.csv")
```
## Read JSON files
You want to read a JSON file.
### Solution
```{r, read_json_arrow}
# Create a file to read back in
tf <- tempfile()
writeLines('
{"country": "United Kingdom", "code": "GB", "long": -3.44, "lat": 55.38}
{"country": "France", "code": "FR", "long": 2.21, "lat": 46.23}
{"country": "Germany", "code": "DE", "long": 10.45, "lat": 51.17}
', tf, useBytes = TRUE)
# Read in the data
countries <- read_json_arrow(tf, col_select = c("country", "long", "lat"))
countries
```
```{r, test_read_json_arrow, opts.label = "test"}
test_that("read_json_arrow chunk works as expected", {
expect_equivalent(
countries,
data.frame(
country = c("United Kingdom", "France", "Germany"),
long = c(-3.44, 2.21, 10.45),
lat = c(55.38, 46.23, 51.17)
)
)
})
unlink(tf)
```
## Write partitioned data
You want to save data to disk in partitions based on columns in the data.
### Solution
```{r, write_dataset}
write_dataset(airquality, "airquality_partitioned", partitioning = c("Month", "Day"))
list.files("airquality_partitioned")
```
```{r, test_write_dataset, opts.label = "test"}
test_that("write_dataset chunk works as expected", {
# Partition by month
expect_identical(list.files("airquality_partitioned"), c("Month=5", "Month=6", "Month=7", "Month=8", "Month=9"))
# We have enough files
expect_equal(length(list.files("airquality_partitioned", recursive = TRUE)), 153)
})
```
As you can see, this has created folders based on the first partition variable supplied, `Month`.
If you take a look in one of these folders, you will see that the data is then partitioned by the second partition variable, `Day`.
```{r}
list.files("airquality_partitioned/Month=5")
```
Each of these folders contains one or more Parquet files holding the relevant partition of the data.
```{r}
list.files("airquality_partitioned/Month=5/Day=10")
```
## Read partitioned data
You want to read partitioned data.
### Solution
```{r, open_dataset}
# Read data from directory
air_data <- open_dataset("airquality_partitioned")
# View data
air_data
```
```{r, test_open_dataset, opts.label = "test"}
test_that("open_dataset chunk works as expected", {
expect_equal(nrow(air_data), 153)
expect_equal(arrange(collect(air_data), Month, Day), arrange(airquality, Month, Day), ignore_attr = TRUE)
})
```
```{r}
unlink("airquality_partitioned", recursive = TRUE)
```
```{r, include = FALSE}
# cleanup
unlink("my_table.arrow")
unlink("my_table.arrows")
unlink("cars.csv")
unlink("my_table.feather")
unlink("my_table.parquet")
unlink("dist_time.parquet")
unlink("airquality_partitioned", recursive = TRUE)
```
## Write compressed data
You want to save a file, compressed with a specified compression algorithm.
### Solution
```{r, parquet_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write data compressed with the gzip algorithm instead of the default
write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip")
```
```{r, test_parquet_gzip, opts.label = "test"}
test_that("parquet_gzip", {
file.exists(file.path(td, "iris.parquet"))
})
```
### Discussion
Note that `write_parquet()` uses compression by default. See
`default_parquet_compression()` to check which default is configured on your
machine.
You can also supply the `compression` argument to `write_dataset()`, as long as
the compression algorithm is compatible with the chosen format.
```{r, dataset_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")
```
```{r}
# View files in the directory
list.files(td, recursive = TRUE)
```
```{r, test_dataset_gzip, opts.label = "test"}
test_that("dataset_gzip", {
file.exists(file.path(td, "part-0.parquet"))
})
```
### See also
Some formats write compressed data by default. For more information
on the supported compression algorithms and default settings, see:
* `?write_parquet()`
* `?write_feather()`
* `?write_dataset()`
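If you are not sure whether a particular codec is available in your installation of arrow, one way to check is `codec_is_available()`; a minimal sketch (not evaluated here):
```{r, codec_available_sketch, eval = FALSE}
# Check whether particular compression codecs are available in this build of arrow
codec_is_available("gzip")
codec_is_available("zstd")
```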
## Read compressed data
You want to read in data which has been compressed.
### Solution
```{r, read_parquet_compressed}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset which is to be read back in
write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip")
# Read in data
ds <- read_parquet(file.path(td, "iris.parquet")) %>%
collect()
ds
```
```{r, test_read_parquet_compressed, opts.label = "test"}
test_that("read_parquet_compressed", {
expect_s3_class(ds, "data.frame")
expect_named(
ds,
c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
)
})
```
### Discussion
Note that Arrow automatically detects the compression and you do not have to
supply it in the call to `open_dataset()` or the `read_*()` functions.
Although the CSV format does not itself support compression, Arrow can read
CSV data that has been compressed if the file extension is `.gz`.
```{r, read_compressed_csv}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset which is to be read back in
write.csv(iris, gzfile(file.path(td, "iris.csv.gz")), row.names = FALSE, quote = FALSE)
# Read in data
ds <- open_dataset(td, format = "csv") %>%
collect()
ds
```
```{r, test_read_compressed_csv, opts.label = "test"}
test_that("read_compressed_csv", {
expect_s3_class(ds, "data.frame")
expect_named(
ds,
c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
)
})
```