Merge pull request #1554 from keranbingaa/unit-test

[ISSUE #1528]modify unit test of producer
diff --git a/README.md b/README.md
index 1c17c7e..33b4280 100644
--- a/README.md
+++ b/README.md
@@ -8,10 +8,12 @@
 It offers a variety of features:
 
 * Pub/Sub messaging model
-* Scheduled message delivery
+* Financial grade transactional message
+* A variety of cross language clients, such as Java, C/C++, Python, Go
+* Pluggable transport protocols, such as TCP, SSL, AIO
+* Inbuilt message tracing capability, also support opentracing
+* Versatile big-data and streaming ecosytem integration
 * Message retroactivity by time or offset
-* Log hub for streaming
-* Big data integration
 * Reliable FIFO and strict ordered messaging in the same queue
 * Efficient pull&push consumption model
 * Million-level message accumulation capacity in a single queue
@@ -21,9 +23,7 @@
 * Various message filter mechanics such as SQL and Tag
 * Docker images for isolated testing and cloud isolated clusters
 * Feature-rich administrative dashboard for configuration, metrics and monitoring
-* Access control list
-* Message trace
-
+* Authentication and authorisation
 
 ----------
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index a885cd0..85009d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -61,6 +61,7 @@
 import org.apache.rocketmq.broker.processor.EndTransactionProcessor;
 import org.apache.rocketmq.broker.processor.PullMessageProcessor;
 import org.apache.rocketmq.broker.processor.QueryMessageProcessor;
+import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
 import org.apache.rocketmq.broker.processor.SendMessageProcessor;
 import org.apache.rocketmq.broker.slave.SlaveSynchronize;
 import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
@@ -132,6 +133,7 @@
     private final SlaveSynchronize slaveSynchronize;
     private final BlockingQueue<Runnable> sendThreadPoolQueue;
     private final BlockingQueue<Runnable> pullThreadPoolQueue;
+    private final BlockingQueue<Runnable> replyThreadPoolQueue;
     private final BlockingQueue<Runnable> queryThreadPoolQueue;
     private final BlockingQueue<Runnable> clientManagerThreadPoolQueue;
     private final BlockingQueue<Runnable> heartbeatThreadPoolQueue;
@@ -147,6 +149,7 @@
     private TopicConfigManager topicConfigManager;
     private ExecutorService sendMessageExecutor;
     private ExecutorService pullMessageExecutor;
+    private ExecutorService replyMessageExecutor;
     private ExecutorService queryMessageExecutor;
     private ExecutorService adminBrokerExecutor;
     private ExecutorService clientManageExecutor;
@@ -194,6 +197,7 @@
 
         this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
         this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+        this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
         this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
         this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
         this.consumerManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getConsumerManagerThreadPoolQueueCapacity());
@@ -277,6 +281,14 @@
                 this.pullThreadPoolQueue,
                 new ThreadFactoryImpl("PullMessageThread_"));
 
+            this.replyMessageExecutor = new BrokerFixedThreadPoolExecutor(
+                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                this.brokerConfig.getProcessReplyMessageThreadPoolNums(),
+                1000 * 60,
+                TimeUnit.MILLISECONDS,
+                this.replyThreadPoolQueue,
+                new ThreadFactoryImpl("ProcessReplyMessageThread_"));
+
             this.queryMessageExecutor = new BrokerFixedThreadPoolExecutor(
                 this.brokerConfig.getQueryMessageThreadPoolNums(),
                 this.brokerConfig.getQueryMessageThreadPoolNums(),
@@ -554,6 +566,17 @@
         this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);
 
         /**
+         * ReplyMessageProcessor
+         */
+        ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
+        replyMessageProcessor.registerSendMessageHook(sendMessageHookList);
+
+        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE, replyMessageProcessor, replyMessageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.SEND_REPLY_MESSAGE_V2, replyMessageProcessor, replyMessageExecutor);
+
+        /**
          * QueryMessageProcessor
          */
         NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);
@@ -763,6 +786,10 @@
             this.pullMessageExecutor.shutdown();
         }
 
+        if (this.replyMessageExecutor != null) {
+            this.replyMessageExecutor.shutdown();
+        }
+
         if (this.adminBrokerExecutor != null) {
             this.adminBrokerExecutor.shutdown();
         }
@@ -857,12 +884,9 @@
         if (!messageStoreConfig.isEnableDLegerCommitLog()) {
             startProcessorByHa(messageStoreConfig.getBrokerRole());
             handleSlaveSynchronize(messageStoreConfig.getBrokerRole());
+            this.registerBrokerAll(true, false, true);
         }
 
-
-
-        this.registerBrokerAll(true, false, true);
-
         this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
 
             @Override
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
index 4b986c0..960b848 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerStartup.java
@@ -178,6 +178,10 @@
                     break;
             }
 
+            if (messageStoreConfig.isEnableDLegerCommitLog()) {
+                brokerConfig.setBrokerId(-1);
+            }
+
             messageStoreConfig.setHaListenPort(nettyServerConfig.getListenPort() + 1);
             LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
             JoranConfigurator configurator = new JoranConfigurator();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index 61ceae5..12f632b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -17,17 +17,16 @@
 package org.apache.rocketmq.broker.client;
 
 import io.netty.channel.Channel;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.rocketmq.broker.util.PositiveAtomicCounter;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.logging.InternalLogger;
@@ -43,7 +42,9 @@
     private final Lock groupChannelLock = new ReentrantLock();
     private final HashMap<String /* group name */, HashMap<Channel, ClientChannelInfo>> groupChannelTable =
         new HashMap<String, HashMap<Channel, ClientChannelInfo>>();
+    private final ConcurrentHashMap<String, Channel> clientChannelTable = new ConcurrentHashMap<>();
     private PositiveAtomicCounter positiveAtomicCounter = new PositiveAtomicCounter();
+
     public ProducerManager() {
     }
 
