blob: 3d1d335c2fc6c156d70fba8205f9c0472d78bd91 [file] [log] [blame]
/*
* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License.
*/
package org.apache.iotdb.cluster.client.async;
import org.apache.iotdb.cluster.client.async.AsyncDataClient.FactoryAsync;
import org.apache.iotdb.cluster.common.TestAsyncClient;
import org.apache.iotdb.cluster.common.TestAsyncClientFactory;
import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.config.ClusterConfig;
import org.apache.iotdb.cluster.config.ClusterDescriptor;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftService.AsyncClient;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TBinaryProtocol.Factory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class AsyncClientPoolTest {
@Mock private AsyncClientFactory testAsyncClientFactory;
private ClusterConfig config = ClusterDescriptor.getInstance().getConfig();
private boolean isAsyncServer;
@Before
public void setUp() {
isAsyncServer = config.isUseAsyncServer();
config.setUseAsyncServer(true);
}
@After
public void tearDown() {
config.setUseAsyncServer(isAsyncServer);
}
@Test
public void testTestClient() throws IOException {
testAsyncClientFactory = new TestAsyncClientFactory();
getClient();
putClient();
}
@Test
public void testDataClient() throws IOException {
testAsyncClientFactory = new FactoryAsync(new TBinaryProtocol.Factory());
getClient();
putClient();
}
@Test
public void testMetaClient() throws IOException {
testAsyncClientFactory = new AsyncMetaClient.FactoryAsync(new TBinaryProtocol.Factory());
getClient();
putClient();
}
private void getClient() throws IOException {
AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
for (int i = 0; i < 10; i++) {
AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
if (client instanceof TestAsyncClient) {
TestAsyncClient testAsyncClient = (TestAsyncClient) client;
assertEquals(i, testAsyncClient.getSerialNum());
}
}
}
private void putClient() throws IOException {
AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
List<AsyncClient> testClients = new ArrayList<>();
for (int i = 0; i < 10; i++) {
AsyncClient client = asyncClientPool.getClient(TestUtils.getNode(i));
testClients.add(client);
}
if (testAsyncClientFactory instanceof TestAsyncClientFactory) {
for (int i = 0; i < 10; i++) {
asyncClientPool.putClient(TestUtils.getNode(i), testClients.get(i));
}
} else if (testAsyncClientFactory instanceof AsyncMetaClient.FactoryAsync) {
for (AsyncClient testClient : testClients) {
((AsyncMetaClient) testClient).onComplete();
}
} else if (testAsyncClientFactory instanceof FactoryAsync) {
for (AsyncClient testClient : testClients) {
((AsyncDataClient) testClient).onComplete();
}
}
for (int i = 0; i < 10; i++) {
AsyncClient poolClient = asyncClientPool.getClient(TestUtils.getNode(i));
assertEquals(testClients.get(i), poolClient);
}
}
@Test
public void testMaxClient() throws IOException {
int maxClientNum = config.getMaxClientPerNodePerMember();
config.setMaxClientPerNodePerMember(5);
testAsyncClientFactory = new TestAsyncClientFactory();
AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
for (int i = 0; i < 5; i++) {
asyncClientPool.getClient(TestUtils.getNode(0));
}
AtomicReference<AsyncClient> reference = new AtomicReference<>();
Thread t =
new Thread(
() -> {
try {
reference.set(asyncClientPool.getClient(TestUtils.getNode(0)));
} catch (IOException e) {
e.printStackTrace();
}
});
t.start();
t.interrupt();
assertNull(reference.get());
config.setMaxClientPerNodePerMember(maxClientNum);
}
@Test
public void testWaitClient() throws IOException {
int maxClientPerNodePerMember = config.getMaxClientPerNodePerMember();
try {
config.setMaxClientPerNodePerMember(10);
testAsyncClientFactory = new TestAsyncClientFactory();
AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
Node node = TestUtils.getNode(0);
List<AsyncClient> clients = new ArrayList<>();
for (int i = 0; i < 10; i++) {
clients.add(asyncClientPool.getClient(node));
}
AtomicBoolean waitStart = new AtomicBoolean(false);
new Thread(
() -> {
while (!waitStart.get()) {
// wait until we start to for wait for a client
}
synchronized (asyncClientPool) {
for (AsyncClient client : clients) {
asyncClientPool.putClient(node, client);
}
}
})
.start();
AsyncClient client;
synchronized (asyncClientPool) {
waitStart.set(true);
// getClient() will wait on asyncClientPool, so the thread above can return clients
client = asyncClientPool.getClient(node);
}
assertNotNull(client);
} finally {
config.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
}
}
@Test
public void testWaitClientTimeOut() throws IOException {
int maxClientPerNodePerMember = config.getMaxClientPerNodePerMember();
try {
config.setMaxClientPerNodePerMember(1);
testAsyncClientFactory = new TestAsyncClientFactory();
AsyncClientPool asyncClientPool = new AsyncClientPool(testAsyncClientFactory);
Node node = TestUtils.getNode(0);
List<AsyncClient> clients = new ArrayList<>();
for (int i = 0; i < 2; i++) {
clients.add(asyncClientPool.getClient(node));
}
assertNotEquals(clients.get(0), clients.get(1));
} finally {
config.setMaxClientPerNodePerMember(maxClientPerNodePerMember);
}
}
@Test
public void testRecreateClient() throws IOException {
testAsyncClientFactory = new TestAsyncClientFactory();
AsyncClientPool asyncClientPool =
new AsyncClientPool(new AsyncMetaClient.FactoryAsync(new Factory()));
AsyncMetaClient client = (AsyncMetaClient) asyncClientPool.getClient(TestUtils.getNode(0));
client.onError(new Exception());
AsyncClient newClient = asyncClientPool.getClient(TestUtils.getNode(0));
assertNotEquals(client, newClient);
}
}