/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIBMINIFI_CAPI_PLAN_H_
#define LIBMINIFI_CAPI_PLAN_H_

#ifndef WIN32
  #include <dirent.h>
#endif
#include <cstdio>
#include <cstdlib>
#include <sstream>
#include "ResourceClaim.h"
#include <vector>
#include <set>
#include <map>
#include <unordered_map>
#include "core/logging/Logger.h"
#include "core/Core.h"
#include "properties/Configure.h"
#include "properties/Properties.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/Id.h"
#include "spdlog/sinks/ostream_sink.h"
#include "spdlog/sinks/dist_sink.h"
#include "core/Core.h"
#include "core/FlowFile.h"
#include "core/Processor.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
#include "core/ProcessorNode.h"
#include "core/reporting/SiteToSiteProvenanceReportingTask.h"
#include "api/nanofi.h"

static const std::string CallbackProcessorName = "CallbackProcessor";

using failure_callback_type = std::function<void(flow_file_record*)>;
using content_repo_sptr = std::shared_ptr<core::ContentRepository>;

struct flowfile_input_params {
  std::shared_ptr<minifi::io::DataStream> content_stream;
  std::map<std::string, std::string> attributes;
};

namespace {

  void failureStrategyAsIs(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
    auto ff = session->get();
    if (ff == nullptr) {
      return;
    }

    auto claim = ff->getResourceClaim();

    if (claim != nullptr && user_callback != nullptr) {
      claim->increaseFlowFileRecordOwnedCount();
      // create a flow file.
      auto path = claim->getContentFullPath();
      auto ffr = create_ff_object_na(path.c_str(), path.length(), ff->getSize());
      ffr->attributes = ff->getAttributesPtr();
      ffr->ffp = static_cast<void*>(new std::shared_ptr<minifi::core::FlowFile>(ff));
      auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
      *content_repo_ptr = cr_ptr;
      user_callback(ffr);
    }
    session->remove(ff);
  }

  void failureStrategyRollback(core::ProcessSession *session, failure_callback_type user_callback, content_repo_sptr cr_ptr) {
    session->rollback();
    failureStrategyAsIs(session, user_callback, cr_ptr);
  }
}

static const std::map<FailureStrategy, const std::function<void(core::ProcessSession*, failure_callback_type, content_repo_sptr)>> FailureStrategies =
    { { FailureStrategy::AS_IS, failureStrategyAsIs }, {FailureStrategy::ROLLBACK, failureStrategyRollback } };

class ExecutionPlan {
 public:

  explicit ExecutionPlan(std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::Repository> prov_repo);

  std::shared_ptr<core::Processor> addSimpleCallback(void *, std::function<void(core::ProcessSession*)>);

  std::shared_ptr<core::Processor> addCallback(void *obj,
      std::function<void(core::ProcessSession*, core::ProcessContext *)> ontrigger_callback,
      std::function<void(core::ProcessContext *)> onschedule_callback = {});

  std::shared_ptr<core::Processor> addProcessor(const std::shared_ptr<core::Processor> &processor, const std::string &name,
                                                core::Relationship relationship = core::Relationship("success", "description"),
                                                bool linkToPrevious = false);

  std::shared_ptr<core::Processor> addProcessor(const std::string &processor_name, const std::string &name, core::Relationship relationship = core::Relationship("success", "description"),
  bool linkToPrevious = false);

  bool setProperty(const std::shared_ptr<core::Processor> proc, const std::string &prop, const std::string &value);

  void reset();

  bool runNextProcessor(std::function<void(const std::shared_ptr<core::ProcessContext>, const std::shared_ptr<core::ProcessSession>)> verify = nullptr, std::shared_ptr<flowfile_input_params> = nullptr);

  bool setFailureCallback(failure_callback_type onerror_callback);

  bool setFailureStrategy(FailureStrategy start);

  std::set<std::shared_ptr<provenance::ProvenanceEventRecord>> getProvenanceRecords();

  std::shared_ptr<core::FlowFile> getCurrentFlowFile();

  std::shared_ptr<core::ProcessSession> getCurrentSession();

  std::shared_ptr<core::Repository> getFlowRepo() {
    return flow_repo_;
  }

  std::shared_ptr<core::Repository> getProvenanceRepo() {
    return prov_repo_;
  }

  std::shared_ptr<core::ContentRepository> getContentRepo() {
    return content_repo_;
  }

  std::shared_ptr<core::FlowFile> getNextFlowFile(){
    return next_ff_;
  }

  void setNextFlowFile(std::shared_ptr<core::FlowFile> ptr){
    next_ff_ = ptr;
  }

  bool hasProcessor() {
    return !processor_queue_.empty();
  }

  static std::shared_ptr<core::Processor> createProcessor(const std::string &processor_name, const std::string &name);

  static std::shared_ptr<core::Processor> createCallback(void *obj,
      std::function<void(core::ProcessSession*, core::ProcessContext *)> ontrigger_callback,
      std::function<void(core::ProcessContext *)> onschedule_callback = {});

  static std::shared_ptr<ExecutionPlan> getPlan(const std::string& uuid) {
    auto it = proc_plan_map_.find(uuid);
    return it != proc_plan_map_.end() ? it->second : nullptr;
  }

  static void addProcessorWithPlan(const std::string &uuid, std::shared_ptr<ExecutionPlan> plan) {
    proc_plan_map_[uuid] = plan;
  }

  static bool removeProcWithPlan(const std::string& uuid) {
    return proc_plan_map_.erase(uuid) > 0;
  }

  static size_t getProcWithPlanQty() {
    return proc_plan_map_.size();
  }

  static bool addCustomProcessor(custom_processor_args);

  static int deleteCustomProcessor(const char * name);

 protected:
  class FailureHandler {
   public:
    FailureHandler(content_repo_sptr cr_ptr) {
      callback_ = nullptr;
      strategy_ = FailureStrategy::AS_IS;
      content_repo_ = cr_ptr;
    }
    void setCallback(failure_callback_type onerror_callback) {
      callback_=onerror_callback;
    }
    void setStrategy(FailureStrategy strat) {
      strategy_ = strat;
    }
    void operator()(core::ProcessSession* ps) {
      FailureStrategies.at(strategy_)(ps, callback_, content_repo_);
    }
   private:
    failure_callback_type callback_;
    FailureStrategy strategy_;
    content_repo_sptr content_repo_;
  };

  void finalize();

  std::shared_ptr<minifi::Connection> buildFinalConnection(std::shared_ptr<core::Processor> processor, bool set_dst = false);

  std::shared_ptr<minifi::Connection> connectProcessors(std::shared_ptr<core::Processor> src_proc, std::shared_ptr<core::Processor> dst_proc,
                                                        core::Relationship relationship = core::Relationship("success", "description"), bool set_dst = false);

  std::shared_ptr<org::apache::nifi::minifi::io::StreamFactory> stream_factory;

  content_repo_sptr content_repo_;

  std::shared_ptr<core::Repository> flow_repo_;
  std::shared_ptr<core::Repository> prov_repo_;

  std::shared_ptr<core::controller::ControllerServiceProvider> controller_services_provider_;

  std::atomic<bool> finalized;

  uint32_t location;

  std::shared_ptr<core::ProcessSession> current_session_;
  std::shared_ptr<core::FlowFile> current_flowfile_;

  std::map<std::string, std::shared_ptr<core::Processor>> processor_mapping_;
  std::vector<std::shared_ptr<core::Processor>> processor_queue_;
  std::vector<std::shared_ptr<core::Processor>> configured_processors_;
  std::vector<std::shared_ptr<core::ProcessorNode>> processor_nodes_;
  std::vector<std::shared_ptr<core::ProcessContext>> processor_contexts_;
  std::vector<std::shared_ptr<core::ProcessSession>> process_sessions_;
  std::vector<std::shared_ptr<core::ProcessSessionFactory>> factories_;
  std::vector<std::shared_ptr<minifi::Connection>> relationships_;
  core::Relationship termination_;

  std::shared_ptr<core::FlowFile> next_ff_;

 private:

  static std::shared_ptr<utils::IdGenerator> id_generator_;
  std::shared_ptr<logging::Logger> logger_;
  std::shared_ptr<FailureHandler> failure_handler_;
  static std::unordered_map<std::string, std::shared_ptr<ExecutionPlan>> proc_plan_map_;
  static std::map<std::string, custom_processor_args> custom_processors;
};

#endif /* LIBMINIFI_CAPI_PLAN_H_ */
