MINIFICPP-1169 Simplify C2 metrics collection and reporting.
  Introduce nifi.c2.full.heartbeat configuration parameter
  Remove AgentInformationWithoutManifest, AgentInformationWithManifest classes
  Simplify FlowController base classes. Get rid of inheriting from
  StateManager class and do not use TreeUpdateListener

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #743
diff --git a/C2.md b/C2.md
index dc99778..560b8ea 100644
--- a/C2.md
+++ b/C2.md
@@ -49,8 +49,8 @@
 files contain the former naming convention of `c2.*`, we will continue to support that as
 an alternate key, but you are encouraged to switch your configuration options as soon as possible.
 
-Note: In release 0.8.0 there is a configuration option to minizime the heartbeat payload size by excluding agent manifest.
-For that, replace "AgentInformation" with "AgentInformationWithoutManifest in nifi.c2.root.classes property value.
+Note: In release 0.8.0 there is a configuration option to minimize the heartbeat payload size by excluding agent manifest.
+For that, add "nifi.c2.full.heartbeat"=false property.
 With this change, heartbeat with agent manifest included is sent only for the first time then falls back to sending
 light weight heartbeat. If for some reason the C2 server does not receive the first full heartbeat, the manifest can
 be requested via C2 DESCRIBE manifest command.
diff --git a/bin/minifi.sh b/bin/minifi.sh
index ac0b409..d149507 100755
--- a/bin/minifi.sh
+++ b/bin/minifi.sh
@@ -192,8 +192,8 @@
         echo "MiNiFi is not currently running."
       else
         echo "Stopping MiNiFi (PID: \${saved_pid})."
-        # Send a SIGINT to MiNiFi so that the handler begins shutdown.
-        kill -2 \${saved_pid} > /dev/null 2>&1
+        # Send a SIGTERM to MiNiFi so that the handler begins shutdown.
+        kill -15 \${saved_pid} > /dev/null 2>&1
         if [ \$? -ne 0 ]; then
           echo "Could not successfully send termination signal to MiNiFi (PID: \${saved_pid})"
           exit 1;
@@ -321,8 +321,8 @@
         echo "MiNiFi is not currently running."
       else
         echo "Stopping MiNiFi (PID: ${saved_pid})."
-        # Send a SIGINT to MiNiFi so that the handler begins shutdown.
-        kill -2 ${saved_pid} > /dev/null 2>&1
+        # Send a SIGTERM to MiNiFi so that the handler begins shutdown.
+        kill -15 ${saved_pid} > /dev/null 2>&1
         if [ $? -ne 0 ]; then
           echo "Could not successfully send termination signal to MiNiFi (PID: ${saved_pid})"
           exit 1;
diff --git a/conf/minifi.properties b/conf/minifi.properties
index 249ca7a..3fa2c19 100644
--- a/conf/minifi.properties
+++ b/conf/minifi.properties
@@ -53,7 +53,7 @@
 #nifi.c2.rest.url.ack=
 nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation
 ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat
-#nifi.c2.root.classes=DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation
+#nifi.c2.full.heartbeat=false
 ## heartbeat 4 times a second
 #nifi.c2.agent.heartbeat.period=250
 ## define parameters about your agent 
diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h
index bd7740e..c8f8499 100644
--- a/extensions/coap/tests/CoapIntegrationBase.h
+++ b/extensions/coap/tests/CoapIntegrationBase.h
@@ -41,8 +41,6 @@
 
   void setUrl(std::string url, CivetHandler *handler);
 
-  virtual ~CoapIntegrationBase() = default;
-
   void shutdownBeforeFlowController() override {
     stop_webserver(server);
   }
@@ -68,8 +66,6 @@
 
     queryRootProcessGroup(pg);
 
-    configureC2RootClasses();
-
     ptr.release();
 
     std::shared_ptr<TestRepository> repo = std::static_pointer_cast<TestRepository>(test_repo);
@@ -83,6 +79,7 @@
 
     shutdownBeforeFlowController();
     controller->waitUnload(wait_time_);
+    controller->stopC2();
     runAssertions();
 
     cleanup();
diff --git a/extensions/http-curl/tests/C2DescribeManifestTest.cpp b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
index 5a84c5c..891f6f5 100644
--- a/extensions/http-curl/tests/C2DescribeManifestTest.cpp
+++ b/extensions/http-curl/tests/C2DescribeManifestTest.cpp
@@ -16,35 +16,9 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 
@@ -55,7 +29,7 @@
       : HeartbeatHandler(isSecure) {
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+  virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
     sendHeartbeatResponse("DESCRIBE", "manifest", "889345", conn);
   }
 
diff --git a/extensions/http-curl/tests/C2FailedUpdateTest.cpp b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
index f5f206e..35458ec 100644
--- a/extensions/http-curl/tests/C2FailedUpdateTest.cpp
+++ b/extensions/http-curl/tests/C2FailedUpdateTest.cpp
@@ -198,22 +198,18 @@
   std::unique_ptr<core::ProcessGroup> ptr = yaml_config.getRoot(test_file_location);
   std::shared_ptr<core::ProcessGroup> pg = std::shared_ptr<core::ProcessGroup>(ptr.get());
   ptr.release();
-  auto start = std::chrono::system_clock::now();
 
   controller->load();
   controller->start();
   waitToVerifyProcessor();
 
   controller->waitUnload(60000);
-  auto then = std::chrono::system_clock::now();
 
-  auto milliseconds = std::chrono::duration_cast<std::chrono::milliseconds>(then - start).count();
   std::string logs = LogTestController::getInstance().log_output.str();
   assert(logs.find("Invalid configuration payload") != std::string::npos);
   assert(logs.find("update failed.") != std::string::npos);
   LogTestController::getInstance().reset();
   utils::file::FileUtils::delete_dir("content_repository",true);
-  assert(h_ex.calls_ <= (milliseconds / 1000) + 1);
 
   return 0;
 }
diff --git a/extensions/http-curl/tests/C2JstackTest.cpp b/extensions/http-curl/tests/C2JstackTest.cpp
index 241af1c..e530444 100644
--- a/extensions/http-curl/tests/C2JstackTest.cpp
+++ b/extensions/http-curl/tests/C2JstackTest.cpp
@@ -16,35 +16,9 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
 #include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "c2/C2Agent.h"
-#include "CivetServer.h"
-#include <cstring>
-#include "protocols/RESTSender.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
 
@@ -55,7 +29,7 @@
   }
 
   virtual void runAssertions() {
-    assert(LogTestController::getInstance().contains("SchedulingAgent") == true);
+    assert(LogTestController::getInstance().contains("SchedulingAgent"));
   }
 };
 
@@ -65,12 +39,12 @@
      : HeartbeatHandler(isSecure) {
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
+  virtual void handleHeartbeat(const rapidjson::Document&, struct mg_connection * conn) {
     sendHeartbeatResponse("DESCRIBE", "jstack", "889398", conn);
   }
 
   virtual void handleAcknowledge(const rapidjson::Document& root) {
-    assert(root.HasMember("SchedulingAgent #0") == true);
+    assert(root.HasMember("Flowcontroller threadpool #0"));
   }
 
 };
diff --git a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
index 6e0c1c2..8a15be9 100644
--- a/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
+++ b/extensions/http-curl/tests/C2VerifyHeartbeatAndStop.cpp
@@ -16,65 +16,34 @@
  * limitations under the License.
  */
 
-#include <sys/stat.h>
 #undef NDEBUG
-#include <cassert>
-#include <utility>
-#include <chrono>
-#include <fstream>
-#include <memory>
-#include <string>
-#include <thread>
-#include <type_traits>
-#include <vector>
-#include <iostream>
-#include <sstream>
-#include "HTTPClient.h"
-#include "InvokeHTTP.h"
 #include "TestBase.h"
-#include "utils/StringUtils.h"
-#include "core/Core.h"
-#include "core/logging/Logger.h"
-#include "core/ProcessGroup.h"
-#include "core/yaml/YamlConfiguration.h"
-#include "FlowController.h"
-#include "properties/Configure.h"
-#include "unit/ProvenanceTestHelper.h"
-#include "io/StreamFactory.h"
-#include "CivetServer.h"
-#include "RemoteProcessorGroupPort.h"
-#include "core/ConfigurableComponent.h"
-#include "controllers/SSLContextService.h"
-#include "TestServer.h"
 #include "c2/C2Agent.h"
-#include "protocols/RESTReceiver.h"
+#include "protocols/RESTProtocol.h"
 #include "protocols/RESTSender.h"
+#include "protocols/RESTReceiver.h"
 #include "HTTPIntegrationBase.h"
 #include "HTTPHandlers.h"
