blob: 538609ea06ec42b63c1976d2f4b03dff4f9c99e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "PutGCSObject.h"
#include <utility>
#include "core/Resource.h"
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "../GCPAttributes.h"
#include "utils/ProcessorConfigUtils.h"
namespace gcs = ::google::cloud::storage;
namespace org::apache::nifi::minifi::extensions::gcp {
namespace {
// Callable that uploads the content of an input stream to Google Cloud Storage.
//
// An instance is configured through the setters below and then invoked with the
// stream to upload. The outcome of the upload (object metadata or an error
// status) is stored and can be queried afterwards through getResult().
//
// The callback only keeps a reference to the client, so the client has to
// outlive the callback.
class UploadToGCSCallback {
 public:
  // client: GCS client used for the upload (held by reference)
  // bucket: name of the destination bucket
  // key:    name of the destination object inside the bucket
  UploadToGCSCallback(gcs::Client& client, std::string bucket, std::string key)
      : bucket_(std::move(bucket)),
        key_(std::move(key)),
        client_(client) {
  }

  // Reads the stream into memory and writes it to bucket_/key_ with the
  // configured request options.
  //
  // Returns the number of bytes read from the stream, or -1 if reading failed.
  // On a read failure no upload is attempted and the stored result is left as
  // it was.
  int64_t operator()(const std::shared_ptr<io::InputStream>& stream) {
    std::string content;
    content.resize(stream->size());
    const auto read_ret = stream->read(as_writable_bytes(std::span(content)));
    if (io::isError(read_ret)) {
      return -1;
    }
    // The buffer was sized from stream->size(); if fewer bytes were actually
    // delivered, drop the untouched zero-filled tail so that only real content
    // is uploaded and the uploaded size agrees with the returned byte count.
    content.resize(read_ret);

    auto writer = client_.WriteObject(bucket_, key_, hash_value_, crc32c_checksum_, encryption_key_, content_type_, predefined_acl_, if_generation_match_);
    writer << content;
    writer.Close();
    // metadata() carries either the created object's metadata or the upload error.
    result_ = writer.metadata();
    return static_cast<int64_t>(read_ret);
  }

  // Result of the last upload attempt.
  // NOTE(review): before any upload this is the default-constructed StatusOr,
  // which per the google-cloud-cpp documentation holds a non-OK status - confirm.
  [[nodiscard]] const google::cloud::StatusOr<gcs::ObjectMetadata>& getResult() const noexcept {
    return result_;
  }

  // Sets the MD5 hash request option sent with the upload.
  void setHashValue(const std::string& hash_value_str) {
    hash_value_ = gcs::MD5HashValue(hash_value_str);
  }

  // Sets the CRC32C checksum request option sent with the upload.
  void setCrc32CChecksumValue(const std::string& crc32c_checksum_str) {
    crc32c_checksum_ = gcs::Crc32cChecksumValue(crc32c_checksum_str);
  }

  // Sets the encryption key request option sent with the upload.
  void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
    encryption_key_ = encryption_key;
  }

  // Sets the predefined ACL; the enumerator's name is passed on as the ACL string.
  void setPredefinedAcl(put_gcs_object::PredefinedAcl predefined_acl) {
    predefined_acl_ = gcs::PredefinedAcl(std::string{magic_enum::enum_name(predefined_acl)});
  }

  // Sets the content type request option sent with the upload.
  void setContentType(const std::string& content_type_str) {
    content_type_ = gcs::ContentType(content_type_str);
  }

  // Configures the generation precondition of the upload.
  // Only an explicit `false` installs IfGenerationMatch(0); an unset or `true`
  // value leaves the option empty.
  // NOTE(review): generation 0 is documented by GCS as "object must not exist
  // yet", i.e. this is what prevents overwriting - confirm against the GCS docs.
  void setIfGenerationMatch(std::optional<bool> overwrite) {
    if (overwrite.has_value() && !overwrite.value()) {
      if_generation_match_ = gcs::IfGenerationMatch(0);
    } else {
      if_generation_match_ = gcs::IfGenerationMatch();
    }
  }

 private:
  std::string bucket_;
  std::string key_;
  gcs::Client& client_;

