blob: 41c96b5cc058fbb8487e67060e6ac56a4f2634ca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.cluster.support;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.Directory;
import org.apache.dubbo.rpc.cluster.LoadBalance;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import static org.mockito.Mockito.when;
@SuppressWarnings("all")
public class ConnectivityValidationTest {
private Invoker invoker1;
private Invoker invoker2;
private Invoker invoker3;
private Invoker invoker4;
private Invoker invoker5;
private Invoker invoker6;
private Invoker invoker7;
private Invoker invoker8;
private Invoker invoker9;
private Invoker invoker10;
private Invoker invoker11;
private Invoker invoker12;
private Invoker invoker13;
private Invoker invoker14;
private Invoker invoker15;
private List<Invoker> invokerList;
private StaticDirectory directory;
private ConnectivityClusterInvoker clusterInvoker;
@BeforeEach
public void setup() {
invoker1 = Mockito.mock(Invoker.class);
invoker2 = Mockito.mock(Invoker.class);
invoker3 = Mockito.mock(Invoker.class);
invoker4 = Mockito.mock(Invoker.class);
invoker5 = Mockito.mock(Invoker.class);
invoker6 = Mockito.mock(Invoker.class);
invoker7 = Mockito.mock(Invoker.class);
invoker8 = Mockito.mock(Invoker.class);
invoker9 = Mockito.mock(Invoker.class);
invoker10 = Mockito.mock(Invoker.class);
invoker11 = Mockito.mock(Invoker.class);
invoker12 = Mockito.mock(Invoker.class);
invoker13 = Mockito.mock(Invoker.class);
invoker14 = Mockito.mock(Invoker.class);
invoker15 = Mockito.mock(Invoker.class);
configInvoker(invoker1);
configInvoker(invoker2);
configInvoker(invoker3);
configInvoker(invoker4);
configInvoker(invoker5);
configInvoker(invoker6);
configInvoker(invoker7);
configInvoker(invoker8);
configInvoker(invoker9);
configInvoker(invoker10);
configInvoker(invoker11);
configInvoker(invoker12);
configInvoker(invoker13);
configInvoker(invoker14);
configInvoker(invoker15);
invokerList = new LinkedList<>();
invokerList.add(invoker1);
invokerList.add(invoker2);
invokerList.add(invoker3);
invokerList.add(invoker4);
invokerList.add(invoker5);
directory = new StaticDirectory(invokerList);
clusterInvoker = new ConnectivityClusterInvoker(directory);
}
@AfterEach
public void tearDown() {
clusterInvoker.destroy();
}
private void configInvoker(Invoker invoker) {
when(invoker.getUrl()).thenReturn(URL.valueOf(""));
when(invoker.isAvailable()).thenReturn(true);
}
@BeforeAll
public static void setupClass() {
System.setProperty(CommonConstants.RECONNECT_TASK_PERIOD, "1");
}
@AfterAll
public static void clearAfterClass() {
System.clearProperty(CommonConstants.RECONNECT_TASK_PERIOD);
}
@Test
public void testBasic() throws InterruptedException {
Invocation invocation = new RpcInvocation();
LoadBalance loadBalance = new RandomLoadBalance();
Assertions.assertEquals(5, directory.list(invocation).size());
Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
when(invoker1.isAvailable()).thenReturn(false);
when(invoker2.isAvailable()).thenReturn(false);
when(invoker3.isAvailable()).thenReturn(false);
when(invoker4.isAvailable()).thenReturn(false);
when(invoker5.isAvailable()).thenReturn(false);
clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList());
Assertions.assertEquals(0, directory.list(invocation).size());
when(invoker1.isAvailable()).thenReturn(true);
Set<Invoker> invokerSet = new HashSet<>();
invokerSet.add(invoker1);
waitRefresh(invokerSet);
Assertions.assertEquals(1, directory.list(invocation).size());
Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
when(invoker2.isAvailable()).thenReturn(true);
invokerSet.add(invoker2);
waitRefresh(invokerSet);
Assertions.assertEquals(2, directory.list(invocation).size());
Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
invokerList.remove(invoker5);
directory.notify(invokerList);
when(invoker2.isAvailable()).thenReturn(true);
waitRefresh(invokerSet);
Assertions.assertEquals(2, directory.list(invocation).size());
Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
when(invoker3.isAvailable()).thenReturn(true);
when(invoker4.isAvailable()).thenReturn(true);
invokerSet.add(invoker3);
invokerSet.add(invoker4);
waitRefresh(invokerSet);
Assertions.assertEquals(4, directory.list(invocation).size());
Assertions.assertNotNull(clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
}
@Test
public void testRetry() throws InterruptedException {
Invocation invocation = new RpcInvocation();
LoadBalance loadBalance = new RandomLoadBalance();
invokerList.clear();
invokerList.add(invoker1);
invokerList.add(invoker2);
directory.notify(invokerList);
Assertions.assertEquals(2, directory.list(invocation).size());
when(invoker1.isAvailable()).thenReturn(false);
Assertions.assertEquals(invoker2, clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.singletonList(invoker2)));
Assertions.assertEquals(1, directory.list(invocation).size());
when(invoker1.isAvailable()).thenReturn(true);
Set<Invoker> invokerSet = new HashSet<>();
invokerSet.add(invoker1);
waitRefresh(invokerSet);
Assertions.assertEquals(2, directory.list(invocation).size());
}
@Test
public void testRandomSelect() throws InterruptedException {
Invocation invocation = new RpcInvocation();
LoadBalance loadBalance = new RandomLoadBalance();
invokerList.add(invoker6);
invokerList.add(invoker7);
invokerList.add(invoker8);
invokerList.add(invoker9);
invokerList.add(invoker10);
invokerList.add(invoker11);
invokerList.add(invoker12);
invokerList.add(invoker13);
invokerList.add(invoker14);
invokerList.add(invoker15);
directory.notify(invokerList);
Assertions.assertEquals(15, directory.list(invocation).size());
when(invoker2.isAvailable()).thenReturn(false);
when(invoker3.isAvailable()).thenReturn(false);
when(invoker4.isAvailable()).thenReturn(false);
when(invoker5.isAvailable()).thenReturn(false);
when(invoker6.isAvailable()).thenReturn(false);
when(invoker7.isAvailable()).thenReturn(false);
when(invoker8.isAvailable()).thenReturn(false);
when(invoker9.isAvailable()).thenReturn(false);
when(invoker10.isAvailable()).thenReturn(false);
when(invoker11.isAvailable()).thenReturn(false);
when(invoker12.isAvailable()).thenReturn(false);
when(invoker13.isAvailable()).thenReturn(false);
when(invoker14.isAvailable()).thenReturn(false);
when(invoker15.isAvailable()).thenReturn(false);
for (int i = 0; i < 15; i++) {
clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList());
}
for (int i = 0; i < 5; i++) {
Assertions.assertEquals(invoker1, clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList()));
}
when(invoker1.isAvailable()).thenReturn(false);
clusterInvoker.select(loadBalance, invocation, directory.list(invocation), Collections.emptyList());
Assertions.assertEquals(0, directory.list(invocation).size());
when(invoker1.isAvailable()).thenReturn(true);
when(invoker2.isAvailable()).thenReturn(true);
when(invoker3.isAvailable()).thenReturn(true);
when(invoker4.isAvailable()).thenReturn(true);
when(invoker5.isAvailable()).thenReturn(true);
when(invoker6.isAvailable()).thenReturn(true);
when(invoker7.isAvailable()).thenReturn(true);
when(invoker8.isAvailable()).thenReturn(true);
when(invoker9.isAvailable()).thenReturn(true);
when(invoker10.isAvailable()).thenReturn(true);
when(invoker11.isAvailable()).thenReturn(true);
when(invoker12.isAvailable()).thenReturn(true);
when(invoker13.isAvailable()).thenReturn(true);
when(invoker14.isAvailable()).thenReturn(true);
when(invoker15.isAvailable()).thenReturn(true);
Set<Invoker> invokerSet = new HashSet<>();
invokerSet.add(invoker1);
invokerSet.add(invoker2);
invokerSet.add(invoker3);
invokerSet.add(invoker4);
invokerSet.add(invoker5);
invokerSet.add(invoker6);
invokerSet.add(invoker7);
invokerSet.add(invoker8);
invokerSet.add(invoker9);
invokerSet.add(invoker10);
invokerSet.add(invoker11);
invokerSet.add(invoker12);
invokerSet.add(invoker13);
invokerSet.add(invoker14);
invokerSet.add(invoker15);
waitRefresh(invokerSet);
Assertions.assertTrue(directory.list(invocation).size() > 1);
}
private static class ConnectivityClusterInvoker<T> extends AbstractClusterInvoker<T> {
public ConnectivityClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
public Invoker<T> select(LoadBalance loadbalance, Invocation invocation, List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {
return super.select(loadbalance, invocation, invokers, selected);
}
@Override
protected Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
return null;
}
}
private void waitRefresh(Set<Invoker> invokerSet) throws InterruptedException {
directory.checkConnectivity();
while (true) {
List<Invoker> reconnectList = directory.getInvokersToReconnect();
if (reconnectList.stream().anyMatch(invoker -> invokerSet.contains(invoker))) {
Thread.sleep(10);
continue;
}
break;
}
}
private static class RandomLoadBalance implements LoadBalance {
@Override
public <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
return CollectionUtils.isNotEmpty(invokers) ? invokers.get(ThreadLocalRandom.current().nextInt(invokers.size())) : null;
}
}
}