feature: Namingserver watch support http2 (#6861)

diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index b589396..c036b6b 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -36,6 +36,10 @@
 import org.apache.seata.common.XID;
 import org.apache.seata.common.thread.NamedThreadFactory;
 import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.protocol.detector.Http2Detector;
+import org.apache.seata.core.protocol.detector.HttpDetector;
+import org.apache.seata.core.protocol.detector.ProtocolDetector;
+import org.apache.seata.core.protocol.detector.SeataDetector;
 import org.apache.seata.core.rpc.RemotingBootstrap;
 import org.apache.seata.discovery.registry.MultiRegistryFactory;
 import org.apache.seata.discovery.registry.RegistryService;
@@ -163,7 +167,7 @@
                 @Override
                 public void initChannel(SocketChannel ch) {
                     ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
-                            .addLast(new ProtocolDetectHandler(NettyServerBootstrap.this));
+                            .addLast(new ProtocolDetectHandler(new ProtocolDetector[]{new Http2Detector(getChannelHandlers()), new SeataDetector(getChannelHandlers()), new HttpDetector()}));
                 }
             });
 
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
index 6787bfe..f8dbbdb 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
@@ -23,7 +23,6 @@
 import org.apache.seata.core.protocol.detector.Http2Detector;
 import org.apache.seata.core.protocol.detector.HttpDetector;
 import org.apache.seata.core.protocol.detector.ProtocolDetector;
-import org.apache.seata.core.protocol.detector.SeataDetector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,12 +30,11 @@
 
 public class ProtocolDetectHandler extends ByteToMessageDecoder {
     private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDetectHandler.class);
-    private NettyServerBootstrap nettyServerBootstrap;
+
     private ProtocolDetector[] supportedProtocolDetectors;
 
