blob: c94837c6fac7ca61cf6031ecc22e1b1527c28b88 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <utility>
#include <glog/logging.h>
#include <google/protobuf/arena.h>
#include "kudu/common/common.pb.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/consensus/raft_consensus.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/result_tracker.h"
#include "kudu/rpc/rpc_header.pb.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/countdown_latch.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/status.h"
// Forward declaration so this header can refer to google::protobuf::Message
// (returned by OpState::request()/response()) without pulling in the
// protobuf message headers. Both namespaces are closed here so that the
// kudu namespace below is not accidentally nested inside google::protobuf.
namespace google {
namespace protobuf {
class Message;
} // namespace protobuf
} // namespace google
namespace kudu {
class Schema;
namespace tablet {
class OpCompletionCallback;
class OpState;
class TabletReplica;
// All metrics associated with an Op.
//
// Every counter has a default member initializer, so a freshly constructed
// OpMetrics (including a default-initialized one such as `OpMetrics m;`)
// starts with all counters at zero instead of indeterminate values. The
// struct remains an aggregate, so brace-initialization still works.
struct OpMetrics {
  // Defined out of line (not visible in this header).
  // NOTE(review): presumably zeroes every counter — confirm against the .cc.
  void Reset();

  int successful_inserts = 0;
  int insert_ignore_errors = 0;
  int successful_upserts = 0;
  int successful_updates = 0;
  int update_ignore_errors = 0;
  int successful_deletes = 0;
  int delete_ignore_errors = 0;
  uint64_t commit_wait_duration_usec = 0;
};
// Base class for ops.
// There are different implementations for different types (Write, AlterSchema, etc.).
// OpDriver implementations use Ops along with Consensus to execute
// and replicate operations in a consensus configuration.
//
// Phases, as described by the method comments below: Prepare() (which can be
// undone with AbortPrepare()), Start(), Apply(), and finally Finish().
//
// NOTE(review): in this copy of the header the class has no visible access
// specifiers and no closing braces for the class or its enums, so as written
// every member is private and the declaration is incomplete. Restore the
// missing lines from version control before relying on this definition.
class Op {
// NOTE(review): the enumerator lists of OpType, TraceType and OpResult are not
// present here. Per the Finish() comment below, OpResult includes at least
// APPLIED and ABORTED; the remaining enumerators cannot be recovered from
// this view.
enum OpType {
enum TraceType {
enum OpResult {
// Creates an op run by a driver of the given consensus::DriverType (leader or
// replica, see type()) with the given OpType. The definition is out of line;
// presumably it stores the arguments in type_ and op_type_, which type() and
// op_type() return.
Op(consensus::DriverType type, OpType op_type);
// Returns the OpState for this op.
virtual OpState* state() = 0;
virtual const OpState* state() const = 0;
// Returns whether this op is being executed on the leader or on a
// replica.
consensus::DriverType type() const { return type_; }
// Returns this op's type.
OpType op_type() const { return op_type_; }
// Builds the ReplicateMsg for this op.
// NOTE(review): 'replicate_msg' appears to be an out-parameter that receives
// the newly built message — confirm against the implementations.
virtual void NewReplicateMsg(std::unique_ptr<consensus::ReplicateMsg>* replicate_msg) = 0;
// Executes the prepare phase of this op, the actual actions
// of this phase depend on the op type, but usually are limited
// to what can be done without actually changing data structures and without
// side-effects.
virtual Status Prepare() = 0;
// Aborts the prepare phase. The default implementation does nothing.
virtual void AbortPrepare() {}
// Actually starts an op, assigning a timestamp to the op.
// LEADER replicas execute this in or right after Prepare(), while FOLLOWER/LEARNER
// replicas execute this right before the Apply() phase as the op's
// timestamp is only available on the LEADER's commit message.
// Once Started(), state might have leaked to other replicas/local log and the
// op can't be cancelled without issuing an abort message.
virtual Status Start() = 0;
// Executes the Apply() phase of the op, the actual actions of
// this phase depend on the op type, but usually this is the
// method where data-structures are changed.
// NOTE(review): 'commit_msg' looks like an out-parameter for the CommitMsg
// produced by the apply — confirm ownership against the implementations.
virtual Status Apply(consensus::CommitMsg** commit_msg) = 0;
// Executed after the op has been applied and the Raft commit message has
// been appended to the log (though it might not be durable yet), or if the
// op was aborted.
// Implementations are expected to perform cleanup on this method, the driver
// will reply to the client after this method call returns.
// 'result' will be either APPLIED or ABORTED, letting implementations
// know what was the final status of the op.
// The default implementation does nothing.
virtual void Finish(OpResult result) {}
// Each implementation should have its own ToString() method.
virtual std::string ToString() const = 0;
// Virtual so that concrete ops can be destroyed through an Op pointer.
virtual ~Op() {}
// Leader vs. replica execution; returned by type().
const consensus::DriverType type_;
// This op's type; returned by op_type().
const OpType op_type_;
class OpState {
// Returns the request PB associated with this op. May be NULL if
// the op's state has been reset.
virtual const google::protobuf::Message* request() const { return NULL; }
// Returns the response PB associated with this op, or NULL.
// This will only return a non-null object for leader-side ops.
virtual google::protobuf::Message* response() const { return NULL; }
// Returns whether the results of the op are being tracked.
bool are_results_tracked() const {
return result_tracker_.get() != nullptr && has_request_id();
rpc::ResultTracker* result_tracker() const { return result_tracker_.get(); }
void SetResultTracker(const scoped_refptr<rpc::ResultTracker> result_tracker) {
result_tracker_ = result_tracker;
// Sets the ConsensusRound for this op, if this op is
// being executed through the consensus system.
void set_consensus_round(const scoped_refptr<consensus::ConsensusRound>& consensus_round) {
consensus_round_ = consensus_round;
op_id_ = consensus_round_->id();
// Returns the ConsensusRound being used, if this op is being
// executed through the consensus system or NULL if it's not.
consensus::ConsensusRound* consensus_round() {
return consensus_round_.get();
TabletReplica* tablet_replica() const {
return tablet_replica_;
// Return metrics related to this op.
const OpMetrics& metrics() const {
return op_metrics_;
OpMetrics* mutable_metrics() {
return &op_metrics_;
void set_completion_callback(
std::unique_ptr<OpCompletionCallback> completion_clbk) {
completion_clbk_ = std::move(completion_clbk);
// Returns the completion callback.
OpCompletionCallback* completion_callback() {
return DCHECK_NOTNULL(completion_clbk_.get());
// Sets a heap object to be managed by this op's AutoReleasePool.
void AddToAutoReleasePool(std::unique_ptr<Schema> t) {
// Return the arena associated with this op.
// NOTE: this is not a thread-safe arena!
Arena* arena() {
return &arena_;
google::protobuf::Arena* pb_arena() {
return &pb_arena_;
// Each implementation should have its own ToString() method.
virtual std::string ToString() const = 0;
// Sets the timestamp for the op
virtual void set_timestamp(const Timestamp& timestamp) {
// make sure we set the timestamp only once
DCHECK_EQ(timestamp_, Timestamp::kInvalidTimestamp);
timestamp_ = timestamp;
Timestamp timestamp() const {
Timestamp t = timestamp_.load();
DCHECK(t != Timestamp::kInvalidTimestamp);
return t;
bool has_timestamp() const {
return timestamp_.load() != Timestamp::kInvalidTimestamp;
consensus::OpId* mutable_op_id() {
return &op_id_;
const consensus::OpId& op_id() const {
return op_id_;
ExternalConsistencyMode external_consistency_mode() const {
return external_consistency_mode_;
// Returns where the op associated with this OpState had an
// associated op id.
bool has_request_id() const {
return request_id_.has_client_id();
// Returns the request id for the op associated with this OpState.
// Not all ops will have a request id so users of this method should call
// 'has_request_id()' first to make sure it is set.
const rpc::RequestIdPB& request_id() const {
return request_id_;
explicit OpState(TabletReplica* tablet_replica);
virtual ~OpState();
OpMetrics op_metrics_;
// The TabletReplica that is coordinating this op.
TabletReplica* const tablet_replica_;
// The result tracker that will cache the result of this op.
scoped_refptr<rpc::ResultTracker> result_tracker_;
// Optional callback to be called once the op completes.
std::unique_ptr<OpCompletionCallback> completion_clbk_;
std::deque<std::unique_ptr<Schema>> schemas_pool_;
// This operation's timestamp.
// This is only set once during the operation lifecycle, using external synchronization.
// However, it may be concurrently read by ToString(), etc, so it's atomic.
std::atomic<Timestamp> timestamp_;
// The clock error when timestamp_ was read.
uint64_t timestamp_error_;
Arena arena_;
google::protobuf::Arena pb_arena_;
// This OpId stores the canonical "anchor" OpId for this op.
consensus::OpId op_id_;
// The client's id for this op, if there is one.
rpc::RequestIdPB request_id_;
scoped_refptr<consensus::ConsensusRound> consensus_round_;
// The defined consistency mode for this op.
ExternalConsistencyMode external_consistency_mode_;
// A parent class for the callback that gets called when ops
// complete.
//
// This must be set in the OpState if the op initiator is to
// be notified of when an op completes. The callback belongs to the
// op context and is deleted along with it.
//
// NOTE: this is a concrete class so that we can use it as a default implementation
// which avoids callers having to keep checking for null.
class OpCompletionCallback {
 public:
  // Sets an error for this op, together with its mapping to a server-level
  // error code. Calling this method does not mean the op is completed.
  void set_error(const Status& status, tserver::TabletServerErrorPB::Code code);

  // Sets an error for this op without an explicit server-level code.
  // NOTE(review): the value left in code_ is decided by the out-of-line
  // definition — confirm there.
  void set_error(const Status& status);

  // Accessors for the outcome recorded via set_error(); defined out of line.
  bool has_error() const;
  const Status& status() const;
  tserver::TabletServerErrorPB::Code error_code() const;

  // Called when the op completes. Subclasses should override this.
  virtual void OpCompleted();

  virtual ~OpCompletionCallback();

 protected:
  // Protected (not private) so subclasses such as LatchOpCompletionCallback
  // can inspect the outcome from OpCompleted().
  // NOTE(review): no constructor is declared here, so code_ is left
  // uninitialized until set_error() runs — confirm callers never read it first.
  Status status_;
  tserver::TabletServerErrorPB::Code code_;
};
// OpCompletionCallback implementation that can be waited on.
// Helper to make async ops, sync.
// This is templated to accept any response PB that has a TabletServerError
// 'error' field and to set the error before performing the latch countdown.
// The callback does *not* take ownership of either latch or response.
template<class ResponsePB>
class LatchOpCompletionCallback : public OpCompletionCallback {
explicit LatchOpCompletionCallback(CountDownLatch* latch,
ResponsePB* response)
: latch_(DCHECK_NOTNULL(latch)),
response_(DCHECK_NOTNULL(response)) {
void OpCompleted() override {
if (!status_.ok()) {
StatusToPB(status_, response_->mutable_error()->mutable_status());
CountDownLatch* latch_;
ResponsePB* response_;
} // namespace tablet
} // namespace kudu