blob: aadfae2c008e68c9bf498b25d7aae82cf3bfc5bc [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ExecuteJavaProcessor.h"
#include <regex>
#include <memory>
#include <algorithm>
#include <cctype>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <iterator>
#include <map>
#include <set>
#include <string>
#include <utility>
#include <vector>
#include "core/FlowFile.h"
#include "core/logging/Logger.h"
#include "core/ProcessContext.h"
#include "core/Relationship.h"
#include "ResourceClaim.h"
#include "utils/StringUtils.h"
#include "utils/ByteArrayCallback.h"
#include "jvm/JniMethod.h"
#include "jvm/JniLogger.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace jni {
namespace processors {
core::Property ExecuteJavaProcessor::JVMControllerService(
core::PropertyBuilder::createProperty("JVM Controller Service")->withDescription("Name of controller service defined within this flow")->isRequired(false)->withDefaultValue<std::string>("")->build());
core::Property ExecuteJavaProcessor::NiFiProcessor(
core::PropertyBuilder::createProperty("NiFi Processor")->withDescription("Name of NiFi processor to load and run")->isRequired(true)->withDefaultValue<std::string>("")->build());
const char *ExecuteJavaProcessor::ProcessorName = "ExecuteJavaClass";
core::Relationship ExecuteJavaProcessor::Success("success", "All files are routed to success");
void ExecuteJavaProcessor::initialize() {
logger_->log_info("Initializing ExecuteJavaClass");
// Set the supported properties
std::set<core::Property> properties;
properties.insert(JVMControllerService);
properties.insert(NiFiProcessor);
setSupportedProperties(properties);
setAcceptAllProperties();
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
setSupportedRelationships(relationships);
}
void ExecuteJavaProcessor::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::string controller_service_name;
if (getProperty(JVMControllerService.getName(), controller_service_name)) {
auto cs = context->getControllerService(controller_service_name);
if (cs == nullptr) {
auto serv_cs = JVMLoader::getInstance()->getBaseServicer();
java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(serv_cs);
if (serv_cs == nullptr)
throw std::runtime_error("Could not load controller service");
} else {
java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(cs);
}
} else {
auto serv_cs = JVMLoader::getInstance()->getBaseServicer();
java_servicer_ = std::static_pointer_cast<controllers::JavaControllerService>(serv_cs);
if (serv_cs == nullptr)
throw std::runtime_error("Could not load controller service");
}
if (!getProperty(NiFiProcessor.getName(), class_name_)) {
throw std::runtime_error("NiFi Processor must be defined");
}
nifi_logger_ = logging::LoggerFactory<ExecuteJavaProcessor>::getAliasedLogger(class_name_);
jni_logger_ref_.logger_reference_ = nifi_logger_;
jni_logger_class_ = java_servicer_->loadClass("org/apache/nifi/processor/JniLogger");
spn = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessContext");
auto env = java_servicer_->attach();
java_servicer_->putNativeFunctionMapping<minifi::jni::JniProcessContext>(env, spn);
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInitializationContext", getJniInitializationContextSignatures());
init = java_servicer_->loadClass("org/apache/nifi/processor/JniInitializationContext");
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniControllerServiceLookup", getJniControllerServiceLookupSignatures());
if (context_instance_ != nullptr) {
java_servicer_->attach()->DeleteGlobalRef(context_instance_);
} else {
init_context_.identifier_ = getUUIDStr();
init_context_.lookup_ = &csl_;
csl_.cs_lookup_reference_ = context;
init_context_.lookup_ref_ = java_servicer_->newInstance("org.apache.nifi.processor.JniControllerServiceLookup");
java_servicer_->setReference<minifi::jni::JniControllerServiceLookup>(env, init_context_.lookup_ref_, &csl_);
}
context_instance_ = spn.newInstance(env);
auto initializer = init.newInstance(env);
java_servicer_->setReference<minifi::jni::JniInitializationContext>(env, initializer, &init_context_);
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniLogger", getLoggerSignatures());
jni_logger_ref_.clazz_ = jni_logger_class_.getReference();
logger_instance_ = jni_logger_class_.newInstance(env);
java_servicer_->setReference<minifi::jni::JniLogger>(env, logger_instance_, &jni_logger_ref_);
java_servicer_->putNativeFunctionMapping<minifi::jni::JniLogger>(env, jni_logger_class_);
// create provided class
clazzInstance = java_servicer_->newInstance(class_name_);
auto onScheduledNames = java_servicer_->getAnnotations(class_name_, "OnScheduled");
current_processor_class = java_servicer_->getObjectClass(class_name_, clazzInstance);
// attempt to schedule here
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessContext", getProcessContextSignatures());
init.callVoidMethod(env, initializer, "setLogger", "(Lorg/apache/nifi/processor/JniLogger;)V", logger_instance_);
current_processor_class.callVoidMethod(env, clazzInstance, "initialize", "(Lorg/apache/nifi/processor/ProcessorInitializationContext;)V", initializer);
jpc.nifi_processor_ = clazzInstance;
jpc.context_ = context;
jpc.clazz_ = spn.getReference();
jpc.processor_ = shared_from_this();
jpc.cslookup_ = init_context_.lookup_ref_;
java_servicer_->setReference<minifi::jni::JniProcessContext>(env, context_instance_, &jpc);
try {
for (const auto &onScheduledName : onScheduledNames) {
current_processor_class.callVoidMethod(env, clazzInstance, onScheduledName.first.c_str(), onScheduledName.second, context_instance_);
}
} catch (std::runtime_error &re) {
// this can be ignored.
}
java_servicer_->attach()->DeleteGlobalRef(initializer);
}
ExecuteJavaProcessor::~ExecuteJavaProcessor() {
}
JNINativeMethod ExecuteJavaProcessor::registerNativeMethod(const std::string &name, const std::string &params, const void *ptr) {
JNINativeMethod mthd;
return mthd;
}
void ExecuteJavaProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
assert(context == jpc.context_);
auto env = java_servicer_->attach();
if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniFlowFile", getFlowFileSignatures())) {
static auto ffc = java_servicer_->loadClass("org/apache/nifi/processor/JniFlowFile");
java_servicer_->putNativeFunctionMapping<JniFlowFile>(env, ffc);
}
if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniInputStream", getInputStreamSignatures())) {
static auto jin = java_servicer_->loadClass("org/apache/nifi/processor/JniInputStream");
java_servicer_->putNativeFunctionMapping<JniInputStream>(env, jin);
}
static auto sessioncls = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessSession");
if (ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessSession", getProcessSessionSignatures())) {
java_servicer_->putNativeFunctionMapping<minifi::jni::JniSession>(env, sessioncls);
}
ClassRegistrar::getRegistrar().registerClasses(env, java_servicer_, "org/apache/nifi/processor/JniProcessSessionFactory", getProcessSessionFactorySignatures());
static auto sessionFactoryCls = java_servicer_->loadClass("org/apache/nifi/processor/JniProcessSessionFactory");
try {
// it is possible that java classes will maintain a reference to the session factory, so we cannot remove the global references
// in this function.
jobject java_process_session_factory = nullptr;
JniSessionFactory *jniSessionFactory = getFactory(sessionFactory);
if (!jniSessionFactory) {
java_process_session_factory = sessionFactoryCls.newInstance(env);
jniSessionFactory = setFactory(sessionFactory, java_process_session_factory);
java_servicer_->putNativeFunctionMapping<JniSessionFactory>(env, sessionFactoryCls);
java_servicer_->setReference<JniSessionFactory>(env, java_process_session_factory, jniSessionFactory);
} else {
java_process_session_factory = jniSessionFactory->getJavaReference();
}
current_processor_class.callVoidMethod(env, clazzInstance, "onTrigger", "(Lorg/apache/nifi/processor/ProcessContext;Lorg/apache/nifi/processor/ProcessSessionFactory;)V", context_instance_,
java_process_session_factory);
} catch (const JavaException &je) {
// clear the java exception so we don't continually wrap it
env->ExceptionClear();
logger_->log_error(" Java Exception occurred during onTrigger, reason: %s", je.what());
} catch (const std::exception &e) {
// clear the java exception so we don't continually wrap it
env->ExceptionClear();
logger_->log_error(" Exception occurred during onTrigger, reason: %s", e.what());
}
}
void ExecuteJavaProcessor::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
// do nothing.
}
} /* namespace processors */
} /* namespace jni */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */