// blob: 266e5b778fa15a6805800d82348232621ef33976 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.rpc.rocketmq;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.buffer.ChannelBuffer;
import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProtocolServer;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.protocol.AbstractProtocol;
import org.apache.dubbo.rpc.rocketmq.codec.RocketMQCountCodec;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
/**
 * Unit tests for {@link RocketMQProtocol}.
 *
 * <p>The protocol under test is a PowerMock spy, and the collaborators it constructs
 * ({@link RocketMQProtocolServer}, {@link RocketMQExporter}, {@link RocketMQCountCodec}) are
 * replaced by mocks through {@code whenNew}. The message listener handed to the mocked server
 * is captured in {@link #before()} so that its private methods can be driven through
 * {@link Whitebox}.
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(value = {RocketMQProtocol.class})
public class RocketMQProtocolTest {

    /** Spy of the protocol under test; private methods are stubbed/invoked reflectively. */
    private final RocketMQProtocol rocketMQProtocol = PowerMockito.spy(new RocketMQProtocol());

    private final DefaultMQProducer defaultMQProducer = PowerMockito.mock(DefaultMQProducer.class);

    /** Mock returned for every {@code new RocketMQProtocolServer(...)}. */
    private final RocketMQProtocolServer rocketMQProtocolServer = PowerMockito.mock(RocketMQProtocolServer.class);

    /** Mock returned for every {@code new RocketMQExporter(...)}. */
    private final RocketMQExporter<Object> rocketMQExporter = PowerMockito.mock(RocketMQExporter.class);

    /** Mock returned for every {@code new RocketMQCountCodec(...)}. */
    private final RocketMQCountCodec codec = PowerMockito.mock(RocketMQCountCodec.class);

    /** Message batch passed to the listener; holds {@link #messageExt} only. */
    private final List<MessageExt> msgs = new ArrayList<>();

    private final MessageExt messageExt = new MessageExt();

    /** Spy of the listener captured from {@code setMessageListenerConcurrently}. */
    private MessageListenerConcurrently messageListenerConcurrently;

    private Response response;

    private URL url;

    /**
     * Builds the test URL, wires the constructor mocks, captures the message listener and
     * prepares a single message with a timeout far in the future.
     *
     * @throws Exception if PowerMock stubbing fails
     */
    @Before
    public void before() throws Exception {
        String urlString =
            "nameservice://localhost:9876/org.apache.dubbo.registry.RegistryService?application=rocketmq-provider&dubbo=2.0.2&interface=org.apache.dubbo.registry.RegistryService&pid=8990&release=3.0.7&route=false";
        url = URLBuilder.valueOf(urlString)
            .addParameter("groupModel", "topic")
            .addParameter(CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY, false);

        PowerMockito.whenNew(RocketMQProtocolServer.class).withAnyArguments().thenReturn(rocketMQProtocolServer);
        PowerMockito.when(rocketMQProtocolServer.getDefaultMQProducer()).thenReturn(defaultMQProducer);
        PowerMockito.whenNew(RocketMQExporter.class).withAnyArguments().thenReturn(rocketMQExporter);
        PowerMockito.whenNew(RocketMQCountCodec.class).withAnyArguments().thenReturn(codec);

        // Capture whatever listener is registered on the mocked server and wrap it in a spy,
        // so the tests can call and stub its private methods.
        PowerMockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                MessageListenerConcurrently registered = invocation.getArgument(0);
                messageListenerConcurrently = PowerMockito.spy(registered);
                return null;
            }
        }).when(rocketMQProtocolServer).setMessageListenerConcurrently(Mockito.any(MessageListenerConcurrently.class));

        // NOTE(review): the listener-based tests need the capture above to have fired;
        // presumably this stubbing call on the spy runs createServer — confirm.
        PowerMockito.when(rocketMQProtocol, "createServer", Mockito.any(URL.class), Mockito.anyString(), Mockito.anyString()).thenCallRealMethod();

        messageExt.setTopic("test");
        messageExt.putUserProperty(CommonConstants.TIMEOUT_KEY, (System.currentTimeMillis() + 100000) + "");
        messageExt.putUserProperty(RocketMQProtocolConstant.URL_STRING, urlString);
        messageExt.setBody(new byte[1024]);
        msgs.add(messageExt);
    }

    /**
     * Verifies that {@code createServer} sets the model and the message listener on the
     * server and resets it with the given URL.
     *
     * @throws Exception if the reflective stubbing fails
     */
    @Test
    public void createServerTest() throws Exception {
        PowerMockito.when(rocketMQProtocol, "createServer", url, "", "").thenCallRealMethod();
        Mockito.verify(rocketMQProtocolServer, Mockito.atLeastOnce()).setModel(Mockito.any(String.class));
        Mockito.verify(rocketMQProtocolServer, Mockito.atLeastOnce()).setMessageListenerConcurrently(Mockito.any(MessageListenerConcurrently.class));
        Mockito.verify(rocketMQProtocolServer, Mockito.atLeastOnce()).reset(url);
    }

    /**
     * Verifies that {@code openServer} registers one server in {@code serverMap} and returns
     * the same server when called again with the same arguments.
     *
     * @throws Exception if reflection or the private invocation fails
     */
    @Test
    public void openServerTest() throws Exception {
        Field serverMapField = AbstractProtocol.class.getDeclaredField("serverMap");
        serverMapField.setAccessible(true);
        // Reflective read of a field declared as a map in AbstractProtocol; the cast cannot be checked.
        @SuppressWarnings("unchecked")
        Map<String, ProtocolServer> serverMap = (Map<String, ProtocolServer>) serverMapField.get(rocketMQProtocol);
        Assert.assertTrue(serverMap.isEmpty());

        RocketMQProtocolServer firstServer = Whitebox.invokeMethod(rocketMQProtocol, "openServer", url, "");
        Assert.assertEquals(1, serverMap.size());
        PowerMockito.verifyPrivate(rocketMQProtocol).invoke("createServer", Mockito.any(URL.class), Mockito.anyString(), Mockito.anyString());

        RocketMQProtocolServer secondServer = Whitebox.invokeMethod(rocketMQProtocol, "openServer", url, "");
        Assert.assertEquals(firstServer, secondServer);
    }

    /**
     * Verifies the selector expression built for group only, group plus version, and version
     * only, and that a URL carrying neither makes {@code createMessageSelector} throw.
     *
     * @throws Exception if a private invocation fails unexpectedly
     */
    @Test
    public void createMessageSelectorTest() throws Exception {
        URL selectorUrl = URLBuilder.from(this.url).addParameter(CommonConstants.GROUP_KEY, "test").build();
        MessageSelector messageSelector = Whitebox.invokeMethod(rocketMQProtocol, "createMessageSelector", selectorUrl);
        Assert.assertEquals("group=test", messageSelector.getExpression());

        selectorUrl = URLBuilder.from(selectorUrl).addParameter(CommonConstants.VERSION_KEY, "1.0.0").build();
        messageSelector = Whitebox.invokeMethod(rocketMQProtocol, "createMessageSelector", selectorUrl);
        Assert.assertEquals("group=test and version=1.0.0", messageSelector.getExpression());

        selectorUrl = URLBuilder.from(this.url).addParameter(CommonConstants.VERSION_KEY, "1.0.0").build();
        messageSelector = Whitebox.invokeMethod(rocketMQProtocol, "createMessageSelector", selectorUrl);
        Assert.assertEquals("version=1.0.0", messageSelector.getExpression());

        // Only this call may throw; an exception from an earlier call must fail the test
        // instead of silently satisfying a method-wide "expected" attribute.
        try {
            Whitebox.invokeMethod(rocketMQProtocol, "createMessageSelector", this.url);
            Assert.fail("createMessageSelector should throw for a URL without group and version");
        } catch (RuntimeException expected) {
            // expected
        }
    }

    /**
     * Verifies that {@code export} subscribes with a string expression for the default URL,
     * with a {@link MessageSelector} for {@code groupModel=select}, and throws
     * {@link RpcException} when the subscription fails.
     *
     * @throws MQClientException declared by the stubbed {@code subscribe} calls
     */
    @Test
    public void exportTest() throws MQClientException {
        Invoker<Object> invoker = PowerMockito.mock(Invoker.class);
        PowerMockito.when(invoker.getUrl()).thenReturn(this.url);
        PowerMockito.when(rocketMQExporter.getKey()).thenReturn("test");
        DefaultMQPushConsumer consumer = PowerMockito.mock(DefaultMQPushConsumer.class);
        PowerMockito.when(this.rocketMQProtocolServer.getDefaultMQPushConsumer()).thenReturn(consumer);

        rocketMQProtocol.export(invoker);
        Mockito.verify(consumer, Mockito.atLeastOnce()).subscribe(Mockito.anyString(), Mockito.anyString());

        // Switch to the "select" group model; the invoker must be re-stubbed because url is a new instance.
        url = URLBuilder.from(url).addParameter("groupModel", "select").addParameter(CommonConstants.GROUP_KEY, "111").build();
        PowerMockito.when(invoker.getUrl()).thenReturn(this.url);
        rocketMQProtocol.export(invoker);
        Mockito.verify(consumer, Mockito.atLeastOnce()).subscribe(Mockito.anyString(), Mockito.any(MessageSelector.class));

        PowerMockito.doThrow(new RpcException()).when(consumer).subscribe(Mockito.anyString(), Mockito.any(MessageSelector.class));
        // Scope the expected failure to this call so the verifications above cannot be skipped.
        try {
            rocketMQProtocol.export(invoker);
            Assert.fail("export should throw RpcException when subscribe fails");
        } catch (RpcException expected) {
            // expected
        }
    }

    /**
     * Verifies that {@code export} throws {@link RpcException} when {@code createServer} is
     * stubbed to throw.
     *
     * @throws Exception if the reflective stubbing fails
     */
    @Test(expected = RpcException.class)
    public void exportExceptionTest() throws Exception {
        PowerMockito.when(rocketMQProtocol, "createServer", Mockito.any(URL.class), Mockito.anyString(), Mockito.anyString())
            .thenThrow(RpcException.class);
        Invoker<Object> invoker = PowerMockito.mock(Invoker.class);
        rocketMQProtocol.export(invoker);
    }

    /**
     * Verifies that {@code protocolBindingRefer} returns the invoker it constructs after
     * opening a server, and throws {@link RpcException} when {@code openServer} fails.
     *
     * @throws Exception if the reflective stubbing fails
     */
    @Test
    public void protocolBindingReferTest() throws Exception {
        RocketMQInvoker<Object> invoker = PowerMockito.mock(RocketMQInvoker.class);
        PowerMockito.whenNew(RocketMQInvoker.class).withAnyArguments().thenReturn(invoker);
        PowerMockito.when(this.rocketMQProtocolServer.getDefaultMQProducer()).thenReturn(defaultMQProducer);
        PowerMockito.doReturn(this.rocketMQProtocolServer).when(rocketMQProtocol, "openServer", this.url, CommonConstants.CONSUMER);

        Invoker<Object> newInvoker = rocketMQProtocol.protocolBindingRefer(Object.class, this.url);
        PowerMockito.verifyPrivate(rocketMQProtocol).invoke("openServer", Mockito.any(URL.class), Mockito.anyString());
        Assert.assertEquals(invoker, newInvoker);

        PowerMockito.when(rocketMQProtocol, "openServer", this.url, CommonConstants.CONSUMER).thenThrow(RuntimeException.class);
        // Scope the expected failure to this call so the assertions above cannot be skipped.
        try {
            rocketMQProtocol.protocolBindingRefer(Object.class, this.url);
            Assert.fail("protocolBindingRefer should throw RpcException when openServer fails");
        } catch (RpcException expected) {
            // expected
        }
    }

    /**
     * Verifies that {@code consumeMessage} submits a task to the server's executor and that
     * running the task reaches the listener's private {@code execute} method.
     *
     * @throws Exception if the private verification fails
     */
    @Test
    public void consumeMessageTest() throws Exception {
        messageExt.putUserProperty(RocketMQProtocolConstant.SEND_ADDRESS, "127.0.0.1");
        ExecutorService executorService = PowerMockito.mock(ExecutorService.class);
        PowerMockito.when(rocketMQProtocolServer.getExecutorService()).thenReturn(executorService);
        // Run submitted tasks inline so the test stays single-threaded.
        PowerMockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) throws Throwable {
                Runnable task = invocation.getArgument(0);
                task.run();
                return null;
            }
        }).when(executorService).submit(Mockito.any(Runnable.class));

        messageListenerConcurrently.consumeMessage(this.msgs, null);

        Mockito.verify(executorService, Mockito.atLeastOnce()).submit(Mockito.any(Runnable.class));
        PowerMockito.verifyPrivate(messageListenerConcurrently).invoke("execute", Mockito.any(MessageExt.class));
    }

    /**
     * Calls the listener's private {@code execute} twice, with {@code invoke} stubbed to
     * return a {@link Response} first and {@code null} second. There are no assertions; the
     * test only requires that neither call throws.
     *
     * @throws Exception if a private invocation fails
     */
    @Test
    public void executeTest() throws Exception {
        messageExt.putUserProperty(RocketMQProtocolConstant.SEND_ADDRESS, "127.0.0.1");
        PowerMockito.when(messageListenerConcurrently, "invoke", Mockito.any(), Mockito.any(), Mockito.any())
            .thenReturn(new Response())
            .thenReturn(null);
        Whitebox.invokeMethod(messageListenerConcurrently, "execute", new Class[] {MessageExt.class},
            this.messageExt);
        Whitebox.invokeMethod(messageListenerConcurrently, "execute", new Class[] {MessageExt.class},
            this.messageExt);
    }

    /**
     * Exercises the listener's private {@code invoke} in three states: before the codec is
     * stubbed (status asserted to be {@code BAD_REQUEST}), with a decodable request and a
     * registered exporter (the invoker's result is returned), and with a timeout in the past
     * ({@code null} is returned).
     *
     * @throws Exception if reflection or a private invocation fails
     */
    @Test
    public void invokeTest() throws Exception {
        Channel channel = PowerMockito.mock(Channel.class);
        Response invokeResponse = Whitebox.invokeMethod(messageListenerConcurrently, "invoke", new Class[] {MessageExt.class, Channel.class, URL.class},
            this.messageExt, channel, url);
        Assert.assertEquals(Response.BAD_REQUEST, invokeResponse.getStatus());

        Request request = PowerMockito.mock(Request.class);
        PowerMockito.when(codec.decode(Mockito.any(), Mockito.any())).thenReturn(request);
        Invocation inv = PowerMockito.mock(Invocation.class);
        PowerMockito.when(request.getData()).thenReturn(inv);

        // Register an exporter under the message topic ("test") directly in the protocol's exporter map.
        Field exporterMapField = AbstractProtocol.class.getDeclaredField("exporterMap");
        exporterMapField.setAccessible(true);
        // Reflective read of a field declared as a map in AbstractProtocol; the cast cannot be checked.
        @SuppressWarnings("unchecked")
        Map<String, Exporter<?>> exporterMap = (Map<String, Exporter<?>>) exporterMapField.get(rocketMQProtocol);
        Exporter<Object> exporter = PowerMockito.mock(Exporter.class);
        exporterMap.put("test", exporter);
        Invoker<Object> invoker = PowerMockito.mock(Invoker.class);
        PowerMockito.when(exporter.getInvoker()).thenReturn(invoker);
        Result result = PowerMockito.mock(Result.class);
        PowerMockito.when(invoker.invoke(Mockito.any(Invocation.class))).thenReturn(result);
        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "");

        invokeResponse = Whitebox.invokeMethod(messageListenerConcurrently, "invoke", new Class[] {MessageExt.class, Channel.class, URL.class},
            this.messageExt, channel, url);
        Assert.assertEquals(result, invokeResponse.getResult());

        // Move the timeout into the past.
        messageExt.putUserProperty(CommonConstants.TIMEOUT_KEY, (System.currentTimeMillis() - 100000) + "");
        invokeResponse = Whitebox.invokeMethod(messageListenerConcurrently, "invoke", new Class[] {MessageExt.class, Channel.class, URL.class},
            this.messageExt, channel, url);
        Assert.assertNull(invokeResponse);
    }

    /**
     * Exercises the listener's private {@code createChannelBuffer}: a plain encode yields a
     * buffer; when the first encode throws, a second encode is attempted with a
     * {@code BAD_REQUEST} response and a buffer is still returned; when both encodes throw,
     * {@code null} is returned.
     *
     * @throws Exception if a private invocation fails
     */
    @Test
    public void createChannelBufferTest() throws Exception {
        this.response = new Response();
        Channel channel = PowerMockito.mock(Channel.class);
        ChannelBuffer channelBuffer =
            Whitebox.invokeMethod(messageListenerConcurrently, "createChannelBuffer", new Class[] {Channel.class, Response.class, URL.class},
                channel, response, url);
        Assert.assertNotNull(channelBuffer);
        Mockito.verify(codec, Mockito.atLeastOnce()).encode(Mockito.any(Channel.class), Mockito.any(ChannelBuffer.class), Mockito.any(Object.class));

        // First encode: receives the original response and fails.
        // Second encode: must receive a BAD_REQUEST response and succeeds.
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                Response newResponse = invocation.getArgument(2);
                Assert.assertEquals(response, newResponse);
                throw new RuntimeException();
            }
        }).doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                Response newResponse = invocation.getArgument(2);
                Assert.assertEquals(Response.BAD_REQUEST, newResponse.getStatus());
                return null;
            }
        }).when(codec).encode(Mockito.any(Channel.class), Mockito.any(ChannelBuffer.class), Mockito.any(Response.class));
        channelBuffer =
            Whitebox.invokeMethod(messageListenerConcurrently, "createChannelBuffer", new Class[] {Channel.class, Response.class, URL.class},
                channel, response, this.url);
        Assert.assertNotNull(channelBuffer);
        Mockito.verify(codec, Mockito.atLeast(2)).encode(Mockito.any(Channel.class), Mockito.any(ChannelBuffer.class), Mockito.any(Object.class));

        // Both encodes fail: no buffer can be produced.
        Mockito.reset(codec);
        PowerMockito.doThrow(new RuntimeException()).doThrow(new IOException()).when(codec)
            .encode(Mockito.any(Channel.class), Mockito.any(ChannelBuffer.class), Mockito.any(Object.class));
        channelBuffer =
            Whitebox.invokeMethod(messageListenerConcurrently, "createChannelBuffer", new Class[] {Channel.class, Response.class, URL.class},
                channel, response, this.url);
        Assert.assertNull(channelBuffer);
        Mockito.verify(codec, Mockito.atLeast(2)).encode(Mockito.any(Channel.class), Mockito.any(ChannelBuffer.class), Mockito.any(Object.class));
    }

    /**
     * Exercises the listener's private {@code sendMessage}: it returns {@code true} when the
     * producer's {@code send} completes (and is called with a 3000 ms timeout), and
     * {@code false} when {@code send} throws.
     *
     * @throws Exception if a private invocation fails
     */
    @Test
    public void sendMessageTest() throws Exception {
        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "");
        ChannelBuffer buffer = PowerMockito.mock(ChannelBuffer.class);
        PowerMockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                long timeout = invocation.getArgument(1);
                Assert.assertEquals(3000L, timeout);
                return null;
            }
        }).when(this.defaultMQProducer).send(Mockito.any(Message.class), Mockito.anyLong());
        boolean sent = Whitebox
            .invokeMethod(messageListenerConcurrently, "sendMessage", new Class[] {MessageExt.class, ChannelBuffer.class, URL.class, String.class},
                this.messageExt, buffer, this.url, "url");
        Assert.assertTrue(sent);
        Mockito.verify(defaultMQProducer, Mockito.atLeastOnce()).send(Mockito.any(Message.class), Mockito.anyLong());

        PowerMockito.when(this.defaultMQProducer.send(Mockito.any(Message.class), Mockito.anyLong())).thenThrow(new RuntimeException());
        sent = Whitebox
            .invokeMethod(messageListenerConcurrently, "sendMessage", new Class[] {MessageExt.class, ChannelBuffer.class, URL.class, String.class},
                this.messageExt, buffer, this.url, "url");
        Assert.assertFalse(sent);
    }
}