blob: 6fb9d775d8be0ba87d17ff607a1da632af8c5da9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/scanners.h"
#include <algorithm>
#include <cstdint>
#include <mutex>
#include <ostream>
#include <gflags/gflags.h>
#include "kudu/common/column_predicate.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/iterator.h"
#include "kudu/common/scan_spec.h"
#include "kudu/common/schema.h"
#include "kudu/gutil/bind.h"
#include "kudu/gutil/bind_helpers.h"
#include "kudu/gutil/hash/string_hash.h"
#include "kudu/gutil/map-util.h"
#include "kudu/gutil/stl_util.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metadata.h"
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tserver/scanner_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/logging.h"
#include "kudu/util/metrics.h"
#include "kudu/util/status.h"
#include "kudu/util/thread.h"
DEFINE_int32(scanner_ttl_ms, 60000,
"Number of milliseconds of inactivity allowed for a scanner"
"before it may be expired");
TAG_FLAG(scanner_ttl_ms, advanced);
DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L *1000L, // 5 seconds
"Number of microseconds in the interval at which we remove expired scanners");
TAG_FLAG(scanner_gc_check_interval_us, hidden);
DEFINE_int32(scan_history_count, 20,
"Number of completed scans to keep history for. Determines how many historical "
"scans will be shown on the tablet server's scans dashboard.");
TAG_FLAG(scan_history_count, experimental);
METRIC_DEFINE_gauge_size(server, active_scanners,
"Active Scanners",
kudu::MetricUnit::kScanners,
"Number of scanners that are currently active");
using std::string;
using std::vector;
using strings::Substitute;
namespace kudu {
using tablet::TabletReplica;
namespace tserver {
ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity)
: shutdown_(false),
shutdown_cv_(&shutdown_lock_),
completed_scans_offset_(0) {
if (metric_entity) {
metrics_.reset(new ScannerMetrics(metric_entity));
METRIC_active_scanners.InstantiateFunctionGauge(
metric_entity, Bind(&ScannerManager::CountActiveScanners,
Unretained(this)))
->AutoDetach(&metric_detacher_);
}
for (size_t i = 0; i < kNumScannerMapStripes; i++) {
scanner_maps_.push_back(new ScannerMapStripe());
}
if (FLAGS_scan_history_count > 0) {
completed_scans_.reserve(FLAGS_scan_history_count);
}
}
ScannerManager::~ScannerManager() {
{
MutexLock l(shutdown_lock_);
shutdown_ = true;
shutdown_cv_.Broadcast();
}
if (removal_thread_.get() != nullptr) {
CHECK_OK(ThreadJoiner(removal_thread_.get()).Join());
}
STLDeleteElements(&scanner_maps_);
}
Status ScannerManager::StartRemovalThread() {
RETURN_NOT_OK(Thread::Create("scanners", "removal_thread",
&ScannerManager::RunRemovalThread, this,
&removal_thread_));
return Status::OK();
}
void ScannerManager::RunRemovalThread() {
while (true) {
// Loop until we are shutdown.
{
MutexLock l(shutdown_lock_);
if (shutdown_) {
return;
}
shutdown_cv_.WaitFor(MonoDelta::FromMicroseconds(FLAGS_scanner_gc_check_interval_us));
}
RemoveExpiredScanners();
}
}
ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const string& scanner_id) {
size_t slot = HashStringThoroughly(scanner_id.data(), scanner_id.size()) % kNumScannerMapStripes;
return *scanner_maps_[slot];
}
void ScannerManager::NewScanner(const scoped_refptr<TabletReplica>& tablet_replica,
const std::string& requestor_string,
uint64_t row_format_flags,
SharedScanner* scanner) {
// Keep trying to generate a unique ID until we get one.
bool success = false;
while (!success) {
// TODO(KUDU-1918): are these UUIDs predictable? If so, we should
// probably generate random numbers instead, since we can safely
// just retry until we avoid a collision. Alternatively we could
// verify that the requestor userid does not change mid-scan.
string id = oid_generator_.Next();
scanner->reset(new Scanner(id,
tablet_replica,
requestor_string,
metrics_.get(),
row_format_flags));
ScannerMapStripe& stripe = GetStripeByScannerId(id);
std::lock_guard<RWMutex> l(stripe.lock_);
success = InsertIfNotPresent(&stripe.scanners_by_id_, id, *scanner);
}
}
bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) {
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
shared_lock<RWMutex> l(stripe.lock_);
return FindCopy(stripe.scanners_by_id_, scanner_id, scanner);
}
bool ScannerManager::UnregisterScanner(const string& scanner_id) {
ScanDescriptor descriptor;
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
{
std::lock_guard<RWMutex> l(stripe.lock_);
auto it = stripe.scanners_by_id_.find(scanner_id);
if (it == stripe.scanners_by_id_.end()) {
return false;
}
bool is_initialized = it->second->IsInitialized();
if (is_initialized) {
descriptor = it->second->descriptor();
descriptor.state = it->second->iter()->HasNext() ? ScanState::kFailed : ScanState::kComplete;
}
stripe.scanners_by_id_.erase(it);
if (!is_initialized) {
return true;
}
}
std::lock_guard<RWMutex> l(completed_scans_lock_);
RecordCompletedScanUnlocked(std::move(descriptor));
return true;
}
size_t ScannerManager::CountActiveScanners() const {
size_t total = 0;
for (const ScannerMapStripe* e : scanner_maps_) {
shared_lock<RWMutex> l(e->lock_);
total += e->scanners_by_id_.size();
}
return total;
}
void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) const {
for (const ScannerMapStripe* stripe : scanner_maps_) {
shared_lock<RWMutex> l(stripe->lock_);
for (const auto& se : stripe->scanners_by_id_) {
scanners->push_back(se.second);
}
}
}
vector<ScanDescriptor> ScannerManager::ListScans() const {
vector<ScanDescriptor> scans;
for (const ScannerMapStripe* stripe : scanner_maps_) {
shared_lock<RWMutex> l(stripe->lock_);
for (const auto& se : stripe->scanners_by_id_) {
if (se.second->IsInitialized()) {
scans.emplace_back(se.second->descriptor());
scans.back().state = ScanState::kActive;
}
}
}
{
shared_lock<RWMutex> l(completed_scans_lock_);
scans.insert(scans.end(), completed_scans_.begin(), completed_scans_.end());
}
// TODO(dan): It's possible for a descriptor to be included twice in the
// result set if its scanner is concurrently removed from the scanner map.
// Sort oldest to newest, so that the ordering is consistent across calls.
std::sort(scans.begin(), scans.end(), [] (const ScanDescriptor& a, const ScanDescriptor& b) {
return a.start_time > b.start_time;
});
return scans;
}
void ScannerManager::RemoveExpiredScanners() {
MonoDelta scanner_ttl = MonoDelta::FromMilliseconds(FLAGS_scanner_ttl_ms);
const MonoTime now = MonoTime::Now();
vector<ScanDescriptor> descriptors;
for (ScannerMapStripe* stripe : scanner_maps_) {
std::lock_guard<RWMutex> l(stripe->lock_);
for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end();) {
const SharedScanner& scanner = it->second;
MonoDelta idle_time = scanner->TimeSinceLastAccess(now);
if (idle_time <= scanner_ttl) {
++it;
continue;
}
// The scanner has expired because of inactivity.
LOG(INFO) << Substitute(
"Expiring scanner id: $0, of tablet $1, "
"after $2 ms of inactivity, which is > TTL ($3 ms).",
it->first,
scanner->tablet_id(),
idle_time.ToMilliseconds(),
scanner_ttl.ToMilliseconds());
if (scanner->IsInitialized()) {
descriptors.emplace_back(scanner->descriptor());
}
it = stripe->scanners_by_id_.erase(it);
if (metrics_) {
metrics_->scanners_expired->Increment();
}
}
}
std::lock_guard<RWMutex> l(completed_scans_lock_);
for (auto& descriptor : descriptors) {
descriptor.last_access_time = now;
descriptor.state = ScanState::kExpired;
RecordCompletedScanUnlocked(std::move(descriptor));
}
}
void ScannerManager::RecordCompletedScanUnlocked(ScanDescriptor descriptor) {
if (completed_scans_.capacity() == 0) {
return;
}
if (completed_scans_.size() == completed_scans_.capacity()) {
completed_scans_[completed_scans_offset_++] = std::move(descriptor);
if (completed_scans_offset_ == completed_scans_.capacity()) {
completed_scans_offset_ = 0;
}
} else {
completed_scans_.emplace_back(std::move(descriptor));
}
}
const std::string Scanner::kNullTabletId = "null tablet";
Scanner::Scanner(string id, const scoped_refptr<TabletReplica>& tablet_replica,
string requestor_string, ScannerMetrics* metrics,
uint64_t row_format_flags)
: id_(std::move(id)),
tablet_replica_(tablet_replica),
requestor_string_(std::move(requestor_string)),
call_seq_id_(0),
start_time_(MonoTime::Now()),
metrics_(metrics),
arena_(256),
row_format_flags_(row_format_flags),
num_rows_returned_(0) {
if (tablet_replica_) {
auto tablet = tablet_replica->shared_tablet();
if (tablet && tablet->metrics()) {
tablet->metrics()->tablet_active_scanners->Increment();
}
}
UpdateAccessTime();
}
Scanner::~Scanner() {
if (tablet_replica_) {
auto tablet = tablet_replica_->shared_tablet();
if (tablet && tablet->metrics()) {
tablet->metrics()->tablet_active_scanners->IncrementBy(-1);
}
}
if (metrics_) {
metrics_->SubmitScannerDuration(start_time_);
}
}
void Scanner::UpdateAccessTime() {
std::lock_guard<simple_spinlock> l(lock_);
last_access_time_ = MonoTime::Now();
}
void Scanner::Init(gscoped_ptr<RowwiseIterator> iter,
gscoped_ptr<ScanSpec> spec) {
std::lock_guard<simple_spinlock> l(lock_);
CHECK(!iter_) << "Already initialized";
iter_.reset(iter.release());
spec_.reset(spec.release());
}
const ScanSpec& Scanner::spec() const {
return *spec_;
}
void Scanner::GetIteratorStats(vector<IteratorStats>* stats) const {
iter_->GetIteratorStats(stats);
}
ScanDescriptor Scanner::descriptor() const {
// Ignore non-initialized scans. The initializing state is transient, and
// handling it correctly is complicated. Since the scanner is initialized we
// can assume iter(), spec(), and client_projection_schema() return valid
// pointers.
CHECK(IsInitialized());
ScanDescriptor descriptor;
descriptor.tablet_id = tablet_id();
descriptor.scanner_id = id();
descriptor.requestor = requestor_string();
descriptor.start_time = start_time_;
for (const auto& column : client_projection_schema()->columns()) {
descriptor.projected_columns.emplace_back(column.name());
}
const auto& tablet_metadata = tablet_replica_->tablet_metadata();
descriptor.table_name = tablet_metadata->table_name();
if (spec().lower_bound_key()) {
descriptor.predicates.emplace_back(
Substitute("PRIMARY KEY >= $0", KUDU_REDACT(
spec().lower_bound_key()->Stringify(tablet_metadata->schema()))));
}
if (spec().exclusive_upper_bound_key()) {
descriptor.predicates.emplace_back(
Substitute("PRIMARY KEY < $0", KUDU_REDACT(
spec().exclusive_upper_bound_key()->Stringify(tablet_metadata->schema()))));
}
for (const auto& predicate : spec().predicates()) {
descriptor.predicates.emplace_back(predicate.second.ToString());
}
vector<IteratorStats> iterator_stats;
GetIteratorStats(&iterator_stats);
DCHECK_EQ(iterator_stats.size(), iter()->schema().num_columns());
for (int col_idx = 0; col_idx < iterator_stats.size(); col_idx++) {
descriptor.iterator_stats.emplace_back(iter()->schema().column(col_idx).name(),
iterator_stats[col_idx]);
}
{
std::lock_guard<simple_spinlock> l(lock_);
descriptor.last_call_seq_id = call_seq_id_;
descriptor.last_access_time = last_access_time_;
}
return descriptor;
}
} // namespace tserver
} // namespace kudu