[ISSUE #137] split TcpRemotingClient::m_ioService into m_dispatchService and m_handleService
diff --git a/example/CBatchProducer.c b/example/CBatchProducer.c
new file mode 100644
index 0000000..32d6a7a
--- /dev/null
+++ b/example/CBatchProducer.c
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <stdio.h>
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "CProducer.h"
+#include "CSendResult.h"
+
+void StartSendMessage(CProducer* producer) {
+ int i = 0;
+ int ret_code = 0;
+ char body[128];
+ CBatchMessage* batchMessage = CreateBatchMessage("T_TestTopic");
+
+ for (i = 0; i < 10; i++) {
+ CMessage* msg = CreateMessage("T_TestTopic");
+ SetMessageTags(msg, "Test_Tag");
+ SetMessageKeys(msg, "Test_Keys");
+ memset(body, 0, sizeof(body));
+ snprintf(body, sizeof(body), "new message body, index %d", i);
+ SetMessageBody(msg, body);
+ AddMessage(batchMessage, msg);
+ }
+ CSendResult result;
+ int ok = SendBatchMessage(producer, batchMessage, &result);
+ printf("SendBatchMessage is %s .....\n", ok == 0 ? "Success" : ok == 11 ? "FAILED" : " It is null value");
+ DestroyBatchMessage(batchMessage);
+}
+
+void CreateProducerAndStartSendMessage() {
+ printf("Producer Initializing.....\n");
+ CProducer* producer = CreateProducer("Group_producer");
+ SetProducerNameServerAddress(producer, "127.0.0.1:9876");
+ StartProducer(producer);
+ printf("Producer start.....\n");
+ StartSendMessage(producer);
+ ShutdownProducer(producer);
+ DestroyProducer(producer);
+ printf("Producer Shutdown!\n");
+}
+
+int main(int argc, char* argv[]) {
+ printf("Send Batch.....\n");
+ CreateProducerAndStartSendMessage();
+ return 0;
+}
diff --git a/include/CBatchMessage.h b/include/CBatchMessage.h
new file mode 100644
index 0000000..6409a55
--- /dev/null
+++ b/include/CBatchMessage.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __C_BATCHMESSAGE_H__
+#define __C_BATCHMESSAGE_H__
+#include "CCommon.h"
+#include "CMessage.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+typedef struct CBatchMessage CBatchMessage;
+
+ROCKETMQCLIENT_API CBatchMessage* CreateBatchMessage();
+ROCKETMQCLIENT_API int AddMessage(CBatchMessage* batchMsg, CMessage* msg);
+ROCKETMQCLIENT_API int DestroyBatchMessage(CBatchMessage* batchMsg);
+
+#ifdef __cplusplus
+};
+#endif
+#endif //__C_BATCHMESSAGE_H__
diff --git a/include/CProducer.h b/include/CProducer.h
index e345e60..6b00575 100644
--- a/include/CProducer.h
+++ b/include/CProducer.h
@@ -18,6 +18,7 @@
#ifndef __C_PRODUCER_H__
#define __C_PRODUCER_H__
+#include "CBatchMessage.h"
#include "CMessage.h"
#include "CSendResult.h"
#include "CMQException.h"
@@ -53,6 +54,7 @@
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
+ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);
ROCKETMQCLIENT_API int SendMessageAsync(CProducer* producer,
CMessage* msg,
CSendSuccessCallback cSendSuccessCallback,
@@ -79,4 +81,4 @@
#ifdef __cplusplus
};
#endif
-#endif //__C_PRODUCER_H__
+#endif //__C_PRODUCER_H__
\ No newline at end of file
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 8ec6044..2038f6c 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -430,10 +430,6 @@
}
}
-void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
- m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
-}
-
PullResult* MQClientAPIImpl::pullMessage(const string& addr,
PullMessageRequestHeader* pRequestHeader,
int timeoutMillis,
@@ -467,21 +463,9 @@
void* pArg) {
//<!delete in future;
AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
- MQMessageQueue mq;
- AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
- if (pAsyncArg && pAsyncArg->pPullRequest) {
- mq = pAsyncArg->mq;
- pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
- LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),
- mq.toString().c_str());
- }
-
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) == false) {
- LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(),
- mq.toString().data());
- if (pAsyncArg && pAsyncArg->pPullRequest) {
- pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
- }
+ LOG_ERROR("pullMessageAsync failed of addr:%s, mq:%s", addr.c_str(),
+ static_cast<AsyncArg*>(pArg)->mq.toString().data());
deleteAndZero(cbw);
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
}
@@ -919,4 +903,4 @@
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 1a5e202..a36038d 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -187,7 +187,6 @@
int64 timeoutMilliseconds,
int maxRetryTimes = 1,
int retrySendTimes = 1);
- void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
private:
SendResult sendMessageSync(const string& addr,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index cfa62dd..38ce229 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -988,18 +988,6 @@
}
}
-void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
- // delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
- if (!pullRequest)
- return;
- MQMessageQueue mq = pullRequest->m_messageQueue;
- int opaque = pullRequest->getLatestPullRequestOpaque();
- if (opaque > 0) {
- LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
- getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
- }
-}
-
void MQClientFactory::resetOffset(const string& group,
const string& topic,
const map<MQMessageQueue, int64>& offsetTable) {
@@ -1012,10 +1000,7 @@
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
- LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(),
- mq.toString().data());
- // delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
- // removeDropedPullRequestOpaque(pullreq);
+ LOG_INFO("resetOffset setDroped for mq:%s", mq.toString().data());
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
@@ -1102,4 +1087,4 @@
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e5d6200..eeb3637 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -108,7 +108,6 @@
void addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs);
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
- void removeDropedPullRequestOpaque(PullRequest* pullRequest);
private:
void unregisterClient(const string& producerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index 4e23743..fc358cb 100644
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
#include "MQMessageQueue.h"
#include "PullAPIWrapper.h"
#include "SubscriptionData.h"
-#include "../consumer/PullRequest.h"
+
namespace rocketmq {
//<!***************************************************************************
@@ -29,7 +29,6 @@
MQMessageQueue mq;
SubscriptionData subData;
PullAPIWrapper* pPullWrapper;
- PullRequest* pPullRequest;
};
//<!***************************************************************************
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 0db381f..2175feb 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,7 +263,6 @@
arg.mq = mq;
arg.subData = *pSData;
arg.pPullWrapper = m_pPullAPIWrapper;
- arg.pPullRequest = NULL;
try {
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq, // 1
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index f9e5874..0f51ea1 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -784,7 +784,6 @@
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
- arg.pPullRequest = request;
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
m_pPullAPIWrapper->pullKernelImpl(messageQueue, // 1
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 5fd6d58..162be2a 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,6 @@
m_queueOffsetMax(0),
m_bDroped(false),
m_bLocked(false),
- m_latestPullRequestOpaque(0),
m_bPullMsgEventInprogress(false) {}
PullRequest::~PullRequest() {
@@ -46,7 +45,6 @@
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
- m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
}
return *this;
}
@@ -272,15 +270,5 @@
return false;
}
-int PullRequest::getLatestPullRequestOpaque() {
- boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
- return m_latestPullRequestOpaque;
-}
-
-void PullRequest::setLatestPullRequestOpaque(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
- m_latestPullRequestOpaque = opaque;
-}
-
//<!***************************************************************************
} //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index cef6df3..83b39b8 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -68,8 +68,6 @@
boost::timed_mutex& getPullRequestCriticalSection();
void removePullMsgEvent();
bool addPullMsgEvent();
- int getLatestPullRequestOpaque();
- void setLatestPullRequestOpaque(int opaque);
public:
MQMessageQueue m_messageQueue;
@@ -89,7 +87,6 @@
// uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
- int m_latestPullRequestOpaque;
boost::timed_mutex m_consumeLock;
boost::atomic<bool> m_bPullMsgEventInprogress;
};
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index a2997a7..549e412 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -458,7 +458,7 @@
it->second->clearAllMsgs(); // add clear operation to avoid bad state
// when dropped pullRequest returns
// normal
- LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
+ LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
}
changed = true;
}
@@ -634,4 +634,4 @@
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/extern/CBatchMessage.cpp b/src/extern/CBatchMessage.cpp
new file mode 100644
index 0000000..3fb2a97
--- /dev/null
+++ b/src/extern/CBatchMessage.cpp
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include "CBatchMessage.h"
+#include "CCommon.h"
+#include "CMessage.h"
+#include "MQMessage.h"
+
+using std::vector;
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+using namespace rocketmq;
+
+CBatchMessage* CreateBatchMessage() {
+ vector<MQMessage>* msgs = new vector<MQMessage>();
+ return (CBatchMessage*)msgs;
+}
+
+int AddMessage(CBatchMessage* batchMsg, CMessage* msg) {
+ if (msg == NULL) {
+ return NULL_POINTER;
+ }
+ if (batchMsg == NULL) {
+ return NULL_POINTER;
+ }
+ MQMessage* message = (MQMessage*)msg;
+ ((vector<MQMessage>*)batchMsg)->push_back(*message);
+ return OK;
+}
+int DestroyBatchMessage(CBatchMessage* batchMsg) {
+ if (batchMsg == NULL) {
+ return NULL_POINTER;
+ }
+ delete (vector<MQMessage>*)batchMsg;
+ return OK;
+}
+
+#ifdef __cplusplus
+};
+#endif
diff --git a/src/extern/CProducer.cpp b/src/extern/CProducer.cpp
index 59ee184..265116e 100644
--- a/src/extern/CProducer.cpp
+++ b/src/extern/CProducer.cpp
@@ -17,6 +17,7 @@
#include "DefaultMQProducer.h"
#include "AsyncCallback.h"
+#include "CBatchMessage.h"
#include "CProducer.h"
#include "CCommon.h"
#include "CSendResult.h"
@@ -156,6 +157,41 @@
return OK;
}
+int SendBatchMessage(CProducer* producer, CBatchMessage* batcMsg, CSendResult* result) {
+ // CSendResult sendResult;
+ if (producer == NULL || batcMsg == NULL || result == NULL) {
+ return NULL_POINTER;
+ }
+ try {
+ DefaultMQProducer* defaultMQProducer = (DefaultMQProducer*)producer;
+ vector<MQMessage>* message = (vector<MQMessage>*)batcMsg;
+ SendResult sendResult = defaultMQProducer->send(*message);
+ switch (sendResult.getSendStatus()) {
+ case SEND_OK:
+ result->sendStatus = E_SEND_OK;
+ break;
+ case SEND_FLUSH_DISK_TIMEOUT:
+ result->sendStatus = E_SEND_FLUSH_DISK_TIMEOUT;
+ break;
+ case SEND_FLUSH_SLAVE_TIMEOUT:
+ result->sendStatus = E_SEND_FLUSH_SLAVE_TIMEOUT;
+ break;
+ case SEND_SLAVE_NOT_AVAILABLE:
+ result->sendStatus = E_SEND_SLAVE_NOT_AVAILABLE;
+ break;
+ default:
+ result->sendStatus = E_SEND_OK;
+ break;
+ }
+ result->offset = sendResult.getQueueOffset();
+ strncpy(result->msgId, sendResult.getMsgId().c_str(), MAX_MESSAGE_ID_LENGTH - 1);
+ result->msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
+ } catch (exception& e) {
+ return PRODUCER_SEND_SYNC_FAILED;
+ }
+ return OK;
+}
+
int SendMessageAsync(CProducer* producer,
CMessage* msg,
CSendSuccessCallback cSendSuccessCallback,
@@ -346,4 +382,4 @@
}
#ifdef __cplusplus
};
-#endif
+#endif
\ No newline at end of file
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 4e4b0bb..95ef166 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -73,7 +73,7 @@
outData["properties"] = properties;
outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
outData["unitMode"] = UtilAll::to_string(unitMode);
- // outData["batch"] = batch;
+ outData["batch"] = UtilAll::to_string(batch);
}
int SendMessageRequestHeader::getReconsumeTimes() {
@@ -106,7 +106,7 @@
requestMap.insert(pair<string, string>("properties", properties));
requestMap.insert(pair<string, string>("reconsumeTimes", UtilAll::to_string(reconsumeTimes)));
requestMap.insert(pair<string, string>("unitMode", UtilAll::to_string(unitMode)));
- // requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
+ requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
}
//<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 5353b05..c114ecd 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -656,21 +656,5 @@
m_asyncTimerTable.clear();
}
-void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
- // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
- // discard when receive it later
- std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
- if (!pFuture) {
- pFuture = findAndDeleteResponseFuture(opaque);
- if (pFuture) {
- LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
- }
- } else {
- LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
- // delete the timeout timer for opaque for pullrequest
- cancelTimerCallback(opaque);
- }
-}
-
//<!************************************************************************
} // namespace rocketmq
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index dfacd76..ad73cd2 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,8 +58,6 @@
void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
- void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
-
private:
static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);