Merge pull request #1554 from keranbingaa/unit-test
[ISSUE #1528]modify unit test of producer
diff --git a/README.md b/README.md
index 1c17c7e..33b4280 100644
--- a/README.md
+++ b/README.md
@@ -8,10 +8,12 @@
It offers a variety of features:
* Pub/Sub messaging model
-* Scheduled message delivery
+* Financial-grade transactional messages
+* A variety of cross language clients, such as Java, C/C++, Python, Go
+* Pluggable transport protocols, such as TCP, SSL, AIO
+* Built-in message tracing capability, which also supports OpenTracing
+* Versatile big-data and streaming ecosystem integration
* Message retroactivity by time or offset
-* Log hub for streaming
-* Big data integration
* Reliable FIFO and strict ordered messaging in the same queue
* Efficient pull&push consumption model
* Million-level message accumulation capacity in a single queue
@@ -21,9 +23,7 @@
* Various message filter mechanics such as SQL and Tag
* Docker images for isolated testing and cloud isolated clusters
* Feature-rich administrative dashboard for configuration, metrics and monitoring
-* Access control list
-* Message trace
-
+* Authentication and authorisation
----------
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a885cd0..85009d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -61,6 +61,7 @@
import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
import org.apache.rocketmq.broker.processor.PullMessageProcessor;
import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -132,6 +133,7 @@
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
+ private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
@@ -147,6 +149,7 @@
private TopicConfigManager topicConfigManager;
private ExecutorService sendMessageExecutor;
private ExecutorService pullMessageExecutor;
+ private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
private ExecutorService clientManageExecutor;
@@ -194,6 +197,7 @@
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+ this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
@@ -277,6 +281,14 @@
this.pullThreadPoolQueue,
new ThreadFactoryImpl("PullMessageThread_"));
+ this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+ 1000 * 60,
+ TimeUnit.MILLISECONDS,
+ this.replyThreadPoolQueue,
+ new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+
this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
this.brokerConfig.getQueryMessageThreadPoolNums(),
this.brokerConfig.getQueryMessageThreadPoolNums(),
@@ -554,6 +566,17 @@
this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
/**
+ * ReplyMessageProcessor
+ */
+ ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
+ replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
+
+ this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+
+ /**
* QueryMessageProcessor
*/
NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
@@ -763,6 +786,10 @@
this.pullMessageExecutor.shutdown();
}
+ if (this.replyMessageExecutor != null) {
+ this.replyMessageExecutor.shutdown();
+ }
+
if (this.adminBrokerExecutor != null) {
this.adminBrokerExecutor.shutdown();
}
@@ -857,12 +884,9 @@
if (!messageStoreConfig.isEnableDLegerCommitLog()) {
startProcessorByHa(messageStoreConfig.getBrokerRole());
handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
+ this.registerBrokerAll(true, false, true);
}
-
-
- this.registerBrokerAll(true, false, true);
-
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0..960b848 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -178,6 +178,10 @@
break;
}
+ if (messageStoreConfig.isEnableDLegerCommitLog()) {
+ brokerConfig.setBrokerId(-1);
+ }
+
messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
JoranConfigurator configurator = new JoranConfigurator();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 61ceae5..12f632b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -17,17 +17,16 @@
package org.apache.rocketmq.broker.client;
import io.netty.channel.Channel;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.InternalLogger;
@@ -43,7 +42,9 @@
private final Lock groupChannelLock = new ReentrantLock();
private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+ private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
public ProducerManager() {
}
@@ -53,7 +54,15 @@
try {
if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
- newGroupChannelTable.putAll(groupChannelTable);
+ Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>> iter = groupChannelTable.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry = iter.next();
+ String key = entry.getKey();
+ HashMap<Channel, ClientChannelInfo> val = entry.getValue();
+ HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel, ClientChannelInfo>();
+ tmp.putAll(val);
+ newGroupChannelTable.put(key, tmp);
+ }
} finally {
groupChannelLock.unlock();
}
@@ -82,6 +91,7 @@
long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
if (diff > CHANNEL_EXPIRED_TIMEOUT) {
it.remove();
+ clientChannelTable.remove(info.getClientId());
log.warn(
"SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
@@ -113,6 +123,7 @@
final ClientChannelInfo clientChannelInfo =
clientChannelInfoTable.remove(channel);
if (clientChannelInfo != null) {
+ clientChannelTable.remove(clientChannelInfo.getClientId());
log.info(
"NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
clientChannelInfo.toString(), remoteAddr, group);
@@ -146,6 +157,7 @@
clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
if (null == clientChannelInfoFound) {
channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+ clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
log.info("new producer connected, group: {} channel: {}", group,
clientChannelInfo.toString());
}
@@ -171,6 +183,7 @@
HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
if (null != channelTable && !channelTable.isEmpty()) {
ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+ clientChannelTable.remove(clientChannelInfo.getClientId());
if (old != null) {
log.info("unregister a producer[{}] from groupChannelTable {}", group,
clientChannelInfo.toString());
@@ -223,4 +236,8 @@
}
return null;
}
+
+ /**
+  * Looks up the channel registered for a producer client id.
+  *
+  * @param clientId client id to look up; may be {@code null}
+  * @return the channel stored in {@code clientChannelTable} for that id, or {@code null} when
+  *         the id is {@code null} or no channel is registered for it
+  */
+ public Channel findChannel(String clientId) {
+     // ConcurrentHashMap rejects null keys with a NullPointerException, so answer "not found" instead.
+     if (clientId == null) {
+         return null;
+     }
+     return clientChannelTable.get(clientId);
+ }
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c9e85ed..1d5943d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -55,7 +55,7 @@
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final EndTransactionRequestHeader requestHeader =
(EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
- LOGGER.info("Transaction request:{}", requestHeader);
+ LOGGER.debug("Transaction request:{}", requestHeader);
if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
new file mode 100644
index 0000000..565857a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+ /**
+  * Creates a reply-message processor and hands the broker controller to the
+  * {@code AbstractSendMessageProcessor} superclass constructor.
+  *
+  * @param brokerController controller passed through to the superclass
+  */
+ public ReplyMessageProcessor(final BrokerController brokerController) {
+ super(brokerController);
+ }
+
+ /**
+  * Handles a reply-message request: parses the send header, then runs the send-message hooks
+  * before and after {@code processReplyMessageRequest}.
+  *
+  * @param ctx     channel context the request arrived on
+  * @param request the incoming remoting command
+  * @return the response command, or {@code null} when no request header could be parsed
+  * @throws RemotingCommandException if decoding the request header fails
+  */
+ @Override
+ public RemotingCommand processRequest(ChannelHandlerContext ctx,
+     RemotingCommand request) throws RemotingCommandException {
+     final SendMessageRequestHeader header = parseRequestHeader(request);
+     if (null != header) {
+         final SendMessageContext traceContext = buildMsgContext(ctx, header);
+         this.executeSendMessageHookBefore(ctx, request, traceContext);
+
+         final RemotingCommand reply = this.processReplyMessageRequest(ctx, request, traceContext, header);
+
+         this.executeSendMessageHookAfter(reply, traceContext);
+         return reply;
+     }
+     return null;
+ }
+
+ @Override
+ protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
+ SendMessageRequestHeaderV2 requestHeaderV2 = null;
+ SendMessageRequestHeader requestHeader = null;
+ switch (request.getCode()) {
+ case RequestCode.SEND_REPLY_MESSAGE_V2:
+ requestHeaderV2 =
+ (SendMessageRequestHeaderV2) request
+ .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+ case RequestCode.SEND_REPLY_MESSAGE:
+ if (null == requestHeaderV2) {
+ requestHeader =
+ (SendMessageRequestHeader) request
+ .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+ } else {
+ requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
+ }
+ default:
+ break;
+ }
+ return requestHeader;
+ }
+
+ /**
+  * Produces the response for a reply-message request.
+  *
+  * <p>The request is rejected with {@code SYSTEM_ERROR} while the store clock is still before
+  * the configured start-accept timestamp, and returned early when {@code msgCheck} puts a code
+  * on the response. Otherwise the request is wrapped in a {@link MessageExtBrokerInner}, pushed
+  * to the waiting client and, when {@code isStoreReplyMessageEnable()} is true, also written to
+  * the message store.
+  *
+  * @param ctx                channel context the request arrived on
+  * @param request            the incoming remoting command
+  * @param sendMessageContext hook context passed on to the store-result handling
+  * @param requestHeader      decoded send-message header
+  * @return the response command to send back
+  */
+ private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
+     final RemotingCommand request,
+     final SendMessageContext sendMessageContext,
+     final SendMessageRequestHeader requestHeader) {
+     final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+     final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+     response.setOpaque(request.getOpaque());
+     response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+     response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+     log.debug("receive SendReplyMessage request command, {}", request);
+
+     // Refuse requests while the store clock is before the configured start-accept timestamp.
+     final long acceptFrom = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+     if (acceptFrom > this.brokerController.getMessageStore().now()) {
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(acceptFrom)));
+         return response;
+     }
+
+     // Sentinel code: if msgCheck leaves any other code on the response, return it unchanged.
+     response.setCode(-1);
+     super.msgCheck(ctx, requestHeader, response);
+     if (-1 != response.getCode()) {
+         return response;
+     }
+
+     final byte[] payload = request.getBody();
+
+     int queueIdInt = requestHeader.getQueueId();
+     final TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+     if (queueIdInt < 0) {
+         // No queue requested: pick one of the topic's write queues at random.
+         queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+     }
+
+     final MessageExtBrokerInner inner = this.assembleInnerMessage(ctx, requestHeader, payload, queueIdInt);
+
+     final PushReplyResult pushOutcome = this.pushReplyMessage(ctx, requestHeader, inner);
+     this.handlePushReplyResult(pushOutcome, response, responseHeader, queueIdInt);
+
+     if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
+         final PutMessageResult stored = this.brokerController.getMessageStore().putMessage(inner);
+         this.handlePutMessageResult(stored, request, inner, responseHeader, sendMessageContext, queueIdInt);
+     }
+
+     return response;
+ }
+
+ /**
+  * Copies topic, queue id, body, flag, properties, born timestamp, hosts and reconsume count
+  * from the request into a new {@link MessageExtBrokerInner}.
+  */
+ private MessageExtBrokerInner assembleInnerMessage(final ChannelHandlerContext ctx,
+     final SendMessageRequestHeader requestHeader,
+     final byte[] payload,
+     final int queueId) {
+     final MessageExtBrokerInner inner = new MessageExtBrokerInner();
+     inner.setTopic(requestHeader.getTopic());
+     inner.setQueueId(queueId);
+     inner.setBody(payload);
+     inner.setFlag(requestHeader.getFlag());
+     MessageAccessor.setProperties(inner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+     inner.setPropertiesString(requestHeader.getProperties());
+     inner.setBornTimestamp(requestHeader.getBornTimestamp());
+     inner.setBornHost(ctx.channel().remoteAddress());
+     inner.setStoreHost(this.getStoreHost());
+     // A missing reconsume count is stored as zero.
+     inner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+     return inner;
+ }
+
+ private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
+ final SendMessageRequestHeader requestHeader,
+ final Message msg) {
+ ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
+ replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
+ replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
+ replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
+ replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup());
+ replyMessageRequestHeader.setTopic(requestHeader.getTopic());
+ replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic());
+ replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums());
+ replyMessageRequestHeader.setQueueId(requestHeader.getQueueId());
+ replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag());
+ replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp());
+ replyMessageRequestHeader.setFlag(requestHeader.getFlag());
+ replyMessageRequestHeader.setProperties(requestHeader.getProperties());
+ replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
+ replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
+ request.setBody(msg.getBody());
+
+ String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ PushReplyResult pushReplyResult = new PushReplyResult(false);
+
+ if (senderId != null) {
+ Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
+ if (channel != null) {
+ msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis()));
+ replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+ try {
+ RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request);
+ assert pushResponse != null;
+ switch (pushResponse.getCode()) {
+ case ResponseCode.SUCCESS: {
+ pushReplyResult.setPushOk(true);
+ break;
+ }
+ default: {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+ log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark());
+ }
+ }
+ } catch (RemotingException | InterruptedException e) {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+ log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
+ }
+ } else {
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found.");
+ log.warn(pushReplyResult.getRemark());
+ }
+ } else {
+ log.warn(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + " is null, can not reply message");
+ pushReplyResult.setPushOk(false);
+ pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + "] is null");
+ }
+ return pushReplyResult;
+ }
+
+ /**
+  * Translates the outcome of the client push into the response code, remark and header.
+  *
+  * @param pushReplyResult outcome of {@code pushReplyMessage}
+  * @param response        response command whose code and remark are set
+  * @param responseHeader  response header filled in on success
+  * @param queueIdInt      queue id reported back on success
+  */
+ private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response,
+     final SendMessageResponseHeader responseHeader, int queueIdInt) {
+     if (!pushReplyResult.isPushOk()) {
+         response.setCode(ResponseCode.SYSTEM_ERROR);
+         response.setRemark(pushReplyResult.getRemark());
+         return;
+     }
+
+     response.setCode(ResponseCode.SUCCESS);
+     response.setRemark(null);
+     // Zero placeholders for message id and offset, to avoid a client decoding exception.
+     responseHeader.setMsgId("0");
+     responseHeader.setQueueId(queueIdInt);
+     responseHeader.setQueueOffset(0L);
+ }
+
+ private void handlePutMessageResult(PutMessageResult putMessageResult,
+ final RemotingCommand request, final MessageExt msg,
+ final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
+ int queueIdInt) {
+ if (putMessageResult == null) {
+ log.warn("process reply message, store putMessage return null");
+ return;
+ }
+ boolean putOk = false;
+
+ switch (putMessageResult.getPutMessageStatus()) {
+ // Success
+ case PUT_OK:
+ case FLUSH_DISK_TIMEOUT:
+ case FLUSH_SLAVE_TIMEOUT:
+ case SLAVE_NOT_AVAILABLE:
+ putOk = true;
+ break;
+
+ // Failed
+ case CREATE_MAPEDFILE_FAILED:
+ log.info("create mapped file failed, server is busy or broken.");
+ break;
+ case MESSAGE_ILLEGAL:
+ log.info(
+ "the message is illegal, maybe msg properties length limit 32k.");
+ break;
+ case PROPERTIES_SIZE_EXCEEDED:
+ log.info(
+ "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k.");
+ break;
+ case SERVICE_NOT_AVAILABLE:
+ log.info(
+ "service not available now, maybe disk full, maybe your broker machine memory too small.");
+ break;
+ case OS_PAGECACHE_BUSY:
+ log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+ break;
+ case UNKNOWN_ERROR:
+ log.info("UNKNOWN_ERROR");
+ break;
+ default:
+ log.info("UNKNOWN_ERROR DEFAULT");
+ break;
+ }
+
+ String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+ if (putOk) {
+ this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+ this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
+ putMessageResult.getAppendMessageResult().getWroteBytes());
+ this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+
+ responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+ responseHeader.setQueueId(queueIdInt);
+ responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+ if (hasSendMessageHook()) {
+ sendMessageContext.setMsgId(responseHeader.getMsgId());
+ sendMessageContext.setQueueId(responseHeader.getQueueId());
+ sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+ int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+ int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ } else {
+ if (hasSendMessageHook()) {
+ int wroteSize = request.getBody().length;
+ int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+ sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+ sendMessageContext.setCommercialSendTimes(incValue);
+ sendMessageContext.setCommercialSendSize(wroteSize);
+ sendMessageContext.setCommercialOwner(owner);
+ }
+ }
+ }
+
+ /**
+  * Outcome of pushing a reply message to a client: a success flag plus a remark that
+  * describes the failure. Declared static because it uses no state of the enclosing
+  * processor, so it carries no reference to it.
+  */
+ static class PushReplyResult {
+     private boolean pushOk;
+     private String remark;
+
+     /**
+      * @param pushOk initial success flag; the remark starts out as an empty string
+      */
+     public PushReplyResult(boolean pushOk) {
+         this.pushOk = pushOk;
+         remark = "";
+     }
+
+     /** @return {@code true} when the push succeeded */
+     public boolean isPushOk() {
+         return pushOk;
+     }
+
+     /** @param pushOk new success flag */
+     public void setPushOk(boolean pushOk) {
+         this.pushOk = pushOk;
+     }
+
+     /** @return the remark text; an empty string until one is set */
+     public String getRemark() {
+         return remark;
+     }
+
+     /** @param remark text describing why the push failed */
+     public void setRemark(String remark) {
+         this.remark = remark;
+     }
+ }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 8035ae6..2589a75 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -343,11 +343,13 @@
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
- msgInner.setPropertiesString(requestHeader.getProperties());
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
PutMessageResult putMessageResult = null;
Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -536,6 +538,8 @@
messageExtBatch.setBornHost(ctx.channel().remoteAddress());
messageExtBatch.setStoreHost(this.getStoreHost());
messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+ MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 8f215cd..cb29011 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -34,11 +34,11 @@
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
public class TopicConfigManager extends ConfigManager {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -134,6 +134,14 @@
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
}
}
+ {
+ String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
+ TopicConfig topicConfig = new TopicConfig(topic);
+ this.systemTopicList.add(topic);
+ topicConfig.setReadQueueNums(1);
+ topicConfig.setWriteQueueNums(1);
+ this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+ }
}
public boolean isSystemTopic(final String topic) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 62507cd..35d8112 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -30,6 +30,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.TimeUnit;
public abstract class AbstractTransactionalMessageCheckListener {
@@ -48,7 +49,7 @@
thread.setName("Transaction-msg-check-thread");
return thread;
}
- });
+ }, new CallerRunsPolicy());
public AbstractTransactionalMessageCheckListener() {
}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index 84a6276..67f7a5f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -131,6 +131,9 @@
this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic,
getMessageResult.getBufferTotalSize());
this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+ if (foundList == null || foundList.size() == 0) {
+ break;
+ }
this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
.getStoreTimestamp());
@@ -141,6 +144,7 @@
getMessageResult.getStatus(), topic, group, offset);
break;
case NO_MESSAGE_IN_QUEUE:
+ case OFFSET_OVERFLOW_ONE:
pullStatus = PullStatus.NO_NEW_MSG;
LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
getMessageResult.getStatus(), topic, group, offset);
@@ -149,7 +153,6 @@
case NO_MATCHED_LOGIC_QUEUE:
case OFFSET_FOUND_NULL:
case OFFSET_OVERFLOW_BADLY:
- case OFFSET_OVERFLOW_ONE:
case OFFSET_TOO_SMALL:
pullStatus = PullStatus.OFFSET_ILLEGAL;
LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
@@ -175,8 +178,10 @@
try {
List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
for (ByteBuffer bb : messageBufferList) {
- MessageExt msgExt = MessageDecoder.decode(bb);
- foundList.add(msgExt);
+ MessageExt msgExt = MessageDecoder.decode(bb, true, false);
+ if (msgExt != null) {
+ foundList.add(msgExt);
+ }
}
} finally {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index e1549b1..71b575e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -159,7 +159,8 @@
}
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
- removeMap.remove(i);
+ Long removedOpOffset = removeMap.remove(i);
+ doneOpOffset.add(removedOpOffset);
} else {
GetResult getResult = getHalfMsg(messageQueue, i);
MessageExt msgExt = getResult.getMsg();
@@ -223,7 +224,7 @@
listener.resolveHalfMsg(msgExt);
} else {
pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
- log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
+ log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
messageQueue, pullResult);
continue;
}
@@ -292,7 +293,7 @@
}
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
- log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
+ log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
if (queueOffset < miniOffset) {
@@ -460,7 +461,7 @@
@Override
public boolean deletePrepareMessage(MessageExt msgExt) {
if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
- log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
+ log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
return true;
} else {
log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 08dbb9c..d9539b6 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -20,6 +20,7 @@
import io.netty.channel.ChannelFuture;
import java.lang.reflect.Field;
import java.util.HashMap;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,14 +43,14 @@
@Before
public void init() {
producerManager = new ProducerManager();
- clientInfo = new ClientChannelInfo(channel);
+ clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
}
@Test
public void scanNotActiveChannel() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
-
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
field.setAccessible(true);
long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
@@ -57,22 +58,28 @@
when(channel.close()).thenReturn(mock(ChannelFuture.class));
producerManager.scanNotActiveChannel();
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+ assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void doChannelCloseEvent() throws Exception {
producerManager.registerProducer(group, clientInfo);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+ assertThat(producerManager.findChannel("clientId")).isNotNull();
producerManager.doChannelCloseEvent("127.0.0.1", channel);
assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+ assertThat(producerManager.findChannel("clientId")).isNull();
}
@Test
public void testRegisterProducer() throws Exception {
producerManager.registerProducer(group, clientInfo);
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+ Channel channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNotNull();
+ assertThat(channel1).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
+ assertThat(channel1).isEqualTo(channel);
}
@Test
@@ -81,10 +88,23 @@
HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
assertThat(channelMap).isNotNull();
assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
-
+ Channel channel1 = producerManager.findChannel("clientId");
+ assertThat(channel1).isNotNull();
+ assertThat(channel1).isEqualTo(channel);
producerManager.unregisterProducer(group, clientInfo);
channelMap = producerManager.getGroupChannelTable().get(group);
+ channel1 = producerManager.findChannel("clientId");
assertThat(channelMap).isNull();
+ assertThat(channel1).isNull();
+
}
+ /**
+ * Registers a producer, captures the channel map that getGroupChannelTable() returns for the
+ * group, unregisters the producer, and asserts that the captured map is still non-empty.
+ * NOTE(review): this presumably pins getGroupChannelTable() handing out a copy rather than the
+ * live table — confirm against ProducerManager.
+ */
+ @Test
+ public void testGetGroupChannelTable() throws Exception {
+ producerManager.registerProducer(group, clientInfo);
+ HashMap<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
+
+ producerManager.unregisterProducer(group, clientInfo);
+ // The map captured before unregistering must not have been emptied by it.
+ assertThat(oldMap.size()).isNotEqualTo(0);
+ }
}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
new file mode 100644
index 0000000..85c7750
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link ReplyMessageProcessor}: drives a SEND_REPLY_MESSAGE request through the
+ * processor with a mocked message store and a mocked broker-to-client transport.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class ReplyMessageProcessorTest {
+ private ReplyMessageProcessor replyMessageProcessor;
+ // Real controller built from default configs, wrapped in a spy; its store and broker2Client are replaced in init().
+ @Spy
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+ @Mock
+ private MessageStore messageStore;
+ @Mock
+ private Channel channel;
+
+ private String topic = "FooBar";
+ private String group = "FooBarGroup";
+ private ClientChannelInfo clientInfo;
+ @Mock
+ private Broker2Client broker2Client;
+
+ /**
+ * Wires the mocks into the controller and creates the processor under test.
+ */
+ @Before
+ public void init() throws IllegalAccessException, NoSuchFieldException {
+ // The client id "127.0.0.1" matches the reply-to-client property set in createSendMessageRequestHeader().
+ clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0);
+ brokerController.setMessageStore(messageStore);
+ // The broker2Client field is swapped for the mock through reflection.
+ Field field = BrokerController.class.getDeclaredField("broker2Client");
+ field.setAccessible(true);
+ field.set(brokerController, broker2Client);
+ when(messageStore.now()).thenReturn(System.currentTimeMillis());
+ Channel mockChannel = mock(Channel.class);
+ when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+ when(handlerContext.channel()).thenReturn(mockChannel);
+ replyMessageProcessor = new ReplyMessageProcessor(brokerController);
+ }
+
+ /**
+ * With the store stubbed to return PUT_OK and callClient stubbed to return SUCCESS, the processor
+ * must answer SUCCESS and echo the request's opaque.
+ */
+ @Test
+ public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+ when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+ brokerController.getProducerManager().registerProducer(group, clientInfo);
+ final RemotingCommand request = createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
+ when(brokerController.getBroker2Client().callClient(any(Channel.class), any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, request));
+ RemotingCommand responseToReturn = replyMessageProcessor.processRequest(handlerContext, request);
+ assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+ }
+
+ /**
+ * Builds a request command with the given code, a one-byte body and the header from
+ * createSendMessageRequestHeader().
+ */
+ private RemotingCommand createSendMessageRequestHeaderCommand(int requestCode) {
+ SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
+ request.setBody(new byte[] {'a'});
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+
+ /**
+ * Builds a send-message header for the test topic/group whose properties carry
+ * PROPERTY_MESSAGE_REPLY_TO_CLIENT = "127.0.0.1".
+ */
+ private SendMessageRequestHeader createSendMessageRequestHeader() {
+ SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+ requestHeader.setProducerGroup(group);
+ requestHeader.setTopic(topic);
+ requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
+ requestHeader.setDefaultTopicQueueNums(3);
+ requestHeader.setQueueId(1);
+ requestHeader.setSysFlag(0);
+ requestHeader.setBornTimestamp(System.currentTimeMillis());
+ requestHeader.setFlag(124);
+ requestHeader.setReconsumeTimes(0);
+ Map<String, String> map = new HashMap<String, String>();
+ map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+ requestHeader.setProperties(MessageDecoder.messageProperties2String(map));
+ return requestHeader;
+ }
+
+ /**
+ * Builds a response command with the given code and the same opaque as the request.
+ */
+ private RemotingCommand createResponse(int code, RemotingCommand request) {
+ RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+ response.setCode(code);
+ response.setOpaque(request.getOpaque());
+ return response;
+ }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
index b1c669c..ebe8872 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
@@ -167,6 +167,24 @@
assertThat(messageExt).isNotNull();
}
+ /**
+ * When the store returns a result with status FOUND, getHalfMessage must report PullStatus.FOUND.
+ */
+ @Test
+ public void testGetHalfMessageStatusFound() {
+ when(messageStore
+ .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
+ .thenReturn(createGetMessageResult(GetMessageStatus.FOUND));
+ PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
+ assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND);
+ }
+
+ /**
+ * When the store returns null instead of a GetMessageResult, getHalfMessage must return null.
+ */
+ @Test
+ public void testGetHalfMessageNull() {
+ when(messageStore
+ .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
+ .thenReturn(null);
+ PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
+ assertThat(result).isNull();
+ }
+
private GetMessageResult createGetMessageResult(GetMessageStatus status) {
GetMessageResult getMessageResult = new GetMessageResult();
getMessageResult.setStatus(status);
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index c3e4efa..d0ae5e1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -53,6 +53,7 @@
* Offset persistent interval for consumer
*/
private int persistConsumerOffsetInterval = 1000 * 5;
+ private long pullTimeDelayMillsWhenException = 1000;
private boolean unitMode = false;
private String unitName;
private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
@@ -148,6 +149,7 @@
this.pollNameServerInterval = cc.pollNameServerInterval;
this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+ this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
this.unitMode = cc.unitMode;
this.unitName = cc.unitName;
this.vipChannelEnabled = cc.vipChannelEnabled;
@@ -165,6 +167,7 @@
cc.pollNameServerInterval = pollNameServerInterval;
cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+ cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
cc.unitMode = unitMode;
cc.unitName = unitName;
cc.vipChannelEnabled = vipChannelEnabled;
@@ -222,6 +225,14 @@
this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
}
+ /**
+ * Returns the configured pull delay, in milliseconds, used after an exception.
+ */
+ public long getPullTimeDelayMillsWhenException() {
+ return pullTimeDelayMillsWhenException;
+ }
+
+ /**
+ * Sets the pull delay, in milliseconds, used after an exception.
+ */
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
+
public String getUnitName() {
return unitName;
}
@@ -287,12 +298,13 @@
this.accessChannel = accessChannel;
}
+
@Override
public String toString() {
return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
+ ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
- + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
- + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+ + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+ vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
}
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index 62a95df..bc03b14 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -23,4 +23,6 @@
public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
public static final int NO_NAME_SERVER_EXCEPTION = 10004;
public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+ public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
+ public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
new file mode 100644
index 0000000..2d756ec
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+
+/**
+ * Checked exception carrying a response code and an error message.
+ * NOTE(review): by its name this is raised when a request times out — confirm at the throw sites.
+ */
+public class RequestTimeoutException extends Exception {
+ private static final long serialVersionUID = -5758410930844185841L;
+ private int responseCode;
+ private String errorMessage;
+
+ /**
+ * Wraps an underlying cause; the response code is set to -1 because none is supplied.
+ */
+ public RequestTimeoutException(String errorMessage, Throwable cause) {
+ super(errorMessage, cause);
+ this.responseCode = -1;
+ this.errorMessage = errorMessage;
+ }
+
+ /**
+ * Builds the exception message from the response code (rendered by UtilAll.responseCode2String)
+ * followed by the error message.
+ */
+ public RequestTimeoutException(int responseCode, String errorMessage) {
+ super("CODE: " + UtilAll.responseCode2String(responseCode) + " DESC: "
+ + errorMessage);
+ this.responseCode = responseCode;
+ this.errorMessage = errorMessage;
+ }
+
+ public int getResponseCode() {
+ return responseCode;
+ }
+
+ /**
+ * Replaces the response code and returns this exception so calls can be chained.
+ * The message passed to the superclass constructor is not rebuilt.
+ */
+ public RequestTimeoutException setResponseCode(final int responseCode) {
+ this.responseCode = responseCode;
+ return this;
+ }
+
+ public String getErrorMessage() {
+ return errorMessage;
+ }
+
+ public void setErrorMessage(final String errorMessage) {
+ this.errorMessage = errorMessage;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 0bd810a..5861bc4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -16,16 +16,19 @@
*/
package org.apache.rocketmq.client.impl;
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-
-import io.netty.channel.ChannelHandlerContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.MQProducerInner;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
@@ -42,14 +45,16 @@
import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
public class ClientRemotingProcessor implements NettyRequestProcessor {
private final InternalLogger log = ClientLogger.getLog();
private final MQClientInstance mqClientFactory;
@@ -76,6 +81,9 @@
case RequestCode.CONSUME_MESSAGE_DIRECTLY:
return this.consumeMessageDirectly(ctx, request);
+
+ case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
+ return this.receiveReplyMessage(ctx, request);
default:
break;
}
@@ -213,4 +221,73 @@
return response;
}
+
+ /**
+ * Handles a PUSH_REPLY_MESSAGE_TO_CLIENT request: rebuilds a MessageExt from the request header
+ * and body, stamps it with the local arrival time, and passes it to processReplyMessage.
+ *
+ * @param ctx channel context of the incoming request (not used in this method)
+ * @param request the remoting command carrying the reply header and body
+ * @return a response with code SUCCESS, or SYSTEM_ERROR if rebuilding or dispatching the message threw
+ * @throws RemotingCommandException if the custom header cannot be decoded (decoding happens outside the try block)
+ */
+ private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx,
+ RemotingCommand request) throws RemotingCommandException {
+
+ final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+ long receiveTime = System.currentTimeMillis();
+ ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+ try {
+ MessageExt msg = new MessageExt();
+ msg.setTopic(requestHeader.getTopic());
+ msg.setQueueId(requestHeader.getQueueId());
+ msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+ if (requestHeader.getBornHost() != null) {
+ msg.setBornHost(RemotingUtil.string2SocketAddress(requestHeader.getBornHost()));
+ }
+
+ if (requestHeader.getStoreHost() != null) {
+ msg.setStoreHost(RemotingUtil.string2SocketAddress(requestHeader.getStoreHost()));
+ }
+
+ byte[] body = request.getBody();
+ if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+ try {
+ body = UtilAll.uncompress(body);
+ } catch (IOException e) {
+ // Best effort: on failure the body is left as received and processing continues.
+ log.warn("err when uncompress constant", e);
+ }
+ }
+ msg.setBody(body);
+ msg.setFlag(requestHeader.getFlag());
+ MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+ // Record when this client received the reply (taken before header decoding above).
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+ msg.setBornTimestamp(requestHeader.getBornTimestamp());
+ msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+ log.debug("receive reply message :{}", msg);
+
+ processReplyMessage(msg);
+
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ } catch (Exception e) {
+ // Any failure while rebuilding or dispatching the message is reported to the sender as SYSTEM_ERROR.
+ log.warn("unknown err when receiveReplyMsg", e);
+ response.setCode(ResponseCode.SYSTEM_ERROR);
+ response.setRemark("process reply message fail");
+ }
+ return response;
+ }
+
+    /**
+     * Hands a reply message to the pending request it answers.
+     *
+     * The request is looked up in RequestFutureTable by the message's PROPERTY_CORRELATION_ID user
+     * property. When found, the reply is stored on the future, the future is removed from the
+     * table, and the request callback (if one was registered) is notified via onSuccess. When no
+     * request matches, a warning with the correlation id and the reply's born host is logged.
+     *
+     * @param replyMsg the reply message rebuilt from the incoming request
+     */
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            // Publish the reply exactly once, before the future leaves the table and before any
+            // callback runs; the former else-branch repeated this same call for the no-callback case.
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            }
+        } else {
+            String bornHost = replyMsg.getBornHostString();
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
+                correlationId, bornHost));
+        }
+    }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b9ace0f..1ad5fbf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,7 +27,6 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
@@ -201,6 +200,8 @@
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+
+ this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
}
public List<String> getNameServerAddressList() {
@@ -304,8 +305,8 @@
requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
- requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
- requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
+ requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
+ requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
@@ -344,7 +345,7 @@
throw new MQClientException(response.getCode(), response.getRemark());
}
- public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
+ public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
@@ -366,7 +367,8 @@
throw new MQClientException(response.getCode(), response.getRemark());
}
- public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+ public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
+ final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
RemotingSendRequestException, RemotingConnectException, MQBrokerException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
@@ -389,7 +391,7 @@
}
throw new MQBrokerException(response.getCode(), response.getRemark());
-
+
}
public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
@@ -445,13 +447,23 @@
) throws RemotingException, MQBrokerException, InterruptedException {
long beginStartTime = System.currentTimeMillis();
RemotingCommand request = null;
- if (sendSmartMsg || msg instanceof MessageBatch) {
- SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
- request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+ String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
+ boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
+ if (isReply) {
+ if (sendSmartMsg) {
+ SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
+ }
} else {
- request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+ if (sendSmartMsg || msg instanceof MessageBatch) {
+ SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+ request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+ } else {
+ request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+ }
}
-
request.setBody(msg.getBody());
switch (communicationMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a37c3a0..2c673a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -106,7 +106,7 @@
/**
* Delay some time when exception occur
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
+ private long pullTimeDelayMillsWhenException = 1000;
/**
* Flow control interval
*/
@@ -156,6 +156,7 @@
return new Thread(r, "MonitorMessageQueueChangeThread");
}
});
+ this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
}
private void checkServiceState() {
@@ -783,7 +784,7 @@
}
updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
} catch (Throwable e) {
- pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+ pullDelayTimeMills = pullTimeDelayMillsWhenException;
log.error("An error occurred in pull message process.", e);
}
@@ -1070,4 +1071,8 @@
}
}
+
+ /**
+ * Overrides the delay, in milliseconds, applied to the next pull after an exception in the pull
+ * process; the constructor initialises it from the owning consumer's configuration.
+ */
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d1b5de1..807e9c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -83,7 +83,7 @@
/**
* Delay some time when exception occur
*/
- private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+ private long pullTimeDelayMillsWhenException = 3000;
/**
* Flow control interval
*/
@@ -115,6 +115,7 @@
public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
this.defaultMQPushConsumer = defaultMQPushConsumer;
this.rpcHook = rpcHook;
+ this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
}
public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -222,7 +223,7 @@
this.makeSureStateOK();
} catch (MQClientException e) {
log.warn("pullMessage exception, consumer state not ok", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
return;
}
@@ -282,7 +283,7 @@
pullRequest.setNextOffset(offset);
}
} else {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.info("pull message later because not locked in broker, {}", pullRequest);
return;
}
@@ -290,7 +291,7 @@
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
log.warn("find the consumer's subscription failed, {}", pullRequest);
return;
}
@@ -397,7 +398,7 @@
log.warn("execute the pull request exception", e);
}
- DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
};
@@ -444,7 +445,7 @@
);
} catch (Exception e) {
log.error("pullKernelImpl exception", e);
- this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+ this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
}
@@ -1168,4 +1169,8 @@
this.consumeMessageService = consumeMessageService;
}
+ /** Sets the delay handed to executePullRequestLater when a pull request is rescheduled after a failed or skipped attempt. */
+ public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+ this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 95696d9..fca50cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -24,6 +24,8 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,7 @@
import org.apache.rocketmq.client.common.ClientErrorCode;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.CheckForbiddenContext;
import org.apache.rocketmq.client.hook.CheckForbiddenHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -52,6 +55,9 @@
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
@@ -79,6 +85,7 @@
import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.utils.CorrelationIdUtil;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -95,17 +102,16 @@
new ConcurrentHashMap<String, TopicPublishInfo>();
private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final RPCHook rpcHook;
+ private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
+ private final ExecutorService defaultAsyncSenderExecutor;
+ private final Timer timer = new Timer("RequestHouseKeepingService", true);
protected BlockingQueue<Runnable> checkRequestQueue;
protected ExecutorService checkExecutor;
private ServiceState serviceState = ServiceState.CREATE_JUST;
private MQClientInstance mQClientFactory;
private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-
private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
- private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
- private final ExecutorService defaultAsyncSenderExecutor;
private ExecutorService asyncSenderExecutor;
public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
@@ -212,6 +218,17 @@
}
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+
+ this.timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ RequestFutureTable.scanExpiredRequest();
+ } catch (Throwable e) {
+ log.error("scan RequestFutureTable exception", e);
+ }
+ }
+ }, 1000 * 3, 1000);
}
private void checkConfig() throws MQClientException {
@@ -1188,6 +1205,12 @@
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
+
+ // ignore DelayTimeLevel parameter
+ if (msg.getDelayTimeLevel() != 0) {
+ MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+ }
+
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
@@ -1319,6 +1342,233 @@
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
+ public Message request(Message msg,
+ long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // Synchronous request: register a future under the correlation id, send asynchronously, then block for the reply.
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the budget of both the send and the wait.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.setCause(e); // recorded before putResponseMessage, which presumably wakes the waiting caller
+ requestResponseFuture.putResponseMessage(null);
+ }
+ }, timeout - cost);
+ // A null result means either the send failed (cause recorded by the callback) or no reply arrived in time.
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+ /** Asynchronous request: registers a future carrying requestCallback, sends the message, and returns without waiting for a reply. */
+ public void request(Message msg, final RequestCallback requestCallback, long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // NOTE(review): nothing here unregisters the future if sendDefaultImpl throws synchronously; confirm the periodic scan cleans it up.
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the send timeout.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, timeout - cost);
+ }
+ /** Synchronous request routed through a MessageQueueSelector: sends asynchronously, then blocks until a reply or the timeout. */
+ public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException, RequestTimeoutException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // The future stays registered only for the duration of this call; the finally block always unregisters it.
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the budget of both the send and the wait.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.setCause(e); // recorded before putResponseMessage, which presumably wakes the waiting caller
+ requestResponseFuture.putResponseMessage(null);
+ }
+ }, timeout - cost);
+ // A null result means either the send failed (cause recorded by the callback) or no reply arrived in time.
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+ /** Asynchronous request routed through a MessageQueueSelector; the outcome is reported through requestCallback. */
+ public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback, final long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // NOTE(review): nothing here unregisters the future if sendSelectImpl throws synchronously; confirm the periodic scan cleans it up.
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the send timeout.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, timeout - cost);
+
+ }
+ /** Synchronous request sent to an explicit MessageQueue: sends asynchronously, then blocks until a reply or the timeout. */
+ public Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // The future stays registered only for the duration of this call; the finally block always unregisters it.
+ try {
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the budget of both the send and the wait.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setSendReqeustOk(false);
+ requestResponseFuture.setCause(e); // recorded before putResponseMessage, which presumably wakes the waiting caller
+ requestResponseFuture.putResponseMessage(null);
+ }
+ }, null, timeout - cost);
+ // A null result means either the send failed (cause recorded by the callback) or no reply arrived in time.
+ Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+ if (responseMessage == null) {
+ if (requestResponseFuture.isSendRequestOk()) {
+ throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+ "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+ } else {
+ throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+ }
+ }
+ return responseMessage;
+ } finally {
+ RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ }
+ }
+ /** Asynchronous request sent to an explicit MessageQueue; the outcome is reported through requestCallback. */
+ public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+ long beginTimestamp = System.currentTimeMillis();
+ prepareSendRequest(msg, timeout);
+ final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+ // NOTE(review): nothing here unregisters the future if sendKernelImpl throws synchronously; confirm the periodic scan cleans it up.
+ final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+ RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+ // Time already spent in prepareSendRequest is deducted from the send timeout.
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ requestResponseFuture.setSendReqeustOk(true);
+ }
+
+ @Override
+ public void onException(Throwable e) {
+ requestResponseFuture.setCause(e);
+ requestFail(correlationId);
+ }
+ }, null, timeout - cost);
+ }
+ /** Unregisters the future for correlationId, marks the send as failed and runs its request callback; a no-op if it is already gone. */
+ private void requestFail(final String correlationId) {
+ RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
+ if (responseFuture != null) {
+ responseFuture.setSendReqeustOk(false);
+ responseFuture.putResponseMessage(null);
+ try {
+ responseFuture.executeRequestCallback();
+ } catch (Exception e) {
+ log.warn("execute requestCallback in requestFail, and callback throw", e);
+ }
+ }
+ }
+ /** Stamps msg with a fresh correlation id, this client's id as reply target and the timeout as TTL, fetching route data if absent. */
+ private void prepareSendRequest(final Message msg, long timeout) {
+ String correlationId = CorrelationIdUtil.createCorrelationId();
+ String requestClientId = this.getmQClientFactory().getClientId();
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
+ MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
+ // First request to a topic with no cached route: look the route up and send a heartbeat to all brokers before sending.
+ boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
+ if (!hasRouteData) {
+ long beginTimestamp = System.currentTimeMillis();
+ this.tryToFindTopicPublishInfo(msg.getTopic());
+ this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
+ long cost = System.currentTimeMillis() - beginTimestamp;
+ if (cost > 500) {
+ log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
+ }
+ }
+ }
+
public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
return topicPublishInfoTable;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f..faa79f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -24,6 +24,7 @@
import org.apache.rocketmq.client.Validators;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
@@ -42,38 +43,29 @@
import org.apache.rocketmq.remoting.exception.RemotingException;
/**
- * This class is the entry point for applications intending to send messages.
- * </p>
+ * This class is the entry point for applications intending to send messages. </p>
*
* It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
- * box for most scenarios.
- * </p>
+ * box for most scenarios. </p>
*
* This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
- * cons; you'd better understand strengths and weakness of them before actually coding.
- * </p>
+ * cons; you'd better understand strengths and weakness of them before actually coding. </p>
*
- * <p>
- * <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
- * and used among multiple threads context.
- * </p>
+ * <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
+ * and used among multiple threads context. </p>
*/
public class DefaultMQProducer extends ClientConfig implements MQProducer {
- private final InternalLogger log = ClientLogger.getLog();
-
/**
* Wrapping internal implementations for virtually all methods presented in this class.
*/
protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
-
+ private final InternalLogger log = ClientLogger.getLog();
/**
* Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
- * important when transactional messages are involved.
- * </p>
+ * important when transactional messages are involved. </p>
*
- * For non-transactional messages, it does not matter as long as it's unique per process.
- * </p>
+ * For non-transactional messages, it does not matter as long as it's unique per process. </p>
*
* See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
*/
@@ -100,16 +92,14 @@
private int compressMsgBodyOverHowmuch = 1024 * 4;
/**
- * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
- * </p>
+ * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
private int retryTimesWhenSendFailed = 2;
/**
- * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
- * </p>
+ * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
*
* This may potentially cause message duplication which is up to application developers to resolve.
*/
@@ -268,14 +258,10 @@
}
/**
- * Start this producer instance.
- * </p>
+ * Start this producer instance. </p>
*
- * <strong>
- * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
- * this method before sending or querying messages.
- * </strong>
- * </p>
+ * <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
+ * to invoke this method before sending or querying messages. </strong> </p>
*
* @throws MQClientException if there is any unexpected error.
*/
@@ -316,8 +302,7 @@
}
/**
- * Send message in synchronous mode. This method returns only when the sending procedure totally completes.
- * </p>
+ * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
*
* <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
* {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
@@ -359,11 +344,9 @@
}
/**
- * Send message to broker asynchronously.
- * </p>
+ * Send message to broker asynchronously. </p>
*
- * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
- * </p>
+ * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
*
* Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
* #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
@@ -583,6 +566,133 @@
}
/**
+ * Sends a request message and blocks until a reply message is received or the timeout elapses. </p>
+ *
+ * <strong>Warn:</strong> this method has an internal retry mechanism, that is, the internal implementation will retry
+ * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially be
+ * delivered to broker(s). It's up to the application developers to resolve the potential duplication issue.
+ *
+ * @param msg request message to send; its topic is rewritten with the namespace before sending
+ * @param timeout request timeout in milliseconds
+ * @return reply message
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if the request was sent but no reply arrived within the timeout.
+ */
+ @Override
+ public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+ RemotingException, MQBrokerException, InterruptedException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, timeout);
+ }
+
+ /**
+ * Sends a request message asynchronously. </p>
+ * This method returns without waiting for the reply; the outcome is reported through <code>requestCallback</code>. </p>
+ *
+ * Similar to {@link #request(Message, long)}, the internal implementation would potentially retry up to {@link
+ * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication;
+ * application developers are the ones to resolve this potential issue.
+ *
+ * @param msg request message to send; its topic is rewritten with the namespace before sending
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout request timeout
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
+ }
+
+ /**
+ * Same as {@link #request(Message, long)}, with a message queue selector specified.
+ *
+ * @param msg request message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
+ * @param timeout timeout of request.
+ * @return reply message
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if the request was sent but no reply arrived within the timeout.
+ */
+ @Override
+ public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+ InterruptedException, RequestTimeoutException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
+ }
+
+ /**
+ * Same as {@link #request(Message, RequestCallback, long)}, with a message queue selector specified.
+ *
+ * @param msg request message to send
+ * @param selector message queue selector, through which we get target message queue to deliver message to.
+ * @param arg argument to work along with message queue selector.
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
+ InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
+ }
+
+ /**
+ * Same as {@link #request(Message, long)}, with the target message queue specified in addition.
+ * @param msg request message to send
+ * @param mq target message queue.
+ * @param timeout request timeout
+ * @return reply message
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws MQBrokerException if there is any broker error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws RequestTimeoutException if the request was sent but no reply arrived within the timeout.
+ */
+ @Override
+ public Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ return this.defaultMQProducerImpl.request(msg, mq, timeout);
+ }
+
+ /**
+ * Same as {@link #request(Message, RequestCallback, long)}, with the target message queue specified.
+ *
+ * @param msg request message to send
+ * @param mq target message queue.
+ * @param requestCallback callback to execute on request completion.
+ * @param timeout timeout of request.
+ * @throws MQClientException if there is any client error.
+ * @throws RemotingException if there is any network-tier error.
+ * @throws InterruptedException if the thread is interrupted.
+ * @throws MQBrokerException if there is any broker error.
+ */
+ @Override
+ public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+ msg.setTopic(withNamespace(msg.getTopic()));
+ this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
+ }
+
+ /**
* Same to {@link #sendOneway(Message)} with message queue selector specified.
*
* @param msg Message to send.
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 1af6005..c6cf4c9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -21,6 +21,7 @@
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -98,4 +99,26 @@
SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+ // Request-reply (RPC-style) API: the overloads returning Message yield the reply; the void ones report through RequestCallback.
+ Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+ RemotingException, MQBrokerException, InterruptedException;
+ // Callback variant of request(Message, long).
+ void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+ throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
+ // Variant taking a MessageQueueSelector and its argument.
+ Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
+ InterruptedException;
+ // Callback variant taking a MessageQueueSelector and its argument.
+ void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+ final RequestCallback requestCallback,
+ final long timeout) throws MQClientException, RemotingException,
+ InterruptedException, MQBrokerException;
+ // Variant taking an explicit target MessageQueue.
+ Message request(final Message msg, final MessageQueue mq, final long timeout)
+ throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
+ // Callback variant taking an explicit target MessageQueue.
+ void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+ throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
new file mode 100644
index 0000000..3107ba5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+ public interface RequestCallback { // callback type taken by the void-returning request(...) overloads of the producer
+ void onSuccess(final Message message); // presumably invoked with the reply message -- confirm in RequestResponseFuture
+
+ void onException(final Throwable e); // presumably invoked with the failure cause -- confirm in RequestResponseFuture
+ }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
new file mode 100644
index 0000000..3d4caa2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.logging.InternalLogger;
+
+/** Process-wide table of pending request futures, keyed by String, plus the sweep that expires them. */
+public class RequestFutureTable {
+    private static InternalLogger log = ClientLogger.getLog();
+    private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
+
+    /** Returns the live table itself, not a copy. */
+    public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
+        return requestFutureTable;
+    }
+
+    /** Removes every timed-out future first, then fails each with a RequestTimeoutException and runs its callback; a throwing callback is only logged. */
+    public static void scanExpiredRequest() {
+        final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
+        Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
+        while (it.hasNext()) {
+            RequestResponseFuture rep = it.next().getValue();
+            if (rep.isTimeout()) {
+                it.remove();
+                rfList.add(rep);
+                // Pass the id as an argument: concatenating it left the "{}" placeholder unfilled.
+                log.warn("remove timeout request, CorrelationId={}", rep.getCorrelationId());
+            }
+        }
+        for (RequestResponseFuture rf : rfList) {
+            try {
+                rf.setCause(new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message."));
+                rf.executeRequestCallback();
+            } catch (Throwable e) {
+                log.warn("scanResponseTable, operationComplete Exception", e);
+            }
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
new file mode 100644
index 0000000..c54b236
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.message.Message;
+
+public class RequestResponseFuture { // state of one outstanding request: id, timeout, optional callback, and the eventual reply or failure cause
+ private final String correlationId;
+ private final RequestCallback requestCallback;
+ private final long beginTimestamp = System.currentTimeMillis(); // construction time; isTimeout() measures from here
+ private final Message requestMsg = null; // NOTE(review): final and initialised to null, so getRequestMsg() always returns null
+ private long timeoutMillis;
+ private CountDownLatch countDownLatch = new CountDownLatch(1); // opened by putResponseMessage()
+ private volatile Message responseMsg = null;
+ private volatile boolean sendRequestOk = true; // starts out true; only setSendReqeustOk() changes it
+ private volatile Throwable cause = null;
+
+ public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
+ this.correlationId = correlationId;
+ this.timeoutMillis = timeoutMillis;
+ this.requestCallback = requestCallback;
+ }
+ // Runs the callback, if one was supplied: onSuccess(responseMsg) when sendRequestOk is true and cause is null, otherwise onException(cause).
+ public void executeRequestCallback() {
+ if (requestCallback != null) {
+ if (sendRequestOk && cause == null) {
+ requestCallback.onSuccess(responseMsg);
+ } else {
+ requestCallback.onException(cause);
+ }
+ }
+ }
+ // True once more than timeoutMillis of wall-clock time has passed since this object was constructed.
+ public boolean isTimeout() {
+ long diff = System.currentTimeMillis() - this.beginTimestamp;
+ return diff > this.timeoutMillis;
+ }
+ // Waits up to `timeout` ms for the latch, then returns the current responseMsg (still null if no reply was stored).
+ public Message waitResponseMessage(final long timeout) throws InterruptedException {
+ this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS); // the boolean result is ignored; callers tell timeout apart by the null return
+ return this.responseMsg;
+ }
+ // Stores the reply and opens the latch, releasing any thread blocked in waitResponseMessage().
+ public void putResponseMessage(final Message responseMsg) {
+ this.responseMsg = responseMsg;
+ this.countDownLatch.countDown();
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public long getTimeoutMillis() {
+ return timeoutMillis;
+ }
+
+ public void setTimeoutMillis(long timeoutMillis) {
+ this.timeoutMillis = timeoutMillis;
+ }
+
+ public RequestCallback getRequestCallback() {
+ return requestCallback;
+ }
+
+ public long getBeginTimestamp() {
+ return beginTimestamp;
+ }
+
+ public CountDownLatch getCountDownLatch() {
+ return countDownLatch;
+ }
+
+ public void setCountDownLatch(CountDownLatch countDownLatch) {
+ this.countDownLatch = countDownLatch;
+ }
+
+ public Message getResponseMsg() {
+ return responseMsg;
+ }
+
+ public void setResponseMsg(Message responseMsg) { // unlike putResponseMessage(), this does not open the latch
+ this.responseMsg = responseMsg;
+ }
+
+ public boolean isSendRequestOk() {
+ return sendRequestOk;
+ }
+
+ public void setSendReqeustOk(boolean sendReqeustOk) { // NOTE(review): "Reqeust" is misspelled, but the name is public API; renaming would break callers
+ this.sendRequestOk = sendReqeustOk;
+ }
+
+ public Message getRequestMsg() {
+ return requestMsg;
+ }
+
+ public Throwable getCause() {
+ return cause;
+ }
+
+ public void setCause(Throwable cause) {
+ this.cause = cause;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
new file mode 100644
index 0000000..416ba44
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class MessageUtil {
+    /**
+     * Builds the reply to {@code requestMessage}: the topic comes from the request's cluster
+     * property, and the correlation id, reply-to client and TTL properties are copied over.
+     * @throws MQClientException if the request is null or has no cluster property
+     */
+    public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {
+        if (requestMessage == null) {
+            throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");
+        }
+        final String clusterName = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
+        if (clusterName == null) {
+            throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+        }
+        final Message reply = new Message();
+        reply.setBody(body);
+        reply.setTopic(MixAll.getReplyTopic(clusterName));
+        MessageAccessor.putProperty(reply, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
+        MessageAccessor.putProperty(reply, MessageConst.PROPERTY_CORRELATION_ID, requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID));
+        MessageAccessor.putProperty(reply, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT));
+        MessageAccessor.putProperty(reply, MessageConst.PROPERTY_MESSAGE_TTL, requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL));
+        return reply;
+    }
+
+    /** Returns the value of the reply-to-client property stored on {@code msg}. */
+    public static String getReplyToClient(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 84af632..3f00d9e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -42,6 +42,7 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
@@ -164,7 +165,7 @@
public Object answer(InvocationOnMock mock) throws Throwable {
InvokeCallback callback = mock.getArgument(3);
RemotingCommand request = mock.getArgument(1);
- ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
responseFuture.setResponseCommand(createSuccessResponse(request));
callback.operationComplete(responseFuture);
return null;
@@ -289,6 +290,7 @@
assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
}
}
+
@Test
public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
doAnswer(new Answer() {
@@ -322,6 +324,38 @@
assertThat(result).isEqualTo(true);
}
+ @Test
+ public void testSendMessageTypeofReply() throws Exception { // sends a message tagged MSG_TYPE=reply in ASYNC mode against a stubbed remoting client
+ doAnswer(new Answer() { // stub: invokeAsync completes the callback at once with a success response built from the request
+ @Override
+ public Object answer(InvocationOnMock mock) throws Throwable {
+ InvokeCallback callback = mock.getArgument(3);
+ RemotingCommand request = mock.getArgument(1);
+ ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+ responseFuture.setResponseCommand(createSuccessResponse(request));
+ callback.operationComplete(responseFuture);
+ return null;
+ }
+ }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class));
+ SendMessageContext sendMessageContext = new SendMessageContext();
+ sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
+ msg.getProperties().put("MSG_TYPE", "reply"); // NOTE(review): literals presumably mirror MessageConst.PROPERTY_MESSAGE_TYPE / MixAll.REPLY_MESSAGE_FLAG — confirm
+ mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+ new SendCallback() {
+ @Override
+ public void onSuccess(SendResult sendResult) {
+ assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+ assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+ assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+ assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+ }
+ // NOTE(review): onException below is empty, so a failed send does not fail this test.
+ @Override
+ public void onException(Throwable e) {
+ }
+ }, null, null, 0, sendMessageContext, defaultMQProducerImpl);
+ }
+
private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
response.setCode(ResponseCode.SUCCESS);
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
new file mode 100644
index 0000000..4cfa011
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class ConsumeMessageOrderlyServiceTest {
+    private String consumerGroup;
+    private String topic = "FooBar";
+    private String brokerName = "BrokerA";
+    private DefaultMQPushConsumer pushConsumer;
+
+    @Before
+    public void init() throws Exception {
+        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
+        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+    }
+
+    /** Each listener return value, including null, must be reported as its paired CMResult. */
+    @Test
+    public void testConsumeMessageDirectly_WithNoException() {
+        Map<ConsumeOrderlyStatus, CMResult> map = new HashMap<ConsumeOrderlyStatus, CMResult>();
+        map.put(ConsumeOrderlyStatus.SUCCESS, CMResult.CR_SUCCESS);
+        map.put(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT, CMResult.CR_LATER);
+        map.put(ConsumeOrderlyStatus.COMMIT, CMResult.CR_COMMIT);
+        map.put(ConsumeOrderlyStatus.ROLLBACK, CMResult.CR_ROLLBACK);
+        map.put(null, CMResult.CR_RETURN_NULL); // HashMap permits the null key used for "listener returned null"
+
+        for (Map.Entry<ConsumeOrderlyStatus, CMResult> entry : map.entrySet()) {
+            final ConsumeOrderlyStatus status = entry.getKey();
+            MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                    return status;
+                }
+            };
+
+            ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+            MessageExt msg = new MessageExt();
+            msg.setTopic(topic);
+            assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(entry.getValue()));
+        }
+    }
+
+    /** A listener that throws must be reported as CR_THROW_EXCEPTION instead of propagating. */
+    @Test
+    public void testConsumeMessageDirectly_WithException() {
+        MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                throw new RuntimeException();
+            }
+        };
+
+        ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+        MessageExt msg = new MessageExt();
+        msg.setTopic(topic);
+        assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(CMResult.CR_THROW_EXCEPTION));
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 8e73ba5..818c94a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -21,15 +21,18 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -45,6 +48,7 @@
import org.apache.rocketmq.common.protocol.route.QueueData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.junit.After;
import org.junit.Before;
@@ -337,6 +341,101 @@
assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
}
+ @Test
+ public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+     when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+     final AtomicBoolean finish = new AtomicBoolean(false);
+     new Thread(new Runnable() { // feeder: answers every pending future so the blocking request() below can return
+         @Override public void run() {
+             while (!finish.get()) {
+                 try {
+                     Thread.sleep(10);
+                 } catch (InterruptedException ignored) { // nothing in this test interrupts the helper; just re-check the flag
+                 }
+                 for (RequestResponseFuture future : RequestFutureTable.getRequestFutureTable().values()) {
+                     future.putResponseMessage(message);
+                 }
+             }
+         }
+     }).start();
+     try {
+         Message result = producer.request(message, 3 * 1000L);
+         assertThat(result.getTopic()).isEqualTo("FooBar");
+         assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
+     } finally {
+         finish.set(true); // stop the feeder even if request() throws or an assertion fails
+     }
+ }
+
+ @Test(expected = RequestTimeoutException.class)
+ public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+     when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+     producer.request(message, 3 * 1000L); // this test supplies no reply, so the call is expected to end in RequestTimeoutException
+ }
+
+ @Test
+ public void testAsyncRequest_OnSuccess() throws Exception { // registers an async request, then drives its callback by hand and checks what onSuccess receives
+ when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ RequestCallback requestCallback = new RequestCallback() {
+ @Override public void onSuccess(Message message) {
+ assertThat(message.getTopic()).isEqualTo("FooBar");
+ assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
+ assertThat(message.getFlag()).isEqualTo(1);
+ countDownLatch.countDown();
+ }
+ // Intentionally empty: the failure path is not exercised by this test.
+ @Override public void onException(Throwable e) {
+ }
+ };
+ producer.request(message, requestCallback, 3 * 1000L);
+ ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
+ assertThat(responseMap).isNotNull();
+ for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) { // stands in for the reply path: calls onSuccess on every future currently in the shared table
+ RequestResponseFuture future = entry.getValue();
+ future.setSendReqeustOk(true);
+ message.setFlag(1); // the callback above asserts this flag value
+ future.getRequestCallback().onSuccess(message);
+ }
+ countDownLatch.await(3000L, TimeUnit.MILLISECONDS); // NOTE(review): the boolean result is discarded, so the test also passes if onSuccess never ran
+ }
+
+ /**
+  * Expects the asynchronous request to throw (this test stubs no topic route itself), then hands
+  * that exception to every registered callback and checks that onException ran exactly once.
+  */
+ @Test
+ public void testAsyncRequest_OnException() throws Exception {
+     final AtomicInteger cc = new AtomicInteger(0);
+     final CountDownLatch countDownLatch = new CountDownLatch(1);
+     RequestCallback requestCallback = new RequestCallback() {
+         @Override public void onSuccess(Message message) {
+             // Not expected on this path; nothing to record.
+         }
+
+         @Override public void onException(Throwable e) {
+             cc.incrementAndGet();
+             countDownLatch.countDown();
+         }
+     };
+
+     try {
+         producer.request(message, requestCallback, 3 * 1000L);
+         // Reaching the next line means request() returned normally, which this test treats as a failure.
+         failBecauseExceptionWasNotThrown(Exception.class);
+     } catch (Exception e) {
+         ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
+         assertThat(responseMap).isNotNull();
+         for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
+             RequestResponseFuture future = entry.getValue();
+             future.getRequestCallback().onException(e);
+         }
+     }
+     // The latch is already open if a callback ran above; the wait only bounds the failing case.
+     countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+     assertThat(cc.get()).isEqualTo(1);
+ }
+
public static TopicRouteData createTopicRoute() {
TopicRouteData topicRouteData = new TopicRouteData();
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
new file mode 100644
index 0000000..90e4623
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RequestResponseFutureTest {
+    /** With sendRequestOk set and no cause recorded, executing the callback must reach onSuccess exactly once. */
+    @Test
+    public void testExecuteRequestCallback() throws Exception {
+        final AtomicInteger successCount = new AtomicInteger(0);
+        final RequestCallback countingCallback = new RequestCallback() {
+            @Override public void onSuccess(Message message) {
+                successCount.incrementAndGet();
+            }
+
+            @Override public void onException(Throwable e) {
+            }
+        };
+        RequestResponseFuture future = new RequestResponseFuture(UUID.randomUUID().toString(), 3 * 1000L, countingCallback);
+        future.setSendReqeustOk(true);
+        future.executeRequestCallback();
+        assertThat(successCount.get()).isEqualTo(1);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
new file mode 100644
index 0000000..803e596
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+/** Unit tests for the reply-message helpers in MessageUtil. */
+public class MessageUtilsTest {
+    @Test
+    public void testCreateReplyMessage() throws MQClientException {
+        Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'});
+        assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX);
+        assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT)).isEqualTo("127.0.0.1");
+        assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000");
+    }
+
+    @Test
+    public void testCreateReplyMessage_Exception() throws MQClientException {
+        try {
+            MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+        }
+    }
+
+    @Test
+    public void testCreateReplyMessage_reqMsgIsNull() throws MQClientException {
+        try {
+            MessageUtil.createReplyMessage(null, new byte[] {'a'});
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("create reply message fail, requestMessage cannot be null.");
+        }
+    }
+
+    @Test
+    public void testGetReplyToClient() throws MQClientException {
+        Message msg = createReplyMessage("clusterName");
+        String replyToClient = MessageUtil.getReplyToClient(msg);
+        assertThat(replyToClient).isNotNull();
+        assertThat(replyToClient).isEqualTo("127.0.0.1");
+    }
+
+    /** Builds a request carrying reply-to client, cluster and TTL properties; a null clusterName is stored as a null cluster value. */
+    private Message createReplyMessage(String clusterName) {
+        Message requestMessage = new Message();
+        Map<String, String> map = new HashMap<String, String>();
+        map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+        map.put(MessageConst.PROPERTY_CLUSTER, clusterName);
+        map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
+        MessageAccessor.setProperties(requestMessage, map);
+        return requestMessage;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1c3f37d..a7568f0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -61,6 +61,7 @@
*/
private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+ private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
private int adminBrokerThreadPoolNums = 16;
@@ -83,6 +84,7 @@
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
+ private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
private int consumerManagerThreadPoolQueueCapacity = 1000000;
@@ -180,6 +182,8 @@
@ImportantField
private boolean aclEnable = false;
+ private boolean storeReplyMessageEnable = true;
+
public static String localHostName() {
try {
return InetAddress.getLocalHost().getHostName();
@@ -374,6 +378,14 @@
this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
}
+ public int getProcessReplyMessageThreadPoolNums() { // field defaults to 16 + 2 * availableProcessors; presumably sizes the reply-processing thread pool — confirm against BrokerController
+ return processReplyMessageThreadPoolNums;
+ }
+
+ public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) { // plain setter; the value is stored without validation
+ this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums;
+ }
+
public int getQueryMessageThreadPoolNums() {
return queryMessageThreadPoolNums;
}
@@ -470,6 +482,14 @@
this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
}
+ public int getReplyThreadPoolQueueCapacity() { // field defaults to 10000; presumably the work-queue capacity of the reply thread pool — confirm against BrokerController
+ return replyThreadPoolQueueCapacity;
+ }
+
+ public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) { // plain setter; the value is stored without validation
+ this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity;
+ }
+
public int getQueryThreadPoolQueueCapacity() {
return queryThreadPoolQueueCapacity;
}
@@ -749,7 +769,7 @@
public void setMsgTraceTopicName(String msgTraceTopicName) {
this.msgTraceTopicName = msgTraceTopicName;
}
-
+
public boolean isTraceTopicEnable() {
return traceTopicEnable;
}
@@ -765,4 +785,12 @@
public void setAclEnable(boolean aclEnable) {
this.aclEnable = aclEnable;
}
+
+ public boolean isStoreReplyMessageEnable() { // field defaults to true; presumably controls whether the broker persists reply messages — confirm against the reply processor
+ return storeReplyMessageEnable;
+ }
+
+ public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) { // plain setter for the flag above
+ this.storeReplyMessageEnable = storeReplyMessageEnable;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index bba8b36..e9a67bb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -45,8 +45,6 @@
import org.apache.rocketmq.logging.InternalLoggerFactory;
public class MixAll {
- private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-
public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
@@ -74,27 +72,26 @@
public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
-
public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
public static final String LOCALHOST = localhost();
public static final String DEFAULT_CHARSET = "UTF-8";
public static final long MASTER_ID = 0L;
public static final long CURRENT_JVM_PID = getPID();
-
public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
-
public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+ public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
-
public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
+ public static final String REPLY_MESSAGE_FLAG = "reply";
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@ -110,6 +107,10 @@
return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
}
+ public static String getReplyTopic(final String clusterName) {
+ return clusterName + "_" + REPLY_TOPIC_POSTFIX;
+ }
+
public static boolean isSysConsumerGroup(final String consumerGroup) {
return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 85a4cde..222f697 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -440,13 +440,11 @@
return false;
}
- public static boolean isInternalV6IP(byte[] ip) {
- if (ip.length != 16) {
- throw new RuntimeException("illegal ipv6 bytes");
- }
-
- //FEC0:0000:0000:0000:0000:0000:0000:0000/10
- if (ip[0] == (byte) 254 && ip[1] >= (byte) 192) {
+ public static boolean isInternalV6IP(InetAddress inetAddr) {
+ if (inetAddr.isAnyLocalAddress() // Wildcard IPv6 address (::)
+ || inetAddr.isLinkLocalAddress() // Link-local unicast IPv6 address: fe80:xx:xx...
+ || inetAddr.isLoopbackAddress() // Loopback IPv6 address (::1)
+ || inetAddr.isSiteLocalAddress()) { // Site-local IPv6 address: fec0:xx:xx...
return true;
}
return false;
@@ -457,9 +455,6 @@
throw new RuntimeException("illegal ipv4 bytes");
}
-// if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) {
-// }
-
if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) {
if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) {
return false;
@@ -550,7 +545,7 @@
byte[] ipByte = ip.getAddress();
if (ipByte.length == 16) {
if (ipV6Check(ipByte)) {
- if (!isInternalV6IP(ipByte)) {
+ if (!isInternalV6IP(ip)) {
return ipByte;
} else if (internalIP == null) {
internalIP = ipByte;
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index aa84816..5bdc846 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -45,6 +45,13 @@
public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
+ public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
+ public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT";
+ public static final String PROPERTY_MESSAGE_TTL = "TTL";
+ public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
+ public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
+ public static final String PROPERTY_CLUSTER = "CLUSTER";
+ public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
public static final String KEY_SEPARATOR = " ";
@@ -74,5 +81,12 @@
STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
+ STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
+ STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
+ STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME);
+ STRING_HASH_SET.add(PROPERTY_CLUSTER);
+ STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE);
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index dbdabbc..b3009d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -182,4 +182,10 @@
* resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
*/
public static final int RESUME_CHECK_HALF_MESSAGE = 323;
+
+ public static final int SEND_REPLY_MESSAGE = 324;
+
+ public static final int SEND_REPLY_MESSAGE_V2 = 325;
+
+ public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
new file mode 100644
index 0000000..3bb0907
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ReplyMessageRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String producerGroup;
+ @CFNotNull
+ private String topic;
+ @CFNotNull
+ private String defaultTopic;
+ @CFNotNull
+ private Integer defaultTopicQueueNums;
+ @CFNotNull
+ private Integer queueId;
+ @CFNotNull
+ private Integer sysFlag;
+ @CFNotNull
+ private Long bornTimestamp;
+ @CFNotNull
+ private Integer flag;
+ @CFNullable
+ private String properties;
+ @CFNullable
+ private Integer reconsumeTimes;
+ @CFNullable
+ private boolean unitMode = false;
+
+ @CFNotNull
+ private String bornHost;
+ @CFNotNull
+ private String storeHost;
+ @CFNotNull
+ private long storeTimestamp;
+
+ public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getProducerGroup() {
+ return producerGroup;
+ }
+
+ public void setProducerGroup(String producerGroup) {
+ this.producerGroup = producerGroup;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public void setTopic(String topic) {
+ this.topic = topic;
+ }
+
+ public String getDefaultTopic() {
+ return defaultTopic;
+ }
+
+ public void setDefaultTopic(String defaultTopic) {
+ this.defaultTopic = defaultTopic;
+ }
+
+ public Integer getDefaultTopicQueueNums() {
+ return defaultTopicQueueNums;
+ }
+
+ public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) {
+ this.defaultTopicQueueNums = defaultTopicQueueNums;
+ }
+
+ public Integer getQueueId() {
+ return queueId;
+ }
+
+ public void setQueueId(Integer queueId) {
+ this.queueId = queueId;
+ }
+
+ public Integer getSysFlag() {
+ return sysFlag;
+ }
+
+ public void setSysFlag(Integer sysFlag) {
+ this.sysFlag = sysFlag;
+ }
+
+ public Long getBornTimestamp() {
+ return bornTimestamp;
+ }
+
+ public void setBornTimestamp(Long bornTimestamp) {
+ this.bornTimestamp = bornTimestamp;
+ }
+
+ public Integer getFlag() {
+ return flag;
+ }
+
+ public void setFlag(Integer flag) {
+ this.flag = flag;
+ }
+
+ public String getProperties() {
+ return properties;
+ }
+
+ public void setProperties(String properties) {
+ this.properties = properties;
+ }
+
+ public Integer getReconsumeTimes() {
+ return reconsumeTimes;
+ }
+
+ public void setReconsumeTimes(Integer reconsumeTimes) {
+ this.reconsumeTimes = reconsumeTimes;
+ }
+
+ public boolean isUnitMode() {
+ return unitMode;
+ }
+
+ public void setUnitMode(boolean unitMode) {
+ this.unitMode = unitMode;
+ }
+
+ public String getBornHost() {
+ return bornHost;
+ }
+
+ public void setBornHost(String bornHost) {
+ this.bornHost = bornHost;
+ }
+
+ public String getStoreHost() {
+ return storeHost;
+ }
+
+ public void setStoreHost(String storeHost) {
+ this.storeHost = storeHost;
+ }
+
+ public long getStoreTimestamp() {
+ return storeTimestamp;
+ }
+
+ public void setStoreTimestamp(long storeTimestamp) {
+ this.storeTimestamp = storeTimestamp;
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
new file mode 100644
index 0000000..86d1fd4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.UUID;
+
+public class CorrelationIdUtil {
+ public static String createCorrelationId() {
+ return UUID.randomUUID().toString();
+ }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 3991367..b854099 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.common;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.Properties;
import org.junit.Test;
@@ -99,12 +101,12 @@
}
@Test
- public void testIPv6Check() {
- byte[] nonInternalIp = UtilAll.string2bytes("24084004018081003FAA1DDE2B3F898A");
- byte[] internalIp = UtilAll.string2bytes("FEC0000000000000000000000000FFFF");
- assertThat(UtilAll.isInternalV6IP(nonInternalIp)).isFalse();
- assertThat(UtilAll.isInternalV6IP(internalIp)).isTrue();
- assertThat(UtilAll.ipToIPv6Str(nonInternalIp).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
+ public void testIPv6Check() throws UnknownHostException {
+ InetAddress nonInternal = InetAddress.getByName("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
+ InetAddress internal = InetAddress.getByName("FE80:0000:0000:0000:0000:0000:0000:FFFF");
+ assertThat(UtilAll.isInternalV6IP(nonInternal)).isFalse();
+ assertThat(UtilAll.isInternalV6IP(internal)).isTrue();
+ assertThat(UtilAll.ipToIPv6Str(nonInternal.getAddress()).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
}
static class DemoConfig {
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
index 0df75eb..1ec6d93 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
@@ -17,9 +17,8 @@
package org.apache.rocketmq.common.message;
-import java.util.Calendar;
-import java.util.Date;
import org.junit.Test;
+
import static org.assertj.core.api.Assertions.assertThat;
public class MessageClientIDSetterTest {
diff --git a/docs/cn/README b/docs/cn/README
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/docs/cn/README
diff --git a/docs/cn/README.md b/docs/cn/README.md
index 3247985..2dbd854 100644
--- a/docs/cn/README.md
+++ b/docs/cn/README.md
@@ -1,7 +1,7 @@
Apache RocketMQ开发者指南
--------
-##### 这个开发者指南是帮忙您快速了解,并使用 Apache RocketMQ
+##### 这个开发者指南是帮助您快速了解,并使用 Apache RocketMQ
### 1. 概念和特性
diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md
index d298db8..09acb0d 100644
--- a/docs/cn/RocketMQ_Example.md
+++ b/docs/cn/RocketMQ_Example.md
@@ -66,7 +66,11 @@
// 启动Producer实例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
- for (int i = 0; i < 100; i++) {
+
+ int messageCount = 100;
+ // 根据消息数量实例化倒计时计算器
+ final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
+ for (int i = 0; i < messageCount; i++) {
final int index = i;
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest",
@@ -87,6 +91,8 @@
}
});
}
+ // 等待5s
+ countDownLatch.await(5, TimeUnit.SECONDS);
// 如果不再发送消息,关闭Producer实例。
producer.shutdown();
}
diff --git a/docs/cn/features.md b/docs/cn/features.md
index 859e0f8..e7b6386 100644
--- a/docs/cn/features.md
+++ b/docs/cn/features.md
@@ -16,7 +16,7 @@
RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
## 4 消息可靠性
RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
-1) Broker正常关闭
+1) Broker非正常关闭
2) Broker异常Crash
3) OS Crash
4) 机器掉电,但是能立即恢复供电情况
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d431d3e..4724a1d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -103,6 +103,10 @@
}, 10000, 10000);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+ if (commandLine.hasOption('n')) {
+ String ns = commandLine.getOptionValue('n');
+ consumer.setNamesrvAddr(ns);
+ }
consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
if (filterType == null || expression == null) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index d9fafdd..04707e1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -24,6 +24,11 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
import org.apache.rocketmq.client.producer.LocalTransactionState;
@@ -33,20 +38,19 @@
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.srvutil.ServerUtil;
public class TransactionProducer {
- private static int threadCount;
- private static int messageSize;
- private static boolean ischeck;
- private static boolean ischeckffalse;
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
- threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
- messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
- ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
- ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
+ Options options = ServerUtil.buildCommandlineOptions(new Options());
+ CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
- final Message msg = buildMessage(messageSize);
+ final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+ final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32;
+ final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048;
+ final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false;
+ final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true;
final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
@@ -98,6 +102,10 @@
producer.setInstanceName(Long.toString(System.currentTimeMillis()));
producer.setTransactionCheckListener(transactionCheckListener);
producer.setDefaultTopicQueueNums(1000);
+ if (commandLine.hasOption('n')) {
+ String ns = commandLine.getOptionValue('n');
+ producer.setNamesrvAddr(ns);
+ }
producer.start();
final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
@@ -111,7 +119,7 @@
// Thread.sleep(1000);
final long beginTimestamp = System.currentTimeMillis();
SendResult sendResult =
- producer.sendMessageInTransaction(msg, tranExecuter, null);
+ producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null);
if (sendResult != null) {
statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -138,18 +146,45 @@
}
}
- private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
- Message msg = new Message();
- msg.setTopic("BenchmarkTest");
+ private static Message buildMessage(final int messageSize, String topic) {
+ try {
+ Message msg = new Message();
+ msg.setTopic(topic);
- StringBuilder sb = new StringBuilder();
- for (int i = 0; i < messageSize; i += 10) {
- sb.append("hello baby");
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < messageSize; i += 10) {
+ sb.append("hello baby");
+ }
+ msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+ return msg;
+ } catch (UnsupportedEncodingException e) {
+ throw new RuntimeException(e);
}
+ }
- msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+ public static Options buildCommandlineOptions(final Options options) {
+ Option opt = new Option("w", "threadCount", true, "Thread count, Default: 32");
+ opt.setRequired(false);
+ options.addOption(opt);
- return msg;
+ opt = new Option("s", "messageSize", true, "Message Size, Default: 2048");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("c", "check", true, "Check the message, Default: false");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+ opt = new Option("r", "checkResult", true, "Message check result, Default: true");
+ opt.setRequired(false);
+ options.addOption(opt);
+
+
+ return options;
}
}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
new file mode 100644
index 0000000..072291d
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+/**
+ * Example of an asynchronous request: sends one message to {@code RequestTopic} and prints
+ * either the reply message or the failure handed to the {@link RequestCallback}.
+ */
+public class AsyncRequestProducer {
+    private static final InternalLogger log = ClientLogger.getLog();
+
+    /**
+     * Starts a producer and issues a single asynchronous request; the producer is not shut
+     * down here, see the note at the end of the method.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        String producerGroup = "please_rename_unique_group_name";
+        String topic = "RequestTopic";
+        long ttl = 3000;
+
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.start();
+
+        try {
+            Message msg = new Message(topic,
+                "",
+                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+            long begin = System.currentTimeMillis();
+            producer.request(msg, new RequestCallback() {
+                @Override
+                public void onSuccess(Message message) {
+                    long cost = System.currentTimeMillis() - begin;
+                    System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    // Print the cause and end the line; a bare "fail." hides why the request failed.
+                    System.err.printf("request to <%s> fail: %s %n", topic, e);
+                }
+            }, ttl);
+        } catch (Exception e) {
+            log.warn("", e);
+        }
+        // The producer is deliberately left running: call producer.shutdown() only after the
+        // request callback has finished.
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
new file mode 100644
index 0000000..b34908b
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class RequestProducer {
+ public static void main(String[] args) throws MQClientException, InterruptedException {
+ String producerGroup = "please_rename_unique_group_name";
+ String topic = "RequestTopic";
+ long ttl = 3000;
+
+ DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+ producer.start();
+
+ try {
+ Message msg = new Message(topic,
+ "",
+ "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+ long begin = System.currentTimeMillis();
+ Message retMsg = producer.request(msg, ttl);
+ long cost = System.currentTimeMillis() - begin;
+ System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ producer.shutdown();
+ }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
new file mode 100644
index 0000000..c62c7d4
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+/**
+ * Example responder: consumes the messages published to {@code RequestTopic} and sends a reply
+ * for each one, built with {@code MessageUtil.createReplyMessage}.
+ */
+public class ResponseConsumer {
+    /**
+     * Starts the reply producer and a push consumer whose listener answers every received message.
+     *
+     * @param args unused
+     */
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        String producerGroup = "please_rename_unique_group_name";
+        String consumerGroup = "please_rename_unique_group_name";
+        String topic = "RequestTopic";
+
+        // producer used to send the reply messages
+        DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
+        replyProducer.start();
+
+        // push consumer that receives the request messages
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+        // recommended client configs
+        consumer.setPullTimeDelayMillsWhenException(0L);
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                for (MessageExt msg : msgs) {
+                    try {
+                        System.out.printf("handle message: %s %n", msg.toString());
+                        String replyTo = MessageUtil.getReplyToClient(msg);
+                        // encode with an explicit charset instead of the platform default
+                        byte[] replyContent = "reply message contents.".getBytes(StandardCharsets.UTF_8);
+                        // build the reply with MessageUtil; do not construct the reply message by hand
+                        Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
+
+                        // send the reply message with the reply producer
+                        SendResult replyResult = replyProducer.send(replyMessage, 3000);
+                        System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
+                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.subscribe(topic, "*");
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}
diff --git a/pom.xml b/pom.xml
index 0161c7b..14b10fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,7 +569,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
- <version>1.2.51</version>
+ <version>1.2.61</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 84fb421..3035c57 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -280,7 +280,9 @@
if (!this.requestsRead.isEmpty()) {
for (CommitLog.GroupCommitRequest req : this.requestsRead) {
boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
- for (int i = 0; !transferOK && i < 5; i++) {
+ long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+ + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
+ while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
this.notifyTransferObject.waitForRunning(1000);
transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
}