<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Reading and Writing Data
## Introduction
This chapter contains recipes related to reading and writing data using Apache
Arrow. When reading files into R using Apache Arrow, you can choose to read in
your file as either a data frame or as an Arrow Table object.
There are a number of circumstances in which you may want to read in the data as an Arrow Table:
* your dataset is large, and loading it into memory as a data frame may cause performance issues
* you want faster performance from your `dplyr` queries
* you want to be able to take advantage of Arrow's compute functions
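As a minimal sketch of the last two points, the chunk below runs a `dplyr` query against an Arrow Table and only pulls the result into R at the end. It assumes the `arrow` and `dplyr` packages are installed; the object names `air_tbl` and `hot_days` and the threshold of 80 are chosen purely for illustration.

```{r, intro_dplyr_query}
library(arrow)
library(dplyr)

# Build an Arrow Table from a built-in data frame
air_tbl <- arrow_table(airquality)

# The filter and select steps are evaluated by Arrow;
# collect() brings the result back into R as a data frame
hot_days <- air_tbl %>%
  filter(Temp > 80) %>%
  select(Month, Day, Temp) %>%
  collect()

hot_days
```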
## Convert from a data frame to an Arrow Table
You want to convert an existing `data.frame` or `tibble` object into an Arrow Table.
### Solution
```{r, table_create_from_df}
air_table <- arrow_table(airquality)
air_table
```
```{r, test_table_create_from_df, opts.label = "test"}
test_that("table_create_from_df chunk works as expected", {
expect_s3_class(air_table, "Table")
})
```
## Convert data from an Arrow Table to a data frame
You want to convert an Arrow Table to a data frame to view the data or work with it
in your usual analytics pipeline.
### Solution
```{r, asdf_table}
air_df <- as.data.frame(air_table)
air_df
```
```{r, test_asdf_table, opts.label = "test"}
test_that("asdf_table chunk works as expected", {
expect_identical(air_df, airquality)
})
```
### Discussion
You can use either `as.data.frame()` or `dplyr::collect()` to do this.
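The `dplyr::collect()` route might look like the sketch below. It assumes `dplyr` is installed alongside `arrow`; the name `air_collected` is illustrative only.

```{r, collect_table}
library(arrow)
library(dplyr)

# Build an Arrow Table, then pull it back into R with collect()
air_table <- arrow_table(airquality)
air_collected <- collect(air_table)

# The result inherits from data.frame
inherits(air_collected, "data.frame")
```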
## Write a Parquet file
You want to write Parquet files to disk.
### Solution
```{r, write_parquet}
# Create table
my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
# Write to Parquet
write_parquet(my_table, "my_table.parquet")
```
```{r, test_write_parquet, opts.label = "test"}
test_that("write_parquet chunk works as expected", {
expect_true(file.exists("my_table.parquet"))
})
```
## Read a Parquet file
You want to read a Parquet file.
### Solution
```{r, read_parquet}
parquet_tbl <- read_parquet("my_table.parquet")
parquet_tbl
```
```{r, test_read_parquet, opts.label = "test"}
test_that("read_parquet works as expected", {
expect_identical(parquet_tbl, data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
})
```
As the argument `as_data_frame` was left set to its default value of `TRUE`, the file was read in as a `data.frame` object.
```{r, read_parquet_2}
class(parquet_tbl)
```
```{r, test_read_parquet_2, opts.label = "test"}
test_that("read_parquet_2 works as expected", {
expect_s3_class(parquet_tbl, "data.frame")
})
```
### Discussion
If you set `as_data_frame` to `FALSE`, the file will be read in as an Arrow Table.
```{r, read_parquet_table}
my_table_arrow <- read_parquet("my_table.parquet", as_data_frame = FALSE)
my_table_arrow
```
```{r, read_parquet_table_class}
class(my_table_arrow)
```
```{r, test_read_parquet_table_class, opts.label = "test"}
test_that("read_parquet_table_class works as expected", {
expect_s3_class(my_table_arrow, "Table")
})
```
## Read a Parquet file from S3
You want to read a Parquet file from S3.
### Solution
```{r, read_parquet_s3, eval = FALSE}
df <- read_parquet(file = "s3://ursa-labs-taxi-data/2019/06/data.parquet")
```
### See also
For more in-depth instructions, including how to work with S3 buckets that require authentication, see the guide to reading from and writing to S3 buckets: https://arrow.apache.org/docs/r/articles/fs.html.
## Filter columns while reading a Parquet file
You want to specify which columns to include when reading in a Parquet file.
### Solution
```{r, read_parquet_filter}
# Create table to read back in
dist_time <- arrow_table(data.frame(distance = c(12.2, 15.7, 14.2), time = c(43, 44, 40)))
# Write to Parquet
write_parquet(dist_time, "dist_time.parquet")
# Read in only the "time" column
time_only <- read_parquet("dist_time.parquet", col_select = "time")
time_only
```
```{r, test_read_parquet_filter, opts.label = "test"}
test_that("read_parquet_filter works as expected", {
expect_identical(time_only, data.frame(time = c(43, 44, 40)))
})
```
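### Discussion
The `col_select` argument should also accept a tidy selection specification, as used in `dplyr::select()`, rather than only a character vector of names. The sketch below illustrates this under the assumption that the selection helper `starts_with()` is available once `arrow` is attached; the temporary file and the name `t_cols` are illustrative.

```{r, read_parquet_tidyselect}
library(arrow)

# Write a small Parquet file to a temporary location
pq_file <- tempfile(fileext = ".parquet")
write_parquet(
  data.frame(distance = c(12.2, 15.7, 14.2), time = c(43, 44, 40)),
  pq_file
)

# Keep only the columns whose names start with "t"
t_cols <- read_parquet(pq_file, col_select = starts_with("t"))
names(t_cols)

# Clean up the temporary file
unlink(pq_file)
```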
## Write an IPC/Feather V2 file
You want to write data to an IPC/Feather V2 file.
### Solution
```{r, write_feather}
my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
write_feather(my_table, "my_table.arrow")
```
```{r, test_write_feather, opts.label = "test"}
test_that("write_feather chunk works as expected", {
expect_true(file.exists("my_table.arrow"))
})
```
### Discussion
For legacy support, you can write data in the original Feather format by setting the `version` parameter to `1`.
```{r, write_feather1}
# Create table
my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
# Write to Feather format V1
write_feather(my_table, "my_table.feather", version = 1)
```
```{r, test_write_feather1, opts.label = "test"}
test_that("write_feather1 chunk works as expected", {
expect_true(file.exists("my_table.feather"))
})
unlink("my_table.feather")
```
## Read a Feather file
You want to read a Feather file.
### Solution
```{r, read_feather}
my_feather_tbl <- read_feather("my_table.arrow")
```
```{r, test_read_feather, opts.label = "test"}
test_that("read_feather chunk works as expected", {
expect_identical(as.data.frame(my_feather_tbl), data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)))
})
unlink("my_table.arrow")
```
## Write streaming IPC files
You want to write to the IPC stream format.
### Solution
```{r, write_ipc_stream}
# Create table
my_table <- arrow_table(
data.frame(
group = c("A", "B", "C"),
score = c(99, 97, 99)
)
)
# Write to IPC stream format
write_ipc_stream(my_table, "my_table.arrows")
```
```{r, test_write_ipc_stream, opts.label = "test"}
test_that("write_ipc_stream chunk works as expected", {
expect_true(file.exists("my_table.arrows"))
})
```
## Read streaming IPC files
You want to read from the IPC stream format.
### Solution
```{r, read_ipc_stream}
my_ipc_stream <- arrow::read_ipc_stream("my_table.arrows")
```
```{r, test_read_ipc_stream, opts.label = "test"}
test_that("read_ipc_stream chunk works as expected", {
expect_equal(
my_ipc_stream,
data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))
)
})
unlink("my_table.arrows")
```
## Write CSV files
You want to write Arrow data to a CSV file.
### Solution
```{r, write_csv_arrow}
write_csv_arrow(cars, "cars.csv")
```
```{r, test_write_csv_arrow, opts.label = "test"}
test_that("write_csv_arrow chunk works as expected", {
expect_true(file.exists("cars.csv"))
})
```
## Read CSV files
You want to read a CSV file.
### Solution
```{r, read_csv_arrow}
my_csv <- read_csv_arrow("cars.csv", as_data_frame = FALSE)
```
```{r, test_read_csv_arrow, opts.label = "test"}
test_that("read_csv_arrow chunk works as expected", {
expect_equivalent(as.data.frame(my_csv), cars)
})
unlink("cars.csv")
```
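### Discussion
The solution above sets `as_data_frame = FALSE`, so the result is an Arrow Table. The sketch below contrasts this with the default behaviour, which returns a data frame. It uses a temporary file so that it does not depend on `cars.csv`; the names `cars_df` and `cars_tbl` are illustrative.

```{r, read_csv_arrow_classes}
library(arrow)

# Write a CSV file to a temporary location
csv_file <- tempfile(fileext = ".csv")
write_csv_arrow(cars, csv_file)

# Default: read into a data frame
cars_df <- read_csv_arrow(csv_file)
# as_data_frame = FALSE: read into an Arrow Table
cars_tbl <- read_csv_arrow(csv_file, as_data_frame = FALSE)

class(cars_df)
class(cars_tbl)

# Clean up the temporary file
unlink(csv_file)
```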
## Read JSON files
You want to read a JSON file.
### Solution
```{r, read_json_arrow}
# Create a file to read back in
tf <- tempfile()
writeLines('
{"country": "United Kingdom", "code": "GB", "long": -3.44, "lat": 55.38}
{"country": "France", "code": "FR", "long": 2.21, "lat": 46.23}
{"country": "Germany", "code": "DE", "long": 10.45, "lat": 51.17}
', tf, useBytes = TRUE)
# Read in the data
countries <- read_json_arrow(tf, col_select = c("country", "long", "lat"))
countries
```
```{r, test_read_json_arrow, opts.label = "test"}
test_that("read_json_arrow chunk works as expected", {
expect_equivalent(
countries,
data.frame(
country = c("United Kingdom", "France", "Germany"),
long = c(-3.44, 2.21, 10.45),
lat = c(55.38, 46.23, 51.17)
)
)
})
unlink(tf)
```
## Write partitioned data
You want to save data to disk in partitions based on columns in the data.
### Solution
```{r, write_dataset}
write_dataset(airquality, "airquality_partitioned", partitioning = c("Month", "Day"))
list.files("airquality_partitioned")
```
```{r, test_write_dataset, opts.label = "test"}
test_that("write_dataset chunk works as expected", {
# Partition by month
expect_identical(list.files("airquality_partitioned"), c("Month=5", "Month=6", "Month=7", "Month=8", "Month=9"))
# We have enough files
expect_equal(length(list.files("airquality_partitioned", recursive = TRUE)), 153)
})
```
As you can see, this has created folders based on the first partition variable supplied, `Month`.
If you take a look in one of these folders, you will see that the data is then partitioned by the second partition variable, `Day`.
```{r}
list.files("airquality_partitioned/Month=5")
```
Each of these folders contains one or more Parquet files containing the relevant partition of the data.
```{r}
list.files("airquality_partitioned/Month=5/Day=10")
```
## Read partitioned data
You want to read partitioned data.
### Solution
```{r, open_dataset}
# Read data from directory
air_data <- open_dataset("airquality_partitioned")
# View data
air_data
```
```{r, test_open_dataset, opts.label = "test"}
test_that("open_dataset chunk works as expected", {
expect_equal(nrow(air_data), 153)
expect_equal(arrange(collect(air_data), Month, Day), arrange(airquality, Month, Day), ignore_attr = TRUE)
})
```
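### Discussion
An opened dataset can be queried with `dplyr` verbs before any data is pulled into R, so you only collect the rows you need. The following is a minimal sketch, assuming `dplyr` is installed; it writes its own copy of the data to a temporary directory (`ds_dir`, an illustrative name) so that it can be run on its own.

```{r, open_dataset_filter}
library(arrow)
library(dplyr)

# Write a partitioned copy of the data to a temporary directory
ds_dir <- tempfile()
write_dataset(airquality, ds_dir, partitioning = "Month")

# Filter on the partition column, then collect the matching rows
may_data <- open_dataset(ds_dir) %>%
  filter(Month == 5) %>%
  collect()

nrow(may_data)

# Clean up the temporary directory
unlink(ds_dir, recursive = TRUE)
```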
```{r}
unlink("airquality_partitioned", recursive = TRUE)
```
```{r, include = FALSE}
# cleanup
unlink("my_table.arrow")
unlink("my_table.arrows")
unlink("cars.csv")
unlink("my_table.feather")
unlink("my_table.parquet")
unlink("dist_time.parquet")
unlink("airquality_partitioned", recursive = TRUE)
```
## Write compressed data
You want to save a file, compressed with a specified compression algorithm.
### Solution
```{r, parquet_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write data compressed with the gzip algorithm instead of the default
write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip")
```
```{r, test_parquet_gzip, opts.label = "test"}
test_that("parquet_gzip", {
expect_true(file.exists(file.path(td, "iris.parquet")))
})
```
### Discussion
Note that `write_parquet()` already compresses data by default. See
`default_parquet_compression()` to find out which default is configured on your
machine.
You can also supply the `compression` argument to `write_dataset()`, as long as
the compression algorithm is compatible with the chosen format.
```{r, dataset_gzip}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset to file
write_dataset(iris, path = td, compression = "gzip")
```
```{r}
# View files in the directory
list.files(td, recursive = TRUE)
```
```{r, test_dataset_gzip, opts.label = "test"}
test_that("dataset_gzip", {
expect_true(file.exists(file.path(td, "part-0.parquet")))
})
```
### See also
Some formats write compressed data by default. For more information
on the supported compression algorithms and default settings, see:
* `?write_parquet()`
* `?write_feather()`
* `?write_dataset()`
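Which compression algorithms are usable can depend on how your Arrow installation was built. One way to check is sketched below using `codec_is_available()`; the three codec names are examples only, and the results may differ between machines.

```{r, check_codecs}
library(arrow)

# Check a few common codecs; each call returns TRUE or FALSE
codecs <- c("gzip", "snappy", "zstd")
available <- vapply(codecs, codec_is_available, logical(1))
available
```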
## Read compressed data
You want to read in data which has been compressed.
### Solution
```{r, read_parquet_compressed}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset which is to be read back in
write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip")
# Read in data
ds <- read_parquet(file.path(td, "iris.parquet")) %>%
collect()
ds
```
```{r, test_read_parquet_compressed, opts.label = "test"}
test_that("read_parquet_compressed", {
expect_s3_class(ds, "data.frame")
expect_named(
ds,
c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
)
})
```
### Discussion
Note that Arrow automatically detects the compression and you do not have to
supply it in the call to `open_dataset()` or the `read_*()` functions.
Although the CSV format does not support compression itself, Arrow supports
reading in CSV data which has been compressed, if the file extension is `.gz`.
```{r, read_compressed_csv}
# Create a temporary directory
td <- tempfile()
dir.create(td)
# Write dataset which is to be read back in
write.csv(iris, gzfile(file.path(td, "iris.csv.gz")), row.names = FALSE, quote = FALSE)
# Read in data
ds <- open_dataset(td, format = "csv") %>%
collect()
ds
```
```{r, test_read_compressed_csv, opts.label = "test"}
test_that("read_compressed_csv", {
expect_s3_class(ds, "data.frame")
expect_named(
ds,
c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species")
)
})
```
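The same extension-based detection should also apply when reading a single compressed CSV file with `read_csv_arrow()`. The sketch below assumes your Arrow build includes gzip support; the file name and the object `iris_gz` are illustrative.

```{r, read_csv_arrow_gz}
library(arrow)

# Write a gzip-compressed CSV file to a temporary location
gz_file <- tempfile(fileext = ".csv.gz")
write.csv(iris, gzfile(gz_file), row.names = FALSE, quote = FALSE)

# The compression is inferred from the ".gz" file extension
iris_gz <- read_csv_arrow(gz_file)
head(iris_gz)

# Clean up the temporary file
unlink(gz_file)
```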