blob: add5d9dbe1f6d41bfe271316d7f88190de48a155 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/scanners.h"
#include <algorithm>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <gflags/gflags.h>
#include "kudu/common/column_predicate.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/iterator.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/dynamic_annotations.h"
#include "kudu/gutil/hash/string_hash.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tserver/scanner_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/locks.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/thread.h"
// Idle time after which RemoveExpiredScanners() discards a scanner.
DEFINE_int32(scanner_ttl_ms, 60000,
             "Number of milliseconds of inactivity allowed for a scanner "
             "before it may be expired");
TAG_FLAG(scanner_ttl_ms, advanced);
TAG_FLAG(scanner_ttl_ms, runtime);

// How long the background removal thread waits between expiration passes.
DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L * 1000L, // 5 seconds
             "Number of microseconds in the interval at which we remove expired scanners");
TAG_FLAG(scanner_gc_check_interval_us, hidden);

// Capacity reserved for the completed-scan history. Non-positive values skip the
// reservation, which in practice leaves history recording disabled.
DEFINE_int32(scan_history_count, 20,
             "Number of completed scans to keep history for. Determines how many historical "
             "scans will be shown on the tablet server's scans dashboard.");
TAG_FLAG(scan_history_count, experimental);

// Server-level gauge backed by ScannerManager::CountActiveScanners().
METRIC_DEFINE_gauge_size(server, active_scanners,
                         "Active Scanners",
                         kudu::MetricUnit::kScanners,
                         "Number of scanners that are currently active",
                         kudu::MetricLevel::kInfo);
