MINIFI-85: Add ListenSyslog processor

This closes #6.

Signed-off-by: Aldrin Piri <aldrin@apache.org>
diff --git a/conf/flowListenSyslog.xml b/conf/flowListenSyslog.xml
new file mode 100644
index 0000000..8539bef
--- /dev/null
+++ b/conf/flowListenSyslog.xml
@@ -0,0 +1,138 @@
+<?xml version="1.0" encoding="UTF-8" standalone="no"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<flowController>
+  <maxTimerDrivenThreadCount>10</maxTimerDrivenThreadCount>
+  <maxEventDrivenThreadCount>5</maxEventDrivenThreadCount>
+  <rootGroup>
+    <id>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</id>
+    <name>NiFi Flow</name>
+    <position x="0.0" y="0.0"/>
+    <comment/>
+    <processor>
+      <id>291ee60c-0b91-4524-88c0-d71ee2498e02</id>
+      <name>ListenSyslog</name>
+      <position x="2489.369384765625" y="788.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.ListenSyslog</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Protocol</name>
+        <value>UDP</value>
+      </property>
+      <property>
+        <name>Port</name>
+        <value>514</value>
+      </property>
+      <property>
+        <name>SSL Context Service</name>
+      </property>
+      <property>
+        <name>Receive Buffer Size</name>
+        <value>65507 B</value>
+      </property>
+      <property>
+        <name>Max Size of Socket Buffer</name>
+        <value>1 MB</value>
+      </property>
+      <property>
+        <name>Max Number of TCP Connections</name>
+        <value>2</value>
+      </property>
+      <property>
+        <name>Max Batch Size</name>
+        <value>10</value>
+      </property>
+      <property>
+        <name>Message Delimiter</name>
+        <value>\n</value>
+      </property>
+      <property>
+        <name>Parse Messages</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Character Set</name>
+        <value>UTF-8</value>
+      </property>
+      <autoTerminatedRelationship>invalid</autoTerminatedRelationship>
+    </processor>
+    <processor>
+      <id>12e3dece-dde5-44a2-8691-6d6bb2fab147</id>
+      <name>LogAttribute</name>
+      <position x="3236.369384765625" y="830.25244140625"/>
+      <styles/>
+      <comment/>
+      <class>org.apache.nifi.processors.standard.LogAttribute</class>
+      <maxConcurrentTasks>1</maxConcurrentTasks>
+      <schedulingPeriod>0 sec</schedulingPeriod>
+      <penalizationPeriod>30 sec</penalizationPeriod>
+      <yieldPeriod>1 sec</yieldPeriod>
+      <bulletinLevel>WARN</bulletinLevel>
+      <lossTolerant>false</lossTolerant>
+      <scheduledState>RUNNING</scheduledState>
+      <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy>
+      <runDurationNanos>0</runDurationNanos>
+      <property>
+        <name>Log Level</name>
+        <value>info</value>
+      </property>
+      <property>
+        <name>Log Payload</name>
+        <value>false</value>
+      </property>
+      <property>
+        <name>Attributes to Log</name>
+      </property>
+      <property>
+        <name>Attributes to Ignore</name>
+      </property>
+      <property>
+        <name>Log prefix</name>
+      </property>
+      <autoTerminatedRelationship>success</autoTerminatedRelationship>
+    </processor>
+    <connection>
+      <id>c9e1cc50-2bc7-490d-9b5d-8c5dbc95a850</id>
+      <name/>
+      <bendPoints/>
+      <labelIndex>1</labelIndex>
+      <zIndex>0</zIndex>
+      <sourceId>291ee60c-0b91-4524-88c0-d71ee2498e02</sourceId>
+      <sourceGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</sourceGroupId>
+      <sourceType>PROCESSOR</sourceType>
+      <destinationId>12e3dece-dde5-44a2-8691-6d6bb2fab147</destinationId>
+      <destinationGroupId>fe4a3a42-53b6-4af1-a80d-6fdfe60de97f</destinationGroupId>
+      <destinationType>PROCESSOR</destinationType>
+      <relationship>success</relationship>
+      <maxWorkQueueSize>0</maxWorkQueueSize>
+      <maxWorkQueueDataSize>0 MB</maxWorkQueueDataSize>
+      <flowFileExpiration>0 sec</flowFileExpiration>
+    </connection>
+  </rootGroup>
+  <controllerServices/>
+  <reportingTasks/>
+</flowController>
diff --git a/inc/FlowControlProtocol.h b/inc/FlowControlProtocol.h
index 24416f2..23f2d49 100644
--- a/inc/FlowControlProtocol.h
+++ b/inc/FlowControlProtocol.h
@@ -217,6 +217,8 @@
 			close(_socket);
 		if (_reportBlob)
 			delete [] _reportBlob;
