MINIFI-85: Add ListenSyslog processor
This closes #6.
Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/conf/flowListenSyslog.xml b/conf/flowListenSyslog.xml
new file mode 100644
index 0000000..8539bef
--- /dev/null
+++ b/conf/flowListenSyslog.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<flowController>
+ <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+ <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+ <rootGroup>
+ <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+ <name>NiFi Flow</name>
+ <position x="0.0" y="0.0"/>
+ <comment/>
+ <processor>
+ <id>291ee60c-0b91-4524-88c0-d71ee2498e02</id>
+ <name>ListenSyslog</name>
+ <position x="2489.369384765625" y="788.25244140625"/>
+ <styles/>
+ <comment/>
+ <class>org.apache.nifi.processors.standard.ListenSyslog</class>
+ <maxConcurrentTasks>1</maxConcurrentTasks>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <penalizationPeriod>30 sec</penalizationPeriod>
+ <yieldPeriod>1 sec</yieldPeriod>
+ <bulletinLevel>WARN</bulletinLevel>
+ <lossTolerant>false</lossTolerant>
+ <scheduledState>RUNNING</scheduledState>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <runDurationNanos>0</runDurationNanos>
+ <property>
+ <name>Protocol</name>
+ <value>UDP</value>
+ </property>
+ <property>
+ <name>Port</name>
+ <value>514</value>
+ </property>
+ <property>
+ <name>SSL Context Service</name>
+ </property>
+ <property>
+ <name>Receive Buffer Size</name>
+ <value>65507 B</value>
+ </property>
+ <property>
+ <name>Max Size of Socket Buffer</name>
+ <value>1 MB</value>
+ </property>
+ <property>
+ <name>Max Number of TCP Connections</name>
+ <value>2</value>
+ </property>
+ <property>
+ <name>Max Batch Size</name>
+ <value>10</value>
+ </property>
+ <property>
+ <name>Message Delimiter</name>
+ <value>\n</value>
+ </property>
+ <property>
+ <name>Parse Messages</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>Character Set</name>
+ <value>UTF-8</value>
+ </property>
+ <autoTerminatedRelationship>invalid</autoTerminatedRelationship>
+ </processor>
+ <processor>
+ <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id>
+ <name>LogAttribute</name>
+ <position x="3236.369384765625" y="830.25244140625"/>
+ <styles/>
+ <comment/>
+ <class>org.apache.nifi.processors.standard.LogAttribute</class>
+ <maxConcurrentTasks>1</maxConcurrentTasks>
+ <schedulingPeriod>0 sec</schedulingPeriod>
+ <penalizationPeriod>30 sec</penalizationPeriod>
+ <yieldPeriod>1 sec</yieldPeriod>
+ <bulletinLevel>WARN</bulletinLevel>
+ <lossTolerant>false</lossTolerant>
+ <scheduledState>RUNNING</scheduledState>
+ <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+ <runDurationNanos>0</runDurationNanos>
+ <property>
+ <name>Log Level</name>
+ <value>info</value>
+ </property>
+ <property>
+ <name>Log Payload</name>
+ <value>false</value>
+ </property>
+ <property>
+ <name>Attributes to Log</name>
+ </property>
+ <property>
+ <name>Attributes to Ignore</name>
+ </property>
+ <property>
+ <name>Log prefix</name>
+ </property>
+ <autoTerminatedRelationship>success</autoTerminatedRelationship>
+ </processor>
+ <connection>
+ <id>c9e1cc50-2bc7-490d-9b5d-8c5dbc95a850</id>
+ <name/>
+ <bendPoints/>
+ <labelIndex>1</labelIndex>
+ <zIndex>0</zIndex>
+ <sourceId>291ee60c-0b91-4524-88c0-d71ee2498e02</sourceId>
+ <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+ <sourceType>PROCESSOR</sourceType>
+ <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId>
+ <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+ <destinationType>PROCESSOR</destinationType>
+ <relationship>success</relationship>
+ <maxWorkQueueSize>0</maxWorkQueueSize>
+ <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+ <flowFileExpiration>0 sec</flowFileExpiration>
+ </connection>
+ </rootGroup>
+ <controllerServices/>
+ <reportingTasks/>
+</flowController>
diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h
index 24416f2..23f2d49 100644
--- a/inc/FlowControlProtocol.h
+++ b/inc/FlowControlProtocol.h
@@ -217,6 +217,8 @@
close(_socket);
if (_reportBlob)
delete [] _reportBlob;
+ if (this->_thread)
+ delete this->_thread;
}
public:
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 1d3b2f8..13f7dff 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -49,6 +49,7 @@
#include "RemoteProcessorGroupPort.h"
#include "GetFile.h"
#include "TailFile.h"
+#include "ListenSyslog.h"
//! Default NiFi Root Group Name
#define DEFAULT_ROOT_GROUP_NAME ""
diff --git a/inc/ListenSyslog.h b/inc/ListenSyslog.h
new file mode 100644
index 0000000..81bc92c
--- /dev/null
+++ b/inc/ListenSyslog.h
@@ -0,0 +1,209 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! SyslogEvent
+typedef struct {
+ uint8_t *payload;
+ uint64_t len;
+} SysLogEvent;
+
+//! ListenSyslog Class
+class ListenSyslog : public Processor
+{
+public:
+ //! Constructor
+ /*!
+ * Create a new processor
+ */
+ ListenSyslog(std::string name, uuid_t uuid = NULL)
+ : Processor(name, uuid)
+ {
+ _logger = Logger::getLogger();
+ _eventQueueByteSize = 0;
+ _serverSocket = 0;
+ _recvBufSize = 65507;
+ _maxSocketBufSize = 1024*1024;
+ _maxConnections = 2;
+ _maxBatchSize = 1;
+ _messageDelimiter = "\n";
+ _protocol = "UDP";
+ _port = 514;
+ _parseMessages = false;
+ _serverSocket = 0;
+ _maxFds = 0;
+ FD_ZERO(&_readfds);
+ _thread = NULL;
+ _resetServerSocket = false;
+ _serverTheadRunning = false;
+ }
+ //! Destructor
+ virtual ~ListenSyslog()
+ {
+ _serverTheadRunning = false;
+ if (this->_thread)
+ delete this->_thread;
+ // need to reset the socket
+ std::vector<int>::iterator it;
+ for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+ {
+ int clientSocket = *it;
+ close(clientSocket);
+ }
+ _clientSockets.clear();
+ if (_serverSocket > 0)
+ {
+ _logger->log_info("ListenSysLog Server socket %d close", _serverSocket);
+ close(_serverSocket);
+ _serverSocket = 0;
+ }
+ }
+ //! Processor Name
+ static const std::string ProcessorName;
+ //! Supported Properties
+ static Property RecvBufSize;
+ static Property MaxSocketBufSize;
+ static Property MaxConnections;
+ static Property MaxBatchSize;
+ static Property MessageDelimiter;
+ static Property ParseMessages;
+ static Property Protocol;
+ static Property Port;
+ //! Supported Relationships
+ static Relationship Success;
+ static Relationship Invalid;
+ //! Nest Callback Class for write stream
+ class WriteCallback : public OutputStreamCallback
+ {
+ public:
+ WriteCallback(char *data, uint64_t size)
+ : _data(data), _dataSize(size) {}
+ char *_data;
+ uint64_t _dataSize;
+ void process(std::ofstream *stream) {
+ if (_data && _dataSize > 0)
+ stream->write(_data, _dataSize);
+ }
+ };
+
+public:
+ //! OnTrigger method, implemented by NiFi ListenSyslog
+ virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+ //! Initialize, over write by NiFi ListenSyslog
+ virtual void initialize(void);
+
+protected:
+
+private:
+ //! Logger
+ Logger *_logger;
+ //! Run function for the thread
+ static void run(ListenSyslog *process);
+ //! Run Thread
+ void runThread();
+ //! Queue for store syslog event
+ std::queue<SysLogEvent> _eventQueue;
+ //! Size of Event queue in bytes
+ uint64_t _eventQueueByteSize;
+ //! Get event queue size
+ uint64_t getEventQueueSize() {
+ std::lock_guard<std::mutex> lock(_mtx);
+ return _eventQueue.size();
+ }
+ //! Get event queue byte size
+ uint64_t getEventQueueByteSize() {
+ std::lock_guard<std::mutex> lock(_mtx);
+ return _eventQueueByteSize;
+ }
+ //! Whether the event queue is empty
+ bool isEventQueueEmpty()
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+ return _eventQueue.empty();
+ }
+ //! Put event into directory listing
+ void putEvent(uint8_t *payload, uint64_t len)
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+ SysLogEvent event;
+ event.payload = payload;
+ event.len = len;
+ _eventQueue.push(event);
+ _eventQueueByteSize += len;
+ }
+ //! Read \n terminated line from TCP socket
+ int readline( int fd, char *bufptr, size_t len );
+ //! start server socket and handling client socket
+ void startSocketThread();
+ //! Poll event
+ void pollEvent(std::queue<SysLogEvent> &list, int maxSize)
+ {
+ std::lock_guard<std::mutex> lock(_mtx);
+
+ while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize))
+ {
+ SysLogEvent event = _eventQueue.front();
+ _eventQueue.pop();
+ _eventQueueByteSize -= event.len;
+ list.push(event);
+ }
+ return;
+ }
+ //! Mutex for protection of the directory listing
+ std::mutex _mtx;
+ int64_t _recvBufSize;
+ int64_t _maxSocketBufSize;
+ int64_t _maxConnections;
+ int64_t _maxBatchSize;
+ std::string _messageDelimiter;
+ std::string _protocol;
+ int64_t _port;
+ bool _parseMessages;
+ int _serverSocket;
+ std::vector<int> _clientSockets;
+ int _maxFds;
+ fd_set _readfds;
+ //! thread
+ std::thread *_thread;
+ //! whether to reset the server socket
+ bool _resetServerSocket;
+ bool _serverTheadRunning;
+ //! buffer for read socket
+ uint8_t _buffer[2048];
+};
+
+#endif
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index f53146e..c01c385 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -148,6 +148,10 @@
{
processor = new TailFile(name, uuid);
}
+ else if (name == ListenSyslog::ProcessorName)
+ {
+ processor = new ListenSyslog(name, uuid);
+ }
else
{
_logger->log_error("No Processor defined for %s", name.c_str());
diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp
new file mode 100644
index 0000000..090c988
--- /dev/null
+++ b/src/ListenSyslog.cpp
@@ -0,0 +1,345 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
+Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
+Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
+Property ListenSyslog::MaxBatchSize("Max Batch Size",
+ "The maximum number of Syslog events to add to a single FlowFile.", "1");
+Property ListenSyslog::MessageDelimiter("Message Delimiter",
+ "Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n");
+Property ListenSyslog::ParseMessages("Parse Messages",
+ "Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
+Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
+Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
+Relationship ListenSyslog::Success("success", "All files are routed to success");
+Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
+
+void ListenSyslog::initialize()
+{
+ //! Set the supported properties
+ std::set<Property> properties;
+ properties.insert(RecvBufSize);
+ properties.insert(MaxSocketBufSize);
+ properties.insert(MaxConnections);
+ properties.insert(MaxBatchSize);
+ properties.insert(MessageDelimiter);
+ properties.insert(ParseMessages);
+ properties.insert(Protocol);
+ properties.insert(Port);
+ setSupportedProperties(properties);
+ //! Set the supported relationships
+ std::set<Relationship> relationships;
+ relationships.insert(Success);
+ relationships.insert(Invalid);
+ setSupportedRelationships(relationships);
+}
+
+void ListenSyslog::startSocketThread()
+{
+ if (_thread != NULL)
+ return;
+
+ _logger->log_info("ListenSysLog Socket Thread Start");
+ _serverTheadRunning = true;
+ _thread = new std::thread(run, this);
+ _thread->detach();
+}
+
+void ListenSyslog::run(ListenSyslog *process)
+{
+ process->runThread();
+}
+
+void ListenSyslog::runThread()
+{
+ while (_serverTheadRunning)
+ {
+ if (_resetServerSocket)
+ {
+ _resetServerSocket = false;
+ // need to reset the socket
+ std::vector<int>::iterator it;
+ for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+ {
+ int clientSocket = *it;
+ close(clientSocket);
+ }
+ _clientSockets.clear();
+ if (_serverSocket > 0)
+ {
+ close(_serverSocket);
+ _serverSocket = 0;
+ }
+ }
+
+ if (_serverSocket <= 0)
+ {
+ uint16_t portno = _port;
+ struct sockaddr_in serv_addr;
+ int sockfd;
+ if (_protocol == "TCP")
+ sockfd = socket(AF_INET, SOCK_STREAM, 0);
+ else
+ sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+ if (sockfd < 0)
+ {
+ _logger->log_info("ListenSysLog Server socket creation failed");
+ break;
+ }
+ bzero((char *) &serv_addr, sizeof(serv_addr));
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = INADDR_ANY;
+ serv_addr.sin_port = htons(portno);
+ if (bind(sockfd, (struct sockaddr *) &serv_addr,
+ sizeof(serv_addr)) < 0)
+ {
+ _logger->log_error("ListenSysLog Server socket bind failed");
+ break;
+ }
+ if (_protocol == "TCP")
+ listen(sockfd,5);
+ _serverSocket = sockfd;
+ _logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+ }
+ FD_ZERO(&_readfds);
+ FD_SET(_serverSocket, &_readfds);
+ _maxFds = _serverSocket;
+ std::vector<int>::iterator it;
+ for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+ {
+ int clientSocket = *it;
+ if (clientSocket >= _maxFds)
+ _maxFds = clientSocket;
+ FD_SET(clientSocket, &_readfds);
+ }
+ fd_set fds;
+ struct timeval tv;
+ int retval;
+ fds = _readfds;
+ tv.tv_sec = 0;
+ // 100 msec
+ tv.tv_usec = 100000;
+ retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+ if (retval < 0)
+ break;
+ if (retval == 0)
+ continue;
+ if (FD_ISSET(_serverSocket, &fds))
+ {
+ // server socket, either we have UDP datagram or TCP connection request
+ if (_protocol == "TCP")
+ {
+ socklen_t clilen;
+ struct sockaddr_in cli_addr;
+ clilen = sizeof(cli_addr);
+ int newsockfd = accept(_serverSocket,
+ (struct sockaddr *) &cli_addr,
+ &clilen);
+ if (newsockfd > 0)
+ {
+ if (_clientSockets.size() < _maxConnections)
+ {
+ _clientSockets.push_back(newsockfd);
+ _logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+ continue;
+ }
+ else
+ {
+ close(newsockfd);
+ }
+ }
+ }
+ else
+ {
+ socklen_t clilen;
+ struct sockaddr_in cli_addr;
+ clilen = sizeof(cli_addr);
+ int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+ (struct sockaddr *)&cli_addr, &clilen);
+ if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+ {
+ uint8_t *payload = new uint8_t[recvlen];
+ memcpy(payload, _buffer, recvlen);
+ putEvent(payload, recvlen);
+ }
+ }
+ }
+ it = _clientSockets.begin();
+ while (it != _clientSockets.end())
+ {
+ int clientSocket = *it;
+ if (FD_ISSET(clientSocket, &fds))
+ {
+ int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+ if (recvlen <= 0)
+ {
+ close(clientSocket);
+ _logger->log_info("ListenSysLog client socket %d close", clientSocket);
+ it = _clientSockets.erase(it);
+ }
+ else
+ {
+ if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+ {
+ uint8_t *payload = new uint8_t[recvlen];
+ memcpy(payload, _buffer, recvlen);
+ putEvent(payload, recvlen);
+ }
+ ++it;
+ }
+ }
+ }
+ }
+ return;
+}
+
+
+int ListenSyslog::readline( int fd, char *bufptr, size_t len )
+{
+ char *bufx = bufptr;
+ static char *bp;
+ static int cnt = 0;
+ static char b[ 2048 ];
+ char c;
+
+ while ( --len > 0 )
+ {
+ if ( --cnt <= 0 )
+ {
+ cnt = recv( fd, b, sizeof( b ), 0 );
+ if ( cnt < 0 )
+ {
+ if ( errno == EINTR )
+ {
+ len++; /* the while will decrement */
+ continue;
+ }
+ return -1;
+ }
+ if ( cnt == 0 )
+ return 0;
+ bp = b;
+ }
+ c = *bp++;
+ *bufptr++ = c;
+ if ( c == '\n' )
+ {
+ *bufptr = '\n';
+ return bufptr - bufx + 1;
+ }
+ }
+ return -1;
+}
+
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+ std::string value;
+ bool needResetServerSocket = false;
+ if (context->getProperty(Protocol.getName(), value))
+ {
+ if (_protocol != value)
+ needResetServerSocket = true;
+ _protocol = value;
+ }
+ if (context->getProperty(RecvBufSize.getName(), value))
+ {
+ Property::StringToInt(value, _recvBufSize);
+ }
+ if (context->getProperty(MaxSocketBufSize.getName(), value))
+ {
+ Property::StringToInt(value, _maxSocketBufSize);
+ }
+ if (context->getProperty(MaxConnections.getName(), value))
+ {
+ Property::StringToInt(value, _maxConnections);
+ }
+ if (context->getProperty(MessageDelimiter.getName(), value))
+ {
+ _messageDelimiter = value;
+ }
+ if (context->getProperty(ParseMessages.getName(), value))
+ {
+ Property::StringToBool(value, _parseMessages);
+ }
+ if (context->getProperty(Port.getName(), value))
+ {
+ int64_t oldPort = _port;
+ Property::StringToInt(value, _port);
+ if (_port != oldPort)
+ needResetServerSocket = true;
+ }
+ if (context->getProperty(MaxBatchSize.getName(), value))
+ {
+ Property::StringToInt(value, _maxBatchSize);
+ }
+
+ if (needResetServerSocket)
+ _resetServerSocket = true;
+
+ startSocketThread();
+
+ // read from the event queue
+ if (isEventQueueEmpty())
+ {
+ context->yield();
+ return;
+ }
+
+ std::queue<SysLogEvent> eventQueue;
+ pollEvent(eventQueue, _maxBatchSize);
+ bool firstEvent = true;
+ FlowFileRecord *flowFile = NULL;
+ while(!eventQueue.empty())
+ {
+ SysLogEvent event = eventQueue.front();
+ eventQueue.pop();
+ if (firstEvent)
+ {
+ flowFile = session->create();
+ if (!flowFile)
+ return;
+ ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+ session->write(flowFile, &callback);
+ delete[] event.payload;
+ firstEvent = false;
+ }
+ else
+ {
+ /*
+ ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size());
+ session->append(flowFile, &callbackSep); */
+ ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+ session->append(flowFile, &callback);
+ delete[] event.payload;
+ }
+ }
+ flowFile->addAttribute("syslog.protocol", _protocol);
+ flowFile->addAttribute("syslog.port", std::to_string(_port));
+ session->transfer(flowFile, Success);
+}