using std::string;
using std::unique_ptr;
using std::unordered_map;
using std::vector;
using strings::Substitute;
namespace kudu {
using rpc::RemoteUser;
using tablet::TabletReplica;
namespace tserver {
// Builds the manager with kNumScannerMapStripes empty, heap-allocated stripes
// (freed in the destructor). Scanner metrics and the 'active_scanners' gauge are
// only created when a metric entity is supplied. The completed-scan history
// vector is pre-reserved to FLAGS_scan_history_count when that flag is positive.
ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity)
    : shutdown_(false),
      shutdown_cv_(&shutdown_lock_),
      completed_scans_offset_(0) {
  if (metric_entity) {
    metrics_.reset(new ScannerMetrics(metric_entity));
    // The gauge calls back into this object, so it is auto-detached via
    // metric_detacher_ rather than left pointing at a destroyed manager.
    auto active_gauge = METRIC_active_scanners.InstantiateFunctionGauge(
        metric_entity, [this]() { return this->CountActiveScanners(); });
    active_gauge->AutoDetach(&metric_detacher_);
  }

  while (scanner_maps_.size() < kNumScannerMapStripes) {
    scanner_maps_.push_back(new ScannerMapStripe());
  }

  const int32_t history_len = FLAGS_scan_history_count;
  if (history_len > 0) {
    completed_scans_.reserve(history_len);
  }
}
// Stops the removal thread (if one was started) and frees every stripe.
// shutdown_ is set and broadcast under shutdown_lock_ so the removal thread
// cannot miss the wakeup; the stripes are deleted only after the thread has
// been joined, since it iterates over them.
ScannerManager::~ScannerManager() {
  {
    MutexLock shutdown_guard(shutdown_lock_);
    shutdown_ = true;
    shutdown_cv_.Broadcast();
  }

  Thread* const removal = removal_thread_.get();
  if (removal) {
    CHECK_OK(ThreadJoiner(removal).Join());
  }

  STLDeleteElements(&scanner_maps_);
}
// Spawns the "scanners"/"removal_thread" thread running RunRemovalThread().
// Returns the thread-creation status unchanged.
Status ScannerManager::StartRemovalThread() {
  auto thread_body = [this]() { this->RunRemovalThread(); };
  return Thread::Create("scanners", "removal_thread", thread_body, &removal_thread_);
}
// Body of the background removal thread: periodically expires idle scanners
// until the destructor signals shutdown.
//
// Between passes the thread waits on shutdown_cv_ for up to
// FLAGS_scanner_gc_check_interval_us. shutdown_ is re-checked after every wait
// (still under shutdown_lock_), so once the destructor broadcasts, the thread
// exits right away. Previously it would first do one more full sweep of every
// stripe while the destructor was blocked joining it.
void ScannerManager::RunRemovalThread() {
  while (true) {
    {
      MutexLock l(shutdown_lock_);
      // Shutdown may have been requested before we ever started waiting.
      if (shutdown_) {
        return;
      }
      shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us));
      // Woken by the destructor's Broadcast(): skip the sweep and exit.
      if (shutdown_) {
        return;
      }
    }
    RemoveExpiredScanners();
  }
}
// Maps a scanner ID to its owning stripe by hashing the ID's bytes modulo the
// stripe count. The same ID always lands in the same stripe.
ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const string& scanner_id) {
  const size_t id_hash = HashStringThoroughly(scanner_id.data(), scanner_id.size());
  ScannerMapStripe* owner = scanner_maps_[id_hash % kNumScannerMapStripes];
  return *owner;
}
void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica,
const RemoteUser& remote_user,
uint64_t row_format_flags,
SharedScanner* scanner) {
// Keep trying to generate a unique ID until we get one.
bool success = false;
while (!success) {
string id = oid_generator_.Next();
scanner->reset(new Scanner(id,
tablet_replica,
remote_user,
metrics_.get(),
row_format_flags));
ScannerMapStripe& stripe = GetStripeByScannerId(id);
std::lock_guard<RWMutex> l(stripe.lock_);
success = InsertIfNotPresent(&stripe.scanners_by_id_, id, *scanner);
}
}
// Looks up 'scanner_id' on behalf of 'username'.
//
// On success, '*scanner' receives a shared reference and OK is returned.
// Otherwise '*scanner' is untouched and one of these is returned:
//  - NotFound, with '*error_code' = SCANNER_EXPIRED, if no such scanner is registered;
//  - NotAuthorized, with '*error_code' = NOT_AUTHORIZED, if the scanner belongs
//    to a different user.
Status ScannerManager::LookupScanner(const string& scanner_id,
                                     const string& username,
                                     TabletServerErrorPB::Code* error_code,
                                     SharedScanner* scanner) {
  ScannerMapStripe& owner = GetStripeByScannerId(scanner_id);
  shared_lock<RWMutex> read_guard(owner.lock_);

  SharedScanner candidate;
  if (!FindCopy(owner.scanners_by_id_, scanner_id, &candidate)) {
    *error_code = TabletServerErrorPB::SCANNER_EXPIRED;
    return Status::NotFound(Substitute("Scanner $0 not found (it may have expired)",
                                       scanner_id));
  }

  const bool same_owner = candidate->remote_user().username() == username;
  if (!same_owner) {
    *error_code = TabletServerErrorPB::NOT_AUTHORIZED;
    return Status::NotAuthorized(Substitute("User $0 doesn't own scanner $1",
                                            username, scanner_id));
  }

  *scanner = std::move(candidate);
  return Status::OK();
}
bool ScannerManager::UnregisterScanner(const string& scanner_id) {
ScanDescriptor descriptor;
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
{
std::lock_guard<RWMutex> l(stripe.lock_);
auto it = stripe.scanners_by_id_.find(scanner_id);
if (it == stripe.scanners_by_id_.end()) {
return false;
}
bool is_initted = it->second->is_initted();
if (is_initted) {
descriptor = it->second->Descriptor();
descriptor.state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete;
}
stripe.scanners_by_id_.erase(it);
if (!is_initted) {
return true;
}
}
std::lock_guard<RWMutex> l(completed_scans_lock_);
RecordCompletedScanUnlocked(std::move(descriptor));
return true;
}
size_t ScannerManager::CountActiveScanners() const {
size_t total = 0;
for (const ScannerMapStripe* e : scanner_maps_) {
shared_lock<RWMutex> l(e->lock_);
total += e->scanners_by_id_.size();
}
return total;
}
void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) const {
for (const ScannerMapStripe* stripe : scanner_maps_) {
shared_lock<RWMutex> l(stripe->lock_);
for (const auto& se : stripe->scanners_by_id_) {
scanners->push_back(se.second);
}
}
}
// Returns descriptors for all initialized active scans (state kActive) plus
// the retained completed-scan history, keyed by scanner ID.
//
// Results are sorted newest to oldest by start time. Ties are broken by scanner
// ID, so the ordering really is deterministic across calls; std::sort is not
// stable and the intermediate map's iteration order is unspecified.
vector<ScanDescriptor> ScannerManager::ListScans() const {
  unordered_map<string, ScanDescriptor> scans;
  for (const ScannerMapStripe* stripe : scanner_maps_) {
    shared_lock<RWMutex> l(stripe->lock_);
    for (const auto& se : stripe->scanners_by_id_) {
      // Descriptor() CHECK-fails on scanners that are not yet initialized.
      if (se.second->is_initted()) {
        ScanDescriptor desc = se.second->Descriptor();
        desc.state = ScanState::kActive;
        EmplaceOrDie(&scans, se.first, std::move(desc));
      }
    }
  }

  {
    shared_lock<RWMutex> l(completed_scans_lock_);
    // A scanner in 'scans' may have completed between the above loop and here.
    // As we'd rather have the finalized descriptor of the completed scan,
    // update over the old descriptor in this case.
    for (const auto& scan : completed_scans_) {
      InsertOrUpdate(&scans, scan.scanner_id, scan);
    }
  }

  vector<ScanDescriptor> ret;
  ret.reserve(scans.size());
  AppendValuesFromMap(scans, &ret);

  // Sort newest to oldest; equal start times fall back to scanner ID order.
  std::sort(ret.begin(), ret.end(), [](const ScanDescriptor& a, const ScanDescriptor& b) {
    if (a.start_time > b.start_time) {
      return true;
    }
    if (b.start_time > a.start_time) {
      return false;
    }
    return a.scanner_id < b.scanner_id;
  });
  return ret;
}
void ScannerManager::RemoveExpiredScanners() {
MonoDelta scanner_ttl = MonoDelta::FromMilliseconds(FLAGS_scanner_ttl_ms);
const MonoTime now = MonoTime::Now();
vector<ScanDescriptor> descriptors;
for (ScannerMapStripe* stripe : scanner_maps_) {
std::lock_guard<RWMutex> l(stripe->lock_);
for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end();) {
const SharedScanner& scanner = it->second;
MonoDelta idle_time = scanner->TimeSinceLastAccess(now);
if (idle_time <= scanner_ttl) {
++it;
continue;
}
// The scanner has expired because of inactivity.
LOG(INFO) << Substitute(
"Expiring scanner id: $0, of tablet $1, "
"after $2 ms of inactivity, which is > TTL ($3 ms).",
it->first,
scanner->tablet_id(),
idle_time.ToMilliseconds(),
scanner_ttl.ToMilliseconds());
if (scanner->is_initted()) {
descriptors.emplace_back(scanner->Descriptor());
}
it = stripe->scanners_by_id_.erase(it);
if (metrics_) {
metrics_->scanners_expired->Increment();
}
}
}
std::lock_guard<RWMutex> l(completed_scans_lock_);
for (auto& descriptor : descriptors) {
descriptor.last_access_time = now;
descriptor.state = ScanState::kExpired;
RecordCompletedScanUnlocked(std::move(descriptor));
}
}
// Stores 'descriptor' in the completed-scan history, a ring buffer whose bound
// is completed_scans_.capacity(). Callers in this file hold
// completed_scans_lock_ while calling it.
//
// While the buffer has spare capacity the descriptor is appended. Once it is
// full, the slot at completed_scans_offset_ is overwritten and the offset
// advances, wrapping to 0. With zero capacity nothing is recorded.
// NOTE(review): vector::reserve only guarantees capacity >= the requested size,
// so the effective bound may exceed FLAGS_scan_history_count.
void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
  const auto history_cap = completed_scans_.capacity();
  if (history_cap == 0) {
    return;
  }

  if (completed_scans_.size() != history_cap) {
    completed_scans_.emplace_back(std::move(descriptor));
    return;
  }

  completed_scans_[completed_scans_offset_] = std::move(descriptor);
  ++completed_scans_offset_;
  if (completed_scans_offset_ == history_cap) {
    completed_scans_offset_ = 0;
  }
}
// Sentinel tablet-ID text. NOTE(review): presumably what tablet_id() reports
// when no tablet replica is attached; the accessor is not visible here.
const std::string Scanner::kNullTabletId("null tablet");

