blob: f021013008032aa553b325f291944ae566b104ca [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <map>
#include <memory>
#include <utility>
#include <exception>
#include <algorithm>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <new>
#include <stdio.h>
#include "api/nanofi.h"
#include "core/Core.h"
#include "core/expect.h"
#include "cxx/Instance.h"
#include "cxx/Plan.h"
#include "cxx/CallbackProcessor.h"
#include "ResourceClaim.h"
#include "core/logging/LoggerConfiguration.h"
#include "utils/StringUtils.h"
#include "io/BufferStream.h"
#include "core/cxxstructs.h"
#include "utils/FlatMap.h"
// Shorthand for the flow file attribute container; flow_file_record::attributes is cast to this type throughout this file.
using AttributeMap = org::apache::nifi::minifi::core::FlowFile::AttributeMap;
// Holder for a static flag; the flag's definition later in this file is initialized from initialize_api().
class API_INITIALIZER {
public:
// Receives the return value of initialize_api() when the static member is initialized.
static int initialized;
};
/**
 * Reads the whole content of a file into a malloc'ed buffer.
 * Just an internal utility func., not to be published via API!
 *
 * The buffer is allocated one byte larger than the file and is NUL terminated,
 * the terminator is not counted in file_len.
 *
 * @param path path of the file to read
 * @return file_buffer owning the content; the caller releases buffer with free().
 *         buffer is nullptr and file_len is 0 when path is null, the file cannot
 *         be opened, sized or read, the file is empty, or memory is exhausted.
 */
file_buffer file_to_buffer(const char *path) {
  file_buffer fb;
  fb.buffer = nullptr;
  fb.file_len = 0;
  NULL_CHECK(fb, path);

  FILE *fileptr = fopen(path, "rb");
  NULL_CHECK(fb, fileptr);

  // Determine the file size; ftell reports -1 on failure, which must not be used as a length.
  long end_pos = -1;
  if (fseek(fileptr, 0, SEEK_END) == 0) {
    end_pos = ftell(fileptr);
  }
  if (end_pos <= 0) {
    // Either sizing failed or the file is empty: there is nothing to read.
    fclose(fileptr);
    return fb;
  }
  rewind(fileptr);

  const size_t filelen = static_cast<size_t>(end_pos);
  uint8_t *buffer = static_cast<uint8_t*>(malloc(filelen + 1));  // Enough memory for file + \0
  if (buffer == nullptr) {
    fclose(fileptr);
    return fb;
  }

  const size_t read_result = fread(buffer, filelen, 1, fileptr);
  fclose(fileptr);
  if (read_result != 1) {
    free(buffer);
    return fb;
  }

  buffer[filelen] = '\0';
  fb.buffer = buffer;
  fb.file_len = filelen;
  return fb;
}
// Runs initialize_api() as part of static initialization of this translation unit.
int API_INITIALIZER::initialized = initialize_api();
// Instance handed to free_nanofi_instance() by free_standalone_processor() once no standalone processors remain.
static nifi_instance* standalone_instance = nullptr;
/**
 * API initialization hook: turns logging off on the logger configuration.
 * @return always 1
 */
int initialize_api() {
  auto&& logger_config = logging::LoggerConfiguration::getConfiguration();
  logger_config.disableLogging();
  return 1;
}
/**
 * Turns logging on in the logger configuration.
 */
void enable_logging() {
  auto&& logger_config = logging::LoggerConfiguration::getConfiguration();
  logger_config.enableLogging();
}
/**
 * Installs the given function as the process-wide std::terminate handler.
 * @param terminate_callback handler passed on to std::set_terminate
 */
void set_terminate_callback(void (*terminate_callback)()) {
  const std::terminate_handler handler = terminate_callback;
  std::set_terminate(handler);
}
/**
 * Sets the default content directory the first time initialize() is called.
 * The work happens in the constructor of a function-local static object, so
 * repeated calls to initialize() do not repeat it.
 */
class DirectoryConfiguration {
 public:
  static void initialize() {
    static DirectoryConfiguration run_once;
    (void)run_once;
  }

 protected:
  DirectoryConfiguration() {
    minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
  }
};
/**
 * Creates a NiFi Instance from the url and output port.
 * @param url http URL for NiFi instance
 * @param port Remote output port.
 * @param repo_type repository type, forwarded unchanged to minifi::Instance
 * @return newly allocated instance (release with free_nanofi_instance), or nullptr when
 *         url, port or port->port_id is null or memory cannot be allocated
 * @Deprecated for API version 0.2 in favor of the following prototype
 * nifi_instance *create_instance(nifi_port const *port) {
 */
nifi_instance *create_instance_repo(const char *url, nifi_port *port, const char* const repo_type) {
  NULL_CHECK(nullptr, url, port);
  NULL_CHECK(nullptr, port->port_id);

  // make sure that we have a thread safe way of initializing the content directory
  DirectoryConfiguration::initialize();

  /**
   * This API will gradually move away from C++, hence malloc is used for nifi_instance
   * Since minifi::Instance is currently being used, then we need to use new in that case.
   * The unique_ptr keeps the Instance from leaking if one of the allocations below fails.
   */
  std::unique_ptr<minifi::Instance> minifi_instance(new minifi::Instance(url, port->port_id, repo_type));

  // may have to translate port ID here in the future
  const size_t port_id_size = strlen(port->port_id) + 1;
  char* port_id = static_cast<char*>(malloc(port_id_size));
  NULL_CHECK(nullptr, port_id);
  memcpy(port_id, port->port_id, port_id_size);

  nifi_instance *instance = static_cast<nifi_instance*>(malloc(sizeof(nifi_instance)));
  if (instance == nullptr) {
    free(port_id);
    return nullptr;
  }

  instance->instance_ptr = minifi_instance.release();
  instance->port.port_id = port_id;
  return instance;
}
/**
 * Creates a standalone processor together with a dedicated execution plan.
 *
 * When no instance is supplied, the file-level standalone_instance is created on
 * first use and reused afterwards, so that free_standalone_processor() can release
 * it once the last standalone processor is gone.
 *
 * @param name processor type name, also used as the processor's name
 * @param instance instance providing the repositories for the plan, or nullptr to
 *        use the internal standalone instance
 * @return the created processor, or nullptr when name is null, the processor cannot
 *         be created, or no flow could be set up
 */
standalone_processor * create_processor(const char *name, nifi_instance * instance) {
  NULL_CHECK(nullptr, name);
  auto ptr = ExecutionPlan::createProcessor(name, name);
  if (!ptr) {
    return nullptr;
  }
  if (instance == nullptr) {
    if (standalone_instance == nullptr) {
      nifi_port port;
      std::string port_str = utils::IdGenerator::getIdGenerator()->generate().to_string();
      // create_instance copies the port id, so pointing at the local string is sufficient here.
      port.port_id = const_cast<char*>(port_str.c_str());
      standalone_instance = create_instance("internal_standalone", &port);
    }
    instance = standalone_instance;
  }
  flow *new_flow = create_new_flow(instance);
  NULL_CHECK(nullptr, new_flow);
  // The flow lives in malloc'ed storage (placement new), so it must be destroyed
  // and freed explicitly instead of going through the default `delete`.
  std::shared_ptr<ExecutionPlan> plan(new_flow, [](flow *f) {
    f->~flow();
    free(f);
  });
  plan->addProcessor(ptr, name);
  ExecutionPlan::addProcessorWithPlan(ptr->getUUIDStr(), plan);
  return static_cast<standalone_processor*>(ptr.get());
}
/**
 * Initializes the context's content repository at
 * <current working directory>/contentrepository/<uuid>, unless the context
 * reports that it is already initialized.
 * Does nothing when ctx or uuid is null, or when the working directory cannot be obtained.
 * @param ctx processor context whose content repository is initialized
 * @param uuid last path component of the repository directory
 */
void initialize_content_repo(processor_context * ctx, const char * uuid) {
  NULL_CHECK(, ctx, uuid);
  if (ctx->isInitialized()) {
    return;
  }
  char * cwd = get_current_working_directory();
  if (cwd == nullptr) {
    return;
  }
  const char * sep = get_separator(0);
  const std::string repo_path = std::string(cwd) + sep + "contentrepository" + sep + uuid;
  ctx->initializeContentRepository(repo_path);
  free(cwd);
}
void clear_content_repo(const nifi_instance * instance) {
const auto content_repo = static_cast<minifi::Instance*>(instance->instance_ptr)->getContentRepository();
const auto storage_path = content_repo->getStoragePath();
remove_directory(storage_path.c_str());
}
/**
 * Copies the processor's UUID string (including the terminating NUL) into uuid_target.
 * No bounds checking is performed: the caller must supply a buffer large enough
 * for the UUID string. Does nothing when either argument is null.
 * @param proc processor to query
 * @param uuid_target caller supplied destination buffer
 */
void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target) {
  NULL_CHECK(, proc, uuid_target);
  strcpy(uuid_target, proc->getUUIDStr().c_str());
}
void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target) {
standalone_processor * proc = static_cast<standalone_processor*>(ctx->getProcessorNode()->getProcessor().get());
get_proc_uuid_from_processor(proc, uuid_target);
}
/**
 * Unregisters a standalone processor from its plan. When it was the last
 * registered one, the standalone instance is released as well.
 * @param proc processor to release; null is ignored
 */