-#include "agent/build_description.h"
-#include "processors/LogAttribute.h"
 
 class LightWeightC2Handler : public HeartbeatHandler {
  public:
   explicit LightWeightC2Handler(bool isSecure)
-      : HeartbeatHandler(isSecure),
-        calls_(0) {
+      : HeartbeatHandler(isSecure) {
   }
 
   virtual ~LightWeightC2Handler() = default;
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn)  {
-    (void)conn;
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *)  {
     if (calls_ == 0) {
       verifyJsonHasAgentManifest(root);
     } else {
-      assert(root.HasMember("agentInfo") == true);
-      assert(root["agentInfo"].HasMember("agentManifest") == false);
+      assert(root.HasMember("agentInfo"));
+      assert(!root["agentInfo"].HasMember("agentManifest"));
     }
     calls_++;
   }
  private:
-  std::atomic<size_t> calls_;
+  std::atomic<size_t> calls_{0};
 };
 
 class VerifyC2Heartbeat : public VerifyC2Base {
@@ -83,8 +52,6 @@
       : VerifyC2Base(isSecure) {
   }
 
-  virtual ~VerifyC2Heartbeat() = default;
-
   virtual void testSetup() {
     LogTestController::getInstance().setTrace<minifi::c2::C2Agent>();
     LogTestController::getInstance().setDebug<minifi::c2::RESTSender>();
@@ -94,15 +61,13 @@
   }
 
   void runAssertions() {
-    assert(LogTestController::getInstance().contains("Received Ack from Server") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke") == true);
-
-    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController") == true);
+    assert(LogTestController::getInstance().contains("Received Ack from Server"));
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component invoke"));
+    assert(LogTestController::getInstance().contains("C2Agent] [debug] Stopping component FlowController"));
   }
 
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "true");
   }
 };
 
@@ -112,10 +77,8 @@
       : VerifyC2Heartbeat(isSecure) {
   }
 
-  virtual ~VerifyLightWeightC2Heartbeat() = default;
-
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "false");
   }
 };
 
@@ -136,24 +99,16 @@
   }
   {
     VerifyC2Heartbeat harness(isSecure);
-
     harness.setKeyDir(key_dir);
-
     HeartbeatHandler responder(isSecure);
-
     harness.setUrl(url, &responder);
-
     harness.run(test_file_location);
   }
 
   VerifyLightWeightC2Heartbeat harness(isSecure);
-
   harness.setKeyDir(key_dir);
-
   LightWeightC2Handler responder(isSecure);
-
   harness.setUrl(url, &responder);
-
   harness.run(test_file_location);
 
   return 0;
diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h
index 1de39b3..4d3a549 100644
--- a/extensions/http-curl/tests/HTTPHandlers.h
+++ b/extensions/http-curl/tests/HTTPHandlers.h
@@ -353,18 +353,18 @@
 
   std::string readPost(struct mg_connection *conn) {
     std::string response;
-    int blockSize = 1024 * sizeof(char), readBytes;
+    int readBytes;
 
     char buffer[1024];
-    while ((readBytes = mg_read(conn, buffer, blockSize)) > 0) {
-      response.append(buffer, 0, (readBytes / sizeof(char)));
+    while ((readBytes = mg_read(conn, buffer, sizeof(buffer))) > 0) {
+      response.append(buffer, (readBytes / sizeof(char)));
     }
     return response;
   }
 
   void sendStopOperation(struct mg_connection *conn) {
-    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"name\" : \"invoke\"  }, "
-        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"name\" : \"FlowController\"  } ]}";
+    std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\"  }, "
+        "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\"  } ]}";
     mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
               "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
               resp.length());
@@ -385,14 +385,14 @@
 
   void verifyJsonHasAgentManifest(const rapidjson::Document& root) {
     bool found = false;
-    assert(root.HasMember("agentInfo") == true);
-    assert(root["agentInfo"].HasMember("agentManifest") == true);
-    assert(root["agentInfo"]["agentManifest"].HasMember("bundles") == true);
+    assert(root.HasMember("agentInfo"));
+    assert(root["agentInfo"].HasMember("agentManifest"));
+    assert(root["agentInfo"]["agentManifest"].HasMember("bundles"));
 
     for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
       assert(bundle.HasMember("artifact"));
       std::string str = bundle["artifact"].GetString();
-      if (str == "minifi-system") {
+      if (str == "minifi-standard-processors") {
 
         std::vector<std::string> classes;
         for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
@@ -407,23 +407,23 @@
 
       }
     }
-    assert(found == true);
+    assert(found);
   }
 
-  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection * conn) {
-    (void)conn;
+  virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) {
     verifyJsonHasAgentManifest(root);
   }
 
-  virtual void handleAcknowledge(const rapidjson::Document& root) {
+  virtual void handleAcknowledge(const rapidjson::Document&) {
   }
 
   void verify(struct mg_connection *conn) {
     auto post_data = readPost(conn);
-    std::cerr << post_data << std::endl;
+    //std::cerr << post_data << std::endl;
     if (!IsNullOrEmpty(post_data)) {
       rapidjson::Document root;
       rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size());
+      assert(ok);
       std::string operation = root["operation"].GetString();
       if (operation == "heartbeat") {
         handleHeartbeat(root, conn);
@@ -435,7 +435,7 @@
     }
   }
 
-  bool handlePost(CivetServer *server, struct mg_connection *conn) {
+  bool handlePost(CivetServer *, struct mg_connection *conn) {
     verify(conn);
     sendStopOperation(conn);
     return true;
diff --git a/extensions/http-curl/tests/HTTPIntegrationBase.h b/extensions/http-curl/tests/HTTPIntegrationBase.h
index 4226630..02e4104 100644
--- a/extensions/http-curl/tests/HTTPIntegrationBase.h
+++ b/extensions/http-curl/tests/HTTPIntegrationBase.h
@@ -91,33 +91,14 @@
  public:
   explicit VerifyC2Base(bool isSecure)
       : isSecure(isSecure) {
-    char format[] = "/tmp/ssth.XXXXXX";
-    dir = testController.createTempDirectory(format);
   }
 
   virtual void testSetup() {
     LogTestController::getInstance().setDebug<utils::HTTPClient>();
     LogTestController::getInstance().setDebug<LogTestController>();
-    std::fstream file;
-    ss << dir << "/" << "tstFile.ext";
-    file.open(ss.str(), std::ios::out);
-    file << "tempFile";
-    file.close();
   }
 
-  void runAssertions() {
-  }
-
-  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup> pg) {
-    std::shared_ptr<core::Processor> proc = pg->findProcessor("invoke");
-    assert(proc != nullptr);
-
-    std::shared_ptr<minifi::processors::InvokeHTTP> inv = std::dynamic_pointer_cast<minifi::processors::InvokeHTTP>(proc);
-
-    assert(inv != nullptr);
-    std::string url = "";
-    inv->getProperty(minifi::processors::InvokeHTTP::URL.getName(), url);
-
+  virtual void queryRootProcessGroup(std::shared_ptr<core::ProcessGroup>) override {
     std::string c2_url = std::string("http") + (isSecure ? "s" : "") + "://localhost:" + getWebPort() + "/api/heartbeat";
 
     configuration->set("nifi.c2.agent.protocol.class", "RESTSender");
@@ -126,18 +107,15 @@
     configuration->set("nifi.c2.rest.url", c2_url);
     configuration->set("nifi.c2.agent.heartbeat.period", "1000");
     configuration->set("nifi.c2.rest.url.ack", c2_url);
+    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation");
   }
 
   void cleanup() {
     LogTestController::getInstance().reset();
-    unlink(ss.str().c_str());
   }
 
  protected:
   bool isSecure;
-  std::string dir;
-  std::stringstream ss;
-  TestController testController;
 };
 
 class VerifyC2Describe : public VerifyC2Base {
@@ -153,8 +131,11 @@
     VerifyC2Base::testSetup();
   }
 
-  void configureC2RootClasses() {
-    configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformationWithoutManifest,FlowInformation");
+  void configureFullHeartbeat() {
+    configuration->set("nifi.c2.full.heartbeat", "false");
+  }
+
+  void runAssertions() {
   }
 };
 #endif /* LIBMINIFI_TEST_INTEGRATION_HTTPINTEGRATIONBASE_H_ */
diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h
index bef607d..5efac83 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -47,8 +47,9 @@
 #include "core/Property.h"
 #include "core/state/nodes/MetricsBase.h"
 #include "utils/Id.h"
-#include "core/state/StateManager.h"
+#include "core/state/UpdateController.h"
 #include "core/state/nodes/FlowInformation.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -61,7 +62,7 @@
  * Flow Controller class. Generally used by FlowController factory
  * as a singleton.
  */
-class FlowController : public core::controller::ControllerServiceProvider, public state::StateManager {
+class FlowController : public core::controller::ControllerServiceProvider, public state::response::NodeReporter,  public state::StateMonitor, public std::enable_shared_from_this<FlowController> {
  public:
   /**
    * Flow controller constructor
@@ -285,36 +286,23 @@
   virtual void enableAllControllerServices();
 
   /**
-   * Retrieves all root response nodes from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
+   * Retrieves metrics node
+   * @return metrics response node
    */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
-  /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
-   */
-  virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass);
+  virtual std::shared_ptr<state::response::ResponseNode> getMetricsNode(const std::string& metricsClass) const;
 
   /**
-   * Retrieves agent information with manifest only from this source.
-   * @param manifest_vector -- manifest nodes vector.
-   * @return 0 on Success, -1 on failure
+   * Retrieves root nodes configured to be included in heartbeat
+   * @param includeManifest -- determines if manifest is to be included
+   * @return a list of response nodes
    */
-  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const;
+  virtual std::vector<std::shared_ptr<state::response::ResponseNode>> getHeartbeatNodes(bool includeManifest) const;
 
   /**
-   * Returns a response node containing all agent information with manifest and agent status
-   * @return a shared pointer to agent information
+   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
+   * @return the agent manifest response node
    */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const;
 
   virtual uint64_t getUptime();
 
@@ -322,6 +310,8 @@
 
   void initializeC2();
 
+  void stopC2();
+
  protected:
 
   void loadC2ResponseConfiguration();
@@ -335,6 +325,8 @@
 
   void initializeExternalComponents();
 
+  std::shared_ptr<state::response::ResponseNode> getAgentInformation() const;
+
   /**
    * Initializes flow controller paths.
    */
@@ -409,16 +401,14 @@
 
   std::map<uint8_t, std::vector<std::shared_ptr<state::response::ResponseNode>>> component_metrics_by_id_;
 
-  // manifest cache
-  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> agent_information_;
-
   // metrics last run
   std::chrono::steady_clock::time_point last_metrics_capture_;
 
  private:
   std::shared_ptr<logging::Logger> logger_;
   std::string serial_number_;
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
+
+  std::unique_ptr<state::UpdateController> c2_agent_;
 };
 
 }
diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 13d7ded..51c7b61 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -101,10 +101,6 @@
     running_ = false;
   }
 