@@ -53,7 +54,15 @@
         try {
             if (this.groupChannelLock.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                 try {
-                    newGroupChannelTable.putAll(groupChannelTable);
+                    Iterator<Map.Entry<String, HashMap<Channel, ClientChannelInfo>>> iter = groupChannelTable.entrySet().iterator();
+                    while (iter.hasNext()) {
+                        Map.Entry<String, HashMap<Channel, ClientChannelInfo>> entry = iter.next();
+                        String key = entry.getKey();
+                        HashMap<Channel, ClientChannelInfo> val = entry.getValue();
+                        HashMap<Channel, ClientChannelInfo> tmp = new HashMap<Channel, ClientChannelInfo>();
+                        tmp.putAll(val);
+                        newGroupChannelTable.put(key, tmp);
+                    }
                 } finally {
                     groupChannelLock.unlock();
                 }
@@ -82,6 +91,7 @@
                             long diff = System.currentTimeMillis() - info.getLastUpdateTimestamp();
                             if (diff > CHANNEL_EXPIRED_TIMEOUT) {
                                 it.remove();
+                                clientChannelTable.remove(info.getClientId());
                                 log.warn(
                                     "SCAN: remove expired channel[{}] from ProducerManager groupChannelTable, producer group name: {}",
                                     RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
@@ -113,6 +123,7 @@
                             final ClientChannelInfo clientChannelInfo =
                                 clientChannelInfoTable.remove(channel);
                             if (clientChannelInfo != null) {
+                                clientChannelTable.remove(clientChannelInfo.getClientId());
                                 log.info(
                                     "NETTY EVENT: remove channel[{}][{}] from ProducerManager groupChannelTable, producer group: {}",
                                     clientChannelInfo.toString(), remoteAddr, group);
@@ -146,6 +157,7 @@
                     clientChannelInfoFound = channelTable.get(clientChannelInfo.getChannel());
                     if (null == clientChannelInfoFound) {
                         channelTable.put(clientChannelInfo.getChannel(), clientChannelInfo);
+                        clientChannelTable.put(clientChannelInfo.getClientId(), clientChannelInfo.getChannel());
                         log.info("new producer connected, group: {} channel: {}", group,
                             clientChannelInfo.toString());
                     }
@@ -171,6 +183,7 @@
                     HashMap<Channel, ClientChannelInfo> channelTable = this.groupChannelTable.get(group);
                     if (null != channelTable && !channelTable.isEmpty()) {
                         ClientChannelInfo old = channelTable.remove(clientChannelInfo.getChannel());
+                        clientChannelTable.remove(clientChannelInfo.getClientId());
                         if (old != null) {
                             log.info("unregister a producer[{}] from groupChannelTable {}", group,
                                 clientChannelInfo.toString());
@@ -223,4 +236,8 @@
         }
         return null;
     }
+
+    public Channel findChannel(String clientId) {
+        return clientChannelTable.get(clientId);
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index c9e85ed..1d5943d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -55,7 +55,7 @@
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final EndTransactionRequestHeader requestHeader =
             (EndTransactionRequestHeader)request.decodeCommandCustomHeader(EndTransactionRequestHeader.class);
-        LOGGER.info("Transaction request:{}", requestHeader);
+        LOGGER.debug("Transaction request:{}", requestHeader);
         if (BrokerRole.SLAVE == brokerController.getMessageStoreConfig().getBrokerRole()) {
             response.setCode(ResponseCode.SLAVE_NOT_AVAILABLE);
             LOGGER.warn("Message store is slave mode, so end transaction is forbidden. ");
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
new file mode 100644
index 0000000..565857a
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeaderV2;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+public class ReplyMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
+
+    public ReplyMessageProcessor(final BrokerController brokerController) {
+        super(brokerController);
+    }
+
+    @Override
+    public RemotingCommand processRequest(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+        SendMessageContext mqtraceContext = null;
+        SendMessageRequestHeader requestHeader = parseRequestHeader(request);
+        if (requestHeader == null) {
+            return null;
+        }
+
+        mqtraceContext = buildMsgContext(ctx, requestHeader);
+        this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
+
+        RemotingCommand response = this.processReplyMessageRequest(ctx, request, mqtraceContext, requestHeader);
+
+        this.executeSendMessageHookAfter(response, mqtraceContext);
+        return response;
+    }
+
+    @Override
+    protected SendMessageRequestHeader parseRequestHeader(RemotingCommand request) throws RemotingCommandException {
+        SendMessageRequestHeaderV2 requestHeaderV2 = null;
+        SendMessageRequestHeader requestHeader = null;
+        switch (request.getCode()) {
+            case RequestCode.SEND_REPLY_MESSAGE_V2:
+                requestHeaderV2 =
+                    (SendMessageRequestHeaderV2) request
+                        .decodeCommandCustomHeader(SendMessageRequestHeaderV2.class);
+            case RequestCode.SEND_REPLY_MESSAGE:
+                if (null == requestHeaderV2) {
+                    requestHeader =
+                        (SendMessageRequestHeader) request
+                            .decodeCommandCustomHeader(SendMessageRequestHeader.class);
+                } else {
+                    requestHeader = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV1(requestHeaderV2);
+                }
+            default:
+                break;
+        }
+        return requestHeader;
+    }
+
+    private RemotingCommand processReplyMessageRequest(final ChannelHandlerContext ctx,
+        final RemotingCommand request,
+        final SendMessageContext sendMessageContext,
+        final SendMessageRequestHeader requestHeader) {
+        final RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader) response.readCustomHeader();
+
+        response.setOpaque(request.getOpaque());
+
+        response.addExtField(MessageConst.PROPERTY_MSG_REGION, this.brokerController.getBrokerConfig().getRegionId());
+        response.addExtField(MessageConst.PROPERTY_TRACE_SWITCH, String.valueOf(this.brokerController.getBrokerConfig().isTraceOn()));
+
+        log.debug("receive SendReplyMessage request command, {}", request);
+        final long startTimstamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
+        if (this.brokerController.getMessageStore().now() < startTimstamp) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimstamp)));
+            return response;
+        }
+
+        response.setCode(-1);
+        super.msgCheck(ctx, requestHeader, response);
+        if (response.getCode() != -1) {
+            return response;
+        }
+
+        final byte[] body = request.getBody();
+
+        int queueIdInt = requestHeader.getQueueId();
+        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
+
+        if (queueIdInt < 0) {
+            queueIdInt = Math.abs(this.random.nextInt() % 99999999) % topicConfig.getWriteQueueNums();
+        }
+
+        MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
+        msgInner.setTopic(requestHeader.getTopic());
+        msgInner.setQueueId(queueIdInt);
+        msgInner.setBody(body);
+        msgInner.setFlag(requestHeader.getFlag());
+        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+        msgInner.setPropertiesString(requestHeader.getProperties());
+        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
+        msgInner.setBornHost(ctx.channel().remoteAddress());
+        msgInner.setStoreHost(this.getStoreHost());
+        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+
+        PushReplyResult pushReplyResult = this.pushReplyMessage(ctx, requestHeader, msgInner);
+        this.handlePushReplyResult(pushReplyResult, response, responseHeader, queueIdInt);
+
+        if (this.brokerController.getBrokerConfig().isStoreReplyMessageEnable()) {
+            PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
+            this.handlePutMessageResult(putMessageResult, request, msgInner, responseHeader, sendMessageContext, queueIdInt);
+        }
+
+        return response;
+    }
+
+    private PushReplyResult pushReplyMessage(final ChannelHandlerContext ctx,
+        final SendMessageRequestHeader requestHeader,
+        final Message msg) {
+        ReplyMessageRequestHeader replyMessageRequestHeader = new ReplyMessageRequestHeader();
+        replyMessageRequestHeader.setBornHost(ctx.channel().remoteAddress().toString());
+        replyMessageRequestHeader.setStoreHost(this.getStoreHost().toString());
+        replyMessageRequestHeader.setStoreTimestamp(System.currentTimeMillis());
+        replyMessageRequestHeader.setProducerGroup(requestHeader.getProducerGroup());
+        replyMessageRequestHeader.setTopic(requestHeader.getTopic());
+        replyMessageRequestHeader.setDefaultTopic(requestHeader.getDefaultTopic());
+        replyMessageRequestHeader.setDefaultTopicQueueNums(requestHeader.getDefaultTopicQueueNums());
+        replyMessageRequestHeader.setQueueId(requestHeader.getQueueId());
+        replyMessageRequestHeader.setSysFlag(requestHeader.getSysFlag());
+        replyMessageRequestHeader.setBornTimestamp(requestHeader.getBornTimestamp());
+        replyMessageRequestHeader.setFlag(requestHeader.getFlag());
+        replyMessageRequestHeader.setProperties(requestHeader.getProperties());
+        replyMessageRequestHeader.setReconsumeTimes(requestHeader.getReconsumeTimes());
+        replyMessageRequestHeader.setUnitMode(requestHeader.isUnitMode());
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, replyMessageRequestHeader);
+        request.setBody(msg.getBody());
+
+        String senderId = msg.getProperties().get(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+        PushReplyResult pushReplyResult = new PushReplyResult(false);
+
+        if (senderId != null) {
+            Channel channel = this.brokerController.getProducerManager().findChannel(senderId);
+            if (channel != null) {
+                msg.getProperties().put(MessageConst.PROPERTY_PUSH_REPLY_TIME, String.valueOf(System.currentTimeMillis()));
+                replyMessageRequestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
+
+                try {
+                    RemotingCommand pushResponse = this.brokerController.getBroker2Client().callClient(channel, request);
+                    assert pushResponse != null;
+                    switch (pushResponse.getCode()) {
+                        case ResponseCode.SUCCESS: {
+                            pushReplyResult.setPushOk(true);
+                            break;
+                        }
+                        default: {
+                            pushReplyResult.setPushOk(false);
+                            pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+                            log.warn("push reply message to <{}> return fail, response remark: {}", senderId, pushResponse.getRemark());
+                        }
+                    }
+                } catch (RemotingException | InterruptedException e) {
+                    pushReplyResult.setPushOk(false);
+                    pushReplyResult.setRemark("push reply message to " + senderId + "fail.");
+                    log.warn("push reply message to <{}> fail. {}", senderId, channel, e);
+                }
+            } else {
+                pushReplyResult.setPushOk(false);
+                pushReplyResult.setRemark("push reply message fail, channel of <" + senderId + "> not found.");
+                log.warn(pushReplyResult.getRemark());
+            }
+        } else {
+            log.warn(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + " is null, can not reply message");
+            pushReplyResult.setPushOk(false);
+            pushReplyResult.setRemark("reply message properties[" + MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT + "] is null");
+        }
+        return pushReplyResult;
+    }
+
+    private void handlePushReplyResult(PushReplyResult pushReplyResult, final RemotingCommand response,
+        final SendMessageResponseHeader responseHeader, int queueIdInt) {
+
+        if (!pushReplyResult.isPushOk()) {
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark(pushReplyResult.getRemark());
+        } else {
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+            //set to zore to avoid client decoding exception
+            responseHeader.setMsgId("0");
+            responseHeader.setQueueId(queueIdInt);
+            responseHeader.setQueueOffset(0L);
+        }
+    }
+
+    private void handlePutMessageResult(PutMessageResult putMessageResult,
+        final RemotingCommand request, final MessageExt msg,
+        final SendMessageResponseHeader responseHeader, SendMessageContext sendMessageContext,
+        int queueIdInt) {
+        if (putMessageResult == null) {
+            log.warn("process reply message, store putMessage return null");
+            return;
+        }
+        boolean putOk = false;
+
+        switch (putMessageResult.getPutMessageStatus()) {
+            // Success
+            case PUT_OK:
+            case FLUSH_DISK_TIMEOUT:
+            case FLUSH_SLAVE_TIMEOUT:
+            case SLAVE_NOT_AVAILABLE:
+                putOk = true;
+                break;
+
+            // Failed
+            case CREATE_MAPEDFILE_FAILED:
+                log.info("create mapped file failed, server is busy or broken.");
+                break;
+            case MESSAGE_ILLEGAL:
+                log.info(
+                    "the message is illegal, maybe msg properties length limit 32k.");
+                break;
+            case PROPERTIES_SIZE_EXCEEDED:
+                log.info(
+                    "the message is illegal, maybe msg body or properties length not matched. msg body length limit 128k.");
+                break;
+            case SERVICE_NOT_AVAILABLE:
+                log.info(
+                    "service not available now, maybe disk full, maybe your broker machine memory too small.");
+                break;
+            case OS_PAGECACHE_BUSY:
+                log.info("[PC_SYNCHRONIZED]broker busy, start flow control for a while");
+                break;
+            case UNKNOWN_ERROR:
+                log.info("UNKNOWN_ERROR");
+                break;
+            default:
+                log.info("UNKNOWN_ERROR DEFAULT");
+                break;
+        }
+
+        String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);
+        if (putOk) {
+            this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
+            this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
+                putMessageResult.getAppendMessageResult().getWroteBytes());
+            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+
+            responseHeader.setMsgId(putMessageResult.getAppendMessageResult().getMsgId());
+            responseHeader.setQueueId(queueIdInt);
+            responseHeader.setQueueOffset(putMessageResult.getAppendMessageResult().getLogicsOffset());
+
+            if (hasSendMessageHook()) {
+                sendMessageContext.setMsgId(responseHeader.getMsgId());
+                sendMessageContext.setQueueId(responseHeader.getQueueId());
+                sendMessageContext.setQueueOffset(responseHeader.getQueueOffset());
+
+                int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();
+                int wroteSize = putMessageResult.getAppendMessageResult().getWroteBytes();
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT) * commercialBaseCount;
+
+                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_SUCCESS);
+                sendMessageContext.setCommercialSendTimes(incValue);
+                sendMessageContext.setCommercialSendSize(wroteSize);
+                sendMessageContext.setCommercialOwner(owner);
+            }
+        } else {
+            if (hasSendMessageHook()) {
+                int wroteSize = request.getBody().length;
+                int incValue = (int) Math.ceil(wroteSize / BrokerStatsManager.SIZE_PER_COUNT);
+
+                sendMessageContext.setCommercialSendStats(BrokerStatsManager.StatsType.SEND_FAILURE);
+                sendMessageContext.setCommercialSendTimes(incValue);
+                sendMessageContext.setCommercialSendSize(wroteSize);
+                sendMessageContext.setCommercialOwner(owner);
+            }
+        }
+    }
+
+    class PushReplyResult {
+        boolean pushOk;
+        String remark;
+
+        public PushReplyResult(boolean pushOk) {
+            this.pushOk = pushOk;
+            remark = "";
+        }
+
+        public boolean isPushOk() {
+            return pushOk;
+        }
+
+        public void setPushOk(boolean pushOk) {
+            this.pushOk = pushOk;
+        }
+
+        public String getRemark() {
+            return remark;
+        }
+
+        public void setRemark(String remark) {
+            this.remark = remark;
+        }
+    }
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 8035ae6..2589a75 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -343,11 +343,13 @@
         msgInner.setBody(body);
         msgInner.setFlag(requestHeader.getFlag());
         MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
-        msgInner.setPropertiesString(requestHeader.getProperties());
         msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
         msgInner.setBornHost(ctx.channel().remoteAddress());
         msgInner.setStoreHost(this.getStoreHost());
         msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+        MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
+        msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         PutMessageResult putMessageResult = null;
         Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
@@ -536,6 +538,8 @@
         messageExtBatch.setBornHost(ctx.channel().remoteAddress());
         messageExtBatch.setStoreHost(this.getStoreHost());
         messageExtBatch.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+        String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
+        MessageAccessor.putProperty(messageExtBatch, MessageConst.PROPERTY_CLUSTER, clusterName);
 
         PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessages(messageExtBatch);
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
index 8f215cd..cb29011 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/topic/TopicConfigManager.java
@@ -34,11 +34,11 @@
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.constant.PermName;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.common.protocol.body.KVTable;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.sysflag.TopicSysFlag;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 public class TopicConfigManager extends ConfigManager {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@ -134,6 +134,14 @@
                 this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
             }
         }
+        {
+            String topic = this.brokerController.getBrokerConfig().getBrokerClusterName() + "_" + MixAll.REPLY_TOPIC_POSTFIX;
+            TopicConfig topicConfig = new TopicConfig(topic);
+            this.systemTopicList.add(topic);
+            topicConfig.setReadQueueNums(1);
+            topicConfig.setWriteQueueNums(1);
+            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
+        }
     }
 
     public boolean isSystemTopic(final String topic) {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
index 62507cd..35d8112 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/AbstractTransactionalMessageCheckListener.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
 import java.util.concurrent.TimeUnit;
 
 public abstract class AbstractTransactionalMessageCheckListener {
@@ -48,7 +49,7 @@
             thread.setName("Transaction-msg-check-thread");
             return thread;
         }
-    });
+    }, new CallerRunsPolicy());
 
     public AbstractTransactionalMessageCheckListener() {
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
index 84a6276..67f7a5f 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridge.java
@@ -131,6 +131,9 @@
                     this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic,
                         getMessageResult.getBufferTotalSize());
                     this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());
+                    if (foundList == null || foundList.size() == 0) {
+                        break;
+                    }
                     this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId,
                         this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1)
                             .getStoreTimestamp());
@@ -141,6 +144,7 @@
                         getMessageResult.getStatus(), topic, group, offset);
                     break;
                 case NO_MESSAGE_IN_QUEUE:
+                case OFFSET_OVERFLOW_ONE:
                     pullStatus = PullStatus.NO_NEW_MSG;
                     LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
                         getMessageResult.getStatus(), topic, group, offset);
@@ -149,7 +153,6 @@
                 case NO_MATCHED_LOGIC_QUEUE:
                 case OFFSET_FOUND_NULL:
                 case OFFSET_OVERFLOW_BADLY:
-                case OFFSET_OVERFLOW_ONE:
                 case OFFSET_TOO_SMALL:
                     pullStatus = PullStatus.OFFSET_ILLEGAL;
                     LOGGER.warn("Offset illegal. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}",
@@ -175,8 +178,10 @@
         try {
             List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
             for (ByteBuffer bb : messageBufferList) {
-                MessageExt msgExt = MessageDecoder.decode(bb);
-                foundList.add(msgExt);
+                MessageExt msgExt = MessageDecoder.decode(bb, true, false);
+                if (msgExt != null) {
+                    foundList.add(msgExt);
+                }
             }
 
         } finally {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
index e1549b1..71b575e 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageServiceImpl.java
@@ -159,7 +159,8 @@
                     }
                     if (removeMap.containsKey(i)) {
                         log.info("Half offset {} has been committed/rolled back", i);
-                        removeMap.remove(i);
+                        Long removedOpOffset = removeMap.remove(i);
+                        doneOpOffset.add(removedOpOffset);
                     } else {
                         GetResult getResult = getHalfMsg(messageQueue, i);
                         MessageExt msgExt = getResult.getMsg();
@@ -223,7 +224,7 @@
                             listener.resolveHalfMsg(msgExt);
                         } else {
                             pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
-                            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
+                            log.debug("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                                 messageQueue, pullResult);
                             continue;
                         }
@@ -292,7 +293,7 @@
         }
         for (MessageExt opMessageExt : opMsg) {
             Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
-            log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
+            log.debug("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessageExt.getTopic(),
                 opMessageExt.getTags(), opMessageExt.getQueueOffset(), queueOffset);
             if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
                 if (queueOffset < miniOffset) {
@@ -460,7 +461,7 @@
     @Override
     public boolean deletePrepareMessage(MessageExt msgExt) {
         if (this.transactionalMessageBridge.putOpMessage(msgExt, TransactionalMessageUtil.REMOVETAG)) {
-            log.info("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
+            log.debug("Transaction op message write successfully. messageId={}, queueId={} msgExt:{}", msgExt.getMsgId(), msgExt.getQueueId(), msgExt);
             return true;
         } else {
             log.error("Transaction op message write failed. messageId is {}, queueId is {}", msgExt.getMsgId(), msgExt.getQueueId());
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
index 08dbb9c..d9539b6 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ProducerManagerTest.java
@@ -20,6 +20,7 @@
 import io.netty.channel.ChannelFuture;
 import java.lang.reflect.Field;
 import java.util.HashMap;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -42,14 +43,14 @@
     @Before
     public void init() {
         producerManager = new ProducerManager();
-        clientInfo = new ClientChannelInfo(channel);
+        clientInfo = new ClientChannelInfo(channel, "clientId", LanguageCode.JAVA, 0);
     }
 
     @Test
     public void scanNotActiveChannel() throws Exception {
         producerManager.registerProducer(group, clientInfo);
         assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
-
+        assertThat(producerManager.findChannel("clientId")).isNotNull();
         Field field = ProducerManager.class.getDeclaredField("CHANNEL_EXPIRED_TIMEOUT");
         field.setAccessible(true);
         long CHANNEL_EXPIRED_TIMEOUT = field.getLong(producerManager);
@@ -57,22 +58,28 @@
         when(channel.close()).thenReturn(mock(ChannelFuture.class));
         producerManager.scanNotActiveChannel();
         assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+        assertThat(producerManager.findChannel("clientId")).isNull();
     }
 
     @Test
     public void doChannelCloseEvent() throws Exception {
         producerManager.registerProducer(group, clientInfo);
         assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNotNull();
+        assertThat(producerManager.findChannel("clientId")).isNotNull();
         producerManager.doChannelCloseEvent("127.0.0.1", channel);
         assertThat(producerManager.getGroupChannelTable().get(group).get(channel)).isNull();
+        assertThat(producerManager.findChannel("clientId")).isNull();
     }
 
     @Test
     public void testRegisterProducer() throws Exception {
         producerManager.registerProducer(group, clientInfo);
         HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
+        Channel channel1 = producerManager.findChannel("clientId");
         assertThat(channelMap).isNotNull();
+        assertThat(channel1).isNotNull();
         assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
+        assertThat(channel1).isEqualTo(channel);
     }
 
     @Test
@@ -81,10 +88,23 @@
         HashMap<Channel, ClientChannelInfo> channelMap = producerManager.getGroupChannelTable().get(group);
         assertThat(channelMap).isNotNull();
         assertThat(channelMap.get(channel)).isEqualTo(clientInfo);
-
+        Channel channel1 = producerManager.findChannel("clientId");
+        assertThat(channel1).isNotNull();
+        assertThat(channel1).isEqualTo(channel);
         producerManager.unregisterProducer(group, clientInfo);
         channelMap = producerManager.getGroupChannelTable().get(group);
+        channel1 = producerManager.findChannel("clientId");
         assertThat(channelMap).isNull();
+        assertThat(channel1).isNull();
+
     }
 
+    @Test
+    public void testGetGroupChannelTable() throws Exception {
+        producerManager.registerProducer(group, clientInfo);
+        HashMap<Channel, ClientChannelInfo> oldMap = producerManager.getGroupChannelTable().get(group);
+        
+        producerManager.unregisterProducer(group, clientInfo);
+        assertThat(oldMap.size()).isNotEqualTo(0);
+    }
 }
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
new file mode 100644
index 0000000..85c7750
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessorTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.client.net.Broker2Client;
+import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.MessageExtBrokerInner;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ReplyMessageProcessorTest {
+    private ReplyMessageProcessor replyMessageProcessor;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Mock
+    private MessageStore messageStore;
+    @Mock
+    private Channel channel;
+
+    private String topic = "FooBar";
+    private String group = "FooBarGroup";
+    private ClientChannelInfo clientInfo;
+    @Mock
+    private Broker2Client broker2Client;
+
+    @Before
+    public void init() throws IllegalAccessException, NoSuchFieldException {
+        clientInfo = new ClientChannelInfo(channel, "127.0.0.1", LanguageCode.JAVA, 0);
+        brokerController.setMessageStore(messageStore);
+        Field field = BrokerController.class.getDeclaredField("broker2Client");
+        field.setAccessible(true);
+        field.set(brokerController, broker2Client);
+        when(messageStore.now()).thenReturn(System.currentTimeMillis());
+        Channel mockChannel = mock(Channel.class);
+        when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
+        when(handlerContext.channel()).thenReturn(mockChannel);
+        replyMessageProcessor = new ReplyMessageProcessor(brokerController);
+    }
+
+    @Test
+    public void testProcessRequest_Success() throws RemotingCommandException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException {
+        when(messageStore.putMessage(any(MessageExtBrokerInner.class))).thenReturn(new PutMessageResult(PutMessageStatus.PUT_OK, new AppendMessageResult(AppendMessageStatus.PUT_OK)));
+        brokerController.getProducerManager().registerProducer(group, clientInfo);
+        final RemotingCommand request = createSendMessageRequestHeaderCommand(RequestCode.SEND_REPLY_MESSAGE);
+        when(brokerController.getBroker2Client().callClient(any(Channel.class), any(RemotingCommand.class))).thenReturn(createResponse(ResponseCode.SUCCESS, request));
+        RemotingCommand responseToReturn = replyMessageProcessor.processRequest(handlerContext, request);
+        assertThat(responseToReturn.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        assertThat(responseToReturn.getOpaque()).isEqualTo(request.getOpaque());
+    }
+
+    private RemotingCommand createSendMessageRequestHeaderCommand(int requestCode) {
+        SendMessageRequestHeader requestHeader = createSendMessageRequestHeader();
+        RemotingCommand request = RemotingCommand.createRequestCommand(requestCode, requestHeader);
+        request.setBody(new byte[] {'a'});
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+
+    private SendMessageRequestHeader createSendMessageRequestHeader() {
+        SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
+        requestHeader.setProducerGroup(group);
+        requestHeader.setTopic(topic);
+        requestHeader.setDefaultTopic(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC);
+        requestHeader.setDefaultTopicQueueNums(3);
+        requestHeader.setQueueId(1);
+        requestHeader.setSysFlag(0);
+        requestHeader.setBornTimestamp(System.currentTimeMillis());
+        requestHeader.setFlag(124);
+        requestHeader.setReconsumeTimes(0);
+        Map<String, String> map = new HashMap<String, String>();
+        map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+        requestHeader.setProperties(MessageDecoder.messageProperties2String(map));
+        return requestHeader;
+    }
+
+    private RemotingCommand createResponse(int code, RemotingCommand request) {
+        RemotingCommand response = RemotingCommand.createResponseCommand(SendMessageResponseHeader.class);
+        response.setCode(code);
+        response.setOpaque(request.getOpaque());
+        return response;
+    }
+}
\ No newline at end of file
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
index b1c669c..ebe8872 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/transaction/queue/TransactionalMessageBridgeTest.java
@@ -167,6 +167,24 @@
         assertThat(messageExt).isNotNull();
     }
 
+    @Test
+    public void testGetHalfMessageStatusFound() {
+        when(messageStore
+                .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
+                .thenReturn(createGetMessageResult(GetMessageStatus.FOUND));
+        PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
+        assertThat(result.getPullStatus()).isEqualTo(PullStatus.FOUND);
+    }
+
+    @Test
+    public void testGetHalfMessageNull() {
+        when(messageStore
+                .getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), ArgumentMatchers.nullable(MessageFilter.class)))
+                .thenReturn(null);
+        PullResult result = transactionBridge.getHalfMessage(0, 0, 1);
+        assertThat(result).isNull();
+    }
+
     private GetMessageResult createGetMessageResult(GetMessageStatus status) {
         GetMessageResult getMessageResult = new GetMessageResult();
         getMessageResult.setStatus(status);
diff --git a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
index c3e4efa..d0ae5e1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
+++ b/client/src/main/java/org/apache/rocketmq/client/ClientConfig.java
@@ -53,6 +53,7 @@
      * Offset persistent interval for consumer
      */
     private int persistConsumerOffsetInterval = 1000 * 5;
+    private long pullTimeDelayMillsWhenException = 1000;
     private boolean unitMode = false;
     private String unitName;
     private boolean vipChannelEnabled = Boolean.parseBoolean(System.getProperty(SEND_MESSAGE_WITH_VIP_CHANNEL_PROPERTY, "false"));
@@ -148,6 +149,7 @@
         this.pollNameServerInterval = cc.pollNameServerInterval;
         this.heartbeatBrokerInterval = cc.heartbeatBrokerInterval;
         this.persistConsumerOffsetInterval = cc.persistConsumerOffsetInterval;
+        this.pullTimeDelayMillsWhenException = cc.pullTimeDelayMillsWhenException;
         this.unitMode = cc.unitMode;
         this.unitName = cc.unitName;
         this.vipChannelEnabled = cc.vipChannelEnabled;
@@ -165,6 +167,7 @@
         cc.pollNameServerInterval = pollNameServerInterval;
         cc.heartbeatBrokerInterval = heartbeatBrokerInterval;
         cc.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
+        cc.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
         cc.unitMode = unitMode;
         cc.unitName = unitName;
         cc.vipChannelEnabled = vipChannelEnabled;
@@ -222,6 +225,14 @@
         this.persistConsumerOffsetInterval = persistConsumerOffsetInterval;
     }
 
+    public long getPullTimeDelayMillsWhenException() {
+        return pullTimeDelayMillsWhenException;
+    }
+
+    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+    }
+
     public String getUnitName() {
         return unitName;
     }
@@ -287,12 +298,13 @@
         this.accessChannel = accessChannel;
     }
 
+
     @Override
     public String toString() {
         return "ClientConfig [namesrvAddr=" + namesrvAddr + ", clientIP=" + clientIP + ", instanceName=" + instanceName
             + ", clientCallbackExecutorThreads=" + clientCallbackExecutorThreads + ", pollNameServerInterval=" + pollNameServerInterval
-            + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval="
-            + persistConsumerOffsetInterval + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
+            + ", heartbeatBrokerInterval=" + heartbeatBrokerInterval + ", persistConsumerOffsetInterval=" + persistConsumerOffsetInterval
+            + ", pullTimeDelayMillsWhenException=" + pullTimeDelayMillsWhenException + ", unitMode=" + unitMode + ", unitName=" + unitName + ", vipChannelEnabled="
             + vipChannelEnabled + ", useTLS=" + useTLS + ", language=" + language.name() + ", namespace=" + namespace + "]";
     }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
index 62a95df..bc03b14 100644
--- a/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
+++ b/client/src/main/java/org/apache/rocketmq/client/common/ClientErrorCode.java
@@ -23,4 +23,6 @@
     public static final int BROKER_NOT_EXIST_EXCEPTION = 10003;
     public static final int NO_NAME_SERVER_EXCEPTION = 10004;
     public static final int NOT_FOUND_TOPIC_EXCEPTION = 10005;
+    public static final int REQUEST_TIMEOUT_EXCEPTION = 10006;
+    public static final int CREATE_REPLY_MESSAGE_EXCEPTION = 10007;
 }
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
new file mode 100644
index 0000000..2d756ec
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/exception/RequestTimeoutException.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.exception;
+
+import org.apache.rocketmq.common.UtilAll;
+
+public class RequestTimeoutException extends Exception {
+    private static final long serialVersionUID = -5758410930844185841L;
+    private int responseCode;
+    private String errorMessage;
+
+    public RequestTimeoutException(String errorMessage, Throwable cause) {
+        super(errorMessage, cause);
+        this.responseCode = -1;
+        this.errorMessage = errorMessage;
+    }
+
+    public RequestTimeoutException(int responseCode, String errorMessage) {
+        super("CODE: " + UtilAll.responseCode2String(responseCode) + "  DESC: "
+            + errorMessage);
+        this.responseCode = responseCode;
+        this.errorMessage = errorMessage;
+    }
+
+    public int getResponseCode() {
+        return responseCode;
+    }
+
+    public RequestTimeoutException setResponseCode(final int responseCode) {
+        this.responseCode = responseCode;
+        return this;
+    }
+
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    public void setErrorMessage(final String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
index 0bd810a..5861bc4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/ClientRemotingProcessor.java
@@ -16,16 +16,19 @@
  */
 package org.apache.rocketmq.client.impl;
 
+import io.netty.channel.ChannelHandlerContext;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-
-import io.netty.channel.ChannelHandlerContext;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.impl.producer.MQProducerInner;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
 import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageAccessor;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
@@ -42,14 +45,16 @@
 import org.apache.rocketmq.common.protocol.header.GetConsumerRunningInfoRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerStatusRequestHeader;
 import org.apache.rocketmq.common.protocol.header.NotifyConsumerIdsChangedRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ReplyMessageRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ResetOffsetRequestHeader;
+import org.apache.rocketmq.common.sysflag.MessageSysFlag;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.remoting.common.RemotingUtil;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
 import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-
 public class ClientRemotingProcessor implements NettyRequestProcessor {
     private final InternalLogger log = ClientLogger.getLog();
     private final MQClientInstance mqClientFactory;
@@ -76,6 +81,9 @@
 
             case RequestCode.CONSUME_MESSAGE_DIRECTLY:
                 return this.consumeMessageDirectly(ctx, request);
+
+            case RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT:
+                return this.receiveReplyMessage(ctx, request);
             default:
                 break;
         }
@@ -213,4 +221,73 @@
 
         return response;
     }
+
+    private RemotingCommand receiveReplyMessage(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
+
+        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
+        long receiveTime = System.currentTimeMillis();
+        ReplyMessageRequestHeader requestHeader = (ReplyMessageRequestHeader) request.decodeCommandCustomHeader(ReplyMessageRequestHeader.class);
+
+        try {
+            MessageExt msg = new MessageExt();
+            msg.setTopic(requestHeader.getTopic());
+            msg.setQueueId(requestHeader.getQueueId());
+            msg.setStoreTimestamp(requestHeader.getStoreTimestamp());
+
+            if (requestHeader.getBornHost() != null) {
+                msg.setBornHost(RemotingUtil.string2SocketAddress(requestHeader.getBornHost()));
+            }
+
+            if (requestHeader.getStoreHost() != null) {
+                msg.setStoreHost(RemotingUtil.string2SocketAddress(requestHeader.getStoreHost()));
+            }
+
+            byte[] body = request.getBody();
+            if ((requestHeader.getSysFlag() & MessageSysFlag.COMPRESSED_FLAG) == MessageSysFlag.COMPRESSED_FLAG) {
+                try {
+                    body = UtilAll.uncompress(body);
+                } catch (IOException e) {
+                    log.warn("err when uncompress constant", e);
+                }
+            }
+            msg.setBody(body);
+            msg.setFlag(requestHeader.getFlag());
+            MessageAccessor.setProperties(msg, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
+            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REPLY_MESSAGE_ARRIVE_TIME, String.valueOf(receiveTime));
+            msg.setBornTimestamp(requestHeader.getBornTimestamp());
+            msg.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
+            log.debug("receive reply message :{}", msg);
+
+            processReplyMessage(msg);
+
+            response.setCode(ResponseCode.SUCCESS);
+            response.setRemark(null);
+        } catch (Exception e) {
+            log.warn("unknown err when receiveReplyMsg", e);
+            response.setCode(ResponseCode.SYSTEM_ERROR);
+            response.setRemark("process reply message fail");
+        }
+        return response;
+    }
+
+    private void processReplyMessage(MessageExt replyMsg) {
+        final String correlationId = replyMsg.getUserProperty(MessageConst.PROPERTY_CORRELATION_ID);
+        final RequestResponseFuture requestResponseFuture = RequestFutureTable.getRequestFutureTable().get(correlationId);
+        if (requestResponseFuture != null) {
+            requestResponseFuture.putResponseMessage(replyMsg);
+
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+
+            if (requestResponseFuture.getRequestCallback() != null) {
+                requestResponseFuture.getRequestCallback().onSuccess(replyMsg);
+            } else {
+                requestResponseFuture.putResponseMessage(replyMsg);
+            }
+        } else {
+            String bornHost = replyMsg.getBornHostString();
+            log.warn(String.format("receive reply message, but not matched any request, CorrelationId: %s , reply from host: %s",
+                correlationId, bornHost));
+        }
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index b9ace0f..1ad5fbf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -27,7 +27,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.consumer.PullCallback;
@@ -201,6 +200,8 @@
         this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
 
         this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
+
+        this.remotingClient.registerProcessor(RequestCode.PUSH_REPLY_MESSAGE_TO_CLIENT, this.clientRemotingProcessor, null);
     }
 
     public List<String> getNameServerAddressList() {
@@ -304,8 +305,8 @@
         requestHeader.setDefaultGroupPerm(plainAccessConfig.getDefaultGroupPerm());
         requestHeader.setDefaultTopicPerm(plainAccessConfig.getDefaultTopicPerm());
         requestHeader.setWhiteRemoteAddress(plainAccessConfig.getWhiteRemoteAddress());
-        requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(),","));
-        requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(),","));
+        requestHeader.setTopicPerms(UtilAll.List2String(plainAccessConfig.getTopicPerms(), ","));
+        requestHeader.setGroupPerms(UtilAll.List2String(plainAccessConfig.getGroupPerms(), ","));
 
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_ACL_CONFIG, requestHeader);
 