void free_standalone_processor(standalone_processor* proc) {
  if (proc == nullptr) {
    return;
  }
  ExecutionPlan::removeProcWithPlan(proc->getUUIDStr());
  if (ExecutionPlan::getProcWithPlanQty() != 0) {
    return;
  }
  // The instance is not needed any more as there are no standalone processors in the system
  free_nanofi_instance(standalone_instance);
  standalone_instance = nullptr;
}
/**
 * Initializes the instance by setting its remote port from the stored port id.
 * Does nothing when the instance or its underlying minifi instance is null.
 * @param instance nifi instance to initialize
 */
void initialize_instance(nifi_instance *instance) {
  NULL_CHECK(, instance);
  NULL_CHECK(, instance->instance_ptr);
  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
  minifi_instance_ref->setRemotePort(instance->port.port_id);
}
/*
typedef int c2_update_callback(char *);
typedef int c2_stop_callback(char *);
typedef int c2_start_callback(char *);
*/
/**
 * Enables asynchronous C2 on the instance, forwarding the server and the callbacks as given.
 * Does nothing when the instance or its underlying minifi instance is null.
 * @param instance nifi instance
 * @param server C2 server description
 * @param c1 stop callback
 * @param c2 start callback
 * @param c3 update callback
 */
void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
  NULL_CHECK(, instance);
  NULL_CHECK(, instance->instance_ptr);
  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
  minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
}
/**
 * Sets a property within the nifi instance
 * @param instance nifi instance
 * @param key key in which we will set the value
 * @param value value to store under the key
 * @return 0 on success, -1 when instance, key or value are null
 */
int set_instance_property(nifi_instance *instance, const char *key, const char *value) {
  NULL_CHECK(-1, instance, key, value);
  NULL_CHECK(-1, instance->instance_ptr);
  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
  minifi_instance_ref->getConfiguration()->set(key, value);
  return 0;
}
/**
 * Releases a nifi instance structure: the wrapped minifi instance, the copied
 * port id and the structure itself.
 * @param instance nifi instance; null is ignored
 */
void free_nanofi_instance(nifi_instance* instance) {
  if (instance == nullptr) {
    return;
  }
  auto *minifi_instance = static_cast<minifi::Instance*>(instance->instance_ptr);
  delete minifi_instance;
  free(instance->port.port_id);
  free(instance);
}
/**
 * Creates a flow file record that refers to an existing file.
 * The record's size is the file's current size; when the file cannot be opened
 * or sized, the size is recorded as 0 instead of an invalid value.
 * @param file path of the file to place into the flow file.
 * @param len length of the path in characters (without terminator)
 * @return new flow file record, or nullptr when file is null
 */
flow_file_record* create_flowfile(const char *file, const size_t len) {
  NULL_CHECK(nullptr, file);
  std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
  // tellg() yields -1 on failure; converting that straight to uint64_t would give a huge bogus size.
  const std::streamoff end_pos = in.tellg();
  const uint64_t file_size = end_pos < 0 ? 0 : static_cast<uint64_t>(end_pos);
  return create_ff_object(file, len, file_size);
}
/**
 * Creates a flow file record with an empty attribute map.
 * @param file path of the content to place into the flow file.
 * @param len length of the path in characters (without terminator)
 * @param size content size to record
 * @return new flow file record, or nullptr when file is null or the record could not be created
 */
flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
  NULL_CHECK(nullptr, file);
  flow_file_record *new_ff = create_ff_object_na(file, len, size);
  NULL_CHECK(nullptr, new_ff);
  new_ff->attributes = new AttributeMap();
  return new_ff;
}
/**
 * Creates a flow file record without an attribute map ("na" = no attributes).
 * The record gets its own copy of the content path, an empty content repository
 * handle, no attached flow file and keepContent cleared.
 * @param file path of the content, or nullptr for a record without content location
 * @param len length of the path in characters (without terminator)
 * @param size content size to record; ignored (0 is stored) when file is nullptr
 * @return new flow file record, or nullptr when memory cannot be allocated
 */
flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size) {
  flow_file_record *new_ff = static_cast<flow_file_record*>(malloc(sizeof(flow_file_record)));
  NULL_CHECK(nullptr, new_ff);
  new_ff->attributes = nullptr;
  new_ff->contentLocation = nullptr;
  new_ff->size = 0;
  if (file != nullptr) {
    new_ff->contentLocation = static_cast<char*>(malloc(sizeof(char) * (len + 1)));
    if (new_ff->contentLocation == nullptr) {
      free(new_ff);
      return nullptr;
    }
    snprintf(new_ff->contentLocation, len + 1, "%s", file);
    // set the size of the flow file.
    new_ff->size = size;
  }
  new_ff->crp = static_cast<void*>(new std::shared_ptr<minifi::core::ContentRepository>);
  new_ff->ffp = nullptr;
  new_ff->keepContent = 0;
  return new_ff;
}
/**
 * Creates a flow file record without content location ("nc" = no content) but
 * with an empty attribute map.
 * @return new flow file record, or nullptr when the record could not be created
 */
flow_file_record* create_ff_object_nc() {
  flow_file_record* new_ff = create_ff_object_na(nullptr, 0, 0);
  NULL_CHECK(nullptr, new_ff);
  new_ff->attributes = new AttributeMap();
  return new_ff;
}
/**
 * Creates an empty flow file record bound to the context's content repository,
 * with contentLocation set to the full path of a fresh resource claim.
 * @param ctx processor context; its processor must be registered with a plan
 * @return new flow file record, or nullptr when ctx is null, no plan is registered
 *         for the processor, or memory cannot be allocated
 */
flow_file_record * generate_flow(processor_context * ctx) {
  NULL_CHECK(nullptr, ctx);
  // Look the plan up before allocating anything, so the failure path has nothing to clean up.
  auto plan = ExecutionPlan::getPlan(ctx->getProcessorNode()->getProcessor()->getUUIDStr());
  if (!plan) {
    return nullptr;
  }

  flow_file_record * ffr = create_ff_object_nc();
  NULL_CHECK(nullptr, ffr);

  // Replace the empty repository handle of the new record with the context's repository.
  if (ffr->crp) {
    delete static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ffr->crp);
  }
  auto ff_content_repo_ptr = new std::shared_ptr<minifi::core::ContentRepository>(ctx->getContentRepository());
  ffr->crp = static_cast<void*>(ff_content_repo_ptr);

  auto claim = std::make_shared<minifi::ResourceClaim>(*ff_content_repo_ptr);
  const std::string content_path = claim->getContentFullPath();
  const size_t len = strlen(content_path.c_str());
  ffr->contentLocation = static_cast<char*>(malloc((len + 1) * sizeof(char)));
  if (ffr->contentLocation == nullptr) {
    // Release the pieces by hand: nothing has been stored at the claim's path yet.
    delete ff_content_repo_ptr;
    delete static_cast<AttributeMap*>(ffr->attributes);
    free(ffr);
    return nullptr;
  }
  snprintf(ffr->contentLocation, len + 1, "%s", content_path.c_str());
  return ffr;
}
/**
 * Creates a new flow file record via generate_flow and writes the given bytes
 * to its content location.
 * @param buff bytes to write; may only be null when count is 0
 * @param count number of bytes to write
 * @param ctx processor context used to generate the flow file
 * @return the flow file record with its size set, or nullptr when the arguments
 *         are invalid, the record cannot be generated, or the content cannot be
 *         opened or fully written
 */
flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx) {
  if (!ctx || (buff == nullptr && count > 0)) {
    return nullptr;
  }
  flow_file_record * ffr = generate_flow(ctx);
  if (ffr == nullptr) {
    printf("Could not generate flow file\n");
    return nullptr;
  }
  FILE * ffp = fopen(ffr->contentLocation, "wb");
  if (!ffp) {
    printf("Cannot open flow file at path %s to write content to.\n", ffr->contentLocation);
    free_flowfile(ffr);
    return nullptr;
  }
  const size_t written = count > 0 ? fwrite(buff, 1, count, ffp) : 0;
  if (written < count) {
    fclose(ffp);
    free_flowfile(ffr);
    return nullptr;
  }
  fseek(ffp, 0, SEEK_END);
  const long end_pos = ftell(ffp);
  // The file was opened truncated and exactly `count` bytes were written, so fall back to that if ftell fails.
  ffr->size = end_pos < 0 ? count : static_cast<size_t>(end_pos);
  fclose(ffp);
  return ffr;
}
/**
 * Reclaims memory associated with a flow file object.
 * When the record holds a valid content repository, has a content location and
 * keepContent is 0, the content is removed from that repository as well.
 * The attribute map is deleted only when no flow file is attached (ffp is null);
 * otherwise the attached flow file handle is deleted instead.
 * @param ff flow file record; null is ignored
 */
void free_flowfile(flow_file_record *ff) {
  NULL_CHECK(, ff);
  if (ff->crp) {
    auto content_repo_ptr = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
    // A claim cannot be built from a null path, so skip removal when there is no content location.
    if ((*content_repo_ptr) && (ff->keepContent == 0) && (ff->contentLocation != nullptr)) {
      auto claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *content_repo_ptr);
      (*content_repo_ptr)->remove(*claim);
    }
    delete content_repo_ptr;
  }
  if (ff->ffp == nullptr) {
    auto map = static_cast<AttributeMap*>(ff->attributes);
    delete map;
  } else {
    auto ff_sptr = reinterpret_cast<std::shared_ptr<core::FlowFile>*>(ff->ffp);
    delete ff_sptr;
  }
  free(ff->contentLocation);
  free(ff);
}
/**
 * Adds an attribute to the flow file record. An existing attribute with the
 * same key is left untouched.
 * @param ff flow file record
 * @param key attribute key
 * @param value bytes of the attribute value
 * @param size number of bytes in value
 * @return 0 when the attribute was inserted, -1 when the key already existed or
 *         ff, key, value or the record's attribute map is null
 */
int8_t add_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
  if (ff == nullptr || key == nullptr || value == nullptr) {
    return -1;
  }
  if (ff->attributes == nullptr) {
    return -1;
  }
  auto *attrs = static_cast<AttributeMap*>(ff->attributes);
  std::string attr_value(static_cast<char*>(value), size);
  const bool inserted = attrs->insert(std::pair<std::string, std::string>(key, std::move(attr_value))).second;
  if (inserted) {
    return 0;
  }
  return -1;
}
/**
 * Updates (or adds) an attribute
 * @param ff flow file record
 * @param key key
 * @param value bytes of the new value; may only be null when size is 0, in which
 *        case an empty value is stored
 * @param size size of value
 * Does nothing when ff, key or the record's attribute map is null, or when value
 * is null while size is non-zero.
 */
void update_attribute(flow_file_record *ff, const char *key, void *value, size_t size) {
  NULL_CHECK(, ff, key);
  NULL_CHECK(, ff->attributes);
  if (value == nullptr && size > 0) {
    return;
  }
  auto attribute_map = static_cast<AttributeMap*>(ff->attributes);
  (*attribute_map)[key] = value != nullptr ? std::string(static_cast<char*>(value), size) : std::string();
}
/**
 * Obtains the attribute named by caller_attribute->key.
 * On success value and value_size of caller_attribute point at the data stored
 * in the record's attribute map; nothing is copied.
 * @param ff flow file record
 * @param caller_attribute caller supplied object carrying the key; receives the data ptr and size
 * @return 0 if successful, -1 if the key does not exist or a required pointer is null
 */
int8_t get_attribute(const flow_file_record * ff, attribute * caller_attribute) {
  if (ff == nullptr || caller_attribute == nullptr) {
    return -1;
  }
  if (ff->attributes == nullptr || caller_attribute->key == nullptr) {
    return -1;
  }
  auto *attrs = static_cast<AttributeMap*>(ff->attributes);
  auto match = attrs->find(caller_attribute->key);
  if (match == attrs->end()) {
    return -1;
  }
  caller_attribute->value = static_cast<void*>(const_cast<char*>(match->second.data()));
  caller_attribute->value_size = match->second.size();
  return 0;
}
/**
 * Reports how many attributes the flow file record holds.
 * @param ff flow file record
 * @return number of attributes, 0 when ff or its attribute map is null
 */
int get_attribute_quantity(const flow_file_record *ff) {
  if (ff == nullptr || ff->attributes == nullptr) {
    return 0;
  }
  auto *attrs = static_cast<AttributeMap*>(ff->attributes);
  return static_cast<int>(attrs->size());
}
/**
 * Fills the caller supplied attribute set with pointers to the record's attributes.
 * At most target->size entries are written; keys and values point at the data
 * stored in the record's attribute map, nothing is copied.
 * @param ff flow file record
 * @param target attribute set with a preallocated attributes array
 * @return number of entries written, 0 when a required pointer is null
 */
int get_all_attributes(const flow_file_record* ff, attribute_set *target) {
  if (ff == nullptr || target == nullptr) {
    return 0;
  }
  if (ff->attributes == nullptr || target->attributes == nullptr) {
    return 0;
  }
  auto *attrs = static_cast<AttributeMap*>(ff->attributes);
  size_t written = 0;
  for (auto it = attrs->begin(); it != attrs->end() && written < target->size; ++it, ++written) {
    auto& slot = target->attributes[written];
    slot.key = it->first.data();
    slot.value = static_cast<void*>(const_cast<char*>(it->second.data()));
    slot.value_size = it->second.size();
  }
  return gsl::narrow<int>(written);
}
/**
 * Removes a key from the record's attribute map
 * @param ff flow file record
 * @param key key to remove
 * @return 0 if removed, -1 otherwise (including null arguments or a null attribute map)
 */
int8_t remove_attribute(flow_file_record *ff, const char *key) {
  if (ff == nullptr || key == nullptr) {
    return -1;
  }
  if (ff->attributes == nullptr) {
    return -1;
  }
  auto *attrs = static_cast<AttributeMap*>(ff->attributes);
  // erase by key returns the number of elements removed (0 or 1)
  const auto erased = attrs->erase(key);
  return gsl::narrow<int8_t>(erased) - 1;
}
/**
 * Copies up to size bytes of the flow file's content into target.
 * Content is read through the record's content repository when it has one,
 * otherwise straight from the file named by contentLocation.
 * @param ff flow file record
 * @param target caller supplied buffer of at least size bytes
 * @param size capacity of target in bytes
 * @return result of the repository stream read, or the number of bytes copied
 *         from the file; 0 when an argument is null, size is not positive, or
 *         no content could be obtained
 */
int get_content(const flow_file_record* ff, uint8_t* target, int size) {
  NULL_CHECK(0, ff, target);
  if (size <= 0) {
    // A negative size would otherwise be compared as a huge unsigned value below.
    return 0;
  }
  auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp);
  if (ff->crp && (*content_repo)) {
    NULL_CHECK(0, ff->contentLocation);
    std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation,
                                                                                           *content_repo);
    auto stream = (*content_repo)->read(*claim);
    if (!stream) {
      return 0;
    }
    return stream->read(target, size);
  }

  file_buffer fb = file_to_buffer(ff->contentLocation);
  if (fb.buffer == nullptr) {
    return 0;
  }
  size_t copy_size = static_cast<size_t>(size);
  if (static_cast<uint64_t>(fb.file_len) < copy_size) {
    copy_size = gsl::narrow<size_t>(fb.file_len);
  }
  memcpy(target, fb.buffer, copy_size * sizeof(uint8_t));
  free(fb.buffer);
  return static_cast<int>(copy_size);  // bounded by size, so it fits into int
}
/**
 * Transmits the flowfile through the instance.
 * The record's attributes (if any) are copied onto a new FlowFileRecord, which
 * additionally gets a "nanofi.version" attribute. Content is referenced through
 * a resource claim when the record has a content repository, otherwise it is
 * read from the file named by contentLocation into a buffer stream.
 * @param ff flow file record
 * @param instance nifi instance structure
 * @return 0 after the transfer was handed to the instance, -1 when ff, instance
 *         or the underlying minifi instance is null
 */
int transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
  NULL_CHECK(-1, ff, instance);
  NULL_CHECK(-1, instance->instance_ptr);
  auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
  // in the unlikely event the user forgot to initialize the instance, we shall do it for them.
  if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
    minifi_instance_ref->setRemotePort(instance->port.port_id);
  }

  std::shared_ptr<minifi::ResourceClaim> claim = nullptr;
  std::shared_ptr<minifi::io::BufferStream> stream = nullptr;  // Used when content is not in content repo
  if (ff->contentLocation) {
    auto ff_content_repo_ptr = (static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(ff->crp));
    if (ff->crp && (*ff_content_repo_ptr)) {
      claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, *ff_content_repo_ptr);
      // NOTE(review): the owned count is raised twice, exactly as before this change;
      // presumably to keep the content alive beyond the transfer - confirm.
      claim->increaseFlowFileRecordOwnedCount();
      claim->increaseFlowFileRecordOwnedCount();
    } else {
      file_buffer fb = file_to_buffer(ff->contentLocation);
      stream = std::make_shared<minifi::io::BufferStream>();
      if (fb.buffer != nullptr) {
        stream->write(fb.buffer, gsl::narrow<int>(fb.file_len));
        free(fb.buffer);
      }
    }
  } else {
    // The flowfile has no content - create an empty stream to create empty content
    stream = std::make_shared<minifi::io::BufferStream>();
  }

  auto ffr = std::make_shared<minifi::FlowFileRecord>();
  // Read the attributes in place. Copying them into a shared static map (as was
  // done through a reference before) made them leak into later transmissions of
  // records without attributes.
  if (ff->attributes) {
    for (const auto& attribute : *static_cast<AttributeMap*>(ff->attributes)) {
      ffr->setAttribute(attribute.first, attribute.second);
    }
  }
  ffr->setResourceClaim(claim);
  ffr->addAttribute("nanofi.version", API_VERSION);
  ffr->setSize(ff->size);
  minifi_instance_ref->transfer(ffr, stream);
  return 0;
}
/**
 * Creates an empty flow that uses the instance's content repository and its
 * no-op repository (passed twice to the flow constructor).
 * The flow is constructed with placement new inside malloc'ed storage.
 * @param instance nifi instance providing the repositories
 * @return new flow, or nullptr when instance is null or the storage cannot be allocated
 */
flow * create_new_flow(nifi_instance * instance) {
  if (instance == nullptr) {
    return nullptr;
  }
  auto *minifi_instance = static_cast<minifi::Instance*>(instance->instance_ptr);
  flow *storage = static_cast<flow*>(malloc(sizeof(flow)));
  if (storage == nullptr) {
    return nullptr;
  }
  return new(storage) flow(minifi_instance->getContentRepository(), minifi_instance->getNoOpRepository(), minifi_instance->getNoOpRepository());
}
/**
 * Creates a new flow and, when a non-empty processor name is given, adds that
 * processor as the flow's first processor.
 * @param instance nifi instance providing the repositories
 * @param first_processor name of the first processor; null or empty adds none
 * @return the new flow, or nullptr when instance is null or the flow cannot be created
 */
flow *create_flow(nifi_instance *instance, const char *first_processor) {
  if (instance == nullptr) {
    return nullptr;
  }
  flow *created = create_new_flow(instance);
  if (created == nullptr || first_processor == nullptr || first_processor[0] == '\0') {
    return created;
  }
  // automatically adds it with success
  created->addProcessor(first_processor, first_processor);
  return created;
}
/**
 * Adds a callback processor to the flow that runs the given logic function.
 * The session and context handed to the callback are cast to the C API types
 * before the logic is invoked.
 * @param flow flow to extend
 * @param logic function invoked with the session and context
 * @return the added processor, or nullptr when flow or logic is null
 */
processor *add_python_processor(flow *flow, processor_logic* logic) {
  if (flow == nullptr || logic == nullptr) {
    return nullptr;
  }
  auto callback = [logic](core::ProcessSession *session, core::ProcessContext *context) {
    auto *c_session = static_cast<processor_session*>(session);
    auto *c_context = static_cast<processor_context*>(context);
    logic(c_session, c_context);
  };
  auto added = flow->addCallback(nullptr, callback);
  return static_cast<processor*>(added.get());
}
/**
 * Adds a GetFile processor, configured from the given settings, to parent_flow
 * or - when parent_flow is null - to a newly created flow.
 * @param instance nifi instance used to create a flow when parent_flow is null
 * @param parent_flow flow to extend, or nullptr to create a new one
 * @param c GetFile settings (input directory, keep source file, recurse)
 * @return the flow holding the processor, or nullptr when c or c->directory is
 *         null or no flow is available
 */
flow * create_getfile(nifi_instance * instance, flow * parent_flow, GetFileConfig * c) {
  static const std::string first_processor = "GetFile";
  NULL_CHECK(nullptr, c);
  NULL_CHECK(nullptr, c->directory);
  flow *new_flow = parent_flow == nullptr ? create_new_flow(instance) : parent_flow;
  NULL_CHECK(nullptr, new_flow);
  // automatically adds it with success
  auto getFile = new_flow->addProcessor(first_processor, first_processor);
  new_flow->setProperty(getFile, "Input Directory", c->directory);
  // The property name must not carry trailing whitespace, otherwise it does not match the GetFile property.
  new_flow->setProperty(getFile, "Keep Source File", c->keep_source ? "true" : "false");
  new_flow->setProperty(getFile, "Recurse Subdirectories", c->recurse ? "true" : "false");
  return new_flow;
}
/**
 * Adds a processor of the given type to the flow, using a "success"
 * relationship; the result of flow->hasProcessor() is forwarded as the last
 * argument of addProcessor.
 * @param flow flow to extend
 * @param processor_name processor type name, also used as the processor's name
 * @return the added processor, or nullptr when flow or processor_name is null
 */
processor *add_processor(flow *flow, const char *processor_name) {
  if (flow == nullptr || processor_name == nullptr) {
    return nullptr;
  }
  const auto has_previous = flow->hasProcessor();
  auto added = flow->addProcessor(processor_name, processor_name, core::Relationship("success", "description"), has_previous);
  return static_cast<processor*>(added.get());
}
/**
 * Registers a callback on the flow that is handed flow file records on failure.
 * @param flow flow to configure
 * @param onerror_callback callback to register
 * @return 0 on success, 1 when flow is null or the callback could not be set
 */
int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*)) {
  NULL_CHECK(1, flow);
  return flow->setFailureCallback(onerror_callback) ? 0 : 1;
}
/**
 * Sets the failure strategy of the flow.
 * @param flow flow to configure
 * @param strategy strategy to apply
 * @return 0 on success, -1 when flow is null or the strategy could not be set
 */
int set_failure_strategy(flow *flow, FailureStrategy strategy) {
  NULL_CHECK(-1, flow);
  return flow->setFailureStrategy(strategy) ? 0 : -1;
}
/**
 * Sets a property on the processor, falling back to a dynamic property when the
 * regular property cannot be set and the processor supports dynamic properties.
 * @param proc processor to configure
 * @param name property name
 * @param value property value
 * @return 0 on success, -1 when an argument is null, -2 when the property could not be set
 */
int set_propery_internal(core::Processor* proc, const char *name, const char *value) {
  if (proc == nullptr || name == nullptr || value == nullptr) {
    return -1;
  }
  if (proc->setProperty(name, value)) {
    return 0;
  }
  if (proc->supportsDynamicProperties() && proc->setDynamicProperty(name, value)) {
    return 0;
  }
  return -2;
}
/**
 * Sets a property on a processor.
 * @param proc processor to configure
 * @param name property name
 * @param value property value
 * @return 0 on success, -1 when an argument is null, -2 when the property could not be set
 */
int set_property(processor *proc, const char *name, const char *value) {
  if (proc == nullptr) {
    return -1;
  }
  const int status = set_propery_internal(proc, name, value);
  return status;
}
/**
 * Sets a property on a standalone processor.
 * @param proc standalone processor to configure
 * @param name property name
 * @param value property value
 * @return 0 on success, -1 when an argument is null, -2 when the property could not be set
 */
