blob: 3616b1831a5dea55a63378f63403ff11d674d65c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.pb.h"
#include "kudu/consensus/consensus.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/tablet/lock_manager.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/bitset.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
class Schema;
class rw_semaphore;
namespace rpc {
class RequestIdPB;
}
namespace tablet {
class ScopedOp;
class TabletReplica;
class Txn;
class TxResultPB;
struct TxnRowSets;
struct RowOp;
struct TabletComponents;
// Privileges that a write operation may require. The enumerator values are
// kept dense and zero-based because kWritePrivilegeMax below is derived from
// the last enumerator.
enum WritePrivilegeType {
  INSERT = 0,
  UPDATE = 1,
  DELETE = 2,
};

// Number of distinct write privileges, i.e. one past the largest enumerator.
static constexpr size_t kWritePrivilegeMax = static_cast<size_t>(DELETE) + 1;
// Returns a string describing the write privilege 'type'. The exact text is
// defined by the implementation, which is not part of this header.
std::string WritePrivilegeToString(const WritePrivilegeType& type);

// Fixed-size bitset keyed by WritePrivilegeType and sized by
// kWritePrivilegeMax.
using WritePrivileges = FixedBitSet<WritePrivilegeType, kWritePrivilegeMax>;

// Adds the privileges required by the row operation type 'op_type' to
// 'privileges'.
void AddWritePrivilegesForRowOperations(const RowOperationsPB::Type& op_type,
                                        WritePrivileges* privileges);
// State used to authorize a single write request: the privileges to authorize
// with, and the operation types that the request asks to perform.
struct WriteAuthorizationContext {
// Checks that the requested operations ('requested_op_types') can be performed
// with the given privileges ('write_privileges'), returning a NotAuthorized
// error if not.
Status CheckPrivileges() const;
// Write privileges with which to authorize.
WritePrivileges write_privileges;
// Write operations for a given write request. This is populated while
// decoding a write request, so it is only meaningful once decoding has run.
RowOpTypes requested_op_types;
};
// An OpState for a batch of inserts/mutates. This class holds and
// owns almost everything related to an op, including:
// - A RowOp structure for each of the rows being inserted or mutated, which itself
// contains:
// - decoded/projected data
// - row lock reference
// - result of this particular insert/mutate operation, once executed
// - the Replicate and Commit PB messages
//
// All the op related pointers are owned by this class
// and destroyed on Reset() or by the destructor.
//
// IMPORTANT: All the acquired locks will not be released unless the OpState
// is either destroyed or Reset() or release_locks() is called. Beware of this
// or else there will be lock leaks.
//
// Used when logging to WAL in that we keep track of where inserts/updates
// were applied and add that information to the commit message that is stored
// on the WAL.
//
// NOTE: this class isn't thread safe.
class WriteOpState : public OpState {
public:
// A write op for a given replica and the given request. Optionally takes
// 'response' if this write is meant to populate a response (e.g. this is a
// leader op), and a set of write privileges to check before performing the
// ops in a request.
WriteOpState(TabletReplica* tablet_replica,
const tserver::WriteRequestPB* request,
const rpc::RequestIdPB* request_id,
tserver::WriteResponsePB* response = nullptr,
boost::optional<WriteAuthorizationContext> authz_ctx = boost::none);
~WriteOpState();
// Returns the result of this op in its protocol buffers form. The op result
// holds information on exactly which memory stores were mutated in the
// context of this op and can be used to perform recovery.
//
// This releases part of the state of the op, and will crash
// if called more than once.
void ReleaseTxResultPB(TxResultPB* result) const;
// Returns the original client request for this op, if there was
// one.
const tserver::WriteRequestPB* request() const override {
return request_;
}
// Returns the prepared response to the client that will be sent when this op
// is completed, if this op was started by a client.
tserver::WriteResponsePB* response() const override {
return response_;
}
boost::optional<int64_t> txn_id() const {
return request_->has_txn_id() ? boost::make_optional(request_->txn_id()) : boost::none;
}
// Returns the state associated with authorizing this op, or 'none' if no
// authorization is necessary.
const boost::optional<WriteAuthorizationContext>& authz_context() const {
return authz_context_;
}
// Set the MVCC op associated with this Write operation.
// This must be called exactly once, after the timestamp was acquired.
// This also copies the timestamp from the MVCC op into the
// WriteOpState object.
void SetMvccOp(std::unique_ptr<ScopedOp> mvcc_op);
// Set the Tablet components that this op will write into.
// Called exactly once at the beginning of Apply, before applying its
// in-memory edits.
void set_tablet_components(const scoped_refptr<const TabletComponents>& components);
// Set the txn rowsets that this op will write into.
void set_txn_rowsets(const scoped_refptr<TxnRowSets>& rowsets);
// Take a shared lock on the given schema lock.
// This is required prior to decoding rows so that the schema does
// not change in between performing the projection and applying
// the writes.
void AcquireSchemaLock(rw_semaphore* schema_lock);
// Acquire row locks for all of the rows in this Write.
void AcquireRowLocks(LockManager* lock_manager);
// Acquires the lock on the given transaction, setting 'txn_' and
// 'txn_lock_', which must be freed upon finishing this op. Checks if the
// transaction is available to be written to, returning an error if not.
Status AcquireTxnLockCheckOpen(scoped_refptr<Txn> txn);
// Release the already-acquired schema lock.
void ReleaseSchemaLock();
void ReleaseMvccTxn(Op::OpResult result);
void set_schema_at_decode_time(const Schema* schema) {
std::lock_guard<simple_spinlock> l(op_state_lock_);
schema_at_decode_time_ = schema;
}
const Schema* schema_at_decode_time() const {
std::lock_guard<simple_spinlock> l(op_state_lock_);
return schema_at_decode_time_;
}
const TabletComponents* tablet_components() const {
return tablet_components_.get();
}
const TxnRowSets* txn_rowsets() const {
return txn_rowsets_.get();
}
// Notifies the MVCC manager that this operation is about to start applying
// its in-memory edits. After this method is called, the op _must_ FinishApplying()
// within a bounded amount of time (there may be other threads blocked on
// it).
void StartApplying();
// Commits or aborts the MVCC op and releases all locks held.
//
// In the case of COMMITTED, this method makes the inserts and mutations
// performed by this op visible to other op.
//
// Must be called exactly once.
// REQUIRES: StartApplying() was called.
//
// Note: request_ and response_ are set to NULL after this method returns.
void FinishApplyingOrAbort(Op::OpResult result);
// Returns all the prepared row writes for this op. Usually called on the
// apply phase to actually make changes to the tablet.
const std::vector<RowOp*>& row_ops() const {
return row_ops_;
}
// Return the ProbeStats object collecting statistics for op index 'i'.
ProbeStats* mutable_op_stats(int i) {
DCHECK_LT(i, row_ops_.size());
DCHECK(stats_array_);
return &stats_array_[i];
}
// Set the 'row_ops' member based on the given decoded operations.
void SetRowOps(std::vector<DecodedRowOperation> decoded_ops);
void UpdateMetricsForOp(const RowOp& op);
// Resets this OpState, releasing all locks, destroying all prepared writes,
// clearing the op result _and_ committing the current Mvcc op.
void Reset();
std::string ToString() const override;
private:
// Releases all the row locks acquired by this op.
void ReleaseRowLocks();
// Releases the transaction state lock.
void ReleaseTxnLock();
// Reset the RPC request, response, and row_ops_ (which refers to data
// from the request).
void ResetRpcFields();
// An owned version of the response, for follower ops.
tserver::WriteResponsePB owned_response_;
// The lifecycle of these pointers request and response, is not managed by this class.
// These pointers are never null: 'request_' is always set on construction and 'response_' is
// either set to the response passed on the ctor, if there is one, or to point to
// 'owned_response_' if there isn't.
const tserver::WriteRequestPB* request_;
tserver::WriteResponsePB* response_;
// Encapsulates state required to authorize a write request. If 'none', then
// no authorization is required.
boost::optional<WriteAuthorizationContext> authz_context_;
// The row operations which are decoded from the request during Prepare().
// Protected by op_state_lock_.
std::vector<RowOp*> row_ops_;
// Holds the LockManager locks acquired for this operation.
ScopedRowLock rows_lock_;
// Array of ProbeStats for each of the operations in 'row_ops_'.
// Allocated from this op's arena during SetRowOps().
ProbeStats* stats_array_ = nullptr;
// The MVCC op, set up during PREPARE phase
std::unique_ptr<ScopedOp> mvcc_op_;
// The tablet components, acquired at the same time as mvcc_op_ is set. This
// is maintained in case any of the component's rowsets get compacted away,
// ensuring we retain a reference to them for the duration of this op.
scoped_refptr<const TabletComponents> tablet_components_;
// The uncommitted transaction rowsets to write to if this write op is a part
// of a transaction, acquired at the same time as mvcc_op_ is set.
scoped_refptr<TxnRowSets> txn_rowsets_;
// A lock held on the tablet's schema. Prevents concurrent schema change
// from racing with a write.
shared_lock<rw_semaphore> schema_lock_;
// The Schema of the tablet when the op was first decoded. This is verified
// at APPLY time to ensure we don't have races against schema change.
// Protected by op_state_lock_.
const Schema* schema_at_decode_time_;
// Lock that protects access to various fields of WriteOpState.
mutable simple_spinlock op_state_lock_;
// Transaction to which this write op writes to, if any.
scoped_refptr<Txn> txn_;
// Lock protecting the transaction's state, ensuring it remains open for the
// duration of this write.
shared_lock<rw_semaphore> txn_lock_;
DISALLOW_COPY_AND_ASSIGN(WriteOpState);
};
// Executes a write op.
class WriteOp : public Op {
 public:
  // Takes ownership of 'state'. 'type' is the consensus::DriverType for this
  // op.
  WriteOp(std::unique_ptr<WriteOpState> state, consensus::DriverType type);

