/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <memory>
#include <string>

#include "SingleProcessorTestController.h"
#include "TestBase.h"
#include "Catch.h"

#include "../../script/ExecuteScript.h"
#include "utils/file/FileUtils.h"
#include "utils/file/PathUtils.h"

namespace org::apache::nifi::minifi::processors::test {

TEST_CASE("Script engine is not set", "[executescriptMisconfiguration]") {
  TestController test_controller;
  auto plan = test_controller.createPlan();

  auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");

  plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "");
  plan->setProperty(execute_script, ExecuteScript::ScriptFile, "/path/to/script.py");

  REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}

TEST_CASE("Neither script body nor script file is set", "[executescriptMisconfiguration]") {
  TestController test_controller;
  auto plan = test_controller.createPlan();

  auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");

  plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "python");

  REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}

TEST_CASE("Test both script body and script file set", "[executescriptMisconfiguration]") {
  TestController test_controller;
  auto plan = test_controller.createPlan();

  auto execute_script = plan->addProcessor("ExecuteScript", "executeScript");

  plan->setProperty(execute_script, ExecuteScript::ScriptEngine, "python");
  plan->setProperty(execute_script, ExecuteScript::ScriptFile, "/path/to/script.py");
  plan->setProperty(execute_script, ExecuteScript::ScriptBody, R"(
    def onTrigger(context, session):
      log.info('hello from python')
  )");

  REQUIRE_THROWS_AS(test_controller.runSession(plan, true), minifi::Exception);
}

TEST_CASE("Python: Test session get should return None if there are no flowfiles in the incoming connections") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    raise Exception("Didn't expect flow_file")
  )");
  auto result = controller.trigger();
  REQUIRE(result.at(ExecuteScript::Success).empty());
  REQUIRE(result.at(ExecuteScript::Failure).empty());
}

TEST_CASE("Python: Test Read File", "[executescriptPythonRead]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
import codecs

class ReadCallback(object):
  def process(self, input_stream):
    content = codecs.getreader('utf-8')(input_stream).read()
    log.info('file content: %s' % content)
    return len(content)

def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    session.read(flow_file, ReadCallback())
    session.transfer(flow_file, REL_SUCCESS)
  )");

  auto result = controller.trigger("tempFile");
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "tempFile");
}

TEST_CASE("Python: Test Write File", "[executescriptPythonWrite]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
class WriteCallback(object):
  def process(self, output_stream):
    new_content = 'hello 2'.encode('utf-8')
    output_stream.write(new_content)
    return len(new_content)

def onTrigger(context, session):
  flow_file = session.get()
  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    session.write(flow_file, WriteCallback())
    session.transfer(flow_file, REL_SUCCESS)
  )");

  auto result = controller.trigger("tempFile");
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "hello 2");
}

TEST_CASE("Python: Test Create", "[executescriptPythonCreate]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
  flow_file = session.create()

  if flow_file is not None:
    log.info('created flow file: %s' % flow_file.getAttribute('filename'))
    session.transfer(flow_file, REL_SUCCESS)
  )");


  auto result = controller.trigger();
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  REQUIRE(result.at(ExecuteScript::Failure).empty());
  REQUIRE(LogTestController::getInstance().contains("[info] created flow file:"));
}

TEST_CASE("Python: Test Update Attribute", "[executescriptPythonUpdateAttribute]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
  flow_file = session.get()

  if flow_file is not None:
    log.info('got flow file: %s' % flow_file.getAttribute('filename'))
    flow_file.addAttribute('test_attr', '1')
    attr = flow_file.getAttribute('test_attr')
    log.info('got flow file attr \'test_attr\': %s' % attr)
    flow_file.updateAttribute('test_attr', str(int(attr) + 1))
    session.transfer(flow_file, REL_SUCCESS)
  )");

  auto result = controller.trigger("tempFile");
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  CHECK(controller.plan->getContent(result.at(ExecuteScript::Success)[0]) == "tempFile");
  CHECK(result.at(ExecuteScript::Success)[0]->getAttribute("test_attr") == "2");
}

TEST_CASE("Python: Test Get Context Property", "[executescriptPythonGetContextProperty]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
  script_engine = context.getProperty('Script Engine')
  log.info('got Script Engine property: %s' % script_engine)
  )");

  auto result_without_input = controller.trigger();
  REQUIRE(result_without_input.at(ExecuteScript::Success).empty());
  REQUIRE(result_without_input.at(ExecuteScript::Failure).empty());

  REQUIRE(LogTestController::getInstance().contains("[info] got Script Engine property: python"));
}

TEST_CASE("Python: Test Module Directory property", "[executescriptPythonModuleDirectoryProperty]") {
  using org::apache::nifi::minifi::utils::file::get_executable_dir;

  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  const auto script_files_directory =  minifi::utils::file::FileUtils::get_executable_dir() / "resources" / "test_python_scripts";

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptFile, (script_files_directory / "foo_bar_processor.py").string());
  execute_script->setProperty(ExecuteScript::ModuleDirectory, (script_files_directory / "foo_modules" / "foo.py").string() + "," + (script_files_directory / "bar_modules").string());

  auto result = controller.trigger("tempFile");
  REQUIRE(result.at(ExecuteScript::Success).size() == 1);
  REQUIRE(result.at(ExecuteScript::Failure).empty());

  REQUIRE(LogTestController::getInstance().contains("foobar"));
}

TEST_CASE("Python: Non existent script file should throw", "[executescriptPythonNonExistentScriptFile]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();

  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptFile, "/tmp/non-existent-file");

  REQUIRE_THROWS_AS(controller.trigger("tempFile"), minifi::Exception);
}

TEST_CASE("Python can remove flowfiles", "[ExecuteScript]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<ExecuteScript>();
  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody, R"(
def onTrigger(context, session):
  flow_file = session.get()
  session.remove(flow_file))");
  auto result = controller.trigger("hello");
  REQUIRE(result.at(ExecuteScript::Success).empty());
  REQUIRE(result.at(ExecuteScript::Failure).empty());
}

TEST_CASE("Python can store states in StateManager", "[ExecuteScript]") {
  const auto execute_script = std::make_shared<ExecuteScript>("ExecuteScript");

  minifi::test::SingleProcessorTestController controller{execute_script};
  LogTestController::getInstance().setTrace<minifi::processors::ExecuteScript>();
  execute_script->setProperty(ExecuteScript::ScriptEngine, "python");
  execute_script->setProperty(ExecuteScript::ScriptBody,
      R"(
def onTrigger(context, session):
  state_manager = context.getStateManager()
  state = state_manager.get()
  if state is None:
    state = {}
    state['python_trigger_count'] = 0
  python_trigger_count = state['python_trigger_count']
  log.info('python_trigger_count: ' + str(python_trigger_count))
  state['python_trigger_count'] =str(int(python_trigger_count) + 1)
  state_manager.set(state)
)");

  for (size_t i = 0; i < 4; ++i) {
    controller.trigger();
    CHECK(LogTestController::getInstance().contains(fmt::format("python_trigger_count: {}", i)));
  }
}

}  // namespace org::apache::nifi::minifi::processors::test