int set_standalone_property(standalone_processor *proc, const char *name, const char *value) {
  if (proc == nullptr) {
    return -1;
  }
  const int status = set_propery_internal(proc, name, value);
  return status;
}
/**
 * Copies the value of a dynamic property of the context into a caller supplied buffer.
 * The value is truncated to size-1 characters and always NUL terminated.
 * @param context processor context to query
 * @param name name of the dynamic property
 * @param buffer destination buffer
 * @param size capacity of buffer in bytes; must be at least 1
 * @return 0 on success; -1 converted to uint8_t (i.e. 255) when an argument is
 *         null, size is 0, or the property is not found
 */
uint8_t get_property(const processor_context * context, const char * name, char * buffer, size_t size) {
  const uint8_t failure = static_cast<uint8_t>(-1);
  if (context == nullptr || name == nullptr || buffer == nullptr) {
    return failure;
  }
  if (size == 0) {
    // size - 1 would wrap around and allow writing past a zero-sized buffer.
    return failure;
  }
  std::string value;
  if (!context->getDynamicProperty(name, value)) {
    return failure;
  }
  const size_t chars_to_copy = std::min(value.length(), size - 1);
  strncpy(buffer, value.data(), chars_to_copy);
  buffer[chars_to_copy] = '\0';
  return 0;
}
/**
 * Destroys a flow and releases its storage with free().
 * The destructor is invoked explicitly before the storage is freed.
 * @param flow flow to release
 * @return 0 on success, -1 when flow is null
 */
int free_flow(flow *flow) {
  if (flow == nullptr) {
    return -1;
  }
  flow->~flow();
  free(flow);
  return 0;
}
/**
 * Wraps a core flow file into a C API flow file record.
 * The record points at the claim's content path, shares the flow file's
 * attribute map, keeps the flow file alive through ffp and stores the given
 * content repository. The claim's owned count is increased once.
 * @param ff flow file to wrap
 * @param crp content repository to store in the record
 * @return new flow file record, or nullptr when the flow file has no resource claim
 */
flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, const std::shared_ptr<minifi::core::ContentRepository>& crp) {
  auto claim = ff->getResourceClaim();
  if (claim == nullptr) {
    return nullptr;
  }
  // create a flow file.
  claim->increaseFlowFileRecordOwnedCount();
  auto content_path = claim->getContentFullPath();
  flow_file_record *record = create_ff_object_na(content_path.c_str(), content_path.length(), ff->getSize());
  record->ffp = static_cast<void*>(new std::shared_ptr<core::FlowFile>(ff));
  record->attributes = ff->getAttributesPtr();
  *static_cast<std::shared_ptr<minifi::core::ContentRepository>*>(record->crp) = crp;
  return record;
}
/**
 * Wraps a core flow file into a C API flow file record, using the content
 * repository of the given plan.
 * @param ff flow file to wrap
 * @param plan execution plan supplying the content repository
 * @return new flow file record, or nullptr when ff is null or has no resource claim
 */
flow_file_record* flowfile_to_record(std::shared_ptr<core::FlowFile> ff, ExecutionPlan* plan) {
  if (!ff) {
    return nullptr;
  }
  flow_file_record *record = flowfile_to_record(ff, plan->getContentRepo());
  return record;
}
/**
 * Fetches the next flow file from the session and wraps it into a record.
 * The record is marked with keepContent so that freeing it does not remove the content.
 * @param session processor session to fetch from
 * @param context processor context supplying the content repository
 * @return flow file record, or nullptr when an argument is null, the session has
 *         no flow file, or the flow file cannot be converted (no resource claim)
 */
flow_file_record* get(processor_session *session, processor_context *context) {
  NULL_CHECK(nullptr, session, context);
  auto ff = session->get();
  if (!ff) {
    return nullptr;
  }
  auto ffr = flowfile_to_record(ff, context->getContentRepository());
  // Conversion yields nullptr for a flow file without a resource claim.
  NULL_CHECK(nullptr, ffr);
  // The content of the flow file must be kept in a processor logic
  ffr->keepContent = 1;
  return ffr;
}
/**
 * Resets the flow, runs its processors until runNextProcessor reports false and
 * returns the flow's current flow file as a record.
 * @param instance nifi instance (only checked for null)
 * @param flow flow to run
 * @return flow file record, or nullptr when an argument is null or no flow file results
 */
flow_file_record * get_next_flow_file(nifi_instance * instance, flow * flow) {
  if (instance == nullptr || flow == nullptr) {
    return nullptr;
  }
  flow->reset();
  for (;;) {
    if (!flow->runNextProcessor()) {
      break;
    }
  }
  return flowfile_to_record(flow->getCurrentFlowFile(), flow);
}
/**
 * Collects up to size flow file records by running the flow repeatedly.
 * Collection stops at the first run that yields no flow file.
 * @param instance nifi instance
 * @param flow flow to run
 * @param ff_r caller supplied array with room for size records
 * @param size capacity of ff_r
 * @return number of records stored in ff_r, 0 when an argument is null
 */
size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
  if (instance == nullptr || flow == nullptr || ff_r == nullptr) {
    return 0;
  }
  size_t collected = 0;
  while (collected < size) {
    flow->reset();
    flow_file_record *next = get_next_flow_file(instance, flow);
    if (next == nullptr) {
      break;
    }
    ff_r[collected] = next;
    ++collected;
  }
  return collected;
}
/**
 * Runs a standalone processor without an input flow file.
 * @param proc standalone processor to run
 * @return result of invoke_ff called with a null input
 */
