MINIFICPP-1157 Implement light weight heartbeat.
  Remove agent manifest from regular heartbeat messages.
  Send agent manifest in response to DESCRIBE manifest request

Signed-off-by: Arpad Boda <aboda@apache.org>
diff --git a/C2.md b/C2.md
index b907926..dc99778 100644
--- a/C2.md
+++ b/C2.md
@@ -49,6 +49,11 @@
 files contain the former naming convention of `c2.*`, we will continue to support that as
 an alternate key, but you are encouraged to switch your configuration options as soon as possible.
 
+Note: In release 0.8.0 there is a configuration option to minizime the heartbeat payload size by excluding agent manifest.
+For that, replace "AgentInformation" with "AgentInformationWithoutManifest in nifi.c2.root.classes property value.
+With this change, heartbeat with agent manifest included is sent only for the first time then falls back to sending
+light weight heartbeat. If for some reason the C2 server does not receive the first full heartbeat, the manifest can
+be requested via C2 DESCRIBE manifest command.
 
 	#in minifi.properties
 
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 4f7166a..249ca7a 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -52,6 +52,8 @@
 #nifi.c2.rest.url=
 #nifi.c2.rest.url.ack=
 nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
+## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
+#nifi.c2.root.classes=DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation
 ## heartbeat 4 times a second
 #nifi.c2.agent.heartbeat.period=250
 ## define parameters about your agent 
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index 83935bf..bd7740e 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -41,7 +41,7 @@
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase();
+  virtual ~CoapIntegrationBase() = default;
 
   void shutdownBeforeFlowController() override {
     stop_webserver(server);
@@ -68,6 +68,8 @@
 
     queryRootProcessGroup(pg);
 
+    configureC2RootClasses();
+
     ptr.release();
 
     std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
@@ -90,10 +92,6 @@
   CivetServer *server;
 };
 
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
new file mode 100644
index 0000000..5a84c5c
--- /dev/null
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -0,0 +1,91 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <sys/stat.h>
+#undef NDEBUG
+#include <cassert>
+#include <utility>
+#include <chrono>
+#include <fstream>
+#include <memory>
+#include <string>
+#include <thread>
+#include <type_traits>
+#include <vector>
+#include <iostream>
+#include <sstream>
+#include "HTTPClient.h"
+#include "InvokeHTTP.h"
+#include "TestBase.h"
+#include "utils/StringUtils.h"
+#include "core/Core.h"
+#include "core/logging/Logger.h"
+#include "core/ProcessGroup.h"
+#include "core/yaml/YamlConfiguration.h"
+#include "FlowController.h"
+#include "properties/Configure.h"
+#include "unit/ProvenanceTestHelper.h"
+#include "io/StreamFactory.h"
+#include "c2/C2Agent.h"
+#include "CivetServer.h"
+#include <cstring>
+#include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
+
+class DescribeManifestHandler: public HeartbeatHandler {
+public:
+
+  explicit DescribeManifestHandler(bool isSecure)
+      : HeartbeatHandler(isSecure) {
+  }
+
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
+  }
+
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    verifyJsonHasAgentManifest(root);
+  }
+};
+int main(int argc, char **argv) {
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";
+  if (argc > 1) {
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";
+      key_dir = argv[2];
+    }
+  }
+
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
+
+  VerifyC2Describe harness(isSecure);
+
+  harness.setKeyDir(key_dir);
+
+  DescribeManifestHandler responder(isSecure);
+
+  harness.setUrl(url, &responder);
+
+  harness.run(test_file_location);
+}
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index dd2ad27..241af1c 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -45,123 +45,60 @@
 #include "CivetServer.h"
 #include <cstring>
 #include "protocols/RESTSender.h"
+#include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 
-void waitToVerifyProcessor() {
-  std::this_thread::sleep_for(std::chrono::seconds(10));
-}
-
-
-class ConfigHandler : public CivetHandler {
+class VerifyC2DescribeJstack : public VerifyC2Describe {
  public:
-  ConfigHandler() {
-    calls_ = 0;
-  }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    calls_++;
-    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
-          "\"operation\" : \"describe\", "
-          "\"operationid\" : \"8675309\", "
-          "\"name\": \"jstack\""
-          "}]}";
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                heartbeat_response.length());
-      mg_printf(conn, "%s", heartbeat_response.c_str());
-
-
-    return true;
+  explicit VerifyC2DescribeJstack(bool isSecure)
+      : VerifyC2Describe(isSecure) {
   }
 
-  bool handleGet(CivetServer *server, struct mg_connection *conn) {
-    std::ifstream myfile(test_file_location_.c_str());
-
-    if (myfile.is_open()) {
-      std::stringstream buffer;
-      buffer << myfile.rdbuf();
-      std::string str = buffer.str();
-      myfile.close();
-      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-                str.length());
-      mg_printf(conn, "%s", str.c_str());
-    } else {
-      mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
-    }
-
-    return true;
+  virtual void runAssertions() {
+    assert(LogTestController::getInstance().contains("SchedulingAgent") == true);
   }
-  std::string test_file_location_;
-  std::atomic<size_t> calls_;
+};
+
+class DescribeJstackHandler : public HeartbeatHandler {
+ public:
+  explicit DescribeJstackHandler(bool isSecure)
+     : HeartbeatHandler(isSecure) {
+  }
+
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
+  }
+
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+    assert(root.HasMember("SchedulingAgent #0") == true);
+  }
+
 };
 
 int main(int argc, char **argv) {
-  mg_init_library(0);
-  LogTestController::getInstance().setInfo<minifi::FlowController>();
-  LogTestController::getInstance().setDebug<minifi::utils::HTTPClient>();
-  LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
-  LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
-
-  const char *options[] = { "document_root", ".", "listening_ports", "0", 0 };
-  std::vector<std::string> cpp_options;
-  for (int i = 0; i < (sizeof(options) / sizeof(options[0]) - 1); i++) {
-    cpp_options.push_back(options[i]);
-  }
-
-  CivetServer server(cpp_options);
-
-  std::string port_str = std::to_string(server.getListeningPorts()[0]);
-
-  ConfigHandler h_ex;
-  server.addHandler("/update", h_ex);
-  std::string key_dir, test_file_location;
+  std::string key_dir, test_file_location, url;
+  url = "http://localhost:0/api/heartbeat";
   if (argc > 1) {
-    h_ex.test_file_location_ = test_file_location = argv[1];
-    key_dir = argv[2];
+    test_file_location = argv[1];
+    if (argc > 2) {
+      url = "https://localhost:0/api/heartbeat";
+      key_dir = argv[2];
+    }
   }
 
+  bool isSecure = false;
+  if (url.find("https") != std::string::npos) {
+    isSecure = true;
+  }
 
-  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  VerifyC2DescribeJstack harness(isSecure);
 
-  std::string c2_rest_url = "http://localhost:" + port_str + "/update";
+  harness.setKeyDir(key_dir);
 
-  configuration->set("c2.rest.url", c2_rest_url);
-  configuration->set("c2.agent.heartbeat.period", "1000");
+  DescribeJstackHandler responder(isSecure);
 
-  std::shared_ptr<core::Repository> test_repo = std::make_shared<TestRepository>();
-  std::shared_ptr<core::Repository> test_flow_repo = std::make_shared<TestFlowRepository>();
+  harness.setUrl(url, &responder);
 
-  configuration->set(minifi::Configure::nifi_flow_configuration_file, test_file_location);
+  harness.run(test_file_location);
 
-  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
-  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
-  std::unique_ptr<core::FlowConfiguration> yaml_ptr = std::unique_ptr<core::YamlConfiguration>(
-      new core::YamlConfiguration(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location));
-  std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
-
-  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(yaml_ptr), content_repo, DEFAULT_ROOT_GROUP_NAME,
-  true);
-
-  core::YamlConfiguration yaml_config(test_repo, test_repo, content_repo, stream_factory, configuration, test_file_location);
-
-  std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
-  std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
-  ptr.release();
-  auto start = std::chrono::system_clock::now();
-
-  controller->load();
-  controller->start();
-  waitToVerifyProcessor();
-
-  controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
-
-  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
-  std::string logs = LogTestController::getInstance().log_output.str();
-  #ifndef WIN32
-  assert(logs.find("SchedulingAgent") != std::string::npos);
-  #endif
-  LogTestController::getInstance().reset();
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
-
-  return 0;
 }
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 2eaa10b..6e0c1c2 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -50,98 +50,47 @@
 #include "protocols/RESTReceiver.h"
 #include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
+#include "HTTPHandlers.h"
 #include "agent/build_description.h"
 #include "processors/LogAttribute.h"
 
-class Responder : public CivetHandler {
+class LightWeightC2Handler : public HeartbeatHandler {
  public:
-  explicit Responder(bool isSecure)
-      : isSecure(isSecure) {
+  explicit LightWeightC2Handler(bool isSecure)
+      : HeartbeatHandler(isSecure),
+        calls_(0) {
   }
 
-  std::string readPost(struct mg_connection *conn) {
-    std::string response;
-    int blockSize = 1024 * sizeof(char), readBytes;
+  virtual ~LightWeightC2Handler() = default;
 
-    char buffer[1024];
-    while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
-      response.append(buffer, 0, (readBytes / sizeof(char)));
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn)  {
+    (void)conn;
+    if (calls_ == 0) {
+      verifyJsonHasAgentManifest(root);
+    } else {
+      assert(root.HasMember("agentInfo") == true);
+      assert(root["agentInfo"].HasMember("agentManifest") == false);
     }
-    return response;
+    calls_++;
   }
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
-    auto post_data = readPost(conn);
-
-    std::cerr << post_data << std::endl;
-
-    if (!IsNullOrEmpty(post_data)) {
-      rapidjson::Document root;
-      rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
-      bool found = false;
-      std::string operation = root["operation"].GetString();
-      if (operation == "heartbeat") {
-        assert(root.HasMember("agentInfo") == true);
-        assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
-
-        for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
-          assert(bundle.HasMember("artifact"));
-          std::string str = bundle["artifact"].GetString();
-          if (str == "minifi-system") {
-
-            std::vector<std::string> classes;
-            for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
-              classes.push_back(proc["type"].GetString());
-            }
-
-            auto group = minifi::BuildDescription::getClassDescriptions(str);
-            for (auto proc : group.processors_) {
-              assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
-              found = true;
-            }
-
-          }
-        }
-        assert(found == true);
-      }
-    }
-    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
-    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
-              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
-              resp.length());
-    mg_printf(conn, "%s", resp.c_str());
-    return true;
-  }
-
- protected:
-  bool isSecure;
+ private:
+  std::atomic<size_t> calls_;
 };
 