-  std::vector<BackTrace> getTraces() {
-    return thread_pool_.getTraces();
-  }
-
   void watchDogFunc();
 
   virtual std::future<utils::TaskRescheduleInfo> enableControllerService(std::shared_ptr<core::controller::ControllerServiceNode> &serviceNode);
diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h
index c39589f..0ea2445 100644
--- a/libminifi/include/c2/C2Agent.h
+++ b/libminifi/include/c2/C2Agent.h
@@ -34,6 +34,9 @@
 #include "C2Protocol.h"
 #include "io/validation.h"
 #include "HeartBeatReporter.h"
+#include "utils/ThreadPool.h"
+#include "utils/Id.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
@@ -52,39 +55,27 @@
  *   0 HeartBeat --  RESERVED
  *   1-255 Defined by the configuration file.
  */
-class C2Agent : public state::UpdateController, public state::response::ResponseNodeSink, public std::enable_shared_from_this<C2Agent> {
+class C2Agent : public state::UpdateController {
  public:
 
-  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink, const std::shared_ptr<Configure> &configure);
+  C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+          const std::shared_ptr<state::StateMonitor> &updateSink,
+          const std::shared_ptr<Configure> &configure);
 
   virtual ~C2Agent() noexcept {
     delete protocol_.load();
   }
 
+  void start() override;
+
+  void stop() override;
+
   /**
    * Sends the heartbeat to ths server. Will include metrics
    * in the payload if they exist.
    */
   void performHeartBeat();
 
