/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#undef NDEBUG
#include <chrono>
#include <map>
#include <memory>
#include <string>
#include <thread>

#include "core/Core.h"
#include "core/repository/AtomicRepoEntries.h"
#include "core/RepositoryFactory.h"
#include "FlowFileRecord.h"
#include "provenance/Provenance.h"
#include "properties/Configure.h"
#include "../unit/ProvenanceTestHelper.h"
#include "../TestBase.h"
#include "YamlConfiguration.h"
#include "CustomProcessors.h"
#include "TestControllerWithFlow.h"

// Flow definition shared by every test case in this file.
// It declares two timer-driven processors, "Generator" (TestFlowFileGenerator, Batch Size 3)
// and "TestProcessor" (TestProcessor, auto-terminating "apple" and "banana"), linked by the
// single connection "Gen" on the Generator's "success" relationship.
const char* yamlConfig =
    R"(
Flow Controller:
    name: MiNiFi Flow
    id: 2438e3c8-015a-1000-79ca-83af40ec1990
Processors:
  - name: Generator
    id: 2438e3c8-015a-1000-79ca-83af40ec1991
    class: org.apache.nifi.processors.standard.TestFlowFileGenerator
    max concurrent tasks: 1
    scheduling strategy: TIMER_DRIVEN
    scheduling period: 100 ms
    penalization period: 300 ms
    yield period: 100 ms
    run duration nanos: 0
    auto-terminated relationships list:
    Properties:
      Batch Size: 3
  - name: TestProcessor
    id: 2438e3c8-015a-1000-79ca-83af40ec1992
    class: org.apache.nifi.processors.standard.TestProcessor
    max concurrent tasks: 1
    scheduling strategy: TIMER_DRIVEN
    scheduling period: 100 ms
    penalization period: 3 sec
    yield period: 1 sec
    run duration nanos: 0
    auto-terminated relationships list:
      - apple
      - banana
Connections:
  - name: Gen
    id: 2438e3c8-015a-1000-79ca-83af40ec1997
    source name: Generator
    source id: 2438e3c8-015a-1000-79ca-83af40ec1991
    source relationship name: success
    destination name: TestProcessor
    destination id: 2438e3c8-015a-1000-79ca-83af40ec1992
    max work queue size: 0
    max work queue data size: 1 MB
    flowfile expiration: 60 sec
Remote Processing Groups:
)";

/**
 * Repeatedly evaluates a predicate until it holds or the timeout elapses.
 *
 * The predicate is always evaluated at least once, even for a zero or negative
 * timeout, so a condition that already holds is never reported as a failure just
 * because the deadline passed before the first poll.
 *
 * @param timeout maximum time to keep polling, measured on std::chrono::steady_clock
 * @param fn      callable taking no arguments whose result is contextually convertible to bool
 * @return true as soon as fn() yields true, false if the timeout elapsed first
 */
template<typename Fn>
bool verifyWithBusyWait(std::chrono::milliseconds timeout, Fn&& fn) {
  const auto start = std::chrono::steady_clock::now();
  // do/while instead of while: check the condition first, the deadline second
  do {
    if (fn()) {
      return true;
    }
  } while (std::chrono::steady_clock::now() - start < timeout);
  return false;
}

// Stopping the controller must leave every connection empty even though the
// consuming processor was never triggered.
TEST_CASE("Flow shutdown drains connections", "[TestFlow1]") {
  TestControllerWithFlow testController(yamlConfig);
  auto flowController = testController.controller_;
  auto rootGroup = testController.root_;

  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "100 ms");

  // keep the consumer from running by yielding it up front
  auto consumer = std::static_pointer_cast<minifi::processors::TestProcessor>(rootGroup->findProcessorByName("TestProcessor"));
  consumer->yield(10000);

  // the one connection is registered twice in the map: once by name, once by id
  std::map<std::string, std::shared_ptr<minifi::Connection>> connections;
  rootGroup->getConnections(connections);
  REQUIRE(connections.size() == 2);

  testController.startFlow();

  // give the generator time to fill the queue
  std::this_thread::sleep_for(std::chrono::milliseconds{1000});

  for (const auto& entry : connections) {
    REQUIRE(entry.second->getQueueSize() > 10);
  }

  flowController->stop();

  // the consumer never ran, yet nothing is left queued
  REQUIRE(consumer->trigger_count == 0);

  for (const auto& entry : connections) {
    REQUIRE(entry.second->isEmpty());
  }
}

// With a 10 s drain timeout, stop() must let the sink process all three queued
// flow files while the generator is not triggered again.
TEST_CASE("Flow shutdown waits for a while", "[TestFlow2]") {
  TestControllerWithFlow testController(yamlConfig);
  auto flowController = testController.controller_;
  auto rootGroup = testController.root_;

  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "10 s");

  auto generator = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(rootGroup->findProcessorByName("Generator"));
  auto consumer = std::static_pointer_cast<minifi::processors::TestProcessor>(rootGroup->findProcessorByName("TestProcessor"));

  // the sink's trigger callback blocks until the promise below is fulfilled
  std::promise<void> releaseSink;
  std::future<void> sinkReleased = releaseSink.get_future();
  consumer->onTriggerCb_ = [&] {
    sinkReleased.wait();
  };

  testController.startFlow();

  // poll until the first batch of flow files has been enqueued
  auto batchQueued = [&] {
    return rootGroup->getTotalFlowFileCount() == 3;
  };
  REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, batchQueued));

  REQUIRE(generator->trigger_count.load() == 1);

  // unblock the sink, then shut down
  releaseSink.set_value();
  flowController->stop();

  REQUIRE(generator->trigger_count.load() == 1);
  REQUIRE(consumer->trigger_count.load() == 3);
}

