blob: 8031c7b7502ffdb3c2581ee6fcadf4151d96e4be [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fstream>
#include "S3TestsFixture.h"
#include "processors/FetchS3Object.h"
#include "utils/IntegrationTestUtils.h"
#include "utils/file/FileUtils.h"
namespace {
using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime;
using org::apache::nifi::minifi::utils::file::get_content;
class FetchS3ObjectTestsFixture : public FlowProcessorS3TestsFixture<minifi::aws::processors::FetchS3Object> {
public:
FetchS3ObjectTestsFixture() {
auto putfile = plan->addProcessor(
"PutFile",
"PutFile",
core::Relationship("success", "d"),
true);
output_dir = test_controller.createTempDirectory();
plan->setProperty(putfile, "Directory", output_dir.string());
}
std::filesystem::path output_dir;
};
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test AWS credential setting", "[awsCredentials]") {
setBucket();
SECTION("Test property credentials") {
setAccesKeyCredentialsInProcessor();
}
SECTION("Test credentials setting from AWS Credentials service") {
setAccessKeyCredentialsInController();
setCredentialsService();
}
SECTION("Test credentials file setting") {
setCredentialFile(s3_processor);
}
SECTION("Test credentials file setting from AWS Credentials service") {
setCredentialFile(aws_credentials_service);
setCredentialsService();
}
SECTION("Test credentials setting using default credential chain") {
setUseDefaultCredentialsChain(s3_processor);
}
SECTION("Test credentials setting from AWS Credentials service using default credential chain") {
setUseDefaultCredentialsChain(aws_credentials_service);
setCredentialsService();
}
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSAccessKeyId() == "key");
REQUIRE(mock_s3_request_sender_ptr->getCredentials().GetAWSSecretKey() == "secret");
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test required property not set", "[awsS3Config]") {
SECTION("Test credentials not set") {
}
SECTION("Test no bucket is set") {
setAccesKeyCredentialsInProcessor();
}
SECTION("Test no object key is set") {
setRequiredProperties();
plan->setDynamicProperty(update_attribute, "filename", "");
}
SECTION("Test region is empty") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", "");
}
REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
test_controller.runSession(plan, true);
checkProxySettings();
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test default properties", "[awsS3Config]") {
setRequiredProperties();
test_controller.runSession(plan, true);
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:" + S3_BUCKET));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:" + INPUT_FILENAME));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path value:\n"));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:absolute.path value:" + INPUT_FILENAME));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:mime.type value:" + S3_CONTENT_TYPE));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.etag value:" + S3_ETAG_UNQUOTED));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTime value:" + S3_EXPIRATION_DATE));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.expirationTimeRuleId value:" + S3_EXPIRATION_TIME_RULE_ID));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.sseAlgorithm value:" + S3_SSEALGORITHM_STR));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.version value:" + S3_VERSION_1));
REQUIRE(get_content(output_dir / INPUT_FILENAME) == S3_CONTENT);
REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId().empty());
REQUIRE(!mock_s3_request_sender_ptr->get_object_request.VersionIdHasBeenSet());
REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::NOT_SET);
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test empty optional S3 results", "[awsS3Config]") {
setRequiredProperties();
mock_s3_request_sender_ptr->returnEmptyS3Result();
test_controller.runSession(plan, true);
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:s3.bucket value:" + S3_BUCKET));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:" + INPUT_FILENAME));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path value:\n"));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:absolute.path value:" + INPUT_FILENAME));
REQUIRE(!LogTestController::getInstance().contains("key:mime.type", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.etag", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.expirationTime", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.expirationTimeRuleId", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.sseAlgorithm", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(!LogTestController::getInstance().contains("key:s3.version", std::chrono::seconds(0), std::chrono::milliseconds(0)));
REQUIRE(get_content(output_dir / INPUT_FILENAME).empty());
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test subdirectories on AWS", "[awsS3Config]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Object Key", "dir1/dir2/logs.txt");
test_controller.runSession(plan, true);
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:filename value:logs.txt"));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:path value:dir1/dir2"));
REQUIRE(verifyLogLinePresenceInPollTime(std::chrono::seconds(3), "key:absolute.path value:dir1/dir2/logs.txt"));
REQUIRE(get_content(output_dir / INPUT_FILENAME).empty());
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test optional values are set in request", "[awsS3Config]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Version", S3_VERSION_1);
plan->setProperty(s3_processor, "Requester Pays", "true");
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetVersionId() == S3_VERSION_1);
REQUIRE(mock_s3_request_sender_ptr->get_object_request.GetRequestPayer() == Aws::S3::Model::RequestPayer::requester);
}
TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test non-default client configuration values", "[awsS3Config]") {
setRequiredProperties();
plan->setProperty(s3_processor, "Region", minifi::aws::processors::region::US_EAST_1);
plan->setProperty(s3_processor, "Communications Timeout", "10 Sec");
plan->setDynamicProperty(update_attribute, "test.endpoint", "http://localhost:1234");
plan->setProperty(s3_processor, "Endpoint Override URL", "${test.endpoint}");
test_controller.runSession(plan, true);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().region == minifi::aws::processors::region::US_EAST_1);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().connectTimeoutMs == 10000);
REQUIRE(mock_s3_request_sender_ptr->getClientConfig().endpointOverride == "http://localhost:1234");
}
} // namespace