add dubbo protocol to pu server and port reuse (#10602)

* enable dubbo protocol to use pu server

* wire protocol returns protocol name

* delete default channel group

* export extra protocol

* add url mapper and handler mapper to pu server

* fix ci

* adjust some changes

* default value for url and handler in pu server

* get protocol name using extension loader

* naming readability and code annotations

* ext-protocols change to a multiProtocol string with comma split

* fix ci

* method rename

* don't use pu server by default

* rename method
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
index f235f44..9ef6daf 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ProtocolConfig.java
@@ -200,6 +200,11 @@
 
     private Boolean sslEnabled;
 
+    /*
+     * Extra Protocol for this service, using Port Unification Server
+     */
+    private String extProtocol;
+
     public ProtocolConfig() {
     }
 
@@ -551,4 +556,11 @@
         return StringUtils.isNotEmpty(name);
     }
 
+    public String getExtProtocol() {
+        return extProtocol;
+    }
+
+    public void setExtProtocol(String extProtocol) {
+        this.extProtocol = extProtocol;
+    }
 }
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
index 7e8c3f9..054f679 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ServiceConfig.java
@@ -89,6 +89,7 @@
 import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
 import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
 import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.apache.dubbo.remoting.Constants.IS_PU_SERVER_KEY;
 import static org.apache.dubbo.rpc.Constants.GENERIC_KEY;
 import static org.apache.dubbo.rpc.Constants.LOCAL_PROTOCOL;
 import static org.apache.dubbo.rpc.Constants.PROXY_KEY;
@@ -595,10 +596,36 @@
 
             // export to remote if the config is not local (export to local only when config is local)
             if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
+                // export to extra protocol is used in remote export
+                String extProtocol = url.getParameter("ext.protocol", "");
+                List<String> protocols = new ArrayList<>();
+                // export original url
+                url = URLBuilder.from(url).
+                    addParameter(IS_PU_SERVER_KEY, Boolean.TRUE.toString()).
+                    removeParameter("ext.protocol").
+                    build();
                 url = exportRemote(url, registryURLs);
                 if (!isGeneric(generic) && !getScopeModel().isInternal()) {
                     MetadataUtils.publishServiceDefinition(url, providerModel.getServiceModel(), getApplicationModel());
                 }
+
+                if (!extProtocol.equals("")) {
+                    String[] extProtocols = extProtocol.split(",", -1);
+                    protocols.addAll(Arrays.asList(extProtocols));
+                }
+                // export extra protocols
+                for(String protocol : protocols) {
+                    if(!protocol.equals("")){
+                        URL localUrl = URLBuilder.from(url).
+                            setProtocol(protocol).
+                            build();
+                        localUrl = exportRemote(localUrl, registryURLs);
+                        if (!isGeneric(generic) && !getScopeModel().isInternal()) {
+                            MetadataUtils.publishServiceDefinition(localUrl, providerModel.getServiceModel(), getApplicationModel());
+                        }
+                        this.urls.add(localUrl);
+                    }
+                }
             }
         }
         this.urls.add(url);
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index 3af0886..10bc5c6 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -1278,6 +1278,11 @@
                 <xsd:documentation><![CDATA[ The service port. ]]></xsd:documentation>
             </xsd:annotation>
         </xsd:attribute>
+        <xsd:attribute name="ext-protocol" type="xsd:string">
+            <xsd:annotation>
+                <xsd:documentation><![CDATA[ extra protocol.]]]]></xsd:documentation>
+            </xsd:annotation>
+        </xsd:attribute>
         <xsd:attribute name="threadpool" type="xsd:string">
             <xsd:annotation>
                 <xsd:documentation><![CDATA[ The thread pool type. ]]></xsd:documentation>
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
index ac18f2a..02186be 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/Constants.java
@@ -87,6 +87,8 @@
 
     String SERVER_KEY = "server";
 
