| /** |
| * @file ListenSyslog.cpp |
| * ListenSyslog class implementation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <queue> |
| #include <stdio.h> |
| #include <string> |
| #include "TimeUtil.h" |
| #include "ListenSyslog.h" |
| #include "ProcessContext.h" |
| #include "ProcessSession.h" |
| |
| const std::string ListenSyslog::ProcessorName("ListenSyslog"); |
| Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B"); |
| Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB"); |
| Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2"); |
| Property ListenSyslog::MaxBatchSize("Max Batch Size", |
| "The maximum number of Syslog events to add to a single FlowFile.", "1"); |
| Property ListenSyslog::MessageDelimiter("Message Delimiter", |
| "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n"); |
| Property ListenSyslog::ParseMessages("Parse Messages", |
| "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false"); |
| Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP"); |
| Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514"); |
| Relationship ListenSyslog::Success("success", "All files are routed to success"); |
| Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid"); |
| |
| void ListenSyslog::initialize() |
| { |
| //! Set the supported properties |
| std::set<Property> properties; |
| properties.insert(RecvBufSize); |
| properties.insert(MaxSocketBufSize); |
| properties.insert(MaxConnections); |
| properties.insert(MaxBatchSize); |
| properties.insert(MessageDelimiter); |
| properties.insert(ParseMessages); |
| properties.insert(Protocol); |
| properties.insert(Port); |
| setSupportedProperties(properties); |
| //! Set the supported relationships |
| std::set<Relationship> relationships; |
| relationships.insert(Success); |
| relationships.insert(Invalid); |
| setSupportedRelationships(relationships); |
| } |
| |
| void ListenSyslog::startSocketThread() |
| { |
| if (_thread != NULL) |
| return; |
| |
| _logger->log_info("ListenSysLog Socket Thread Start"); |
| _serverTheadRunning = true; |
| _thread = new std::thread(run, this); |
| _thread->detach(); |
| } |
| |
| void ListenSyslog::run(ListenSyslog *process) |
| { |
| process->runThread(); |
| } |
| |
| void ListenSyslog::runThread() |
| { |
| while (_serverTheadRunning) |
| { |
| if (_resetServerSocket) |
| { |
| _resetServerSocket = false; |
| // need to reset the socket |
| std::vector<int>::iterator it; |
| for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) |
| { |
| int clientSocket = *it; |
| close(clientSocket); |
| } |
| _clientSockets.clear(); |
| if (_serverSocket > 0) |
| { |
| close(_serverSocket); |
| _serverSocket = 0; |
| } |
| } |
| |
| if (_serverSocket <= 0) |
| { |
| uint16_t portno = _port; |
| struct sockaddr_in serv_addr; |
| int sockfd; |
| if (_protocol == "TCP") |
| sockfd = socket(AF_INET, SOCK_STREAM, 0); |
| else |
| sockfd = socket(AF_INET, SOCK_DGRAM, 0); |
| if (sockfd < 0) |
| { |
| _logger->log_info("ListenSysLog Server socket creation failed"); |
| break; |
| } |
| bzero((char *) &serv_addr, sizeof(serv_addr)); |
| serv_addr.sin_family = AF_INET; |
| serv_addr.sin_addr.s_addr = INADDR_ANY; |
| serv_addr.sin_port = htons(portno); |
| if (bind(sockfd, (struct sockaddr *) &serv_addr, |
| sizeof(serv_addr)) < 0) |
| { |
| _logger->log_error("ListenSysLog Server socket bind failed"); |
| break; |
| } |
| if (_protocol == "TCP") |
| listen(sockfd,5); |
| _serverSocket = sockfd; |
| _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno); |
| } |
| FD_ZERO(&_readfds); |
| FD_SET(_serverSocket, &_readfds); |
| _maxFds = _serverSocket; |
| std::vector<int>::iterator it; |
| for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it) |
| { |
| int clientSocket = *it; |
| if (clientSocket >= _maxFds) |
| _maxFds = clientSocket; |
| FD_SET(clientSocket, &_readfds); |
| } |
| fd_set fds; |
| struct timeval tv; |
| int retval; |
| fds = _readfds; |
| tv.tv_sec = 0; |
| // 100 msec |
| tv.tv_usec = 100000; |
| retval = select(_maxFds+1, &fds, NULL, NULL, &tv); |
| if (retval < 0) |
| break; |
| if (retval == 0) |
| continue; |
| if (FD_ISSET(_serverSocket, &fds)) |
| { |
| // server socket, either we have UDP datagram or TCP connection request |
| if (_protocol == "TCP") |
| { |
| socklen_t clilen; |
| struct sockaddr_in cli_addr; |
| clilen = sizeof(cli_addr); |
| int newsockfd = accept(_serverSocket, |
| (struct sockaddr *) &cli_addr, |
| &clilen); |
| if (newsockfd > 0) |
| { |
| if (_clientSockets.size() < _maxConnections) |
| { |
| _clientSockets.push_back(newsockfd); |
| _logger->log_info("ListenSysLog new client socket %d connection", newsockfd); |
| continue; |
| } |
| else |
| { |
| close(newsockfd); |
| } |
| } |
| } |
| else |
| { |
| socklen_t clilen; |
| struct sockaddr_in cli_addr; |
| clilen = sizeof(cli_addr); |
| int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0, |
| (struct sockaddr *)&cli_addr, &clilen); |
| if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize) |
| { |
| uint8_t *payload = new uint8_t[recvlen]; |
| memcpy(payload, _buffer, recvlen); |
| putEvent(payload, recvlen); |
| } |
| } |
| } |
| it = _clientSockets.begin(); |
| while (it != _clientSockets.end()) |
| { |
| int clientSocket = *it; |
| if (FD_ISSET(clientSocket, &fds)) |
| { |
| int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer)); |
| if (recvlen <= 0) |
| { |
| close(clientSocket); |
| _logger->log_info("ListenSysLog client socket %d close", clientSocket); |
| it = _clientSockets.erase(it); |
| } |
| else |
| { |
| if ((recvlen + getEventQueueByteSize()) <= _recvBufSize) |
| { |
| uint8_t *payload = new uint8_t[recvlen]; |
| memcpy(payload, _buffer, recvlen); |
| putEvent(payload, recvlen); |
| } |
| ++it; |
| } |
| } |
| } |
| } |
| return; |
| } |
| |
| |
| int ListenSyslog::readline( int fd, char *bufptr, size_t len ) |
| { |
| char *bufx = bufptr; |
| static char *bp; |
| static int cnt = 0; |
| static char b[ 2048 ]; |
| char c; |
| |
| while ( --len > 0 ) |
| { |
| if ( --cnt <= 0 ) |
| { |
| cnt = recv( fd, b, sizeof( b ), 0 ); |
| if ( cnt < 0 ) |
| { |
| if ( errno == EINTR ) |
| { |
| len++; /* the while will decrement */ |
| continue; |
| } |
| return -1; |
| } |
| if ( cnt == 0 ) |
| return 0; |
| bp = b; |
| } |
| c = *bp++; |
| *bufptr++ = c; |
| if ( c == '\n' ) |
| { |
| *bufptr = '\n'; |
| return bufptr - bufx + 1; |
| } |
| } |
| return -1; |
| } |
| |
| void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session) |
| { |
| std::string value; |
| bool needResetServerSocket = false; |
| if (context->getProperty(Protocol.getName(), value)) |
| { |
| if (_protocol != value) |
| needResetServerSocket = true; |
| _protocol = value; |
| } |
| if (context->getProperty(RecvBufSize.getName(), value)) |
| { |
| Property::StringToInt(value, _recvBufSize); |
| } |
| if (context->getProperty(MaxSocketBufSize.getName(), value)) |
| { |
| Property::StringToInt(value, _maxSocketBufSize); |
| } |
| if (context->getProperty(MaxConnections.getName(), value)) |
| { |
| Property::StringToInt(value, _maxConnections); |
| } |
| if (context->getProperty(MessageDelimiter.getName(), value)) |
| { |
| _messageDelimiter = value; |
| } |
| if (context->getProperty(ParseMessages.getName(), value)) |
| { |
| Property::StringToBool(value, _parseMessages); |
| } |
| if (context->getProperty(Port.getName(), value)) |
| { |
| int64_t oldPort = _port; |
| Property::StringToInt(value, _port); |
| if (_port != oldPort) |
| needResetServerSocket = true; |
| } |
| if (context->getProperty(MaxBatchSize.getName(), value)) |
| { |
| Property::StringToInt(value, _maxBatchSize); |
| } |
| |
| if (needResetServerSocket) |
| _resetServerSocket = true; |
| |
| startSocketThread(); |
| |
| // read from the event queue |
| if (isEventQueueEmpty()) |
| { |
| context->yield(); |
| return; |
| } |
| |
| std::queue<SysLogEvent> eventQueue; |
| pollEvent(eventQueue, _maxBatchSize); |
| bool firstEvent = true; |
| FlowFileRecord *flowFile = NULL; |
| while(!eventQueue.empty()) |
| { |
| SysLogEvent event = eventQueue.front(); |
| eventQueue.pop(); |
| if (firstEvent) |
| { |
| flowFile = session->create(); |
| if (!flowFile) |
| return; |
| ListenSyslog::WriteCallback callback((char *)event.payload, event.len); |
| session->write(flowFile, &callback); |
| delete[] event.payload; |
| firstEvent = false; |
| } |
| else |
| { |
| ListenSyslog::WriteCallback callback((char *)event.payload, event.len); |
| session->append(flowFile, &callback); |
| delete[] event.payload; |
| } |
| } |
| flowFile->addAttribute("syslog.protocol", _protocol); |
| flowFile->addAttribute("syslog.port", std::to_string(_port)); |
| session->transfer(flowFile, Success); |
| } |