blob: e0575e6f80a46f206a506166b8af74732d8cddf9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tablet/ops/op_tracker.h"
#include <algorithm>
#include <cstddef>
#include <limits>
#include <mutex>
#include <ostream>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <google/protobuf/message.h>
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/ops/op.h"
#include "kudu/tablet/ops/op_driver.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/flag_validators.h"
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
// Upper bound, in MiB, on the memory that all in-flight ops of one tablet may
// consume. OpTracker::StartMemoryTracking() turns it into a MemTracker limit of
// value * 1024 * 1024 bytes; -1 skips creating that tracker altogether.
// Checked by ValidateOpMemoryLimit() and, together with
// --rpc_max_message_size, by ValidateOpMemoryAndRpcSize().
DEFINE_int64(tablet_transaction_memory_limit_mb, 64,
"Maximum amount of memory that may be consumed by all in-flight "
"ops belonging to a particular tablet. When this limit "
"is reached, new ops will be rejected and clients will "
"be forced to retry them. If -1, op memory tracking is "
"disabled.");
TAG_FLAG(tablet_transaction_memory_limit_mb, advanced);
// Defined elsewhere; read by ValidateOpMemoryAndRpcSize() so the op memory
// limit can be compared against the largest allowed RPC message size.
DECLARE_int64(rpc_max_message_size);
// In-flight op gauges. They are instantiated by OpTracker::Metrics, raised by
// OpTracker::IncrementCounters() when an op is added and lowered by
// OpTracker::DecrementCounters() when it is released. Participant ops are
// currently reflected only in the aggregate gauge.
METRIC_DEFINE_gauge_uint64(tablet, all_transactions_inflight,
"Ops In Flight",
kudu::MetricUnit::kTransactions,
"Number of ops currently in-flight, including any type.",
kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_uint64(tablet, write_transactions_inflight,
"Write Ops In Flight",
kudu::MetricUnit::kTransactions,
"Number of write ops currently in-flight",
kudu::MetricLevel::kDebug);
METRIC_DEFINE_gauge_uint64(tablet, alter_schema_transactions_inflight,
"Alter Schema Ops In Flight",
kudu::MetricUnit::kTransactions,
"Number of alter schema ops currently in-flight",
kudu::MetricLevel::kDebug);
// Rejection counters bumped by OpTracker::Add() when an op cannot reserve its
// memory. The first counts every such rejection, including those caused by the
// limit of an ancestor tracker; the second counts only the rejections where the
// tablet's own op memory limit was exceeded.
//
// Note: adjacent string literals are concatenated verbatim, so each fragment
// must carry its own trailing space.
METRIC_DEFINE_counter(tablet, transaction_memory_pressure_rejections,
                      "Op Memory Pressure Rejections",
                      kudu::MetricUnit::kTransactions,
                      "Number of ops rejected because the tablet's op "
                      "memory usage exceeds the op memory limit or the limit "
                      "of an ancestral tracker.",
                      kudu::MetricLevel::kWarn);
METRIC_DEFINE_counter(tablet, transaction_memory_limit_rejections,
                      "Tablet Op Memory Limit Rejections",
                      kudu::MetricUnit::kTransactions,
                      "Number of ops rejected because the tablet's "
                      "op memory limit was reached.",
                      kudu::MetricLevel::kWarn);
using std::shared_ptr;
using std::string;
using std::vector;
using strings::Substitute;
static bool ValidateOpMemoryLimit(const char* flagname, int64_t value) {
// -1 is a special value for the --tablet_transaction_memory_limit_mb flag.
if (value < -1) {
LOG(ERROR) << Substitute("$0: invalid value for flag $1", value, flagname);
return false;
}
return true;
}
DEFINE_validator(tablet_transaction_memory_limit_mb, ValidateOpMemoryLimit);
// Group validator: when op memory tracking is enabled (limit >= 0), the op
// memory limit must be at least as large as --rpc_max_message_size, presumably
// so that a maximum-size request can still be admitted by OpTracker::Add().
//
// The comparison is done in whole MiB against the ceiling of the RPC size. That
// is equivalent to comparing limit_mb * 1024 * 1024 with the RPC size in bytes,
// but it never computes the product, which overflows int64_t for very large flag
// values.
//
// Returns true if the combination is acceptable; otherwise logs the minimum
// acceptable limit and returns false.
static bool ValidateOpMemoryAndRpcSize() {
  constexpr int64_t kMiB = 1024 * 1024;
  const int64_t op_limit_mb = FLAGS_tablet_transaction_memory_limit_mb;
  const int64_t rpc_max_size = FLAGS_rpc_max_message_size;
  // Smallest whole number of MiB able to hold rpc_max_size bytes; <= 0 when
  // rpc_max_size is not positive, so such sizes never cause a rejection.
  const int64_t min_op_limit_mb =
      rpc_max_size / kMiB + (rpc_max_size % kMiB > 0 ? 1 : 0);
  if (op_limit_mb >= 0 && op_limit_mb < min_op_limit_mb) {
    LOG(ERROR) << Substitute(
        "--tablet_transaction_memory_limit_mb is set too low compared with "
        "--rpc_max_message_size; increase --tablet_transaction_memory_limit_mb "
        "at least up to $0", min_op_limit_mb);
    return false;
  }
  return true;
}
GROUP_FLAG_VALIDATOR(transaction_memory_and_rpc_size,
                     ValidateOpMemoryAndRpcSize);
namespace kudu {
namespace tablet {
// Instantiates every op-tracker metric against 'entity'. The gauges are
// instantiated with an explicit second argument of 0; the counters take only
// the entity. Member order follows the original initializer list.
OpTracker::Metrics::Metrics(const scoped_refptr<MetricEntity>& entity)
    : all_transactions_inflight(
          METRIC_all_transactions_inflight.Instantiate(entity, 0)),
      write_transactions_inflight(
          METRIC_write_transactions_inflight.Instantiate(entity, 0)),
      alter_schema_transactions_inflight(
          METRIC_alter_schema_transactions_inflight.Instantiate(entity, 0)),
      transaction_memory_pressure_rejections(
          METRIC_transaction_memory_pressure_rejections.Instantiate(entity)),
      transaction_memory_limit_rejections(
          METRIC_transaction_memory_limit_rejections.Instantiate(entity)) {
}
// Per-op bookkeeping starts with a zero footprint; OpTracker::Add() records
// the real value before inserting the entry.
OpTracker::State::State() : memory_footprint(0) {}
// Nothing to set up eagerly: metrics and memory tracking are opt-in via
// StartInstrumentation() and StartMemoryTracking().
OpTracker::OpTracker() = default;
// In debug builds, asserts that every tracked op was released before the
// tracker goes away. Release builds skip both the lock and the check.
OpTracker::~OpTracker() {
#ifndef NDEBUG
  std::lock_guard<simple_spinlock> guard(lock_);
  DCHECK(pending_ops_.empty());
#endif
}
// Starts tracking 'driver'.
//
// If memory tracking is enabled, first reserves the op request's footprint in
// the op mem tracker. When that reservation fails, the rejection metrics are
// bumped (if instrumented), a throttled warning is logged and
// Status::ServiceUnavailable is returned without tracking the op. Otherwise
// the in-flight gauges are raised and the op is recorded as pending together
// with its footprint.
Status OpTracker::Add(OpDriver* driver) {
  const size_t footprint = driver->state()->request()->SpaceUsedLong();

  const bool reserved = !mem_tracker_ || mem_tracker_->TryConsume(footprint);
  if (!reserved) {
    if (metrics_) {
      metrics_->transaction_memory_pressure_rejections->Increment();
      // Separately count the rejections caused by this tracker's own limit
      // rather than by an ancestor's.
      if (!mem_tracker_->CanConsumeNoAncestors(footprint)) {
        metrics_->transaction_memory_limit_rejections->Increment();
      }
    }
    // The replica may be null in unit tests.
    TabletReplica* replica = driver->state()->tablet_replica();
    const string msg = Substitute(
        "op on tablet $0 rejected due to memory pressure: the memory "
        "usage of this op ($1) plus the current consumption ($2) "
        "exceeds the op memory limit ($3) or the limit of an ancestral "
        "memory tracker.",
        replica ? replica->tablet()->tablet_id() : "(unknown)",
        footprint, mem_tracker_->consumption(), mem_tracker_->limit());
    KLOG_EVERY_N_SECS(WARNING, 1) << msg << THROTTLE_MSG;
    return Status::ServiceUnavailable(msg);
  }

  IncrementCounters(*driver);

  // Remember the footprint now: the request may be gone by the time the op is
  // released, so Release() must not have to consult it again.
  State st;
  st.memory_footprint = footprint;
  std::lock_guard<simple_spinlock> guard(lock_);
  InsertOrDie(&pending_ops_, driver, st);
  return Status::OK();
}
// Raises the in-flight gauges for 'driver': the aggregate gauge always, and
// the per-type gauge for writes and schema alterations. Does nothing when
// StartInstrumentation() has not been called.
void OpTracker::IncrementCounters(const OpDriver& driver) const {
  if (metrics_) {
    metrics_->all_transactions_inflight->Increment();
    switch (driver.op_type()) {
      case Op::WRITE_OP:
        metrics_->write_transactions_inflight->Increment();
        break;
      case Op::ALTER_SCHEMA_OP:
        metrics_->alter_schema_transactions_inflight->Increment();
        break;
      case Op::PARTICIPANT_OP:
        // TODO(awong): implement me!
        break;
    }
  }
}
// Lowers the in-flight gauges raised for 'driver' by IncrementCounters().
// Debug builds assert that each gauge is positive before it is decremented.
// Does nothing when StartInstrumentation() has not been called.
void OpTracker::DecrementCounters(const OpDriver& driver) const {
  if (metrics_) {
    DCHECK_GT(metrics_->all_transactions_inflight->value(), 0);
    metrics_->all_transactions_inflight->Decrement();
    switch (driver.op_type()) {
      case Op::WRITE_OP:
        DCHECK_GT(metrics_->write_transactions_inflight->value(), 0);
        metrics_->write_transactions_inflight->Decrement();
        break;
      case Op::ALTER_SCHEMA_OP:
        DCHECK_GT(metrics_->alter_schema_transactions_inflight->value(), 0);
        metrics_->alter_schema_transactions_inflight->Decrement();
        break;
      case Op::PARTICIPANT_OP:
        // TODO(awong): implement me!
        break;
    }
  }
}
// Stops tracking 'driver'.
//
// Lowers the in-flight gauges. If memory tracking is enabled, it also returns
// the footprint cached by Add() to the op mem tracker. The op is then removed
// from the pending set. Crashes (LOG(FATAL)) if 'driver' is not currently
// pending.
void OpTracker::Release(OpDriver* driver) {
  DecrementCounters(*driver);

  std::lock_guard<simple_spinlock> l(lock_);
  // A single lookup serves both the memory release and the removal, and a
  // missing op always produces the same descriptive fatal message whether or
  // not memory tracking is enabled.
  auto it = pending_ops_.find(driver);
  if (PREDICT_FALSE(it == pending_ops_.end())) {
    LOG(FATAL) << Substitute("Could not remove pending op from map: $0",
                             driver->ToStringUnlocked());
  }
  if (mem_tracker_) {
    mem_tracker_->Release(it->second.memory_footprint);
  }
  pending_ops_.erase(it);
}
// Appends a reference to every currently pending op to 'pending_out', which
// is expected (DCHECK) to be empty on entry. Each appended scoped_refptr holds
// its own reference, so the ops stay alive for as long as the caller keeps
// the vector.
void OpTracker::GetPendingOps(
    vector<scoped_refptr<OpDriver> >* pending_out) const {
  DCHECK(pending_out->empty());
  std::lock_guard<simple_spinlock> l(lock_);
  // Size the output once so the copy below does not repeatedly reallocate
  // while the spinlock is held.
  pending_out->reserve(pending_ops_.size());
  for (const auto& entry : pending_ops_) {
    // Increments refcount of each op.
    pending_out->push_back(entry.first);
  }
}
// Test-only: the number of ops currently tracked as pending.
int OpTracker::GetNumPendingForTests() const {
  std::lock_guard<simple_spinlock> guard(lock_);
  const auto pending = pending_ops_.size();
  return static_cast<int>(pending);
}
void OpTracker::WaitForAllToFinish() const {
// Wait indefinitely.
CHECK_OK(WaitForAllToFinish(MonoDelta::FromNanoseconds(std::numeric_limits<int64_t>::max())));
}
// Polls until no ops are pending or more than 'timeout' has elapsed.
//
// The poll interval starts at 250us and grows by roughly 25% per iteration,
// capped at one second. While ops remain, a warning is logged (first after
// one second, then at exponentially growing intervals capped at 256s), and
// up to kMaxTxnsToPrint of the outstanding ops are dumped.
//
// Returns Status::TimedOut if ops are still pending after 'timeout', and
// Status::OK once none remain.
Status OpTracker::WaitForAllToFinish(const MonoDelta& timeout) const {
  static constexpr size_t kMaxTxnsToPrint = 50;
  int wait_time_us = 250;
  int num_complaints = 0;
  MonoTime start_time = MonoTime::Now();
  MonoTime next_log_time = start_time + MonoDelta::FromSeconds(1);
  while (true) {
    vector<scoped_refptr<OpDriver> > ops;
    GetPendingOps(&ops);
    if (ops.empty()) {
      break;
    }
    MonoTime now = MonoTime::Now();
    MonoDelta diff = now - start_time;
    if (diff > timeout) {
      return Status::TimedOut(Substitute("Timed out waiting for all ops to finish. "
                                         "$0 ops pending. Waited for $1",
                                         ops.size(), diff.ToString()));
    }
    if (now > next_log_time) {
      LOG(WARNING) << Substitute("OpTracker waiting for $0 outstanding ops to"
                                 " complete now for $1", ops.size(), diff.ToString());
      LOG(INFO) << Substitute("Dumping up to $0 currently running ops: ",
                              kMaxTxnsToPrint);
      const size_t num_op_limit = std::min(ops.size(), kMaxTxnsToPrint);
      // size_t index matches ops.size(); an 'auto i = 0' (int) index would
      // trigger a signed/unsigned comparison.
      for (size_t i = 0; i < num_op_limit; ++i) {
        LOG(INFO) << ops[i]->ToString();
      }
      num_complaints++;
      // Exponential back-off on how often the ops are dumped (at most 2^8 s).
      next_log_time = now + MonoDelta::FromSeconds(1 << std::min(8, num_complaints));
    }
    wait_time_us = std::min(wait_time_us * 5 / 4, 1000000);
    SleepFor(MonoDelta::FromMicroseconds(wait_time_us));
  }
  return Status::OK();
}
void OpTracker::StartInstrumentation(
const scoped_refptr<MetricEntity>& metric_entity) {
metrics_.reset(new Metrics(metric_entity));
}
// Creates the "op_tracker" MemTracker under 'parent_mem_tracker', limited to
// --tablet_transaction_memory_limit_mb MiB. A flag value of -1 leaves memory
// tracking off, so Add() admits ops without reserving memory.
void OpTracker::StartMemoryTracking(
    const shared_ptr<MemTracker>& parent_mem_tracker) {
  if (FLAGS_tablet_transaction_memory_limit_mb == -1) {
    return;
  }
  const int64_t limit_bytes =
      FLAGS_tablet_transaction_memory_limit_mb * 1024 * 1024;
  mem_tracker_ = MemTracker::CreateTracker(limit_bytes, "op_tracker",
                                           parent_mem_tracker);
}
} // namespace tablet
} // namespace kudu