  // Accessors for the op state owned by this op.
  WriteOpState* state() override {
    return state_.get();
  }
  const WriteOpState* state() const override {
    return state_.get();
  }

  void NewReplicateMsg(std::unique_ptr<consensus::ReplicateMsg>* replicate_msg) override;

  // Prepare phase of a write op.
  //
  // The operations in the request are decoded and a row lock is acquired for
  // every affected row. As a result, one 'RowOp' object per operation is
  // added to the WriteOpState.
  //
  // An error is returned if any operation in the request is malformed or is
  // not authorized.
  Status Prepare() override;

  void AbortPrepare() override;

  // Actually starts the Mvcc op and assigns a timestamp to this op.
  Status Start() override;

  // Apply phase of a write op.
  //
  // This is where the inserts/mutates are actually applied to the tablet.
  // There is currently no means of undoing an update, so once application has
  // begun the op must run to completion.
  //
  // Once the inserts/mutates are done, the row locks and the mvcc op can be
  // released, which lets other ops update the same rows. The component lock,
  // however, must be held until the commit msg -- which records where each
  // insert/mutate was applied -- has been persisted to stable storage. For
  // that reason ApplyTask must enqueue a CommitTask before it releases the row
  // locks and deletes the MvccOp: Commits that touch the same set of rows
  // have to be persisted in order, for recovery.
  // This relies on commits being executed in the order in which they were
  // queued (though not necessarily in the order of the original requests),
  // which the consensus algorithm already requires.
  Status Apply(consensus::CommitMsg** commit_msg) override;

  // When result == COMMITTED, commits the mvcc op and updates the metrics;
  // when result == ABORTED, aborts the mvcc op.
  void Finish(OpResult result) override;

  std::string ToString() const override;

 private:
  void UpdatePerRowErrors();

  // this op's start time
  MonoTime start_time_;

  // The state of this op; what state() hands out.
  std::unique_ptr<WriteOpState> state_;

  DISALLOW_COPY_AND_ASSIGN(WriteOp);
};
} // namespace tablet
} // namespace kudu