/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "ExecuteJavaProcessor.h"

#include <regex>
#include <memory>
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>

#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
#include "utils/ByteArrayCallback.h"
#include "jvm/JniMethod.h"
#include "jvm/JniLogger.h"

namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace jni {
namespace processors {

core::Property ExecuteJavaProcessor::JVMControllerService(
    core::PropertyBuilder::createProperty("JVM Controller Service")->withDescription("Name of controller service defined within this flow")->isRequired(false)->withDefaultValue<std::string>("")->build());
core::Property ExecuteJavaProcessor::NiFiProcessor(
    core::PropertyBuilder::createProperty("NiFi Processor")->withDescription("Name of NiFi processor to load and run")->isRequired(true)->withDefaultValue<std::string>("")->build());

const char *ExecuteJavaProcessor::ProcessorName = "ExecuteJavaClass";

core::Relationship ExecuteJavaProcessor::Success("success", "All files are routed to success");
void ExecuteJavaProcessor::initialize() {
  logger_->log_info("Initializing ExecuteJavaClass");
  // Set the supported properties
  std::set<core::Property> properties;
  properties.insert(JVMControllerService);
  properties.insert(NiFiProcessor);
  setSupportedProperties(properties);
  setAcceptAllProperties();
  // Set the supported relationships
  std::set<core::Relationship> relationships;
  relationships.insert(Success);
  setSupportedRelationships(relationships);
}

void ExecuteJavaProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
  std::string controller_service_name;
  if (getProperty(JVMControllerService.getName(), controller_service_name)) {
    auto cs = context->getControllerService(controller_service_name);
    if (cs == nullptr) {
      auto serv_cs = JVMLoader::getInstance()->getBaseServicer();
      java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(serv_cs);
      if (serv_cs == nullptr)
        throw std::runtime_error("Could not load controller service");
    } else {
      java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(cs);
    }

  } else {
    auto serv_cs = JVMLoader::getInstance()->getBaseServicer();
    java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(serv_cs);
    if (serv_cs == nullptr)
      throw std::runtime_error("Could not load controller service");
  }

  if (!getProperty(NiFiProcessor.getName(), class_name_)) {
    throw std::runtime_error("NiFi Processor must be defined");
  }

  nifi_logger_ = logging::LoggerFactory<ExecuteJavaProcessor>::getAliasedLogger(class_name_);

  jni_logger_ref_.logger_reference_ = nifi_logger_;

  jni_logger_class_ = java_servicer_->loadClass("org/apache/nifi/processor/JniLogger");

  spn = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessContext");
  auto env = java_servicer_->attach();
  java_servicer_->putNativeFunctionMapping<minifi::jni::JniProcessContext>(env, spn);

  ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInitializationContext", getJniInitializationContextSignatures());

  init = java_servicer_->loadClass("org/apache/nifi/processor/JniInitializationContext");

  ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniControllerServiceLookup", getJniControllerServiceLookupSignatures());

  if (context_instance_ != nullptr) {
    java_servicer_->attach()->DeleteGlobalRef(context_instance_);
  } else {
    init_context_.identifier_ = getUUIDStr();
    init_context_.lookup_ = &csl_;
    csl_.cs_lookup_reference_ = context;

    init_context_.lookup_ref_ = java_servicer_->newInstance("org.apache.nifi.processor.JniControllerServiceLookup");

    java_servicer_->setReference<minifi::jni::JniControllerServiceLookup>(env, init_context_.lookup_ref_, &csl_);
  }
  context_instance_ = spn.newInstance(env);

  auto initializer = init.newInstance(env);

  java_servicer_->setReference<minifi::jni::JniInitializationContext>(env, initializer, &init_context_);

  ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniLogger", getLoggerSignatures());

  jni_logger_ref_.clazz_ = jni_logger_class_.getReference();

  logger_instance_ = jni_logger_class_.newInstance(env);
  java_servicer_->setReference<minifi::jni::JniLogger>(env, logger_instance_, &jni_logger_ref_);
  java_servicer_->putNativeFunctionMapping<minifi::jni::JniLogger>(env, jni_logger_class_);
  // create provided class

  clazzInstance = java_servicer_->newInstance(class_name_);
  auto onScheduledNames = java_servicer_->getAnnotations(class_name_, "OnScheduled");
  current_processor_class = java_servicer_->getObjectClass(class_name_, clazzInstance);
  // attempt to schedule here

  ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessContext", getProcessContextSignatures());

  init.callVoidMethod(env, initializer, "setLogger", "(Lorg/apache/nifi/processor/JniLogger;)V", logger_instance_);

  current_processor_class.callVoidMethod(env, clazzInstance, "initialize", "(Lorg/apache/nifi/processor/ProcessorInitializationContext;)V", initializer);

  jpc.nifi_processor_ = clazzInstance;
  jpc.context_ = context;
  jpc.clazz_ = spn.getReference();
  jpc.processor_ = shared_from_this();
  jpc.cslookup_ = init_context_.lookup_ref_;

  java_servicer_->setReference<minifi::jni::JniProcessContext>(env, context_instance_, &jpc);

  try {

    for (const auto &onScheduledName : onScheduledNames) {
      current_processor_class.callVoidMethod(env, clazzInstance, onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
    }
  } catch (std::runtime_error &re) {
    // this can be ignored.
  }

  java_servicer_->attach()->DeleteGlobalRef(initializer);
}

ExecuteJavaProcessor::~ExecuteJavaProcessor() {
}

JNINativeMethod ExecuteJavaProcessor::registerNativeMethod(const std::string &name, const std::string &params, const void *ptr) {
  JNINativeMethod mthd;

  return mthd;
}

void ExecuteJavaProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {

  assert(context == jpc.context_);
  auto env = java_servicer_->attach();

  if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniFlowFile", getFlowFileSignatures())) {
    static auto ffc = java_servicer_->loadClass("org/apache/nifi/processor/JniFlowFile");
    java_servicer_->putNativeFunctionMapping<JniFlowFile>(env, ffc);

  }
  if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInputStream", getInputStreamSignatures())) {
    static auto jin = java_servicer_->loadClass("org/apache/nifi/processor/JniInputStream");
    java_servicer_->putNativeFunctionMapping<JniInputStream>(env, jin);
  }
  static auto sessioncls = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessSession");

  if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessSession", getProcessSessionSignatures())) {
    java_servicer_->putNativeFunctionMapping<minifi::jni::JniSession>(env, sessioncls);
  }
  ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessSessionFactory", getProcessSessionFactorySignatures());

  static auto sessionFactoryCls = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessSessionFactory");

  try {
    // it is possible that java classes will maintain a reference to the session factory, so we cannot remove the global references
    // in this function.
    jobject java_process_session_factory = nullptr;

    JniSessionFactory *jniSessionFactory = getFactory(sessionFactory);

    if (!jniSessionFactory) {
      java_process_session_factory = sessionFactoryCls.newInstance(env);
      jniSessionFactory = setFactory(sessionFactory, java_process_session_factory);
      java_servicer_->putNativeFunctionMapping<JniSessionFactory>(env, sessionFactoryCls);
      java_servicer_->setReference<JniSessionFactory>(env, java_process_session_factory, jniSessionFactory);
    } else {
      java_process_session_factory = jniSessionFactory->getJavaReference();
    }
    current_processor_class.callVoidMethod(env, clazzInstance, "onTrigger", "(Lorg/apache/nifi/processor/ProcessContext;Lorg/apache/nifi/processor/ProcessSessionFactory;)V", context_instance_,
                                           java_process_session_factory);
  } catch (const JavaException &je) {
    // clear the java exception so we don't continually wrap it
    env->ExceptionClear();
    logger_->log_error(" Java Exception occurred during onTrigger, reason: %s", je.what());
  } catch (const std::exception &e) {
    // clear the java exception so we don't continually wrap it
    env->ExceptionClear();
    logger_->log_error(" Exception occurred during onTrigger, reason: %s", e.what());
  }
}

void ExecuteJavaProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  // do nothing.
}

} /* namespace processors */
} /* namespace jni */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */

