NIFI-8792 - Modified ListenRELP to use Netty

- Refactored RELP encoders and decoders

This closes #5398

Signed-off-by: David Handermann <exceptionfactory@apache.org>
diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
index 502860e..2d1574a 100644
--- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
+++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/socket/NetworkUtils.java
@@ -17,10 +17,13 @@
 package org.apache.nifi.remote.io.socket;
 
 import java.net.DatagramSocket;
-import java.net.Socket;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
 import java.net.ServerSocket;
-import java.util.concurrent.Executors;
+import java.net.Socket;
+import java.net.SocketException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 public class NetworkUtils {
@@ -89,4 +92,20 @@
 
         return (result != null && result);
     }
+
+    /**
+     * Get Interface Address using interface name eg. en0, eth0
+     *
+     * @param interfaceName Network Interface Name
+     * @return Interface Address or null when matching network interface name not found
+     * @throws SocketException Thrown when failing to get interface addresses
+     */
+    public static InetAddress getInterfaceAddress(final String interfaceName) throws SocketException {
+        InetAddress interfaceAddress = null;
+        if (interfaceName != null && !interfaceName.isEmpty()) {
+            NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
+            interfaceAddress = networkInterface.getInetAddresses().nextElement();
+        }
+        return interfaceAddress;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/NetworkEvent.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/NetworkEvent.java
new file mode 100644
index 0000000..437b214
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/NetworkEvent.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.event.transport;
+
+/**
+ * An interface to represent network delivered event/messages
+ */
+public interface NetworkEvent {
+
+    /**
+     * @return the sending host of the data, as a socket
+     */
+    String getSender();
+
+    /**
+     * @return raw data for this event
+     */
+    byte[] getMessage();
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
index 8e77914..00744ce 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-event-transport/src/main/java/org/apache/nifi/event/transport/message/ByteArrayMessage.java
@@ -16,10 +16,12 @@
  */
 package org.apache.nifi.event.transport.message;
 
+import org.apache.nifi.event.transport.NetworkEvent;
+
 /**
  * Byte Array Message with Sender
  */
-public class ByteArrayMessage {
+public class ByteArrayMessage implements NetworkEvent {
     private final byte[] message;
 
     private final String sender;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
index c83fdae..d008587 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventBatchingProcessor.java
@@ -16,11 +16,8 @@
  */
 package org.apache.nifi.processor.util.listen;
 
-import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
-
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -28,7 +25,6 @@
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.event.Event;
 
 import java.io.IOException;
@@ -41,6 +37,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.nifi.processor.util.listen.ListenerProperties.NETWORK_INTF_NAME;
+
 /**
  * An abstract processor that extends from AbstractListenEventProcessor and adds common functionality for
  * batching events into a single FlowFile.
@@ -49,25 +47,6 @@
  */
 public abstract class AbstractListenEventBatchingProcessor<E extends Event> extends AbstractListenEventProcessor<E> {
 
-    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
-            .name("Max Batch Size")
-            .description(
-                    "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
-                            + "the <Message Delimiter> up to this configured maximum number of messages")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .defaultValue("1")
-            .required(true)
-            .build();
-    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
-            .name("Message Delimiter")
-            .displayName("Batching Message Delimiter")
-            .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .defaultValue("\\n")
-            .required(true)
-            .build();
-
     // it is only the array reference that is volatile - not the contents.
     protected volatile byte[] messageDemarcatorBytes;
 
@@ -80,8 +59,8 @@
         descriptors.add(MAX_MESSAGE_QUEUE_SIZE);
         descriptors.add(MAX_SOCKET_BUFFER_SIZE);
         descriptors.add(CHARSET);
-        descriptors.add(MAX_BATCH_SIZE);
-        descriptors.add(MESSAGE_DELIMITER);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
         descriptors.addAll(getAdditionalProperties());
         this.descriptors = Collections.unmodifiableList(descriptors);
 
@@ -95,13 +74,13 @@
     @OnScheduled
     public void onScheduled(ProcessContext context) throws IOException {
         super.onScheduled(context);
-        final String msgDemarcator = context.getProperty(MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        final String msgDemarcator = context.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
         messageDemarcatorBytes = msgDemarcator.getBytes(charset);
     }
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
-        final int maxBatchSize = context.getProperty(MAX_BATCH_SIZE).asInteger();
+        final int maxBatchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
         final Map<String,FlowFileEventBatch> batches = getBatches(session, maxBatchSize, messageDemarcatorBytes);
 
         // if the size is 0 then there was nothing to process so return
@@ -169,7 +148,7 @@
 
     /**
      * Batches together up to the batchSize events. Events are grouped together based on a batch key which
-     * by default is the sender of the event, but can be override by sub-classes.
+     * by default is the sender of the event, but can be overriden by sub-classes.
      *
      * This method will return when batchSize has been reached, or when no more events are available on the queue.
      *
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
index 7898319..03334ba 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.java
@@ -16,7 +16,6 @@
  */
 package org.apache.nifi.processor.util.listen;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -30,10 +29,10 @@
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.Event;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.NetworkInterface;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -56,8 +55,6 @@
  */
 public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
 
-
-
     public static final PropertyDescriptor PORT = new PropertyDescriptor
             .Builder().name("Port")
             .description("The port to listen on for communication.")
@@ -178,19 +175,13 @@
         charset = Charset.forName(context.getProperty(CHARSET).getValue());
         port = context.getProperty(PORT).evaluateAttributeExpressions().asInteger();
         events = new LinkedBlockingQueue<>(context.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        final String interfaceName = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        final InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(interfaceName);
 
-        final String nicIPAddressStr = context.getProperty(NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
         final int maxChannelBufferSize = context.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-
-        InetAddress nicIPAddress = null;
-        if (!StringUtils.isEmpty(nicIPAddressStr)) {
-            NetworkInterface netIF = NetworkInterface.getByName(nicIPAddressStr);
-            nicIPAddress = netIF.getInetAddresses().nextElement();
-        }
-
         // create the dispatcher and call open() to bind to the given port
         dispatcher = createDispatcher(context, events);
-        dispatcher.open(nicIPAddress, port, maxChannelBufferSize);
+        dispatcher.open(interfaceAddress, port, maxChannelBufferSize);
 
         // start a thread to run the dispatcher
         final Thread readerThread = new Thread(dispatcher);
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
new file mode 100644
index 0000000..bcdb598
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/EventBatcher.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+public abstract class EventBatcher<E extends ByteArrayMessage> {
+
+    public static final int POLL_TIMEOUT_MS = 20;
+
+    private volatile BlockingQueue<E> events;
+    private volatile BlockingQueue<E> errorEvents;
+    private final ComponentLog logger;
+
+    public EventBatcher(final ComponentLog logger, final BlockingQueue events, final BlockingQueue errorEvents) {
+        this.logger = logger;
+        this.events = events;
+        this.errorEvents = errorEvents;
+    }
+
+    /**
+     * Batches together up to the batchSize events. Events are grouped together based on a batch key which
+     * by default is the sender of the event, but can be overriden by sub-classes.
+     * <p>
+     * This method will return when batchSize has been reached, or when no more events are available on the queue.
+     *
+     * @param session                the current session
+     * @param totalBatchSize         the total number of events to process
+     * @param messageDemarcatorBytes the demarcator to put between messages when writing to a FlowFile
+     * @return a Map from the batch key to the FlowFile and events for that batch, the size of events in all
+     * the batches will be <= batchSize
+     */
+    public Map<String, FlowFileEventBatch> getBatches(final ProcessSession session, final int totalBatchSize,
+                                                      final byte[] messageDemarcatorBytes) {
+
+        final Map<String, FlowFileEventBatch> batches = new HashMap<String, FlowFileEventBatch>();
+        for (int i = 0; i < totalBatchSize; i++) {
+            final E event = getMessage(true, true, session);
+            if (event == null) {
+                break;
+            }
+
+            final String batchKey = getBatchKey(event);
+            FlowFileEventBatch batch = batches.get(batchKey);
+
+            // if we don't have a batch for this key then create a new one
+            if (batch == null) {
+                batch = new FlowFileEventBatch(session.create(), new ArrayList<E>());
+                batches.put(batchKey, batch);
+            }
+
+            // add the current event to the batch
+            batch.getEvents().add(event);
+
+            // append the event's data to the FlowFile, write the demarcator first if not on the first event
+            final boolean writeDemarcator = (i > 0);
+            try {
+                final byte[] rawMessage = event.getMessage();
+                FlowFile appendedFlowFile = session.append(batch.getFlowFile(), new OutputStreamCallback() {
+                    @Override
+                    public void process(final OutputStream out) throws IOException {
+                        if (writeDemarcator) {
+                            out.write(messageDemarcatorBytes);
+                        }
+
+                        out.write(rawMessage);
+                    }
+                });
+
+                // update the FlowFile reference in the batch object
+                batch.setFlowFile(appendedFlowFile);
+
+            } catch (final Exception e) {
+                logger.error("Failed to write contents of the message to FlowFile due to {}; will re-queue message and try again",
+                        e.getMessage(), e);
+                errorEvents.offer(event);
+                break;
+            }
+        }
+
+        return batches;
+    }
+
+    /**
+     * The implementation should generate the indexing key for the event, to allow batching together related events.
+     * Typically the batch key will be the sender IP + port to allow batching events from the same sender into a single
+     * flow file.
+     * @param event Use information from the event to generate a batching key
+     * @return The key to batch like-kind events together eg. sender ID/socket
+     */
+    protected abstract String getBatchKey(E event);
+
+    /**
+     * If pollErrorQueue is true, the error queue will be checked first and event will be
+     * returned from the error queue if available.
+     *
+     * If pollErrorQueue is false, or no data is in the error queue, the regular queue is polled.
+     *
+     * If longPoll is true, the regular queue will be polled with a short timeout, otherwise it will
+     * poll with no timeout which will return immediately.
+     *
+     * @param longPoll whether or not to poll the main queue with a small timeout
+     * @param pollErrorQueue whether or not to poll the error queue first
+     *
+     * @return an event from one of the queues, or null if none are available
+     */
+    protected E getMessage(final boolean longPoll, final boolean pollErrorQueue, final ProcessSession session) {
+        E event = null;
+        if (pollErrorQueue) {
+            event = errorEvents.poll();
+        }
+
+        if (event != null) {
+            return event;
+        }
+
+        try {
+            if (longPoll) {
+                event = events.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+            } else {
+                event = events.poll();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return null;
+        }
+
+        if (event != null) {
+            session.adjustCounter("Messages Received", 1L, false);
+        }
+
+        return event;
+    }
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/FlowFileEventBatch.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/FlowFileEventBatch.java
new file mode 100644
index 0000000..37eacab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/FlowFileEventBatch.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.NetworkEvent;
+import org.apache.nifi.flowfile.FlowFile;
+
+import java.util.List;
+
+public final class FlowFileEventBatch<E extends NetworkEvent> {
+
+        private FlowFile flowFile;
+        private List<E> events;
+
+        public FlowFileEventBatch(final FlowFile flowFile, final List<E> events) {
+            this.flowFile = flowFile;
+            this.events = events;
+        }
+
+        public FlowFile getFlowFile() {
+            return flowFile;
+        }
+
+        public List<E> getEvents() {
+            return events;
+        }
+
+        public void setFlowFile(FlowFile flowFile) {
+            this.flowFile = flowFile;
+        }
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
index 128a9d9..5810f30 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/ListenerProperties.java
@@ -22,6 +22,7 @@
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.processor.util.StandardValidators;
 
 import java.net.NetworkInterface;
 import java.net.SocketException;
@@ -85,4 +86,70 @@
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
+    public static final PropertyDescriptor PORT = new PropertyDescriptor
+            .Builder().name("Port")
+            .description("The port to listen on for communication.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.PORT_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder()
+            .name("Character Set")
+            .description("Specifies the character set of the received data.")
+            .required(true)
+            .defaultValue("UTF-8")
+            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Receive Buffer Size")
+            .description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the " +
+                    "incoming messages.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("65507 B")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Socket Buffer")
+            .description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System " +
+                    "to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before " +
+                    "the data can be read, and incoming data will be dropped.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .defaultValue("1 MB")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Size of Message Queue")
+            .description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. " +
+                    "Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total " +
+                    "memory used by the processor during these surges.")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .defaultValue("10000")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
+            .name("Max Number of TCP Connections")
+            .description("The maximum number of concurrent TCP connections to accept.")
+            .addValidator(StandardValidators.createLongValidator(1, 65535, true))
+            .defaultValue("2")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MAX_BATCH_SIZE = new PropertyDescriptor.Builder()
+            .name("Max Batch Size")
+            .description(
+                    "The maximum number of messages to add to a single FlowFile. If multiple messages are available, they will be concatenated along with "
+                            + "the <Message Delimiter> up to this configured maximum number of messages")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .defaultValue("1")
+            .required(true)
+            .build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .displayName("Batching Message Delimiter")
+            .description("Specifies the delimiter to place between messages when multiple messages are bundled together (see <Max Batch Size> property).")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .defaultValue("\\n")
+            .required(true)
+            .build();
+
 }
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NetworkEventFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NetworkEventFactory.java
new file mode 100644
index 0000000..ae056b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/NetworkEventFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.event.transport.NetworkEvent;
+
+import java.util.Map;
+
+/**
+ * Factory to create instances of a given type of NettyEvent.
+ */
+public interface NetworkEventFactory<E extends NetworkEvent> {
+
+    /**
+     * Creates an event for the given data and metadata.
+     *
+     * @param data raw data from a channel
+     * @param metadata additional metadata
+     *
+     * @return an instance of the given type
+     */
+    E create(final byte[] data, final Map<String, String> metadata);
+
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardNetworkEventFactory.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardNetworkEventFactory.java
new file mode 100644
index 0000000..9eb334d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/listen/event/StandardNetworkEventFactory.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen.event;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+
+import java.util.Map;
+
+/**
+ * An EventFactory implementation to create NettyEvents.
+ */
+public class StandardNetworkEventFactory implements NetworkEventFactory<ByteArrayMessage> {
+
+    @Override
+    public ByteArrayMessage create(final byte[] data, final Map<String, String> metadata) {
+        return new ByteArrayMessage(data, metadata.get(EventFactory.SENDER_KEY));
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
new file mode 100644
index 0000000..28cc26d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/test/java/org/apache/nifi/processor/util/listen/EventBatcherTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processor.util.listen;
+
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Processor;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.event.StandardNetworkEventFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
+import org.apache.nifi.util.MockProcessSession;
+import org.apache.nifi.util.SharedSessionState;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+public class EventBatcherTest {
+
+    final String MESSAGE_DATA_1 = "some message data";
+    final String MESSAGE_DATA_2 = "some more data";
+    Processor processor;
+    final AtomicLong idGenerator = new AtomicLong(0L);
+    final ComponentLog logger = mock(ComponentLog.class);
+    BlockingQueue events;
+    BlockingQueue errorEvents;
+    EventBatcher batcher;
+    MockProcessSession session;
+    StandardNetworkEventFactory eventFactory;
+
+    @Before
+    public void setUp() {
+        processor = new SimpleProcessor();
+        events = new LinkedBlockingQueue<>();
+        errorEvents = new LinkedBlockingQueue<>();
+        batcher = new EventBatcher<ByteArrayMessage>(logger, events, errorEvents) {
+            @Override
+            protected String getBatchKey(ByteArrayMessage event) {
+                return event.getSender();
+            }
+        };
+        session = new MockProcessSession(new SharedSessionState(processor, idGenerator), Mockito.mock(Processor.class));
+        eventFactory = new StandardNetworkEventFactory();
+    }
+
+    @Test
+    public void testGetBatches() throws InterruptedException {
+        String sender1 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        String sender2 = new InetSocketAddress(NetworkUtils.getAvailableTcpPort()).toString();
+        final Map<String, String> sender1Metadata = EventFactoryUtil.createMapWithSender(sender1);
+        final Map<String, String> sender2Metadata = EventFactoryUtil.createMapWithSender(sender2);
+        events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
+        events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
+        events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
+        events.put(eventFactory.create(MESSAGE_DATA_1.getBytes(StandardCharsets.UTF_8), sender1Metadata));
+        events.put(eventFactory.create(MESSAGE_DATA_2.getBytes(StandardCharsets.UTF_8), sender2Metadata));
+        events.put(eventFactory.create(MESSAGE_DATA_2.getBytes(StandardCharsets.UTF_8), sender2Metadata));
+        Map<String, FlowFileEventBatch> batches = batcher.getBatches(session, 100, "\n".getBytes(StandardCharsets.UTF_8));
+        assertEquals(2, batches.size());
+        assertEquals(4, batches.get(sender1).getEvents().size());
+        assertEquals(2, batches.get(sender2).getEvents().size());
+    }
+
+    public static class SimpleProcessor extends AbstractProcessor {
+        @Override
+        public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        }
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 27e75be..4429990 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -413,6 +413,12 @@
             <version>${okhttp.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <version>3.12.4</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
index 004424e..0e3cef9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ListenRELP.java
@@ -24,44 +24,47 @@
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.event.transport.EventException;
+import org.apache.nifi.event.transport.EventServer;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
+import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.standard.relp.event.RELPEvent;
-import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
-import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
-import org.apache.nifi.processors.standard.relp.handler.RELPSocketChannelHandlerFactory;
-import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
-import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.listen.EventBatcher;
+import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.processors.standard.relp.handler.RELPMessageServerFactory;
+import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.security.util.ClientAuth;
 import org.apache.nifi.ssl.RestrictedSSLContextService;
 import org.apache.nifi.ssl.SSLContextService;
 
 import javax.net.ssl.SSLContext;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @Tags({"listen", "relp", "tcp", "logs"})
@@ -77,7 +80,7 @@
         @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is text/plain")
     })
 @SeeAlso({ParseSyslog.class})
-public class ListenRELP extends AbstractListenEventBatchingProcessor<RELPEvent> {
+public class ListenRELP extends AbstractProcessor {
 
     public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
             .name("SSL Context Service")
@@ -87,6 +90,7 @@
             .required(false)
             .identifiesControllerService(RestrictedSSLContextService.class)
             .build();
+
     public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder()
             .name("Client Auth")
             .displayName("Client Auth")
@@ -96,27 +100,80 @@
             .defaultValue(ClientAuth.REQUIRED.name())
             .build();
 
-    private volatile RELPEncoder relpEncoder;
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Messages received successfully will be sent out this relationship.")
+            .build();
 
-    @Override
-    protected List<PropertyDescriptor> getAdditionalProperties() {
-        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
+    protected List<PropertyDescriptor> descriptors;
+    protected Set<Relationship> relationships;
+    protected volatile int port;
+    protected volatile BlockingQueue<RELPMessage> events;
+    protected volatile BlockingQueue<RELPMessage> errorEvents;
+    protected volatile EventServer eventServer;
+    protected volatile byte[] messageDemarcatorBytes;
+    protected volatile EventBatcher eventBatcher;
+
+    @OnScheduled
+    public void onScheduled(ProcessContext context) throws IOException {
+        int maxConnections = context.getProperty(ListenerProperties.MAX_CONNECTIONS).asInteger();
+        int bufferSize = context.getProperty(ListenerProperties.RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
+        final String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
+        InetAddress hostname = NetworkUtils.getInterfaceAddress(networkInterface);
+        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
+        port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
+        events = new LinkedBlockingQueue<>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
+        errorEvents = new LinkedBlockingQueue<>();
+        eventBatcher = getEventBatcher();
+
+        final String msgDemarcator = getMessageDemarcator(context);
+        messageDemarcatorBytes = msgDemarcator.getBytes(charset);
+        final NettyEventServerFactory eventFactory = getNettyEventServerFactory(hostname, port, charset, events);
+        eventFactory.setSocketReceiveBuffer(bufferSize);
+        eventFactory.setWorkerThreads(maxConnections);
+        configureFactoryForSsl(context, eventFactory);
+
+        try {
+            eventServer = eventFactory.getEventServer();
+        } catch (EventException e) {
+            getLogger().error("Failed to bind to [{}:{}].", hostname.getHostAddress(), port);
+        }
+    }
+
+    @OnStopped
+    public void stopped() {
+        if (eventServer != null) {
+            eventServer.shutdown();
+            eventServer = null;
+        }
     }
 
     @Override
-    @OnScheduled
-    public void onScheduled(ProcessContext context) throws IOException {
-        super.onScheduled(context);
-        // wanted to ensure charset was already populated here
-        relpEncoder = new RELPEncoder(charset);
+    protected void init(final ProcessorInitializationContext context) {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ListenerProperties.NETWORK_INTF_NAME);
+        descriptors.add(ListenerProperties.PORT);
+        descriptors.add(ListenerProperties.RECV_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE);
+        descriptors.add(ListenerProperties.MAX_SOCKET_BUFFER_SIZE);
+        descriptors.add(ListenerProperties.CHARSET);
+        descriptors.add(ListenerProperties.MAX_CONNECTIONS);
+        descriptors.add(ListenerProperties.MAX_BATCH_SIZE);
+        descriptors.add(ListenerProperties.MESSAGE_DELIMITER);
+        descriptors.add(SSL_CONTEXT_SERVICE);
+        descriptors.add(CLIENT_AUTH);
+        this.descriptors = Collections.unmodifiableList(descriptors);
+
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        this.relationships = Collections.unmodifiableSet(relationships);
     }
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
         final List<ValidationResult> results = new ArrayList<>();
-        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
 
-        // Validate CLIENT_AUTH
+        final SSLContextService sslContextService = validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         final String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
         if (sslContextService != null && StringUtils.isBlank(clientAuth)) {
             results.add(new ValidationResult.Builder()
@@ -128,66 +185,31 @@
     }
 
     @Override
-    protected ChannelDispatcher createDispatcher(final ProcessContext context, final BlockingQueue<RELPEvent> events) throws IOException {
-        final EventFactory<RELPEvent> eventFactory = new RELPEventFactory();
-        final ChannelHandlerFactory<RELPEvent,AsyncChannelDispatcher> handlerFactory = new RELPSocketChannelHandlerFactory<>();
+    public final Set<Relationship> getRelationships() {
+        return this.relationships;
+    }
 
-        final int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
-        final int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
-        final Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
+    @Override
+    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return descriptors;
+    }
 
-        // initialize the buffer pool based on max number of connections and the buffer size
-        final ByteBufferSource byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
-
-        // if an SSLContextService was provided then create an SSLContext to pass down to the dispatcher
-        SSLContext sslContext = null;
-        ClientAuth clientAuth = null;
-
+    private void configureFactoryForSsl(final ProcessContext context, final NettyEventServerFactory eventFactory) {
         final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
         if (sslContextService != null) {
             final String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
-            sslContext = sslContextService.createContext();
-            clientAuth = ClientAuth.valueOf(clientAuthValue);
-        }
-
-        // if we decide to support SSL then get the context and pass it in here
-        return new SocketChannelDispatcher<>(eventFactory, handlerFactory, byteBufferSource, events,
-                getLogger(), maxConnections, sslContext, clientAuth, charSet);
-    }
-
-    @Override
-    protected String getBatchKey(RELPEvent event) {
-        return event.getSender() + "_" + event.getCommand();
-    }
-
-    @Override
-    protected void postProcess(final ProcessContext context, final ProcessSession session, final List<RELPEvent> events) {
-        // first commit the session so we guarantee we have all the events successfully
-        // written to FlowFiles and transferred to the success relationship
-        session.commitAsync(() -> {
-            // respond to each event to acknowledge successful receipt
-            for (final RELPEvent event : events) {
-                respond(event, RELPResponse.ok(event.getTxnr()));
+            SSLContext sslContext = sslContextService.createContext();
+            if (sslContext != null) {
+                eventFactory.setSslContext(sslContext);
+                eventFactory.setClientAuth(ClientAuth.valueOf(clientAuthValue));
             }
-        });
-    }
-
-    protected void respond(final RELPEvent event, final RELPResponse relpResponse) {
-        final ChannelResponse response = new RELPChannelResponse(relpEncoder, relpResponse);
-
-        final ChannelResponder responder = event.getResponder();
-        responder.addResponse(response);
-        try {
-            responder.respond();
-        } catch (IOException e) {
-            getLogger().error("Error sending response for transaction {} due to {}",
-                    new Object[] {event.getTxnr(), e.getMessage()}, e);
+        } else {
+            eventFactory.setSslContext(null);
         }
     }
 
-    @Override
     protected Map<String, String> getAttributes(FlowFileEventBatch batch) {
-        final List<RELPEvent> events = batch.getEvents();
+        final List<RELPMessage> events = batch.getEvents();
 
         // the sender and command will be the same for all events based on the batch key
         final String sender = events.get(0).getSender();
@@ -209,15 +231,63 @@
         return attributes;
     }
 
-    @Override
     protected String getTransitUri(FlowFileEventBatch batch) {
-        final String sender = batch.getEvents().get(0).getSender();
+        final List<RELPMessage> events = batch.getEvents();
+        final String sender = events.get(0).getSender();
         final String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
         final String transitUri = new StringBuilder().append("relp").append("://").append(senderHost).append(":")
                 .append(port).toString();
         return transitUri;
     }
 
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+        EventBatcher eventBatcher = getEventBatcher();
+
+        final int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
+        Map<String, FlowFileEventBatch> batches = eventBatcher.getBatches(session, batchSize, messageDemarcatorBytes);
+        processEvents(session, batches);
+    }
+
+    private void processEvents(final ProcessSession session, final Map<String, FlowFileEventBatch> batches) {
+        for (Map.Entry<String, FlowFileEventBatch> entry : batches.entrySet()) {
+            FlowFile flowFile = entry.getValue().getFlowFile();
+            final List<RELPMessage> events = entry.getValue().getEvents();
+
+            if (flowFile.getSize() == 0L || events.size() == 0) {
+                session.remove(flowFile);
+                getLogger().debug("No data written to FlowFile from batch {}; removing FlowFile", entry.getKey());
+                continue;
+            }
+
+            final Map<String,String> attributes = getAttributes(entry.getValue());
+            flowFile = session.putAllAttributes(flowFile, attributes);
+
+            getLogger().debug("Transferring {} to success", flowFile);
+            session.transfer(flowFile, REL_SUCCESS);
+            session.adjustCounter("FlowFiles Transferred to Success", 1L, false);
+
+            // the sender and command will be the same for all events based on the batch key
+            final String transitUri = getTransitUri(entry.getValue());
+            session.getProvenanceReporter().receive(flowFile, transitUri);
+
+        }
+        session.commitAsync();
+    }
+
+    private String getRELPBatchKey(final RELPMessage event) {
+        return event.getSender() + "_" + event.getCommand();
+    }
+
+    private EventBatcher getEventBatcher() {
+        return new EventBatcher<RELPMessage>(getLogger(), events, errorEvents) {
+            @Override
+            protected String getBatchKey(RELPMessage event) {
+                return getRELPBatchKey(event);
+            }
+        };
+    }
+
     public enum RELPAttributes implements FlowFileAttributeKey {
         TXNR("relp.txnr"),
         COMMAND("relp.command"),
@@ -235,4 +305,14 @@
             return key;
         }
     }
+
+    private NettyEventServerFactory getNettyEventServerFactory(final InetAddress hostname, final int port, final Charset charset, final BlockingQueue events) {
+        return new RELPMessageServerFactory(getLogger(), hostname, port, charset, events);
+    }
+
+    private String getMessageDemarcator(final ProcessContext context) {
+        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER)
+                .getValue()
+                .replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessage.java
similarity index 68%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessage.java
index e877ea2..5365c2d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEvent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessage.java
@@ -16,21 +16,18 @@
  */
 package org.apache.nifi.processors.standard.relp.event;
 
-import org.apache.nifi.processor.util.listen.event.StandardEvent;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-
-import java.nio.channels.SocketChannel;
+import org.apache.nifi.event.transport.message.ByteArrayMessage;
 
 /**
- * A RELP event which adds the transaction number and command to the StandardEvent.
+ * A RELP message which adds a transaction number and command to the ByteArrayMessage.
  */
-public class RELPEvent extends StandardEvent<SocketChannel> {
+public class RELPMessage extends ByteArrayMessage {
 
     private final long txnr;
     private final String command;
 
-    public RELPEvent(final String sender, final byte[] data, final ChannelResponder<SocketChannel> responder, final long txnr, final String command) {
-        super(sender, data, responder);
+    public RELPMessage(final String sender, final byte[] data, final long txnr, final String command) {
+        super(data, sender);
         this.txnr = txnr;
         this.command = command;
     }
@@ -42,5 +39,4 @@
     public String getCommand() {
         return command;
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessageFactory.java
similarity index 71%
rename from nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
rename to nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessageFactory.java
index 22eba01..06e44b4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPEventFactory.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMessageFactory.java
@@ -16,22 +16,20 @@
  */
 package org.apache.nifi.processors.standard.relp.event;
 
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
+import org.apache.nifi.processor.util.listen.event.NetworkEventFactory;
 
 import java.util.Map;
 
 /**
  * An EventFactory implementation to create RELPEvents.
  */
-public class RELPEventFactory implements EventFactory<RELPEvent> {
+public class RELPMessageFactory implements NetworkEventFactory<RELPMessage> {
 
     @Override
-    public RELPEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
+    public RELPMessage create(final byte[] data, final Map<String, String> metadata) {
         final long txnr = Long.valueOf(metadata.get(RELPMetadata.TXNR_KEY));
         final String command = metadata.get(RELPMetadata.COMMAND_KEY);
-        final String sender = metadata.get(EventFactory.SENDER_KEY);
-        return new RELPEvent(sender, data, responder, txnr, command);
+        final String sender = metadata.get(RELPMetadata.SENDER_KEY);
+        return new RELPMessage(sender, data, txnr, command);
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
index 88051c0..f54f0c4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/event/RELPMetadata.java
@@ -23,5 +23,6 @@
 
     String TXNR_KEY = "relp.txnr";
     String COMMAND_KEY = "relp.command";
+    String SENDER_KEY = "sender";
 
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
index 53ba97b..a6a0db7 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPDecoder.java
@@ -16,11 +16,12 @@
  */
 package org.apache.nifi.processors.standard.relp.frame;
 
-import java.io.ByteArrayOutputStream;
-import java.nio.charset.Charset;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.charset.Charset;
+
 /**
  * Decodes a RELP frame by maintaining a state based on each byte that has been processed. This class
  * should not be shared by multiple threads.
@@ -43,6 +44,13 @@
     }
 
     /**
+     * @param charset the charset to decode bytes from the RELP frame
+     */
+    public RELPDecoder(final Charset charset, final int bufferSize) {
+        this(charset, new ByteArrayOutputStream(bufferSize));
+    }
+
+    /**
      *
      * @param charset the charset to decode bytes from the RELP frame
      * @param buffer a buffer to use while processing the bytes
@@ -139,7 +147,7 @@
         if (b == RELPFrame.SEPARATOR) {
             final String command = new String(currBytes.toByteArray(), charset);
             frameBuilder.command(command);
-            logger.debug("Command is {}", new Object[] {command});
+            logger.debug("Command is {}", command);
 
             currBytes.reset();
             currState = RELPState.LENGTH;
@@ -152,7 +160,7 @@
         if (b == RELPFrame.SEPARATOR || (currBytes.size() > 0 && b == RELPFrame.DELIMITER)) {
             final int dataLength = Integer.parseInt(new String(currBytes.toByteArray(), charset));
             frameBuilder.dataLength(dataLength);
-            logger.debug("Length is {}", new Object[] {dataLength});
+            logger.debug("Length is {}", dataLength);
 
             currBytes.reset();
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
new file mode 100644
index 0000000..b2f83b5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoder.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
+import org.apache.nifi.processor.util.listen.response.ChannelResponse;
+import org.apache.nifi.processors.standard.relp.event.RELPMessageFactory;
+import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.nio.charset.Charset;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Decode RELP message bytes into a RELPMessage
+ */
+public class RELPFrameDecoder extends ByteToMessageDecoder {
+
+    private Charset charset;
+    private RELPDecoder decoder;
+    private final ComponentLog logger;
+    private final RELPEncoder encoder;
+    private final RELPMessageFactory eventFactory;
+
+    static final String CMD_OPEN = "open";
+    static final String CMD_CLOSE = "close";
+
+    public RELPFrameDecoder(final ComponentLog logger, final Charset charset) {
+        this.charset = charset;
+        this.logger = logger;
+        this.encoder = new RELPEncoder(charset);
+        this.eventFactory = new RELPMessageFactory();
+    }
+
+    @Override
+    protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final List<Object> out) throws Exception {
+        final int total = in.readableBytes();
+        final String senderSocket = ctx.channel().remoteAddress().toString();
+        this.decoder = new RELPDecoder(charset, total);
+
+        // go through the buffer parsing the RELP command
+        for (int i = 0; i < total; i++) {
+            byte currByte = in.readByte();
+            // if we found the end of a frame, handle the frame and mark the buffer
+            if (decoder.process(currByte)) {
+                final RELPFrame frame = decoder.getFrame();
+
+                logger.debug("Received RELP frame with transaction {} and command {}",
+                       frame.getTxnr(), frame.getCommand());
+                handle(frame, ctx, senderSocket, out);
+                in.markReaderIndex();
+            }
+        }
+    }
+
+    private void handle(final RELPFrame frame, final ChannelHandlerContext ctx, final String sender, final List<Object> out) {
+        // respond to open and close commands immediately, create and queue an event for everything else
+        if (CMD_OPEN.equals(frame.getCommand())) {
+            Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
+        } else if (CMD_CLOSE.equals(frame.getCommand())) {
+            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(response.toByteArray()));
+            ctx.close();
+        } else {
+            final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender);
+            metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr()));
+            metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand());
+            metadata.put(RELPMetadata.SENDER_KEY, sender);
+            out.add(eventFactory.create(frame.getData(), metadata));
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPMessageChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPMessageChannelHandler.java
new file mode 100644
index 0000000..aeb10ad
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPMessageChannelHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.Charset;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Decode data received into a RELPMessage
+ */
+@ChannelHandler.Sharable
+public class RELPMessageChannelHandler extends SimpleChannelInboundHandler<RELPMessage> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RELPMessageChannelHandler.class);
+    private final BlockingQueue<RELPMessage> events;
+    private final RELPEncoder encoder;
+
+    public RELPMessageChannelHandler(BlockingQueue<RELPMessage> events, final Charset charset) {
+        this.events = events;
+        this.encoder = new RELPEncoder(charset);
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, RELPMessage msg) {
+        LOGGER.debug("RELP Message Received Length [{}] Remote Address [{}] ", msg.getMessage().length, msg.getSender());
+        if (events.offer(msg)) {
+            LOGGER.debug("Event Queued: RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new RELPChannelResponse(encoder, RELPResponse.ok(msg.getTxnr())).toByteArray()));
+        } else {
+            LOGGER.debug("Event Queue Full: Failed RELP Message Sender [{}] Transaction Number [{}]", msg.getSender(), msg.getTxnr());
+            ctx.writeAndFlush(Unpooled.wrappedBuffer(new RELPChannelResponse(encoder, RELPResponse.serverFullError(msg.getTxnr())).toByteArray()));
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoder.java
new file mode 100644
index 0000000..e84c9b6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoder.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+
+import java.nio.charset.Charset;
+import java.util.List;
+
+/**
+ * Message encoder for a RELPResponse
+ */
+@ChannelHandler.Sharable
+public class RELPResponseEncoder extends MessageToMessageEncoder<RELPResponse> {
+
+    private Charset charset;
+
+    public RELPResponseEncoder(final Charset charset) {
+        this.charset = charset;
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext context, RELPResponse event, List<Object> out) throws Exception {
+        out.add(new RELPEncoder(charset).encode(event.toFrame(charset)));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
deleted file mode 100644
index dff8c84..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPFrameHandler.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
-import org.apache.nifi.processor.util.listen.event.EventQueue;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
-import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
-import org.apache.nifi.processors.standard.relp.response.RELPResponse;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Encapsulates the logic to handle a RELPFrame once it has been read from the channel.
- */
-public class RELPFrameHandler<E extends Event<SocketChannel>> {
-
-    static final String CMD_OPEN = "open";
-    static final String CMD_CLOSE = "close";
-
-    private final Charset charset;
-    private final EventFactory<E> eventFactory;
-    private final EventQueue<E> events;
-    private final SelectionKey key;
-    private final AsyncChannelDispatcher dispatcher;
-    private final ComponentLog logger;
-    private final RELPEncoder encoder;
-
-    public RELPFrameHandler(final SelectionKey selectionKey,
-                            final Charset charset,
-                            final EventFactory<E> eventFactory,
-                            final BlockingQueue<E> events,
-                            final AsyncChannelDispatcher dispatcher,
-                            final ComponentLog logger) {
-        this.key = selectionKey;
-        this.charset = charset;
-        this.eventFactory = eventFactory;
-        this.dispatcher = dispatcher;
-        this.logger = logger;
-        this.events = new EventQueue<>(events, logger);
-        this.encoder = new RELPEncoder(charset);
-    }
-
-    public void handle(final RELPFrame frame, final ChannelResponder<SocketChannel> responder, final String sender)
-            throws IOException, InterruptedException {
-
-        // respond to open and close commands immediately, create and queue an event for everything else
-        if (CMD_OPEN.equals(frame.getCommand())) {
-            Map<String,String> offers = RELPResponse.parseOffers(frame.getData(), charset);
-            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.open(frame.getTxnr(), offers));
-            responder.addResponse(response);
-            responder.respond();
-        } else if (CMD_CLOSE.equals(frame.getCommand())) {
-            ChannelResponse response = new RELPChannelResponse(encoder, RELPResponse.ok(frame.getTxnr()));
-            responder.addResponse(response);
-            responder.respond();
-            dispatcher.completeConnection(key);
-        } else {
-            final Map<String, String> metadata = EventFactoryUtil.createMapWithSender(sender.toString());
-            metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(frame.getTxnr()));
-            metadata.put(RELPMetadata.COMMAND_KEY, frame.getCommand());
-
-            final E event = eventFactory.create(frame.getData(), metadata, responder);
-            events.offer(event);
-        }
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
new file mode 100644
index 0000000..646099d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPMessageServerFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.handler;
+
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.NettyEventServerFactory;
+import org.apache.nifi.event.transport.netty.channel.LogExceptionChannelHandler;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.processors.standard.relp.frame.RELPFrameDecoder;
+import org.apache.nifi.processors.standard.relp.frame.RELPMessageChannelHandler;
+
+import java.net.InetAddress;
+import java.nio.charset.Charset;
+import java.util.Arrays;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Netty Event Server Factory implementation for RELP Messages
+ */
+public class RELPMessageServerFactory extends NettyEventServerFactory {
+
+    /**
+     * RELP Message Server Factory to receive RELP messages
+     * @param log Component Log
+     * @param address Server Address
+     * @param port Server Port Number
+     * @param charset Charset to use when decoding RELP messages
+     * @param events Blocking Queue for events received
+     */
+    public RELPMessageServerFactory(final ComponentLog log,
+                                    final InetAddress address,
+                                    final int port,
+                                    final Charset charset,
+                                    final BlockingQueue<RELPMessage> events) {
+        super(address, port, TransportProtocol.TCP);
+        final LogExceptionChannelHandler logExceptionChannelHandler = new LogExceptionChannelHandler(log);
+        final RELPMessageChannelHandler relpChannelHandler = new RELPMessageChannelHandler(events, charset);
+
+        setHandlerSupplier(() -> Arrays.asList(
+                logExceptionChannelHandler,
+                new RELPFrameDecoder(log, charset),
+                relpChannelHandler
+        ));
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
deleted file mode 100644
index 9bb2548..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSSLSocketChannelHandler.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.socket.SSLSocketChannelHandler;
-import org.apache.nifi.processor.util.listen.response.socket.SSLSocketChannelResponder;
-import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
-import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * A RELP implementation of SSLSocketChannelHandler.
- */
-public class RELPSSLSocketChannelHandler<E extends Event<SocketChannel>> extends SSLSocketChannelHandler<E> {
-
-    private RELPDecoder decoder;
-    private RELPFrameHandler<E> frameHandler;
-
-    public RELPSSLSocketChannelHandler(final SelectionKey key,
-                                       final AsyncChannelDispatcher dispatcher,
-                                       final Charset charset,
-                                       final EventFactory<E> eventFactory,
-                                       final BlockingQueue<E> events,
-                                       final ComponentLog logger) {
-        super(key, dispatcher, charset, eventFactory, events, logger);
-        this.decoder = new RELPDecoder(charset);
-        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
-    }
-
-    @Override
-    protected void processBuffer(final SSLSocketChannel sslSocketChannel, final SocketChannel socketChannel,
-                                 final int bytesRead, final byte[] buffer) throws InterruptedException, IOException {
-
-        final InetAddress sender = socketChannel.socket().getInetAddress();
-        try {
-            // go through the buffer parsing the RELP command
-            for (int i = 0; i < bytesRead; i++) {
-                byte currByte = buffer[i];
-
-                // if we found the end of a frame, handle the frame and mark the buffer
-                if (decoder.process(currByte)) {
-                    final RELPFrame frame = decoder.getFrame();
-
-                    logger.debug("Received RELP frame with transaction {} and command {}",
-                            new Object[] {frame.getTxnr(), frame.getCommand()});
-
-                    final SSLSocketChannelResponder responder = new SSLSocketChannelResponder(socketChannel, sslSocketChannel);
-                    frameHandler.handle(frame, responder, sender.toString());
-                }
-            }
-
-            logger.debug("Done processing buffer");
-
-        } catch (final RELPFrameException rfe) {
-            logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()} , rfe);
-            // if an invalid frame or bad data was sent then the decoder will be left in a
-            // corrupted state, so lets close the connection and cause the client to re-establish
-            dispatcher.completeConnection(key);
-        }
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
deleted file mode 100644
index 7e7b769..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandler.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.socket.StandardSocketChannelHandler;
-import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
-import org.apache.nifi.processors.standard.relp.frame.RELPDecoder;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrameException;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Extends the StandardSocketChannelHandler to decode bytes into RELP frames.
- */
-public class RELPSocketChannelHandler<E extends Event<SocketChannel>> extends StandardSocketChannelHandler<E> {
-
-    private RELPDecoder decoder;
-    private RELPFrameHandler<E> frameHandler;
-
-    public RELPSocketChannelHandler(final SelectionKey key,
-                                    final AsyncChannelDispatcher dispatcher,
-                                    final Charset charset,
-                                    final EventFactory<E> eventFactory,
-                                    final BlockingQueue<E> events,
-                                    final ComponentLog logger) {
-        super(key, dispatcher, charset, eventFactory, events, logger);
-        this.decoder = new RELPDecoder(charset);
-        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
-    }
-
-    @Override
-    protected void processBuffer(final SocketChannel socketChannel, final ByteBuffer socketBuffer)
-            throws InterruptedException, IOException {
-
-        // get total bytes in buffer
-        final int total = socketBuffer.remaining();
-        final InetAddress sender = socketChannel.socket().getInetAddress();
-
-        try {
-            // go through the buffer parsing the RELP command
-            for (int i = 0; i < total; i++) {
-                byte currByte = socketBuffer.get();
-
-                // if we found the end of a frame, handle the frame and mark the buffer
-                if (decoder.process(currByte)) {
-                    final RELPFrame frame = decoder.getFrame();
-
-                    logger.debug("Received RELP frame with transaction {} and command {}",
-                            new Object[] {frame.getTxnr(), frame.getCommand()});
-
-                    final SocketChannelResponder responder = new SocketChannelResponder(socketChannel);
-                    frameHandler.handle(frame, responder, sender.toString());
-                    socketBuffer.mark();
-                }
-            }
-
-            logger.debug("Done processing buffer");
-
-        } catch (final RELPFrameException rfe) {
-            logger.error("Error reading RELP frames due to {}", new Object[] {rfe.getMessage()}, rfe);
-            // if an invalid frame or bad data was sent then the decoder will be left in a
-            // corrupted state, so lets close the connection and cause the client to re-establish
-            dispatcher.completeConnection(key);
-        }
-    }
-
-    // not used for anything in RELP since the decoder encapsulates the delimiter
-    @Override
-    public byte getDelimiter() {
-        return RELPFrame.DELIMITER;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
deleted file mode 100644
index 625219b..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/handler/RELPSocketChannelHandlerFactory.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandler;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.util.concurrent.BlockingQueue;
-
-/**
- * Default factory for creating RELP socket channel handlers.
- */
-public class RELPSocketChannelHandlerFactory<E extends Event<SocketChannel>> implements ChannelHandlerFactory<E, AsyncChannelDispatcher> {
-
-    @Override
-    public ChannelHandler<E, AsyncChannelDispatcher> createHandler(final SelectionKey key,
-                                           final AsyncChannelDispatcher dispatcher,
-                                           final Charset charset,
-                                           final EventFactory<E> eventFactory,
-                                           final BlockingQueue<E> events,
-                                           final ComponentLog logger) {
-        return new RELPSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
-    }
-
-    @Override
-    public ChannelHandler<E, AsyncChannelDispatcher> createSSLHandler(final SelectionKey key,
-                                              final AsyncChannelDispatcher dispatcher,
-                                              final Charset charset,
-                                              final EventFactory<E> eventFactory,
-                                              final BlockingQueue<E> events,
-                                              final ComponentLog logger) {
-        return new RELPSSLSocketChannelHandler<>(key, dispatcher, charset, eventFactory, events, logger);
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
index f543f26..9e8246b 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/relp/response/RELPResponse.java
@@ -127,9 +127,23 @@
      * @return a RELPResponse with a 500 code and a message of "ERROR"
      */
     public static RELPResponse error(final long txnr) {
-        return new RELPResponse(txnr, ERROR, "ERROR", null);
+        return error(txnr, "ERROR");
     }
 
+    /**
+     * Utility method to create a default "ERROR" response if the server event queue is full.
+     *
+     * @param txnr the transaction number being responded to
+     *
+     * @return a RELPResponse with a 500 code and a message of "SERVER FULL"
+     */
+    public static RELPResponse serverFullError(final long txnr) {
+        return error(txnr, "SERVER FULL");
+    }
+
+    private static RELPResponse error(final long txnr, final String message) {
+        return new RELPResponse(txnr, ERROR, message, null);
+    }
 
     /**
      * Parses the provided data into a Map of offers.
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
index 7e95433..efdac3a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenRELP.java
@@ -16,22 +16,16 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import javax.net.ssl.SSLContext;
-
 import org.apache.commons.lang3.StringUtils;
+import org.apache.ftpserver.ssl.ClientAuth;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.event.transport.EventSender;
+import org.apache.nifi.event.transport.configuration.TransportProtocol;
+import org.apache.nifi.event.transport.netty.ByteArrayNettyEventSenderFactory;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processors.standard.relp.event.RELPEvent;
+import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
 import org.apache.nifi.processors.standard.relp.frame.RELPEncoder;
 import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -45,32 +39,47 @@
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.apache.nifi.web.util.ssl.SslContextUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
-import org.mockito.Mockito;
 import org.mockito.junit.MockitoJUnitRunner;
 
+import javax.net.ssl.SSLContext;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.mockito.Mockito.when;
+
 @RunWith(MockitoJUnitRunner.class)
 public class TestListenRELP {
 
     public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
-    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
+    public static final String RELP_FRAME_DATA = "this is a relp message here";
+
+    private static final String LOCALHOST = "localhost";
+    private static final Charset CHARSET = StandardCharsets.US_ASCII;
+    private static final Duration SENDER_TIMEOUT = Duration.ofSeconds(10);
 
     static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
             .txnr(1)
             .command("open")
             .dataLength(OPEN_FRAME_DATA.length())
-            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .data(OPEN_FRAME_DATA.getBytes(CHARSET))
             .build();
 
-    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+    static final RELPFrame RELP_FRAME = new RELPFrame.Builder()
             .txnr(2)
             .command("syslog")
-            .dataLength(SYSLOG_FRAME_DATA.length())
-            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .dataLength(RELP_FRAME_DATA.length())
+            .data(RELP_FRAME_DATA.getBytes(CHARSET))
             .build();
 
     static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
@@ -80,14 +89,6 @@
             .data(new byte[0])
             .build();
 
-    private static final String LOCALHOST = "localhost";
-
-    @Mock
-    private ChannelResponder<SocketChannel> responder;
-
-    @Mock
-    private ChannelDispatcher channelDispatcher;
-
     @Mock
     private RestrictedSSLContextService sslContextService;
 
@@ -97,46 +98,53 @@
 
     @Before
     public void setup() {
-        encoder = new RELPEncoder(StandardCharsets.UTF_8);
-        runner = TestRunners.newTestRunner(ListenRELP.class);
+        encoder = new RELPEncoder(CHARSET);
+        ListenRELP mockRELP = new MockListenRELP();
+        runner = TestRunners.newTestRunner(mockRELP);
+    }
+
+    @After
+    public void shutdown() {
+        runner.shutdown();
     }
 
     @Test
-    public void testRun() throws IOException {
-        final int syslogFrames = 5;
-        final List<RELPFrame> frames = getFrames(syslogFrames);
+    public void testRELPFramesAreReceivedSuccessfully() throws IOException {
+        final int relpFrames = 5;
+        final List<RELPFrame> frames = getFrames(relpFrames);
 
-        // three syslog frames should be transferred and three responses should be sent
-        run(frames, syslogFrames, syslogFrames, null);
+        // three RELP frames should be transferred
+        run(frames, relpFrames, null);
 
         final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
         Assert.assertNotNull(events);
-        Assert.assertEquals(syslogFrames, events.size());
+        Assert.assertEquals(relpFrames, events.size());
 
         final ProvenanceEventRecord event = events.get(0);
         Assert.assertEquals(ProvenanceEventType.RECEIVE, event.getEventType());
         Assert.assertTrue("transit uri must be set and start with proper protocol", event.getTransitUri().toLowerCase().startsWith("relp"));
 
         final List<MockFlowFile> mockFlowFiles = runner.getFlowFilesForRelationship(ListenRELP.REL_SUCCESS);
-        Assert.assertEquals(syslogFrames, mockFlowFiles.size());
+        Assert.assertEquals(relpFrames, mockFlowFiles.size());
 
         final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
-        Assert.assertEquals(String.valueOf(SYSLOG_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
-        Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
+        Assert.assertEquals(String.valueOf(RELP_FRAME.getTxnr()), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.TXNR.key()));
+        Assert.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
         Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
         Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
     }
 
     @Test
-    public void testRunBatching() throws IOException {
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "5");
+    public void testRELPFramesAreReceivedSuccessfullyWhenBatched() throws IOException {
 
-        final int syslogFrames = 3;
-        final List<RELPFrame> frames = getFrames(syslogFrames);
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "5");
 
-        // one syslog frame should be transferred since we are batching, but three responses should be sent
+        final int relpFrames = 3;
+        final List<RELPFrame> frames = getFrames(relpFrames);
+
+        // one relp frame should be transferred since we are batching
         final int expectedFlowFiles = 1;
-        run(frames, expectedFlowFiles, syslogFrames, null);
+        run(frames, expectedFlowFiles, null);
 
         final List<ProvenanceEventRecord> events = runner.getProvenanceEvents();
         Assert.assertNotNull(events);
@@ -150,117 +158,110 @@
         Assert.assertEquals(expectedFlowFiles, mockFlowFiles.size());
 
         final MockFlowFile mockFlowFile = mockFlowFiles.get(0);
-        Assert.assertEquals(SYSLOG_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
+        Assert.assertEquals(RELP_FRAME.getCommand(), mockFlowFile.getAttribute(ListenRELP.RELPAttributes.COMMAND.key()));
         Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.PORT.key())));
         Assert.assertFalse(StringUtils.isBlank(mockFlowFile.getAttribute(ListenRELP.RELPAttributes.SENDER.key())));
     }
 
     @Test
     public void testRunMutualTls() throws IOException, TlsException, InitializationException {
+
+
         final String serviceIdentifier = SSLContextService.class.getName();
-        Mockito.when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
+        when(sslContextService.getIdentifier()).thenReturn(serviceIdentifier);
         final SSLContext sslContext = SslContextUtils.createKeyStoreSslContext();
-        Mockito.when(sslContextService.createContext()).thenReturn(sslContext);
+        when(sslContextService.createContext()).thenReturn(sslContext);
         runner.addControllerService(serviceIdentifier, sslContextService);
         runner.enableControllerService(sslContextService);
 
         runner.setProperty(ListenRELP.SSL_CONTEXT_SERVICE, serviceIdentifier);
+        runner.setProperty(ListenRELP.CLIENT_AUTH, ClientAuth.NONE.name());
 
-        final int syslogFrames = 3;
-        final List<RELPFrame> frames = getFrames(syslogFrames);
-        run(frames, syslogFrames, syslogFrames, sslContext);
-    }
-
-    @Test
-    public void testRunNoEventsAvailable() {
-        MockListenRELP mockListenRELP = new MockListenRELP(new ArrayList<>());
-        runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-
-        runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 0);
-        runner.shutdown();
+        final int relpFrames = 3;
+        final List<RELPFrame> frames = getFrames(relpFrames);
+        run(frames, relpFrames, sslContext);
     }
 
     @Test
     public void testBatchingWithDifferentSenders() {
-        final String sender1 = "sender1";
-        final String sender2 = "sender2";
+        String sender1 = "/192.168.1.50:55000";
+        String sender2 = "/192.168.1.50:55001";
+        String sender3 = "/192.168.1.50:55002";
 
-        final List<RELPEvent> mockEvents = new ArrayList<>();
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender1, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
-        mockEvents.add(new RELPEvent(sender2, SYSLOG_FRAME.getData(), responder, SYSLOG_FRAME.getTxnr(), SYSLOG_FRAME.getCommand()));
+        final List<RELPMessage> mockEvents = new ArrayList<>();
+        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender1, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender2, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
+        mockEvents.add(new RELPMessage(sender3, RELP_FRAME.getData(), RELP_FRAME.getTxnr(), RELP_FRAME.getCommand()));
 
         MockListenRELP mockListenRELP = new MockListenRELP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenRELP);
-        runner.setProperty(ListenRELP.PORT, Integer.toString(NetworkUtils.availablePort()));
-        runner.setProperty(ListenRELP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(NetworkUtils.availablePort()));
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run();
-        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 2);
+        runner.assertAllFlowFilesTransferred(ListenRELP.REL_SUCCESS, 3);
         runner.shutdown();
     }
 
-    private void run(final List<RELPFrame> frames, final int flowFiles, final int responses, final SSLContext sslContext)
+    private void run(final List<RELPFrame> frames, final int flowFiles, final SSLContext sslContext)
             throws IOException {
 
         final int port = NetworkUtils.availablePort();
-        runner.setProperty(ListenRELP.PORT, Integer.toString(port));
-
+        runner.setProperty(AbstractListenEventBatchingProcessor.PORT, Integer.toString(port));
         // Run Processor and start Dispatcher without shutting down
         runner.run(1, false, true);
-
-        try (final Socket socket = getSocket(port, sslContext)) {
-            final OutputStream outputStream = socket.getOutputStream();
-            sendFrames(frames, outputStream);
-
-            // Run Processor for number of responses
-            runner.run(responses, false, false);
-
-            runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
-        } finally {
-            runner.shutdown();
-        }
+        final byte[] relpMessages = getRELPMessages(frames);
+        sendMessages(port, relpMessages, sslContext);
+        runner.run(flowFiles, false, false);
+        runner.assertTransferCount(ListenRELP.REL_SUCCESS, flowFiles);
     }
 
-    private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
+    private byte[] getRELPMessages(final List<RELPFrame> frames) throws IOException {
+
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         for (final RELPFrame frame : frames) {
             final byte[] encodedFrame = encoder.encode(frame);
             outputStream.write(encodedFrame);
             outputStream.flush();
         }
+
+        return outputStream.toByteArray();
     }
 
-    private Socket getSocket(final int port, final SSLContext sslContext) throws IOException {
-        final Socket socket;
-        if (sslContext == null) {
-            socket = new Socket(LOCALHOST, port);
-        } else {
-            socket = sslContext.getSocketFactory().createSocket(LOCALHOST, port);
-        }
-        return socket;
-    }
-
-    private List<RELPFrame> getFrames(final int syslogFrames) {
+    private List<RELPFrame> getFrames(final int relpFrames) {
         final List<RELPFrame> frames = new ArrayList<>();
         frames.add(OPEN_FRAME);
 
-        for (int i = 0; i < syslogFrames; i++) {
-            frames.add(SYSLOG_FRAME);
+        for (int i = 0; i < relpFrames; i++) {
+            frames.add(RELP_FRAME);
         }
 
         frames.add(CLOSE_FRAME);
         return frames;
     }
 
-    // Extend ListenRELP to mock the ChannelDispatcher and allow us to return staged events
+    private void sendMessages(final int port, final byte[] relpMessages, final SSLContext sslContext) {
+        final ByteArrayNettyEventSenderFactory eventSenderFactory = new ByteArrayNettyEventSenderFactory(runner.getLogger(), LOCALHOST, port, TransportProtocol.TCP);
+        if (sslContext != null) {
+            eventSenderFactory.setSslContext(sslContext);
+        }
+
+        eventSenderFactory.setTimeout(SENDER_TIMEOUT);
+        EventSender<byte[]> eventSender = eventSenderFactory.getEventSender();
+        eventSender.sendEvent(relpMessages);
+    }
+
     private class MockListenRELP extends ListenRELP {
+        private final List<RELPMessage> mockEvents;
 
-        private final List<RELPEvent> mockEvents;
+        public MockListenRELP() {
+            this.mockEvents = new ArrayList<>();
+        }
 
-        public MockListenRELP(List<RELPEvent> mockEvents) {
+        public MockListenRELP(List<RELPMessage> mockEvents) {
             this.mockEvents = mockEvents;
         }
 
@@ -270,12 +271,5 @@
             super.onScheduled(context);
             events.addAll(mockEvents);
         }
-
-        @Override
-        protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<RELPEvent> events) {
-            return channelDispatcher;
-        }
-
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
index bcdb4c3..847770f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenTCP.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.standard;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
 import org.apache.nifi.remote.io.socket.NetworkUtils;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.ClientAuth;
@@ -94,7 +95,7 @@
 
     @Test
     public void testRunBatching() throws IOException {
-        runner.setProperty(ListenTCP.MAX_BATCH_SIZE, "3");
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
 
         final List<String> messages = new ArrayList<>();
         messages.add("This is message 1\n");
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
index 3fcfb4c..0513db6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestListenUDP.java
@@ -19,6 +19,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.listen.ListenerProperties;
 import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
 import org.apache.nifi.processor.util.listen.event.StandardEvent;
 import org.apache.nifi.processor.util.listen.response.ChannelResponder;
@@ -111,8 +112,8 @@
     @Test
     public void testBatchingSingleSender() throws IOException, InterruptedException {
         final String delimiter = "NN";
-        runner.setProperty(ListenUDP.MESSAGE_DELIMITER, delimiter);
-        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "3");
+        runner.setProperty(ListenerProperties.MESSAGE_DELIMITER, delimiter);
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "3");
 
         final List<String> messages = getMessages(5);
         final int expectedTransferred = 2;
@@ -146,7 +147,7 @@
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
         runner.setProperty(ListenUDP.PORT, "1");
-        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         // sending 4 messages with a batch size of 10, but should get 2 FlowFiles because of different senders
         runner.run();
@@ -162,7 +163,7 @@
         MockListenUDP mockListenUDP = new MockListenUDP(mockEvents);
         runner = TestRunners.newTestRunner(mockListenUDP);
         runner.setProperty(ListenUDP.PORT, "1");
-        runner.setProperty(ListenUDP.MAX_BATCH_SIZE, "10");
+        runner.setProperty(ListenerProperties.MAX_BATCH_SIZE, "10");
 
         runner.run(5);
         runner.assertAllFlowFilesTransferred(ListenUDP.REL_SUCCESS, 0);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
deleted file mode 100644
index 5d86c26..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/event/TestRELPEventFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.event;
-
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.socket.SocketChannelResponder;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-
-public class TestRELPEventFactory {
-
-    @Test
-    public void testCreateRELPEvent() {
-        final byte[] data = "this is an event".getBytes(StandardCharsets.UTF_8);
-
-        final String sender = "sender1";
-        final long txnr = 1;
-        final String command = "syslog";
-
-        final Map<String,String> metadata = new HashMap<>();
-        metadata.put(EventFactory.SENDER_KEY, sender);
-        metadata.put(RELPMetadata.TXNR_KEY, String.valueOf(txnr));
-        metadata.put(RELPMetadata.COMMAND_KEY, command);
-
-        final ChannelResponder responder = new SocketChannelResponder(null);
-
-        final EventFactory<RELPEvent> factory = new RELPEventFactory();
-
-        final RELPEvent event = factory.create(data, metadata, responder);
-        Assert.assertEquals(data, event.getData());
-        Assert.assertEquals(sender, event.getSender());
-        Assert.assertEquals(txnr, event.getTxnr());
-        Assert.assertEquals(command, event.getCommand());
-        Assert.assertEquals(responder, event.getResponder());
-    }
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java
new file mode 100644
index 0000000..8ee2b93
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPFrameDecoderTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.standard.relp.event.RELPMessage;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+class RELPFrameDecoderTest {
+
+    final ComponentLog logger = new MockComponentLog(this.getClass().getSimpleName(), this);
+
+    public static final String OPEN_FRAME_DATA = "relp_version=0\nrelp_software=librelp,1.2.7,http://librelp.adiscon.com\ncommands=syslog";
+    public static final String SYSLOG_FRAME_DATA = "this is a syslog message here";
+
+    static final RELPFrame OPEN_FRAME = new RELPFrame.Builder()
+            .txnr(1)
+            .command("open")
+            .dataLength(OPEN_FRAME_DATA.length())
+            .data(OPEN_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame SYSLOG_FRAME = new RELPFrame.Builder()
+            .txnr(2)
+            .command("syslog")
+            .dataLength(SYSLOG_FRAME_DATA.length())
+            .data(SYSLOG_FRAME_DATA.getBytes(StandardCharsets.UTF_8))
+            .build();
+
+    static final RELPFrame CLOSE_FRAME = new RELPFrame.Builder()
+            .txnr(3)
+            .command("close")
+            .dataLength(0)
+            .data(new byte[0])
+            .build();
+
+    @Test
+    void testDecodeRELPEvents() throws IOException {
+        final List<RELPFrame> frames = getFrames(5);
+        ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer());
+        sendFrames(frames, eventBytes);
+        EmbeddedChannel channel = new EmbeddedChannel(new RELPFrameDecoder(logger, StandardCharsets.UTF_8));
+
+        assert(channel.writeInbound(eventBytes.buffer()));
+        assertEquals(5, channel.inboundMessages().size());
+
+        RELPMessage event = channel.readInbound();
+        assertEquals(RELPMessage.class, event.getClass());
+        assertEquals(SYSLOG_FRAME_DATA, new String(event.getMessage(), StandardCharsets.UTF_8));
+        assertEquals(2, channel.outboundMessages().size());
+    }
+
+    private void sendFrames(final List<RELPFrame> frames, final OutputStream outputStream) throws IOException {
+        RELPEncoder encoder = new RELPEncoder(StandardCharsets.UTF_8);
+        for (final RELPFrame frame : frames) {
+            final byte[] encodedFrame = encoder.encode(frame);
+            outputStream.write(encodedFrame);
+            outputStream.flush();
+        }
+    }
+
+    private List<RELPFrame> getFrames(final int syslogFrames) {
+        final List<RELPFrame> frames = new ArrayList<>();
+        frames.add(OPEN_FRAME);
+
+        for (int i = 0; i < syslogFrames; i++) {
+            frames.add(SYSLOG_FRAME);
+        }
+
+        frames.add(CLOSE_FRAME);
+        return frames;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoderTest.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoderTest.java
new file mode 100644
index 0000000..eaf4a9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/frame/RELPResponseEncoderTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.relp.frame;
+
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.nifi.processors.standard.relp.response.RELPChannelResponse;
+import org.apache.nifi.processors.standard.relp.response.RELPResponse;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+
+import static org.junit.Assert.assertEquals;
+
+class RELPResponseEncoderTest {
+
+    @Test
+    void testEncodeRELPResponse() throws IOException {
+        final byte[] relpResponse = new RELPChannelResponse(new RELPEncoder(Charset.defaultCharset()), RELPResponse.ok(321L)).toByteArray();
+
+        ByteBufOutputStream eventBytes = new ByteBufOutputStream(Unpooled.buffer(relpResponse.length));
+        eventBytes.write(relpResponse);
+        EmbeddedChannel channel = new EmbeddedChannel(new RELPResponseEncoder(Charset.defaultCharset()));
+
+        assert(channel.writeOutbound(eventBytes));
+        ByteBufOutputStream result = channel.readOutbound();
+        assertEquals("321 rsp 6 200 OK\n", new String(result.buffer().array()));
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
deleted file mode 100644
index 46cd3ef..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPFrameHandler.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processor.util.listen.response.ChannelResponse;
-import org.apache.nifi.processors.standard.relp.event.RELPEvent;
-import org.apache.nifi.processors.standard.relp.event.RELPEventFactory;
-import org.apache.nifi.processors.standard.relp.frame.RELPFrame;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TestRELPFrameHandler {
-
-    private Charset charset;
-    private EventFactory<RELPEvent> eventFactory;
-    private BlockingQueue<RELPEvent> events;
-    private SelectionKey key;
-    private AsyncChannelDispatcher dispatcher;
-    private ComponentLog logger;
-
-    private RELPFrameHandler<RELPEvent> frameHandler;
-
-    @Before
-    public void setup() {
-        this.charset = StandardCharsets.UTF_8;
-        this.eventFactory = new RELPEventFactory();
-        this.events = new LinkedBlockingQueue<>();
-        this.key = Mockito.mock(SelectionKey.class);
-        this.dispatcher = Mockito.mock(AsyncChannelDispatcher.class);
-        this.logger = Mockito.mock(ComponentLog.class);
-
-        this.frameHandler = new RELPFrameHandler<>(key, charset, eventFactory, events, dispatcher, logger);
-    }
-
-    @Test
-    public void testOpen() throws IOException, InterruptedException {
-        final String offer1 = "relp_version=0";
-        final String offer2 = "relp_software=librelp,1.2.7,http://librelp.adiscon.com";
-        final String offer3 = "commands=syslog";
-
-        final String data = offer1 + "\n" + offer2 + "\n" + offer3;
-
-        final RELPFrame openFrame = new RELPFrame.Builder()
-                .txnr(1).command("open")
-                .dataLength(data.length())
-                .data(data.getBytes(charset))
-                .build();
-
-        final String sender = "sender1";
-        final CapturingChannelResponder responder = new CapturingChannelResponder();
-
-        // call the handler and verify respond() was called once with once response
-        frameHandler.handle(openFrame, responder, sender);
-        Assert.assertEquals(1, responder.responded);
-        Assert.assertEquals(1, responder.responses.size());
-
-        // verify the response sent back the offers that were received
-        final ChannelResponse response = responder.responses.get(0);
-        final String responseData = new String(response.toByteArray(), charset);
-        Assert.assertTrue(responseData.contains(offer1));
-        Assert.assertTrue(responseData.contains(offer2));
-        Assert.assertTrue(responseData.contains(offer3));
-    }
-
-    @Test
-    public void testClose() throws IOException, InterruptedException {
-        final RELPFrame openFrame = new RELPFrame.Builder()
-                .txnr(1).command("close")
-                .dataLength(0)
-                .data(new byte[0])
-                .build();
-
-        final String sender = "sender1";
-        final CapturingChannelResponder responder = new CapturingChannelResponder();
-
-        // call the handler and verify respond() was called once with once response
-        frameHandler.handle(openFrame, responder, sender);
-        Assert.assertEquals(1, responder.responded);
-        Assert.assertEquals(1, responder.responses.size());
-
-        // verify the response sent back the offers that were received
-        final ChannelResponse response = responder.responses.get(0);
-        final String responseData = new String(response.toByteArray(), charset);
-        Assert.assertTrue(responseData.contains("200 OK"));
-    }
-
-    @Test
-    public void testCommand() throws IOException, InterruptedException {
-        final String data = "this is a syslog message";
-
-        final RELPFrame openFrame = new RELPFrame.Builder()
-                .txnr(1).command("syslog")
-                .dataLength(data.length())
-                .data(data.getBytes(charset))
-                .build();
-
-        final String sender = "sender1";
-        final CapturingChannelResponder responder = new CapturingChannelResponder();
-
-        // call the handler and verify respond() was called once with once response
-        frameHandler.handle(openFrame, responder, sender);
-        Assert.assertEquals(0, responder.responded);
-        Assert.assertEquals(0, responder.responses.size());
-        Assert.assertEquals(1, events.size());
-
-        final RELPEvent event = events.poll();
-        Assert.assertEquals(data, new String(event.getData(), charset));
-    }
-
-    private static class CapturingChannelResponder implements ChannelResponder<SocketChannel> {
-
-        int responded;
-        List<ChannelResponse> responses = new ArrayList<>();
-
-        @Override
-        public SocketChannel getChannel() {
-            return Mockito.mock(SocketChannel.class);
-        }
-
-        @Override
-        public List<ChannelResponse> getResponses() {
-            return responses;
-        }
-
-        @Override
-        public void addResponse(ChannelResponse response) {
-            responses.add(response);
-        }
-
-        @Override
-        public void respond() throws IOException {
-            responded++;
-        }
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
deleted file mode 100644
index d7b7d5c..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.standard.relp.handler;
-
-
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
-import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
-import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
-import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
-import org.apache.nifi.processor.util.listen.event.Event;
-import org.apache.nifi.processor.util.listen.event.EventFactory;
-import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
-import org.apache.nifi.processor.util.listen.response.ChannelResponder;
-import org.apache.nifi.processors.standard.relp.event.RELPMetadata;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-
-import javax.net.ssl.SSLContext;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-public class TestRELPSocketChannelHandler {
-
-    private EventFactory<TestEvent> eventFactory;
-    private ChannelHandlerFactory<TestEvent,AsyncChannelDispatcher> channelHandlerFactory;
-    private ByteBufferSource byteBufferSource;
-    private BlockingQueue<TestEvent> events;
-    private ComponentLog logger = Mockito.mock(ComponentLog.class);
-    private int maxConnections;
-    private SSLContext sslContext;
-    private Charset charset;
-    private ChannelDispatcher dispatcher;
-
-    @Before
-    public void setup() {
-        eventFactory = new TestEventHolderFactory();
-        channelHandlerFactory = new RELPSocketChannelHandlerFactory<>();
-
-        byteBufferSource = new ByteBufferPool(1, 4096);
-
-        events = new LinkedBlockingQueue<>();
-        logger = Mockito.mock(ComponentLog.class);
-
-        maxConnections = 1;
-        sslContext = null;
-        charset = StandardCharsets.UTF_8;
-
-        dispatcher = new SocketChannelDispatcher<>(eventFactory, channelHandlerFactory, byteBufferSource, events, logger,
-                maxConnections, sslContext, charset);
-
-    }
-
-    @Test
-    public void testBasicHandling() throws IOException, InterruptedException {
-        final List<String> messages = new ArrayList<>();
-        messages.add("1 syslog 20 this is message 1234\n");
-        messages.add("2 syslog 22 this is message 456789\n");
-        messages.add("3 syslog 21 this is message ABCDE\n");
-
-        run(messages);
-        Assert.assertEquals(messages.size(), events.size());
-
-        boolean found1 = false;
-        boolean found2 = false;
-        boolean found3 = false;
-
-        TestEvent event;
-        while((event = events.poll()) != null) {
-            Map<String,String> metadata = event.metadata;
-            Assert.assertTrue(metadata.containsKey(RELPMetadata.TXNR_KEY));
-
-            final String txnr = metadata.get(RELPMetadata.TXNR_KEY);
-            if (txnr.equals("1")) {
-                found1 = true;
-            } else if (txnr.equals("2")) {
-                found2 = true;
-            } else if (txnr.equals("3")) {
-                found3 = true;
-            }
-        }
-
-        Assert.assertTrue(found1);
-        Assert.assertTrue(found2);
-        Assert.assertTrue(found3);
-    }
-
-    @Test
-    public void testLotsOfFrames() throws IOException, InterruptedException {
-        final String baseMessage = " syslog 19 this is message ";
-        final List<String> messages = new ArrayList<>();
-
-        for (int i=100; i < 1000; i++) {
-            messages.add(i + baseMessage + i + "\n");
-        }
-
-        run(messages);
-        Assert.assertEquals(messages.size(), events.size());
-    }
-
-    protected void run(List<String> messages) throws IOException, InterruptedException {
-        final ByteBuffer buffer = ByteBuffer.allocate(1024);
-        try {
-            // starts the dispatcher listening on port 0 so it selects a random port
-            dispatcher.open(null, 0, 4096);
-
-            // starts a thread to run the dispatcher which will accept/read connections
-            Thread dispatcherThread = new Thread(dispatcher);
-            dispatcherThread.start();
-
-
-            // create a client connection to the port the dispatcher is listening on
-            final int realPort = dispatcher.getPort();
-            try (SocketChannel channel = SocketChannel.open()) {
-                channel.connect(new InetSocketAddress("localhost", realPort));
-                Thread.sleep(100);
-
-                // send the provided messages
-                for (int i=0; i < messages.size(); i++) {
-                    buffer.clear();
-                    buffer.put(messages.get(i).getBytes(charset));
-                    buffer.flip();
-
-                    while (buffer.hasRemaining()) {
-                        channel.write(buffer);
-                    }
-                    Thread.sleep(1);
-                }
-            }
-
-            // wait up to 25 seconds to verify the responses
-            long timeout = 25000;
-            long startTime = System.currentTimeMillis();
-            while (events.size() < messages.size() && (System.currentTimeMillis() - startTime < timeout)) {
-                Thread.sleep(100);
-            }
-
-            // should have gotten an event for each message sent
-            Assert.assertEquals(messages.size(), events.size());
-
-        } finally {
-            // stop the dispatcher thread and ensure we shut down handler threads
-            dispatcher.close();
-        }
-    }
-
-    // Test event to produce from the data
-    private static class TestEvent implements Event<SocketChannel> {
-
-        private byte[] data;
-        private Map<String,String> metadata;
-
-        public TestEvent(byte[] data, Map<String, String> metadata) {
-            this.data = data;
-            this.metadata = metadata;
-        }
-
-        @Override
-        public String getSender() {
-            return metadata.get(EventFactory.SENDER_KEY);
-        }
-
-        @Override
-        public byte[] getData() {
-            return data;
-        }
-
-        @Override
-        public ChannelResponder<SocketChannel> getResponder() {
-            return null;
-        }
-    }
-
-    // Factory to create test events and send responses for testing
-    private static class TestEventHolderFactory implements EventFactory<TestEvent> {
-
-        @Override
-        public TestEvent create(final byte[] data, final Map<String, String> metadata, final ChannelResponder responder) {
-            return new TestEvent(data, metadata);
-        }
-    }
-
-}