/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <filesystem>
#include <memory>
#include <string>

#include "SingleProcessorTestController.h"
#include "TestBase.h"
#include "Catch.h"

#include "../../script/ExecuteScript.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"

namespace org::apache::nifi::minifi::processors::test {

// Misconfiguration: the Script Engine property is set to an empty string while a script file is given.
// Running the session is expected to throw minifi::Exception.
TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
  TestController controller;
  auto test_plan = controller.createPlan();
  auto processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(processor, ExecuteScript::ScriptEngine, "");
  test_plan->setProperty(processor, ExecuteScript::ScriptFile, "/path/to/script.lua");

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}

// Misconfiguration: only the engine is selected; no Script Body and no Script File are provided.
// Running the session is expected to throw minifi::Exception.
TEST_CASE("Neither script body nor script file is set", "[executescriptMisconfiguration]") {
  TestController controller;
  auto test_plan = controller.createPlan();
  auto processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(processor, ExecuteScript::ScriptEngine, "lua");

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}

// Misconfiguration: both Script File and Script Body are set at the same time.
// Running the session is expected to throw minifi::Exception.
TEST_CASE("Test both script body and script file set", "[executescriptMisconfiguration]") {
  TestController controller;
  auto test_plan = controller.createPlan();
  auto processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(processor, ExecuteScript::ScriptEngine, "lua");
  test_plan->setProperty(processor, ExecuteScript::ScriptFile, "/path/to/script.lua");
  test_plan->setProperty(processor, ExecuteScript::ScriptBody, R"(
function onTrigger(context, session)
  log:info('hello from lua')
end
  )");

  REQUIRE_THROWS_AS(controller.runSession(test_plan, true), minifi::Exception);
}

// The processor is triggered without any input. The Lua script calls error() if session:get()
// returns something other than nil, and the test expects nothing on either relationship.
TEST_CASE("Lua: Test session get should return None if there are no flowfiles in the incoming connections") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
function onTrigger(context, session)
  flow_file = session:get()

  if flow_file ~= nil then
    error("Didn't expect flow_file")
  end
end
  )");

  auto outcome = test_controller.trigger();

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.empty());
  const auto& failed = outcome.at(ExecuteScript::Failure);
  REQUIRE(failed.empty());
}


// The Lua script only writes an info-level message through the `log` object.
// The test checks that no flow file is routed anywhere and that the message shows up in the captured log.
TEST_CASE("Lua: Test Log", "[executescriptLuaLog]") {
  auto& log_controller = LogTestController::getInstance();
  log_controller.reset();  // start from a clean log before looking for the expected line

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  log_controller.setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
function onTrigger(context, session)
  log:info('hello from lua')
end
  )");

  auto outcome = test_controller.trigger();

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.empty());
  const auto& failed = outcome.at(ExecuteScript::Failure);
  REQUIRE(failed.empty());

  REQUIRE(log_controller.contains("[org::apache::nifi::minifi::processors::ExecuteScript] [info] hello from lua"));
}

// A flow file with content "tempFile" is fed to the processor. The Lua script reads it through a
// read callback and transfers it to REL_SUCCESS; the test expects one flow file with unchanged content.
TEST_CASE("Lua: Test Read File", "[executescriptLuaRead]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
read_callback = {}

function read_callback.process(self, input_stream)
    content = input_stream:read(0)
    log:info('file content: ' .. content)
    return #content
end

function onTrigger(context, session)
  flow_file = session:get()

  if flow_file ~= nil then
    log:info('got flow file: ' .. flow_file:getAttribute('filename'))
    session:read(flow_file, read_callback)
    session:transfer(flow_file, REL_SUCCESS)
  end
end
  )");

  auto outcome = test_controller.trigger("tempFile");

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.size() == 1);
  CHECK(test_controller.plan->getContent(succeeded[0]) == "tempFile");
}

// A flow file with content "tempFile" is fed to the processor. The Lua script writes 'hello 2'
// through a write callback and transfers the flow file to REL_SUCCESS; the test expects the new content.
TEST_CASE("Lua: Test Write File", "[executescriptLuaWrite]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
    write_callback = {}

    function write_callback.process(self, output_stream)
      new_content = 'hello 2'
      output_stream:write(new_content)
      return #new_content
    end

    function onTrigger(context, session)
      flow_file = session:get()

      if flow_file ~= nil then
        log:info('got flow file: ' .. flow_file:getAttribute('filename'))
        session:write(flow_file, write_callback)
        session:transfer(flow_file, REL_SUCCESS)
      end
    end
  )");

  auto outcome = test_controller.trigger("tempFile");

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.size() == 1);
  CHECK(test_controller.plan->getContent(succeeded[0]) == "hello 2");
}

// The processor is triggered without input. The Lua script creates a flow file via session:create(nil),
// logs its filename attribute and transfers it to REL_SUCCESS.
TEST_CASE("Lua: Test Create", "[executescriptLuaCreate]") {
  auto& log_controller = LogTestController::getInstance();
  log_controller.reset();  // start from a clean log before looking for the expected line

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  log_controller.setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
function onTrigger(context, session)
  flow_file = session:create(nil)

  if flow_file ~= nil then
    log:info('created flow file: ' .. flow_file:getAttribute('filename'))
    session:transfer(flow_file, REL_SUCCESS)
  end
end
  )");

  auto outcome = test_controller.trigger();

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.size() == 1);
  const auto& failed = outcome.at(ExecuteScript::Failure);
  REQUIRE(failed.empty());
  REQUIRE(log_controller.contains("[info] created flow file:"));
}

