[Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)


### Motivation
UDP protocol is not working for netty connector

### Modifications
Added a specific handler for UDP and use Channel instead of SocketChannel in NettyChannelInitializer

Successfully tested

* Fixes for UDP protocol support in netty connector

* Added specific handlers for UDP and TCP netty connector
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 775b6f4..60c9c2d 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -21,6 +21,17 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPChannelInitializer;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import io.netty.bootstrap.Bootstrap;
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -30,12 +41,6 @@
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettySource;
-import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
-import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Netty Server to accept incoming data via the configured type.
@@ -96,7 +101,7 @@
         Bootstrap bootstrap = new Bootstrap();
         bootstrap.group(workerGroup);
         bootstrap.channel(NioDatagramChannel.class);
-        bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+        bootstrap.handler(new NettyUDPChannelInitializer(new NettyUDPServerHandler(this.nettySource)))
                 .option(ChannelOption.SO_BACKLOG, 1024);
 
         ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
@@ -105,7 +110,7 @@
 
     private void runTcp() throws InterruptedException {
         ServerBootstrap serverBootstrap = getServerBootstrap(
-                new NettyChannelInitializer(new NettyServerHandler(this.nettySource)));
+                new NettyTCPChannelInitializer(new NettyTCPServerHandler(this.nettySource)));
 
         ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
         channelFuture.channel().closeFuture().sync();
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
similarity index 71%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
index b9a7b4c..1f4972b 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
@@ -16,28 +16,28 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
 
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.Channel;
 import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
  * Netty Channel Initializer to register decoder and handler.
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyTCPChannelInitializer extends ChannelInitializer<Channel> {
 
     private ChannelInboundHandlerAdapter handler;
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+    public NettyTCPChannelInitializer(ChannelInboundHandlerAdapter handler) {
         this.handler = handler;
     }
 
     @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
+    protected void initChannel(Channel channel) throws Exception {
+        channel.pipeline().addLast(new ByteArrayDecoder());
+        channel.pipeline().addLast(this.handler);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
similarity index 82%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
index 42f497e..a619bc3 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
@@ -16,38 +16,38 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.tcp;
 
 import java.io.Serializable;
 import java.util.Optional;
 
-import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.Data;
+
 /**
  * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyTCPServerHandler extends SimpleChannelInboundHandler<byte[]> {
 
-    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+    private static final Logger logger = LoggerFactory.getLogger(NettyTCPServerHandler.class);
 
     private NettySource nettySource;
 
-    public NettyServerHandler(NettySource nettySource) {
+    public NettyTCPServerHandler(NettySource nettySource) {
         this.nettySource = nettySource;
     }
 
     @Override
     protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+        nettySource.consume(new NettyTCPRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -57,7 +57,7 @@
     }
 
     @Data
-    static private class NettyRecord implements Record<byte[]>, Serializable {
+    static private class NettyTCPRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
new file mode 100644
index 0000000..b59d614
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
similarity index 68%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
index b9a7b4c..bc86559 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
@@ -16,28 +16,26 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
 
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
 
 /**
  * Netty Channel Initializer to register decoder and handler.
  */
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyUDPChannelInitializer extends ChannelInitializer<Channel> {
 
     private ChannelInboundHandlerAdapter handler;
 
-    public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+    public NettyUDPChannelInitializer(ChannelInboundHandlerAdapter handler) {
         this.handler = handler;
     }
 
     @Override
-    protected void initChannel(SocketChannel socketChannel) throws Exception {
-        socketChannel.pipeline().addLast(new ByteArrayDecoder());
-        socketChannel.pipeline().addLast(this.handler);
+    protected void initChannel(Channel channel) throws Exception {
+        channel.pipeline().addLast(this.handler);
     }
 
 }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
similarity index 74%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
index 42f497e..8341628 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
@@ -16,38 +16,40 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.udp;
 
 import java.io.Serializable;
 import java.util.Optional;
 
-import lombok.Data;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.netty.NettySource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+import lombok.Data;
+
 /**
  * Handles a server-side channel.
  */
 @ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
 
-    private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
-
+    private static final Logger logger = LoggerFactory.getLogger(NettyUDPServerHandler.class);
     private NettySource nettySource;
 
-    public NettyServerHandler(NettySource nettySource) {
+    public NettyUDPServerHandler(NettySource nettySource) {
         this.nettySource = nettySource;
     }
-
+    
     @Override
-    protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
-        nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
+        byte[] bytes = ByteBufUtil.getBytes(packet.content());
+        nettySource.consume(new NettyUDPRecord(Optional.ofNullable(""), bytes));
     }
 
     @Override
@@ -57,7 +59,7 @@
     }
 
     @Data
-    static private class NettyRecord implements Record<byte[]>, Serializable {
+    static private class NettyUDPRecord implements Record<byte[]>, Serializable {
         private final Optional<String> key;
         private final byte[] value;
     }
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
new file mode 100644
index 0000000..d936c6a
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.udp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/package-info.java b/pulsar-io/netty/src/main/java/package-info.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
similarity index 73%
copy from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
index 7fb3b45..366c7b1 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
@@ -16,30 +16,35 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettySource;
-import org.testng.annotations.Test;
+package org.apache.pulsar.io.netty.tcp;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.testng.annotations.Test;
+
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
 /**
  * Tests for Netty Channel Initializer
  */
-public class NettyChannelInitializerTest {
+public class NettyTCPChannelInitializerTest {
 
     @Test
     public void testChannelInitializer() throws Exception {
         NioSocketChannel channel = new NioSocketChannel();
 
-        NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyServerHandler(new NettySource()));
+        NettyTCPChannelInitializer nettyChannelInitializer = new NettyTCPChannelInitializer(
+                new NettyTCPServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
         assertEquals(2, channel.pipeline().toMap().size());
     }
-
+    
 }
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
similarity index 74%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
index 7fb3b45..33d4940 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
@@ -16,30 +16,31 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettySource;
-import org.testng.annotations.Test;
+package org.apache.pulsar.io.netty.udp;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
+import org.apache.pulsar.io.netty.NettySource;
+import org.testng.annotations.Test;
+
+import io.netty.channel.socket.nio.NioDatagramChannel;
+
 /**
  * Tests for Netty Channel Initializer
  */
-public class NettyChannelInitializerTest {
+public class NettyUDPChannelInitializerTest {
 
     @Test
     public void testChannelInitializer() throws Exception {
-        NioSocketChannel channel = new NioSocketChannel();
+        NioDatagramChannel channel = new NioDatagramChannel();
 
-        NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
-                new NettyServerHandler(new NettySource()));
+        NettyUDPChannelInitializer nettyChannelInitializer = new NettyUDPChannelInitializer(
+                new NettyUDPServerHandler(new NettySource()));
         nettyChannelInitializer.initChannel(channel);
 
         assertNotNull(channel.pipeline().toMap());
-        assertEquals(2, channel.pipeline().toMap().size());
+        assertEquals(1, channel.pipeline().toMap().size());
     }
-
+    
 }