blob: 5dcc738e2de32b3fdf1cd8910aceabc65de632fb [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// This module is internal to the client and not a public API.
#pragma once
#include <atomic>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>
#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include <gtest/gtest_prod.h>
#include "kudu/client/replica_controller-internal.h"
#include "kudu/common/partition.h"
#include "kudu/consensus/metadata.pb.h"
#include "kudu/gutil/macros.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/rpc/rpc.h"
#include "kudu/util/locks.h"
#include "kudu/util/monotime.h"
#include "kudu/util/net/net_util.h"
#include "kudu/util/semaphore.h"
#include "kudu/util/status.h"
#include "kudu/util/status_callback.h"
namespace kudu {
// Forward declarations of types that this header refers to only through
// pointers, references, or template arguments.
class Sockaddr;
namespace tserver {
// Proxy types returned by RemoteTabletServer::proxy() and admin_proxy().
class TabletServerAdminServiceProxy;
class TabletServerServiceProxy;
} // namespace tserver
namespace master {
// Master protobuf types taken by const reference by the classes below.
class GetTableLocationsResponsePB;
class GetTabletLocationsResponsePB;
class TSInfoPB;
class TabletLocationsPB;
} // namespace master
namespace client {
// Test classes forward-declared for the FRIEND_TEST(client::ClientTest, ...)
// declarations in MetaCache.
class ClientTest_TestMasterLookupPermits_Test;
class ClientTest_TestMetaCacheExpiry_Test;
// Client types that this header uses only through pointers.
class KuduClient;
class KuduTable;
namespace internal {
// Batch size, in tablets, requested from the master per round trip when
// resolving a single partition (e.g. for a write), or when looking up again a
// tablet whose cached information is stale.
constexpr int kFetchTabletsPerPointLookup = 10;
// Batch size, in tablets, requested from the master when resolving a range of
// tablets.
constexpr int kFetchTabletsPerRangeLookup = 1000;
////////////////////////////////////////////////////////////
// Forward declarations: MetaCache and RemoteTablet are defined further down in
// this header; LookupRpc is not defined in this header.
class LookupRpc;
class MetaCache;
class RemoteTablet;
// The information cached about a given tablet server in the cluster.
//
// This class is thread-safe.
class RemoteTabletServer {
public:
// Constructs the cached entry from the server info 'pb'.
explicit RemoteTabletServer(const master::TSInfoPB& pb);
// Initialize the RPC proxy to this tablet server, if it is not already set up.
// This will involve a DNS lookup if there is not already an active proxy.
// If there is an active proxy, does nothing.
void InitProxy(KuduClient* client, const StatusCallback& cb);
// Update information from the given pb.
// Requires that 'pb''s UUID matches this server.
void Update(const master::TSInfoPB& pb);
// Return the current proxy to this tablet server. Requires that InitProxy()
// be called prior to this.
std::shared_ptr<tserver::TabletServerServiceProxy> proxy() const;
// Return the current admin-service proxy to this tablet server.
// NOTE(review): presumably subject to the same InitProxy() precondition as
// proxy() -- confirm against the implementation.
std::shared_ptr<tserver::TabletServerAdminServiceProxy> admin_proxy();
// Return a string describing this tablet server.
std::string ToString() const;
// Writes this server's host/port pairs to 'host_ports'.
// NOTE(review): whether existing contents of 'host_ports' are kept or
// replaced is not visible in this header.
void GetHostPorts(std::vector<HostPort>* host_ports) const;
// Returns the remote server's uuid.
const std::string& permanent_uuid() const;
// Return a copy of this tablet server's location, as assigned by the master.
// If no location is assigned, the returned string will be empty.
std::string location() const;
private:
// Internal callback for DNS resolution.
// NOTE(review): 'user_callback' is presumably the callback handed to
// InitProxy() and 'result_status' the outcome of the resolution -- confirm.
void DnsResolutionFinished(const HostPort& hp,
std::vector<Sockaddr>* addrs,
KuduClient* client,
const StatusCallback& user_callback,
const Status &result_status);
// NOTE(review): presumably guards the non-const fields below, given that the
// class is documented as thread-safe -- confirm against the implementation.
mutable simple_spinlock lock_;
// This server's UUID; never changes after construction.
const std::string uuid_;
// If not assigned, location_ will be an empty string.
std::string location_;
// RPC host/port pairs of this server.
std::vector<HostPort> rpc_hostports_;
// The path on which this server is listening for unix domain socket connections.
// This should only be used in the case that it can be determined that the tablet
// server is local to the client.
boost::optional<std::string> unix_domain_socket_path_;
// Proxies to this server's tablet server service and admin service.
// NOTE(review): presumably unset until InitProxy() completes -- confirm.
std::shared_ptr<tserver::TabletServerServiceProxy> proxy_;
std::shared_ptr<tserver::TabletServerAdminServiceProxy> admin_proxy_;
DISALLOW_COPY_AND_ASSIGN(RemoteTabletServer);
};
// One replica of a tablet as recorded in the client's cache.
//
// None of the fields has a default initializer, so whoever creates a
// RemoteReplica is responsible for setting all of them.
struct RemoteReplica {
// The tablet server hosting this replica.
// NOTE(review): presumably a non-owning pointer to an entry owned by the
// MetaCache -- confirm.
RemoteTabletServer* ts;
// The Raft role recorded for this replica.
consensus::RaftPeerPB::Role role;
// Whether this replica has been marked as failed.
bool failed;
};
// Owning map of tablet servers: tablet server UUID -> RemoteTabletServer.
using TabletServerRegistry =
    std::unordered_map<std::string, std::unique_ptr<RemoteTabletServer>>;
// Non-owning map of tablet servers: tablet server UUID -> RemoteTabletServer*.
using TabletServerMap = std::unordered_map<std::string, RemoteTabletServer*>;
// A ServerPicker for tablet servers, backed by the MetaCache.
// Replicas are returned fully initialized and ready to be used.
class MetaCacheServerPicker : public rpc::ServerPicker<RemoteTabletServer> {
public:
// Binds the picker to 'tablet'. 'table' may be null, in which case tablet
// ID-based lookups are used instead of partition key-based lookups.
MetaCacheServerPicker(KuduClient* client,
scoped_refptr<MetaCache> meta_cache,
const KuduTable* table,
RemoteTablet* tablet);
virtual ~MetaCacheServerPicker() {}
// Picks the leader replica of the bound tablet and reports the outcome
// through 'callback'.
// NOTE(review): 'deadline' presumably bounds any metacache lookup needed to
// find the leader -- confirm against the implementation.
void PickLeader(const ServerPickedCallback& callback, const MonoTime& deadline) override;
// In the case of this MetaCacheServerPicker class, the implementation of this
// method is very selective. It marks only servers hosting the remote tablet
// the MetaCacheServerPicker object is bound to, not the entire RemoteTabletServer.
void MarkServerFailed(RemoteTabletServer* replica, const Status& status) override;
// NOTE(review): presumably records 'replica' in 'followers_' so it is not
// picked as leader again -- confirm against the implementation.
void MarkReplicaNotLeader(RemoteTabletServer* replica) override;
// NOTE(review): presumably invoked when 'replica' reports that the tablet was
// not found; the resulting cache action is not visible in this header.
void MarkResourceNotFound(RemoteTabletServer* replica) override;
private:
// Called whenever a tablet lookup in the metacache completes.
void LookUpTabletCb(const ServerPickedCallback& callback,
const MonoTime& deadline,
const Status& status);
// Called when the proxy is initialized.
static void InitProxyCb(const ServerPickedCallback& callback,
RemoteTabletServer* replica,
const Status& status);
// Lock protecting accesses/updates to 'followers_'.
mutable simple_spinlock lock_;
// Reference to the client so that we can initialize a replica proxy, when we find it.
KuduClient* client_;
// A ref to the meta cache.
scoped_refptr<MetaCache> meta_cache_;
// The table we're writing to. If null, relies on tablet ID-based lookups
// instead of partition key-based lookups.
const KuduTable* table_;
// The tablet we're picking replicas for.
RemoteTablet* const tablet_;
// TSs that refused writes and that were marked as followers as a consequence.
std::set<RemoteTabletServer*> followers_;
};
// A tablet as seen by the client. Instances manage lookups of the tablet's
// locations, status, etc.
//
// This class is thread-safe.
class RemoteTablet : public RefCountedThreadSafe<RemoteTablet> {
 public:
  // Creates a tablet with the given ID and partition; it starts out not stale.
  RemoteTablet(std::string tablet_id, Partition partition)
      : tablet_id_(std::move(tablet_id)),
        partition_(std::move(partition)),
        stale_(false) {}

  // Replaces the replica locations of this tablet.
  Status Refresh(
      const TabletServerMap& tservers,
      const master::TabletLocationsPB& locs_pb,
      const google::protobuf::RepeatedPtrField<master::TSInfoPB>& ts_info_dict);

  // Flags the cached metadata of this tablet as out of date. The MetaCache
  // checks staleness when LookupTabletByKey() is called, to decide between the
  // fast (non-network) path and a metadata refresh from the Master.
  void MarkStale();

  // Tells whether this tablet has been flagged as stale.
  bool stale() const;

  // Flags every replica of this tablet hosted by 'ts' as failed, so that
  // future cache lookups no longer return it.
  //
  // 'status' is only used for logging.
  void MarkReplicaFailed(RemoteTabletServer *ts, const Status& status);

  // Counts the replicas of this tablet that are flagged as failed.
  int GetNumFailedReplicas() const;

  // Gives the tablet server currently acting as LEADER for this tablet, as
  // long as it has not failed.
  //
  // The result is null when no leader is known or when the leader has failed.
  // The replica list may change at any time, so callers should always check
  // the result for null.
  RemoteTabletServer* LeaderTServer() const;

  // Stores the tablet servers of all non-failed replicas in 'servers'.
  void GetRemoteTabletServers(std::vector<RemoteTabletServer*>* servers) const;

  // Stores the non-failed replicas of this tablet in 'replicas'.
  void GetRemoteReplicas(std::vector<RemoteReplica>* replicas) const;

  // Tells whether a LEADER replica is currently known for this tablet
  // (i.e. the next call to LeaderTServer() is likely to return non-null).
  bool HasLeader() const;

  const std::string& tablet_id() const { return tablet_id_; }

  const Partition& partition() const { return partition_; }

  // Records 'server' in the cache as the leader of the consensus
  // configuration.
  void MarkTServerAsLeader(const RemoteTabletServer* server);

  // Records 'server' in the cache as a follower.
  void MarkTServerAsFollower(const RemoteTabletServer* server);

  // Renders the list of replicas of this tablet as a string.
  std::string ReplicasAsString() const;

 private:
  // Variant of ReplicasAsString() for callers that already hold lock_.
  std::string ReplicasAsStringUnlocked() const;

  const std::string tablet_id_;
  const Partition partition_;
  std::atomic<bool> stale_;

  mutable simple_spinlock lock_;  // Protects replicas_.
  std::vector<RemoteReplica> replicas_;

  DISALLOW_COPY_AND_ASSIGN(RemoteTablet);
};
// One slot of the meta cache. An initialized entry describes either a tablet,
// in which case it carries the associated `RemoteTablet` instance, or a
// partition range that is not covered by any tablet.
class MetaCacheEntry {
 public:
  // Creates an entry whose members are all default-constructed.
  MetaCacheEntry() {}

  // Creates an entry that represents 'tablet'.
  MetaCacheEntry(MonoTime expiration_time, scoped_refptr<RemoteTablet> tablet)
      : expiration_time_(expiration_time),
        tablet_(std::move(tablet)) {}

  // Creates an entry that represents a non-covered range delimited by the
  // given range partition bounds.
  MetaCacheEntry(MonoTime expiration_time,
                 std::string lower_bound_partition_key,
                 std::string upper_bound_partition_key)
      : expiration_time_(expiration_time),
        lower_bound_partition_key_(std::move(lower_bound_partition_key)),
        upper_bound_partition_key_(std::move(upper_bound_partition_key)) {}

  // Tells whether this entry stands for a non-covered partition range, i.e.
  // whether it carries no tablet.
  bool is_non_covered_range() const {
    DCHECK(Initialized());
    return nullptr == tablet_.get();
  }

  // Gives the remote tablet. Only meaningful for entries that carry a tablet.
  const scoped_refptr<RemoteTablet>& tablet() const {
    DCHECK(tablet_);
    DCHECK(Initialized());
    return tablet_;
  }

  // Gives the inclusive lower bound partition key of this entry: the start of
  // the tablet's partition, or the stored bound for a non-covered range.
  const std::string& lower_bound_partition_key() const {
    DCHECK(Initialized());
    if (!is_non_covered_range()) {
      return tablet_->partition().partition_key_start();
    }
    return lower_bound_partition_key_;
  }

  // Gives the exclusive upper bound partition key of this entry: the end of
  // the tablet's partition, or the stored bound for a non-covered range.
  const std::string& upper_bound_partition_key() const {
    DCHECK(Initialized());
    if (!is_non_covered_range()) {
      return tablet_->partition().partition_key_end();
    }
    return upper_bound_partition_key_;
  }

  const MonoTime& expiration_time() const { return expiration_time_; }

  // Overwrites the expiration time of this entry.
  void refresh_expiration_time(MonoTime expiration_time) {
    DCHECK(Initialized());
    DCHECK(expiration_time.Initialized());
    // The new expiration time is deliberately not required to come after the
    // current one: that may not hold if the master changes its configured ttl.
    expiration_time_ = expiration_time;
  }

  // Tells whether 'partition_key' falls within this meta cache entry.
  bool Contains(const std::string& partition_key) const;

  // Tells whether this meta cache entry is stale.
  bool stale() const;

  // Renders this entry as a formatted string suitable for debug printing.
  //
  // The string is not redacted, since table partitions are considered
  // metadata.
  std::string DebugString(const KuduTable* table) const;

  // Tells whether this entry is initialized, i.e. whether its expiration time
  // is.
  bool Initialized() const { return expiration_time_.Initialized(); }

 private:
  // When this cached entry expires.
  MonoTime expiration_time_;

  // The tablet; nullptr when this entry is a non-covered range.
  scoped_refptr<RemoteTablet> tablet_;

  // Lower bound partition key; only used for a non-covered range.
  std::string lower_bound_partition_key_;

  // Upper bound partition key; only used for a non-covered range.
  std::string upper_bound_partition_key_;
};
// Manager of RemoteTablets and RemoteTabletServers. The client consults
// this class to look up a given tablet or server.
//
// This class will also be responsible for cache eviction policies, etc.
class MetaCache : public RefCountedThreadSafe<MetaCache> {
public:
// The passed 'client' object must remain valid as long as MetaCache is alive.
// NOTE(review): 'replica_visibility' is presumably retained as
// 'replica_visibility_' -- the constructor body is not visible here.
MetaCache(KuduClient* client, ReplicaController::Visibility replica_visibility);
~MetaCache() = default;
// Determines what type of operation a MetaCache lookup is being done for.
enum class LookupType {
// The lookup should only return a tablet which actually covers the
// requested partition key.
kPoint,
// The lookup should return the next tablet after the requested
// partition key if the requested key does not fall within a covered
// range.
kLowerBound
};
// Look up which tablet hosts the given partition key for a table. When it is
// available, the tablet is stored in 'remote_tablet' (if not NULL) and the
// callback is fired. Only tablets with non-failed LEADERs are considered.
//
// NOTE: the callback may be called from an IO thread or inline with this
// call if the cached data is already available.
//
// NOTE: the memory referenced by 'table' must remain valid until 'callback'
// is invoked.
void LookupTabletByKey(const KuduTable* table,
std::string partition_key,
const MonoTime& deadline,
LookupType lookup_type,
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& callback);
// Look up the locations of the given tablet, storing the result in
// 'remote_tablet' if not null, and calling 'lookup_complete_cb' once the
// lookup is complete. Only tablets with non-failed LEADERs are considered.
//
// NOTE: the callback may be called from an IO thread or inline with this
// call if the cached data is already available.
void LookupTabletById(KuduClient* client,
const std::string& tablet_id,
const MonoTime& deadline,
scoped_refptr<RemoteTablet>* remote_tablet,
const StatusCallback& lookup_complete_cb);
// Lookup the given tablet by key, only consulting local information.
// Returns true and sets *entry if successful.
bool LookupEntryByKeyFastPath(const KuduTable* table,
const std::string& partition_key,
MetaCacheEntry* entry);
// Lookup the given tablet by tablet ID, only consulting local information.
// Returns true and sets *entry if successful.
bool LookupEntryByIdFastPath(const std::string& tablet_id,
MetaCacheEntry* entry);
// Process the response for the given key-based lookup parameters, indexing
// the location information as appropriate.
// NOTE(review): 'cache_entry' looks like an output parameter receiving the
// entry matching 'partition_key' -- confirm against the implementation.
Status ProcessGetTableLocationsResponse(const KuduTable* table,
const std::string& partition_key,
bool is_exact_lookup,
const master::GetTableLocationsResponsePB& resp,
MetaCacheEntry* cache_entry,
int max_returned_locations);
// Clears the non-covered range entries from a table's meta cache.
void ClearNonCoveredRangeEntries(const std::string& table_id);
// Clears the meta cache.
void ClearCache();
// Mark any replicas of any tablets hosted by 'ts' as failed. They will
// not be returned in future cache lookups.
void MarkTSFailed(RemoteTabletServer* ts, const Status& status);
// Acquire or release a permit to perform a (slow) master lookup.
//
// If acquisition fails, caller may still do the lookup, but is first
// blocked for a short time to prevent lookup storms.
bool AcquireMasterLookupPermit();
void ReleaseMasterLookupPermit();
// Return stringified representation of the given partition key, using "<start>" if empty.
static std::string DebugLowerBoundPartitionKey(const KuduTable* table,
const std::string& partition_key);
private:
friend class LookupRpc;
friend class LookupRpcById;
FRIEND_TEST(client::ClientTest, TestMasterLookupPermits);
FRIEND_TEST(client::ClientTest, TestMetaCacheExpiry);
// Called on the slow LookupTablet path when the master responds. Populates
// the tablet caches and returns a reference to the first one.
Status ProcessLookupResponse(const LookupRpc& rpc,
MetaCacheEntry* cache_entry,
int max_returned_locations);
// Process the response for the given id-based lookup parameters, indexing
// the location information as appropriate.
Status ProcessGetTabletLocationsResponse(const std::string& tablet_id,
const master::GetTabletLocationsResponsePB& resp,
MetaCacheEntry* cache_entry);
// Perform the complete fast-path lookup. Returns:
// - NotFound if the lookup hits a non-covering range.
// - Incomplete if the fast path was not possible.
// - OK if the lookup was successful.
//
// If 'lookup_type' is kLowerBound, then 'partition_key' will be updated to indicate the
// start of the range for the matched tablet.
Status DoFastPathLookup(const KuduTable* table,
std::string* partition_key, // in-out parameter
LookupType lookup_type,
scoped_refptr<RemoteTablet>* remote_tablet);
// Perform the fast-path lookup by tablet ID. Returns:
// - Incomplete if there was no cache entry.
// - OK if the lookup was successful.
//
// If 'remote_tablet' isn't null, it is populated with a pointer to the
// RemoteTablet being looked up. Otherwise, just does the lookup, priming the
// cache with the location.
Status DoFastPathLookupById(const std::string& tablet_id,
scoped_refptr<RemoteTablet>* remote_tablet);
// Update our information about the given tablet server.
//
// This is called when we get some response from the master which contains
// the latest host/port info for a server.
//
// NOTE: Must be called with lock_ held.
void UpdateTabletServerUnlocked(const master::TSInfoPB& pb);
// The client this cache belongs to; must outlive the MetaCache (see the
// constructor).
KuduClient* client_;
// Protects the registry and cache containers declared below.
percpu_rwlock lock_;
// Registry of all tablet servers as a map of tablet server's
// UUID -> std::unique_ptr<RemoteTabletServer>.
//
// Given that the set of tablet servers in a cluster is bounded by physical
// machines and every tablet server has its unique identifier, we never remove
// entries from this map until the MetaCache is destructed. Note that the
// ClearCache() method doesn't touch this registry, but updates ts_cache_ map
// below which contains raw pointers to the elements in this registry.
// So, there is no need to use shared_ptr and alike for the entries.
//
// Protected by lock_.
TabletServerRegistry ts_registry_;
// Cache of Tablet Server locations: TS UUID -> RemoteTabletServer*.
// The cache can be cleared by the ClearCache() method.
//
// Protected by lock_.
TabletServerMap ts_cache_;
// Cache entries for tablets and non-covered ranges, keyed by table ID, used
// for key-based lookups.
// NOTE(review): the inner TabletMap is presumably keyed by lower bound
// partition key -- confirm against the implementation.
//
// Protected by lock_.
typedef std::map<std::string, MetaCacheEntry> TabletMap;
std::unordered_map<std::string, TabletMap> tablets_by_table_and_key_;
// Cache entries for tablets, keyed by tablet ID, used for ID-based lookups.
// NOTE: existence in 'tablets_by_table_and_key_' does not imply existence in
// 'entry_by_tablet_id_', and vice versa.
//
// Protected by lock_.
//
// TODO(awong): it might be nice for ID-based lookups and table-based lookups
// to use the same entries. It's currently tricky to do so since ID-based
// lookups don't incur any table metadata, making lookups by table ID tricky.
std::unordered_map<std::string, MetaCacheEntry> entry_by_tablet_id_;
// The underlying remote tablets pointed to by the above cache entry
// containers, keyed by tablet ID. If an entry does not exist for a given
// tablet ID in this container, none can exist in either of the above
// containers.
//
// Protected by lock_.
std::unordered_map<std::string, scoped_refptr<RemoteTablet>> tablets_by_id_;
// Prevents master lookup "storms" by delaying master lookups when all
// permits have been acquired.
Semaphore master_lookup_sem_;
// Policy on tablet replica visibility: what type of replicas to expose.
const ReplicaController::Visibility replica_visibility_;
DISALLOW_COPY_AND_ASSIGN(MetaCache);
};
} // namespace internal
} // namespace client
} // namespace kudu