blob: 941ef6900f01b0a315e00742e979bed2938ef134 [file] [log] [blame]
/**
* ExecuteJavaClass class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __EXECUTE_JAVA_CLASS__
#define __EXECUTE_JAVA_CLASS__
#include <memory>
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Property.h"
#include "core/Resource.h"
#include "concurrentqueue.h"
#include "core/logging/LoggerConfiguration.h"
#include "jvm/JavaControllerService.h"
#include "jvm/JniProcessContext.h"
#include "utils/Id.h"
#include "jvm/NarClassLoader.h"
#include "jvm/JniLogger.h"
#include "jvm/JniReferenceObjects.h"
#include "jvm/JniControllerServiceLookup.h"
#include "jvm/JniInitializationContext.h"
#include "ClassRegistrar.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace jni {
namespace processors {
/**
* Purpose and Justification: Executes a java NiFi Processor
*
* Design: Extends Processor to provide basic processor support capabilities.
*/
class ExecuteJavaProcessor : public core::Processor {
public:
// Constructor
/*!
* Create a new processor
*/
explicit ExecuteJavaProcessor(std::string name, utils::Identifier uuid = utils::Identifier())
: Processor(name, uuid),
context_instance_(nullptr),
logger_instance_(nullptr),
logger_(logging::LoggerFactory<ExecuteJavaProcessor>::getLogger()),
nifi_logger_(nullptr) {
}
// Destructor
virtual ~ExecuteJavaProcessor();
// Processor Name
static const char *ProcessorName;
static core::Property JVMControllerService;
static core::Property NiFiProcessor;
// Supported Relationships
static core::Relationship Success;
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
virtual void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override;
virtual void initialize() override;
virtual void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) override;
virtual bool supportsDynamicProperties() override {
return true;
}
protected:
static JavaSignatures &getLoggerSignatures() {
static JavaSignatures loggersignatures;
if (loggersignatures.empty()) {
loggersignatures.addSignature( { "isWarnEnabled", "()Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_isWarnEnabled) });
loggersignatures.addSignature( { "isTraceEnabled", "()Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_isTraceEnabled) });
loggersignatures.addSignature( { "isInfoEnabled", "()Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_isInfoEnabled) });
loggersignatures.addSignature( { "isErrorEnabled", "()Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_isErrorEnabled) });
loggersignatures.addSignature( { "isDebugEnabled", "()Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_isDebugEnabled) });
loggersignatures.addSignature( { "info", "(Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_info) });
loggersignatures.addSignature( { "warn", "(Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_warn) });
loggersignatures.addSignature( { "error", "(Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_error) });
loggersignatures.addSignature( { "debug", "(Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_debug) });
loggersignatures.addSignature( { "trace", "(Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniLogger_trace) });
}
return loggersignatures;
}
static JavaSignatures &getProcessContextSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "getComponent", "()Lorg/apache/nifi/components/AbstractConfigurableComponent;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getComponent) });
methodSignatures.addSignature( { "getPropertyNames", "()Ljava/util/List;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getPropertyNames) });
methodSignatures.addSignature( { "getPropertyValue", "(Ljava/lang/String;)Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getPropertyValue) });
methodSignatures.addSignature( { "getName", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getName) });
methodSignatures.addSignature( { "getControllerServiceLookup", "()Lorg/apache/nifi/controller/ControllerServiceLookup;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessContext_getControllerServiceLookup) });
}
return methodSignatures;
}
static JavaSignatures &getInputStreamSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "read", "()I", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInputStream_read) });
methodSignatures.addSignature( { "readWithOffset", "([BII)I", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInputStream_readWithOffset) });
}
return methodSignatures;
}
static JavaSignatures &getFlowFileSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "getAttributes", "()Ljava/util/Map;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getAttributes) });
methodSignatures.addSignature( { "getAttribute", "(Ljava/lang/String;)Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getAttribute) });
methodSignatures.addSignature( { "getSize", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getSize) });
methodSignatures.addSignature( { "getEntryDate", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getEntryDate) });
methodSignatures.addSignature( { "getLineageStartDate", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getLineageStartDate) });
methodSignatures.addSignature( { "getLastQueueDatePrim", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getLastQueueDatePrim) });
methodSignatures.addSignature( { "getQueueDateIndex", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getQueueDateIndex) });
methodSignatures.addSignature( { "getId", "()J", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getId) });
methodSignatures.addSignature( { "getUUIDStr", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniFlowFile_getUUIDStr) });
}
return methodSignatures;
}
static JavaSignatures &getProcessSessionSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "remove", "(Lorg/apache/nifi/flowfile/FlowFile;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_remove) });
methodSignatures.addSignature( { "create", "()Lorg/apache/nifi/flowfile/FlowFile;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_create) });
methodSignatures.addSignature( { "penalize", "(Lorg/apache/nifi/flowfile/FlowFile;)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_penalize) });
methodSignatures.addSignature( { "createWithParent", "(Lorg/apache/nifi/flowfile/FlowFile;)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_createWithParent) });
methodSignatures.addSignature( { "rollback", "()V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_rollback) });
methodSignatures.addSignature( { "commit", "()V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_commit) });
methodSignatures.addSignature( { "get", "()Lorg/apache/nifi/flowfile/FlowFile;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_get) });
methodSignatures.addSignature( { "write", "(Lorg/apache/nifi/flowfile/FlowFile;[B)Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_write) });
methodSignatures.addSignature( { "append", "(Lorg/apache/nifi/flowfile/FlowFile;[B)Z", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_append) });
methodSignatures.addSignature( { "putAttribute", "(Lorg/apache/nifi/flowfile/FlowFile;Ljava/lang/String;Ljava/lang/String;)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_putAttribute) });
methodSignatures.addSignature( { "removeAttribute", "(Lorg/apache/nifi/flowfile/FlowFile;Ljava/lang/String;)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_removeAttribute) });
methodSignatures.addSignature( { "clone", "(Lorg/apache/nifi/flowfile/FlowFile;)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_clone) });
methodSignatures.addSignature( { "clonePortion", "(Lorg/apache/nifi/flowfile/FlowFile;JJ)Lorg/apache/nifi/flowfile/FlowFile;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_clonePortion) });
methodSignatures.addSignature( { "readFlowFile", "(Lorg/apache/nifi/flowfile/FlowFile;)Lorg/apache/nifi/processor/JniInputStream;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_readFlowFile) });
methodSignatures.addSignature( { "transfer", "(Lorg/apache/nifi/flowfile/FlowFile;Ljava/lang/String;)V", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSession_transfer) });
}
return methodSignatures;
}
static JavaSignatures &getJniInitializationContextSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "getControllerServiceLookup", "()Lorg/apache/nifi/controller/ControllerServiceLookup;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getControllerServiceLookup) });
methodSignatures.addSignature( { "getIdentifier", "()Ljava/lang/String;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniInitializationContext_getIdentifier) });
}
return methodSignatures;
}
static JavaSignatures &getJniControllerServiceLookupSignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature( { "getControllerService", "(Ljava/lang/String;)Lorg/apache/nifi/controller/ControllerService;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerService) });
methodSignatures.addSignature( { "isControllerServiceEnabled", "(Ljava/lang/String;)Z",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabled) });
methodSignatures.addSignature( { "isControllerServiceEnabling", "(Ljava/lang/String;)Z",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_isControllerServiceEnabling) });
methodSignatures.addSignature( { "getControllerServiceName", "(Ljava/lang/String;)Ljava/lang/String;",
reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniControllerServiceLookup_getControllerServiceName) });
}
return methodSignatures;
}
static JavaSignatures &getProcessSessionFactorySignatures() {
static JavaSignatures methodSignatures;
if (methodSignatures.empty()) {
methodSignatures.addSignature(
{ "createSession", "()Lorg/apache/nifi/processor/ProcessSession;", reinterpret_cast<void*>(&Java_org_apache_nifi_processor_JniProcessSessionFactory_createSession) });
}
return methodSignatures;
}
virtual void notifyStop() override {
auto localEnv = java_servicer_->attach();
auto onStoppedName = java_servicer_->getAnnotation(class_name_, "OnStopped");
try {
if (!onStoppedName.first.empty() && !onStoppedName.second.empty())
current_processor_class.callVoidMethod(localEnv, clazzInstance, onStoppedName.first.c_str(), onStoppedName.second);
} catch (std::runtime_error &re) {
// this is something that we can ignore.
}
std::lock_guard<std::mutex> lock(local_mutex_);
for (auto &factory : session_factories_) {
factory->remove();
delete factory;
}
// delete the reference to the jni process session
if (logger_instance_) {
localEnv->DeleteGlobalRef(logger_instance_);
logger_instance_ = nullptr;
}
if (init_context_.lookup_ref_) {
localEnv->DeleteGlobalRef(init_context_.lookup_ref_);
}
}
private:
JniSessionFactory *getFactory(const std::shared_ptr<core::ProcessSessionFactory> &ptr) {
std::lock_guard<std::mutex> lock(local_mutex_);
for (const auto &factory : session_factories_) {
if (factory->getFactory() == ptr) {
return factory;
}
}
return nullptr;
}
JniSessionFactory *setFactory(const std::shared_ptr<core::ProcessSessionFactory> &ptr, jobject obj) {
std::lock_guard<std::mutex> lock(local_mutex_);
JniSessionFactory *factory = new JniSessionFactory(ptr, java_servicer_, obj);
session_factories_.push_back(factory);
return factory;
}
JNINativeMethod registerNativeMethod(const std::string &name, const std::string &params, const void *ptr);
JavaClass jni_logger_class_;
jobject logger_instance_;
std::mutex local_mutex_;
std::vector<JniSessionFactory*> session_factories_;
minifi::jni::JniLogger jni_logger_ref_;
JavaClass spn;
JavaClass init;
minifi::jni::JniProcessContext jpc;
JavaClass current_processor_class;
jobject context_instance_;
jobject clazzInstance;
std::shared_ptr<controllers::JavaControllerService> java_servicer_;
std::string class_name_;
std::shared_ptr<logging::Logger> logger_;
std::shared_ptr<logging::Logger> nifi_logger_;
JniControllerServiceLookup csl_;
JniInitializationContext init_context_;
};
REGISTER_RESOURCE(ExecuteJavaProcessor, "ExecuteJavaClass runs NiFi processors given a provided system path ")
} /* namespace processors */
} /* namespace jni */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif