MINIFICPP-1273 - Drain connections on flow shutdown

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #827
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 700b95f..3479070 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -766,6 +766,8 @@
 ## Add KeyValueStorageService tests
 registerTest("${TEST_DIR}/keyvalue-tests")
 
+registerTest("${TEST_DIR}/flow-tests")
+
 include(BuildDocs)
 
 include(DockerConfig)
diff --git a/libminifi/include/Connection.h b/libminifi/include/Connection.h
index 64a805e..5a40d87 100644
--- a/libminifi/include/Connection.h
+++ b/libminifi/include/Connection.h
@@ -168,7 +168,7 @@
   // Poll the flow file from queue, the expired flow file record also being returned
   std::shared_ptr<core::FlowFile> poll(std::set<std::shared_ptr<core::FlowFile>> &expiredFlowRecords);
   // Drain the flow records
-  void drain();
+  void drain(bool delete_permanently);
 
   void yield() override {}
 
diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h
index fcbb78a..9944658 100644
--- a/libminifi/include/core/ProcessGroup.h
+++ b/libminifi/include/core/ProcessGroup.h
@@ -228,6 +228,8 @@
 
   void getConnections(std::map<std::string, std::shared_ptr<Connectable>> &connectionMap);
 
+  void drainConnections();
+
  protected:
   void startProcessingProcessors(const std::shared_ptr<TimerDrivenSchedulingAgent> timeScheduler, const std::shared_ptr<EventDrivenSchedulingAgent> &eventScheduler, const std::shared_ptr<CronDrivenSchedulingAgent> &cronScheduler); // NOLINT
 
diff --git a/libminifi/src/Connection.cpp b/libminifi/src/Connection.cpp
index 9038e91..06cfcce 100644
--- a/libminifi/src/Connection.cpp
+++ b/libminifi/src/Connection.cpp
@@ -262,15 +262,17 @@
   return NULL;
 }
 
-void Connection::drain() {
+void Connection::drain(bool delete_permanently) {
   std::lock_guard<std::mutex> lock(mutex_);
 
   while (!queue_.empty()) {
     std::shared_ptr<core::FlowFile> item = queue_.front();
     queue_.pop();
-    logger_->log_debug("Delete flow file UUID %s from connection %s, because it expired", item->getUUIDStr(), name_);
-    if (flow_repository_->Delete(item->getUUIDStr())) {
-      item->setStoredToRepository(false);
+    logger_->log_debug("Delete flow file UUID %s from connection %s", item->getUUIDStr(), name_);
+    if (delete_permanently) {
+      if (flow_repository_->Delete(item->getUUIDStr())) {
+        item->setStoredToRepository(false);
+      }
     }
   }
   queued_data_size_ = 0;
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index 3976936..669285b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -250,6 +250,9 @@
      * -Stopping the schedulers doesn't actually quit the onTrigger functions of processors
      * -They only guarantee that the processors are not scheduled any more
      * -After the threadpool is stopped we can make sure that processors don't need repos and controllers anymore */
+    if (this->root_) {
+      this->root_->drainConnections();
+    }
     this->flow_file_repo_->stop();
     this->provenance_repo_->stop();
     // stop the ControllerServices
@@ -923,7 +926,7 @@
     auto conn = connections.find(connection);
     if (conn != connections.end()) {
       logger_->log_info("Clearing connection %s", connection);
-      conn->second->drain();
+      conn->second->drain(true);
     }
   }
   return -1;
diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp
index 9eec44c..ee88d79 100644
--- a/libminifi/src/core/ProcessGroup.cpp
+++ b/libminifi/src/core/ProcessGroup.cpp
@@ -92,13 +92,12 @@
     onScheduleTimer_->stop();
   }
 
-  for (auto &&connection : connections_) {
-    connection->drain();
+  for (auto&& connection : connections_) {
+    connection->drain(false);
   }
 
-  for (std::set<ProcessGroup *>::iterator it = child_process_groups_.begin(); it != child_process_groups_.end(); ++it) {
-    ProcessGroup *processGroup(*it);
-    delete processGroup;
+  for (ProcessGroup* childGroup : child_process_groups_) {
+    delete childGroup;
   }
 }
 
@@ -403,6 +402,16 @@
   }
 }
 
+void ProcessGroup::drainConnections() {
+  for (auto&& connection : connections_) {
+    connection->drain(false);
+  }
+
+  for (ProcessGroup* childGroup : child_process_groups_) {
+    childGroup->drainConnections();
+  }
+}
+
 } /* namespace core */
 } /* namespace minifi */
 } /* namespace nifi */
diff --git a/libminifi/test/flow-tests/CMakeLists.txt b/libminifi/test/flow-tests/CMakeLists.txt
new file mode 100644
index 0000000..a17cf23
--- /dev/null
+++ b/libminifi/test/flow-tests/CMakeLists.txt
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+file(GLOB FLOW_TESTS  "*.cpp")
+SET(FLOW_TEST_COUNT 0)
+FOREACH(testfile ${FLOW_TESTS})
+    get_filename_component(testfilename "${testfile}" NAME_WE)
+    add_executable("${testfilename}" "${testfile}")
+    createTests("${testfilename}")
+    target_link_libraries(${testfilename} ${CATCH_MAIN_LIB})
+    target_wholearchive_library(${testfilename} minifi-standard-processors)
+    MATH(EXPR FLOW_TEST_COUNT "${FLOW_TEST_COUNT}+1")
+    add_test(NAME "${testfilename}" COMMAND "${testfilename}" WORKING_DIRECTORY ${TEST_DIR})
+ENDFOREACH()
+message("-- Finished building ${FLOW_TEST_COUNT} flow related test file(s)...")
diff --git a/libminifi/test/flow-tests/FlowControllerTests.cpp b/libminifi/test/flow-tests/FlowControllerTests.cpp
new file mode 100644
index 0000000..eb444f6
--- /dev/null
+++ b/libminifi/test/flow-tests/FlowControllerTests.cpp
@@ -0,0 +1,125 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#undef NDEBUG
+#include <chrono>
+#include <map>
+#include <memory>
+#include <string>
+#include <thread>
+
+#include "core/Core.h"
+#include "core/repository/AtomicRepoEntries.h"
+#include "core/RepositoryFactory.h"
+#include "FlowFileRecord.h"
+#include "provenance/Provenance.h"
+#include "properties/Configure.h"
+#include "../unit/ProvenanceTestHelper.h"
+#include "../TestBase.h"
+#include "YamlConfiguration.h"
+
+const char* yamlConfig =
+R"(
+Flow Controller:
+    name: MiNiFi Flow
+    id: 2438e3c8-015a-1000-79ca-83af40ec1990
+Processors:
+  - name: Generator
+    id: 2438e3c8-015a-1000-79ca-83af40ec1991
+    class: org.apache.nifi.processors.standard.GenerateFlowFile
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 100 ms
+    penalization period: 300 ms
+    yield period: 100 ms
+    run duration nanos: 0
+    auto-terminated relationships list:
+    Properties:
+      Batch Size: 10
+  - name: LogAttribute
+    id: 2438e3c8-015a-1000-79ca-83af40ec1992
+    class: org.apache.nifi.processors.standard.LogAttribute
+    max concurrent tasks: 1
+    scheduling strategy: TIMER_DRIVEN
+    scheduling period: 1000 sec
+    penalization period: 30 sec
+    yield period: 1 sec
+    run duration nanos: 0
+    auto-terminated relationships list:
+
+Connections:
+  - name: Gen
+    id: 2438e3c8-015a-1000-79ca-83af40ec1997
+    source name: Generator
+    source id: 2438e3c8-015a-1000-79ca-83af40ec1991
+    source relationship name: success
+    destination name: LogAttribute
+    destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
+    max work queue size: 0
+    max work queue data size: 1 MB
+    flowfile expiration: 60 sec
+
+Remote Processing Groups:
+
+)";
+
+TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
+  TestController testController;
+  char format[] = "/tmp/flowTest.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string yamlPath = utils::file::FileUtils::concat_path(dir, "config.yml");
+  std::ofstream{yamlPath} << yamlConfig;
+
+  std::shared_ptr<minifi::Configure> configuration = std::make_shared<minifi::Configure>();
+  std::shared_ptr<core::Repository> prov_repo = std::make_shared<TestRepository>();
+  std::shared_ptr<core::Repository> ff_repo = std::make_shared<TestFlowRepository>();
+  std::shared_ptr<core::ContentRepository> content_repo = std::make_shared<core::repository::VolatileContentRepository>();
+
+  configuration->set(minifi::Configure::nifi_flow_configuration_file, yamlPath);
+
+  REQUIRE(content_repo->initialize(configuration));
+  std::shared_ptr<minifi::io::StreamFactory> stream_factory = minifi::io::StreamFactory::getInstance(configuration);
+
+  std::map<std::string, std::shared_ptr<minifi::Connection>> connectionMap;
+  std::unique_ptr<core::FlowConfiguration> flow = utils::make_unique<core::YamlConfiguration>(prov_repo, ff_repo, content_repo, stream_factory, configuration, yamlPath);
+  std::shared_ptr<core::ProcessGroup> root = flow->getRoot();
+  std::shared_ptr<minifi::FlowController> controller = std::make_shared<minifi::FlowController>(
+      prov_repo, ff_repo, configuration,
+      std::move(flow),
+      content_repo, DEFAULT_ROOT_GROUP_NAME, true);
+
+
+  root->getConnections(connectionMap);
+  // adds the single connection to the map both by name and id
+  REQUIRE(connectionMap.size() == 2);
+  controller->load(root);
+  controller->start();
+
+  std::this_thread::sleep_for(std::chrono::milliseconds{1000});
+
+  for (auto& it : connectionMap) {
+    REQUIRE(it.second->getQueueSize() > 10);
+  }
+
+  controller->stop(true);
+
+  for (auto& it : connectionMap) {
+    REQUIRE(it.second->isEmpty());
+  }
+}