| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include <brpc/channel.h> |
| #include <brpc/server.h> |
| #include <brpc/stream.h> |
| #include <butil/logging.h> |
| #include <gen_cpp/Types_types.h> |
| #include <gen_cpp/internal_service.pb.h> |
| #include <gflags/gflags.h> |
| #include <gtest/gtest-message.h> |
| #include <gtest/gtest-test-part.h> |
| #include <olap/storage_engine.h> |
| #include <service/internal_service.h> |
| #include <unistd.h> |
| |
| #include <functional> |
| #include <memory> |
| |
| #include "common/config.h" |
| #include "common/status.h" |
| #include "exec/tablet_info.h" |
| #include "gen_cpp/BackendService_types.h" |
| #include "gen_cpp/FrontendService_types.h" |
| #include "gtest/gtest_pred_impl.h" |
| #include "io/fs/local_file_system.h" |
| #include "olap/olap_define.h" |
| #include "olap/options.h" |
| #include "olap/rowset/beta_rowset.h" |
| #include "olap/tablet_manager.h" |
| #include "olap/txn_manager.h" |
| #include "runtime/descriptor_helper.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/load_stream_mgr.h" |
| #include "util/debug/leakcheck_disabler.h" |
| #include "util/runtime_profile.h" |
| |
| using namespace brpc; |
| |
| namespace doris { |
| |
| static const uint32_t MAX_PATH_LEN = 1024; |
| static StorageEngine* engine_ref = nullptr; |
| static const std::string zTestDir = "./data_test/data/load_stream_mgr_test"; |
| |
| const int64_t NORMAL_TABLET_ID = 10000; |
| const int64_t ABNORMAL_TABLET_ID = 40000; |
| const int64_t NORMAL_INDEX_ID = 50000; |
| const int64_t ABNORMAL_INDEX_ID = 60000; |
| const int64_t NORMAL_PARTITION_ID = 50000; |
| const int64_t SCHEMA_HASH = 90000; |
| const uint32_t NORMAL_SENDER_ID = 0; |
| const uint32_t ABNORMAL_SENDER_ID = 10000; |
| const int64_t NORMAL_TXN_ID = 600001; |
| const UniqueId NORMAL_LOAD_ID(1, 1); |
| const UniqueId ABNORMAL_LOAD_ID(1, 0); |
| std::string NORMAL_STRING("normal"); |
| std::string ABNORMAL_STRING("abnormal"); |
| |
| void construct_schema(OlapTableSchemaParam* schema) { |
| // construct schema |
| TOlapTableSchemaParam tschema; |
| tschema.db_id = 1; |
| tschema.table_id = 2; |
| tschema.version = 0; |
| |
| // descriptor |
| { |
| TDescriptorTableBuilder dtb; |
| { |
| TTupleDescriptorBuilder tuple_builder; |
| |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_INT) |
| .column_name("c1") |
| .column_pos(1) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_BIGINT) |
| .column_name("c2") |
| .column_pos(2) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .string_type(10) |
| .column_name("c3") |
| .column_pos(3) |
| .build()); |
| |
| tuple_builder.build(&dtb); |
| } |
| { |
| TTupleDescriptorBuilder tuple_builder; |
| |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_INT) |
| .column_name("c1") |
| .column_pos(1) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .type(TYPE_BIGINT) |
| .column_name("c2") |
| .column_pos(2) |
| .build()); |
| tuple_builder.add_slot(TSlotDescriptorBuilder() |
| .string_type(20) |
| .column_name("c3") |
| .column_pos(3) |
| .build()); |
| |
| tuple_builder.build(&dtb); |
| } |
| |
| auto desc_tbl = dtb.desc_tbl(); |
| tschema.slot_descs = desc_tbl.slotDescriptors; |
| tschema.tuple_desc = desc_tbl.tupleDescriptors[0]; |
| } |
| // index |
| tschema.indexes.resize(2); |
| tschema.indexes[0].id = NORMAL_INDEX_ID; |
| tschema.indexes[0].columns = {"c1", "c2", "c3"}; |
| |
| tschema.indexes[1].id = NORMAL_INDEX_ID + 1; |
| tschema.indexes[1].columns = {"c1", "c2", "c3"}; |
| |
| static_cast<void>(schema->init(tschema)); |
| } |
| |
| // copied from delta_writer_test.cpp |
| static void create_tablet_request(int64_t tablet_id, int32_t schema_hash, |
| TCreateTabletReq* request) { |
| request->tablet_id = tablet_id; |
| request->partition_id = 30001; |
| request->__set_version(1); |
| request->tablet_schema.schema_hash = schema_hash; |
| request->tablet_schema.short_key_column_count = 6; |
| request->tablet_schema.keys_type = TKeysType::AGG_KEYS; |
| request->tablet_schema.storage_type = TStorageType::COLUMN; |
| request->__set_storage_format(TStorageFormat::V2); |
| |
| TColumn k1; |
| |
| k1.__set_is_key(true); |
| k1.column_type.type = TPrimitiveType::TINYINT; |
| request->tablet_schema.columns.push_back(k1); |
| |
| TColumn k2; |
| k2.column_name = "k2"; |
| k2.__set_is_key(true); |
| k2.column_type.type = TPrimitiveType::SMALLINT; |
| request->tablet_schema.columns.push_back(k2); |
| |
| TColumn k3; |
| k3.column_name = "k3"; |
| k3.__set_is_key(true); |
| k3.column_type.type = TPrimitiveType::INT; |
| request->tablet_schema.columns.push_back(k3); |
| |
| TColumn k4; |
| k4.column_name = "k4"; |
| k4.__set_is_key(true); |
| k4.column_type.type = TPrimitiveType::BIGINT; |
| request->tablet_schema.columns.push_back(k4); |
| |
| TColumn k5; |
| k5.column_name = "k5"; |
| k5.__set_is_key(true); |
| k5.column_type.type = TPrimitiveType::LARGEINT; |
| request->tablet_schema.columns.push_back(k5); |
| |
| TColumn k6; |
| k6.column_name = "k6"; |
| k6.__set_is_key(true); |
| k6.column_type.type = TPrimitiveType::DATE; |
| request->tablet_schema.columns.push_back(k6); |
| |
| TColumn k7; |
| k7.column_name = "k7"; |
| k7.__set_is_key(true); |
| k7.column_type.type = TPrimitiveType::DATETIME; |
| request->tablet_schema.columns.push_back(k7); |
| |
| TColumn k8; |
| k8.column_name = "k8"; |
| k8.__set_is_key(true); |
| k8.column_type.type = TPrimitiveType::CHAR; |
| k8.column_type.__set_len(4); |
| request->tablet_schema.columns.push_back(k8); |
| |
| TColumn k9; |
| k9.column_name = "k9"; |
| k9.__set_is_key(true); |
| k9.column_type.type = TPrimitiveType::VARCHAR; |
| k9.column_type.__set_len(65); |
| request->tablet_schema.columns.push_back(k9); |
| |
| TColumn k10; |
| k10.column_name = "k10"; |
| k10.__set_is_key(true); |
| k10.column_type.type = TPrimitiveType::DECIMALV2; |
| k10.column_type.__set_precision(6); |
| k10.column_type.__set_scale(3); |
| request->tablet_schema.columns.push_back(k10); |
| |
| TColumn k11; |
| k11.column_name = "k11"; |
| k11.__set_is_key(true); |
| k11.column_type.type = TPrimitiveType::DATEV2; |
| request->tablet_schema.columns.push_back(k11); |
| |
| TColumn v1; |
| v1.column_name = "v1"; |
| v1.__set_is_key(false); |
| v1.column_type.type = TPrimitiveType::TINYINT; |
| v1.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v1); |
| |
| TColumn v2; |
| v2.column_name = "v2"; |
| v2.__set_is_key(false); |
| v2.column_type.type = TPrimitiveType::SMALLINT; |
| v2.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v2); |
| |
| TColumn v3; |
| v3.column_name = "v3"; |
| v3.__set_is_key(false); |
| v3.column_type.type = TPrimitiveType::INT; |
| v3.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v3); |
| |
| TColumn v4; |
| v4.column_name = "v4"; |
| v4.__set_is_key(false); |
| v4.column_type.type = TPrimitiveType::BIGINT; |
| v4.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v4); |
| |
| TColumn v5; |
| v5.column_name = "v5"; |
| v5.__set_is_key(false); |
| v5.column_type.type = TPrimitiveType::LARGEINT; |
| v5.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v5); |
| |
| TColumn v6; |
| v6.column_name = "v6"; |
| v6.__set_is_key(false); |
| v6.column_type.type = TPrimitiveType::DATE; |
| v6.__set_aggregation_type(TAggregationType::REPLACE); |
| request->tablet_schema.columns.push_back(v6); |
| |
| TColumn v7; |
| v7.column_name = "v7"; |
| v7.__set_is_key(false); |
| v7.column_type.type = TPrimitiveType::DATETIME; |
| v7.__set_aggregation_type(TAggregationType::REPLACE); |
| request->tablet_schema.columns.push_back(v7); |
| |
| TColumn v8; |
| v8.column_name = "v8"; |
| v8.__set_is_key(false); |
| v8.column_type.type = TPrimitiveType::CHAR; |
| v8.column_type.__set_len(4); |
| v8.__set_aggregation_type(TAggregationType::REPLACE); |
| request->tablet_schema.columns.push_back(v8); |
| |
| TColumn v9; |
| v9.column_name = "v9"; |
| v9.__set_is_key(false); |
| v9.column_type.type = TPrimitiveType::VARCHAR; |
| v9.column_type.__set_len(65); |
| v9.__set_aggregation_type(TAggregationType::REPLACE); |
| request->tablet_schema.columns.push_back(v9); |
| |
| TColumn v10; |
| v10.column_name = "v10"; |
| v10.__set_is_key(false); |
| v10.column_type.type = TPrimitiveType::DECIMALV2; |
| v10.column_type.__set_precision(6); |
| v10.column_type.__set_scale(3); |
| v10.__set_aggregation_type(TAggregationType::SUM); |
| request->tablet_schema.columns.push_back(v10); |
| |
| TColumn v11; |
| v11.column_name = "v11"; |
| v11.__set_is_key(false); |
| v11.column_type.type = TPrimitiveType::DATEV2; |
| v11.__set_aggregation_type(TAggregationType::REPLACE); |
| request->tablet_schema.columns.push_back(v11); |
| } |
| |
| struct ResponseStat { |
| std::atomic<int32_t> num; |
| std::vector<int64_t> success_tablet_ids; |
| std::vector<int64_t> failed_tablet_ids; |
| }; |
| bthread::Mutex g_stat_lock; |
| static ResponseStat g_response_stat; |
| |
| void reset_response_stat() { |
| std::lock_guard lock_guard(g_stat_lock); |
| g_response_stat.num = 0; |
| g_response_stat.success_tablet_ids.clear(); |
| g_response_stat.failed_tablet_ids.clear(); |
| } |
| |
| class LoadStreamMgrTest : public testing::Test { |
| public: |
| class Handler : public brpc::StreamInputHandler { |
| public: |
| int on_received_messages(StreamId id, butil::IOBuf* const messages[], |
| size_t size) override { |
| for (size_t i = 0; i < size; i++) { |
| PLoadStreamResponse response; |
| butil::IOBufAsZeroCopyInputStream wrapper(*messages[i]); |
| response.ParseFromZeroCopyStream(&wrapper); |
| LOG(INFO) << "response " << response.DebugString(); |
| std::lock_guard lock_guard(g_stat_lock); |
| for (auto& id : response.success_tablet_ids()) { |
| g_response_stat.success_tablet_ids.push_back(id); |
| } |
| for (auto& tablet : response.failed_tablets()) { |
| g_response_stat.failed_tablet_ids.push_back(tablet.id()); |
| } |
| g_response_stat.num++; |
| } |
| |
| return 0; |
| } |
| void on_idle_timeout(StreamId id) override { std::cerr << "on_idle_timeout" << std::endl; } |
| void on_closed(StreamId id) override { std::cerr << "on_closed" << std::endl; } |
| }; |
| |
| class StreamService : public PBackendService { |
| public: |
| StreamService(LoadStreamMgr* load_stream_mgr) |
| : _sd(brpc::INVALID_STREAM_ID), _load_stream_mgr(load_stream_mgr) {} |
| virtual ~StreamService() { brpc::StreamClose(_sd); }; |
| virtual void open_load_stream(google::protobuf::RpcController* controller, |
| const POpenLoadStreamRequest* request, |
| POpenLoadStreamResponse* response, |
| google::protobuf::Closure* done) { |
| brpc::ClosureGuard done_guard(done); |
| std::unique_ptr<PStatus> status = std::make_unique<PStatus>(); |
| brpc::Controller* cntl = static_cast<brpc::Controller*>(controller); |
| brpc::StreamOptions stream_options; |
| |
| for (const auto& req : request->tablets()) { |
| TabletManager* tablet_mgr = engine_ref->tablet_manager(); |
| TabletSharedPtr tablet = tablet_mgr->get_tablet(req.tablet_id()); |
| if (tablet == nullptr) { |
| cntl->SetFailed("Tablet not found"); |
| status->set_status_code(TStatusCode::NOT_FOUND); |
| response->set_allocated_status(status.get()); |
| static_cast<void>(response->release_status()); |
| return; |
| } |
| auto resp = response->add_tablet_schemas(); |
| resp->set_index_id(req.index_id()); |
| resp->set_enable_unique_key_merge_on_write( |
| tablet->enable_unique_key_merge_on_write()); |
| tablet->tablet_schema()->to_schema_pb(resp->mutable_tablet_schema()); |
| } |
| |
| LoadStream* load_stream; |
| LOG(INFO) << "total streams: " << request->total_streams(); |
| EXPECT_GT(request->total_streams(), 0); |
| auto st = _load_stream_mgr->open_load_stream(request, load_stream); |
| |
| stream_options.handler = load_stream; |
| |
| StreamId streamid; |
| if (brpc::StreamAccept(&streamid, *cntl, &stream_options) != 0) { |
| cntl->SetFailed("Fail to accept stream"); |
| status->set_status_code(TStatusCode::CANCELLED); |
| response->set_allocated_status(status.get()); |
| static_cast<void>(response->release_status()); |
| return; |
| } |
| |
| status->set_status_code(TStatusCode::OK); |
| response->set_allocated_status(status.get()); |
| static_cast<void>(response->release_status()); |
| } |
| |
| private: |
| Handler _receiver; |
| brpc::StreamId _sd; |
| LoadStreamMgr* _load_stream_mgr = nullptr; |
| }; |
| |
| class MockSinkClient { |
| public: |
| MockSinkClient() = default; |
| ~MockSinkClient() { disconnect(); } |
| |
| class MockClosure : public google::protobuf::Closure { |
| public: |
| MockClosure(std::function<void()> cb) : _cb(cb) {} |
| void Run() override { |
| _cb(); |
| delete this; |
| } |
| |
| private: |
| std::function<void()> _cb; |
| }; |
| |
| Status connect_stream(int64_t sender_id = NORMAL_SENDER_ID, int total_streams = 1) { |
| brpc::Channel channel; |
| std::cerr << "connect_stream" << std::endl; |
| // Initialize the channel, NULL means using default options. |
| brpc::ChannelOptions options; |
| options.protocol = brpc::PROTOCOL_BAIDU_STD; |
| options.connection_type = "single"; |
| options.timeout_ms = 10000 /*milliseconds*/; |
| options.max_retry = 3; |
| CHECK_EQ(0, channel.Init("127.0.0.1:18947", nullptr)); |
| |
| // Normally, you should not call a Channel directly, but instead construct |
| // a stub Service wrapping it. stub can be shared by all threads as well. |
| PBackendService_Stub stub(&channel); |
| |
| _stream_options.handler = &_handler; |
| if (brpc::StreamCreate(&_stream, _cntl, &_stream_options) != 0) { |
| LOG(ERROR) << "Fail to create stream"; |
| return Status::InternalError("Fail to create stream"); |
| } |
| |
| POpenLoadStreamRequest request; |
| POpenLoadStreamResponse response; |
| PUniqueId id; |
| id.set_hi(1); |
| id.set_lo(1); |
| |
| auto param = std::make_shared<OlapTableSchemaParam>(); |
| construct_schema(param.get()); |
| *request.mutable_schema() = *param->to_protobuf(); |
| *request.mutable_load_id() = id; |
| request.set_txn_id(NORMAL_TXN_ID); |
| request.set_src_id(sender_id); |
| request.set_total_streams(total_streams); |
| auto ptablet = request.add_tablets(); |
| ptablet->set_tablet_id(NORMAL_TABLET_ID); |
| ptablet->set_index_id(NORMAL_INDEX_ID); |
| stub.open_load_stream(&_cntl, &request, &response, nullptr); |
| if (_cntl.Failed()) { |
| std::cerr << "open_load_stream failed" << std::endl; |
| LOG(ERROR) << "Fail to open load stream " << _cntl.ErrorText(); |
| return Status::InternalError("Fail to open load stream"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| void disconnect() const { |
| std::cerr << "disconnect" << std::endl; |
| CHECK_EQ(0, brpc::StreamClose(_stream)); |
| } |
| |
| Status send(butil::IOBuf* buf) { |
| int ret = brpc::StreamWrite(_stream, *buf); |
| if (ret != 0) { |
| LOG(ERROR) << "Fail to write stream"; |
| return Status::InternalError("Fail to write stream"); |
| } |
| LOG(INFO) << "sent by stream successfully" << std::endl; |
| return Status::OK(); |
| } |
| |
| Status close() { return Status::OK(); } |
| |
| private: |
| brpc::StreamId _stream; |
| brpc::Controller _cntl; |
| brpc::StreamOptions _stream_options; |
| Handler _handler; |
| }; |
| |
| LoadStreamMgrTest() |
| : _heavy_work_pool(4, 32, "load_stream_test_heavy"), |
| _light_work_pool(4, 32, "load_stream_test_light") {} |
| |
| void close_load(MockSinkClient& client, const std::vector<PTabletID>& tablets_to_commit = {}, |
| uint32_t sender_id = NORMAL_SENDER_ID) { |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| header.mutable_load_id()->set_hi(1); |
| header.mutable_load_id()->set_lo(1); |
| header.set_opcode(PStreamHeader::CLOSE_LOAD); |
| header.set_src_id(sender_id); |
| for (const auto& tablet : tablets_to_commit) { |
| *header.add_tablets() = tablet; |
| } |
| size_t hdr_len = header.ByteSizeLong(); |
| append_buf.append((char*)&hdr_len, sizeof(size_t)); |
| append_buf.append(header.SerializeAsString()); |
| static_cast<void>(client.send(&append_buf)); |
| } |
| |
| void write_one_tablet(MockSinkClient& client, UniqueId load_id, uint32_t sender_id, |
| int64_t index_id, int64_t tablet_id, uint32_t segid, uint64_t offset, |
| std::string& data, bool segment_eos) { |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| header.set_opcode(PStreamHeader::APPEND_DATA); |
| header.mutable_load_id()->set_hi(load_id.hi); |
| header.mutable_load_id()->set_lo(load_id.lo); |
| header.set_index_id(index_id); |
| header.set_tablet_id(tablet_id); |
| header.set_segment_id(segid); |
| header.set_segment_eos(segment_eos); |
| header.set_src_id(sender_id); |
| header.set_partition_id(NORMAL_PARTITION_ID); |
| header.set_offset(offset); |
| size_t hdr_len = header.ByteSizeLong(); |
| append_buf.append((char*)&hdr_len, sizeof(size_t)); |
| append_buf.append(header.SerializeAsString()); |
| size_t data_len = data.length(); |
| append_buf.append((char*)&data_len, sizeof(size_t)); |
| append_buf.append(data); |
| LOG(INFO) << "send " << header.DebugString() << data; |
| static_cast<void>(client.send(&append_buf)); |
| } |
| |
| void write_normal(MockSinkClient& client) { |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, 0, 0, NORMAL_STRING, true); |
| } |
| |
| void write_abnormal_load(MockSinkClient& client) { |
| write_one_tablet(client, ABNORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); |
| } |
| |
| void write_abnormal_index(MockSinkClient& client) { |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, ABNORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); |
| } |
| |
| void write_abnormal_sender(MockSinkClient& client) { |
| write_one_tablet(client, NORMAL_LOAD_ID, ABNORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); |
| } |
| |
| void write_abnormal_tablet(MockSinkClient& client) { |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| ABNORMAL_TABLET_ID, 0, 0, ABNORMAL_STRING, true); |
| } |
| |
| void wait_for_ack(int32_t num) { |
| for (int i = 0; i < 1000 && g_response_stat.num < num; i++) { |
| LOG(INFO) << "waiting ack, current " << g_response_stat.num << ", expected " << num; |
| bthread_usleep(1000); |
| } |
| } |
| |
| void wait_for_close() { |
| for (int i = 0; i < 3000 && _load_stream_mgr->get_load_stream_num() != 0; i++) { |
| bthread_usleep(1000); |
| } |
| } |
| |
| void SetUp() override { |
| _server = new brpc::Server(); |
| srand(time(nullptr)); |
| char buffer[MAX_PATH_LEN]; |
| EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); |
| config::storage_root_path = std::string(buffer) + "/data_test"; |
| |
| auto st = io::global_local_filesystem()->delete_directory(config::storage_root_path); |
| ASSERT_TRUE(st.ok()) << st; |
| st = io::global_local_filesystem()->create_directory(config::storage_root_path); |
| ASSERT_TRUE(st.ok()) << st; |
| |
| std::vector<StorePath> paths; |
| paths.emplace_back(config::storage_root_path, -1); |
| |
| doris::EngineOptions options; |
| options.store_paths = paths; |
| auto engine = std::make_unique<StorageEngine>(options); |
| engine_ref = engine.get(); |
| Status s = engine->open(); |
| EXPECT_TRUE(s.ok()) << s; |
| ExecEnv::GetInstance()->set_storage_engine(std::move(engine)); |
| |
| EXPECT_TRUE(io::global_local_filesystem()->create_directory(zTestDir).ok()); |
| |
| _load_stream_mgr = std::make_unique<LoadStreamMgr>(4); |
| _load_stream_mgr->set_heavy_work_pool(&_heavy_work_pool); |
| _stream_service = new StreamService(_load_stream_mgr.get()); |
| CHECK_EQ(0, _server->AddService(_stream_service, brpc::SERVER_OWNS_SERVICE)); |
| brpc::ServerOptions server_options; |
| server_options.idle_timeout_sec = 300; |
| { |
| debug::ScopedLeakCheckDisabler disable_lsan; |
| CHECK_EQ(0, _server->Start("127.0.0.1:18947", &server_options)); |
| } |
| |
| for (int i = 0; i < 3; i++) { |
| TCreateTabletReq request; |
| create_tablet_request(NORMAL_TABLET_ID + i, SCHEMA_HASH, &request); |
| auto profile = std::make_unique<RuntimeProfile>("test"); |
| Status res = engine_ref->create_tablet(request, profile.get()); |
| EXPECT_EQ(Status::OK(), res); |
| } |
| } |
| |
| void TearDown() override { |
| _server->Stop(0); |
| CHECK_EQ(0, _server->Join()); |
| SAFE_DELETE(_server); |
| engine_ref = nullptr; |
| ExecEnv::GetInstance()->set_storage_engine(nullptr); |
| } |
| |
| std::string read_data(int64_t txn_id, int64_t partition_id, int64_t tablet_id, uint32_t segid) { |
| auto tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); |
| std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs; |
| engine_ref->txn_manager()->get_txn_related_tablets(txn_id, partition_id, |
| &tablet_related_rs); |
| LOG(INFO) << "get txn related tablet, txn_id=" << txn_id << ", tablet_id=" << tablet_id |
| << "partition_id=" << partition_id; |
| for (auto& [tablet, rowset] : tablet_related_rs) { |
| if (tablet.tablet_id != tablet_id || rowset == nullptr) { |
| continue; |
| } |
| |
| auto path = local_segment_path(rowset->tablet_path(), rowset->rowset_id().to_string(), |
| segid); |
| LOG(INFO) << "read data from " << path; |
| std::ifstream inputFile(path, std::ios::binary); |
| inputFile.seekg(0, std::ios::end); |
| std::streampos file_size = inputFile.tellg(); |
| inputFile.seekg(0, std::ios::beg); |
| |
| // Read the file contents |
| std::unique_ptr<char[]> buffer = std::make_unique<char[]>(file_size); |
| inputFile.read(buffer.get(), file_size); |
| return std::string(buffer.get(), file_size); |
| } |
| return std::string(); |
| } |
| |
| ExecEnv* _env; |
| brpc::Server* _server; |
| StreamService* _stream_service; |
| |
| FifoThreadPool _heavy_work_pool; |
| FifoThreadPool _light_work_pool; |
| |
| std::unique_ptr<LoadStreamMgr> _load_stream_mgr; |
| }; |
| |
| // <client, index, bucket> |
| // one client |
| TEST_F(LoadStreamMgrTest, one_client_normal) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| write_normal(client); |
| |
| reset_response_stat(); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| close_load(client, {tablet}, ABNORMAL_SENDER_ID); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); |
| |
| close_load(client, {tablet}); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_abnormal_load) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| write_abnormal_load(client); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); |
| |
| close_load(client); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_abnormal_index) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| write_abnormal_index(client); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(ABNORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_abnormal_sender) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| write_abnormal_sender(client); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| |
| // on the final close_load, segment num check will fail |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_abnormal_tablet) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| write_abnormal_tablet(client); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], ABNORMAL_TABLET_ID); |
| |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(ABNORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 2); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[1], ABNORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0_zero_bytes) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| 0, data, true); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| // CLOSE_LOAD |
| close_load(client, {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, close_load_before_recv_eos) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data = "file1 hello world 123 !@#$%^&*()_+"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| data.length(), data, false); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| // CLOSE_LOAD before EOS |
| close_load(client, {tablet}); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| |
| // then the late EOS, will not be handled |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| data.length(), data, true); |
| |
| // duplicated close, will not be handled |
| close_load(client, {tablet}); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| EXPECT_EQ(written_data, ""); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment0) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data = "file1 hello world 123 !@#$%^&*()_+"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| 0, data, false); |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| data.length(), data, true); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| // CLOSE_LOAD |
| close_load(client, {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| EXPECT_EQ(written_data, data + data); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment_without_eos) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data = "file1 hello world 123 !@#$%^&*()_+"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| 0, data, false); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| // CLOSE_LOAD |
| close_load(client, {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_single_segment1) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data = "file1 hello world 123 !@#$%^&*()_+"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, |
| 0, data, false); |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, |
| data.length(), data, true); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(1); |
| // CLOSE_LOAD |
| close_load(client, {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_one_tablet_two_segment) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| 0, data1, false); |
| std::string empty; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 0, |
| data1.length(), empty, true); |
| std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, NORMAL_TABLET_ID, 1, |
| 0, data2, true); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(2); |
| // CLOSE_LOAD |
| close_load(client, {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| EXPECT_EQ(written_data, data1); |
| |
| written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 1); |
| EXPECT_EQ(written_data, data2); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, one_client_one_index_three_tablet) { |
| MockSinkClient client; |
| auto st = client.connect_stream(); |
| EXPECT_TRUE(st.ok()); |
| |
| reset_response_stat(); |
| |
| // append data |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data1 = "file1 hello world 123 !@#$%^&*()_+1"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID + 0, 0, 0, data1, true); |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID + 1, 0, 0, data1, true); |
| std::string data2 = "file1 hello world 123 !@#$%^&*()_+2"; |
| write_one_tablet(client, NORMAL_LOAD_ID, NORMAL_SENDER_ID, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID + 2, 0, 0, data2, true); |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| |
| PTabletID tablet1; |
| tablet1.set_partition_id(NORMAL_PARTITION_ID); |
| tablet1.set_index_id(NORMAL_INDEX_ID); |
| tablet1.set_tablet_id(NORMAL_TABLET_ID); |
| tablet1.set_num_segments(1); |
| PTabletID tablet2 {tablet1}; |
| tablet2.set_tablet_id(NORMAL_TABLET_ID + 1); |
| PTabletID tablet3 {tablet1}; |
| tablet3.set_tablet_id(NORMAL_TABLET_ID + 2); |
| |
| // CLOSE_LOAD |
| close_load(client, {tablet1, tablet2, tablet3}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(client, {tablet1, tablet2, tablet3}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(client, {tablet1, tablet2, tablet3}, 0); |
| wait_for_ack(3); |
| EXPECT_EQ(g_response_stat.num, 3); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 3); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| std::set<int64_t> success_tablet_ids; |
| for (auto& id : g_response_stat.success_tablet_ids) { |
| success_tablet_ids.insert(id); |
| } |
| EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID)); |
| EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 1)); |
| EXPECT_TRUE(success_tablet_ids.count(NORMAL_TABLET_ID + 2)); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| EXPECT_EQ(written_data, data1); |
| |
| written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 1, 0); |
| EXPECT_EQ(written_data, data1); |
| |
| written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID + 2, 0); |
| EXPECT_EQ(written_data, data2); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, two_client_one_index_one_tablet_three_segment) { |
| MockSinkClient clients[2]; |
| |
| for (int i = 0; i < 2; i++) { |
| auto st = clients[i].connect_stream(NORMAL_SENDER_ID + i, 2); |
| EXPECT_TRUE(st.ok()); |
| } |
| reset_response_stat(); |
| |
| std::vector<std::string> segment_data; |
| segment_data.resize(6); |
| // each sender three segments |
| for (int32_t segid = 2; segid >= 0; segid--) { |
| // append data |
| for (int i = 0; i < 2; i++) { |
| butil::IOBuf append_buf; |
| PStreamHeader header; |
| std::string data1 = |
| "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); |
| write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, segid, 0, data1, true); |
| segment_data[i * 3 + segid] = data1; |
| LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data1; |
| } |
| } |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(3); |
| // CLOSE_LOAD |
| close_load(clients[1], {tablet}, 1); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // duplicated close |
| close_load(clients[1], {tablet}, 1); |
| wait_for_ack(2); |
| // stream closed, no response will be sent |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| close_load(clients[0], {tablet}, 0); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| size_t sender_pos = written_data.find('='); |
| size_t sender_end = written_data.find(','); |
| EXPECT_NE(sender_pos, std::string::npos); |
| EXPECT_NE(sender_end, std::string::npos); |
| auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos); |
| LOG(INFO) << "sender_str " << sender_str; |
| uint32_t sender_id = std::stoi(sender_str); |
| |
| for (int i = 0; i < 3; i++) { |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i); |
| EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); |
| } |
| sender_id = (sender_id + 1) % 2; |
| for (int i = 0; i < 3; i++) { |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3); |
| EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); |
| } |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| } |
| |
| TEST_F(LoadStreamMgrTest, two_client_one_close_before_the_other_open) { |
| MockSinkClient clients[2]; |
| |
| EXPECT_TRUE(clients[0].connect_stream(NORMAL_SENDER_ID, 2).ok()); |
| |
| reset_response_stat(); |
| |
| std::vector<std::string> segment_data; |
| segment_data.resize(6); |
| for (int32_t segid = 2; segid >= 0; segid--) { |
| for (int i = 0; i < 2; i++) { |
| std::string data = "sender_id=" + std::to_string(i) + ",segid=" + std::to_string(segid); |
| segment_data[i * 3 + segid] = data; |
| LOG(INFO) << "segment_data[" << i * 3 + segid << "]" << data; |
| } |
| } |
| |
| for (int32_t segid = 2; segid >= 0; segid--) { |
| int i = 0; |
| write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); |
| } |
| |
| EXPECT_EQ(g_response_stat.num, 0); |
| PTabletID tablet; |
| tablet.set_partition_id(NORMAL_PARTITION_ID); |
| tablet.set_index_id(NORMAL_INDEX_ID); |
| tablet.set_tablet_id(NORMAL_TABLET_ID); |
| tablet.set_num_segments(3); |
| // CLOSE_LOAD |
| close_load(clients[0], {tablet}, 0); |
| wait_for_ack(1); |
| EXPECT_EQ(g_response_stat.num, 1); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| |
| // sender 0 closed, before open sender 1, load stream should still be open |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 1); |
| |
| EXPECT_TRUE(clients[1].connect_stream(NORMAL_SENDER_ID + 1, 2).ok()); |
| |
| for (int32_t segid = 2; segid >= 0; segid--) { |
| int i = 1; |
| write_one_tablet(clients[i], NORMAL_LOAD_ID, NORMAL_SENDER_ID + i, NORMAL_INDEX_ID, |
| NORMAL_TABLET_ID, segid, 0, segment_data[i * 3 + segid], true); |
| } |
| |
| close_load(clients[1], {tablet}, 1); |
| wait_for_ack(2); |
| EXPECT_EQ(g_response_stat.num, 2); |
| EXPECT_EQ(g_response_stat.success_tablet_ids.size(), 1); |
| EXPECT_EQ(g_response_stat.failed_tablet_ids.size(), 0); |
| EXPECT_EQ(g_response_stat.success_tablet_ids[0], NORMAL_TABLET_ID); |
| |
| // server will close stream on CLOSE_LOAD |
| wait_for_close(); |
| EXPECT_EQ(_load_stream_mgr->get_load_stream_num(), 0); |
| |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, 0); |
| size_t sender_pos = written_data.find('='); |
| size_t sender_end = written_data.find(','); |
| EXPECT_NE(sender_pos, std::string::npos); |
| EXPECT_NE(sender_end, std::string::npos); |
| auto sender_str = written_data.substr(sender_pos + 1, sender_end - sender_pos); |
| LOG(INFO) << "sender_str " << sender_str; |
| uint32_t sender_id = std::stoi(sender_str); |
| |
| for (int i = 0; i < 3; i++) { |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i); |
| EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); |
| } |
| sender_id = (sender_id + 1) % 2; |
| for (int i = 0; i < 3; i++) { |
| auto written_data = read_data(NORMAL_TXN_ID, NORMAL_PARTITION_ID, NORMAL_TABLET_ID, i + 3); |
| EXPECT_EQ(written_data, segment_data[sender_id * 3 + i]); |
| } |
| } |
| |
| } // namespace doris |