blob: 5fa2c1a15e58816894235b2b3742cc6df619481b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <glog/logging.h>
#include <gtest/gtest_prod.h>
#include "kudu/common/iterator_stats.h"
#include "kudu/common/scan_spec.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/port.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/remote_user.h"
#include "kudu/tablet/tablet_replica.h"
#include "kudu/tserver/tserver.pb.h"
#include "kudu/util/condition_variable.h"
#include "kudu/util/memory/arena.h"
#include "kudu/util/metrics.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
#include "kudu/util/oid_generator.h"
#include "kudu/util/rw_mutex.h"
#include "kudu/util/stopwatch.h"
namespace kudu {
// Forward declarations of kudu-namespace types that the declarations below
// refer to by name.
class RowwiseIterator;
class Schema;
class Status;
class Thread;
namespace tserver {
// Forward declarations. Scanner, ScanState and ScanDescriptor are defined
// further down in this header; ScannerMetrics is only referred to by pointer
// or smart pointer here.
class Scanner;
struct ScanDescriptor;
struct ScannerMetrics;
enum class ScanState;

// Shared-ownership handle to a Scanner.
using SharedScanner = std::shared_ptr<Scanner>;
// Manages the live scanners within a Tablet Server.
//
// When a scanner is created by a client, it is assigned a unique scanner ID.
// The client may then use this ID to fetch more rows from the scanner
// or close it.
//
// Since scanners keep resources on the server, the manager periodically
// removes any scanners which have not been accessed since a configurable TTL.
class ScannerManager {
public:
explicit ScannerManager(const scoped_refptr<MetricEntity>& metric_entity);
~ScannerManager();
// Starts the expired scanner removal thread.
Status StartRemovalThread();
// Create a new scanner with a unique ID, inserting it into the map. Further
// lookups for the scanner must provide the username associated with
// 'remote_user'.
void NewScanner(const scoped_refptr<tablet::TabletReplica>& tablet_replica,
const rpc::RemoteUser& remote_user,
uint64_t row_format_flags,
SharedScanner* scanner);
// Lookup the given scanner by its ID with the provided username, setting an
// appropriate error code.
// Returns NotFound if the scanner doesn't exist, or NotAuthorized if the
// scanner wasn't created by 'username'.
Status LookupScanner(const std::string& scanner_id,
const std::string& username,
TabletServerErrorPB::Code* error_code,
SharedScanner* scanner);
// Unregister the given scanner by its ID.
// Returns true if unregistered successfully.
bool UnregisterScanner(const std::string& scanner_id);
// Return the number of scanners currently active.
// Note this method will not return accurate value
// if under concurrent modifications.
size_t CountActiveScanners() const;
// List all active scanners.
// Note this method will not return a consistent view
// of all active scanners if under concurrent modifications.
void ListScanners(std::vector<SharedScanner>* scanners) const;
// List active and recently completed scans.
std::vector<ScanDescriptor> ListScans() const;
// Iterate through scanners and remove any which are past their TTL.
void RemoveExpiredScanners();
private:
FRIEND_TEST(ScannerTest, TestExpire);
enum {
kNumScannerMapStripes = 32
};
typedef std::unordered_map<std::string, SharedScanner> ScannerMap;
struct ScannerMapStripe {
// Lock protecting the scanner map.
mutable RWMutex lock_;
// Map of the currently active scanners.
ScannerMap scanners_by_id_;
};
// Periodically call RemoveExpiredScanners().
void RunRemovalThread();
ScannerMapStripe& GetStripeByScannerId(const std::string& scanner_id);
// Adds the scan descriptor to the completed scans FIFO.
void RecordCompletedScanUnlocked(ScanDescriptor descriptor);
// (Optional) scanner metrics for this instance.
std::unique_ptr<ScannerMetrics> metrics_;
// If true, removal thread should shut itself down. Protected
// by 'shutdown_lock_' and 'shutdown_cv_'.
bool shutdown_;
mutable Mutex shutdown_lock_;
ConditionVariable shutdown_cv_;
std::vector<ScannerMapStripe*> scanner_maps_;
// completed_scans_ is a FIFO ring buffer of completed scans.
mutable RWMutex completed_scans_lock_;
std::vector<ScanDescriptor> completed_scans_;
size_t completed_scans_offset_;
// Generator for scanner IDs.
ObjectIdGenerator oid_generator_;
// Thread to remove expired scanners.
scoped_refptr<kudu::Thread> removal_thread_;
FunctionGaugeDetacher metric_detacher_;
DISALLOW_COPY_AND_ASSIGN(ScannerManager);
};
// Scope guard for a registered scanner: unless Cancel() has been called, the
// destructor unregisters the scanner with the given ID from the manager.
class ScopedUnregisterScanner {
 public:
  // 'mgr' is dereferenced in the destructor (unless cancelled), so it must
  // still be valid at that point. 'id' is the ID of the scanner to unregister.
  ScopedUnregisterScanner(ScannerManager* mgr, std::string id)
      : mgr_(mgr),
        id_(std::move(id)) {
  }

