// blob: 5b5bda3e9236441bf06849f06adffe2042867f07
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_HOST_HPP
#define DATASTAX_INTERNAL_HOST_HPP
#include "address.hpp"
#include "allocated.hpp"
#include "atomic.hpp"
#include "copy_on_write_ptr.hpp"
#include "get_time.hpp"
#include "logger.hpp"
#include "macros.hpp"
#include "map.hpp"
#include "ref_counted.hpp"
#include "scoped_ptr.hpp"
#include "spin_lock.hpp"
#include "vector.hpp"
#include <math.h>
#include <stdint.h>
namespace datastax { namespace internal { namespace core {
class Row;
/**
 * A plain value type bundling an average with a timestamp and a count of
 * measurements. It is the type returned by Host::get_current_average().
 */
struct TimestampedAverage {
  // The averaged value; -1 on a default-constructed instance (presumably
  // meaning "no average available yet" -- confirm against the tracker code).
  int64_t average;
  // Timestamp associated with the average; 0 on a default-constructed instance.
  uint64_t timestamp;
  // Number of measurements taken; 0 on a default-constructed instance.
  uint64_t num_measured;

  /** Creates an entry with average -1, timestamp 0 and no measurements. */
  TimestampedAverage()
      : average(-1)
      , timestamp(0)
      , num_measured(0) {}
};
/**
 * A (major, minor, patch) version triple that can be ordered against other
 * version numbers.
 */
class VersionNumber {
public:
  /** Creates the version 0.0.0. */
  VersionNumber()
      : major_version_(0)
      , minor_version_(0)
      , patch_version_(0) {}

  /** Creates a version from its three explicit components. */
  VersionNumber(int major_version, int minor_version, int patch_version)
      : major_version_(major_version)
      , minor_version_(minor_version)
      , patch_version_(patch_version) {}

  /**
   * Three-way comparison. The major component is the most significant,
   * followed by minor, then patch.
   *
   * @param other The version to compare against.
   * @return -1 if this version is lower than other, 1 if it is higher and 0
   * if all three components are equal.
   */
  int compare(const VersionNumber& other) const {
    if (major_version_ != other.major_version_) {
      return major_version_ < other.major_version_ ? -1 : 1;
    }
    if (minor_version_ != other.minor_version_) {
      return minor_version_ < other.minor_version_ ? -1 : 1;
    }
    if (patch_version_ != other.patch_version_) {
      return patch_version_ < other.patch_version_ ? -1 : 1;
    }
    return 0;
  }

  bool operator<(const VersionNumber& other) const { return compare(other) < 0; }
  bool operator>=(const VersionNumber& other) const { return compare(other) >= 0; }

  /**
   * Declared here and defined out of line. Presumably fills in the three
   * components from a textual version and reports success -- confirm against
   * the definition.
   */
  bool parse(const String& version);

  int major_version() const { return major_version_; }
  int minor_version() const { return minor_version_; }
  int patch_version() const { return patch_version_; }

private:
  int major_version_;
  int minor_version_;
  int patch_version_;
};
/**
 * A reference-counted description of a single host. It holds the host's
 * addresses, rack/datacenter names and ids, server versions, partitioner,
 * tokens, connection and in-flight request counters and, once enabled, a
 * latency tracker.
 */
class Host : public RefCounted<Host> {
public:
  typedef SharedRefPtr<Host> Ptr;
  typedef SharedRefPtr<const Host> ConstPtr;

  /**
   * Creates a host for the given address. The RPC address starts out equal
   * to the address, the rack/datacenter ids and both counters start at zero,
   * and the address string is computed once from address.to_string().
   *
   * @param address The address of the host.
   */
  Host(const Address& address)
      : address_(address)
      , rpc_address_(address)
      , rack_id_(0)
      , dc_id_(0)
      , address_string_(address.to_string())
      , connection_count_(0)
      , inflight_request_count_(0) {}

  const Address& address() const { return address_; }
  const String& address_string() const { return address_string_; }
  const Address& rpc_address() const { return rpc_address_; }

  /**
   * Declared here and defined out of line. Presumably populates this host
   * from a row of host metadata -- confirm against the definition.
   */
  void set(const Row* row, bool use_tokens);

  const String& rack() const { return rack_; }
  const String& dc() const { return dc_; }

  /** Replaces the rack and datacenter names. */
  void set_rack_and_dc(const String& rack, const String& dc) {
    rack_ = rack;
    dc_ = dc;
  }

  uint32_t rack_id() const { return rack_id_; }
  uint32_t dc_id() const { return dc_id_; }

  /** Replaces the numeric rack and datacenter ids. */
  void set_rack_and_dc_ids(uint32_t rack_id, uint32_t dc_id) {
    rack_id_ = rack_id;
    dc_id_ = dc_id;
  }

