feature: Namingserver watch support http2 (#6861)
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
index b589396..c036b6b 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyServerBootstrap.java
@@ -36,6 +36,10 @@
import org.apache.seata.common.XID;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.config.ConfigurationFactory;
+import org.apache.seata.core.protocol.detector.Http2Detector;
+import org.apache.seata.core.protocol.detector.HttpDetector;
+import org.apache.seata.core.protocol.detector.ProtocolDetector;
+import org.apache.seata.core.protocol.detector.SeataDetector;
import org.apache.seata.core.rpc.RemotingBootstrap;
import org.apache.seata.discovery.registry.MultiRegistryFactory;
import org.apache.seata.discovery.registry.RegistryService;
@@ -163,7 +167,7 @@
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new IdleStateHandler(nettyServerConfig.getChannelMaxReadIdleSeconds(), 0, 0))
- .addLast(new ProtocolDetectHandler(NettyServerBootstrap.this));
+ .addLast(new ProtocolDetectHandler(new ProtocolDetector[]{new Http2Detector(getChannelHandlers()), new SeataDetector(getChannelHandlers()), new HttpDetector()}));
}
});
diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
index 6787bfe..f8dbbdb 100644
--- a/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
+++ b/core/src/main/java/org/apache/seata/core/rpc/netty/ProtocolDetectHandler.java
@@ -23,7 +23,6 @@
import org.apache.seata.core.protocol.detector.Http2Detector;
import org.apache.seata.core.protocol.detector.HttpDetector;
import org.apache.seata.core.protocol.detector.ProtocolDetector;
-import org.apache.seata.core.protocol.detector.SeataDetector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,12 +30,11 @@
public class ProtocolDetectHandler extends ByteToMessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolDetectHandler.class);
- private NettyServerBootstrap nettyServerBootstrap;
+
private ProtocolDetector[] supportedProtocolDetectors;
- public ProtocolDetectHandler(NettyServerBootstrap nettyServerBootstrap) {
- this.nettyServerBootstrap = nettyServerBootstrap;
- this.supportedProtocolDetectors = new ProtocolDetector[]{new Http2Detector(nettyServerBootstrap.getChannelHandlers()), new SeataDetector(nettyServerBootstrap.getChannelHandlers()), new HttpDetector()};
+ public ProtocolDetectHandler(ProtocolDetector[] supportedProtocolDetectors) {
+ this.supportedProtocolDetectors = supportedProtocolDetectors;
}
@Override
diff --git a/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java b/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
new file mode 100644
index 0000000..e6c88fc
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/java/org/apache/seata/discovery/registry/raft/RaftRegistryServiceImplTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.discovery.registry.raft;
+
+
+import org.apache.seata.common.util.HttpClientUtil;
+import org.apache.seata.config.ConfigurationFactory;
+import org.apache.http.HttpStatus;
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.entity.StringEntity;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/** Unit tests for the JWT token handling of {@link RaftRegistryServiceImpl}. */
+class RaftRegistryServiceImplTest {
+
+    @BeforeAll
+    public static void beforeClass() {
+        System.setProperty("service.vgroupMapping.tx", "default");
+        System.setProperty("registry.raft.username", "seata");
+        System.setProperty("registry.raft.password", "seata");
+        System.setProperty("registry.raft.serverAddr", "127.0.0.1:8092");
+        System.setProperty("registry.raft.tokenValidityInMilliseconds", "10000");
+        ConfigurationFactory.getInstance();
+    }
+
+    @AfterAll
+    public static void afterClass() throws Exception {
+        // Undo every property set in beforeClass so none of them leaks into other tests in the same JVM.
+        System.clearProperty("service.vgroupMapping.tx");
+        System.clearProperty("registry.raft.username");
+        System.clearProperty("registry.raft.password");
+        System.clearProperty("registry.raft.serverAddr");
+        System.clearProperty("registry.raft.tokenValidityInMilliseconds");
+    }
+
+    /**
+     * Verifies that refreshToken throws when the login response reports a failure.
+     */
+    @Test
+    public void testLoginFailed() throws IOException, NoSuchMethodException {
+        String jwtToken = "null";
+        String responseBody = "{\"code\":\"401\",\"message\":\"Login failed\",\"data\":\"" + jwtToken + "\",\"success\":false}";
+
+        try (MockedStatic<HttpClientUtil> mockedStatic = Mockito.mockStatic(HttpClientUtil.class)) {
+
+            CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+            StatusLine mockStatusLine = mock(StatusLine.class);
+
+            when(mockResponse.getEntity()).thenReturn(new StringEntity(responseBody));
+            when(mockResponse.getStatusLine()).thenReturn(mockStatusLine);
+            when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+
+            when(HttpClientUtil.doPost(any(String.class), any(Map.class), any(Map.class), any(int.class)))
+                .thenReturn(mockResponse);
+
+            // Use reflection to access and invoke the private method
+            Method refreshTokenMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("refreshToken", String.class);
+            refreshTokenMethod.setAccessible(true);
+            assertThrows(Exception.class, () -> refreshTokenMethod.invoke(RaftRegistryServiceImpl.getInstance(), "127.0.0.1:8092"));
+
+        }
+    }
+
+    /**
+     * Verifies that the static jwtToken field is updated when refreshToken succeeds.
+     */
+    @Test
+    public void testRefreshTokenSuccess() throws IOException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
+        String jwtToken = "newToken";
+        String responseBody = "{\"code\":\"200\",\"message\":\"success\",\"data\":\"" + jwtToken + "\",\"success\":true}";
+
+        try (MockedStatic<HttpClientUtil> mockedStatic = Mockito.mockStatic(HttpClientUtil.class)) {
+
+            CloseableHttpResponse mockResponse = mock(CloseableHttpResponse.class);
+            StatusLine mockStatusLine = mock(StatusLine.class);
+
+            when(mockResponse.getEntity()).thenReturn(new StringEntity(responseBody));
+            when(mockResponse.getStatusLine()).thenReturn(mockStatusLine);
+            when(mockStatusLine.getStatusCode()).thenReturn(HttpStatus.SC_OK);
+
+            when(HttpClientUtil.doPost(any(String.class), any(Map.class), any(Map.class), any(int.class)))
+                .thenReturn(mockResponse);
+
+            Method refreshTokenMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("refreshToken", String.class);
+            refreshTokenMethod.setAccessible(true);
+            refreshTokenMethod.invoke(RaftRegistryServiceImpl.getInstance(), "127.0.0.1:8092");
+            Field jwtTokenField = RaftRegistryServiceImpl.class.getDeclaredField("jwtToken");
+            jwtTokenField.setAccessible(true);
+            String jwtTokenAct = (String) jwtTokenField.get(null);
+
+            assertEquals(jwtToken, jwtTokenAct);
+
+        }
+    }
+
+    /**
+     * Verifies that isTokenExpired reports a fresh token as valid and an old one as expired.
+     */
+    @Test
+    public void testSecureTTL() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {
+        Field tokenTimeStamp = RaftRegistryServiceImpl.class.getDeclaredField("tokenTimeStamp");
+        tokenTimeStamp.setAccessible(true);
+        tokenTimeStamp.setLong(RaftRegistryServiceImpl.class, System.currentTimeMillis());
+        Method isExpiredMethod = RaftRegistryServiceImpl.class.getDeclaredMethod("isTokenExpired");
+        isExpiredMethod.setAccessible(true);
+        boolean rst = (boolean) isExpiredMethod.invoke(null);
+        assertEquals(false, rst);
+        // Back-date the token by 10 s rather than sleeping for 10 s. NOTE(review): assumes isTokenExpired
+        // depends only on tokenTimeStamp and the current time - confirm against RaftRegistryServiceImpl.
+        tokenTimeStamp.setLong(RaftRegistryServiceImpl.class, System.currentTimeMillis() - 10000);
+        rst = (boolean) isExpiredMethod.invoke(null);
+        assertEquals(true, rst);
+    }
+
+}
diff --git a/discovery/seata-discovery-raft/src/main/test/resources/file.conf b/discovery/seata-discovery-raft/src/main/test/resources/file.conf
new file mode 100644
index 0000000..46c3e04
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/resources/file.conf
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+service {
+ #transaction service group mapping
+ vgroupMapping.default_tx_group = "default"
+ #only support when registry.type=file, please don't set multiple addresses
+ default.grouplist = "127.0.0.1:8091"
+ #disable seata
+ disableGlobalTransaction = false
+}
\ No newline at end of file
diff --git a/discovery/seata-discovery-raft/src/main/test/resources/registry.conf b/discovery/seata-discovery-raft/src/main/test/resources/registry.conf
new file mode 100644
index 0000000..5a15b8e
--- /dev/null
+++ b/discovery/seata-discovery-raft/src/main/test/resources/registry.conf
@@ -0,0 +1,43 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+registry {
+  # file, nacos, eureka, redis, zk, consul, etcd3, sofa, custom, raft
+ type = "raft"
+ raft {
+ metadata-max-age-ms = 30000
+ serverAddr = "127.0.0.1:8848"
+ username = "seata"
+ password = "seata"
+ tokenValidityInMilliseconds = 1740000
+ }
+}
+
+config {
+  # file, nacos, apollo, zk, consul, etcd3, springCloudConfig, custom
+ type = "file"
+ raft {
+ metadata-max-age-ms = 30000
+ serverAddr = "127.0.0.1:8848"
+ }
+ file {
+ name = "file.conf"
+ }
+ custom {
+ name = ""
+ }
+}
diff --git a/namingserver/pom.xml b/namingserver/pom.xml
index fba22f8..6463407 100644
--- a/namingserver/pom.xml
+++ b/namingserver/pom.xml
@@ -35,7 +35,6 @@
<spring-boot-for-server.version>2.7.18</spring-boot-for-server.version>
<spring-framework-for-server.version>5.3.39</spring-framework-for-server.version>
<snakeyaml-for-server.version>2.0</snakeyaml-for-server.version>
- <tomcat-embed.version>9.0.90</tomcat-embed.version>
</properties>
<dependencyManagement>
@@ -80,21 +79,6 @@
<artifactId>snakeyaml</artifactId>
<version>${snakeyaml-for-server.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-core</artifactId>
- <version>${tomcat-embed.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-el</artifactId>
- <version>${tomcat-embed.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-websocket</artifactId>
- <version>${tomcat-embed.version}</version>
- </dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@@ -149,6 +133,18 @@
</dependency>
<dependency>
+ <groupId>org.apache.seata</groupId>
+ <artifactId>seata-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seata</groupId>
+ <artifactId>seata-spring-autoconfigure-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</dependency>
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
index dbdc1a4..1612ea4 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/NamingserverApplication.java
@@ -16,10 +16,13 @@
*/
package org.apache.seata.namingserver;
+import org.apache.seata.spring.boot.autoconfigure.http.RestControllerBeanPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Import;
@SpringBootApplication
+@Import(RestControllerBeanPostProcessor.class)
public class NamingserverApplication {
public static void main(String[] args) {
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
index eaa70e7..357fbae 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/controller/NamingController.java
@@ -17,6 +17,7 @@
package org.apache.seata.namingserver.controller;
+import io.netty.channel.Channel;
import org.apache.seata.common.metadata.namingserver.MetaResponse;
import org.apache.seata.common.metadata.namingserver.NamingServerNode;
import org.apache.seata.common.result.Result;
@@ -36,8 +37,6 @@
import javax.annotation.Resource;
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.stream.Collectors;
@@ -72,7 +71,7 @@
@PostMapping("/unregister")
public Result<String> unregisterInstance(@RequestParam String namespace, @RequestParam String clusterName,
- @RequestParam String unit, @RequestBody NamingServerNode registerBody) {
+ @RequestParam String unit, @RequestBody NamingServerNode registerBody) {
Result<String> result = new Result<>();
boolean isSuccess = namingManager.unregisterInstance(namespace, clusterName, unit, registerBody);
if (isSuccess) {
@@ -125,17 +124,15 @@
* @param clientTerm The timestamp of the subscription saved on the client side
* @param vGroup The name of the transaction group
* @param timeout The timeout duration
- * @param request The client's HTTP request
+     * @param channel    The Netty channel the watch request arrived on; it is stored in the watcher
*/
@PostMapping("/watch")
public void watch(@RequestParam String clientTerm,
@RequestParam String vGroup,
@RequestParam String timeout,
- HttpServletRequest request) {
- AsyncContext context = request.startAsync();
- context.setTimeout(0L);
- Watcher<AsyncContext> watcher = new Watcher<>(vGroup, context, Integer.parseInt(timeout), Long.parseLong(clientTerm), request.getRemoteAddr());
+                      Channel channel) {
+        Watcher<Channel> watcher = new Watcher<>(vGroup, channel, Integer.parseInt(timeout), Long.parseLong(clientTerm), String.valueOf(channel.remoteAddress()));
clusterWatcherManager.registryWatcher(watcher);
}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
index 6c2baaf..9091549 100644
--- a/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/manager/ClusterWatcherManager.java
@@ -16,6 +16,14 @@
*/
package org.apache.seata.namingserver.manager;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.Http2StreamChannel;
import org.apache.seata.namingserver.listener.ClusterChangeEvent;
import org.apache.seata.namingserver.listener.ClusterChangeListener;
import org.apache.seata.namingserver.listener.Watcher;
@@ -28,8 +36,6 @@
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-import javax.servlet.AsyncContext;
-import javax.servlet.http.HttpServletResponse;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -88,18 +94,23 @@
}
private void notify(Watcher<?> watcher) {
- notify(watcher, HttpServletResponse.SC_OK);
+ notify(watcher, HttpResponseStatus.OK.code());
}
private void notify(Watcher<?> watcher, int statusCode) {
- AsyncContext asyncContext = (AsyncContext) watcher.getAsyncContext();
- HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
- watcher.setDone(true);
- if (logger.isDebugEnabled()) {
- logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
+        Channel channel = (Channel) watcher.getAsyncContext();
+        // Keep marking the watcher as done (the servlet-based code did) so it is not handled as pending again.
+        watcher.setDone(true);
+        if (channel instanceof Http2StreamChannel) {
+            ByteBuf buf = Unpooled.buffer(4);
+            buf.writeInt(statusCode);
+            channel.writeAndFlush(new DefaultHttp2DataFrame(buf));
+        } else {
+            channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode)));
}
- httpServletResponse.setStatus(statusCode);
- asyncContext.complete();
+        if (logger.isDebugEnabled()) {
+            logger.debug("notify cluster change event to: {}", watcher.getClientEndpoint());
+        }
}
public void registryWatcher(Watcher<?> watcher) {
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java
new file mode 100644
index 0000000..e8b5f21
--- /dev/null
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NamingServerHttp2Detector.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.namingserver.netty;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
+import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import org.apache.seata.core.protocol.detector.Http2Detector;
+import org.apache.seata.core.rpc.netty.http2.Http2DispatchHandler;
+
+/** HTTP/2 detector of the naming server; every stream channel gets an {@link Http2DispatchHandler}. */
+public class NamingServerHttp2Detector extends Http2Detector {
+    public NamingServerHttp2Detector() {
+        super(null);
+    }
+
+    @Override
+    public ChannelHandler[] getHandlers() {
+        ChannelHandler frameCodec = Http2FrameCodecBuilder.forServer().build();
+        ChannelHandler multiplexer = new Http2MultiplexHandler(new ChannelInitializer<Http2StreamChannel>() {
+            @Override
+            protected void initChannel(Http2StreamChannel ch) {
+                ChannelPipeline streamPipeline = ch.pipeline();
+                streamPipeline.addLast(new Http2DispatchHandler());
+            }
+        });
+        return new ChannelHandler[]{frameCodec, multiplexer};
+    }
+}
diff --git a/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java
new file mode 100644
index 0000000..b7f68e9
--- /dev/null
+++ b/namingserver/src/main/java/org/apache/seata/namingserver/netty/NettyNamingServer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.namingserver.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.WriteBufferWaterMark;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.NettyRuntime;
+import org.apache.seata.common.thread.NamedThreadFactory;
+import org.apache.seata.core.protocol.detector.HttpDetector;
+import org.apache.seata.core.protocol.detector.ProtocolDetector;
+import org.apache.seata.core.rpc.netty.ProtocolDetectHandler;
+import org.apache.seata.namingserver.manager.ClusterWatcherManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import javax.annotation.Resource;
+import java.net.InetSocketAddress;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** Netty-based HTTP and HTTP/2 listener of the naming server, bound to {@code server.port}. */
+@Component
+public class NettyNamingServer {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(NettyNamingServer.class);
+
+    private final AtomicBoolean initialized = new AtomicBoolean(false);
+    private final AtomicBoolean destroyed = new AtomicBoolean(false);
+    private final ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+    private EventLoopGroup eventLoopGroupWorker;
+    private EventLoopGroup eventLoopGroupBoss;
+
+    @Value("${server.port}")
+    private Integer port;
+
+    @Resource
+    private ClusterWatcherManager clusterWatcherManager;
+
+    /** Creates the event loops and binds the listening socket; only the first call has any effect. */
+    @PostConstruct
+    public void init() {
+        if (!initialized.compareAndSet(false, true)) {
+            return;
+        }
+
+        this.eventLoopGroupBoss = new NioEventLoopGroup(1,
+            new NamedThreadFactory("NettyBoss", 1));
+        this.eventLoopGroupWorker = new NioEventLoopGroup(NettyRuntime.availableProcessors() * 2,
+            new NamedThreadFactory("NettyServerNIOWorker",
+                NettyRuntime.availableProcessors() * 2));
+        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)
+            .channel(NioServerSocketChannel.class)
+            .option(ChannelOption.SO_BACKLOG, 1024)
+            .option(ChannelOption.SO_REUSEADDR, true)
+            .childOption(ChannelOption.SO_KEEPALIVE, true)
+            .childOption(ChannelOption.TCP_NODELAY, true)
+            .childOption(ChannelOption.SO_SNDBUF, 153600)
+            .childOption(ChannelOption.SO_RCVBUF, 153600)
+            .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(1048576, 67108864))
+            .localAddress(new InetSocketAddress(port))
+            .childHandler(new ChannelInitializer<SocketChannel>() {
+                @Override
+                public void initChannel(SocketChannel ch) {
+                    ch.pipeline().addLast(new ProtocolDetectHandler(new ProtocolDetector[]{new HttpDetector(), new NamingServerHttp2Detector()}));
+                }
+            });
+
+        try {
+            this.serverBootstrap.bind(port).sync();
+            LOGGER.info("Naming server started, listening on port {}", port);
+        } catch (Exception exx) {
+            // A failed @PostConstruct aborts bean creation, so @PreDestroy is not expected to run: free the threads now.
+            destroy();
+            throw new RuntimeException("Server start failed", exx);
+        }
+    }
+
+    /** Shuts both event loop groups down gracefully; only the first call has any effect. */
+    @PreDestroy
+    public void destroy() {
+        if (!destroyed.compareAndSet(false, true)) {
+            return;
+        }
+
+        this.eventLoopGroupBoss.shutdownGracefully();
+        this.eventLoopGroupWorker.shutdownGracefully();
+    }
+}
diff --git a/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java
new file mode 100644
index 0000000..553ca85
--- /dev/null
+++ b/seata-spring-autoconfigure/seata-spring-autoconfigure-core/src/main/java/org/apache/seata/spring/boot/autoconfigure/http/RestControllerBeanPostProcessor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seata.spring.boot.autoconfigure.http;
+
+import org.apache.seata.core.rpc.netty.http.ControllerManager;
+import org.apache.seata.core.rpc.netty.http.HttpInvocation;
+import org.apache.seata.core.rpc.netty.http.ParamMetaData;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.stereotype.Component;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.ModelAttribute;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Component
+public class RestControllerBeanPostProcessor implements BeanPostProcessor {
+
+    private static final List<Class<? extends Annotation>> MAPPING_CLASS = new ArrayList<>();
+    private static final Map<Class<? extends Annotation>, ParamMetaData.ParamConvertType> MAPPING_PARAM_TYPE = new HashMap<>();
+
+    static {
+        MAPPING_CLASS.add(GetMapping.class);
+        MAPPING_CLASS.add(PostMapping.class);
+        MAPPING_CLASS.add(RequestMapping.class);
+        MAPPING_CLASS.add(PutMapping.class);
+        MAPPING_CLASS.add(DeleteMapping.class);
+
+        MAPPING_PARAM_TYPE.put(RequestParam.class, ParamMetaData.ParamConvertType.REQUEST_PARAM);
+        MAPPING_PARAM_TYPE.put(RequestBody.class, ParamMetaData.ParamConvertType.REQUEST_BODY);
+        MAPPING_PARAM_TYPE.put(ModelAttribute.class, ParamMetaData.ParamConvertType.MODEL_ATTRIBUTE);
+    }
+
+    /** Registers every mapped handler method of a {@code @RestController} bean with {@link ControllerManager}. */
+    @Override
+    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+        if (!bean.getClass().isAnnotationPresent(RestController.class)) {
+            return bean;
+        }
+
+        Class<?> httpControllerClass = bean.getClass();
+        RequestMapping requestMapping = httpControllerClass.getAnnotation(RequestMapping.class);
+        List<String> prePaths;
+        if (requestMapping != null) {
+            prePaths = Arrays.asList(requestMapping.value());
+        } else {
+            prePaths = new ArrayList<>();
+        }
+        Method[] methods = httpControllerClass.getDeclaredMethods();
+        for (Method method : methods) {
+            for (Class<? extends Annotation> annotationType : MAPPING_CLASS) {
+                Annotation annotation = method.getAnnotation(annotationType);
+                if (annotation != null) {
+                    List<String> postPaths = getAnnotationValue(annotation);
+                    addPathMapping(bean, prePaths, method, postPaths);
+                }
+            }
+        }
+
+        return bean;
+    }
+
+    /** Reads the {@code value()} attribute of a mapping annotation; returns an empty list if it cannot be read. */
+    private static List<String> getAnnotationValue(Annotation annotation) {
+        try {
+            Class<? extends Annotation> annotationClass = annotation.annotationType();
+            Method valueMethod = annotationClass.getMethod("value");
+            valueMethod.setAccessible(true);
+            return Arrays.asList((String[]) valueMethod.invoke(annotation));
+        } catch (Throwable e) {
+            return new ArrayList<>();
+        }
+    }
+
+    /** Registers one HttpInvocation per (class path, method path) pair; a side without paths counts as "/". */
+    private static void addPathMapping(Object httpController, List<String> prePaths, Method method, List<String> postPaths) {
+        if (prePaths.isEmpty() && postPaths.isEmpty()) {
+            return;
+        }
+        Class<?>[] parameterTypes = method.getParameterTypes();
+        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
+        ParamMetaData[] paramMetaDatas = new ParamMetaData[parameterTypes.length];
+        for (int i = 0; i < parameterTypes.length; i++) {
+            Class<? extends Annotation> parameterAnnotationType = null;
+            if (parameterAnnotations[i] != null && parameterAnnotations[i].length > 0) {
+                parameterAnnotationType = parameterAnnotations[i][0].annotationType();
+            }
+
+            if (parameterAnnotationType == null) {
+                parameterAnnotationType = RequestParam.class;
+            }
+
+            ParamMetaData paramMetaData = new ParamMetaData();
+            ParamMetaData.ParamConvertType paramConvertType = MAPPING_PARAM_TYPE.get(parameterAnnotationType);
+            paramMetaData.setParamConvertType(paramConvertType);
+            paramMetaDatas[i] = paramMetaData;
+        }
+        // Never mutate the incoming lists: they may be fixed-size Arrays.asList views (add() would throw
+        // UnsupportedOperationException) and prePaths is reused for every handler method of the controller.
+        List<String> prefixes = prePaths.isEmpty() ? Arrays.asList("/") : prePaths;
+        List<String> suffixes = postPaths.isEmpty() ? Arrays.asList("/") : postPaths;
+
+        for (String prePath : prefixes) {
+            for (String postPath : suffixes) {
+                String fullPath = (prePath + "/" + postPath).replaceAll("(/)+", "/");
+                HttpInvocation httpInvocation = new HttpInvocation();
+                httpInvocation.setMethod(method);
+                httpInvocation.setController(httpController);
+                httpInvocation.setPath(fullPath);
+                httpInvocation.setParamMetaData(paramMetaDatas);
+                ControllerManager.addHttpInvocation(httpInvocation);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
index dda663c..dbe1535 100644
--- a/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
+++ b/server/src/main/java/org/apache/seata/server/cluster/manager/ClusterWatcherManager.java
@@ -27,6 +27,7 @@
import javax.servlet.AsyncContext;
import javax.servlet.http.HttpServletResponse;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultHttpResponse;
@@ -41,6 +42,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
+import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@@ -66,11 +68,7 @@
Optional.ofNullable(WATCHERS.remove(group))
.ifPresent(watchers -> watchers.parallelStream().forEach(watcher -> {
if (System.currentTimeMillis() >= watcher.getTimeout()) {
- HttpServletResponse httpServletResponse =
- (HttpServletResponse)((AsyncContext)watcher.getAsyncContext()).getResponse();
- watcher.setDone(true);
- httpServletResponse.setStatus(HttpServletResponse.SC_NOT_MODIFIED);
- ((AsyncContext)watcher.getAsyncContext()).complete();
+ notify(watcher, HttpStatus.NOT_MODIFIED.value());
}
if (!watcher.isDone()) {
// Re-register
@@ -94,17 +92,35 @@
}
private void notify(Watcher<?> watcher) {
- Channel channel = (Channel) watcher.getAsyncContext();
- if (channel instanceof Http2StreamChannel) {
- // http2
- channel.writeAndFlush(new DefaultHttp2DataFrame(Unpooled.wrappedBuffer(Unpooled.EMPTY_BUFFER)));
+        notify(watcher, HttpResponseStatus.OK.code());
+    }
+
+    private void notify(Watcher<?> watcher, int statusCode) {
+        Object ctx = watcher.getAsyncContext();
+        watcher.setDone(true);
+        if (ctx instanceof Channel) {
+            Channel channel = (Channel) ctx;
+            if (channel instanceof Http2StreamChannel) {
+                // http2: the status code is sent as a 4-byte int payload of a data frame
+                ByteBuf buf = Unpooled.buffer(4);
+                buf.writeInt(statusCode);
+                channel.writeAndFlush(new DefaultHttp2DataFrame(buf));
+            } else {
+                // http
+                channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(statusCode)));
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("notify cluster change event to: {}", channel.remoteAddress());
+            }
return;
- } else {
- // http
- channel.writeAndFlush(new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK));
- }
- if (logger.isDebugEnabled()) {
- logger.debug("notify cluster change event to: {}", channel.remoteAddress());
+        } else if (ctx instanceof AsyncContext) {
+            AsyncContext asyncContext = (AsyncContext) ctx;
+            HttpServletResponse httpServletResponse = (HttpServletResponse) asyncContext.getResponse();
+            if (logger.isDebugEnabled()) {
+                logger.debug("notify cluster change event to: {}", asyncContext.getRequest().getRemoteAddr());
+            }
+            // Honour the requested status (e.g. 304 on watch timeout) instead of always answering 200.
+            httpServletResponse.setStatus(statusCode);
+            asyncContext.complete();
}
}