MINIFICPP-1157 Implement light weight heartbeat.
Remove agent manifest from regular heartbeat messages.
Send agent manifest in response to DESCRIBE manifest request
Signed-off-by: Arpad Boda <aboda@apache.org>
diff --git a/C2.md b/C2.md
index b907926..dc99778 100644
--- a/C2.md
+++ b/C2.md
@@ -49,6 +49,11 @@
files contain the former naming convention of `c2.*`, we will continue to support that as
an alternate key, but you are encouraged to switch your configuration options as soon as possible.
+Note: In release 0.8.0 there is a configuration option to minizime the heartbeat payload size by excluding agent manifest.
+For that, replace "AgentInformation" with "AgentInformationWithoutManifest in nifi.c2.root.classes property value.
+With this change, heartbeat with agent manifest included is sent only for the first time then falls back to sending
+light weight heartbeat. If for some reason the C2 server does not receive the first full heartbeat, the manifest can
+be requested via C2 DESCRIBE manifest command.
#in minifi.properties
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 4f7166a..249ca7a 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -52,6 +52,8 @@
#nifi.c2.rest.url=
#nifi.c2.rest.url.ack=
nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
+#nifi.c2.root.classes=DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation
## heartbeat 4 times a second
#nifi.c2.agent.heartbeat.period=250
## define parameters about your agent
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 83935bf..bd7740e 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -41,7 +41,7 @@
void setUrl(std::string url, CivetHandler *handler);
- virtual ~CoapIntegrationBase();
+ virtual ~CoapIntegrationBase() = default;
void shutdownBeforeFlowController() override {
stop_webserver(server);
@@ -68,6 +68,8 @@
queryRootProcessGroup(pg);
+ configureC2RootClasses();
+
ptr.release();
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
@@ -90,10 +92,6 @@
CivetServer *server;
};
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
new file mode 100644
index 0000000..5a84c5c
--- /dev/null
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+
+class DescribeManifestHandler: public HeartbeatHandler {
+public:
+
+ explicit DescribeManifestHandler(bool isSecure)
+ : HeartbeatHandler(isSecure) {
+ }
+
+ virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+ sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
+ }
+
+ virtual void handleAcknowledge(const rapidjson::Document& root) {
+ verifyJsonHasAgentManifest(root);
+ }
+};
+int main(int argc, char **argv) {
+ std::string key_dir, test_file_location, url;
+ url = "http://localhost:0/api/heartbeat";
+ if (argc > 1) {
+ test_file_location = argv[1];
+ if (argc > 2) {
+ url = "https://localhost:0/api/heartbeat";
+ key_dir = argv[2];
+ }
+ }
+
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
+
+ VerifyC2Describe harness(isSecure);
+
+ harness.setKeyDir(key_dir);
+
+ DescribeManifestHandler responder(isSecure);
+
+ harness.setUrl(url, &responder);
+
+ harness.run(test_file_location);
+}
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index dd2ad27..241af1c 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -45,123 +45,60 @@
#include "CivetServer.h"
#include <cstring>
#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
-void waitToVerifyProcessor() {
- std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-
-class ConfigHandler : public CivetHandler {
+class VerifyC2DescribeJstack : public VerifyC2Describe {
public:
- ConfigHandler() {
- calls_ = 0;
- }
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- calls_++;
- std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
- "\"operation\" : \"describe\", "
- "\"operationid\" : \"8675309\", "
- "\"name\": \"jstack\""
- "}]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- heartbeat_response.length());
- mg_printf(conn, "%s", heartbeat_response.c_str());
-
-
- return true;
+ explicit VerifyC2DescribeJstack(bool isSecure)
+ : VerifyC2Describe(isSecure) {
}
- bool handleGet(CivetServer *server, struct mg_connection *conn) {
- std::ifstream myfile(test_file_location_.c_str());
-
- if (myfile.is_open()) {
- std::stringstream buffer;
- buffer << myfile.rdbuf();
- std::string str = buffer.str();
- myfile.close();
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- str.length());
- mg_printf(conn, "%s", str.c_str());
- } else {
- mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
- }
-
- return true;
+ virtual void runAssertions() {
+ assert(LogTestController::getInstance().contains("SchedulingAgent") == true);
}
- std::string test_file_location_;
- std::atomic<size_t> calls_;
+};
+
+class DescribeJstackHandler : public HeartbeatHandler {
+ public:
+ explicit DescribeJstackHandler(bool isSecure)
+ : HeartbeatHandler(isSecure) {
+ }
+
+ virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+ sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
+ }
+
+ virtual void handleAcknowledge(const rapidjson::Document& root) {
+ assert(root.HasMember("SchedulingAgent #0") == true);
+ }
+
};
int main(int argc, char **argv) {
- mg_init_library(0);
- LogTestController::getInstance().setInfo<minifi::FlowController>();
- LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
- LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
- LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
-
- const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
- std::vector<std::string> cpp_options;
- for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
- cpp_options.push_back(options[i]);
- }
-
- CivetServer server(cpp_options);
-
- std::string port_str = std::to_string(server.getListeningPorts()[0]);
-
- ConfigHandler h_ex;
- server.addHandler("/update", h_ex);
- std::string key_dir, test_file_location;
+ std::string key_dir, test_file_location, url;
+ url = "http://localhost:0/api/heartbeat";
if (argc > 1) {
- h_ex.test_file_location_ = test_file_location = argv[1];
- key_dir = argv[2];
+ test_file_location = argv[1];
+ if (argc > 2) {
+ url = "https://localhost:0/api/heartbeat";
+ key_dir = argv[2];
+ }
}
+ bool isSecure = false;
+ if (url.find("https") != std::string::npos) {
+ isSecure = true;
+ }
- std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+ VerifyC2DescribeJstack harness(isSecure);
- std::string c2_rest_url = "http://localhost:" + port_str + "/update";
+ harness.setKeyDir(key_dir);
- configuration->set("c2.rest.url", c2_rest_url);
- configuration->set("c2.agent.heartbeat.period", "1000");
+ DescribeJstackHandler responder(isSecure);
- std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
- std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+ harness.setUrl(url, &responder);
- configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+ harness.run(test_file_location);
- std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
- std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
- std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
- new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
- std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
- std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
- true);
-
- core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
- std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
- std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
- ptr.release();
- auto start = std::chrono::system_clock::now();
-
- controller->load();
- controller->start();
- waitToVerifyProcessor();
-
- controller->waitUnload(60000);
- auto then = std::chrono::system_clock::now();
-
- auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
- std::string logs = LogTestController::getInstance().log_output.str();
- #ifndef WIN32
- assert(logs.find("SchedulingAgent") != std::string::npos);
- #endif
- LogTestController::getInstance().reset();
- assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
- return 0;
}
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 2eaa10b..6e0c1c2 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -50,98 +50,47 @@
#include "protocols/RESTReceiver.h"
#include "protocols/RESTSender.h"
#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
#include "agent/build_description.h"
#include "processors/LogAttribute.h"
-class Responder : public CivetHandler {
+class LightWeightC2Handler : public HeartbeatHandler {
public:
- explicit Responder(bool isSecure)
- : isSecure(isSecure) {
+ explicit LightWeightC2Handler(bool isSecure)
+ : HeartbeatHandler(isSecure),
+ calls_(0) {
}
- std::string readPost(struct mg_connection *conn) {
- std::string response;
- int blockSize = 1024 * sizeof(char), readBytes;
+ virtual ~LightWeightC2Handler() = default;
- char buffer[1024];
- while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
- response.append(buffer, 0, (readBytes / sizeof(char)));
+ virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+ (void)conn;
+ if (calls_ == 0) {
+ verifyJsonHasAgentManifest(root);
+ } else {
+ assert(root.HasMember("agentInfo") == true);
+ assert(root["agentInfo"].HasMember("agentManifest") == false);
}
- return response;
+ calls_++;
}
- bool handlePost(CivetServer *server, struct mg_connection *conn) {
- auto post_data = readPost(conn);
-
- std::cerr << post_data << std::endl;
-
- if (!IsNullOrEmpty(post_data)) {
- rapidjson::Document root;
- rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
- bool found = false;
- std::string operation = root["operation"].GetString();
- if (operation == "heartbeat") {
- assert(root.HasMember("agentInfo") == true);
- assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
-
- for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
- assert(bundle.HasMember("artifact"));
- std::string str = bundle["artifact"].GetString();
- if (str == "minifi-system") {
-
- std::vector<std::string> classes;
- for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
- classes.push_back(proc["type"].GetString());
- }
-
- auto group = minifi::BuildDescription::getClassDescriptions(str);
- for (auto proc : group.processors_) {
- assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
- found = true;
- }
-
- }
- }
- assert(found == true);
- }
- }
- std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
- "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
- mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
- "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
- resp.length());
- mg_printf(conn, "%s", resp.c_str());
- return true;
- }
-
- protected:
- bool isSecure;
+ private:
+ std::atomic<size_t> calls_;
};
-class VerifyC2Heartbeat : public CoapIntegrationBase {
+class VerifyC2Heartbeat : public VerifyC2Base {
public:
explicit VerifyC2Heartbeat(bool isSecure)
- : isSecure(isSecure) {
- char format[] = "/tmp/ssth.XXXXXX";
- dir = testController.createTempDirectory(format);
+ : VerifyC2Base(isSecure) {
}
- void testSetup() {
- LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ virtual ~VerifyC2Heartbeat() = default;
+
+ virtual void testSetup() {
LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
- LogTestController::getInstance().setDebug<LogTestController>();
LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
- std::fstream file;
- ss << dir << "/" << "tstFile.ext";
- file.open(ss.str(), std::ios::out);
- file << "tempFile";
- file.close();
- }
-
- void cleanup() {
- LogTestController::getInstance().reset();
- unlink(ss.str().c_str());
+ VerifyC2Base::testSetup();
}
void runAssertions() {
@@ -152,32 +101,22 @@
assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
}
- void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
- std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
- assert(proc != nullptr);
-
- std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
- assert(inv != nullptr);
- std::string url = "";
- inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
- std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
-
- configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
- configuration->set("nifi.c2.enable", "true");
- configuration->set("nifi.c2.agent.class", "test");
- configuration->set("nifi.c2.rest.url", c2_url);
- configuration->set("nifi.c2.agent.heartbeat.period", "1000");
- configuration->set("nifi.c2.rest.url.ack", c2_url);
+ void configureC2RootClasses() {
configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
}
+};
- protected:
- bool isSecure;
- std::string dir;
- std::stringstream ss;
- TestController testController;
+class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
+public:
+ explicit VerifyLightWeightC2Heartbeat(bool isSecure)
+ : VerifyC2Heartbeat(isSecure) {
+ }
+
+ virtual ~VerifyLightWeightC2Heartbeat() = default;
+
+ void configureC2RootClasses() {
+ configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+ }
};
int main(int argc, char **argv) {
@@ -195,12 +134,23 @@
if (url.find("https") != std::string::npos) {
isSecure = true;
}
+ {
+ VerifyC2Heartbeat harness(isSecure);
- VerifyC2Heartbeat harness(isSecure);
+ harness.setKeyDir(key_dir);
+
+ HeartbeatHandler responder(isSecure);
+
+ harness.setUrl(url, &responder);
+
+ harness.run(test_file_location);
+ }
+
+ VerifyLightWeightC2Heartbeat harness(isSecure);
harness.setKeyDir(key_dir);
- Responder responder(isSecure);
+ LightWeightC2Handler responder(isSecure);
harness.setUrl(url, &responder);
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index ab9176d..ff848b5 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -72,6 +72,7 @@
add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
+add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/")
add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml" "${TEST_RESOURCES}/")
add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml" "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 282b470..1de39b3 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -20,6 +20,8 @@
#include "concurrentqueue.h"
#include "CivetStream.h"
#include "io/CRCStream.h"
+#include "rapidjson/document.h"
+
#ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
#define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
static std::atomic<int> transaction_id;
@@ -343,4 +345,104 @@
std::string response_code;
};
+class HeartbeatHandler : public CivetHandler {
+ public:
+ explicit HeartbeatHandler(bool isSecure)
+ : isSecure(isSecure) {
+ }
+
+ std::string readPost(struct mg_connection *conn) {
+ std::string response;
+ int blockSize = 1024 * sizeof(char), readBytes;
+
+ char buffer[1024];
+ while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
+ response.append(buffer, 0, (readBytes / sizeof(char)));
+ }
+ return response;
+ }
+
+ void sendStopOperation(struct mg_connection *conn) {
+ std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\" }, "
+ "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\" } ]}";
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ resp.length());
+ mg_printf(conn, "%s", resp.c_str());
+ }
+
+ void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) {
+ std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {"
+ "\"operation\" : \"" + operation + "\","
+ "\"operationid\" : \"" + operationId + "\","
+ "\"operand\": \"" + operand + "\"}]}";
+
+ mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+ "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+ heartbeat_response.length());
+ mg_printf(conn, "%s", heartbeat_response.c_str());
+ }
+
+ void verifyJsonHasAgentManifest(const rapidjson::Document& root) {
+ bool found = false;
+ assert(root.HasMember("agentInfo") == true);
+ assert(root["agentInfo"].HasMember("agentManifest") == true);
+ assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
+
+ for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
+ assert(bundle.HasMember("artifact"));
+ std::string str = bundle["artifact"].GetString();
+ if (str == "minifi-system") {
+
+ std::vector<std::string> classes;
+ for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
+ classes.push_back(proc["type"].GetString());
+ }
+
+ auto group = minifi::BuildDescription::getClassDescriptions(str);
+ for (auto proc : group.processors_) {
+ assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
+ found = true;
+ }
+
+ }
+ }
+ assert(found == true);
+ }
+
+ virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+ (void)conn;
+ verifyJsonHasAgentManifest(root);
+ }
+
+ virtual void handleAcknowledge(const rapidjson::Document& root) {
+ }
+
+ void verify(struct mg_connection *conn) {
+ auto post_data = readPost(conn);
+ std::cerr << post_data << std::endl;
+ if (!IsNullOrEmpty(post_data)) {
+ rapidjson::Document root;
+ rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
+ std::string operation = root["operation"].GetString();
+ if (operation == "heartbeat") {
+ handleHeartbeat(root, conn);
+ } else if (operation == "acknowledge") {
+ handleAcknowledge(root);
+ } else {
+ throw std::runtime_error("operation not supported " + operation);
+ }
+ }
+ }
+
+ bool handlePost(CivetServer *server, struct mg_connection *conn) {
+ verify(conn);
+ sendStopOperation(conn);
+ return true;
+ }
+
+ protected:
+ bool isSecure;
+};
+
#endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 4ab623b..4226630 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -21,6 +21,8 @@
#include "../tests/TestServer.h"
#include "CivetServer.h"
#include "integration/IntegrationBase.h"
+#include "c2/C2Agent.h"
+#include "protocols/RESTSender.h"
int log_message(const struct mg_connection *conn, const char *message) {
puts(message);
@@ -41,8 +43,6 @@
void setUrl(std::string url, CivetHandler *handler);
- virtual ~CoapIntegrationBase();
-
void shutdownBeforeFlowController() {
stop_webserver(server);
}
@@ -59,10 +59,6 @@
CivetServer *server;
};
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
parse_http_components(url, port, scheme, path);
@@ -91,4 +87,74 @@
}
}
+class VerifyC2Base : public CoapIntegrationBase {
+ public:
+ explicit VerifyC2Base(bool isSecure)
+ : isSecure(isSecure) {
+ char format[] = "/tmp/ssth.XXXXXX";
+ dir = testController.createTempDirectory(format);
+ }
+
+ virtual void testSetup() {
+ LogTestController::getInstance().setDebug<utils::HTTPClient>();
+ LogTestController::getInstance().setDebug<LogTestController>();
+ std::fstream file;
+ ss << dir << "/" << "tstFile.ext";
+ file.open(ss.str(), std::ios::out);
+ file << "tempFile";
+ file.close();
+ }
+
+ void runAssertions() {
+ }
+
+ virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+ std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+ assert(proc != nullptr);
+
+ std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+ assert(inv != nullptr);
+ std::string url = "";
+ inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+ std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
+
+ configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+ configuration->set("nifi.c2.enable", "true");
+ configuration->set("nifi.c2.agent.class", "test");
+ configuration->set("nifi.c2.rest.url", c2_url);
+ configuration->set("nifi.c2.agent.heartbeat.period", "1000");
+ configuration->set("nifi.c2.rest.url.ack", c2_url);
+ }
+
+ void cleanup() {
+ LogTestController::getInstance().reset();
+ unlink(ss.str().c_str());
+ }
+
+ protected:
+ bool isSecure;
+ std::string dir;
+ std::stringstream ss;
+ TestController testController;
+};
+
+class VerifyC2Describe : public VerifyC2Base {
+ public:
+ explicit VerifyC2Describe(bool isSecure)
+ : VerifyC2Base(isSecure) {
+ }
+
+ void testSetup() {
+ LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+ LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+ LogTestController::getInstance().setInfo<minifi::FlowController>();
+ VerifyC2Base::testSetup();
+ }
+
+ void configureC2RootClasses() {
+ configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+ }
+};
#endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5602452..bef607d 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -303,6 +303,19 @@
*/
virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
+ /**
+ * Retrieves agent information with manifest only from this source.
+ * @param manifest_vector -- manifest nodes vector.
+ * @return 0 on Success, -1 on failure
+ */
+ virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const;
+
+ /**
+ * Returns a response node containing all agent information with manifest and agent status
+ * @return a shared pointer to agent information
+ */
+ virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+
virtual uint64_t getUptime();
virtual std::vector<BackTrace> getTraces();
@@ -384,9 +397,10 @@
std::chrono::steady_clock::time_point start_time_;
- std::mutex metrics_mutex_;
+ mutable std::mutex metrics_mutex_;
// root_nodes cache
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
+
// metrics cache
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_;
@@ -394,6 +408,10 @@
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
+
+ // manifest cache
+ std::map<std::string, std::shared_ptr<state::response::ResponseNode>> agent_information_;
+
// metrics last run
std::chrono::steady_clock::time_point last_metrics_capture_;
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index f84c131..c39589f 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -171,6 +171,11 @@
*/
bool update_property(const std::string &property_name, const std::string &property_value, bool persist = false);
+ /**
+ * Creates configuration options C2 payload for response
+ */
+ C2Payload prepareConfigurationOptions(const C2ContentResponse &resp) const;
+
std::timed_mutex metrics_mutex_;
std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
@@ -244,6 +249,8 @@
std::string bin_location_;
std::shared_ptr<logging::Logger> logger_;
+
+ bool manifest_sent_;
};
} /* namesapce c2 */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index abf0c6f..5809a4c 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -68,7 +68,7 @@
class ComponentManifest : public DeviceInformation {
public:
- ComponentManifest(std::string name, utils::Identifier & uuid)
+ ComponentManifest(const std::string& name, utils::Identifier & uuid)
: DeviceInformation(name, uuid) {
}
@@ -92,7 +92,7 @@
}
protected:
- void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) {
+ void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) const {
if (!descriptions.empty()) {
SerializedResponseNode type;
type.name = name;
@@ -307,7 +307,7 @@
class ExternalManifest : public ComponentManifest {
public:
- ExternalManifest(std::string name, utils::Identifier & uuid)
+ ExternalManifest(const std::string& name, utils::Identifier & uuid)
: ComponentManifest(name, uuid) {
}
@@ -329,7 +329,7 @@
class Bundles : public DeviceInformation {
public:
- Bundles(std::string name, utils::Identifier & uuid)
+ Bundles(const std::string& name, utils::Identifier & uuid)
: DeviceInformation(name, uuid) {
setArray(true);
}
@@ -452,8 +452,17 @@
queuesize.name = "size";
queuesize.value = repo.second->getRepoSize();
- repoNode.children.push_back(queuesize);
+ SerializedResponseNode isRunning;
+ isRunning.name = "running";
+ isRunning.value = repo.second->isRunning();
+ SerializedResponseNode isFull;
+ isFull.name = "full";
+ isFull.value = repo.second->isFull();
+
+ repoNode.children.push_back(queuesize);
+ repoNode.children.push_back(isRunning);
+ repoNode.children.push_back(isFull);
repositories.children.push_back(repoNode);
}
@@ -540,14 +549,12 @@
class AgentManifest : public DeviceInformation {
public:
- AgentManifest(std::string name, utils::Identifier & uuid)
+ AgentManifest(const std::string& name, utils::Identifier & uuid)
: DeviceInformation(name, uuid) {
- //setArray(true);
}
AgentManifest(const std::string &name)
: DeviceInformation(name) {
- // setArray(true);
}
std::string getName() const {
@@ -622,25 +629,21 @@
}
};
-/**
- * Purpose and Justification: Prints classes along with their properties for the current agent.
- */
-class AgentInformation : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
- public:
- AgentInformation(std::string name, utils::Identifier & uuid)
+class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
+public:
+
+ AgentNode(const std::string& name, utils::Identifier & uuid)
: DeviceInformation(name, uuid) {
setArray(false);
}
- AgentInformation(const std::string &name)
+ explicit AgentNode(const std::string& name)
: DeviceInformation(name) {
setArray(false);
}
- std::string getName() const {
- return "agentInfo";
- }
+protected:
std::vector<SerializedResponseNode> serialize() {
std::vector<SerializedResponseNode> serialized;
@@ -654,6 +657,14 @@
agentClass.name = "agentClass";
agentClass.value = agent_class_;
+ serialized.push_back(ident);
+ serialized.push_back(agentClass);
+
+ return serialized;
+ }
+
+ std::vector<SerializedResponseNode> getAgentManifest() const {
+ std::vector<SerializedResponseNode> serialized;
AgentManifest manifest("manifest");
SerializedResponseNode agentManifest;
@@ -661,6 +672,34 @@
for (auto &ser : manifest.serialize()) {
agentManifest.children.push_back(std::move(ser));
}
+ serialized.push_back(agentManifest);
+ return serialized;
+ }
+};
+
+/**
+ * This class is used for regular heartbeat without manifest
+ * A light weight heartbeat
+ */
+class AgentInformationWithoutManifest : public AgentNode {
+public:
+
+ AgentInformationWithoutManifest(const std::string& name, utils::Identifier & uuid)
+ : AgentNode(name, uuid) {
+ setArray(false);
+ }
+
+ explicit AgentInformationWithoutManifest(const std::string &name)
+ : AgentNode(name) {
+ setArray(false);
+ }
+
+ std::string getName() const {
+ return "agentInfo";
+ }
+
+ std::vector<SerializedResponseNode> serialize() {
+ std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
AgentStatus status("status");
status.setRepositories(repositories_);
@@ -672,16 +711,72 @@
agentStatus.children.push_back(std::move(ser));
}
- serialized.push_back(ident);
- serialized.push_back(agentClass);
- serialized.push_back(agentManifest);
serialized.push_back(agentStatus);
return serialized;
}
+};
+
+
+/**
+ * This class is used for sending all agent information including manifest and status
+ * A heavy weight heartbeat. Here to maintain backward compatibility
+ */
+class AgentInformation : public AgentInformationWithoutManifest {
+ public:
+
+ AgentInformation(const std::string& name, utils::Identifier & uuid)
+ : AgentInformationWithoutManifest(name, uuid) {
+ setArray(false);
+ }
+
+ explicit AgentInformation(const std::string &name)
+ : AgentInformationWithoutManifest(name) {
+ setArray(false);
+ }
+
+ std::string getName() const {
+ return "agentInfo";
+ }
+
+ std::vector<SerializedResponseNode> serialize() {
+ std::vector<SerializedResponseNode> serialized(AgentInformationWithoutManifest::serialize());
+ auto manifest = getAgentManifest();
+ serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+ return serialized;
+ }
};
+/**
+ * This class is used for response to DESCRIBE manifest request
+ * It contains static information only
+ */
+class AgentInformationWithManifest : public AgentNode {
+public:
+ AgentInformationWithManifest(const std::string& name, utils::Identifier & uuid)
+ : AgentNode(name, uuid) {
+ setArray(false);
+ }
+
+ explicit AgentInformationWithManifest(const std::string &name)
+ : AgentNode(name) {
+ setArray(false);
+ }
+
+ std::string getName() const {
+ return "agentInfo";
+ }
+
+ std::vector<SerializedResponseNode> serialize() {
+ std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
+ auto manifest = getAgentManifest();
+ serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+ return serialized;
+ }
+};
+
REGISTER_RESOURCE(AgentInformation, "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.");
+REGISTER_RESOURCE(AgentInformationWithoutManifest, "Node part of an AST that defines all agent information, without the manifest and bundle information as part of a healthy hearbeat.");
} /* namespace metrics */
} /* namespace state */
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index 40a0c0c..d91df6e 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -43,12 +43,12 @@
is_array_(false) {
}
- ResponseNode(std::string name)
+ ResponseNode(const std::string& name)
: core::Connectable(name),
is_array_(false) {
}
- ResponseNode(std::string name, utils::Identifier & uuid)
+ ResponseNode(const std::string& name, utils::Identifier & uuid)
: core::Connectable(name, uuid),
is_array_(false) {
}
@@ -90,10 +90,10 @@
*/
class DeviceInformation : public ResponseNode {
public:
- DeviceInformation(std::string name, utils::Identifier & uuid)
+ DeviceInformation(const std::string& name, utils::Identifier & uuid)
: ResponseNode(name, uuid) {
}
- DeviceInformation(std::string name)
+ DeviceInformation(const std::string& name)
: ResponseNode(name) {
}
};
@@ -228,6 +228,18 @@
*/
virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
+ /**
+ * Retrieves agent information with manifest only from this source.
+ * @param manifest_vector -- manifest nodes vector.
+ * @return 0 on Success, -1 on failure
+ */
+ virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const = 0;
+
+ /**
+ * Returns a response node containing all agent information with manifest and agent status
+ * @return a shared pointer to agent information
+ */
+ virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const = 0;
};
/**
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index 2a855ba..648d86f 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -96,7 +96,7 @@
minifi_home_ = minifiHome;
}
- std::vector<std::string> getConfiguredKeys() {
+ std::vector<std::string> getConfiguredKeys() const {
std::vector<std::string> keys;
for (auto &property : properties_) {
keys.push_back(property.first);
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 31160c8..1a251b5 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -466,6 +466,7 @@
device_information_.clear();
component_metrics_.clear();
component_metrics_by_id_.clear();
+ agent_information_.clear();
std::string class_csv;
if (root_ != nullptr) {
@@ -484,6 +485,15 @@
repoMetrics->addRepository(flow_file_repo_);
device_information_[repoMetrics->getName()] = repoMetrics;
+
+ std::shared_ptr<state::response::AgentInformationWithManifest> manifest = std::make_shared<state::response::AgentInformationWithManifest>("agentInformation");
+ auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(manifest);
+
+ if (identifier != nullptr) {
+ identifier->setIdentifier(identifier_str);
+ identifier->setAgentClass(class_str);
+ agent_information_[manifest->getName()] = manifest;
+ }
}
if (configuration_->get("nifi.c2.root.classes", class_csv)) {
@@ -932,6 +942,34 @@
return 0;
}
+int16_t FlowController::getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const {
+ std::lock_guard<std::mutex> lock(metrics_mutex_);
+ for (const auto& metric : agent_information_) {
+ manifest_vector.push_back(metric.second);
+ }
+ return 0;
+}
+
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentInformation() const {
+ auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
+ auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(agentInfo);
+
+ if (identifier != nullptr) {
+ std::string class_str;
+ configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
+
+ std::string identifier_str;
+ if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
+ identifier_str = uuidStr_;
+ }
+
+ identifier->setIdentifier(identifier_str);
+ identifier->setAgentClass(class_str);
+ return agentInfo;
+ }
+ return nullptr;
+}
+
std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
std::vector<std::shared_ptr<state::StateController>> vec;
vec.push_back(shared_from_this());
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 7c4b6c2..77d3f7a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -50,6 +50,8 @@
logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
allow_updates_ = true;
+ manifest_sent_ = false;
+
running_c2_configuration = std::make_shared<Configure>();
last_run_ = std::chrono::steady_clock::now();
@@ -299,32 +301,34 @@
payload.addPayload(std::move(metrics));
}
- if (device_information_.size() > 0) {
- C2Payload deviceInfo(Operation::HEARTBEAT);
- deviceInfo.setLabel("AgentInformation");
+ for (auto metric : root_response_nodes_) {
+ C2Payload child_metric_payload(Operation::HEARTBEAT);
+ bool isArray{false};
+ std::string metricName;
+ std::vector<state::response::SerializedResponseNode> metrics;
+ std::shared_ptr<state::response::NodeReporter> reporter;
+ std::shared_ptr<state::response::ResponseNode> agentInfo;
- for (auto metric : device_information_) {
- C2Payload child_metric_payload(Operation::HEARTBEAT);
- child_metric_payload.setLabel(metric.first);
- if (metric.second->isArray()) {
- child_metric_payload.setContainer(true);
- }
- serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
- deviceInfo.addPayload(std::move(child_metric_payload));
+ // Send agent manifest in first heartbeat
+ if (!manifest_sent_
+ && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_))
+ && (agentInfo = reporter->getAgentInformation())
+ && metric.first == agentInfo->getName()) {
+ metricName = agentInfo->getName();
+ isArray = agentInfo->isArray();
+ metrics = agentInfo->serialize();
+ manifest_sent_ = true;
+ } else {
+ metricName = metric.first;
+ isArray = metric.second->isArray();
+ metrics = metric.second->serialize();
}
- payload.addPayload(std::move(deviceInfo));
- }
-
- if (!root_response_nodes_.empty()) {
- for (auto metric : root_response_nodes_) {
- C2Payload child_metric_payload(Operation::HEARTBEAT);
- child_metric_payload.setLabel(metric.first);
- if (metric.second->isArray()) {
- child_metric_payload.setContainer(true);
- }
- serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
- payload.addPayload(std::move(child_metric_payload));
+ child_metric_payload.setLabel(metricName);
+ if (isArray) {
+ child_metric_payload.setContainer(true);
}
+ serializeMetrics(child_metric_payload, metricName, metrics, isArray);
+ payload.addPayload(std::move(child_metric_payload));
}
C2Payload && response = protocol_.load()->consumePayload(payload);
@@ -485,6 +489,28 @@
}
}
+C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) const {
+ auto unsanitized_keys = configuration_->getConfiguredKeys();
+ std::vector<std::string> keys;
+ std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys),
+ [](std::string key) {return key.find("pass") == std::string::npos;});
+
+ C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ C2Payload options(Operation::ACKNOWLEDGE);
+ options.setLabel("configuration_options");
+ std::string value;
+ for (auto key : keys) {
+ C2ContentResponse option(Operation::ACKNOWLEDGE);
+ option.name = key;
+ if (configuration_->get(key, value)) {
+ option.operation_arguments[key] = value;
+ options.addContent(std::move(option));
+ }
+ }
+ response.addPayload(std::move(options));
+ return response;
+}
+
/**
* Descriptions are special types of requests that require information
* to be put into the acknowledgement
@@ -506,67 +532,36 @@
}
std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-
- reporter->getResponseNodes(metrics_vec, metric_class_id);
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
- response.setLabel("metrics");
+ C2Payload metrics(Operation::ACKNOWLEDGE);
+ metrics.setLabel("metrics");
+ reporter->getResponseNodes(metrics_vec, 0);
for (auto metric : metrics_vec) {
- serializeMetrics(response, metric->getName(), metric->serialize());
+ serializeMetrics(metrics, metric->getName(), metric->serialize());
}
+ response.addPayload(std::move(metrics));
enqueue_c2_response(std::move(response));
}
} else if (resp.name == "configuration") {
- auto unsanitized_keys = configuration_->getConfiguredKeys();
- std::vector<std::string> keys;
- std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys), [](std::string key) {return key.find("pass") == std::string::npos;});
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
- response.setLabel("configuration_options");
- C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
- options.setLabel("configuration_options");
- std::string value;
- for (auto key : keys) {
- C2ContentResponse option(Operation::ACKNOWLEDGE);
- option.name = key;
- if (configuration_->get(key, value)) {
- option.operation_arguments[key] = value;
- options.addContent(std::move(option));
- }
- }
- response.addPayload(std::move(options));
- enqueue_c2_response(std::move(response));
+ auto configOptions = prepareConfigurationOptions(resp);
+ enqueue_c2_response(std::move(configOptions));
return;
} else if (resp.name == "manifest") {
- auto keys = configuration_->getConfiguredKeys();
- C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
- response.setLabel("configuration_options");
- C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
- options.setLabel("configuration_options");
- std::string value;
- for (auto key : keys) {
- C2ContentResponse option(Operation::ACKNOWLEDGE);
- option.name = key;
- if (configuration_->get(key, value)) {
- option.operation_arguments[key] = value;
- options.addContent(std::move(option));
- }
- }
- response.addPayload(std::move(options));
+ C2Payload response(prepareConfigurationOptions(resp));
- if (device_information_.size() > 0) {
- C2Payload deviceInfo(Operation::HEARTBEAT);
- deviceInfo.setLabel("AgentInformation");
+ auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
+ if (reporter != nullptr) {
+ std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
- for (auto metric : device_information_) {
- C2Payload child_metric_payload(Operation::HEARTBEAT);
- child_metric_payload.setLabel(metric.first);
- if (metric.second->isArray()) {
- child_metric_payload.setContainer(true);
- }
- serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
- deviceInfo.addPayload(std::move(child_metric_payload));
+ C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true);
+ agentInfo.setLabel("agentInfo");
+
+ reporter->getManifestNodes(metrics_vec);
+ for (const auto& metric : metrics_vec) {
+ serializeMetrics(agentInfo, metric->getName(), metric->serialize());
}
- response.addPayload(std::move(deviceInfo));
+ response.addPayload(std::move(agentInfo));
}
enqueue_c2_response(std::move(response));
@@ -581,7 +576,6 @@
}
auto keys = configuration_->getConfiguredKeys();
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
- response.setLabel("configuration_options");
for (const auto &trace : traces) {
C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
options.setLabel(trace.getName());
@@ -596,6 +590,7 @@
}
enqueue_c2_response(std::move(response));
}
+ return;
}
C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
enqueue_c2_response(std::move(response));
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 0cc4cc6..30f771a 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -35,7 +35,7 @@
public:
IntegrationBase(uint64_t waitTime = DEFAULT_WAITTIME_MSECS);
- virtual ~IntegrationBase();
+ virtual ~IntegrationBase() = default;
virtual void run(std::string test_file_location);
@@ -75,6 +75,10 @@
}
+ virtual void configureC2RootClasses() {
+
+ }
+
virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
}
@@ -92,9 +96,6 @@
wait_time_(waitTime) {
}
-IntegrationBase::~IntegrationBase() {
-}
-
void IntegrationBase::configureSecurity() {
if (!key_dir.empty()) {
configuration->set(minifi::Configure::nifi_security_client_certificate, key_dir + "cn.crt.pem");
@@ -126,6 +127,8 @@
queryRootProcessGroup(pg);
+ configureC2RootClasses();
+
ptr.release();
std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);