| <!--- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| # Reading and Writing Data |
| |
| ## Introduction |
| |
This chapter contains recipes related to reading and writing data using Apache
Arrow. When reading files into R using Apache Arrow, you can choose to read your
file in as either a data frame or an Arrow Table object.
| |
| |
| There are a number of circumstances in which you may want to read in the data as an Arrow Table: |
| |
* your dataset is large, and loading it entirely into memory may cause performance issues
* you want faster performance from your `dplyr` queries (see the sketch after this list)
* you want to be able to take advantage of Arrow's compute functions
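
For example, here is a minimal sketch of the second point, assuming the `arrow` and `dplyr` packages are loaded: the query runs inside Arrow, and only the result is pulled into R.

```{r, intro_arrow_dplyr, eval = FALSE}
# The filter and select are evaluated by Arrow's compute engine;
# collect() pulls only the result into R
arrow_table(airquality) %>%
  filter(Temp > 90) %>%
  select(Month, Day, Temp) %>%
  collect()
```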
| |
| ## Convert from a data frame to an Arrow Table |
| |
| You want to convert an existing `data.frame` or `tibble` object into an Arrow Table. |
| |
| ### Solution |
| |
| ```{r, table_create_from_df} |
| air_table <- arrow_table(airquality) |
| air_table |
| ``` |
| ```{r, test_table_create_from_df, opts.label = "test"} |
| test_that("table_create_from_df chunk works as expected", { |
| expect_s3_class(air_table, "Table") |
| }) |
| ``` |
| |
| ## Convert data from an Arrow Table to a data frame |
| |
| You want to convert an Arrow Table to a data frame to view the data or work with it |
| in your usual analytics pipeline. |
| |
| ### Solution |
| |
| ```{r, asdf_table} |
| air_df <- as.data.frame(air_table) |
| air_df |
| ``` |
| ```{r, test_asdf_table, opts.label = "test"} |
| test_that("asdf_table chunk works as expected", { |
| expect_identical(air_df, airquality) |
| }) |
| ``` |
| |
| ### Discussion |
| |
| You can use either `as.data.frame()` or `dplyr::collect()` to do this. |
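
A minimal sketch of the `dplyr::collect()` alternative, which reads most naturally at the end of a pipeline (in recent arrow releases it returns a tibble rather than a base `data.frame`):

```{r, collect_table, eval = FALSE}
# Equivalent in effect to as.data.frame(air_table)
air_df2 <- dplyr::collect(air_table)
```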
| |
| ## Write a Parquet file |
| |
| You want to write Parquet files to disk. |
| |
| ### Solution |
| |
| ```{r, write_parquet} |
| # Create table |
| my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))) |
| # Write to Parquet |
| write_parquet(my_table, "my_table.parquet") |
| ``` |
| ```{r, test_write_parquet, opts.label = "test"} |
| test_that("write_parquet chunk works as expected", { |
| expect_true(file.exists("my_table.parquet")) |
| }) |
| ``` |
| |
| ## Read a Parquet file |
| |
| You want to read a Parquet file. |
| |
| ### Solution |
| |
| ```{r, read_parquet} |
| parquet_tbl <- read_parquet("my_table.parquet") |
| parquet_tbl |
| ``` |
| ```{r, test_read_parquet, opts.label = "test"} |
| test_that("read_parquet works as expected", { |
| expect_identical(parquet_tbl, data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))) |
| }) |
| ``` |
| |
Because the argument `as_data_frame` was left at its default value of `TRUE`, the file was read in as a `data.frame` object.
| |
| ```{r, read_parquet_2} |
| class(parquet_tbl) |
| ``` |
| ```{r, test_read_parquet_2, opts.label = "test"} |
| test_that("read_parquet_2 works as expected", { |
| expect_s3_class(parquet_tbl, "data.frame") |
| }) |
| ``` |
| |
| ### Discussion |
| |
| If you set `as_data_frame` to `FALSE`, the file will be read in as an Arrow Table. |
| |
| ```{r, read_parquet_table} |
| my_table_arrow <- read_parquet("my_table.parquet", as_data_frame = FALSE) |
| my_table_arrow |
| ``` |
| |
| ```{r, read_parquet_table_class} |
| class(my_table_arrow) |
| ``` |
| ```{r, test_read_parquet_table_class, opts.label = "test"} |
| test_that("read_parquet_table_class works as expected", { |
| expect_s3_class(my_table_arrow, "Table") |
| }) |
| ``` |
| |
| ## Read a Parquet file from S3 |
| |
| You want to read a Parquet file from S3. |
| |
| ### Solution |
| |
| ```{r, read_parquet_s3, eval = FALSE} |
| df <- read_parquet(file = "s3://ursa-labs-taxi-data/2019/06/data.parquet") |
| ``` |
| |
| ### See also |
| |
For more in-depth instructions, including how to work with S3 buckets that require authentication, see the guide to reading and writing data from S3 buckets: https://arrow.apache.org/docs/r/articles/fs.html.
| |
| ## Filter columns while reading a Parquet file |
| |
| You want to specify which columns to include when reading in a Parquet file. |
| |
| ### Solution |
| |
| ```{r, read_parquet_filter} |
| # Create table to read back in |
| dist_time <- arrow_table(data.frame(distance = c(12.2, 15.7, 14.2), time = c(43, 44, 40))) |
| # Write to Parquet |
| write_parquet(dist_time, "dist_time.parquet") |
| |
| # Read in only the "time" column |
| time_only <- read_parquet("dist_time.parquet", col_select = "time") |
| time_only |
| ``` |
| ```{r, test_read_parquet_filter, opts.label = "test"} |
| test_that("read_parquet_filter works as expected", { |
| expect_identical(time_only, data.frame(time = c(43, 44, 40))) |
| }) |
| ``` |
| |
| ## Write an IPC/Feather V2 file |
| |
You want to write a file in the IPC/Feather V2 format.
| |
| ### Solution |
| |
| ```{r, write_feather} |
| my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))) |
| write_feather(my_table, "my_table.arrow") |
| ``` |
| ```{r, test_write_feather, opts.label = "test"} |
| test_that("write_feather chunk works as expected", { |
| expect_true(file.exists("my_table.arrow")) |
| }) |
| ``` |

### Discussion
| |
| For legacy support, you can write data in the original Feather format by setting the `version` parameter to `1`. |
| |
| ```{r, write_feather1} |
| # Create table |
| my_table <- arrow_table(data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))) |
| # Write to Feather format V1 |
write_feather(my_table, "my_table.feather", version = 1)
| ``` |
| ```{r, test_write_feather1, opts.label = "test"} |
| test_that("write_feather1 chunk works as expected", { |
| expect_true(file.exists("my_table.feather")) |
| }) |
| |
| unlink("my_table.feather") |
| ``` |
| |
| ## Read a Feather file |
| |
| You want to read a Feather file. |
| |
| ### Solution |
| |
| ```{r, read_feather} |
| my_feather_tbl <- read_feather("my_table.arrow") |
| ``` |
| ```{r, test_read_feather, opts.label = "test"} |
| test_that("read_feather chunk works as expected", { |
| expect_identical(as.data.frame(my_feather_tbl), data.frame(group = c("A", "B", "C"), score = c(99, 97, 99))) |
| }) |
| unlink("my_table.arrow") |
| ``` |
| |
| ## Write streaming IPC files |
| |
| You want to write to the IPC stream format. |
| |
| ### Solution |
| |
| ```{r, write_ipc_stream} |
| # Create table |
| my_table <- arrow_table( |
| data.frame( |
| group = c("A", "B", "C"), |
| score = c(99, 97, 99) |
| ) |
| ) |
| # Write to IPC stream format |
| write_ipc_stream(my_table, "my_table.arrows") |
| ``` |
| ```{r, test_write_ipc_stream, opts.label = "test"} |
| test_that("write_ipc_stream chunk works as expected", { |
| expect_true(file.exists("my_table.arrows")) |
| }) |
| ``` |
| |
| ## Read streaming IPC files |
| |
| You want to read from the IPC stream format. |
| |
| ### Solution |
| ```{r, read_ipc_stream} |
| my_ipc_stream <- arrow::read_ipc_stream("my_table.arrows") |
| ``` |
| ```{r, test_read_ipc_stream, opts.label = "test"} |
| test_that("read_ipc_stream chunk works as expected", { |
| expect_equal( |
| my_ipc_stream, |
| data.frame(group = c("A", "B", "C"), score = c(99, 97, 99)) |
| ) |
| }) |
| unlink("my_table.arrows") |
| ``` |
| |
| ## Write CSV files |
| |
| You want to write Arrow data to a CSV file. |
| |
| ### Solution |
| |
| ```{r, write_csv_arrow} |
| write_csv_arrow(cars, "cars.csv") |
| ``` |
| ```{r, test_write_csv_arrow, opts.label = "test"} |
| test_that("write_csv_arrow chunk works as expected", { |
| expect_true(file.exists("cars.csv")) |
| }) |
| ``` |
| |
| ## Read CSV files |
| |
| You want to read a CSV file. |
| |
| ### Solution |
| |
| ```{r, read_csv_arrow} |
| my_csv <- read_csv_arrow("cars.csv", as_data_frame = FALSE) |
| ``` |
| |
| ```{r, test_read_csv_arrow, opts.label = "test"} |
| test_that("read_csv_arrow chunk works as expected", { |
| expect_equivalent(as.data.frame(my_csv), cars) |
| }) |
| unlink("cars.csv") |
| ``` |
| |
| ## Read JSON files |
| |
| You want to read a JSON file. |
| |
| ### Solution |
| |
| ```{r, read_json_arrow} |
| # Create a file to read back in |
| tf <- tempfile() |
| writeLines(' |
| {"country": "United Kingdom", "code": "GB", "long": -3.44, "lat": 55.38} |
| {"country": "France", "code": "FR", "long": 2.21, "lat": 46.23} |
| {"country": "Germany", "code": "DE", "long": 10.45, "lat": 51.17} |
| ', tf, useBytes = TRUE) |
| |
| # Read in the data |
| countries <- read_json_arrow(tf, col_select = c("country", "long", "lat")) |
| countries |
| ``` |
| ```{r, test_read_json_arrow, opts.label = "test"} |
| test_that("read_json_arrow chunk works as expected", { |
| expect_equivalent( |
| countries, |
| data.frame( |
| country = c("United Kingdom", "France", "Germany"), |
| long = c(-3.44, 2.21, 10.45), |
| lat = c(55.38, 46.23, 51.17) |
| ) |
| ) |
| }) |
| unlink(tf) |
| ``` |
| |
| ## Write partitioned data |
| |
| You want to save data to disk in partitions based on columns in the data. |
| |
| ### Solution |
| |
| ```{r, write_dataset} |
| write_dataset(airquality, "airquality_partitioned", partitioning = c("Month", "Day")) |
| list.files("airquality_partitioned") |
| ``` |
| ```{r, test_write_dataset, opts.label = "test"} |
| test_that("write_dataset chunk works as expected", { |
| # Partition by month |
| expect_identical(list.files("airquality_partitioned"), c("Month=5", "Month=6", "Month=7", "Month=8", "Month=9")) |
| # We have enough files |
| expect_equal(length(list.files("airquality_partitioned", recursive = TRUE)), 153) |
| }) |
| ``` |
| As you can see, this has created folders based on the first partition variable supplied, `Month`. |
| |
| If you take a look in one of these folders, you will see that the data is then partitioned by the second partition variable, `Day`. |
| |
| ```{r} |
| list.files("airquality_partitioned/Month=5") |
| ``` |
| |
Each of these folders contains one or more Parquet files holding the relevant partition of the data.
| |
| ```{r} |
| list.files("airquality_partitioned/Month=5/Day=10") |
| ``` |
| |
| ## Read partitioned data |
| |
| You want to read partitioned data. |
| |
| ### Solution |
| |
| ```{r, open_dataset} |
| # Read data from directory |
| air_data <- open_dataset("airquality_partitioned") |
| |
| # View data |
| air_data |
| ``` |
| ```{r, test_open_dataset, opts.label = "test"} |
| test_that("open_dataset chunk works as expected", { |
| expect_equal(nrow(air_data), 153) |
| expect_equal(arrange(collect(air_data), Month, Day), arrange(airquality, Month, Day), ignore_attr = TRUE) |
| }) |
| ``` |
| |
| ```{r} |
| unlink("airquality_partitioned", recursive = TRUE) |
| ``` |
| |
| ```{r, include = FALSE} |
| # cleanup |
| unlink("my_table.arrow") |
| unlink("my_table.arrows") |
| unlink("cars.csv") |
| unlink("my_table.feather") |
| unlink("my_table.parquet") |
| unlink("dist_time.parquet") |
| unlink("airquality_partitioned", recursive = TRUE) |
| ``` |
| |
| ## Write compressed data |
| |
| You want to save a file, compressed with a specified compression algorithm. |
| |
| ### Solution |
| |
| ```{r, parquet_gzip} |
| # Create a temporary directory |
| td <- tempfile() |
| dir.create(td) |
| |
| # Write data compressed with the gzip algorithm instead of the default |
| write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip") |
| ``` |
| |
| ```{r, test_parquet_gzip, opts.label = "test"} |
| test_that("parquet_gzip", { |
| file.exists(file.path(td, "iris.parquet")) |
| }) |
| ``` |
| |
| ### Discussion |
| |
Note that `write_parquet()` uses compression by default. See
`default_parquet_compression()` to check which default is configured on your
machine.
| |
| You can also supply the `compression` argument to `write_dataset()`, as long as |
| the compression algorithm is compatible with the chosen format. |
| |
| ```{r, dataset_gzip} |
| # Create a temporary directory |
| td <- tempfile() |
| dir.create(td) |
| |
| # Write dataset to file |
| write_dataset(iris, path = td, compression = "gzip") |
| ``` |
| |
| ```{r} |
| # View files in the directory |
| list.files(td, recursive = TRUE) |
| ``` |
| ```{r, test_dataset_gzip, opts.label = "test"} |
| test_that("dataset_gzip", { |
| file.exists(file.path(td, "part-0.parquet")) |
| }) |
| ``` |
| |
| ### See also |
| |
| Some formats write compressed data by default. For more information |
| on the supported compression algorithms and default settings, see: |
| |
| * `?write_parquet()` |
| * `?write_feather()` |
| * `?write_dataset()` |
| |
| ## Read compressed data |
| |
| You want to read in data which has been compressed. |
| |
| ### Solution |
| |
| ```{r, read_parquet_compressed} |
| # Create a temporary directory |
| td <- tempfile() |
| dir.create(td) |
| |
| # Write dataset which is to be read back in |
| write_parquet(iris, file.path(td, "iris.parquet"), compression = "gzip") |
| |
| # Read in data |
| ds <- read_parquet(file.path(td, "iris.parquet")) %>% |
| collect() |
| |
| ds |
| ``` |
| |
| ```{r, test_read_parquet_compressed, opts.label = "test"} |
| test_that("read_parquet_compressed", { |
| expect_s3_class(ds, "data.frame") |
| expect_named( |
| ds, |
| c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species") |
| ) |
| }) |
| ``` |
| |
| ### Discussion |
| |
| Note that Arrow automatically detects the compression and you do not have to |
| supply it in the call to `open_dataset()` or the `read_*()` functions. |
| |
Although the CSV format does not itself support compression, Arrow can read
CSV data that has been compressed, provided the file extension is `.gz`.
| |
| ```{r, read_compressed_csv} |
| # Create a temporary directory |
| td <- tempfile() |
| dir.create(td) |
| |
| # Write dataset which is to be read back in |
| write.csv(iris, gzfile(file.path(td, "iris.csv.gz")), row.names = FALSE, quote = FALSE) |
| |
| # Read in data |
| ds <- open_dataset(td, format = "csv") %>% |
| collect() |
| ds |
| ``` |
| |
| ```{r, test_read_compressed_csv, opts.label = "test"} |
| test_that("read_compressed_csv", { |
| expect_s3_class(ds, "data.frame") |
| expect_named( |
| ds, |
| c("Sepal.Length", "Sepal.Width", "Petal.Length", "Petal.Width", "Species") |
| ) |
| }) |
| ``` |
| |
| |