// The Lua script adds the attribute 'test_attr' with value '1', reads it back as a number and
// updates it to the incremented value. The test expects the routed flow file to keep its content
// and to carry test_attr == "2".
TEST_CASE("Lua: Test Update Attribute", "[executescriptLuaUpdateAttribute]") {
  auto& log_controller = LogTestController::getInstance();
  log_controller.reset();

  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  log_controller.setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
function onTrigger(context, session)
  flow_file = session:get()

  if flow_file ~= nil then
    log:info('got flow file: ' .. flow_file:getAttribute('filename'))
    flow_file:addAttribute('test_attr', '1')
    attr = tonumber(flow_file:getAttribute('test_attr'))
    log:info('got flow file attr \'test_attr\': ' .. tostring(attr))
    flow_file:updateAttribute('test_attr', tostring(attr + 1))
    session:transfer(flow_file, REL_SUCCESS)
  end
end
  )");

  auto outcome = test_controller.trigger("tempFile");

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.size() == 1);
  CHECK(test_controller.plan->getContent(succeeded[0]) == "tempFile");
  CHECK(succeeded[0]->getAttribute("test_attr") == "2");
}

// The script body requires a set of Lua libraries at load time and then logs 'OK'.
// The test expects the session to run without throwing and the 'OK' line to be present in the log.
TEST_CASE("Lua: Test Require", "[executescriptLuaRequire]") {
  TestController test_controller;

  auto& log_controller = LogTestController::getInstance();
  log_controller.setDebug<TestPlan>();
  log_controller.setDebug<ExecuteScript>();

  auto test_plan = test_controller.createPlan();
  auto processor = test_plan->addProcessor("ExecuteScript", "executeScript");

  test_plan->setProperty(processor, ExecuteScript::ScriptEngine, "lua");
  test_plan->setProperty(processor, ExecuteScript::ScriptBody, R"(
    require 'os'
    require 'coroutine'
    require 'math'
    require 'io'
    require 'string'
    require 'table'
    require 'package'

    log:info('OK')

    function onTrigger(context, session)
    end
  )");

  REQUIRE_NOTHROW(test_controller.runSession(test_plan, false));

  REQUIRE(log_controller.contains("[info] OK"));

  // Clear the log controller once the assertions are done.
  log_controller.reset();
}

// Runs foo_bar_processor.lua from the test resources next to the executable, with the
// Module Directory property set to a comma-separated pair of paths: the foo.lua file inside
// foo_modules, and the bar_modules directory. The test expects one flow file on Success,
// none on Failure, and "foobar" to appear in the log.
TEST_CASE("Lua: Test Module Directory property", "[executescriptLuaModuleDirectoryProperty]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  const auto script_files_directory = minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "test_lua_scripts";
  const auto module_paths = (script_files_directory / "foo_modules" / "foo.lua").string() + "," + (script_files_directory / "bar_modules").string();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "lua");
  execute_script->setProperty(ExecuteScript::ScriptFile, (script_files_directory / "foo_bar_processor.lua").string());
  execute_script->setProperty(ExecuteScript::ModuleDirectory, module_paths);

  auto result = controller.trigger("tempFile");
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  REQUIRE(result.at(ExecuteScript::Failure).empty());

  REQUIRE(LogTestController::getInstance().contains("foobar"));
}

// Script File is set to "/tmp/non-existent-file"; triggering the processor is expected to throw minifi::Exception.
TEST_CASE("Lua: Non existent script file should throw", "[executescriptLuaNonExistentScriptFile]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptFile, "/tmp/non-existent-file");

  REQUIRE_THROWS_AS(test_controller.trigger("tempFile"), minifi::Exception);
}

// The Lua script fetches the incoming flow file and removes it from the session.
// The test expects nothing to be routed to Success or Failure.
TEST_CASE("Lua can remove flowfiles", "[ExecuteScript]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
        function onTrigger(context, session)
          flow_file = session:get()
          session:remove(flow_file)
        end
      )");

  auto outcome = test_controller.trigger("hello");

  const auto& succeeded = outcome.at(ExecuteScript::Success);
  REQUIRE(succeeded.empty());
  const auto& failed = outcome.at(ExecuteScript::Failure);
  REQUIRE(failed.empty());
}

// The Lua script keeps a counter under the 'lua_trigger_count' key of the state obtained from
// context:getStateManager(): it logs the current value, then stores the value incremented by one.
// The processor is triggered four times and each trigger must log the next counter value (0..3).
TEST_CASE("Lua can store states in StateManager", "[ExecuteScript]") {
  const auto processor = std::make_shared<ExecuteScript>("ExecuteScript");
  minifi::test::SingleProcessorTestController test_controller{processor};

  auto& log_controller = LogTestController::getInstance();
  log_controller.setTrace<ExecuteScript>();

  processor->setProperty(ExecuteScript::ScriptEngine, "lua");
  processor->setProperty(ExecuteScript::ScriptBody, R"(
        function onTrigger(context, session)
          state_manager = context:getStateManager()
          state = state_manager:get()
          if state == nil then
            state = {}
            state['lua_trigger_count'] = 0
          end
          lua_trigger_count = state['lua_trigger_count']
          log:info('lua_trigger_count: ' .. lua_trigger_count)
          state['lua_trigger_count'] = tostring(tonumber(lua_trigger_count) + 1)
          state_manager:set(state)
        end
      )");

  for (size_t expected_count = 0; expected_count < 4; ++expected_count) {
    test_controller.trigger();
    const auto expected_log_line = fmt::format("lua_trigger_count: {}", expected_count);
    CHECK(log_controller.contains(expected_log_line));
  }
}

}  // namespace org::apache::nifi::minifi::processors::test
