MINIFICPP-1183 - Cleanup C2 Update tests
diff --git a/extensions/coap/tests/CMakeLists.txt b/extensions/coap/tests/CMakeLists.txt
index 9e7e13b..df4b266 100644
--- a/extensions/coap/tests/CMakeLists.txt
+++ b/extensions/coap/tests/CMakeLists.txt
@@ -17,13 +17,12 @@
# under the License.
#
-file(GLOB CURL_UNIT_TESTS "unit/*.cpp")
-file(GLOB CURL_INTEGRATION_TESTS "*.cpp")
+file(GLOB COAP_INTEGRATION_TESTS "*.cpp")
SET(CURL_INT_TEST_COUNT 0)
if (NOT DISABLE_CURL)
-FOREACH(testfile ${CURL_INTEGRATION_TESTS})
+FOREACH(testfile ${COAP_INTEGRATION_TESTS})
get_filename_component(testfilename "${testfile}" NAME_WE)
add_executable("${testfilename}" "${testfile}")
target_include_directories(${testfilename} BEFORE PRIVATE "${CMAKE_SOURCE_DIR}/extensions/standard-processors")
diff --git a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
index a0e9cd3..e87819a 100644
--- a/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
+++ b/extensions/coap/tests/CoapC2VerifyHeartbeat.cpp
@@ -54,25 +54,6 @@
#include "io/BaseStream.h"
#include "concurrentqueue.h"
-class Responder : public CivetHandler {
- public:
- explicit Responder(bool isSecure)
- : isSecure(isSecure) {
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
- "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- resp.length());
- mg_printf(conn, "%s", resp.c_str());
- return true;
- }
-
- protected:
- bool isSecure;
-};
-
class VerifyCoAPServer : public CoapIntegrationBase {
public:
explicit VerifyCoAPServer(bool isSecure)
@@ -198,22 +179,11 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
+ const bool isSecure = args.isUrlSecure();
VerifyCoAPServer harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
+ harness.setKeyDir(args.key_dir);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index bd00151..ae9a430 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -27,8 +27,7 @@
return 1;
}
-int ssl_enable(void *ssl_context, void *user_data) {
- struct ssl_ctx_st *ctx = (struct ssl_ctx_st *) ssl_context;
+int ssl_enable(void *, void *) {
return 0;
}
diff --git a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
index 9bd7573..b555983 100644
--- a/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeCoreComponentStateTest.cpp
@@ -27,8 +27,7 @@
class VerifyC2DescribeCoreComponentState : public VerifyC2Describe {
public:
- explicit VerifyC2DescribeCoreComponentState(bool isSecure)
- : VerifyC2Describe(isSecure) {
+ VerifyC2DescribeCoreComponentState() {
char format[] = "/var/tmp/ssth.XXXXXX";
temp_dir_ = testController.createTempDirectory(format);
@@ -58,16 +57,11 @@
class DescribeCoreComponentStateHandler: public HeartbeatHandler {
public:
-
- explicit DescribeCoreComponentStateHandler(bool isSecure)
- : HeartbeatHandler(isSecure) {
- }
-
- virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
+ void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
sendHeartbeatResponse("DESCRIBE", "corecomponentstate", "889345", conn);
}
- virtual void handleAcknowledge(const rapidjson::Document& root) {
+ void handleAcknowledge(const rapidjson::Document& root) override {
assert(root.HasMember("corecomponentstate"));
auto assertExpectedTailFileState = [&](const char* uuid, const char* name, const char* position) {
@@ -87,30 +81,11 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- url = "http://localhost:0/api/heartbeat";
- if (argc > 1) {
- test_file_location = argv[1];
- if (argc > 2) {
- url = "https://localhost:0/api/heartbeat";
- key_dir = argv[2];
- }
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2DescribeCoreComponentState harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- DescribeCoreComponentStateHandler responder(isSecure);
-
- harness.setUrl(url, &responder);
-
- harness.run(test_file_location);
-
+ const cmd_args args = parse_cmdline_args(argc, argv, "api/heartbeat");
+ VerifyC2DescribeCoreComponentState harness;
+ harness.setKeyDir(args.key_dir);
+ DescribeCoreComponentStateHandler handler;
+ harness.setUrl(args.url, &handler);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
index 891f6f5..dee853d 100644
--- a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -24,42 +24,20 @@
class DescribeManifestHandler: public HeartbeatHandler {
public:
-
- explicit DescribeManifestHandler(bool isSecure)
- : HeartbeatHandler(isSecure) {
- }
-
- virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
+ void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
}
- virtual void handleAcknowledge(const rapidjson::Document& root) {
+ void handleAcknowledge(const rapidjson::Document& root) override {
verifyJsonHasAgentManifest(root);
}
};
+
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- url = "http://localhost:0/api/heartbeat";
- if (argc > 1) {
- test_file_location = argv[1];
- if (argc > 2) {
- url = "https://localhost:0/api/heartbeat";
- key_dir = argv[2];
- }
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2Describe harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- DescribeManifestHandler responder(isSecure);
-
- harness.setUrl(url, &responder);
-
- harness.run(test_file_location);
+ const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
+ VerifyC2Describe harness;
+ harness.setKeyDir(args.key_dir);
+ DescribeManifestHandler responder;
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
}
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index 35458ec..cc18b3f 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -16,200 +16,17 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
-#include "utils/file/FileUtils.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
- ConfigHandler() {
- calls_ = 0;
- }
- virtual bool handlePost(CivetServer *server, struct mg_connection *conn) override {
- calls_++;
- const struct mg_request_info *req_info = mg_get_request_info(conn);
- long long remainlen;
- long long readlen = 0;
- long long contentlen = req_info->content_length;
- char buf[1024];
-
- std::string data;
- while (readlen < contentlen) {
- remainlen = contentlen - readlen;
- if (remainlen > sizeof(buf)) {
- remainlen = sizeof(buf);
- }
- remainlen = mg_read(conn, buf, (size_t) remainlen);
- if (remainlen <= 0) {
- break;
- }
- readlen += remainlen;
- data += std::string(buf, remainlen);
- }
- if (data.find("operationState") != std::string::npos) {
- assert(data.find("state\": \"NOT_APPLIED") != std::string::npos);
- }
-
- if (responses.size() > 0) {
- std::string top_str = responses.back();
- responses.pop_back();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- top_str.length());
- mg_printf(conn, "%s", top_str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
-
- virtual bool handleGet(CivetServer *server, struct mg_connection *conn) override {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
- std::string test_file_location_;
- std::string base_location_;
- std::atomic<size_t> calls_;
-};
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-
- const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
-
- CivetServer server(cpp_options);
-
- std::string port_str = std::to_string(server.getListeningPorts()[0]);
-
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
- if (argc > 1) {
- h_ex.base_location_ = test_file_location = argv[1];
- h_ex.test_file_location_ = argv[2];
- key_dir = argv[3];
- }
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\""
- "}]}";
-
- responses.push_back(heartbeat_response);
-
- std::ifstream myfile(test_file_location.c_str());
-
- std::string c2_rest_url = "http://localhost:" + port_str + "/update";
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\", \"content\": { \"location\": \"" + c2_rest_url + "\"}}]}";
- responses.push_back(response);
- }
-
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
- configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
- configuration->set("nifi.c2.enable", "true");
- configuration->set("nifi.c2.agent.class", "test");
- configuration->set("nifi.c2.rest.url", c2_rest_url);
- configuration->set("nifi.c2.rest.url.ack", c2_rest_url);
- configuration->set("nifi.c2.agent.heartbeat.period", "1000");
- utils::file::FileUtils::create_dir("content_repository");
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
-
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("Invalid configuration payload") != std::string::npos);
- assert(logs.find("update failed.") != std::string::npos);
- LogTestController::getInstance().reset();
- utils::file::FileUtils::delete_dir("content_repository",true);
-
+ const cmd_args args = parse_cmdline_args(argc, argv, "update");
+ C2FailedUpdateHandler handler(args.bad_test_file);
+ VerifyC2FailedUpdate harness(10000);
+ harness.setKeyDir(args.key_dir);
+ harness.setUrl(args.url, &handler);
+ handler.setC2RestResponse(harness.getC2RestUrl(), "configuration");
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index e530444..77a315f 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -24,55 +24,28 @@
class VerifyC2DescribeJstack : public VerifyC2Describe {
public:
- explicit VerifyC2DescribeJstack(bool isSecure)
- : VerifyC2Describe(isSecure) {
- }
-
- virtual void runAssertions() {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("SchedulingAgent"));
}
};
class DescribeJstackHandler : public HeartbeatHandler {
public:
- explicit DescribeJstackHandler(bool isSecure)
- : HeartbeatHandler(isSecure) {
- }
-
- virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
+ void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) override {
sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
}
- virtual void handleAcknowledge(const rapidjson::Document& root) {
+ void handleAcknowledge(const rapidjson::Document& root) override {
assert(root.HasMember("Flowcontroller threadpool #0"));
}
-
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- url = "http://localhost:0/api/heartbeat";
- if (argc > 1) {
- test_file_location = argv[1];
- if (argc > 2) {
- url = "https://localhost:0/api/heartbeat";
- key_dir = argv[2];
- }
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
-
- VerifyC2DescribeJstack harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- DescribeJstackHandler responder(isSecure);
-
- harness.setUrl(url, &responder);
-
- harness.run(test_file_location);
-
+ const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
+ VerifyC2DescribeJstack harness;
+ harness.setKeyDir(args.key_dir);
+ DescribeJstackHandler responder;
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
+ return 0;
}
diff --git a/extensions/http-curl/tests/C2NullConfiguration.cpp b/extensions/http-curl/tests/C2NullConfiguration.cpp
index ea511a8..b698c45 100644
--- a/extensions/http-curl/tests/C2NullConfiguration.cpp
+++ b/extensions/http-curl/tests/C2NullConfiguration.cpp
@@ -16,38 +16,17 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
#include "InvokeHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
#include "TestServer.h"
#include "protocols/RESTReceiver.h"
-#include "protocols/RESTSender.h"
#include "c2/C2Agent.h"
#include "processors/LogAttribute.h"
#include "HTTPIntegrationBase.h"
@@ -60,7 +39,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
@@ -74,23 +53,22 @@
file.close();
}
- void cleanup() {
- unlink(ss.str().c_str());
+ void cleanup() override {
}
- void runAssertions() {
- assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null") == true);
- assert(LogTestController::getInstance().contains("Class is RESTSender") == true);
+ void runAssertions() override {
+ assert(LogTestController::getInstance().contains("C2Agent] [debug] Could not instantiate null"));
+ assert(LogTestController::getInstance().contains("Class is RESTSender"));
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
assert(proc != nullptr);
std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
assert(inv != nullptr);
- std::string url = "";
+ std::string url;
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
std::string port, scheme, path;
@@ -114,23 +92,12 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
+ const bool isSecure = args.isUrlSecure();
VerifyC2Server harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
+ harness.setKeyDir(args.key_dir);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/C2UpdateAgentTest.cpp b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
index ba47e8c..53ce3d7 100644
--- a/extensions/http-curl/tests/C2UpdateAgentTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateAgentTest.cpp
@@ -16,178 +16,24 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
- ConfigHandler() {
- calls_ = 0;
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- calls_++;
- if (responses.size() > 0) {
- std::string top_str = responses.back();
- responses.pop_back();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- top_str.length());
- mg_printf(conn, "%s", top_str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
- std::string test_file_location_;
- std::atomic<size_t> calls_;
-};
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setTrace<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+ const cmd_args args = parse_cmdline_args(argc, argv, "update");
+ C2UpdateHandler handler(args.test_file);
+ VerifyC2UpdateAgent harness(10000);
+ harness.setKeyDir(args.key_dir);
+ harness.setUrl(args.url, &handler);
+ handler.setC2RestResponse(harness.getC2RestUrl(), "agent");
- const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
+ const auto start = std::chrono::system_clock::now();
- CivetServer server(cpp_options);
+ harness.run(args.test_file);
- std::string port_str = std::to_string(server.getListeningPorts()[0]);
-
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
- if (argc > 1) {
- h_ex.test_file_location_ = test_file_location = argv[1];
- key_dir = argv[2];
- }
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"agent\""
- "}]}";
-
- responses.push_back(heartbeat_response);
-
- std::ifstream myfile(test_file_location.c_str());
-
- std::string c2_rest_url = "http://localhost:" + port_str + "/update";
-
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"agent\", \"content\": { \"location\": \"" + c2_rest_url +"\"}}]}";
- responses.push_back(response);
- }
-
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
- configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
- configuration->set("nifi.c2.enable", "true");
- configuration->set("nifi.c2.agent.class", "test");
- configuration->set("nifi.c2.agent.update.allow","true");
- configuration->set("nifi.c2.rest.url", c2_rest_url);
- configuration->set("nifi.c2.agent.heartbeat.period", "1000");
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
- configuration->set("c2.agent.update.command", "echo \"verification command\"");
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
- auto start = std::chrono::system_clock::now();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
- auto then = std::chrono::system_clock::now();
-
- auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("removing file") != std::string::npos);
- assert(logs.find("May not have command processor") != std::string::npos);
- LogTestController::getInstance().reset();
- assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
+ const auto then = std::chrono::system_clock::now();
+ const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count();
+ assert(handler.calls_ <= (seconds) + 1);
return 0;
}
diff --git a/extensions/http-curl/tests/C2UpdateTest.cpp b/extensions/http-curl/tests/C2UpdateTest.cpp
index ea4b151..0475525 100644
--- a/extensions/http-curl/tests/C2UpdateTest.cpp
+++ b/extensions/http-curl/tests/C2UpdateTest.cpp
@@ -16,174 +16,24 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
-#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
- ConfigHandler() {
- calls_ = 0;
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- calls_++;
- if (responses.size() > 0) {
- std::string top_str = responses.back();
- responses.pop_back();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- top_str.length());
- mg_printf(conn, "%s", top_str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
- std::string test_file_location_;
- std::atomic<size_t> calls_;
-};
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
+ const cmd_args args = parse_cmdline_args(argc, argv, "update");
+ C2UpdateHandler handler(args.test_file);
+ VerifyC2Update harness(10000);
+ harness.setKeyDir(args.key_dir);
+ harness.setUrl(args.url, &handler);
+ handler.setC2RestResponse(harness.getC2RestUrl(), "configuration");
- const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
+ const auto start = std::chrono::system_clock::now();
- CivetServer server(cpp_options);
+ harness.run(args.test_file);
- std::string port_str = std::to_string(server.getListeningPorts()[0]);
-
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
- if (argc > 1) {
- h_ex.test_file_location_ = test_file_location = argv[1];
- key_dir = argv[2];
- }
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\""
- "}]}";
-
- responses.push_back(heartbeat_response);
-
- std::ifstream myfile(test_file_location.c_str());
-
- std::string c2_rest_url = "http://localhost:" + port_str + "/update";
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\", \"content\": { \"location\": \"" + c2_rest_url + "\"}}]}";
- responses.push_back(response);
- }
-
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
-
- configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
- configuration->set("nifi.c2.enable", "true");
- configuration->set("nifi.c2.agent.class", "test");
- configuration->set("nifi.c2.rest.url", c2_rest_url);
- configuration->set("nifi.c2.agent.heartbeat.period", "1000");
-
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
- auto start = std::chrono::system_clock::now();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
- auto then = std::chrono::system_clock::now();
-
- auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version") != std::string::npos);
- LogTestController::getInstance().reset();
- assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
+ const auto then = std::chrono::system_clock::now();
+ const auto seconds = std::chrono::duration_cast<std::chrono::seconds>(then - start).count();
+ assert(handler.calls_ <= seconds + 1);
return 0;
}
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 8a15be9..65c2821 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -27,13 +27,7 @@
class LightWeightC2Handler : public HeartbeatHandler {
public:
- explicit LightWeightC2Handler(bool isSecure)
- : HeartbeatHandler(isSecure) {
- }
-
- virtual ~LightWeightC2Handler() = default;
-
- virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) {
+ void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) override {
if (calls_ == 0) {
verifyJsonHasAgentManifest(root);
} else {
@@ -48,11 +42,7 @@
class VerifyC2Heartbeat : public VerifyC2Base {
public:
- explicit VerifyC2Heartbeat(bool isSecure)
- : VerifyC2Base(isSecure) {
- }
-
- virtual void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
@@ -60,56 +50,39 @@
VerifyC2Base::testSetup();
}
- void runAssertions() {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("Received Ack from Server"));
assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke"));
assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController"));
}
- void configureFullHeartbeat() {
+ void configureFullHeartbeat() override {
configuration->set("nifi.c2.full.heartbeat", "true");
}
};
class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
public:
- explicit VerifyLightWeightC2Heartbeat(bool isSecure)
- : VerifyC2Heartbeat(isSecure) {
- }
-
- void configureFullHeartbeat() {
+ void configureFullHeartbeat() override {
configuration->set("nifi.c2.full.heartbeat", "false");
}
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- url = "http://localhost:0/api/heartbeat";
- if (argc > 1) {
- test_file_location = argv[1];
- if (argc > 2) {
- url = "https://localhost:0/api/heartbeat";
- key_dir = argv[2];
- }
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args(argc, argv, "heartbeat");
{
- VerifyC2Heartbeat harness(isSecure);
- harness.setKeyDir(key_dir);
- HeartbeatHandler responder(isSecure);
- harness.setUrl(url, &responder);
- harness.run(test_file_location);
+ VerifyC2Heartbeat harness;
+ harness.setKeyDir(args.key_dir);
+ HeartbeatHandler responder;
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
}
- VerifyLightWeightC2Heartbeat harness(isSecure);
- harness.setKeyDir(key_dir);
- LightWeightC2Handler responder(isSecure);
- harness.setUrl(url, &responder);
- harness.run(test_file_location);
+ VerifyLightWeightC2Heartbeat harness;
+ harness.setKeyDir(args.key_dir);
+ LightWeightC2Handler responder;
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/C2VerifyServeResults.cpp b/extensions/http-curl/tests/C2VerifyServeResults.cpp
index 9f87c18..6304a9d 100644
--- a/extensions/http-curl/tests/C2VerifyServeResults.cpp
+++ b/extensions/http-curl/tests/C2VerifyServeResults.cpp
@@ -16,74 +16,26 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
#include <sstream>
-#include "HTTPClient.h"
#include "processors/InvokeHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
#include "TestServer.h"
-#include "c2/C2Agent.h"
-#include "protocols/RESTReceiver.h"
#include "HTTPIntegrationBase.h"
-#include "processors/LogAttribute.h"
-
-class Responder : public CivetHandler {
- public:
- explicit Responder(bool isSecure)
- : isSecure(isSecure) {
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
- "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- resp.length());
- mg_printf(conn, "%s", resp.c_str());
- return true;
- }
-
- protected:
- bool isSecure;
-};
class VerifyC2Server : public CoapIntegrationBase {
public:
- explicit VerifyC2Server(bool isSecure)
- : isSecure(isSecure) {
+ explicit VerifyC2Server() {
char format[] = "/tmp/ssth.XXXXXX";
dir = testController.createTempDirectory(format);
}
- void testSetup() {
- LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ void testSetup() override {
LogTestController::getInstance().setDebug<processors::InvokeHTTP>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<processors::LogAttribute>();
LogTestController::getInstance().setDebug<minifi::core::ProcessSession>();
std::fstream file;
ss << dir << "/" << "tstFile.ext";
@@ -92,25 +44,23 @@
file.close();
}
- void cleanup() {
+ void cleanup() override {
unlink(ss.str().c_str());
}
- void runAssertions() {
- assert(LogTestController::getInstance().contains("Import offset 0") == true);
-
- assert(LogTestController::getInstance().contains("Outputting success and response") == true);
-
+ void runAssertions() override {
+ assert(LogTestController::getInstance().contains("Import offset 0"));
+ assert(LogTestController::getInstance().contains("Outputting success and response"));
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) override {
std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
assert(proc != nullptr);
std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
assert(inv != nullptr);
- std::string url = "";
+ std::string url;
inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
std::string port, scheme, path;
@@ -125,28 +75,17 @@
}
protected:
- bool isSecure;
std::string dir;
std::stringstream ss;
TestController testController;
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- if (argc > 2) {
- key_dir = argv[2];
- }
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
- bool isSecure = !key_dir.empty();
-
- VerifyC2Server harness(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
+ VerifyC2Server harness;
+ harness.setKeyDir(args.key_dir);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index 8ba76a2..490f701 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -75,7 +75,7 @@
add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2DescribeCoreComponentStateTest COMMAND C2DescribeCoreComponentStateTest "${TEST_RESOURCES}/TestC2DescribeCoreComponentState.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
-add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/" "${TEST_RESOURCES}/TestBad.yml")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
if(NOT OPENSSL_OFF)
add_test(NAME HttpGetIntegrationTestSecure COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGetSecure.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
index e098760..0917f50 100644
--- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
+++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp
@@ -19,20 +19,15 @@
#undef NDEBUG
#include <cassert>
#include <chrono>
-#include <fstream>
#include <memory>
#include <string>
#include <utility>
#include <thread>
-#include <type_traits>
#include <vector>
#include "core/controller/ControllerServiceMap.h"
-#include "core/controller/StandardControllerServiceNode.h"
#include "core/controller/StandardControllerServiceProvider.h"
#include "controllers/SSLContextService.h"
-#include "core/Core.h"
-#include "core/logging/LoggerConfiguration.h"
#include "core/ProcessGroup.h"
#include "core/Resource.h"
#include "core/yaml/YamlConfiguration.h"
@@ -40,50 +35,38 @@
#include "properties/Configure.h"
#include "unit/MockClasses.h"
#include "unit/ProvenanceTestHelper.h"
+#include "integration/IntegrationBase.h"
REGISTER_RESOURCE(MockControllerService, "");
REGISTER_RESOURCE(MockProcessor, "");
-std::shared_ptr<core::controller::StandardControllerServiceNode> newCsNode(std::shared_ptr<core::controller::ControllerServiceProvider> provider, const std::string id) {
- std::shared_ptr<core::controller::ControllerService> service = std::make_shared<MockControllerService>();
- std::shared_ptr<core::controller::StandardControllerServiceNode> testNode = std::make_shared<core::controller::StandardControllerServiceNode>(service, provider, id,
- std::make_shared<minifi::Configure>());
- return testNode;
-}
-
void waitToVerifyProcessor() {
std::this_thread::sleep_for(std::chrono::seconds(2));
}
int main(int argc, char **argv) {
- std::string test_file_location;
- std::string key_dir;
-
- if (argc > 2) {
- test_file_location = argv[1];
- key_dir = argv[1];
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, args.test_file);
std::string client_cert = "cn.crt.pem";
std::string priv_key_file = "cn.ckey.pem";
std::string passphrase = "cn.pass";
std::string ca_cert = "nifi-cert.pem";
- configuration->set(minifi::Configure::nifi_security_client_certificate, test_file_location);
+ configuration->set(minifi::Configure::nifi_security_client_certificate, args.test_file);
configuration->set(minifi::Configure::nifi_security_client_private_key, priv_key_file);
configuration->set(minifi::Configure::nifi_security_client_pass_phrase, passphrase);
- configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+ configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
@@ -94,11 +77,9 @@
disabled = false;
std::shared_ptr<core::controller::ControllerServiceMap> map = std::make_shared<core::controller::ControllerServiceMap>();
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
+ std::shared_ptr<core::ProcessGroup> pg(yaml_config.getRoot(args.test_file));
std::shared_ptr<core::controller::StandardControllerServiceProvider> provider = std::make_shared<core::controller::StandardControllerServiceProvider>(map, pg, std::make_shared<minifi::Configure>());
std::shared_ptr<core::controller::ControllerServiceNode> mockNode = pg->findControllerService("MockItLikeIts1995");
diff --git a/extensions/http-curl/tests/GetFileNoData.cpp b/extensions/http-curl/tests/GetFileNoData.cpp
deleted file mode 100644
index 7481991..0000000
--- a/extensions/http-curl/tests/GetFileNoData.cpp
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <sys/stat.h>
-#undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
-
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-static std::vector<std::string> responses;
-
-class ConfigHandler : public CivetHandler {
- public:
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- if (responses.size() > 0) {
- std::string top_str = responses.back();
- responses.pop_back();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- top_str.length());
- mg_printf(conn, "%s", top_str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
-
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
- }
- std::string test_file_location_;
-};
-
-int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setDebug<minifi::c2::C2Agent>();
-
- const char *options[] = { "document_root", ".", "listening_ports", "9090", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
-
- CivetServer server(cpp_options);
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
- if (argc > 1) {
- h_ex.test_file_location_ = test_file_location = argv[1];
- key_dir = argv[2];
- }
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\""
- "}]}";
-
- responses.push_back(heartbeat_response);
-
- std::ifstream myfile(test_file_location.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- std::string response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"update\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"configuration\", \"content\": { \"location\": \"http://localhost:9090/update\"}}]}";
- responses.push_back(response);
- }
-
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<
- minifi::Configure>();
-
- configuration->set("c2.rest.url",
- "http://localhost:9090/update");
-
- std::shared_ptr<core::Repository> test_repo =
- std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<
- TestFlowRepository>();
-
- configuration->set(minifi::Configure::nifi_flow_configuration_file,
- test_file_location);
-
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr
- <core::YamlConfiguration
- >(new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory,
- configuration,
- test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast
- <TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller =
- std::make_shared<minifi::FlowController
- >(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME, true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory,
- configuration,
- test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(
- test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup
- >(ptr.get());
- ptr.release();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
- std::string logs = LogTestController::getInstance().log_output.str();
- assert(logs.find("Starting to reload Flow Controller with flow control name MiNiFi Flow, version 0") != std::string::npos);
- LogTestController::getInstance().reset();
-
- return 0;
-}
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 71cd7e2..35472ea 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -51,7 +51,7 @@
explicit SiteToSiteLocationResponder(bool isSecure)
: isSecure(isSecure) {
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp = "{"
"\"revision\": {"
"\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
@@ -81,7 +81,7 @@
assert(parse_http_components(base_url, port, scheme, path));
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
#ifdef WIN32
std::string hostname = org::apache::nifi::minifi::io::Socket::getMyHostName();
@@ -109,7 +109,7 @@
: base_url(std::move(base_url)) {
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp =
"{\"controller\":{\"id\":\"96dab149-0162-1000-7924-ed3122d6ea2b\",\"name\":\"NiFi Flow\",\"comments\":\"\",\"runningCount\":3,\"stoppedCount\":6,\"invalidCount\":1,\"disabledCount\":0,\"inputPortCount\":1,\"outputPortCount\":1,\"remoteSiteListeningPort\":10443,\"siteToSiteSecure\":false,\"instanceId\":\"13881505-0167-1000-be72-aa29341a3e9a\",\"inputPorts\":[{\"id\":\"471deef6-2a6e-4a7d-912a-81cc17e3a204\",\"name\":\"RPGIN\",\"comments\":\"\",\"state\":\"RUNNING\"}],\"outputPorts\":[{\"id\":\"9cf15a63-0166-1000-1b29-027406d96013\",\"name\":\"ddsga\",\"comments\":\"\",\"state\":\"STOPPED\"}]}}";
std::stringstream headers;
@@ -145,7 +145,7 @@
}
}
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ bool handlePost(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp;
std::stringstream headers;
headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nX-Location-Uri-Intent: ";
@@ -203,7 +203,7 @@
flow_files_feed_ = feed;
}
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ bool handlePost(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp;
std::stringstream headers;
@@ -263,7 +263,7 @@
return true;
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
if (flow_files_feed_->size_approx() > 0) {
std::shared_ptr<FlowObj> flowobj;
@@ -294,7 +294,6 @@
stream.write(length);
stream.writeData(flow->data.data(), length);
}
- auto ret = mg_write(conn, serializer.getBuffer(), total);
} else {
mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
"close\r\nContent-Length: 0\r\n");
@@ -337,7 +336,7 @@
response_code(std::move(response_code)) {
}
- bool handleDelete(CivetServer *server, struct mg_connection *conn) {
+ bool handleDelete(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp;
std::stringstream headers;
std::string resp;
@@ -360,23 +359,19 @@
std::string response_code;
};
+inline std::string readPayload(struct mg_connection *conn) {  // collects the request body of conn into a string; inline because it is defined in a header
+ std::string response;
+ int readBytes = 0;  // last mg_read result
+
+ std::array<char, 1024> buffer;
+ while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) {  // keep appending chunks until mg_read returns a non-positive value
+ response.append(buffer.data(), readBytes);
+ }
+ return response;
+}
+
class HeartbeatHandler : public ServerAwareHandler {
public:
- explicit HeartbeatHandler(bool isSecure)
- : isSecure(isSecure) {
- }
-
- std::string readPost(struct mg_connection *conn) {
- std::string response;
- int readBytes;
-
- char buffer[1024];
- while ((readBytes = mg_read(conn, buffer, sizeof(buffer))) > 0) {
- response.append(buffer, (readBytes / sizeof(char)));
- }
- return response;
- }
-
void sendStopOperation(struct mg_connection *conn) {
std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\" }, "
"{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\" } ]}";
@@ -415,7 +410,7 @@
}
auto group = minifi::BuildDescription::getClassDescriptions(str);
- for (auto proc : group.processors_) {
+ for (const auto& proc : group.processors_) {
assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
found = true;
}
@@ -433,7 +428,7 @@
}
void verify(struct mg_connection *conn) {
- auto post_data = readPost(conn);
+ auto post_data = readPayload(conn);
//std::cerr << post_data << std::endl;
if (!IsNullOrEmpty(post_data)) {
rapidjson::Document root;
@@ -450,14 +445,78 @@
}
}
- bool handlePost(CivetServer *, struct mg_connection *conn) {
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
verify(conn);
sendStopOperation(conn);
return true;
}
+};
- protected:
- bool isSecure;
+class C2UpdateHandler : public ServerAwareHandler {  // answers POST with the queued C2 update response and GET with the contents of the configured file
+ public:
+ explicit C2UpdateHandler(const std::string& test_file_location)  // test_file_location: path of the file served by handleGet
+ : test_file_location_(test_file_location) {
+ }
+
+ bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+ calls_++;  // every POST is counted, including those answered with 500
+ if (!response_.empty()) {
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+ response_.length());  // %zu matches size_t on every target; %lu does not where unsigned long is narrower
+ mg_printf(conn, "%s", response_.c_str());
+ response_.clear();  // the queued response is sent once; later POSTs get 500 until setC2RestResponse is called again
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
+ std::ifstream myfile(test_file_location_.c_str(), std::ios::in | std::ios::binary);
+ if (myfile.good()) {
+ std::string str((std::istreambuf_iterator<char>(myfile)), (std::istreambuf_iterator<char>()));
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %zu\r\nConnection: close\r\n\r\n",
+ str.length());
+ mg_write(conn, str.data(), str.size());  // send exactly the advertised byte count; "%s" would stop at the first NUL byte
+ } else {
+ mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
+ }
+
+ return true;
+ }
+
+ virtual void setC2RestResponse(const std::string& url, const std::string& name) {  // queues a heartbeat reply requesting an "update" of `name` from location `url`
+ response_ = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"update\", "
+ "\"operationid\" : \"8675309\", "
+ "\"name\": \"" + name + "\", \"content\": { \"location\": \"" + url + "\"}}]}";
+ }
+
+ std::atomic<size_t> calls_{0};  // number of POST requests handled so far
+ private:
+ std::string test_file_location_;
+ std::string response_;  // pending POST response body; empty means nothing is queued
+};
+
+class C2FailedUpdateHandler : public C2UpdateHandler {  // C2UpdateHandler that additionally checks reported operation state in POST payloads
+ public:
+ explicit C2FailedUpdateHandler(const std::string& test_file_location)
+ : C2UpdateHandler(test_file_location) {
+ }
+
+ bool handlePost(CivetServer *server, struct mg_connection *conn) override {
+ // calls_ is incremented once by C2UpdateHandler::handlePost below; bumping it here as well counted each POST twice
+ const auto data = readPayload(conn);
+
+ if (data.find("operationState") != std::string::npos) {  // only payloads that mention operationState are checked
+ assert(data.find("state\": \"NOT_APPLIED") != std::string::npos);
+ }
+
+ return C2UpdateHandler::handlePost(server, conn);
+ }
+};
class InvokeHTTPCouldNotConnectHandler : public ServerAwareHandler {
@@ -465,7 +524,7 @@
class InvokeHTTPResponseOKHandler : public ServerAwareHandler {
public:
- bool handlePost(CivetServer *, struct mg_connection *conn) {
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
@@ -473,7 +532,7 @@
class InvokeHTTPResponse404Handler : public ServerAwareHandler {
public:
- bool handlePost(CivetServer *, struct mg_connection *conn) {
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
mg_printf(conn, "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
@@ -481,7 +540,7 @@
class InvokeHTTPResponse501Handler : public ServerAwareHandler {
public:
- bool handlePost(CivetServer *, struct mg_connection *conn) {
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
mg_printf(conn, "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
@@ -492,25 +551,25 @@
TimeoutingHTTPHandler(std::vector<std::chrono::milliseconds> wait_times)
: wait_times_(wait_times) {
}
- bool handlePost(CivetServer *, struct mg_connection *conn) {
+ bool handlePost(CivetServer *, struct mg_connection *conn) override {
respond(conn);
return true;
}
- bool handleGet(CivetServer *, struct mg_connection *conn) {
+ bool handleGet(CivetServer *, struct mg_connection *conn) override {
respond(conn);
return true;
}
- bool handleDelete(CivetServer *, struct mg_connection *conn) {
+ bool handleDelete(CivetServer *, struct mg_connection *conn) override {
respond(conn);
return true;
}
- bool handlePut(CivetServer *, struct mg_connection *conn) {
+ bool handlePut(CivetServer *, struct mg_connection *conn) override {
respond(conn);
return true;
}
private:
void respond(struct mg_connection *conn) {
- if (wait_times_.size() > 0 && wait_times_[0].count() > 0) {
+ if (!wait_times_.empty() && wait_times_[0] > std::chrono::seconds(0)) {
sleep_for(wait_times_[0]);
}
int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 04f0c99..326b50e 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -18,12 +18,13 @@
#ifndef LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_
#define LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_
-#include "../tests/TestServer.h"
#include "CivetServer.h"
#include "integration/IntegrationBase.h"
#include "c2/C2Agent.h"
#include "protocols/RESTSender.h"
#include "ServerAwareHandler.h"
+#include "TestBase.h"
+#include "TestServer.h"
int log_message(const struct mg_connection *conn, const char *message) {
puts(message);
@@ -55,6 +56,12 @@
return ret_val;
}
+ std::string getC2RestUrl() const {  // returns the value looked up under "nifi.c2.rest.url" in configuration
+ std::string c2_rest_url;  // remains empty unless the get() call below writes to it
+ configuration->get("nifi.c2.rest.url", c2_rest_url);
+ return c2_rest_url;
+ }
+
protected:
std::unique_ptr<TestServer> server;
};
@@ -62,13 +69,12 @@
void CoapIntegrationBase::setUrl(const std::string& url, ServerAwareHandler *handler) {
parse_http_components(url, port, scheme, path);
CivetCallbacks callback{};
- if (server != nullptr) {
+ if (server) {
server->addHandler(path, handler);
return;
}
if (scheme == "https" && !key_dir.empty()) {
- std::string cert = "";
- cert = key_dir + "nifi-cert.pem";
+ std::string cert = key_dir + "nifi-cert.pem";
memset(&callback, 0, sizeof(callback));
callback.init_ssl = ssl_enable;
port += "s";
@@ -77,52 +83,41 @@
} else {
server = utils::make_unique<TestServer>(port, path, handler);
}
+ bool secure{false};
if (port == "0" || port == "0s") {
- bool secure = (port == "0s");
+ secure = (port == "0s");
port = std::to_string(server->getListeningPorts()[0]);
if (secure) {
port += "s";
}
}
+ std::string c2_url = std::string("http") + (secure ? "s" : "") + "://localhost:" + getWebPort() + path;
+ configuration->set("nifi.c2.rest.url", c2_url);
+ configuration->set("nifi.c2.rest.url.ack", c2_url);
}
class VerifyC2Base : public CoapIntegrationBase {
public:
- explicit VerifyC2Base(bool isSecure)
- : isSecure(isSecure) {
- }
-
void testSetup() override {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<LogTestController>();
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup>) override {
- std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
-
+ void configureC2() override {
configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
configuration->set("nifi.c2.enable", "true");
configuration->set("nifi.c2.agent.class", "test");
- configuration->set("nifi.c2.rest.url", c2_url);
configuration->set("nifi.c2.agent.heartbeat.period", "1000");
- configuration->set("nifi.c2.rest.url.ack", c2_url);
configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
}
void cleanup() override {
LogTestController::getInstance().reset();
}
-
- protected:
- bool isSecure;
};
class VerifyC2Describe : public VerifyC2Base {
public:
- explicit VerifyC2Describe(bool isSecure)
- : VerifyC2Base(isSecure) {
- }
-
void testSetup() override {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
@@ -137,4 +132,78 @@
void runAssertions() override {
}
};
+
+// Harness for a C2-initiated flow update: it enables the C2 agent with the
+// RESTSender protocol and afterwards expects the FlowController reload
+// message to be present in the captured log.
+class VerifyC2Update : public CoapIntegrationBase {
+ public:
+  // waitTime is forwarded unchanged to the CoapIntegrationBase constructor.
+  explicit VerifyC2Update(uint64_t waitTime) : CoapIntegrationBase(waitTime) {}
+
+  // Selects the log levels of the components whose output is inspected.
+  void testSetup() override {
+    auto&& logs = LogTestController::getInstance();
+    logs.setInfo<minifi::FlowController>();
+    logs.setDebug<minifi::utils::HTTPClient>();
+    logs.setDebug<minifi::c2::RESTSender>();
+    logs.setDebug<minifi::c2::C2Agent>();
+  }
+
+  // Turns C2 on with the RESTSender protocol, agent class "test" and a
+  // heartbeat period of "1000".
+  void configureC2() override {
+    auto& settings = *configuration;
+    settings.set("nifi.c2.agent.protocol.class", "RESTSender");
+    settings.set("nifi.c2.enable", "true");
+    settings.set("nifi.c2.agent.class", "test");
+    settings.set("nifi.c2.agent.heartbeat.period", "1000");
+  }
+
+  // Resets the log test controller once the test is done.
+  void cleanup() override {
+    LogTestController::getInstance().reset();
+  }
+
+  // Passes only if the flow reload message was logged.
+  void runAssertions() override {
+    assert(LogTestController::getInstance().contains("Starting to reload Flow Controller with flow control name MiNiFi Flow, version"));
+  }
+};
+
+// Variant of VerifyC2Update for the agent-update path: on top of the base C2
+// settings it allows agent updates and supplies an update command.
+class VerifyC2UpdateAgent : public VerifyC2Update {
+ public:
+  // waitTime is forwarded unchanged to the VerifyC2Update constructor.
+  explicit VerifyC2UpdateAgent(uint64_t waitTime) : VerifyC2Update(waitTime) {}
+
+  // Applies the base C2 settings first, then the agent-update specific ones.
+  void configureC2() override {
+    VerifyC2Update::configureC2();
+    auto& settings = *configuration;
+    settings.set("nifi.c2.agent.update.allow", "true");
+    settings.set("c2.agent.update.command", "echo \"verification command\"");
+  }
+
+  // Replaces the base log setup: only C2Agent is configured, at trace level.
+  void testSetup() override {
+    auto&& logs = LogTestController::getInstance();
+    logs.setTrace<minifi::c2::C2Agent>();
+  }
+
+  // Both messages must have been logged for the test to pass.
+  void runAssertions() override {
+    assert(LogTestController::getInstance().contains("removing file"));
+    assert(LogTestController::getInstance().contains("May not have command processor"));
+  }
+};
+
+// Variant of VerifyC2Update for a rejected update: it expects the agent to
+// log that the configuration payload was invalid and that the update failed.
+class VerifyC2FailedUpdate : public VerifyC2Update {
+ public:
+  // waitTime is forwarded unchanged to the VerifyC2Update constructor.
+  explicit VerifyC2FailedUpdate(uint64_t waitTime) : VerifyC2Update(waitTime) {}
+
+  // Sets the log levels needed by the assertions and creates the
+  // "content_repository" directory that cleanup() removes again.
+  void testSetup() override {
+    auto&& logs = LogTestController::getInstance();
+    logs.setInfo<minifi::FlowController>();
+    logs.setDebug<minifi::c2::C2Agent>();
+    utils::file::FileUtils::create_dir("content_repository");
+  }
+
+  // Both failure messages must have been logged for the test to pass.
+  void runAssertions() override {
+    assert(LogTestController::getInstance().contains("Invalid configuration payload"));
+    assert(LogTestController::getInstance().contains("update failed"));
+  }
+
+  // Removes the directory created in testSetup(), then runs the base cleanup.
+  void cleanup() override {
+    utils::file::FileUtils::delete_dir("content_repository", true);
+    VerifyC2Update::cleanup();
+  }
+};
#endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
index 8078171..cfea43b 100644
--- a/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/HTTPSiteToSiteTests.cpp
@@ -17,35 +17,19 @@
*/
#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
#include <vector>
#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "CivetServer.h"
#include "sitetosite/HTTPProtocol.h"
#include "InvokeHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
#include "io/StreamFactory.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
-#include "TestServer.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
#include "client/HTTPStream.h"
@@ -190,25 +174,25 @@
std::stringstream assertStr;
if (profile.allFalse()) {
assertStr << "Site2Site transaction " << transaction_id << " peer finished transaction";
- assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+ assert(LogTestController::getInstance().contains(assertStr.str()));
} else if (profile.empty_transaction_url) {
- assert(LogTestController::getInstance().contains("Location is empty") == true);
+ assert(LogTestController::getInstance().contains("Location is empty"));
} else if (profile.transaction_url_broken) {
- assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff") == true);
+ assert(LogTestController::getInstance().contains("Could not create transaction, intent is ohstuff"));
} else if (profile.invalid_checksum) {
assertStr << "Site2Site transaction " << transaction_id << " peer confirm transaction with CRC Imawrongchecksumshortandstout";
- assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+ assert(LogTestController::getInstance().contains(assertStr.str()));
assertStr.str(std::string());
assertStr << "Site2Site transaction " << transaction_id << " CRC not matched";
- assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+ assert(LogTestController::getInstance().contains(assertStr.str()));
assertStr.str(std::string());
assertStr << "Site2Site delete transaction " << transaction_id;
- assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+ assert(LogTestController::getInstance().contains(assertStr.str()));
} else if (profile.no_delete) {
- assert(LogTestController::getInstance().contains("Received 401 response code from delete") == true);
+ assert(LogTestController::getInstance().contains("Received 401 response code from delete"));
} else {
assertStr << "Site2Site transaction " << transaction_id << " peer unknown respond code 254";
- assert(LogTestController::getInstance().contains(assertStr.str()) == true);
+ assert(LogTestController::getInstance().contains(assertStr.str()));
}
LogTestController::getInstance().reset();
}
@@ -216,17 +200,8 @@
int main(int argc, char **argv) {
transaction_id = 0;
transaction_id_output = 0;
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- url = argv[3];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args_with_url(argc, argv);
+ const bool isSecure = args.isUrlSecure();
#ifdef WIN32
if (url.find("localhost") != std::string::npos) {
@@ -237,37 +212,37 @@
#endif
{
struct test_profile profile;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
{
struct test_profile profile;
profile.flow_url_broken = true;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
{
struct test_profile profile;
profile.empty_transaction_url = true;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
{
struct test_profile profile;
profile.transaction_url_broken = true;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
{
struct test_profile profile;
profile.no_delete = true;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
{
struct test_profile profile;
profile.invalid_checksum = true;
- run_variance(test_file_location, isSecure, url, profile);
+ run_variance(args.test_file, isSecure, args.url, profile);
}
return 0;
diff --git a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
index f149d4c..0efffb5 100644
--- a/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpGetIntegrationTest.cpp
@@ -17,46 +17,31 @@
*/
#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
#include <utility>
#include <chrono>
-#include <fstream>
#include <memory>
#include <string>
#include <thread>
-#include <type_traits>
#include <vector>
#include "HTTPClient.h"
#include "InvokeHTTP.h"
#include "TestServer.h"
#include "TestBase.h"
#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
#include "properties/Configure.h"
#include "unit/ProvenanceTestHelper.h"
#include "io/StreamFactory.h"
-#include "processors/InvokeHTTP.h"
-#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
+#include "HTTPIntegrationBase.h"
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-int log_message(const struct mg_connection *conn, const char *message) {
- puts(message);
- return 1;
-}
-
-int ssl_enable(void* /*ssl_context*/, void* /*user_data*/) {
- puts("Enable ssl");
- return 0;
+namespace {
+ void waitToVerifyProcessor() {
+ std::this_thread::sleep_for(std::chrono::seconds(10));
+ }
}
class HttpResponder : public CivetHandler {
@@ -74,24 +59,22 @@
};
int main(int argc, char **argv) {
+ const cmd_args args = parse_cmdline_args(argc, argv);
+
LogTestController::getInstance().setDebug<core::Processor>();
LogTestController::getInstance().setDebug<core::ProcessSession>();
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<minifi::controllers::SSLContextService>();
LogTestController::getInstance().setDebug<minifi::processors::InvokeHTTP>();
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
- std::string key_dir, test_file_location;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
+
std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
- configuration->set(minifi::Configure::nifi_default_directory, key_dir);
+ configuration->set(minifi::Configure::nifi_default_directory, args.key_dir);
std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ configuration->set(minifi::Configure::nifi_flow_configuration_file, args.test_file);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
@@ -100,7 +83,7 @@
content_repo->initialize(configuration);
std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
+ new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file));
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr),
@@ -108,9 +91,9 @@
DEFAULT_ROOT_GROUP_NAME,
true);
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
+ core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, args.test_file);
- std::shared_ptr<core::Processor> proc = yaml_config.getRoot(test_file_location)->findProcessor("invoke");
+ std::shared_ptr<core::Processor> proc = yaml_config.getRoot(args.test_file)->findProcessor("invoke");
assert(proc != nullptr);
const auto inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
@@ -125,7 +108,7 @@
CivetCallbacks callback{};
if (scheme == "https") {
std::string cert;
- cert = key_dir + "nifi-cert.pem";
+ cert = args.key_dir + "nifi-cert.pem";
memset(&callback, 0, sizeof(callback));
callback.init_ssl = ssl_enable;
std::string https_port = port + "s";
diff --git a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
index 4b841da..7cb7d78 100644
--- a/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
+++ b/extensions/http-curl/tests/HttpPostIntegrationTest.cpp
@@ -16,38 +16,17 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
#include <iostream>
-#include "HTTPClient.h"
#include "InvokeHTTP.h"
#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
-#include <sstream>
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "../tests/TestServer.h"
#include "HTTPIntegrationBase.h"
class HttpTestHarness : public CoapIntegrationBase {
@@ -57,7 +36,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -81,14 +60,14 @@
configuration->set("nifi.c2.enable", "false");
}
- void cleanup() {
+ void cleanup() override {
unlink(ss.str().c_str());
}
- void runAssertions() {
- assert(LogTestController::getInstance().contains("curl performed") == true);
- assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true);
- assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false);
+ void runAssertions() override {
+ assert(LogTestController::getInstance().contains("curl performed"));
+ assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
+ assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
}
protected:
@@ -98,17 +77,10 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
HttpTestHarness harness;
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
+ harness.setKeyDir(args.key_dir);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/SiteToSiteRestTest.cpp b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
index 1d6deaf..af23dfc 100644
--- a/extensions/http-curl/tests/SiteToSiteRestTest.cpp
+++ b/extensions/http-curl/tests/SiteToSiteRestTest.cpp
@@ -17,36 +17,18 @@
*/
#define CURLOPT_SSL_VERIFYPEER_DISABLE 1
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
#include "InvokeHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
#include "CivetServer.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
#include "controllers/SSLContextService.h"
-#include "../tests/TestServer.h"
#include "HTTPIntegrationBase.h"
class Responder : public ServerAwareHandler {
@@ -54,7 +36,7 @@
explicit Responder(bool isSecure)
: isSecure(isSecure) {
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
+ bool handleGet(CivetServer *server, struct mg_connection *conn) override {
std::string site2site_rest_resp = "{"
"\"revision\": {"
"\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
@@ -85,7 +67,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setTrace<minifi::RemoteProcessorGroupPort>();
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setTrace<minifi::controllers::SSLContextService>();
@@ -99,17 +81,17 @@
file.close();
}
- void cleanup() {
+ void cleanup() override {
unlink(ss.str().c_str());
}
- void runAssertions() {
+ void runAssertions() override {
if (isSecure) {
- assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1") == true);
+ assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 1"));
} else {
- assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0") == true);
+ assert(LogTestController::getInstance().contains("process group remote site2site port 10001, is secure 0"));
}
- assert(LogTestController::getInstance().contains("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed ") == true);
+ assert(LogTestController::getInstance().contains("ProcessGroup::refreshRemoteSite2SiteInfo -- curl_easy_perform() failed "));
}
protected:
@@ -120,27 +102,14 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- url = argv[3];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args_with_url(argc, argv);
+ const bool isSecure = args.isUrlSecure();
SiteToSiteTestHarness harness(isSecure);
-
Responder responder(isSecure);
-
- harness.setKeyDir(key_dir);
-
- harness.setUrl(url, &responder);
-
- harness.run(test_file_location);
+ harness.setKeyDir(args.key_dir);
+ harness.setUrl(args.url, &responder);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/TestServer.h b/extensions/http-curl/tests/TestServer.h
index 13a6cf9..23c3dfc 100644
--- a/extensions/http-curl/tests/TestServer.h
+++ b/extensions/http-curl/tests/TestServer.h
@@ -58,21 +58,17 @@
}
- //mg_init_library(MG_FEATURES_SSL);
-
// ECDH+AESGCM+AES256:!aNULL:!MD5:!DSS
- std::vector<std::string> cpp_options{ "document_root", ".", "listening_ports", port, "error_log_file",
+ const std::vector<std::string> cpp_options{ "document_root", ".", "listening_ports", port, "error_log_file",
"error.log", "ssl_certificate", ca_cert, "ssl_protocol_version", "4", "ssl_cipher_list",
"ALL", "request_timeout_ms", "10000", "enable_auth_domain_check", "no", "ssl_verify_peer", "no"};
server_ = utils::make_unique<CivetServer>(cpp_options, callbacks);
-
addHandler(rooturi, handler);
}
- TestServer(std::string &port, std::string &rooturi, CivetHandler *handler) {
- std::vector<std::string> cpp_options{"document_root", ".", "listening_ports", port};
+ TestServer(const std::string& port, const std::string& rooturi, CivetHandler* handler) {
+ const std::vector<std::string> cpp_options{"document_root", ".", "listening_ports", port};
server_ = utils::make_unique<CivetServer>(cpp_options);
-
addHandler(rooturi, handler);
}
@@ -90,7 +86,6 @@
auto serverAwareHandler = dynamic_cast<ServerAwareHandler*>(handler);
if (serverAwareHandler) serverAwareHandler->stop();
}
-
}
private:
// server_ depends on lib_ (the library initializer)
diff --git a/extensions/http-curl/tests/ThreadPoolAdjust.cpp b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
index 1670e71..c06966e 100644
--- a/extensions/http-curl/tests/ThreadPoolAdjust.cpp
+++ b/extensions/http-curl/tests/ThreadPoolAdjust.cpp
@@ -16,40 +16,17 @@
* limitations under the License.
*/
-#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
#include "InvokeHTTP.h"
#include "processors/ListenHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
#include "core/logging/Logger.h"
#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "TestServer.h"
#include "HTTPIntegrationBase.h"
-#include "processors/InvokeHTTP.h"
-#include "processors/ListenHTTP.h"
#include "processors/LogAttribute.h"
class HttpTestHarness : public IntegrationBase {
@@ -59,7 +36,7 @@
dir = testController.createTempDirectory(format);
}
- void testSetup() {
+ void testSetup() override {
LogTestController::getInstance().setDebug<minifi::FlowController>();
LogTestController::getInstance().setDebug<core::ProcessGroup>();
LogTestController::getInstance().setDebug<minifi::SchedulingAgent>();
@@ -82,14 +59,14 @@
configuration->set("nifi.flow.engine.threads", "1");
}
- void cleanup() {
+ void cleanup() override {
unlink(ss.str().c_str());
}
- void runAssertions() {
- assert(LogTestController::getInstance().contains("curl performed") == true);
- assert(LogTestController::getInstance().contains("Size:1024 Offset:0") == true);
- assert(LogTestController::getInstance().contains("Size:0 Offset:0") == false);
+ void runAssertions() override {
+ assert(LogTestController::getInstance().contains("curl performed"));
+ assert(LogTestController::getInstance().contains("Size:1024 Offset:0"));
+ assert(!LogTestController::getInstance().contains("Size:0 Offset:0"));
}
protected:
@@ -99,17 +76,9 @@
};
int main(int argc, char **argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- }
-
+ const cmd_args args = parse_cmdline_args(argc, argv);
HttpTestHarness harness;
-
- harness.setKeyDir(key_dir);
-
- harness.run(test_file_location);
-
+ harness.setKeyDir(args.key_dir);
+ harness.run(args.test_file);
return 0;
}
diff --git a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
index 1e9c59e..4603368 100644
--- a/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
+++ b/extensions/http-curl/tests/TimeoutHTTPSiteToSiteTests.cpp
@@ -20,32 +20,17 @@
#include <sys/stat.h>
#undef NDEBUG
#include <cassert>
-#include <utility>
#include <chrono>
-#include <fstream>
-#include <memory>
#include <string>
-#include <thread>
-#include <type_traits>
#include <vector>
#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "CivetServer.h"
#include "sitetosite/HTTPProtocol.h"
#include "InvokeHTTP.h"
#include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
#include "FlowController.h"
-#include "properties/Configure.h"
#include "io/StreamFactory.h"
#include "RemoteProcessorGroupPort.h"
#include "core/ConfigurableComponent.h"
-#include "TestServer.h"
#include "HTTPIntegrationBase.h"
#include "HTTPHandlers.h"
#include "client/HTTPStream.h"
@@ -96,8 +81,8 @@
if(handler)return handler;
return def;
}
- void set(std::vector<std::chrono::milliseconds> timeout) {
- handler = new TimeoutingHTTPHandler(timeout);
+ void set(std::vector<std::chrono::milliseconds>&& timeout) {
+ handler = new TimeoutingHTTPHandler(std::move(timeout));
}
};
@@ -152,7 +137,7 @@
harness.run(test_file_location);
- assert(LogTestController::getInstance().contains("limit (200ms) reached, terminating connection") == true);
+ assert(LogTestController::getInstance().contains("limit (200ms) reached, terminating connection"));
LogTestController::getInstance().reset();
}
@@ -160,17 +145,8 @@
int main(int argc, char **argv) {
transaction_id = 0;
transaction_id_output = 0;
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- key_dir = argv[2];
- url = argv[3];
- }
-
- bool isSecure = false;
- if (url.find("https") != std::string::npos) {
- isSecure = true;
- }
+ const cmd_args args = parse_cmdline_args_with_url(argc, argv);
+ const bool isSecure = args.isUrlSecure();
#ifdef WIN32
if (url.find("localhost") != std::string::npos) {
@@ -185,31 +161,31 @@
{
timeout_test_profile profile;
profile.base_.set({timeout});
- run_timeout_variance(test_file_location, isSecure, url, profile);
+ run_timeout_variance(args.test_file, isSecure, args.url, profile);
}
{
timeout_test_profile profile;
profile.flow_.set({timeout});
- run_timeout_variance(test_file_location, isSecure, url, profile);
+ run_timeout_variance(args.test_file, isSecure, args.url, profile);
}
{
timeout_test_profile profile;
profile.transaction_.set({timeout});
- run_timeout_variance(test_file_location, isSecure, url, profile);
+ run_timeout_variance(args.test_file, isSecure, args.url, profile);
}
{
timeout_test_profile profile;
profile.delete_.set({timeout});
- run_timeout_variance(test_file_location, isSecure, url, profile);
+ run_timeout_variance(args.test_file, isSecure, args.url, profile);
}
{
timeout_test_profile profile;
profile.peer_.set({timeout});
- run_timeout_variance(test_file_location, isSecure, url, profile);
+ run_timeout_variance(args.test_file, isSecure, args.url, profile);
}
return 0;
diff --git a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
index fae79b5..a6c193b 100644
--- a/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
+++ b/extensions/http-curl/tests/VerifyInvokeHTTPTest.cpp
@@ -24,7 +24,6 @@
#include "processors/LogAttribute.h"
#include "core/state/ProcessorController.h"
-#include "../tests/TestServer.h"
#include "CivetServer.h"
#include "HTTPIntegrationBase.h"
@@ -34,14 +33,14 @@
: CoapIntegrationBase(6000) {
}
- virtual void testSetup() override {
+ void testSetup() override {
LogTestController::getInstance().setDebug<utils::HTTPClient>();
LogTestController::getInstance().setDebug<LogTestController>();
LogTestController::getInstance().setTrace<minifi::processors::InvokeHTTP>();
LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>();
}
- virtual void cleanup() override {
+ void cleanup() override {
}
void setProperties(std::shared_ptr<core::Processor> proc) {
@@ -76,7 +75,7 @@
setProperties(processorController->getProcessor());
}
- virtual void run(std::string flow_yml_path) override {
+ void run(std::string flow_yml_path) override {
setupFlow(flow_yml_path);
startFlowController();
@@ -100,7 +99,7 @@
class VerifyInvokeHTTPOKResponse : public VerifyInvokeHTTP {
public:
- virtual void runAssertions() override {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invokehttp.status.code value:201"));
assert(LogTestController::getInstance().contains("response code 201"));
}
@@ -108,14 +107,14 @@
class VerifyCouldNotConnectInvokeHTTP : public VerifyInvokeHTTP {
public:
- virtual void runAssertions() override {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
}
};
class VerifyNoRetryInvokeHTTP : public VerifyInvokeHTTP {
public:
- virtual void runAssertions() override {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 404 Not Found"));
assert(LogTestController::getInstance().contains("isSuccess: 0, response code 404"));
}
@@ -123,7 +122,7 @@
class VerifyRetryInvokeHTTP : public VerifyInvokeHTTP {
public:
- virtual void runAssertions() override {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invokehttp.status.message value:HTTP/1.1 501 Not Implemented"));
assert(LogTestController::getInstance().contains("isSuccess: 0, response code 501"));
}
@@ -131,7 +130,7 @@
class VerifyRWTimeoutInvokeHTTP : public VerifyInvokeHTTP {
public:
- virtual void runAssertions() override {
+ void runAssertions() override {
assert(LogTestController::getInstance().contains("key:invoke_http value:failure"));
assert(LogTestController::getInstance().contains("limit (1000ms) reached, terminating connection"));
}
@@ -149,24 +148,16 @@
}
int main(int argc, char ** argv) {
- std::string key_dir, test_file_location, url;
- if (argc > 1) {
- test_file_location = argv[1];
- url = "http://localhost:0/minifi";
- if (argc > 2) {
- key_dir = argv[2];
- url = "https://localhost:0/minifi";
- }
- }
+ const cmd_args args = parse_cmdline_args(argc, argv);
// Stop civet server to simulate
// unreachable remote end point
{
InvokeHTTPCouldNotConnectHandler handler;
VerifyCouldNotConnectInvokeHTTP harness;
- harness.setKeyDir(key_dir);
- harness.setUrl(url, &handler);
- harness.setupFlow(test_file_location);
+ harness.setKeyDir(args.key_dir);
+ harness.setUrl(args.url, &handler);
+ harness.setupFlow(args.test_file);
harness.shutdownBeforeFlowController();
harness.startFlowController();
harness.waitToVerifyProcessor();
@@ -176,25 +167,25 @@
{
InvokeHTTPResponseOKHandler handler;
VerifyInvokeHTTPOKResponse harness;
- run(harness, url, test_file_location, key_dir, &handler);
+ run(harness, args.url, args.test_file, args.key_dir, &handler);
}
{
InvokeHTTPResponse404Handler handler;
VerifyNoRetryInvokeHTTP harness;
- run(harness, url, test_file_location, key_dir, &handler);
+ run(harness, args.url, args.test_file, args.key_dir, &handler);
}
{
InvokeHTTPResponse501Handler handler;
VerifyRetryInvokeHTTP harness;
- run(harness, url, test_file_location, key_dir, &handler);
+ run(harness, args.url, args.test_file, args.key_dir, &handler);
}
{
- TimeoutingHTTPHandler handler({std::chrono::milliseconds(4000)});
+ TimeoutingHTTPHandler handler({std::chrono::seconds(4)});
VerifyRWTimeoutInvokeHTTP harness;
- run(harness, url, test_file_location, key_dir, &handler);
+ run(harness, args.url, args.test_file, args.key_dir, &handler);
}
return 0;
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 3310b97..08b36d0 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -71,6 +71,9 @@
protected:
+  // Hook that lets a test populate its C2-related configuration entries;
+  // the default implementation intentionally does nothing.
+  virtual void configureC2() {}
+
virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
}
@@ -115,6 +118,9 @@
configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ configureC2();
+ configureFullHeartbeat();
+
std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
content_repo->initialize(configuration);
std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
@@ -128,15 +134,9 @@
state_dir = utils::file::FileUtils::create_temp_directory(state_dir_name_template);
core::ProcessContext::getOrCreateDefaultStateManagerProvider(controller_service_provider, configuration, state_dir.c_str());
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-
+ std::shared_ptr<core::ProcessGroup> pg(yaml_config.getRoot(test_file_location));
queryRootProcessGroup(pg);
- configureFullHeartbeat();
-
- ptr.release();
-
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
flowController_ = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
@@ -154,4 +154,49 @@
cleanup();
}
+// Bundle of the command-line arguments consumed by the integration tests.
+struct cmd_args {
+  std::string test_file;
+  std::string key_dir;
+  std::string bad_test_file;
+  std::string url;
+
+  // True when url begins with the "https://" scheme prefix.
+  bool isUrlSecure() const {
+    const char secure_prefix[] = "https://";
+    return url.compare(0, sizeof(secure_prefix) - 1, secure_prefix) == 0;
+  }
+};
+
+// Reads the two leading positional arguments: argv[1] becomes test_file and
+// argv[2] becomes key_dir. A field stays empty when its argument is absent.
+// Declared inline because the definition lives in a header; without it,
+// including the header from several translation units of one binary would
+// produce multiple definitions at link time.
+inline cmd_args parse_basic_cmdline_args(int argc, char ** argv) {
+  cmd_args args;
+  if (argc > 1) {
+    args.test_file = argv[1];
+  }
+  if (argc > 2) {
+    args.key_dir = argv[2];
+  }
+  return args;
+}
+
+// Parses test_file and key_dir like parse_basic_cmdline_args and derives the
+// server URL from the argument count:
+//   - exactly one argument (argc == 2): "http://localhost:0/" + uri_path
+//   - a key directory was given (argc > 2): "https://localhost:0/" + uri_path
+//   - no arguments: url stays empty
+// An optional third argument (argv[3]) is stored as bad_test_file.
+// Declared inline because the definition lives in a header.
+inline cmd_args parse_cmdline_args(int argc, char ** argv, const std::string& uri_path = "") {
+  cmd_args args = parse_basic_cmdline_args(argc, argv);
+  if (argc == 2) {
+    args.url = "http://localhost:0/" + uri_path;
+  } else if (argc > 2) {
+    args.url = "https://localhost:0/" + uri_path;
+  }
+  if (argc > 3) {
+    args.bad_test_file = argv[3];
+  }
+  return args;
+}
+
+// Parses test_file and key_dir like parse_basic_cmdline_args and takes the
+// URL verbatim from argv[3]; url stays empty when that argument is absent.
+// Declared inline because the definition lives in a header.
+inline cmd_args parse_cmdline_args_with_url(int argc, char ** argv) {
+  cmd_args args = parse_basic_cmdline_args(argc, argv);
+  if (argc > 3) {
+    args.url = argv[3];
+  }
+  return args;
+}
+
#endif /* LIBMINIFI_TEST_INTEGRATION_INTEGRATIONBASE_H_ */