blob: bec0f834dd8bb820e166f4452805ab7b156bac31 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
#define LIBMINIFI_INCLUDE_CAPI_NANOFI_H_
#include <stddef.h>
#include <stdint.h>
#include "core/cstructs.h"
#include "core/processors.h"
/**
* Presumably initializes the API (inferred from the name - TODO confirm against the implementation).
* NOTE(review): this declaration sits before the extern "C" block that follows, so a C++ translation unit
* sees it with C++ linkage, unlike the other functions of this header - confirm this is intended.
* @return not documented in this header
**/
int initialize_api();
#ifdef __cplusplus
extern "C" {
#endif
/**
* Updates with every release. Functions used here constitute the public API of NanoFi.
*
* Changes here will follow semver
*/
#define API_VERSION "0.02"
#define SUCCESS_RELATIONSHIP "success"
#define FAILURE_RELATIONSHIP "failure"
/**
* Returns ret_val from the enclosing function if any of the given pointers is NULL.
* Expands to a single statement, so it has to be followed by a semicolon.
* All pointer arguments are evaluated exactly once (to fill a temporary array) before any of them is checked.
* @param ret_val value to return when a NULL pointer is found; deliberately not parenthesized,
* so that it can be left empty inside functions returning void
* @param ... one or more pointers to check
**/
#define NULL_CHECK(ret_val, ...) \
  do { \
    const void *null_check_ptrs_[] = { __VA_ARGS__ }; \
    /* size_t index: sizeof yields size_t, an int index would be a signed/unsigned comparison */ \
    size_t null_check_idx_; \
    for (null_check_idx_ = 0; \
         null_check_idx_ < sizeof(null_check_ptrs_) / sizeof(*null_check_ptrs_); \
         null_check_idx_++) { \
      if (null_check_ptrs_[null_check_idx_] == NULL) { \
        return ret_val; \
      } \
    } \
  } while (0)
/**
* Enables logging (disabled by default)
**/
void enable_logging();
/**
* Sets terminate callback. The callback is executed upon termination (unhandled exception in C++ backend)
* @param terminate_callback the callback to execute
**/
void set_terminate_callback(void (*terminate_callback)());
/****
* ##################################################################
* BASE NIFI OPERATIONS
* ##################################################################
*/
/**
* Creates a new MiNiFi instance using the given repository type
* @param url remote URL the instance connects to
* @param port remote port the instance connects to
* @param repo_type name of the repository type; create_instance passes "filesystemrepository"
* (NOTE(review): the other accepted values are not visible in this header - confirm against the implementation)
* @return pointer to the new instance
**/
nifi_instance *create_instance_repo(const char *url, nifi_port *port, const char* const repo_type);
/**
* Creates a new MiNiFi instance that uses the "filesystemrepository" repository type
* @param url remote URL the instance connects to
* @param port remote port the instance connects to
* @return pointer to the new instance, as returned by create_instance_repo
**/
static inline nifi_instance *create_instance(const char *url, nifi_port *port) {
  /* inline: translation units that include this header without calling this get no unused-function warning */
  return create_instance_repo(url, port, "filesystemrepository");
}
/**
* Initialize remote connection of instance for transfers
* @param instance
**/
void initialize_instance(nifi_instance * instance);
/**
* Frees instance
* @attention Any action on flows that belong to the freed instance are undefined after this is done!
* It's recommended to free all flows before freeing the instance.
* @param instance instance to be freed
**/
void free_instance(nifi_instance * instance);
/****
* ##################################################################
* C2 OPERATIONS
* ##################################################################
*/
/**
* Callback types for C2 operations. Each callback receives a char * and returns an int.
* NOTE(review): the meaning of the argument and of the return value is not visible in this header - confirm
* against the implementation.
**/
typedef int c2_update_callback(char *);
typedef int c2_stop_callback(char *);
typedef int c2_start_callback(char *);
/**
* Presumably enables asynchronous C2 handling for the instance (inferred from the name - TODO confirm).
* The parameters are unnamed; in order they are: the instance, the C2 server, the stop callback,
* the start callback and the update callback. Note that this order differs from the order of the typedefs above.
**/
void enable_async_c2(nifi_instance *, C2_Server *, c2_stop_callback *, c2_start_callback *, c2_update_callback *);
/**
* Creates a new, empty flow
* @param instance the instance new flow will belong to
* @return a pointer to the created flow
**/
flow *create_new_flow(nifi_instance * instance);
/**
* Creates new flow and adds the first processor in case a valid name is provided
* @deprecated as there is no proper indication of processor adding errors,
* usage of "create_new_flow" and "add_processor" is recommended instead
* @param instance the instance new flow will belong to
* @param first_processor name of the first processor to be instantiated
* @attention in case first processor is empty or doesn't name any existing processor, an empty flow is returned.
* @return a pointer to the created flow
**/
DEPRECATED(0.6.0,2.0) flow *create_flow(nifi_instance * instance, const char * first_processor);
/**
* Add a getfile processor to "parent" flow.
* Creates new flow in instance in case "parent" is nullptr
* @param instance the instance the flow belongs to
* @param parent the flow to be extended with a new getfile processor
* @param c configuration of the new processor
* @return parent in case it wasn't null, otherwise a pointer to a new flow
*/
flow *create_getfile(nifi_instance *instance, flow *parent, GetFileConfig *c);
/**
* Extend a flow with a new processor
* @param flow the flow to be extended with the new processor
* @param name name of the new processor
* @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?)
**/
processor *add_processor(flow * flow, const char * name);
/**
* Extend a flow with a processor built from the given processor_logic
* (presumably a Python-backed processor, judging by the name - TODO confirm).
* The first parameter (unnamed) is the flow.
* @param logic the processor logic to use
* @return pointer to a processor; NOTE(review): the value returned on failure is not documented in this header
**/
processor *add_python_processor(flow *, processor_logic* logic);
/**
* Create a standalone instance of the given processor.
* Standalone instances can be invoked without having an instance/flow that contains them.
* @param name the name of the processor to instantiate
* @return pointer to the new processor or nullptr in case it cannot be instantiated (wrong name?)
**/
standalone_processor *create_processor(const char * name, nifi_instance * instance);
/**
* Free a standalone processor
* @param processor the processor to be freed
*/
void free_standalone_processor(standalone_processor* processor);
/**
* Register your callback to receive flow files that the flow failed to process
* @attention The flow file ownership is transferred to the callback!
* @attention The first callback should be registered before the flow is used. Can be changed later during runtime.
* @param flow flow the callback belongs to
* @param onerror_callback callback to execute in case of failure
* @return 0 in case of success, -1 otherwise (flow is already in use)
**/
int add_failure_callback(flow *flow, void (*onerror_callback)(flow_file_record*));
/**
* Set failure strategy. Please use the enum defined in cstructs.h
* Can be changed runtime.
* The default strategy is AS IS.
* @param flow the flow to set strategy for
* @param strategy the strategy to be set
* @return 0 (success), -1 (strategy cannot be set - no failure callback added?)
**/
int set_failure_strategy(flow *flow, FailureStrategy strategy);
/**
* Set property for a processor
* @param processor the processor the property is set for
* @param name name of the property
* @param value value of the property
* @return 0 in case of success, -1 otherwise (the processor doesn't support such property)
**/
int set_property(processor * processor, const char * name, const char * value);
/**
* Set property for a standalone processor
* @param processor the processor the property is set for
* @param name name of the property
* @param value value of the property
* @return 0 in case of success, -1 otherwise (the processor doesn't support such property)
**/
int set_standalone_property(standalone_processor * processor, const char * name, const char * value);
/**
* Set property for an instance
* @param instance the instance the property is set for
* @param name name of the property
* @param value value of the property
* @return 0 in case of success, -1 otherwise. Always succeeds unless instance or name is nullptr/empty.
**/
int set_instance_property(nifi_instance *instance, const char * name, const char * value);
/**
* Get a property. Should be used in custom processor logic callbacks.
* Writes the value of the property to the buffer.
* Nothing is written to the buffer in case the property is not found (return value != 0)
* The result is always null-terminated, at most size-1 characters are written to the buffer.
* @param context the current processor context
* @param name name of the property
* @param buffer buffer to write the value of the property
* @param size size of the buffer
* @return 0 in case of success (property found), -1 otherwise (the return type is uint8_t, so -1 is seen by the caller as 255; test for != 0)
**/
uint8_t get_property(const processor_context * context, const char * name, char * buffer, size_t size);
/**
* Free a flow
* @param flow the flow to free
* @attention All the processors in the flow are freed, too! Actions performed on freed processors are undefined!
* @return 0 in case of success, -1 otherwise. Always succeeds unless flow is nullptr.
**/
int free_flow(flow * flow);
/**
* Get the next flow file of the given flow
* @param instance the instance the flow belongs to
* @param flow the flow to get flowfile from
* @return a flow file record or nullptr in case no flowfile was generated by the flow
**/
flow_file_record *get_next_flow_file(nifi_instance *, flow *);
/**
* Get all flow files of the given flow
* @param instance the instance the flow belongs to
* @param flow the flow to get flowfiles from
* @param flowfiles target area to copy the flowfiles to
* @param size the maximum number of flowfiles to copy to target (size of target)
* @return the number of flow files copied to target. Less than or equal to size.
**/
size_t get_flow_files(nifi_instance * instance, flow * flow, flow_file_record ** flowfiles, size_t size);
/**
* Invoke a standalone processor without input data.
* The processor is expected to generate flow file.
* @return a flow file record or nullptr in case no flowfile was generated
**/
flow_file_record *invoke(standalone_processor* proc);
/**
* Invoke a standalone processor with input flow file
* @param input_ff input flow file, which can be the output of another processor or flow
* @return a flow file record or nullptr in case no flowfile was generated
**/
flow_file_record *invoke_ff(standalone_processor* proc, const flow_file_record *input_ff);
/**
* Invoke a standalone processor with file input
* @param path specifies the file system path of the input file
* @return a flow file record or nullptr in case no flowfile was generated
**/
flow_file_record *invoke_file(standalone_processor* proc, const char* path);
/**
* Invoke a standalone processor with some in-memory data
* @param buf specifies the beginning of the input buffer
* @param size specifies the size of the buffer
* @return a flow file record or nullptr in case no flowfile was generated
**/
flow_file_record *invoke_chunk(standalone_processor *proc, uint8_t *buf, uint64_t size);
/**
* Deprecated transfer function (see the DEPRECATED marker on the declaration).
* NOTE(review): transfer_to_relationship, declared later in this header, looks like the replacement - confirm.
* @param session the processor session
* @param flow the flow
* @param rel presumably the name of the relationship - TODO confirm
* @return not documented in this header
**/
DEPRECATED(0.6.0, 2.0) int transfer(processor_session* session, flow *flow, const char *rel);
/**
* Creates a flow file record based on a file
* @param file source file
* @param len length of the file name
* @return a flow file record or nullptr in case no flowfile was generated
*/
flow_file_record* create_flowfile(const char *file, const size_t len);
/**
* Creates a flow file record based on a file
* @param file source file
* @param len length of the file name
* @param size size of the file
* @return a flow file record or nullptr in case no flowfile was generated
*/
flow_file_record* create_ff_object(const char *file, const size_t len, const uint64_t size);
/**
* Creates a flow file record based on a file, without attributes
* @attention attributes cannot be added later!
* @param file source file
* @param len length of the file name
* @param size size of the file
* @return a flow file record or nullptr in case no flowfile was generated
*/
flow_file_record* create_ff_object_na(const char *file, const size_t len, const uint64_t size);
/**
* Creates a flow file record without content. Only attributes can be added.
* @attention content cannot be added later!
* @return a flow file record or nullptr in case no flowfile was generated
*/
flow_file_record* create_ff_object_nc();
/**
* Adds content to the flow file record.
* @param instance the nifi instance
* @param proc the standalone processor
* @return a flow file record
*/
flow_file_record* generate_flow_file(nifi_instance * instance, standalone_processor * proc);
/**
* Adds content to the flow file record.
* @param ctx the processor context
* @return a flow file record
*/
flow_file_record * generate_flow(processor_context * ctx);
/**
* Get incoming flow file. To be used in processor logic callbacks.
* @param session current processor session
* @param context current processor context
* @return a flow file record or nullptr in case there is none in the session
**/
flow_file_record* get(processor_session *session, processor_context *context);
/**
* Free flow file
* @param ff flow file
**/
void free_flowfile(flow_file_record* ff);
/**
* Adds an attribute, fails in case there is already an attribute with the given key.
* @param ff flow file
* @param key name of attribute
* @param value location of value
* @param size size of the data pointed to by "value"
* @return 0 in case of success, -1 otherwise (already existed)
**/
int8_t add_attribute(flow_file_record*, const char *key, void *value, size_t size);
/**
* Updates an attribute (adds if it hasn't existed before)
* @param ff flow file
* @param key name of attribute
* @param value location of value
* @param size size of the data pointed to by "value"
**/
void update_attribute(flow_file_record* ff, const char *key, void *value, size_t size);
/**
* Get the value of an attribute. Value and value size are written to parameter "caller_attribute"
* @param ff flow file
* @param caller_attribute attribute structure to provide name and get value, size
* @return 0 in case of success, -1 otherwise (no such attribute)
**/
int8_t get_attribute(const flow_file_record *ff, attribute *caller_attribute);
/**
* Get the quantity of attributes
* @param ff flow file
* @return the number of attributes
**/
int get_attribute_quantity(const flow_file_record *ff);
/**
* Copies all attributes of the flowfile that fits target.
* @param ff flow file
* @param target attribute set to copy to. target->size determines the maximum number of attributes copied
* @return the number of attributes copied, which is the minimum of attribute quantity and target size
**/
int get_all_attributes(const flow_file_record* ff, attribute_set *target);
/**
* reads the content of a flow file
* @param target reference in which will set the result
* @param size max number of bytes to read (use flow_file_record->size to get the whole content)
* @return resulting read size (<=size)
**/
int get_content(const flow_file_record* ff, uint8_t* target, int size);
/**
* Removes an attribute
* @param key name of the attribute
* @return 0 on success, -1 otherwise (doesn't exist)
**/
int8_t remove_attribute(flow_file_record*, const char * key);
/****
* ##################################################################
* Remote NIFI OPERATIONS
* ##################################################################
*/
/**
* Transmits a flow file record using the given instance
* (presumably to the remote endpoint the instance was created with - TODO confirm).
* The parameters are unnamed; in order they are: the flow file record and the instance.
* @return not documented in this header
**/
int transmit_flowfile(flow_file_record *, nifi_instance *);
/****
* ##################################################################
* API functions for user-defined processor
* ##################################################################
*/
/**
* Argument bundle taken by var_add_custom_processor: the processor name and its callbacks.
**/
typedef struct {
const char * name; /* name of the custom processor */
ontrigger_callback * ontr_cb; /* on-trigger callback (type declared outside this header) */
onschedule_callback * onsc_cb; /* on-schedule callback (type declared outside this header) */
} custom_processor_args;
/**
* Adds a custom processor for later instantiation
* @param in the name and the callbacks used for the processor
* @attention it's recommended to use this function via the add_custom_processor variadic macro: the caller doesn't
* need to create the parameter struct, and trailing callback arguments can be omitted (members left out of the
* compound literal are zero-initialized).
* @return 0 on success, -1 otherwise (e.g. the name is already in use)
**/
int var_add_custom_processor(custom_processor_args in);
/* No trailing semicolon: the expansion is a plain expression, usable in conditions, assignments and if/else. */
#define add_custom_processor(...) var_add_custom_processor((custom_processor_args){__VA_ARGS__})
/**
* Removes a custom processor
* @param name name of the processor
* @return 0 on success, -1 otherwise (didn't exist)
**/
int delete_custom_processor(const char * name);
/**
* Transfers a flowfile to the given relationship
* This function is only to be used within processor logic callback
* @param ffr flow file to be transferred
* @param ps processor session the transfer happens within
* @param relationship name of the relationship ("success" and "failure" are supported currently)
* @return 0 on success, -1 otherwise (didn't exist)
**/
int transfer_to_relationship(flow_file_record * ffr, processor_session * ps, const char * relationship);
/**
* Write content to a flow file and return a pointer to flow file record
* @param buff the buffer to read content from
* @param count the number of bytes to read
* @param ctx the processor context
*/
flow_file_record * write_to_flow(const char * buff, size_t count, processor_context * ctx);
/**
* Initialize content repository
* @param ctx the processor context
*/
void initialize_content_repo(processor_context * ctx, const char * uuid);
/**
* Clear content repository contents
*/
void clear_content_repo(const nifi_instance * instance);
/**
* Get the processor uuid from processor context
*/
void get_proc_uuid_from_context(const processor_context * ctx, char * uuid_target);
/**
* Get the processor uuid from processor
*/
void get_proc_uuid_from_processor(standalone_processor * proc, char * uuid_target);
/****
* ##################################################################
* Persistence Operations
* ##################################################################
*/
#ifdef __cplusplus
}
#endif
#endif /* LIBMINIFI_INCLUDE_CAPI_NANOFI_H_ */