  const String& partitioner() const { return partitioner_; }
  const Vector<String>& tokens() const { return tokens_; }
  const VersionNumber& server_version() const { return server_version_; }
  const VersionNumber& dse_server_version() const { return dse_server_version_; }

  /**
   * Formats the host for display.
   *
   * @return The address string, followed by " [rack:dc]" when the rack name
   * or the datacenter name is non-empty.
   */
  String to_string() const {
    OStringStream out;
    out << address_string_;
    const bool has_location = !(rack_.empty() && dc_.empty());
    if (has_location) {
      out << " [" << rack_ << ':' << dc_ << "]";
    }
    return out.str();
  }

  /**
   * Creates the latency tracker if one does not exist yet; calling this
   * again afterwards has no effect.
   *
   * @param scale Passed to the tracker as its scale.
   * @param min_measured The tracker's threshold is set to 30% of this value
   * (integer division).
   */
  void enable_latency_tracking(uint64_t scale, uint64_t min_measured) {
    if (latency_tracker_) return;
    latency_tracker_.reset(new LatencyTracker(scale, (30LL * min_measured) / 100LL));
  }

  /**
   * Forwards a latency measurement to the tracker. Does nothing when latency
   * tracking has not been enabled.
   *
   * @param latency_ns The measured latency in nanoseconds.
   */
  void update_latency(uint64_t latency_ns) {
    if (!latency_tracker_) return;
    LOG_TRACE("Latency %f ms for %s", static_cast<double>(latency_ns) / 1e6, to_string().c_str());
    latency_tracker_->update(latency_ns);
  }

  /**
   * @return The tracker's current average, or a default-constructed
   * TimestampedAverage when latency tracking has not been enabled.
   */
  TimestampedAverage get_current_average() const {
    if (!latency_tracker_) return TimestampedAverage();
    return latency_tracker_->get();
  }

  // The counters below are atomics accessed with relaxed memory ordering.
  void increment_connection_count() {
    connection_count_.fetch_add(1, MEMORY_ORDER_RELAXED);
  }
  void decrement_connection_count() {
    connection_count_.fetch_sub(1, MEMORY_ORDER_RELAXED);
  }
  int32_t connection_count() const {
    return connection_count_.load(MEMORY_ORDER_RELAXED);
  }

  void increment_inflight_requests() {
    inflight_request_count_.fetch_add(1, MEMORY_ORDER_RELAXED);
  }
  void decrement_inflight_requests() {
    inflight_request_count_.fetch_sub(1, MEMORY_ORDER_RELAXED);
  }
  int32_t inflight_request_count() const {
    return inflight_request_count_.load(MEMORY_ORDER_RELAXED);
  }

private:
  /**
   * Holds the current TimestampedAverage for a host. Reads through get()
   * take a spinlock from the pool keyed on this object; update() is defined
   * out of line.
   */
  class LatencyTracker : public Allocated {
  public:
    LatencyTracker(uint64_t scale_ns, uint64_t threshold_to_account)
        : scale_ns_(scale_ns)
        , threshold_to_account_(threshold_to_account) {}

    void update(uint64_t latency_ns);

    /** Returns a copy of the current average, made while holding the lock. */
    TimestampedAverage get() const {
      ScopedSpinlock guard(SpinlockPool<LatencyTracker>::get_spinlock(this));
      return current_;
    }

  private:
    uint64_t scale_ns_;
    uint64_t threshold_to_account_;
    TimestampedAverage current_;

    DISALLOW_COPY_AND_ASSIGN(LatencyTracker);
  };

private:
  Address address_;
  Address rpc_address_;
  uint32_t rack_id_;
  uint32_t dc_id_;
  String address_string_;
  VersionNumber server_version_;
  VersionNumber dse_server_version_;
  String rack_;
  String dc_;
  String partitioner_;
  Vector<String> tokens_;
  Atomic<int32_t> connection_count_;
  Atomic<int32_t> inflight_request_count_;
  ScopedPtr<LatencyTracker> latency_tracker_;

