blob: 02fd95e973ddba098402e239260057f0c932c08f [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GetCouchbaseKey.h"
#include "utils/gsl.h"
#include "core/Resource.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::couchbase::processors {
// Caches the processor configuration at schedule time.
//
// Stores the result of CouchbaseClusterService::getFromProperty() for the
// CouchbaseClusterControllerService property in couchbase_cluster_service_, and the
// DocumentType property parsed as a CouchbaseValueType in document_type_.
// The session factory argument is not used.
void GetCouchbaseKey::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory& /*session_factory*/) {
  couchbase_cluster_service_ = controllers::CouchbaseClusterService::getFromProperty(context, CouchbaseClusterControllerService);
  document_type_ = utils::parseEnumProperty<CouchbaseValueType>(context, DocumentType);
}
// Fetches one Couchbase document per trigger and routes the incoming FlowFile by the outcome.
//
// Flow:
//  - Yields when the session has no FlowFile to hand out.
//  - Bucket, scope, collection, document id and target attribute are read from the processor
//    properties (the FlowFile is passed to each property lookup). Scope and collection fall back
//    to the Couchbase default names; the document id falls back to the FlowFile content.
//  - A missing bucket name or an empty document id routes the FlowFile to Failure.
//  - On a successful get, the value is stored either in the attribute named by PutValueToAttribute
//    or in the FlowFile content, the couchbase.* attributes are set, and the FlowFile goes to Success.
//  - A CouchbaseErrorType::TEMPORARY error routes to Retry; every other error routes to Failure.
void GetCouchbaseKey::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  gsl_Expects(couchbase_cluster_service_);

  auto flow_file = session.get();
  if (!flow_file) {
    context.yield();
    return;
  }

  CouchbaseCollection collection;
  const auto configured_bucket = utils::parseOptionalProperty(context, BucketName, flow_file.get());
  if (!configured_bucket) {
    // collection.bucket_name has not been assigned yet, so the placeholder is filled with its initial value.
    logger_->log_error("Bucket '{}' is invalid or empty!", collection.bucket_name);
    session.transfer(flow_file, Failure);
    return;
  }
  collection.bucket_name = *configured_bucket;
  collection.scope_name = utils::parseOptionalProperty(context, ScopeName, flow_file.get()).value_or(::couchbase::scope::default_name);
  collection.collection_name = utils::parseOptionalProperty(context, CollectionName, flow_file.get()).value_or(::couchbase::collection::default_name);

  // The document id comes from the DocumentId property when set, otherwise from the raw FlowFile content.
  std::string document_id;
  if (const auto configured_id = utils::parseOptionalProperty(context, DocumentId, flow_file.get())) {
    document_id = *configured_id;
  } else {
    const auto content = session.readBuffer(flow_file).buffer;
    document_id.assign(reinterpret_cast<const char*>(content.data()), content.size());
  }
  if (document_id.empty()) {
    logger_->log_error("Document ID is empty, transferring FlowFile to failure relationship");
    session.transfer(flow_file, Failure);
    return;
  }

  const std::string target_attribute = utils::parseOptionalProperty(context, PutValueToAttribute, flow_file.get()).value_or("");

  auto get_result = couchbase_cluster_service_->get(collection, document_id, document_type_);
  if (!get_result) {
    if (get_result.error() == CouchbaseErrorType::TEMPORARY) {
      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}' due to timeout, transferring to retry relationship",
          document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
      session.transfer(flow_file, Retry);
    } else {
      logger_->log_error("Failed to get document '{}' from collection '{}.{}.{}', transferring to failure relationship",
          document_id, collection.bucket_name, collection.scope_name, collection.collection_name);
      session.transfer(flow_file, Failure);
    }
    return;
  }

  // The result variant is read as std::string for the String document type and as a byte vector otherwise.
  const bool value_is_string = document_type_ == CouchbaseValueType::String;

  if (target_attribute.empty()) {
    // No target attribute configured: the fetched value replaces the FlowFile content.
    session.write(flow_file, [&](const std::shared_ptr<io::OutputStream>& stream) -> int64_t {
      if (!value_is_string) {
        auto& bytes = std::get<std::vector<std::byte>>(get_result->value);
        stream->write(bytes);
        return gsl::narrow<int64_t>(bytes.size());
      }
      auto& text = std::get<std::string>(get_result->value);
      stream->write(text);
      return gsl::narrow<int64_t>(text.size());
    });
  } else if (value_is_string) {
    session.putAttribute(*flow_file, target_attribute, std::get<std::string>(get_result->value));
  } else {
    // Binary payloads are copied byte-for-byte into a string before being stored as an attribute.
    auto& bytes = std::get<std::vector<std::byte>>(get_result->value);
    std::string bytes_as_text(reinterpret_cast<const char*>(bytes.data()), bytes.size());
    session.putAttribute(*flow_file, target_attribute, bytes_as_text);
  }

  session.putAttribute(*flow_file, "couchbase.bucket", get_result->bucket_name);
  session.putAttribute(*flow_file, "couchbase.doc.id", document_id);
  session.putAttribute(*flow_file, "couchbase.doc.cas", std::to_string(get_result->cas));
  session.putAttribute(*flow_file, "couchbase.doc.expiry", get_result->expiry);
  session.transfer(flow_file, Success);
}
// Invokes the REGISTER_RESOURCE macro for GetCouchbaseKey with the Processor category.
// NOTE(review): the macro presumably comes from core/Resource.h and makes the class
// discoverable by name at runtime — confirm against that header.
REGISTER_RESOURCE(GetCouchbaseKey, Processor);
} // namespace org::apache::nifi::minifi::couchbase::processors