blob: 424e83fd53a5605dd2720f3914161175d4d20369 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "S3TestsFixture.h"
#include "processors/ListS3.h"
using ListS3TestsFixture = FlowProducerS3TestsFixture<minifi::aws::processors::ListS3>;
TEST_CASE_METHOD(ListS3TestsFixture, "Test AWS credential setting", "[awsCredentials]") {
setBucket();
SECTION("Test property credentials") {
setAccesKeyCredentialsInProcessor();
}
SECTION("Test credentials setting from AWS Credentials service") {
setAccessKeyCredentialsInController();
setCredentialsService();
}
SECTION("Test credentials file setting") {
setCredentialFile(s3_processor);
}
SECTION("Test credentials file setting from AWS Credentials service") {
setCredentialFile(aws_credentials_service);
setCredentialsService();
}
SECTION("Test credentials setting using default credential chain") {
setUseDefaultCredentialsChain(s3_processor);
}
SECTION("Test credentials setting from AWS Credentials service using default credential chain") {
setUseDefaultCredentialsChain(aws_credentials_service);
setCredentialsService();
}
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test required property not set", "[awsS3Errors]") {
SECTION("Test credentials not set") {
}
SECTION("Test no bucket is set") {
setAccesKeyCredentialsInProcessor();
}
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}
TEST_CASE_METHOD(ListS3TestsFixture, "Non blank validator tests") {
setRequiredProperties();
CHECK_FALSE(plan->setProperty(s3_processor, "Region", ""));
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
test_controller.runSession(plan, true);
checkProxySettings();
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test listing without versioning", "[awsS3ListObjects]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
test_controller.runSession(plan, true);
for (std::size_t i = 0; i < S3_OBJECT_COUNT; ++i) {
REQUIRE(LogTestController::getInstance().contains("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)));
REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)));
}
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == S3_OBJECT_COUNT / 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") == 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
REQUIRE(!mock_s3_request_sender_ptr->list_object_request.ContinuationTokenHasBeenSet());
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test listing with versioning", "[awsS3ListVersions]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Use Versions", "true");
test_controller.runSession(plan, true);
for (std::size_t i = 0; i < S3_OBJECT_COUNT; ++i) {
// 2 versions of every object
REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)) == 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)) == 2);
}
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:false") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata") == 0);
REQUIRE(!mock_s3_request_sender_ptr->list_version_request.KeyMarkerHasBeenSet());
REQUIRE(!mock_s3_request_sender_ptr->list_version_request.VersionIdMarkerHasBeenSet());
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test if optional request values are set without versioning", "[awsS3ListOptionalValues]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Delimiter", "/");
plan->setProperty(s3_processor, "Prefix", "test/");
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetDelimiter() == "/");
REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetPrefix() == "test/");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test if optional request values are set with versioning", "[awsS3ListOptionalValues]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Delimiter", "/");
plan->setProperty(s3_processor, "Prefix", "test/");
plan->setProperty(s3_processor, "Use Versions", "true");
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetDelimiter() == "/");
REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetPrefix() == "test/");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test minimum age property handling with non-versioned objects", "[awsS3ListMinAge]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Minimum Object Age", "120 days");
test_controller.runSession(plan, true);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT / 2);
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test minimum age property handling with versioned objects", "[awsS3ListMinAge]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Minimum Object Age", "120 days");
plan->setProperty(s3_processor, "Use Versions", "true");
test_controller.runSession(plan, true);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == 0);
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test write object tags", "[awsS3ListTags]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
plan->setProperty(s3_processor, "Write Object Tags", "true");
test_controller.runSession(plan, true);
for (const auto& tag : S3_OBJECT_TAGS) {
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag." + tag.first + " value:" + tag.second) == S3_OBJECT_COUNT);
}
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test write user metadata", "[awsS3ListMetadata]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setProperty(s3_processor, "Endpoint Override URL", "http://localhost:1234");
plan->setProperty(s3_processor, "Write User Metadata", "true");
plan->setProperty(s3_processor, "Requester Pays", "true");
test_controller.runSession(plan, true);
for (const auto& metadata : S3_OBJECT_USER_METADATA) {
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata." + metadata.first + " value:" + metadata.second) == S3_OBJECT_COUNT);
}
REQUIRE(mock_s3_request_sender_ptr->head_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated listing without versioning", "[awsS3ListObjects]") {
setRequiredProperties();
mock_s3_request_sender_ptr->setListingTruncated(true);
test_controller.runSession(plan, true);
for (std::size_t i = 0; i < S3_OBJECT_COUNT; ++i) {
REQUIRE(LogTestController::getInstance().contains("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)));
REQUIRE(LogTestController::getInstance().contains("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)));
}
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == S3_OBJECT_COUNT / 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version") == 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
REQUIRE(mock_s3_request_sender_ptr->list_object_request.ContinuationTokenHasBeenSet());
REQUIRE(mock_s3_request_sender_ptr->list_object_request.GetContinuationToken() == S3_CONTINUATION_TOKEN);
}
TEST_CASE_METHOD(ListS3TestsFixture, "Test truncated listing with versioning", "[awsS3ListVersions]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Use Versions", "true");
mock_s3_request_sender_ptr->setListingTruncated(true);
test_controller.runSession(plan, true);
for (std::size_t i = 0; i < S3_OBJECT_COUNT; ++i) {
// 2 versions of every object
REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:" + S3_KEY_PREFIX + std::to_string(i)) == 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.etag value:" + S3_ETAG_PREFIX + std::to_string(i)) == 2);
}
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_1) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.version value:" + S3_VERSION_2) == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.bucket value:" + S3_BUCKET) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:true") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.isLatest value:false") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:") == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.lastModified value:" + std::to_string(S3_OBJECT_OLD_AGE_MILLISECONDS) + "\n") == S3_OBJECT_COUNT);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.length value:" + std::to_string(S3_OBJECT_SIZE)) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.storeClass value:" + S3_STORAGE_CLASS_STR) == S3_OBJECT_COUNT * 2);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.tag") == 0);
REQUIRE(LogTestController::getInstance().countOccurrences("key:s3.user.metadata") == 0);
REQUIRE(mock_s3_request_sender_ptr->list_version_request.KeyMarkerHasBeenSet());
REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetKeyMarker() == S3_KEY_MARKER);
REQUIRE(mock_s3_request_sender_ptr->list_version_request.VersionIdMarkerHasBeenSet());
REQUIRE(mock_s3_request_sender_ptr->list_version_request.GetVersionIdMarker() == S3_VERSION_ID_MARKER);
}