+		if (this->_thread)
+			delete this->_thread;
 	}
 
 public:
diff --git a/inc/FlowController.h b/inc/FlowController.h
index 1d3b2f8..13f7dff 100644
--- a/inc/FlowController.h
+++ b/inc/FlowController.h
@@ -49,6 +49,7 @@
 #include "RemoteProcessorGroupPort.h"
 #include "GetFile.h"
 #include "TailFile.h"
+#include "ListenSyslog.h"
 
 //! Default NiFi Root Group Name
 #define DEFAULT_ROOT_GROUP_NAME ""
diff --git a/inc/ListenSyslog.h b/inc/ListenSyslog.h
new file mode 100644
index 0000000..81bc92c
--- /dev/null
+++ b/inc/ListenSyslog.h
@@ -0,0 +1,209 @@
+/**
+ * @file ListenSyslog.h
+ * ListenSyslog class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LISTEN_SYSLOG_H__
+#define __LISTEN_SYSLOG_H__
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <errno.h>
+#include <sys/select.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <chrono>
+#include <thread>
+#include "FlowFileRecord.h"
+#include "Processor.h"
+#include "ProcessSession.h"
+
+//! SyslogEvent
+typedef struct {
+	uint8_t *payload;
+	uint64_t len;
+} SysLogEvent;
+
+//! ListenSyslog Class
+class ListenSyslog : public Processor
+{
+public:
+	//! Constructor
+	/*!
+	 * Create a new processor
+	 */
+	ListenSyslog(std::string name, uuid_t uuid = NULL)
+	: Processor(name, uuid)
+	{
+		_logger = Logger::getLogger();
+		_eventQueueByteSize = 0;
+		_serverSocket = 0;
+		_recvBufSize = 65507;
+		_maxSocketBufSize = 1024*1024;
+		_maxConnections = 2;
+		_maxBatchSize = 1;
+		_messageDelimiter = "\n";
+		_protocol = "UDP";
+		_port = 514;
+		_parseMessages = false;
+		_serverSocket = 0;
+		_maxFds = 0;
+		FD_ZERO(&_readfds);
+		_thread = NULL;
+		_resetServerSocket = false;
+		_serverTheadRunning = false;
+	}
+	//! Destructor
+	virtual ~ListenSyslog()
+	{
+		_serverTheadRunning = false;
+		if (this->_thread)
+			delete this->_thread;
+		// need to reset the socket
+		std::vector<int>::iterator it;
+		for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+		{
+			int clientSocket = *it;
+			close(clientSocket);
+		}
+		_clientSockets.clear();
+		if (_serverSocket > 0)
+		{
+			_logger->log_info("ListenSysLog Server socket %d close", _serverSocket);
+			close(_serverSocket);
+			_serverSocket = 0;
+		}
+	}
+	//! Processor Name
+	static const std::string ProcessorName;
+	//! Supported Properties
+	static Property RecvBufSize;
+	static Property MaxSocketBufSize;
+	static Property MaxConnections;
+	static Property MaxBatchSize;
+	static Property MessageDelimiter;
+	static Property ParseMessages;
+	static Property Protocol;
+	static Property Port;
+	//! Supported Relationships
+	static Relationship Success;
+	static Relationship Invalid;
+	//! Nest Callback Class for write stream
+	class WriteCallback : public OutputStreamCallback
+	{
+		public:
+		WriteCallback(char *data, uint64_t size)
+		: _data(data), _dataSize(size) {}
+		char *_data;
+		uint64_t _dataSize;
+		void process(std::ofstream *stream) {
+			if (_data && _dataSize > 0)
+				stream->write(_data, _dataSize);
+		}
+	};
+
+public:
+	//! OnTrigger method, implemented by NiFi ListenSyslog
+	virtual void onTrigger(ProcessContext *context, ProcessSession *session);
+	//! Initialize, over write by NiFi ListenSyslog
+	virtual void initialize(void);
+
+protected:
+
+private:
+	//! Logger
+	Logger *_logger;
+	//! Run function for the thread
+	static void run(ListenSyslog *process);
+	//! Run Thread
+	void runThread();
+	//! Queue for store syslog event
+	std::queue<SysLogEvent> _eventQueue;
+	//! Size of Event queue in bytes
+	uint64_t _eventQueueByteSize;
+	//! Get event queue size
+	uint64_t getEventQueueSize() {
+		std::lock_guard<std::mutex> lock(_mtx);
+		return _eventQueue.size();
+	}
+	//! Get event queue byte size
+	uint64_t getEventQueueByteSize() {
+		std::lock_guard<std::mutex> lock(_mtx);
+		return _eventQueueByteSize;
+	}
+	//! Whether the event queue  is empty
+	bool isEventQueueEmpty()
+	{
+		std::lock_guard<std::mutex> lock(_mtx);
+		return _eventQueue.empty();
+	}
+	//! Put event into directory listing
+	void putEvent(uint8_t *payload, uint64_t len)
+	{
+		std::lock_guard<std::mutex> lock(_mtx);
+		SysLogEvent event;
+		event.payload = payload;
+		event.len = len;
+		_eventQueue.push(event);
+		_eventQueueByteSize += len;
+	}
+	//! Read \n terminated line from TCP socket
+	int readline( int fd, char *bufptr, size_t len );
+	//! start server socket and handling client socket
+	void startSocketThread();
+	//! Poll event
+	void pollEvent(std::queue<SysLogEvent> &list, int maxSize)
+	{
+		std::lock_guard<std::mutex> lock(_mtx);
+
+		while (!_eventQueue.empty() && (maxSize == 0 || list.size() < maxSize))
+		{
+			SysLogEvent event = _eventQueue.front();
+			_eventQueue.pop();
+			_eventQueueByteSize -= event.len;
+			list.push(event);
+		}
+		return;
+	}
+	//! Mutex for protection of the directory listing
+	std::mutex _mtx;
+	int64_t _recvBufSize;
+	int64_t _maxSocketBufSize;
+	int64_t _maxConnections;
+	int64_t _maxBatchSize;
+	std::string _messageDelimiter;
+	std::string _protocol;
+	int64_t _port;
+	bool _parseMessages;
+	int _serverSocket;
+	std::vector<int> _clientSockets;
+	int _maxFds;
+	fd_set _readfds;
+	//! thread
+	std::thread *_thread;
+	//! whether to reset the server socket
+	bool _resetServerSocket;
+	bool _serverTheadRunning;
+	//! buffer for read socket
+	uint8_t _buffer[2048];
+};
+
+#endif
diff --git a/src/FlowController.cpp b/src/FlowController.cpp
index f53146e..c01c385 100644
--- a/src/FlowController.cpp
+++ b/src/FlowController.cpp
@@ -148,6 +148,10 @@
 	{
 		processor = new TailFile(name, uuid);
 	}
