/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <map>
#include <memory>
#include <optional>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "unit/TestBase.h"
#include "unit/Catch.h"
#include "processors/SQLProcessor.h"
#include "FlowFileRecord.h"
#include "ResourceClaim.h"

#ifdef MINIFI_USE_REAL_ODBC_TEST_DRIVER
static const std::string ODBC_SERVICE = "ODBCService";
#else
#include "mocks/MockODBCService.h"
static const std::string ODBC_SERVICE = "MockODBCService";
#endif

class SQLTestPlan {
 public:
  SQLTestPlan(TestController& controller, const std::string& connection_str, const std::string& sql_processor, std::initializer_list<core::Relationship> output_rels) {
    plan_ = controller.createPlan();
    processor_ = plan_->addProcessor(sql_processor, sql_processor, {}, false);
    plan_->setProperty(processor_, minifi::processors::SQLProcessor::DBControllerService, "ODBCService");
    input_ = plan_->addConnection({}, {"success", "d"}, processor_);
    for (const auto& output_rel : output_rels) {
      outputs_[output_rel] = plan_->addConnection(processor_, output_rel, {});
    }

    // initialize database service
    auto service = plan_->addController(ODBC_SERVICE, "ODBCService");
    plan_->setProperty(service, minifi::sql::controllers::DatabaseService::ConnectionString, connection_str);
  }

  std::string getContent(const std::shared_ptr<core::FlowFile>& flow_file) {
    return plan_->getContent(flow_file);
  }

  std::shared_ptr<core::FlowFile> addInput(std::initializer_list<std::pair<std::string, std::string>> attributes = {}, const std::optional<std::string>& content = {}) {
    auto flow_file = std::make_shared<minifi::FlowFileRecordImpl>();
    for (const auto& attr : attributes) {
      flow_file->setAttribute(attr.first, attr.second);
    }
    if (content) {
      auto claim = std::make_shared<minifi::ResourceClaimImpl>(plan_->getContentRepo());
      auto content_stream = plan_->getContentRepo()->write(*claim);
      const auto ret = content_stream->write(reinterpret_cast<const uint8_t*>(content->c_str()), content->length());
      REQUIRE(ret == content->length());
      flow_file->setOffset(0);
      flow_file->setSize(content->length());
      flow_file->setResourceClaim(claim);
    }
    input_->put(flow_file);
    return flow_file;
  }

  core::Processor* getSQLProcessor() {
    return processor_;
  }

  void run(bool reschedule = false) {
    if (reschedule) {
      plan_->reset(reschedule);
    }
    plan_->runProcessor(static_cast<size_t>(0));  // run the one and only sql processor
  }

  std::vector<std::shared_ptr<core::FlowFile>> getOutputs(const core::Relationship& relationship) {
    auto conn = outputs_[relationship];
    REQUIRE(conn);
    std::vector<std::shared_ptr<core::FlowFile>> flow_files;
    std::set<std::shared_ptr<core::FlowFile>> expired;
    while (auto flow_file = conn->poll(expired)) {
      REQUIRE(expired.empty());
      flow_files.push_back(std::move(flow_file));
    }
    REQUIRE(expired.empty());
    return flow_files;
  }

  bool setProperty(const std::string_view property, const std::string_view value) const {
    return plan_->setProperty(processor_, property, value);
  }

 private:
  std::shared_ptr<TestPlan> plan_;
  core::Processor* processor_;
  minifi::Connection* input_;
  std::map<core::Relationship, minifi::Connection*> outputs_;
};
