blob: 17f724613c6b4d18d3147f3abed02e53cc9dc3c5 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <string>
#include <map>
#include <memory>
#include <utility>
#include "core/Core.h"
#include "capi/api.h"
#include "capi/expect.h"
#include "capi/Instance.h"
#include "capi/Plan.h"
#include "ResourceClaim.h"
#include "processors/GetFile.h"
#include "core/logging/LoggerConfiguration.h"
class API_INITIALIZER {
static int initialized;
};
int API_INITIALIZER::initialized = initialize_api();
int initialize_api() {
logging::LoggerConfiguration::getConfiguration().disableLogging();
return 1;
}
void enable_logging() {
logging::LoggerConfiguration::getConfiguration().enableLogging();
}
class DirectoryConfiguration {
protected:
DirectoryConfiguration() {
minifi::setDefaultDirectory(DEFAULT_CONTENT_DIRECTORY);
}
public:
static void initialize() {
static DirectoryConfiguration configure;
}
};
/**
* Creates a NiFi Instance from the url and output port.
* @param url http URL for NiFi instance
* @param port Remote output port.
*/
nifi_instance *create_instance(char *url, nifi_port *port) {
// make sure that we have a thread safe way of initializing the content directory
DirectoryConfiguration::initialize();
nifi_instance *instance = new nifi_instance;
instance->instance_ptr = new minifi::Instance(url, port->port_id);
instance->port.port_id = port->port_id;
return instance;
}
/**
* Initializes the instance
*/
void initialize_instance(nifi_instance *instance) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
minifi_instance_ref->setRemotePort(instance->port.port_id);
}
/*
typedef int c2_update_callback(char *);
typedef int c2_stop_callback(char *);
typedef int c2_start_callback(char *);
*/
void enable_async_c2(nifi_instance *instance, C2_Server *server, c2_stop_callback *c1, c2_start_callback *c2, c2_update_callback *c3) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
minifi_instance_ref->enableAsyncC2(server, c1, c2, c3);
}
/**
* Sets a property within the nifi instance
* @param instance nifi instance
* @param key key in which we will set the valiue
* @param value
*/
void set_property(nifi_instance *instance, char *key, char *value) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
minifi_instance_ref->getConfiguration()->set(key, value);
}
/**
* Reclaims memory associated with a nifi instance structure.
* @param instance nifi instance.
*/
void free_instance(nifi_instance* instance) {
if (instance != nullptr) {
delete ((minifi::Instance*) instance->instance_ptr);
delete instance;
}
}
/**
* Creates a flow file record
* @param file file to place into the flow file.
*/
flow_file_record* create_flowfile(const char *file, const size_t len) {
flow_file_record *new_ff = new flow_file_record;
new_ff->attributes = new std::map<std::string, std::string>();
new_ff->contentLocation = new char[len + 1];
snprintf(new_ff->contentLocation, len + 1, "%s", file);
std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
// set the size of the flow file.
new_ff->size = in.tellg();
return new_ff;
}
/**
* Creates a flow file record
* @param file file to place into the flow file.
*/
flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size) {
flow_file_record *new_ff = new flow_file_record;
new_ff->attributes = new std::map<std::string, std::string>();
new_ff->contentLocation = new char[len + 1];
snprintf(new_ff->contentLocation, len + 1, "%s", file);
std::ifstream in(file, std::ifstream::ate | std::ifstream::binary);
// set the size of the flow file.
new_ff->size = size;
return new_ff;
}
/**
* Reclaims memory associated with a flow file object
* @param ff flow file record.
*/
void free_flowfile(flow_file_record *ff) {
if (ff != nullptr) {
if (ff->in != nullptr) {
auto instance = static_cast<nifi_instance*>(ff->in);
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
auto content_repo = minifi_instance_ref->getContentRepository();
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
content_repo->remove(claim);
}
auto map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
delete[] ff->contentLocation;
delete map;
delete ff;
}
}
/**
* Adds an attribute
* @param ff flow file record
* @param key key
* @param value value to add
* @param size size of value
* @return 0
*/
uint8_t add_attribute(flow_file_record *ff, char *key, void *value, size_t size) {
auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
attribute_map->insert(std::pair<std::string, std::string>(key, std::string(static_cast<char*>(value), size)));
return 0;
}
/*
* Obtains the attribute.
* @param ff flow file record
* @param key key
* @param caller_attribute caller supplied object in which we will copy the data ptr
* @return 0 if successful, -1 if the key does not exist
*/
uint8_t get_attribute(flow_file_record *ff, char *key, attribute *caller_attribute) {
auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
auto find = attribute_map->find(key);
if (find != attribute_map->end()) {
caller_attribute->key = key;
caller_attribute->value = static_cast<void*>(const_cast<char*>(find->second.data()));
caller_attribute->value_size = find->second.size();
return 0;
}
return -1;
}
/**
* Removes a key from the attribute chain
* @param ff flow file record
* @param key key to remove
* @return 0 if removed, -1 otherwise
*/
uint8_t remove_attribute(flow_file_record *ff, char *key) {
auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
auto find = attribute_map->find(key);
if (find != attribute_map->end()) {
attribute_map->erase(find);
return 0;
}
return -1;
}
/**
* Transmits the flowfile
* @param ff flow file record
* @param instance nifi instance structure
*/
void transmit_flowfile(flow_file_record *ff, nifi_instance *instance) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
// in the unlikely event the user forgot to initialize the instance, we shall do it for them.
if (UNLIKELY(minifi_instance_ref->isRPGConfigured() == false)) {
minifi_instance_ref->setRemotePort(instance->port.port_id);
}
auto attribute_map = static_cast<std::map<std::string, std::string>*>(ff->attributes);
auto no_op = minifi_instance_ref->getNoOpRepository();
auto content_repo = minifi_instance_ref->getContentRepository();
std::shared_ptr<minifi::ResourceClaim> claim = std::make_shared<minifi::ResourceClaim>(ff->contentLocation, content_repo);
claim->increaseFlowFileRecordOwnedCount();
claim->increaseFlowFileRecordOwnedCount();
auto ffr = std::make_shared<minifi::FlowFileRecord>(no_op, content_repo, *attribute_map, claim);
ffr->addAttribute("nanofi.version", API_VERSION);
ffr->setSize(ff->size);
std::string port_uuid = instance->port.port_id;
minifi_instance_ref->transfer(ffr);
}
flow *create_flow(nifi_instance *instance, const char *first_processor) {
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
flow *new_flow = new flow;
auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
new_flow->plan = execution_plan;
// automatically adds it with success
execution_plan->addProcessor(first_processor, first_processor);
return new_flow;
}
flow *create_getfile(nifi_instance *instance, flow *parent_flow, GetFileConfig *c) {
std::string first_processor = "GetFile";
auto minifi_instance_ref = static_cast<minifi::Instance*>(instance->instance_ptr);
flow *new_flow = parent_flow == 0x00 ? new flow : parent_flow;
if (parent_flow == 0x00) {
auto execution_plan = new ExecutionPlan(minifi_instance_ref->getContentRepository(), minifi_instance_ref->getNoOpRepository(), minifi_instance_ref->getNoOpRepository());
new_flow->plan = execution_plan;
}
ExecutionPlan *plan = static_cast<ExecutionPlan*>(new_flow->plan);
// automatically adds it with success
auto getFile = plan->addProcessor(first_processor, first_processor);
plan->setProperty(getFile, processors::GetFile::Directory.getName(), c->directory);
plan->setProperty(getFile, processors::GetFile::KeepSourceFile.getName(), c->keep_source ? "true" : "false");
plan->setProperty(getFile, processors::GetFile::Recurse.getName(), c->recurse ? "true" : "false");
return new_flow;
}
void free_flow(flow *flow) {
if (flow == nullptr)
return;
auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
delete execution_plan;
delete flow;
}
flow_file_record *get_next_flow_file(nifi_instance *instance, flow *flow) {
auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
execution_plan->reset();
while (execution_plan->runNextProcessor()) {
}
auto ff = execution_plan->getCurrentFlowFile();
if (ff == nullptr)
return nullptr;
auto claim = ff->getResourceClaim();
if (claim != nullptr) {
// create a flow file.
claim->increaseFlowFileRecordOwnedCount();
auto path = claim->getContentFullPath();
auto ffr = create_ff_object(path.c_str(), path.length(), ff->getSize());
ffr->in = instance;
return ffr;
} else {
return nullptr;
}
}
size_t get_flow_files(nifi_instance *instance, flow *flow, flow_file_record **ff_r, size_t size) {
auto execution_plan = static_cast<ExecutionPlan*>(flow->plan);
size_t i = 0;
for (; i < size; i++) {
execution_plan->reset();
while (execution_plan->runNextProcessor()) {
}
auto ff = execution_plan->getCurrentFlowFile();
if (ff == nullptr)
break;
auto claim = ff->getResourceClaim();
if (claim != nullptr) {
claim->increaseFlowFileRecordOwnedCount();
auto path = claim->getContentFullPath();
// create a flow file.
ff_r[i] = create_ff_object(path.c_str(), path.length(), ff->getSize());
ff_r[i]->in = instance;
} else {
break;
}
}
return i;
}