-    public ProtocolDetectHandler(NettyServerBootstrap nettyServerBootstrap) {
-        this.nettyServerBootstrap = nettyServerBootstrap;
-        this.supportedProtocolDetectors = new ProtocolDetector[]{new Http2Detector(nettyServerBootstrap.getChannelHandlers()), new SeataDetector(nettyServerBootstrap.getChannelHandlers()), new HttpDetector()};
+    public ProtocolDetectHandler(ProtocolDetector[] supportedProtocolDetectors) {
+        this.supportedProtocolDetectors = supportedProtocolDetectors;
     }
 
     @Override
diff --git a/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java b/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
new file mode 100644
index 0000000..e6c88fc
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.discovery.registry.raft;
+
+
+import org.apache.seata.common.util.HttpClientUtil;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+class RaftRegistryServiceImplTest {
+
+
+    @BeforeAll
+    public static void beforeClass() { // runs once before all tests: publishes the settings below as system properties
+        System.setProperty("service.vgroupMapping.tx", "default");
+        System.setProperty("registry.raft.username", "seata");
+        System.setProperty("registry.raft.password", "seata");
+        System.setProperty("registry.raft.serverAddr", "127.0.0.1:8092");
+        System.setProperty("registry.raft.tokenValidityInMilliseconds", "10000"); // testSecureTTL expects expiry after this many ms
+        ConfigurationFactory.getInstance(); // presumably loads the configuration with the properties above - TODO confirm
+    }
+
+    @AfterAll
+    public static void adAfterClass() throws Exception { // runs once after all tests of this class
+        System.clearProperty("service.vgroupMapping.tx"); // NOTE(review): the registry.raft.* properties set in beforeClass are not cleared
+    }
+
+    /**
+     * Tests that invoking the private refreshToken method throws when the server reports a failed login.
+     */
+    @Test
+    public void testLoginFailed() throws IOException, NoSuchMethodException {
+        String jwtToken = "null";
+        String responseBody = "{\"code\":\"401\",\"message\":\"Login failed\",\"data\":\"" + jwtToken + "\",\"success\":false}";
+
+        try (MockedStatic<HttpClientUtil> mockedStatic = Mockito.mockStatic(HttpClientUtil.class)) {
+            // The canned response has HTTP status 200 but a JSON body that reports code 401 and success=false.
+            CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+            StatusLine mockStatusLine = mock(StatusLine.class);
+
+            when(mockResponse.getEntity()).thenReturn(new StringEntity(responseBody));
+            when(mockResponse.getStatusLine()).thenReturn(mockStatusLine);
+            when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+
+            // While the static mock is open, every HttpClientUtil.doPost call returns the canned response.
+            when(HttpClientUtil.doPost(any(String.class), any(Map.class), any(Map.class), any(int.class)))
+                .thenReturn(mockResponse);
+            // Method.invoke reports an exception thrown by the target as InvocationTargetException.
+            // Use reflection to access and invoke the private method
+            Method refreshTokenMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("refreshToken", String.class);
+            refreshTokenMethod.setAccessible(true);
+            assertThrows(Exception.class, () -> refreshTokenMethod.invoke(RaftRegistryServiceImpl.getInstance(), "127.0.0.1:8092"));
+
+        }
+    }
+
+    /**
+     * Tests that the static jwtToken field holds the token from the response after refreshToken is invoked.
+     */
+
+    @Test
+    public void testRefreshTokenSuccess() throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
+        String jwtToken = "newToken";
+        String responseBody = "{\"code\":\"200\",\"message\":\"success\",\"data\":\"" + jwtToken + "\",\"success\":true}";
+
+        try (MockedStatic<HttpClientUtil> mockedStatic = Mockito.mockStatic(HttpClientUtil.class)) {
+            // The canned response has HTTP status 200 and a JSON body carrying the new token in "data".
+            CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+            StatusLine mockStatusLine = mock(StatusLine.class);
+
+            when(mockResponse.getEntity()).thenReturn(new StringEntity(responseBody));
+            when(mockResponse.getStatusLine()).thenReturn(mockStatusLine);
+            when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+
+            // While the static mock is open, every HttpClientUtil.doPost call returns the canned response.
+            when(HttpClientUtil.doPost(any(String.class), any(Map.class), any(Map.class), any(int.class)))
+                .thenReturn(mockResponse);
+
+            // refreshToken is private, so it is looked up and invoked through reflection.
+            Method refreshTokenMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("refreshToken", String.class);
+            refreshTokenMethod.setAccessible(true);
+            refreshTokenMethod.invoke(RaftRegistryServiceImpl.getInstance(), "127.0.0.1:8092");
+            Field jwtTokenField = RaftRegistryServiceImpl.class.getDeclaredField("jwtToken");
+            jwtTokenField.setAccessible(true);
+            String jwtTokenAct = (String) jwtTokenField.get(null); // null target: the field is read as a static field
+
+            // The stored token must equal the one delivered in the mocked response.
+            assertEquals(jwtToken, jwtTokenAct);
+
+        }
+    }
+
+
+    /**
+     * Tests that a fresh token is reported as valid and that one older than the 10000 ms validity is expired.
+     */
+    @Test
+    public void testSecureTTL() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException, InterruptedException {
+        Field tokenTimeStamp = RaftRegistryServiceImpl.class.getDeclaredField("tokenTimeStamp");
+        tokenTimeStamp.setAccessible(true);
+        tokenTimeStamp.setLong(RaftRegistryServiceImpl.class, System.currentTimeMillis());
+        Method isExpiredMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("isTokenExpired");
+        isExpiredMethod.setAccessible(true);
+        boolean rst = (boolean) isExpiredMethod.invoke(null);
+        assertEquals(false, rst);
+        // Age the token just past the validity set in beforeClass instead of sleeping for 10 seconds.
+        tokenTimeStamp.setLong(RaftRegistryServiceImpl.class, System.currentTimeMillis() - 10001);
+        rst = (boolean) isExpiredMethod.invoke(null);
+        assertEquals(true, rst);
+    }
+
+}
diff --git a/discovery/seata-discovery-raft/src/main/test/resources/file.conf b/discovery/seata-discovery-raft/src/main/test/resources/file.conf
new file mode 100644
index 0000000..46c3e04
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/resources/file.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+service {
+  #transaction service group mapping
+  vgroupMapping.default_tx_group = "default"
+  #only support when registry.type=file, please don't set multiple addresses
+  default.grouplist = "127.0.0.1:8091"
+  #disable seata
+  disableGlobalTransaction = false
+}
\ No newline at end of file
diff --git a/discovery/seata-discovery-raft/src/main/test/resources/registry.conf b/discovery/seata-discovery-raft/src/main/test/resources/registry.conf
new file mode 100644
index 0000000..5a15b8e
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/resources/registry.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+registry {
+  # file, nacos, eureka, redis, zk, consul, etcd3, sofa, custom, raft
+   type = "raft"
+   raft {
+      metadata-max-age-ms = 30000
+      serverAddr = "127.0.0.1:8848"
+      username = "seata"
+      password = "seata"
+      tokenValidityInMilliseconds = 1740000
+   }
+}
+
+config {
+  # file, nacos, apollo, zk, consul, etcd3, springCloudConfig, custom
+  type = "file"
+  raft {
+    metadata-max-age-ms = 30000
+    serverAddr = "127.0.0.1:8848"
+  }
+  file {
+    name = "file.conf"
+  }
+  custom {
+    name = ""
+  }
+}
diff --git a/namingserver/pom.xml b/namingserver/pom.xml
index fba22f8..6463407 100644
--- a/namingserver/pom.xml
+++ b/namingserver/pom.xml
@@ -35,7 +35,6 @@
         <spring-boot-for-server.version>2.7.18</spring-boot-for-server.version>
         <spring-framework-for-server.version>5.3.39</spring-framework-for-server.version>
         <snakeyaml-for-server.version>2.0</snakeyaml-for-server.version>
