blob: c5505f49e892b9fbd10e07d510c6d5672e46aa52 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AzureDataLakeStorageTestsFixture.h"
#include "processors/FetchAzureDataLakeStorage.h"
#include "controllerservices/AzureStorageCredentialsService.h"
namespace {
using namespace std::literals::chrono_literals;
using FetchAzureDataLakeStorageTestsFixture = AzureDataLakeStorageTestsFixture<minifi::azure::processors::FetchAzureDataLakeStorage>;
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Azure storage credentials service is empty", "[azureDataLakeStorageParameters]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::AzureStorageCredentialsService, "");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with account name and SAS token set", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
CHECK(passed_params.credentials.buildConnectionString() == "AccountName=TEST_ACCOUNT;SharedAccessSignature=token");
CHECK(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with connection string override", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, CONNECTION_STRING);
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Test Azure credentials with Azure default identity sources", "[azureDataLakeStorageParameters]") {
setDefaultProperties();
minifi::azure::CredentialConfigurationStrategyOption expected_configuration_strategy_option{};
std::string credential_configuration_strategy_string;
std::string managed_identity_client_id;
SECTION("Managed Identity") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::ManagedIdentity;
credential_configuration_strategy_string = "Managed Identity";
managed_identity_client_id = "test-managed-identity-client-id";
}
SECTION("Default Credential") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::DefaultCredential;
credential_configuration_strategy_string = "Default Credential";
}
SECTION("Workload Identity") {
expected_configuration_strategy_option = minifi::azure::CredentialConfigurationStrategyOption::WorkloadIdentity;
credential_configuration_strategy_string = "Workload Identity";
}
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "test");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::CredentialConfigurationStrategy, credential_configuration_strategy_string);
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ManagedIdentityClientId, managed_identity_client_id);
test_controller_.runSession(plan_, true);
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
CHECK(passed_params.credentials.buildConnectionString().empty());
CHECK(passed_params.credentials.getStorageAccountName() == "TEST_ACCOUNT");
CHECK(passed_params.credentials.getEndpointSuffix() == "core.windows.net");
CHECK(passed_params.credentials.getCredentialConfigurationStrategy() == expected_configuration_strategy_option);
CHECK(passed_params.credentials.getManagedIdentityClientId() == managed_identity_client_id);
CHECK(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Both SAS Token and Storage Account Key cannot be set in credentials service") {
setDefaultProperties();
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::SASToken, "token");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountName, "TEST_ACCOUNT");
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::StorageAccountKey, "TEST_KEY");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Filesystem name is not set", "[azureDataLakeStorageParameters]") {
plan_->setDynamicProperty(update_attribute_, "test.filesystemname", "");
test_controller_.runSession(plan_, true);
using org::apache::nifi::minifi::test::utils::verifyLogLinePresenceInPollTime;
CHECK(verifyLogLinePresenceInPollTime(1s, "Filesystem Name '' is invalid or empty!"));
auto failed_flowfiles = getFailedFlowFileContents();
REQUIRE(failed_flowfiles.size() == 1);
REQUIRE(failed_flowfiles[0] == TEST_DATA);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Connection String is empty", "[azureDataLakeStorageParameters]") {
plan_->setProperty(azure_storage_cred_service_, minifi::azure::controllers::AzureStorageCredentialsService::ConnectionString, "");
REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception);
REQUIRE(getFailedFlowFileContents().empty());
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file succeeds", "[azureDataLakeStorageFetch]") {
test_controller_.runSession(plan_, true);
REQUIRE(getFailedFlowFileContents().empty());
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
CHECK(passed_params.filename == GETFILE_FILE_NAME);
CHECK(passed_params.range_start == std::nullopt);
CHECK(passed_params.range_length == std::nullopt);
auto success_contents = getSuccessfulFlowFileContents();
REQUIRE(success_contents.size() == 1);
REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch a range of the file succeeds", "[azureDataLakeStorageFetch]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeStart, "5");
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::RangeLength, "10");
test_controller_.runSession(plan_, true);
REQUIRE(getFailedFlowFileContents().empty());
auto passed_params = mock_data_lake_storage_client_ptr_->getPassedFetchParams();
CHECK(passed_params.credentials.buildConnectionString() == CONNECTION_STRING);
CHECK(passed_params.file_system_name == FILESYSTEM_NAME);
CHECK(passed_params.directory_name == DIRECTORY_NAME);
CHECK(passed_params.filename == GETFILE_FILE_NAME);
CHECK(*passed_params.range_start == 5);
CHECK(*passed_params.range_length == 10);
auto success_contents = getSuccessfulFlowFileContents();
REQUIRE(success_contents.size() == 1);
REQUIRE(success_contents[0] == mock_data_lake_storage_client_ptr_->FETCHED_DATA.substr(5, 10));
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Number of Retries is set", "[azureDataLakeStorageFetch]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries, "1");
test_controller_.runSession(plan_, true);
CHECK(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries == 1);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Valid Number of Retries is set via EL", "[azureDataLakeStorageFetch]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries, "${literal(10):multiply(2):plus(1):multiply(2)}");
test_controller_.runSession(plan_, true);
CHECK(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries == 42);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Invalid Number of Retries is set via EL", "[azureDataLakeStorageFetch]") {
plan_->setProperty(azure_data_lake_storage_, minifi::azure::processors::FetchAzureDataLakeStorage::NumberOfRetries, "${literal(\"asd\")}");
REQUIRE_THROWS_WITH(test_controller_.runSession(plan_, true), "Expected parsable uint64_t from \"AzureDataLakeStorageProcessor::Number of Retries\", but got GeneralParsingError (Parsing Error:0)");
CHECK_FALSE(mock_data_lake_storage_client_ptr_->getPassedFetchParams().number_of_retries);
}
TEST_CASE_METHOD(FetchAzureDataLakeStorageTestsFixture, "Fetch full file fails", "[azureDataLakeStorageFetch]") {
mock_data_lake_storage_client_ptr_->setFetchFailure(true);
test_controller_.runSession(plan_, true);
REQUIRE(getSuccessfulFlowFileContents().empty());
auto failed_contents = getFailedFlowFileContents();
REQUIRE(failed_contents.size() == 1);
REQUIRE(failed_contents[0] == TEST_DATA);
}
} // namespace