  ~ScopedUnregisterScanner() {
    if (cancelled_) {
      return;
    }
    mgr_->UnregisterScanner(id_);
  }

  // Do not unregister the scanner when the scope is exited.
  void Cancel() { cancelled_ = true; }

 private:
  ScannerManager* const mgr_;
  const std::string id_;

  // Set by Cancel(); suppresses the unregistration in the destructor.
  bool cancelled_ = false;
};
// An open scanner on the server side.
//
// NOTE: unless otherwise specified, all methods of this class require that the
// caller has acquired the access lock using Scanner::LockForAccess(). It's assumed
// that any RPC related to a scanner will acquire that lock, so that only a single
// RPC thread works on a given scanner at a time.
class Scanner {
public:
class AccessLock {
public:
AccessLock(AccessLock&& l) noexcept
: s_(l.s_),
lock_(std::move(l.lock_)) {
}
~AccessLock() {
if (lock_.owns_lock()) {
Unlock();
}
}
void Unlock() {
s_->last_access_time_.store(MonoTime::Now(), std::memory_order_relaxed);
lock_.unlock();
}
bool owns_lock() {
return lock_.owns_lock();
}
private:
friend class Scanner;
explicit AccessLock(Scanner* s)
: s_(DCHECK_NOTNULL(s)),
lock_(s->lock_) {
}
AccessLock(Scanner* s, std::try_to_lock_t try_lock)
: s_(DCHECK_NOTNULL(s)),
lock_(s->lock_, try_lock) {
}
Scanner* const s_;
std::unique_lock<Mutex> lock_;
};
Scanner(std::string id,
const scoped_refptr<tablet::TabletReplica>& tablet_replica,
rpc::RemoteUser remote_user, ScannerMetrics* metrics,
uint64_t row_format_flags);
~Scanner();
// Lock this scanner for the purposes of an RPC.
//
// While the lock is held, the TimeSinceLastAccess() method will return 0, indicating
// that a call is actively being processed. Upon destruction of the returned Lock
// object, the last-access time will be set to the current time and the internal lock
// released.
AccessLock LockForAccess() WARN_UNUSED_RESULT {
return AccessLock(this);
}
// Try to lock the scanner, but do not wait in the case that the scanner
// is already locked by another thread.
//
// Check result.owns_lock() to see if the lock was successful.
AccessLock TryLockForAccess() WARN_UNUSED_RESULT {
return AccessLock(this, std::try_to_lock);
}
// Mark the scanner as initialized. This indicates that it successfully
// created an iterator, passed validation, etc, and will allow it to
// show up in the scanner dashboard.
void Init(std::unique_ptr<RowwiseIterator> iter,
std::unique_ptr<ScanSpec> spec,
std::unique_ptr<Schema> client_projection);
RowwiseIterator* iter() {
lock_.AssertAcquired();
return DCHECK_NOTNULL(iter_.get());
}
const RowwiseIterator* iter() const {
lock_.AssertAcquired();
return DCHECK_NOTNULL(iter_.get());
}
// Add the timings in 'elapsed' to the total timings for this scanner.
void AddTimings(const CpuTimes& elapsed);
Arena* arena() {
lock_.AssertAcquired();
return &arena_;
}
const std::string& id() const { return id_; }
// Return the ScanSpec associated with this Scanner.
const ScanSpec& spec() const;
const std::string& tablet_id() const {
// scanners-test passes a null tablet_replica.
return tablet_replica_ ? tablet_replica_->tablet_id() : kNullTabletId;
}
const scoped_refptr<tablet::TabletReplica>& tablet_replica() const { return tablet_replica_; }
const rpc::RemoteUser& remote_user() const { return remote_user_; }
// Returns the current call sequence ID of the scanner.
uint32_t call_seq_id() const {
lock_.AssertAcquired();
return call_seq_id_;
}
// Increments the call sequence ID.
void IncrementCallSeqId() {
lock_.AssertAcquired();
call_seq_id_++;
}
// Return the delta from the last time this scan was updated to 'now'.
MonoDelta TimeSinceLastAccess(const MonoTime& now) const {
std::unique_lock<Mutex> l(lock_, std::try_to_lock);
if (l.owns_lock()) {
return now - last_access_time_;
}
return MonoDelta::FromMilliseconds(0);
}
// Returns the time this scan was started.
const MonoTime& start_time() const { return start_time_; }
// Returns client's projection schema.
//
// This may differ from the schema used by the iterator, which must contain all columns
// used as predicates).
const Schema* client_projection_schema() const {
lock_.AssertAcquired();
return DCHECK_NOTNULL(client_projection_schema_.get());
}
// Update the stats from the underlying scanner and return a delta since the
// previous call to this method.
IteratorStats UpdateStatsAndGetDelta();
uint64_t row_format_flags() const {
lock_.AssertAcquired();
return row_format_flags_;
}
void add_num_rows_returned(int64_t num_rows_added) {
lock_.AssertAcquired();
num_rows_returned_ += num_rows_added;
DCHECK_LE(num_rows_added, num_rows_returned_);
}
int64_t num_rows_returned() const {
lock_.AssertAcquired();
return num_rows_returned_;
}
bool has_fulfilled_limit() const {
lock_.AssertAcquired();
return spec_ && spec_->has_limit() && num_rows_returned_ >= spec_->limit();
}
// Return a descriptor of the current state of this scan.
// Does not require the AccessLock.
//
// REQUIRES: is_initted() must be true.
ScanDescriptor Descriptor() const;
// Returns the amount of CPU time accounted to this scanner.
// Does not require the AccessLock.
CpuTimes cpu_times() const;
private:
friend class ScannerManager;
static const std::string kNullTabletId;
// Return true if the scanner has been initialized (i.e has an iterator).
// Once a Scanner is initialized, it is safe to assume that iter() and spec()
// return non-NULL for the lifetime of the Scanner object.
bool is_initted() const {
return initted_.load(std::memory_order_acquire);
}
// The unique ID of this scanner.
const std::string id_;
// Tablet associated with the scanner.
const scoped_refptr<tablet::TabletReplica> tablet_replica_;
// The remote user making the request. Populated from the RemoteUser of the
// first request.
const rpc::RemoteUser remote_user_;
// The time the scanner was started.
const MonoTime start_time_;
// The row format flags the client passed, if any.
const uint64_t row_format_flags_;
// (Optional) scanner metrics struct, for recording scanner's duration.
ScannerMetrics* metrics_;
// Arena used for allocations which must last as long as the scanner
// itself. This is _not_ used for row data, which is scoped to a single RPC
// response.
Arena arena_;
// Protects access to this scanner by a single RPC at a time.
mutable Mutex lock_;
std::atomic<bool> initted_ { false };
// The spec used by 'iter_'
// Assumed to be set once initted_ is true.
std::unique_ptr<ScanSpec> spec_;
// Assumed to be set once initted_ is true.
std::unique_ptr<RowwiseIterator> iter_;
// Stores the request's projection schema, if it differs from the
// schema used by the iterator.
// Assumed to be set once initted_ is true.
std::unique_ptr<Schema> client_projection_schema_;
// The last time that the scanner was accessed.
// Only modified under lock_ but can be read outside.
std::atomic<MonoTime> last_access_time_;
// The current call sequence ID.
// Only modified under lock_ but can be read outside.
uint32_t call_seq_id_;
// A summary of the statistics already reported to the metrics system
// for this scanner. This allows us to report the metrics incrementally
// as the scanner proceeds.
// Protected by lock_.
IteratorStats already_reported_stats_;
// The number of rows that have been serialized and sent over the wire by
// this scanner.
int64_t num_rows_returned_;
// The cumulative amounts of wall, user cpu, and system cpu time spent on
// this scanner, in seconds.
mutable RWMutex cpu_times_lock_;
CpuTimes cpu_times_;
DISALLOW_COPY_AND_ASSIGN(Scanner);
};
// The state of a scan, as carried in ScanDescriptor::state.
enum class ScanState {
  kActive,    // The scan is actively running.
  kComplete,  // The scan is complete.
  kFailed,    // The scan failed.
  kExpired,   // The scan timed out due to inactivity.
};
// ScanDescriptor holds information about a scan. The ScanDescriptor can outlive
// the associated scanner without holding open any of the scanner's resources.
//
// Every field is stored by value. Descriptors are produced by
// Scanner::Descriptor() and returned in bulk by ScannerManager::ListScans().
struct ScanDescriptor {
// The tablet ID.
std::string tablet_id;
// The scanner ID.
std::string scanner_id;
// The user that made the first request.
rpc::RemoteUser remote_user;
// The table name.
std::string table_name;
// The selected columns.
std::vector<std::string> projected_columns;
// The scan predicates. Holds both the primary key and column predicates.
std::vector<std::string> predicates;
// The per-column scan stats, paired with the column name.
std::vector<std::pair<std::string, IteratorStats>> iterator_stats;
// The state of the scan (see ScanState).
ScanState state;
// NOTE(review): presumably the time the scan was started, as reported by
// Scanner::start_time() -- confirm in Scanner::Descriptor().
MonoTime start_time;
// NOTE(review): presumably the scanner's last-access time when the
// descriptor was taken -- confirm in Scanner::Descriptor().
MonoTime last_access_time;
// NOTE(review): presumably the scanner's call sequence ID when the
// descriptor was taken -- confirm in Scanner::Descriptor().
uint32_t last_call_seq_id;
// The cumulative amounts of wall, user cpu, and system cpu time spent on
// this scanner, in seconds.
CpuTimes cpu_times;
};
// RAII wrapper to update a scanner with timing information upon scope exit.
//
// A stopwatch is started on construction. When Stop() runs -- explicitly or
// from the destructor -- the stopwatch is stopped, the elapsed timings are
// added to the scanner and the scanner's resulting CPU times are copied out
// to the caller-provided CpuTimes object.
class ScopedAddScannerTiming {
 public:
  // 'scanner' must outlive the scoped object.
  // The object pointed to by 'cpu_times' will contain the CPU timing
  // information of the scanner after Stop() has run, i.e. at the latest upon
  // scope exit. Both pointers are dereferenced in Stop(), so neither may be
  // null.
  explicit ScopedAddScannerTiming(Scanner* scanner, CpuTimes* cpu_times)
      : stopped_(false),
        scanner_(scanner),
        cpu_times_(cpu_times) {
    sw_.start();
  }

  ~ScopedAddScannerTiming() {
    // Stop() is a no-op if the timing has already been stopped explicitly.
    Stop();
  }

  // Stop the timing and update the scanner.
  //
  // Idempotent: only the first call has an effect. Without this guard a
  // repeated call would add the same elapsed interval to the scanner's
  // totals a second time.
  void Stop() {
    if (stopped_) {
      return;
    }
    stopped_ = true;
    sw_.stop();
    scanner_->AddTimings(sw_.elapsed());
    *cpu_times_ = scanner_->cpu_times();
  }

  // True once Stop() has run.
  bool stopped_;

  // The scanner whose timings are updated. Not owned.
  Scanner* scanner_;

  // Destination for the scanner's CPU times. Not owned.
  CpuTimes* cpu_times_;

  // Measures the time between construction and Stop().
  Stopwatch sw_;

  DISALLOW_COPY_AND_ASSIGN(ScopedAddScannerTiming);
};
} // namespace tserver
} // namespace kudu