| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <exception> |
| #include <memory> |
| #include <sstream> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
// This boost/asio/io_context.hpp include is only needed for MinGW
// builds; it is unnecessary elsewhere.
//
// Its purpose is to include boost/asio/detail/socket_types.hpp before any
// "#include <windows.h>", because boost/asio/detail/socket_types.hpp doesn't
// work if windows.h is already included. boost/process.hpp ->
// boost/process/args.hpp -> boost/process/detail/basic_cmd.hpp
// includes windows.h, and boost/process/args.hpp is included before
// boost/process/async.hpp, which implicitly includes
// boost/asio/detail/socket_types.hpp.
| #include <boost/asio/io_context.hpp> |
| // We need BOOST_USE_WINDOWS_H definition with MinGW when we use |
| // boost/process.hpp. See ARROW_BOOST_PROCESS_COMPILE_DEFINITIONS in |
| // cpp/cmake_modules/BuildUtils.cmake for details. |
| #include <boost/process.hpp> |
| |
| #include <gmock/gmock-matchers.h> |
| #include <gtest/gtest.h> |
| |
| #ifdef _WIN32 |
| // Undefine preprocessor macros that interfere with AWS function / method names |
| #ifdef GetMessage |
| #undef GetMessage |
| #endif |
| #ifdef GetObject |
| #undef GetObject |
| #endif |
| #endif |
| |
| #include <aws/core/Aws.h> |
| #include <aws/core/Version.h> |
| #include <aws/core/auth/AWSCredentials.h> |
| #include <aws/core/auth/AWSCredentialsProvider.h> |
| #include <aws/core/client/RetryStrategy.h> |
| #include <aws/core/utils/logging/ConsoleLogSystem.h> |
| #include <aws/s3/S3Client.h> |
| #include <aws/s3/model/CreateBucketRequest.h> |
| #include <aws/s3/model/GetObjectRequest.h> |
| #include <aws/s3/model/PutObjectRequest.h> |
| #include <aws/sts/STSClient.h> |
| |
| #include "arrow/filesystem/filesystem.h" |
| #include "arrow/filesystem/s3_internal.h" |
| #include "arrow/filesystem/s3_test_util.h" |
| #include "arrow/filesystem/s3fs.h" |
| #include "arrow/filesystem/test_util.h" |
| #include "arrow/result.h" |
| #include "arrow/status.h" |
| #include "arrow/testing/future_util.h" |
| #include "arrow/testing/gtest_util.h" |
| #include "arrow/testing/util.h" |
| #include "arrow/util/async_generator.h" |
| #include "arrow/util/checked_cast.h" |
| #include "arrow/util/future.h" |
| #include "arrow/util/io_util.h" |
| #include "arrow/util/logging.h" |
| #include "arrow/util/macros.h" |
| |
| namespace arrow { |
| namespace fs { |
| |
| using ::arrow::internal::checked_pointer_cast; |
| using ::arrow::internal::PlatformFilename; |
| using ::arrow::internal::UriEscape; |
| |
| using ::arrow::fs::internal::ConnectRetryStrategy; |
| using ::arrow::fs::internal::ErrorToStatus; |
| using ::arrow::fs::internal::OutcomeToStatus; |
| using ::arrow::fs::internal::ToAwsString; |
| |
| namespace bp = boost::process; |
| |
| // NOTE: Connecting in Python: |
| // >>> fs = s3fs.S3FileSystem(key='minio', secret='miniopass', |
| // client_kwargs=dict(endpoint_url='http://127.0.0.1:9000')) |
| // >>> fs.ls('') |
| // ['bucket'] |
| // or: |
| // >>> from fs_s3fs import S3FS |
| // >>> fs = S3FS('bucket', endpoint_url='http://127.0.0.1:9000', |
| // aws_access_key_id='minio', aws_secret_access_key='miniopass') |
| |
// Implementation detail of ARROW_AWS_ASSIGN_OR_FAIL.
//
// Evaluates `rexpr` once into the local variable `outcome_var`.  If the
// outcome is not a success, a gtest FAIL() is emitted whose message quotes the
// expression text and the AWS error message.  Otherwise the result is moved
// out of the outcome and assigned to `lhs`.
//
// NOTE: this expands to several statements (it may declare `lhs`), so it
// cannot be used as the unbraced body of an `if` or a loop.
#define ARROW_AWS_ASSIGN_OR_FAIL_IMPL(outcome_var, lhs, rexpr) \
  auto outcome_var = (rexpr);                                  \
  if (!outcome_var.IsSuccess()) {                              \
    FAIL() << "'" ARROW_STRINGIFY(rexpr) "' failed with "      \
           << outcome_var.GetError().GetMessage();             \
  }                                                            \
  lhs = std::move(outcome_var).GetResultWithOwnership();

// Builds the name of the temporary outcome variable.  Going through this
// extra macro lets both arguments (notably __COUNTER__) be macro-expanded
// before they are handed to ARROW_CONCAT.
#define ARROW_AWS_ASSIGN_OR_FAIL_NAME(prefix, suffix) ARROW_CONCAT(prefix, suffix)

// Assign the result of the AWS call `rexpr` to `lhs`, or fail the current
// test.  A distinct temporary name is generated per use through __COUNTER__.
#define ARROW_AWS_ASSIGN_OR_FAIL(lhs, rexpr) \
  ARROW_AWS_ASSIGN_OR_FAIL_IMPL(             \
      ARROW_AWS_ASSIGN_OR_FAIL_NAME(_aws_error_or_value, __COUNTER__), lhs, rexpr);
| |
| class AwsTestMixin : public ::testing::Test { |
| public: |
| // We set this environment variable to speed up tests by ensuring |
| // DefaultAWSCredentialsProviderChain does not query (inaccessible) |
| // EC2 metadata endpoint |
| AwsTestMixin() : ec2_metadata_disabled_guard_("AWS_EC2_METADATA_DISABLED", "true") {} |
| |
| void SetUp() override { |
| #ifdef AWS_CPP_SDK_S3_NOT_SHARED |
| auto aws_log_level = Aws::Utils::Logging::LogLevel::Fatal; |
| aws_options_.loggingOptions.logLevel = aws_log_level; |
| aws_options_.loggingOptions.logger_create_fn = [&aws_log_level] { |
| return std::make_shared<Aws::Utils::Logging::ConsoleLogSystem>(aws_log_level); |
| }; |
| Aws::InitAPI(aws_options_); |
| #endif |
| } |
| |
| void TearDown() override { |
| #ifdef AWS_CPP_SDK_S3_NOT_SHARED |
| Aws::ShutdownAPI(aws_options_); |
| #endif |
| } |
| |
| private: |
| EnvVarGuard ec2_metadata_disabled_guard_; |
| #ifdef AWS_CPP_SDK_S3_NOT_SHARED |
| Aws::SDKOptions aws_options_; |
| #endif |
| }; |
| |
| class S3TestMixin : public AwsTestMixin { |
| public: |
| void SetUp() override { |
| AwsTestMixin::SetUp(); |
| |
| ASSERT_OK(minio_.Start()); |
| |
| client_config_.reset(new Aws::Client::ClientConfiguration()); |
| client_config_->endpointOverride = ToAwsString(minio_.connect_string()); |
| client_config_->scheme = Aws::Http::Scheme::HTTP; |
| client_config_->retryStrategy = std::make_shared<ConnectRetryStrategy>(); |
| credentials_ = {ToAwsString(minio_.access_key()), ToAwsString(minio_.secret_key())}; |
| bool use_virtual_addressing = false; |
| client_.reset( |
| new Aws::S3::S3Client(credentials_, *client_config_, |
| Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, |
| use_virtual_addressing)); |
| } |
| |
| void TearDown() override { |
| ASSERT_OK(minio_.Stop()); |
| |
| AwsTestMixin::TearDown(); |
| } |
| |
| protected: |
| MinioTestServer minio_; |
| std::unique_ptr<Aws::Client::ClientConfiguration> client_config_; |
| Aws::Auth::AWSCredentials credentials_; |
| std::unique_ptr<Aws::S3::S3Client> client_; |
| }; |
| |
| void AssertGetObject(Aws::S3::Model::GetObjectResult& result, |
| const std::string& expected) { |
| auto length = static_cast<int64_t>(expected.length()); |
| ASSERT_EQ(result.GetContentLength(), length); |
| auto& stream = result.GetBody(); |
| std::string actual; |
| actual.resize(length + 1); |
| stream.read(&actual[0], length + 1); |
| ASSERT_EQ(stream.gcount(), length); // EOF was reached before length + 1 |
| actual.resize(length); |
| ASSERT_EQ(actual.size(), expected.size()); |
| ASSERT_TRUE(actual == expected); // Avoid ASSERT_EQ on large data |
| } |
| |
| void AssertObjectContents(Aws::S3::S3Client* client, const std::string& bucket, |
| const std::string& key, const std::string& expected) { |
| Aws::S3::Model::GetObjectRequest req; |
| req.SetBucket(ToAwsString(bucket)); |
| req.SetKey(ToAwsString(key)); |
| ARROW_AWS_ASSIGN_OR_FAIL(auto result, client->GetObject(req)); |
| AssertGetObject(result, expected); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // S3Options tests |
| |
| class S3OptionsTest : public AwsTestMixin {}; |
| |
| TEST_F(S3OptionsTest, FromUri) { |
| std::string path; |
| S3Options options; |
| |
| ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://", &path)); |
| ASSERT_EQ(options.region, ""); |
| ASSERT_EQ(options.scheme, "https"); |
| ASSERT_EQ(options.endpoint_override, ""); |
| ASSERT_EQ(path, ""); |
| |
| ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3:", &path)); |
| ASSERT_EQ(path, ""); |
| |
| ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://access:secret@mybucket", &path)); |
| ASSERT_EQ(path, "mybucket"); |
| const auto creds = options.credentials_provider->GetAWSCredentials(); |
| ASSERT_EQ(creds.GetAWSAccessKeyId(), "access"); |
| ASSERT_EQ(creds.GetAWSSecretKey(), "secret"); |
| |
| ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/", &path)); |
| ASSERT_NE(options.region, ""); // Some region was chosen |
| ASSERT_EQ(options.scheme, "https"); |
| ASSERT_EQ(options.endpoint_override, ""); |
| ASSERT_EQ(path, "mybucket"); |
| |
| ASSERT_OK_AND_ASSIGN(options, S3Options::FromUri("s3://mybucket/foo/bar/", &path)); |
| ASSERT_NE(options.region, ""); |
| ASSERT_EQ(options.scheme, "https"); |
| ASSERT_EQ(options.endpoint_override, ""); |
| ASSERT_EQ(path, "mybucket/foo/bar"); |
| |
| // Region resolution with a well-known bucket |
| ASSERT_OK_AND_ASSIGN( |
| options, S3Options::FromUri("s3://aws-earth-mo-atmospheric-ukv-prd/", &path)); |
| ASSERT_EQ(options.region, "eu-west-2"); |
| |
| // Explicit region override |
| ASSERT_OK_AND_ASSIGN( |
| options, |
| S3Options::FromUri( |
| "s3://mybucket/foo/bar/?region=utopia&endpoint_override=localhost&scheme=http", |
| &path)); |
| ASSERT_EQ(options.region, "utopia"); |
| ASSERT_EQ(options.scheme, "http"); |
| ASSERT_EQ(options.endpoint_override, "localhost"); |
| ASSERT_EQ(path, "mybucket/foo/bar"); |
| |
| // Missing bucket name |
| ASSERT_RAISES(Invalid, S3Options::FromUri("s3:///foo/bar/", &path)); |
| |
| // Invalid option |
| ASSERT_RAISES(Invalid, S3Options::FromUri("s3://mybucket/?xxx=zzz", &path)); |
| } |
| |
| TEST_F(S3OptionsTest, FromAccessKey) { |
| S3Options options; |
| |
| // session token is optional and should default to empty string |
| options = S3Options::FromAccessKey("access", "secret"); |
| ASSERT_EQ(options.GetAccessKey(), "access"); |
| ASSERT_EQ(options.GetSecretKey(), "secret"); |
| ASSERT_EQ(options.GetSessionToken(), ""); |
| |
| options = S3Options::FromAccessKey("access", "secret", "token"); |
| ASSERT_EQ(options.GetAccessKey(), "access"); |
| ASSERT_EQ(options.GetSecretKey(), "secret"); |
| ASSERT_EQ(options.GetSessionToken(), "token"); |
| } |
| |
| TEST_F(S3OptionsTest, FromAssumeRole) { |
| S3Options options; |
| |
| // arn should be only required argument |
| options = S3Options::FromAssumeRole("my_role_arn"); |
| options = S3Options::FromAssumeRole("my_role_arn", "session"); |
| options = S3Options::FromAssumeRole("my_role_arn", "session", "id"); |
| options = S3Options::FromAssumeRole("my_role_arn", "session", "id", 42); |
| |
| // test w/ custom STSClient (will not use DefaultAWSCredentialsProviderChain) |
| Aws::Auth::AWSCredentials test_creds = Aws::Auth::AWSCredentials("access", "secret"); |
| std::shared_ptr<Aws::STS::STSClient> sts_client = |
| std::make_shared<Aws::STS::STSClient>(Aws::STS::STSClient(test_creds)); |
| options = S3Options::FromAssumeRole("my_role_arn", "session", "id", 42, sts_client); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // Region resolution test |
| |
| class S3RegionResolutionTest : public AwsTestMixin {}; |
| |
| TEST_F(S3RegionResolutionTest, PublicBucket) { |
| ASSERT_OK_AND_EQ("us-east-2", ResolveBucketRegion("ursa-labs-taxi-data")); |
| |
| // Taken from a registry of open S3-hosted datasets |
| // at https://github.com/awslabs/open-data-registry |
| ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd")); |
| // Same again, cached |
| ASSERT_OK_AND_EQ("eu-west-2", ResolveBucketRegion("aws-earth-mo-atmospheric-ukv-prd")); |
| } |
| |
| TEST_F(S3RegionResolutionTest, RestrictedBucket) { |
| ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test")); |
| // Same again, cached |
| ASSERT_OK_AND_EQ("us-west-2", ResolveBucketRegion("ursa-labs-r-test")); |
| } |
| |
| TEST_F(S3RegionResolutionTest, NonExistentBucket) { |
| auto maybe_region = ResolveBucketRegion("ursa-labs-non-existent-bucket"); |
| ASSERT_RAISES(IOError, maybe_region); |
| ASSERT_THAT(maybe_region.status().message(), |
| ::testing::HasSubstr("Bucket 'ursa-labs-non-existent-bucket' not found")); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // S3FileSystem region test |
| |
| class S3FileSystemRegionTest : public AwsTestMixin {}; |
| |
| TEST_F(S3FileSystemRegionTest, Default) { |
| ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("s3://")); |
| auto s3fs = checked_pointer_cast<S3FileSystem>(fs); |
| ASSERT_EQ(s3fs->region(), "us-east-1"); |
| } |
| |
| // Skipped on Windows, as the AWS SDK ignores runtime environment changes: |
| // https://github.com/aws/aws-sdk-cpp/issues/1476 |
| |
| #ifndef _WIN32 |
| TEST_F(S3FileSystemRegionTest, EnvironmentVariable) { |
| // Region override with environment variable (AWS SDK >= 1.8) |
| EnvVarGuard region_guard("AWS_DEFAULT_REGION", "eu-north-1"); |
| |
| ASSERT_OK_AND_ASSIGN(auto fs, FileSystemFromUri("s3://")); |
| auto s3fs = checked_pointer_cast<S3FileSystem>(fs); |
| |
| if (Aws::Version::GetVersionMajor() > 1 || Aws::Version::GetVersionMinor() >= 8) { |
| ASSERT_EQ(s3fs->region(), "eu-north-1"); |
| } else { |
| ASSERT_EQ(s3fs->region(), "us-east-1"); |
| } |
| } |
| #endif |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // Basic test for the Minio test server. |
| |
| class TestMinioServer : public S3TestMixin { |
| public: |
| void SetUp() override { S3TestMixin::SetUp(); } |
| |
| protected: |
| }; |
| |
| TEST_F(TestMinioServer, Connect) { |
| // Just a dummy connection test. Check that we can list buckets, |
| // and that there are none (the server is launched in an empty temp dir). |
| ARROW_AWS_ASSIGN_OR_FAIL(auto bucket_list, client_->ListBuckets()); |
| ASSERT_EQ(bucket_list.GetBuckets().size(), 0); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // Concrete S3 tests |
| |
| class TestS3FS : public S3TestMixin { |
| public: |
| void SetUp() override { |
| S3TestMixin::SetUp(); |
| MakeFileSystem(); |
| // Set up test bucket |
| { |
| Aws::S3::Model::CreateBucketRequest req; |
| req.SetBucket(ToAwsString("bucket")); |
| ASSERT_OK(OutcomeToStatus(client_->CreateBucket(req))); |
| req.SetBucket(ToAwsString("empty-bucket")); |
| ASSERT_OK(OutcomeToStatus(client_->CreateBucket(req))); |
| } |
| { |
| Aws::S3::Model::PutObjectRequest req; |
| req.SetBucket(ToAwsString("bucket")); |
| req.SetKey(ToAwsString("emptydir/")); |
| ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); |
| // NOTE: no need to create intermediate "directories" somedir/ and |
| // somedir/subdir/ |
| req.SetKey(ToAwsString("somedir/subdir/subfile")); |
| req.SetBody(std::make_shared<std::stringstream>("sub data")); |
| ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); |
| req.SetKey(ToAwsString("somefile")); |
| req.SetBody(std::make_shared<std::stringstream>("some data")); |
| ASSERT_OK(OutcomeToStatus(client_->PutObject(req))); |
| } |
| } |
| |
| void MakeFileSystem() { |
| options_.ConfigureAccessKey(minio_.access_key(), minio_.secret_key()); |
| options_.scheme = "http"; |
| options_.endpoint_override = minio_.connect_string(); |
| ASSERT_OK_AND_ASSIGN(fs_, S3FileSystem::Make(options_)); |
| } |
| |
| void TestOpenOutputStream() { |
| std::shared_ptr<io::OutputStream> stream; |
| |
| // Nonexistent |
| ASSERT_RAISES(IOError, fs_->OpenOutputStream("nonexistent-bucket/somefile")); |
| |
| // Create new empty file |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1")); |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile1", ""); |
| |
| // Create new file with 1 small write |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile2")); |
| ASSERT_OK(stream->Write("some data")); |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile2", "some data"); |
| |
| // Create new file with 3 small writes |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile3")); |
| ASSERT_OK(stream->Write("some ")); |
| ASSERT_OK(stream->Write("")); |
| ASSERT_OK(stream->Write("new data")); |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile3", "some new data"); |
| |
| // Create new file with some large writes |
| std::string s1, s2, s3, s4, s5, expected; |
| s1 = random_string(6000000, /*seed =*/42); // More than the 5 MB minimum part upload |
| s2 = "xxx"; |
| s3 = random_string(6000000, 43); |
| s4 = "zzz"; |
| s5 = random_string(600000, 44); |
| expected = s1 + s2 + s3 + s4 + s5; |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile4")); |
| for (auto input : {s1, s2, s3, s4, s5}) { |
| ASSERT_OK(stream->Write(input)); |
| // Clobber source contents. This shouldn't reflect in the data written. |
| input.front() = 'x'; |
| input.back() = 'x'; |
| } |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile4", expected); |
| |
| // Overwrite |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1")); |
| ASSERT_OK(stream->Write("overwritten data")); |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile1", "overwritten data"); |
| |
| // Overwrite and make empty |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile1")); |
| ASSERT_OK(stream->Close()); |
| AssertObjectContents(client_.get(), "bucket", "newfile1", ""); |
| |
| // Open file and then lose filesystem reference |
| ASSERT_EQ(fs_.use_count(), 1); // needed for test to work |
| std::weak_ptr<S3FileSystem> weak_fs(fs_); |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/newfile5")); |
| fs_.reset(); |
| ASSERT_OK(stream->Write("some other data")); |
| ASSERT_OK(stream->Close()); |
| ASSERT_TRUE(weak_fs.expired()); |
| AssertObjectContents(client_.get(), "bucket", "newfile5", "some other data"); |
| } |
| |
| void TestOpenOutputStreamAbort() { |
| std::shared_ptr<io::OutputStream> stream; |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile")); |
| ASSERT_OK(stream->Write("new data")); |
| // Abort() cancels the multipart upload. |
| ASSERT_OK(stream->Abort()); |
| ASSERT_EQ(stream->closed(), true); |
| AssertObjectContents(client_.get(), "bucket", "somefile", "some data"); |
| } |
| |
| void TestOpenOutputStreamDestructor() { |
| std::shared_ptr<io::OutputStream> stream; |
| ASSERT_OK_AND_ASSIGN(stream, fs_->OpenOutputStream("bucket/somefile")); |
| ASSERT_OK(stream->Write("new data")); |
| // Destructor implicitly closes stream and completes the multipart upload. |
| stream.reset(); |
| AssertObjectContents(client_.get(), "bucket", "somefile", "new data"); |
| } |
| |
| protected: |
| S3Options options_; |
| std::shared_ptr<S3FileSystem> fs_; |
| }; |
| |
| TEST_F(TestS3FS, GetFileInfoRoot) { AssertFileInfo(fs_.get(), "", FileType::Directory); } |
| |
| TEST_F(TestS3FS, GetFileInfoBucket) { |
| AssertFileInfo(fs_.get(), "bucket", FileType::Directory); |
| AssertFileInfo(fs_.get(), "empty-bucket", FileType::Directory); |
| AssertFileInfo(fs_.get(), "nonexistent-bucket", FileType::NotFound); |
| // Trailing slashes |
| AssertFileInfo(fs_.get(), "bucket/", FileType::Directory); |
| AssertFileInfo(fs_.get(), "empty-bucket/", FileType::Directory); |
| AssertFileInfo(fs_.get(), "nonexistent-bucket/", FileType::NotFound); |
| } |
| |
| TEST_F(TestS3FS, GetFileInfoObject) { |
| // "Directories" |
| AssertFileInfo(fs_.get(), "bucket/emptydir", FileType::Directory, kNoSize); |
| AssertFileInfo(fs_.get(), "bucket/somedir", FileType::Directory, kNoSize); |
| AssertFileInfo(fs_.get(), "bucket/somedir/subdir", FileType::Directory, kNoSize); |
| |
| // "Files" |
| AssertFileInfo(fs_.get(), "bucket/somefile", FileType::File, 9); |
| AssertFileInfo(fs_.get(), "bucket/somedir/subdir/subfile", FileType::File, 8); |
| |
| // Nonexistent |
| AssertFileInfo(fs_.get(), "bucket/emptyd", FileType::NotFound); |
| AssertFileInfo(fs_.get(), "bucket/somed", FileType::NotFound); |
| AssertFileInfo(fs_.get(), "non-existent-bucket/somed", FileType::NotFound); |
| |
| // Trailing slashes |
| AssertFileInfo(fs_.get(), "bucket/emptydir/", FileType::Directory, kNoSize); |
| AssertFileInfo(fs_.get(), "bucket/somefile/", FileType::File, 9); |
| AssertFileInfo(fs_.get(), "bucket/emptyd/", FileType::NotFound); |
| AssertFileInfo(fs_.get(), "non-existent-bucket/somed/", FileType::NotFound); |
| } |
| |
| TEST_F(TestS3FS, GetFileInfoSelector) { |
| FileSelector select; |
| std::vector<FileInfo> infos; |
| |
| // Root dir |
| select.base_dir = ""; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 2); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket", FileType::Directory); |
| AssertFileInfo(infos[1], "empty-bucket", FileType::Directory); |
| |
| // Empty bucket |
| select.base_dir = "empty-bucket"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| // Nonexistent bucket |
| select.base_dir = "nonexistent-bucket"; |
| ASSERT_RAISES(IOError, fs_->GetFileInfo(select)); |
| select.allow_not_found = true; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| select.allow_not_found = false; |
| // Non-empty bucket |
| select.base_dir = "bucket"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| SortInfos(&infos); |
| ASSERT_EQ(infos.size(), 3); |
| AssertFileInfo(infos[0], "bucket/emptydir", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/somedir", FileType::Directory); |
| AssertFileInfo(infos[2], "bucket/somefile", FileType::File, 9); |
| |
| // Empty "directory" |
| select.base_dir = "bucket/emptydir"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| // Non-empty "directories" |
| select.base_dir = "bucket/somedir"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 1); |
| AssertFileInfo(infos[0], "bucket/somedir/subdir", FileType::Directory); |
| select.base_dir = "bucket/somedir/subdir"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 1); |
| AssertFileInfo(infos[0], "bucket/somedir/subdir/subfile", FileType::File, 8); |
| // Nonexistent |
| select.base_dir = "bucket/nonexistent"; |
| ASSERT_RAISES(IOError, fs_->GetFileInfo(select)); |
| select.allow_not_found = true; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| select.allow_not_found = false; |
| |
| // Trailing slashes |
| select.base_dir = "empty-bucket/"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| select.base_dir = "nonexistent-bucket/"; |
| ASSERT_RAISES(IOError, fs_->GetFileInfo(select)); |
| select.base_dir = "bucket/"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| SortInfos(&infos); |
| ASSERT_EQ(infos.size(), 3); |
| } |
| |
| TEST_F(TestS3FS, GetFileInfoSelectorRecursive) { |
| FileSelector select; |
| std::vector<FileInfo> infos; |
| select.recursive = true; |
| |
| // Root dir |
| select.base_dir = ""; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 7); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/emptydir", FileType::Directory); |
| AssertFileInfo(infos[2], "bucket/somedir", FileType::Directory); |
| AssertFileInfo(infos[3], "bucket/somedir/subdir", FileType::Directory); |
| AssertFileInfo(infos[4], "bucket/somedir/subdir/subfile", FileType::File, 8); |
| AssertFileInfo(infos[5], "bucket/somefile", FileType::File, 9); |
| AssertFileInfo(infos[6], "empty-bucket", FileType::Directory); |
| |
| // Empty bucket |
| select.base_dir = "empty-bucket"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| |
| // Non-empty bucket |
| select.base_dir = "bucket"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| SortInfos(&infos); |
| ASSERT_EQ(infos.size(), 5); |
| AssertFileInfo(infos[0], "bucket/emptydir", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/somedir", FileType::Directory); |
| AssertFileInfo(infos[2], "bucket/somedir/subdir", FileType::Directory); |
| AssertFileInfo(infos[3], "bucket/somedir/subdir/subfile", FileType::File, 8); |
| AssertFileInfo(infos[4], "bucket/somefile", FileType::File, 9); |
| |
| // Empty "directory" |
| select.base_dir = "bucket/emptydir"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 0); |
| |
| // Non-empty "directories" |
| select.base_dir = "bucket/somedir"; |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| SortInfos(&infos); |
| ASSERT_EQ(infos.size(), 2); |
| AssertFileInfo(infos[0], "bucket/somedir/subdir", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/somedir/subdir/subfile", FileType::File, 8); |
| } |
| |
| TEST_F(TestS3FS, GetFileInfoGenerator) { |
| FileSelector select; |
| FileInfoVector infos; |
| |
| // Root dir |
| select.base_dir = ""; |
| CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos); |
| ASSERT_EQ(infos.size(), 2); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket", FileType::Directory); |
| AssertFileInfo(infos[1], "empty-bucket", FileType::Directory); |
| |
| // Root dir, recursive |
| select.recursive = true; |
| CollectFileInfoGenerator(fs_->GetFileInfoGenerator(select), &infos); |
| ASSERT_EQ(infos.size(), 7); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/emptydir", FileType::Directory); |
| AssertFileInfo(infos[2], "bucket/somedir", FileType::Directory); |
| AssertFileInfo(infos[3], "bucket/somedir/subdir", FileType::Directory); |
| AssertFileInfo(infos[4], "bucket/somedir/subdir/subfile", FileType::File, 8); |
| AssertFileInfo(infos[5], "bucket/somefile", FileType::File, 9); |
| AssertFileInfo(infos[6], "empty-bucket", FileType::Directory); |
| |
| // Non-root dir case is tested by generic tests |
| } |
| |
| TEST_F(TestS3FS, CreateDir) { |
| FileInfo st; |
| |
| // Existing bucket |
| ASSERT_OK(fs_->CreateDir("bucket")); |
| AssertFileInfo(fs_.get(), "bucket", FileType::Directory); |
| |
| // New bucket |
| AssertFileInfo(fs_.get(), "new-bucket", FileType::NotFound); |
| ASSERT_OK(fs_->CreateDir("new-bucket")); |
| AssertFileInfo(fs_.get(), "new-bucket", FileType::Directory); |
| |
| // Existing "directory" |
| AssertFileInfo(fs_.get(), "bucket/somedir", FileType::Directory); |
| ASSERT_OK(fs_->CreateDir("bucket/somedir")); |
| AssertFileInfo(fs_.get(), "bucket/somedir", FileType::Directory); |
| |
| AssertFileInfo(fs_.get(), "bucket/emptydir", FileType::Directory); |
| ASSERT_OK(fs_->CreateDir("bucket/emptydir")); |
| AssertFileInfo(fs_.get(), "bucket/emptydir", FileType::Directory); |
| |
| // New "directory" |
| AssertFileInfo(fs_.get(), "bucket/newdir", FileType::NotFound); |
| ASSERT_OK(fs_->CreateDir("bucket/newdir")); |
| AssertFileInfo(fs_.get(), "bucket/newdir", FileType::Directory); |
| |
| // New "directory", recursive |
| ASSERT_OK(fs_->CreateDir("bucket/newdir/newsub/newsubsub", /*recursive=*/true)); |
| AssertFileInfo(fs_.get(), "bucket/newdir/newsub", FileType::Directory); |
| AssertFileInfo(fs_.get(), "bucket/newdir/newsub/newsubsub", FileType::Directory); |
| |
| // Existing "file", should fail |
| ASSERT_RAISES(IOError, fs_->CreateDir("bucket/somefile")); |
| } |
| |
| TEST_F(TestS3FS, DeleteFile) { |
| // Bucket |
| ASSERT_RAISES(IOError, fs_->DeleteFile("bucket")); |
| ASSERT_RAISES(IOError, fs_->DeleteFile("empty-bucket")); |
| ASSERT_RAISES(IOError, fs_->DeleteFile("nonexistent-bucket")); |
| |
| // "File" |
| ASSERT_OK(fs_->DeleteFile("bucket/somefile")); |
| AssertFileInfo(fs_.get(), "bucket/somefile", FileType::NotFound); |
| ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/somefile")); |
| ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/nonexistent")); |
| |
| // "Directory" |
| ASSERT_RAISES(IOError, fs_->DeleteFile("bucket/somedir")); |
| AssertFileInfo(fs_.get(), "bucket/somedir", FileType::Directory); |
| } |
| |
| TEST_F(TestS3FS, DeleteDir) { |
| FileSelector select; |
| select.base_dir = "bucket"; |
| std::vector<FileInfo> infos; |
| |
| // Empty "directory" |
| ASSERT_OK(fs_->DeleteDir("bucket/emptydir")); |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 2); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket/somedir", FileType::Directory); |
| AssertFileInfo(infos[1], "bucket/somefile", FileType::File); |
| |
| // Non-empty "directory" |
| ASSERT_OK(fs_->DeleteDir("bucket/somedir")); |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 1); |
| AssertFileInfo(infos[0], "bucket/somefile", FileType::File); |
| |
| // Leaving parent "directory" empty |
| ASSERT_OK(fs_->CreateDir("bucket/newdir/newsub/newsubsub")); |
| ASSERT_OK(fs_->DeleteDir("bucket/newdir/newsub")); |
| ASSERT_OK_AND_ASSIGN(infos, fs_->GetFileInfo(select)); |
| ASSERT_EQ(infos.size(), 2); |
| SortInfos(&infos); |
| AssertFileInfo(infos[0], "bucket/newdir", FileType::Directory); // still exists |
| AssertFileInfo(infos[1], "bucket/somefile", FileType::File); |
| |
| // Bucket |
| ASSERT_OK(fs_->DeleteDir("bucket")); |
| AssertFileInfo(fs_.get(), "bucket", FileType::NotFound); |
| } |
| |
| TEST_F(TestS3FS, CopyFile) { |
| // "File" |
| ASSERT_OK(fs_->CopyFile("bucket/somefile", "bucket/newfile")); |
| AssertFileInfo(fs_.get(), "bucket/newfile", FileType::File, 9); |
| AssertObjectContents(client_.get(), "bucket", "newfile", "some data"); |
| AssertFileInfo(fs_.get(), "bucket/somefile", FileType::File, 9); // still exists |
| // Overwrite |
| ASSERT_OK(fs_->CopyFile("bucket/somedir/subdir/subfile", "bucket/newfile")); |
| AssertFileInfo(fs_.get(), "bucket/newfile", FileType::File, 8); |
| AssertObjectContents(client_.get(), "bucket", "newfile", "sub data"); |
| |
| // Nonexistent |
| ASSERT_RAISES(IOError, fs_->CopyFile("bucket/nonexistent", "bucket/newfile2")); |
| ASSERT_RAISES(IOError, fs_->CopyFile("nonexistent-bucket/somefile", "bucket/newfile2")); |
| ASSERT_RAISES(IOError, fs_->CopyFile("bucket/somefile", "nonexistent-bucket/newfile2")); |
| AssertFileInfo(fs_.get(), "bucket/newfile2", FileType::NotFound); |
| } |
| |
| TEST_F(TestS3FS, Move) { |
| // "File" |
| ASSERT_OK(fs_->Move("bucket/somefile", "bucket/newfile")); |
| AssertFileInfo(fs_.get(), "bucket/newfile", FileType::File, 9); |
| AssertObjectContents(client_.get(), "bucket", "newfile", "some data"); |
| // Source was deleted |
| AssertFileInfo(fs_.get(), "bucket/somefile", FileType::NotFound); |
| |
| // Overwrite |
| ASSERT_OK(fs_->Move("bucket/somedir/subdir/subfile", "bucket/newfile")); |
| AssertFileInfo(fs_.get(), "bucket/newfile", FileType::File, 8); |
| AssertObjectContents(client_.get(), "bucket", "newfile", "sub data"); |
| // Source was deleted |
| AssertFileInfo(fs_.get(), "bucket/somedir/subdir/subfile", FileType::NotFound); |
| |
| // Nonexistent |
| ASSERT_RAISES(IOError, fs_->Move("bucket/non-existent", "bucket/newfile2")); |
| ASSERT_RAISES(IOError, fs_->Move("nonexistent-bucket/somefile", "bucket/newfile2")); |
| ASSERT_RAISES(IOError, fs_->Move("bucket/somefile", "nonexistent-bucket/newfile2")); |
| AssertFileInfo(fs_.get(), "bucket/newfile2", FileType::NotFound); |
| } |
| |
| TEST_F(TestS3FS, OpenInputStream) { |
| // Covers OpenInputStream(): rejected paths, chunked sequential reads of two |
| // objects, and a stream that keeps working after the filesystem is released. |
| std::shared_ptr<io::InputStream> input; |
| std::shared_ptr<Buffer> chunk; |
| |
| // Missing bucket / missing key |
| ASSERT_RAISES(IOError, fs_->OpenInputStream("nonexistent-bucket/somefile")); |
| ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/zzzt")); |
| |
| // Regular objects: read in pieces until the stream is exhausted |
| ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream("bucket/somefile")); |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(2)); |
| AssertBufferEqual(*chunk, "so"); |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(5)); |
| AssertBufferEqual(*chunk, "me da"); |
| // Asking for more than what is left yields a short read ... |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(5)); |
| AssertBufferEqual(*chunk, "ta"); |
| // ... and an empty buffer once the end has been reached |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(5)); |
| AssertBufferEqual(*chunk, ""); |
| |
| // Same thing with a single oversized read on a nested key |
| ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream("bucket/somedir/subdir/subfile")); |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(100)); |
| AssertBufferEqual(*chunk, "sub data"); |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(100)); |
| AssertBufferEqual(*chunk, ""); |
| ASSERT_OK(input->Close()); |
| |
| // Directory-like paths (and a bare bucket) cannot be opened as streams |
| ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/emptydir")); |
| ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket/somedir")); |
| ASSERT_RAISES(IOError, fs_->OpenInputStream("bucket")); |
| |
| // Drop the only filesystem reference while a stream is still open. |
| // The use_count check is a precondition: with any other owner around, |
| // reset() below would not release the filesystem. |
| ASSERT_EQ(fs_.use_count(), 1); |
| std::weak_ptr<S3FileSystem> fs_observer = fs_; |
| ASSERT_OK_AND_ASSIGN(input, fs_->OpenInputStream("bucket/somefile")); |
| fs_.reset(); |
| ASSERT_OK_AND_ASSIGN(chunk, input->Read(10)); |
| AssertBufferEqual(*chunk, "some data"); |
| ASSERT_OK(input->Close()); |
| // The open stream did not keep the filesystem object alive |
| ASSERT_TRUE(fs_observer.expired()); |
| } |
| |
| TEST_F(TestS3FS, OpenInputFile) { |
| // Covers OpenInputFile(): rejected paths, then GetSize / Read / Tell / ReadAt |
| // / Seek on the 9-byte object "bucket/somefile" ("some data"). |
| std::shared_ptr<io::RandomAccessFile> file; |
| std::shared_ptr<Buffer> buf; |
| |
| // Nonexistent |
| ASSERT_RAISES(IOError, fs_->OpenInputFile("nonexistent-bucket/somefile")); |
| ASSERT_RAISES(IOError, fs_->OpenInputFile("bucket/zzzt")); |
| |
| // "Files" |
| ASSERT_OK_AND_ASSIGN(file, fs_->OpenInputFile("bucket/somefile")); |
| ASSERT_OK_AND_EQ(9, file->GetSize()); |
| ASSERT_OK_AND_ASSIGN(buf, file->Read(4)); |
| AssertBufferEqual(*buf, "some"); |
| // Read() advances the position but leaves the reported size alone |
| ASSERT_OK_AND_EQ(9, file->GetSize()); |
| ASSERT_OK_AND_EQ(4, file->Tell()); |
| |
| // ReadAt() is positional: Tell() is unchanged afterwards |
| ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(2, 5)); |
| AssertBufferEqual(*buf, "me da"); |
| ASSERT_OK_AND_EQ(4, file->Tell()); |
| ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(5, 20)); |
| AssertBufferEqual(*buf, "data"); |
| ASSERT_OK_AND_ASSIGN(buf, file->ReadAt(9, 20)); |
| AssertBufferEqual(*buf, ""); |
| |
| // ReadAt() into a caller-provided buffer: check the bytes written, not |
| // only the returned count. Expected contents mirror the Buffer-returning |
| // calls above for the same (position, nbytes) arguments. |
| char result[10] = {}; |
| ASSERT_OK_AND_EQ(5, file->ReadAt(2, 5, &result)); |
| ASSERT_EQ(std::string(result, 5), "me da"); |
| ASSERT_OK_AND_EQ(4, file->ReadAt(5, 20, &result)); |
| ASSERT_EQ(std::string(result, 4), "data"); |
| ASSERT_OK_AND_EQ(0, file->ReadAt(9, 0, &result)); |
| |
| // Reading past end of file |
| ASSERT_RAISES(IOError, file->ReadAt(10, 20)); |
| |
| ASSERT_OK(file->Seek(5)); |
| ASSERT_OK_AND_ASSIGN(buf, file->Read(2)); |
| AssertBufferEqual(*buf, "da"); |
| // Seeking exactly to the end is allowed and subsequent reads are empty |
| ASSERT_OK(file->Seek(9)); |
| ASSERT_OK_AND_ASSIGN(buf, file->Read(2)); |
| AssertBufferEqual(*buf, ""); |
| // Seeking past end of file |
| ASSERT_RAISES(IOError, file->Seek(10)); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamBackgroundWrites) { |
| // Runs the shared output-stream scenario without touching options_, i.e. |
| // with the fixture's initial background_writes setting (presumably enabled, |
| // going by the test name). |
| TestOpenOutputStream(); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamSyncWrites) { |
| // Same scenario as the background-writes variant, with background writes |
| // switched off first. |
| options_.background_writes = false; |
| // Called again after changing options_, presumably so that fs_ is rebuilt |
| // with the new setting. |
| MakeFileSystem(); |
| |
| TestOpenOutputStream(); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamAbortBackgroundWrites) { |
| // Runs the shared abort scenario without touching options_, i.e. with the |
| // fixture's initial background_writes setting. |
| TestOpenOutputStreamAbort(); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamAbortSyncWrites) { |
| // Abort scenario with background writes switched off. |
| options_.background_writes = false; |
| // Called again after changing options_, presumably so that fs_ is rebuilt |
| // with the new setting. |
| MakeFileSystem(); |
| |
| TestOpenOutputStreamAbort(); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamDestructorBackgroundWrites) { |
| // Runs the shared destructor scenario without touching options_, i.e. with |
| // the fixture's initial background_writes setting. |
| TestOpenOutputStreamDestructor(); |
| } |
| |
| TEST_F(TestS3FS, OpenOutputStreamDestructorSyncWrite) { |
| // Destructor scenario with background writes switched off. |
| // NOTE(review): the test name lacks the trailing "s" used by the other |
| // *SyncWrites tests; it is kept so existing --gtest_filter patterns still match. |
| options_.background_writes = false; |
| // Called again after changing options_, presumably so that fs_ is rebuilt |
| // with the new setting. |
| MakeFileSystem(); |
| |
| TestOpenOutputStreamDestructor(); |
| } |
| |
| TEST_F(TestS3FS, FileSystemFromUri) { |
| // Assemble s3://<access key>:<secret key>@<bucket/key>?<query>, with the |
| // query pointing at the test server over plain http. |
| std::stringstream uri; |
| uri << "s3://" << minio_.access_key() << ":" << minio_.secret_key(); |
| uri << "@bucket/somedir/subdir/subfile"; |
| uri << "?scheme=http&endpoint_override=" << UriEscape(minio_.connect_string()); |
| |
| // The factory returns the filesystem and reports the in-filesystem path |
| // through its out-parameter. |
| std::string object_path; |
| ASSERT_OK_AND_ASSIGN(auto uri_fs, FileSystemFromUri(uri.str(), &object_path)); |
| ASSERT_EQ(object_path, "bucket/somedir/subdir/subfile"); |
| |
| // A successful stat through the new filesystem shows that the credentials |
| // and endpoint from the URI were applied. |
| AssertFileInfo(uri_fs.get(), object_path, FileType::File, 8); |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // Generic S3 tests |
| |
| // Fixture that runs the generic filesystem test suite against S3: the |
| // filesystem under test is a SubTreeFileSystem rooted at a dedicated bucket. |
| class TestS3FSGeneric : public S3TestMixin, public GenericFileSystemTest { |
| public: |
| // Creates the test bucket through the raw client, then builds an |
| // S3FileSystem from the Minio connection settings and wraps it so that |
| // the bucket acts as the root of fs_. |
| void SetUp() override { |
| S3TestMixin::SetUp(); |
| |
| const char* const bucket_name = "s3fs-test-bucket"; |
| { |
| Aws::S3::Model::CreateBucketRequest create_req; |
| create_req.SetBucket(ToAwsString(bucket_name)); |
| ASSERT_OK(OutcomeToStatus(client_->CreateBucket(create_req))); |
| } |
| |
| // Connection settings: Minio credentials and endpoint, over plain http |
| options_.ConfigureAccessKey(minio_.access_key(), minio_.secret_key()); |
| options_.scheme = "http"; |
| options_.endpoint_override = minio_.connect_string(); |
| ASSERT_OK_AND_ASSIGN(s3fs_, S3FileSystem::Make(options_)); |
| |
| fs_ = std::make_shared<SubTreeFileSystem>(bucket_name, s3fs_); |
| } |
| |
| protected: |
| // Filesystem handed to the generic tests: the bucket-rooted view built in SetUp() |
| std::shared_ptr<FileSystem> GetEmptyFileSystem() override { return fs_; } |
| |
| // Capability flags overriding those declared by GenericFileSystemTest |
| bool have_implicit_directories() const override { return true; } |
| bool allow_write_file_over_dir() const override { return true; } |
| bool allow_move_dir() const override { return false; } |
| bool allow_append_to_file() const override { return false; } |
| bool have_directory_mtimes() const override { return false; } |
| bool have_flaky_directory_tree_deletion() const override { |
| // Recent Minio versions on Windows may not register deletion of all |
| // directories in a tree when doing a bulk delete, so only that platform |
| // reports flakiness. |
| #ifdef _WIN32 |
| const bool flaky = true; |
| #else |
| const bool flaky = false; |
| #endif |
| return flaky; |
| } |
| |
| S3Options options_; |
| std::shared_ptr<S3FileSystem> s3fs_; |
| std::shared_ptr<FileSystem> fs_; |
| }; |
| |
| GENERIC_FS_TEST_FUNCTIONS(TestS3FSGeneric); |
| |
| } // namespace fs |
| } // namespace arrow |