-  virtual std::vector<std::function<state::Update()>> getFunctions() {
-    return functions_;
-  }
-
-  /**
-   * Sets the metric within this sink
-   * @param metric metric to set
-   * @param return 0 on success, -1 on failure.
-   */
-  virtual int16_t setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric);
-
-  /**
-    * Sets the metric within this sink
-    * @param metric metric to set
-    * @param return 0 on success, -1 on failure.
-    */
-   virtual int16_t setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric);
-
   int64_t getHeartBeatDelay(){
     std::lock_guard<std::mutex> lock(heartbeat_mutex);
     return heart_beat_period_;
@@ -180,9 +171,9 @@
   std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_map_;
 
   /**
-     * Device information stored in the metrics format
-     */
-    std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
+   * Device information stored in the metrics format
+   */
+  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> root_response_nodes_;
 
   /**
    * Device information stored in the metrics format
@@ -210,16 +201,16 @@
   std::chrono::steady_clock::time_point last_run_;
 
   // function that performs the heartbeat
-  std::function<state::Update()> c2_producer_;
+  std::function<utils::TaskRescheduleInfo()> c2_producer_;
 
   // function that acts upon the
-  std::function<state::Update()> c2_consumer_;
+  std::function<utils::TaskRescheduleInfo()> c2_consumer_;
 
   // reference to the update sink, against which we will execute updates.
   std::shared_ptr<state::StateMonitor> update_sink_;
 
   // functions that will be used for the udpate controller.
-  std::vector<std::function<state::Update()>> functions_;
+  std::vector<std::function<utils::TaskRescheduleInfo()>> functions_;
 
   std::shared_ptr<controllers::UpdatePolicyControllerService> update_service_;
 
@@ -250,7 +241,13 @@
 
   std::shared_ptr<logging::Logger> logger_;
 
+  utils::ThreadPool<utils::TaskRescheduleInfo> thread_pool_;
+
+  std::vector<std::string> task_ids_;
+
   bool manifest_sent_;
+
+  const uint64_t C2RESPONSE_POLL_MS = 100;
 };
 
 } /* namesapce c2 */
diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h
index 51712e3..6c0b6d2 100644
--- a/libminifi/include/core/Core.h
+++ b/libminifi/include/core/Core.h
@@ -153,7 +153,7 @@
       : name_(name) {
     if (uuid == nullptr) {
       // Generate the global UUID for the flow record
-      id_generator_->generate(uuid_);
+      utils::IdGenerator::getIdGenerator()->generate(uuid_);
     } else {
       uuid_ = uuid;
     }
@@ -163,7 +163,7 @@
   explicit CoreComponent(const std::string &name)
       : name_(name) {
     // Generate the global UUID for the flow record
-    id_generator_->generate(uuid_);
+    utils::IdGenerator::getIdGenerator()->generate(uuid_);
     uuidStr_ = uuid_.to_string();
   }
 
@@ -226,9 +226,6 @@
 
   // Connectable's name
   std::string name_;
-
-private:
-  static std::shared_ptr<utils::IdGenerator> id_generator_;
 };
 
 namespace logging {
diff --git a/libminifi/include/core/state/StateManager.h b/libminifi/include/core/state/StateManager.h
deleted file mode 100644
index 983ed0b..0000000
--- a/libminifi/include/core/state/StateManager.h
+++ /dev/null
@@ -1,140 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef LIBMINIFI_INCLUDE_STATE_MANAGER_H
-#define LIBMINIFI_INCLUDE_STATE_MANAGER_H
-
-#include <map>
-#include <atomic>
-#include <algorithm>
-
-#include "UpdateController.h"
-#include "io/validation.h"
-#include "utils/ThreadPool.h"
-#include "core/Core.h"
-#include "nodes/MetricsBase.h"
-#include "nodes/TreeUpdateListener.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-
-/**
- * State manager is meant to be used by implementing agents of this library. It represents the source and sink for metrics,
- * the sink for external updates, and encapsulates the thread pool that runs the listeners for various update operations
- * that can be performed.
- */
-class StateManager : public response::NodeReporter, public response::ResponseNodeSink, public StateMonitor, public std::enable_shared_from_this<StateManager> {
- public:
-
-  StateManager()
-      : metrics_listener_(nullptr) {
-
-  }
-
-  virtual ~StateManager() {
-
-  }
-
-  /**
-   * Initializes the thread pools.
-   */
-  void initialize();
-
-  /**
-   * State management operations.
-   */
-  /**
-   * Stop this controllable.
-   * @param force force stopping
-   * @param timeToWait time to wait before giving up.
-   * @return status of stopping this controller.
-   */
-  virtual int16_t stop(bool force, uint64_t timeToWait = 0);
-
-  /**
-   * Updates the given flow controller.
-   */
-  int16_t update(const std::shared_ptr<Update> &updateController);
-
-  /**
-   * Passes metrics to the update controllers if they are a metrics sink.
-   * @param metrics metric to pass through
-   */
-  int16_t setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics);
-
-  /**
-   * Metrics operations
-   */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass);
-
-  virtual std::string getVersion(){
-    return "";
-  }
-
- protected:
-
-  void shutdownState(){
-    listener_thread_pool_.shutdown();
-    metrics_maps_.clear();
-    updateControllers.clear();
-  }
-
-  /**
-   * Function to apply updates for a given  update controller.
-   * @param source  source identifier
-   * @param updateController update controller mechanism.
-   */
-  virtual int16_t applyUpdate(const std::string &source, const std::shared_ptr<Update> &updateController) = 0;
-
-  /**
-   * Registers and update controller
-   * @param updateController update controller to add.
-   */
-  bool registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay);
-
-
-  /**
-   * Base metrics function will employ the default metrics listener.
-   */
-  virtual bool startMetrics(const int64_t &delay);
-
- private:
-
-  std::timed_mutex mutex_;
-
-  std::map<std::string, std::shared_ptr<response::ResponseNode>> metrics_maps_;
-
-  std::vector<std::shared_ptr<UpdateController> > updateControllers;
-
-  std::unique_ptr<state::response::TreeUpdateListener> metrics_listener_;
-
-  utils::ThreadPool<Update> listener_thread_pool_;
-
-};
-
-
-
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
-
-#endif /* LIBMINIFI_INCLUDE_C2_CONTROLLABLE_H_ */
diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h
index 8c76e58..9b8d785 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -256,12 +256,26 @@
 class UpdateController {
  public:
 
-  virtual std::vector<std::function<Update()>> getFunctions() = 0;
-
-  virtual ~UpdateController() {
-
+  UpdateController()
+      : controller_running_(false) {
   }
 
+  virtual ~UpdateController() = default;
+
+  virtual std::vector<std::function<utils::TaskRescheduleInfo()>> getFunctions() {
+    return {};
+  }
+
+  virtual void start() = 0;
+
+  virtual void stop() = 0;
+
+  std::atomic<bool>& isControllerRunning() {
+    return controller_running_;
+  }
+ protected:
+
+  std::atomic<bool> controller_running_;
 };
 
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/AgentInformation.h b/libminifi/include/core/state/nodes/AgentInformation.h
index 5809a4c..4e163e5 100644
--- a/libminifi/include/core/state/nodes/AgentInformation.h
+++ b/libminifi/include/core/state/nodes/AgentInformation.h
@@ -504,8 +504,8 @@
 class AgentIdentifier {
  public:
 
-  AgentIdentifier() {
-
+  AgentIdentifier()
+     : include_agent_manifest_(true) {
   }
 
   void setIdentifier(const std::string &identifier) {
@@ -516,9 +516,14 @@
     agent_class_ = agentClass;
   }
 
+  void includeAgentManifest(bool include) {
+    include_agent_manifest_ = include;
+  }
+
  protected:
   std::string identifier_;
   std::string agent_class_;
+  bool include_agent_manifest_;
 };
 
 class AgentMonitor {
@@ -675,31 +680,9 @@
     serialized.push_back(agentManifest);
     return serialized;
   }
-};
 
-/**
- * This class is used for regular heartbeat without manifest
- * A light weight heartbeat
- */
-class AgentInformationWithoutManifest : public AgentNode {
-public:
-
-  AgentInformationWithoutManifest(const std::string& name, utils::Identifier & uuid)
-      : AgentNode(name, uuid) {
-    setArray(false);
-  }
-
-  explicit AgentInformationWithoutManifest(const std::string &name)
-      : AgentNode(name) {
-    setArray(false);
-  }
-
-  std::string getName() const {
-    return "agentInfo";
-  }
-
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
+  std::vector<SerializedResponseNode> getAgentStatus() const {
+    std::vector<SerializedResponseNode> serialized;
 
     AgentStatus status("status");
     status.setRepositories(repositories_);
@@ -716,21 +699,23 @@
   }
 };
 
-
 /**
- * This class is used for sending all agent information including manifest and status
- * A heavy weight heartbeat. Here to maintain backward compatibility
+ * This class is used for sending agent information while including
+ * or excluding the agent manifest. agent status and agent manifest
+ * is included by default
  */
-class AgentInformation : public AgentInformationWithoutManifest {
+class AgentInformation : public AgentNode {
  public:
 
   AgentInformation(const std::string& name, utils::Identifier & uuid)
-      : AgentInformationWithoutManifest(name, uuid) {
+      : AgentNode(name, uuid),
+        include_agent_status_(true) {
     setArray(false);
   }
 
   explicit AgentInformation(const std::string &name)
-      : AgentInformationWithoutManifest(name) {
+      : AgentNode(name),
+        include_agent_status_(true) {
     setArray(false);
   }
 
@@ -738,45 +723,28 @@
     return "agentInfo";
   }
 
-  std::vector<SerializedResponseNode> serialize() {
-    std::vector<SerializedResponseNode> serialized(AgentInformationWithoutManifest::serialize());
-    auto manifest = getAgentManifest();
-    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
-    return serialized;
-  }
-
-};
-
-/**
- * This class is used for response to DESCRIBE manifest request
- * It contains static information only
- */
-class AgentInformationWithManifest : public AgentNode {
-public:
-  AgentInformationWithManifest(const std::string& name, utils::Identifier & uuid)
-      : AgentNode(name, uuid) {
-    setArray(false);
-  }
-
-  explicit AgentInformationWithManifest(const std::string &name)
-      : AgentNode(name) {
-    setArray(false);
-  }
-
-  std::string getName() const {
-    return "agentInfo";
+  void includeAgentStatus(bool include) {
+    include_agent_status_ = include;
   }
 
   std::vector<SerializedResponseNode> serialize() {
     std::vector<SerializedResponseNode> serialized(AgentNode::serialize());
-    auto manifest = getAgentManifest();
-    serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    if (include_agent_manifest_) {
+      auto manifest = getAgentManifest();
+      serialized.insert(serialized.end(), std::make_move_iterator(manifest.begin()), std::make_move_iterator(manifest.end()));
+    }
+
+    if (include_agent_status_) {
+      auto status = getAgentStatus();
+      serialized.insert(serialized.end(), std::make_move_iterator(status.begin()), std::make_move_iterator(status.end()));
+    }
     return serialized;
   }
+ protected:
+  bool include_agent_status_;
 };
 
 REGISTER_RESOURCE(AgentInformation, "Node part of an AST that defines all agent information, to include the manifest, and bundle information as part of a healthy hearbeat.");
-REGISTER_RESOURCE(AgentInformationWithoutManifest, "Node part of an AST that defines all agent information, without the manifest and bundle information as part of a healthy hearbeat.");
 
 } /* namespace metrics */
 } /* namespace state */
diff --git a/libminifi/include/core/state/nodes/MetricsBase.h b/libminifi/include/core/state/nodes/MetricsBase.h
index d91df6e..eb06ff7 100644
--- a/libminifi/include/core/state/nodes/MetricsBase.h
+++ b/libminifi/include/core/state/nodes/MetricsBase.h
@@ -209,37 +209,23 @@
   }
 
   /**
-   * Retrieves all root response nodes from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
+   * Retrieves metrics node
+   * @return metrics response node
    */
-  virtual int16_t getResponseNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
+  virtual std::shared_ptr<ResponseNode> getMetricsNode(const std::string& metricsClass) const = 0;
 
   /**
-   * Retrieves all metrics from this source.
-   * @param metric_vector -- metrics will be placed in this vector.
-   * @return result of the get operation.
-   *  0 Success
-   *  1 No error condition, but cannot obtain lock in timely manner.
-   *  -1 failure
+   * Retrieves root nodes configured to be included in heartbeat
+   * @param includeManifest -- determines if manifest is to be included
+   * @return a list of response nodes
    */
-  virtual int16_t getMetricsNodes(std::vector<std::shared_ptr<ResponseNode>> &metric_vector, uint16_t metricsClass) = 0;
+  virtual std::vector<std::shared_ptr<ResponseNode>> getHeartbeatNodes(bool includeManifest) const = 0;
 
   /**
-   * Retrieves agent information with manifest only from this source.
-   * @param manifest_vector -- manifest nodes vector.
-   * @return 0 on Success, -1 on failure
+   * Retrieves the agent manifest to be sent as a response to C2 DESCRIBE manifest
+   * @return the agent manifest response node
    */
-  virtual int16_t getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const = 0;
-
-  /**
-   * Returns a response node containing all agent information with manifest and agent status
-   * @return a shared pointer to agent information
-   */
-  virtual std::shared_ptr<state::response::ResponseNode> getAgentInformation() const = 0;
+  virtual std::shared_ptr<state::response::ResponseNode> getAgentManifest() const = 0;
 };
 
 /**
@@ -269,7 +255,7 @@
    *  1 No error condition, but cannot obtain lock in timely manner.
    *  -1 failure
    */
-//  virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0;
+  virtual int16_t setMetricsNodes(const std::shared_ptr<ResponseNode> &metrics) = 0;
 };
 
 } /* namespace metrics */
diff --git a/libminifi/include/core/state/nodes/TreeUpdateListener.h b/libminifi/include/core/state/nodes/TreeUpdateListener.h
index 4b7e237..ad22e2a 100644
--- a/libminifi/include/core/state/nodes/TreeUpdateListener.h
+++ b/libminifi/include/core/state/nodes/TreeUpdateListener.h
@@ -75,49 +75,6 @@
 
 };
 
-class TreeUpdateListener {
- public:
-  TreeUpdateListener(const std::shared_ptr<response::NodeReporter> &source, const std::shared_ptr<response::ResponseNodeSink> &sink)
-      : running_(true),
-        source_(source),
-        sink_(sink){
-
-    function_ = [&]() {
-      while(running_) {
-        std::vector<std::shared_ptr<response::ResponseNode>> metric_vector;
-        // simple pass through for the metrics
-        if (nullptr != source_ && nullptr != sink_) {
-          source_->getResponseNodes(metric_vector,0);
-          for(auto metric : metric_vector) {
-            sink_->setResponseNodes(metric);
-          }
-        }
-        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
-      }
-      return MetricsUpdate(UpdateState::READ_COMPLETE);
-    };
-  }
-
-  void stop() {
-    running_ = false;
-  }
-
-  std::function<Update()> &getFunction() {
-    return function_;
-  }
-
-  std::future<Update> &getFuture() {
-    return future_;
-  }
-
- private:
-  std::function<Update()> function_;
-  std::future<Update> future_;
-  std::atomic<bool> running_;
-  std::shared_ptr<response::NodeReporter> source_;
-  std::shared_ptr<response::ResponseNodeSink> sink_;
-};
-
 } /* namespace metrics */
 } /* namespace state */
 } /* namespace minifi */
diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h
index 9250b5b..824d386 100644
--- a/libminifi/include/properties/Configure.h
+++ b/libminifi/include/properties/Configure.h
@@ -36,10 +36,21 @@
     std::lock_guard<std::mutex> lock(mutex_);
     agent_identifier_ = identifier;
   }
-  std::string getAgentIdentifier() {
+  std::string getAgentIdentifier() const {
     std::lock_guard<std::mutex> lock(mutex_);
     return agent_identifier_;
   }
+
+  void setAgentClass(const std::string& agentClass) {
+    std::lock_guard<std::mutex> lock(mutex_);
+    agent_class_ = agentClass;
+  }
+
+  std::string getAgentClass() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return agent_class_;
+  }
+
   // nifi.flow.configuration.file
   static const char *nifi_default_directory;
   static const char *nifi_flow_configuration_file;
@@ -87,10 +98,12 @@
   static const char *nifi_c2_flow_id;
   static const char *nifi_c2_flow_url;
   static const char *nifi_c2_flow_base_url;
+  static const char *nifi_c2_full_heartbeat;
 
  private:
   std::string agent_identifier_;
-  std::mutex mutex_;
+  std::string agent_class_;
+  mutable std::mutex mutex_;
 };
 
 } /* namespace minifi */
diff --git a/libminifi/include/utils/BackTrace.h b/libminifi/include/utils/BackTrace.h
index 5bd95cb..8ba57c2 100644
--- a/libminifi/include/utils/BackTrace.h
+++ b/libminifi/include/utils/BackTrace.h
@@ -25,8 +25,10 @@
 #include <utility>
 #include <vector>
 #include <mutex>
+#include <condition_variable>
 #include <iostream>
 #include <sstream>
+#include <memory>
 
 #define TRACE_BUFFER_SIZE 128
 
