blob: dd52623034fc754c7567b25df5dcf8b4c37c1890 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "FetchGCSObject.h"
#include <utility>
#include "core/Resource.h"
#include "core/FlowFile.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "../GCPAttributes.h"
namespace gcs = ::google::cloud::storage;
namespace org::apache::nifi::minifi::extensions::gcp {
namespace {
class FetchFromGCSCallback {
public:
FetchFromGCSCallback(gcs::Client& client, std::string bucket, std::string key)
: bucket_(std::move(bucket)),
key_(std::move(key)),
client_(client) {
}
int64_t operator()(const std::shared_ptr<io::OutputStream>& stream) {
auto reader = client_.ReadObject(bucket_, key_, encryption_key_, generation_, gcs::IfGenerationNotMatch(0));
auto set_members = gsl::finally([&]{
status_ = reader.status();
result_generation_ = reader.generation();
meta_generation_ = reader.metageneration();
storage_class_ = reader.storage_class();
});
if (!reader)
return 0;
std::string contents{std::istreambuf_iterator<char>{reader}, {}};
auto write_ret = stream->write(gsl::make_span(contents).as_span<std::byte>());
reader.Close();
return write_ret;
}
[[nodiscard]] auto getStatus() const noexcept { return status_; }
[[nodiscard]] auto getGeneration() const noexcept { return result_generation_; }
[[nodiscard]] auto getMetaGeneration() const noexcept { return meta_generation_; }
[[nodiscard]] auto getStorageClass() const noexcept { return storage_class_; }
void setEncryptionKey(const gcs::EncryptionKey& encryption_key) {
encryption_key_ = encryption_key;
}
void setGeneration(const gcs::Generation generation) {
generation_ = generation;
}
private:
std::string bucket_;
std::string key_;
gcs::Client& client_;
gcs::EncryptionKey encryption_key_;
gcs::Generation generation_;
google::cloud::Status status_;
std::optional<std::int64_t> result_generation_;
std::optional<std::int64_t> meta_generation_;
std::optional<std::string> storage_class_;
};
} // namespace
void FetchGCSObject::initialize() {
setSupportedProperties(Properties);
setSupportedRelationships(Relationships);
}
void FetchGCSObject::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& session_factory) {
GCSProcessor::onSchedule(context, session_factory);
if (auto encryption_key = context.getProperty(EncryptionKey)) {
try {
encryption_key_ = gcs::EncryptionKey::FromBase64Key(*encryption_key);
} catch (const google::cloud::RuntimeStatusError&) {
throw minifi::Exception(ExceptionType::PROCESS_SCHEDULE_EXCEPTION, "Could not decode the base64-encoded encryption key from property " + std::string(EncryptionKey.name)); }
}
}
void FetchGCSObject::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
gsl_Expects(gcp_credentials_);
auto flow_file = session.get();
if (!flow_file) {
context.yield();
return;
}
auto bucket = context.getProperty(Bucket, flow_file);
if (!bucket || bucket->empty()) {
logger_->log_error("Missing bucket name");
session.transfer(flow_file, Failure);
return;
}
auto object_name = context.getProperty(Key, flow_file);
if (!object_name || object_name->empty()) {
logger_->log_error("Missing object name");
session.transfer(flow_file, Failure);
return;
}
gcs::Client client = getClient();
FetchFromGCSCallback callback(client, *bucket, *object_name);
callback.setEncryptionKey(encryption_key_);
if (auto gen_str = context.getProperty(ObjectGeneration, flow_file); gen_str && !gen_str->empty()) {
try {
uint64_t gen;
utils::internal::ValueParser(*gen_str).parse(gen).parseEnd();
callback.setGeneration(gcs::Generation(gen));
} catch (const utils::internal::ValueException&) {
logger_->log_error("Invalid generation: {}", *gen_str);
session.transfer(flow_file, Failure);
return;
}
}
session.write(flow_file, std::ref(callback));
if (!callback.getStatus().ok()) {
flow_file->setAttribute(GCS_STATUS_MESSAGE, callback.getStatus().message());
flow_file->setAttribute(GCS_ERROR_REASON, callback.getStatus().error_info().reason());
flow_file->setAttribute(GCS_ERROR_DOMAIN, callback.getStatus().error_info().domain());
logger_->log_error("Failed to fetch from Google Cloud Storage {} {}", callback.getStatus().message(), callback.getStatus().error_info().reason());
session.transfer(flow_file, Failure);
return;
}
if (auto generation = callback.getGeneration())
flow_file->setAttribute(GCS_GENERATION, std::to_string(*generation));
if (auto meta_generation = callback.getMetaGeneration())
flow_file->setAttribute(GCS_META_GENERATION, std::to_string(*meta_generation));
if (auto storage_class = callback.getStorageClass())
flow_file->setAttribute(GCS_STORAGE_CLASS, *storage_class);
session.transfer(flow_file, Success);
}
REGISTER_RESOURCE(FetchGCSObject, Processor);
} // namespace org::apache::nifi::minifi::extensions::gcp