@@ -344,7 +345,7 @@
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs,final long timeoutMillis)
+    public void updateGlobalWhiteAddrsConfig(final String addr, final String globalWhiteAddrs, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
 
         UpdateGlobalWhiteAddrsConfigRequestHeader requestHeader = new UpdateGlobalWhiteAddrsConfigRequestHeader();
@@ -366,7 +367,8 @@
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
+    public ClusterAclVersionInfo getBrokerClusterAclInfo(final String addr,
+        final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
         RemotingSendRequestException, RemotingConnectException, MQBrokerException {
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CLUSTER_ACL_INFO, null);
 
@@ -389,7 +391,7 @@
         }
 
         throw new MQBrokerException(response.getCode(), response.getRemark());
-        
+
     }
 
     public AclConfig getBrokerClusterConfig(final String addr, final long timeoutMillis) throws RemotingCommandException, InterruptedException, RemotingTimeoutException,
@@ -445,13 +447,23 @@
     ) throws RemotingException, MQBrokerException, InterruptedException {
         long beginStartTime = System.currentTimeMillis();
         RemotingCommand request = null;
-        if (sendSmartMsg || msg instanceof MessageBatch) {
-            SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
-            request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+        String msgType = msg.getProperty(MessageConst.PROPERTY_MESSAGE_TYPE);
+        boolean isReply = msgType != null && msgType.equals(MixAll.REPLY_MESSAGE_FLAG);
+        if (isReply) {
+            if (sendSmartMsg) {
+                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE_V2, requestHeaderV2);
+            } else {
+                request = RemotingCommand.createRequestCommand(RequestCode.SEND_REPLY_MESSAGE, requestHeader);
+            }
         } else {
-            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+            if (sendSmartMsg || msg instanceof MessageBatch) {
+                SendMessageRequestHeaderV2 requestHeaderV2 = SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
+                request = RemotingCommand.createRequestCommand(msg instanceof MessageBatch ? RequestCode.SEND_BATCH_MESSAGE : RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
+            } else {
+                request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
+            }
         }
-
         request.setBody(msg.getBody());
 
         switch (communicationMode) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a37c3a0..2c673a1 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -106,7 +106,7 @@
     /**
      * Delay some time when exception occur
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 1000;
+    private long pullTimeDelayMillsWhenException = 1000;
     /**
      * Flow control interval
      */
@@ -156,6 +156,7 @@
                 return new Thread(r, "MonitorMessageQueueChangeThread");
             }
         });
+        this.pullTimeDelayMillsWhenException = defaultLitePullConsumer.getPullTimeDelayMillsWhenException();
     }
 
     private void checkServiceState() {
@@ -783,7 +784,7 @@
                     }
                     updatePullOffset(messageQueue, pullResult.getNextBeginOffset());
                 } catch (Throwable e) {
-                    pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION;
+                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
                     log.error("An error occurred in pull message process.", e);
                 }
 
@@ -1070,4 +1071,8 @@
         }
 
     }
+
+    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index d1b5de1..807e9c6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -83,7 +83,7 @@
     /**
      * Delay some time when exception occur
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
+    private long pullTimeDelayMillsWhenException = 3000;
     /**
      * Flow control interval
      */
@@ -115,6 +115,7 @@
     public DefaultMQPushConsumerImpl(DefaultMQPushConsumer defaultMQPushConsumer, RPCHook rpcHook) {
         this.defaultMQPushConsumer = defaultMQPushConsumer;
         this.rpcHook = rpcHook;
+        this.pullTimeDelayMillsWhenException = defaultMQPushConsumer.getPullTimeDelayMillsWhenException();
     }
 
     public void registerFilterMessageHook(final FilterMessageHook hook) {
@@ -222,7 +223,7 @@
             this.makeSureStateOK();
         } catch (MQClientException e) {
             log.warn("pullMessage exception, consumer state not ok", e);
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
             return;
         }
 
@@ -282,7 +283,7 @@
                     pullRequest.setNextOffset(offset);
                 }
             } else {
-                this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+                this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
                 log.info("pull message later because not locked in broker, {}", pullRequest);
                 return;
             }
@@ -290,7 +291,7 @@
 
         final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
         if (null == subscriptionData) {
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
             log.warn("find the consumer's subscription failed, {}", pullRequest);
             return;
         }
@@ -397,7 +398,7 @@
                     log.warn("execute the pull request exception", e);
                 }
 
-                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
             }
         };
 
@@ -444,7 +445,7 @@
             );
         } catch (Exception e) {
             log.error("pullKernelImpl exception", e);
-            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
+            this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
         }
     }
 
@@ -1168,4 +1169,8 @@
         this.consumeMessageService = consumeMessageService;
 
     }
+
+    public void setPullTimeDelayMillsWhenException(long pullTimeDelayMillsWhenException) {
+        this.pullTimeDelayMillsWhenException = pullTimeDelayMillsWhenException;
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 95696d9..fca50cc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@ -24,6 +24,8 @@
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -39,6 +41,7 @@
 import org.apache.rocketmq.client.common.ClientErrorCode;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.hook.CheckForbiddenContext;
 import org.apache.rocketmq.client.hook.CheckForbiddenHook;
 import org.apache.rocketmq.client.hook.SendMessageContext;
@@ -52,6 +55,9 @@
 import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.client.producer.RequestFutureTable;
+import org.apache.rocketmq.client.producer.RequestResponseFuture;
 import org.apache.rocketmq.client.producer.SendCallback;
 import org.apache.rocketmq.client.producer.SendResult;
 import org.apache.rocketmq.client.producer.SendStatus;
@@ -79,6 +85,7 @@
 import org.apache.rocketmq.common.protocol.header.EndTransactionRequestHeader;
 import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
 import org.apache.rocketmq.common.sysflag.MessageSysFlag;
+import org.apache.rocketmq.common.utils.CorrelationIdUtil;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -95,17 +102,16 @@
         new ConcurrentHashMap<String, TopicPublishInfo>();
     private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final RPCHook rpcHook;
+    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
+    private final ExecutorService defaultAsyncSenderExecutor;
+    private final Timer timer = new Timer("RequestHouseKeepingService", true);
     protected BlockingQueue<Runnable> checkRequestQueue;
     protected ExecutorService checkExecutor;
     private ServiceState serviceState = ServiceState.CREATE_JUST;
     private MQClientInstance mQClientFactory;
     private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();
     private int zipCompressLevel = Integer.parseInt(System.getProperty(MixAll.MESSAGE_COMPRESS_LEVEL, "5"));
-
     private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
-
-    private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
-    private final ExecutorService defaultAsyncSenderExecutor;
     private ExecutorService asyncSenderExecutor;
 
     public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer) {
@@ -212,6 +218,17 @@
         }
 
         this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
+
+        this.timer.scheduleAtFixedRate(new TimerTask() {
+            @Override
+            public void run() {
+                try {
+                    RequestFutureTable.scanExpiredRequest();
+                } catch (Throwable e) {
+                    log.error("scan RequestFutureTable exception", e);
+                }
+            }
+        }, 1000 * 3, 1000);
     }
 
     private void checkConfig() throws MQClientException {
@@ -1188,6 +1205,12 @@
         if (null == localTransactionExecuter && null == transactionListener) {
             throw new MQClientException("tranExecutor is null", null);
         }
+
+        // ignore DelayTimeLevel parameter
+        if (msg.getDelayTimeLevel() != 0) {
+            MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+        }
+
         Validators.checkMessage(msg, this.defaultMQProducer);
 
         SendResult sendResult = null;
@@ -1319,6 +1342,233 @@
         return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
     }
 