+	else if (name == ListenSyslog::ProcessorName)
+	{
+		processor = new ListenSyslog(name, uuid);
+	}
 	else
 	{
 		_logger->log_error("No Processor defined for %s", name.c_str());
diff --git a/src/ListenSyslog.cpp b/src/ListenSyslog.cpp
new file mode 100644
index 0000000..090c988
--- /dev/null
+++ b/src/ListenSyslog.cpp
@@ -0,0 +1,345 @@
+/**
+ * @file ListenSyslog.cpp
+ * ListenSyslog class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <queue>
+#include <stdio.h>
+#include <string>
+#include "TimeUtil.h"
+#include "ListenSyslog.h"
+#include "ProcessContext.h"
+#include "ProcessSession.h"
+
+const std::string ListenSyslog::ProcessorName("ListenSyslog");
+Property ListenSyslog::RecvBufSize("Receive Buffer Size", "The size of each buffer used to receive Syslog messages.", "65507 B");
+Property ListenSyslog::MaxSocketBufSize("Max Size of Socket Buffer", "The maximum size of the socket buffer that should be used.", "1 MB");
+Property ListenSyslog::MaxConnections("Max Number of TCP Connections", "The maximum number of concurrent connections to accept Syslog messages in TCP mode.", "2");
+Property ListenSyslog::MaxBatchSize("Max Batch Size",
+		"The maximum number of Syslog events to add to a single FlowFile.", "1");
+Property ListenSyslog::MessageDelimiter("Message Delimiter",
+		"Specifies the delimiter to place between Syslog messages when multiple messages are bundled together (see <Max Batch Size> property).", "\n");
+Property ListenSyslog::ParseMessages("Parse Messages",
+		"Indicates if the processor should parse the Syslog messages. If set to false, each outgoing FlowFile will only.", "false");
+Property ListenSyslog::Protocol("Protocol", "The protocol for Syslog communication.", "UDP");
+Property ListenSyslog::Port("Port", "The port for Syslog communication.", "514");
+Relationship ListenSyslog::Success("success", "All files are routed to success");
+Relationship ListenSyslog::Invalid("invalid", "SysLog message format invalid");
+
+void ListenSyslog::initialize()
+{
+	//! Set the supported properties
+	std::set<Property> properties;
+	properties.insert(RecvBufSize);
+	properties.insert(MaxSocketBufSize);
+	properties.insert(MaxConnections);
+	properties.insert(MaxBatchSize);
+	properties.insert(MessageDelimiter);
+	properties.insert(ParseMessages);
+	properties.insert(Protocol);
+	properties.insert(Port);
+	setSupportedProperties(properties);
+	//! Set the supported relationships
+	std::set<Relationship> relationships;
+	relationships.insert(Success);
+	relationships.insert(Invalid);
+	setSupportedRelationships(relationships);
+}
+
+void ListenSyslog::startSocketThread()
+{
+	if (_thread != NULL)
+		return;
+
+	_logger->log_info("ListenSysLog Socket Thread Start");
+	_serverTheadRunning = true;
+	_thread = new std::thread(run, this);
+	_thread->detach();
+}
+
+void ListenSyslog::run(ListenSyslog *process)
+{
+	process->runThread();
+}
+
+void ListenSyslog::runThread()
+{
+	while (_serverTheadRunning)
+	{
+		if (_resetServerSocket)
+		{
+			_resetServerSocket = false;
+			// need to reset the socket
+			std::vector<int>::iterator it;
+			for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+			{
+				int clientSocket = *it;
+				close(clientSocket);
+			}
+			_clientSockets.clear();
+			if (_serverSocket > 0)
+			{
+				close(_serverSocket);
+				_serverSocket = 0;
+			}
+		}
+
+		if (_serverSocket <= 0)
+		{
+			uint16_t portno = _port;
+			struct sockaddr_in serv_addr;
+			int sockfd;
+			if (_protocol == "TCP")
+				sockfd = socket(AF_INET, SOCK_STREAM, 0);
+			else
+				sockfd = socket(AF_INET, SOCK_DGRAM, 0);
+			if (sockfd < 0)
+			{
+				_logger->log_info("ListenSysLog Server socket creation failed");
+				break;
+			}
+			bzero((char *) &serv_addr, sizeof(serv_addr));
+			serv_addr.sin_family = AF_INET;
+			serv_addr.sin_addr.s_addr = INADDR_ANY;
+			serv_addr.sin_port = htons(portno);
+			if (bind(sockfd, (struct sockaddr *) &serv_addr,
+					sizeof(serv_addr)) < 0)
+			{
+				_logger->log_error("ListenSysLog Server socket bind failed");
+				break;
+			}
+			if (_protocol == "TCP")
+				listen(sockfd,5);
+			_serverSocket = sockfd;
+			_logger->log_error("ListenSysLog Server socket %d bind OK to port %d", _serverSocket, portno);
+		}
+		FD_ZERO(&_readfds);
+		FD_SET(_serverSocket, &_readfds);
+		_maxFds = _serverSocket;
+		std::vector<int>::iterator it;
+		for (it = _clientSockets.begin(); it != _clientSockets.end(); ++it)
+		{
+			int clientSocket = *it;
+			if (clientSocket >= _maxFds)
+				_maxFds = clientSocket;
+			FD_SET(clientSocket, &_readfds);
+		}
+		fd_set fds;
+		struct timeval tv;
+		int retval;
+		fds = _readfds;
+		tv.tv_sec = 0;
+		// 100 msec
+		tv.tv_usec = 100000;
+		retval = select(_maxFds+1, &fds, NULL, NULL, &tv);
+		if (retval < 0)
+			break;
+		if (retval == 0)
+			continue;
+		if (FD_ISSET(_serverSocket, &fds))
+		{
+			// server socket, either we have UDP datagram or TCP connection request
+			if (_protocol == "TCP")
+			{
+				socklen_t clilen;
+				struct sockaddr_in cli_addr;
+				clilen = sizeof(cli_addr);
+				int newsockfd = accept(_serverSocket,
+						(struct sockaddr *) &cli_addr,
+						&clilen);
+				if (newsockfd > 0)
+				{
+					if (_clientSockets.size() < _maxConnections)
+					{
+						_clientSockets.push_back(newsockfd);
+						_logger->log_info("ListenSysLog new client socket %d connection", newsockfd);
+						continue;
+					}
+					else
+					{
+						close(newsockfd);
+					}
+				}
+			}
+			else
+			{
+				socklen_t clilen;
+				struct sockaddr_in cli_addr;
+				clilen = sizeof(cli_addr);
+				int recvlen = recvfrom(_serverSocket, _buffer, sizeof(_buffer), 0,
+						(struct sockaddr *)&cli_addr, &clilen);
+				if (recvlen > 0 && (recvlen + getEventQueueByteSize()) <= _recvBufSize)
+				{
+					uint8_t *payload = new uint8_t[recvlen];
+					memcpy(payload, _buffer, recvlen);
+					putEvent(payload, recvlen);
+				}
+			}
+		}
+		it = _clientSockets.begin();
+		while (it != _clientSockets.end())
+		{
+			int clientSocket = *it;
+			if (FD_ISSET(clientSocket, &fds))
+			{
+				int recvlen = readline(clientSocket, (char *)_buffer, sizeof(_buffer));
+				if (recvlen <= 0)
+				{
+					close(clientSocket);
+					_logger->log_info("ListenSysLog client socket %d close", clientSocket);
+					it = _clientSockets.erase(it);
+				}
+				else
+				{
+					if ((recvlen + getEventQueueByteSize()) <= _recvBufSize)
+					{
+						uint8_t *payload = new uint8_t[recvlen];
+						memcpy(payload, _buffer, recvlen);
+						putEvent(payload, recvlen);
+					}
+					++it;
+				}
+			}
+		}
+	}
+	return;
+}
+
+
+int ListenSyslog::readline( int fd, char *bufptr, size_t len )
+{
+	char *bufx = bufptr;
+	static char *bp;
+	static int cnt = 0;
+	static char b[ 2048 ];
+	char c;
+
+	while ( --len > 0 )
+    {
+      if ( --cnt <= 0 )
+      {
+    	  cnt = recv( fd, b, sizeof( b ), 0 );
+    	  if ( cnt < 0 )
+    	  {
+    		  if ( errno == EINTR )
+    		  {
+    			  len++;		/* the while will decrement */
+    			  continue;
+    		  }
+    		  return -1;
+    	  }
+    	  if ( cnt == 0 )
+    		  return 0;
+    	  bp = b;
+      }
+      c = *bp++;
+      *bufptr++ = c;
+      if ( c == '\n' )
+      {
+    	  *bufptr = '\n';
+    	  return bufptr - bufx + 1;
+      }
+    }
+	return -1;
+}
+
+void ListenSyslog::onTrigger(ProcessContext *context, ProcessSession *session)
+{
+	std::string value;
+	bool needResetServerSocket = false;
+	if (context->getProperty(Protocol.getName(), value))
+	{
+		if (_protocol != value)
+			needResetServerSocket = true;
+		_protocol = value;
+	}
+	if (context->getProperty(RecvBufSize.getName(), value))
+	{
+		Property::StringToInt(value, _recvBufSize);
+	}
+	if (context->getProperty(MaxSocketBufSize.getName(), value))
+	{
+		Property::StringToInt(value, _maxSocketBufSize);
+	}
+	if (context->getProperty(MaxConnections.getName(), value))
+	{
+		Property::StringToInt(value, _maxConnections);
+	}
+	if (context->getProperty(MessageDelimiter.getName(), value))
+	{
+		_messageDelimiter = value;
+	}
+	if (context->getProperty(ParseMessages.getName(), value))
+	{
+		Property::StringToBool(value, _parseMessages);
+	}
+	if (context->getProperty(Port.getName(), value))
+	{
+		int64_t oldPort = _port;
+		Property::StringToInt(value, _port);
+		if (_port != oldPort)
+			needResetServerSocket = true;
+	}
+	if (context->getProperty(MaxBatchSize.getName(), value))
+	{
+		Property::StringToInt(value, _maxBatchSize);
+	}
+
+	if (needResetServerSocket)
+		_resetServerSocket = true;
+
+	startSocketThread();
+
+	// read from the event queue
+	if (isEventQueueEmpty())
+	{
+		context->yield();
+		return;
+	}
+
+	std::queue<SysLogEvent> eventQueue;
+	pollEvent(eventQueue, _maxBatchSize);
+	bool firstEvent = true;
+	FlowFileRecord *flowFile = NULL;
+	while(!eventQueue.empty())
+	{
+		SysLogEvent event = eventQueue.front();
+		eventQueue.pop();
+		if (firstEvent)
+		{
+			flowFile = session->create();
+			if (!flowFile)
+				return;
+			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+			session->write(flowFile, &callback);
+			delete[] event.payload;
+			firstEvent = false;
+		}
+		else
+		{
+			/*
+			ListenSyslog::WriteCallback callbackSep((char *)_messageDelimiter.data(), _messageDelimiter.size());
+			session->append(flowFile, &callbackSep); */
+			ListenSyslog::WriteCallback callback((char *)event.payload, event.len);
+			session->append(flowFile, &callback);
+			delete[] event.payload;
+		}
+	}
+	flowFile->addAttribute("syslog.protocol", _protocol);
+	flowFile->addAttribute("syslog.port", std::to_string(_port));
+	session->transfer(flowFile, Success);
+}