blob: 36cbb7382600e88981f30fa17df79cd9f13ccdb5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "JniProcessSession.h"
#include <string>
#include <memory>
#include <algorithm>
#include <iterator>
#include <set>
#include "core/Property.h"
#include "io/validation.h"
#include "utils/StringUtils.h"
#include "utils/file/FileUtils.h"
#include "properties/Configure.h"
#include "JVMLoader.h"
#include "JniReferenceObjects.h"
#include "core/Processor.h"
#include "JniFlowFile.h"
#include "../JavaException.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
 * JNI entry point for JniProcessSession.create().
 *
 * Instantiates a Java JniFlowFile, asks the native session for a new flow
 * file, wraps both in a minifi::jni::JniFlowFile registered with the session
 * and stores the wrapper's raw pointer as the Java object's reference.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 * @return the new Java JniFlowFile object, or nullptr when obj is null.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_create(JNIEnv *env, jobject obj) {
  if (obj == nullptr) {
    return nullptr;
  }
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  auto flowFileClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env);
  auto javaFlowFile = flowFileClass.newInstance(env);
  // presumably raises if the instantiation left a Java exception pending -- see JavaException.h
  minifi::jni::ThrowIf(env);
  auto nativeFlowFile = jniSession->getSession()->create();
  auto wrapper = std::make_shared<minifi::jni::JniFlowFile>(nativeFlowFile, jniSession->getServicer(), javaFlowFile);
  // the flow file and the session both hold on to the wrapper
  nativeFlowFile->addReference(wrapper);
  auto rawWrapper = jniSession->addFlowFile(wrapper);
  // bind the Java object to the wrapper through its raw pointer
  minifi::jni::JVMLoader::getInstance()->setReference(javaFlowFile, env, rawWrapper);
  return javaFlowFile;
}
/**
 * JNI entry point for JniProcessSession.readFlowFile(FlowFile).
 *
 * Reads the flow file's content through a JniByteInputStream callback and
 * hands that callback to a Java JniInputStream object, which is registered
 * with the session.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 * @param ff  Java flow file object whose native reference is looked up.
 * @return the Java JniInputStream object, or nullptr when obj is null or the
 *         reference holds no flow file.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_readFlowFile(JNIEnv *env, jobject obj, jobject ff) {
  if (obj == nullptr) {
    return nullptr;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *flowFileRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (!flowFileRef->get()) {
    // the reference no longer holds a flow file; nothing to read
    return nullptr;
  }
  auto streamClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniInputStream", env);
  auto javaStream = streamClass.newInstance(env);
  minifi::jni::ThrowIf(env);
  // NOTE(review): 4096 is presumably the callback's buffer size -- confirm against JniByteInputStream
  std::unique_ptr<minifi::jni::JniByteInputStream> reader(new minifi::jni::JniByteInputStream(4096));
  jniSession->getSession()->read(flowFileRef->get(), reader.get());
  // ownership of the callback moves into the stream wrapper
  auto streamWrapper = std::make_shared<minifi::jni::JniInputStream>(std::move(reader), javaStream, jniSession->getServicer());
  jniSession->addInputStream(streamWrapper);
  minifi::jni::JVMLoader::getInstance()->setReference(javaStream, env, streamWrapper.get());
  return javaStream;
}
/**
 * JNI entry point for JniInputStream.read(): reads a single byte.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java JniInputStream object from which the native stream is resolved.
 * @return the byte as a value in 0..255, or -1 when nothing could be read or
 *         no native stream is available.
 */
JNIEXPORT jint JNICALL Java_org_apache_nifi_processor_JniInputStream_read(JNIEnv *env, jobject obj) {
  // validate obj before it is handed to getPtr
  if (obj == nullptr) {
    // this technically can't happen per JNI specs
    return -1;
  }
  minifi::jni::JniInputStream *jin = minifi::jni::JVMLoader::getPtr<minifi::jni::JniInputStream>(env, obj);
  if (jin == nullptr) {
    return -1;
  }
  char value = 0;
  if (jin->read(value) > 0) {
    // Widen through unsigned char: where char is signed, bytes 0x80-0xFF would
    // otherwise come back negative, and 0xFF would be indistinguishable from
    // the -1 end-of-stream marker.
    return static_cast<jint>(static_cast<unsigned char>(value));
  }
  return -1;
}
/**
 * JNI entry point for JniInputStream.readWithOffset(byte[], int, int).
 *
 * Delegates to the native stream's array read with the given offset and length.
 *
 * @param env    JNI environment of the calling thread.
 * @param obj    Java JniInputStream object from which the native stream is resolved.
 * @param arr    destination Java byte array.
 * @param offset offset forwarded to the native read.
 * @param length length forwarded to the native read.
 * @return whatever the native read returns, or -1 when obj is null or no
 *         native stream is available.
 */