flow_file_record *invoke(standalone_processor* proc) {
  const flow_file_record *no_input = nullptr;
  return invoke_ff(proc, no_input);
}
/**
 * Runs a standalone processor, optionally feeding it an input flow file.
 * The input's content is read through its content repository when it has one,
 * otherwise from the file named by contentLocation; its attributes are copied
 * when present.
 * @param proc standalone processor to run
 * @param input_ff input flow file record, or nullptr to run without input
 * @return record of the plan's current flow file after the run, or nullptr when
 *         proc is null, proc has no plan, or no flow file results
 */
flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *input_ff) {
  if (proc == nullptr) {
    return nullptr;
  }
  auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
  if (!plan) {
    // This is not a standalone processor, shouldn't be used with invoke!
    return nullptr;
  }
  plan->reset();
  if (input_ff) {
    auto content_repo = static_cast<std::shared_ptr<minifi::core::ContentRepository> *>(input_ff->crp);
    auto ff_data = std::make_shared<flowfile_input_params>();
    if (input_ff->crp && (*content_repo)) {
      std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(input_ff->contentLocation,
                                                                                             *content_repo);
      ff_data->content_stream = (*content_repo)->read(*claim);
    } else {
      ff_data->content_stream = std::make_shared<minifi::io::BufferStream>();
      file_buffer fb = file_to_buffer(input_ff->contentLocation);
      // An unreadable or empty file yields no buffer; the stream then simply stays empty.
      if (fb.buffer != nullptr) {
        ff_data->content_stream->write(fb.buffer, gsl::narrow<int>(fb.file_len));
        free(fb.buffer);
      }
    }
    // Records created without an attribute map carry a null attributes pointer.
    if (input_ff->attributes) {
      ff_data->attributes = *static_cast<AttributeMap*>(input_ff->attributes);
    }
    plan->runNextProcessor(nullptr, ff_data);
  }
  while (plan->runNextProcessor()) {
  }
  return flowfile_to_record(plan->getCurrentFlowFile(), plan.get());
}
/**
 * Runs a standalone processor with the given bytes as input content.
 * @param proc standalone processor to run
 * @param buf input bytes
 * @param size number of bytes in buf
 * @return record of the plan's current flow file after the run, or nullptr when
 *         proc or buf is null, size is 0, proc has no plan, or no flow file results
 */
flow_file_record *invoke_chunk(standalone_processor* proc, uint8_t* buf, uint64_t size) {
  if (proc == nullptr) {
    return nullptr;
  }
  if (buf == nullptr || size == 0) {
    return nullptr;
  }
  auto plan = ExecutionPlan::getPlan(proc->getUUIDStr());
  if (!plan) {
    // This is not a standalone processor, shouldn't be used with invoke!
    return nullptr;
  }
  plan->reset();
  auto input = std::make_shared<flowfile_input_params>();
  input->content_stream = std::make_shared<minifi::io::BufferStream>();
  input->content_stream->write(buf, gsl::narrow<int>(size));
  plan->runNextProcessor(nullptr, input);
  for (;;) {
    if (!plan->runNextProcessor()) {
      break;
    }
  }
  return flowfile_to_record(plan->getCurrentFlowFile(), plan.get());
}
/**
 * Runs a standalone processor with the content of a file as input.
 * @param proc standalone processor to run
 * @param path path of the input file
 * @return result of invoke_chunk, or nullptr when an argument is null or the
 *         file yields no buffer
 */
flow_file_record *invoke_file(standalone_processor* proc, const char* path) {
  if (path == nullptr || proc == nullptr) {
    return nullptr;
  }
  file_buffer contents = file_to_buffer(path);
  if (contents.buffer == nullptr) {
    return nullptr;
  }
  flow_file_record* result = invoke_chunk(proc, contents.buffer, contents.file_len);
  free(contents.buffer);
  return result;
}
/**
 * Transfers the flow's next flow file to the named relationship within the session.
 * @param session processor session performing the transfer
 * @param flow flow supplying the next flow file
 * @param rel relationship name (also used as its description)
 * @return 0 on success, -1 when an argument is null, -2 when the flow has no next flow file
 */
int transfer(processor_session* session, flow *flow, const char *rel) {
  if (session == nullptr || flow == nullptr) {
    return -1;
  }
  if (rel == nullptr) {
    return -1;
  }
  core::Relationship target(rel, rel);
  auto next_ff = flow->getNextFlowFile();
  if (nullptr == next_ff) {
    return -2;
  }
  session->transfer(next_ff, target);
  return 0;
}
/**
 * Registers a custom processor with the execution plan.
 * @param in custom processor description
 * @return 0 when the processor was added, -1 otherwise
 */
int var_add_custom_processor(custom_processor_args in) {
  if (ExecutionPlan::addCustomProcessor(in)) {
    return 0;
  }
  return -1;
}
/**
 * Removes a custom processor registration by name.
 * @param name name of the custom processor
 * @return the value reported by ExecutionPlan::deleteCustomProcessor minus one
 */
int delete_custom_processor(const char * name) {
  const auto removed = ExecutionPlan::deleteCustomProcessor(name);
  return removed - 1;
}
/**
 * Transfers the flow file attached to a record to the named relationship.
 * @param ffr flow file record with an attached flow file (ffp)
 * @param ps processor session performing the transfer
 * @param relationship non-empty relationship name
 * @return 0 on success, -1 when an argument is null, the record has no attached
 *         flow file, or the relationship name is empty
 */
int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship) {
  if (ffr == nullptr || ffr->ffp == nullptr) {
    return -1;
  }
  if (ps == nullptr) {
    return -1;
  }
  if (relationship == nullptr || relationship[0] == '\0') {
    return -1;
  }
  auto *attached_ff = reinterpret_cast<std::shared_ptr<core::FlowFile>*>(ffr->ffp);
  ps->transfer(*attached_ff, core::Relationship(relationship, "desc"));
  return 0;
}