| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #pragma once |
| |
| #include <array> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| #include <memory> |
| |
| #include "unit/TestBase.h" |
| #include "unit/Catch.h" |
| #include "core/Processor.h" |
| #include "processors/GetFile.h" |
| #include "processors/PutFile.h" |
| #include "processors/LogAttribute.h" |
| #include "processors/UpdateAttribute.h" |
| #include "utils/file/FileUtils.h" |
| #include "MockBlobStorage.h" |
| |
| const std::string CONTAINER_NAME = "test-container"; |
| const std::string STORAGE_ACCOUNT_NAME = "test-account"; |
| const std::string STORAGE_ACCOUNT_KEY = "test-key"; |
| const std::string SAS_TOKEN = "test-sas-token"; |
| const std::string ENDPOINT_SUFFIX = "test.suffix.com"; |
| const std::string CONNECTION_STRING = "test-connectionstring"; |
| const std::string BLOB_NAME = "test-blob.txt"; |
| const std::string TEST_DATA = "data"; |
| const std::string GET_FILE_NAME = "input_data.log"; |
| |
| template<typename ProcessorType> |
| class AzureBlobStorageTestsFixture { |
| public: |
| AzureBlobStorageTestsFixture() { |
| LogTestController::getInstance().setDebug<TestPlan>(); |
| LogTestController::getInstance().setDebug<minifi::core::Processor>(); |
| LogTestController::getInstance().setTrace<minifi::core::ProcessSession>(); |
| LogTestController::getInstance().setTrace<minifi::processors::GetFile>(); |
| LogTestController::getInstance().setTrace<minifi::processors::PutFile>(); |
| LogTestController::getInstance().setDebug<minifi::processors::UpdateAttribute>(); |
| LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>(); |
| LogTestController::getInstance().setTrace<ProcessorType>(); |
| |
| // Build MiNiFi processing graph |
| plan_ = test_controller_.createPlan(); |
| auto mock_blob_storage = std::make_unique<MockBlobStorage>(); |
| mock_blob_storage_ptr_ = mock_blob_storage.get(); |
| auto uuid = utils::IdGenerator::getIdGenerator()->generate(); |
| auto impl = std::unique_ptr<ProcessorType>( |
| new ProcessorType({.uuid = uuid, .name = "AzureBlobStorageProcessor", .logger = logging::LoggerFactory<ProcessorType>::getLogger(uuid)}, std::move(mock_blob_storage))); |
| auto azure_blob_storage_processor_unique_ptr = std::make_unique<core::Processor>(impl->getName(), impl->getUUID(), std::move(impl)); |
| azure_blob_storage_processor_ = azure_blob_storage_processor_unique_ptr.get(); |
| auto input_dir = test_controller_.createTempDirectory(); |
| std::ofstream input_file_stream(input_dir / GET_FILE_NAME); |
| input_file_stream << TEST_DATA; |
| input_file_stream.close(); |
| get_file_processor_ = plan_->addProcessor("GetFile", "GetFile"); |
| plan_->setProperty(get_file_processor_, minifi::processors::GetFile::Directory, input_dir.string()); |
| plan_->setProperty(get_file_processor_, minifi::processors::GetFile::KeepSourceFile, "false"); |
| update_attribute_processor_ = plan_->addProcessor("UpdateAttribute", "UpdateAttribute", { {"success", "d"} }, true); |
| plan_->addProcessor(std::move(azure_blob_storage_processor_unique_ptr), "AzureBlobStorageProcessor", { {"success", "d"} }, true); |
| auto logattribute = plan_->addProcessor("LogAttribute", "LogAttribute", { {"success", "d"} }, true); |
| success_putfile_ = plan_->addProcessor("PutFile", "SuccessPutFile", { {"success", "d"} }, false); |
| plan_->addConnection(logattribute, {"success", "d"}, success_putfile_); |
| success_putfile_->setAutoTerminatedRelationships(std::array{core::Relationship{"success", "d"}, core::Relationship{"failure", "d"}}); |
| success_output_dir_ = test_controller_.createTempDirectory(); |
| plan_->setProperty(success_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory, success_output_dir_.string()); |
| |
| failure_putfile_ = plan_->addProcessor("PutFile", "FailurePutFile", { {"success", "d"} }, false); |
| plan_->addConnection(azure_blob_storage_processor_, {"failure", "d"}, failure_putfile_); |
| failure_putfile_->setAutoTerminatedRelationships(std::array{core::Relationship{"success", "d"}, core::Relationship{"failure", "d"}}); |
| failure_output_dir_ = test_controller_.createTempDirectory(); |
| plan_->setProperty(failure_putfile_, org::apache::nifi::minifi::processors::PutFile::Directory, failure_output_dir_.string()); |
| } |
| |
| void setDefaultCredentials() { |
| plan_->setDynamicProperty(update_attribute_processor_, "test.account_name", STORAGE_ACCOUNT_NAME); |
| plan_->setProperty(azure_blob_storage_processor_, "Storage Account Name", "${test.account_name}"); |
| plan_->setDynamicProperty(update_attribute_processor_, "test.account_key", STORAGE_ACCOUNT_KEY); |
| plan_->setProperty(azure_blob_storage_processor_, "Storage Account Key", "${test.account_key}"); |
| } |
| |
| std::vector<std::string> getFileContents(const std::filesystem::path& dir) { |
| std::vector<std::string> file_contents; |
| |
| auto lambda = [&file_contents](const std::filesystem::path& path, const std::filesystem::path& filename) -> bool { |
| std::ifstream is(path / filename, std::ifstream::binary); |
| file_contents.push_back(std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>())); |
| return true; |
| }; |
| |
| utils::file::FileUtils::list_dir(dir, lambda, plan_->getLogger(), false); |
| return file_contents; |
| } |
| |
| std::vector<std::string> getFailedFlowFileContents() { |
| return getFileContents(failure_output_dir_); |
| } |
| |
| std::vector<std::string> getSuccessfulFlowFileContents() { |
| return getFileContents(success_output_dir_); |
| } |
| |
| virtual ~AzureBlobStorageTestsFixture() { |
| LogTestController::getInstance().reset(); |
| } |
| |
| protected: |
| TestController test_controller_; |
| std::shared_ptr<TestPlan> plan_; |
| MockBlobStorage* mock_blob_storage_ptr_; |
| core::Processor* azure_blob_storage_processor_; |
| core::Processor* get_file_processor_; |
| core::Processor* update_attribute_processor_; |
| core::Processor* success_putfile_; |
| core::Processor* failure_putfile_; |
| std::filesystem::path failure_output_dir_; |
| std::filesystem::path success_output_dir_; |
| }; |