blob: ae6d681d9e63e99008ca19822e45311a588e715e [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
#define DATASTAX_INTERNAL_TOKEN_MAP_IMPL_HPP
#include "collection_iterator.hpp"
#include "constants.hpp"
#include "dense_hash_map.hpp"
#include "dense_hash_set.hpp"
#include "deque.hpp"
#include "json.hpp"
#include "map_iterator.hpp"
#include "result_iterator.hpp"
#include "result_response.hpp"
#include "row.hpp"
#include "string_ref.hpp"
#include "token_map.hpp"
#include "value.hpp"
#include "vector.hpp"
#include <algorithm>
#include <assert.h>
#include <iomanip>
#include <ios>
#include <uv.h>
#define CASS_NETWORK_TOPOLOGY_STRATEGY "NetworkTopologyStrategy"
#define CASS_SIMPLE_STRATEGY "SimpleStrategy"
namespace std {
template <>
struct equal_to<datastax::internal::core::Host::Ptr> {
bool operator()(const datastax::internal::core::Host::Ptr& lhs,
const datastax::internal::core::Host::Ptr& rhs) const {
if (lhs == rhs) {
return true;
}
if (!lhs || !rhs) {
return false;
}
return lhs->address() == rhs->address();
}
};
#if defined(HASH_IN_TR1) && !defined(_WIN32)
namespace tr1 {
#endif
template <>
struct hash<datastax::internal::core::Host::Ptr> {
std::size_t operator()(const datastax::internal::core::Host::Ptr& host) const {
if (!host) return 0;
return hasher(host->address());
}
SPARSEHASH_HASH<datastax::internal::core::Address> hasher;
};
#if defined(HASH_IN_TR1) && !defined(_WIN32)
} // namespace tr1
#endif
} // namespace std
namespace datastax { namespace internal { namespace core {
class IdGenerator {
public:
typedef DenseHashMap<String, uint32_t> IdMap;
static const uint32_t EMPTY_KEY;
static const uint32_t DELETED_KEY;
IdGenerator() { ids_.set_empty_key(String()); }
uint32_t get(const String& key) {
if (key.empty()) {
return 0;
}
IdMap::const_iterator i = ids_.find(key);
if (i != ids_.end()) {
return i->second;
}
// This will never generate a 0 identifier. So 0 can be used as
// inalid or empty.
uint32_t id = ids_.size() + 1;
ids_[key] = id;
return id;
}
private:
IdMap ids_;
};
struct Murmur3Partitioner {
typedef int64_t Token;
static Token from_string(const StringRef& str);
static Token hash(const StringRef& str);
static StringRef name() { return "Murmur3Partitioner"; }
};
struct RandomPartitioner {
struct Token {
uint64_t hi;
uint64_t lo;
bool operator<(const Token& other) const {
return hi == other.hi ? lo < other.lo : hi < other.hi;
}
bool operator==(const Token& other) const { return hi == other.hi && lo == other.lo; }
};
static Token abs(Token token);
static uint64_t encode(uint8_t* bytes);
static Token from_string(const StringRef& str);
static Token hash(const StringRef& str);
static StringRef name() { return "RandomPartitioner"; }
};
class ByteOrderedPartitioner {
public:
typedef Vector<uint8_t> Token;
static Token from_string(const StringRef& str);
static Token hash(const StringRef& str);
static StringRef name() { return "ByteOrderedPartitioner"; }
};
inline std::ostream& operator<<(std::ostream& os, const RandomPartitioner::Token& token) {
os << std::setfill('0') << std::setw(16) << std::hex << token.hi << std::setfill('0')
<< std::setw(16) << std::hex << token.lo;
return os;
}
inline std::ostream& operator<<(std::ostream& os, const ByteOrderedPartitioner::Token& token) {
for (ByteOrderedPartitioner::Token::const_iterator it = token.begin(), end = token.end();
it != end; ++it) {
os << std::hex << *it;
}
return os;
}
class HostSet : public DenseHashSet<Host::Ptr> {
public:
HostSet() {
set_empty_key(Host::Ptr(new Host(Address::EMPTY_KEY)));
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}
template <class InputIterator>
HostSet(InputIterator first, InputIterator last)
: DenseHashSet<Host::Ptr>(first, last, Host::Ptr(new Host(Address::EMPTY_KEY))) {
set_deleted_key(Host::Ptr(new Host(Address::DELETED_KEY)));
}
};
class RackSet : public DenseHashSet<uint32_t> {
public:
RackSet() {
set_empty_key(IdGenerator::EMPTY_KEY);
set_deleted_key(IdGenerator::DELETED_KEY);
}
};
struct Datacenter {
Datacenter()
: num_nodes(0) {}
size_t num_nodes;
RackSet racks;
};
class DatacenterMap : public DenseHashMap<uint32_t, Datacenter> {
public:
DatacenterMap() {
set_empty_key(IdGenerator::EMPTY_KEY);
set_deleted_key(IdGenerator::DELETED_KEY);
}
};
struct ReplicationFactor {
ReplicationFactor()
: count(0) {}
size_t count;
String name; // Used for logging the datacenter name
bool operator==(const ReplicationFactor& other) const {
return count == other.count && name == other.name;
}
};
inline void build_datacenters(const HostSet& hosts, DatacenterMap& result) {
result.clear();
for (HostSet::const_iterator i = hosts.begin(), end = hosts.end(); i != end; ++i) {
uint32_t dc = (*i)->dc_id();
uint32_t rack = (*i)->rack_id();
if (dc != 0 && rack != 0) {
Datacenter& datacenter = result[dc];
datacenter.racks.insert(rack);
datacenter.num_nodes++;
}
}
}
class ReplicationFactorMap : public DenseHashMap<uint32_t, ReplicationFactor> {
public:
ReplicationFactorMap() { set_empty_key(IdGenerator::EMPTY_KEY); }
};
template <class Partitioner>
class ReplicationStrategy {
public:
typedef typename Partitioner::Token Token;
typedef std::pair<Token, Host*> TokenHost;
typedef Vector<TokenHost> TokenHostVec;
typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
typedef Vector<TokenReplicas> TokenReplicasVec;
typedef Deque<typename TokenHostVec::const_iterator> TokenHostQueue;
struct DatacenterRackInfo {
DatacenterRackInfo()
: replica_count(0)
, replication_factor(0)
, rack_count(0) {}
size_t replica_count;
size_t replication_factor;
RackSet racks_observed;
size_t rack_count;
TokenHostQueue skipped_endpoints;
};
class DatacenterRackInfoMap : public DenseHashMap<uint32_t, DatacenterRackInfo> {
public:
DatacenterRackInfoMap() {
DenseHashMap<uint32_t, DatacenterRackInfo>::set_empty_key(IdGenerator::EMPTY_KEY);
}
};
enum Type { NETWORK_TOPOLOGY_STRATEGY, SIMPLE_STRATEGY, NON_REPLICATED };
ReplicationStrategy()
: type_(NON_REPLICATED) {}
void init(IdGenerator& dc_ids, const VersionNumber& cassandra_version, const Row* row);
bool operator!=(const ReplicationStrategy& other) const {
return type_ != other.type_ || replication_factors_ != other.replication_factors_;
}
void build_replicas(const TokenHostVec& tokens, const DatacenterMap& datacenters,
TokenReplicasVec& result) const;
private:
void build_replicas_network_topology(const TokenHostVec& tokens, const DatacenterMap& datacenters,
TokenReplicasVec& result) const;
void build_replicas_simple(const TokenHostVec& tokens, const DatacenterMap& datacenters,
TokenReplicasVec& result) const;
void build_replicas_non_replicated(const TokenHostVec& tokens, const DatacenterMap& datacenters,
TokenReplicasVec& result) const;
private:
Type type_;
ReplicationFactorMap replication_factors_;
};
template <class Partitioner>
void ReplicationStrategy<Partitioner>::init(IdGenerator& dc_ids,
const VersionNumber& cassandra_version,
const Row* row) {
StringRef strategy_class;
if (cassandra_version >= VersionNumber(3, 0, 0)) {
const Value* value = row->get_by_name("replication");
if (value && value->is_map() && is_string_type(value->primary_value_type()) &&
is_string_type(value->secondary_value_type())) {
MapIterator iterator(value);
while (iterator.next()) {
String key(iterator.key()->to_string());
if (key == "class") {
strategy_class = iterator.value()->to_string_ref();
} else {
String value(iterator.value()->to_string());
size_t count = strtoul(value.c_str(), NULL, 10);
if (count > 0) {
ReplicationFactor replication_factor;
replication_factor.count = count;
replication_factor.name = key;
if (key == "replication_factor") {
replication_factors_[1] = replication_factor;
} else {
replication_factors_[dc_ids.get(key)] = replication_factor;
}
} else {
LOG_WARN("Replication factor of 0 for option %s", key.c_str());
}
}
}
}
} else {
const Value* value;
value = row->get_by_name("strategy_class");
if (value && is_string_type(value->value_type())) {
strategy_class = value->to_string_ref();
}
value = row->get_by_name("strategy_options");
Vector<char> buf = value->decoder().as_vector();
json::Document d;
d.ParseInsitu(&buf[0]);
if (!d.HasParseError() && d.IsObject()) {
for (json::Value::ConstMemberIterator i = d.MemberBegin(); i != d.MemberEnd(); ++i) {
String key(i->name.GetString(), i->name.GetStringLength());
String value(i->value.GetString(), i->value.GetStringLength());
size_t count = strtoul(value.c_str(), NULL, 10);
if (count > 0) {
ReplicationFactor replication_factor;
replication_factor.count = count;
replication_factor.name = key;
if (key == "replication_factor") {
replication_factors_[1] = replication_factor;
} else {
replication_factors_[dc_ids.get(key)] = replication_factor;
}
} else {
LOG_WARN("Replication factor of 0 for option %s", key.c_str());
}
}
}
}
if (ends_with(strategy_class, CASS_NETWORK_TOPOLOGY_STRATEGY)) {
type_ = NETWORK_TOPOLOGY_STRATEGY;
} else if (ends_with(strategy_class, CASS_SIMPLE_STRATEGY)) {
type_ = SIMPLE_STRATEGY;
}
}
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas(const TokenHostVec& tokens,
const DatacenterMap& datacenters,
TokenReplicasVec& result) const {
result.clear();
result.reserve(tokens.size());
switch (type_) {
case NETWORK_TOPOLOGY_STRATEGY:
build_replicas_network_topology(tokens, datacenters, result);
break;
case SIMPLE_STRATEGY:
build_replicas_simple(tokens, datacenters, result);
break;
default:
build_replicas_non_replicated(tokens, datacenters, result);
break;
}
}
// Adds unique replica. It returns true if the replica was added.
inline bool add_replica(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
for (HostVec::const_reverse_iterator it = hosts->rbegin(); it != hosts->rend(); ++it) {
if ((*it)->address() == host->address()) {
return false; // Already in the replica set
}
}
hosts->push_back(host);
return true;
}
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_network_topology(
const TokenHostVec& tokens, const DatacenterMap& datacenters, TokenReplicasVec& result) const {
if (replication_factors_.empty()) {
return;
}
DatacenterRackInfoMap dc_racks;
dc_racks.resize(datacenters.size());
size_t num_replicas = 0;
// Populate the datacenter and rack information. Only considering valid
// datacenters that actually have hosts. If there's a replication factor
// for a datacenter that doesn't exist or has no node then it will not
// be counted.
for (ReplicationFactorMap::const_iterator i = replication_factors_.begin(),
end = replication_factors_.end();
i != end; ++i) {
DatacenterMap::const_iterator j = datacenters.find(i->first);
// Don't include datacenters that don't exist
if (j != datacenters.end()) {
// A replication factor cannot exceed the number of nodes in a datacenter
size_t replication_factor = std::min<size_t>(i->second.count, j->second.num_nodes);
num_replicas += replication_factor;
DatacenterRackInfo dc_rack_info;
dc_rack_info.replication_factor = replication_factor;
dc_rack_info.rack_count = j->second.racks.size();
dc_racks[j->first] = dc_rack_info;
} else {
LOG_WARN("No nodes in datacenter '%s'. Check your replication strategies.",
i->second.name.c_str());
}
}
if (num_replicas == 0) {
return;
}
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
Token token = i->first;
typename TokenHostVec::const_iterator token_it = i;
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);
// Clear datacenter and rack information for the next token
for (typename DatacenterRackInfoMap::iterator j = dc_racks.begin(), end = dc_racks.end();
j != end; ++j) {
j->second.replica_count = 0;
j->second.racks_observed.clear();
j->second.skipped_endpoints.clear();
}
for (typename TokenHostVec::const_iterator j = tokens.begin(), end = tokens.end();
j != end && replicas->size() < num_replicas; ++j) {
typename TokenHostVec::const_iterator curr_token_it = token_it;
Host* host = curr_token_it->second;
uint32_t dc = host->dc_id();
uint32_t rack = host->rack_id();
++token_it;
if (token_it == tokens.end()) {
token_it = tokens.begin();
}
typename DatacenterRackInfoMap::iterator dc_rack_it = dc_racks.find(dc);
if (dc_rack_it == dc_racks.end()) {
continue;
}
DatacenterRackInfo& dc_rack_info = dc_rack_it->second;
size_t& replica_count_this_dc = dc_rack_info.replica_count;
const size_t replication_factor = dc_rack_info.replication_factor;
if (replica_count_this_dc >= replication_factor) {
continue;
}
RackSet& racks_observed_this_dc = dc_rack_info.racks_observed;
const size_t rack_count_this_dc = dc_rack_info.rack_count;
// First, attempt to distribute replicas over all possible racks in a
// datacenter only then consider hosts in the same rack
if (rack == 0 || racks_observed_this_dc.size() == rack_count_this_dc) {
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
}
} else {
TokenHostQueue& skipped_endpoints_this_dc = dc_rack_info.skipped_endpoints;
if (racks_observed_this_dc.count(rack) > 0) {
skipped_endpoints_this_dc.push_back(curr_token_it);
} else {
if (add_replica(replicas, Host::Ptr(host))) {
++replica_count_this_dc;
racks_observed_this_dc.insert(rack);
}
// Once we visited every rack in the current datacenter then starting considering
// hosts we've already skipped.
if (racks_observed_this_dc.size() == rack_count_this_dc) {
while (!skipped_endpoints_this_dc.empty() &&
replica_count_this_dc < replication_factor) {
if (add_replica(replicas, Host::Ptr(skipped_endpoints_this_dc.front()->second))) {
++replica_count_this_dc;
}
skipped_endpoints_this_dc.pop_front();
}
}
}
}
}
result.push_back(TokenReplicas(token, replicas));
}
}
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_simple(const TokenHostVec& tokens,
const DatacenterMap& not_used,
TokenReplicasVec& result) const {
ReplicationFactorMap::const_iterator it = replication_factors_.find(1);
if (it == replication_factors_.end()) {
return;
}
const size_t num_tokens = tokens.size();
const size_t num_replicas = std::min<size_t>(it->second.count, num_tokens);
for (typename TokenHostVec::const_iterator i = tokens.begin(), end = tokens.end(); i != end;
++i) {
CopyOnWriteHostVec replicas(new HostVec());
replicas->reserve(num_replicas);
typename TokenHostVec::const_iterator token_it = i;
for (size_t j = 0; j < num_tokens && replicas->size() < num_replicas; ++j) {
add_replica(replicas, Host::Ptr(Host::Ptr(token_it->second)));
++token_it;
if (token_it == tokens.end()) {
token_it = tokens.begin();
}
}
result.push_back(TokenReplicas(i->first, replicas));
}
}
template <class Partitioner>
void ReplicationStrategy<Partitioner>::build_replicas_non_replicated(
const TokenHostVec& tokens, const DatacenterMap& not_used, TokenReplicasVec& result) const {
for (typename TokenHostVec::const_iterator i = tokens.begin(); i != tokens.end(); ++i) {
CopyOnWriteHostVec replicas(new HostVec(1, Host::Ptr(i->second)));
result.push_back(TokenReplicas(i->first, replicas));
}
}
template <class Partitioner>
class TokenMapImpl : public TokenMap {
public:
typedef typename Partitioner::Token Token;
typedef std::pair<Token, Host*> TokenHost;
typedef Vector<TokenHost> TokenHostVec;
struct TokenHostCompare {
bool operator()(const TokenHost& lhs, const TokenHost& rhs) const {
return lhs.first < rhs.first;
}
};
struct RemoveTokenHostIf {
RemoveTokenHostIf(const Host::Ptr& host)
: host(host) {}
bool operator()(const TokenHost& token) const {
if (!token.second) {
return false;
}
return token.second->address() == host->address();
}
const Host::Ptr& host;
};
typedef std::pair<Token, CopyOnWriteHostVec> TokenReplicas;
typedef Vector<TokenReplicas> TokenReplicasVec;
struct TokenReplicasCompare {
bool operator()(const TokenReplicas& lhs, const TokenReplicas& rhs) const {
return lhs.first < rhs.first;
}
};
typedef DenseHashMap<String, TokenReplicasVec> KeyspaceReplicaMap;
typedef DenseHashMap<String, ReplicationStrategy<Partitioner> > KeyspaceStrategyMap;
TokenMapImpl()
: no_replicas_dummy_(NULL) {
replicas_.set_empty_key(String());
replicas_.set_deleted_key(String(1, '\0'));
strategies_.set_empty_key(String());
strategies_.set_deleted_key(String(1, '\0'));
}
TokenMapImpl(const TokenMapImpl& other)
: tokens_(other.tokens_)
, hosts_(other.hosts_)
, replicas_(other.replicas_)
, strategies_(other.strategies_)
, rack_ids_(other.rack_ids_)
, dc_ids_(other.dc_ids_)
, no_replicas_dummy_(NULL) {}
virtual void add_host(const Host::Ptr& host);
virtual void update_host_and_build(const Host::Ptr& host);
virtual void remove_host_and_build(const Host::Ptr& host);
virtual void add_keyspaces(const VersionNumber& cassandra_version, const ResultResponse* result);
virtual void update_keyspaces_and_build(const VersionNumber& cassandra_version,
const ResultResponse* result);
virtual void drop_keyspace(const String& keyspace_name);
virtual void build();
virtual TokenMap::Ptr copy() const;
virtual const CopyOnWriteHostVec& get_replicas(const String& keyspace_name,
const String& routing_key) const;
virtual String dump(const String& keyspace_name) const;
public:
// Testing only
bool contains(const Token& token) const {
for (typename TokenHostVec::const_iterator i = tokens_.begin(), end = tokens_.end(); i != end;
++i) {
if (token == i->first) return true;
}
return false;
}
const TokenReplicasVec& token_replicas(const String& keyspace_name) const;
private:
void update_keyspace(const VersionNumber& cassandra_version, const ResultResponse* result,
bool should_build_replicas);
void remove_host_tokens(const Host::Ptr& host);
void update_host_ids(const Host::Ptr& host);
void build_replicas();
private:
TokenHostVec tokens_;
HostSet hosts_;
DatacenterMap datacenters_;
KeyspaceReplicaMap replicas_;
KeyspaceStrategyMap strategies_;
IdGenerator rack_ids_;
IdGenerator dc_ids_;
CopyOnWriteHostVec no_replicas_dummy_;
};
template <class Partitioner>
void TokenMapImpl<Partitioner>::add_host(const Host::Ptr& host) {
update_host_ids(host);
hosts_.insert(host);
const Vector<String>& tokens(host->tokens());
for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
Token token = Partitioner::from_string(*it);
tokens_.push_back(TokenHost(token, host.get()));
}
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::update_host_and_build(const Host::Ptr& host) {
uint64_t start = uv_hrtime();
remove_host_tokens(host);
update_host_ids(host);
hosts_.insert(host);
TokenHostVec new_tokens;
const Vector<String>& tokens(host->tokens());
for (Vector<String>::const_iterator it = tokens.begin(), end = tokens.end(); it != end; ++it) {
Token token = Partitioner::from_string(*it);
new_tokens.push_back(TokenHost(token, host.get()));
}
std::sort(new_tokens.begin(), new_tokens.end());
TokenHostVec merged(tokens_.size() + new_tokens.size());
std::merge(tokens_.begin(), tokens_.end(), new_tokens.begin(), new_tokens.end(), merged.begin(),
TokenHostCompare());
tokens_ = merged;
build_replicas();
LOG_DEBUG("Updated token map with host %s (%u tokens). Rebuilt token map with %u hosts and %u "
"tokens in %f ms",
host->address_string().c_str(), (unsigned int)new_tokens.size(),
(unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
(double)(uv_hrtime() - start) / (1000.0 * 1000.0));
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::remove_host_and_build(const Host::Ptr& host) {
if (hosts_.find(host) == hosts_.end()) return;
uint64_t start = uv_hrtime();
remove_host_tokens(host);
hosts_.erase(host);
build_replicas();
LOG_DEBUG(
"Removed host %s from token map. Rebuilt token map with %u hosts and %u tokens in %f ms",
host->address_string().c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
(double)(uv_hrtime() - start) / (1000.0 * 1000.0));
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::add_keyspaces(const VersionNumber& cassandra_version,
const ResultResponse* result) {
update_keyspace(cassandra_version, result, false);
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::update_keyspaces_and_build(const VersionNumber& cassandra_version,
const ResultResponse* result) {
update_keyspace(cassandra_version, result, true);
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::drop_keyspace(const String& keyspace_name) {
replicas_.erase(keyspace_name);
strategies_.erase(keyspace_name);
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::build() {
uint64_t start = uv_hrtime();
std::sort(tokens_.begin(), tokens_.end());
build_replicas();
LOG_DEBUG("Built token map with %u hosts and %u tokens in %f ms", (unsigned int)hosts_.size(),
(unsigned int)tokens_.size(), (double)(uv_hrtime() - start) / (1000.0 * 1000.0));
}
template <class Partitioner>
TokenMap::Ptr TokenMapImpl<Partitioner>::copy() const {
return Ptr(new TokenMapImpl<Partitioner>(*this));
}
template <class Partitioner>
const CopyOnWriteHostVec& TokenMapImpl<Partitioner>::get_replicas(const String& keyspace_name,
const String& routing_key) const {
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
if (ks_it != replicas_.end()) {
Token token = Partitioner::hash(routing_key);
const TokenReplicasVec& replicas = ks_it->second;
typename TokenReplicasVec::const_iterator replicas_it =
std::upper_bound(replicas.begin(), replicas.end(), TokenReplicas(token, no_replicas_dummy_),
TokenReplicasCompare());
if (replicas_it != replicas.end()) {
return replicas_it->second;
} else if (!replicas.empty()) {
return replicas.front().second;
}
}
return no_replicas_dummy_;
}
template <class Partitioner>
String TokenMapImpl<Partitioner>::dump(const String& keyspace_name) const {
String result;
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
const TokenReplicasVec& replicas = ks_it->second;
for (typename TokenReplicasVec::const_iterator it = replicas.begin(), end = replicas.end();
it != end; ++it) {
OStringStream ss;
ss << std::setw(20) << it->first << " [ ";
const CopyOnWriteHostVec& hosts = it->second;
for (HostVec::const_iterator host_it = hosts->begin(), end = hosts->end(); host_it != end;
++host_it) {
ss << (*host_it)->address_string() << " ";
}
ss << "]\n";
result.append(ss.str());
}
return result;
}
template <class Partitioner>
const typename TokenMapImpl<Partitioner>::TokenReplicasVec&
TokenMapImpl<Partitioner>::token_replicas(const String& keyspace_name) const {
typename KeyspaceReplicaMap::const_iterator ks_it = replicas_.find(keyspace_name);
static TokenReplicasVec not_found;
return ks_it != replicas_.end() ? ks_it->second : not_found;
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::update_keyspace(const VersionNumber& cassandra_version,
const ResultResponse* result,
bool should_build_replicas) {
ResultIterator rows(result);
while (rows.next()) {
String keyspace_name;
const Row* row = rows.row();
if (!row->get_string_by_name("keyspace_name", &keyspace_name)) {
LOG_ERROR("Unable to get column value for 'keyspace_name'");
continue;
}
ReplicationStrategy<Partitioner> strategy;
strategy.init(dc_ids_, cassandra_version, row);
typename KeyspaceStrategyMap::iterator i = strategies_.find(keyspace_name);
if (i == strategies_.end() || i->second != strategy) {
if (i == strategies_.end()) {
strategies_[keyspace_name] = strategy;
} else {
i->second = strategy;
}
if (should_build_replicas) {
uint64_t start = uv_hrtime();
build_datacenters(hosts_, datacenters_);
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
LOG_DEBUG("Updated token map with keyspace '%s'. Rebuilt token map with %u hosts and %u "
"tokens in %f ms",
keyspace_name.c_str(), (unsigned int)hosts_.size(), (unsigned int)tokens_.size(),
(double)(uv_hrtime() - start) / (1000.0 * 1000.0));
}
}
}
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::remove_host_tokens(const Host::Ptr& host) {
typename TokenHostVec::iterator last =
std::remove_copy_if(tokens_.begin(), tokens_.end(), tokens_.begin(), RemoveTokenHostIf(host));
tokens_.resize(last - tokens_.begin());
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::update_host_ids(const Host::Ptr& host) {
host->set_rack_and_dc_ids(rack_ids_.get(host->rack()), dc_ids_.get(host->dc()));
}
template <class Partitioner>
void TokenMapImpl<Partitioner>::build_replicas() {
build_datacenters(hosts_, datacenters_);
for (typename KeyspaceStrategyMap::const_iterator i = strategies_.begin(),
end = strategies_.end();
i != end; ++i) {
const String& keyspace_name = i->first;
const ReplicationStrategy<Partitioner>& strategy = i->second;
strategy.build_replicas(tokens_, datacenters_, replicas_[keyspace_name]);
LOG_TRACE("Replicas for keyspace '%s':\n%s", keyspace_name.c_str(),
dump(keyspace_name).c_str());
}
}
}}} // namespace datastax::internal::core
#endif