blob: 716150d1cd195460ef5e3b0cbde48fe755e06bf4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <memory>
#include "map"
#include "string.h"
#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include "json/value.h"
#include "json/writer.h"
#include "ClientRPCHook.h"
#include "ClientRemotingProcessor.h"
#include "ConsumerRunningInfo.h"
#include "MQClientFactory.h"
#include "MQMessageQueue.h"
#include "MQProtos.h"
#include "RemotingCommand.h"
#include "SessionCredentials.h"
#include "UtilAll.h"
#include "dataBlock.h"
using std::map;
using std::string;
using ::testing::_;
using ::testing::InitGoogleMock;
using ::testing::InitGoogleTest;
using testing::Mock;
using testing::Return;
using testing::SetArgReferee;
using Json::FastWriter;
using Json::Value;
using rocketmq::ClientRemotingProcessor;
using rocketmq::ClientRPCHook;
using rocketmq::ConsumerRunningInfo;
using rocketmq::GetConsumerRunningInfoRequestHeader;
using rocketmq::MemoryBlock;
using rocketmq::MQClientFactory;
using rocketmq::MQMessageQueue;
using rocketmq::MQRequestCode;
using rocketmq::MQResponseCode;
using rocketmq::NotifyConsumerIdsChangedRequestHeader;
using rocketmq::RemotingCommand;
using rocketmq::ResetOffsetBody;
using rocketmq::ResetOffsetRequestHeader;
using rocketmq::SessionCredentials;
using rocketmq::UtilAll;
class MockClientRemotingProcessor : public ClientRemotingProcessor {
public:
MockClientRemotingProcessor(MQClientFactory* factrory) : ClientRemotingProcessor(factrory) {}
MOCK_METHOD1(resetOffset, RemotingCommand*(RemotingCommand* request));
MOCK_METHOD1(getConsumerRunningInfo, RemotingCommand*(RemotingCommand* request));
MOCK_METHOD1(notifyConsumerIdsChanged, RemotingCommand*(RemotingCommand* request));
};
class MockMQClientFactory : public MQClientFactory {
public:
MockMQClientFactory(const string& clientID,
int pullThreadNum,
uint64_t tcpConnectTimeout,
uint64_t tcpTransportTryLockTimeout,
string unitName)
: MQClientFactory(clientID,
pullThreadNum,
tcpConnectTimeout,
tcpTransportTryLockTimeout,
unitName,
true,
rocketmq::DEFAULT_SSL_PROPERTY_FILE) {}
MOCK_METHOD3(resetOffset,
void(const string& group, const string& topic, const map<MQMessageQueue, int64>& offsetTable));
MOCK_METHOD1(consumerRunningInfo, ConsumerRunningInfo*(const string& consumerGroup));
MOCK_METHOD2(getSessionCredentialFromConsumer,
bool(const string& consumerGroup, SessionCredentials& sessionCredentials));
MOCK_METHOD1(doRebalanceByConsumerGroup, void(const string& consumerGroup));
};
TEST(clientRemotingProcessor, processRequest) {
MockMQClientFactory* factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
ClientRemotingProcessor clientRemotingProcessor(factory);
string addr = "127.0.0.1:9876";
RemotingCommand* command = new RemotingCommand();
RemotingCommand* pResponse = new RemotingCommand(13);
pResponse->setCode(MQRequestCode::RESET_CONSUMER_CLIENT_OFFSET);
command->setCode(MQRequestCode::RESET_CONSUMER_CLIENT_OFFSET);
EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
EXPECT_EQ(nullptr, clientRemotingProcessor.processRequest(addr, command));
NotifyConsumerIdsChangedRequestHeader* header = new NotifyConsumerIdsChangedRequestHeader();
header->setGroup("testGroup");
RemotingCommand* twoCommand = new RemotingCommand(MQRequestCode::NOTIFY_CONSUMER_IDS_CHANGED, header);
EXPECT_EQ(NULL, clientRemotingProcessor.processRequest(addr, twoCommand));
command->setCode(MQRequestCode::GET_CONSUMER_RUNNING_INFO);
// EXPECT_EQ(NULL , clientRemotingProcessor.processRequest(addr, command));
command->setCode(MQRequestCode::CHECK_TRANSACTION_STATE);
EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
command->setCode(MQRequestCode::GET_CONSUMER_STATUS_FROM_CLIENT);
EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
command->setCode(MQRequestCode::CONSUME_MESSAGE_DIRECTLY);
EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
command->setCode(1);
EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
delete twoCommand;
delete command;
delete pResponse;
}
TEST(clientRemotingProcessor, resetOffset) {
MockMQClientFactory* factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
Mock::AllowLeak(factory);
ClientRemotingProcessor clientRemotingProcessor(factory);
Value root;
Value messageQueues;
Value messageQueue;
messageQueue["brokerName"] = "testBroker";
messageQueue["queueId"] = 4;
messageQueue["topic"] = "testTopic";
messageQueue["offset"] = 1024;
messageQueues.append(messageQueue);
root["offsetTable"] = messageQueues;
FastWriter wrtier;
string strData = wrtier.write(root);
ResetOffsetRequestHeader* header = new ResetOffsetRequestHeader();
RemotingCommand* request = new RemotingCommand(13, header);
EXPECT_CALL(*factory, resetOffset(_, _, _)).Times(1);
clientRemotingProcessor.resetOffset(request);
request->SetBody(strData.c_str(), strData.size() - 2);
clientRemotingProcessor.resetOffset(request);
request->SetBody(strData.c_str(), strData.size());
clientRemotingProcessor.resetOffset(request);
// here header no need delete, it will managered by RemotingCommand
// delete header;
delete request;
}
TEST(clientRemotingProcessor, getConsumerRunningInfoFailed) {
MockMQClientFactory* factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
Mock::AllowLeak(factory);
ConsumerRunningInfo* info = new ConsumerRunningInfo();
EXPECT_CALL(*factory, consumerRunningInfo(_)).Times(2).WillOnce(Return(info)).WillOnce(Return(info));
EXPECT_CALL(*factory, getSessionCredentialFromConsumer(_, _))
.Times(2); //.WillRepeatedly(SetArgReferee<1>(sessionCredentials));
ClientRemotingProcessor clientRemotingProcessor(factory);
GetConsumerRunningInfoRequestHeader* header = new GetConsumerRunningInfoRequestHeader();
header->setConsumerGroup("testGroup");
header->setClientId("testClientId");
header->setJstackEnable(false);
RemotingCommand* request = new RemotingCommand(14, header);
RemotingCommand* command = clientRemotingProcessor.getConsumerRunningInfo("127.0.0.1:9876", request);
EXPECT_EQ(command->getCode(), MQResponseCode::SYSTEM_ERROR);
EXPECT_EQ(command->getRemark(), "The Consumer Group not exist in this consumer");
delete command;
delete request;
}
TEST(clientRemotingProcessor, notifyConsumerIdsChanged) {
MockMQClientFactory* factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
Mock::AllowLeak(factory);
ClientRemotingProcessor clientRemotingProcessor(factory);
NotifyConsumerIdsChangedRequestHeader* header = new NotifyConsumerIdsChangedRequestHeader();
header->setGroup("testGroup");
RemotingCommand* request = new RemotingCommand(14, header);
EXPECT_CALL(*factory, doRebalanceByConsumerGroup(_)).Times(1);
clientRemotingProcessor.notifyConsumerIdsChanged(request);
delete request;
}
TEST(clientRemotingProcessor, resetOffsetBody) {
MockMQClientFactory* factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
ClientRemotingProcessor clientRemotingProcessor(factory);
Value root;
Value messageQueues;
Value messageQueue;
messageQueue["brokerName"] = "testBroker";
messageQueue["queueId"] = 4;
messageQueue["topic"] = "testTopic";
messageQueue["offset"] = 1024;
messageQueues.append(messageQueue);
root["offsetTable"] = messageQueues;
FastWriter wrtier;
string strData = wrtier.write(root);
MemoryBlock* mem = new MemoryBlock(strData.c_str(), strData.size());
ResetOffsetBody* resetOffset = ResetOffsetBody::Decode(mem);
map<MQMessageQueue, int64> map = resetOffset->getOffsetTable();
MQMessageQueue mqmq("testTopic", "testBroker", 4);
EXPECT_EQ(map[mqmq], 1024);
Mock::AllowLeak(factory);
delete resetOffset;
delete mem;
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
testing::GTEST_FLAG(filter) = "clientRemotingProcessor.*";
int itestts = RUN_ALL_TESTS();
return itestts;
}