// Creates scanner 'id' over 'tablet_replica' for 'remote_user'. The creation
// time also seeds the last-access time. If the replica has a tablet with
// metrics, the tablet's active-scanner gauge is incremented; ~Scanner()
// reverses this.
Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
                 RemoteUser remote_user, ScannerMetrics* metrics,
                 uint64_t row_format_flags)
    : id_(std::move(id)),
      tablet_replica_(tablet_replica),
      remote_user_(std::move(remote_user)),
      start_time_(MonoTime::Now()),
      row_format_flags_(row_format_flags),
      metrics_(metrics),
      arena_(256),
      last_access_time_(start_time_),
      call_seq_id_(0),
      num_rows_returned_(0) {
  if (!tablet_replica_) {
    return;
  }
  auto owning_tablet = tablet_replica_->shared_tablet();
  if (owning_tablet && owning_tablet->metrics()) {
    owning_tablet->metrics()->tablet_active_scanners->Increment();
  }
}
// Reverses the constructor's increment of the tablet's active-scanner gauge
// (when the tablet and its metrics are still reachable). Then, if scanner
// metrics were supplied, submits the scanner duration measured from start_time_.
Scanner::~Scanner() {
  if (tablet_replica_) {
    auto owning_tablet = tablet_replica_->shared_tablet();
    const bool has_tablet_metrics = owning_tablet && owning_tablet->metrics();
    if (has_tablet_metrics) {
      owning_tablet->metrics()->tablet_active_scanners->IncrementBy(-1);
    }
  }

  if (metrics_ != nullptr) {
    metrics_->SubmitScannerDuration(start_time_);
  }
}
// Adds 'elapsed' to the scanner's cumulative CPU times under an exclusive lock
// on cpu_times_lock_.
void Scanner::AddTimings(const CpuTimes& elapsed) {
  std::lock_guard<RWMutex> write_guard(cpu_times_lock_);
  cpu_times_.Add(elapsed);
}
// One-shot initialization: takes ownership of the row iterator, scan spec and
// client projection schema. The caller must hold lock_. CHECK-fails if the
// scanner already has an iterator. The initialized flag is stored last, with
// release ordering, after all three members are in place.
void Scanner::Init(unique_ptr<RowwiseIterator> iter,
                   unique_ptr<ScanSpec> spec,
                   unique_ptr<Schema> client_projection) {
  lock_.AssertAcquired();
  CHECK(!iter_) << "Already initialized";

  spec_ = std::move(spec);
  client_projection_schema_ = std::move(client_projection);
  iter_ = std::move(iter);

  initted_.store(true, std::memory_order_release);
}
// Returns the scan spec installed by Init(); only valid once the scanner is initialized.
const ScanSpec& Scanner::spec() const {
  const ScanSpec* installed = spec_.get();
  return *installed;
}
// Returns the iterator stats accumulated since the previous call. The caller
// must hold lock_.
//
// The per-column stats are summed into a running total. The total reported
// last time is subtracted, and the new total is remembered for the next call.
IteratorStats Scanner::UpdateStatsAndGetDelta() {
  lock_.AssertAcquired();

  vector<IteratorStats> per_column;
  iter_->GetIteratorStats(&per_column);

  IteratorStats cumulative;
  for (const IteratorStats& column_stats : per_column) {
    cumulative = cumulative + column_stats;
  }

  IteratorStats since_last_report = cumulative - already_reported_stats_;
  already_reported_stats_ = cumulative;
  return since_last_report;
}
// Builds a point-in-time ScanDescriptor for the scans dashboard. It covers
// identity (tablet, scanner ID, user, table), projected columns, key-range
// bounds and predicates as text, per-column iterator stats, the last call
// sequence ID, the last access time and the CPU times. CHECK-fails if the
// scanner has not been initialized.
ScanDescriptor Scanner::Descriptor() const {
  // Ignore non-initialized scans. The initializing state is transient, and
  // handling it correctly is complicated. Since the scanner is initialized we
  // can assume iter_, spec_, and client_projection_schema_ are valid
  // pointers.
  CHECK(is_initted());

  ScanDescriptor descriptor;
  descriptor.tablet_id = tablet_id();
  descriptor.scanner_id = id();
  descriptor.remote_user = remote_user();
  descriptor.start_time = start_time_;

  for (const auto& column : client_projection_schema_->columns()) {
    descriptor.projected_columns.emplace_back(column.name());
  }

  const auto& tablet_metadata = tablet_replica_->tablet_metadata();
  descriptor.table_name = tablet_metadata->table_name();

  // Primary-key bounds are rendered through KUDU_REDACT; column predicates follow.
  const ScanSpec& scan_spec = spec();
  if (scan_spec.lower_bound_key()) {
    descriptor.predicates.emplace_back(
        Substitute("PRIMARY KEY >= $0", KUDU_REDACT(
            scan_spec.lower_bound_key()->Stringify(tablet_metadata->schema()))));
  }
  if (scan_spec.exclusive_upper_bound_key()) {
    descriptor.predicates.emplace_back(
        Substitute("PRIMARY KEY < $0", KUDU_REDACT(
            scan_spec.exclusive_upper_bound_key()->Stringify(tablet_metadata->schema()))));
  }
  for (const auto& predicate : scan_spec.predicates()) {
    descriptor.predicates.emplace_back(predicate.second.ToString());
  }

  // Iterator stats are positional: entry i belongs to column i of the
  // iterator's schema. The index is size_t to match both container sizes.
  vector<IteratorStats> iterator_stats;
  iter_->GetIteratorStats(&iterator_stats);
  const auto& iter_schema = iter_->schema();
  DCHECK_EQ(iterator_stats.size(), iter_schema.num_columns());
  for (size_t col_idx = 0; col_idx < iterator_stats.size(); ++col_idx) {
    descriptor.iterator_stats.emplace_back(iter_schema.column(col_idx).name(),
                                           iterator_stats[col_idx]);
  }

  // This function does not take lock_, so the read of call_seq_id_ is wrapped in
  // ANNOTATE_UNPROTECTED_READ to mark the unsynchronized access as intentional.
  descriptor.last_call_seq_id = ANNOTATE_UNPROTECTED_READ(call_seq_id_);
  descriptor.last_access_time = last_access_time_.load(std::memory_order_relaxed);
  descriptor.cpu_times = cpu_times();
  return descriptor;
}
// Returns a copy of the cumulative CPU times, taken under a shared lock on
// cpu_times_lock_.
CpuTimes Scanner::cpu_times() const {
  CpuTimes snapshot;
  {
    shared_lock<RWMutex> read_guard(cpu_times_lock_);
    snapshot = cpu_times_;
  }
  return snapshot;
}
} // namespace tserver
} // namespace kudu