blob: 6938dc841f232353d6a2f7c7c83f30cdaf6c441d [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AzureDataLakeStorageTestsFixture.h"
#include "processors/PutAzureDataLakeStorage.h"
#include "controllerservices/AzureStorageCredentialsService.h"
namespace {
using namespace std::literals::chrono_literals;
using PutAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::PutAzureDataLakeStorage>;
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::AzureStorageCredentialsService, "");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, CONNECTION_STRING);
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Test Azure credentials with Azure default identity sources", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
minifi::azure::CredentialConfigurationStrategyOption expected_configuration_strategy_option{};
std::string credential_configuration_strategy_string;
std::string managed_identity_client_id;
SECTION("Managed Identity") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::ManagedIdentity;
credential_configuration_strategy_string = "Managed Identity";
managed_identity_client_id = "test-managed-identity-client-id";
}
SECTION("Default Credential") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::DefaultCredential;
credential_configuration_strategy_string = "Default Credential";
}
SECTION("Workload Identity") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::WorkloadIdentity;
credential_configuration_strategy_string = "Workload Identity";
}
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "test");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::CredentialConfigurationStrategy, credential_configuration_strategy_string);
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ManagedIdentityClientId, managed_identity_client_id);
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString().empty());
CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
CHECK(passed_params.credentials.getCredentialConfigurationStrategy() == expected_configuration_strategy_option);
CHECK(passed_params.credentials.getManagedIdentityClientId() == managed_identity_client_id);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Both SAS Token and Storage Account Key cannot be set in credentials service") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountKey, "TEST_KEY");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
plan_->setDynamicProperty(update_attribute_, "test.filesystemname", "");
test_controller_.runSession(plan_, true);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
auto failed_flowfiles = getFailedFlowFileContents();
REQUIRE(failed_flowfiles.size() == 1);
REQUIRE(failed_flowfiles[0] == TEST_DATA);
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with default parameters", "[azureDataLakeStorageUpload]") {
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
CHECK(passed_params.filename == GETFILE_FILE_NAME);
CHECK_FALSE(passed_params.replace_file);
REQUIRE(getFailedFlowFileContents().empty());
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "File creation fails", "[azureDataLakeStorageUpload]") {
mock_data_lake_storage_client_ptr_->setFileCreationError(true);
test_controller_.runSession(plan_, true);
auto failed_flowfiles = getFailedFlowFileContents();
REQUIRE(failed_flowfiles.size() == 1);
REQUIRE(failed_flowfiles[0] == TEST_DATA);
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "File upload fails", "[azureDataLakeStorageUpload]") {
mock_data_lake_storage_client_ptr_->setUploadFailure(true);
test_controller_.runSession(plan_, true);
auto failed_flowfiles = getFailedFlowFileContents();
REQUIRE(failed_flowfiles.size() == 1);
REQUIRE(failed_flowfiles[0] == TEST_DATA);
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to failure on 'fail' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
mock_data_lake_storage_client_ptr_->setFileCreation(false);
test_controller_.runSession(plan_, true);
auto failed_flowfiles = getFailedFlowFileContents();
REQUIRE(failed_flowfiles.size() == 1);
REQUIRE(failed_flowfiles[0] == TEST_DATA);
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Transfer to success on 'ignore' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
plan_->setProperty(azure_data_lake_storage_,
minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy, magic_enum::enum_name(minifi::azure::FileExistsResolutionStrategy::ignore));
mock_data_lake_storage_client_ptr_->setFileCreation(false);
test_controller_.runSession(plan_, true);
REQUIRE(getFailedFlowFileContents().empty());
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "key:filename value:" + GETFILE_FILE_NAME));
CHECK_FALSE(LogTestController::getInstance().contains("key:azure", 0s, 0ms));
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Replace old file on 'replace' resolution strategy if file exists", "[azureDataLakeStorageUpload]") {
plan_->setProperty(azure_data_lake_storage_,
minifi::azure::processors::PutAzureDataLakeStorage::ConflictResolutionStrategy, magic_enum::enum_name(minifi::azure::FileExistsResolutionStrategy::replace));
mock_data_lake_storage_client_ptr_->setFileCreation(false);
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
CHECK(passed_params.filename == GETFILE_FILE_NAME);
CHECK(passed_params.replace_file);
REQUIRE(getFailedFlowFileContents().empty());
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:" + DIRECTORY_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filename value:" + GETFILE_FILE_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.filesystem value:" + FILESYSTEM_NAME));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.length value:" + std::to_string(TEST_DATA.size())));
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.primaryUri value:" + mock_data_lake_storage_client_ptr_->PRIMARY_URI + "\n"));
}
TEST_CASE_METHOD(PutAzureDataLakeStorageTestsFixture, "Upload to Azure Data Lake Storage with empty directory is accepted", "[azureDataLakeStorageUpload]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::PutAzureDataLakeStorage::DirectoryName, "");
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedPutParams();
CHECK(passed_params.directory_name.empty());
REQUIRE(getFailedFlowFileContents().empty());
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "key:azure.directory value:\n"));
}
} // namespace