-class VerifyC2Heartbeat : public CoapIntegrationBase {
+class VerifyC2Heartbeat : public VerifyC2Base {
  public:
   explicit VerifyC2Heartbeat(bool isSecure)
-      : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
+      : VerifyC2Base(isSecure) {
   }
 
-  void testSetup() {
-    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+  virtual ~VerifyC2Heartbeat() = default;
+
+  virtual void testSetup() {
     LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
-    LogTestController::getInstance().setDebug<LogTestController>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTProtocol>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTReceiver>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
-  }
-
-  void cleanup() {
-    LogTestController::getInstance().reset();
-    unlink(ss.str().c_str());
+    VerifyC2Base::testSetup();
   }
 
   void runAssertions() {
@@ -152,32 +101,22 @@
     assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
   }
 
-  void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
-    std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
-
-    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
-    configuration->set("nifi.c2.enable", "true");
-    configuration->set("nifi.c2.agent.class", "test");
-    configuration->set("nifi.c2.rest.url", c2_url);
-    configuration->set("nifi.c2.agent.heartbeat.period", "1000");
-    configuration->set("nifi.c2.rest.url.ack", c2_url);
+  void configureC2RootClasses() {
     configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
   }
+};
 
- protected:
-  bool isSecure;
-  std::string dir;
-  std::stringstream ss;
-  TestController testController;
+class VerifyLightWeightC2Heartbeat : public VerifyC2Heartbeat {
+public:
+  explicit VerifyLightWeightC2Heartbeat(bool isSecure)
+      : VerifyC2Heartbeat(isSecure) {
+  }
+
+  virtual ~VerifyLightWeightC2Heartbeat() = default;
+
+  void configureC2RootClasses() {
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  }
 };
 
 int main(int argc, char **argv) {
@@ -195,12 +134,23 @@
   if (url.find("https") != std::string::npos) {
     isSecure = true;
   }
+  {
+    VerifyC2Heartbeat harness(isSecure);
 
-  VerifyC2Heartbeat harness(isSecure);
+    harness.setKeyDir(key_dir);
+
+    HeartbeatHandler responder(isSecure);
+
+    harness.setUrl(url, &responder);
+
+    harness.run(test_file_location);
+  }
+
+  VerifyLightWeightC2Heartbeat harness(isSecure);
 
   harness.setKeyDir(key_dir);
 
-  Responder responder(isSecure);
+  LightWeightC2Handler responder(isSecure);
 
   harness.setUrl(url, &responder);
 
diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt
index ab9176d..ff848b5 100644
--- a/extensions/http-curl/tests/CMakeLists.txt
+++ b/extensions/http-curl/tests/CMakeLists.txt
@@ -72,6 +72,7 @@
 add_test(NAME HttpGetIntegrationTest COMMAND HttpGetIntegrationTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateTest COMMAND C2UpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2JstackTest COMMAND C2JstackTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
+add_test(NAME C2DescribeManifestTest COMMAND C2DescribeManifestTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2UpdateAgentTest COMMAND C2UpdateAgentTest "${TEST_RESOURCES}/TestHTTPGet.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2FailedUpdateTest COMMAND C2FailedUpdateTest "${TEST_RESOURCES}/TestHTTPGet.yml" "${TEST_RESOURCES}/TestBad.yml"  "${TEST_RESOURCES}/")
 add_test(NAME C2NullConfiguration COMMAND C2NullConfiguration "${TEST_RESOURCES}/TestNull.yml"  "${TEST_RESOURCES}/")
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 282b470..1de39b3 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -20,6 +20,8 @@
 #include "concurrentqueue.h"
 #include "CivetStream.h"
 #include "io/CRCStream.h"
+#include "rapidjson/document.h"
+
 #ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
 #define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_
 static std::atomic<int> transaction_id;
@@ -343,4 +345,104 @@
   std::string response_code;
 };
 
+class HeartbeatHandler : public CivetHandler {
+ public:
+  explicit HeartbeatHandler(bool isSecure)
+      : isSecure(isSecure) {
+  }
+
+  std::string readPost(struct mg_connection *conn) {
+    std::string response;
+    int blockSize = 1024 * sizeof(char), readBytes;
+
+    char buffer[1024];
+    while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
+      response.append(buffer, 0, (readBytes / sizeof(char)));
+    }
+    return response;
+  }
+
+  void sendStopOperation(struct mg_connection *conn) {
+    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
+    mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+              "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+              resp.length());
+    mg_printf(conn, "%s", resp.c_str());
+  }
+
+  void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) {
+    std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [  {"
+          "\"operation\" : \"" + operation + "\","
+          "\"operationid\" : \"" + operationId + "\","
+          "\"operand\": \"" + operand + "\"}]}";
+
+      mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
+                "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
+                heartbeat_response.length());
+      mg_printf(conn, "%s", heartbeat_response.c_str());
+  }
+
+  void verifyJsonHasAgentManifest(const rapidjson::Document& root) {
+    bool found = false;
+    assert(root.HasMember("agentInfo") == true);
+    assert(root["agentInfo"].HasMember("agentManifest") == true);
+    assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
+
+    for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
+      assert(bundle.HasMember("artifact"));
+      std::string str = bundle["artifact"].GetString();
+      if (str == "minifi-system") {
+
+        std::vector<std::string> classes;
+        for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
+          classes.push_back(proc["type"].GetString());
+        }
+
+        auto group = minifi::BuildDescription::getClassDescriptions(str);
+        for (auto proc : group.processors_) {
+          assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes));
+          found = true;
+        }
+
+      }
+    }
+    assert(found == true);
+  }
+
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+    (void)conn;
+    verifyJsonHasAgentManifest(root);
+  }
+
+  virtual void handleAcknowledge(const rapidjson::Document& root) {
+  }
+
+  void verify(struct mg_connection *conn) {
+    auto post_data = readPost(conn);
+    std::cerr << post_data << std::endl;
+    if (!IsNullOrEmpty(post_data)) {
+      rapidjson::Document root;
+      rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
+      std::string operation = root["operation"].GetString();
+      if (operation == "heartbeat") {
+        handleHeartbeat(root, conn);
+      } else if (operation == "acknowledge") {
+        handleAcknowledge(root);
+      } else {
+        throw std::runtime_error("operation not supported " + operation);
+      }
+    }
+  }
+
+  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+    verify(conn);
+    sendStopOperation(conn);
+    return true;
+  }
+
+ protected:
+  bool isSecure;
+};
+
 #endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 4ab623b..4226630 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -21,6 +21,8 @@
 #include "../tests/TestServer.h"
 #include "CivetServer.h"
 #include "integration/IntegrationBase.h"
