blob: 26234e2bd5d038e444025433274d0421c0965c07 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "kudu/tserver/scanners.h"
#include <boost/bind.hpp>
#include <boost/thread/locks.hpp>
#include <gflags/gflags.h>
#include "kudu/common/iterator.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/hash/string_hash.h"
#include "kudu/gutil/map-util.h"
#include "kudu/tserver/scanner_metrics.h"
#include "kudu/util/flag_tags.h"
#include "kudu/util/thread.h"
#include "kudu/util/metrics.h"
DEFINE_int32(scanner_ttl_ms, 60000,
"Number of milliseconds of inactivity allowed for a scanner"
"before it may be expired");
TAG_FLAG(scanner_ttl_ms, advanced);
DEFINE_int32(scanner_gc_check_interval_us, 5 * 1000L *1000L, // 5 seconds
"Number of microseconds in the interval at which we remove expired scanners");
TAG_FLAG(scanner_gc_check_interval_us, hidden);
// TODO: would be better to scope this at a tablet level instead of
// server level.
METRIC_DEFINE_gauge_size(server, active_scanners,
"Active Scanners",
kudu::MetricUnit::kScanners,
"Number of scanners that are currently active");
namespace kudu {
using tablet::TabletPeer;
namespace tserver {
ScannerManager::ScannerManager(const scoped_refptr<MetricEntity>& metric_entity)
: shutdown_(false) {
if (metric_entity) {
metrics_.reset(new ScannerMetrics(metric_entity));
METRIC_active_scanners.InstantiateFunctionGauge(
metric_entity, Bind(&ScannerManager::CountActiveScanners,
Unretained(this)))
->AutoDetach(&metric_detacher_);
}
for (size_t i = 0; i < kNumScannerMapStripes; i++) {
scanner_maps_.push_back(new ScannerMapStripe());
}
}
ScannerManager::~ScannerManager() {
{
boost::lock_guard<boost::mutex> l(shutdown_lock_);
shutdown_ = true;
shutdown_cv_.notify_all();
}
if (removal_thread_.get() != nullptr) {
CHECK_OK(ThreadJoiner(removal_thread_.get()).Join());
}
STLDeleteElements(&scanner_maps_);
}
Status ScannerManager::StartRemovalThread() {
RETURN_NOT_OK(Thread::Create("scanners", "removal_thread",
&ScannerManager::RunRemovalThread, this,
&removal_thread_));
return Status::OK();
}
void ScannerManager::RunRemovalThread() {
while (true) {
// Loop until we are shutdown.
{
boost::unique_lock<boost::mutex> l(shutdown_lock_);
if (shutdown_) {
return;
}
boost::system_time wtime = boost::get_system_time() +
boost::posix_time::microseconds(FLAGS_scanner_gc_check_interval_us);
shutdown_cv_.timed_wait(l, wtime);
}
RemoveExpiredScanners();
}
}
ScannerManager::ScannerMapStripe& ScannerManager::GetStripeByScannerId(const string& scanner_id) {
size_t slot = HashStringThoroughly(scanner_id.data(), scanner_id.size()) % kNumScannerMapStripes;
return *scanner_maps_[slot];
}
void ScannerManager::NewScanner(const scoped_refptr<TabletPeer>& tablet_peer,
const std::string& requestor_string,
SharedScanner* scanner) {
// Keep trying to generate a unique ID until we get one.
bool success = false;
while (!success) {
// TODO(security): are these UUIDs predictable? If so, we should
// probably generate random numbers instead, since we can safely
// just retry until we avoid a collission.
string id = oid_generator_.Next();
scanner->reset(new Scanner(id, tablet_peer, requestor_string, metrics_.get()));
ScannerMapStripe& stripe = GetStripeByScannerId(id);
boost::lock_guard<boost::shared_mutex> l(stripe.lock_);
success = InsertIfNotPresent(&stripe.scanners_by_id_, id, *scanner);
}
}
bool ScannerManager::LookupScanner(const string& scanner_id, SharedScanner* scanner) {
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
boost::shared_lock<boost::shared_mutex> l(stripe.lock_);
return FindCopy(stripe.scanners_by_id_, scanner_id, scanner);
}
bool ScannerManager::UnregisterScanner(const string& scanner_id) {
ScannerMapStripe& stripe = GetStripeByScannerId(scanner_id);
boost::lock_guard<boost::shared_mutex> l(stripe.lock_);
return stripe.scanners_by_id_.erase(scanner_id) > 0;
}
size_t ScannerManager::CountActiveScanners() const {
size_t total = 0;
for (const ScannerMapStripe* e : scanner_maps_) {
boost::shared_lock<boost::shared_mutex> l(e->lock_);
total += e->scanners_by_id_.size();
}
return total;
}
void ScannerManager::ListScanners(std::vector<SharedScanner>* scanners) {
for (const ScannerMapStripe* stripe : scanner_maps_) {
boost::shared_lock<boost::shared_mutex> l(stripe->lock_);
for (const ScannerMapEntry& se : stripe->scanners_by_id_) {
scanners->push_back(se.second);
}
}
}
void ScannerManager::RemoveExpiredScanners() {
MonoDelta scanner_ttl = MonoDelta::FromMilliseconds(FLAGS_scanner_ttl_ms);
for (ScannerMapStripe* stripe : scanner_maps_) {
boost::lock_guard<boost::shared_mutex> l(stripe->lock_);
for (auto it = stripe->scanners_by_id_.begin(); it != stripe->scanners_by_id_.end();) {
SharedScanner& scanner = it->second;
MonoDelta time_live =
scanner->TimeSinceLastAccess(MonoTime::Now(MonoTime::COARSE));
if (time_live.MoreThan(scanner_ttl)) {
// TODO: once we have a metric for the number of scanners expired, make this a
// VLOG(1).
LOG(INFO) << "Expiring scanner id: " << it->first << ", of tablet " << scanner->tablet_id()
<< ", after " << time_live.ToMicroseconds()
<< " us of inactivity, which is > TTL ("
<< scanner_ttl.ToMicroseconds() << " us).";
it = stripe->scanners_by_id_.erase(it);
if (metrics_) {
metrics_->scanners_expired->Increment();
}
} else {
++it;
}
}
}
}
Scanner::Scanner(string id, const scoped_refptr<TabletPeer>& tablet_peer,
string requestor_string, ScannerMetrics* metrics)
: id_(std::move(id)),
tablet_peer_(tablet_peer),
requestor_string_(std::move(requestor_string)),
call_seq_id_(0),
start_time_(MonoTime::Now(MonoTime::COARSE)),
metrics_(metrics),
arena_(1024, 1024 * 1024) {
UpdateAccessTime();
}
Scanner::~Scanner() {
if (metrics_) {
metrics_->SubmitScannerDuration(start_time_);
}
}
void Scanner::UpdateAccessTime() {
boost::lock_guard<simple_spinlock> l(lock_);
last_access_time_ = MonoTime::Now(MonoTime::COARSE);
}
void Scanner::Init(gscoped_ptr<RowwiseIterator> iter,
gscoped_ptr<ScanSpec> spec) {
boost::lock_guard<simple_spinlock> l(lock_);
CHECK(!iter_) << "Already initialized";
iter_.reset(iter.release());
spec_.reset(spec.release());
}
const ScanSpec& Scanner::spec() const {
return *spec_;
}
void Scanner::GetIteratorStats(vector<IteratorStats>* stats) const {
iter_->GetIteratorStats(stats);
}
} // namespace tserver
} // namespace kudu