  DISALLOW_COPY_AND_ASSIGN(Host);
};
/**
 * A listener that handles cluster topology and host status changes.
 *
 * This is a pure interface: every callback must be implemented by a
 * subclass.
 */
class HostListener {
public:
virtual ~HostListener() {}
/**
 * A callback that's called when a host is marked as being UP.
 *
 * @param host A fully populated host object.
 */
virtual void on_host_up(const Host::Ptr& host) = 0;
/**
 * A callback that's called when a host is marked as being DOWN.
 *
 * @param host A fully populated host object.
 */
virtual void on_host_down(const Host::Ptr& host) = 0;
/**
 * A callback that's called when a new host is added to the cluster.
 *
 * @param host A fully populated host object.
 */
virtual void on_host_added(const Host::Ptr& host) = 0;
/**
 * A callback that's called when a host is removed from a cluster.
 *
 * @param host The host that was removed.
 */
virtual void on_host_removed(const Host::Ptr& host) = 0;
};
/**
 * A reference-counted host listener whose callbacks all do nothing.
 * Subclasses override only the events they need.
 */
class DefaultHostListener
    : public HostListener
    , public RefCounted<DefaultHostListener> {
public:
  typedef SharedRefPtr<DefaultHostListener> Ptr;

  // Each callback deliberately ignores its argument.
  virtual void on_host_up(const Host::Ptr& /*host*/) {}
  virtual void on_host_down(const Host::Ptr& /*host*/) {}
  virtual void on_host_added(const Host::Ptr& /*host*/) {}
  virtual void on_host_removed(const Host::Ptr& /*host*/) {}
};
/**
 * A host listener that holds a CassHostListenerCallback and an opaque data
 * pointer.
 *
 * NOTE(review): the constructor and all four callbacks are defined out of
 * line; presumably each one invokes the stored callback with the stored data
 * pointer -- confirm against the definitions.
 */
class ExternalHostListener : public DefaultHostListener {
public:
typedef SharedRefPtr<ExternalHostListener> Ptr;
/**
 * @param callback The callback to associate with this listener.
 * @param data Opaque pointer kept alongside the callback.
 */
ExternalHostListener(const CassHostListenerCallback callback, void* data);
// Overrides of the DefaultHostListener callbacks (defined out of line).
virtual void on_host_up(const Host::Ptr& host);
virtual void on_host_down(const Host::Ptr& host);
virtual void on_host_added(const Host::Ptr& host);
virtual void on_host_removed(const Host::Ptr& host);
private:
const CassHostListenerCallback callback_;
void* data_;
};
/** A map from a host's address to the (shared) host object. */
typedef Map<Address, Host::Ptr> HostMap;
/**
 * A function object that returns the address (the `first` member) of an
 * address/host entry.
 *
 * Two overloads are provided. The non-template one takes exactly `Pair`.
 * The templated one binds to any other entry type that has a `first` member
 * of type Address, and returns a reference into the caller's own object.
 *
 * Why the template exists: if a container's element type is
 * `std::pair<const Address, Host::Ptr>` (as with std::map-style containers;
 * presumably true of HostMap -- confirm against map.hpp), passing an element
 * to the `Pair` overload would first convert it to a temporary `Pair`. The
 * returned reference would then point into that temporary and dangle once
 * the calling expression ends, and each call would copy an Address and a
 * Host::Ptr for nothing.
 */
struct GetAddress {
  typedef std::pair<Address, Host::Ptr> Pair;

  /** @return A reference to the address stored in the given pair. */
  const Address& operator()(const Pair& pair) const { return pair.first; }

  /**
   * @param entry Any pair-like object whose `first` member is an Address.
   * @return A reference to the address stored in the given entry.
   */
  template <class Entry>
  const Address& operator()(const Entry& entry) const {
    return entry.first;
  }
};
/**
 * A function object that returns the host (the `second` member) of an
 * address/host entry.
 *
 * Two overloads are provided. The non-template one takes exactly `Pair`.
 * The templated one binds to any other entry type that has a `second` member
 * convertible to Host::Ptr.
 *
 * Why the template exists: if a container's element type is
 * `std::pair<const Address, Host::Ptr>` (as with std::map-style containers;
 * presumably true of HostMap -- confirm against map.hpp), passing an element
 * to the `Pair` overload would first convert it to a temporary `Pair`,
 * copying an Address and a Host::Ptr on every call.
 */
struct GetHost {
  typedef std::pair<Address, Host::Ptr> Pair;

  /** @return A copy of the host pointer stored in the given pair. */
  Host::Ptr operator()(const Pair& pair) const { return pair.second; }

  /**
   * @param entry Any pair-like object whose `second` member is a host pointer.
   * @return A copy of the host pointer stored in the given entry.
   */
  template <class Entry>
  Host::Ptr operator()(const Entry& entry) const {
    return entry.second;
  }
};
/** An (address, host) pair. */
typedef std::pair<Address, Host::Ptr> HostPair;
/** A sequence of shared host pointers. */
typedef Vector<Host::Ptr> HostVec;
/** A HostVec wrapped in CopyOnWritePtr. */
typedef CopyOnWritePtr<HostVec> CopyOnWriteHostVec;
// The three helpers below are only declared here; their definitions live
// elsewhere.
// NOTE(review): presumably they add or remove the given host (or the host
// with the given address) in `hosts`, and the bool overload reports whether
// anything was removed -- confirm against the definitions.
void add_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
void remove_host(CopyOnWriteHostVec& hosts, const Host::Ptr& host);
bool remove_host(CopyOnWriteHostVec& hosts, const Address& address);
}}} // namespace datastax::internal::core
#endif