-        <tomcat-embed.version>9.0.90</tomcat-embed.version>
     </properties>
 
     <dependencyManagement>
@@ -80,21 +79,6 @@
             <artifactId>snakeyaml</artifactId>
             <version>${snakeyaml-for-server.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-core</artifactId>
-            <version>${tomcat-embed.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-el</artifactId>
-            <version>${tomcat-embed.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.tomcat.embed</groupId>
-            <artifactId>tomcat-embed-websocket</artifactId>
-            <version>${tomcat-embed.version}</version>
-        </dependency>
 
         <dependency>
             <groupId>org.springframework.boot</groupId>
@@ -149,6 +133,18 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.seata</groupId>
+            <artifactId>seata-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seata</groupId>
+            <artifactId>seata-spring-autoconfigure-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.httpcomponents</groupId>
             <artifactId>httpclient</artifactId>
         </dependency>
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
index dbdc1a4..1612ea4 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
@@ -16,10 +16,13 @@
  */
 package org.apache.seata.namingserver;
 
+import org.apache.seata.spring.boot.autoconfigure.http.RestControllerBeanPostProcessor;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Import;
 
 @SpringBootApplication
+@Import(RestControllerBeanPostProcessor.class)
 public class NamingserverApplication {
 
     public static void main(String[] args) {
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
index eaa70e7..357fbae 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
@@ -17,6 +17,7 @@
 package org.apache.seata.namingserver.controller;
 
 
+import io.netty.channel.Channel;
 import org.apache.seata.common.metadata.namingserver.MetaResponse;
 import org.apache.seata.common.metadata.namingserver.NamingServerNode;
 import org.apache.seata.common.result.Result;
@@ -36,8 +37,6 @@
 
 
 import javax.annotation.Resource;
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -72,7 +71,7 @@
 
     @PostMapping("/unregister")
     public Result<String> unregisterInstance(@RequestParam String namespace, @RequestParam String clusterName,
-            @RequestParam String unit, @RequestBody NamingServerNode registerBody) {
+        @RequestParam String unit, @RequestBody NamingServerNode registerBody) {
         Result<String> result = new Result<>();
         boolean isSuccess = namingManager.unregisterInstance(namespace, clusterName, unit, registerBody);
         if (isSuccess) {
@@ -125,17 +124,14 @@
      * @param clientTerm The timestamp of the subscription saved on the client side
      * @param vGroup     The name of the transaction group
      * @param timeout    The timeout duration
-     * @param request    The client's HTTP request
      */
 
     @PostMapping("/watch")
     public void watch(@RequestParam String clientTerm,
                       @RequestParam String vGroup,
                       @RequestParam String timeout,
-                      HttpServletRequest request) {
-        AsyncContext context = request.startAsync();
-        context.setTimeout(0L);
-        Watcher<AsyncContext> watcher = new Watcher<>(vGroup, context, Integer.parseInt(timeout), Long.parseLong(clientTerm), request.getRemoteAddr());
+                      Channel channel) {
+        Watcher<Channel> watcher = new Watcher<>(vGroup, channel, Integer.parseInt(timeout), Long.parseLong(clientTerm), channel.remoteAddress().toString());
         clusterWatcherManager.registryWatcher(watcher);
     }
 
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
index 6c2baaf..9091549 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
@@ -16,6 +16,14 @@
  */
 package org.apache.seata.namingserver.manager;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.Http2StreamChannel;
 import org.apache.seata.namingserver.listener.ClusterChangeEvent;
 import org.apache.seata.namingserver.listener.ClusterChangeListener;
 import org.apache.seata.namingserver.listener.Watcher;
@@ -28,8 +36,6 @@
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletResponse;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -88,18 +94,35 @@
     }
 
+    /**
+     * Answers the watcher with HTTP status 200.
+     *
+     * @param watcher the watcher to answer
+     */
     private void notify(Watcher<?> watcher) {
-        notify(watcher, HttpServletResponse.SC_OK);
+        notify(watcher, HttpResponseStatus.OK.code());
     }
 
+    /**
+     * Marks the watcher as done and writes the status code to the Netty channel it holds.
+     *
+     * @param watcher    the watcher whose async context is the client's channel
+     * @param statusCode the HTTP status code to report to the client
+     */
     private void notify(Watcher<?> watcher, int statusCode) {
-        AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
-        HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
+        Channel channel = (Channel) watcher.getAsyncContext();
         watcher.setDone(true);
-        if (logger.isDebugEnabled()) {
-            logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
+        if (channel instanceof Http2StreamChannel) {
+            // http2: the status code is sent as a 4-byte int inside a DATA frame
+            ByteBuf buf = Unpooled.buffer(4);
+            buf.writeInt(statusCode);
+            channel.writeAndFlush(new DefaultHttp2DataFrame(buf));
+        } else {
+            // http
+            channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode)));
         }
-        httpServletResponse.setStatus(statusCode);
-        asyncContext.complete();
+        if (logger.isDebugEnabled()) {
+            logger.debug("notify cluster change event to: {}", watcher.getClientEndpoint());
+        }
     }
 
     public void registryWatcher(Watcher<?> watcher) {
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java
new file mode 100644
index 0000000..e8b5f21
--- /dev/null
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.namingserver.netty;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import org.apache.seata.core.protocol.detector.Http2Detector;
+import org.apache.seata.core.rpc.netty.http2.Http2DispatchHandler;
+
+/** Http2Detector whose handlers route every HTTP/2 stream channel through an Http2DispatchHandler. */
+public class NamingServerHttp2Detector extends Http2Detector {
+    public NamingServerHttp2Detector() {
+        super(null);
+    }
+
+    @Override
+    public ChannelHandler[] getHandlers() {
+        ChannelHandler frameCodec = Http2FrameCodecBuilder.forServer().build();
+        ChannelInitializer<Http2StreamChannel> streamInitializer = new ChannelInitializer<Http2StreamChannel>() {
+            @Override
+            protected void initChannel(Http2StreamChannel ch) {
+                ChannelPipeline streamPipeline = ch.pipeline();
+                streamPipeline.addLast(new Http2DispatchHandler());
+            }
+        };
+        return new ChannelHandler[]{frameCodec, new Http2MultiplexHandler(streamInitializer)};
+    }
+}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java
new file mode 100644
index 0000000..b7f68e9
--- /dev/null
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.namingserver.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.NettyRuntime;
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.core.protocol.detector.HttpDetector;
+import org.apache.seata.core.protocol.detector.ProtocolDetector;
+import org.apache.seata.core.rpc.netty.ProtocolDetectHandler;
+import org.apache.seata.namingserver.manager.ClusterWatcherManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Netty listener of the naming server: binds {@code server.port} and installs a
+ * {@link ProtocolDetectHandler} with an HTTP and an HTTP/2 detector on every accepted connection.
+ */
+@Component
+public class NettyNamingServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NettyNamingServer.class);
+
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+
+    private final AtomicBoolean destroyed = new AtomicBoolean(false);
+
+    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+    private EventLoopGroup eventLoopGroupWorker;
+
+    private EventLoopGroup eventLoopGroupBoss;
+
+    @Value("${server.port}")
+    private Integer port;
+
+    @Resource
+    private ClusterWatcherManager clusterWatcherManager;
+
+    /**
+     * Creates the event loop groups and binds the configured port. Only the first call has an effect.
+     *
+     * @throws RuntimeException if binding fails; the event loop groups are shut down before it is thrown
+     */
+    @PostConstruct
+    public void init() {
+        if (!initialized.compareAndSet(false, true)) {
+            return;
+        }
+
+        this.eventLoopGroupBoss = new NioEventLoopGroup(1,
+                new NamedThreadFactory("NettyBoss", 1));
+        this.eventLoopGroupWorker = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2,
+                new NamedThreadFactory("NettyServerNIOWorker",
+                        NettyRuntime.availableProcessors() * 2));
+        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
+                .channel(NioServerSocketChannel.class)
+                .option(ChannelOption.SO_BACKLOG, 1024)
+                .option(ChannelOption.SO_REUSEADDR, true)
+                .childOption(ChannelOption.SO_KEEPALIVE, true)
+                .childOption(ChannelOption.TCP_NODELAY, true)
+                .childOption(ChannelOption.SO_SNDBUF, 153600)
+                .childOption(ChannelOption.SO_RCVBUF, 153600)
+                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
+                        new WriteBufferWaterMark(1048576,
+                                67108864))
+                .localAddress(new InetSocketAddress(port))
+                .childHandler(new ChannelInitializer<SocketChannel>() {
+                    @Override
+                    public void initChannel(SocketChannel ch) {
+                        ch.pipeline().addLast(new ProtocolDetectHandler(new ProtocolDetector[]{new HttpDetector(), new NamingServerHttp2Detector()}));
+                    }
+                });
+
+        try {
+            this.serverBootstrap.bind(port).sync();
+        } catch (Exception exx) {
+            // The bind failed: release the event loop threads created above before reporting the error.
+            this.eventLoopGroupBoss.shutdownGracefully();
+            this.eventLoopGroupWorker.shutdownGracefully();
+            throw new RuntimeException("Server start failed", exx);
+        }
+    }
+
+    /**
+     * Shuts down both event loop groups. Only the first call has an effect.
+     */
+    @PreDestroy
+    public void destroy() {
+        if (!destroyed.compareAndSet(false, true)) {
+            return;
+        }
+
+        this.eventLoopGroupBoss.shutdownGracefully();
+        this.eventLoopGroupWorker.shutdownGracefully();
+    }
+}
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java
new file mode 100644
index 0000000..553ca85
--- /dev/null
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.spring.boot.autoconfigure.http;
+
+import org.apache.seata.core.rpc.netty.http.ControllerManager;
+import org.apache.seata.core.rpc.netty.http.HttpInvocation;
+import org.apache.seata.core.rpc.netty.http.ParamMetaData;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.ModelAttribute;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Registers the mapping-annotated methods of each {@link RestController} bean with {@link ControllerManager}.
+ */
+@Component
+public class RestControllerBeanPostProcessor implements BeanPostProcessor {
+
+    private static final List<Class<? extends Annotation>> MAPPING_CLASS = new ArrayList<>();
+    private static final Map<Class<? extends Annotation>, ParamMetaData.ParamConvertType> MAPPING_PARAM_TYPE = new HashMap<>();
+
+    static {
+        MAPPING_CLASS.add(GetMapping.class);
+        MAPPING_CLASS.add(PostMapping.class);
+        MAPPING_CLASS.add(RequestMapping.class);
+        MAPPING_CLASS.add(PutMapping.class);
+        MAPPING_CLASS.add(DeleteMapping.class);
+
+        MAPPING_PARAM_TYPE.put(RequestParam.class, ParamMetaData.ParamConvertType.REQUEST_PARAM);
+        MAPPING_PARAM_TYPE.put(RequestBody.class, ParamMetaData.ParamConvertType.REQUEST_BODY);
+        MAPPING_PARAM_TYPE.put(ModelAttribute.class, ParamMetaData.ParamConvertType.MODEL_ATTRIBUTE);
+    }
+
+    /**
+     * Registers every mapping-annotated method declared on a {@code @RestController} bean.
+     *
+     * @param bean     the initialized bean
+     * @param beanName the bean name (not used)
+     * @return {@code bean} itself, never a wrapper
+     */
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        Class<?> httpControllerClass = bean.getClass();
+        if (!httpControllerClass.isAnnotationPresent(RestController.class)) {
+            return bean;
+        }
+        RequestMapping requestMapping = httpControllerClass.getAnnotation(RequestMapping.class);
+        List<String> prePaths = Arrays.asList(requestMapping == null ? new String[0] : requestMapping.value());
+        for (Method method : httpControllerClass.getDeclaredMethods()) {
+            for (Class<? extends Annotation> annotationType : MAPPING_CLASS) {
+                Annotation annotation = method.getAnnotation(annotationType);
+                if (annotation != null) {
+                    addPathMapping(bean, prePaths, method, getAnnotationValue(annotation));
+                }
+            }
+        }
+        return bean;
+    }
+
+    /**
+     * Reads the {@code value} attribute of a mapping annotation reflectively.
+     *
+     * @param annotation a mapping annotation found on a controller method
+     * @return the declared values, or an empty list if the attribute cannot be read
+     */
+    private static List<String> getAnnotationValue(Annotation annotation) {
+        try {
+            Method valueMethod = annotation.annotationType().getMethod("value");
+            valueMethod.setAccessible(true);
+            return Arrays.asList((String[]) valueMethod.invoke(annotation));
+        } catch (Throwable e) {
+            // Best effort: an annotation whose value cannot be read simply contributes no paths.
+            return new ArrayList<>();
+        }
+    }
+
+    /**
+     * Registers {@code method} under every combination of class-level and method-level path.
+     * An empty path list counts as a single "/" segment; when both are empty nothing is registered.
+     * Neither list is modified, so fixed-size views such as {@code Arrays.asList} are safe to pass.
+     */
+    private static void addPathMapping(Object httpController, List<String> prePaths, Method method, List<String> postPaths) {
+        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+        ParamMetaData[] paramMetaDatas = new ParamMetaData[method.getParameterTypes().length];
+        for (int i = 0; i < paramMetaDatas.length; i++) {
+            Annotation[] annotations = parameterAnnotations[i];
+            // A parameter without annotations is bound as a request parameter.
+            ParamMetaData.ParamConvertType convertType = annotations.length == 0 ? ParamMetaData.ParamConvertType.REQUEST_PARAM : null;
+            // Otherwise use the first recognised annotation, wherever it sits (e.g. after @Valid).
+            for (int j = 0; convertType == null && j < annotations.length; j++) {
+                convertType = MAPPING_PARAM_TYPE.get(annotations[j].annotationType());
+            }
+            paramMetaDatas[i] = new ParamMetaData();
+            paramMetaDatas[i].setParamConvertType(convertType);
+        }
+        if (prePaths.isEmpty() && postPaths.isEmpty()) {
+            return;
+        }
+        List<String> classPaths = prePaths.isEmpty() ? Arrays.asList("/") : prePaths;
+        List<String> methodPaths = postPaths.isEmpty() ? Arrays.asList("/") : postPaths;
+        for (String prePath : classPaths) {
+            for (String postPath : methodPaths) {
+                HttpInvocation httpInvocation = new HttpInvocation();
+                httpInvocation.setMethod(method);
+                httpInvocation.setController(httpController);
+                httpInvocation.setPath((prePath + "/" + postPath).replaceAll("(/)+", "/"));
+                httpInvocation.setParamMetaData(paramMetaDatas);
+                ControllerManager.addHttpInvocation(httpInvocation);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
index dda663c..dbe1535 100644
--- a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
+++ b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
@@ -27,6 +27,7 @@
 import javax.servlet.AsyncContext;
 import javax.servlet.http.HttpServletResponse;
 
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.handler.codec.http.DefaultHttpResponse;
@@ -41,6 +42,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.event.EventListener;
+import org.springframework.http.HttpStatus;
 import org.springframework.scheduling.annotation.Async;
 import org.springframework.stereotype.Component;
 
@@ -66,11 +68,7 @@
                 Optional.ofNullable(WATCHERS.remove(group))
                     .ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
                         if (System.currentTimeMillis() >= watcher.getTimeout()) {
-                            HttpServletResponse httpServletResponse =
-                                (HttpServletResponse)((AsyncContext)watcher.getAsyncContext()).getResponse();
-                            watcher.setDone(true);
-                            httpServletResponse.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
-                            ((AsyncContext)watcher.getAsyncContext()).complete();
+                            notify(watcher, HttpStatus.NOT_MODIFIED.value());
                         }
                         if (!watcher.isDone()) {
                             // Re-register
@@ -94,17 +92,35 @@
     }
 
     private void notify(Watcher<?> watcher) {
-        Channel channel  = (Channel) watcher.getAsyncContext();
-        if (channel instanceof Http2StreamChannel) {
-            // http2
-            channel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)));
+        notify(watcher, HttpResponseStatus.OK.code());
+    }
+    private void notify(Watcher<?> watcher, int statusCode) {
+        Object ctx = watcher.getAsyncContext();
+        watcher.setDone(true);
+        if (ctx instanceof Channel) {
+            Channel channel = (Channel) ctx;
+            if (channel instanceof Http2StreamChannel) {
+                ByteBuf buf = Unpooled.buffer(4);
+                buf.writeInt(statusCode);
+                channel.writeAndFlush(new DefaultHttp2DataFrame(buf));
+                return;
+            } else {
+                // http
+                channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode)));
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("notify cluster change event to: {}", channel.remoteAddress());
+            }
+
             return;
-        } else {
-            // http
-            channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("notify cluster change event to: {}", channel.remoteAddress());
+        } else if (ctx instanceof AsyncContext) {
+            AsyncContext asyncContext = (AsyncContext) ctx;
+            HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
+            if (logger.isDebugEnabled()) {
+                logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
+            }
+            httpServletResponse.setStatus(HttpServletResponse.SC_OK);
+            asyncContext.complete();
         }
     }