// blob: cb55dc1a8911cfd6821583ea3c97a1526b306096 [file] [log] [blame]
/**
* @file AzureDataLakeStorage.cpp
* AzureDataLakeStorage class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "AzureDataLakeStorage.h"
#include <string_view>
#include "AzureDataLakeStorageClient.h"
#include "io/StreamPipe.h"
#include "utils/file/FileUtils.h"
#include "utils/StringUtils.h"
#include "minifi-cpp/utils/gsl.h"
#include "utils/GeneralUtils.h"
#include "utils/RegexUtils.h"
namespace org::apache::nifi::minifi::azure::storage {
namespace {
bool matchesPathFilter(std::string_view base_directory, const std::optional<minifi::utils::Regex>& path_regex, std::string path) {
  // Precondition: when a base directory is configured, 'path' must start with it.
  gsl_Expects(utils::implies(!base_directory.empty(), minifi::utils::string::startsWith(path, base_directory)));
  // An unset filter accepts every path.
  if (!path_regex) {
    return true;
  }
  // Match only the part of the path relative to the base directory.
  if (!base_directory.empty()) {
    if (path.size() == base_directory.size()) {
      path.clear();
    } else {
      path.erase(0, base_directory.size() + 1);  // +1 also removes the separator after the base directory
    }
  }
  return minifi::utils::regexMatch(path, *path_regex);
}
bool matchesFileFilter(const std::optional<minifi::utils::Regex>& file_regex, const std::string& filename) {
  // An unset filter accepts every filename; otherwise the regex decides.
  return !file_regex || minifi::utils::regexMatch(filename, *file_regex);
}
} // namespace
AzureDataLakeStorage::AzureDataLakeStorage(std::unique_ptr<DataLakeStorageClient> data_lake_storage_client) {
  // Use the injected client when one is provided, otherwise create the default Azure client.
  if (data_lake_storage_client) {
    data_lake_storage_client_ = std::move(data_lake_storage_client);
  } else {
    data_lake_storage_client_ = std::make_unique<AzureDataLakeStorageClient>();
  }
}
// Uploads 'buffer' as a file on the configured Data Lake filesystem.
// Returns FILE_ALREADY_EXISTS when the target exists and replacement is disabled,
// FAILURE when any exception is thrown, otherwise a result carrying the upload URI.
UploadDataLakeStorageResult AzureDataLakeStorage::uploadFile(const PutAzureDataLakeStorageParameters& params, std::span<const std::byte> buffer) {
  UploadDataLakeStorageResult upload_result;
  logger_->log_debug("Uploading file '{}/{}' to Azure Data Lake Storage filesystem '{}'", params.directory_name, params.filename, params.file_system_name);
  try {
    const bool file_created = data_lake_storage_client_->createFile(params);
    if (!file_created && !params.replace_file) {
      logger_->log_warn("File '{}/{}' already exists on Azure Data Lake Storage filesystem '{}'", params.directory_name, params.filename, params.file_system_name);
      upload_result.result_code = UploadResultCode::FILE_ALREADY_EXISTS;
      return upload_result;
    }
    auto upload_url = data_lake_storage_client_->uploadFile(params, buffer);
    // Drop the query-string part of the URL before reporting it (it may carry credentials, e.g. a SAS token).
    const auto query_string_pos = upload_url.find('?');
    if (query_string_pos != std::string::npos) {
      upload_url.erase(query_string_pos);
    }
    upload_result.primary_uri = upload_url;
    return upload_result;
  } catch (const std::exception& ex) {
    logger_->log_error("An exception occurred while uploading file to Azure Data Lake storage: {}", ex.what());
    upload_result.result_code = UploadResultCode::FAILURE;
    return upload_result;
  }
}
// Deletes the file described by 'params'; returns the client's success flag.
// Any exception is logged and reported as failure (false).
bool AzureDataLakeStorage::deleteFile(const DeleteAzureDataLakeStorageParameters& params) {
  try {
    return data_lake_storage_client_->deleteFile(params);
  } catch (const std::exception& ex) {
    logger_->log_error("An exception occurred while deleting '{}/{}' of filesystem '{}': {}", params.directory_name, params.filename, params.file_system_name, ex.what());
  }
  return false;
}
// Fetches the remote file and pipes its content into 'stream'.
// Returns the value produced by internal::pipe (presumably the number of bytes
// transferred — see io/StreamPipe.h), or std::nullopt on any exception.
std::optional<uint64_t> AzureDataLakeStorage::fetchFile(const FetchAzureDataLakeStorageParameters& params, io::OutputStream& stream) {
  try {
    auto fetched_stream = data_lake_storage_client_->fetchFile(params);
    // NOTE(review): 'fetched_stream' is dereferenced without a null check — assumes
    // fetchFile() throws rather than returning an empty handle; confirm against the client.
    return internal::pipe(*fetched_stream, stream);
  } catch (const std::exception& ex) {
    logger_->log_error("An exception occurred while fetching '{}/{}' of filesystem '{}': {}", params.directory_name, params.filename, params.file_system_name, ex.what());
    return std::nullopt;
  }
}
// Lists the files (directories are skipped) under params.directory_name,
// applying the optional path and file regex filters.
// Returns std::nullopt when the listing throws; otherwise the (possibly empty) result set.
std::optional<ListDataLakeStorageResult> AzureDataLakeStorage::listDirectory(const ListAzureDataLakeStorageParameters& params) {
  try {
    auto list_res = data_lake_storage_client_->listDirectory(params);
    ListDataLakeStorageResult result;
    for (const auto& azure_element : list_res) {
      // Only file entries are reported; directory entries are skipped.
      if (azure_element.IsDirectory) {
        continue;
      }
      // Treat the Azure path as generic (forward-slash) format regardless of platform.
      const auto path = std::filesystem::path(azure_element.Name, std::filesystem::path::format::generic_format);
      const auto directory = path.parent_path();
      const auto filename = path.filename();
      if (!matchesPathFilter(params.directory_name, params.path_regex, directory.generic_string()) ||
          !matchesFileFilter(params.file_regex, filename.generic_string())) {
        continue;
      }
      // Build the element only after the filters pass, and move it into the
      // result to avoid copying its string fields.
      ListDataLakeStorageElement element;
      element.filename = filename;
      element.last_modified = static_cast<std::chrono::system_clock::time_point>(azure_element.LastModified);
      element.etag = azure_element.ETag;
      element.length = azure_element.FileSize;
      element.filesystem = params.file_system_name;
      element.file_path = azure_element.Name;
      element.directory = directory;
      result.push_back(std::move(element));
    }
    return result;
  } catch (const std::exception& ex) {
    logger_->log_error("An exception occurred while listing directory '{}' of filesystem '{}': {}", params.directory_name, params.file_system_name, ex.what());
    return std::nullopt;
  }
}
} // namespace org::apache::nifi::minifi::azure::storage