+#include "c2/C2Agent.h"
+#include "protocols/RESTSender.h"
 
 int log_message(const struct mg_connection *conn, const char *message) {
   puts(message);
@@ -41,8 +43,6 @@
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase();
-
   void shutdownBeforeFlowController() {
     stop_webserver(server);
   }
@@ -59,10 +59,6 @@
   CivetServer *server;
 };
 
-CoapIntegrationBase::~CoapIntegrationBase() {
-
-}
-
 void CoapIntegrationBase::setUrl(std::string url, CivetHandler *handler) {
 
   parse_http_components(url, port, scheme, path);
@@ -91,4 +87,74 @@
   }
 }
 
+class VerifyC2Base : public CoapIntegrationBase {
+ public:
+  explicit VerifyC2Base(bool isSecure)
+      : isSecure(isSecure) {
+    char format[] = "/tmp/ssth.XXXXXX";
+    dir = testController.createTempDirectory(format);
+  }
+
+  virtual void testSetup() {
+    LogTestController::getInstance().setDebug<utils::HTTPClient>();
+    LogTestController::getInstance().setDebug<LogTestController>();
+    std::fstream file;
+    ss << dir << "/" << "tstFile.ext";
+    file.open(ss.str(), std::ios::out);
+    file << "tempFile";
+    file.close();
+  }
+
+  void runAssertions() {
+  }
+
+  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
+    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
+    assert(proc != nullptr);
+
+    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
+
+    assert(inv != nullptr);
+    std::string url = "";
+    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
+
+    std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
+
+    configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
+    configuration->set("nifi.c2.enable", "true");
+    configuration->set("nifi.c2.agent.class", "test");
+    configuration->set("nifi.c2.rest.url", c2_url);
+    configuration->set("nifi.c2.agent.heartbeat.period", "1000");
+    configuration->set("nifi.c2.rest.url.ack", c2_url);
+  }
+
+  void cleanup() {
+    LogTestController::getInstance().reset();
+    unlink(ss.str().c_str());
+  }
+
+ protected:
+  bool isSecure;
+  std::string dir;
+  std::stringstream ss;
+  TestController testController;
+};
+
+class VerifyC2Describe : public VerifyC2Base {
+ public:
+  explicit VerifyC2Describe(bool isSecure)
+      : VerifyC2Base(isSecure) {
+  }
+
+  void testSetup() {
+    LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
+    LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
+    LogTestController::getInstance().setInfo<minifi::FlowController>();
+    VerifyC2Base::testSetup();
+  }
+
+  void configureC2RootClasses() {
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  }
+};
 #endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index 5602452..bef607d 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -303,6 +303,19 @@
    */
   virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
 
