add dubbo protocol to pu server and port reuse (#10602)
* enable dubbo protocol to use pu server
* wire protocol returns protocol name
* delete default channel group
* export extra protocol
* add url mapper and handler mapper to pu server
* fix ci
* adjust some changes
* default value for url and handler in pu server
* get protocol name using extension loader
* naming readability and code annotations
* ext-protocols change to a multiProtocol string with comma split
* fix ci
* method rename
* don't use pu server by default
* rename method
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
index f235f44..9ef6daf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
@@ -200,6 +200,11 @@
private Boolean sslEnabled;
+ /*
+ * Extra Protocol for this service, using Port Unification Server
+ */
+ private String extProtocol;
+
public ProtocolConfig() {
}
@@ -551,4 +556,11 @@
return StringUtils.isNotEmpty(name);
}
+ public String getExtProtocol() {
+ return extProtocol;
+ }
+
+ public void setExtProtocol(String extProtocol) {
+ this.extProtocol = extProtocol;
+ }
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 7e8c3f9..054f679 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -89,6 +89,7 @@
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.apache.dubbo.remoting.Constants.IS_PU_SERVER_KEY;
import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
import static org.apache.dubbo.rpc.Constants.PROXY_KEY;
@@ -595,10 +596,36 @@
// export to remote if the config is not local (export to local only when config is local)
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
+ // export to extra protocol is used in remote export
+ String extProtocol = url.getParameter("ext.protocol", "");
+ List<String> protocols = new ArrayList<>();
+ // export original url
+ url = URLBuilder.from(url).
+ addParameter(IS_PU_SERVER_KEY, Boolean.TRUE.toString()).
+ removeParameter("ext.protocol").
+ build();
url = exportRemote(url, registryURLs);
if (!isGeneric(generic) && !getScopeModel().isInternal()) {
MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
}
+
+ if (!extProtocol.equals("")) {
+ String[] extProtocols = extProtocol.split(",", -1);
+ protocols.addAll(Arrays.asList(extProtocols));
+ }
+ // export extra protocols
+ for(String protocol : protocols) {
+ if(!protocol.equals("")){
+ URL localUrl = URLBuilder.from(url).
+ setProtocol(protocol).
+ build();
+ localUrl = exportRemote(localUrl, registryURLs);
+ if (!isGeneric(generic) && !getScopeModel().isInternal()) {
+ MetadataUtils.publishServiceDefinition(localUrl, providerModel.getServiceModel(), getApplicationModel());
+ }
+ this.urls.add(localUrl);
+ }
+ }
}
}
this.urls.add(url);
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 3af0886..10bc5c6 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -1278,6 +1278,11 @@
<xsd:documentation><![CDATA[ The service port. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="ext-protocol" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ extra protocol.]]]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="threadpool" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ The thread pool type. ]]></xsd:documentation>
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index ac18f2a..02186be 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -87,6 +87,8 @@
String SERVER_KEY = "server";
+ String IS_PU_SERVER_KEY = "ispuserver";
+
String CLIENT_KEY = "client";
String DEFAULT_REMOTING_CLIENT = "netty";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
index 675e6f5..278712b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
@@ -16,6 +16,11 @@
*/
package org.apache.dubbo.remoting.api;
+import org.apache.dubbo.common.URL;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslContext;
+
public abstract class AbstractWireProtocol implements WireProtocol {
private final ProtocolDetector detector;
@@ -30,6 +35,11 @@
}
@Override
+ public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
+
+ }
+
+ @Override
public void close() {
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
index a38648b..9a6ac62 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
@@ -23,10 +23,25 @@
import org.apache.dubbo.remoting.transport.AbstractServer;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public abstract class AbstractPortUnificationServer extends AbstractServer {
private final List<WireProtocol> protocols;
+ /*
+ protocol name --> URL object
+ wire protocol will get url object to config server pipeline for channel
+ */
+ private final Map<String, URL> supportedUrls = new ConcurrentHashMap<>();
+
+ /*
+ protocol name --> ChannelHandler object
+ wire protocol will get handler to config server pipeline for channel
+ (for triple protocol, it's a default handler that do nothing)
+ */
+ private final Map<String, ChannelHandler> supportedHandlers = new ConcurrentHashMap<>();
+
public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, handler);
this.protocols = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getActivateExtension(url, new String[0]);
@@ -36,4 +51,23 @@
return protocols;
}
+ /*
+ This method registers URL object and corresponding channel handler to pu server.
+ In PuServerExchanger.bind, this method is called with ConcurrentHashMap.computeIfPresent to register messages to
+ this supportedUrls and supportedHandlers
+ */
+ public void addSupportedProtocol(URL url, ChannelHandler handler) {
+ this.supportedUrls.put(url.getProtocol(), url);
+ this.supportedHandlers.put(url.getProtocol(), handler);
+ }
+
+ protected Map<String, URL> getSupportedUrls() {
+ // this getter is just used by implementation of this class
+ return supportedUrls;
+ }
+
+ public Map<String, ChannelHandler> getSupportedHandlers() {
+ // this getter is just used by implementation of this class
+ return supportedHandlers;
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
index f817855..10fec10 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
@@ -34,7 +34,7 @@
private static final Logger log = LoggerFactory.getLogger(PortUnificationExchanger.class);
private static final ConcurrentMap<String, RemotingServer> servers = new ConcurrentHashMap<>();
- public static void bind(URL url, ChannelHandler handler) {
+ public static RemotingServer bind(URL url, ChannelHandler handler) {
servers.computeIfAbsent(url.getAddress(), addr -> {
final AbstractPortUnificationServer server;
try {
@@ -45,6 +45,12 @@
// server.bind();
return server;
});
+
+ servers.computeIfPresent(url.getAddress(), (addr, server) -> {
+ ((AbstractPortUnificationServer) server).addSupportedProtocol(url, handler);
+ return server;
+ });
+ return servers.get(url.getAddress());
}
public static void close() {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
index 90f7c98..67f2440 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
@@ -23,8 +23,11 @@
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchanger;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.remoting.transport.DecodeHandler;
+import static org.apache.dubbo.remoting.Constants.IS_PU_SERVER_KEY;
+
/**
* DefaultMessenger
*
@@ -41,7 +44,14 @@
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
- return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+ ExchangeServer server;
+ boolean isPuServerKey = url.getParameter(IS_PU_SERVER_KEY, false);
+ if(isPuServerKey) {
+ server = new HeaderExchangeServer(PortUnificationExchanger.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+ }else {
+ server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+ }
+ return server;
}
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
index dee60da..2835326 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
@@ -44,4 +44,9 @@
public String toString() {
return getLocalAddress() + " -> " + getRemoteAddress();
}
+
+ @Override
+ protected void setUrl(URL url) {
+ super.setUrl(url);
+ }
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
index 4493bb3..7022146 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
@@ -94,6 +94,8 @@
if (ret == null) {
ret = nettyChannel;
}
+ } else {
+ ret.markActive(true);
}
return ret;
}
@@ -231,6 +233,7 @@
attributes.remove(key);
}
+
@Override
public int hashCode() {
final int prime = 31;
@@ -240,6 +243,11 @@
}
@Override
+ protected void setUrl(URL url) {
+ super.setUrl(url);
+ }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
index e732f96..6a3b926 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
@@ -20,6 +20,7 @@
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -29,6 +30,7 @@
import org.apache.dubbo.remoting.api.SslContexts;
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
@@ -37,14 +39,15 @@
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -61,9 +64,6 @@
private static final Logger logger = LoggerFactory.getLogger(NettyPortUnificationServer.class);
- private final DefaultChannelGroup channels = new DefaultChannelGroup(
- GlobalEventExecutor.INSTANCE);
-
private final int serverShutdownTimeoutMills;
/**
* netty server bootstrap.
@@ -75,9 +75,11 @@
private io.netty.channel.Channel channel;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
+ private final Map<String, Channel> dubboChannels = new ConcurrentHashMap<>();
+
public NettyPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
- super(url, handler);
+ super(url, ChannelHandlers.wrap(handler, url));
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
@@ -87,6 +89,11 @@
}
@Override
+ public void addSupportedProtocol(URL url, ChannelHandler handler) {
+ super.addSupportedProtocol(url, ChannelHandlers.wrap(handler, url));
+ }
+
+ @Override
public void close() {
if (channel != null) {
doClose();
@@ -127,7 +134,8 @@
final ChannelPipeline p = ch.pipeline();
final NettyPortUnificationServerHandler puHandler;
puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
- channels, NettyPortUnificationServer.this);
+ NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,
+ getSupportedUrls(), getSupportedHandlers());
p.addLast("negotiation-protocol", puHandler);
}
});
@@ -146,7 +154,6 @@
@Override
public void doClose(){
- final long st = System.currentTimeMillis();
try {
if (channel != null) {
@@ -155,13 +162,25 @@
channel = null;
}
- channels.close().await(serverShutdownTimeoutMills);
- final long cost = System.currentTimeMillis() - st;
- logger.info("Port unification server closed. cost:" + cost);
- } catch (InterruptedException e) {
+ } catch (Throwable e) {
logger.warn("Interrupted while shutting down", e);
}
+ try {
+ Collection<Channel> channels = getChannels();
+ if (CollectionUtils.isNotEmpty(channels)) {
+ for (Channel channel : channels) {
+ try {
+ channel.close();
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ logger.warn(e.getMessage(), e);
+ }
+
for (WireProtocol protocol : getProtocols()) {
protocol.close();
}
@@ -188,16 +207,23 @@
@Override
public Collection<Channel> getChannels() {
- return null;
+ Collection<Channel> chs = new ArrayList<>(this.dubboChannels.size());
+ chs.addAll(this.dubboChannels.values());
+ return chs;
}
@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
- return null;
+ return dubboChannels.get(NetUtils.toAddressString(remoteAddress));
}
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) channel.localAddress();
}
+ @Override
+ public boolean canHandleIdle() {
+ return true;
+ }
+
}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index 7605798..e6fcb1a 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -20,21 +20,23 @@
import org.apache.dubbo.common.io.Bytes;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.api.ProtocolDetector;
import org.apache.dubbo.remoting.api.WireProtocol;
-import org.apache.dubbo.remoting.api.pu.ChannelOperator;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
+import java.net.InetSocketAddress;
import java.util.List;
+import java.util.Map;
import java.util.Set;
public class NettyPortUnificationServerHandler extends ByteToMessageDecoder {
@@ -42,23 +44,27 @@
private static final Logger LOGGER = LoggerFactory.getLogger(
NettyPortUnificationServerHandler.class);
- private final ChannelGroup channels;
-
private final SslContext sslCtx;
private final URL url;
private final ChannelHandler handler;
private final boolean detectSsl;
private final List<WireProtocol> protocols;
+ private final Map<String, Channel> dubboChannels;
+ private final Map<String, URL> urlMapper;
+ private final Map<String, ChannelHandler> handlerMapper;
+
public NettyPortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
- List<WireProtocol> protocols, ChannelGroup channels,
- ChannelHandler handler) {
+ List<WireProtocol> protocols, ChannelHandler handler,
+ Map<String, Channel> dubboChannels, Map<String, URL> urlMapper, Map<String, ChannelHandler> handlerMapper) {
this.url = url;
this.sslCtx = sslCtx;
this.protocols = protocols;
this.detectSsl = detectSsl;
- this.channels = channels;
this.handler = handler;
+ this.dubboChannels = dubboChannels;
+ this.urlMapper = urlMapper;
+ this.handlerMapper = handlerMapper;
}
@Override
@@ -69,7 +75,11 @@
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
- channels.add(ctx.channel());
+ NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ if (channel != null) {
+ // this is needed by some test cases
+ dubboChannels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
+ }
}
@Override
@@ -94,7 +104,12 @@
case UNRECOGNIZED:
continue;
case RECOGNIZED:
- ChannelOperator operator = new NettyConfigOperator(channel, handler);
+ String protocolName = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class)
+ .getExtensionName(protocol);
+ ChannelHandler localHandler = this.handlerMapper.getOrDefault(protocolName, handler);
+ URL localURL = this.urlMapper.getOrDefault(protocolName, url);
+ channel.setUrl(localURL);
+ NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
protocol.configServerProtocolHandler(url, operator);
ctx.pipeline().remove(this);
case NEED_MORE_DATA:
@@ -123,7 +138,8 @@
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
p.addLast("unificationA",
- new NettyPortUnificationServerHandler(url, sslCtx, false, protocols, channels, handler));
+ new NettyPortUnificationServerHandler(url, sslCtx, false, protocols,
+ handler, dubboChannels, urlMapper, handlerMapper));
p.remove(this);
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index b2ccfbe..c4e238c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -33,6 +33,7 @@
import org.apache.dubbo.remoting.exchange.ExchangeHandler;
import org.apache.dubbo.remoting.exchange.ExchangeServer;
import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
@@ -334,6 +335,7 @@
String key = url.getAddress();
// client can export a service which only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
+
if (isServer) {
ProtocolServer server = serverMap.get(key);
if (server == null) {
@@ -653,6 +655,7 @@
}
}
}
+ PortUnificationExchanger.close();
referenceClientMap.clear();
super.destroy();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java
new file mode 100644
index 0000000..88d146c
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+
+import java.nio.ByteBuffer;
+
+import static java.lang.Math.min;
+
+public class DubboDetector implements ProtocolDetector {
+ private final ChannelBuffer Preface = new ByteBufferBackedChannelBuffer(
+ ByteBuffer.wrap(new byte[]{(byte)0xda, (byte)0xbb})
+ );
+
+ @Override
+ public Result detect(ChannelBuffer in) {
+ int prefaceLen = Preface.readableBytes();
+ int bytesRead = min(in.readableBytes(), prefaceLen);
+
+ if (bytesRead ==0 || !ChannelBuffers.prefixEquals(in, Preface, bytesRead)) {
+ return Result.UNRECOGNIZED;
+ }
+ if (bytesRead == prefaceLen) {
+ return Result.RECOGNIZED;
+ }
+
+ return Result.NEED_MORE_DATA;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java
new file mode 100644
index 0000000..ba83394
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.pu;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.api.AbstractWireProtocol;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Activate
+public class DubboWireProtocol extends AbstractWireProtocol {
+ public DubboWireProtocol() {
+ super(new DubboDetector());
+ }
+
+
+ @Override
+ public void configServerProtocolHandler(URL url, ChannelOperator operator) {
+ List<ChannelHandler> handlers = new ArrayList<>();
+ // operator(for now nettyOperator)'s duties
+ // 1. config codec2 for the protocol(load by extension loader)
+ // 2. config handlers passed by wire protocol
+ // ( for triple, some h2 netty handler and logic handler to handle connection;
+ // for dubbo, nothing, an empty handlers is used to trigger operator logic)
+ // 3. config Dubbo Inner handler(for dubbo protocol, this handler handles connection)
+ operator.configChannelHandler(handlers);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
new file mode 100644
index 0000000..06f029b
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
@@ -0,0 +1 @@
+dubbo=org.apache.dubbo.rpc.protocol.dubbo.pu.DubboWireProtocol