// blob: f9dc678c40ddd3d7ea1668f1b42bc150ee914b92 [file] [log] [blame]
/**
* @file ListenSyslog.h
* ListenSyslog class declaration
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LISTEN_SYSLOG_H__
#define __LISTEN_SYSLOG_H__
#include <stdio.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#include <chrono>
#include <cstdint>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <vector>
#include "FlowFileRecord.h"
#include "core/Processor.h"
#include "core/ProcessSession.h"
#include "core/Core.h"
#include "core/Resource.h"
#include "core/logging/LoggerConfiguration.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// SysLogEvent: one received syslog message as stored in the event queue.
struct SysLogEvent {
  // Pointer to the message bytes. The struct holds the raw pointer only;
  // NOTE(review): who allocates/frees it is not visible here - confirm.
  char *payload;
  // Length of the payload in bytes.
  uint64_t len;
};
// ListenSyslog Class
class ListenSyslog : public core::Processor {
public:
// Constructor
/*!
* Create a new processor
*/
ListenSyslog(std::string name, uuid_t uuid = NULL)
: Processor(name, uuid),
logger_(logging::LoggerFactory<ListenSyslog>::getLogger()) {
_eventQueueByteSize = 0;
_serverSocket = 0;
_recvBufSize = 65507;
_maxSocketBufSize = 1024 * 1024;
_maxConnections = 2;
_maxBatchSize = 1;
_messageDelimiter = "\n";
_protocol = "UDP";
_port = 514;
_parseMessages = false;
_serverSocket = 0;
_maxFds = 0;
FD_ZERO(&_readfds);
_thread = NULL;
_resetServerSocket = false;
_serverTheadRunning = false;
}
// Destructor
virtual ~ListenSyslog() {
_serverTheadRunning = false;
if (this->_thread)
delete this->_thread;
// need to reset the socket
std::vector<int>::iterator it;
for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) {
int clientSocket = *it;
close(clientSocket);
}
_clientSockets.clear();
if (_serverSocket > 0) {
logger_->log_debug("ListenSysLog Server socket %d close", _serverSocket);
close(_serverSocket);
_serverSocket = 0;
}
}
// Processor Name
static constexpr char const *ProcessorName = "ListenSyslog";
// Supported Properties
static core::Property RecvBufSize;
static core::Property MaxSocketBufSize;
static core::Property MaxConnections;
static core::Property MaxBatchSize;
static core::Property MessageDelimiter;
static core::Property ParseMessages;
static core::Property Protocol;
static core::Property Port;
// Supported Relationships
static core::Relationship Success;
static core::Relationship Invalid;
// Nest Callback Class for write stream
class WriteCallback : public OutputStreamCallback {
public:
WriteCallback(char *data, uint64_t size)
: _data(reinterpret_cast<uint8_t*>(data)),
_dataSize(size) {
}
uint8_t *_data;
uint64_t _dataSize;
int64_t process(std::shared_ptr<io::BaseStream> stream) {
int64_t ret = 0;
if (_data && _dataSize > 0)
ret = stream->write(_data, _dataSize);
return ret;
}
};
public:
// OnTrigger method, implemented by NiFi ListenSyslog
virtual void onTrigger(core::ProcessContext *context, core::ProcessSession *session);
// Initialize, over write by NiFi ListenSyslog
virtual void initialize(void);
protected:
private:
// Logger
std::shared_ptr<logging::Logger> logger_;
// Run function for the thread
static void run(ListenSyslog *process);
// Run Thread
void runThread();
// Queue for store syslog event
std::queue<SysLogEvent> _eventQueue;
// Size of Event queue in bytes
uint64_t _eventQueueByteSize;
// Get event queue size
uint64_t getEventQueueSize() {
std::lock_guard<std::mutex> lock(mutex_);
return _eventQueue.size();
}
// Get event queue byte size
uint64_t getEventQueueByteSize() {
std::lock_guard<std::mutex> lock(mutex_);
return _eventQueueByteSize;
}
// Whether the event queue is empty
bool isEventQueueEmpty() {
std::lock_guard<std::mutex> lock(mutex_);
return _eventQueue.empty();
}
// Put event into directory listing
void putEvent(uint8_t *payload, uint64_t len) {
std::lock_guard<std::mutex> lock(mutex_);
SysLogEvent event;
event.payload = reinterpret_cast<char*>(payload);
event.len = len;
_eventQueue.push(event);
_eventQueueByteSize += len;
}
// Read \n terminated line from TCP socket
int readline(int fd, char *bufptr, size_t len);
// start server socket and handling client socket
void startSocketThread();
// Poll event
void pollEvent(std::queue<SysLogEvent> &list, int maxSize) {
std::lock_guard<std::mutex> lock(mutex_);
while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize)) {
SysLogEvent event = _eventQueue.front();
_eventQueue.pop();
_eventQueueByteSize -= event.len;
list.push(event);
}
return;
}
// Mutex for protection of the directory listing
std::mutex mutex_;
int64_t _recvBufSize;
int64_t _maxSocketBufSize;
int64_t _maxConnections;
int64_t _maxBatchSize;
std::string _messageDelimiter;
std::string _protocol;
int64_t _port;bool _parseMessages;
int _serverSocket;
std::vector<int> _clientSockets;
int _maxFds;
fd_set _readfds;
// thread
std::thread *_thread;
// whether to reset the server socket
bool _resetServerSocket;bool _serverTheadRunning;
// buffer for read socket
char _buffer[2048];
};
// Registers ListenSyslog through the REGISTER_RESOURCE macro (presumably
// provided by core/Resource.h, so the processor can be created by class
// name - TODO confirm against the macro definition).
REGISTER_RESOURCE(ListenSyslog);
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */
#endif