@@ -87,7 +89,6 @@
  */
 class TraceResolver {
  public:
-
   /**
    * Retrieves the backtrace for the provided thread reference
    * @return BackTrace instance
@@ -145,27 +146,25 @@
     trace_.addLine(line.str());
   }
 
-  /**
-   * Returns the thread handle reference in the native format.
-   */
-  std::thread::native_handle_type getThreadHandle() {
-    return thread_handle_;
+  std::unique_lock<std::mutex> lock() {
+    return std::unique_lock<std::mutex>(trace_mutex_);
   }
 
-  /**
-   * Returns the caller handle reference in the native format.
-   */
-  std::thread::native_handle_type getCallerHandle() {
-    return caller_handle_;
+  void notifyPullTracesDone(std::unique_lock<std::mutex>& lock) {
+    std::unique_lock<std::mutex> tlock(std::move(lock));
+    pull_traces_ = true;
+    trace_condition_.notify_one();
   }
 
  private:
   TraceResolver() = default;
 
   BackTrace trace_;
-  std::thread::native_handle_type thread_handle_{0};
-  std::thread::native_handle_type caller_handle_{0};
-  std::mutex mutex_;
+  mutable std::mutex mutex_;
+
+  bool pull_traces_{false};
+  mutable std::mutex trace_mutex_;
+  std::condition_variable trace_condition_;
 };
 
 #endif /* LIBMINIFI_INCLUDE_UTILS_BACKTRACE_H_ */
diff --git a/libminifi/include/utils/ThreadPool.h b/libminifi/include/utils/ThreadPool.h
index 86e2abb..56a550e 100644
--- a/libminifi/include/utils/ThreadPool.h
+++ b/libminifi/include/utils/ThreadPool.h
@@ -50,7 +50,7 @@
 template<typename T>
 class Worker {
  public:
-  explicit Worker(std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
+  explicit Worker(const std::function<T()> &task, const std::string &identifier, std::unique_ptr<AfterExecute<T>> run_determinant)
       : identifier_(identifier),
         next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
@@ -58,7 +58,7 @@
     promise = std::make_shared<std::promise<T>>();
   }
 
-  explicit Worker(std::function<T()> &task, const std::string &identifier)
+  explicit Worker(const std::function<T()> &task, const std::string &identifier)
       : identifier_(identifier),
         next_exec_time_(std::chrono::steady_clock::now()),
         task(task),
diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp
index 38b9c19..fbb0fa9 100644
--- a/libminifi/src/Configure.cpp
+++ b/libminifi/src/Configure.cpp
@@ -62,6 +62,7 @@
 const char *Configure::nifi_c2_flow_id = "nifi.c2.flow.id";
 const char *Configure::nifi_c2_flow_url = "nifi.c2.flow.url";
 const char *Configure::nifi_c2_flow_base_url = "nifi.c2.flow.base.url";
+const char *Configure::nifi_c2_full_heartbeat = "nifi.c2.full.heartbeat";
 
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 1a251b5..350b64d 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -54,6 +54,7 @@
 #include "core/Connectable.h"
 #include "utils/HTTPClient.h"
 #include "io/NetworkPrioritizer.h"
+#include "io/validation.h"
 
 #ifdef _MSC_VER
 #ifndef PATH_MAX
@@ -66,8 +67,6 @@
 namespace nifi {
 namespace minifi {
 
-std::shared_ptr<utils::IdGenerator> FlowController::id_generator_ = utils::IdGenerator::getIdGenerator();
-
 #define DEFAULT_CONFIG_NAME "conf/config.yml"
 
 FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure,
@@ -98,7 +97,7 @@
   if (IsNullOrEmpty(configuration_)) {
     throw std::runtime_error("Must supply a configuration.");
   }
-  id_generator_->generate(uuid_);
+  utils::IdGenerator::getIdGenerator()->generate(uuid_);
   setUUID(uuid_);
   flow_update_ = false;
   // Setup the default values
@@ -172,6 +171,7 @@
 
 FlowController::~FlowController() {
   stop(true);
+  stopC2();
   unload();
   if (NULL != protocol_)
     delete protocol_;
@@ -179,6 +179,11 @@
   provenance_repo_ = nullptr;
 }
 
+void FlowController::stopC2() {
+  if (c2_agent_)
+    c2_agent_->stop();
+}
+
 bool FlowController::applyConfiguration(const std::string &source, const std::string &configurePayload) {
   std::unique_ptr<core::ProcessGroup> newRoot;
   try {
@@ -243,7 +248,7 @@
     this->timer_scheduler_->stop();
     this->event_scheduler_->stop();
     this->cron_scheduler_->stop();
-    this->thread_pool_.shutdown();
+    thread_pool_.shutdown();
     running_ = false;
   }
   return 0;
@@ -282,8 +287,6 @@
     initialized_ = false;
     name_ = "";
   }
-
-  return;
 }
 
 void FlowController::load(const std::shared_ptr<core::ProcessGroup> &root, bool reload) {
@@ -398,6 +401,7 @@
       this->protocol_->start();
       this->provenance_repo_->start();
       this->flow_file_repo_->start();
+      thread_pool_.start();
       logger_->log_info("Started Flow Controller");
     }
     return 0;
@@ -414,6 +418,7 @@
 
   // don't need to worry about the return code, only whether class_str is defined.
   configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
+  configuration_->setAgentClass(class_str);
 
   if (configuration_->get(Configure::nifi_c2_enable, "c2.enable", c2_enable_str)) {
     bool enable_c2 = true;
@@ -447,28 +452,17 @@
     // set to the flow controller's identifier
     identifier_str = uuidStr_;
   }
+  configuration_->setAgentIdentifier(identifier_str);
 
-  if (!c2_initialized_) {
-    configuration_->setAgentIdentifier(identifier_str);
-    state::StateManager::initialize();
-    std::shared_ptr<c2::C2Agent> agent = std::make_shared<c2::C2Agent>(std::dynamic_pointer_cast<FlowController>(shared_from_this()), std::dynamic_pointer_cast<FlowController>(shared_from_this()),
-                                                                       configuration_);
-    registerUpdateListener(agent, agent->getHeartBeatDelay());
-
-    state::StateManager::startMetrics(agent->getHeartBeatDelay());
-
-    c2_initialized_ = true;
-  } else {
-    if (!flow_update_) {
-      return;
-    }
+  if (c2_initialized_ && !flow_update_) {
+    return;
   }
+
   device_information_.clear();
   component_metrics_.clear();
   component_metrics_by_id_.clear();
-  agent_information_.clear();
-  std::string class_csv;
 
+  std::string class_csv;
   if (root_ != nullptr) {
     std::shared_ptr<state::response::QueueMetrics> queueMetrics = std::make_shared<state::response::QueueMetrics>();
 
@@ -485,15 +479,6 @@
     repoMetrics->addRepository(flow_file_repo_);
 
     device_information_[repoMetrics->getName()] = repoMetrics;
-
-    std::shared_ptr<state::response::AgentInformationWithManifest> manifest = std::make_shared<state::response::AgentInformationWithManifest>("agentInformation");
-    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(manifest);
-
-    if (identifier != nullptr) {
-      identifier->setIdentifier(identifier_str);
-      identifier->setAgentClass(class_str);
-      agent_information_[manifest->getName()] = manifest;
-    }
   }
 
   if (configuration_->get("nifi.c2.root.classes", class_csv)) {
@@ -611,6 +596,14 @@
   }
 
   loadC2ResponseConfiguration();
+
+  if (!c2_initialized_) {
+    c2_agent_ = std::unique_ptr<c2::C2Agent>(new c2::C2Agent(std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                             std::dynamic_pointer_cast<FlowController>(shared_from_this()),
+                                                             configuration_));
+    c2_agent_->start();
+    c2_initialized_ = true;
+  }
 }
 
 void FlowController::loadC2ResponseConfiguration(const std::string &prefix) {
@@ -917,57 +910,44 @@
   return -1;
 }
 
-int16_t FlowController::getResponseNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
+std::shared_ptr<state::response::ResponseNode> FlowController::getMetricsNode(const std::string& metricsClass) const {
   std::lock_guard<std::mutex> lock(metrics_mutex_);
-
-  for (auto metric : root_response_nodes_) {
-    metric_vector.push_back(metric.second);
-  }
-
-  return 0;
-}
-
-int16_t FlowController::getMetricsNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
-  std::lock_guard<std::mutex> lock(metrics_mutex_);
-  if (metricsClass == 0) {
-    for (auto metric : device_information_) {
-      metric_vector.push_back(metric.second);
+  if (!metricsClass.empty()) {
+    const auto citer = component_metrics_.find(metricsClass);
+    if (citer != component_metrics_.end()) {
+      return citer->second;
     }
   } else {
-    auto metrics = component_metrics_by_id_[metricsClass];
-    for (const auto &metric : metrics) {
-      metric_vector.push_back(metric);
+    const auto iter = root_response_nodes_.find("metrics");
+    if (iter != root_response_nodes_.end()) {
+      return iter->second;
     }
   }
-  return 0;
+  return nullptr;
 }
 
-int16_t FlowController::getManifestNodes(std::vector<std::shared_ptr<state::response::ResponseNode>>& manifest_vector) const {
-    std::lock_guard<std::mutex> lock(metrics_mutex_);
-    for (const auto& metric : agent_information_) {
-        manifest_vector.push_back(metric.second);
+std::vector<std::shared_ptr<state::response::ResponseNode>> FlowController::getHeartbeatNodes(bool includeManifest) const {
+  std::string fullHb{"true"};
+  configuration_->get("nifi.c2.full.heartbeat", fullHb);
+  const bool include = includeManifest ? true : (fullHb == "true");
+
+  std::vector<std::shared_ptr<state::response::ResponseNode>> nodes;
+  for (const auto& entry : root_response_nodes_) {
+    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(entry.second);
+    if (identifier) {
+      identifier->includeAgentManifest(include);
     }
-    return 0;
+    nodes.push_back(entry.second);
+  }
+  return nodes;
 }
 
-std::shared_ptr<state::response::ResponseNode> FlowController::getAgentInformation() const {
-    auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
-    auto identifier = std::dynamic_pointer_cast<state::response::AgentIdentifier>(agentInfo);
-
-    if (identifier != nullptr) {
-      std::string class_str;
-      configuration_->get("nifi.c2.agent.class", "c2.agent.class", class_str);
-
-      std::string identifier_str;
-      if (!configuration_->get("nifi.c2.agent.identifier", "c2.agent.identifier", identifier_str) || identifier_str.empty()) {
-        identifier_str = uuidStr_;
-      }
-
-      identifier->setIdentifier(identifier_str);
-      identifier->setAgentClass(class_str);
-      return agentInfo;
-    }
-    return nullptr;
+std::shared_ptr<state::response::ResponseNode> FlowController::getAgentManifest() const {
+  auto agentInfo = std::make_shared<state::response::AgentInformation>("agentInfo");
+  agentInfo->setIdentifier(configuration_->getAgentIdentifier());
+  agentInfo->setAgentClass(configuration_->getAgentClass());
+  agentInfo->includeAgentStatus(false);
+  return agentInfo;
 }
 
 std::vector<std::shared_ptr<state::StateController>> FlowController::getAllComponents() {
@@ -1029,14 +1009,7 @@
 }
 
 std::vector<BackTrace> FlowController::getTraces() {
-  std::vector<BackTrace> traces;
-  auto timer_driven = timer_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(timer_driven.begin()), std::make_move_iterator(timer_driven.end()));
-  auto event_driven = event_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(event_driven.begin()), std::make_move_iterator(event_driven.end()));
-  auto cron_driven = cron_scheduler_->getTraces();
-  traces.insert(traces.end(), std::make_move_iterator(cron_driven.begin()), std::make_move_iterator(cron_driven.end()));
-  // repositories
+  std::vector<BackTrace> traces{thread_pool_.getTraces()};
   auto prov_repo_trace = provenance_repo_->getTraces();
   traces.emplace_back(std::move(prov_repo_trace));
   auto flow_repo_trace = flow_file_repo_->getTraces();
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 77d3f7a..6670a73 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -32,13 +32,17 @@
 #include "utils/file/FileUtils.h"
 #include "utils/file/FileManager.h"
 #include "utils/HTTPClient.h"
+#include "utils/GeneralUtils.h"
+#include "utils/Monitors.h"
+
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace c2 {
 
-C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller, const std::shared_ptr<state::StateMonitor> &updateSink,
+C2Agent::C2Agent(const std::shared_ptr<core::controller::ControllerServiceProvider> &controller,
+                 const std::shared_ptr<state::StateMonitor> &updateSink,
                  const std::shared_ptr<Configure> &configuration)
     : heart_beat_period_(3000),
       max_c2_responses(5),
@@ -47,7 +51,8 @@
       controller_(controller),
       configuration_(configuration),
       protocol_(nullptr),
-      logger_(logging::LoggerFactory<C2Agent>::getLogger()) {
+      logger_(logging::LoggerFactory<C2Agent>::getLogger()),
+      thread_pool_(2, false, nullptr, "C2 threadpool") {
   allow_updates_ = true;
 
   manifest_sent_ = false;
@@ -67,12 +72,10 @@
   configure(configuration, false);
 
   c2_producer_ = [&]() {
-    auto now = std::chrono::steady_clock::now();
-    auto time_since = std::chrono::duration_cast<std::chrono::milliseconds>(now - last_run_).count();
-
     // place priority on messages to send to the c2 server
-      if ( protocol_.load() != nullptr && request_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
-        if (requests.size() > 0) {
+      if (protocol_.load() != nullptr && request_mutex.try_lock_for(std::chrono::seconds(1))) {
+        std::lock_guard<std::timed_mutex> lock(request_mutex, std::adopt_lock);
+        if (!requests.empty()) {
           int count = 0;
           do {
             const C2Payload payload(std::move(requests.back()));
@@ -87,48 +90,72 @@
             catch(...) {
               logger_->log_error("Unknonwn exception occurred while consuming payload.");
             }
-          }while(requests.size() > 0 && ++count < max_c2_responses);
+          }while(!requests.empty() && ++count < max_c2_responses);
         }
-        request_mutex.unlock();
       }
-
-      if ( time_since > heart_beat_period_ ) {
-        last_run_ = now;
-        try {
-          performHeartBeat();
-        }
-        catch(const std::exception &e) {
-          logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
-        }
-        catch(...) {
-          logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
-        }
+      try {
+        performHeartBeat();
+      }
+      catch(const std::exception &e) {
+        logger_->log_error("Exception occurred while performing heartbeat. error: %s", e.what());
+      }
+      catch(...) {
+        logger_->log_error("Unknonwn exception occurred while performing heartbeat.");
       }
 
       checkTriggers();
 
-      std::this_thread::sleep_for(std::chrono::milliseconds(heart_beat_period_ > 500 ? 500 : heart_beat_period_));
-      return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+      return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(heart_beat_period_));
     };
-
   functions_.push_back(c2_producer_);
 
   c2_consumer_ = [&]() {
-    auto now = std::chrono::steady_clock::now();
-    if ( queue_mutex.try_lock_until(now + std::chrono::seconds(1)) ) {
-      if (responses.size() > 0) {
-        const C2Payload payload(std::move(responses.back()));
+    if ( queue_mutex.try_lock_for(std::chrono::seconds(1)) ) {
+      C2Payload payload(Operation::HEARTBEAT);
+      {
+        std::lock_guard<std::timed_mutex> lock(queue_mutex, std::adopt_lock);
+        if (responses.empty()) {
+          return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
+        }
+        payload = std::move(responses.back());
         responses.pop_back();
-        extractPayload(std::move(payload));
       }
-      queue_mutex.unlock();
+      extractPayload(std::move(payload));
     }
-    return state::Update(state::UpdateStatus(state::UpdateState::READ_COMPLETE, false));
+    return utils::TaskRescheduleInfo::RetryIn(std::chrono::milliseconds(C2RESPONSE_POLL_MS));
   };
-
   functions_.push_back(c2_consumer_);
 }
 
+void C2Agent::start() {
+  if (controller_running_) {
+    return;
+  }
+  task_ids_.clear();
+  for (const auto& function : functions_) {
+    utils::Identifier uuid;
+    utils::IdGenerator::getIdGenerator()->generate(uuid);
+    const std::string uuid_str = uuid.to_string();
+    task_ids_.push_back(uuid_str);
+    auto monitor = utils::make_unique<utils::ComplexMonitor>();
+    utils::Worker<utils::TaskRescheduleInfo> functor(function, uuid_str, std::move(monitor));
+    std::future<utils::TaskRescheduleInfo> future;
+    thread_pool_.execute(std::move(functor), future);
+  }
+  controller_running_ = true;
+  thread_pool_.start();
+  logger_->log_info("C2 agent started");
+}
+
+void C2Agent::stop() {
+  controller_running_ = false;
+  for (const auto& id : task_ids_) {
+    thread_pool_.stopTasks(id);
+  }
+  thread_pool_.shutdown();
+  logger_->log_info("C2 agent stopped");
+}
+
 void C2Agent::checkTriggers() {
   logger_->log_debug("Checking %d triggers", triggers_.size());
   for (const auto &trigger : triggers_) {
@@ -275,60 +302,25 @@
 
 void C2Agent::performHeartBeat() {
   C2Payload payload(Operation::HEARTBEAT);
-
   logger_->log_trace("Performing heartbeat");
-
-  std::map<std::string, std::shared_ptr<state::response::ResponseNode>> metrics_copy;
-  {
-    std::lock_guard<std::timed_mutex> lock(metrics_mutex_);
-    if (metrics_map_.size() > 0) {
-      metrics_copy = std::move(metrics_map_);
-    }
-  }
-
-  if (metrics_copy.size() > 0) {
-    C2Payload metrics(Operation::HEARTBEAT);
-    metrics.setLabel("metrics");
-
-    for (auto metric : metrics_copy) {
-      if (metric.second->serialize().size() == 0)
-        continue;
-      C2Payload child_metric_payload(Operation::HEARTBEAT);
-      child_metric_payload.setLabel(metric.first);
-      serializeMetrics(child_metric_payload, metric.first, metric.second->serialize(), metric.second->isArray());
-      metrics.addPayload(std::move(child_metric_payload));
-    }
-    payload.addPayload(std::move(metrics));
-  }
-
-  for (auto metric : root_response_nodes_) {
-    C2Payload child_metric_payload(Operation::HEARTBEAT);
-    bool isArray{false};
-    std::string metricName;
-    std::vector<state::response::SerializedResponseNode> metrics;
-    std::shared_ptr<state::response::NodeReporter> reporter;
-    std::shared_ptr<state::response::ResponseNode> agentInfo;
-
-    // Send agent manifest in first heartbeat
-    if (!manifest_sent_
-        && (reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_))
-        && (agentInfo = reporter->getAgentInformation())
-        && metric.first == agentInfo->getName()) {
-      metricName = agentInfo->getName();
-      isArray = agentInfo->isArray();
-      metrics = agentInfo->serialize();
+  std::shared_ptr<state::response::NodeReporter> reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
+  std::vector<std::shared_ptr<state::response::ResponseNode>> metrics;
+  if (reporter) {
+    if (!manifest_sent_) {
+      // include agent manifest for the first heartbeat
+      metrics = reporter->getHeartbeatNodes(true);
       manifest_sent_ = true;
     } else {
-      metricName = metric.first;
-      isArray = metric.second->isArray();
-      metrics = metric.second->serialize();
+      metrics = reporter->getHeartbeatNodes(false);
     }
-    child_metric_payload.setLabel(metricName);
-    if (isArray) {
-      child_metric_payload.setContainer(true);
+
+    for (const auto& metric : metrics) {
+      C2Payload child_metric_payload(Operation::HEARTBEAT);
+      child_metric_payload.setLabel(metric->getName());
+      child_metric_payload.setContainer(metric->isArray());
+      serializeMetrics(child_metric_payload, metric->getName(), metric->serialize(), metric->isArray());
+      payload.addPayload(std::move(child_metric_payload));
     }
-    serializeMetrics(child_metric_payload, metricName, metrics, isArray);
-    payload.addPayload(std::move(child_metric_payload));
   }
   C2Payload && response = protocol_.load()->consumePayload(payload);
 
@@ -456,7 +448,7 @@
       update_sink_->stop(true);
       C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
       protocol_.load()->consumePayload(std::move(response));
-      exit(1);
+      restart_agent();
     }
       break;
     case Operation::START:
@@ -466,7 +458,6 @@
       }
 
       std::vector<std::shared_ptr<state::StateController>> components = update_sink_->getComponents(resp.name);
-
       // stop all referenced components.
       for (auto &component : components) {
         logger_->log_debug("Stopping component %s", component->getComponentName());
@@ -516,54 +507,39 @@
  * to be put into the acknowledgement
  */
 void C2Agent::handle_describe(const C2ContentResponse &resp) {
+  auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
   if (resp.name == "metrics") {
-    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
-
+    C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
     if (reporter != nullptr) {
-      auto metricsClass = resp.operation_arguments.find("metricsClass");
-      uint8_t metric_class_id = 0;
-      if (metricsClass != resp.operation_arguments.end()) {
-        // we have a class
-        try {
-          metric_class_id = std::stoi(metricsClass->second.to_string());
-        } catch (...) {
-          logger_->log_error("Could not convert %s into an integer", metricsClass->second.to_string());
-        }
+      auto iter = resp.operation_arguments.find("metricsClass");
+      std::string metricsClass;
+      if (iter != resp.operation_arguments.end()) {
+        metricsClass = iter->second.to_string();
       }
-
-      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-      C2Payload response(Operation::ACKNOWLEDGE, resp.ident, false, true);
+      auto metricsNode = reporter->getMetricsNode(metricsClass);
       C2Payload metrics(Operation::ACKNOWLEDGE);
-      metrics.setLabel("metrics");
-      reporter->getResponseNodes(metrics_vec, 0);
-      for (auto metric : metrics_vec) {
-        serializeMetrics(metrics, metric->getName(), metric->serialize());
+      metricsClass.empty() ? metrics.setLabel("metrics") : metrics.setLabel(metricsClass);
+      if (metricsNode) {
+        serializeMetrics(metrics, metricsNode->getName(), metricsNode->serialize(), metricsNode->isArray());
       }
       response.addPayload(std::move(metrics));
-      enqueue_c2_response(std::move(response));
     }
-
+    enqueue_c2_response(std::move(response));
+    return;
   } else if (resp.name == "configuration") {
     auto configOptions = prepareConfigurationOptions(resp);
     enqueue_c2_response(std::move(configOptions));
     return;
   } else if (resp.name == "manifest") {
     C2Payload response(prepareConfigurationOptions(resp));
-
-    auto reporter = std::dynamic_pointer_cast<state::response::NodeReporter>(update_sink_);
     if (reporter != nullptr) {
-      std::vector<std::shared_ptr<state::response::ResponseNode>> metrics_vec;
-
       C2Payload agentInfo(Operation::ACKNOWLEDGE, resp.ident, false, true);
       agentInfo.setLabel("agentInfo");
 
-      reporter->getManifestNodes(metrics_vec);
-      for (const auto& metric : metrics_vec) {
-          serializeMetrics(agentInfo, metric->getName(), metric->serialize());
-      }
+      const auto manifest = reporter->getAgentManifest();
+      serializeMetrics(agentInfo, manifest->getName(), manifest->serialize());
       response.addPayload(std::move(agentInfo));
     }
-
     enqueue_c2_response(std::move(response));
     return;
   } else if (resp.name == "jstack") {
@@ -839,7 +815,8 @@
   }
 
   std::stringstream command;
-  command << cwd << "/minifi.sh restart";
+  command << cwd << "/bin/minifi.sh restart";
+  system(command.str().c_str());
 }
 
 void C2Agent::update_agent() {
@@ -848,26 +825,6 @@
   }
 }
 
-int16_t C2Agent::setResponseNodes(const std::shared_ptr<state::response::ResponseNode> &metric) {
-  auto now = std::chrono::steady_clock::now();
-  if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
-    root_response_nodes_[metric->getName()] = metric;
-    metrics_mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
-int16_t C2Agent::setMetricsNodes(const std::shared_ptr<state::response::ResponseNode> &metric) {
-  auto now = std::chrono::steady_clock::now();
-  if (metrics_mutex_.try_lock_until(now + std::chrono::seconds(1))) {
-    metrics_map_[metric->getName()] = metric;
-    metrics_mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
 } /* namespace c2 */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/src/core/Core.cpp b/libminifi/src/core/Core.cpp
index 1e95194..0cc6932 100644
--- a/libminifi/src/core/Core.cpp
+++ b/libminifi/src/core/Core.cpp
@@ -26,8 +26,6 @@
 namespace minifi {
 namespace core {
 
-std::shared_ptr<utils::IdGenerator> CoreComponent::id_generator_ = utils::IdGenerator::getIdGenerator();
-
 // Set UUID
 void CoreComponent::setUUID(utils::Identifier &uuid) {
   uuid_ = uuid;
diff --git a/libminifi/src/core/state/StateManager.cpp b/libminifi/src/core/state/StateManager.cpp
deleted file mode 100644
index ce46bc5..0000000
--- a/libminifi/src/core/state/StateManager.cpp
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "core/state/StateManager.h"
-#include <memory>
-#include <utility>
-#include <vector>
-
-#include "core/state/nodes/MetricsBase.h"
-
-namespace org {
-namespace apache {
-namespace nifi {
-namespace minifi {
-namespace state {
-
-void StateManager::initialize() {
-  metrics_listener_ = std::unique_ptr<state::response::TreeUpdateListener>(new state::response::TreeUpdateListener(shared_from_this(), shared_from_this()));
-  // manually add the c2 agent for now
-  listener_thread_pool_.setMaxConcurrentTasks(2);
-  listener_thread_pool_.start();
-  controller_running_ = true;
-}
-/**
- * State management operations.
- */
-int16_t StateManager::stop(bool force, uint64_t timeToWait) {
-  controller_running_ = false;
-  listener_thread_pool_.shutdown();
-  return 1;
-}
-
-int16_t StateManager::update(const std::shared_ptr<Update> &updateController) {
-  // must be stopped to update.
-  if (isStateMonitorRunning()) {
-    return -1;
-  }
-  int16_t ret = applyUpdate("StateManager", updateController);
-  switch (ret) {
-    case -1:
-      return -1;
-    default:
-      return 1;
-  }
-}
-
-/**
- * Passes metrics to the update controllers if they are a metrics sink.
- * @param metrics metric to pass through
- */
-int16_t StateManager::setResponseNodes(const std::shared_ptr<response::ResponseNode> &metrics) {
-  if (IsNullOrEmpty(metrics)) {
-    return -1;
-  }
-  auto now = std::chrono::steady_clock::now();
-  if (mutex_.try_lock_until(now + std::chrono::milliseconds(100))) {
-    // update controllers can be metric sinks too
-    for (auto controller : updateControllers) {
-      std::shared_ptr<response::ResponseNodeSink> sink = std::dynamic_pointer_cast<response::ResponseNodeSink>(controller);
-      if (sink != nullptr) {
-        sink->setResponseNodes(metrics);
-      }
-    }
-    metrics_maps_[metrics->getName()] = metrics;
-    mutex_.unlock();
-  } else {
-    return -1;
-  }
-  return 0;
-}
-/**
- * Metrics operations
- */
-int16_t StateManager::getResponseNodes(std::vector<std::shared_ptr<response::ResponseNode>> &metric_vector, uint16_t metricsClass) {
-  auto now = std::chrono::steady_clock::now();
-  const std::chrono::steady_clock::time_point wait_time = now + std::chrono::milliseconds(100);
-  if (mutex_.try_lock_until(wait_time)) {
-    for (auto metric : metrics_maps_) {
-      metric_vector.push_back(metric.second);
-    }
-    mutex_.unlock();
-    return 0;
-  }
-  return -1;
-}
-
-bool StateManager::registerUpdateListener(const std::shared_ptr<UpdateController> &updateController, const int64_t &delay) {
-  auto functions = updateController->getFunctions();
-
-  updateControllers.push_back(updateController);
-  // run all functions independently
-
-  for (auto function : functions) {
-    std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay));
-    utils::Worker<Update> functor(function, "listeners", std::move(after_execute));
-    std::future<Update> future;
-    if (!listener_thread_pool_.execute(std::move(functor), future)) {
-      // denote failure
-      return false;
-    }
-  }
-  return true;
-}
-
-/**
- * Base metrics function will employ the default metrics listener.
- */
-bool StateManager::startMetrics(const int64_t &delay) {
-  std::unique_ptr<utils::AfterExecute<Update>> after_execute = std::unique_ptr<utils::AfterExecute<Update>>(new UpdateRunner(isStateMonitorRunning(), delay));
-  utils::Worker<Update> functor(metrics_listener_->getFunction(), "metrics", std::move(after_execute));
-  if (!listener_thread_pool_.execute(std::move(functor), metrics_listener_->getFuture())) {
-    // denote failure
-    return false;
-  }
-  return true;
-}
-
-} /* namespace state */
-} /* namespace minifi */
-} /* namespace nifi */
-} /* namespace apache */
-} /* namespace org */
diff --git a/libminifi/src/utils/BackTrace.cpp b/libminifi/src/utils/BackTrace.cpp
index 282c69f..fba8d9a 100644
--- a/libminifi/src/utils/BackTrace.cpp
+++ b/libminifi/src/utils/BackTrace.cpp
@@ -132,25 +132,21 @@
   // lock so that we only perform one backtrace at a time.
 #ifdef HAS_EXECINFO
   std::lock_guard<std::mutex> lock(mutex_);
-
-  caller_handle_ = pthread_self();
-  thread_handle_ = thread_handle;
   trace_ = BackTrace(std::move(thread_name));
 
-  if (0 == thread_handle_ || pthread_equal(caller_handle_, thread_handle)) {
+  if (0 == thread_handle || pthread_equal(pthread_self(), thread_handle)) {
     pull_trace();
   } else {
-    if (thread_handle_ == 0) {
+    if (thread_handle == 0) {
       return std::move(trace_);
     }
     emplace_handler();
-    if (pthread_kill(thread_handle_, SIGUSR2) != 0) {
+    std::unique_lock<std::mutex> ulock(trace_mutex_);
+    if (pthread_kill(thread_handle, SIGUSR2) != 0) {
       return std::move(trace_);
     }
-    sigset_t mask;
-    sigfillset(&mask);
-    sigdelset(&mask, SIGUSR2);
-    sigsuspend(&mask);
+    pull_traces_ = false;
+    trace_condition_.wait(ulock, [this] { return pull_traces_; });
   }
 #else
   // even if tracing is disabled, include thread name into the trace object
@@ -159,15 +155,10 @@
   return std::move(trace_);
 }
 #ifdef HAS_EXECINFO
-static void handler(int, siginfo_t*, void*) {
-  // not the intended thread
-  if (!pthread_equal(pthread_self(), TraceResolver::getResolver().getThreadHandle())) {
-    return;
-  }
-
+void handler(int signr, siginfo_t *info, void *secret) {
+  std::unique_lock<std::mutex> lock(TraceResolver::getResolver().lock());
   pull_trace();
-
-  pthread_kill(TraceResolver::getResolver().getCallerHandle(), SIGUSR2);
+  TraceResolver::getResolver().notifyPullTracesDone(lock);
 }
 #endif
 
diff --git a/libminifi/src/utils/ThreadPool.cpp b/libminifi/src/utils/ThreadPool.cpp
index 01651d8..7bafd60 100644
--- a/libminifi/src/utils/ThreadPool.cpp
+++ b/libminifi/src/utils/ThreadPool.cpp
@@ -16,7 +16,7 @@
  */
 
 #include "utils/ThreadPool.h"
-#include "core/state/StateManager.h"
+#include "core/state/UpdateController.h"
 
 namespace org {
 namespace apache {
diff --git a/libminifi/test/integration/IntegrationBase.h b/libminifi/test/integration/IntegrationBase.h
index 30f771a..91bafe8 100644
--- a/libminifi/test/integration/IntegrationBase.h
+++ b/libminifi/test/integration/IntegrationBase.h
@@ -75,7 +75,7 @@
 
   }
 
-  virtual void configureC2RootClasses() {
+  virtual void configureFullHeartbeat() {
 
   }
 
@@ -127,7 +127,7 @@
 
   queryRootProcessGroup(pg);
 
-  configureC2RootClasses();
+  configureFullHeartbeat();
 
   ptr.release();
 
@@ -142,8 +142,9 @@
 
   shutdownBeforeFlowController();
   flowController_->unload();
-  runAssertions();
+  flowController_->stopC2();
 
+  runAssertions();
   cleanup();
 }
 
diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp
index 9b0e671..465ed34 100644
--- a/main/MiNiFiMain.cpp
+++ b/main/MiNiFiMain.cpp
@@ -329,6 +329,8 @@
    */
   controller->waitUnload(stop_wait_time);
 
+  controller->stopC2();
+
   flow_repo = nullptr;
 
   prov_repo = nullptr;
diff --git a/nanofi/include/cxx/Instance.h b/nanofi/include/cxx/Instance.h
index 4684cd3..0a594e4 100644
--- a/nanofi/include/cxx/Instance.h
+++ b/nanofi/include/cxx/Instance.h
@@ -155,9 +155,8 @@
     // run all functions independently
 
     for (auto function : functions) {
-      std::unique_ptr<utils::AfterExecute<state::Update>> after_execute = std::unique_ptr<utils::AfterExecute<state::Update>>(new state::UpdateRunner(running_, delay));
-      utils::Worker<state::Update> functor(function, "listeners", std::move(after_execute));
-      std::future<state::Update> future;
+      utils::Worker<utils::TaskRescheduleInfo> functor(function, "listeners");
+      std::future<utils::TaskRescheduleInfo> future;
       if (!listener_thread_pool_.execute(std::move(functor), future)) {
         // denote failure
         return false;
@@ -181,7 +180,7 @@
   std::string url_;
   std::shared_ptr<Configure> configure_;
 
-  utils::ThreadPool<state::Update> listener_thread_pool_{ 1 };
+  utils::ThreadPool<utils::TaskRescheduleInfo> listener_thread_pool_;
 };
 
 } /* namespace minifi */