+    String IS_PU_SERVER_KEY = "ispuserver";
+
     String CLIENT_KEY = "client";
 
     String DEFAULT_REMOTING_CLIENT = "netty";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
index 675e6f5..278712b 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/AbstractWireProtocol.java
@@ -16,6 +16,11 @@
  */
 package org.apache.dubbo.remoting.api;
 
+import org.apache.dubbo.common.URL;
+
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.ssl.SslContext;
+
 public abstract class AbstractWireProtocol implements WireProtocol {
 
     private final ProtocolDetector detector;
@@ -30,6 +35,11 @@
     }
 
     @Override
+    public void configClientPipeline(URL url, ChannelPipeline pipeline, SslContext sslContext) {
+
+    }
+
+    @Override
     public void close() {
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
index a38648b..9a6ac62 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/pu/AbstractPortUnificationServer.java
@@ -23,10 +23,25 @@
 import org.apache.dubbo.remoting.transport.AbstractServer;
 
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public abstract class AbstractPortUnificationServer extends AbstractServer {
     private final List<WireProtocol> protocols;
 
+    /*
+    protocol name --> URL object
+    wire protocol will get url object to config server pipeline for channel
+     */
+    private final Map<String, URL> supportedUrls = new ConcurrentHashMap<>();
+
+    /*
+    protocol name --> ChannelHandler object
+    wire protocol will get handler to config server pipeline for channel
+    (for triple protocol, it's a default handler that do nothing)
+     */
+    private final Map<String, ChannelHandler> supportedHandlers = new ConcurrentHashMap<>();
+
     public AbstractPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
         super(url, handler);
         this.protocols = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class).getActivateExtension(url, new String[0]);
@@ -36,4 +51,23 @@
         return protocols;
     }
 