JNIEXPORT jint JNICALL Java_org_apache_nifi_processor_JniInputStream_readWithOffset(JNIEnv *env, jobject obj, jbyteArray arr, jint offset, jint length) {
  // validate obj before it is handed to getPtr
  if (obj == nullptr) {
    // this technically can't happen per JNI specs
    return -1;
  }
  minifi::jni::JniInputStream *jin = minifi::jni::JVMLoader::getPtr<minifi::jni::JniInputStream>(env, obj);
  if (jin == nullptr) {
    return -1;
  }
  return jin->read(env, arr, static_cast<int>(offset), static_cast<int>(length));
}
/**
 * JNI entry point for JniProcessSession.write(FlowFile, byte[]).
 *
 * Passes the bytes of the Java array to the native session's write for the
 * referenced flow file.
 *
 * @param env       JNI environment of the calling thread.
 * @param obj       Java object from which the native JniSession is resolved.
 * @param ff        Java flow file object whose native reference is looked up.
 * @param byteArray content to write.
 * @return true when the session write was invoked; false when obj, ff,
 *         byteArray, the session or the flow file is missing, or when the
 *         array elements could not be obtained.
 */
JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_write(JNIEnv *env, jobject obj, jobject ff, jbyteArray byteArray) {
  if (obj == nullptr) {
    return false;
  }
  THROW_IF((ff == nullptr || byteArray == nullptr), env, "No flowfile to write");
  if (ff == nullptr || byteArray == nullptr) {
    // THROW_IF's control flow is defined elsewhere; make sure null handles are
    // never passed to the JNI calls below even if it falls through.
    return false;
  }
  minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (session == nullptr || ptr == nullptr || !ptr->get()) {
    return false;
  }
  jbyte *buffer = env->GetByteArrayElements(byteArray, nullptr);
  if (buffer == nullptr) {
    // JNI returns NULL when the elements cannot be obtained
    return false;
  }
  const jsize length = env->GetArrayLength(byteArray);
  try {
    minifi::jni::JniByteOutStream outStream(buffer, static_cast<size_t>(length));
    session->getSession()->write(ptr->get(), &outStream);
  } catch (...) {
    // hand the elements back before propagating, otherwise they stay pinned/copied
    env->ReleaseByteArrayElements(byteArray, buffer, 0);
    throw;
  }
  env->ReleaseByteArrayElements(byteArray, buffer, 0);
  return true;
}
/**
 * JNI entry point for JniProcessSession.clone(FlowFile).
 *
 * Clones the referenced flow file through the native session and returns a
 * new Java JniFlowFile bound to the clone.
 *
 * @param env    JNI environment of the calling thread.
 * @param obj    Java object from which the native JniSession is resolved.
 * @param prevff Java flow file object to clone.
 * @return the Java object for the clone, or nullptr when obj is null or the
 *         reference holds no flow file.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_clone(JNIEnv *env, jobject obj, jobject prevff) {
  if (obj == nullptr) {
    return nullptr;
  }
  THROW_IF_NULL(prevff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *sourceRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, prevff);
  if (!sourceRef->get()) {
    // no flow file behind the reference, so there is nothing to clone
    return nullptr;
  }
  auto flowFileClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env);
  auto javaClone = flowFileClass.newInstance(env);
  minifi::jni::ThrowIf(env);
  auto clonedFlowFile = jniSession->getSession()->clone(sourceRef->get());
  auto wrapper = std::make_shared<minifi::jni::JniFlowFile>(clonedFlowFile, jniSession->getServicer(), javaClone);
  clonedFlowFile->addReference(wrapper);
  auto rawWrapper = jniSession->addFlowFile(wrapper);
  // bind the Java object to the wrapper through its raw pointer
  minifi::jni::JVMLoader::getInstance()->setReference(javaClone, env, rawWrapper);
  return javaClone;
}
/**
 * JNI entry point for JniProcessSession.get().
 *
 * Fetches a flow file from the native session. If the session already tracks
 * a JniFlowFile for it, that wrapper's Java object is returned; otherwise a
 * new Java JniFlowFile is created, bound to the flow file and registered.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 * @return the Java flow file object, or nullptr when obj or the session is
 *         null, the session returned no flow file, or the Java object could
 *         not be instantiated.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_get(JNIEnv *env, jobject obj) {
  if (obj == nullptr)
    return nullptr;
  minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  if (session == nullptr)
    return nullptr;
  auto flow_file = session->getSession()->get();
  if (flow_file == nullptr) {
    // this is an acceptable condition: the session had no flow file for us, so
    // skip the reference lookup and avoid instantiating a Java object.
    return nullptr;
  }
  auto prevFF = session->getFlowFileReference(flow_file);
  if (prevFF != nullptr) {
    // already wrapped: hand back the existing Java object
    return prevFF->getJniReference();
  }
  // otherwise create one
  auto ff = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile");
  auto ff_instance = ff.newInstance(env);
  if (ff_instance == nullptr) {
    return nullptr;
  }
  auto flow = std::make_shared<minifi::jni::JniFlowFile>(flow_file, session->getServicer(), ff_instance);
  flow_file->addReference(flow);
  auto rawFlow = session->addFlowFile(flow);
  minifi::jni::JVMLoader::getInstance()->setReference(ff_instance, env, rawFlow);
  return ff_instance;
}
/**
 * JNI entry point for JniProcessSession.putAttribute(FlowFile, String, String).
 *
 * Adds the attribute to the referenced flow file; when adding is refused the
 * attribute is updated instead, except for the key "uuid".
 *
 * @param env   JNI environment of the calling thread.
 * @param obj   Java session object; only checked for null here.
 * @param ff    Java flow file object whose native reference is looked up.
 * @param key   attribute name.
 * @param value attribute value.
 * @return ff, or nullptr when obj, ff, key or value is null.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_putAttribute(JNIEnv *env, jobject obj, jobject ff, jstring key, jstring value) {
  if (obj == nullptr) {
    // does not mean an error should be thrown, rather we will let
    // Java do its thing as this is a condition of GC most likely
    return nullptr;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  // validate the handles before any of them is used in a lookup
  if (ff == nullptr || key == nullptr || value == nullptr) {
    return nullptr;
  }
  minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (ptr == nullptr || !ptr->get()) {
    // no flow file behind the reference; mirror removeAttribute and hand ff back untouched
    return ff;
  }
  auto resKey = JniStringToUTF(env, key);
  auto resValue = JniStringToUTF(env, value);
  // addAttribute returning false presumably means the key already exists -- fall back to an update
  if (!ptr->get()->addAttribute(resKey, resValue) && resKey != "uuid") {  // don't update the keyed attribute uuid
    ptr->get()->updateAttribute(resKey, resValue);
  }
  return ff;
}
/**
 * JNI entry point for JniProcessSession.transfer(FlowFile, String).
 *
 * Builds a core::Relationship from the given name and transfers the
 * referenced flow file to it through the native session.
 *
 * @param env          JNI environment of the calling thread.
 * @param obj          Java object from which the native JniSession is resolved.
 * @param ff           Java flow file object whose native reference is looked up.
 * @param relationship name of the target relationship.
 */
JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_transfer(JNIEnv *env, jobject obj, jobject ff, jstring relationship) {
  if (obj == nullptr) {
    return;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *flowFileRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  // the second constructor argument is a fixed placeholder description
  core::Relationship target(JniStringToUTF(env, relationship), "description");
  jniSession->getSession()->transfer(flowFileRef->get(), target);
}
jstring Java_org_apache_nifi_processor_JniProcessSession_getPropertyValue(JNIEnv *env, jobject obj, jstring propertyName) {
std::string value;
if (obj == nullptr) {
return env->NewStringUTF(value.c_str());
}
core::ProcessContext *context = minifi::jni::JVMLoader::getPtr<core::ProcessContext>(env, obj);
std::string keystr = JniStringToUTF(env, propertyName);
if (!context->getProperty(keystr, value)) {
context->getDynamicProperty(keystr, value);
};
return env->NewStringUTF(value.c_str());
}
/**
 * JNI entry point for JniProcessSession.createWithParent(FlowFile).
 *
 * Creates a flow file from the referenced parent through the native session
 * and returns a new Java JniFlowFile bound to it.
 *
 * @param env    JNI environment of the calling thread.
 * @param obj    Java object from which the native JniSession is resolved.
 * @param parent Java flow file object used as the parent.
 * @return the Java object for the new flow file, or nullptr when obj is null
 *         or the parent reference holds no flow file.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_createWithParent(JNIEnv *env, jobject obj, jobject parent) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return nullptr;
  }
  THROW_IF_NULL(parent, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *parentRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, parent);
  if (!parentRef->get()) {
    return nullptr;
  }
  auto flowFileClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env);
  auto javaChild = flowFileClass.newInstance(env);
  minifi::jni::ThrowIf(env);
  auto childFlowFile = jniSession->getSession()->create(parentRef->get());
  auto wrapper = std::make_shared<minifi::jni::JniFlowFile>(childFlowFile, jniSession->getServicer(), javaChild);
  childFlowFile->addReference(wrapper);
  auto rawWrapper = jniSession->addFlowFile(wrapper);
  // bind the Java object to the wrapper through its raw pointer
  minifi::jni::JVMLoader::getInstance()->setReference(javaChild, env, rawWrapper);
  return javaChild;
}
/**
 * JNI entry point for JniProcessSession.commit().
 *
 * Commits the native session. Any C++ exception raised by the commit is
 * caught and passed to minifi::jni::ThrowJava with a descriptive message.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 */
JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_commit(JNIEnv *env, jobject obj) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return;
  }
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  try {
    jniSession->getSession()->commit();
  } catch (const std::exception &e) {
    // include the native exception text in the reported message
    const std::string message = std::string("error while committing: ") + e.what();
    minifi::jni::ThrowJava(env, message.c_str());
  } catch (...) {
    minifi::jni::ThrowJava(env, "error while commiting");
  }
}
/**
 * JNI entry point for JniProcessSession.rollback().
 *
 * Rolls back the native session resolved from obj; does nothing when obj is null.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 */
JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_rollback(JNIEnv *env, jobject obj) {
  if (obj != nullptr) {
    // a null obj is not an error -- most likely the Java side is being
    // collected -- so the rollback is only attempted for a live object
    minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
    jniSession->getSession()->rollback();
  }
}
/**
 * JNI entry point for JniProcessSessionFactory.createSession().
 *
 * Instantiates a Java JniProcessSession, creates a native process session
 * from the factory, wraps both in a minifi::jni::JniSession registered with
 * the factory and stores the wrapper's raw pointer as the Java object's
 * reference.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSessionFactory is resolved.
 * @return the new Java JniProcessSession object, or nullptr when obj is null.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSessionFactory_createSession(JNIEnv *env, jobject obj) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return nullptr;
  }
  minifi::jni::JniSessionFactory *factory = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSessionFactory>(env, obj);
  auto sessionClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniProcessSession", env);
  auto javaSession = sessionClass.newInstance(env);
  minifi::jni::ThrowIf(env);
  // the native session that the Java object will front
  auto nativeSession = factory->getFactory()->createSession();
  auto jniSession = std::make_shared<minifi::jni::JniSession>(nativeSession, javaSession, factory->getServicer());
  // register the wrapper with the native session so it can be removed eventually
  nativeSession->addReference(jniSession);
  // catalog the wrapper with the factory, which yields the raw pointer ...
  auto rawSession = factory->addSession(jniSession);
  // ... that is stored as the Java object's reference
  minifi::jni::JVMLoader::getInstance()->setReference(javaSession, env, rawSession);
  return javaSession;
}
/**
 * JNI entry point for JniProcessSession.remove(FlowFile).
 *
 * Removes the referenced flow file through the native session; does nothing
 * when obj is null or the reference holds no flow file.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 * @param ff  Java flow file object whose native reference is looked up.
 */
JNIEXPORT void JNICALL Java_org_apache_nifi_processor_JniProcessSession_remove(JNIEnv *env, jobject obj, jobject ff) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *flowFileRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (!flowFileRef->get()) {
    return;
  }
  jniSession->getSession()->remove(flowFileRef->get());
}
/**
 * JNI entry point for JniProcessSession.penalize(FlowFile).
 *
 * Penalizes the referenced flow file through the native session when the
 * reference holds one.
 *
 * @param env JNI environment of the calling thread.
 * @param obj Java object from which the native JniSession is resolved.
 * @param ff  Java flow file object whose native reference is looked up.
 * @return ff in every case.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_penalize(JNIEnv *env, jobject obj, jobject ff) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return ff;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *flowFileRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (!flowFileRef->get()) {
    // nothing to penalize; the caller still gets its object back
    return ff;
  }
  jniSession->getSession()->penalize(flowFileRef->get());
  return ff;
}
/**
 * JNI entry point for JniProcessSession.removeAttribute(FlowFile, String).
 *
 * Removes the named attribute from the referenced flow file when the
 * reference holds one.
 *
 * @param env  JNI environment of the calling thread.
 * @param obj  Java session object; only checked for null here.
 * @param ff   Java flow file object whose native reference is looked up.
 * @param attr name of the attribute to remove.
 * @return ff in every case.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_removeAttribute(JNIEnv *env, jobject obj, jobject ff, jstring attr) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return ff;
  }
  THROW_IF_NULL(ff, env, NO_FF_OBJECT);
  minifi::jni::JniFlowFile *flowFileRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (!flowFileRef->get()) {
    // no flow file behind the reference; hand the object back unchanged
    return ff;
  }
  flowFileRef->get()->removeAttribute(JniStringToUTF(env, attr));
  return ff;
}
/**
 * JNI entry point for JniProcessSession.clonePortion(FlowFile, long, long).
 *
 * Clones the referenced flow file through the native session, forwarding
 * offset and size, and returns a new Java JniFlowFile bound to the clone.
 *
 * @param env    JNI environment of the calling thread.
 * @param obj    Java object from which the native JniSession is resolved.
 * @param prevff Java flow file object to clone from.
 * @param offset offset forwarded to the session's clone.
 * @param size   size forwarded to the session's clone.
 * @return the Java object for the clone; prevff when obj is null; nullptr
 *         when the reference holds no flow file.
 */