// The drain timeout is 1000 ms but the sink's first trigger sleeps for 1500 ms,
// so only a single sink trigger is expected once stop() has returned.
TEST_CASE("Flow stopped after grace period", "[TestFlow3]") {
  TestControllerWithFlow testController(yamlConfig);
  auto flowController = testController.controller_;
  auto rootGroup = testController.root_;

  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, "1000 ms");

  auto generator = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(rootGroup->findProcessorByName("Generator"));
  auto consumer = std::static_pointer_cast<minifi::processors::TestProcessor>(rootGroup->findProcessorByName("TestProcessor"));

  std::promise<void> releaseSink;
  std::future<void> sinkReleased = releaseSink.get_future();
  consumer->onTriggerCb_ = [&] {
    sinkReleased.wait();
    // one-shot latch: only the very first trigger sees `true` and sleeps
    static std::atomic<bool> sleepPending{true};
    if (sleepPending.exchange(false)) {
      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
    }
  };

  testController.startFlow();

  // poll until the first batch of flow files has been enqueued
  auto batchQueued = [&] {
    return rootGroup->getTotalFlowFileCount() == 3;
  };
  REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, batchQueued));

  REQUIRE(generator->trigger_count.load() == 1);

  // unblock the sink, then shut down
  releaseSink.set_value();
  flowController->stop();

  REQUIRE(generator->trigger_count.load() == 1);
  REQUIRE(consumer->trigger_count.load() == 1);
}

// While stop() is in progress on a separate thread, the drain timeout is repeatedly
// raised through the configuration. The test expects the shutdown to take longer than
// 1500 ms and the sink to end up processing all three flow files.
TEST_CASE("Extend the waiting period during shutdown", "[TestFlow4]") {
  TestControllerWithFlow testController(yamlConfig);
  auto controller = testController.controller_;
  auto root = testController.root_;

  unsigned int timeout_ms = 1000;

  testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, std::to_string(timeout_ms) + " ms");

  auto sourceProc = std::static_pointer_cast<minifi::processors::TestFlowFileGenerator>(root->findProcessorByName("Generator"));
  auto sinkProc = std::static_pointer_cast<minifi::processors::TestProcessor>(root->findProcessorByName("TestProcessor"));

  std::promise<void> execSinkPromise;
  std::future<void> execSinkFuture = execSinkPromise.get_future();
  sinkProc->onTriggerCb_ = [&]{
    execSinkFuture.wait();
    static std::atomic<bool> first_onTrigger{true};
    bool isFirst = true;
    // sleep only on the first trigger
    if (first_onTrigger.compare_exchange_strong(isFirst, false)) {
      std::this_thread::sleep_for(std::chrono::milliseconds{1500});
    }
  };

  testController.startFlow();

  // wait for the source processor to enqueue its flowFiles
  auto flowFilesEnqueued = [&] {return root->getTotalFlowFileCount() == 3;};
  REQUIRE(verifyWithBusyWait(std::chrono::milliseconds{50}, flowFilesEnqueued));

  REQUIRE(sourceProc->trigger_count.load() == 1);

  // release the sink and stop the controller in the background so this thread can
  // keep adjusting the drain timeout while the shutdown is running
  std::thread shutdownThread([&]{
    execSinkPromise.set_value();
    controller->stop();
  });

  auto shutdownInitiated = std::chrono::steady_clock::now();
  auto shutdownDuration = [&] {return std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - shutdownInitiated);};

  std::this_thread::sleep_for(std::chrono::milliseconds{500});
  while (shutdownDuration() < std::chrono::milliseconds(2500) && controller->isRunning()) {
    timeout_ms += 500;
    testController.getLogger()->log_info("Controller still running after %u ms, extending the waiting period to %u ms, ff count: %u",
        static_cast<unsigned int>(shutdownDuration().count()), timeout_ms, static_cast<unsigned int>(root->getTotalFlowFileCount()));
    testController.configuration_->set(minifi::Configure::nifi_flowcontroller_drain_timeout, std::to_string(timeout_ms) + " ms");
    std::this_thread::sleep_for(std::chrono::milliseconds{500});
  }

  // Capture the observations now, but assert only after the join: a failing REQUIRE
  // unwinds the test case, and destroying a still-joinable std::thread calls
  // std::terminate, which would abort the run instead of reporting the failure.
  const bool controllerStopped = !controller->isRunning();
  const auto observedShutdownDuration = shutdownDuration();

  shutdownThread.join();

  REQUIRE(controllerStopped);
  REQUIRE(observedShutdownDuration.count() > 1500);

  REQUIRE(sourceProc->trigger_count.load() == 1);
  REQUIRE(sinkProc->trigger_count.load() == 3);
}
