blob: d7368704bdfd32dd925ca6b4eb5c895579617ff2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "CommunicationMode.h"
#include "MQClientAPIImpl.h"
#include "MQClientException.h"
using namespace std;
using namespace rocketmq;
using rocketmq::CommunicationMode;
using rocketmq::RemotingCommand;
using rocketmq::TcpRemotingClient;
using testing::_;
using ::testing::InitGoogleMock;
using ::testing::InitGoogleTest;
using testing::Mock;
using testing::Return;
class MockTcpRemotingClient : public TcpRemotingClient {
public:
MockTcpRemotingClient() : TcpRemotingClient(true, DEFAULT_SSL_PROPERTY_FILE) {}
MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
MOCK_METHOD6(invokeAsync, bool(const string&, RemotingCommand&, std::shared_ptr<AsyncCallbackWrap>, int64, int, int));
};
class MockMQClientAPIImpl : public MQClientAPIImpl {
public:
MockMQClientAPIImpl(const string& mqClientId,
ClientRemotingProcessor* clientRemotingProcessor,
int pullThreadNum,
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName)
: MQClientAPIImpl(mqClientId, true, DEFAULT_SSL_PROPERTY_FILE) {}
void reInitRemoteClient(TcpRemotingClient* client) { m_pRemotingClient.reset(client); }
};
TEST(MQClientAPIImplTest, getMaxOffset) {
SessionCredentials sc;
MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
impl->reInitRemoteClient(pClient);
GetMaxOffsetResponseHeader* pHead = new GetMaxOffsetResponseHeader();
pHead->offset = 4096;
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
EXPECT_CALL(*pClient, invokeSync(_, _, _))
.Times(3)
.WillOnce(Return(nullptr))
.WillOnce(Return(pCommandFailed))
.WillOnce(Return(pCommandSuccuss));
EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
int64 offset = impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
EXPECT_EQ(4096, offset);
}
TEST(MQClientAPIImplTest, getMinOffset) {
SessionCredentials sc;
MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
impl->reInitRemoteClient(pClient);
GetMinOffsetResponseHeader* pHead = new GetMinOffsetResponseHeader();
pHead->offset = 2048;
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
EXPECT_CALL(*pClient, invokeSync(_, _, _))
.Times(3)
.WillOnce(Return(nullptr))
.WillOnce(Return(pCommandFailed))
.WillOnce(Return(pCommandSuccuss));
EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
EXPECT_EQ(2048, offset);
}
class MyMockAutoDeleteSendCallback : public AutoDeleteSendCallBack {
public:
virtual ~MyMockAutoDeleteSendCallback() {}
virtual void onSuccess(SendResult& sendResult) {
std::cout << "send Success" << std::endl;
return;
}
virtual void onException(MQException& e) {
std::cout << "send Exception" << e << std::endl;
return;
}
};
TEST(MQClientAPIImplTest, sendMessage) {
string cid = "testClientId";
SessionCredentials sc;
MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
impl->reInitRemoteClient(pClient);
SendMessageResponseHeader* pHead = new SendMessageResponseHeader();
pHead->msgId = "MessageID";
pHead->queueId = 1;
pHead->queueOffset = 409600;
RemotingCommand* pCommandSync = new RemotingCommand(SUCCESS_VALUE, pHead);
EXPECT_CALL(*pClient, invokeSync(_, _, _)).Times(1).WillOnce(Return(pCommandSync));
MQMessage message("testTopic", "Hello, RocketMQ");
string unique_msgId = "UniqMessageID";
message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_msgId);
SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
requestHeader->producerGroup = cid;
requestHeader->topic = (message.getTopic());
requestHeader->defaultTopic = DEFAULT_TOPIC;
requestHeader->defaultTopicQueueNums = 4;
requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
SendResult result =
impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader, 100, 1, ComMode_SYNC, nullptr, sc);
EXPECT_EQ(result.getSendStatus(), SEND_OK);
EXPECT_EQ(result.getMsgId(), unique_msgId);
EXPECT_EQ(result.getQueueOffset(), 409600);
EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
// Try to test Async send
EXPECT_CALL(*pClient, invokeAsync(_, _, _, _, _, _))
.Times(7)
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(false))
.WillOnce(Return(true))
.WillOnce(Return(false))
.WillOnce(Return(false))
.WillOnce(Return(false));
SendMessageRequestHeader* requestHeader2 = new SendMessageRequestHeader();
requestHeader2->producerGroup = cid;
requestHeader2->topic = (message.getTopic());
requestHeader2->defaultTopic = DEFAULT_TOPIC;
requestHeader2->defaultTopicQueueNums = 4;
requestHeader2->bornTimestamp = UtilAll::currentTimeMillis();
EXPECT_ANY_THROW(
impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader2, 100, 1, ComMode_ASYNC, nullptr, sc));
SendMessageRequestHeader* requestHeader3 = new SendMessageRequestHeader();
requestHeader3->producerGroup = cid;
requestHeader3->topic = (message.getTopic());
requestHeader3->defaultTopic = DEFAULT_TOPIC;
requestHeader3->defaultTopicQueueNums = 4;
requestHeader3->bornTimestamp = UtilAll::currentTimeMillis();
SendCallback* pSendCallback = new MyMockAutoDeleteSendCallback();
EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader3, 100, 1, ComMode_ASYNC,
pSendCallback, sc));
SendMessageRequestHeader* requestHeader4 = new SendMessageRequestHeader();
requestHeader4->producerGroup = cid;
requestHeader4->topic = (message.getTopic());
requestHeader4->defaultTopic = DEFAULT_TOPIC;
requestHeader4->defaultTopicQueueNums = 4;
requestHeader4->bornTimestamp = UtilAll::currentTimeMillis();
SendCallback* pSendCallback2 = new MyMockAutoDeleteSendCallback();
EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader4, 1000, 2, ComMode_ASYNC,
pSendCallback2, sc));
SendMessageRequestHeader* requestHeader5 = new SendMessageRequestHeader();
requestHeader5->producerGroup = cid;
requestHeader5->topic = (message.getTopic());
requestHeader5->defaultTopic = DEFAULT_TOPIC;
requestHeader5->defaultTopicQueueNums = 4;
requestHeader5->bornTimestamp = UtilAll::currentTimeMillis();
SendCallback* pSendCallback3 = new MyMockAutoDeleteSendCallback();
EXPECT_NO_THROW(impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader5, 1000, 3, ComMode_ASYNC,
pSendCallback3, sc));
}
TEST(MQClientAPIImplTest, consumerSendMessageBack) {
SessionCredentials sc;
MQMessageExt msg;
MockMQClientAPIImpl* impl = new MockMQClientAPIImpl("testMockAPIImpl", nullptr, 1, 2, 3, "testUnit");
Mock::AllowLeak(impl);
MockTcpRemotingClient* pClient = new MockTcpRemotingClient();
Mock::AllowLeak(pClient);
impl->reInitRemoteClient(pClient);
RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, nullptr);
EXPECT_CALL(*pClient, invokeSync(_, _, _))
.Times(3)
.WillOnce(Return(nullptr))
.WillOnce(Return(pCommandFailed))
.WillOnce(Return(pCommandSuccuss));
EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
EXPECT_ANY_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
EXPECT_NO_THROW(impl->consumerSendMessageBack("127.0.0.0:10911", msg, "testGroup", 0, 1000, 16, sc));
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";
return RUN_ALL_TESTS();
}