| /** |
| * @file AzureDataLakeStorageClient.cpp |
| * AzureDataLakeStorageClient class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
#include <iterator>
#include <utility>

#include "AzureDataLakeStorageClient.h"
#include "azure/core/http/http.hpp"
#include "azure/storage/files/datalake/datalake_options.hpp"

#include "azure/identity.hpp"

#include "utils/AzureSdkLogger.h"
#include "utils/span.h"
| |
| namespace org::apache::nifi::minifi::azure::storage { |
| |
| AzureDataLakeStorageClient::AzureDataLakeStorageClient() { |
| utils::AzureSdkLogger::initialize(); |
| } |
| |
| std::unique_ptr<Azure::Storage::Files::DataLake::DataLakeFileSystemClient> AzureDataLakeStorageClient::createClient( |
| const AzureStorageCredentials& credentials, const std::string& file_system_name, std::optional<uint64_t> number_of_retries) { |
| Azure::Storage::Files::DataLake::DataLakeClientOptions options; |
| if (number_of_retries) { |
| options.Retry.MaxRetries = gsl::narrow<int32_t>(*number_of_retries); |
| } |
| |
| if (credentials.getUseManagedIdentityCredentials()) { |
| auto datalake_service_client = Azure::Storage::Files::DataLake::DataLakeServiceClient( |
| "https://" + credentials.getStorageAccountName() + ".dfs." + credentials.getEndpointSuffix(), std::make_shared<Azure::Identity::ManagedIdentityCredential>(), options); |
| return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>(datalake_service_client.GetFileSystemClient(file_system_name)); |
| } else { |
| return std::make_unique<Azure::Storage::Files::DataLake::DataLakeFileSystemClient>( |
| Azure::Storage::Files::DataLake::DataLakeFileSystemClient::CreateFromConnectionString(credentials.buildConnectionString(), file_system_name, options)); |
| } |
| } |
| |
| Azure::Storage::Files::DataLake::DataLakeDirectoryClient AzureDataLakeStorageClient::getDirectoryClient(const AzureDataLakeStorageParameters& params) { |
| auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries); |
| return client->GetDirectoryClient(params.directory_name); |
| } |
| |
| Azure::Storage::Files::DataLake::DataLakeFileClient AzureDataLakeStorageClient::getFileClient(const AzureDataLakeStorageFileOperationParameters& params) { |
| auto directory_client = getDirectoryClient(params); |
| if (!params.directory_name.empty()) { |
| directory_client.CreateIfNotExists(); |
| } |
| return directory_client.GetFileClient(params.filename); |
| } |
| |
| bool AzureDataLakeStorageClient::createFile(const PutAzureDataLakeStorageParameters& params) { |
| auto file_client = getFileClient(params); |
| auto response = file_client.CreateIfNotExists(); |
| return response.Value.Created; |
| } |
| |
| std::string AzureDataLakeStorageClient::uploadFile(const PutAzureDataLakeStorageParameters& params, std::span<const std::byte> buffer) { |
| auto file_client = getFileClient(params); |
| file_client.UploadFrom(minifi::utils::as_span<const uint8_t>(buffer).data(), buffer.size()); |
| return file_client.GetUrl(); |
| } |
| |
| bool AzureDataLakeStorageClient::deleteFile(const DeleteAzureDataLakeStorageParameters& params) { |
| auto file_client = getFileClient(params); |
| auto result = file_client.Delete(); |
| return result.Value.Deleted; |
| } |
| |
| std::unique_ptr<io::InputStream> AzureDataLakeStorageClient::fetchFile(const FetchAzureDataLakeStorageParameters& params) { |
| auto file_client = getFileClient(params); |
| Azure::Storage::Files::DataLake::DownloadFileOptions options; |
| if (params.range_start || params.range_length) { |
| Azure::Core::Http::HttpRange range; |
| if (params.range_start) { |
| range.Offset = gsl::narrow<int64_t>(*params.range_start); |
| } |
| |
| if (params.range_length) { |
| range.Length = *params.range_length; |
| } |
| options.Range = range; |
| } |
| auto result = file_client.Download(options); |
| return std::make_unique<AzureDataLakeStorageInputStream>(std::move(result.Value)); |
| } |
| |
| std::vector<Azure::Storage::Files::DataLake::Models::PathItem> AzureDataLakeStorageClient::listDirectory(const ListAzureDataLakeStorageParameters& params) { |
| std::vector<Azure::Storage::Files::DataLake::Models::PathItem> result; |
| if (params.directory_name.empty()) { |
| auto client = createClient(params.credentials, params.file_system_name, params.number_of_retries); |
| for (auto page_result = client->ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) { |
| result.insert(result.end(), page_result.Paths.begin(), page_result.Paths.end()); |
| } |
| } else { |
| auto directory_client = getDirectoryClient(params); |
| for (auto page_result = directory_client.ListPaths(params.recurse_subdirectories); page_result.HasPage(); page_result.MoveToNextPage()) { |
| result.insert(result.end(), page_result.Paths.begin(), page_result.Paths.end()); |
| } |
| } |
| return result; |
| } |
| |
| } // namespace org::apache::nifi::minifi::azure::storage |