| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #pragma once |
| |
| #include <memory> |
| #include <string> |
| #include <utility> |
| |
| #include "../GCPAttributes.h" |
| #include "GCSProcessor.h" |
| #include "minifi-cpp/core/PropertyDefinition.h" |
| #include "minifi-cpp/core/PropertyValidator.h" |
| #include "minifi-cpp/core/RelationshipDefinition.h" |
| #include "core/logging/LoggerFactory.h" |
| #include "utils/ArrayUtils.h" |
| #include "utils/Enum.h" |
| #include "google/cloud/storage/well_known_headers.h" |
| |
| namespace org::apache::nifi::minifi::extensions::gcp::put_gcs_object { |
| enum class PredefinedAcl { |
| AUTHENTICATED_READ, |
| BUCKET_OWNER_FULL_CONTROL, |
| BUCKET_OWNER_READ_ONLY, |
| PRIVATE, |
| PROJECT_PRIVATE, |
| PUBLIC_READ_ONLY, |
| PUBLIC_READ_WRITE |
| }; |
| } // namespace org::apache::nifi::minifi::extensions::gcp::put_gcs_object |
| |
| namespace magic_enum::customize { |
| using PredefinedAcl = org::apache::nifi::minifi::extensions::gcp::put_gcs_object::PredefinedAcl; |
| |
| template <> |
| constexpr customize_t enum_name<PredefinedAcl>(PredefinedAcl value) noexcept { |
| switch (value) { |
| case PredefinedAcl::AUTHENTICATED_READ: |
| return "authenticatedRead"; |
| case PredefinedAcl::BUCKET_OWNER_FULL_CONTROL: |
| return "bucketOwnerFullControl"; |
| case PredefinedAcl::BUCKET_OWNER_READ_ONLY: |
| return "bucketOwnerRead"; |
| case PredefinedAcl::PRIVATE: |
| return "private"; |
| case PredefinedAcl::PROJECT_PRIVATE: |
| return "projectPrivate"; |
| case PredefinedAcl::PUBLIC_READ_ONLY: |
| return "publicRead"; |
| case PredefinedAcl::PUBLIC_READ_WRITE: |
| return "publicReadWrite"; |
| } |
| return invalid_tag; |
| } |
| } // namespace magic_enum::customize |
| |
| namespace org::apache::nifi::minifi::extensions::gcp { |
| |
| class PutGCSObject : public GCSProcessor { |
| public: |
| using GCSProcessor::GCSProcessor; |
| ~PutGCSObject() override = default; |
| |
| EXTENSIONAPI static constexpr const char* Description = "Puts flow files to a Google Cloud Storage Bucket."; |
| |
| EXTENSIONAPI static constexpr auto Bucket = core::PropertyDefinitionBuilder<>::createProperty("Bucket") |
| .withDescription("Bucket of the object.") |
| .withDefaultValue("${gcs.bucket}") |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto Key = core::PropertyDefinitionBuilder<>::createProperty("Key") |
| .withDescription("Name of the object.") |
| .withDefaultValue("${filename}") |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto ContentType = core::PropertyDefinitionBuilder<>::createProperty("Content Type") |
| .withDescription("Content Type for the file, i.e. text/plain ") |
| .isRequired(false) |
| .withDefaultValue("${mime.type}") |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto MD5Hash = core::PropertyDefinitionBuilder<>::createProperty("MD5 Hash") |
| .withDescription("MD5 Hash (encoded in Base64) of the file for server-side validation.") |
| .isRequired(false) |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto Crc32cChecksum = core::PropertyDefinitionBuilder<>::createProperty("CRC32C Checksum") |
| .withDescription("CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.") |
| .isRequired(false) |
| .supportsExpressionLanguage(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto EncryptionKey = core::PropertyDefinitionBuilder<>::createProperty("Server Side Encryption Key") |
| .withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") |
| .isRequired(false) |
| .supportsExpressionLanguage(true) |
| .isSensitive(true) |
| .build(); |
| EXTENSIONAPI static constexpr auto ObjectACL = core::PropertyDefinitionBuilder<magic_enum::enum_count<put_gcs_object::PredefinedAcl>()>::createProperty("Object ACL") |
| .withDescription("Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults.") |
| .isRequired(false) |
| .withAllowedValues(magic_enum::enum_names<put_gcs_object::PredefinedAcl>()) |
| .build(); |
| EXTENSIONAPI static constexpr auto OverwriteObject = core::PropertyDefinitionBuilder<>::createProperty("Overwrite Object") |
| .withDescription("If false, the upload to GCS will succeed only if the object does not exist.") |
| .withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR) |
| .withDefaultValue("true") |
| .build(); |
| EXTENSIONAPI static constexpr auto Properties = utils::array_cat(GCSProcessor::Properties, std::to_array<core::PropertyReference>({ |
| Bucket, |
| Key, |
| ContentType, |
| MD5Hash, |
| Crc32cChecksum, |
| EncryptionKey, |
| ObjectACL, |
| OverwriteObject |
| })); |
| |
| |
| EXTENSIONAPI static constexpr auto Success = core::RelationshipDefinition{"success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"}; |
| EXTENSIONAPI static constexpr auto Failure = core::RelationshipDefinition{"failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"}; |
| EXTENSIONAPI static constexpr auto Relationships = std::array{Success, Failure}; |
| |
| EXTENSIONAPI static constexpr auto Message = core::OutputAttributeDefinition<>{GCS_STATUS_MESSAGE, { Failure }, "The status message received from google cloud."}; |
| EXTENSIONAPI static constexpr auto Reason = core::OutputAttributeDefinition<>{GCS_ERROR_REASON, { Failure }, "The description of the error occurred during upload."}; |
| EXTENSIONAPI static constexpr auto Domain = core::OutputAttributeDefinition<>{GCS_ERROR_DOMAIN, { Failure }, "The domain of the error occurred during upload."}; |
| EXTENSIONAPI static constexpr auto BucketOutputAttribute = core::OutputAttributeDefinition<>{GCS_BUCKET_ATTR, { Success }, "Bucket of the object."}; |
| EXTENSIONAPI static constexpr auto KeyOutputAttribute = core::OutputAttributeDefinition<>{GCS_OBJECT_NAME_ATTR, { Success }, "Name of the object."}; |
| EXTENSIONAPI static constexpr auto Size = core::OutputAttributeDefinition<>{GCS_SIZE_ATTR, { Success }, "Size of the object."}; |
| EXTENSIONAPI static constexpr auto Crc32c = core::OutputAttributeDefinition<>{GCS_CRC32C_ATTR, { Success }, "The CRC32C checksum of object's data, encoded in base64."}; |
| EXTENSIONAPI static constexpr auto Md5 = core::OutputAttributeDefinition<>{GCS_MD5_ATTR, { Success }, "The MD5 hash of the object's data, encoded in base64."}; |
| EXTENSIONAPI static constexpr auto OwnerEntity = core::OutputAttributeDefinition<>{GCS_OWNER_ENTITY_ATTR, { Success }, "The owner entity, in the form \"user-emailAddress\"."}; |
| EXTENSIONAPI static constexpr auto OwnerEntityId = core::OutputAttributeDefinition<>{GCS_OWNER_ENTITY_ID_ATTR, { Success }, "The ID for the entity."}; |
| EXTENSIONAPI static constexpr auto ContentEncoding = core::OutputAttributeDefinition<>{GCS_CONTENT_ENCODING_ATTR, { Success }, "The content encoding of the object."}; |
| EXTENSIONAPI static constexpr auto ContentLanguage = core::OutputAttributeDefinition<>{GCS_CONTENT_LANGUAGE_ATTR, { Success }, "The content language of the object."}; |
| EXTENSIONAPI static constexpr auto ContentDisposition = core::OutputAttributeDefinition<>{GCS_CONTENT_DISPOSITION_ATTR, { Success }, "The data content disposition of the object."}; |
| EXTENSIONAPI static constexpr auto MediaLink = core::OutputAttributeDefinition<>{GCS_MEDIA_LINK_ATTR, { Success }, "The media download link to the object."}; |
| EXTENSIONAPI static constexpr auto SelfLink = core::OutputAttributeDefinition<>{GCS_SELF_LINK_ATTR, { Success }, "The link to this object."}; |
| EXTENSIONAPI static constexpr auto Etag = core::OutputAttributeDefinition<>{GCS_ETAG_ATTR, { Success }, "The HTTP 1.1 Entity tag for the object."}; |
| EXTENSIONAPI static constexpr auto GeneratedId = core::OutputAttributeDefinition<>{GCS_GENERATED_ID, { Success }, "The service-generated ID for the object."}; |
| EXTENSIONAPI static constexpr auto Generation = core::OutputAttributeDefinition<>{GCS_GENERATION, { Success }, "The content generation of this object. Used for object versioning."}; |
| EXTENSIONAPI static constexpr auto Metageneration = core::OutputAttributeDefinition<>{GCS_META_GENERATION, { Success }, "The metageneration of the object."}; |
| EXTENSIONAPI static constexpr auto CreateTime = core::OutputAttributeDefinition<>{GCS_CREATE_TIME_ATTR, { Success }, "Unix timestamp of the object's creation in milliseconds."}; |
| EXTENSIONAPI static constexpr auto UpdateTime = core::OutputAttributeDefinition<>{GCS_UPDATE_TIME_ATTR, { Success }, "Unix timestamp of the object's last modification in milliseconds."}; |
| EXTENSIONAPI static constexpr auto DeleteTime = core::OutputAttributeDefinition<>{GCS_DELETE_TIME_ATTR, { Success }, "Unix timestamp of the object's deletion in milliseconds."}; |
| EXTENSIONAPI static constexpr auto EncryptionAlgorithm = core::OutputAttributeDefinition<>{GCS_ENCRYPTION_ALGORITHM_ATTR, { Success }, "The algorithm used to encrypt the object."}; |
| EXTENSIONAPI static constexpr auto EncryptionSha256 = core::OutputAttributeDefinition<>{GCS_ENCRYPTION_SHA256_ATTR, { Success }, "The SHA256 hash of the key used to encrypt the object."}; |
| EXTENSIONAPI static constexpr auto OutputAttributes = std::array<core::OutputAttributeReference, 24>{ |
| Message, |
| Reason, |
| Domain, |
| BucketOutputAttribute, |
| KeyOutputAttribute, |
| Size, |
| Crc32c, |
| Md5, |
| OwnerEntity, |
| OwnerEntityId, |
| ContentEncoding, |
| ContentLanguage, |
| ContentDisposition, |
| MediaLink, |
| SelfLink, |
| Etag, |
| GeneratedId, |
| Generation, |
| Metageneration, |
| CreateTime, |
| UpdateTime, |
| DeleteTime, |
| EncryptionAlgorithm, |
| EncryptionSha256 |
| }; |
| |
| EXTENSIONAPI static constexpr bool SupportsDynamicProperties = false; |
| EXTENSIONAPI static constexpr bool SupportsDynamicRelationships = false; |
| EXTENSIONAPI static constexpr core::annotation::Input InputRequirement = core::annotation::Input::INPUT_REQUIRED; |
| EXTENSIONAPI static constexpr bool IsSingleThreaded = false; |
| |
| ADD_COMMON_VIRTUAL_FUNCTIONS_FOR_PROCESSORS |
| |
| void initialize() override; |
| void onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) override; |
| void onTrigger(core::ProcessContext& context, core::ProcessSession& session) override; |
| |
| private: |
| google::cloud::storage::EncryptionKey encryption_key_; |
| }; |
| |
| } // namespace org::apache::nifi::minifi::extensions::gcp |