Add tests for ClientChannelManager
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
deleted file mode 100755
index db959b7..0000000
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/common/metrics/ChannelMetrics.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.remoting.common.metrics;
-
-import io.netty.channel.group.ChannelGroup;
-
-public interface ChannelMetrics {
-
-    Integer getChannelCount();
-
-    ChannelGroup getChannels();
-}
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
index cc81a11..9fa79c2 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/config/RemotingConfig.java
@@ -38,7 +38,7 @@
     private int serviceThreadBlockQueueSize = 50000;
     private boolean clientNativeEpollEnable = false;
     private int clientWorkerThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
-    private int clientConnectionFutureAwaitTimeoutMillis = 30000;
+    private int clientConnectionFutureAwaitTimeoutMillis = 3000;
     private int clientAsyncCallbackExecutorThreads = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int clientOnewayInvokeSemaphore = 20480;
 
diff --git a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
index 2f59d24..0b084a0 100644
--- a/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
+++ b/remoting-core/remoting-impl/src/main/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManager.java
@@ -38,7 +38,7 @@
     protected static final Logger LOG = LoggerFactory.getLogger(ClientChannelManager.class);
 
     private static final long LOCK_TIMEOUT_MILLIS = 3000;
-    private final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
+    final ConcurrentHashMap<String, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
     private final Lock lockChannelTables = new ReentrantLock();
     private final Bootstrap clientBootstrap;
     private final RemotingConfig clientConfig;
@@ -101,8 +101,7 @@
             } else {
                 LOG.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
             }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
+        } catch (InterruptedException ignore) {
         }
 
         if (cw != null) {
@@ -125,9 +124,6 @@
     }
 
     void closeChannel(final String addr, final Channel channel) {
-        if (null == channel)
-            return;
-
         final String addrRemote = null == addr ? extractRemoteAddress(channel) : addr;
         try {
             if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
@@ -171,9 +167,6 @@
     }
 
     void closeChannel(final Channel channel) {
-        if (null == channel)
-            return;
-
         try {
             if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
index 21c7b38..aab6dfb 100644
--- a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/BaseTest.java
@@ -20,6 +20,7 @@
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.rocketmq.remoting.api.command.RemotingCommand;
@@ -28,14 +29,34 @@
 import org.apache.rocketmq.remoting.impl.command.RemotingCommandFactoryImpl;
 
 public class BaseTest {
-    protected void runInThreads(Runnable runnable, int threadsNum) {
+    protected void runInThreads(final Runnable runnable, int threadsNum) {
         ExecutorService executor = Executors.newFixedThreadPool(threadsNum);
         for (int i = 0; i < threadsNum; i++) {
-            executor.submit(runnable);
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    runnable.run();
+                }
+            });
         }
+
         ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
     }
 
+    protected void runInThreads(final Runnable runnable, int threadsNum, int timeoutMillis) throws InterruptedException {
+        final Semaphore semaphore = new Semaphore(0);
+
+        runInThreads(new Runnable() {
+            @Override
+            public void run() {
+                runnable.run();
+                semaphore.release();
+            }
+        }, threadsNum);
+
+        semaphore.tryAcquire(threadsNum, timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
     protected void shouldNotReachHere() {
         throw new RuntimeException("shouldn't reach here");
     }
diff --git a/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
new file mode 100644
index 0000000..16084d8
--- /dev/null
+++ b/remoting-core/remoting-impl/src/test/java/org/apache/rocketmq/remoting/impl/netty/ClientChannelManagerTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.remoting.impl.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.channel.DefaultEventLoop;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.remoting.BaseTest;
+import org.apache.rocketmq.remoting.config.RemotingConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ClientChannelManagerTest extends BaseTest {
+    private final static String TARGET_ADDR = "127.0.0.1:8080";
+    @Mock
+    private Bootstrap clientBootstrap;
+
+    @Mock
+    private Channel channel;
+
+    private ClientChannelManager channelManager;
+
+    private ChannelPromise channelPromise;
+
+    @Before
+    public void init() {
+        channelPromise = new DefaultChannelPromise(channel, new DefaultEventLoop());
+
+        when(channel.isActive()).thenReturn(true);
+        when(clientBootstrap.connect(any(SocketAddress.class))).thenReturn(channelPromise);
+        when(channel.close()).thenReturn(channelPromise);
+        when(channel.remoteAddress()).thenReturn(new InetSocketAddress(8080));
+
+        channelManager = new ClientChannelManager(clientBootstrap, new RemotingConfig());
+    }
+
+    @Test
+    public void clear() {
+        channelPromise.setSuccess();
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+
+        assertThat(targetChannel).isNotNull();
+        assertThat(channelManager.channelTables.size()).isEqualTo(1);
+
+        channelManager.clear();
+        assertThat(channelManager.channelTables.size()).isEqualTo(0);
+
+    }
+
+    @Test
+    public void createIfAbsent_UseExistingConnection_Success() {
+        channelPromise.setSuccess();
+        assertThat(channelManager.channelTables.size()).isEqualTo(0);
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+        assertThat(targetChannel).isEqualTo(channel);
+
+        assertThat(channelManager.channelTables.size()).isEqualTo(1);
+        targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+        assertThat(targetChannel).isEqualTo(channel);
+        assertThat(channelManager.channelTables.size()).isEqualTo(1);
+    }
+
+    @Test
+    public void createIfAbsent_CreateNewConnection_Success() {
+        channelPromise.setSuccess();
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+        assertThat(targetChannel).isEqualTo(channel);
+    }
+
+    @Test
+    public void createIfAbsent_Concurrent_Success() throws InterruptedException {
+        int concurrentNum = 3;
+        final boolean[] channelMismatch = {false};
+
+        runInThreads(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(10);
+                    channelPromise.setSuccess();
+                } catch (InterruptedException ignore) {
+                }
+            }
+        }, 1);
+        runInThreads(new Runnable() {
+            @Override
+            public void run() {
+                Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+                if (targetChannel != channel) {
+                    channelMismatch[0] = true;
+                }
+            }
+        }, concurrentNum, 3000);
+
+        assertThat(channelMismatch[0]).isFalse();
+        assertThat(channelManager.channelTables.size()).isEqualTo(1);
+    }
+
+    @Test
+    public void createIfAbsent_ClosedChannel_NullReturn() {
+        channelPromise.setSuccess();
+        when(channel.isActive()).thenReturn(false);
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+        assertThat(targetChannel).isNull();
+    }
+
+    @Test
+    public void closeChannel_WithChannel_Success() {
+        channelPromise.setSuccess();
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+
+        channelManager.closeChannel(targetChannel);
+        assertThat(channelManager.channelTables.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void closeChannel_WithAddr_Success() {
+        channelPromise.setSuccess();
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+
+        channelManager.closeChannel(TARGET_ADDR, targetChannel);
+        assertThat(channelManager.channelTables.size()).isEqualTo(0);
+    }
+
+    @Test
+    public void closeChannel_NonExistingChannel_Success() {
+        channelPromise.setSuccess();
+        Channel targetChannel = channelManager.createIfAbsent(TARGET_ADDR);
+
+        channelManager.closeChannel(mock(Channel.class));
+        assertThat(channelManager.channelTables.size()).isEqualTo(1);
+    }
+}
\ No newline at end of file