+  /**
+   * Retrieves agent information with manifest only from this source.
+   * @param manifest_vector -- manifest nodes vector.
+   * @return 0 on Success, -1 on failure
+   */
+  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const;
+
+  /**
+   * Returns a response node containing all agent information with manifest and agent status
+   * @return a shared pointer to agent information
+   */
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+
   virtual uint64_t getUptime();
 
   virtual std::vector<BackTrace> getTraces();
@@ -384,9 +397,10 @@
 
   std::chrono::steady_clock::time_point start_time_;
 
-  std::mutex metrics_mutex_;
+  mutable std::mutex metrics_mutex_;
   // root_nodes cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
+
   // metrics cache
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> device_information_;
 
@@ -394,6 +408,10 @@
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> component_metrics_;
 
   std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
+
+  // manifest cache
+  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> agent_information_;
+
   // metrics last run
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index f84c131..c39589f 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -171,6 +171,11 @@
    */
   bool update_property(const std::string &property_name, const std::string &property_value,  bool persist = false);
 
+  /**
+   * Creates configuration options C2 payload for response
+   */
+  C2Payload prepareConfigurationOptions(const C2ContentResponse &resp) const;
+
   std::timed_mutex metrics_mutex_;
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
 
@@ -244,6 +249,8 @@
   std::string bin_location_;
 
   std::shared_ptr<logging::Logger> logger_;
+
+  bool manifest_sent_;
 };
 
 } /* namesapce c2 */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index abf0c6f..5809a4c 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -68,7 +68,7 @@
 
 class ComponentManifest : public DeviceInformation {
  public:
-  ComponentManifest(std::string name, utils::Identifier & uuid)
+  ComponentManifest(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
   }
 
@@ -92,7 +92,7 @@
   }
  protected:
 
