blob: 1c935368a9381acd3aca16046babf690afecb42b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.registry.nameservice;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.constants.RegistryConstants;
import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.BrokerData;
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl;
import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
@RunWith(PowerMockRunner.class)
@PrepareForTest({NameServiceRegistry.class})
public class NameServiceRegistryTest {
private DefaultMQAdminExtImpl defaultMQAdminExtImpl = PowerMockito.mock(DefaultMQAdminExtImpl.class);
;
private ScheduledExecutorService scheduledExecutorService = PowerMockito.mock(ScheduledExecutorService.class);
private URL routuUrl;
private URL routuFlase;
private Class<?> registryInfoWrapper;
private Field listenerField;
private Field serviceNameField;
@Before
public void init() throws Exception {
String urlString =
"nameservice://localhost:9876/org.apache.dubbo.registry.RegistryService?application=rocketmq-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=8990&release=3.0.7&route=false";
routuFlase = URLBuilder.valueOf(urlString);
routuUrl = URLBuilder.from(routuFlase).addParameter("route", "true").build();
PowerMockito.whenNew(DefaultMQAdminExtImpl.class).withAnyArguments().thenReturn(defaultMQAdminExtImpl);
PowerMockito.mockStatic(Executors.class);
PowerMockito.when(Executors.newSingleThreadScheduledExecutor(Mockito.any())).thenReturn(scheduledExecutorService);
registryInfoWrapper = Class.forName("org.apache.dubbo.registry.nameservice.NameServiceRegistry$RegistryInfoWrapper");
listenerField = registryInfoWrapper.getDeclaredField("listener");
listenerField.setAccessible(true);
serviceNameField = registryInfoWrapper.getDeclaredField("serviceName");
serviceNameField.setAccessible(true);
}
@Test
public void newTest() throws Exception {
NameServiceRegistry nameServiceRegistry = new NameServiceRegistry(routuUrl);
boolean isNotRoute = ReflectUtils.getFieldValue(nameServiceRegistry, "isNotRoute");
Assert.assertEquals(isNotRoute, true);
long timeoutMillis = ReflectUtils.getFieldValue(nameServiceRegistry, "timeoutMillis");
Assert.assertEquals(timeoutMillis, 0);
String instanceName = ReflectUtils.getFieldValue(nameServiceRegistry, "instanceName");
Assert.assertEquals(instanceName, null);
nameServiceRegistry = new NameServiceRegistry(routuFlase);
isNotRoute = ReflectUtils.getFieldValue(nameServiceRegistry, "isNotRoute");
Assert.assertEquals(isNotRoute, false);
timeoutMillis = ReflectUtils.getFieldValue(nameServiceRegistry, "timeoutMillis");
Assert.assertEquals(timeoutMillis, 3000);
instanceName = ReflectUtils.getFieldValue(nameServiceRegistry, "instanceName");
Assert.assertEquals(instanceName, "nameservic-registry");
Mockito.verify(defaultMQAdminExtImpl, Mockito.atLeastOnce()).start();
Mockito.verify(defaultMQAdminExtImpl, Mockito.atLeastOnce()).examineBrokerClusterInfo();
Mockito.verify(defaultMQAdminExtImpl, Mockito.atLeastOnce()).fetchAllTopicList();
Mockito.verify(scheduledExecutorService, Mockito.atLeastOnce()).scheduleAtFixedRate(Mockito.any(Runnable.class)
, Mockito.any(Long.class)
, Mockito.any(Long.class)
, Mockito.any(TimeUnit.class));
}
@Test(expected = RuntimeException.class)
public void newStartExceptionTest() throws Exception {
PowerMockito.doThrow(new RuntimeException()).when(defaultMQAdminExtImpl).start();
NameServiceRegistry nameServiceRegistry = new NameServiceRegistry(routuFlase);
}
@Test
public void runnableTest() throws Exception {
PowerMockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] arguments = invocation.getArguments();
Assert.assertNotNull(arguments[0]);
Assert.assertEquals(arguments[1], 10000L);
Assert.assertEquals(arguments[2], 30000L);
Assert.assertEquals(arguments[3], TimeUnit.MILLISECONDS);
return null;
}
}).when(scheduledExecutorService)
.scheduleAtFixedRate(Mockito.any(Runnable.class)
, Mockito.any(Long.class)
, Mockito.any(Long.class)
, Mockito.any(TimeUnit.class));
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuFlase));
PowerMockito.doNothing().when(nameServiceRegistry, "initBeasInfo");
PowerMockito.when(nameServiceRegistry, "run").thenCallRealMethod();
PowerMockito.verifyPrivate(nameServiceRegistry, Mockito.never()).invoke("pullRoute", null, null, null);
nameServiceRegistry = new NameServiceRegistry(routuFlase);
Map<URL, Object> consumerRegistryInfoWrapperMap = ReflectUtils.getFieldValue(nameServiceRegistry, "consumerRegistryInfoWrapperMap");
Constructor<?> constructor = registryInfoWrapper.getConstructor(new Class<?>[] {NameServiceRegistry.class});
constructor.setAccessible(true);
Object wrapper = constructor.newInstance(nameServiceRegistry);
NotifyListener notifyListener = PowerMockito.mock(NotifyListener.class);
listenerField.set(wrapper, notifyListener);
ServiceName serviceName = new ServiceName(routuFlase);
serviceNameField.set(wrapper, serviceName);
consumerRegistryInfoWrapperMap.put(routuUrl, wrapper);
nameServiceRegistry = PowerMockito.spy(nameServiceRegistry);
PowerMockito.when(nameServiceRegistry, "run").thenCallRealMethod();
TopicRouteData topicRouteData = new TopicRouteData();
List<QueueData> queueDataList = new ArrayList<>();
topicRouteData.setQueueDatas(queueDataList);
QueueData queueData = new QueueData();
queueData.setPerm(PermName.PERM_WRITE);
queueDataList.add(queueData);
queueData = new QueueData();
queueData.setPerm(PermName.PERM_READ);
queueData.setReadQueueNums(8);
queueDataList.add(queueData);
Mockito.doReturn(topicRouteData).when(defaultMQAdminExtImpl).examineTopicRouteInfo(Mockito.any());
PowerMockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Object[] arguments = invocation.getArguments();
List<Object> urlList = (List<Object>) arguments[0];
Assert.assertEquals(urlList.size(), 8);
return null;
}
}).when(notifyListener).notify(Mockito.any(List.class));
PowerMockito.when(nameServiceRegistry, "run").thenCallRealMethod();
Mockito.verify(notifyListener, Mockito.atLeastOnce()).notify(Mockito.any());
}
@Test
public void createProviderURLTest() throws Exception {
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuFlase));
ServiceName serviceName = new ServiceName(routuFlase);
URL url = Whitebox.invokeMethod(nameServiceRegistry, "createProviderURL", serviceName, routuFlase, 8);
Assert.assertEquals(url.getProtocol(), "rocketmq");
Assert.assertEquals(url.getParameter(CommonConstants.PROTOCOL_KEY), "rocketmq");
Assert.assertEquals(url.getParameter(CommonConstants.INTERFACE_KEY), serviceName.getServiceInterface());
Assert.assertEquals(url.getParameter("topic"), serviceName.getValue());
Assert.assertEquals(url.getParameter("queueId"), 8 + "");
}
@Test
public void createServiceNameTest() throws Exception {
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuFlase));
ServiceName serviceName = new ServiceName(routuFlase);
ServiceName newServiceName = Whitebox.invokeMethod(nameServiceRegistry, "createServiceName", routuFlase);
Assert.assertEquals(serviceName, newServiceName);
}
@Test
public void createTopicTest() throws Exception {
NameServiceRegistry nameServiceRegistry = new NameServiceRegistry(routuUrl);
Field mqAdminExtField = NameServiceRegistry.class.getDeclaredField("mqAdminExt");
mqAdminExtField.setAccessible(true);
mqAdminExtField.set(nameServiceRegistry, this.defaultMQAdminExtImpl);
Field isNotRouteField = NameServiceRegistry.class.getDeclaredField("isNotRoute");
isNotRouteField.setAccessible(true);
TopicList topicList = PowerMockito.mock(TopicList.class);
Field topicListField = NameServiceRegistry.class.getDeclaredField("topicList");
topicListField.setAccessible(true);
topicListField.set(nameServiceRegistry, topicList);
ServiceName serviceName = new ServiceName(routuUrl);
Whitebox.invokeMethod(nameServiceRegistry, "createTopic", serviceName);
Mockito.verify(topicList, Mockito.never()).getTopicList();
isNotRouteField.set(nameServiceRegistry, false);
Set<String> topic = Mockito.mock(Set.class);
Mockito.when(topicList.getTopicList()).thenReturn(topic);
Mockito.when(topic.contains(Mockito.any())).thenReturn(true);
Whitebox.invokeMethod(nameServiceRegistry, "createTopic", serviceName);
Mockito.verify(topicList, Mockito.atLeastOnce()).getTopicList();
Mockito.when(topic.contains(Mockito.any())).thenReturn(false);
ClusterInfo clusterInfo = PowerMockito.mock(ClusterInfo.class);
Field clusterInfoField = NameServiceRegistry.class.getDeclaredField("clusterInfo");
clusterInfoField.setAccessible(true);
clusterInfoField.set(nameServiceRegistry, clusterInfo);
Whitebox.invokeMethod(nameServiceRegistry, "createTopic", serviceName);
Mockito.verify(defaultMQAdminExtImpl, Mockito.never()).createAndUpdateTopicConfig(Mockito.any(), Mockito.any());
HashMap<String/* brokerName */, BrokerData> brokerAddrTable = new HashMap<>();
BrokerData brokerData = new BrokerData();
brokerAddrTable.put("", brokerData);
HashMap<Long, String> brokerAddrs = new HashMap<>();
brokerAddrs.put(1L, "");
brokerAddrs.put(2L, "");
brokerData.setBrokerAddrs(brokerAddrs);
Mockito.when(clusterInfo.getBrokerAddrTable()).thenReturn(brokerAddrTable);
Whitebox.invokeMethod(nameServiceRegistry, "createTopic", serviceName);
Mockito.verify(defaultMQAdminExtImpl, Mockito.atMost(2)).createAndUpdateTopicConfig(Mockito.any(), Mockito.any());
}
@Test
public void isAvailableTest() {
NameServiceRegistry nameServiceRegistry = new NameServiceRegistry(routuUrl);
Assert.assertTrue(nameServiceRegistry.isAvailable());
}
@Test
public void doRegisterTest() throws Exception {
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuUrl));
PowerMockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
Assert.assertEquals(invocation.getArguments()[0].getClass(), ServiceName.class);
return null;
}
}).when(nameServiceRegistry, "createTopic", Mockito.any(ServiceName.class));
nameServiceRegistry.doRegister(this.routuUrl);
}
@Test(expected = RuntimeException.class)
public void doSubscribeException() throws Exception {
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuUrl));
nameServiceRegistry.doSubscribe(routuUrl, null);
}
@Test
public void doSubscribe() throws Exception {
NameServiceRegistry nameServiceRegistry = PowerMockito.spy(new NameServiceRegistry(routuUrl));
Field mqAdminExtField = NameServiceRegistry.class.getDeclaredField("mqAdminExt");
mqAdminExtField.setAccessible(true);
mqAdminExtField.set(nameServiceRegistry, this.defaultMQAdminExtImpl);
URLBuilder builder = URLBuilder.from(this.routuUrl);
builder.addParameter(RegistryConstants.CATEGORY_KEY, RegistryConstants.CONFIGURATORS_CATEGORY);
URL url = builder.build();
nameServiceRegistry.doSubscribe(url, null);
PowerMockito.verifyPrivate(nameServiceRegistry, Mockito.never()).invoke("createServiceName", url);
NotifyListener listener = PowerMockito.mock(NotifyListener.class);
Mockito.when(this.defaultMQAdminExtImpl.queryTopicConsumeByWho(Mockito.any())).thenReturn(null);
nameServiceRegistry.doSubscribe(routuUrl, listener);
Mockito.verify(listener, Mockito.never()).notify(Mockito.any());
GroupList groupList = new GroupList();
Mockito.when(this.defaultMQAdminExtImpl.queryTopicConsumeByWho(Mockito.any())).thenReturn(groupList);
nameServiceRegistry.doSubscribe(routuUrl, listener);
Mockito.verify(listener, Mockito.never()).notify(Mockito.any());
groupList.getGroupList().add("");
nameServiceRegistry.doSubscribe(routuUrl, listener);
Mockito.verify(listener, Mockito.atLeastOnce()).notify(Mockito.any(List.class));
Field isNotRouteField = NameServiceRegistry.class.getDeclaredField("isNotRoute");
isNotRouteField.setAccessible(true);
isNotRouteField.set(nameServiceRegistry, false);
TopicRouteData topicRouteData = PowerMockito.mock(TopicRouteData.class);
Mockito.when(this.defaultMQAdminExtImpl.examineTopicRouteInfo(Mockito.any())).thenReturn(topicRouteData);
nameServiceRegistry.doSubscribe(routuUrl, listener);
Mockito.verify(listener, Mockito.atLeastOnce()).notify(Mockito.any());
Mockito.verify(this.defaultMQAdminExtImpl, Mockito.atLeastOnce()).examineTopicRouteInfo(Mockito.any());
}
}