[Issue 5827][Issue 5828][netty] Fixes for UDP protocol support in netty connector (#5829)
### Motivation
UDP protocol is not working for netty connector
### Modifications
Added a specific handler for UDP and use Channel instead of SocketChannel in NettyChannelInitializer
Successfully tested
* Fixes for UDP protocol support in netty connector
* Added specific handlers for UDP and TCP netty connector
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
index 775b6f4..60c9c2d 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServer.java
@@ -21,6 +21,17 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
+import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPChannelInitializer;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
@@ -30,12 +41,6 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.pulsar.io.netty.NettySource;
-import org.apache.pulsar.io.netty.http.NettyHttpChannelInitializer;
-import org.apache.pulsar.io.netty.http.NettyHttpServerHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Netty Server to accept incoming data via the configured type.
@@ -96,7 +101,7 @@
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup);
bootstrap.channel(NioDatagramChannel.class);
- bootstrap.handler(new NettyChannelInitializer(new NettyServerHandler(this.nettySource)))
+ bootstrap.handler(new NettyUDPChannelInitializer(new NettyUDPServerHandler(this.nettySource)))
.option(ChannelOption.SO_BACKLOG, 1024);
ChannelFuture channelFuture = bootstrap.bind(this.host, this.port).sync();
@@ -105,7 +110,7 @@
private void runTcp() throws InterruptedException {
ServerBootstrap serverBootstrap = getServerBootstrap(
- new NettyChannelInitializer(new NettyServerHandler(this.nettySource)));
+ new NettyTCPChannelInitializer(new NettyTCPServerHandler(this.nettySource)));
ChannelFuture channelFuture = serverBootstrap.bind(this.host, this.port).sync();
channelFuture.channel().closeFuture().sync();
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
similarity index 71%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
index b9a7b4c..1f4972b 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializer.java
@@ -16,28 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.tcp;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.Channel;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
/**
* Netty Channel Initializer to register decoder and handler.
*/
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyTCPChannelInitializer extends ChannelInitializer<Channel> {
private ChannelInboundHandlerAdapter handler;
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+ public NettyTCPChannelInitializer(ChannelInboundHandlerAdapter handler) {
this.handler = handler;
}
@Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline().addLast(new ByteArrayDecoder());
+ channel.pipeline().addLast(this.handler);
}
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
similarity index 82%
rename from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
rename to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
index 42f497e..a619bc3 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/NettyTCPServerHandler.java
@@ -16,38 +16,38 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.tcp;
import java.io.Serializable;
import java.util.Optional;
-import lombok.Data;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.netty.NettySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import lombok.Data;
+
/**
* Handles a server-side channel.
*/
@ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyTCPServerHandler extends SimpleChannelInboundHandler<byte[]> {
- private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(NettyTCPServerHandler.class);
private NettySource nettySource;
- public NettyServerHandler(NettySource nettySource) {
+ public NettyTCPServerHandler(NettySource nettySource) {
this.nettySource = nettySource;
}
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
- nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+ nettySource.consume(new NettyTCPRecord(Optional.ofNullable(""), bytes));
}
@Override
@@ -57,7 +57,7 @@
}
@Data
- static private class NettyRecord implements Record<byte[]>, Serializable {
+ static private class NettyTCPRecord implements Record<byte[]>, Serializable {
private final Optional<String> key;
private final byte[] value;
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
new file mode 100644
index 0000000..b59d614
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/tcp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.tcp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
similarity index 68%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
index b9a7b4c..bc86559 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyChannelInitializer.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializer.java
@@ -16,28 +16,26 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
+package org.apache.pulsar.io.netty.udp;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.bytes.ByteArrayDecoder;
/**
* Netty Channel Initializer to register decoder and handler.
*/
-public class NettyChannelInitializer extends ChannelInitializer<SocketChannel> {
+public class NettyUDPChannelInitializer extends ChannelInitializer<Channel> {
private ChannelInboundHandlerAdapter handler;
- public NettyChannelInitializer(ChannelInboundHandlerAdapter handler) {
+ public NettyUDPChannelInitializer(ChannelInboundHandlerAdapter handler) {
this.handler = handler;
}
@Override
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- socketChannel.pipeline().addLast(new ByteArrayDecoder());
- socketChannel.pipeline().addLast(this.handler);
+ protected void initChannel(Channel channel) throws Exception {
+ channel.pipeline().addLast(this.handler);
}
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
similarity index 74%
copy from pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
copy to pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
index 42f497e..8341628 100644
--- a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/server/NettyServerHandler.java
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/NettyUDPServerHandler.java
@@ -16,38 +16,40 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.ChannelHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.SimpleChannelInboundHandler;
+package org.apache.pulsar.io.netty.udp;
import java.io.Serializable;
import java.util.Optional;
-import lombok.Data;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.netty.NettySource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.socket.DatagramPacket;
+import lombok.Data;
+
/**
* Handles a server-side channel.
*/
@ChannelHandler.Sharable
-public class NettyServerHandler extends SimpleChannelInboundHandler<byte[]> {
+public class NettyUDPServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
- private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
-
+ private static final Logger logger = LoggerFactory.getLogger(NettyUDPServerHandler.class);
private NettySource nettySource;
- public NettyServerHandler(NettySource nettySource) {
+ public NettyUDPServerHandler(NettySource nettySource) {
this.nettySource = nettySource;
}
-
+
@Override
- protected void channelRead0(ChannelHandlerContext channelHandlerContext, byte[] bytes) throws Exception {
- nettySource.consume(new NettyRecord(Optional.ofNullable(""), bytes));
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
+ byte[] bytes = ByteBufUtil.getBytes(packet.content());
+ nettySource.consume(new NettyUDPRecord(Optional.ofNullable(""), bytes));
}
@Override
@@ -57,7 +59,7 @@
}
@Data
- static private class NettyRecord implements Record<byte[]>, Serializable {
+ static private class NettyUDPRecord implements Record<byte[]>, Serializable {
private final Optional<String> key;
private final byte[] value;
}
diff --git a/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
new file mode 100644
index 0000000..d936c6a
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/org/apache/pulsar/io/netty/udp/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.netty.udp;
\ No newline at end of file
diff --git a/pulsar-io/netty/src/main/java/package-info.java b/pulsar-io/netty/src/main/java/package-info.java
new file mode 100644
index 0000000..51da6c0
--- /dev/null
+++ b/pulsar-io/netty/src/main/java/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
similarity index 73%
copy from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
copy to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
index 7fb3b45..366c7b1 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/tcp/NettyTCPChannelInitializerTest.java
@@ -16,30 +16,35 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettySource;
-import org.testng.annotations.Test;
+package org.apache.pulsar.io.netty.tcp;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import org.apache.pulsar.io.netty.NettySource;
+import org.apache.pulsar.io.netty.tcp.NettyTCPChannelInitializer;
+import org.apache.pulsar.io.netty.tcp.NettyTCPServerHandler;
+import org.apache.pulsar.io.netty.udp.NettyUDPServerHandler;
+import org.testng.annotations.Test;
+
+import io.netty.channel.socket.nio.NioDatagramChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
/**
* Tests for Netty Channel Initializer
*/
-public class NettyChannelInitializerTest {
+public class NettyTCPChannelInitializerTest {
@Test
public void testChannelInitializer() throws Exception {
NioSocketChannel channel = new NioSocketChannel();
- NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
- new NettyServerHandler(new NettySource()));
+ NettyTCPChannelInitializer nettyChannelInitializer = new NettyTCPChannelInitializer(
+ new NettyTCPServerHandler(new NettySource()));
nettyChannelInitializer.initChannel(channel);
assertNotNull(channel.pipeline().toMap());
assertEquals(2, channel.pipeline().toMap().size());
}
-
+
}
diff --git a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
similarity index 74%
rename from pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
rename to pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
index 7fb3b45..33d4940 100644
--- a/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/server/NettyChannelInitializerTest.java
+++ b/pulsar-io/netty/src/test/java/org/apache/pulsar/io/netty/udp/NettyUDPChannelInitializerTest.java
@@ -16,30 +16,31 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.io.netty.server;
-
-import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.pulsar.io.netty.NettySource;
-import org.testng.annotations.Test;
+package org.apache.pulsar.io.netty.udp;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+import org.apache.pulsar.io.netty.NettySource;
+import org.testng.annotations.Test;
+
+import io.netty.channel.socket.nio.NioDatagramChannel;
+
/**
* Tests for Netty Channel Initializer
*/
-public class NettyChannelInitializerTest {
+public class NettyUDPChannelInitializerTest {
@Test
public void testChannelInitializer() throws Exception {
- NioSocketChannel channel = new NioSocketChannel();
+ NioDatagramChannel channel = new NioDatagramChannel();
- NettyChannelInitializer nettyChannelInitializer = new NettyChannelInitializer(
- new NettyServerHandler(new NettySource()));
+ NettyUDPChannelInitializer nettyChannelInitializer = new NettyUDPChannelInitializer(
+ new NettyUDPServerHandler(new NettySource()));
nettyChannelInitializer.initChannel(channel);
assertNotNull(channel.pipeline().toMap());
- assertEquals(2, channel.pipeline().toMap().size());
+ assertEquals(1, channel.pipeline().toMap().size());
}
-
+
}