| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "PutGCSObject.h" |
| |
| #include <utility> |
| |
| #include "core/Resource.h" |
| #include "core/FlowFile.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "../GCPAttributes.h" |
| |
| namespace gcs = ::google::cloud::storage; |
| |
| namespace org::apache::nifi::minifi::extensions::gcp { |
| const core::Property PutGCSObject::Bucket( |
| core::PropertyBuilder::createProperty("Bucket") |
| ->withDescription("Bucket of the object.") |
| ->withDefaultValue("${gcs.bucket}") |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::Key( |
| core::PropertyBuilder::createProperty("Key") |
| ->withDescription("Name of the object.") |
| ->withDefaultValue("${filename}") |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::ContentType( |
| core::PropertyBuilder::createProperty("Content Type") |
| ->withDescription("Content Type for the file, i.e. text/plain ") |
| ->isRequired(false) |
| ->withDefaultValue("${mime.type}") |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::MD5Hash( |
| core::PropertyBuilder::createProperty("MD5 Hash") |
| ->withDescription("MD5 Hash (encoded in Base64) of the file for server-side validation.") |
| ->isRequired(false) |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::Crc32cChecksum( |
| core::PropertyBuilder::createProperty("CRC32C Checksum") |
| ->withDescription("CRC32C Checksum (encoded in Base64, big-Endian order) of the file for server-side validation.") |
| ->isRequired(false) |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::EncryptionKey( |
| core::PropertyBuilder::createProperty("Server Side Encryption Key") |
| ->withDescription("An AES256 Encryption Key (encoded in base64) for server-side encryption of the object.") |
| ->isRequired(false) |
| ->supportsExpressionLanguage(true) |
| ->build()); |
| |
| const core::Property PutGCSObject::ObjectACL( |
| core::PropertyBuilder::createProperty("Object ACL") |
| ->withDescription("Access Control to be attached to the object uploaded. Not providing this will revert to bucket defaults.") |
| ->isRequired(false) |
| ->withAllowableValues(PredefinedAcl::values()) |
| ->build()); |
| |
| const core::Property PutGCSObject::OverwriteObject( |
| core::PropertyBuilder::createProperty("Overwrite Object") |
| ->withDescription("If false, the upload to GCS will succeed only if the object does not exist.") |
| ->withDefaultValue<bool>(true) |
| ->build()); |
| |
| const core::Relationship PutGCSObject::Success("success", "Files that have been successfully written to Google Cloud Storage are transferred to this relationship"); |
| const core::Relationship PutGCSObject::Failure("failure", "Files that could not be written to Google Cloud Storage for some reason are transferred to this relationship"); |
| |
| namespace { |
| class UploadToGCSCallback { |
| public: |
| UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key) |
| : bucket_(std::move(bucket)), |
| key_(std::move(key)), |
| client_(client) { |
| } |
| |
| int64_t operator()(const std::shared_ptr<io::BaseStream>& stream) { |
| std::string content; |
| content.resize(stream->size()); |
| const auto read_ret = stream->read(gsl::make_span(content).as_span<std::byte>()); |
| if (io::isError(read_ret)) { |
| return -1; |
| } |
| auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_); |
| writer << content; |
| writer.Close(); |
| result_ = writer.metadata(); |
| return read_ret; |
| } |
| |
| [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept { |
| return result_; |
| } |
| |
| void setHashValue(const std::string& hash_value_str) { |
| hash_value_ = gcs::MD5HashValue(hash_value_str); |
| } |
| |
| void setCrc32CChecksumValue(const std::string& crc32c_checksum_str) { |
| crc32c_checksum_ = gcs::Crc32cChecksumValue(crc32c_checksum_str); |
| } |
| |
| void setEncryptionKey(const gcs::EncryptionKey& encryption_key) { |
| encryption_key_ = encryption_key; |
| } |
| |
| void setPredefinedAcl(PutGCSObject::PredefinedAcl predefined_acl) { |
| predefined_acl_ = gcs::PredefinedAcl(predefined_acl.toString()); |
| } |
| |
| void setContentType(const std::string& content_type_str) { |
| content_type_ = gcs::ContentType(content_type_str); |
| } |
| |
| void setIfGenerationMatch(std::optional<bool> overwrite) { |
| if (overwrite.has_value() && overwrite.value() == false) { |
| if_generation_match_ = gcs::IfGenerationMatch(0); |
| } else { |
| if_generation_match_ = gcs::IfGenerationMatch(); |
| } |
| } |
| |
| private: |
| std::string bucket_; |
| std::string key_; |
| gcs::Client& client_; |
| |
| gcs::MD5HashValue hash_value_; |
| gcs::Crc32cChecksumValue crc32c_checksum_; |
| gcs::EncryptionKey encryption_key_; |
| gcs::PredefinedAcl predefined_acl_; |
| gcs::ContentType content_type_; |
| gcs::IfGenerationMatch if_generation_match_; |
| |
| google::cloud::StatusOr<gcs::ObjectMetadata> result_; |
| }; |
| |
| } // namespace |
| |
| |
| void PutGCSObject::initialize() { |
| setSupportedProperties({GCPCredentials, |
| Bucket, |
| Key, |
| NumberOfRetries, |
| ContentType, |
| MD5Hash, |
| Crc32cChecksum, |
| EncryptionKey, |
| ObjectACL, |
| OverwriteObject, |
| EndpointOverrideURL}); |
| setSupportedRelationships({Success, Failure}); |
| } |
| |
| |
| void PutGCSObject::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& session_factory) { |
| GCSProcessor::onSchedule(context, session_factory); |
| gsl_Expects(context); |
| if (auto encryption_key = context->getProperty(EncryptionKey)) { |
| try { |
| encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key); |
| } catch (const google::cloud::RuntimeStatusError&) { |
| throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + EncryptionKey.getName()); |
| } |
| } |
| } |
| |
| void PutGCSObject::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { |
| gsl_Expects(context && session && gcp_credentials_); |
| |
| auto flow_file = session->get(); |
| if (!flow_file) { |
| context->yield(); |
| return; |
| } |
| |
| auto bucket = context->getProperty(Bucket, flow_file); |
| if (!bucket || bucket->empty()) { |
| logger_->log_error("Missing bucket name"); |
| session->transfer(flow_file, Failure); |
| return; |
| } |
| auto object_name = context->getProperty(Key, flow_file); |
| if (!object_name || object_name->empty()) { |
| logger_->log_error("Missing object name"); |
| session->transfer(flow_file, Failure); |
| return; |
| } |
| |
| gcs::Client client = getClient(); |
| UploadToGCSCallback callback(client, *bucket, *object_name); |
| |
| if (auto crc32_checksum = context->getProperty(Crc32cChecksum, flow_file)) { |
| callback.setCrc32CChecksumValue(*crc32_checksum); |
| } |
| |
| if (auto md5_hash = context->getProperty(MD5Hash, flow_file)) { |
| callback.setHashValue(*md5_hash); |
| } |
| |
| auto content_type = context->getProperty(ContentType, flow_file); |
| if (content_type && !content_type->empty()) |
| callback.setContentType(*content_type); |
| |
| if (auto predefined_acl = context->getProperty<PredefinedAcl>(ObjectACL)) |
| callback.setPredefinedAcl(*predefined_acl); |
| callback.setIfGenerationMatch(context->getProperty<bool>(OverwriteObject)); |
| |
| callback.setEncryptionKey(encryption_key_); |
| |
| session->read(flow_file, std::ref(callback)); |
| auto& result = callback.getResult(); |
| if (!result.ok()) { |
| flow_file->setAttribute(GCS_STATUS_MESSAGE, result.status().message()); |
| flow_file->setAttribute(GCS_ERROR_REASON, result.status().error_info().reason()); |
| flow_file->setAttribute(GCS_ERROR_DOMAIN, result.status().error_info().domain()); |
| logger_->log_error("Failed to upload to Google Cloud Storage %s %s", result.status().message(), result.status().error_info().reason()); |
| session->transfer(flow_file, Failure); |
| } else { |
| setAttributesFromObjectMetadata(*flow_file, *result); |
| session->transfer(flow_file, Success); |
| } |
| } |
| |
| REGISTER_RESOURCE(PutGCSObject, "Puts flow files to a Google Cloud Storage Bucket."); |
| } // namespace org::apache::nifi::minifi::extensions::gcp |