blob: db9de7b83f3d0decb6d943bf08b76fc28f16ef53 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstddef>
#include <cstdint>
#include <memory>
#include <gtest/gtest_prod.h>
#include "kudu/client/batcher.h"
#include "kudu/client/client.h"
#include "kudu/client/error_collector.h"
#include "kudu/client/shared_ptr.h" // IWYU pragma: keep
#include "kudu/common/txn_id.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/status.h"
namespace kudu {
namespace rpc {
// Forward declaration: this header refers to Messenger only through
// std::weak_ptr<rpc::Messenger>, so the full definition is not needed here.
class Messenger;
} // namespace rpc
namespace client {
// Forward declarations: only pointers to these types appear in the
// declarations of this header.
class KuduStatusCallback;
class KuduWriteOperation;
// This class contains the code to do the heavy-lifting for the
// kudu::KuduSession-related operations. Its interface does not assume
// thread-safety in general, but it's thread-safe regarding the following
// concurrent actions:
//
//   * calls to KuduSession::Apply():
//     (there is at most one call at any moment, no concurrency is assumed here).
//
//   * activity of the time-based background flush task
//     (there is at most one task running at any moment).
//
//   * calls from messenger/reactor threads upon completion of the flushed
//     operations to the corresponding tablet server
//     (there can be multiple of those at any moment).
//
class KuduSession::Data {
 public:
  // Create the session's data for the specified client and messenger.
  // The 'txn_id' parameter defaults to kudu::TxnId::kInvalidTxnId; see the
  // txn_id_ member for the meaning of a valid transaction identifier.
  explicit Data(sp::shared_ptr<KuduClient> client,
                std::weak_ptr<rpc::Messenger> messenger,
                const kudu::TxnId& txn_id = kudu::TxnId::kInvalidTxnId);

  // Supply the weak reference to the containing KuduSession object
  // (presumably stored in session_ -- the implementation is not a part
  // of this header, so verify against the definition).
  void Init(sp::weak_ptr<KuduSession> session);

  // Called by Batcher when a flush has finished.
  void FlushFinished(internal::Batcher* batcher);

  // Returns Status::IllegalState() if 'force' is false and there are still
  // pending operations. If 'force' is true, batcher_ is aborted even if there
  // are pending operations.
  Status Close(bool force);

  // Set flush mode for the session.
  Status SetFlushMode(FlushMode mode);

  // Set external consistency mode for the session.
  Status SetExternalConsistencyMode(KuduSession::ExternalConsistencyMode m);

  // Set limit on buffer space consumed by buffered write operations.
  Status SetBufferBytesLimit(size_t size);

  // Set buffer flush watermark (in percentage of the total buffer space).
  Status SetBufferFlushWatermark(int32_t watermark_pct);

  // Set the interval of the background max-wait flushing (in milliseconds).
  Status SetBufferFlushInterval(unsigned int period_ms);

  // Set the limit on maximum number of batchers with pending operations.
  // The 'max_num' parameter is a count of batchers, not a time period.
  Status SetMaxBatchersNum(unsigned int max_num);

  // Set timeout for write operations, in milliseconds.
  void SetTimeoutMillis(int timeout_ms);

  // Initiate flushing of the current batcher and invoke the specified callback
  // once the flushing is finished.
  void FlushAsync(KuduStatusCallback* cb);

  // Initiate flushing of the current batcher and await until all batchers
  // complete flushing. The return value is Status::OK() if none of the
  // batchers encountered errors and Status::IOError() otherwise.
  Status Flush();

  // Check whether there are operations not yet sent to tablet servers.
  bool HasPendingOperations() const;

  // Same check as HasPendingOperations().
  // NOTE(review): judging by the 'Unlocked' suffix, this variant presumably
  // expects the caller to already hold mutex_ -- confirm against the
  // implementation.
  bool HasPendingOperationsUnlocked() const;

  // Get total number of buffered operations.
  int CountBufferedOperations() const;

  // Initiate flushing of the current batcher if its accumulated operations'
  // on-the-wire size has reached the specified watermark. The result
  // of the asynchronous flushing is reported via the specified callback.
  // Even if the callback is null (nullptr), that does not mean the errors
  // are dropped on the floor -- in case of an error, corresponding information
  // is added into the session's error collector and can be retrieved later.
  void FlushCurrentBatcher(int64_t watermark,
                           KuduStatusCallback* cb);

  // Initiate flushing of the current batcher if it has reached the specified
  // age. If the current batcher is present but it hasn't reached
  // the specified age yet, just return the amount of time left until it reaches
  // the specified age, not flushing the batcher. If the current batcher is
  // of the specified age or older, flush the batcher and return an
  // uninitialized MonoDelta object. If there is no current batcher, return
  // an uninitialized MonoDelta object as well.
  MonoDelta FlushCurrentBatcher(const MonoDelta& max_age);

  // Apply a write operation, i.e. push it through the batcher chain.
  Status ApplyWriteOp(KuduWriteOperation* write_op);

  // Check and start the time-based flush task in background, if necessary.
  void TimeBasedFlushInit();

