blob: 51d18005361b8d0bd1c4595e5571c5bffec650de [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "unit/TestBase.h"
#include "unit/TestUtils.h"
#include "unit/Catch.h"
#include "unit/SingleProcessorTestController.h"
#include "processors/PutCouchbaseKey.h"
#include "MockCouchbaseClusterService.h"
#include "core/Resource.h"
namespace org::apache::nifi::minifi::couchbase::test {
REGISTER_RESOURCE(MockCouchbaseClusterService, ControllerService);
const std::string TEST_UUID = "a53f0e78-b91a-4a82-939b-639174edb00b";
struct ExpectedCallOptions {
std::string bucket_name;
std::string scope_name;
std::string collection_name;
::couchbase::persist_to persist_to;
::couchbase::replicate_to replicate_to;
CouchbaseValueType document_type;
std::string doc_id;
};
class PutCouchbaseKeyTestController : public TestController {
public:
PutCouchbaseKeyTestController()
: controller_(minifi::test::utils::make_processor<processors::PutCouchbaseKey>("PutCouchbaseKey")),
proc_(controller_.getProcessor()) {
LogTestController::getInstance().setDebug<TestPlan>();
LogTestController::getInstance().setDebug<minifi::core::Processor>();
LogTestController::getInstance().setTrace<minifi::core::ProcessSession>();
LogTestController::getInstance().setDebug<controllers::CouchbaseClusterService>();
LogTestController::getInstance().setDebug<processors::PutCouchbaseKey>();
auto controller_service_node = controller_.plan->addController("MockCouchbaseClusterService", "MockCouchbaseClusterService");
mock_couchbase_cluster_service_ = std::dynamic_pointer_cast<MockCouchbaseClusterService>(controller_service_node->getControllerServiceImplementation());
gsl_Assert(mock_couchbase_cluster_service_);
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService.name, "MockCouchbaseClusterService"));
}
[[nodiscard]] static std::vector<std::byte> stringToByteVector(const std::string& str) {
std::vector<std::byte> byte_vector;
byte_vector.reserve(str.size());
for (char ch : str) {
byte_vector.push_back(static_cast<std::byte>(ch));
}
return byte_vector;
}
void verifyResults(const minifi::test::ProcessorTriggerResult& results, const minifi::core::Relationship& expected_result, const ExpectedCallOptions& expected_call_options,
const std::string& input) const {
std::shared_ptr<core::FlowFile> flow_file;
if (expected_result == processors::PutCouchbaseKey::Success) {
REQUIRE(results.at(processors::PutCouchbaseKey::Success).size() == 1);
REQUIRE(results.at(processors::PutCouchbaseKey::Failure).empty());
REQUIRE(results.at(processors::PutCouchbaseKey::Retry).empty());
flow_file = results.at(processors::PutCouchbaseKey::Success)[0];
} else if (expected_result == processors::PutCouchbaseKey::Failure) {
REQUIRE(results.at(processors::PutCouchbaseKey::Success).empty());
REQUIRE(results.at(processors::PutCouchbaseKey::Failure).size() == 1);
REQUIRE(results.at(processors::PutCouchbaseKey::Retry).empty());
flow_file = results.at(processors::PutCouchbaseKey::Failure)[0];
REQUIRE(LogTestController::getInstance().contains("Failed to upsert document", 1s));
} else {
REQUIRE(results.at(processors::PutCouchbaseKey::Success).empty());
REQUIRE(results.at(processors::PutCouchbaseKey::Failure).empty());
REQUIRE(results.at(processors::PutCouchbaseKey::Retry).size() == 1);
flow_file = results.at(processors::PutCouchbaseKey::Retry)[0];
}
auto get_collection_parameters = mock_couchbase_cluster_service_->getCollectionParameter();
CHECK(get_collection_parameters.bucket_name == expected_call_options.bucket_name);
CHECK(get_collection_parameters.collection_name == expected_call_options.collection_name);
CHECK(get_collection_parameters.scope_name == expected_call_options.scope_name);
auto upsert_parameters = mock_couchbase_cluster_service_->getUpsertParameters();
CHECK(upsert_parameters.document_type == expected_call_options.document_type);
auto expected_doc_id = expected_call_options.doc_id.empty() ? TEST_UUID : expected_call_options.doc_id;
CHECK(upsert_parameters.document_id == expected_doc_id);
CHECK(upsert_parameters.buffer == stringToByteVector(input));
auto upsert_options = upsert_parameters.options.build();
CHECK(upsert_options.persist_to == expected_call_options.persist_to);
CHECK(upsert_options.replicate_to == expected_call_options.replicate_to);
if (expected_result != processors::PutCouchbaseKey::Success) {
return;
}
CHECK(flow_file->getAttribute("couchbase.bucket").value() == expected_call_options.bucket_name);
CHECK(flow_file->getAttribute("couchbase.doc.id").value() == expected_doc_id);
CHECK(flow_file->getAttribute("couchbase.doc.cas").value() == std::to_string(COUCHBASE_PUT_RESULT_CAS));
CHECK(flow_file->getAttribute("couchbase.doc.sequence.number").value() == std::to_string(COUCHBASE_PUT_RESULT_SEQUENCE_NUMBER));
CHECK(flow_file->getAttribute("couchbase.partition.uuid").value() == std::to_string(COUCHBASE_PUT_RESULT_PARTITION_UUID));
CHECK(flow_file->getAttribute("couchbase.partition.id").value() == std::to_string(COUCHBASE_PUT_RESULT_PARTITION_ID));
}
protected:
minifi::test::SingleProcessorTestController controller_;
core::Processor* proc_ = nullptr;
std::shared_ptr<MockCouchbaseClusterService> mock_couchbase_cluster_service_;
};
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Invalid Couchbase cluster controller service", "[putcouchbasekey]") {
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::CouchbaseClusterControllerService.name, "invalid"));
REQUIRE_THROWS_AS(controller_.trigger({minifi::test::InputFlowFileData{"{\"name\": \"John\"}\n{\"name\": \"Jill\"}", {{"uuid", TEST_UUID}}}}), minifi::Exception);
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Can't set empty bucket name", "[getcouchbasekey]") {
CHECK_FALSE(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, ""));
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Empty evaluated bucket name", "[getcouchbasekey]") {
CHECK(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, "${bucket_name}"));
auto results = controller_.trigger({minifi::test::InputFlowFileData{"couchbase_id"}});
REQUIRE(results[processors::PutCouchbaseKey::Failure].size() == 1);
REQUIRE(LogTestController::getInstance().contains("Bucket '' is invalid or empty!", 1s));
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with default properties", "[putcouchbasekey]") {
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, "mybucket"));
const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "_default", "_default",
::couchbase::persist_to::none, ::couchbase::replicate_to::none, CouchbaseValueType::Json, ""}, input);
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put succeeeds with optional properties", "[putcouchbasekey]") {
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, "mybucket"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::ScopeName.name, "scope1"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::CollectionName.name, "collection1"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::DocumentType.name, "Binary"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::DocumentId.name, "important_doc"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::PersistTo.name, "ACTIVE"));
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::ReplicateTo.name, "TWO"));
const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
verifyResults(results, processors::PutCouchbaseKey::Success, ExpectedCallOptions{"mybucket", "scope1", "collection1", ::couchbase::persist_to::active,
::couchbase::replicate_to::two, CouchbaseValueType::Binary, "important_doc"}, input);
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "Put fails with default properties", "[putcouchbasekey]") {
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, "mybucket"));
mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::FATAL);
const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
verifyResults(results, processors::PutCouchbaseKey::Failure, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none,
CouchbaseValueType::Json, ""}, input);
}
TEST_CASE_METHOD(PutCouchbaseKeyTestController, "FlowFile is transferred to retry relationship when temporary error is returned", "[putcouchbasekey]") {
REQUIRE(proc_->setProperty(processors::PutCouchbaseKey::BucketName.name, "mybucket"));
mock_couchbase_cluster_service_->setUpsertError(CouchbaseErrorType::TEMPORARY);
const std::string input = "{\"name\": \"John\"}\n{\"name\": \"Jill\"}";
auto results = controller_.trigger({minifi::test::InputFlowFileData{input, {{"uuid", TEST_UUID}}}});
verifyResults(results, processors::PutCouchbaseKey::Retry, ExpectedCallOptions{"mybucket", "_default", "_default", ::couchbase::persist_to::none, ::couchbase::replicate_to::none,
CouchbaseValueType::Json, ""}, input);
}
} // namespace org::apache::nifi::minifi::couchbase::test