-  void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) {
+  void serializeClassDescription(const std::vector<ClassDescription> &descriptions, const std::string name, SerializedResponseNode &response) const {
     if (!descriptions.empty()) {
       SerializedResponseNode type;
       type.name = name;
@@ -307,7 +307,7 @@
 
 class ExternalManifest : public ComponentManifest {
  public:
-  ExternalManifest(std::string name, utils::Identifier & uuid)
+  ExternalManifest(const std::string& name, utils::Identifier & uuid)
       : ComponentManifest(name, uuid) {
   }
 
@@ -329,7 +329,7 @@
 
 class Bundles : public DeviceInformation {
  public:
-  Bundles(std::string name, utils::Identifier & uuid)
+  Bundles(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
     setArray(true);
   }
@@ -452,8 +452,17 @@
         queuesize.name = "size";
         queuesize.value = repo.second->getRepoSize();
 
-        repoNode.children.push_back(queuesize);
+        SerializedResponseNode isRunning;
+        isRunning.name = "running";
+        isRunning.value = repo.second->isRunning();
 
+        SerializedResponseNode isFull;
+        isFull.name = "full";
+        isFull.value = repo.second->isFull();
+
+        repoNode.children.push_back(queuesize);
+        repoNode.children.push_back(isRunning);
+        repoNode.children.push_back(isFull);
         repositories.children.push_back(repoNode);
 
       }
@@ -540,14 +549,12 @@
 class AgentManifest : public DeviceInformation {
  public:
 
-  AgentManifest(std::string name, utils::Identifier & uuid)
+  AgentManifest(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
-    //setArray(true);
   }
 
   AgentManifest(const std::string &name)
       : DeviceInformation(name) {
-    //  setArray(true);
   }
 
   std::string getName() const {
@@ -622,25 +629,21 @@
   }
 };
 
-/**
- * Purpose and Justification: Prints classes along with their properties for the current agent.
- */
-class AgentInformation : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
- public:
 
-  AgentInformation(std::string name, utils::Identifier & uuid)
+class AgentNode : public DeviceInformation, public AgentMonitor, public AgentIdentifier {
+public:
+
+  AgentNode(const std::string& name, utils::Identifier & uuid)
       : DeviceInformation(name, uuid) {
     setArray(false);
   }
 
-  AgentInformation(const std::string &name)
+  explicit AgentNode(const std::string& name)
       : DeviceInformation(name) {
     setArray(false);
   }
 
-  std::string getName() const {
-    return "agentInfo";
-  }
+protected:
 
   std::vector<SerializedResponseNode> serialize() {
     std::vector<SerializedResponseNode> serialized;
@@ -654,6 +657,14 @@
     agentClass.name = "agentClass";
     agentClass.value = agent_class_;
 
+    serialized.push_back(ident);
+    serialized.push_back(agentClass);
+
+    return serialized;
+  }
+
+  std::vector<SerializedResponseNode> getAgentManifest() const {
+    std::vector<SerializedResponseNode> serialized;
     AgentManifest manifest("manifest");
 
     SerializedResponseNode agentManifest;
@@ -661,6 +672,34 @@
     for (auto &ser : manifest.serialize()) {
       agentManifest.children.push_back(std::move(ser));
     }
+    serialized.push_back(agentManifest);
+    return serialized;
+  }
+};
+
+/**
+ * This class is used for regular heartbeat without manifest
+ * A light weight heartbeat
+ */
+class AgentInformationWithoutManifest : public AgentNode {
+public:
+
+  AgentInformationWithoutManifest(const std::string& name, utils::Identifier & uuid)
+      : AgentNode(name, uuid) {
+    setArray(false);
+  }
+
+  explicit AgentInformationWithoutManifest(const std::string &name)
+      : AgentNode(name) {
+    setArray(false);
+  }
+
+  std::string getName() const {
+    return "agentInfo";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
 
     AgentStatus status("status");
     status.setRepositories(repositories_);
@@ -672,16 +711,72 @@
       agentStatus.children.push_back(std::move(ser));
     }
 
-    serialized.push_back(ident);
-    serialized.push_back(agentClass);
-    serialized.push_back(agentManifest);
     serialized.push_back(agentStatus);
     return serialized;
   }
+};
+
+
+/**
+ * This class is used for sending all agent information including manifest and status
+ * A heavy weight heartbeat. Here to maintain backward compatibility
+ */
+class AgentInformation : public AgentInformationWithoutManifest {
+ public:
+
+  AgentInformation(const std::string& name, utils::Identifier & uuid)
+      : AgentInformationWithoutManifest(name, uuid) {
+    setArray(false);
+  }
+
+  explicit AgentInformation(const std::string &name)
+      : AgentInformationWithoutManifest(name) {
+    setArray(false);
+  }
+
+  std::string getName() const {
+    return "agentInfo";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentInformationWithoutManifest::serialize());
+    auto manifest = getAgentManifest();
+    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    return serialized;
+  }
 
 };
 
+/**
+ * This class is used for response to DESCRIBE manifest request
+ * It contains static information only
+ */
+class AgentInformationWithManifest : public AgentNode {
+public:
+  AgentInformationWithManifest(const std::string& name, utils::Identifier & uuid)
+      : AgentNode(name, uuid) {
+    setArray(false);
+  }
+
+  explicit AgentInformationWithManifest(const std::string &name)
+      : AgentNode(name) {
+    setArray(false);
+  }
+
+  std::string getName() const {
+    return "agentInfo";
+  }
+
+  std::vector<SerializedResponseNode> serialize() {
+    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
+    auto manifest = getAgentManifest();
+    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    return serialized;
+  }
+};
+
 REGISTER_RESOURCE(AgentInformation, "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.");
+REGISTER_RESOURCE(AgentInformationWithoutManifest, "Node part of an AST that defines all agent information, without the manifest and bundle information as part of a healthy hearbeat.");
 
 } /* namespace metrics */
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index 40a0c0c..d91df6e 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -43,12 +43,12 @@
         is_array_(false) {
   }
 
-  ResponseNode(std::string name)
+  ResponseNode(const std::string& name)
       : core::Connectable(name),
         is_array_(false) {
   }
 
-  ResponseNode(std::string name, utils::Identifier & uuid)
+  ResponseNode(const std::string& name, utils::Identifier & uuid)
       : core::Connectable(name, uuid),
         is_array_(false) {
   }
@@ -90,10 +90,10 @@
  */
 class DeviceInformation : public ResponseNode {
  public:
-  DeviceInformation(std::string name, utils::Identifier & uuid)
+  DeviceInformation(const std::string& name, utils::Identifier & uuid)
       : ResponseNode(name, uuid) {
   }
-  DeviceInformation(std::string name)
+  DeviceInformation(const std::string& name)
       : ResponseNode(name) {
   }
 };
@@ -228,6 +228,18 @@
    */
   virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
 
+  /**
+   * Retrieves agent information with manifest only from this source.
+   * @param manifest_vector -- manifest nodes vector.
+   * @return 0 on Success, -1 on failure
+   */
+  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const = 0;
+
+  /**
+   * Returns a response node containing all agent information with manifest and agent status
+   * @return a shared pointer to agent information
+   */
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const = 0;
 };
 
 /**
diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h
index 2a855ba..648d86f 100644
--- a/libminifi/include/properties/Properties.h
+++ b/libminifi/include/properties/Properties.h
@@ -96,7 +96,7 @@
     minifi_home_ = minifiHome;
   }
 
-  std::vector<std::string> getConfiguredKeys() {
+  std::vector<std::string> getConfiguredKeys() const {
     std::vector<std::string> keys;
     for (auto &property : properties_) {
       keys.push_back(property.first);
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 31160c8..1a251b5 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -466,6 +466,7 @@
   device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
+  agent_information_.clear();
   std::string class_csv;
 
   if (root_ != nullptr) {
@@ -484,6 +485,15 @@
     repoMetrics->addRepository(flow_file_repo_);
 
     device_information_[repoMetrics->getName()] = repoMetrics;
+
+    std::shared_ptr<state::response::AgentInformationWithManifest> manifest = std::make_shared<state::response::AgentInformationWithManifest>("agentInformation");
+    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(manifest);
+
+    if (identifier != nullptr) {
+      identifier->setIdentifier(identifier_str);
+      identifier->setAgentClass(class_str);
+      agent_information_[manifest->getName()] = manifest;
+    }
   }
 
   if (configuration_->get("nifi.c2.root.classes", class_csv)) {
@@ -932,6 +942,34 @@
   return 0;
 }
 
+int16_t FlowController::getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const {
+    std::lock_guard<std::mutex> lock(metrics_mutex_);
+    for (const auto& metric : agent_information_) {
+        manifest_vector.push_back(metric.second);
+    }
+    return 0;
+}
+
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentInformation() const {
+    auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
+    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(agentInfo);
+
+    if (identifier != nullptr) {
+      std::string class_str;
+      configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
+
+      std::string identifier_str;
+      if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
+        identifier_str = uuidStr_;
+      }
+
+      identifier->setIdentifier(identifier_str);
+      identifier->setAgentClass(class_str);
+      return agentInfo;
+    }
+    return nullptr;
+}
+
 std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
   std::vector<std::shared_ptr<state::StateController>> vec;
   vec.push_back(shared_from_this());
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 7c4b6c2..77d3f7a 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -50,6 +50,8 @@
       logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
   allow_updates_ = true;
 
+  manifest_sent_ = false;
+
   running_c2_configuration = std::make_shared<Configure>();
 
   last_run_ = std::chrono::steady_clock::now();
@@ -299,32 +301,34 @@
     payload.addPayload(std::move(metrics));
   }
 
-  if (device_information_.size() > 0) {
-    C2Payload deviceInfo(Operation::HEARTBEAT);
-    deviceInfo.setLabel("AgentInformation");
+  for (auto metric : root_response_nodes_) {
+    C2Payload child_metric_payload(Operation::HEARTBEAT);
+    bool isArray{false};
+    std::string metricName;
+    std::vector<state::response::SerializedResponseNode> metrics;
+    std::shared_ptr<state::response::NodeReporter> reporter;
+    std::shared_ptr<state::response::ResponseNode> agentInfo;
 
-    for (auto metric : device_information_) {
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      if (metric.second->isArray()) {
-        child_metric_payload.setContainer(true);
-      }
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      deviceInfo.addPayload(std::move(child_metric_payload));
+    // Send agent manifest in first heartbeat
+    if (!manifest_sent_
+        && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_))
+        && (agentInfo = reporter->getAgentInformation())
+        && metric.first == agentInfo->getName()) {
+      metricName = agentInfo->getName();
+      isArray = agentInfo->isArray();
+      metrics = agentInfo->serialize();
+      manifest_sent_ = true;
+    } else {
+      metricName = metric.first;
+      isArray = metric.second->isArray();
+      metrics = metric.second->serialize();
     }
-    payload.addPayload(std::move(deviceInfo));
-  }
-
-  if (!root_response_nodes_.empty()) {
-    for (auto metric : root_response_nodes_) {
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      if (metric.second->isArray()) {
-        child_metric_payload.setContainer(true);
-      }
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      payload.addPayload(std::move(child_metric_payload));
+    child_metric_payload.setLabel(metricName);
+    if (isArray) {
+      child_metric_payload.setContainer(true);
     }
+    serializeMetrics(child_metric_payload, metricName, metrics, isArray);
+    payload.addPayload(std::move(child_metric_payload));
   }
   C2Payload && response = protocol_.load()->consumePayload(payload);
 
@@ -485,6 +489,28 @@
   }
 }
 
+C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) const {
+    auto unsanitized_keys = configuration_->getConfiguredKeys();
+    std::vector<std::string> keys;
+    std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys),
+            [](std::string key) {return key.find("pass") == std::string::npos;});
+
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+    C2Payload options(Operation::ACKNOWLEDGE);
+    options.setLabel("configuration_options");
+    std::string value;
+    for (auto key : keys) {
+      C2ContentResponse option(Operation::ACKNOWLEDGE);
+      option.name = key;
+      if (configuration_->get(key, value)) {
+        option.operation_arguments[key] = value;
+        options.addContent(std::move(option));
+      }
+    }
+    response.addPayload(std::move(options));
+    return response;
+}
+
 /**
  * Descriptions are special types of requests that require information
  * to be put into the acknowledgement
@@ -506,67 +532,36 @@
       }
 
       std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-
-      reporter->getResponseNodes(metrics_vec, metric_class_id);
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-      response.setLabel("metrics");
+      C2Payload metrics(Operation::ACKNOWLEDGE);
+      metrics.setLabel("metrics");
+      reporter->getResponseNodes(metrics_vec, 0);
       for (auto metric : metrics_vec) {
-        serializeMetrics(response, metric->getName(), metric->serialize());
+        serializeMetrics(metrics, metric->getName(), metric->serialize());
       }
+      response.addPayload(std::move(metrics));
       enqueue_c2_response(std::move(response));
     }
 
   } else if (resp.name == "configuration") {
-    auto unsanitized_keys = configuration_->getConfiguredKeys();
-    std::vector<std::string> keys;
-    std::copy_if(unsanitized_keys.begin(), unsanitized_keys.end(), std::back_inserter(keys), [](std::string key) {return key.find("pass") == std::string::npos;});
-    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    response.setLabel("configuration_options");
-    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    options.setLabel("configuration_options");
-    std::string value;
-    for (auto key : keys) {
-      C2ContentResponse option(Operation::ACKNOWLEDGE);
-      option.name = key;
-      if (configuration_->get(key, value)) {
-        option.operation_arguments[key] = value;
-        options.addContent(std::move(option));
-      }
-    }
-    response.addPayload(std::move(options));
-    enqueue_c2_response(std::move(response));
+    auto configOptions = prepareConfigurationOptions(resp);
+    enqueue_c2_response(std::move(configOptions));
     return;
   } else if (resp.name == "manifest") {
-    auto keys = configuration_->getConfiguredKeys();
-    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    response.setLabel("configuration_options");
-    C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
-    options.setLabel("configuration_options");
-    std::string value;
-    for (auto key : keys) {
-      C2ContentResponse option(Operation::ACKNOWLEDGE);
-      option.name = key;
-      if (configuration_->get(key, value)) {
-        option.operation_arguments[key] = value;
-        options.addContent(std::move(option));
-      }
-    }
-    response.addPayload(std::move(options));
+    C2Payload response(prepareConfigurationOptions(resp));
 
-    if (device_information_.size() > 0) {
-      C2Payload deviceInfo(Operation::HEARTBEAT);
-      deviceInfo.setLabel("AgentInformation");
+    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
+    if (reporter != nullptr) {
+      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
 
-      for (auto metric : device_information_) {
-        C2Payload child_metric_payload(Operation::HEARTBEAT);
-        child_metric_payload.setLabel(metric.first);
-        if (metric.second->isArray()) {
-          child_metric_payload.setContainer(true);
-        }
-        serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-        deviceInfo.addPayload(std::move(child_metric_payload));
+      C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      agentInfo.setLabel("agentInfo");
+
+      reporter->getManifestNodes(metrics_vec);
+      for (const auto& metric : metrics_vec) {
+          serializeMetrics(agentInfo, metric->getName(), metric->serialize());
       }
-      response.addPayload(std::move(deviceInfo));
+      response.addPayload(std::move(agentInfo));
     }
 
     enqueue_c2_response(std::move(response));
@@ -581,7 +576,6 @@
       }
       auto keys = configuration_->getConfiguredKeys();
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
-      response.setLabel("configuration_options");
       for (const auto &trace : traces) {
         C2Payload options(Operation::ACKNOWLEDGE, resp.ident, false, true);
         options.setLabel(trace.getName());
@@ -596,6 +590,7 @@
       }
       enqueue_c2_response(std::move(response));
     }
+    return;
   }
   C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
   enqueue_c2_response(std::move(response));
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 0cc4cc6..30f771a 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -35,7 +35,7 @@
  public:
   IntegrationBase(uint64_t waitTime = DEFAULT_WAITTIME_MSECS);
 
-  virtual ~IntegrationBase();
+  virtual ~IntegrationBase() = default;
 
   virtual void run(std::string test_file_location);
 
@@ -75,6 +75,10 @@
 
   }
 
+  virtual void configureC2RootClasses() {
+
+  }
+
   virtual void updateProperties(std::shared_ptr<minifi::FlowController> fc) {
 
   }
@@ -92,9 +96,6 @@
       wait_time_(waitTime) {
 }
 
-IntegrationBase::~IntegrationBase() {
-}
-
 void IntegrationBase::configureSecurity() {
   if (!key_dir.empty()) {
     configuration->set(minifi::Configure::nifi_security_client_certificate, key_dir + "cn.crt.pem");
@@ -126,6 +127,8 @@
 
   queryRootProcessGroup(pg);
 
+  configureC2RootClasses();
+
   ptr.release();
 
   std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);