| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
#include <sys/stat.h>
#include <cstring>
#include <fstream>
#include <memory>
#include <set>
#include <string>
#include <utility>
#include <vector>
| |
| #include "utils/file/FileUtils.h" |
| #include "TestBase.h" |
| |
| #include <chrono> |
| #include <thread> |
| #include "api/nanofi.h" |
| |
// Payload written into every generated input file and compared against by the tests.
std::string test_file_content{"C API raNdOMcaSe test d4t4 th1s is!"};
// Default file name used by create_testfile_for_getfile().
std::string test_file_name{"tstFile.ext"};
| |
| static nifi_instance *create_instance_obj(const char *name = "random_instance") { |
| nifi_port port; |
| char port_str[] = "12345"; |
| port.port_id = port_str; |
| return create_instance("random_instance", &port); |
| } |
| |
// Incremented by the failure callbacks defined in this file.
static int failure_count{0};

// Incremented by custom_onschedule_logic().
static int custom_onschedule_count{0};
| |
| void failure_counter(flow_file_record * fr) { |
| failure_count++; |
| REQUIRE(get_attribute_quantity(fr) > 0); |
| free_flowfile(fr); |
| } |
| |
| void big_failure_counter(flow_file_record * fr) { |
| failure_count += 100; |
| free_flowfile(fr); |
| } |
| |
| void custom_onschedule_logic(processor_context* ctx) { |
| custom_onschedule_count++; |
| } |
| |
| void custom_ontrigger_logic(processor_session *ps, processor_context *ctx) { |
| flow_file_record * ffr = get(ps, ctx); |
| REQUIRE(ffr != nullptr); |
| uint8_t * buffer = (uint8_t*)malloc(ffr->size* sizeof(uint8_t)); |
| get_content(ffr, buffer, ffr->size); |
| REQUIRE(strncmp(reinterpret_cast<const char *>(buffer), test_file_content.c_str(), test_file_content.size()) == 0); |
| |
| attribute attr; |
| attr.key = "filename"; |
| attr.value_size = 0; |
| REQUIRE(get_attribute(ffr, &attr) == 0); |
| REQUIRE(attr.value_size > 0); |
| |
| const char * custom_value = "custom value"; |
| |
| REQUIRE(add_attribute(ffr, "custom attribute", (void*)custom_value, strlen(custom_value)) == 0); |
| |
| char prop_value[20]; |
| |
| REQUIRE(get_property(ctx, "Some test propery", prop_value, 20) == 0); |
| |
| REQUIRE(strncmp("test value", prop_value, strlen(prop_value)) == 0); |
| |
| transfer_to_relationship(ffr, ps, SUCCESS_RELATIONSHIP); |
| |
| free_flowfile(ffr); |
| free(buffer); |
| } |
| |
| std::string create_testfile_for_getfile(const char* sourcedir, const std::string& filename = test_file_name) { |
| std::fstream file; |
| std::stringstream ss; |
| ss << sourcedir << "/" << filename; |
| file.open(ss.str(), std::ios::out); |
| file << test_file_content; |
| file.close(); |
| return ss.str(); |
| } |
| |
| TEST_CASE("Test Creation of instance, one processor", "[createInstanceAndFlow]") { |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); |
| REQUIRE(test_proc != nullptr); |
| free_flow(test_flow); |
| free_instance(instance); |
| } |
| |
| TEST_CASE("Invalid processor returns null", "[addInvalidProcessor]") { |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| processor *test_proc = add_processor(test_flow, "NeverExisted"); |
| REQUIRE(test_proc == nullptr); |
| processor *no_proc = add_processor(test_flow, ""); |
| REQUIRE(no_proc == nullptr); |
| free_flow(test_flow); |
| free_instance(instance); |
| } |
| |
| TEST_CASE("Set valid and invalid properties", "[setProcesssorProperties]") { |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| processor *test_proc = add_processor(test_flow, "GenerateFlowFile"); |
| REQUIRE(test_proc != nullptr); |
| REQUIRE(set_property(test_proc, "Data Format", "Text") == 0); // Valid value |
| // TODO(aboda): add this two below when property handling is made strictly typed |
| // REQUIRE(set_property(test_proc, "Data Format", "InvalidFormat") != 0); // Invalid value |
| // REQUIRE(set_property(test_proc, "Invalid Attribute", "Blah") != 0); // Invalid attribute |
| REQUIRE(set_property(test_proc, "Data Format", nullptr) != 0); // Empty value |
| REQUIRE(set_property(test_proc, nullptr, "Blah") != 0); // Empty attribute |
| REQUIRE(set_property(nullptr, "Invalid Attribute", "Blah") != 0); // Invalid processor |
| free_flow(test_flow); |
| free_instance(instance); |
| } |
| |
| TEST_CASE("get file and put file", "[getAndPutFile]") { |
| TestController testController; |
| |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| char put_format[] = "/tmp/pt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| auto putfiledir = testController.createTempDirectory(put_format); |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| processor *get_proc = add_processor(test_flow, "GetFile"); |
| REQUIRE(get_proc != nullptr); |
| processor *put_proc = add_processor(test_flow, "PutFile"); |
| REQUIRE(put_proc != nullptr); |
| REQUIRE(set_property(get_proc, "Input Directory", sourcedir.c_str()) == 0); |
| REQUIRE(set_property(put_proc, "Directory", putfiledir.c_str()) == 0); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| flow_file_record *record = get_next_flow_file(instance, test_flow); |
| REQUIRE(record != nullptr); |
| |
| std::stringstream ss; |
| |
| ss << putfiledir << "/" << test_file_name; |
| std::ifstream t(ss.str()); |
| std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); |
| |
| REQUIRE(test_file_content == put_data); |
| |
| // No failure handler can be added after the flow is finalized |
| REQUIRE(add_failure_callback(test_flow, failure_counter) == 1); |
| |
| uint8_t* content = (uint8_t*)malloc(record->size* sizeof(uint8_t)); |
| |
| REQUIRE(get_content(record, content, record->size) == record->size); |
| |
| REQUIRE(test_file_content == std::string(reinterpret_cast<char*>(content), record->size)); |
| |
| free(content); |
| |
| free_flowfile(record); |
| |
| free_flow(test_flow); |
| |
| free_instance(instance); |
| } |
| |
| TEST_CASE("Test manipulation of attributes", "[testAttributes]") { |
| TestController testController; |
| |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| |
| processor *get_proc = add_processor(test_flow, "GetFile"); |
| REQUIRE(get_proc != nullptr); |
| REQUIRE(set_property(get_proc, "Input Directory", sourcedir.c_str()) == 0); |
| processor *extract_test = add_processor(test_flow, "ExtractText"); |
| REQUIRE(extract_test != nullptr); |
| REQUIRE(set_property(extract_test, "Attribute", "TestAttr") == 0); |
| processor *update_attr = add_processor(test_flow, "UpdateAttribute"); |
| REQUIRE(update_attr != nullptr); |
| |
| REQUIRE(set_property(update_attr, "UpdatedAttribute", "UpdatedValue") == 0); |
| |
| flow_file_record *record = get_next_flow_file(instance, test_flow); |
| |
| REQUIRE(record != nullptr); |
| |
| attribute test_attr; |
| test_attr.key = "TestAttr"; |
| REQUIRE(get_attribute(record, &test_attr) == 0); |
| |
| REQUIRE(test_attr.value_size != 0); |
| REQUIRE(test_attr.value != nullptr); |
| |
| std::string attr_value(static_cast<char*>(test_attr.value), test_attr.value_size); |
| |
| REQUIRE(attr_value == test_file_content); |
| |
| const char * new_testattr_value = "S0me t3st t3xt"; |
| |
| // Attribute already exist, should fail |
| REQUIRE(add_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)) != 0); // NOLINT |
| |
| // Update overwrites values |
| update_attribute(record, test_attr.key, (void*) new_testattr_value, strlen(new_testattr_value)); // NOLINT |
| |
| int attr_size = get_attribute_quantity(record); |
| REQUIRE(attr_size > 0); |
| |
| attribute_set attr_set; |
| attr_set.size = attr_size; |
| attr_set.attributes = (attribute*) malloc(attr_set.size * sizeof(attribute)); // NOLINT |
| |
| REQUIRE(get_all_attributes(record, &attr_set) == attr_set.size); |
| |
| bool test_attr_found = false; |
| bool updated_attr_found = false; |
| for (int i = 0; i < attr_set.size; ++i) { |
| if (strcmp(attr_set.attributes[i].key, test_attr.key) == 0) { |
| test_attr_found = true; |
| REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == new_testattr_value); |
| } else if (strcmp(attr_set.attributes[i].key, "UpdatedAttribute") == 0) { |
| updated_attr_found = true; |
| REQUIRE(std::string(static_cast<char*>(attr_set.attributes[i].value), attr_set.attributes[i].value_size) == "UpdatedValue"); |
| } |
| } |
| REQUIRE(updated_attr_found == true); |
| REQUIRE(test_attr_found == true); |
| |
| free_flowfile(record); |
| |
| free_flow(test_flow); |
| free_instance(instance); |
| } |
| |
| TEST_CASE("Test error handling callback", "[errorHandling]") { |
| TestController testController; |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| |
| // Failure strategy cannot be set before a valid callback is added |
| REQUIRE(set_failure_strategy(test_flow, FailureStrategy::AS_IS) != 0); |
| REQUIRE(add_failure_callback(test_flow, failure_counter) == 0); |
| |
| processor *get_proc = add_processor(test_flow, "GetFile"); |
| REQUIRE(get_proc != nullptr); |
| processor *put_proc = add_processor(test_flow, "PutFile"); |
| REQUIRE(put_proc != nullptr); |
| REQUIRE(set_property(get_proc, "Input Directory", sourcedir.c_str()) == 0); |
| REQUIRE(set_property(put_proc, "Directory", "/tmp/never_existed") == 0); |
| REQUIRE(set_property(put_proc, "Create Missing Directories", "false") == 0); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| |
| REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); |
| |
| REQUIRE(failure_count == 1); |
| |
| // Failure handler function can be replaced runtime |
| REQUIRE(add_failure_callback(test_flow, big_failure_counter) == 0); |
| REQUIRE(set_failure_strategy(test_flow, FailureStrategy::ROLLBACK) == 0); |
| |
| // Create new testfile to trigger failure again |
| create_testfile_for_getfile(sourcedir.c_str(), test_file_name + "2"); |
| |
| REQUIRE(get_next_flow_file(instance, test_flow) == nullptr); |
| REQUIRE(failure_count > 100); |
| |
| failure_count = 0; |
| |
| free_flow(test_flow); |
| free_instance(instance); |
| } |
| |
| TEST_CASE("Test standalone processors", "[testStandalone]") { |
| TestController testController; |
| |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| standalone_processor* getfile_proc = create_processor("GetFile", NULL); |
| REQUIRE(set_standalone_property(getfile_proc, "Input Directory", sourcedir.c_str()) == 0); |
| |
| flow_file_record* ffr = invoke(getfile_proc); |
| |
| REQUIRE(ffr != nullptr); |
| REQUIRE(get_attribute_quantity(ffr) > 0); |
| |
| standalone_processor* extract_test = create_processor("ExtractText", NULL); |
| REQUIRE(extract_test != nullptr); |
| REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); |
| |
| flow_file_record* ffr2 = invoke_ff(extract_test, ffr); |
| |
| free_flowfile(ffr); |
| |
| // Verify the transfer of attributes |
| REQUIRE(ffr2 != nullptr); |
| REQUIRE(get_attribute_quantity(ffr2) > 0); |
| |
| char filename_key[] = "filename"; |
| attribute attr; |
| attr.key = filename_key; |
| attr.value_size = 0; |
| |
| REQUIRE(get_attribute(ffr2, &attr) == 0); |
| REQUIRE(attr.value_size > 0); |
| |
| // Verify extracttext behavior |
| char test_attr[] = "TestAttr"; |
| attr.key = test_attr; |
| attr.value_size = 0; |
| REQUIRE(get_attribute(ffr2, &attr) == 0); |
| REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == test_file_content); |
| |
| free_flowfile(ffr2); |
| free_standalone_processor(getfile_proc); |
| } |
| |
| TEST_CASE("Test interaction of flow and standlone processors", "[testStandaloneWithFlow]") { |
| TestController testController; |
| |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| char put_format[] = "/tmp/pt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| auto putfiledir = testController.createTempDirectory(put_format); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| |
| processor *get_proc = add_processor(test_flow, "GetFile"); |
| REQUIRE(get_proc != nullptr); |
| REQUIRE(set_property(get_proc, "Input Directory", sourcedir.c_str()) == 0); |
| |
| flow_file_record *record = get_next_flow_file(instance, test_flow); |
| REQUIRE(record != nullptr); |
| |
| standalone_processor* putfile_proc = create_processor("PutFile", NULL); |
| REQUIRE(set_standalone_property(putfile_proc, "Directory", putfiledir.c_str()) == 0); |
| |
| flow_file_record* put_record = invoke_ff(putfile_proc, record); |
| REQUIRE(put_record != nullptr); |
| |
| free_flowfile(record); |
| free_flowfile(put_record); |
| |
| std::stringstream ss; |
| |
| ss << putfiledir << "/" << test_file_name; |
| std::ifstream t(ss.str()); |
| std::string put_data((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>()); |
| |
| REQUIRE(test_file_content == put_data); |
| |
| free_flow(test_flow); |
| free_instance(instance); |
| free_standalone_processor(putfile_proc); |
| } |
| |
| TEST_CASE("Test standalone processors with file input", "[testStandaloneWithFile]") { |
| TestController testController; |
| |
| enable_logging(); |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| std::string path = create_testfile_for_getfile(sourcedir.c_str()); |
| |
| standalone_processor* extract_test = create_processor("ExtractText", NULL); |
| REQUIRE(extract_test != nullptr); |
| REQUIRE(set_standalone_property(extract_test, "Attribute", "TestAttr") == 0); |
| |
| flow_file_record* ffr = invoke_file(extract_test, path.c_str()); |
| |
| REQUIRE(ffr != nullptr); |
| |
| attribute attr; |
| char test_attr[] = "TestAttr"; |
| attr.key = test_attr; |
| attr.value_size = 0; |
| REQUIRE(get_attribute(ffr, &attr) == 0); |
| REQUIRE(std::string(static_cast<char*>(attr.value), attr.value_size) == test_file_content); |
| |
| free_flowfile(ffr); |
| free_standalone_processor(extract_test); |
| } |
| |
| TEST_CASE("Test custom processor", "[TestCutomProcessor]") { |
| TestController testController; |
| |
| char src_format[] = "/tmp/gt.XXXXXX"; |
| auto sourcedir = testController.createTempDirectory(src_format); |
| |
| create_testfile_for_getfile(sourcedir.c_str()); |
| |
| add_custom_processor("myproc", custom_ontrigger_logic, custom_onschedule_logic); |
| |
| auto instance = create_instance_obj(); |
| REQUIRE(instance != nullptr); |
| flow *test_flow = create_new_flow(instance); |
| REQUIRE(test_flow != nullptr); |
| |
| processor *get_proc = add_processor(test_flow, "GetFile"); |
| REQUIRE(get_proc != nullptr); |
| |
| REQUIRE(set_property(get_proc, "Input Directory", sourcedir.c_str()) == 0); |
| |
| processor *my_proc = add_processor(test_flow, "myproc"); |
| REQUIRE(my_proc != nullptr); |
| |
| REQUIRE(set_property(my_proc, "Some test propery", "test value") == 0); |
| |
| flow_file_record *record = get_next_flow_file(instance, test_flow); |
| |
| REQUIRE(custom_onschedule_count > 0); |
| |
| REQUIRE(record != nullptr); |
| } |
| |
| TEST_CASE("C API robustness test", "[TestRobustness]") { |
| free_flow(nullptr); |
| free_standalone_processor(nullptr); |
| free_instance(nullptr); |
| |
| REQUIRE(create_processor(nullptr, nullptr) == nullptr); |
| |
| standalone_processor *standalone_proc = create_processor("GetFile", NULL); |
| REQUIRE(standalone_proc != nullptr); |
| |
| REQUIRE(set_property(nullptr, "prop_name", "prop_value") == -1); |
| |
| REQUIRE(set_standalone_property(standalone_proc, nullptr, "prop_value") == -1); |
| |
| REQUIRE(set_standalone_property(standalone_proc, "prop_name", nullptr) == -1); |
| |
| free_standalone_processor(standalone_proc); |
| |
| const char *file_path = "path/to/file"; |
| |
| flow_file_record *ffr = create_ff_object_na(file_path, strlen(file_path), 0); |
| |
| const char *custom_value = "custom value"; |
| |
| REQUIRE(add_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1); |
| |
| REQUIRE(add_attribute(ffr, "custom attribute", (void *) custom_value, strlen(custom_value)) == -1); |
| |
| REQUIRE(add_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value)) == -1); |
| |
| REQUIRE(add_attribute(ffr, "custom attribute", nullptr, 0) == -1); |
| |
| update_attribute(nullptr, "custom attribute", (void *) custom_value, strlen(custom_value)); |
| |
| update_attribute(ffr, nullptr, (void *) custom_value, strlen(custom_value)); |
| |
| attribute attr; |
| attr.key = "filename"; |
| attr.value_size = 0; |
| REQUIRE(get_attribute(ffr, &attr) == -1); |
| |
| REQUIRE(get_attribute(nullptr, &attr) == -1); |
| |
| REQUIRE(get_attribute(ffr, nullptr) == -1); |
| |
| REQUIRE(get_attribute_quantity(nullptr) == 0); |
| |
| REQUIRE(get_attribute_quantity(ffr) == 0); |
| |
| attribute_set attr_set; |
| attr_set.size = 3; |
| attr_set.attributes = (attribute *) malloc(attr_set.size * sizeof(attribute)); // NOLINT |
| |
| REQUIRE(get_all_attributes(nullptr, &attr_set) == 0); |
| |
| REQUIRE(get_all_attributes(ffr, &attr_set) == 0); |
| |
| REQUIRE(get_all_attributes(ffr, nullptr) == 0); |
| |
| free(attr_set.attributes); |
| |
| REQUIRE(remove_attribute(nullptr, "key") == -1); |
| |
| REQUIRE(remove_attribute(ffr, "key") == -1); |
| |
| REQUIRE(remove_attribute(ffr, nullptr) == -1); |
| |
| auto instance = create_instance_obj(); |
| |
| REQUIRE(transmit_flowfile(nullptr, instance) == -1); |
| |
| REQUIRE(transmit_flowfile(ffr, nullptr) == -1); |
| |
| REQUIRE(create_new_flow(nullptr) == nullptr); |
| |
| flow *test_flow = create_new_flow(instance); |
| |
| REQUIRE(test_flow != nullptr); |
| |
| REQUIRE(add_processor(nullptr, "GetFile") == nullptr); |
| |
| REQUIRE(add_processor(test_flow, nullptr) == nullptr); |
| |
| free_flow(test_flow); |
| |
| free_instance(instance); |
| } |