Fix race condition in receipt handle renewal test
diff --git a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java index 62e5e64..a21bd7e 100644 --- a/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java +++ b/proxy/src/test/java/org/apache/rocketmq/proxy/processor/ReceiptHandleProcessorTest.java
@@ -17,8 +17,11 @@ package org.apache.rocketmq.proxy.processor; import io.netty.channel.local.LocalChannel; +import java.util.concurrent.CompletableFuture; import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.client.consumer.AckResult; +import org.apache.rocketmq.client.consumer.AckStatus; import org.apache.rocketmq.common.consumer.ReceiptHandle; import org.apache.rocketmq.common.message.MessageClientIDSetter; import org.apache.rocketmq.proxy.common.ContextVariable; @@ -29,6 +32,7 @@ import org.apache.rocketmq.proxy.config.ProxyConfig; import org.apache.rocketmq.proxy.service.ServiceManager; import org.apache.rocketmq.proxy.service.metadata.MetadataService; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -90,12 +94,33 @@ @Test public void testStart() throws Exception { + AckResult ackResult = new AckResult(); + ackResult.setStatus(AckStatus.OK); + ackResult.setExtraInfo(messageReceiptHandle.getReceiptHandleStr()); + Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))) + .thenReturn(Mockito.mock(ClientChannelInfo.class)); + Mockito.when(messagingProcessor.changeInvisibleTime(Mockito.any(ProxyContext.class), + Mockito.any(ReceiptHandle.class), Mockito.anyString(), Mockito.anyString(), Mockito.anyString(), + Mockito.anyLong(), Mockito.nullable(String.class))) + .thenReturn(CompletableFuture.completedFuture(ackResult)); + receiptHandleProcessor.start(); receiptHandleProcessor.addReceiptHandle(PROXY_CONTEXT, PROXY_CONTEXT.getChannel(), CONSUMER_GROUP, MSG_ID, messageReceiptHandle); - Mockito.when(consumerManager.findChannel(Mockito.eq(CONSUMER_GROUP), Mockito.eq(PROXY_CONTEXT.getChannel()))).thenReturn(Mockito.mock(ClientChannelInfo.class)); Mockito.verify(messagingProcessor, Mockito.timeout(10000).times(1)) .changeInvisibleTime(Mockito.any(ProxyContext.class), 
Mockito.any(ReceiptHandle.class), Mockito.eq(MESSAGE_ID), Mockito.eq(CONSUMER_GROUP), Mockito.eq(TOPIC), Mockito.eq(ConfigurationManager.getProxyConfig().getDefaultInvisibleTimeMills()), Mockito.eq(null)); } + @After + @Override + public void after() { + try { + receiptHandleProcessor.shutdown(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + super.after(); + } + } + }