blob: 709c6817d76eedfffd0b1ddb7e2ddfea557a9250 [file] [log] [blame]
#include "api/ecu.h"
#include "api/nanofi.h"
#include "core/string_utils.h"
#include "core/cstructs.h"
#include "core/file_utils.h"
#include "core/flowfiles.h"
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <limits.h>
#include <signal.h>
#include <sys/stat.h>
processor_params * procparams = NULL;
volatile sig_atomic_t stopped = 0;
void free_proc_params(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (pp) {
free_flow_file_list(&pp->ff_list);
free(pp->properties->file_path);
free(pp->properties);
HASH_DEL(procparams, pp);
free(pp);
}
}
void signal_handler(int signum) {
if (signum == SIGINT || signum == SIGTERM) {
stopped = 1;
}
}
void init_common_input(tailfile_input_params * input_params, char ** args) {
if (args && *args) {
input_params->file = args[1];
input_params->interval = args[2];
input_params->instance = args[4];
input_params->tcp_port = args[5];
input_params->nifi_port_uuid = args[6];
}
}
tailfile_input_params init_logaggregate_input(char ** args) {
tailfile_input_params input_params;
memset(&input_params, 0, sizeof(input_params));
init_common_input(&input_params, args);
input_params.delimiter = args[3];
return input_params;
}
tailfile_input_params init_tailfile_chunk_input(char ** args) {
tailfile_input_params input_params;
memset(&input_params, 0, sizeof(input_params));
init_common_input(&input_params, args);
input_params.chunk_size = args[3];
return input_params;
}
int validate_input_params(tailfile_input_params * params, uint64_t * intrvl, uint64_t * port_num) {
if (access(params->file, F_OK) == -1) {
printf("Error: %s doesn't exist!\n", params->file);
return -1;
}
struct stat stats;
int ret = stat(params->file, &stats);
if (ret == -1) {
printf("Error occurred while getting file status {file: %s, error: %s}\n", params->file, strerror(errno));
return -1;
}
// Check for file existence
if (S_ISDIR(stats.st_mode)){
printf("Error: %s is a directory!\n", params->file);
return -1;
}
errno = 0;
*intrvl = (uint64_t)(strtoul(params->interval, NULL, 10));
if (errno != 0) {
printf("Invalid interval value specified\n");
return -1;
}
errno = 0;
*port_num = (uint64_t)(strtoul(params->tcp_port, NULL, 10));
if (errno != 0) {
printf("Cannot convert tcp port to numeric value\n");
return -1;
}
return 0;
}
void setup_signal_action() {
struct sigaction action;
memset(&action, 0, sizeof(sigaction));
action.sa_handler = signal_handler;
sigaction(SIGTERM, &action, NULL);
sigaction(SIGINT, &action, NULL);
}
nifi_proc_params setup_nifi_processor(tailfile_input_params * input_params, const char * processor_name, void(*callback)(processor_session *, processor_context *)) {
nifi_proc_params params;
nifi_port port;
port.port_id = input_params->nifi_port_uuid;
nifi_instance * instance = create_instance(input_params->instance, &port);
add_custom_processor(processor_name, callback);
standalone_processor * proc = create_processor(processor_name, instance);
params.instance = instance;
params.processor = proc;
return params;
}
void add_to_hash_table(flow_file_record * ffr, uint64_t offset, const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (pp == NULL) {
pp = (struct processor_params*)malloc(sizeof(struct processor_params));
memset(pp, 0, sizeof(struct processor_params));
strcpy(pp->uuid_str, uuid);
HASH_ADD_STR(procparams, uuid_str, pp);
}
add_flow_file_record(&pp->ff_list, ffr);
pp->curr_offset = offset;
}
void delete_all_flow_files_from_proc(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (pp) {
struct flow_file_list * head = pp->ff_list;
while (head) {
struct flow_file_list * tmp = head;
free_flowfile(tmp->ff_record);
head = head->next;
free(tmp);
}
pp->ff_list = head;
}
}
void delete_completed_flow_files_from_proc(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (pp) {
struct flow_file_list * head = pp->ff_list;
while (head) {
struct flow_file_list * tmp = head;
if (tmp->complete) {
free_flowfile(tmp->ff_record);
head = head->next;
free(tmp);
}
else {
break;
}
}
pp->ff_list = head;
}
}
uint64_t get_current_offset(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (pp) {
return pp->curr_offset;
}
return 0;
}
processor_params * get_proc_params(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
return pp;
}
void update_proc_params(const char * uuid, uint64_t value, flow_file_list * ffl) {
struct processor_params * pp = get_proc_params(uuid);
if (!pp) {
pp = (struct processor_params *)malloc(sizeof(struct processor_params));
memset(pp, 0, sizeof(struct processor_params));
pp->ff_list = ffl;
pp->curr_offset = value;
strcpy(pp->uuid_str, uuid);
HASH_ADD_STR(procparams, uuid_str, pp);
return;
}
delete_all_flow_files_from_proc(uuid);
pp->curr_offset += value;
pp->ff_list = ffl;
}
uint64_t update_curr_offset(const char * uuid, uint64_t value) {
struct processor_params * pp = get_proc_params(uuid);
if (pp) {
pp->curr_offset += value;
return pp->curr_offset;
}
pp = (struct processor_params *)malloc(sizeof(struct processor_params));
memset(pp, 0, sizeof(struct processor_params));
strcpy(pp->uuid_str, uuid);
pp->curr_offset = value;
HASH_ADD_STR(procparams, uuid_str, pp);
return pp->curr_offset;
}
struct proc_properties * get_processor_properties(const char * uuid) {
if (!uuid) {
return NULL;
}
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (!pp) {
return NULL;
}
return pp->properties;
}
void add_processor_properties(const char * uuid, struct proc_properties * const props) {
struct processor_params * pp = get_proc_params(uuid);
if (pp) {
pp->properties = props;
return;
}
pp = (struct processor_params *)malloc(sizeof(struct processor_params));
memset(pp, 0, sizeof(struct processor_params));
strcpy(pp->uuid_str, uuid);
pp->properties = props;
HASH_ADD_STR(procparams, uuid_str, pp);
}
void on_trigger_tailfilechunk(processor_session * ps, processor_context * ctx) {
char uuid_str[37];
get_proc_uuid_from_context(ctx, uuid_str);
initialize_content_repo(ctx, uuid_str);
struct proc_properties * props = get_processor_properties(uuid_str);
if (!props) {
char file_path[4096];
char chunk_size[50];
if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
return;
}
if (get_property(ctx, "chunk_size", chunk_size, sizeof(chunk_size)) != 0) {
return;
}
errno = 0;
uint64_t chunk_size_value = strtoul(chunk_size, NULL, 10);
if (errno != 0 || chunk_size_value == 0) {
printf("Invalid chunk size specified\n");
return;
}
props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
memset(props, 0, sizeof(struct proc_properties));
int len = strlen(file_path);
props->file_path = (char *)malloc((len + 1) * sizeof(char));
strncpy(props->file_path, file_path, len);
props->file_path[len] = '\0';
props->chunk_size = chunk_size_value;
add_processor_properties(uuid_str, props);
}
FILE * fp = fopen(props->file_path, "rb");
if (!fp) {
printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
return;
}
char * buff = (char *)malloc((props->chunk_size +1 ) * sizeof(char));
size_t bytes_read = 0;
uint64_t curr_offset = get_current_offset(uuid_str);
fseek(fp, curr_offset, SEEK_SET);
while ((bytes_read = fread(buff, 1, props->chunk_size, fp)) > 0) {
if (bytes_read < props->chunk_size) {
break;
}
buff[props->chunk_size] = '\0';
flow_file_record * ffr = write_to_flow(buff, strlen(buff), ctx);
curr_offset = ftell(fp);
add_attributes(ffr, props->file_path, curr_offset);
add_to_hash_table(ffr, curr_offset, uuid_str);
}
free(buff);
fclose(fp);
}
flow_file_info log_aggregate(const char * file_path, char delim, processor_context * ctx) {
flow_file_info ff_info;
memset(&ff_info, 0, sizeof(ff_info));
if (!file_path) {
return ff_info;
}
char uuid_str[37];
get_proc_uuid_from_context(ctx, uuid_str);
char buff[MAX_BYTES_READ + 1];
errno = 0;
FILE * fp = fopen(file_path, "rb");
if (!fp) {
printf("Cannot open file: {file: %s, reason: %s}\n", file_path, strerror(errno));
return ff_info;
}
uint64_t curr_offset = get_current_offset(uuid_str);
fseek(fp, curr_offset, SEEK_SET);
flow_file_list * ffl = NULL;
size_t bytes_read = 0;
while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
buff[bytes_read] = '\0';
struct token_list tokens = tokenize_string_tailfile(buff, delim);
if (tokens.total_bytes > 0) {
ff_info.total_bytes += tokens.total_bytes;
curr_offset += tokens.total_bytes;
fseek(fp, curr_offset, SEEK_SET);
}
token_node * head;
for (head = tokens.head; head && head->data; head = head->next) {
flow_file_record * ffr = write_to_flow(head->data, strlen(head->data), ctx);
add_attributes(ffr, file_path, curr_offset);
add_flow_file_record(&ffl, ffr);
}
free_all_tokens(&tokens);
}
fclose(fp);
ff_info.ff_list = ffl;
return ff_info;
}
struct proc_properties * get_properties(const char * uuid, processor_context * ctx) {
struct proc_properties * props = get_processor_properties(uuid);
if (props) {
return props;
}
char file_path[4096];
char delimiter[3];
if (get_property(ctx, "file_path", file_path, sizeof(file_path)) != 0) {
return props;
}
if (get_property(ctx, "delimiter", delimiter, sizeof(delimiter)) != 0) {
printf("No delimiter found\n");
return props;
}
if (strlen(delimiter) == 0) {
printf("Delimiter not specified or it is empty\n");
return props;
}
props = (struct proc_properties *)malloc(sizeof(struct proc_properties));
memset(props, 0, sizeof(struct proc_properties));
char delim = delimiter[0];
if (delim == '\\') {
if (strlen(delimiter) > 1) {
switch (delimiter[1]) {
case 'r':
delim = '\r';
break;
case 't':
delim = '\t';
break;
case 'n':
delim = '\n';
break;
case '\\':
delim = '\\';
break;
default:
break;
}
}
}
int len = strlen(file_path);
props->file_path = (char *)malloc((len + 1) * sizeof(char));
strncpy(props->file_path, file_path, len);
props->file_path[len] = '\0';
props->delimiter = delim;
add_processor_properties(uuid, props);
return props;
}
void on_trigger_logaggregator(processor_session * ps, processor_context * ctx) {
char uuid_str[37];
get_proc_uuid_from_context(ctx, uuid_str);
struct proc_properties * props = get_properties(uuid_str, ctx);
if (!props || !props->file_path) return;
char delim = props->delimiter;
initialize_content_repo(ctx, uuid_str);
flow_file_info ff_info = log_aggregate(props->file_path, delim, ctx);
update_proc_params(uuid_str, ff_info.total_bytes, ff_info.ff_list);
}
void write_flow_file(flow_file_record * ffr, const char * buff, size_t count) {
FILE * ffp = fopen(ffr->contentLocation, "ab");
if (!ffp) return;
if (fwrite(buff, 1, count, ffp) < count) {
fclose(ffp);
free_flowfile(ffr);
return;
}
fclose(ffp);
}
flow_file_list * get_last_flow_file(const char * uuid) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (!pp) {
return NULL;
}
flow_file_list * ff_list = pp->ff_list;
flow_file_list * el = NULL;
LL_FOREACH(ff_list, el) {
if (el && !el->next) {
return el;
}
}
return NULL;
}
flow_file_list * add_flow_file_to_proc_params(const char * uuid, flow_file_record * ffr) {
struct processor_params * pp = NULL;
HASH_FIND_STR(procparams, uuid, pp);
if (!pp) {
pp = (struct processor_params *)malloc(sizeof(struct processor_params));
memset(pp, 0, sizeof(struct processor_params));
strcpy(pp->uuid_str, uuid);
HASH_ADD_STR(procparams, uuid_str, pp);
}
flow_file_list * ffl_node = add_flow_file_record(&pp->ff_list, ffr);
ffl_node->complete = 0;
return ffl_node;
}
void on_trigger_tailfiledelimited(processor_session * ps, processor_context * ctx) {
char uuid_str[37];
get_proc_uuid_from_context(ctx, uuid_str);
initialize_content_repo(ctx, uuid_str);
struct proc_properties * props = get_properties(uuid_str, ctx);
if (!props || !props->file_path) return;
char delim = props->delimiter;
FILE * fp = fopen(props->file_path, "rb");
if (!fp) {
printf("Unable to open file. {file: %s, reason: %s}\n", props->file_path, strerror(errno));
return;
}
char buff[MAX_BYTES_READ + 1];
size_t bytes_read = 0;
uint64_t curr_offset = get_current_offset(uuid_str);
fseek(fp, curr_offset, SEEK_SET);
flow_file_list * ffl_node = get_last_flow_file(uuid_str);
while ((bytes_read = fread(buff, 1, MAX_BYTES_READ, fp)) > 0) {
buff[bytes_read] = '\0';
const char * begin = buff;
const char * end = NULL;
while ((end = strchr(begin, delim))) {
uint64_t len = end - begin;
if (len > 0) {
if (!ffl_node || ffl_node->complete) {
ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
}
write_flow_file(ffl_node->ff_record, begin, len);
update_curr_offset(uuid_str, (len + 1));
}
else {
update_curr_offset(uuid_str, 1);
}
if (ffl_node) {
ffl_node->complete = 1;
update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
}
begin = (end + 1);
}
if (!end && *begin != '\0') {
if (!ffl_node || ffl_node->complete) {
ffl_node = add_flow_file_to_proc_params(uuid_str, generate_flow(ctx));
}
size_t count = strlen(begin);
write_flow_file(ffl_node->ff_record, begin, count);
update_curr_offset(uuid_str, count);
update_attributes(ffl_node->ff_record, props->file_path, get_current_offset(uuid_str));
}
}
fclose(fp);
}