/**
 * @file PutAzureDataLakeStorage.cpp
 * PutAzureDataLakeStorage class implementation
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "PutAzureDataLakeStorage.h"

#include <vector>

#include "utils/ProcessorConfigUtils.h"
#include "utils/gsl.h"
#include "core/ProcessSession.h"
#include "core/Resource.h"

namespace org::apache::nifi::minifi::azure::processors {

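// Registers the processor's supported properties and relationships with the framework.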
void PutAzureDataLakeStorage::initialize() {
  setSupportedProperties(Properties);
  setSupportedRelationships(Relationships);
}

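// Validates the Azure storage credentials provided by the configured controller service and caches them,
// together with the conflict resolution strategy, for use in onTrigger.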
void PutAzureDataLakeStorage::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) {
  gsl_Expects(context && sessionFactory);
  AzureDataLakeStorageFileProcessorBase::onSchedule(context, sessionFactory);
  std::optional<storage::AzureStorageCredentials> credentials;
  std::tie(std::ignore, credentials) = getCredentialsFromControllerService(*context);
  if (!credentials) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service property missing or invalid");
  }

  if (!credentials->isValid()) {
    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Azure Storage Credentials Service properties are not set or invalid");
  }

  credentials_ = *credentials;
  conflict_resolution_strategy_ = utils::parseEnumProperty<azure::FileExistsResolutionStrategy>(*context, ConflictResolutionStrategy);
}

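// Builds the upload parameters from the common file operation properties (filesystem, directory, file name);
// returns std::nullopt when any of them cannot be resolved for the given flow file.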
std::optional<storage::PutAzureDataLakeStorageParameters> PutAzureDataLakeStorage::buildUploadParameters(
    core::ProcessContext& context, const std::shared_ptr<core::FlowFile>& flow_file) {
  storage::PutAzureDataLakeStorageParameters params;
  if (!setFileOperationCommonParameters(params, context, flow_file)) {
    return std::nullopt;
  }
  params.replace_file = conflict_resolution_strategy_ == azure::FileExistsResolutionStrategy::replace;

  return params;
}

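// Uploads the content of a single flow file to Azure Data Lake Storage and routes the flow file
// to Success or Failure based on the upload result and the configured conflict resolution strategy.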
void PutAzureDataLakeStorage::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) {
  gsl_Expects(context && session);
  logger_->log_trace("PutAzureDataLakeStorage onTrigger");
  std::shared_ptr<core::FlowFile> flow_file = session->get();
  if (!flow_file) {
    context->yield();
    return;
  }

  const auto params = buildUploadParameters(*context, flow_file);
  if (!params) {
    session->transfer(flow_file, Failure);
    return;
  }

  PutAzureDataLakeStorage::ReadCallback callback(flow_file->getSize(), azure_data_lake_storage_, *params, logger_);
  session->read(flow_file, std::ref(callback));
  const storage::UploadDataLakeStorageResult result = callback.getResult();

  if (result.result_code == storage::UploadResultCode::FILE_ALREADY_EXISTS) {
    // The upload overwrites existing files only with the 'replace' strategy, so this result implies 'fail' or 'ignore'.
    gsl_Expects(conflict_resolution_strategy_ != azure::FileExistsResolutionStrategy::replace);
    if (conflict_resolution_strategy_ == azure::FileExistsResolutionStrategy::fail) {
      logger_->log_error("Failed to upload file '%s/%s' to filesystem '%s' on Azure Data Lake Storage because the file already exists",
          params->directory_name, params->filename, params->file_system_name);
      session->transfer(flow_file, Failure);
      return;
    } else if (conflict_resolution_strategy_ == azure::FileExistsResolutionStrategy::ignore) {
      logger_->log_debug("Upload of file '%s/%s' was ignored because it already exists in filesystem '%s' on Azure Data Lake Storage",
          params->directory_name, params->filename, params->file_system_name);
      session->transfer(flow_file, Success);
      return;
    }
  } else if (result.result_code == storage::UploadResultCode::FAILURE) {
    logger_->log_error("Failed to upload file '%s/%s' to filesystem '%s' on Azure Data Lake Storage", params->directory_name, params->filename, params->file_system_name);
    session->transfer(flow_file, Failure);
  } else {
    session->putAttribute(flow_file, "azure.filesystem", params->file_system_name);
    session->putAttribute(flow_file, "azure.directory", params->directory_name);
    session->putAttribute(flow_file, "azure.filename", params->filename);
    session->putAttribute(flow_file, "azure.primaryUri", result.primary_uri);
    session->putAttribute(flow_file, "azure.length", std::to_string(flow_file->getSize()));
    logger_->log_debug("Successfully uploaded file '%s/%s' to filesystem '%s' on Azure Data Lake Storage", params->directory_name, params->filename, params->file_system_name);
    session->transfer(flow_file, Success);
  }
}

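// Session read callback that buffers the whole flow file content in memory and uploads it in a single call.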
PutAzureDataLakeStorage::ReadCallback::ReadCallback(
    uint64_t flow_size, storage::AzureDataLakeStorage& azure_data_lake_storage, const storage::PutAzureDataLakeStorageParameters& params, std::shared_ptr<core::logging::Logger> logger)
    : flow_size_(flow_size),
      azure_data_lake_storage_(azure_data_lake_storage),
      params_(params),
      logger_(std::move(logger)) {
}

int64_t PutAzureDataLakeStorage::ReadCallback::operator()(const std::shared_ptr<io::InputStream>& stream) {
  std::vector<std::byte> buffer(flow_size_);
  const size_t read_ret = stream->read(buffer);
  if (io::isError(read_ret) || read_ret != flow_size_) {
    // Signal a read error to the session; the upload is not attempted.
    return -1;
  }

  result_ = azure_data_lake_storage_.uploadFile(params_, buffer);
  return gsl::narrow<int64_t>(read_ret);
}

REGISTER_RESOURCE(PutAzureDataLakeStorage, Processor);

}  // namespace org::apache::nifi::minifi::azure::processors