# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
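# Tests for multi-file Datasets: opening Parquet, Feather/IPC, and delimited
# text data as datasets, querying them with dplyr verbs, and writing datasets
# back out in various formats and partitioning schemes.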
skip_if_not_available("dataset")
context("Dataset")
library(dplyr)
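# Helper: create a fresh temporary directory, normalized with forward slashes
# so the path is also safe to paste into file:// URIs on Windows.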
make_temp_dir <- function() {
path <- tempfile()
dir.create(path)
normalizePath(path, winslash = "/")
}
dataset_dir <- make_temp_dir()
hive_dir <- make_temp_dir()
ipc_dir <- make_temp_dir()
csv_dir <- make_temp_dir()
tsv_dir <- make_temp_dir()
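# Two 10-row data frames covering a range of column types. df2 reverses the
# chr/fct values and puts a NaN in dbl so tests can tell which file rows
# came from.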
first_date <- lubridate::ymd_hms("2015-04-29 03:12:39")
df1 <- tibble(
int = 1:10,
dbl = as.numeric(1:10),
lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
chr = letters[1:10],
fct = factor(LETTERS[1:10]),
ts = first_date + lubridate::days(1:10)
)
second_date <- lubridate::ymd_hms("2017-03-09 07:01:02")
df2 <- tibble(
int = 101:110,
dbl = c(as.numeric(51:59), NaN),
lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
chr = letters[10:1],
fct = factor(LETTERS[10:1]),
ts = second_date + lubridate::days(10:1)
)
test_that("Setup (putting data in the dir)", {
if (arrow_with_parquet()) {
dir.create(file.path(dataset_dir, 1))
dir.create(file.path(dataset_dir, 2))
write_parquet(df1, file.path(dataset_dir, 1, "file1.parquet"))
write_parquet(df2, file.path(dataset_dir, 2, "file2.parquet"))
expect_length(dir(dataset_dir, recursive = TRUE), 2)
dir.create(file.path(hive_dir, "subdir", "group=1", "other=xxx"), recursive = TRUE)
dir.create(file.path(hive_dir, "subdir", "group=2", "other=yyy"), recursive = TRUE)
write_parquet(df1, file.path(hive_dir, "subdir", "group=1", "other=xxx", "file1.parquet"))
write_parquet(df2, file.path(hive_dir, "subdir", "group=2", "other=yyy", "file2.parquet"))
expect_length(dir(hive_dir, recursive = TRUE), 2)
}
# Now, an IPC format dataset
dir.create(file.path(ipc_dir, 3))
dir.create(file.path(ipc_dir, 4))
write_feather(df1, file.path(ipc_dir, 3, "file1.arrow"))
write_feather(df2, file.path(ipc_dir, 4, "file2.arrow"))
expect_length(dir(ipc_dir, recursive = TRUE), 2)
# Now, CSV
dir.create(file.path(csv_dir, 5))
dir.create(file.path(csv_dir, 6))
write.csv(df1, file.path(csv_dir, 5, "file1.csv"), row.names = FALSE)
write.csv(df2, file.path(csv_dir, 6, "file2.csv"), row.names = FALSE)
expect_length(dir(csv_dir, recursive = TRUE), 2)
# Now, tab-delimited
dir.create(file.path(tsv_dir, 5))
dir.create(file.path(tsv_dir, 6))
write.table(df1, file.path(tsv_dir, 5, "file1.tsv"), row.names = FALSE, sep = "\t")
write.table(df2, file.path(tsv_dir, 6, "file2.tsv"), row.names = FALSE, sep = "\t")
expect_length(dir(tsv_dir, recursive = TRUE), 2)
})
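# Full paths to the two Parquet files written in the setup test; used by the
# single-file and URI tests below.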
if (arrow_with_parquet()) {
files <- c(
file.path(dataset_dir, 1, "file1.parquet", fsep = "/"),
file.path(dataset_dir, 2, "file2.parquet", fsep = "/")
)
}
test_that("Simple interface for datasets", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_r6_class(ds$format, "ParquetFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")
expect_r6_class(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
expect_equivalent(
ds %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 1) %>% # 6 not 6L to test autocasting
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
# Collecting virtual partition column works
expect_equal(
collect(ds) %>% pull(part),
c(rep(1, 10), rep(2, 10))
)
})
test_that("dim method returns the correct number of rows and columns", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_identical(dim(ds), c(20L, 7L))
})
test_that("dim() correctly determines the number of rows and columns on an arrow_dplyr_query object", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_warning(
expect_identical(
ds %>%
filter(chr == "a") %>%
dim(),
c(NA, 7L)
)
)
expect_equal(
ds %>%
select(chr, fct, int) %>%
dim(),
c(20L, 3L)
)
expect_warning(
expect_identical(
ds %>%
select(chr, fct, int) %>%
filter(chr == "a") %>%
dim(),
c(NA, 3L)
)
)
})
test_that("dataset from single local file path", {
skip_on_os("windows")
skip_if_not_available("parquet")
ds <- open_dataset(files[1])
expect_is(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7) %>%
collect() %>%
arrange(dbl),
df1[8:10, c("chr", "dbl")]
)
})
test_that("dataset from vector of file paths", {
skip_on_os("windows")
skip_if_not_available("parquet")
ds <- open_dataset(files)
expect_is(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
})
test_that("dataset from directory URI", {
skip_on_os("windows")
skip_if_not_available("parquet")
uri <- paste0("file://", dataset_dir)
ds <- open_dataset(uri, partitioning = schema(part = uint8()))
expect_r6_class(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
})
test_that("dataset from single file URI", {
skip_on_os("windows")
skip_if_not_available("parquet")
uri <- paste0("file://", files[1])
ds <- open_dataset(uri)
expect_is(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7) %>%
collect() %>%
arrange(dbl),
df1[8:10, c("chr", "dbl")]
)
})
test_that("dataset from vector of file URIs", {
skip_on_os("windows")
skip_if_not_available("parquet")
uris <- paste0("file://", files)
ds <- open_dataset(uris)
expect_is(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
})
test_that("open_dataset errors on mixed paths and URIs", {
skip_on_os("windows")
skip_if_not_available("parquet")
expect_error(
open_dataset(c(files[1], paste0("file://", files[2]))),
"Vectors of mixed paths and URIs are not supported"
)
})
test_that("Simple interface for datasets (custom ParquetFileFormat)", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()),
format = FileFormat$create("parquet", dict_columns = c("chr")))
expect_type_equal(ds$schema$GetFieldByName("chr")$type, dictionary())
})
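# hive_dir was written with key=value directory names (e.g. group=1/other=xxx),
# the layout that hive_partition() maps onto typed partition fields.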
test_that("Hive partitioning", {
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir, partitioning = hive_partition(other = utf8(), group = uint8()))
expect_r6_class(ds, "Dataset")
expect_equivalent(
ds %>%
filter(group == 2) %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53) %>%
collect() %>%
arrange(dbl),
df2[1:2, c("chr", "dbl")]
)
})
test_that("input validation", {
skip_if_not_available("parquet")
expect_error(
open_dataset(hive_dir, hive_partition(other = utf8(), group = uint8()))
)
})
test_that("Partitioning inference", {
skip_if_not_available("parquet")
# These are the same tests as above, just using the *PartitioningFactory
ds1 <- open_dataset(dataset_dir, partitioning = "part")
expect_identical(names(ds1), c(names(df1), "part"))
expect_equivalent(
ds1 %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 1) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
ds2 <- open_dataset(hive_dir)
expect_identical(names(ds2), c(names(df1), "group", "other"))
expect_equivalent(
ds2 %>%
filter(group == 2) %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53) %>%
collect() %>%
arrange(dbl),
df2[1:2, c("chr", "dbl")]
)
})
test_that("IPC/Feather format data", {
ds <- open_dataset(ipc_dir, partitioning = "part", format = "feather")
expect_r6_class(ds$format, "IpcFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")
expect_identical(names(ds), c(names(df1), "part"))
expect_warning(
expect_identical(dim(ds), c(NA, 7L))
)
expect_equivalent(
ds %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 3) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
# Collecting virtual partition column works
expect_equal(
collect(ds) %>% pull(part),
c(rep(3, 10), rep(4, 10))
)
})
test_that("CSV dataset", {
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
expect_r6_class(ds$format, "CsvFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")
expect_identical(names(ds), c(names(df1), "part"))
expect_warning(
expect_identical(dim(ds), c(NA, 7L))
)
expect_equivalent(
ds %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 5) %>%
collect() %>%
summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
# Collecting virtual partition column works
expect_equal(
collect(ds) %>% pull(part),
c(rep(5, 10), rep(6, 10))
)
})
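# FragmentScanOptions supplies format-specific options at scan time (here CSV
# convert options such as null_values), as an alternative to baking them into
# the FileFormat itself.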
test_that("CSV scan options", {
options <- FragmentScanOptions$create("text")
expect_equal(options$type, "csv")
options <- FragmentScanOptions$create("csv",
null_values = c("mynull"),
strings_can_be_null = TRUE)
expect_equal(options$type, "csv")
dst_dir <- make_temp_dir()
dst_file <- file.path(dst_dir, "data.csv")
df <- tibble(chr = c("foo", "mynull"))
write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
ds <- open_dataset(dst_dir, format = "csv")
expect_equivalent(ds %>% collect(), df)
sb <- ds$NewScan()
sb$FragmentScanOptions(options)
tab <- sb$Finish()$ToTable()
expect_equivalent(as.data.frame(tab), tibble(chr = c("foo", NA)))
# Set default convert options in CsvFileFormat
csv_format <- CsvFileFormat$create(null_values = c("mynull"),
strings_can_be_null = TRUE)
ds <- open_dataset(dst_dir, format = csv_format)
expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA)))
# Set both parse and convert options
df <- tibble(chr = c("foo", "mynull"), chr2 = c("bar", "baz"))
write.table(df, dst_file, row.names = FALSE, quote = FALSE, sep = "\t")
ds <- open_dataset(dst_dir, format = "csv",
delimiter = "\t",
null_values = c("mynull"),
strings_can_be_null = TRUE)
expect_equivalent(ds %>% collect(), tibble(chr = c("foo", NA),
chr2 = c("bar", "baz")))
})
test_that("compressed CSV dataset", {
skip_if_not_available("gzip")
dst_dir <- make_temp_dir()
dst_file <- file.path(dst_dir, "data.csv.gz")
write.csv(df1, gzfile(dst_file), row.names = FALSE, quote = FALSE)
format <- FileFormat$create("csv")
ds <- open_dataset(dst_dir, format = format)
expect_r6_class(ds$format, "CsvFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")
expect_equivalent(
ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("CSV dataset options", {
dst_dir <- make_temp_dir()
dst_file <- file.path(dst_dir, "data.csv")
df <- tibble(chr = letters[1:10])
write.csv(df, dst_file, row.names = FALSE, quote = FALSE)
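# skip_rows = 1 drops the header line, so the first data value ("a") becomes
# the column name and the remaining letters are the data.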
format <- FileFormat$create("csv", skip_rows = 1)
ds <- open_dataset(dst_dir, format = format)
expect_equivalent(
ds %>%
select(string = a) %>%
collect(),
df1[-1, ] %>%
select(string = chr)
)
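# With explicit column_names, no line is treated as a header: the original
# header ("chr") comes through as the first data row.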
ds <- open_dataset(dst_dir, format = "csv", column_names = c("foo"))
expect_equivalent(
ds %>%
select(string = foo) %>%
collect(),
tibble(foo = c("chr", letters[1:10]))
)
})
test_that("Other text delimited dataset", {
ds1 <- open_dataset(tsv_dir, partitioning = "part", format = "tsv")
expect_equivalent(
ds1 %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 5) %>%
collect() %>%
summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
ds2 <- open_dataset(tsv_dir, partitioning = "part", format = "text", delimiter = "\t")
expect_equivalent(
ds2 %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 5) %>%
collect() %>%
summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("readr parse options", {
arrow_opts <- names(formals(CsvParseOptions$create))
readr_opts <- names(formals(readr_to_csv_parse_options))
# Arrow and readr parse options must be mutually exclusive, or else the code
# in `csv_file_format_parse_options()` will error or behave incorrectly. A
# failure of this test indicates that these two sets of option names are not
# mutually exclusive.
expect_equal(
intersect(arrow_opts, readr_opts),
character(0)
)
# With not yet supported readr parse options (ARROW-8631)
expect_error(
open_dataset(tsv_dir, partitioning = "part", delim = "\t", na = "\\N"),
"supported"
)
# With unrecognized (garbage) parse options
expect_error(
open_dataset(
tsv_dir,
partitioning = "part",
format = "text",
asdfg = "\\"
),
"Unrecognized"
)
# With both Arrow and readr parse options (disallowed)
expect_error(
open_dataset(
tsv_dir,
partitioning = "part",
format = "text",
quote = "\"",
quoting = TRUE
),
"either"
)
# With ambiguous partial option names (disallowed)
expect_error(
open_dataset(
tsv_dir,
partitioning = "part",
format = "text",
quo = "\""
),
"Ambiguous"
)
# With only readr parse options (and omitting format = "text")
ds1 <- open_dataset(tsv_dir, partitioning = "part", delim = "\t")
expect_equivalent(
ds1 %>%
select(string = chr, integer = int, part) %>%
filter(integer > 6 & part == 5) %>%
collect() %>%
summarize(mean = mean(as.numeric(integer))), # as.numeric bc they're being parsed as int64
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Dataset with multiple file formats", {
skip("https://issues.apache.org/jira/browse/ARROW-7653")
skip_if_not_available("parquet")
ds <- open_dataset(list(
open_dataset(dataset_dir, format = "parquet", partitioning = "part"),
open_dataset(ipc_dir, format = "arrow", partitioning = "part")
))
expect_identical(names(ds), c(names(df1), "part"))
expect_equivalent(
ds %>%
filter(int > 6 & part %in% c(1, 3)) %>%
select(string = chr, integer = int) %>%
collect(),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
rbind(., .) # Stack it twice
)
})
test_that("Creating UnionDataset", {
skip_if_not_available("parquet")
ds1 <- open_dataset(file.path(dataset_dir, 1))
ds2 <- open_dataset(file.path(dataset_dir, 2))
union1 <- open_dataset(list(ds1, ds2))
expect_r6_class(union1, "UnionDataset")
expect_equivalent(
union1 %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
# Now with the c() method
union2 <- c(ds1, ds2)
expect_r6_class(union2, "UnionDataset")
expect_equivalent(
union2 %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>% # Testing the auto-casting of scalars
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
# Confirm c() method error handling
expect_error(c(ds1, 42), "character")
})
test_that("InMemoryDataset", {
ds <- InMemoryDataset$create(rbind(df1, df2))
expect_r6_class(ds, "InMemoryDataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl > 7 & dbl < 53L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
})
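# map_batches() applies a function to each scanned record batch after the
# filter and projection; here each file yields a single batch, so we expect
# one summarized row per file.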
test_that("map_batches", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = "part")
expect_equivalent(
ds %>%
filter(int > 5) %>%
select(int, lgl) %>%
map_batches(~summarize(., min_int = min(int))),
tibble(min_int = c(6L, 101L))
)
})
test_that("partitioning = NULL to ignore partition information (but why?)", {
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir, partitioning = NULL)
expect_identical(names(ds), names(df1)) # i.e. not c(names(df1), "group", "other")
})
test_that("filter() with is.na()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(part, lgl) %>%
filter(!is.na(lgl), part == 1) %>%
collect(),
tibble(part = 1L, lgl = df1$lgl[!is.na(df1$lgl)])
)
})
test_that("filter() with is.nan()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(part, dbl) %>%
filter(!is.nan(dbl), part == 2) %>%
collect(),
tibble(part = 2L, dbl = df2$dbl[!is.nan(df2$dbl)])
)
})
test_that("filter() with %in%", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(int, part) %>%
filter(int %in% c(6, 4, 3, 103, 107), part == 1) %>%
collect(),
tibble(int = df1$int[c(3, 4, 6)], part = 1)
)
# ARROW-9606: bug in %in% filter on partition column with >1 partition columns
ds <- open_dataset(hive_dir)
expect_equivalent(
ds %>%
filter(group %in% 2) %>%
select(names(df2)) %>%
collect(),
df2
)
})
test_that("filter() with negative scalar", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
filter(part == 1) %>%
select(chr, int) %>%
filter(int > -2) %>%
collect(),
df1[, c("chr", "int")]
)
expect_equivalent(
ds %>%
filter(part == 1) %>%
select(chr, int) %>%
filter(int %in% -2) %>%
collect(),
df1[FALSE, c("chr", "int")]
)
expect_equivalent(
ds %>%
filter(part == 1) %>%
select(chr, int) %>%
filter(-int < -2) %>%
collect(),
df1[df1$int > 2, c("chr", "int")]
)
})
test_that("filter() with strings", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(chr, part) %>%
filter(chr == "b", part == 1) %>%
collect(),
tibble(chr = "b", part = 1)
)
skip_if_not_available("utf8proc")
expect_equivalent(
ds %>%
select(chr, part) %>%
filter(toupper(chr) == "B", part == 1) %>%
collect(),
tibble(chr = "b", part = 1)
)
})
test_that("filter() with arrow compute functions by name", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(part, lgl) %>%
filter(arrow_is_valid(lgl), arrow_equal(part, 1)) %>%
collect(),
ds %>%
select(part, lgl) %>%
filter(!is.na(lgl), part == 1L) %>%
collect()
)
})
test_that("filter() with .data", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(.data$int, .data$part) %>%
filter(.data$int == 3, .data$part == 1) %>%
collect(),
tibble(int = df1$int[3], part = 1)
)
expect_equivalent(
ds %>%
select(.data$int, .data$part) %>%
filter(.data$int %in% c(6, 4, 3, 103, 107), .data$part == 1) %>%
collect(),
tibble(int = df1$int[c(3, 4, 6)], part = 1)
)
# and the .env pronoun too!
chr <- 1
expect_equivalent(
ds %>%
select(.data$int, .data$part) %>%
filter(.data$int %in% c(6, 4, 3, 103, 107), .data$part == .env$chr) %>%
collect(),
tibble(int = df1$int[c(3, 4, 6)], part = 1)
)
})
test_that("filter() on timestamp columns", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
filter(ts >= lubridate::ymd_hms("2015-05-04 03:12:39")) %>%
filter(part == 1) %>%
select(ts) %>%
collect(),
df1[5:10, c("ts")]
)
# Now with Date
expect_equivalent(
ds %>%
filter(ts >= as.Date("2015-05-04")) %>%
filter(part == 1) %>%
select(ts) %>%
collect(),
df1[5:10, c("ts")]
)
# Now with bare string date
skip("Implement more aggressive implicit casting for scalars (ARROW-11402)")
expect_equivalent(
ds %>%
filter(ts >= "2015-05-04") %>%
filter(part == 1) %>%
select(ts) %>%
collect(),
df1[5:10, c("ts")]
)
})
test_that("filter() on date32 columns", {
skip_if_not_available("parquet")
tmp <- tempfile()
dir.create(tmp)
df <- data.frame(date = as.Date(c("2020-02-02", "2020-02-03")))
write_parquet(df, file.path(tmp, "file.parquet"))
expect_equal(
open_dataset(tmp) %>%
filter(date > as.Date("2020-02-02")) %>%
collect() %>%
nrow(),
1L
)
# Also with timestamp scalar
expect_equal(
open_dataset(tmp) %>%
filter(date > lubridate::ymd_hms("2020-02-02 00:00:00")) %>%
collect() %>%
nrow(),
1L
)
})
test_that("filter() with expressions", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_r6_class(ds$format, "ParquetFileFormat")
expect_r6_class(ds$filesystem, "LocalFileSystem")
expect_r6_class(ds, "Dataset")
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
# check division's special casing.
expect_equivalent(
ds %>%
select(chr, dbl) %>%
filter(dbl / 2 > 3.5 & dbl < 53) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl")],
df2[1:2, c("chr", "dbl")]
)
)
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(int %/% 2L > 3 & dbl < 53) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl", "int")],
df2[1:2, c("chr", "dbl", "int")]
)
)
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(int %/% 2 > 3 & dbl < 53) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl", "int")],
df2[1:2, c("chr", "dbl", "int")]
)
)
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(int %% 2L > 0 & dbl < 53) %>%
collect() %>%
arrange(dbl),
rbind(
df1[c(1, 3, 5, 7, 9), c("chr", "dbl", "int")],
df2[1, c("chr", "dbl", "int")]
)
)
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(int %% 2 > 0 & dbl < 53) %>%
collect() %>%
arrange(dbl),
rbind(
df1[c(1, 3, 5, 7, 9), c("chr", "dbl", "int")],
df2[1, c("chr", "dbl", "int")]
)
)
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(dbl + int > 15 & dbl < 53L) %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl", "int")],
df2[1:2, c("chr", "dbl", "int")]
)
)
})
test_that("mutate()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
mutated <- ds %>%
select(chr, dbl, int) %>%
filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
mutate(twice = int * 2)
expect_output(
print(mutated),
"FileSystemDataset (query)
chr: string
dbl: double
int: int32
twice: expr
* Filter: ((multiply_checked(dbl, 2) > 14) and (subtract_checked(dbl, 50) < 3))
See $.data for the source Arrow object",
fixed = TRUE
)
expect_equivalent(
mutated %>%
collect() %>%
arrange(dbl),
rbind(
df1[8:10, c("chr", "dbl", "int")],
df2[1:2, c("chr", "dbl", "int")]
) %>%
mutate(
twice = int * 2
)
)
})
test_that("transmute()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equivalent(
ds %>%
select(chr, dbl, int) %>%
filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
transmute(twice = int * 2) %>%
collect() %>%
arrange(twice),
rbind(
df1[8:10, "int", drop = FALSE],
df2[1:2, "int", drop = FALSE]
) %>%
transmute(
twice = int * 2
)
)
})
test_that("mutate() features not yet implemented", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_error(
ds %>%
group_by(int) %>%
mutate(avg = mean(int)),
"mutate() on grouped data not supported in Arrow\nCall collect() first to pull data into R.",
fixed = TRUE
)
})
test_that("mutate() with scalar (length 1) literal inputs", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equal(
ds %>%
mutate(the_answer = 42) %>%
collect() %>%
pull(the_answer),
rep(42, nrow(ds))
)
expect_error(
ds %>% mutate(the_answer = c(42, 42)),
"In the_answer = c(42, 42), only values of size one are recycled\nCall collect() first to pull data into R.",
fixed = TRUE
)
})
test_that("mutate() with NULL inputs", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equal(
ds %>%
mutate(int = NULL) %>%
collect(),
ds %>%
select(-int) %>%
collect()
)
})
test_that("empty mutate()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equal(
ds %>%
mutate() %>%
collect(),
ds %>%
collect()
)
})
test_that("transmute() with NULL inputs", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equal(
ds %>%
transmute(int = NULL) %>%
collect(),
ds %>%
select() %>%
collect()
)
})
test_that("empty transmute()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_equal(
ds %>%
transmute() %>%
collect(),
ds %>%
select() %>%
collect()
)
})
test_that("filter scalar validation doesn't crash (ARROW-7772)", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
expect_error(
ds %>%
filter(int == "fff", part == 1) %>%
collect(),
"equal has no kernel matching input types .array.int32., scalar.string.."
)
})
test_that("collect() on Dataset works (if fits in memory)", {
skip_if_not_available("parquet")
expect_equal(
collect(open_dataset(dataset_dir)),
rbind(df1, df2)
)
})
test_that("count()", {
skip_if_not_available("parquet")
skip("count() is not a generic so we have to get here through summarize()")
ds <- open_dataset(dataset_dir)
df <- rbind(df1, df2)
expect_equal(
ds %>%
filter(int > 6, int < 108) %>%
count(chr),
df %>%
filter(int > 6, int < 108) %>%
count(chr)
)
})
test_that("arrange()", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir, partitioning = schema(part = uint8()))
arranged <- ds %>%
select(chr, dbl, int) %>%
filter(dbl * 2 > 14 & dbl - 50 < 3L) %>%
mutate(twice = int * 2) %>%
arrange(chr, desc(twice), dbl + int)
expect_output(
print(arranged),
"FileSystemDataset (query)
chr: string
dbl: double
int: int32
twice: expr
* Filter: ((multiply_checked(dbl, 2) > 14) and (subtract_checked(dbl, 50) < 3))
* Sorted by chr [asc], multiply_checked(int, 2) [desc], add_checked(dbl, int) [asc]
See $.data for the source Arrow object",
fixed = TRUE
)
expect_equivalent(
arranged %>%
collect(),
rbind(
df1[8, c("chr", "dbl", "int")],
df2[2, c("chr", "dbl", "int")],
df1[9, c("chr", "dbl", "int")],
df2[1, c("chr", "dbl", "int")],
df1[10, c("chr", "dbl", "int")]
) %>%
mutate(
twice = int * 2
)
)
})
test_that("compute()/collect(as_data_frame=FALSE)", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir)
tab1 <- ds %>% compute()
expect_is(tab1, "Table")
tab2 <- ds %>% collect(as_data_frame = FALSE)
expect_is(tab2, "Table")
tab3 <- ds %>%
mutate(negint = -int) %>%
filter(negint > -100) %>%
arrange(chr) %>%
select(negint) %>%
compute()
expect_is(tab3, "Table")
expect_equal(
tab3 %>% collect(),
tibble(negint = -1:-10)
)
tab4 <- ds %>%
mutate(negint = -int) %>%
filter(negint > -100) %>%
arrange(chr) %>%
select(negint) %>%
collect(as_data_frame = FALSE)
expect_is(tab4, "Table")
expect_equal(
tab4 %>% collect(),
tibble(negint = -1:-10)
)
tab5 <- ds %>%
mutate(negint = -int) %>%
group_by(fct) %>%
compute()
# the group_by() prevents compute() from returning a Table...
expect_is(tab5, "arrow_dplyr_query")
# ... but $.data is a Table...
expect_is(tab5$.data, "Table")
# ... and the mutate() was evaluated
expect_true("negint" %in% names(tab5$.data))
})
test_that("head/tail", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir)
expect_equal(as.data.frame(head(ds)), head(df1))
expect_equal(
as.data.frame(head(ds, 12)),
rbind(df1, df2[1:2,])
)
expect_equal(
ds %>%
filter(int > 6) %>%
head() %>%
as.data.frame(),
rbind(df1[7:10,], df2[1:2,])
)
expect_equal(as.data.frame(tail(ds)), tail(df2))
expect_equal(
as.data.frame(tail(ds, 12)),
rbind(df1[9:10,], df2)
)
expect_equal(
ds %>%
filter(int < 105) %>%
tail() %>%
as.data.frame(),
rbind(df1[9:10,], df2[1:4,])
)
})
test_that("Dataset [ (take by index)", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir)
# Taking only from one file
expect_equal(
as.data.frame(ds[c(4, 5, 9), 3:4]),
df1[c(4, 5, 9), 3:4]
)
# Taking from more than one
expect_equal(
as.data.frame(ds[c(4, 5, 9, 12, 13), 3:4]),
rbind(df1[c(4, 5, 9), 3:4], df2[2:3, 3:4])
)
# Taking out of order
expect_equal(
as.data.frame(ds[c(4, 13, 9, 12, 5), ]),
rbind(
df1[4, ],
df2[3, ],
df1[9, ],
df2[2, ],
df1[5, ]
)
)
# Take from a query
ds2 <- ds %>%
filter(int > 6) %>%
select(int, lgl)
expect_equal(
as.data.frame(ds2[c(2, 5), ]),
rbind(
df1[8, c("int", "lgl")],
df2[1, c("int", "lgl")]
)
)
})
test_that("dplyr method not implemented messages", {
skip_if_not_available("parquet")
ds <- open_dataset(dataset_dir)
# This one is more nuanced
expect_error(
ds %>% filter(int > 6, dbl > max(dbl)),
"Filter expression not supported for Arrow Datasets: dbl > max(dbl)\nCall collect() first to pull data into R.",
fixed = TRUE
)
# One explicit test of the full message
expect_error(
ds %>% summarize(mean(int)),
"summarize() is not currently implemented for Arrow Datasets. Call collect() first to pull data into R.",
fixed = TRUE
)
# Helper for everything else
expect_not_implemented <- function(x) {
expect_error(x, "is not currently implemented for Arrow Datasets")
}
expect_not_implemented(ds %>% filter(int == 1) %>% summarize(n()))
})
test_that("Dataset and query print methods", {
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir)
expect_output(
print(ds),
paste(
"FileSystemDataset with 2 Parquet files",
"int: int32",
"dbl: double",
"lgl: bool",
"chr: string",
"fct: dictionary<values=string, indices=int32>",
"ts: timestamp[us, tz=UTC]",
"group: int32",
"other: string",
sep = "\n"
),
fixed = TRUE
)
expect_type(ds$metadata, "list")
q <- select(ds, string = chr, lgl, integer = int)
expect_output(
print(q),
paste(
"Dataset (query)",
"string: string",
"lgl: bool",
"integer: int32",
"",
"See $.data for the source Arrow object",
sep = "\n"
),
fixed = TRUE
)
expect_output(
print(q %>% filter(integer == 6) %>% group_by(lgl)),
paste(
"Dataset (query)",
"string: string",
"lgl: bool",
"integer: int32",
"",
"* Filter: (int == 6)",
"* Grouped by lgl",
"See $.data for the source Arrow object",
sep = "\n"
),
fixed = TRUE
)
})
test_that("Scanner$ScanBatches", {
ds <- open_dataset(ipc_dir, format = "feather")
batches <- ds$NewScan()$Finish()$ScanBatches()
table <- Table$create(!!!batches)
expect_equivalent(as.data.frame(table), rbind(df1, df2))
})
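# Helper: exercise the low-level ScannerBuilder API directly (project two
# columns, filter on dbl == 8) and check that the resulting Table matches the
# corresponding row of df1.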
expect_scan_result <- function(ds, schm) {
sb <- ds$NewScan()
expect_r6_class(sb, "ScannerBuilder")
expect_equal(sb$schema, schm)
sb$Project(c("chr", "lgl"))
sb$Filter(Expression$field_ref("dbl") == 8)
scn <- sb$Finish()
expect_r6_class(scn, "Scanner")
tab <- scn$ToTable()
expect_r6_class(tab, "Table")
expect_equivalent(
as.data.frame(tab),
df1[8, c("chr", "lgl")]
)
}
test_that("Assembling a Dataset manually and getting a Table", {
skip_if_not_available("parquet")
fs <- LocalFileSystem$create()
selector <- FileSelector$create(dataset_dir, recursive = TRUE)
partitioning <- DirectoryPartitioning$create(schema(part = double()))
fmt <- FileFormat$create("parquet")
factory <- FileSystemDatasetFactory$create(fs, selector, NULL, fmt, partitioning = partitioning)
expect_r6_class(factory, "FileSystemDatasetFactory")
schm <- factory$Inspect()
expect_r6_class(schm, "Schema")
phys_schm <- ParquetFileReader$create(files[1])$GetSchema()
expect_equal(names(phys_schm), names(df1))
expect_equal(names(schm), c(names(phys_schm), "part"))
child <- factory$Finish(schm)
expect_r6_class(child, "FileSystemDataset")
expect_r6_class(child$schema, "Schema")
expect_r6_class(child$format, "ParquetFileFormat")
expect_equal(names(schm), names(child$schema))
expect_equivalent(child$files, files)
ds <- Dataset$create(list(child), schm)
expect_scan_result(ds, schm)
})
test_that("Assembling multiple DatasetFactories with DatasetFactory", {
skip_if_not_available("parquet")
factory1 <- dataset_factory(file.path(dataset_dir, 1), format = "parquet")
expect_r6_class(factory1, "FileSystemDatasetFactory")
factory2 <- dataset_factory(file.path(dataset_dir, 2), format = "parquet")
expect_r6_class(factory2, "FileSystemDatasetFactory")
factory <- DatasetFactory$create(list(factory1, factory2))
expect_r6_class(factory, "DatasetFactory")
schm <- factory$Inspect()
expect_r6_class(schm, "Schema")
phys_schm <- ParquetFileReader$create(files[1])$GetSchema()
expect_equal(names(phys_schm), names(df1))
ds <- factory$Finish(schm)
expect_r6_class(ds, "UnionDataset")
expect_r6_class(ds$schema, "Schema")
expect_equal(names(schm), names(ds$schema))
expect_equivalent(map(ds$children, ~.$files), files)
expect_scan_result(ds, schm)
})
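# The write_dataset() tests below round trip data: open a dataset in one
# format, write it back out (re)partitioned in another, and check that the
# result reads back correctly.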
test_that("Writing a dataset: CSV->IPC", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
dst_dir <- make_temp_dir()
write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir, format = "feather")
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
# Check whether "int" is present in the files or just in the dirs
first <- read_feather(
dir(dst_dir, pattern = ".feather$", recursive = TRUE, full.names = TRUE)[1],
as_data_frame = FALSE
)
# It shouldn't be there
expect_false("int" %in% names(first))
})
test_that("Writing a dataset: Parquet->IPC", {
skip_if_not_available("parquet")
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(hive_dir)
dst_dir <- make_temp_dir()
write_dataset(ds, dst_dir, format = "feather", partitioning = "int")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir, format = "feather")
expect_equivalent(
new_ds %>%
select(string = chr, integer = int, group) %>%
filter(integer > 6 & group == 1) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Writing a dataset: CSV->Parquet", {
skip_if_not_available("parquet")
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
dst_dir <- make_temp_dir()
write_dataset(ds, dst_dir, format = "parquet", partitioning = "int")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir)
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Writing a dataset: Parquet->Parquet (default)", {
skip_if_not_available("parquet")
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(hive_dir)
dst_dir <- make_temp_dir()
write_dataset(ds, dst_dir, partitioning = "int")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir)
expect_equivalent(
new_ds %>%
select(string = chr, integer = int, group) %>%
filter(integer > 6 & group == 1) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Writing a dataset: no format specified", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
skip_if_not_available("parquet")
dst_dir <- make_temp_dir()
write_dataset(mtcars, dst_dir)
new_ds <- open_dataset(dst_dir)
expect_equal(
list.files(dst_dir, pattern = "parquet"),
"part-0.parquet"
)
expect_true(
inherits(new_ds$format, "ParquetFileFormat")
)
expect_equivalent(
new_ds %>% collect(),
mtcars
)
})
test_that("Dataset writing: dplyr methods", {
skip_if_not_available("parquet")
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(hive_dir)
dst_dir <- tempfile()
# Specify partition vars by group_by
ds %>%
group_by(int) %>%
write_dataset(dst_dir, format = "feather")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
# select to specify schema
dst_dir2 <- tempfile()
ds %>%
group_by(int) %>%
select(chr, dbl) %>%
write_dataset(dst_dir2, format = "feather")
new_ds <- open_dataset(dst_dir2, format = "feather")
expect_equivalent(
collect(new_ds) %>% arrange(int),
rbind(df1[c("chr", "dbl", "int")], df2[c("chr", "dbl", "int")])
)
# filter to restrict written rows
dst_dir3 <- tempfile()
ds %>%
filter(int == 4) %>%
write_dataset(dst_dir3, format = "feather")
new_ds <- open_dataset(dst_dir3, format = "feather")
expect_equivalent(
new_ds %>% select(names(df1)) %>% collect(),
df1 %>% filter(int == 4)
)
})
test_that("Dataset writing: non-hive", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir)
dst_dir <- tempfile()
write_dataset(ds, dst_dir, format = "feather", partitioning = "int", hive_style = FALSE)
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(as.character(c(1:10, 101:110))))
})
test_that("Dataset writing: no partitioning", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir)
dst_dir <- tempfile()
write_dataset(ds, dst_dir, format = "feather", partitioning = NULL)
expect_true(dir.exists(dst_dir))
expect_true(length(dir(dst_dir)) > 0)
})
test_that("Dataset writing: partition on null", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
skip_if_not_available("parquet")
ds <- open_dataset(hive_dir)
dst_dir <- tempfile()
partitioning <- hive_partition(lgl = boolean())
write_dataset(ds, dst_dir, partitioning = partitioning)
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), c("lgl=__HIVE_DEFAULT_PARTITION__", "lgl=false", "lgl=true"))
dst_dir <- tempfile()
partitioning <- hive_partition(lgl = boolean(), null_fallback = "xyz")
write_dataset(ds, dst_dir, partitioning = partitioning)
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), c("lgl=false", "lgl=true", "lgl=xyz"))
ds_readback <- open_dataset(dst_dir, partitioning = hive_partition(lgl = boolean(), null_fallback = "xyz"))
expect_identical(
ds %>%
select(int, lgl) %>%
collect() %>%
arrange(lgl, int),
ds_readback %>%
select(int, lgl) %>%
collect() %>%
arrange(lgl, int)
)
})
test_that("Dataset writing: from data.frame", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
dst_dir <- tempfile()
stacked <- rbind(df1, df2)
stacked %>%
group_by(int) %>%
write_dataset(dst_dir, format = "feather")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir, format = "feather")
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Dataset writing: from RecordBatch", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
dst_dir <- tempfile()
stacked <- record_batch(rbind(df1, df2))
stacked %>%
group_by(int) %>%
write_dataset(dst_dir, format = "feather")
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir, format = "feather")
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Writing a dataset: Ipc format options & compression", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
dst_dir <- make_temp_dir()
codec <- NULL
if (codec_is_available("zstd")) {
codec <- Codec$create("zstd")
}
write_dataset(ds, dst_dir, format = "feather", codec = codec)
expect_true(dir.exists(dst_dir))
new_ds <- open_dataset(dst_dir, format = "feather")
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Writing a dataset: Parquet format options", {
skip_on_os("windows") # https://issues.apache.org/jira/browse/ARROW-9651
skip_if_not_available("parquet")
ds <- open_dataset(csv_dir, partitioning = "part", format = "csv")
dst_dir <- make_temp_dir()
dst_dir_no_truncated_timestamps <- make_temp_dir()
# Use trace() to confirm that options are passed in
trace(
"parquet___ArrowWriterProperties___create",
tracer = quote(warning("allow_truncated_timestamps == ", allow_truncated_timestamps)),
print = FALSE,
where = write_dataset
)
expect_warning(
write_dataset(ds, dst_dir_no_truncated_timestamps, format = "parquet", partitioning = "int"),
"allow_truncated_timestamps == FALSE"
)
expect_warning(
write_dataset(ds, dst_dir, format = "parquet", partitioning = "int", allow_truncated_timestamps = TRUE),
"allow_truncated_timestamps == TRUE"
)
untrace("parquet___ArrowWriterProperties___create", where = write_dataset)
# Now confirm we can read back what we sent
expect_true(dir.exists(dst_dir))
expect_identical(dir(dst_dir), sort(paste("int", c(1:10, 101:110), sep = "=")))
new_ds <- open_dataset(dst_dir)
expect_equivalent(
new_ds %>%
select(string = chr, integer = int) %>%
filter(integer > 6 & integer < 11) %>%
collect() %>%
summarize(mean = mean(integer)),
df1 %>%
select(string = chr, integer = int) %>%
filter(integer > 6) %>%
summarize(mean = mean(integer))
)
})
test_that("Dataset writing: unsupported features/input validation", {
skip_if_not_available("parquet")
expect_error(write_dataset(4), 'dataset must be a "Dataset"')
ds <- open_dataset(hive_dir)
expect_error(
select(ds, integer = int) %>% write_dataset(ds),
"Renaming columns when writing a dataset is not yet supported"
)
expect_error(
write_dataset(ds, partitioning = c("int", "NOTACOLUMN"), format = "ipc"),
'Invalid field name: "NOTACOLUMN"'
)
expect_error(
write_dataset(ds, tempfile(), basename_template = "something_without_i")
)
expect_error(
write_dataset(ds, tempfile(), basename_template = NULL)
)
})
# see https://issues.apache.org/jira/browse/ARROW-11328
test_that("Collecting zero columns from a dataset doesn't return entire dataset", {
skip_if_not_available("parquet")
tmp <- tempfile()
write_dataset(mtcars, tmp, format = "parquet")
expect_equal(
open_dataset(tmp) %>% select() %>% collect() %>% dim(),
c(32, 0)
)
})