blob: 054f2663acf015a9c054871d88f658477a992567 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/channel.h>
#include <brpc/server.h>
#include <brpc/stream.h>
#include <butil/logging.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gflags/gflags.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <olap/storage_engine.h>
#include <service/internal_service.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include "common/config.h"
#include "common/status.h"
#include "exec/tablet_info.h"
#include "gen_cpp/BackendService_types.h"
#include "gen_cpp/FrontendService_types.h"
#include "gtest/gtest_pred_impl.h"
#include "io/fs/local_file_system.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/tablet_manager.h"
#include "olap/txn_manager.h"
#include "runtime/descriptor_helper.h"
#include "runtime/exec_env.h"
#include "runtime/load_stream_mgr.h"
#include "util/debug/leakcheck_disabler.h"
#include "util/runtime_profile.h"
using namespace brpc;
namespace doris {
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* engine_ref = nullptr;
static const std::string zTestDir = "./data_test/data/load_stream_mgr_test";
const int64_t NORMAL_TABLET_ID = 10000;
const int64_t ABNORMAL_TABLET_ID = 40000;
const int64_t NORMAL_INDEX_ID = 50000;
const int64_t ABNORMAL_INDEX_ID = 60000;
const int64_t NORMAL_PARTITION_ID = 50000;
const int64_t SCHEMA_HASH = 90000;
const uint32_t NORMAL_SENDER_ID = 0;
const uint32_t ABNORMAL_SENDER_ID = 10000;
const int64_t NORMAL_TXN_ID = 600001;
const UniqueId NORMAL_LOAD_ID(1, 1);
const UniqueId ABNORMAL_LOAD_ID(1, 0);
std::string NORMAL_STRING("normal");
std::string ABNORMAL_STRING("abnormal");
void construct_schema(OlapTableSchemaParam* schema) {
// construct schema
TOlapTableSchemaParam tschema;
tschema.db_id = 1;
tschema.table_id = 2;
tschema.version = 0;
// descriptor
{
TDescriptorTableBuilder dtb;
{
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("c1")
.column_pos(1)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("c2")
.column_pos(2)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(10)
.column_name("c3")
.column_pos(3)
.build());
tuple_builder.build(&dtb);
}
{
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_INT)
.column_name("c1")
.column_pos(1)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.type(TYPE_BIGINT)
.column_name("c2")
.column_pos(2)
.build());
tuple_builder.add_slot(TSlotDescriptorBuilder()
.string_type(20)
.column_name("c3")
.column_pos(3)
.build());
tuple_builder.build(&dtb);
}
auto desc_tbl = dtb.desc_tbl();
tschema.slot_descs = desc_tbl.slotDescriptors;
tschema.tuple_desc = desc_tbl.tupleDescriptors[0];
}
// index
tschema.indexes.resize(2);
tschema.indexes[0].id = NORMAL_INDEX_ID;
tschema.indexes[0].columns = {"c1", "c2", "c3"};
tschema.indexes[1].id = NORMAL_INDEX_ID + 1;
tschema.indexes[1].columns = {"c1", "c2", "c3"};
static_cast<void>(schema->init(tschema));
}
// copied from delta_writer_test.cpp
static void create_tablet_request(int64_t tablet_id, int32_t schema_hash,
TCreateTabletReq* request) {
request->tablet_id = tablet_id;
request->partition_id = 30001;
request->__set_version(1);
request->tablet_schema.schema_hash = schema_hash;
request->tablet_schema.short_key_column_count = 6;
request->tablet_schema.keys_type = TKeysType::AGG_KEYS;
request->tablet_schema.storage_type = TStorageType::COLUMN;
request->__set_storage_format(TStorageFormat::V2);
TColumn k1;
k1.__set_is_key(true);
k1.column_type.type = TPrimitiveType::TINYINT;
request->tablet_schema.columns.push_back(k1);
TColumn k2;
k2.column_name = "k2";
k2.__set_is_key(true);
k2.column_type.type = TPrimitiveType::SMALLINT;
request->tablet_schema.columns.push_back(k2);
TColumn k3;
k3.column_name = "k3";
k3.__set_is_key(true);
k3.column_type.type = TPrimitiveType::INT;
request->tablet_schema.columns.push_back(k3);
TColumn k4;
k4.column_name = "k4";
k4.__set_is_key(true);
k4.column_type.type = TPrimitiveType::BIGINT;
request->tablet_schema.columns.push_back(k4);
TColumn k5;
k5.column_name = "k5";
k5.__set_is_key(true);
k5.column_type.type = TPrimitiveType::LARGEINT;
request->tablet_schema.columns.push_back(k5);
TColumn k6;
k6.column_name = "k6";
k6.__set_is_key(true);
k6.column_type.type = TPrimitiveType::DATE;
request->tablet_schema.columns.push_back(k6);
TColumn k7;
k7.column_name = "k7";
k7.__set_is_key(true);
k7.column_type.type = TPrimitiveType::DATETIME;
request->tablet_schema.columns.push_back(k7);
TColumn k8;
k8.column_name = "k8";
k8.__set_is_key(true);
k8.column_type.type = TPrimitiveType::CHAR;
k8.column_type.__set_len(4);
request->tablet_schema.columns.push_back(k8);
TColumn k9;
k9.column_name = "k9";
k9.__set_is_key(true);
k9.column_type.type = TPrimitiveType::VARCHAR;
k9.column_type.__set_len(65);
request->tablet_schema.columns.push_back(k9);
TColumn k10;
k10.column_name = "k10";
k10.__set_is_key(true);
k10.column_type.type = TPrimitiveType::DECIMALV2;
k10.column_type.__set_precision(6);
k10.column_type.__set_scale(3);
request->tablet_schema.columns.push_back(k10);
TColumn k11;
k11.column_name = "k11";
k11.__set_is_key(true);
k11.column_type.type = TPrimitiveType::DATEV2;
request->tablet_schema.columns.push_back(k11);
TColumn v1;
v1.column_name = "v1";
v1.__set_is_key(false);
v1.column_type.type = TPrimitiveType::TINYINT;
v1.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v1);
TColumn v2;
v2.column_name = "v2";
v2.__set_is_key(false);
v2.column_type.type = TPrimitiveType::SMALLINT;
v2.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v2);
TColumn v3;
v3.column_name = "v3";
v3.__set_is_key(false);
v3.column_type.type = TPrimitiveType::INT;
v3.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v3);
TColumn v4;
v4.column_name = "v4";
v4.__set_is_key(false);
v4.column_type.type = TPrimitiveType::BIGINT;
v4.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v4);
TColumn v5;
v5.column_name = "v5";
v5.__set_is_key(false);
v5.column_type.type = TPrimitiveType::LARGEINT;
v5.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v5);
TColumn v6;
v6.column_name = "v6";
v6.__set_is_key(false);
v6.column_type.type = TPrimitiveType::DATE;
v6.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v6);
TColumn v7;
v7.column_name = "v7";
v7.__set_is_key(false);
v7.column_type.type = TPrimitiveType::DATETIME;
v7.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v7);
TColumn v8;
v8.column_name = "v8";
v8.__set_is_key(false);
v8.column_type.type = TPrimitiveType::CHAR;
v8.column_type.__set_len(4);
v8.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v8);
TColumn v9;
v9.column_name = "v9";
v9.__set_is_key(false);
v9.column_type.type = TPrimitiveType::VARCHAR;
v9.column_type.__set_len(65);
v9.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v9);
TColumn v10;
v10.column_name = "v10";
v10.__set_is_key(false);
v10.column_type.type = TPrimitiveType::DECIMALV2;
v10.column_type.__set_precision(6);
v10.column_type.__set_scale(3);
v10.__set_aggregation_type(TAggregationType::SUM);
request->tablet_schema.columns.push_back(v10);
TColumn v11;
v11.column_name = "v11";
v11.__set_is_key(false);
v11.column_type.type = TPrimitiveType::DATEV2;
v11.__set_aggregation_type(TAggregationType::REPLACE);
request->tablet_schema.columns.push_back(v11);
}
struct ResponseStat {
std::atomic<int32_t> num;
std::vector<int64_t> success_tablet_ids;
std::vector<int64_t> failed_tablet_ids;
};
bthread::Mutex g_stat_lock;
static ResponseStat g_response_stat;
void reset_response_stat() {
std::lock_guard lock_guard(g_stat_lock);
g_response_stat.num = 0;
g_response_stat.success_tablet_ids.clear();
g_response_stat.failed_tablet_ids.clear();
}
class LoadStreamMgrTest : public testing::Test {
public:
class Handler : public brpc::StreamInputHandler {
public:
int on_received_messages(StreamId id, butil::IOBuf* const messages[],
size_t size) override {
for (size_t i = 0; i < size; i++) {
PLoadStreamResponse response;
butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]);
response.ParseFromZeroCopyStream(&wrapper);
LOG(INFO) << "response " << response.DebugString();
std::lock_guard lock_guard(g_stat_lock);
for (auto& id : response.success_tablet_ids()) {
g_response_stat.success_tablet_ids.push_back(id);
}
for (auto& tablet : response.failed_tablets()) {
g_response_stat.failed_tablet_ids.push_back(tablet.id());
}
g_response_stat.num++;
}
return 0;
}
void on_idle_timeout(StreamId id) override { std::cerr << "on_idle_timeout" << std::endl; }
void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; }
};
class StreamService : public PBackendService {
public:
StreamService(LoadStreamMgr* load_stream_mgr)
: _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {}
virtual ~StreamService() { brpc::StreamClose(_sd); };
virtual void open_load_stream(google::protobuf::RpcController* controller,
const POpenLoadStreamRequest* request,
POpenLoadStreamResponse* response,
google::protobuf::Closure* done) {
brpc::ClosureGuard done_guard(done);
std::unique_ptr<PStatus> status = std::make_unique<PStatus>();
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
brpc::StreamOptions stream_options;
for (const auto& req : request->tablets()) {
TabletManager* tablet_mgr = engine_ref->tablet_manager();
TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id());
if (tablet == nullptr) {
cntl->SetFailed("Tablet not found");
status->set_status_code(TStatusCode::NOT_FOUND);
response->set_allocated_status(status.get());
static_cast<void>(response->release_status());
return;
}
auto resp = response->add_tablet_schemas();
resp->set_index_id(req.index_id());
resp->set_enable_unique_key_merge_on_write(
tablet->enable_unique_key_merge_on_write());
tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema());
}
LoadStream* load_stream;
LOG(INFO) << "total streams: " << request->total_streams();
EXPECT_GT(request->total_streams(), 0);
auto st = _load_stream_mgr->open_load_stream(request, load_stream);
stream_options.handler = load_stream;
StreamId streamid;
if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) {
cntl->SetFailed("Fail to accept stream");
status->set_status_code(TStatusCode::CANCELLED);
response->set_allocated_status(status.get());
static_cast<void>(response->release_status());
return;
}
status->set_status_code(TStatusCode::OK);
response->set_allocated_status(status.get());
static_cast<void>(response->release_status());
}
private:
Handler _receiver;
brpc::StreamId _sd;
LoadStreamMgr* _load_stream_mgr = nullptr;
};
class MockSinkClient {
public:
MockSinkClient() = default;
~MockSinkClient() { disconnect(); }
class MockClosure : public google::protobuf::Closure {
public:
MockClosure(std::function<void()> cb) : _cb(cb) {}
void Run() override {
_cb();
delete this;
}
private:
std::function<void()> _cb;
};
Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) {
brpc::Channel channel;
std::cerr << "connect_stream" << std::endl;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = brpc::PROTOCOL_BAIDU_STD;
options.connection_type = "single";
options.timeout_ms = 10000 /*milliseconds*/;
options.max_retry = 3;
CHECK_EQ(0, channel.Init("127.0.0.1:18947", nullptr));
// Normally, you should not call a Channel directly, but instead construct
// a stub Service wrapping it. stub can be shared by all threads as well.
PBackendService_Stub stub(&channel);
_stream_options.handler = &_handler;
if (brpc::StreamCreate(&_stream, _cntl, &_stream_options) != 0) {
LOG(ERROR) << "Fail to create stream";
return Status::InternalError("Fail to create stream");
}
POpenLoadStreamRequest request;
POpenLoadStreamResponse response;
PUniqueId id;
id.set_hi(1);
id.set_lo(1);
auto param = std::make_shared<OlapTableSchemaParam>();
construct_schema(param.get());
*request.mutable_schema() = *param->to_protobuf();
*request.mutable_load_id() = id;
request.set_txn_id(NORMAL_TXN_ID);
request.set_src_id(sender_id);
request.set_total_streams(total_streams);
auto ptablet = request.add_tablets();
ptablet->set_tablet_id(NORMAL_TABLET_ID);
ptablet->set_index_id(NORMAL_INDEX_ID);
stub.open_load_stream(&_cntl, &request, &response, nullptr);
if (_cntl.Failed()) {
std::cerr << "open_load_stream failed" << std::endl;
LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText();
return Status::InternalError("Fail to open load stream");
}
return Status::OK();
}
void disconnect() const {
std::cerr << "disconnect" << std::endl;
CHECK_EQ(0, brpc::StreamClose(_stream));
}
Status send(butil::IOBuf* buf) {
int ret = brpc::StreamWrite(_stream, *buf);
if (ret != 0) {
LOG(ERROR) << "Fail to write stream";
return Status::InternalError("Fail to write stream");
}
LOG(INFO) << "sent by stream successfully" << std::endl;
return Status::OK();
}
Status close() { return Status::OK(); }
private:
brpc::StreamId _stream;
brpc::Controller _cntl;
brpc::StreamOptions _stream_options;
Handler _handler;
};
LoadStreamMgrTest()
: _heavy_work_pool(4, 32, "load_stream_test_heavy"),
_light_work_pool(4, 32, "load_stream_test_light") {}
void close_load(MockSinkClient& client, const std::vector<PTabletID>& tablets_to_commit = {},
uint32_t sender_id = NORMAL_SENDER_ID) {
butil::IOBuf append_buf;
PStreamHeader header;
header.mutable_load_id()->set_hi(1);
header.mutable_load_id()->set_lo(1);
header.set_opcode(PStreamHeader::CLOSE_LOAD);
header.set_src_id(sender_id);
for (const auto& tablet : tablets_to_commit) {
*header.add_tablets() = tablet;
}
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
static_cast<void>(client.send(&append_buf));
}
void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t sender_id,
int64_t index_id, int64_t tablet_id, uint32_t segid, uint64_t offset,
std::string& data, bool segment_eos) {
// append data
butil::IOBuf append_buf;
PStreamHeader header;
header.set_opcode(PStreamHeader::APPEND_DATA);
header.mutable_load_id()->set_hi(load_id.hi);
header.mutable_load_id()->set_lo(load_id.lo);
header.set_index_id(index_id);
header.set_tablet_id(tablet_id);
header.set_segment_id(segid);
header.set_segment_eos(segment_eos);
header.set_src_id(sender_id);
header.set_partition_id(NORMAL_PARTITION_ID);
header.set_offset(offset);
size_t hdr_len = header.ByteSizeLong();
append_buf.append((char*)&hdr_len, sizeof(size_t));
append_buf.append(header.SerializeAsString());
size_t data_len = data.length();
append_buf.append((char*)&data_len, sizeof(size_t));
append_buf.append(data);
LOG(INFO) << "send " << header.DebugString() << data;
static_cast<void>(client.send(&append_buf));
}
void write_normal(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true);
}
void write_abnormal_load(MockSinkClient& client) {
write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_index(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, ABNORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_sender(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void write_abnormal_tablet(MockSinkClient& client) {
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true);
}
void wait_for_ack(int32_t num) {
for (int i = 0; i < 1000 && g_response_stat.num < num; i++) {
LOG(INFO) << "waiting ack, current " << g_response_stat.num << ", expected " << num;
bthread_usleep(1000);
}
}
void wait_for_close() {
for (int i = 0; i < 3000 && _load_stream_mgr->get_load_stream_num() != 0; i++) {
bthread_usleep(1000);
}
}
void SetUp() override {
_server = new brpc::Server();
srand(time(nullptr));
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
config::storage_root_path = std::string(buffer) + "/data_test";
auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path);
ASSERT_TRUE(st.ok()) << st;
st = io::global_local_filesystem()->create_directory(config::storage_root_path);
ASSERT_TRUE(st.ok()) << st;
std::vector<StorePath> paths;
paths.emplace_back(config::storage_root_path, -1);
doris::EngineOptions options;
options.store_paths = paths;
auto engine = std::make_unique<StorageEngine>(options);
engine_ref = engine.get();
Status s = engine->open();
EXPECT_TRUE(s.ok()) << s;
ExecEnv::GetInstance()->set_storage_engine(std::move(engine));
EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok());
_load_stream_mgr = std::make_unique<LoadStreamMgr>(4);
_load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool);
_stream_service = new StreamService(_load_stream_mgr.get());
CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE));
brpc::ServerOptions server_options;
server_options.idle_timeout_sec = 300;
{
debug::ScopedLeakCheckDisabler disable_lsan;
CHECK_EQ(0, _server->Start("127.0.0.1:18947", &server_options));
}
for (int i = 0; i < 3; i++) {
TCreateTabletReq request;
create_tablet_request(NORMAL_TABLET_ID + i, SCHEMA_HASH, &request);
auto profile = std::make_unique<RuntimeProfile>("test");
Status res = engine_ref->create_tablet(request, profile.get());
EXPECT_EQ(Status::OK(), res);
}
}
void TearDown() override {
_server->Stop(0);
CHECK_EQ(0, _server->Join());
SAFE_DELETE(_server);
engine_ref = nullptr;
ExecEnv::GetInstance()->set_storage_engine(nullptr);
}
std::string read_data(int64_t txn_id, int64_t partition_id, int64_t tablet_id, uint32_t segid) {
auto tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
engine_ref->txn_manager()->get_txn_related_tablets(txn_id, partition_id,
&tablet_related_rs);
LOG(INFO) << "get txn related tablet, txn_id=" << txn_id << ", tablet_id=" << tablet_id
<< "partition_id=" << partition_id;
for (auto& [tablet, rowset] : tablet_related_rs) {
if (tablet.tablet_id != tablet_id || rowset == nullptr) {
continue;
}
auto path = local_segment_path(rowset->tablet_path(), rowset->rowset_id().to_string(),
segid);
LOG(INFO) << "read data from " << path;
std::ifstream inputFile(path, std::ios::binary);
inputFile.seekg(0, std::ios::end);
std::streampos file_size = inputFile.tellg();
inputFile.seekg(0, std::ios::beg);
// Read the file contents
std::unique_ptr<char[]> buffer = std::make_unique<char[]>(file_size);
inputFile.read(buffer.get(), file_size);
return std::string(buffer.get(), file_size);
}
return std::string();
}
ExecEnv* _env;
brpc::Server* _server;
StreamService* _stream_service;
FifoThreadPool _heavy_work_pool;
FifoThreadPool _light_work_pool;
std::unique_ptr<LoadStreamMgr> _load_stream_mgr;
};
// <client, index, bucket>
// one client
TEST_F(LoadStreamMgrTest, one_client_normal) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
write_normal(client);
reset_response_stat();
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, ABNORMAL_SENDER_ID);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_abnormal_load) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
write_abnormal_load(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
close_load(client);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_abnormal_index) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
write_abnormal_index(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(ABNORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
write_abnormal_sender(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
// on the final close_load, segment num check will fail
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
write_abnormal_tablet(client);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(ABNORMAL_TABLET_ID);
tablet.set_num_segments(1);
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2);
EXPECT_EQ(g_response_stat.failed_tablet_ids[1], ABNORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_bytes) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data;
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
0, data, true);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data.length(), data, false);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD before EOS
close_load(client, {tablet});
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
// then the late EOS, will not be handled
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data.length(), data, true);
// duplicated close, will not be handled
close_load(client, {tablet});
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, "");
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
0, data, false);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, data + data);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without_eos) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
0, data, false);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data = "file1 hello world 123 !@#$%^&*()_+";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
0, data, false);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
data.length(), data, true);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(1);
// CLOSE_LOAD
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
0, data1, false);
std::string empty;
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0,
data1.length(), empty, true);
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1,
0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(2);
// CLOSE_LOAD
close_load(client, {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, data1);
written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 1);
EXPECT_EQ(written_data, data2);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) {
MockSinkClient client;
auto st = client.connect_stream();
EXPECT_TRUE(st.ok());
reset_response_stat();
// append data
butil::IOBuf append_buf;
PStreamHeader header;
std::string data1 = "file1 hello world 123 !@#$%^&*()_+1";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID + 0, 0, 0, data1, true);
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID + 1, 0, 0, data1, true);
std::string data2 = "file1 hello world 123 !@#$%^&*()_+2";
write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID,
NORMAL_TABLET_ID + 2, 0, 0, data2, true);
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet1;
tablet1.set_partition_id(NORMAL_PARTITION_ID);
tablet1.set_index_id(NORMAL_INDEX_ID);
tablet1.set_tablet_id(NORMAL_TABLET_ID);
tablet1.set_num_segments(1);
PTabletID tablet2 {tablet1};
tablet2.set_tablet_id(NORMAL_TABLET_ID + 1);
PTabletID tablet3 {tablet1};
tablet3.set_tablet_id(NORMAL_TABLET_ID + 2);
// CLOSE_LOAD
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(client, {tablet1, tablet2, tablet3}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(client, {tablet1, tablet2, tablet3}, 0);
wait_for_ack(3);
EXPECT_EQ(g_response_stat.num, 3);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
std::set<int64_t> success_tablet_ids;
for (auto& id : g_response_stat.success_tablet_ids) {
success_tablet_ids.insert(id);
}
EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID));
EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 1));
EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 2));
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
EXPECT_EQ(written_data, data1);
written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 1, 0);
EXPECT_EQ(written_data, data1);
written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 2, 0);
EXPECT_EQ(written_data, data2);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) {
MockSinkClient clients[2];
for (int i = 0; i < 2; i++) {
auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2);
EXPECT_TRUE(st.ok());
}
reset_response_stat();
std::vector<std::string> segment_data;
segment_data.resize(6);
// each sender three segments
for (int32_t segid = 2; segid >= 0; segid--) {
// append data
for (int i = 0; i < 2; i++) {
butil::IOBuf append_buf;
PStreamHeader header;
std::string data1 =
"sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid);
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, segid, 0, data1, true);
segment_data[i * 3 + segid] = data1;
LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1;
}
}
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[1], {tablet}, 1);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// duplicated close
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
// stream closed, no response will be sent
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
close_load(clients[0], {tablet}, 0);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
size_t sender_pos = written_data.find('=');
size_t sender_end = written_data.find(',');
EXPECT_NE(sender_pos, std::string::npos);
EXPECT_NE(sender_end, std::string::npos);
auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos);
LOG(INFO) << "sender_str " << sender_str;
uint32_t sender_id = std::stoi(sender_str);
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
sender_id = (sender_id + 1) % 2;
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
}
TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) {
MockSinkClient clients[2];
EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok());
reset_response_stat();
std::vector<std::string> segment_data;
segment_data.resize(6);
for (int32_t segid = 2; segid >= 0; segid--) {
for (int i = 0; i < 2; i++) {
std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid);
segment_data[i * 3 + segid] = data;
LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data;
}
}
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 0;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
}
EXPECT_EQ(g_response_stat.num, 0);
PTabletID tablet;
tablet.set_partition_id(NORMAL_PARTITION_ID);
tablet.set_index_id(NORMAL_INDEX_ID);
tablet.set_tablet_id(NORMAL_TABLET_ID);
tablet.set_num_segments(3);
// CLOSE_LOAD
close_load(clients[0], {tablet}, 0);
wait_for_ack(1);
EXPECT_EQ(g_response_stat.num, 1);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
// sender 0 closed, before open sender 1, load stream should still be open
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1);
EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok());
for (int32_t segid = 2; segid >= 0; segid--) {
int i = 1;
write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID,
NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true);
}
close_load(clients[1], {tablet}, 1);
wait_for_ack(2);
EXPECT_EQ(g_response_stat.num, 2);
EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1);
EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0);
EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID);
// server will close stream on CLOSE_LOAD
wait_for_close();
EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0);
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0);
size_t sender_pos = written_data.find('=');
size_t sender_end = written_data.find(',');
EXPECT_NE(sender_pos, std::string::npos);
EXPECT_NE(sender_end, std::string::npos);
auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos);
LOG(INFO) << "sender_str " << sender_str;
uint32_t sender_id = std::stoi(sender_str);
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
sender_id = (sender_id + 1) % 2;
for (int i = 0; i < 3; i++) {
auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3);
EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]);
}
}
} // namespace doris