| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/tserver/tablet_server-test-base.h" |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <iostream> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <gflags/gflags.h> |
| #include <gflags/gflags_declare.h> |
| #include <glog/logging.h> |
| #include <gtest/gtest.h> |
| |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/partial_row.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/wire_protocol-test-util.h" |
| #include "kudu/common/wire_protocol.h" |
| #include "kudu/common/wire_protocol.pb.h" |
| #include "kudu/consensus/raft_consensus.h" |
| #include "kudu/gutil/stringprintf.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/rpc/messenger.h" |
| #include "kudu/rpc/rpc_controller.h" |
| #include "kudu/tablet/local_tablet_writer.h" |
| #include "kudu/tablet/mvcc.h" |
| #include "kudu/tablet/tablet.h" |
| #include "kudu/tablet/tablet_replica.h" |
| #include "kudu/tserver/mini_tablet_server.h" |
| #include "kudu/tserver/tablet_server.h" |
| #include "kudu/tserver/tablet_server_options.h" |
| #include "kudu/tserver/tablet_server_test_util.h" |
| #include "kudu/tserver/ts_tablet_manager.h" |
| #include "kudu/tserver/tserver_service.proxy.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/metrics.h" |
| #include "kudu/util/monotime.h" |
| #include "kudu/util/net/net_util.h" |
| #include "kudu/util/pb_util.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/test_graph.h" |
| #include "kudu/util/test_macros.h" |
| #include "kudu/util/test_util.h" |
| |
| // Timeout applied to the RPC controllers used by the helpers in this file. |
| // The value is interpreted as seconds (it is passed to MonoDelta::FromSeconds). |
| DEFINE_int32(rpc_timeout, 1000, "Timeout for RPC calls, in seconds"); |
| // Flags defined elsewhere; both are overridden in the TabletServerTestBase |
| // constructor. |
| DECLARE_bool(enable_maintenance_manager); |
| DECLARE_int32(heartbeat_rpc_timeout_ms); |
| |
| // Metric entity type that the fixture constructor instantiates as |
| // ts_test_metric_entity_. |
| METRIC_DEFINE_entity(test); |
| |
| using kudu::pb_util::SecureDebugString; |
| using kudu::pb_util::SecureShortDebugString; |
| using kudu::rpc::RpcController; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| namespace kudu { |
| namespace tserver { |
| |
| // Identifiers of the single test table and tablet that StartTabletServer() |
| // creates through AddTestTablet(). |
| const char* TabletServerTestBase::kTableId = "TestTable"; |
| const char* TabletServerTestBase::kTabletId = "ffffffffffffffffffffffffffffffff"; |
| |
| // Builds the fixture around the simple test schema and a dedicated test |
| // metric entity, and adjusts process-wide flags for unit testing. |
| TabletServerTestBase::TabletServerTestBase() |
|     : schema_(GetSimpleTestSchema()), |
|       ts_test_metric_entity_( |
|           METRIC_ENTITY_test.Instantiate(&ts_test_metric_registry_, "ts_server-test")) { |
|   // Heartbeats are retried for as long as a master is unreachable because of |
|   // a network error, and some tests deliberately point at masters that are |
|   // not running. A one-second heartbeat timeout keeps those tests fast. |
|   FLAGS_heartbeat_rpc_timeout_ms = 1000; |
| |
|   // Tests trigger maintenance operations themselves at predetermined times, |
|   // so the maintenance ops manager is switched off. |
|   FLAGS_enable_maintenance_manager = false; |
| } |
| |
| // Common per-test initialization: derives the key projection from the test |
| // schema, allocates the row builder and builds the client messenger. The |
| // tablet server itself is not started here; see StartTabletServer(). |
| void TabletServerTestBase::SetUp() { |
|   KuduTest::SetUp(); |
| |
|   key_schema_ = schema_.CreateKeyProjection(); |
|   rb_.reset(new RowBuilder(&schema_)); |
| |
|   rpc::MessengerBuilder client_builder("Client"); |
|   ASSERT_OK(client_builder.Build(&client_messenger_)); |
| } |
| |
| // Brings up a MiniTabletServer with 'num_data_dirs' data directories, adds the |
| // test tablet to it, waits for that tablet to be running and then connects the |
| // client proxies. Must not be called while a mini server already exists. |
| void TabletServerTestBase::StartTabletServer(int num_data_dirs) { |
|   CHECK(!mini_server_); |
| |
|   // The master address is deliberately invalid so that heartbeats can never |
|   // succeed, even when a real master happens to be running on this machine. |
|   mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"), |
|                                            HostPort("127.0.0.1", 0), num_data_dirs)); |
|   auto& master_addrs = mini_server_->options()->master_addresses; |
|   master_addrs.clear(); |
|   master_addrs.emplace_back("255.255.255.255", 1); |
|   ASSERT_OK(mini_server_->Start()); |
| |
|   // Create the test tablet and keep a reference to its replica. |
|   ASSERT_OK(mini_server_->AddTestTablet(kTableId, kTabletId, schema_)); |
|   auto* tablet_manager = mini_server_->server()->tablet_manager(); |
|   ASSERT_TRUE(tablet_manager->LookupTablet(kTabletId, &tablet_replica_)); |
| |
|   // Tablet creation is asynchronous; waiting here spares callers from having |
|   // to handle "not running yet" errors later on. |
|   ASSERT_OK(WaitForTabletRunning(kTabletId)); |
| |
|   // Finally, point the client proxies at the freshly started server. |
|   ResetClientProxies(); |
| } |
| |
| // Blocks until the tablet 'tablet_id' is usable: consensus is running, the |
| // replica has become leader, MVCC safe time is initialized and the tablet |
| // manager reports no in-flight transitions. Each phase is given up to ten |
| // seconds. Returns the first non-OK status encountered, or TimedOut if safe |
| // time is not initialized in time. |
| Status TabletServerTestBase::WaitForTabletRunning(const char *tablet_id) { |
|   const MonoDelta kTimeout = MonoDelta::FromSeconds(10); |
|   const auto* tablet_manager = mini_server_->server()->tablet_manager(); |
| |
|   scoped_refptr<tablet::TabletReplica> replica; |
|   RETURN_NOT_OK(tablet_manager->GetTabletReplica(tablet_id, &replica)); |
|   RETURN_NOT_OK(replica->WaitUntilConsensusRunning(kTimeout)); |
|   RETURN_NOT_OK(replica->consensus()->WaitUntilLeaderForTests(kTimeout)); |
| |
|   // KUDU-2463: Even though the tablet thinks it's leader, for correctness it |
|   // must finish replicating its no-op (even as a single replica) before it |
|   // is available to scans. Poll until MVCC reports an initialized safe time. |
|   const MonoTime deadline = MonoTime::Now() + kTimeout; |
|   for (;;) { |
|     if (replica->tablet()->mvcc_manager()->CheckIsSafeTimeInitialized().ok()) { |
|       break; |
|     } |
|     if (MonoTime::Now() >= deadline) { |
|       return Status::TimedOut("mvcc did not advance safe time within timeout"); |
|     } |
|     SleepFor(MonoDelta::FromMilliseconds(10)); |
|   } |
| |
|   // KUDU-2444: Even though the tablet replica is fully running, the tablet |
|   // manager may still regard it as transitioning to the running state. |
|   return tablet_manager->WaitForNoTransitionsForTests(kTimeout); |
| } |
| |
| // Sends an UPDATE for row 'row_idx' to the test tablet over RPC, setting the |
| // integer value to 'new_val' and the string value to "mutated<row_idx>". |
| // Asserts that the RPC succeeds without a tablet server error or per-row |
| // errors. If 'ts' is non-null, a value of 1 is added to it on success. |
| void TabletServerTestBase::UpdateTestRowRemote(int32_t row_idx, |
|                                                int32_t new_val, |
|                                                TimeSeries* ts) { |
|   // Build the request: tablet id, schema and the single UPDATE operation. |
|   WriteRequestPB req; |
|   req.set_tablet_id(kTabletId); |
|   ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
|   const string mutated_str = Substitute("mutated$0", row_idx); |
|   AddTestRowToPB(RowOperationsPB::UPDATE, schema_, row_idx, new_val, mutated_str, |
|                  req.mutable_row_operations()); |
| |
|   // Issue the write with the configured RPC timeout. |
|   RpcController controller; |
|   controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
|   WriteResponsePB resp; |
|   ASSERT_OK(proxy_->Write(req, &resp, &controller)); |
| |
|   SCOPED_TRACE(SecureDebugString(resp)); |
|   ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
|   ASSERT_EQ(0, resp.per_row_errors_size()); |
| |
|   if (ts != nullptr) { |
|     ts->AddValue(1); |
|   } |
| } |
| |
| // (Re)creates every client-side proxy held by the fixture so that it targets |
| // the RPC address the mini server is currently bound to. |
| void TabletServerTestBase::ResetClientProxies() { |
|   const auto& server_addr = mini_server_->bound_rpc_addr(); |
|   CreateTsClientProxies(server_addr, client_messenger_, |
|                         &tablet_copy_proxy_, &proxy_, &admin_proxy_, |
|                         &consensus_proxy_, &generic_proxy_); |
| } |
| |
| // Writes 'num_rows' test rows, starting at key 'start_row', straight into the |
| // local tablet (bypassing RPC). A failed insert aborts via CHECK_OK. |
| void TabletServerTestBase::InsertTestRowsDirect(int32_t start_row, |
|                                                 int32_t num_rows) { |
|   KuduPartialRow partial_row(&schema_); |
|   tablet::LocalTabletWriter local_writer(tablet_replica_->tablet(), &schema_); |
| |
|   int32_t offset = 0; |
|   while (offset < num_rows) { |
|     // The same partial row object is refilled for every key. |
|     BuildTestRow(start_row + offset, &partial_row); |
|     CHECK_OK(local_writer.Insert(partial_row)); |
|     ++offset; |
|   } |
| } |
| |
| // Inserts rows [first_row, first_row + count) into the tablet via Write RPCs. |
| // |
| // The rows are spread over 'num_batches' RPCs; passing -1 for 'num_batches' |
| // sends one row per RPC. When 'count' is not a multiple of 'num_batches', the |
| // batch boundaries are distributed so that all 'count' rows are still |
| // inserted (batch sizes then differ by at most one row). |
| // |
| // 'proxy' falls back to proxy_ when null. If 'write_timestamps_collector' is |
| // non-null, the timestamp of every write response is appended to it. If 'ts' |
| // is non-null, inserted-row counts are reported to it. When |
| // 'string_field_defined' is false, the string column is passed as null. |
| // An RPC failure, a tablet server error or any per-row error is fatal. |
| void TabletServerTestBase::InsertTestRowsRemote( |
|     int32_t first_row, |
|     int32_t count, |
|     int32_t num_batches, |
|     TabletServerServiceProxy* proxy, |
|     string tablet_id, |
|     vector<uint64_t>* write_timestamps_collector, |
|     TimeSeries* ts, |
|     bool string_field_defined) { |
|   if (!proxy) { |
|     proxy = proxy_.get(); |
|   } |
| |
|   if (num_batches == -1) { |
|     num_batches = count; |
|   } |
| |
|   WriteRequestPB req; |
|   req.set_tablet_id(std::move(tablet_id)); |
| |
|   WriteResponsePB resp; |
|   RpcController controller; |
| |
|   RowOperationsPB* data = req.mutable_row_operations(); |
| |
|   ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
|   // Key at which batch 'batch_idx' starts. Computed in 64 bits so that |
|   // 'batch_idx * count' cannot overflow; only evaluated inside the loop below, |
|   // where 'num_batches' is known to be positive. |
|   const auto batch_boundary = [&](int64_t batch_idx) { |
|     return first_row + static_cast<int32_t>(batch_idx * count / num_batches); |
|   }; |
| |
|   int32_t inserted_since_last_report = 0; |
|   for (int32_t i = 0; i < num_batches; ++i) { |
|     // Reset the controller and the row operations for this batch. |
|     controller.Reset(); |
|     controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
|     data->Clear(); |
| |
|     // Using the start of the next batch as the end of this one guarantees |
|     // that no remainder rows are dropped when the division is inexact. |
|     const int32_t first_row_in_batch = batch_boundary(i); |
|     const int32_t last_row_in_batch = batch_boundary(static_cast<int64_t>(i) + 1); |
| |
|     for (int32_t j = first_row_in_batch; j < last_row_in_batch; j++) { |
|       const string str_val = Substitute("original$0", j); |
|       const char* cstr_val = string_field_defined ? str_val.c_str() : nullptr; |
|       AddTestRowWithNullableStringToPB(RowOperationsPB::INSERT, schema_, j, j, |
|                                        cstr_val, data); |
|     } |
|     CHECK_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller)); |
|     if (write_timestamps_collector) { |
|       write_timestamps_collector->push_back(resp.timestamp()); |
|     } |
| |
|     if (resp.has_error() || resp.per_row_errors_size() > 0) { |
|       LOG(FATAL) << "Failed to insert batch " |
|                  << first_row_in_batch << "-" << last_row_in_batch |
|                  << ": " << SecureDebugString(resp); |
|     } |
| |
|     // Report progress to 'ts' in chunks of more than 100 rows. |
|     inserted_since_last_report += last_row_in_batch - first_row_in_batch; |
|     if ((inserted_since_last_report > 100) && ts) { |
|       ts->AddValue(static_cast<double>(inserted_since_last_report)); |
|       inserted_since_last_report = 0; |
|     } |
|   } |
| |
|   // Report whatever is left over from the last chunk. |
|   if (ts) { |
|     ts->AddValue(static_cast<double>(inserted_since_last_report)); |
|   } |
| } |
| |
| // Deletes rows [first_row, first_row + count) from the tablet 'tablet_id' |
| // with a single Write RPC. 'proxy' falls back to proxy_ when null. Asserts |
| // that the RPC succeeds and that the response carries no tablet server error. |
| void TabletServerTestBase::DeleteTestRowsRemote(int32_t first_row, |
|                                                 int32_t count, |
|                                                 TabletServerServiceProxy* proxy, |
|                                                 string tablet_id) { |
|   if (!proxy) { |
|     proxy = proxy_.get(); |
|   } |
| |
|   WriteRequestPB req; |
|   WriteResponsePB resp; |
|   RpcController controller; |
|   // Bound the call like the other write helpers do, so an unresponsive server |
|   // fails the test instead of blocking it. |
|   controller.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
|   req.set_tablet_id(std::move(tablet_id)); |
|   ASSERT_OK(SchemaToPB(schema_, req.mutable_schema())); |
| |
|   // One DELETE operation per key in the range. |
|   RowOperationsPB* ops = req.mutable_row_operations(); |
|   for (int32_t rowid = first_row; rowid < first_row + count; rowid++) { |
|     AddTestKeyToPB(RowOperationsPB::DELETE, schema_, rowid, ops); |
|   } |
| |
|   SCOPED_TRACE(SecureDebugString(req)); |
|   ASSERT_OK(DCHECK_NOTNULL(proxy)->Write(req, &resp, &controller)); |
|   SCOPED_TRACE(SecureDebugString(resp)); |
|   ASSERT_FALSE(resp.has_error()) << SecureShortDebugString(resp); |
| } |
| |
| // Fills 'row' with the test values for 'index': column 0 is set to 'index', |
| // column 1 to twice 'index' and column 2 to the string "hello <index>". |
| void TabletServerTestBase::BuildTestRow(int index, KuduPartialRow* row) { |
|   const int doubled = index * 2; |
|   const string greeting = StringPrintf("hello %d", index); |
| |
|   ASSERT_OK(row->SetInt32(0, index)); |
|   ASSERT_OK(row->SetInt32(1, doubled)); |
|   ASSERT_OK(row->SetStringCopy(2, greeting)); |
| } |
| |
| // Repeatedly calls Scan on the already-open scanner 'scanner_id' until the |
| // server reports no more results, appending the debug string of every |
| // returned row (decoded with 'projection') to 'results'. 'call_seq_id' is the |
| // sequence id of the first call and is incremented after each round trip. |
| // 'proxy' falls back to proxy_ when null. |
| void TabletServerTestBase::DrainScannerToStrings(const string& scanner_id, |
|                                                  const Schema& projection, |
|                                                  vector<string>* results, |
|                                                  TabletServerServiceProxy* proxy, |
|                                                  uint32_t call_seq_id) { |
|   TabletServerServiceProxy* const scan_proxy = proxy ? proxy : proxy_.get(); |
| |
|   ScanRequestPB req; |
|   req.set_scanner_id(scanner_id); |
|   req.set_batch_size_bytes(10000); |
| |
|   ScanResponsePB resp; |
|   RpcController rpc; |
|   rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
|   // NOTE: the results are intentionally left unsorted, because this helper is |
|   // used by tests that verify the server's ability to do ordered scans. |
|   for (;;) { |
|     rpc.Reset(); |
|     req.set_call_seq_id(call_seq_id); |
|     SCOPED_TRACE(SecureDebugString(req)); |
|     ASSERT_OK(DCHECK_NOTNULL(scan_proxy)->Scan(req, &resp, &rpc)); |
|     SCOPED_TRACE(SecureDebugString(resp)); |
|     ASSERT_FALSE(resp.has_error()); |
| |
|     StringifyRowsFromResponse(projection, rpc, &resp, results); |
|     ++call_seq_id; |
| |
|     if (!resp.has_more_results()) { |
|       break; |
|     } |
|   } |
| } |
| |
| // Decodes the row block carried by the scan response 'resp' (whose row data |
| // is read from the inbound sidecars of 'rpc') using 'projection', and appends |
| // the debug string of each row to 'results'. |
| void TabletServerTestBase::StringifyRowsFromResponse( |
|     const Schema& projection, |
|     const RpcController& rpc, |
|     ScanResponsePB* resp, |
|     vector<string>* results) { |
|   RowwiseRowBlockPB* block_pb = resp->mutable_data(); |
| |
|   // Sidecar data buffers: the rows sidecar is always fetched, the indirect |
|   // data sidecar only when the response declares one. |
|   Slice direct; |
|   Slice indirect; |
|   ASSERT_OK(rpc.GetInboundSidecar(block_pb->rows_sidecar(), &direct)); |
|   if (block_pb->has_indirect_data_sidecar()) { |
|     ASSERT_OK(rpc.GetInboundSidecar(block_pb->indirect_data_sidecar(), &indirect)); |
|   } |
| |
|   vector<const uint8_t*> row_ptrs; |
|   ASSERT_OK(ExtractRowsFromRowBlockPB(projection, *block_pb, |
|                                       indirect, &direct, &row_ptrs)); |
|   VLOG(1) << "Round trip got " << row_ptrs.size() << " rows"; |
| |
|   for (const uint8_t* ptr : row_ptrs) { |
|     const ConstContiguousRow decoded(&projection, ptr); |
|     results->push_back(projection.DebugRow(decoded)); |
|   } |
| } |
| |
| // Shuts down and destroys the mini tablet server, if there is one. |
| void TabletServerTestBase::ShutdownTablet() { |
|   if (!mini_server_.get()) { |
|     return; |
|   } |
| |
|   // Drop the TabletReplica reference before the server goes away; otherwise |
|   // data blocks may be destroyed after their owning block manager. |
|   tablet_replica_.reset(); |
|   mini_server_->Shutdown(); |
|   mini_server_.reset(); |
| } |
| |
| // Restarts the mini tablet server on the same filesystem root with |
| // 'num_data_dirs' data directories, re-acquires the test tablet's replica, |
| // reconnects the client proxies and waits for the tablet to be running. |
| // Returns the first error encountered, or NotFound if the tablet is missing |
| // after the restart. |
| Status TabletServerTestBase::ShutdownAndRebuildTablet(int num_data_dirs) { |
|   ShutdownTablet(); |
| |
|   // Bring up a new server; as in StartTabletServer(), the only configured |
|   // master address is 255.255.255.255:1. |
|   mini_server_.reset(new MiniTabletServer(GetTestPath("TabletServerTest-fsroot"), |
|                                            HostPort("127.0.0.1", 0), num_data_dirs)); |
|   auto& master_addrs = mini_server_->options()->master_addresses; |
|   master_addrs.clear(); |
|   master_addrs.emplace_back("255.255.255.255", 1); |
| |
|   // Starting the server should open the tablet created in StartTabletServer(). |
|   RETURN_NOT_OK(mini_server_->Start()); |
| |
|   // The startup status is checked only after the lookup: even when startup |
|   // failed we may still obtain a TabletReplica object that carries |
|   // information about the failure. |
|   const Status started = mini_server_->WaitStarted(); |
|   auto* tablet_manager = mini_server_->server()->tablet_manager(); |
|   const bool found_replica = tablet_manager->LookupTablet(kTabletId, &tablet_replica_); |
|   RETURN_NOT_OK(started); |
|   if (!found_replica) { |
|     return Status::NotFound("Tablet was not found"); |
|   } |
| |
|   // Reconnect the client proxies to the new server. |
|   ResetClientProxies(); |
| |
|   // Opening a tablet is asynchronous; wait here rather than making callers |
|   // deal with errors later. |
|   return WaitForTabletRunning(kTabletId); |
| } |
| |
| // Scans the whole tablet with a local row iterator over 'schema' and verifies |
| // that the selected rows match 'expected' in order. Each expected entry is a |
| // (key, value) pair compared against INT32 columns 0 and 1 respectively. |
| // Fails if the tablet holds more or fewer rows than 'expected'. |
| void TabletServerTestBase::VerifyRows(const Schema& schema, |
|                                       const vector<KeyValue>& expected) { |
|   unique_ptr<RowwiseIterator> iter; |
|   ASSERT_OK(tablet_replica_->tablet()->NewRowIterator(schema, &iter)); |
|   ASSERT_OK(iter->Init(nullptr)); |
| |
|   // Aim for about ten batches, with at least one row per batch and at most |
|   // 4 MiB worth of rows per batch. |
|   int batch_size = std::max<int>(1, |
|                                  std::min<int>(expected.size() / 10, |
|                                                4*1024*1024 / schema.byte_size())); |
| |
|   Arena arena(32*1024); |
|   RowBlock block(&schema, batch_size, &arena); |
| |
|   // Row counters are size_t so that comparisons against block.nrows() and |
|   // expected.size(), and indexing into 'expected', do not mix signed and |
|   // unsigned types. |
|   size_t count = 0; |
|   while (iter->HasNext()) { |
|     ASSERT_OK_FAST(iter->NextBlock(&block)); |
|     RowBlockRow rb_row = block.row(0); |
|     for (size_t i = 0; i < block.nrows(); i++) { |
|       // Only rows marked in the selection vector are compared. |
|       if (block.selection_vector()->IsRowSelected(i)) { |
|         rb_row.Reset(&block, i); |
|         VLOG(1) << "Verified row " << schema.DebugRow(rb_row); |
|         ASSERT_LT(count, expected.size()) << "Got more rows than expected!"; |
|         EXPECT_EQ(expected[count].first, *schema.ExtractColumnFromRow<INT32>(rb_row, 0)) |
|             << "Key mismatch at row: " << count; |
|         EXPECT_EQ(expected[count].second, *schema.ExtractColumnFromRow<INT32>(rb_row, 1)) |
|             << "Value mismatch at row: " << count; |
|         count++; |
|       } |
|     } |
|   } |
|   ASSERT_EQ(count, expected.size()); |
| } |
| |
| // Sends the scan request 'req' and asserts that the RPC itself succeeds but |
| // the response carries a tablet server error with code 'expected_code' whose |
| // status message contains 'expected_message'. |
| void TabletServerTestBase::VerifyScanRequestFailure(const ScanRequestPB& req, |
|                                                     TabletServerErrorPB::Code expected_code, |
|                                                     const char *expected_message) { |
|   ScanResponsePB resp; |
|   RpcController rpc; |
|   // Bound the call, as DrainScannerToStrings() does, so an unresponsive |
|   // server fails the test instead of blocking it. |
|   rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
|   // Send the call. |
|   { |
|     SCOPED_TRACE(SecureDebugString(req)); |
|     ASSERT_OK(proxy_->Scan(req, &resp, &rpc)); |
|     SCOPED_TRACE(SecureDebugString(resp)); |
|     ASSERT_TRUE(resp.has_error()); |
|     ASSERT_EQ(expected_code, resp.error().code()); |
|     ASSERT_STR_CONTAINS(resp.error().status().message(), expected_message); |
|   } |
| } |
| |
| // Convenience overload: builds a new-scan request on the test tablet that |
| // projects the columns of 'projection' and checks that it fails with |
| // 'expected_code' and a message containing 'expected_message'. |
| void TabletServerTestBase::VerifyScanRequestFailure(const Schema& projection, |
|                                                     TabletServerErrorPB::Code expected_code, |
|                                                     const char *expected_message) { |
|   ScanRequestPB req; |
|   req.set_call_seq_id(0); |
| |
|   NewScanRequestPB* new_scan = req.mutable_new_scan_request(); |
|   new_scan->set_tablet_id(kTabletId); |
|   ASSERT_OK(SchemaToColumnPBs(projection, new_scan->mutable_projected_columns())); |
| |
|   NO_FATALS(VerifyScanRequestFailure(req, expected_code, expected_message)); |
| } |
| |
| // Populates 'scan' for a scan of the test tablet in 'read_mode' that projects |
| // every column of the fixture schema. Returns the status of converting the |
| // schema into projected column PBs. |
| Status TabletServerTestBase::FillNewScanRequest(ReadMode read_mode, NewScanRequestPB* scan) const { |
|   scan->set_read_mode(read_mode); |
|   scan->set_tablet_id(kTabletId); |
| |
|   auto* projected_columns = scan->mutable_projected_columns(); |
|   return SchemaToColumnPBs(schema_, projected_columns); |
| } |
| |
| // Opens a new scanner on the test tablet that projects all columns of the |
| // fixture schema, with no predicates, using 'read_mode'. The request asks for |
| // a batch size of zero bytes so that no data comes back with the open call. |
| // The server's reply is stored in 'resp'; the function asserts that it has no |
| // error and reports more results. |
| void TabletServerTestBase::OpenScannerWithAllColumns(ScanResponsePB* resp, |
|                                                      ReadMode read_mode) { |
|   ScanRequestPB req; |
|   RpcController rpc; |
|   // Bound the call, as DrainScannerToStrings() does, so an unresponsive |
|   // server fails the test instead of blocking it. |
|   rpc.set_timeout(MonoDelta::FromSeconds(FLAGS_rpc_timeout)); |
| |
|   // Set up a new request with no predicates, all columns. |
|   ASSERT_OK(FillNewScanRequest(read_mode, req.mutable_new_scan_request())); |
|   req.set_call_seq_id(0); |
|   req.set_batch_size_bytes(0); // so it won't return data right away |
| |
|   // Send the call |
|   { |
|     SCOPED_TRACE(SecureDebugString(req)); |
|     ASSERT_OK(proxy_->Scan(req, resp, &rpc)); |
|     SCOPED_TRACE(SecureDebugString(*resp)); |
|     ASSERT_FALSE(resp->has_error()); |
|     ASSERT_TRUE(resp->has_more_results()); |
|   } |
| } |
| |
| } // namespace tserver |
| } // namespace kudu |