blob: c90418fa4de3fda1c5fc4a63a01a8dc246092ab7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.context.ConfigManager;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.connection.ConnectionManager;
import org.apache.dubbo.remoting.api.connection.MultiplexProtocolConnectionManager;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static org.apache.dubbo.common.constants.CommonConstants.EXECUTOR_MANAGEMENT_MODE_DEFAULT;
import static org.awaitility.Awaitility.await;
public class ConnectionTest {
private static URL url;
private static NettyPortUnificationServer server;
private static ConnectionManager connectionManager;
@BeforeAll
public static void init() throws Throwable {
int port = NetUtils.getAvailablePort();
url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
ApplicationModel applicationModel = ApplicationModel.defaultModel();
ApplicationConfig applicationConfig = new ApplicationConfig("provider-app");
applicationConfig.setExecutorManagementMode(EXECUTOR_MANAGEMENT_MODE_DEFAULT);
applicationModel.getApplicationConfigManager().setApplication(applicationConfig);
ConfigManager configManager = new ConfigManager(applicationModel);
configManager.setApplication(applicationConfig);
configManager.getApplication();
applicationModel.setConfigManager(configManager);
url = url.setScopeModel(applicationModel);
ModuleModel moduleModel = applicationModel.getDefaultModule();
url = url.putAttribute(CommonConstants.SCOPE_MODEL, moduleModel);
server = new NettyPortUnificationServer(url, new DefaultPuHandler());
server.bind();
connectionManager = url.getOrDefaultFrameworkModel()
.getExtensionLoader(ConnectionManager.class)
.getExtension(MultiplexProtocolConnectionManager.NAME);
}
@AfterAll
public static void close() {
try {
server.close();
} catch (Throwable e) {
// ignored
}
}
@Test
void testGetChannel() {
final AbstractConnectionClient connectionClient = connectionManager.connect(url, new DefaultPuHandler());
Assertions.assertNotNull(connectionClient);
connectionClient.close();
}
@Test
void testRefCnt0() throws InterruptedException {
final AbstractConnectionClient connectionClient = connectionManager.connect(url, new DefaultPuHandler());
CountDownLatch latch = new CountDownLatch(1);
Assertions.assertNotNull(connectionClient);
connectionClient.addCloseListener(latch::countDown);
connectionClient.release();
latch.await();
Assertions.assertEquals(0, latch.getCount());
}
@Test
void testRefCnt1() {
DefaultPuHandler handler = new DefaultPuHandler();
final AbstractConnectionClient connectionClient = connectionManager.connect(url, handler);
CountDownLatch latch = new CountDownLatch(1);
Assertions.assertNotNull(connectionClient);
connectionManager.connect(url, handler);
connectionClient.addCloseListener(latch::countDown);
connectionClient.release();
Assertions.assertEquals(1, latch.getCount());
connectionClient.close();
}
@Test
void testRefCnt2() throws InterruptedException {
final AbstractConnectionClient connectionClient = connectionManager.connect(url, new DefaultPuHandler());
CountDownLatch latch = new CountDownLatch(1);
connectionClient.retain();
connectionClient.addCloseListener(latch::countDown);
connectionClient.release();
connectionClient.release();
latch.await();
Assertions.assertEquals(0, latch.getCount());
}
@Test
void connectSyncTest() throws Throwable {
int port = NetUtils.getAvailablePort();
URL url = URL.valueOf("empty://127.0.0.1:" + port + "?foo=bar");
NettyPortUnificationServer nettyPortUnificationServer =
new NettyPortUnificationServer(url, new DefaultPuHandler());
nettyPortUnificationServer.bind();
final AbstractConnectionClient connectionClient = connectionManager.connect(url, new DefaultPuHandler());
Assertions.assertTrue(connectionClient.isAvailable());
nettyPortUnificationServer.close();
Assertions.assertFalse(connectionClient.isAvailable());
nettyPortUnificationServer.bind();
// auto reconnect
await().atMost(Duration.ofSeconds(100)).until(() -> connectionClient.isAvailable());
Assertions.assertTrue(connectionClient.isAvailable());
connectionClient.close();
Assertions.assertFalse(connectionClient.isAvailable());
nettyPortUnificationServer.close();
}
@Test
void testMultiConnect() throws Throwable {
ExecutorService service = Executors.newFixedThreadPool(10);
final CountDownLatch latch = new CountDownLatch(10);
AtomicInteger failedCount = new AtomicInteger(0);
final AbstractConnectionClient connectionClient = connectionManager.connect(url, new DefaultPuHandler());
Runnable runnable = () -> {
try {
Assertions.assertTrue(connectionClient.isAvailable());
} catch (Exception e) {
// ignore
e.printStackTrace();
failedCount.incrementAndGet();
} finally {
latch.countDown();
}
};
for (int i = 0; i < 10; i++) {
service.execute(runnable);
}
latch.await();
Assertions.assertEquals(0, failedCount.get());
service.shutdown();
connectionClient.destroy();
}
}