+    /*
+    This method registers URL object and corresponding channel handler to pu server.
+    In PuServerExchanger.bind, this method is called with ConcurrentHashMap.computeIfPresent to register messages to
+    this supportedUrls and supportedHandlers
+     */
+    public void addSupportedProtocol(URL url, ChannelHandler handler) {
+        this.supportedUrls.put(url.getProtocol(), url);
+        this.supportedHandlers.put(url.getProtocol(), handler);
+    }
+
+    protected Map<String, URL> getSupportedUrls() {
+        // this getter is just used by implementation of this class
+        return supportedUrls;
+    }
+
+    public Map<String, ChannelHandler> getSupportedHandlers() {
+        // this getter is just used by implementation of this class
+        return supportedHandlers;
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
index f817855..10fec10 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/PortUnificationExchanger.java
@@ -34,7 +34,7 @@
     private static final Logger log = LoggerFactory.getLogger(PortUnificationExchanger.class);
     private static final ConcurrentMap<String, RemotingServer> servers = new ConcurrentHashMap<>();
 
-    public static void bind(URL url, ChannelHandler handler) {
+    public static RemotingServer bind(URL url, ChannelHandler handler) {
         servers.computeIfAbsent(url.getAddress(), addr -> {
             final AbstractPortUnificationServer server;
             try {
@@ -45,6 +45,12 @@
             // server.bind();
             return server;
         });
+
+        servers.computeIfPresent(url.getAddress(), (addr, server) -> {
+            ((AbstractPortUnificationServer) server).addSupportedProtocol(url, handler);
+            return server;
+        });
+        return servers.get(url.getAddress());
     }
 
     public static void close() {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
index 90f7c98..67f2440 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/header/HeaderExchanger.java
@@ -23,8 +23,11 @@
 import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.ExchangeServer;
 import org.apache.dubbo.remoting.exchange.Exchanger;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
 import org.apache.dubbo.remoting.transport.DecodeHandler;
 
+import static org.apache.dubbo.remoting.Constants.IS_PU_SERVER_KEY;
+
 /**
  * DefaultMessenger
  *
@@ -41,7 +44,14 @@
 
     @Override
     public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
-        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+        ExchangeServer server;
+        boolean isPuServerKey = url.getParameter(IS_PU_SERVER_KEY, false);
+        if(isPuServerKey) {
+            server = new HeaderExchangeServer(PortUnificationExchanger.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+        }else {
+            server = new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
+        }
+        return server;
     }
 
 }
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
index dee60da..2835326 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractChannel.java
@@ -44,4 +44,9 @@
     public String toString() {
         return getLocalAddress() + " -> " + getRemoteAddress();
     }
+
+    @Override
+    protected void setUrl(URL url) {
+        super.setUrl(url);
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
index 4493bb3..7022146 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
@@ -94,6 +94,8 @@
             if (ret == null) {
                 ret = nettyChannel;
             }
+        } else {
+            ret.markActive(true);
         }
         return ret;
     }
@@ -231,6 +233,7 @@
         attributes.remove(key);
     }
 
+
     @Override
     public int hashCode() {
         final int prime = 31;
@@ -240,6 +243,11 @@
     }
 
     @Override
+    protected void setUrl(URL url) {
+        super.setUrl(url);
+    }
+
+    @Override
     public boolean equals(Object obj) {
         if (this == obj) {
             return true;
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
index e732f96..6a3b926 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
@@ -20,6 +20,7 @@
 import org.apache.dubbo.common.config.ConfigurationUtils;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.NetUtils;
 import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
@@ -29,6 +30,7 @@
 import org.apache.dubbo.remoting.api.SslContexts;
 import org.apache.dubbo.remoting.api.WireProtocol;
 import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
+import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -37,14 +39,15 @@
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
 import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.dubbo.common.constants.CommonConstants.ANYHOST_KEY;
@@ -61,9 +64,6 @@
 
     private static final Logger logger = LoggerFactory.getLogger(NettyPortUnificationServer.class);
 
-    private final DefaultChannelGroup channels = new DefaultChannelGroup(
-        GlobalEventExecutor.INSTANCE);
-
     private final int serverShutdownTimeoutMills;
     /**
      * netty server bootstrap.
@@ -75,9 +75,11 @@
     private io.netty.channel.Channel channel;
     private EventLoopGroup bossGroup;
     private EventLoopGroup workerGroup;
+    private final Map<String, Channel> dubboChannels = new ConcurrentHashMap<>();
+
 
     public NettyPortUnificationServer(URL url, ChannelHandler handler) throws RemotingException {
-        super(url, handler);
+        super(url, ChannelHandlers.wrap(handler, url));
 
         // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
         // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
@@ -87,6 +89,11 @@
     }
 
     @Override
+    public void addSupportedProtocol(URL url, ChannelHandler handler) {
+        super.addSupportedProtocol(url, ChannelHandlers.wrap(handler, url));
+    }
+
+    @Override
     public void close() {
         if (channel != null) {
             doClose();
@@ -127,7 +134,8 @@
                     final ChannelPipeline p = ch.pipeline();
                     final NettyPortUnificationServerHandler puHandler;
                     puHandler = new NettyPortUnificationServerHandler(getUrl(), sslContext, true, getProtocols(),
-                        channels, NettyPortUnificationServer.this);
+                        NettyPortUnificationServer.this, NettyPortUnificationServer.this.dubboChannels,
+                        getSupportedUrls(), getSupportedHandlers());
                     p.addLast("negotiation-protocol", puHandler);
                 }
             });
@@ -146,7 +154,6 @@
 
     @Override
     public void doClose(){
-        final long st = System.currentTimeMillis();
 
         try {
             if (channel != null) {
@@ -155,13 +162,25 @@
                 channel = null;
             }
 
-            channels.close().await(serverShutdownTimeoutMills);
-            final long cost = System.currentTimeMillis() - st;
-            logger.info("Port unification server closed. cost:" + cost);
-        } catch (InterruptedException e) {
+        } catch (Throwable e) {
             logger.warn("Interrupted while shutting down", e);
         }
 
+        try {
+            Collection<Channel> channels = getChannels();
+            if (CollectionUtils.isNotEmpty(channels)) {
+                for (Channel channel : channels) {
+                    try {
+                        channel.close();
+                    } catch (Throwable e) {
+                        logger.warn(e.getMessage(), e);
+                    }
+                }
+            }
+        } catch (Throwable e) {
+            logger.warn(e.getMessage(), e);
+        }
+
         for (WireProtocol protocol : getProtocols()) {
             protocol.close();
         }
@@ -188,16 +207,23 @@
 
     @Override
     public Collection<Channel> getChannels() {
-        return null;
+        Collection<Channel> chs = new ArrayList<>(this.dubboChannels.size());
+        chs.addAll(this.dubboChannels.values());
+        return chs;
     }
 
     @Override
     public Channel getChannel(InetSocketAddress remoteAddress) {
-        return null;
+        return dubboChannels.get(NetUtils.toAddressString(remoteAddress));
     }
 
     public InetSocketAddress getLocalAddress() {
         return (InetSocketAddress) channel.localAddress();
     }
 
+    @Override
+    public boolean canHandleIdle() {
+        return true;
+    }
+
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index 7605798..e6fcb1a 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -20,21 +20,23 @@
 import org.apache.dubbo.common.io.Bytes;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.api.ProtocolDetector;
 import org.apache.dubbo.remoting.api.WireProtocol;
-import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 import org.apache.dubbo.remoting.buffer.ChannelBuffer;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.group.ChannelGroup;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 
+import java.net.InetSocketAddress;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 public class NettyPortUnificationServerHandler extends ByteToMessageDecoder {
@@ -42,23 +44,27 @@
     private static final Logger LOGGER = LoggerFactory.getLogger(
         NettyPortUnificationServerHandler.class);
 
-    private final ChannelGroup channels;
-
     private final SslContext sslCtx;
     private final URL url;
     private final ChannelHandler handler;
     private final boolean detectSsl;
     private final List<WireProtocol> protocols;
+    private final Map<String, Channel> dubboChannels;
+    private final Map<String, URL> urlMapper;
+    private final Map<String, ChannelHandler> handlerMapper;
+
 
     public NettyPortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
-                                             List<WireProtocol> protocols, ChannelGroup channels,
-                                             ChannelHandler handler) {
+                                             List<WireProtocol> protocols, ChannelHandler handler,
+                                             Map<String, Channel> dubboChannels, Map<String, URL> urlMapper, Map<String, ChannelHandler> handlerMapper) {
         this.url = url;
         this.sslCtx = sslCtx;
         this.protocols = protocols;
         this.detectSsl = detectSsl;
-        this.channels = channels;
         this.handler = handler;
+        this.dubboChannels = dubboChannels;
+        this.urlMapper = urlMapper;
+        this.handlerMapper = handlerMapper;
     }
 
     @Override
@@ -69,7 +75,11 @@
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
-        channels.add(ctx.channel());
+        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+        if (channel != null) {
+            // this is needed by some test cases
+            dubboChannels.put(NetUtils.toAddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
+        }
     }
 
     @Override
@@ -94,7 +104,12 @@
                     case UNRECOGNIZED:
                         continue;
                     case RECOGNIZED:
-                        ChannelOperator operator = new NettyConfigOperator(channel, handler);
+                        String protocolName = url.getOrDefaultFrameworkModel().getExtensionLoader(WireProtocol.class)
+                            .getExtensionName(protocol);
+                        ChannelHandler localHandler = this.handlerMapper.getOrDefault(protocolName, handler);
+                        URL localURL = this.urlMapper.getOrDefault(protocolName, url);
+                        channel.setUrl(localURL);
+                        NettyConfigOperator operator = new NettyConfigOperator(channel, localHandler);
                         protocol.configServerProtocolHandler(url, operator);
                         ctx.pipeline().remove(this);
                     case NEED_MORE_DATA:
@@ -123,7 +138,8 @@
         ChannelPipeline p = ctx.pipeline();
         p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
         p.addLast("unificationA",
-            new NettyPortUnificationServerHandler(url, sslCtx, false, protocols, channels, handler));
+            new NettyPortUnificationServerHandler(url, sslCtx, false, protocols,
+                handler, dubboChannels, urlMapper, handlerMapper));
         p.remove(this);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index b2ccfbe..c4e238c 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -33,6 +33,7 @@
 import org.apache.dubbo.remoting.exchange.ExchangeHandler;
 import org.apache.dubbo.remoting.exchange.ExchangeServer;
 import org.apache.dubbo.remoting.exchange.Exchangers;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
 import org.apache.dubbo.remoting.exchange.support.ExchangeHandlerAdapter;
 import org.apache.dubbo.rpc.Exporter;
 import org.apache.dubbo.rpc.Invocation;
@@ -334,6 +335,7 @@
         String key = url.getAddress();
         // client can export a service which only for server to invoke
         boolean isServer = url.getParameter(IS_SERVER_KEY, true);
+
         if (isServer) {
             ProtocolServer server = serverMap.get(key);
             if (server == null) {
@@ -653,6 +655,7 @@
                 }
             }
         }
+        PortUnificationExchanger.close();
         referenceClientMap.clear();
 
         super.destroy();
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java
new file mode 100644
index 0000000..88d146c
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboDetector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.pu;
+
+import org.apache.dubbo.remoting.api.ProtocolDetector;
+import org.apache.dubbo.remoting.buffer.ByteBufferBackedChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.buffer.ChannelBuffers;
+
+import java.nio.ByteBuffer;
+
+import static java.lang.Math.min;
+
+public class DubboDetector implements ProtocolDetector {
+    private final ChannelBuffer Preface = new ByteBufferBackedChannelBuffer(
+        ByteBuffer.wrap(new byte[]{(byte)0xda, (byte)0xbb})
+    );
+
+    @Override
+    public Result detect(ChannelBuffer in) {
+        int prefaceLen = Preface.readableBytes();
+        int bytesRead = min(in.readableBytes(), prefaceLen);
+
+        if (bytesRead ==0 || !ChannelBuffers.prefixEquals(in,  Preface,  bytesRead)) {
+            return Result.UNRECOGNIZED;
+        }
+        if (bytesRead == prefaceLen) {
+            return Result.RECOGNIZED;
+        }
+
+        return Result.NEED_MORE_DATA;
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java
new file mode 100644
index 0000000..ba83394
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/pu/DubboWireProtocol.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.dubbo.pu;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.api.AbstractWireProtocol;
+import org.apache.dubbo.remoting.api.pu.ChannelOperator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@Activate
+public class DubboWireProtocol extends AbstractWireProtocol {
+    public DubboWireProtocol() {
+        super(new DubboDetector());
+    }
+
+
+    @Override
+    public void configServerProtocolHandler(URL url, ChannelOperator operator) {
+        List<ChannelHandler> handlers = new ArrayList<>();
+        // operator(for now nettyOperator)'s duties
+        // 1. config codec2 for the protocol(load by extension loader)
+        // 2. config handlers passed by wire protocol
+        // ( for triple, some h2 netty handler and logic handler to handle connection;
+        //   for dubbo, nothing, an empty handlers is used to trigger operator logic)
+        // 3. config Dubbo Inner handler(for dubbo protocol, this handler handles connection)
+        operator.configChannelHandler(handlers);
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
new file mode 100644
index 0000000..06f029b
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.remoting.api.WireProtocol
@@ -0,0 +1 @@
+dubbo=org.apache.dubbo.rpc.protocol.dubbo.pu.DubboWireProtocol