blob: 8a3d20c5684a59f3b762588f92008f0549e3f50a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "host.hpp"
#include "collection_iterator.hpp"
#include "row.hpp"
#include "value.hpp"
using namespace datastax;
using namespace datastax::internal::core;
namespace datastax { namespace internal { namespace core {
void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
HostVec::iterator i;
for (i = hosts->begin(); i != hosts->end(); ++i) {
if ((*i)->address() == host->address()) {
*i = host;
break;
}
}
if (i == hosts->end()) {
hosts->push_back(host);
}
}
void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host) {
remove_host(hosts, host->address());
}
bool remove_host(CopyOnWriteHostVec& hosts, const Address& address) {
HostVec::iterator i;
for (i = hosts->begin(); i != hosts->end(); ++i) {
if ((*i)->address() == address) {
hosts->erase(i);
return true;
}
}
return false;
}
}}} // namespace datastax::internal::core
void Host::LatencyTracker::update(uint64_t latency_ns) {
uint64_t now = uv_hrtime();
ScopedSpinlock l(SpinlockPool<LatencyTracker>::get_spinlock(this));
TimestampedAverage previous = current_;
if (previous.num_measured < threshold_to_account_) {
current_.average = -1;
} else if (previous.average < 0) {
current_.average = latency_ns;
} else {
int64_t delay = now - previous.timestamp;
if (delay <= 0) {
return;
}
double scaled_delay = static_cast<double>(delay) / scale_ns_;
double weight = log(scaled_delay + 1) / scaled_delay;
current_.average =
static_cast<int64_t>((1.0 - weight) * latency_ns + weight * previous.average);
}
current_.num_measured = previous.num_measured + 1;
current_.timestamp = now;
}
bool VersionNumber::parse(const String& version) {
return sscanf(version.c_str(), "%d.%d.%d", &major_version_, &minor_version_, &patch_version_) >=
2;
}
void Host::set(const Row* row, bool use_tokens) {
const Value* v;
String rack;
row->get_string_by_name("rack", &rack);
String dc;
row->get_string_by_name("data_center", &dc);
String release_version;
row->get_string_by_name("release_version", &release_version);
rack_ = rack;
dc_ = dc;
VersionNumber server_version;
if (server_version.parse(release_version)) {
server_version_ = server_version;
} else {
LOG_WARN("Invalid release version string \"%s\" on host %s", release_version.c_str(),
address().to_string().c_str());
}
// Possibly correct for invalid Cassandra version numbers for specific
// versions of DSE.
if (server_version_ >= VersionNumber(4, 0, 0) &&
row->get_by_name("dse_version") != NULL) { // DSE only
String dse_version_str;
row->get_string_by_name("dse_version", &dse_version_str);
if (dse_server_version_.parse(dse_version_str)) {
// Versions before DSE 6.7 erroneously return they support Cassandra 4.0.0
// features even though they don't.
if (dse_server_version_ < VersionNumber(6, 7, 0)) {
server_version_ = VersionNumber(3, 11, 0);
}
}
}
row->get_string_by_name("partitioner", &partitioner_);
if (use_tokens) {
v = row->get_by_name("tokens");
if (v != NULL && v->is_collection()) {
CollectionIterator iterator(v);
while (iterator.next()) {
tokens_.push_back(iterator.value()->to_string());
}
}
}
v = row->get_by_name("rpc_address");
if (v && !v->is_null()) {
if (!v->decoder().as_inet(v->size(), address_.port(), &rpc_address_)) {
LOG_WARN("Invalid address format for `rpc_address`");
}
if (Address("0.0.0.0", 0).equals(rpc_address_, false) ||
Address("::", 0).equals(rpc_address_, false)) {
LOG_WARN("Found host with 'bind any' for rpc_address; using listen_address (%s) to contact "
"instead. "
"If this is incorrect you should configure a specific interface for rpc_address on "
"the server.",
address_string_.c_str());
v = row->get_by_name("listen_address"); // Available in system.local
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
} else {
v = row->get_by_name("peer"); // Available in system.peers
if (v && !v->is_null()) {
v->decoder().as_inet(v->size(), address_.port(), &rpc_address_);
}
}
if (!rpc_address_.is_valid()) {
LOG_WARN("Unable to set rpc_address from either listen_address or peer");
}
}
} else {
LOG_WARN("No rpc_address for host %s in system.local or system.peers.",
address_string_.c_str());
}
}
static CassInet to_inet(const Host::Ptr& host) {
CassInet address;
if (host->address().is_resolved()) {
address.address_length = host->address().to_inet(address.address);
} else {
address.address_length = host->rpc_address().to_inet(&address.address);
}
return address;
}
ExternalHostListener::ExternalHostListener(const CassHostListenerCallback callback, void* data)
: callback_(callback)
, data_(data) {}
void ExternalHostListener::on_host_up(const Host::Ptr& host) {
callback_(CASS_HOST_LISTENER_EVENT_UP, to_inet(host), data_);
}
void ExternalHostListener::on_host_down(const Host::Ptr& host) {
callback_(CASS_HOST_LISTENER_EVENT_DOWN, to_inet(host), data_);
}
void ExternalHostListener::on_host_added(const Host::Ptr& host) {
callback_(CASS_HOST_LISTENER_EVENT_ADD, to_inet(host), data_);
}
void ExternalHostListener::on_host_removed(const Host::Ptr& host) {
callback_(CASS_HOST_LISTENER_EVENT_REMOVE, to_inet(host), data_);
}