  // The self-rescheduling task to flush write operations which have been
  // accumulating for too long (controlled by flush_interval_).
  // This does real work only in case of AUTO_FLUSH_BACKGROUND mode.
  // This method is used to initiate/re-initiate the run of the task
  // and re-schedule the task from within. The 'do_startup_check' parameter
  // must be set to 'true' when the method is called not from the task thread.
  static void TimeBasedFlushTask(const Status& status,
                                 std::weak_ptr<rpc::Messenger> weak_messenger,
                                 sp::weak_ptr<KuduSession> weak_session,
                                 bool do_startup_check);

  // Get the total size of pending (i.e. both freshly added and
  // in process of being flushed) operations. This method is used by tests only.
  int64_t GetPendingOperationsSizeForTests() const;

  // Get the total number of batchers in the session.
  // This method is used by tests only.
  size_t GetBatchersCountForTests() const;

  // Run sanity checks on a write operation: check for the presence of the
  // primary key and perform other validations with regard to the column schema.
  Status ValidateWriteOperation(KuduWriteOperation* op) const;

  // This constant represents a meaningful name for the first argument in
  // expressions like FlushCurrentBatcher(1, cbk): this is the watermark
  // corresponding to 1 byte of data. This watermark level is the minimum
  // possible for a non-empty batcher, so any non-empty batcher will be flushed
  // if calling FlushCurrentBatcher() using this watermark.
  static const int64_t kWatermarkNonEmptyBatcher = 1;

  // The client that this session is associated with.
  const sp::shared_ptr<KuduClient> client_;

  // Weak reference to the containing session. The reference is weak to
  // avoid circular referencing. The reference to the KuduSession object
  // is needed by batchers and time-based flush task: being run in independent
  // threads, they need to make sure the object is alive before accessing it.
  sp::weak_ptr<KuduSession> session_;

  // The reference to the client's messenger (keeping the reference instead of
  // declaring friendship to KuduClient and accessing it via the client_).
  std::weak_ptr<rpc::Messenger> messenger_;

  // Buffer for errors.
  scoped_refptr<internal::ErrorCollector> error_collector_;

  // External consistency mode of the session
  // (see SetExternalConsistencyMode()).
  kudu::client::KuduSession::ExternalConsistencyMode external_consistency_mode_;

  // Timeout for the next batch.
  MonoDelta timeout_;

  // Interval for the max-wait flush background task.
  MonoDelta flush_interval_;  // protected by mutex_

  // Whether the flush task is active/scheduled.
  bool flush_task_active_;  // protected by mutex_

  // Current flush mode for the session's data.
  FlushMode flush_mode_;  // protected by mutex_

  // Mutex for the condition_ member (the condition variable).
  // This lock protects variables from simultaneous access:
  // batcher- and byte-counting members, data flow control and other variables
  // whose modification requires notifying the thread which might be waiting
  // on the 'condition_' variable.
  mutable Mutex mutex_;

  // Condition variable used by the code which allocates the next batcher
  // and counts bytes used by the buffered write operations.
  // I.e., it is used by the data flow control logic while applying/adding
  // new write operations (based on mutex_).
  ConditionVariable condition_;

  // The current batcher being prepared.
  scoped_refptr<internal::Batcher> batcher_;  // protected by mutex_

  // Total number of active batchers. This includes the current batcher
  // accumulating the newly applied operations, and other batchers with not yet
  // flushed or flushed but not yet finished operations.
  size_t batchers_num_;  // protected by mutex_

  // Limit on the number of active batchers.
  size_t batchers_num_limit_;

  // Session-wide limit on total size of buffer used by all batched write
  // operations. The buffer is a virtual entity: there isn't a contiguous place
  // in the memory which would contain that 'buffered' data. Instead, buffer's
  // data is spread across all pending operations in all active batchers.
  // Thread-safety note: buffer_bytes_limit_ is not supposed to be modified
  // from any other thread since no thread-safety is advertised for the
  // kudu::KuduSession interface.
  size_t buffer_bytes_limit_;

  // The high-watermark level as the percentage of the buffer space used by
  // freshly added (not-yet-scheduled-for-flush) write operations.
  // Once the level is reached, the BackgroundFlusher triggers flushing
  // of accumulated write operations when running in AUTO_FLUSH_BACKGROUND mode.
  // Thread-safety note: buffer_watermark_pct_ is not supposed to be modified
  // from any other thread since no thread-safety is advertised for the
  // kudu::KuduSession interface.
  int32_t buffer_watermark_pct_;

  // The total number of bytes used by buffered write operations.
  int64_t buffer_bytes_used_;  // protected by mutex_

  // Transaction ID for this session's transaction (if any): txn_id_.IsValid()
  // returns true only if the upper-level session is a transactional one.
  const TxnId txn_id_;

 private:
  // The tests below are granted access to the private members of this class.
  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundApplyBlocks);
  FRIEND_TEST(ClientTest, TestAutoFlushBackgroundAndErrorCollector);
  FRIEND_TEST(ClientTest, TxnIdOfTransactionalSession);

  bool buffer_pre_flush_enabled_;  // Set to 'false' only in test scenarios.

  DISALLOW_COPY_AND_ASSIGN(Data);
};
} // namespace client
} // namespace kudu