// blob: ace37d775f5e7fa45cd423837ed4b4713a2bb7da [file] [log] [blame]
/**
* @file ListenSyslog.cpp
* ListenSyslog class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <queue>
#include <stdio.h>
#include <string>
#include "TimeUtil.h"
#include "ListenSyslog.h"
#include "ProcessContext.h"
#include "ProcessSession.h"
//! Processor name constant.
const std::string ListenSyslog::ProcessorName("ListenSyslog");
//! Supported properties. Each one is built from a display name, a description and a
//! third string that looks like the default value (TODO confirm against Property's constructor).
//! These are the objects registered in initialize() and looked up by name in onTrigger().
Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
Property ListenSyslog::MaxBatchSize("Max Batch Size",
"The maximum number of Syslog events to add to a single FlowFile.", "1");
Property ListenSyslog::MessageDelimiter("Message Delimiter",
"Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n");
//! NOTE(review): the description below stops mid-sentence ("will only."); the rest of the
//! sentence appears to have been lost and should be restored from the intended wording.
Property ListenSyslog::ParseMessages("Parse Messages",
"Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
//! Relationships registered in initialize(); onTrigger() transfers its FlowFile to Success.
Relationship ListenSyslog::Success("success", "All files are routed to success");
Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
void ListenSyslog::initialize()
{
//! Set the supported properties
std::set<Property> properties;
properties.insert(RecvBufSize);
properties.insert(MaxSocketBufSize);
properties.insert(MaxConnections);
properties.insert(MaxBatchSize);
properties.insert(MessageDelimiter);
properties.insert(ParseMessages);
properties.insert(Protocol);
properties.insert(Port);
setSupportedProperties(properties);
//! Set the supported relationships
std::set<Relationship> relationships;
relationships.insert(Success);
relationships.insert(Invalid);
setSupportedRelationships(relationships);
}
/**
 * Launch the background socket thread once.
 *
 * Does nothing when a thread object has already been created. Otherwise it
 * raises the running flag, starts a thread executing run(this) and detaches
 * it, so the thread is never joined.
 */
void ListenSyslog::startSocketThread()
{
    if (_thread == NULL)
    {
        _logger->log_info("ListenSysLog Socket Thread Start");
        // the flag must be set before the thread starts: runThread() loops on it
        _serverTheadRunning = true;
        _thread = new std::thread(run, this);
        _thread->detach();
    }
}
/**
 * Thread entry point: forwards to runThread() on the given processor.
 *
 * @param process processor instance whose socket loop should run
 */
void ListenSyslog::run(ListenSyslog *process)
{
    ListenSyslog &listener = *process;
    listener.runThread();
}
/**
 * Body of the background socket thread.
 *
 * Loops while _serverTheadRunning is set. On every pass it:
 *  1. closes all client sockets and the server socket when _resetServerSocket is set;
 *  2. creates and binds the server socket (SOCK_STREAM when _protocol is "TCP",
 *     SOCK_DGRAM otherwise) if there is none;
 *  3. waits up to 100 ms in select() on the server socket and every client socket;
 *  4. on the server socket, accepts a TCP connection or receives one UDP datagram;
 *  5. reads one line from every readable TCP client socket.
 *
 * Each received message is copied into a new[] buffer and handed to putEvent(),
 * provided the message length plus getEventQueueByteSize() does not exceed
 * _recvBufSize; otherwise the message is dropped.
 *
 * The loop (and with it the thread function) ends when socket creation, bind,
 * listen or select fails; an interrupted select (EINTR) is simply retried.
 */
void ListenSyslog::runThread()
{
    while (_serverTheadRunning)
    {
        if (_resetServerSocket)
        {
            _resetServerSocket = false;
            // a reset was requested: drop every connection and the listening socket
            for (std::vector<int>::iterator it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
            {
                close(*it);
            }
            _clientSockets.clear();
            if (_serverSocket > 0)
            {
                close(_serverSocket);
                _serverSocket = 0;
            }
        }

        if (_serverSocket <= 0)
        {
            // no server socket yet (first pass or after a reset): create and bind one
            uint16_t portno = _port;
            struct sockaddr_in serv_addr;
            int sockfd;
            if (_protocol == "TCP")
                sockfd = socket(AF_INET, SOCK_STREAM, 0);
            else
                sockfd = socket(AF_INET, SOCK_DGRAM, 0);
            if (sockfd < 0)
            {
                _logger->log_error("ListenSysLog Server socket creation failed");
                break;
            }
            bzero((char *) &serv_addr, sizeof(serv_addr));
            serv_addr.sin_family = AF_INET;
            serv_addr.sin_addr.s_addr = INADDR_ANY;
            serv_addr.sin_port = htons(portno);
            if (bind(sockfd, (struct sockaddr *) &serv_addr,
                    sizeof(serv_addr)) < 0)
            {
                _logger->log_error("ListenSysLog Server socket bind failed");
                // do not leak the descriptor that was just created
                close(sockfd);
                break;
            }
            if (_protocol == "TCP" && listen(sockfd, 5) < 0)
            {
                _logger->log_error("ListenSysLog Server socket listen failed");
                close(sockfd);
                break;
            }
            _serverSocket = sockfd;
            _logger->log_info("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
        }

        // build the descriptor set: server socket plus every connected client
        FD_ZERO(&_readfds);
        FD_SET(_serverSocket, &_readfds);
        _maxFds = _serverSocket;
        for (std::vector<int>::iterator it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
        {
            int clientSocket = *it;
            if (clientSocket >= _maxFds)
                _maxFds = clientSocket;
            FD_SET(clientSocket, &_readfds);
        }

        // select() modifies its arguments, so work on a copy and rebuild the timeout each pass
        fd_set fds = _readfds;
        struct timeval tv;
        tv.tv_sec = 0;
        // 100 msec, so changes to the running/reset flags are noticed promptly
        tv.tv_usec = 100000;
        int retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
        if (retval < 0)
        {
            // a signal interrupting select is not a failure; just try again
            if (errno == EINTR)
                continue;
            _logger->log_error("ListenSysLog select failed");
            break;
        }
        if (retval == 0)
            continue;

        if (FD_ISSET(_serverSocket, &fds))
        {
            // server socket, either we have UDP datagram or TCP connection request
            socklen_t clilen;
            struct sockaddr_in cli_addr;
            clilen = sizeof(cli_addr);
            if (_protocol == "TCP")
            {
                int newsockfd = accept(_serverSocket,
                        (struct sockaddr *) &cli_addr,
                        &clilen);
                if (newsockfd > 0)
                {
                    if (_clientSockets.size() < _maxConnections)
                    {
                        _clientSockets.push_back(newsockfd);
                        _logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
                        // the client list changed; rebuild the descriptor set before reading
                        continue;
                    }
                    else
                    {
                        // connection limit reached: refuse the newcomer
                        close(newsockfd);
                    }
                }
            }
            else
            {
                int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
                        (struct sockaddr *)&cli_addr, &clilen);
                if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
                {
                    uint8_t *payload = new uint8_t[recvlen];
                    memcpy(payload, _buffer, recvlen);
                    putEvent(payload, recvlen);
                }
            }
        }

        std::vector<int>::iterator it = _clientSockets.begin();
        while (it != _clientSockets.end())
        {
            int clientSocket = *it;
            if (!FD_ISSET(clientSocket, &fds))
            {
                // nothing to read on this connection in this round; without this
                // advance the loop would spin forever on the same element
                ++it;
                continue;
            }
            int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
            if (recvlen <= 0)
            {
                // peer closed, read error, or line too long: drop the connection
                close(clientSocket);
                _logger->log_info("ListenSysLog client socket %d close", clientSocket);
                it = _clientSockets.erase(it);
            }
            else
            {
                if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
                {
                    uint8_t *payload = new uint8_t[recvlen];
                    memcpy(payload, _buffer, recvlen);
                    putEvent(payload, recvlen);
                }
                ++it;
            }
        }
    }
    return;
}
/**
 * Read one '\n'-terminated line from a socket into the caller's buffer.
 *
 * Bytes are fetched with recv() in chunks of up to 2048 into a function-static
 * read-ahead buffer and then copied out one at a time until a '\n' is seen.
 *
 * NOTE(review): the read-ahead state (b, bp, cnt) is static and not keyed by
 * fd, so bytes read ahead for one socket can be returned by a later call made
 * for a different socket. It is also not safe to call from several threads.
 *
 * NOTE(review): after the line's own '\n' a second '\n' is stored and counted
 * in the return value - confirm that the doubled newline is intended.
 *
 * @param fd      socket to read from
 * @param bufptr  destination buffer
 * @param len     capacity of the destination buffer in bytes
 * @return number of bytes stored (the line, its '\n', and the extra '\n');
 *         0 if recv() reported that the peer closed the connection;
 *         -1 if recv() failed with an error other than EINTR, if no '\n' was
 *         found within len - 1 bytes, or if len is 0
 */
int ListenSyslog::readline( int fd, char *bufptr, size_t len )
{
    // len is unsigned: with len == 0 the "--len" below would wrap around and
    // the loop would write far past the end of the caller's buffer
    if ( len == 0 )
        return -1;

    char *bufx = bufptr;
    static char *bp;
    static int cnt = 0;
    static char b[ 2048 ];
    char c;
    // at most len - 1 bytes are copied, leaving room for the extra '\n'
    while ( --len > 0 )
    {
        // read-ahead buffer exhausted: refill it from the socket
        if ( --cnt <= 0 )
        {
            cnt = recv( fd, b, sizeof( b ), 0 );
            if ( cnt < 0 )
            {
                if ( errno == EINTR )
                {
                    len++; /* the while will decrement */
                    continue;
                }
                return -1;
            }
            if ( cnt == 0 )
                return 0;
            bp = b;
        }
        c = *bp++;
        *bufptr++ = c;
        if ( c == '\n' )
        {
            *bufptr = '\n';
            return bufptr - bufx + 1;
        }
    }
    // buffer filled without finding a newline
    return -1;
}
/**
 * Processor trigger: refresh the configuration, make sure the socket thread is
 * running, and turn queued Syslog events into one FlowFile.
 *
 * Every supported property is re-read from the context on each call. A change
 * of protocol or port sets _resetServerSocket so the socket thread rebuilds
 * its sockets. Up to _maxBatchSize events are then polled from the event
 * queue; the first is written to a newly created FlowFile and the rest are
 * appended to it. The FlowFile receives the "syslog.protocol" and
 * "syslog.port" attributes and is transferred to Success.
 *
 * Event payloads are freed here with delete[] once their bytes have been
 * handed to the session (or when the FlowFile cannot be created).
 *
 * @param context  source of the property values; yielded when there is nothing to do
 * @param session  session used to create, write, append and transfer the FlowFile
 */
void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
{
    std::string value;
    bool needResetServerSocket = false;
    if (context->getProperty(Protocol.getName(), value))
    {
        if (_protocol != value)
            needResetServerSocket = true;
        _protocol = value;
    }
    if (context->getProperty(RecvBufSize.getName(), value))
    {
        Property::StringToInt(value, _recvBufSize);
    }
    if (context->getProperty(MaxSocketBufSize.getName(), value))
    {
        Property::StringToInt(value, _maxSocketBufSize);
    }
    if (context->getProperty(MaxConnections.getName(), value))
    {
        Property::StringToInt(value, _maxConnections);
    }
    if (context->getProperty(MessageDelimiter.getName(), value))
    {
        _messageDelimiter = value;
    }
    if (context->getProperty(ParseMessages.getName(), value))
    {
        Property::StringToBool(value, _parseMessages);
    }
    if (context->getProperty(Port.getName(), value))
    {
        int64_t oldPort = _port;
        Property::StringToInt(value, _port);
        if (_port != oldPort)
            needResetServerSocket = true;
    }
    if (context->getProperty(MaxBatchSize.getName(), value))
    {
        Property::StringToInt(value, _maxBatchSize);
    }
    // only ever raise the flag here; the socket thread clears it after acting on it
    if (needResetServerSocket)
        _resetServerSocket = true;
    startSocketThread();

    // read from the event queue
    if (isEventQueueEmpty())
    {
        context->yield();
        return;
    }
    std::queue<SysLogEvent> eventQueue;
    pollEvent(eventQueue, _maxBatchSize);

    // flowFile stays NULL until the first event has been turned into a FlowFile
    FlowFileRecord *flowFile = NULL;
    while (!eventQueue.empty())
    {
        SysLogEvent event = eventQueue.front();
        eventQueue.pop();
        if (flowFile == NULL)
        {
            flowFile = session->create();
            if (!flowFile)
            {
                _logger->log_error("ListenSysLog failed to create FlowFile, dropping polled events");
                // the polled payloads are owned by this function now: free the
                // current one and everything still waiting in the local queue
                delete[] event.payload;
                while (!eventQueue.empty())
                {
                    delete[] eventQueue.front().payload;
                    eventQueue.pop();
                }
                return;
            }
            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
            session->write(flowFile, &callback);
        }
        else
        {
            ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
            session->append(flowFile, &callback);
        }
        delete[] event.payload;
    }

    // pollEvent may hand back nothing even though the queue looked non-empty a
    // moment ago; without a FlowFile there is nothing to tag or transfer
    if (flowFile == NULL)
    {
        context->yield();
        return;
    }
    flowFile->addAttribute("syslog.protocol", _protocol);
    flowFile->addAttribute("syslog.port", std::to_string(_port));
    session->transfer(flowFile, Success);
}