+    public Message request(Message msg,
+        long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+        }
+    }
+
+    public void request(Message msg, final RequestCallback requestCallback, long timeout)
+        throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+        RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+        long cost = System.currentTimeMillis() - beginTimestamp;
+        this.sendDefaultImpl(msg, CommunicationMode.ASYNC, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                requestResponseFuture.setSendReqeustOk(true);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                requestResponseFuture.setCause(e);
+                requestFail(correlationId);
+            }
+        }, timeout - cost);
+    }
+
+    public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException, RequestTimeoutException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+        }
+    }
+
+    public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final RequestCallback requestCallback, final long timeout)
+        throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+        RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+        long cost = System.currentTimeMillis() - beginTimestamp;
+        this.sendSelectImpl(msg, selector, arg, CommunicationMode.ASYNC, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                requestResponseFuture.setSendReqeustOk(true);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                requestResponseFuture.setCause(e);
+                requestFail(correlationId);
+            }
+        }, timeout - cost);
+
+    }
+
+    public Message request(final Message msg, final MessageQueue mq, final long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        try {
+            final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, null);
+            RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    requestResponseFuture.setSendReqeustOk(true);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    requestResponseFuture.setSendReqeustOk(false);
+                    requestResponseFuture.putResponseMessage(null);
+                    requestResponseFuture.setCause(e);
+                }
+            }, null, timeout - cost);
+
+            Message responseMessage = requestResponseFuture.waitResponseMessage(timeout - cost);
+            if (responseMessage == null) {
+                if (requestResponseFuture.isSendRequestOk()) {
+                    throw new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION,
+                        "send request message to <" + msg.getTopic() + "> OK, but wait reply message timeout, " + timeout + " ms.");
+                } else {
+                    throw new MQClientException("send request message to <" + msg.getTopic() + "> fail", requestResponseFuture.getCause());
+                }
+            }
+            return responseMessage;
+        } finally {
+            RequestFutureTable.getRequestFutureTable().remove(correlationId);
+        }
+    }
+
+    public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+        throws RemotingException, InterruptedException, MQClientException, MQBrokerException {
+        long beginTimestamp = System.currentTimeMillis();
+        prepareSendRequest(msg, timeout);
+        final String correlationId = msg.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+
+        final RequestResponseFuture requestResponseFuture = new RequestResponseFuture(correlationId, timeout, requestCallback);
+        RequestFutureTable.getRequestFutureTable().put(correlationId, requestResponseFuture);
+
+        long cost = System.currentTimeMillis() - beginTimestamp;
+        this.sendKernelImpl(msg, mq, CommunicationMode.ASYNC, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                requestResponseFuture.setSendReqeustOk(true);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                requestResponseFuture.setCause(e);
+                requestFail(correlationId);
+            }
+        }, null, timeout - cost);
+    }
+
+    private void requestFail(final String correlationId) {
+        RequestResponseFuture responseFuture = RequestFutureTable.getRequestFutureTable().remove(correlationId);
+        if (responseFuture != null) {
+            responseFuture.setSendReqeustOk(false);
+            responseFuture.putResponseMessage(null);
+            try {
+                responseFuture.executeRequestCallback();
+            } catch (Exception e) {
+                log.warn("execute requestCallback in requestFail, and callback throw", e);
+            }
+        }
+    }
+
+    private void prepareSendRequest(final Message msg, long timeout) {
+        String correlationId = CorrelationIdUtil.createCorrelationId();
+        String requestClientId = this.getmQClientFactory().getClientId();
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, requestClientId);
+        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MESSAGE_TTL, String.valueOf(timeout));
+
+        boolean hasRouteData = this.getmQClientFactory().getTopicRouteTable().containsKey(msg.getTopic());
+        if (!hasRouteData) {
+            long beginTimestamp = System.currentTimeMillis();
+            this.tryToFindTopicPublishInfo(msg.getTopic());
+            this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
+            long cost = System.currentTimeMillis() - beginTimestamp;
+            if (cost > 500) {
+                log.warn("prepare send request for <{}> cost {} ms", msg.getTopic(), cost);
+            }
+        }
+    }
+
     public ConcurrentMap<String, TopicPublishInfo> getTopicPublishInfoTable() {
         return topicPublishInfoTable;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f..faa79f5 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -24,6 +24,7 @@
 import org.apache.rocketmq.client.Validators;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
@@ -42,38 +43,29 @@
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
 /**
- * This class is the entry point for applications intending to send messages.
- * </p>
+ * This class is the entry point for applications intending to send messages. </p>
  *
  * It's fine to tune fields which exposes getter/setter methods, but keep in mind, all of them should work well out of
- * box for most scenarios.
- * </p>
+ * box for most scenarios. </p>
  *
  * This class aggregates various <code>send</code> methods to deliver messages to brokers. Each of them has pros and
- * cons; you'd better understand strengths and weakness of them before actually coding.
- * </p>
+ * cons; you'd better understand strengths and weakness of them before actually coding. </p>
  *
- * <p>
- * <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
- * and used among multiple threads context.
- * </p>
+ * <p> <strong>Thread Safety:</strong> After configuring and starting process, this class can be regarded as thread-safe
+ * and used among multiple threads context. </p>
  */
 public class DefaultMQProducer extends ClientConfig implements MQProducer {
 
-    private final InternalLogger log = ClientLogger.getLog();
-
     /**
      * Wrapping internal implementations for virtually all methods presented in this class.
      */
     protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
-
+    private final InternalLogger log = ClientLogger.getLog();
     /**
      * Producer group conceptually aggregates all producer instances of exactly same role, which is particularly
-     * important when transactional messages are involved.
-     * </p>
+     * important when transactional messages are involved. </p>
      *
-     * For non-transactional messages, it does not matter as long as it's unique per process.
-     * </p>
+     * For non-transactional messages, it does not matter as long as it's unique per process. </p>
      *
      * See {@linktourl http://rocketmq.apache.org/docs/core-concept/} for more discussion.
      */
@@ -100,16 +92,14 @@
     private int compressMsgBodyOverHowmuch = 1024 * 4;
 
     /**
-     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode.
-     * </p>
+     * Maximum number of retry to perform internally before claiming sending failure in synchronous mode. </p>
      *
      * This may potentially cause message duplication which is up to application developers to resolve.
      */
     private int retryTimesWhenSendFailed = 2;
 
     /**
-     * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode.
-     * </p>
+     * Maximum number of retry to perform internally before claiming sending failure in asynchronous mode. </p>
      *
      * This may potentially cause message duplication which is up to application developers to resolve.
      */
@@ -268,14 +258,10 @@
     }
 
     /**
-     * Start this producer instance.
-     * </p>
+     * Start this producer instance. </p>
      *
-     * <strong>
-     * Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must to invoke
-     * this method before sending or querying messages.
-     * </strong>
-     * </p>
+     * <strong> Much internal initializing procedures are carried out to make this instance prepared, thus, it's a must
+     * to invoke this method before sending or querying messages. </strong> </p>
      *
      * @throws MQClientException if there is any unexpected error.
      */
@@ -316,8 +302,7 @@
     }
 
     /**
-     * Send message in synchronous mode. This method returns only when the sending procedure totally completes.
-     * </p>
+     * Send message in synchronous mode. This method returns only when the sending procedure totally completes. </p>
      *
      * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
      * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
@@ -359,11 +344,9 @@
     }
 
     /**
-     * Send message to broker asynchronously.
-     * </p>
+     * Send message to broker asynchronously. </p>
      *
-     * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed.
-     * </p>
+     * This method returns immediately. On sending completion, <code>sendCallback</code> will be executed. </p>
      *
      * Similar to {@link #send(Message)}, internal implementation would potentially retry up to {@link
      * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
@@ -583,6 +566,133 @@
     }
 
     /**
+     * Send request message in synchronous mode. This method returns only when the consumer consume the request message and reply a message. </p>
+     *
+     * <strong>Warn:</strong> this method has internal retry-mechanism, that is, internal implementation will retry
+     * {@link #retryTimesWhenSendFailed} times before claiming failure. As a result, multiple messages may potentially
+     * delivered to broker(s). It's up to the application developers to resolve potential duplication issue.
+     *
+     * @param msg request message to send
+     * @param timeout request timeout
+     * @return reply message
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException if there is any broker error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RequestTimeoutException if request timeout.
+     */
+    @Override
+    public Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+        RemotingException, MQBrokerException, InterruptedException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        return this.defaultMQProducerImpl.request(msg, timeout);
+    }
+
+    /**
+     * Request asynchronously. </p>
+     * This method returns immediately. On receiving reply message, <code>requestCallback</code> will be executed. </p>
+     *
+     * Similar to {@link #request(Message, long)}, internal implementation would potentially retry up to {@link
+     * #retryTimesWhenSendAsyncFailed} times before claiming sending failure, which may yield message duplication and
+     * application developers are the one to resolve this potential issue.
+     *
+     * @param msg request message to send
+     * @param requestCallback callback to execute on request completion.
+     * @param timeout request timeout
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    @Override
+    public void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+        throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        this.defaultMQProducerImpl.request(msg, requestCallback, timeout);
+    }
+
+    /**
+     * Same to {@link #request(Message, long)}  with message queue selector specified.
+     *
+     * @param msg request message to send
+     * @param selector message queue selector, through which we get target message queue to deliver message to.
+     * @param arg argument to work along with message queue selector.
+     * @param timeout timeout of request.
+     * @return reply message
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException if there is any broker error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RequestTimeoutException if request timeout.
+     */
+    @Override
+    public Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final long timeout) throws MQClientException, RemotingException, MQBrokerException,
+        InterruptedException, RequestTimeoutException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        return this.defaultMQProducerImpl.request(msg, selector, arg, timeout);
+    }
+
+    /**
+     * Same to {@link #request(Message, RequestCallback, long)} with target message selector specified.
+     *
+     * @param msg requst message to send
+     * @param selector message queue selector, through which we get target message queue to deliver message to.
+     * @param arg argument to work along with message queue selector.
+     * @param requestCallback callback to execute on request completion.
+     * @param timeout timeout of request.
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    @Override
+    public void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final RequestCallback requestCallback, final long timeout) throws MQClientException, RemotingException,
+        InterruptedException, MQBrokerException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        this.defaultMQProducerImpl.request(msg, selector, arg, requestCallback, timeout);
+    }
+
+    /**
+     * Same to {@link #request(Message, long)}  with target message queue specified in addition.
+     *
+     * @param msg request message to send
+     * @param mq target message queue.
+     * @param timeout request timeout
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws MQBrokerException if there is any broker error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws RequestTimeoutException if request timeout.
+     */
+    @Override
+    public Message request(final Message msg, final MessageQueue mq, final long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException, RequestTimeoutException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        return this.defaultMQProducerImpl.request(msg, mq, timeout);
+    }
+
+    /**
+     * Same to {@link #request(Message, RequestCallback, long)} with target message queue specified.
+     *
+     * @param msg request message to send
+     * @param mq target message queue.
+     * @param requestCallback callback to execute on request completion.
+     * @param timeout timeout of request.
+     * @throws MQClientException if there is any client error.
+     * @throws RemotingException if there is any network-tier error.
+     * @throws InterruptedException if the thread is interrupted.
+     * @throws MQBrokerException if there is any broker error.
+     */
+    @Override
+    public void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+        throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
+        msg.setTopic(withNamespace(msg.getTopic()));
+        this.defaultMQProducerImpl.request(msg, mq, requestCallback, timeout);
+    }
+
+    /**
      * Same to {@link #sendOneway(Message)} with message queue selector specified.
      *
      * @param msg Message to send.
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
index 1af6005..c6cf4c9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/MQProducer.java
@@ -21,6 +21,7 @@
 import org.apache.rocketmq.client.MQAdmin;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -98,4 +99,26 @@
 
     SendResult send(final Collection<Message> msgs, final MessageQueue mq, final long timeout)
         throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    //for rpc
+    Message request(final Message msg, final long timeout) throws RequestTimeoutException, MQClientException,
+        RemotingException, MQBrokerException, InterruptedException;
+
+    void request(final Message msg, final RequestCallback requestCallback, final long timeout)
+        throws MQClientException, RemotingException, InterruptedException, MQBrokerException;
+
+    Message request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final long timeout) throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException,
+        InterruptedException;
+
+    void request(final Message msg, final MessageQueueSelector selector, final Object arg,
+        final RequestCallback requestCallback,
+        final long timeout) throws MQClientException, RemotingException,
+        InterruptedException, MQBrokerException;
+
+    Message request(final Message msg, final MessageQueue mq, final long timeout)
+        throws RequestTimeoutException, MQClientException, RemotingException, MQBrokerException, InterruptedException;
+
+    void request(final Message msg, final MessageQueue mq, final RequestCallback requestCallback, long timeout)
+        throws MQClientException, RemotingException, MQBrokerException, InterruptedException;
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
new file mode 100644
index 0000000..3107ba5
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestCallback.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import org.apache.rocketmq.common.message.Message;
+
+public interface RequestCallback {
+    void onSuccess(final Message message);
+
+    void onException(final Throwable e);
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
new file mode 100644
index 0000000..3d4caa2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestFutureTable.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.logging.InternalLogger;
+
+public class RequestFutureTable {
+    private static InternalLogger log = ClientLogger.getLog();
+    private static ConcurrentHashMap<String, RequestResponseFuture> requestFutureTable = new ConcurrentHashMap<String, RequestResponseFuture>();
+
+    public static ConcurrentHashMap<String, RequestResponseFuture> getRequestFutureTable() {
+        return requestFutureTable;
+    }
+
+    public static void scanExpiredRequest() {
+        final List<RequestResponseFuture> rfList = new LinkedList<RequestResponseFuture>();
+        Iterator<Map.Entry<String, RequestResponseFuture>> it = requestFutureTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, RequestResponseFuture> next = it.next();
+            RequestResponseFuture rep = next.getValue();
+
+            if (rep.isTimeout()) {
+                it.remove();
+                rfList.add(rep);
+                log.warn("remove timeout request, CorrelationId={}" + rep.getCorrelationId());
+            }
+        }
+
+        for (RequestResponseFuture rf : rfList) {
+            try {
+                Throwable cause = new RequestTimeoutException(ClientErrorCode.REQUEST_TIMEOUT_EXCEPTION, "request timeout, no reply message.");
+                rf.setCause(cause);
+                rf.executeRequestCallback();
+            } catch (Throwable e) {
+                log.warn("scanResponseTable, operationComplete Exception", e);
+            }
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
new file mode 100644
index 0000000..c54b236
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/RequestResponseFuture.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.message.Message;
+
+public class RequestResponseFuture {
+    private final String correlationId;
+    private final RequestCallback requestCallback;
+    private final long beginTimestamp = System.currentTimeMillis();
+    private final Message requestMsg = null;
+    private long timeoutMillis;
+    private CountDownLatch countDownLatch = new CountDownLatch(1);
+    private volatile Message responseMsg = null;
+    private volatile boolean sendRequestOk = true;
+    private volatile Throwable cause = null;
+
+    public RequestResponseFuture(String correlationId, long timeoutMillis, RequestCallback requestCallback) {
+        this.correlationId = correlationId;
+        this.timeoutMillis = timeoutMillis;
+        this.requestCallback = requestCallback;
+    }
+
+    public void executeRequestCallback() {
+        if (requestCallback != null) {
+            if (sendRequestOk && cause == null) {
+                requestCallback.onSuccess(responseMsg);
+            } else {
+                requestCallback.onException(cause);
+            }
+        }
+    }
+
+    public boolean isTimeout() {
+        long diff = System.currentTimeMillis() - this.beginTimestamp;
+        return diff > this.timeoutMillis;
+    }
+
+    public Message waitResponseMessage(final long timeout) throws InterruptedException {
+        this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
+        return this.responseMsg;
+    }
+
+    public void putResponseMessage(final Message responseMsg) {
+        this.responseMsg = responseMsg;
+        this.countDownLatch.countDown();
+    }
+
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    public long getTimeoutMillis() {
+        return timeoutMillis;
+    }
+
+    public void setTimeoutMillis(long timeoutMillis) {
+        this.timeoutMillis = timeoutMillis;
+    }
+
+    public RequestCallback getRequestCallback() {
+        return requestCallback;
+    }
+
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    public CountDownLatch getCountDownLatch() {
+        return countDownLatch;
+    }
+
+    public void setCountDownLatch(CountDownLatch countDownLatch) {
+        this.countDownLatch = countDownLatch;
+    }
+
+    public Message getResponseMsg() {
+        return responseMsg;
+    }
+
+    public void setResponseMsg(Message responseMsg) {
+        this.responseMsg = responseMsg;
+    }
+
+    public boolean isSendRequestOk() {
+        return sendRequestOk;
+    }
+
+    public void setSendReqeustOk(boolean sendReqeustOk) {
+        this.sendRequestOk = sendReqeustOk;
+    }
+
+    public Message getRequestMsg() {
+        return requestMsg;
+    }
+
+    public Throwable getCause() {
+        return cause;
+    }
+
+    public void setCause(Throwable cause) {
+        this.cause = cause;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
new file mode 100644
index 0000000..416ba44
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/client/utils/MessageUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import org.apache.rocketmq.client.common.ClientErrorCode;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+
+public class MessageUtil {
+    public static Message createReplyMessage(final Message requestMessage, final byte[] body) throws MQClientException {
+        if (requestMessage != null) {
+            Message replyMessage = new Message();
+            String cluster = requestMessage.getProperty(MessageConst.PROPERTY_CLUSTER);
+            String replyTo = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+            String correlationId = requestMessage.getProperty(MessageConst.PROPERTY_CORRELATION_ID);
+            String ttl = requestMessage.getProperty(MessageConst.PROPERTY_MESSAGE_TTL);
+            replyMessage.setBody(body);
+            if (cluster != null) {
+                String replyTopic = MixAll.getReplyTopic(cluster);
+                replyMessage.setTopic(replyTopic);
+                MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TYPE, MixAll.REPLY_MESSAGE_FLAG);
+                MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_CORRELATION_ID, correlationId);
+                MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, replyTo);
+                MessageAccessor.putProperty(replyMessage, MessageConst.PROPERTY_MESSAGE_TTL, ttl);
+
+                return replyMessage;
+            } else {
+                throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+            }
+        }
+        throw new MQClientException(ClientErrorCode.CREATE_REPLY_MESSAGE_EXCEPTION, "create reply message fail, requestMessage cannot be null.");
+    }
+
+    public static String getReplyToClient(final Message msg) {
+        return msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+    }
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index 84af632..3f00d9e 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@ -42,6 +42,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.Matchers;
 import org.mockito.Mock;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -164,7 +165,7 @@
             public Object answer(InvocationOnMock mock) throws Throwable {
                 InvokeCallback callback = mock.getArgument(3);
                 RemotingCommand request = mock.getArgument(1);
-                ResponseFuture responseFuture = new ResponseFuture(null,request.getOpaque(), 3 * 1000, null, null);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
                 responseFuture.setResponseCommand(createSuccessResponse(request));
                 callback.operationComplete(responseFuture);
                 return null;
@@ -289,6 +290,7 @@
             assertThat(ex.getErrorMessage()).isEqualTo("corresponding to accessConfig has been deleted failed");
         }
     }
+
     @Test
     public void testResumeCheckHalfMessage_WithException() throws RemotingException, InterruptedException, MQBrokerException, MQClientException {
         doAnswer(new Answer() {
@@ -322,6 +324,38 @@
         assertThat(result).isEqualTo(true);
     }
 
+    @Test
+    public void testSendMessageTypeofReply() throws Exception {
+        doAnswer(new Answer() {
+            @Override
+            public Object answer(InvocationOnMock mock) throws Throwable {
+                InvokeCallback callback = mock.getArgument(3);
+                RemotingCommand request = mock.getArgument(1);
+                ResponseFuture responseFuture = new ResponseFuture(null, request.getOpaque(), 3 * 1000, null, null);
+                responseFuture.setResponseCommand(createSuccessResponse(request));
+                callback.operationComplete(responseFuture);
+                return null;
+            }
+        }).when(remotingClient).invokeAsync(Matchers.anyString(), Matchers.any(RemotingCommand.class), Matchers.anyLong(), Matchers.any(InvokeCallback.class));
+        SendMessageContext sendMessageContext = new SendMessageContext();
+        sendMessageContext.setProducer(new DefaultMQProducerImpl(new DefaultMQProducer()));
+        msg.getProperties().put("MSG_TYPE", "reply");
+        mqClientAPI.sendMessage(brokerAddr, brokerName, msg, new SendMessageRequestHeader(), 3 * 1000, CommunicationMode.ASYNC,
+            new SendCallback() {
+                @Override
+                public void onSuccess(SendResult sendResult) {
+                    assertThat(sendResult.getSendStatus()).isEqualTo(SendStatus.SEND_OK);
+                    assertThat(sendResult.getOffsetMsgId()).isEqualTo("123");
+                    assertThat(sendResult.getQueueOffset()).isEqualTo(123L);
+                    assertThat(sendResult.getMessageQueue().getQueueId()).isEqualTo(1);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                }
+            }, null, null, 0, sendMessageContext, defaultMQProducerImpl);
+    }
+
     private RemotingCommand createResumeSuccessResponse(RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         response.setCode(ResponseCode.SUCCESS);
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
new file mode 100644
index 0000000..4cfa011
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyServiceTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.client.impl.consumer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.body.CMResult;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+public class ConsumeMessageOrderlyServiceTest {
+    private String consumerGroup;
+    private String topic = "FooBar";
+    private String brokerName = "BrokerA";
+    private DefaultMQPushConsumer pushConsumer;
+
+    @Before
+    public void init() throws Exception {
+        consumerGroup = "FooBarGroup" + System.currentTimeMillis();
+        pushConsumer = new DefaultMQPushConsumer(consumerGroup);
+    }
+
+    @Test
+    public void testConsumeMessageDirectly_WithNoException() {
+        Map<ConsumeOrderlyStatus, CMResult> map = new HashMap();
+        map.put(ConsumeOrderlyStatus.SUCCESS, CMResult.CR_SUCCESS);
+        map.put(ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT, CMResult.CR_LATER);
+        map.put(ConsumeOrderlyStatus.COMMIT, CMResult.CR_COMMIT);
+        map.put(ConsumeOrderlyStatus.ROLLBACK, CMResult.CR_ROLLBACK);
+        map.put(null, CMResult.CR_RETURN_NULL);
+
+        for (ConsumeOrderlyStatus consumeOrderlyStatus : map.keySet()) {
+            final ConsumeOrderlyStatus status = consumeOrderlyStatus;
+            MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+                @Override
+                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                    return status;
+                }
+            };
+
+            ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+            MessageExt msg = new MessageExt();
+            msg.setTopic(topic);
+            assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(map.get(consumeOrderlyStatus)));
+        }
+
+    }
+
+    @Test
+    public void testConsumeMessageDirectly_WithException() {
+        MessageListenerOrderly listenerOrderly = new MessageListenerOrderly() {
+            @Override
+            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
+                throw new RuntimeException();
+            }
+        };
+
+        ConsumeMessageOrderlyService consumeMessageOrderlyService = new ConsumeMessageOrderlyService(pushConsumer.getDefaultMQPushConsumerImpl(), listenerOrderly);
+        MessageExt msg = new MessageExt();
+        msg.setTopic(topic);
+        assertTrue(consumeMessageOrderlyService.consumeMessageDirectly(msg, brokerName).getConsumeResult().equals(CMResult.CR_THROW_EXCEPTION));
+    }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
index 8e73ba5..818c94a 100644
--- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java
@@ -21,15 +21,18 @@
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.exception.RequestTimeoutException;
 import org.apache.rocketmq.client.hook.SendMessageContext;
 import org.apache.rocketmq.client.hook.SendMessageHook;
 import org.apache.rocketmq.client.impl.CommunicationMode;
@@ -45,6 +48,7 @@
 import org.apache.rocketmq.common.protocol.route.QueueData;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
 import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
 import org.junit.After;
 import org.junit.Before;
@@ -337,6 +341,101 @@
         assertThat(remotingClient.getCallbackExecutor()).isEqualTo(customized);
     }
 
+    @Test
+    public void testRequestMessage() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        final AtomicBoolean finish = new AtomicBoolean(false);
+        new Thread(new Runnable() {
+            @Override public void run() {
+                ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
+                assertThat(responseMap).isNotNull();
+                while (!finish.get()) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                    }
+                    for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
+                        RequestResponseFuture future = entry.getValue();
+                        future.putResponseMessage(message);
+                    }
+                }
+            }
+        }).start();
+        Message result = producer.request(message, 3 * 1000L);
+        finish.getAndSet(true);
+        assertThat(result.getTopic()).isEqualTo("FooBar");
+        assertThat(result.getBody()).isEqualTo(new byte[] {'a'});
+    }
+
+    @Test(expected = RequestTimeoutException.class)
+    public void testRequestMessage_RequestTimeoutException() throws RemotingException, RequestTimeoutException, MQClientException, InterruptedException, MQBrokerException {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        Message result = producer.request(message, 3 * 1000L);
+    }
+
+    @Test
+    public void testAsyncRequest_OnSuccess() throws Exception {
+        when(mQClientAPIImpl.getTopicRouteInfoFromNameServer(anyString(), anyLong())).thenReturn(createTopicRoute());
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        RequestCallback requestCallback = new RequestCallback() {
+            @Override public void onSuccess(Message message) {
+                assertThat(message.getTopic()).isEqualTo("FooBar");
+                assertThat(message.getBody()).isEqualTo(new byte[] {'a'});
+                assertThat(message.getFlag()).isEqualTo(1);
+                countDownLatch.countDown();
+            }
+
+            @Override public void onException(Throwable e) {
+            }
+        };
+        producer.request(message, requestCallback, 3 * 1000L);
+        ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
+        assertThat(responseMap).isNotNull();
+        for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
+            RequestResponseFuture future = entry.getValue();
+            future.setSendReqeustOk(true);
+            message.setFlag(1);
+            future.getRequestCallback().onSuccess(message);
+        }
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+    }
+
+    @Test
+    public void testAsyncRequest_OnException() throws Exception {
+        final AtomicInteger cc = new AtomicInteger(0);
+        final CountDownLatch countDownLatch = new CountDownLatch(1);
+        RequestCallback requestCallback = new RequestCallback() {
+            @Override public void onSuccess(Message message) {
+
+            }
+
+            @Override public void onException(Throwable e) {
+                cc.incrementAndGet();
+                countDownLatch.countDown();
+            }
+        };
+        MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {
+            @Override
+            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                return null;
+            }
+        };
+
+        try {
+            producer.request(message, requestCallback, 3 * 1000L);
+            failBecauseExceptionWasNotThrown(Exception.class);
+        } catch (Exception e) {
+            ConcurrentHashMap<String, RequestResponseFuture> responseMap = RequestFutureTable.getRequestFutureTable();
+            assertThat(responseMap).isNotNull();
+            for (Map.Entry<String, RequestResponseFuture> entry : responseMap.entrySet()) {
+                RequestResponseFuture future = entry.getValue();
+                future.getRequestCallback().onException(e);
+            }
+        }
+        countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
+        assertThat(cc.get()).isEqualTo(1);
+    }
+
     public static TopicRouteData createTopicRoute() {
         TopicRouteData topicRouteData = new TopicRouteData();
 
diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
new file mode 100644
index 0000000..90e4623
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/producer/RequestResponseFutureTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.producer;
+
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.message.Message;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class RequestResponseFutureTest {
+
+    @Test
+    public void testExecuteRequestCallback() throws Exception {
+        final AtomicInteger cc = new AtomicInteger(0);
+        RequestResponseFuture future = new RequestResponseFuture(UUID.randomUUID().toString(), 3 * 1000L, new RequestCallback() {
+            @Override public void onSuccess(Message message) {
+                cc.incrementAndGet();
+            }
+
+            @Override public void onException(Throwable e) {
+            }
+        });
+        future.setSendReqeustOk(true);
+        future.executeRequestCallback();
+        assertThat(cc.get()).isEqualTo(1);
+    }
+
+}
diff --git a/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
new file mode 100644
index 0000000..803e596
--- /dev/null
+++ b/client/src/test/java/org/apache/rocketmq/client/utils/MessageUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.client.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.junit.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Fail.failBecauseExceptionWasNotThrown;
+
+public class MessageUtilsTest {
+
+    @Test
+    public void testCreateReplyMessage() throws MQClientException {
+        Message msg = MessageUtil.createReplyMessage(createReplyMessage("clusterName"), new byte[] {'a'});
+        assertThat(msg.getTopic()).isEqualTo("clusterName" + "_" + MixAll.REPLY_TOPIC_POSTFIX);
+        assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT)).isEqualTo("127.0.0.1");
+        assertThat(msg.getProperty(MessageConst.PROPERTY_MESSAGE_TTL)).isEqualTo("3000");
+    }
+
+    @Test
+    public void testCreateReplyMessage_Exception() throws MQClientException {
+        try {
+            Message msg = MessageUtil.createReplyMessage(createReplyMessage(null), new byte[] {'a'});
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("create reply message fail, requestMessage error, property[" + MessageConst.PROPERTY_CLUSTER + "] is null.");
+        }
+    }
+
+    @Test
+    public void testCreateReplyMessage_reqMsgIsNull() throws MQClientException {
+        try {
+            Message msg = MessageUtil.createReplyMessage(null, new byte[] {'a'});
+            failBecauseExceptionWasNotThrown(MQClientException.class);
+        } catch (MQClientException e) {
+            assertThat(e).hasMessageContaining("create reply message fail, requestMessage cannot be null.");
+        }
+    }
+
+    @Test
+    public void testGetReplyToClient() throws MQClientException {
+        Message msg = createReplyMessage("clusterName");
+        String replyToClient = MessageUtil.getReplyToClient(msg);
+        assertThat(replyToClient).isNotNull();
+        assertThat(replyToClient).isEqualTo("127.0.0.1");
+    }
+
+    private Message createReplyMessage(String clusterName) {
+        Message requestMessage = new Message();
+        Map map = new HashMap<String, String>();
+        map.put(MessageConst.PROPERTY_MESSAGE_REPLY_TO_CLIENT, "127.0.0.1");
+        map.put(MessageConst.PROPERTY_CLUSTER, clusterName);
+        map.put(MessageConst.PROPERTY_MESSAGE_TTL, "3000");
+        MessageAccessor.setProperties(requestMessage, map);
+        return requestMessage;
+    }
+
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1c3f37d..a7568f0 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -61,6 +61,7 @@
      */
     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
     private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+    private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
     private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
 
     private int adminBrokerThreadPoolNums = 16;
