// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/client/batcher.h"
#include <algorithm>
#include <boost/bind.hpp>
#include <glog/logging.h>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include "kudu/client/callbacks.h"
#include "kudu/client/client.h"
#include "kudu/client/client-internal.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/meta_cache.h"
#include "kudu/client/session-internal.h"
#include "kudu/client/write_op.h"
#include "kudu/client/write_op-internal.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/row_operations.h"
#include "kudu/common/wire_protocol.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/human_readable.h"
#include "kudu/gutil/strings/join.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/messenger.h"
#include "kudu/rpc/rpc.h"
#include "kudu/tserver/tserver_service.proxy.h"
#include "kudu/util/debug-util.h"
#include "kudu/util/logging.h"
using std::pair;
using std::set;
using std::shared_ptr;
using std::unordered_map;
using strings::Substitute;
namespace kudu {
using rpc::ErrorStatusPB;
using rpc::Messenger;
using rpc::Rpc;
using rpc::RpcController;
using tserver::WriteRequestPB;
using tserver::WriteResponsePB;
using tserver::WriteResponsePB_PerRowErrorPB;
namespace client {
namespace internal {
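// A rough overview of the write path implemented in this file (every function
// named below is defined here or in batcher.h):
//
//   Batcher::Add()                   starts an async tablet lookup for the op
//   Batcher::TabletLookupFinished()  buffers the op in 'per_tablet_ops_'
//   Batcher::FlushAsync()            moves the batcher into the kFlushing state
//   Batcher::FlushBuffersIfReady()   groups the buffered ops by tablet
//   Batcher::FlushBuffer()           wraps each group in a WriteRpc and sends it
//   WriteRpc::SendRpcCb()            hands the result to ProcessWriteResponse()
//   Batcher::ProcessWriteResponse()  records errors, then CheckForFinishedFlush()
//   Batcher::CheckForFinishedFlush() runs the flush callback once 'ops_' drains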
// About lock ordering in this file:
// ------------------------------
// The locks must be acquired in the following order:
// - Batcher::lock_
// - InFlightOp::lock_
//
// It's generally important to release all the locks before either calling
// a user callback, or chaining to another async function, since that function
// may also chain directly to the callback. Without releasing locks first,
// the lock ordering may be violated, or a lock may deadlock on itself (these
// locks are non-reentrant).
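//
// As a minimal illustrative sketch (not part of the implementation; 'batcher'
// and 'op' are hypothetical local pointers), the intended pattern is:
//
//   {
//     lock_guard<simple_spinlock> l(&batcher->lock_);   // acquired first
//     lock_guard<simple_spinlock> l2(&op->lock_);       // acquired second
//     // ... mutate shared state, copy out whatever the callback needs ...
//   }
//   // Both locks are released here, before any user callback is invoked.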
// ------------------------------------------------------------
// An operation which has been submitted to the batcher and not yet completed.
// The operation goes through a state machine as it progresses through the
// various stages of a request. See the State enum for details (and the
// transition sketch just below it).
//
// Note that in-flight ops *conceptually* hold a reference to the Batcher object.
// However, since there might be millions of these objects floating around,
// we can save a pointer per object by manually incrementing the Batcher ref-count
// when we create the object, and decrementing when we delete it.
struct InFlightOp {
InFlightOp() : state(kNew) {
}
// Lock protecting the internal state of the op.
// This is necessary since callbacks may fire from IO threads
// concurrent with the user trying to abort/delete the batch.
// See comment above about lock ordering.
simple_spinlock lock_;
enum State {
// Newly created op.
//
// OWNERSHIP: The op is only in this state when in local function scope (Batcher::Add)
kNew = 0,
// Waiting for the MetaCache to determine which tablet hosts the row associated
// with this operation. In the case that the relevant tablet's key range was
// already cached, this state will be passed through immediately. Otherwise,
// the op may sit in this state for some amount of time while waiting on the
// MetaCache to perform an RPC to the master and find the correct tablet.
//
// OWNERSHIP: the op is present in the 'ops_' set, and also referenced by the
// in-flight callback provided to MetaCache.
kLookingUpTablet,
// Once the correct tablet has been determined, and the tablet locations have been
// refreshed, we are ready to send the operation to the server.
//
// In MANUAL_FLUSH mode, the operations wait in this state until Flush has been called.
//
// In AUTO_FLUSH_BACKGROUND mode, the operations may wait in this state for one of
// two reasons:
//
// 1) There are already too many outstanding RPCs to the given tablet server.
//
// We restrict the number of concurrent RPCs from one client to a given TS
// to achieve better batching and throughput.
// TODO: not implemented yet
//
// 2) Batching delay.
//
// In order to achieve better batching, we do not immediately send a request
// to a TS as soon as we have one pending. Instead, we can wait for a configurable
// number of milliseconds for more requests to enter the queue for the same TS.
// This makes it likely that if a caller simply issues a small number of requests
// to the same tablet in AUTO_FLUSH_BACKGROUND mode that we'll batch all of the
// requests together in a single RPC.
// TODO: not implemented yet
//
// OWNERSHIP: When the operation is in this state, it is present in the 'ops_' set
// and also in the 'per_tablet_ops' map.
kBufferedToTabletServer,
// Once the operation has been flushed (either due to explicit Flush() or background flush)
// it will enter this state.
//
// OWNERSHIP: when entering this state, the op is removed from 'per_tablet_ops' map
// and ownership is transferred to a WriteRpc's 'ops_' vector. The op still
// remains in the 'ops_' set.
kRequestSent
};
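// A rough sketch of the forward transitions described above (an error or an
// abort can remove the op from the batcher at any point after kNew):
//
//   kNew -> kLookingUpTablet -> kBufferedToTabletServer -> kRequestSent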
State state;
// The actual operation.
gscoped_ptr<KuduWriteOperation> write_op;
string partition_key;
// The tablet the operation is destined for.
// This is only filled in after passing through the kLookingUpTablet state.
scoped_refptr<RemoteTablet> tablet;
// Each operation has a unique sequence number which preserves the user's intended
// order of operations. This is important when multiple operations act on the same row.
int sequence_number_;
string ToString() const {
return strings::Substitute("op[state=$0, write_op=$1]",
state, write_op->ToString());
}
};
// A Write RPC which is in-flight to a tablet. Initially, the RPC is sent
// to the leader replica, but it may be retried with another replica if the
// leader fails.
//
// Keeps a reference on the owning batcher while alive.
class WriteRpc : public Rpc {
public:
WriteRpc(const scoped_refptr<Batcher>& batcher,
RemoteTablet* const tablet,
vector<InFlightOp*> ops,
const MonoTime& deadline,
const shared_ptr<Messenger>& messenger);
virtual ~WriteRpc();
virtual void SendRpc() OVERRIDE;
virtual string ToString() const OVERRIDE;
const KuduTable* table() const {
// All of the ops for a given tablet obviously correspond to the same table,
// so we'll just grab the table from the first.
return ops_[0]->write_op->table();
}
const RemoteTablet* tablet() const { return tablet_; }
const vector<InFlightOp*>& ops() const { return ops_; }
const WriteResponsePB& resp() const { return resp_; }
private:
// Called when we finish a lookup (to find the new consensus leader). Retries
// the rpc after a short delay.
void LookupTabletCb(const Status& status);
// Called when we finish initializing a TS proxy.
// Sends the RPC, provided there was no error.
void InitTSProxyCb(const Status& status);
// Marks all replicas on current_ts_ as failed and retries the write on a
// new replica.
void FailToNewReplica(const Status& reason);
virtual void SendRpcCb(const Status& status) OVERRIDE;
// Pointer back to the owning batcher, which processes the write response when
// the RPC completes, regardless of success or failure.
scoped_refptr<Batcher> batcher_;
// The tablet that should receive this write.
RemoteTablet* const tablet_;
// The TS receiving the write. May change if the write is retried.
RemoteTabletServer* current_ts_;
// TSes that refused the write because they were followers at the time.
// Cleared when new consensus configuration information arrives from the master.
set<RemoteTabletServer*> followers_;
// Request body.
WriteRequestPB req_;
// Response body.
WriteResponsePB resp_;
// Operations which were batched into this RPC.
// These operations are in kRequestSent state.
vector<InFlightOp*> ops_;
};
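// Lifecycle summary (see the code below): a WriteRpc is heap-allocated by
// Batcher::FlushBuffer(), takes ownership of its InFlightOps, retries itself
// via the RpcRetrier as needed, and ultimately calls
// Batcher::ProcessWriteResponse() and deletes itself from SendRpcCb() (or from
// LookupTabletCb() if the table was deleted).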
WriteRpc::WriteRpc(const scoped_refptr<Batcher>& batcher,
RemoteTablet* const tablet,
vector<InFlightOp*> ops,
const MonoTime& deadline,
const shared_ptr<Messenger>& messenger)
: Rpc(deadline, messenger),
batcher_(batcher),
tablet_(tablet),
current_ts_(NULL),
ops_(std::move(ops)) {
const Schema* schema = table()->schema().schema_;
req_.set_tablet_id(tablet->tablet_id());
switch (batcher->external_consistency_mode()) {
case kudu::client::KuduSession::CLIENT_PROPAGATED:
req_.set_external_consistency_mode(kudu::CLIENT_PROPAGATED);
break;
case kudu::client::KuduSession::COMMIT_WAIT:
req_.set_external_consistency_mode(kudu::COMMIT_WAIT);
break;
default:
LOG(FATAL) << "Unsupported consistency mode: " << batcher->external_consistency_mode();
}
// Set up schema
CHECK_OK(SchemaToPB(*schema, req_.mutable_schema(),
SCHEMA_PB_WITHOUT_STORAGE_ATTRIBUTES | SCHEMA_PB_WITHOUT_IDS));
RowOperationsPB* requested = req_.mutable_row_operations();
// Add the rows
int ctr = 0;
RowOperationsPBEncoder enc(requested);
for (InFlightOp* op : ops_) {
const Partition& partition = op->tablet->partition();
const PartitionSchema& partition_schema = table()->partition_schema();
const KuduPartialRow& row = op->write_op->row();
#ifndef NDEBUG
bool partition_contains_row;
CHECK(partition_schema.PartitionContainsRow(partition, row, &partition_contains_row).ok());
CHECK(partition_contains_row)
<< "Row " << partition_schema.RowDebugString(row)
<< "not in partition " << partition_schema.PartitionDebugString(partition, *schema);
#endif
enc.Add(ToInternalWriteType(op->write_op->type()), op->write_op->row());
// Set the state now, even though we haven't yet sent it -- at this point
// there is no return, and we're definitely going to send it. If we waited
// until after we sent it, the RPC callback could fire before we got a chance
// to change its state to 'sent'.
op->state = InFlightOp::kRequestSent;
VLOG(4) << ++ctr << ". Encoded row " << op->write_op->ToString();
}
if (VLOG_IS_ON(3)) {
VLOG(3) << "Created batch for " << tablet->tablet_id() << ":\n"
<< req_.ShortDebugString();
}
}
WriteRpc::~WriteRpc() {
STLDeleteElements(&ops_);
}
void WriteRpc::SendRpc() {
// Choose a destination TS according to the following algorithm:
// 1. If the tablet metadata is stale, refresh it (goto step 5).
// 2. Select the leader, provided:
// a. The current leader is known,
// b. It hasn't failed, and
// c. It isn't currently marked as a follower.
// 3. If there's no good leader select another replica, provided:
// a. It hasn't failed, and
// b. It hasn't rejected our write due to being a follower.
// 4. Preemptively mark the replica we selected in step 3 as "leader" in the
// meta cache, so that our selection remains sticky until the next Master
// metadata refresh.
// 5. If we're out of appropriate replicas, force a lookup to the master
// to fetch new consensus configuration information.
// 6. When the lookup finishes, forget which replicas were followers and
// retry the write (i.e. goto 2).
// 7. If we issue the write and it fails because the destination was a
// follower, remember that fact and retry the write (i.e. goto 2).
// 8. Repeat steps 1-7 until the write succeeds, fails for other reasons,
// or the write's deadline expires.
current_ts_ = nullptr;
if (!tablet_->stale()) {
current_ts_ = tablet_->LeaderTServer();
if (current_ts_ && ContainsKey(followers_, current_ts_)) {
VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower for a leader: "
<< current_ts_->ToString();
// Mark the node as a follower in the cache so that on the next go-round,
// LeaderTServer() will not return it as a leader unless a full metadata
// refresh has occurred. This also prevents LookupTabletByKey() from taking its
// "fast path" and skipping a metadata refresh from the Master when one is
// actually needed.
tablet_->MarkTServerAsFollower(current_ts_);
current_ts_ = NULL;
}
if (!current_ts_) {
// Try to "guess" the next leader.
vector<RemoteTabletServer*> replicas;
tablet_->GetRemoteTabletServers(&replicas);
for (RemoteTabletServer* ts : replicas) {
if (!ContainsKey(followers_, ts)) {
current_ts_ = ts;
break;
}
}
if (current_ts_) {
// Mark this next replica "preemptively" as the leader in the meta cache,
// so we go to it first on the next write if writing was successful.
VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous leader failed. "
<< "Preemptively marking tserver " << current_ts_->ToString()
<< " as leader in the meta cache.";
tablet_->MarkTServerAsLeader(current_ts_);
}
}
}
// If we've tried all replicas, force a lookup to the master to find the
// new leader. This relies on some properties of LookupTabletByKey():
// 1. The fast path only works when there's a non-failed leader (which we
// know is untrue here).
// 2. The slow path always fetches consensus configuration information and updates the
// looked-up tablet.
// Put another way, we don't care about the lookup result at all; we're
// just using the lookup to fetch the latest consensus configuration information.
//
// TODO: When we support tablet splits, we should let the lookup shift
// the write to another tablet (i.e. if it's since been split).
if (!current_ts_) {
batcher_->client_->data_->meta_cache_->LookupTabletByKey(table(),
tablet_->partition()
.partition_key_start(),
retrier().deadline(),
NULL,
Bind(&WriteRpc::LookupTabletCb,
Unretained(this)));
return;
}
// Make sure we have a working proxy before sending out the RPC.
current_ts_->InitProxy(batcher_->client_,
Bind(&WriteRpc::InitTSProxyCb, Unretained(this)));
}
string WriteRpc::ToString() const {
return Substitute("Write(tablet: $0, num_ops: $1, num_attempts: $2)",
tablet_->tablet_id(), ops_.size(), num_attempts());
}
void WriteRpc::LookupTabletCb(const Status& status) {
// If the table was deleted, the master will return TABLE_DELETED and the
// meta cache will return Status::NotFound.
if (status.IsNotFound()) {
batcher_->ProcessWriteResponse(*this, status);
delete this;
return;
}
// We should retry the RPC regardless of the outcome of the lookup, as
// leader election doesn't depend on the existence of a master at all.
//
// Retry() imposes a slight delay, which is desirable in a lookup loop,
// but unnecessary the first time through. Seeing as leader failures are
// rare, perhaps this doesn't matter.
followers_.clear();
mutable_retrier()->DelayedRetry(this, status);
}
void WriteRpc::InitTSProxyCb(const Status& status) {
// Fail to a replica in the event of a DNS resolution failure.
if (!status.ok()) {
FailToNewReplica(status);
return;
}
VLOG(2) << "Tablet " << tablet_->tablet_id() << ": Writing batch to replica "
<< current_ts_->ToString();
current_ts_->proxy()->WriteAsync(req_, &resp_,
mutable_retrier()->mutable_controller(),
boost::bind(&WriteRpc::SendRpcCb, this, Status::OK()));
}
void WriteRpc::FailToNewReplica(const Status& reason) {
VLOG(1) << "Failing " << ToString() << " to a new replica: "
<< reason.ToString();
bool found = tablet_->MarkReplicaFailed(current_ts_, reason);
DCHECK(found)
<< "Tablet " << tablet_->tablet_id() << ": Unable to mark replica " << current_ts_->ToString()
<< " as failed. Replicas: " << tablet_->ReplicasAsString();
mutable_retrier()->DelayedRetry(this, reason);
}
void WriteRpc::SendRpcCb(const Status& status) {
// Prefer early failures over controller failures.
Status new_status = status;
if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
return;
}
// Failover to a replica in the event of any network failure.
//
// TODO: This is probably too harsh; some network failures should be
// retried on the current replica.
if (new_status.IsNetworkError()) {
FailToNewReplica(new_status);
return;
}
// Prefer controller failures over response failures.
if (new_status.ok() && resp_.has_error()) {
new_status = StatusFromPB(resp_.error().status());
}
if (resp_.has_error() && resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
// If we get TABLET_NOT_FOUND, the replica we thought was leader has been
// deleted. We mark our tablet cache as stale, forcing a master lookup on
// the next attempt.
tablet_->MarkStale();
// TODO: Don't backoff the first time we hit this error (see KUDU-1314).
mutable_retrier()->DelayedRetry(this, StatusFromPB(resp_.error().status()));
return;
} else if (new_status.IsIllegalState() || new_status.IsAborted()) {
// Alternatively, when we get a status code of IllegalState or Aborted, we
// assume this means that the replica we attempted to write to is not the
// current leader (maybe it got partitioned or slow and another node took
// over). We attempt to fail over to another replica in the config.
//
// TODO: This error handling block should really be rewritten to handle
// specific error codes exclusively instead of Status codes (this may
// require some server-side changes). For example, IllegalState is
// obviously way too broad an error category for this case.
followers_.insert(current_ts_);
mutable_retrier()->DelayedRetry(this, new_status);
return;
}
if (!new_status.ok()) {
string current_ts_string;
if (current_ts_) {
current_ts_string = Substitute("on tablet server $0", current_ts_->ToString());
} else {
current_ts_string = "(no tablet server available)";
}
new_status = new_status.CloneAndPrepend(
Substitute("Failed to write batch of $0 ops to tablet $1 "
"$2 after $3 attempt(s)",
ops_.size(), tablet_->tablet_id(),
current_ts_string, num_attempts()));
LOG(WARNING) << new_status.ToString();
}
batcher_->ProcessWriteResponse(*this, new_status);
delete this;
}
Batcher::Batcher(KuduClient* client,
ErrorCollector* error_collector,
const sp::shared_ptr<KuduSession>& session,
kudu::client::KuduSession::ExternalConsistencyMode consistency_mode)
: state_(kGatheringOps),
client_(client),
weak_session_(session),
consistency_mode_(consistency_mode),
error_collector_(error_collector),
had_errors_(false),
flush_callback_(NULL),
next_op_sequence_number_(0),
outstanding_lookups_(0),
max_buffer_size_(7 * 1024 * 1024),
buffer_bytes_used_(0) {
}
void Batcher::Abort() {
unique_lock<simple_spinlock> l(&lock_);
state_ = kAborted;
vector<InFlightOp*> to_abort;
for (InFlightOp* op : ops_) {
lock_guard<simple_spinlock> l(&op->lock_);
if (op->state == InFlightOp::kBufferedToTabletServer) {
to_abort.push_back(op);
}
}
for (InFlightOp* op : to_abort) {
VLOG(1) << "Aborting op: " << op->ToString();
MarkInFlightOpFailedUnlocked(op, Status::Aborted("Batch aborted"));
}
if (flush_callback_) {
l.unlock();
flush_callback_->Run(Status::Aborted(""));
}
}
Batcher::~Batcher() {
if (PREDICT_FALSE(!ops_.empty())) {
for (InFlightOp* op : ops_) {
LOG(ERROR) << "Orphaned op: " << op->ToString();
}
LOG(FATAL) << "ops_ not empty";
}
CHECK(state_ == kFlushed || state_ == kAborted) << "Bad state: " << state_;
}
void Batcher::SetTimeoutMillis(int millis) {
CHECK_GE(millis, 0);
lock_guard<simple_spinlock> l(&lock_);
timeout_ = MonoDelta::FromMilliseconds(millis);
}
bool Batcher::HasPendingOperations() const {
lock_guard<simple_spinlock> l(&lock_);
return !ops_.empty();
}
int Batcher::CountBufferedOperations() const {
lock_guard<simple_spinlock> l(&lock_);
if (state_ == kGatheringOps) {
return ops_.size();
} else {
// If we've already started to flush, then the ops aren't
// considered "buffered".
return 0;
}
}
void Batcher::CheckForFinishedFlush() {
sp::shared_ptr<KuduSession> session;
{
lock_guard<simple_spinlock> l(&lock_);
if (state_ != kFlushing || !ops_.empty()) {
return;
}
session = weak_session_.lock();
state_ = kFlushed;
}
if (session) {
// Important to do this outside of the lock so that we don't have
// a lock inversion deadlock -- the session lock should always
// come before the batcher lock.
session->data_->FlushFinished(this);
}
Status s;
if (had_errors_) {
// User is responsible for fetching errors from the error collector.
s = Status::IOError("Some errors occurred");
}
flush_callback_->Run(s);
}
MonoTime Batcher::ComputeDeadlineUnlocked() const {
MonoDelta timeout = timeout_;
if (PREDICT_FALSE(!timeout.Initialized())) {
KLOG_EVERY_N(WARNING, 1000) << "Client writing with no timeout set, using 60 seconds.\n"
<< GetStackTrace();
timeout = MonoDelta::FromSeconds(60);
}
MonoTime ret = MonoTime::Now(MonoTime::FINE);
ret.AddDelta(timeout);
return ret;
}
void Batcher::FlushAsync(KuduStatusCallback* cb) {
{
lock_guard<simple_spinlock> l(&lock_);
CHECK_EQ(state_, kGatheringOps);
state_ = kFlushing;
flush_callback_ = cb;
deadline_ = ComputeDeadlineUnlocked();
}
// In the case that we have nothing buffered, just call the callback
// immediately. Otherwise, the callback will be invoked once the last in-flight
// op completes and CheckForFinishedFlush() sees that the ops_ list has drained.
CheckForFinishedFlush();
// Trigger flushing of all of the buffers. Some of these may already have
// been flushed through an async path, but it's idempotent - a second call
// to flush would just be a no-op.
//
// If some of the operations are still in-flight, then they'll get sent
// when they hit 'per_tablet_ops', since our state is now kFlushing.
FlushBuffersIfReady();
}
Status Batcher::Add(KuduWriteOperation* write_op) {
int64_t required_size = write_op->SizeInBuffer();
int64_t size_after_adding = buffer_bytes_used_.IncrementBy(required_size);
if (PREDICT_FALSE(size_after_adding > max_buffer_size_)) {
buffer_bytes_used_.IncrementBy(-required_size);
int64_t size_before_adding = size_after_adding - required_size;
return Status::Incomplete(Substitute(
"not enough space remaining in buffer for op (required $0, "
"$1 already used",
HumanReadableNumBytes::ToString(required_size),
HumanReadableNumBytes::ToString(size_before_adding)));
}
// As soon as we get the op, start looking up where it belongs,
// so that when the user calls Flush, we are ready to go.
gscoped_ptr<InFlightOp> op(new InFlightOp());
RETURN_NOT_OK(write_op->table_->partition_schema()
.EncodeKey(write_op->row(), &op->partition_key));
op->write_op.reset(write_op);
op->state = InFlightOp::kLookingUpTablet;
AddInFlightOp(op.get());
VLOG(3) << "Looking up tablet for " << op->write_op->ToString();
// Increment our reference count for the outstanding callback.
//
// deadline_ is set in FlushAsync(), after all Add() calls are done, so
// here we're forced to create a new deadline.
MonoTime deadline = ComputeDeadlineUnlocked();
base::RefCountInc(&outstanding_lookups_);
client_->data_->meta_cache_->LookupTabletByKey(
op->write_op->table(),
op->partition_key,
deadline,
&op->tablet,
Bind(&Batcher::TabletLookupFinished, this, op.get()));
IgnoreResult(op.release());
return Status::OK();
}
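// A minimal caller-side sketch (hypothetical 'batcher' and 'cb' variables; the
// real caller lives in the KuduSession internals):
//
//   Status s = batcher->Add(write_op);   // kicks off the tablet lookup
//   if (s.IsIncomplete()) {
//     // Buffer is full: the caller must flush before adding more ops.
//   }
//   ...
//   batcher->FlushAsync(cb);             // 'cb' runs once every op completes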
void Batcher::AddInFlightOp(InFlightOp* op) {
DCHECK_EQ(op->state, InFlightOp::kLookingUpTablet);
lock_guard<simple_spinlock> l(&lock_);
CHECK_EQ(state_, kGatheringOps);
InsertOrDie(&ops_, op);
op->sequence_number_ = next_op_sequence_number_++;
}
bool Batcher::IsAbortedUnlocked() const {
return state_ == kAborted;
}
void Batcher::MarkHadErrors() {
lock_guard<simple_spinlock> l(&lock_);
had_errors_ = true;
}
void Batcher::MarkInFlightOpFailed(InFlightOp* op, const Status& s) {
lock_guard<simple_spinlock> l(&lock_);
MarkInFlightOpFailedUnlocked(op, s);
}
void Batcher::MarkInFlightOpFailedUnlocked(InFlightOp* op, const Status& s) {
CHECK_EQ(1, ops_.erase(op))
<< "Could not remove op " << op->ToString() << " from in-flight list";
gscoped_ptr<KuduError> error(new KuduError(op->write_op.release(), s));
error_collector_->AddError(std::move(error));
had_errors_ = true;
delete op;
}
void Batcher::TabletLookupFinished(InFlightOp* op, const Status& s) {
base::RefCountDec(&outstanding_lookups_);
// Acquire the batcher lock early to atomically:
// 1. Test if the batcher was aborted, and
// 2. Change the op state.
unique_lock<simple_spinlock> l(&lock_);
if (IsAbortedUnlocked()) {
VLOG(1) << "Aborted batch: TabletLookupFinished for " << op->write_op->ToString();
MarkInFlightOpFailedUnlocked(op, Status::Aborted("Batch aborted"));
// 'op' is deleted by above function.
return;
}
if (VLOG_IS_ON(3)) {
VLOG(3) << "TabletLookupFinished for " << op->write_op->ToString()
<< ": " << s.ToString();
if (s.ok()) {
VLOG(3) << "Result: tablet_id = " << op->tablet->tablet_id();
}
}
if (!s.ok()) {
MarkInFlightOpFailedUnlocked(op, s);
l.unlock();
CheckForFinishedFlush();
// Even if we failed our lookup, it's possible that other requests were still
// pending waiting for our pending lookup to complete. So, we have to let them
// proceed.
FlushBuffersIfReady();
return;
}
{
lock_guard<simple_spinlock> l2(&op->lock_);
CHECK_EQ(op->state, InFlightOp::kLookingUpTablet);
CHECK(op->tablet != NULL);
op->state = InFlightOp::kBufferedToTabletServer;
vector<InFlightOp*>& to_ts = per_tablet_ops_[op->tablet.get()];
to_ts.push_back(op);
// "Reverse bubble sort" the operation into the right spot in the tablet server's
// buffer, based on the sequence numbers of the ops.
//
// There is a rare race (KUDU-743) where two operations in the same batch can get
// their order inverted with respect to the order that the user originally performed
// the operations. This loop re-sequences them back into the correct order. In
// the common case, it will break on the first iteration, so we expect the loop to be
// constant time, with worst case O(n). This is usually much better than something
// like a priority queue which would have O(lg n) in every case and a more complex
// code path.
for (int i = to_ts.size() - 1; i > 0; --i) {
if (to_ts[i]->sequence_number_ < to_ts[i - 1]->sequence_number_) {
std::swap(to_ts[i], to_ts[i - 1]);
} else {
break;
}
}
}
l.unlock();
FlushBuffersIfReady();
}
void Batcher::FlushBuffersIfReady() {
unordered_map<RemoteTablet*, vector<InFlightOp*> > ops_copy;
// We're only ready to flush if:
// 1. The batcher is in the flushing state (i.e. FlushAsync was called).
// 2. All outstanding ops have finished lookup. Why? To avoid a situation
// where ops are flushed one by one as they finish lookup.
{
lock_guard<simple_spinlock> l(&lock_);
if (state_ != kFlushing) {
VLOG(3) << "FlushBuffersIfReady: batcher not yet in flushing state";
return;
}
if (!base::RefCountIsZero(&outstanding_lookups_)) {
VLOG(3) << "FlushBuffersIfReady: "
<< base::subtle::NoBarrier_Load(&outstanding_lookups_)
<< " ops still in lookup";
return;
}
// Take ownership of the ops while we're under the lock.
ops_copy.swap(per_tablet_ops_);
}
// Now flush the ops for each tablet.
for (const OpsMap::value_type& e : ops_copy) {
RemoteTablet* tablet = e.first;
const vector<InFlightOp*>& ops = e.second;
VLOG(3) << "FlushBuffersIfReady: already in flushing state, immediately flushing to "
<< tablet->tablet_id();
FlushBuffer(tablet, ops);
}
}
void Batcher::FlushBuffer(RemoteTablet* tablet, const vector<InFlightOp*>& ops) {
CHECK(!ops.empty());
// Create and send an RPC that aggregates the ops. The RPC is freed when
// its callback completes.
//
// The RPC object takes ownership of the ops.
WriteRpc* rpc = new WriteRpc(this,
tablet,
ops,
deadline_,
client_->data_->messenger_);
rpc->SendRpc();
}
void Batcher::ProcessWriteResponse(const WriteRpc& rpc,
const Status& s) {
// TODO: there is a potential race here -- if the Batcher gets destructed while
// RPCs are in-flight, then accessing state_ will crash. We probably need to keep
// track of the in-flight RPCs, and in the destructor, change each of them to an
// "aborted" state.
CHECK_EQ(state_, kFlushing);
if (s.ok()) {
if (rpc.resp().has_timestamp()) {
client_->data_->UpdateLatestObservedTimestamp(rpc.resp().timestamp());
}
} else {
// Mark each of the rows in the write op as failed, since the whole RPC failed.
for (InFlightOp* op : rpc.ops()) {
gscoped_ptr<KuduError> error(new KuduError(op->write_op.release(), s));
error_collector_->AddError(std::move(error));
}
MarkHadErrors();
}
// Remove all the ops from the "in-flight" list.
{
lock_guard<simple_spinlock> l(&lock_);
for (InFlightOp* op : rpc.ops()) {
CHECK_EQ(1, ops_.erase(op))
<< "Could not remove op " << op->ToString()
<< " from in-flight list";
}
}
// Check individual row errors.
for (const WriteResponsePB_PerRowErrorPB& err_pb : rpc.resp().per_row_errors()) {
// TODO: handle case where we get one of the more specific TS errors
// like the tablet not being hosted?
if (err_pb.row_index() >= rpc.ops().size()) {
LOG(ERROR) << "Received a per_row_error for an out-of-bound op index "
<< err_pb.row_index() << " (sent only "
<< rpc.ops().size() << " ops)";
LOG(ERROR) << "Response from tablet " << rpc.tablet()->tablet_id() << ":\n"
<< rpc.resp().DebugString();
continue;
}
gscoped_ptr<KuduWriteOperation> op = std::move(rpc.ops()[err_pb.row_index()]->write_op);
VLOG(1) << "Error on op " << op->ToString() << ": "
<< err_pb.error().ShortDebugString();
Status op_status = StatusFromPB(err_pb.error());
gscoped_ptr<KuduError> error(new KuduError(op.release(), op_status));
error_collector_->AddError(std::move(error));
MarkHadErrors();
}
CheckForFinishedFlush();
}
} // namespace internal
} // namespace client
} // namespace kudu