blob: 571977a6e66bb93b7f1c23cebac4dcc45ecd25fb [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "S3TestsFixture.h"
#include "processors/PutS3Object.h"
#include "utils/IntegrationTestUtils.h"
namespace {
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
class PutS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::PutS3Object> {
public:
static void checkPutObjectResults() {
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.etag value:" + S3_ETAG_UNQUOTED));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expiration value:" + S3_EXPIRATION_DATE));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
}
static void checkEmptyPutObjectResults() {
CHECK_FALSE(LogTestController::getInstance().contains("key:s3.version value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
CHECK_FALSE(LogTestController::getInstance().contains("key:s3.etag value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
CHECK_FALSE(LogTestController::getInstance().contains("key:s3.expiration value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
CHECK_FALSE(LogTestController::getInstance().contains("key:s3.sseAlgorithm value:", std::chrono::seconds(0), std::chrono::milliseconds(0)));
}
};
class PutS3ObjectLimitChanged : public minifi::aws::processors::PutS3Object {
protected:
friend class ::S3TestsFixture<PutS3ObjectLimitChanged>;
explicit PutS3ObjectLimitChanged(const std::string& name, const minifi::utils::Identifier& uuid, std::unique_ptr<minifi::aws::s3::S3RequestSender> s3_request_sender)
: PutS3Object(name, uuid, std::move(s3_request_sender)) {
}
uint64_t getMinPartSize() const override {
return 1;
}
};
class PutS3ObjectUploadLimitChangedTestsFixture : public FlowProcessorS3TestsFixture<PutS3ObjectLimitChanged> {
};
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test AWS credential setting", "[awsCredentials]") {
setBucket();
SECTION("Test property credentials") {
setAccesKeyCredentialsInProcessor();
}
SECTION("Test credentials setting from AWS Credentials service") {
setAccessKeyCredentialsInController();
setCredentialsService();
}
SECTION("Test credentials file setting") {
setCredentialFile(s3_processor);
}
SECTION("Test credentials file setting from AWS Credentials service") {
setCredentialFile(aws_credentials_service);
setCredentialsService();
}
SECTION("Test credentials setting using default credential chain") {
setUseDefaultCredentialsChain(s3_processor);
}
SECTION("Test credentials setting from AWS Credentials service using default credential chain") {
setUseDefaultCredentialsChain(aws_credentials_service);
setCredentialsService();
}
test_controller.runSession(plan);
CHECK(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
CHECK(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") {
SECTION("Test credentials not set") {
}
SECTION("Test no bucket is set") {
setAccesKeyCredentialsInProcessor();
}
SECTION("Test no object key is set") {
setRequiredProperties();
plan->setDynamicProperty(update_attribute, "filename", "");
}
SECTION("Test storage class is empty") {
setRequiredProperties();
plan->setProperty(s3_processor, "Storage Class", "");
}
SECTION("Test region is empty") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", "");
}
SECTION("Test no server side encryption is set") {
setRequiredProperties();
plan->setProperty(s3_processor, "Server Side Encryption", "");
}
REQUIRE_THROWS_AS(test_controller.runSession(plan), minifi::Exception);
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test incomplete credentials in credentials service", "[awsS3Config]") {
setBucket();
plan->setProperty(aws_credentials_service, "Secret Key", "secret");
setCredentialsService();
REQUIRE_THROWS_AS(test_controller.runSession(plan), minifi::Exception);
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "AWS Credentials have not been set!"));
// Test that no invalid credentials file was set from previous properties
REQUIRE_FALSE(LogTestController::getInstance().contains("load configure file failed", std::chrono::seconds(0), std::chrono::milliseconds(0)));
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration", "[awsS3ClientConfig]") {
setRequiredProperties();
test_controller.runSession(plan);
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:" + INPUT_FILENAME));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/octet-stream"));
checkPutObjectResults();
CHECK(mock_s3_request_sender_ptr->put_object_request.GetContentType() == "application/octet-stream");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::STANDARD);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::NOT_SET);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::NOT_SET);
CHECK(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_WEST_2);
CHECK(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 30000);
CHECK(mock_s3_request_sender_ptr->getClientConfig().endpointOverride.empty());
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyHost.empty());
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyUserName.empty());
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyPassword.empty());
CHECK(mock_s3_request_sender_ptr->getPutObjectRequestBody() == INPUT_DATA);
CHECK(mock_s3_request_sender_ptr->getUseVirtualAddressing());
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Check default client configuration with empty result", "[awsS3ClientConfig]") {
setRequiredProperties();
mock_s3_request_sender_ptr->returnEmptyS3Result();
test_controller.runSession(plan);
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:input_data.log"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/octet-stream"));
checkEmptyPutObjectResults();
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Set non-default client configuration", "[awsS3ClientConfig]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Object Key", "custom_key");
plan->setDynamicProperty(update_attribute, "test.contentType", "application/tar");
plan->setProperty(s3_processor, "Content Type", "${test.contentType}");
plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy");
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::AP_SOUTHEAST_3);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setDynamicProperty(update_attribute, "test.endpoint", "http://localhost:1234");
plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
plan->setProperty(s3_processor, "Server Side Encryption", "AES256");
test_controller.runSession(plan);
checkPutObjectResults();
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:custom_key"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/tar"));
CHECK(mock_s3_request_sender_ptr->put_object_request.GetContentType() == "application/tar");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetStorageClass() == Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::AES256);
CHECK(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::AP_SOUTHEAST_3);
CHECK(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
CHECK(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
CHECK(mock_s3_request_sender_ptr->getPutObjectRequestBody() == INPUT_DATA);
CHECK(mock_s3_request_sender_ptr->getUseVirtualAddressing());
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test single user metadata", "[awsS3MetaData]") {
setRequiredProperties();
plan->setDynamicProperty(s3_processor, "meta_key", "meta_value");
test_controller.runSession(plan);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key") == "meta_value");
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.usermetadata value:meta_key=meta_value"));
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multiple user metadata", "[awsS3MetaData]") {
setRequiredProperties();
plan->setDynamicProperty(s3_processor, "meta_key1", "meta_value1");
plan->setDynamicProperty(s3_processor, "meta_key2", "meta_value2");
test_controller.runSession(plan);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key1") == "meta_value1");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetMetadata().at("meta_key2") == "meta_value2");
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.usermetadata value:meta_key1=meta_value1,meta_key2=meta_value2"));
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
test_controller.runSession(plan);
checkProxySettings();
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test access control setting", "[awsS3ACL]") {
setRequiredProperties();
plan->setDynamicProperty(update_attribute, "s3.permissions.full.users", "myuserid123, myuser@example.com");
plan->setProperty(s3_processor, "FullControl User List", "${s3.permissions.full.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.read.users", "myuserid456,myuser2@example.com");
plan->setProperty(s3_processor, "Read Permission User List", "${s3.permissions.read.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser");
plan->setProperty(s3_processor, "Read ACL User List", "${s3.permissions.readacl.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.writeacl.users", "myuser3@example.com");
plan->setProperty(s3_processor, "Write ACL User List", "${s3.permissions.writeacl.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite");
plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
test_controller.runSession(plan);
CHECK(mock_s3_request_sender_ptr->put_object_request.GetGrantFullControl() == "id=myuserid123, emailAddress=\"myuser@example.com\"");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetGrantRead() == "id=myuserid456, emailAddress=\"myuser2@example.com\"");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetGrantReadACP() == "id=myuserid789, id=otheruser");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetGrantWriteACP() == "emailAddress=\"myuser3@example.com\"");
CHECK(mock_s3_request_sender_ptr->put_object_request.GetACL() == Aws::S3::Model::ObjectCannedACL::public_read_write);
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test path style access property", "[awsS3PathStyleAccess]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Use Path Style Access", "true");
test_controller.runSession(plan);
REQUIRE(!mock_s3_request_sender_ptr->getUseVirtualAddressing());
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multipart upload limits", "[awsS3MultipartUpload]") {
setRequiredProperties();
SECTION("Multipart Threshold is below limit") {
plan->setProperty(s3_processor, "Multipart Threshold", "4 MB");
}
SECTION("Multipart Threshold is above limit") {
plan->setProperty(s3_processor, "Multipart Threshold", "51 GB");
}
SECTION("Multipart Part Size is below limit") {
plan->setProperty(s3_processor, "Multipart Part Size", "4 MB");
}
SECTION("Multipart Part Size is above limit") {
plan->setProperty(s3_processor, "Multipart Part Size", "51 GB");
}
REQUIRE_THROWS_AS(test_controller.runSession(plan), minifi::Exception);
}
TEST_CASE_METHOD(PutS3ObjectUploadLimitChangedTestsFixture, "Test multipart upload", "[awsS3MultipartUpload]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Multipart Threshold", "35 B");
plan->setProperty(s3_processor, "Multipart Part Size", "10 B");
auto temp_dir = test_controller.createTempDirectory();
plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string());
plan->setDynamicProperty(update_attribute, "s3.permissions.full.users", "myuserid123, myuser@example.com");
plan->setProperty(s3_processor, "FullControl User List", "${s3.permissions.full.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.read.users", "myuserid456,myuser2@example.com");
plan->setProperty(s3_processor, "Read Permission User List", "${s3.permissions.read.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.readacl.users", "myuserid789, otheruser");
plan->setProperty(s3_processor, "Read ACL User List", "${s3.permissions.readacl.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.writeacl.users", "myuser3@example.com");
plan->setProperty(s3_processor, "Write ACL User List", "${s3.permissions.writeacl.users}");
plan->setDynamicProperty(update_attribute, "s3.permissions.cannedacl", "PublicReadWrite");
plan->setProperty(s3_processor, "Canned ACL", "${s3.permissions.cannedacl}");
plan->setDynamicProperty(s3_processor, "meta_key1", "meta_value1");
plan->setDynamicProperty(s3_processor, "meta_key2", "meta_value2");
plan->setDynamicProperty(update_attribute, "test.contentType", "application/tar");
plan->setProperty(s3_processor, "Content Type", "${test.contentType}");
plan->setProperty(s3_processor, "Storage Class", "ReducedRedundancy");
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::AP_SOUTHEAST_3);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setDynamicProperty(update_attribute, "test.endpoint", "http://localhost:1234");
plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
plan->setProperty(s3_processor, "Server Side Encryption", "AES256");
std::string object_key;
SECTION("Successful upload on first try") {
object_key = INPUT_FILENAME;
test_controller.runSession(plan);
}
SECTION("Successful upload on second try continuing the first multipart upload") {
plan->setProperty(s3_processor, "Object Key", "resumable_key");
object_key = "resumable_key";
auto log_failure = plan->addProcessor(
"LogAttribute",
"LogFailure",
core::Relationship("failure", "d"));
plan->addConnection(s3_processor, core::Relationship("failure", "d"), log_failure);
log_failure->setAutoTerminatedRelationships(std::array{core::Relationship("success", "d")});
mock_s3_request_sender_ptr->failOnPartOnce(3);
test_controller.runSession(plan);
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Failed to upload part 3 of 4"));
plan->reset();
LogTestController::getInstance().clear();
test_controller.runSession(plan);
}
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.etag value:" + S3_ETAG_UNQUOTED));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expiration value:" + S3_EXPIRATION_DATE));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:testBucket"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.key value:" + object_key));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.contenttype value:application/tar"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.permissions.cannedacl value:PublicReadWrite"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.permissions.full.users value:myuserid123, myuser@example.com"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.permissions.read.users value:myuserid456,myuser2@example.com"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.permissions.readacl.users value:myuserid789, otheruser"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.permissions.writeacl.users value:myuser3@example.com"));
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.usermetadata value:meta_key1=meta_value1,meta_key2=meta_value2"));
CHECK(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::AP_SOUTHEAST_3);
CHECK(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
CHECK(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyHost.empty());
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyUserName.empty());
CHECK(mock_s3_request_sender_ptr->getClientConfig().proxyPassword.empty());
CHECK(mock_s3_request_sender_ptr->getUseVirtualAddressing());
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetBucket() == S3_BUCKET);
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetKey() == object_key);
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetGrantFullControl() == "id=myuserid123, emailAddress=\"myuser@example.com\"");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetGrantRead() == "id=myuserid456, emailAddress=\"myuser2@example.com\"");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetGrantReadACP() == "id=myuserid789, id=otheruser");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetGrantWriteACP() == "emailAddress=\"myuser3@example.com\"");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetACL() == Aws::S3::Model::ObjectCannedACL::public_read_write);
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetMetadata().at("meta_key1") == "meta_value1");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetMetadata().at("meta_key2") == "meta_value2");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetContentType() == "application/tar");
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetStorageClass() == Aws::S3::Model::StorageClass::REDUCED_REDUNDANCY);
CHECK(mock_s3_request_sender_ptr->create_multipart_upload_request.GetServerSideEncryption() == Aws::S3::Model::ServerSideEncryption::AES256);
REQUIRE(mock_s3_request_sender_ptr->upload_part_requests.size() == 4);
for (size_t i = 0; i < mock_s3_request_sender_ptr->upload_part_requests.size(); ++i) {
const auto& upload_part_request = mock_s3_request_sender_ptr->upload_part_requests[i];
CHECK(upload_part_request.GetBucket() == S3_BUCKET);
CHECK(upload_part_request.GetKey() == object_key);
CHECK(upload_part_request.GetPartNumber() == static_cast<int>(i + 1));
CHECK(upload_part_request.GetUploadId() == S3_UPLOAD_ID);
}
CHECK(mock_s3_request_sender_ptr->getUploadPartRequestBody(mock_s3_request_sender_ptr->upload_part_requests[0]) == INPUT_DATA.substr(0, 10));
CHECK(mock_s3_request_sender_ptr->getUploadPartRequestBody(mock_s3_request_sender_ptr->upload_part_requests[1]) == INPUT_DATA.substr(10, 10));
CHECK(mock_s3_request_sender_ptr->getUploadPartRequestBody(mock_s3_request_sender_ptr->upload_part_requests[2]) == INPUT_DATA.substr(20, 10));
const auto last_part = mock_s3_request_sender_ptr->getUploadPartRequestBody(mock_s3_request_sender_ptr->upload_part_requests[3]);
CHECK(last_part.size() == INPUT_DATA.size() % 10);
CHECK(last_part == INPUT_DATA.substr(30));
const auto& parts = mock_s3_request_sender_ptr->complete_multipart_upload_request.GetMultipartUpload().GetParts();
REQUIRE(parts.size() == 4);
for (size_t i = 0; i < parts.size(); ++i) {
CHECK(parts[i].GetPartNumber() == static_cast<int>(i + 1));
CHECK(parts[i].GetETag() == "etag" + std::to_string(i + 1));
}
}
TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test ageoff functionality aborting obselete multipart uploads", "[awsS3MultipartUpload]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Multipart Upload AgeOff Interval", "1 sec");
plan->setProperty(s3_processor, "Multipart Upload Max Age Threshold", "2 years");
auto temp_dir = test_controller.createTempDirectory();
plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string());
test_controller.runSession(plan);
REQUIRE(mock_s3_request_sender_ptr->abort_multipart_upload_requests.size() == 1);
CHECK(mock_s3_request_sender_ptr->abort_multipart_upload_requests[0].GetBucket() == S3_BUCKET);
CHECK(mock_s3_request_sender_ptr->abort_multipart_upload_requests[0].GetKey() == "old_key");
CHECK(mock_s3_request_sender_ptr->abort_multipart_upload_requests[0].GetUploadId() == "upload2");
}
TEST_CASE_METHOD(PutS3ObjectUploadLimitChangedTestsFixture, "Local state is not kept after successful upload", "[awsS3MultipartUpload]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Multipart Threshold", "35 B");
plan->setProperty(s3_processor, "Multipart Part Size", "10 B");
plan->setProperty(s3_processor, "Object Key", "resumable_key");
auto temp_dir = test_controller.createTempDirectory();
plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string());
auto log_failure = plan->addProcessor(
"LogAttribute",
"LogFailure",
core::Relationship("failure", "d"));
plan->addConnection(s3_processor, core::Relationship("failure", "d"), log_failure);
log_failure->setAutoTerminatedRelationships(std::array{core::Relationship("success", "d")});
mock_s3_request_sender_ptr->failOnPartOnce(3);
test_controller.runSession(plan);
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Failed to upload part 3 of 4"));
plan->reset();
LogTestController::getInstance().clear();
test_controller.runSession(plan);
plan->reset();
LogTestController::getInstance().clear();
test_controller.runSession(plan);
const auto& parts = mock_s3_request_sender_ptr->complete_multipart_upload_request.GetMultipartUpload().GetParts();
REQUIRE(parts.size() == 4);
for (size_t i = 0; i < parts.size(); ++i) {
CHECK(parts[i].GetPartNumber() == static_cast<int>(i + 1));
CHECK(parts[i].GetETag() == "etag" + std::to_string(4 + i + 1)); // The second successful upload contains different parts
}
}
TEST_CASE_METHOD(PutS3ObjectUploadLimitChangedTestsFixture, "Do not continue multipart upload that only exists in local cache but not in S3", "[awsS3MultipartUpload]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Multipart Threshold", "35 B");
plan->setProperty(s3_processor, "Multipart Part Size", "10 B");
plan->setProperty(s3_processor, "Object Key", "non_resumable_key");
auto temp_dir = test_controller.createTempDirectory();
plan->setProperty(s3_processor, "Temporary Directory Multipart State", temp_dir.string());
auto log_failure = plan->addProcessor(
"LogAttribute",
"LogFailure",
core::Relationship("failure", "d"));
plan->addConnection(s3_processor, core::Relationship("failure", "d"), log_failure);
log_failure->setAutoTerminatedRelationships(std::array{core::Relationship("success", "d")});
mock_s3_request_sender_ptr->failOnPartOnce(3);
test_controller.runSession(plan);
CHECK(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "Failed to upload part 3 of 4"));
plan->reset();
LogTestController::getInstance().clear();
test_controller.runSession(plan);
const auto& parts = mock_s3_request_sender_ptr->complete_multipart_upload_request.GetMultipartUpload().GetParts();
REQUIRE(parts.size() == 4);
for (size_t i = 0; i < parts.size(); ++i) {
CHECK(parts[i].GetPartNumber() == static_cast<int>(i + 1));
CHECK(parts[i].GetETag() == "etag" + std::to_string(2 + i + 1)); // The second successful upload contains different parts
}
}
} // namespace