@@ -83,6 +84,7 @@
     private boolean fetchNamesrvAddrByAddressServer = false;
     private int sendThreadPoolQueueCapacity = 10000;
     private int pullThreadPoolQueueCapacity = 100000;
+    private int replyThreadPoolQueueCapacity = 10000;
     private int queryThreadPoolQueueCapacity = 20000;
     private int clientManagerThreadPoolQueueCapacity = 1000000;
     private int consumerManagerThreadPoolQueueCapacity = 1000000;
@@ -180,6 +182,8 @@
     @ImportantField
     private boolean aclEnable = false;
 
+    private boolean storeReplyMessageEnable = true;
+
     public static String localHostName() {
         try {
             return InetAddress.getLocalHost().getHostName();
@@ -374,6 +378,14 @@
         this.pullMessageThreadPoolNums = pullMessageThreadPoolNums;
     }
 
+    public int getProcessReplyMessageThreadPoolNums() {
+        return processReplyMessageThreadPoolNums;
+    }
+
+    public void setProcessReplyMessageThreadPoolNums(int processReplyMessageThreadPoolNums) {
+        this.processReplyMessageThreadPoolNums = processReplyMessageThreadPoolNums;
+    }
+
     public int getQueryMessageThreadPoolNums() {
         return queryMessageThreadPoolNums;
     }
@@ -470,6 +482,14 @@
         this.pullThreadPoolQueueCapacity = pullThreadPoolQueueCapacity;
     }
 