  // Request options forwarded to WriteObject; default-constructed means "not set".
  gcs::MD5HashValue hash_value_;
  gcs::Crc32cChecksumValue crc32c_checksum_;
  gcs::EncryptionKey encryption_key_;
  gcs::PredefinedAcl predefined_acl_;
  gcs::ContentType content_type_;
  gcs::IfGenerationMatch if_generation_match_;

  google::cloud::StatusOr<gcs::ObjectMetadata> result_;
};
} // namespace
void PutGCSObject::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
// Runs the base-class scheduling and then, if the EncryptionKey property is
// set, decodes it from base64 into encryption_key_.
//
// Throws minifi::Exception (PROCESS_SCHEDULE_EXCEPTION) when the configured key
// cannot be decoded.
void PutGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
  GCSProcessor::onSchedule(context, session_factory);

  const auto encoded_key = context.getProperty(EncryptionKey);
  if (!encoded_key) {
    // No key configured: nothing to decode.
    return;
  }

  try {
    encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encoded_key);
  } catch (const google::cloud::RuntimeStatusError&) {
    throw minifi::Exception(
        ExceptionType::PROCESS_SCHEDULE_EXCEPTION,
        "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name));
  }
}
// Takes one flow file from the session and uploads its content to Google Cloud
// Storage.
//
// - Yields when there is no flow file to process.
// - Routes the flow file to Failure when the bucket or object name is missing
//   or empty, or when the upload result is not OK (in that case the status
//   message, error reason and error domain are written to the flow file).
// - Otherwise copies the returned object metadata to the flow file attributes
//   and routes it to Success.
void PutGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  gsl_Expects(gcp_credentials_);

  auto flow_file = session.get();
  if (!flow_file) {
    context.yield();
    return;
  }

  auto bucket = context.getProperty(Bucket, flow_file);
  if (!bucket || bucket->empty()) {
    logger_->log_error("Missing bucket name");
    session.transfer(flow_file, Failure);
    return;
  }

  auto object_name = context.getProperty(Key, flow_file);
  if (!object_name || object_name->empty()) {
    logger_->log_error("Missing object name");
    session.transfer(flow_file, Failure);
    return;
  }

  gcs::Client client = getClient();
  UploadToGCSCallback callback(client, *bucket, *object_name);

  // An empty checksum or hash is treated as "not set", exactly like an empty
  // content type below, so that no empty expected-hash value is handed to the
  // client.
  if (auto crc32_checksum = context.getProperty(Crc32cChecksum, flow_file); crc32_checksum && !crc32_checksum->empty()) {
    callback.setCrc32CChecksumValue(*crc32_checksum);
  }
  if (auto md5_hash = context.getProperty(MD5Hash, flow_file); md5_hash && !md5_hash->empty()) {
    callback.setHashValue(*md5_hash);
  }
  if (auto content_type = context.getProperty(ContentType, flow_file); content_type && !content_type->empty()) {
    callback.setContentType(*content_type);
  }
  if (auto predefined_acl = utils::parseOptionalEnumProperty<put_gcs_object::PredefinedAcl>(context, ObjectACL)) {
    callback.setPredefinedAcl(*predefined_acl);
  }
  callback.setIfGenerationMatch(context.getProperty<bool>(OverwriteObject));
  callback.setEncryptionKey(encryption_key_);

  // std::ref keeps the callback's state (the upload result) visible after the read.
  session.read(flow_file, std::ref(callback));

  const auto& result = callback.getResult();
  if (!result.ok()) {
    const auto& status = result.status();
    flow_file->setAttribute(GCS_STATUS_MESSAGE, status.message());
    flow_file->setAttribute(GCS_ERROR_REASON, status.error_info().reason());
    flow_file->setAttribute(GCS_ERROR_DOMAIN, status.error_info().domain());
    logger_->log_error("Failed to upload to Google Cloud Storage {} {}", status.message(), status.error_info().reason());
    session.transfer(flow_file, Failure);
    return;
  }

  setAttributesFromObjectMetadata(*flow_file, *result);
  session.transfer(flow_file, Success);
}
// Registers PutGCSObject as a Processor resource.
// NOTE(review): the macro is presumably provided by core/Resource.h (included above) - confirm there how registration works.
REGISTER_RESOURCE(PutGCSObject, Processor);
} // namespace org::apache::nifi::minifi::extensions::gcp