blob: 8cb0dafdfe487b3a5f54a1142198a7f6fce10063 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
context("S3 tests using local minio")
if (arrow_with_s3() && process_is_running("minio server")) {
# Get minio config, with expected defaults
minio_key <- Sys.getenv("MINIO_ACCESS_KEY", "minioadmin")
minio_secret <- Sys.getenv("MINIO_SECRET_KEY", "minioadmin")
minio_port <- Sys.getenv("MINIO_PORT", "9000")
# Helper function for minio URIs
minio_uri <- function(...) {
template <- "s3://%s:%s@%s?scheme=http&endpoint_override=localhost%s%s"
sprintf(template, minio_key, minio_secret, minio_path(...), "%3A", minio_port)
}
minio_path <- function(...) paste(now, ..., sep = "/")
test_that("minio setup", {
# Create a "bucket" on minio for this test run, which we'll delete when done.
fs <- S3FileSystem$create(
access_key = minio_key,
secret_key = minio_secret,
scheme = "http",
endpoint_override = paste0("localhost:", minio_port)
)
expect_r6_class(fs, "S3FileSystem")
now <- as.character(as.numeric(Sys.time()))
# If minio isn't running, this will hang for a few seconds and fail with a
# curl timeout, causing `run_these` to be set to FALSE and skipping the tests
fs$CreateDir(now)
})
# Clean up when we're all done
on.exit(fs$DeleteDir(now))
test_that("read/write Feather on minio", {
write_feather(example_data, minio_uri("test.feather"))
expect_identical(read_feather(minio_uri("test.feather")), example_data)
})
test_that("read/write Feather by filesystem, not URI", {
write_feather(example_data, fs$path(minio_path("test2.feather")))
expect_identical(
read_feather(fs$path(minio_path("test2.feather"))),
example_data
)
})
test_that("read/write stream", {
write_ipc_stream(example_data, fs$path(minio_path("test3.ipc")))
expect_identical(
read_ipc_stream(fs$path(minio_path("test3.ipc"))),
example_data
)
})
test_that("read/write Parquet on minio", {
skip_if_not_available("parquet")
write_parquet(example_data, fs$path(minio_uri("test.parquet")))
expect_identical(read_parquet(minio_uri("test.parquet")), example_data)
})
if (arrow_with_dataset()) {
library(dplyr)
make_temp_dir <- function() {
path <- tempfile()
dir.create(path)
normalizePath(path, winslash = "/")
}
test_that("open_dataset with an S3 file (not directory) URI", {
skip_if_not_available("parquet")
expect_identical(
open_dataset(minio_uri("test.parquet")) %>% collect(),
example_data
)
})
test_that("open_dataset with vector of S3 file URIs", {
expect_identical(
open_dataset(
c(minio_uri("test.feather"), minio_uri("test2.feather")),
format = "feather"
) %>% collect(),
rbind(example_data, example_data)
)
})
test_that("open_dataset errors on URIs for different file systems", {
td <- make_temp_dir()
expect_error(
open_dataset(
c(
minio_uri("test.feather"),
paste0("file://", file.path(td, "fake.feather"))
),
format = "feather"
),
"Vectors of URIs for different file systems are not supported"
)
})
# Dataset test setup, cf. test-dataset.R
first_date <- lubridate::ymd_hms("2015-04-29 03:12:39")
df1 <- tibble(
int = 1:10,
dbl = as.numeric(1:10),
lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
chr = letters[1:10],
fct = factor(LETTERS[1:10]),
ts = first_date + lubridate::days(1:10)
)
second_date <- lubridate::ymd_hms("2017-03-09 07:01:02")
df2 <- tibble(
int = 101:110,
dbl = as.numeric(51:60),
lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 2),
chr = letters[10:1],
fct = factor(LETTERS[10:1]),
ts = second_date + lubridate::days(10:1)
)
# This is also to set up the dataset tests
test_that("write_parquet with filesystem arg", {
skip_if_not_available("parquet")
fs$CreateDir(minio_path("hive_dir", "group=1", "other=xxx"))
fs$CreateDir(minio_path("hive_dir", "group=2", "other=yyy"))
expect_length(fs$ls(minio_path("hive_dir")), 2)
write_parquet(df1, fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet")))
write_parquet(df2, fs$path(minio_path("hive_dir", "group=2", "other=yyy", "file2.parquet")))
expect_identical(
read_parquet(fs$path(minio_path("hive_dir", "group=1", "other=xxx", "file1.parquet"))),
df1
)
})
test_that("open_dataset with fs", {
ds <- open_dataset(fs$path(minio_path("hive_dir")))
expect_identical(
ds %>% select(dbl, lgl) %>% collect(),
rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
)
})
test_that("write_dataset with fs", {
ds <- open_dataset(fs$path(minio_path("hive_dir")))
write_dataset(ds, fs$path(minio_path("new_dataset_dir")))
expect_length(fs$ls(minio_path("new_dataset_dir")), 1)
})
test_that("Let's test copy_files too", {
td <- make_temp_dir()
copy_files(minio_uri("hive_dir"), td)
expect_length(dir(td), 2)
ds <- open_dataset(td)
expect_identical(
ds %>% select(dbl, lgl) %>% collect(),
rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
)
# Let's copy the other way and use a SubTreeFileSystem rather than URI
copy_files(td, fs$path(minio_path("hive_dir2")))
ds2 <- open_dataset(fs$path(minio_path("hive_dir2")))
expect_identical(
ds2 %>% select(dbl, lgl) %>% collect(),
rbind(df1[, c("dbl", "lgl")], df2[, c("dbl", "lgl")])
)
})
}
test_that("S3FileSystem input validation", {
expect_error(
S3FileSystem$create(access_key = "foo"),
"Key authentication requires both access_key and secret_key"
)
expect_error(
S3FileSystem$create(secret_key = "foo"),
"Key authentication requires both access_key and secret_key"
)
expect_error(
S3FileSystem$create(session_token = "foo"),
paste0(
"In order to initialize a session with temporary credentials, ",
"both secret_key and access_key must be provided ",
"in addition to session_token."
)
)
expect_error(
S3FileSystem$create(access_key = "foo", secret_key = "asdf", anonymous = TRUE),
'Cannot specify "access_key" and "secret_key" when anonymous = TRUE'
)
expect_error(
S3FileSystem$create(access_key = "foo", secret_key = "asdf", role_arn = "qwer"),
"Cannot provide both key authentication and role_arn"
)
expect_error(
S3FileSystem$create(access_key = "foo", secret_key = "asdf", external_id = "qwer"),
'Cannot specify "external_id" without providing a role_arn string'
)
expect_error(
S3FileSystem$create(external_id = "foo"),
'Cannot specify "external_id" without providing a role_arn string'
)
})
} else {
# Kinda hacky, let's put a skipped test here, just so we note that the tests
# didn't run
test_that("S3FileSystem tests with Minio", {
skip("Minio is not running")
})
}