blob: 38747051317d429a2536d2032b8d2439eec5c3b4 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <vector>
#include <sparsehash/dense_hash_set>
#include "kudu/client/client.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/client/write_op.h"
#include "kudu/common/txn_id.h"
#include "kudu/gutil/atomicops.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/atomic.h"
#include "kudu/util/locks.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/monotime.h"
#include "kudu/util/status.h"
namespace kudu {
namespace client {
class KuduStatusCallback;
namespace internal {
class ErrorCollector;
class RemoteTablet;
class WriteRpc;
struct InFlightOp;
// A Batcher is the class responsible for collecting row operations, routing them to the
// correct tablet server, and possibly batching them together for better efficiency.
//
// It is a reference-counted class: the client session creating the batch holds one
// reference, and all of the in-flight operations hold others. This allows the client
// session to be destructed while ops are still in-flight, without the async callbacks
// attempting to access a destructed Batcher.
class Batcher : public RefCountedThreadSafe<Batcher> {
public:
// Create a new batcher associated with the given session.
//
// Any errors which come back from operations performed by this batcher
// are posted to the provided ErrorCollector.
//
// Takes a reference on error_collector. Takes a weak_ptr to session -- this
// is to break circular dependencies (a session keeps a reference to its
// current batcher) and make it possible to call notify a session
// (if it's around) from a batcher which does its job using other threads.
Batcher(KuduClient* client,
scoped_refptr<ErrorCollector> error_collector,
client::sp::weak_ptr<KuduSession> session,
kudu::client::KuduSession::ExternalConsistencyMode consistency_mode,
const kudu::TxnId& txn_id);
// Abort the current batch. Any writes that were buffered and not yet sent are
// discarded. Those that were sent may still be delivered. If there is a pending Flush
// callback, it will be called immediately with an error status.
void Abort();
// Set the timeout for this batcher.
//
// The timeout is currently set on all of the RPCs, but in the future will be relative
// to when the Flush call is made (eg even if the lookup of the TS takes a long time, it
// may time out before even sending an op). TODO: implement that
void SetTimeout(const MonoDelta& timeout);
// Add a new operation to the batch. Requires that the batch has not yet been flushed.
//
// NOTE: If this returns not-OK, does not take ownership of 'write_op'.
Status Add(KuduWriteOperation* write_op) WARN_UNUSED_RESULT;
// Return true if any operations are still pending. An operation is no longer considered
// pending once it has either errored or succeeded. Operations are considering pending
// as soon as they are added, even if Flush has not been called.
bool HasPendingOperations() const;
// Return the number of buffered operations. These are only those operations which are
// "corked" (i.e not yet flushed). Once Flush has been called, this returns 0.
int CountBufferedOperations() const;
// Flush any buffered operations. The callback will be called once there are no
// more pending operations from this Batcher. If all of the operations succeeded,
// then the callback will receive Status::OK. Otherwise, it will receive IOError,
// and the caller must inspect the ErrorCollector to retrieve more detailed
// information on which operations failed.
void FlushAsync(KuduStatusCallback* cb);
// Returns the consistency mode set on the batcher by the session when it was initially
// created.
kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode() const {
return consistency_mode_;
}
// Get time of the first operation in the batch. If no operations are in
// there yet, the returned MonoTime object is not initialized
// (i.e. MonoTime::Initialized() returns false).
const MonoTime& first_op_time() const {
std::lock_guard<simple_spinlock> l(lock_);
return first_op_time_;
}
// Return the total size (number of bytes) of all pending write operations
// accumulated by the batcher.
int64_t buffer_bytes_used() const {
return buffer_bytes_used_.Load();
}
// Return the identifier of a multi-row transaction (if any) that all the
// accumulated write operations are part of.
const TxnId& txn_id() const {
return txn_id_;
}
// Compute in-buffer size for the given write operation.
static int64_t GetOperationSizeInBuffer(KuduWriteOperation* write_op) {
return write_op->SizeInBuffer();
}
private:
friend class RefCountedThreadSafe<Batcher>;
friend class WriteRpc;
~Batcher();
// Add an op to the in-flight set and increment the ref-count.
void AddInFlightOp(InFlightOp* op);
void RemoveInFlightOp(InFlightOp* op);
// Return true if the batch has been aborted, and any in-flight ops should stop
// processing wherever they are.
bool IsAbortedUnlocked() const;
// Mark the fact that errors have occurred with this batch. This ensures that
// the flush callback will get a bad Status.
void MarkHadErrors();
// Remove an op from the in-flight op list, and delete the op itself.
// The operation is reported to the ErrorReporter as having failed with the
// given status.
void MarkInFlightOpFailed(InFlightOp* op, const Status& s);
void MarkInFlightOpFailedUnlocked(InFlightOp* op, const Status& s);
void CheckForFinishedFlush();
void FlushBuffersIfReady();
void FlushBuffer(RemoteTablet* tablet, const std::vector<InFlightOp*>& ops);
// Cleans up an RPC response, scooping out any errors and passing them up
// to the batcher.
void ProcessWriteResponse(const WriteRpc& rpc, const Status& s);
// Async Callbacks.
void TabletLookupFinished(InFlightOp* op, const Status& s);
// Compute a new deadline based on timeout_. If no timeout_ has been set,
// uses a hard-coded default and issues periodic warnings.
MonoTime ComputeDeadlineUnlocked() const;
// See note about lock ordering in batcher.cc
mutable simple_spinlock lock_;
enum State {
kGatheringOps,
kFlushing,
kFlushed,
kAborted
};
State state_;
KuduClient* const client_;
client::sp::weak_ptr<KuduSession> weak_session_;
// The consistency mode set in the session.
const kudu::client::KuduSession::ExternalConsistencyMode consistency_mode_;
// The identifier of a transaction that is associated with operations
// processed by this batcher. All operations accumulated by one batcher either
// related to the same multi-row transaction or all of the accumulated
// operations are not a part of any multi-row transaction. In the latter case,
// txn_id_.IsValid() would return 'false'.
const TxnId txn_id_;
// Errors are reported into this error collector.
scoped_refptr<ErrorCollector> error_collector_;
// The time when the very first operation was added into the batcher.
MonoTime first_op_time_;
// Set to true if there was at least one error from this Batcher.
// Protected by lock_
bool had_errors_;
// If state is kFlushing, this member will be set to the user-provided
// callback. Once there are no more in-flight operations, the callback
// will be called exactly once (and the state changed to kFlushed).
KuduStatusCallback* flush_callback_;
// All buffered or in-flight ops.
google::dense_hash_set<InFlightOp*> ops_;
// Each tablet's buffered ops.
typedef std::unordered_map<RemoteTablet*, std::vector<InFlightOp*> > OpsMap;
OpsMap per_tablet_ops_;
// When each operation is added to the batcher, it is assigned a sequence number
// which preserves the user's intended order. Preserving order is critical when
// a batch contains multiple operations against the same row key. This member
// assigns the sequence numbers.
// Protected by lock_.
int next_op_sequence_number_;
// Amount of time to wait for a given op, from start to finish.
//
// Set by SetTimeout().
MonoDelta timeout_;
// After flushing, the absolute deadline for all in-flight ops.
MonoTime deadline_;
// Number of outstanding lookups across all in-flight ops.
//
// Note: _not_ protected by lock_!
Atomic32 outstanding_lookups_;
// The number of bytes used in the buffer for pending operations.
AtomicInt<int64_t> buffer_bytes_used_;
Arena arena_;
DISALLOW_COPY_AND_ASSIGN(Batcher);
};
} // namespace internal
} // namespace client
} // namespace kudu