+    public int getReplyThreadPoolQueueCapacity() {
+        return replyThreadPoolQueueCapacity;
+    }
+
+    public void setReplyThreadPoolQueueCapacity(int replyThreadPoolQueueCapacity) {
+        this.replyThreadPoolQueueCapacity = replyThreadPoolQueueCapacity;
+    }
+
     public int getQueryThreadPoolQueueCapacity() {
         return queryThreadPoolQueueCapacity;
     }
@@ -749,7 +769,7 @@
     public void setMsgTraceTopicName(String msgTraceTopicName) {
         this.msgTraceTopicName = msgTraceTopicName;
     }
-    
+
     public boolean isTraceTopicEnable() {
         return traceTopicEnable;
     }
@@ -765,4 +785,12 @@
     public void setAclEnable(boolean aclEnable) {
         this.aclEnable = aclEnable;
     }
+
+    public boolean isStoreReplyMessageEnable() {
+        return storeReplyMessageEnable;
+    }
+
+    public void setStoreReplyMessageEnable(boolean storeReplyMessageEnable) {
+        this.storeReplyMessageEnable = storeReplyMessageEnable;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index bba8b36..e9a67bb 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -45,8 +45,6 @@
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 public class MixAll {
-    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
-
     public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
     public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
     public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
@@ -74,27 +72,26 @@
     public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
     public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
     public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
-
     public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
     public static final String LOCALHOST = localhost();
     public static final String DEFAULT_CHARSET = "UTF-8";
     public static final long MASTER_ID = 0L;
     public static final long CURRENT_JVM_PID = getPID();
-
     public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
-
     public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+    public static final String REPLY_TOPIC_POSTFIX = "REPLY_TOPIC";
     public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
     public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
     public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
     public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
-
     public static final String RMQ_SYS_TRANS_HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
     public static final String RMQ_SYS_TRACE_TOPIC = "RMQ_SYS_TRACE_TOPIC";
     public static final String RMQ_SYS_TRANS_OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";
     public static final String TRANS_CHECK_MAX_TIME_TOPIC = "TRANS_CHECK_MAX_TIME_TOPIC";
     public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
     public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
+    public static final String REPLY_MESSAGE_FLAG = "reply";
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     public static String getWSAddr() {
         String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@ -110,6 +107,10 @@
         return RETRY_GROUP_TOPIC_PREFIX + consumerGroup;
     }
 
+    public static String getReplyTopic(final String clusterName) {
+        return clusterName + "_" + REPLY_TOPIC_POSTFIX;
+    }
+
     public static boolean isSysConsumerGroup(final String consumerGroup) {
         return consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
     }
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 85a4cde..222f697 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -440,13 +440,11 @@
         return false;
     }
 
-    public static boolean isInternalV6IP(byte[] ip) {
-        if (ip.length != 16) {
-            throw new RuntimeException("illegal ipv6 bytes");
-        }
-
-        //FEC0:0000:0000:0000:0000:0000:0000:0000/10
-        if (ip[0] == (byte) 254 && ip[1] >= (byte) 192) {
+    public static boolean isInternalV6IP(InetAddress inetAddr) {
+        if (inetAddr.isAnyLocalAddress() // Wild card ipv6
+            || inetAddr.isLinkLocalAddress() // Single broadcast ipv6 address: fe80:xx:xx...
+            || inetAddr.isLoopbackAddress() //Loopback ipv6 address
+            || inetAddr.isSiteLocalAddress()) { // Site local ipv6 address: fec0:xx:xx...
             return true;
         }
         return false;
@@ -457,9 +455,6 @@
             throw new RuntimeException("illegal ipv4 bytes");
         }
 
-//        if (ip[0] == (byte)30 && ip[1] == (byte)10 && ip[2] == (byte)163 && ip[3] == (byte)120) {
-//        }
-
         if (ip[0] >= (byte) 1 && ip[0] <= (byte) 126) {
             if (ip[1] == (byte) 1 && ip[2] == (byte) 1 && ip[3] == (byte) 1) {
                 return false;
@@ -550,7 +545,7 @@
                         byte[] ipByte = ip.getAddress();
                         if (ipByte.length == 16) {
                             if (ipV6Check(ipByte)) {
-                                if (!isInternalV6IP(ipByte)) {
+                                if (!isInternalV6IP(ip)) {
                                     return ipByte;
                                 } else if (internalIP == null) {
                                     internalIP = ipByte;
diff --git a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index aa84816..5bdc846 100644
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@ -45,6 +45,13 @@
     public static final String PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES";
     public static final String PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS";
     public static final String PROPERTY_INSTANCE_ID = "INSTANCE_ID";
+    public static final String PROPERTY_CORRELATION_ID = "CORRELATION_ID";
+    public static final String PROPERTY_MESSAGE_REPLY_TO_CLIENT = "REPLY_TO_CLIENT";
+    public static final String PROPERTY_MESSAGE_TTL = "TTL";
+    public static final String PROPERTY_REPLY_MESSAGE_ARRIVE_TIME = "ARRIVE_TIME";
+    public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
+    public static final String PROPERTY_CLUSTER = "CLUSTER";
+    public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
 
     public static final String KEY_SEPARATOR = " ";
 
@@ -74,5 +81,12 @@
         STRING_HASH_SET.add(PROPERTY_MAX_RECONSUME_TIMES);
         STRING_HASH_SET.add(PROPERTY_CONSUME_START_TIMESTAMP);
         STRING_HASH_SET.add(PROPERTY_INSTANCE_ID);
+        STRING_HASH_SET.add(PROPERTY_CORRELATION_ID);
+        STRING_HASH_SET.add(PROPERTY_MESSAGE_REPLY_TO_CLIENT);
+        STRING_HASH_SET.add(PROPERTY_MESSAGE_TTL);
+        STRING_HASH_SET.add(PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
+        STRING_HASH_SET.add(PROPERTY_PUSH_REPLY_TIME);
+        STRING_HASH_SET.add(PROPERTY_CLUSTER);
+        STRING_HASH_SET.add(PROPERTY_MESSAGE_TYPE);
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index dbdabbc..b3009d7 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -182,4 +182,10 @@
      * resume logic of checking half messages that have been put in TRANS_CHECK_MAXTIME_TOPIC before
      */
     public static final int RESUME_CHECK_HALF_MESSAGE = 323;
+
+    public static final int SEND_REPLY_MESSAGE = 324;
+
+    public static final int SEND_REPLY_MESSAGE_V2 = 325;
+
+    public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
new file mode 100644
index 0000000..3bb0907
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/ReplyMessageRequestHeader.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.annotation.CFNullable;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class ReplyMessageRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String producerGroup;
+    @CFNotNull
+    private String topic;
+    @CFNotNull
+    private String defaultTopic;
+    @CFNotNull
+    private Integer defaultTopicQueueNums;
+    @CFNotNull
+    private Integer queueId;
+    @CFNotNull
+    private Integer sysFlag;
+    @CFNotNull
+    private Long bornTimestamp;
+    @CFNotNull
+    private Integer flag;
+    @CFNullable
+    private String properties;
+    @CFNullable
+    private Integer reconsumeTimes;
+    @CFNullable
+    private boolean unitMode = false;
+
+    @CFNotNull
+    private String bornHost;
+    @CFNotNull
+    private String storeHost;
+    @CFNotNull
+    private long storeTimestamp;
+
+    public void checkFields() throws RemotingCommandException {
+    }
+
+    public String getProducerGroup() {
+        return producerGroup;
+    }
+
+    public void setProducerGroup(String producerGroup) {
+        this.producerGroup = producerGroup;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    public String getDefaultTopic() {
+        return defaultTopic;
+    }
+
+    public void setDefaultTopic(String defaultTopic) {
+        this.defaultTopic = defaultTopic;
+    }
+
+    public Integer getDefaultTopicQueueNums() {
+        return defaultTopicQueueNums;
+    }
+
+    public void setDefaultTopicQueueNums(Integer defaultTopicQueueNums) {
+        this.defaultTopicQueueNums = defaultTopicQueueNums;
+    }
+
+    public Integer getQueueId() {
+        return queueId;
+    }
+
+    public void setQueueId(Integer queueId) {
+        this.queueId = queueId;
+    }
+
+    public Integer getSysFlag() {
+        return sysFlag;
+    }
+
+    public void setSysFlag(Integer sysFlag) {
+        this.sysFlag = sysFlag;
+    }
+
+    public Long getBornTimestamp() {
+        return bornTimestamp;
+    }
+
+    public void setBornTimestamp(Long bornTimestamp) {
+        this.bornTimestamp = bornTimestamp;
+    }
+
+    public Integer getFlag() {
+        return flag;
+    }
+
+    public void setFlag(Integer flag) {
+        this.flag = flag;
+    }
+
+    public String getProperties() {
+        return properties;
+    }
+
+    public void setProperties(String properties) {
+        this.properties = properties;
+    }
+
+    public Integer getReconsumeTimes() {
+        return reconsumeTimes;
+    }
+
+    public void setReconsumeTimes(Integer reconsumeTimes) {
+        this.reconsumeTimes = reconsumeTimes;
+    }
+
+    public boolean isUnitMode() {
+        return unitMode;
+    }
+
+    public void setUnitMode(boolean unitMode) {
+        this.unitMode = unitMode;
+    }
+
+    public String getBornHost() {
+        return bornHost;
+    }
+
+    public void setBornHost(String bornHost) {
+        this.bornHost = bornHost;
+    }
+
+    public String getStoreHost() {
+        return storeHost;
+    }
+
+    public void setStoreHost(String storeHost) {
+        this.storeHost = storeHost;
+    }
+
+    public long getStoreTimestamp() {
+        return storeTimestamp;
+    }
+
+    public void setStoreTimestamp(long storeTimestamp) {
+        this.storeTimestamp = storeTimestamp;
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
new file mode 100644
index 0000000..86d1fd4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/utils/CorrelationIdUtil.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.utils;
+
+import java.util.UUID;
+
+public class CorrelationIdUtil {
+    public static String createCorrelationId() {
+        return UUID.randomUUID().toString();
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 3991367..b854099 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.common;
 
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.util.Properties;
 import org.junit.Test;
 
@@ -99,12 +101,12 @@
     }
 
     @Test
-    public void testIPv6Check() {
-        byte[] nonInternalIp = UtilAll.string2bytes("24084004018081003FAA1DDE2B3F898A");
-        byte[] internalIp = UtilAll.string2bytes("FEC0000000000000000000000000FFFF");
-        assertThat(UtilAll.isInternalV6IP(nonInternalIp)).isFalse();
-        assertThat(UtilAll.isInternalV6IP(internalIp)).isTrue();
-        assertThat(UtilAll.ipToIPv6Str(nonInternalIp).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
+    public void testIPv6Check() throws UnknownHostException {
+        InetAddress nonInternal = InetAddress.getByName("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
+        InetAddress internal = InetAddress.getByName("FE80:0000:0000:0000:0000:0000:0000:FFFF");
+        assertThat(UtilAll.isInternalV6IP(nonInternal)).isFalse();
+        assertThat(UtilAll.isInternalV6IP(internal)).isTrue();
+        assertThat(UtilAll.ipToIPv6Str(nonInternal.getAddress()).toUpperCase()).isEqualTo("2408:4004:0180:8100:3FAA:1DDE:2B3F:898A");
     }
 
     static class DemoConfig {
diff --git a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
index 0df75eb..1ec6d93 100644
--- a/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/message/MessageClientIDSetterTest.java
@@ -17,9 +17,8 @@
 
 package org.apache.rocketmq.common.message;
 
-import java.util.Calendar;
-import java.util.Date;
 import org.junit.Test;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 public class MessageClientIDSetterTest {
diff --git a/docs/cn/README b/docs/cn/README
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/docs/cn/README
diff --git a/docs/cn/README.md b/docs/cn/README.md
index 3247985..2dbd854 100644
--- a/docs/cn/README.md
+++ b/docs/cn/README.md
@@ -1,7 +1,7 @@
 Apache RocketMQ开发者指南
 --------
 
-##### 这个开发者指南是帮忙您快速了解,并使用 Apache RocketMQ
+##### 这个开发者指南是帮助您快速了解,并使用 Apache RocketMQ
 
 ### 1. 概念和特性
 
diff --git a/docs/cn/RocketMQ_Example.md b/docs/cn/RocketMQ_Example.md
index d298db8..09acb0d 100644
--- a/docs/cn/RocketMQ_Example.md
+++ b/docs/cn/RocketMQ_Example.md
@@ -66,7 +66,11 @@
     	// 启动Producer实例
         producer.start();
         producer.setRetryTimesWhenSendAsyncFailed(0);
-    	for (int i = 0; i < 100; i++) {
+	
+	int messageCount = 100;
+        // 根据消息数量实例化倒计时计算器
+	final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
+    	for (int i = 0; i < messageCount; i++) {
                 final int index = i;
             	// 创建消息,并指定Topic,Tag和消息体
                 Message msg = new Message("TopicTest",
@@ -87,6 +91,8 @@
                     }
             	});
     	}
+	// 等待5s
+	countDownLatch.await(5, TimeUnit.SECONDS);
     	// 如果不再发送消息,关闭Producer实例。
     	producer.shutdown();
     }
diff --git a/docs/cn/features.md b/docs/cn/features.md
index 859e0f8..e7b6386 100644
--- a/docs/cn/features.md
+++ b/docs/cn/features.md
@@ -16,7 +16,7 @@
 RocketMQ的消费者可以根据Tag进行消息过滤,也支持自定义属性过滤。消息过滤目前是在Broker端实现的,优点是减少了对于Consumer无用消息的网络传输,缺点是增加了Broker的负担、而且实现相对复杂。
 ## 4 消息可靠性
 RocketMQ支持消息的高可靠,影响消息可靠性的几种情况:
-1) Broker正常关闭
+1) Broker非正常关闭
 2) Broker异常Crash
 3) OS Crash
 4) 机器掉电,但是能立即恢复供电情况
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
index d431d3e..4724a1d 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/Consumer.java
@@ -103,6 +103,10 @@
         }, 10000, 10000);
 
         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
+        if (commandLine.hasOption('n')) {
+            String ns = commandLine.getOptionValue('n');
+            consumer.setNamesrvAddr(ns);
+        }
         consumer.setInstanceName(Long.toString(System.currentTimeMillis()));
 
         if (filterType == null || expression == null) {
diff --git a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
index d9fafdd..04707e1 100644
--- a/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/benchmark/TransactionProducer.java
@@ -24,6 +24,11 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.PosixParser;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.LocalTransactionExecuter;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
@@ -33,20 +38,19 @@
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
+import org.apache.rocketmq.srvutil.ServerUtil;
 
 public class TransactionProducer {
-    private static int threadCount;
-    private static int messageSize;
-    private static boolean ischeck;
-    private static boolean ischeckffalse;
 
     public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
-        threadCount = args.length >= 1 ? Integer.parseInt(args[0]) : 32;
-        messageSize = args.length >= 2 ? Integer.parseInt(args[1]) : 1024 * 2;
-        ischeck = args.length >= 3 && Boolean.parseBoolean(args[2]);
-        ischeckffalse = args.length >= 4 && Boolean.parseBoolean(args[3]);
+        Options options = ServerUtil.buildCommandlineOptions(new Options());
+        CommandLine commandLine = ServerUtil.parseCmdLine("TransactionProducer", args, buildCommandlineOptions(options), new PosixParser());
 
-        final Message msg = buildMessage(messageSize);
+        final String topic = commandLine.hasOption('t') ? commandLine.getOptionValue('t').trim() : "BenchmarkTest";
+        final int threadCount = commandLine.hasOption('w') ? Integer.parseInt(commandLine.getOptionValue('w')) : 32;
+        final int messageSize = commandLine.hasOption('s') ? Integer.parseInt(commandLine.getOptionValue('s')) : 2048;
+        final boolean ischeck = commandLine.hasOption('c') ? Boolean.parseBoolean(commandLine.getOptionValue('c')) : false;
+        final boolean ischeckffalse = commandLine.hasOption('r') ? Boolean.parseBoolean(commandLine.getOptionValue('r')) : true;
 
         final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);
 
@@ -98,6 +102,10 @@
         producer.setInstanceName(Long.toString(System.currentTimeMillis()));
         producer.setTransactionCheckListener(transactionCheckListener);
         producer.setDefaultTopicQueueNums(1000);
+        if (commandLine.hasOption('n')) {
+            String ns = commandLine.getOptionValue('n');
+            producer.setNamesrvAddr(ns);
+        }
         producer.start();
 
         final TransactionExecuterBImpl tranExecuter = new TransactionExecuterBImpl(ischeck);
@@ -111,7 +119,7 @@
                             // Thread.sleep(1000);
                             final long beginTimestamp = System.currentTimeMillis();
                             SendResult sendResult =
-                                producer.sendMessageInTransaction(msg, tranExecuter, null);
+                                producer.sendMessageInTransaction(buildMessage(messageSize, topic), tranExecuter, null);
                             if (sendResult != null) {
                                 statsBenchmark.getSendRequestSuccessCount().incrementAndGet();
                                 statsBenchmark.getReceiveResponseSuccessCount().incrementAndGet();
@@ -138,18 +146,45 @@
         }
     }
 
-    private static Message buildMessage(final int messageSize) throws UnsupportedEncodingException {
-        Message msg = new Message();
-        msg.setTopic("BenchmarkTest");
+    private static Message buildMessage(final int messageSize, String topic) {
+        try {
+            Message msg = new Message();
+            msg.setTopic(topic);
 
-        StringBuilder sb = new StringBuilder();
-        for (int i = 0; i < messageSize; i += 10) {
-            sb.append("hello baby");
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < messageSize; i += 10) {
+                sb.append("hello baby");
+            }
+            msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+            return msg;
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
         }
+    }
 
-        msg.setBody(sb.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
+    public static Options buildCommandlineOptions(final Options options) {
+        Option opt = new Option("w", "threadCount", true, "Thread count, Default: 32");
+        opt.setRequired(false);
+        options.addOption(opt);
 
-        return msg;
+        opt = new Option("s", "messageSize", true, "Message Size, Default: 2048");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("t", "topic", true, "Topic name, Default: BenchmarkTest");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("c", "check", true, "Check the message, Default: false");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+        opt = new Option("r", "checkResult", true, "Message check result, Default: true");
+        opt.setRequired(false);
+        options.addOption(opt);
+
+
+        return options;
     }
 }
 
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
new file mode 100644
index 0000000..072291d
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/AsyncRequestProducer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.RequestCallback;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class AsyncRequestProducer {
+    private static final InternalLogger log = ClientLogger.getLog();
+
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        String producerGroup = "please_rename_unique_group_name";
+        String topic = "RequestTopic";
+        long ttl = 3000;
+
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.start();
+
+        try {
+            Message msg = new Message(topic,
+                "",
+                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+            long begin = System.currentTimeMillis();
+            producer.request(msg, new RequestCallback() {
+                @Override
+                public void onSuccess(Message message) {
+                    long cost = System.currentTimeMillis() - begin;
+                    System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, message);
+                }
+
+                @Override
+                public void onException(Throwable e) {
+                    System.err.printf("request to <%s> fail.", topic);
+                }
+            }, ttl);
+        } catch (Exception e) {
+            log.warn("", e);
+        }
+         /* shutdown after your request callback is finished */
+//        producer.shutdown();
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
new file mode 100644
index 0000000..b34908b
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/RequestProducer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.remoting.common.RemotingHelper;
+
+public class RequestProducer {
+    public static void main(String[] args) throws MQClientException, InterruptedException {
+        String producerGroup = "please_rename_unique_group_name";
+        String topic = "RequestTopic";
+        long ttl = 3000;
+
+        DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
+        producer.start();
+
+        try {
+            Message msg = new Message(topic,
+                "",
+                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
+
+            long begin = System.currentTimeMillis();
+            Message retMsg = producer.request(msg, ttl);
+            long cost = System.currentTimeMillis() - begin;
+            System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        producer.shutdown();
+    }
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
new file mode 100644
index 0000000..c62c7d4
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/rpc/ResponseConsumer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.example.rpc;
+
+import java.util.List;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQBrokerException;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.utils.MessageUtil;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+
+public class ResponseConsumer {
+    public static void main(String[] args) throws InterruptedException, MQClientException {
+        String producerGroup = "please_rename_unique_group_name";
+        String consumerGroup = "please_rename_unique_group_name";
+        String topic = "RequestTopic";
+
+        // create a producer to send reply message
+        DefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);
+        replyProducer.start();
+
+        // create consumer
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
+
+        // recommend client configs
+        consumer.setPullTimeDelayMillsWhenException(0L);
+
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                for (MessageExt msg : msgs) {
+                    try {
+                        System.out.printf("handle message: %s", msg.toString());
+                        String replyTo = MessageUtil.getReplyToClient(msg);
+                        byte[] replyContent = "reply message contents.".getBytes();
+                        // create reply message with given util, do not create reply message by yourself
+                        Message replyMessage = MessageUtil.createReplyMessage(msg, replyContent);
+
+                        // send reply message with producer
+                        SendResult replyResult = replyProducer.send(replyMessage, 3000);
+                        System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());
+                    } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+
+        consumer.subscribe(topic, "*");
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+}
diff --git a/pom.xml b/pom.xml
index 0161c7b..14b10fa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -569,7 +569,7 @@
             <dependency>
                 <groupId>com.alibaba</groupId>
                 <artifactId>fastjson</artifactId>
-                <version>1.2.51</version>
+                <version>1.2.61</version>
             </dependency>
             <dependency>
                 <groupId>org.javassist</groupId>
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
index 84fb421..3035c57 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/HAService.java
@@ -280,7 +280,9 @@
                 if (!this.requestsRead.isEmpty()) {
                     for (CommitLog.GroupCommitRequest req : this.requestsRead) {
                         boolean transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
-                        for (int i = 0; !transferOK && i < 5; i++) {
+                        long waitUntilWhen = HAService.this.defaultMessageStore.getSystemClock().now()
+                            + HAService.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout();
+                        while (!transferOK && HAService.this.defaultMessageStore.getSystemClock().now() < waitUntilWhen) {
                             this.notifyTransferObject.waitForRunning(1000);
                             transferOK = HAService.this.push2SlaveMaxOffset.get() >= req.getNextOffset();
                         }