// blob: c8e36468f731561d06a337d483de29ece631e692 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/tablet_peer.h"
#include <algorithm>
#include <gflags/gflags.h>
#include <mutex>
#include <string>
#include <utility>
#include <vector>
#include "kudu/consensus/consensus.h"
#include "kudu/consensus/consensus_meta.h"
#include "kudu/consensus/log.h"
#include "kudu/consensus/log_util.h"
#include "kudu/consensus/opid_util.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/quorum_util.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/mathlimits.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/gutil/sysinfo.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/remote_method.h"
#include "kudu/rpc/rpc_service.h"
#include "kudu/rpc/service_pool.h"
#include "kudu/tablet/transactions/transaction_driver.h"
#include "kudu/tablet/transactions/alter_schema_transaction.h"
#include "kudu/tablet/transactions/write_transaction.h"
#include "kudu/tablet/tablet_bootstrap.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet_peer_mm_ops.h"
#include "kudu/tablet/tablet.pb.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/threadpool.h"
#include "kudu/util/trace.h"
using std::shared_ptr;
using std::unique_ptr;
namespace kudu {
namespace tablet {
// Histograms describing the per-tablet "prepare" thread pool. TabletPeer::Init()
// instantiates each of them against the tablet's metric entity and attaches it
// to prepare_pool_.
// NOTE(review): the two trailing numeric arguments are presumably the maximum
// trackable value and the number of significant digits -- confirm against the
// METRIC_DEFINE_histogram macro definition.

// Attached via ThreadPool::SetQueueLengthHistogram().
METRIC_DEFINE_histogram(tablet, op_prepare_queue_length, "Operation Prepare Queue Length",
MetricUnit::kTasks,
"Number of operations waiting to be prepared within this tablet. "
"High queue lengths indicate that the server is unable to process "
"operations as fast as they are being written to the WAL.",
10000, 2);
// Attached via ThreadPool::SetQueueTimeMicrosHistogram().
METRIC_DEFINE_histogram(tablet, op_prepare_queue_time, "Operation Prepare Queue Time",
MetricUnit::kMicroseconds,
"Time that operations spent waiting in the prepare queue before being "
"processed. High queue times indicate that the server is unable to "
"process operations as fast as they are being written to the WAL.",
10000000, 2);
// Attached via ThreadPool::SetRunTimeMicrosHistogram().
METRIC_DEFINE_histogram(tablet, op_prepare_run_time, "Operation Prepare Run Time",
MetricUnit::kMicroseconds,
"Time that operations spent being prepared in the tablet. "
"High values may indicate that the server is under-provisioned or "
"that operations are experiencing high contention with one another for "
"locks.",
10000000, 2);
using consensus::Consensus;
using consensus::ConsensusBootstrapInfo;
using consensus::ConsensusMetadata;
using consensus::ConsensusOptions;
using consensus::ConsensusRound;
using consensus::OpId;
using consensus::RaftConfigPB;
using consensus::RaftPeerPB;
using consensus::RaftConsensus;
using consensus::ALTER_SCHEMA_OP;
using consensus::WRITE_OP;
using log::Log;
using log::LogAnchorRegistry;
using rpc::Messenger;
using rpc::ResultTracker;
using strings::Substitute;
using tserver::TabletServerErrorPB;
// ============================================================================
// Tablet Peer
// ============================================================================
// Constructs a peer in the NOT_STARTED state.
//
// The constructor only records its arguments and allocates the status listener
// and the log anchor registry. The tablet, log, clock, messenger and consensus
// instance are supplied later through Init().
//
// 'meta' is the tablet's metadata; the tablet id is cached from it.
// 'local_peer_pb' is handed to the consensus instance created in Init().
// 'apply_pool' is stored and passed to every TransactionDriver this peer creates.
// 'mark_dirty_clbk' is run when Start() moves the peer to RUNNING, and is also
// handed to the consensus instance created in Init().
TabletPeer::TabletPeer(const scoped_refptr<TabletMetadata>& meta,
const consensus::RaftPeerPB& local_peer_pb,
ThreadPool* apply_pool,
Callback<void(const std::string& reason)> mark_dirty_clbk)
: meta_(meta),
tablet_id_(meta->tablet_id()),
local_peer_pb_(local_peer_pb),
state_(NOT_STARTED),
status_listener_(new TabletStatusListener(meta)),
apply_pool_(apply_pool),
log_anchor_registry_(new LogAnchorRegistry()),
mark_dirty_clbk_(std::move(mark_dirty_clbk)) {}
// Destroying a peer that still holds a tablet is a programming error: the
// owner must either have run Shutdown() (which drops the tablet reference) or
// never have run Init() (which is what sets it).
TabletPeer::~TabletPeer() {
  std::lock_guard<simple_spinlock> guard(lock_);
  CHECK(!tablet_) << "TabletPeer not fully shut down. State: "
                  << TabletStatePB_Name(state_);
}
// Wires the peer up with its runtime dependencies and creates the consensus
// instance. The peer must already be in the BOOTSTRAPPING state.
//
// Returns a non-OK status if the prepare thread pool cannot be built or the
// consensus metadata cannot be loaded.
Status TabletPeer::Init(const shared_ptr<Tablet>& tablet,
                        const scoped_refptr<server::Clock>& clock,
                        const shared_ptr<Messenger>& messenger,
                        const scoped_refptr<ResultTracker>& result_tracker,
                        const scoped_refptr<Log>& log,
                        const scoped_refptr<MetricEntity>& metric_entity) {
  DCHECK(tablet) << "A TabletPeer must be provided with a Tablet";
  DCHECK(log) << "A TabletPeer must be provided with a Log";

  // The prepare pool is limited to a single thread.
  RETURN_NOT_OK(ThreadPoolBuilder("prepare")
                    .set_max_threads(1)
                    .Build(&prepare_pool_));
  prepare_pool_->SetQueueLengthHistogram(
      METRIC_op_prepare_queue_length.Instantiate(metric_entity));
  prepare_pool_->SetQueueTimeMicrosHistogram(
      METRIC_op_prepare_queue_time.Instantiate(metric_entity));
  prepare_pool_->SetRunTimeMicrosHistogram(
      METRIC_op_prepare_run_time.Instantiate(metric_entity));

  {
    std::lock_guard<simple_spinlock> guard(lock_);
    CHECK_EQ(BOOTSTRAPPING, state_);

    // Publish the dependencies handed to us by the caller.
    tablet_ = tablet;
    clock_ = clock;
    messenger_ = messenger;
    log_ = log;
    result_tracker_ = result_tracker;

    ConsensusOptions consensus_opts;
    consensus_opts.tablet_id = meta_->tablet_id();

    TRACE("Creating consensus instance");

    gscoped_ptr<ConsensusMetadata> consensus_meta;
    RETURN_NOT_OK(ConsensusMetadata::Load(meta_->fs_manager(),
                                          tablet_id_,
                                          meta_->fs_manager()->uuid(),
                                          &consensus_meta));

    consensus_ = RaftConsensus::Create(consensus_opts,
                                       std::move(consensus_meta),
                                       local_peer_pb_,
                                       metric_entity,
                                       clock_,
                                       this,
                                       messenger_,
                                       log_.get(),
                                       tablet_->mem_tracker(),
                                       mark_dirty_clbk_);
  }

  // Transaction instrumentation is only enabled when the tablet has metrics.
  if (tablet_->metrics() != nullptr) {
    TRACE("Starting instrumentation");
    txn_tracker_.StartInstrumentation(tablet_->GetMetricEntity());
  }
  txn_tracker_.StartMemoryTracking(tablet_->mem_tracker());

  TRACE("TabletPeer::Init() finished");
  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer Initted";
  return Status::OK();
}
// Starts consensus with the given bootstrap info and, on success, moves the
// peer from BOOTSTRAPPING to RUNNING. Holds state_change_lock_ for the whole
// call. Returns the error from consensus start-up unchanged if it fails.
Status TabletPeer::Start(const ConsensusBootstrapInfo& bootstrap_info) {
  std::lock_guard<simple_spinlock> state_change_guard(state_change_lock_);
  TRACE("Starting consensus");

  VLOG(2) << "T " << tablet_id() << " P " << consensus_->peer_uuid() << ": Peer starting";
  VLOG(2) << "RaftConfig before starting: " << consensus_->CommittedConfig().DebugString();

  RETURN_NOT_OK(consensus_->Start(bootstrap_info));

  {
    std::lock_guard<simple_spinlock> guard(lock_);
    CHECK_EQ(state_, BOOTSTRAPPING);
    state_ = RUNNING;
  }

  // The tablet state changed, so the tablet has to be re-reported to the master.
  mark_dirty_clbk_.Run("Started TabletPeer");
  return Status::OK();
}
const consensus::RaftConfigPB TabletPeer::RaftConfig() const {
CHECK(consensus_) << "consensus is null";
return consensus_->CommittedConfig();
}
// Shuts the peer down and blocks until shutdown is complete.
//
// The first caller moves the peer to QUIESCING and performs the teardown; any
// caller that finds the peer already QUIESCING or SHUTDOWN just waits for the
// SHUTDOWN state and returns. Teardown order: unregister maintenance ops, shut
// down consensus, wait for tracked transactions, shut down the prepare pool,
// close the log, shut down the tablet, and finally drop the consensus and
// tablet references and mark the peer SHUTDOWN.
void TabletPeer::Shutdown() {
LOG(INFO) << "Initiating TabletPeer shutdown for tablet: " << tablet_id_;
{
std::unique_lock<simple_spinlock> lock(lock_);
if (state_ == QUIESCING || state_ == SHUTDOWN) {
// Someone else is (or was) shutting down. Release lock_ first because
// WaitUntilShutdown() acquires it itself.
lock.unlock();
WaitUntilShutdown();
return;
}
state_ = QUIESCING;
}
// Held for the rest of the function; Start() and RegisterMaintenanceOps()
// take the same lock.
std::lock_guard<simple_spinlock> l(state_change_lock_);
// Even though Tablet::Shutdown() also unregisters its ops, we have to do it here
// to ensure that any currently running operation finishes before we proceed with
// the rest of the shutdown sequence. In particular, a maintenance operation could
// indirectly end up calling into the log, which we are about to shut down.
if (tablet_) tablet_->UnregisterMaintenanceOps();
UnregisterMaintenanceOps();
if (consensus_) consensus_->Shutdown();
// TODO: KUDU-183: Keep track of the pending tasks and send an "abort" message.
LOG_SLOW_EXECUTION(WARNING, 1000,
Substitute("TabletPeer: tablet $0: Waiting for Transactions to complete", tablet_id())) {
txn_tracker_.WaitForAllToFinish();
}
// Each of the following members may be unset if Init() never ran, hence the
// null checks.
if (prepare_pool_) {
prepare_pool_->Shutdown();
}
if (log_) {
WARN_NOT_OK(log_->Close(), "Error closing the Log.");
}
if (VLOG_IS_ON(1)) {
VLOG(1) << "TabletPeer: tablet " << tablet_id() << " shut down!";
}
if (tablet_) {
tablet_->Shutdown();
}
// Only mark the peer as SHUTDOWN when all other components have shut down.
{
std::lock_guard<simple_spinlock> lock(lock_);
// Release mem tracker resources.
consensus_.reset();
tablet_.reset();
state_ = SHUTDOWN;
}
}
void TabletPeer::WaitUntilShutdown() {
while (true) {
{
std::lock_guard<simple_spinlock> lock(lock_);
if (state_ == SHUTDOWN) {
return;
}
}
SleepFor(MonoDelta::FromMilliseconds(10));
}
}
// Returns OK if the peer is in the RUNNING state, otherwise IllegalState with
// the name of the current state.
Status TabletPeer::CheckRunning() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  if (state_ == RUNNING) {
    return Status::OK();
  }
  return Status::IllegalState(Substitute("The tablet is not in a running state: $0",
                                         TabletStatePB_Name(state_)));
}
// Blocks until the peer is RUNNING and its consensus instance reports that it
// is running, or until 'timeout' has elapsed.
//
// Polls with exponential backoff: the sleep starts at 1ms and doubles after
// every attempt, capped at 2^kMaxBackoffExp (256) ms.
//
// Returns:
//   OK            once consensus is running.
//   IllegalState  if the peer is QUIESCING or SHUTDOWN.
//   TimedOut      if more than 'timeout' elapsed without consensus running.
Status TabletPeer::WaitUntilConsensusRunning(const MonoDelta& timeout) {
  MonoTime start(MonoTime::Now(MonoTime::FINE));

  int backoff_exp = 0;
  const int kMaxBackoffExp = 8;
  while (true) {
    // Snapshot the state and the presence of consensus under the lock.
    bool has_consensus = false;
    TabletStatePB cached_state;
    {
      std::lock_guard<simple_spinlock> lock(lock_);
      cached_state = state_;
      if (consensus_) {
        has_consensus = true;  // consensus_ is a set-once object.
      }
    }

    if (cached_state == QUIESCING || cached_state == SHUTDOWN) {
      return Status::IllegalState(
          Substitute("The tablet is already shutting down or shutdown. State: $0",
                     TabletStatePB_Name(cached_state)));
    }

    // NOTE(review): consensus_ is dereferenced here without holding lock_,
    // while Shutdown() resets it under lock_. A shutdown that completes between
    // the snapshot above and this call would race with it -- confirm whether
    // callers can overlap with Shutdown().
    if (cached_state == RUNNING && has_consensus && consensus_->IsRunning()) {
      break;
    }

    MonoTime now(MonoTime::Now(MonoTime::FINE));
    MonoDelta elapsed(now.GetDeltaSince(start));
    if (elapsed.MoreThan(timeout)) {
      // Uses "State: " (colon) to match the IllegalState message above.
      return Status::TimedOut(
          Substitute("Consensus is not running after waiting for $0. State: $1",
                     elapsed.ToString(), TabletStatePB_Name(cached_state)));
    }

    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
    backoff_exp = std::min(backoff_exp + 1, kMaxBackoffExp);
  }
  return Status::OK();
}
// Submits a leader-side write. Fails with the CheckRunning() error if the peer
// is not RUNNING; otherwise wraps 'state' in a WriteTransaction, creates a
// leader driver for it and returns the result of starting it asynchronously.
Status TabletPeer::SubmitWrite(unique_ptr<WriteTransactionState> state) {
  RETURN_NOT_OK(CheckRunning());

  state->SetResultTracker(result_tracker_);
  gscoped_ptr<Transaction> write_txn(
      new WriteTransaction(std::move(state), consensus::LEADER));

  scoped_refptr<TransactionDriver> txn_driver;
  RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(write_txn), &txn_driver));
  return txn_driver->ExecuteAsync();
}
// Submits a leader-side schema change. Fails with the CheckRunning() error if
// the peer is not RUNNING; otherwise wraps 'state' in an AlterSchemaTransaction,
// creates a leader driver for it and returns the result of starting it
// asynchronously.
Status TabletPeer::SubmitAlterSchema(unique_ptr<AlterSchemaTransactionState> state) {
  RETURN_NOT_OK(CheckRunning());

  gscoped_ptr<Transaction> alter_txn(
      new AlterSchemaTransaction(std::move(state), consensus::LEADER));

  scoped_refptr<TransactionDriver> txn_driver;
  RETURN_NOT_OK(NewLeaderTransactionDriver(std::move(alter_txn), &txn_driver));
  return txn_driver->ExecuteAsync();
}
// Fills 'status_pb_out' with a snapshot of this peer's status, taken under
// lock_: identity, table name, last status message and partition from the
// status listener, the runtime state, the on-disk data state from the
// metadata, and the estimated on-disk size when a tablet is present.
void TabletPeer::GetTabletStatusPB(TabletStatusPB* status_pb_out) const {
  std::lock_guard<simple_spinlock> guard(lock_);
  DCHECK(status_pb_out != nullptr);
  DCHECK(status_listener_.get() != nullptr);

  status_pb_out->set_tablet_id(status_listener_->tablet_id());
  status_pb_out->set_table_name(status_listener_->table_name());
  status_pb_out->set_last_status(status_listener_->last_status());
  status_listener_->partition().ToPB(status_pb_out->mutable_partition());
  status_pb_out->set_state(state_);
  status_pb_out->set_tablet_data_state(meta_->tablet_data_state());

  // No tablet means there is no size to report.
  if (!tablet_) {
    return;
  }
  status_pb_out->set_estimated_on_disk_size(tablet_->EstimateOnDiskSize());
}
// Garbage-collects log segments below the earliest index this peer still
// needs. Does nothing when the peer is not RUNNING. A GC failure is logged at
// ERROR level but not propagated: this function always returns OK.
Status TabletPeer::RunLogGC() {
  if (!CheckRunning().ok()) {
    return Status::OK();
  }

  int64_t earliest_needed_idx;
  GetEarliestNeededLogIndex(&earliest_needed_idx);

  int32_t segments_gced;
  Status gc_status = log_->GC(earliest_needed_idx, &segments_gced);
  if (!gc_status.ok()) {
    gc_status = gc_status.CloneAndPrepend(
        "Unexpected error while running Log GC from TabletPeer");
    LOG(ERROR) << gc_status.ToString();
  }
  return Status::OK();
}
void TabletPeer::SetFailed(const Status& error) {
std::lock_guard<simple_spinlock> lock(lock_);
state_ = FAILED;
error_ = error;
status_listener_->StatusMessage(error.ToString());
}
// Returns a short description of the peer's state for display:
//   - FAILED peers:        "<state> (<data state>): <error>".
//   - data state != READY: the data state name alone.
//   - otherwise:           the runtime state name (BOOTSTRAPPING, RUNNING, etc).
string TabletPeer::HumanReadableState() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const TabletDataState data_state = meta_->tablet_data_state();

  // A failure could have any number of causes, so show everything we know.
  if (state_ == FAILED) {
    return Substitute("$0 ($1): $2",
                      TabletStatePB_Name(state_),
                      TabletDataState_Name(data_state),
                      error_.ToString());
  }

  // When the data is being copied or is tombstoned, that is the important
  // thing to show.
  if (data_state != TABLET_DATA_READY) {
    return TabletDataState_Name(data_state);
  }

  // The data is in a "normal" state, so the runtime state is what matters.
  return TabletStatePB_Name(state_);
}
// Appends one TransactionStatusPB to 'out' for every pending transaction that
// has a state object. Each entry carries the op id, the operation type, a
// description, how long the transaction has been running and, when
// 'trace_type' is TRACE_TXNS, a dump of its trace buffer.
void TabletPeer::GetInFlightTransactions(Transaction::TraceType trace_type,
                                         vector<consensus::TransactionStatusPB>* out) const {
  vector<scoped_refptr<TransactionDriver> > in_flight;
  txn_tracker_.GetPendingTransactions(&in_flight);

  for (const scoped_refptr<TransactionDriver>& txn_driver : in_flight) {
    // Drivers without a state are not reported.
    if (txn_driver->state() == nullptr) {
      continue;
    }

    consensus::TransactionStatusPB entry;
    entry.mutable_op_id()->CopyFrom(txn_driver->GetOpId());

    switch (txn_driver->tx_type()) {
      case Transaction::WRITE_TXN:
        entry.set_tx_type(consensus::WRITE_OP);
        break;
      case Transaction::ALTER_SCHEMA_TXN:
        entry.set_tx_type(consensus::ALTER_SCHEMA_OP);
        break;
    }

    entry.set_description(txn_driver->ToString());

    const MonoDelta running_for =
        MonoTime::Now(MonoTime::FINE).GetDeltaSince(txn_driver->start_time());
    int64_t running_for_micros = running_for.ToMicroseconds();
    entry.set_running_for_micros(running_for_micros);

    if (trace_type == Transaction::TRACE_TXNS) {
      entry.set_trace_buffer(txn_driver->trace()->DumpToString());
    }
    out->push_back(entry);
  }
}
void TabletPeer::GetEarliestNeededLogIndex(int64_t* min_index) const {
// First, we anchor on the last OpId in the Log to establish a lower bound
// and avoid racing with the other checks. This limits the Log GC candidate
// segments before we check the anchors.
{
OpId last_log_op;
log_->GetLatestEntryOpId(&last_log_op);
*min_index = last_log_op.index();
}
// If we never have written to the log, no need to proceed.
if (*min_index == 0) return;
// Next, we interrogate the anchor registry.
// Returns OK if minimum known, NotFound if no anchors are registered.
{
int64_t min_anchor_index;
Status s = log_anchor_registry_->GetEarliestRegisteredLogIndex(&min_anchor_index);
if (PREDICT_FALSE(!s.ok())) {
DCHECK(s.IsNotFound()) << "Unexpected error calling LogAnchorRegistry: " << s.ToString();
} else {
*min_index = std::min(*min_index, min_anchor_index);
}
}
// Next, interrogate the TransactionTracker.
vector<scoped_refptr<TransactionDriver> > pending_transactions;
txn_tracker_.GetPendingTransactions(&pending_transactions);
for (const scoped_refptr<TransactionDriver>& driver : pending_transactions) {
OpId tx_op_id = driver->GetOpId();
// A transaction which doesn't have an opid hasn't been submitted for replication yet and
// thus has no need to anchor the log.
if (tx_op_id.IsInitialized()) {
*min_index = std::min(*min_index, tx_op_id.index());
}
}
}
// Asks the log to fill 'idx_size_map', passing it the earliest log index this
// peer still needs. Fails with the CheckRunning() error if the peer is not
// RUNNING.
Status TabletPeer::GetMaxIndexesToSegmentSizeMap(MaxIdxToSegmentSizeMap* idx_size_map) const {
  RETURN_NOT_OK(CheckRunning());

  int64_t earliest_needed_idx;
  GetEarliestNeededLogIndex(&earliest_needed_idx);
  log_->GetMaxIndexesToSegmentSizeMap(earliest_needed_idx, idx_size_map);
  return Status::OK();
}
// Asks the log to store its GC-able data size in '*retention_size', passing it
// the earliest log index this peer still needs. Fails with the CheckRunning()
// error if the peer is not RUNNING.
Status TabletPeer::GetGCableDataSize(int64_t* retention_size) const {
  RETURN_NOT_OK(CheckRunning());

  int64_t earliest_needed_idx;
  GetEarliestNeededLogIndex(&earliest_needed_idx);
  log_->GetGCableDataSize(earliest_needed_idx, retention_size);
  return Status::OK();
}
// Builds a replica-side transaction for the replicate message carried by
// 'round' and starts executing it asynchronously.
//
// Returns IllegalState (with the state name as the message) unless the peer is
// RUNNING or BOOTSTRAPPING, and otherwise propagates any error from creating or
// starting the transaction driver. Op types other than WRITE_OP and
// ALTER_SCHEMA_OP are fatal.
Status TabletPeer::StartReplicaTransaction(const scoped_refptr<ConsensusRound>& round) {
{
std::lock_guard<simple_spinlock> lock(lock_);
if (state_ != RUNNING && state_ != BOOTSTRAPPING) {
return Status::IllegalState(TabletStatePB_Name(state_));
}
}
consensus::ReplicateMsg* replicate_msg = round->replicate_msg();
DCHECK(replicate_msg->has_timestamp());
// Wrap the request in the transaction type matching the op type.
gscoped_ptr<Transaction> transaction;
switch (replicate_msg->op_type()) {
case WRITE_OP:
{
DCHECK(replicate_msg->has_write_request()) << "WRITE_OP replica"
" transaction must receive a WriteRequestPB";
// The request id is optional; pass nullptr when the message has none.
unique_ptr<WriteTransactionState> tx_state(
new WriteTransactionState(
this,
&replicate_msg->write_request(),
replicate_msg->has_request_id() ? &replicate_msg->request_id() : nullptr));
tx_state->SetResultTracker(result_tracker_);
transaction.reset(new WriteTransaction(std::move(tx_state), consensus::REPLICA));
break;
}
case ALTER_SCHEMA_OP:
{
DCHECK(replicate_msg->has_alter_schema_request()) << "ALTER_SCHEMA_OP replica"
" transaction must receive an AlterSchemaRequestPB";
unique_ptr<AlterSchemaTransactionState> tx_state(
new AlterSchemaTransactionState(this, &replicate_msg->alter_schema_request(),
nullptr));
transaction.reset(
new AlterSchemaTransaction(std::move(tx_state), consensus::REPLICA));
break;
}
default:
LOG(FATAL) << "Unsupported Operation Type";
}
// TODO(todd) Look at wiring the stuff below on the driver
// 'state' is a raw pointer taken before 'transaction' is moved into the
// driver below; it is still used after that move.
TransactionState* state = transaction->state();
state->set_consensus_round(round);
// The transaction takes the timestamp from the replicate message, and the
// same timestamp is passed to the clock's Update().
Timestamp ts(replicate_msg->timestamp());
state->set_timestamp(ts);
clock_->Update(ts);
scoped_refptr<TransactionDriver> driver;
RETURN_NOT_OK(NewReplicaTransactionDriver(std::move(transaction), &driver));
// Unretained is required to avoid a refcount cycle.
state->consensus_round()->SetConsensusReplicatedCallback(
Bind(&TransactionDriver::ReplicationFinished, Unretained(driver.get())));
RETURN_NOT_OK(driver->ExecuteAsync());
return Status::OK();
}
// Creates a TransactionDriver wired to this peer's tracker, consensus, log and
// thread pools, and initializes it with 'transaction' in the LEADER role.
// '*driver' is only updated when initialization succeeds.
Status TabletPeer::NewLeaderTransactionDriver(gscoped_ptr<Transaction> transaction,
                                              scoped_refptr<TransactionDriver>* driver) {
  scoped_refptr<TransactionDriver> new_driver = new TransactionDriver(&txn_tracker_,
                                                                      consensus_.get(),
                                                                      log_.get(),
                                                                      prepare_pool_.get(),
                                                                      apply_pool_,
                                                                      &txn_order_verifier_);
  RETURN_NOT_OK(new_driver->Init(std::move(transaction), consensus::LEADER));

  driver->swap(new_driver);
  return Status::OK();
}
// Creates a TransactionDriver wired to this peer's tracker, consensus, log and
// thread pools, and initializes it with 'transaction' in the REPLICA role.
// '*driver' is only updated when initialization succeeds.
Status TabletPeer::NewReplicaTransactionDriver(gscoped_ptr<Transaction> transaction,
                                               scoped_refptr<TransactionDriver>* driver) {
  scoped_refptr<TransactionDriver> new_driver = new TransactionDriver(&txn_tracker_,
                                                                      consensus_.get(),
                                                                      log_.get(),
                                                                      prepare_pool_.get(),
                                                                      apply_pool_,
                                                                      &txn_order_verifier_);
  RETURN_NOT_OK(new_driver->Init(std::move(transaction), consensus::REPLICA));

  driver->swap(new_driver);
  return Status::OK();
}
// Registers this peer's maintenance operations (MRS flush, DMS flush and log
// GC) with 'maint_mgr', then lets the tablet register its own. Does nothing
// except log a warning when the peer is not RUNNING.
//
// The registered ops are kept in maintenance_ops_ and are unregistered and
// deleted by UnregisterMaintenanceOps().
void TabletPeer::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
  // Taking state_change_lock_ ensures that we don't shut down concurrently with
  // this last start-up task.
  std::lock_guard<simple_spinlock> l(state_change_lock_);

  if (state() != RUNNING) {
    // Log the tablet id: streaming the tablet_ smart pointer itself would only
    // print an address, and the pointer is reset once Shutdown() has finished.
    LOG(WARNING) << "Not registering maintenance operations for tablet " << tablet_id()
                 << ": tablet not in RUNNING state";
    return;
  }

  DCHECK(maintenance_ops_.empty());

  // Each op is owned by a scoped pointer until it has been registered; after
  // that, ownership passes to maintenance_ops_.
  gscoped_ptr<MaintenanceOp> mrs_flush_op(new FlushMRSOp(this));
  maint_mgr->RegisterOp(mrs_flush_op.get());
  maintenance_ops_.push_back(mrs_flush_op.release());

  gscoped_ptr<MaintenanceOp> dms_flush_op(new FlushDeltaMemStoresOp(this));
  maint_mgr->RegisterOp(dms_flush_op.get());
  maintenance_ops_.push_back(dms_flush_op.release());

  gscoped_ptr<MaintenanceOp> log_gc(new LogGCOp(this));
  maint_mgr->RegisterOp(log_gc.get());
  maintenance_ops_.push_back(log_gc.release());

  tablet_->RegisterMaintenanceOps(maint_mgr);
}
void TabletPeer::UnregisterMaintenanceOps() {
DCHECK(state_change_lock_.is_locked());
for (MaintenanceOp* op : maintenance_ops_) {
op->Unregister();
}
STLDeleteElements(&maintenance_ops_);
}
// Waits for every transaction that is currently applying to commit in MVCC and
// then waits for the log to be flushed. Either wait logs a warning if it takes
// longer than 200ms.
//
// Returns the error from Log::WaitUntilAllFlushed() if the flush fails,
// otherwise OK.
Status FlushInflightsToLogCallback::WaitForInflightsAndFlushLog() {
// This callback is triggered prior to any TabletMetadata flush.
// The guarantee that we are trying to enforce is this:
//
// If an operation has been flushed to stable storage (eg a DRS or DeltaFile)
// then its COMMIT message must be present in the log.
//
// The purpose for this is so that, during bootstrap, we can accurately identify
// whether each operation has been flushed. If we don't see a COMMIT message for
// an operation, then we assume it was not completely applied and needs to be
// re-applied. Thus, if we had something on disk but with no COMMIT message,
// we'd attempt to double-apply the write, resulting in an error (eg trying to
// delete an already-deleted row).
//
// So, to enforce this property, we do two steps:
//
// 1) Wait for any operations which are already mid-Apply() to Commit() in MVCC.
//
// Because the operations always enqueue their COMMIT message to the log
// before calling Commit(), this ensures that any in-flight operations have
// their commit messages "en route".
//
// NOTE: we only wait for those operations that have started their Apply() phase.
// Any operations which haven't yet started applying haven't made any changes
// to in-memory state: thus, they obviously couldn't have made any changes to
// on-disk storage either (data can only get to the disk by going through an in-memory
// store). Only those that have started Apply() could have potentially written some
// data which is now on disk.
//
// Perhaps more importantly, if we waited on operations that hadn't started their
// Apply() phase, we might be waiting forever -- for example, if a follower has been
// partitioned from its leader, it may have operations sitting around in flight
// for quite a long time before eventually aborting or committing. This would
// end up blocking all flushes if we waited on it.
//
// 2) Flush the log
//
// This ensures that the above-mentioned commit messages are not just enqueued
// to the log, but also on disk.
VLOG(1) << "T " << tablet_->metadata()->tablet_id()
<< ": Waiting for in-flight transactions to commit.";
// Step 1: wait for the transactions that are mid-Apply().
LOG_SLOW_EXECUTION(WARNING, 200, "Committing in-flights took a long time.") {
tablet_->mvcc_manager()->WaitForApplyingTransactionsToCommit();
}
VLOG(1) << "T " << tablet_->metadata()->tablet_id()
<< ": Waiting for the log queue to be flushed.";
// Step 2: wait for the log flush; a failure is returned from inside the
// timed scope.
LOG_SLOW_EXECUTION(WARNING, 200, "Flushing the Log queue took a long time.") {
RETURN_NOT_OK(log_->WaitUntilAllFlushed());
}
return Status::OK();
}
} // namespace tablet
} // namespace kudu