blob: a3e4ee87cb724cc96a0b98f775ee6b7e8303441a [file] [log] [blame]
/**
* @file AzureBlobStorageProcessorBase.cpp
* AzureBlobStorageProcessorBase class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AzureBlobStorageProcessorBase.h"
#include "core/ProcessContext.h"
namespace org::apache::nifi::minifi::azure::processors {
void AzureBlobStorageProcessorBase::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
gsl_Expects(context);
std::string value;
if (!context->getProperty(ContainerName, value) || value.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Container Name property missing or invalid");
}
if (context->getProperty(AzureStorageCredentialsService, value) && !value.empty()) {
logger_->log_info("Getting Azure Storage credentials from controller service with name: '%s'", value);
return;
}
if (!context->getProperty(UseManagedIdentityCredentials, use_managed_identity_credentials_)) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Use Managed Identity Credentials is invalid.");
}
if (use_managed_identity_credentials_) {
logger_->log_info("Using Managed Identity for authentication");
return;
}
if (context->getProperty(ConnectionString, value) && !value.empty()) {
logger_->log_info("Using connectionstring directly for Azure Storage authentication");
return;
}
if (!context->getProperty(StorageAccountName, value) || value.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Storage Account Name property missing or invalid");
}
if (context->getProperty(StorageAccountKey, value) && !value.empty()) {
logger_->log_info("Using storage account name and key for authentication");
return;
}
if (!context->getProperty(SASToken, value) || value.empty()) {
throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Neither Storage Account Key nor SAS Token property was set.");
}
logger_->log_info("Using storage account name and SAS token for authentication");
}
storage::AzureStorageCredentials AzureBlobStorageProcessorBase::getAzureCredentialsFromProperties(
core::ProcessContext &context, const std::shared_ptr<core::FlowFile> &flow_file) const {
storage::AzureStorageCredentials credentials;
std::string value;
if (context.getProperty(StorageAccountName, value, flow_file)) {
credentials.setStorageAccountName(value);
}
if (context.getProperty(StorageAccountKey, value, flow_file)) {
credentials.setStorageAccountKey(value);
}
if (context.getProperty(SASToken, value, flow_file)) {
credentials.setSasToken(value);
}
if (context.getProperty(CommonStorageAccountEndpointSuffix, value, flow_file)) {
credentials.setEndpontSuffix(value);
}
if (context.getProperty(ConnectionString, value, flow_file)) {
credentials.setConnectionString(value);
}
credentials.setUseManagedIdentityCredentials(use_managed_identity_credentials_);
return credentials;
}
bool AzureBlobStorageProcessorBase::setCommonStorageParameters(
storage::AzureBlobStorageParameters& params,
core::ProcessContext &context,
const std::shared_ptr<core::FlowFile> &flow_file) {
auto credentials = getCredentials(context, flow_file);
if (!credentials) {
return false;
}
params.credentials = *credentials;
if (!context.getProperty(ContainerName, params.container_name, flow_file) || params.container_name.empty()) {
logger_->log_error("Container Name is invalid or empty!");
return false;
}
return true;
}
std::optional<storage::AzureStorageCredentials> AzureBlobStorageProcessorBase::getCredentials(
core::ProcessContext &context,
const std::shared_ptr<core::FlowFile> &flow_file) const {
auto [result, controller_service_creds] = getCredentialsFromControllerService(context);
if (controller_service_creds) {
if (controller_service_creds->isValid()) {
logger_->log_debug("Azure credentials read from credentials controller service!");
return controller_service_creds;
} else {
logger_->log_error("Azure credentials controller service is set with invalid credential parameters!");
return std::nullopt;
}
} else if (result == GetCredentialsFromControllerResult::CONTROLLER_NAME_INVALID) {
logger_->log_error("Azure credentials controller service name is invalid!");
return std::nullopt;
}
logger_->log_debug("No valid Azure credentials are set in credentials controller service, checking properties...");
auto property_creds = getAzureCredentialsFromProperties(context, flow_file);
if (property_creds.isValid()) {
logger_->log_debug("Azure credentials read from properties!");
return property_creds;
}
logger_->log_error("No valid Azure credentials are set in credentials controller service nor in properties!");
return std::nullopt;
}
} // namespace org::apache::nifi::minifi::azure::processors