blob: 76560fc5f73caba8a600878706ed9aa64b8b4c98 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.spring.support;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQReplyListener;
import org.junit.Test;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.StringMessageConverter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link DefaultRocketMQListenerContainer}.
 *
 * <p>Several tests call the container's private methods ({@code getMessageType},
 * {@code getMethodParameter}, {@code doConvertMessage}, {@code handleMessage}) and write its
 * private {@code messageType} field through reflection.
 */
public class DefaultRocketMQListenerContainerTest {

    /**
     * Checks that {@code getMessageType} reports the first generic type argument of the
     * configured listener, for both plain listeners and reply listeners.
     *
     * @throws Exception if the reflective lookup or invocation fails
     */
    @Test
    public void testGetMessageType() throws Exception {
        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
        Method getMessageType = DefaultRocketMQListenerContainer.class.getDeclaredMethod("getMessageType");
        getMessageType.setAccessible(true);

        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
            @Override
            public void onMessage(String message) {
            }
        });
        Class<?> result = (Class<?>) getMessageType.invoke(listenerContainer);
        assertThat(result).isEqualTo(String.class);

        // The RocketMQ Message type itself is supported as a listener payload type.
        listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
            @Override
            public void onMessage(Message message) {
            }
        });
        result = (Class<?>) getMessageType.invoke(listenerContainer);
        assertThat(result).isEqualTo(Message.class);

        listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
            @Override
            public void onMessage(MessageExt message) {
            }
        });
        result = (Class<?>) getMessageType.invoke(listenerContainer);
        assertThat(result).isEqualTo(MessageExt.class);

        // Clear the plain listener before installing a reply listener, as testHandleMessage
        // does, so the reply listener is the only listener configured.
        // NOTE(review): presumably the container prefers the plain listener when both are
        // set - confirm against DefaultRocketMQListenerContainer.
        listenerContainer.setRocketMQListener(null);
        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<MessageExt, String>() {
            @Override
            public String onMessage(MessageExt message) {
                return "test";
            }
        });
        result = (Class<?>) getMessageType.invoke(listenerContainer);
        assertThat(result).isEqualTo(MessageExt.class);

        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
            @Override
            public String onMessage(String message) {
                return "test";
            }
        });
        result = (Class<?>) getMessageType.invoke(listenerContainer);
        assertThat(result).isEqualTo(String.class);
    }

    /**
     * Checks {@code doConvertMessage} for the {@code String}, {@code MessageExt} and
     * {@code Message} payload types. The private {@code messageType} field is written
     * directly before each conversion.
     *
     * @throws Exception if the reflective lookup or invocation fails
     */
    @Test
    public void testDoConvertMessage() throws Exception {
        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
        Method doConvertMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("doConvertMessage", MessageExt.class);
        doConvertMessage.setAccessible(true);
        // One Field handle is enough; it is reused for every payload type below.
        Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
        messageType.setAccessible(true);

        // String payload: the body bytes are expected back as text.
        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
            @Override
            public void onMessage(String message) {
            }
        });
        messageType.set(listenerContainer, String.class);
        MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
        messageExt.setBody("hello".getBytes());
        String result = (String) doConvertMessage.invoke(listenerContainer, messageExt);
        assertThat(result).isEqualTo("hello");

        // MessageExt payload: the result is expected to equal the input message.
        listenerContainer.setRocketMQListener(new RocketMQListener<MessageExt>() {
            @Override
            public void onMessage(MessageExt message) {
            }
        });
        messageType.set(listenerContainer, MessageExt.class);
        messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
        messageExt.setBody("hello".getBytes());
        MessageExt result2 = (MessageExt) doConvertMessage.invoke(listenerContainer, messageExt);
        assertThat(result2).isEqualTo(messageExt);

        // Message payload: declared as Message, but the instance is a MessageExt.
        listenerContainer.setRocketMQListener(new RocketMQListener<Message>() {
            @Override
            public void onMessage(Message message) {
            }
        });
        messageType.set(listenerContainer, Message.class);
        Message message = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
        message.setBody("hello".getBytes());
        Message result3 = (Message) doConvertMessage.invoke(listenerContainer, message);
        assertThat(result3).isEqualTo(message);

        // NOTE(review): a listener for the User type is registered but no conversion is
        // invoked or asserted for it - TODO add coverage or drop this registration.
        listenerContainer.setRocketMQListener(new RocketMQListener<User>() {
            @Override
            public void onMessage(User message) {
            }
        });
    }

    /**
     * Checks that a parameterized payload type ({@code ArrayList<Date>}) is reported as a
     * {@link ParameterizedType} whose raw type is {@code ArrayList}, and that
     * {@code getMethodParameter} reports {@code ArrayList} as the parameter type, for both a
     * plain listener and a reply listener.
     *
     * @throws Exception if the reflective lookup or invocation fails
     */
    @Test
    public void testGenericMessageType() throws Exception {
        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
        listenerContainer.setMessageConverter(new CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new MappingJackson2MessageConverter())));
        Method getMessageType = DefaultRocketMQListenerContainer.class.getDeclaredMethod("getMessageType");
        Method getMethodParameter = DefaultRocketMQListenerContainer.class.getDeclaredMethod("getMethodParameter");
        getMessageType.setAccessible(true);
        getMethodParameter.setAccessible(true);

        listenerContainer.setRocketMQListener(new RocketMQListener<ArrayList<Date>>() {
            @Override
            public void onMessage(ArrayList<Date> message) {
            }
        });
        ParameterizedType type = (ParameterizedType) getMessageType.invoke(listenerContainer);
        assertThat(type.getRawType()).isEqualTo(ArrayList.class);
        MethodParameter methodParameter = (MethodParameter) getMethodParameter.invoke(listenerContainer);
        assertThat(methodParameter.getParameterType()).isEqualTo(ArrayList.class);

        // Clear the plain listener, as testHandleMessage does, so the reply listener is the
        // only listener configured for the checks below.
        listenerContainer.setRocketMQListener(null);
        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<ArrayList<Date>, String>() {
            @Override
            public String onMessage(ArrayList<Date> message) {
                return "test";
            }
        });
        type = (ParameterizedType) getMessageType.invoke(listenerContainer);
        assertThat(type.getRawType()).isEqualTo(ArrayList.class);
        methodParameter = (MethodParameter) getMethodParameter.invoke(listenerContainer);
        assertThat(methodParameter.getParameterType()).isEqualTo(ArrayList.class);
    }

    /**
     * Invokes {@code handleMessage} first with a plain listener and then with a reply
     * listener backed by mocked consumer and producer objects. There are no explicit
     * assertions: the test passes when neither invocation throws.
     *
     * @throws Exception if the reflective lookup or invocation fails
     */
    @Test
    public void testHandleMessage() throws Exception {
        DefaultRocketMQListenerContainer listenerContainer = new DefaultRocketMQListenerContainer();
        Method handleMessage = DefaultRocketMQListenerContainer.class.getDeclaredMethod("handleMessage", MessageExt.class);
        handleMessage.setAccessible(true);
        listenerContainer.setRocketMQListener(new RocketMQListener<String>() {
            @Override
            public void onMessage(String message) {
            }
        });
        Field messageType = DefaultRocketMQListenerContainer.class.getDeclaredField("messageType");
        messageType.setAccessible(true);
        messageType.set(listenerContainer, String.class);
        MessageExt messageExt = new MessageExt(0, System.currentTimeMillis(), null, System.currentTimeMillis(), null, null);
        MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_CLUSTER, "defaultCluster");
        messageExt.setBody("hello".getBytes());
        handleMessage.invoke(listenerContainer, messageExt);

        // Reply message: drop the plain listener and wire the mocked chain
        // consumer -> push consumer impl -> client instance -> producer.
        listenerContainer.setRocketMQListener(null);
        DefaultMQPushConsumer consumer = mock(DefaultMQPushConsumer.class);
        DefaultMQPushConsumerImpl pushConsumer = mock(DefaultMQPushConsumerImpl.class);
        MQClientInstance mqClientInstance = mock(MQClientInstance.class);
        DefaultMQProducer producer = mock(DefaultMQProducer.class);
        when(consumer.getDefaultMQPushConsumerImpl()).thenReturn(pushConsumer);
        when(pushConsumer.getmQClientFactory()).thenReturn(mqClientInstance);
        when(mqClientInstance.getDefaultMQProducer()).thenReturn(producer);
        listenerContainer.setConsumer(consumer);
        listenerContainer.setMessageConverter(new CompositeMessageConverter(Arrays.asList(new StringMessageConverter(), new MappingJackson2MessageConverter())));
        doNothing().when(producer).send(any(MessageExt.class), any(SendCallback.class));
        listenerContainer.setRocketMQReplyListener(new RocketMQReplyListener<String, String>() {
            @Override
            public String onMessage(String message) {
                return "test";
            }
        });
        handleMessage.invoke(listenerContainer, messageExt);
    }

    /**
     * Checks that {@code setRocketMQMessageListener} copies the annotation attributes read
     * below onto the container, so that each getter returns the annotated value.
     */
    @Test
    public void testSetRocketMQMessageListener() {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();
        RocketMQMessageListener anno = TestRocketMQMessageListener.class.getAnnotation(RocketMQMessageListener.class);
        container.setRocketMQMessageListener(anno);

        assertEquals(anno.consumeMode(), container.getConsumeMode());
        assertEquals(anno.consumeThreadMax(), container.getConsumeThreadMax());
        assertEquals(anno.messageModel(), container.getMessageModel());
        assertEquals(anno.selectorType(), container.getSelectorType());
        assertEquals(anno.selectorExpression(), container.getSelectorExpression());
        assertEquals(anno.tlsEnable(), container.getTlsEnable());
        assertEquals(anno.namespace(), container.getNamespace());
        assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume());
        assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis());
    }

    /**
     * Annotation holder for {@link #testSetRocketMQMessageListener()}; the attribute values
     * are arbitrary markers that the test reads back from the container.
     */
    @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",
        consumeMode = ConsumeMode.ORDERLY,
        consumeThreadMax = 3456,
        messageModel = MessageModel.BROADCASTING,
        selectorType = SelectorType.SQL92,
        selectorExpression = "selectorExpression",
        tlsEnable = "tlsEnable",
        namespace = "namespace",
        delayLevelWhenNextConsume = 1234,
        suspendCurrentQueueTimeMillis = 2345
    )
    class TestRocketMQMessageListener {
    }

    /**
     * Simple payload type with chainable setters, used as a listener type argument in
     * {@link #testDoConvertMessage()}.
     */
    class User {
        private String userName;
        private int userAge;

        public String getUserName() {
            return userName;
        }

        public User setUserName(String userName) {
            this.userName = userName;
            return this;
        }

        public int getUserAge() {
            return userAge;
        }

        public User setUserAge(int userAge) {
            this.userAge = userAge;
            return this;
        }
    }
}