JNIEXPORT jobject JNICALL Java_org_apache_nifi_processor_JniProcessSession_clonePortion(JNIEnv *env, jobject obj, jobject prevff, jlong offset, jlong size) {
  if (obj == nullptr) {
    // not treated as an error: most likely the Java side is being collected,
    // so simply let Java carry on
    return prevff;
  }
  THROW_IF_NULL(prevff, env, NO_FF_OBJECT);
  minifi::jni::JniSession *jniSession = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *sourceRef = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, prevff);
  if (!sourceRef->get()) {
    return nullptr;
  }
  auto flowFileClass = minifi::jni::JVMLoader::getInstance()->load_class("org/apache/nifi/processor/JniFlowFile", env);
  auto javaClone = flowFileClass.newInstance(env);
  minifi::jni::ThrowIf(env);
  auto partialClone = jniSession->getSession()->clone(sourceRef->get(), offset, size);
  auto wrapper = std::make_shared<minifi::jni::JniFlowFile>(partialClone, jniSession->getServicer(), javaClone);
  partialClone->addReference(wrapper);
  auto rawWrapper = jniSession->addFlowFile(wrapper);
  // bind the Java object to the wrapper through its raw pointer
  minifi::jni::JVMLoader::getInstance()->setReference(javaClone, env, rawWrapper);
  return javaClone;
}
/**
 * JNI entry point for JniProcessSession.append(FlowFile, byte[]).
 *
 * Passes the bytes of the Java array to the native session's append for the
 * referenced flow file. An empty array results in no append call.
 *
 * @param env       JNI environment of the calling thread.
 * @param obj       Java object from which the native JniSession is resolved.
 * @param ff        Java flow file object whose native reference is looked up.
 * @param byteArray content to append.
 * @return true when the flow file was available (even if the array was
 *         empty); false when obj, ff, byteArray, the session or the flow file
 *         is missing, or when the array elements could not be obtained.
 */
JNIEXPORT jboolean JNICALL Java_org_apache_nifi_processor_JniProcessSession_append(JNIEnv *env, jobject obj, jobject ff, jbyteArray byteArray) {
  if (obj == nullptr) {
    // does not mean an error should be thrown, rather we will let
    // Java do its thing as this is a condition of GC most likely
    return false;
  }
  THROW_IF((ff == nullptr || byteArray == nullptr), env, NO_FF_OBJECT);
  if (ff == nullptr || byteArray == nullptr) {
    // THROW_IF's control flow is defined elsewhere; make sure null handles are
    // never passed to the JNI calls below even if it falls through.
    return false;
  }
  minifi::jni::JniSession *session = minifi::jni::JVMLoader::getPtr<minifi::jni::JniSession>(env, obj);
  minifi::jni::JniFlowFile *ptr = minifi::jni::JVMLoader::getInstance()->getReference<minifi::jni::JniFlowFile>(env, ff);
  if (session == nullptr || ptr == nullptr || !ptr->get()) {
    return false;
  }
  jbyte *buffer = env->GetByteArrayElements(byteArray, nullptr);
  if (buffer == nullptr) {
    // JNI returns NULL when the elements cannot be obtained
    return false;
  }
  const jsize length = env->GetArrayLength(byteArray);
  try {
    if (length > 0) {
      minifi::jni::JniByteOutStream outStream(buffer, static_cast<size_t>(length));
      session->getSession()->append(ptr->get(), &outStream);
    }
  } catch (...) {
    // hand the elements back before propagating, otherwise they stay pinned/copied
    env->ReleaseByteArrayElements(byteArray, buffer, 0);
    throw;
  }
  env->ReleaseByteArrayElements(byteArray, buffer, 0);
  return true;
}
#ifdef __cplusplus
}
#endif