| /* |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "client_insights.hpp" |
| |
| #include "config.hpp" |
| #include "driver_info.hpp" |
| #include "get_time.hpp" |
| #include "logger.hpp" |
| #include "session.hpp" |
| #include "ssl.hpp" |
| #include "string.hpp" |
| #include "utils.hpp" |
| |
| #include <cassert> |
| #include <uv.h> |
| |
| #ifdef _WIN32 |
| #include <windows.h> |
| #include <winsock2.h> |
| #else |
| #include <sys/utsname.h> |
| #include <unistd.h> |
| #endif |
// Maximum hostname length; get_hostname() sizes its buffer as this value plus one
#define HOSTNAME_MAX_LENGTH 256

// Event names and fixed values written into the "metadata" object of each message
#define METADATA_STARTUP_NAME "driver.startup"
#define METADATA_STATUS_NAME "driver.status"
#define METADATA_INSIGHTS_MAPPING_ID "v1"
#define METADATA_LANGUAGE "C/C++"

// Messages reported (and logged) for each detected configuration anti-pattern
#define CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS \
  "Contact points contain hosts from multiple data centers but only one is going to be used"
#define CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS "Using remote hosts for failover"
#define CONFIG_ANTIPATTERN_MSG_DOWNGRADING "Downgrading consistency retry policy in use"
#define CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION \
  "Client-to-node encryption is enabled but server certificate validation is disabled"
#define CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL \
  "Plain text authentication is enabled without client-to-node encryption"
| |
| namespace datastax { namespace internal { namespace core { |
| |
| MonitorReporting* create_monitor_reporting(const String& client_id, const String& session_id, |
| const Config& config) { |
| // Ensure the client monitor events should be enabled |
| unsigned interval_secs = config.monitor_reporting_interval_secs(); |
| if (interval_secs > 0) { |
| return new enterprise::ClientInsights(client_id, session_id, interval_secs); |
| } |
| return new NopMonitorReporting(); |
| } |
| |
| }}} // namespace datastax::internal::core |
| |
| using namespace datastax::internal::core; |
| using namespace datastax::internal; |
| |
| namespace datastax { namespace internal { namespace enterprise { |
| |
| #ifdef _WIN32 |
| #define ERROR_BUFFER_MAX_LENGTH 1024 |
| String get_last_error() { |
| DWORD rc = GetLastError(); |
| |
| char buf[ERROR_BUFFER_MAX_LENGTH]; |
| size_t size = FormatMessageA(FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS, NULL, rc, |
| MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), |
| reinterpret_cast<LPSTR>(&buf[0]), ERROR_BUFFER_MAX_LENGTH, NULL); |
| String str(buf, size); |
| trim(str); |
| return str; |
| } |
| #endif |
| |
| String get_hostname() { |
| #ifdef _WIN32 |
| WSADATA data; |
| WORD version_required = MAKEWORD(2, 2); |
| if (WSAStartup(version_required, &data) != 0) { |
| LOG_WARN("Unable to determine hostname: Failed to initialize WinSock2"); |
| return String(); |
| } |
| #endif |
| |
| char buf[HOSTNAME_MAX_LENGTH + 1]; |
| size_t size = HOSTNAME_MAX_LENGTH + 1; |
| if (int rc = gethostname(buf, size) != 0) { |
| LOG_WARN("Unable to determine hostname: Error code %d", rc); |
| return "UNKNOWN"; |
| } |
| return String(buf, size); |
| } |
| |
| struct Os { |
| String name; |
| String version; |
| String arch; |
| }; |
| Os get_os() { |
| Os os; |
| #ifdef _WIN32 |
| os.name = "Microsoft Windows"; |
| |
| DWORD size = GetFileVersionInfoSize(TEXT("kernel32.dll"), NULL); |
| if (size) { |
| Vector<BYTE> version_info(size); |
| if (GetFileVersionInfo(TEXT("kernel32.dll"), 0, size, &version_info[0])) { |
| VS_FIXEDFILEINFO* file_info = NULL; |
| UINT file_info_length = 0; |
| if (VerQueryValue(&version_info[0], TEXT("\\"), reinterpret_cast<LPVOID*>(&file_info), |
| &file_info_length)) { |
| OStringStream oss; |
| oss << static_cast<int>(HIWORD(file_info->dwProductVersionMS)) << "." |
| << static_cast<int>(LOWORD(file_info->dwProductVersionMS)) << "." |
| << static_cast<int>(HIWORD(file_info->dwProductVersionLS)); |
| os.version = oss.str(); |
| } else { |
| LOG_DEBUG("Unable to retrieve Windows version: %s\n", get_last_error().c_str()); |
| } |
| } else { |
| LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfo): %s\n", |
| get_last_error().c_str()); |
| } |
| } else { |
| LOG_DEBUG("Unable to retrieve Windows version (GetFileVersionInfoSize): %s\n", |
| get_last_error().c_str()); |
| } |
| |
| #ifdef _WIN64 |
| os.arch = "x64"; |
| #else |
| os.arch = "x86"; |
| #endif |
| #else |
| struct utsname client_info; |
| uname(&client_info); |
| os.name = client_info.sysname; |
| os.version = client_info.release; |
| os.arch = client_info.machine; |
| #endif |
| |
| return os; |
| } |
| |
| struct Cpus { |
| int length; |
| String model; |
| }; |
| Cpus get_cpus() { |
| Cpus cpus; |
| |
| uv_cpu_info_t* cpus_infos; |
| int cpus_count; |
| int rc = uv_cpu_info(&cpus_infos, &cpus_count); |
| if (rc == 0) { |
| uv_cpu_info_t cpus_info = cpus_infos[0]; |
| cpus.length = cpus_count; |
| cpus.model = cpus_info.model; |
| uv_free_cpu_info(cpus_infos, cpus_count); |
| } else { |
| LOG_DEBUG("Unable to determine CPUs information: %s\n", uv_strerror(rc)); |
| } |
| |
| return cpus; |
| } |
| |
| class ClientInsightsRequestCallback : public SimpleRequestCallback { |
| public: |
| typedef SharedRefPtr<ClientInsightsRequestCallback> Ptr; |
| |
| ClientInsightsRequestCallback(const String& json, const String& event_type) |
| : SimpleRequestCallback("CALL InsightsRpc.reportInsight('" + json + "')") |
| , event_type_(event_type) {} |
| |
| virtual void on_internal_set(ResponseMessage* response) { |
| if (response->opcode() != CQL_OPCODE_RESULT) { |
| LOG_DEBUG("Failed to send %s event message: Invalid response [%s]", event_type_.c_str(), |
| opcode_to_string(response->opcode()).c_str()); |
| } |
| } |
| |
| virtual void on_internal_error(CassError code, const String& message) { |
| LOG_DEBUG("Failed to send %s event message: %s", event_type_.c_str(), message.c_str()); |
| } |
| |
| virtual void on_internal_timeout() { |
| LOG_DEBUG("Failed to send %s event message: Timed out waiting for response", |
| event_type_.c_str()); |
| } |
| |
| private: |
| String event_type_; |
| }; |
| |
| void metadata(ClientInsights::Writer& writer, const String& name) { |
| writer.Key("metadata"); |
| writer.StartObject(); |
| |
| writer.Key("name"); |
| writer.String(name.c_str()); |
| writer.Key("insightMappingId"); |
| writer.String(METADATA_INSIGHTS_MAPPING_ID); |
| writer.Key("insightType"); |
| writer.String("EVENT"); // TODO: Make this an enumeration in the future |
| writer.Key("timestamp"); |
| writer.Uint64(get_time_since_epoch_ms()); |
| writer.Key("tags"); |
| writer.StartObject(); |
| writer.Key("language"); |
| writer.String(METADATA_LANGUAGE); |
| writer.EndObject(); // tags |
| |
| writer.EndObject(); |
| } |
| |
| class StartupMessageHandler : public RefCounted<StartupMessageHandler> { |
| public: |
| typedef SharedRefPtr<StartupMessageHandler> Ptr; |
| |
| StartupMessageHandler(const Connection::Ptr& connection, const String& client_id, |
| const String& session_id, const Config& config, const HostMap& hosts, |
| const LoadBalancingPolicy::Vec& initialized_policies) |
| : connection_(connection) |
| , client_id_(client_id) |
| , session_id_(session_id) |
| , config_(config) |
| , hosts_(hosts) |
| , initialized_policies_(initialized_policies) {} |
| |
| ~StartupMessageHandler() { |
| ClientInsights::StringBuffer buffer; |
| ClientInsights::Writer writer(buffer); |
| |
| writer.StartObject(); |
| metadata(writer, METADATA_STARTUP_NAME); |
| startup_message_data(writer); |
| writer.EndObject(); |
| |
| assert(writer.IsComplete() && "Startup JSON is incomplete"); |
| connection_->write_and_flush(RequestCallback::Ptr( |
| new ClientInsightsRequestCallback(buffer.GetString(), METADATA_STARTUP_NAME))); |
| } |
| |
| void send_message() { resolve_contact_points(); } |
| |
| private: |
| // Startup message associated methods |
| void startup_message_data(ClientInsights::Writer& writer) { |
| writer.Key("data"); |
| writer.StartObject(); |
| |
| writer.Key("clientId"); |
| writer.String(client_id_.c_str()); |
| writer.Key("sessionId"); |
| writer.String(session_id_.c_str()); |
| bool is_application_name_generated = false; |
| writer.Key("applicationName"); |
| if (!config_.application_name().empty()) { |
| writer.String(config_.application_name().c_str()); |
| } else { |
| writer.String(driver_name()); |
| is_application_name_generated = true; |
| } |
| writer.Key("applicationNameWasGenerated"); |
| writer.Bool(is_application_name_generated); |
| if (!config_.application_version().empty()) { |
| writer.Key("applicationVersion"); |
| writer.String(config_.application_version().c_str()); |
| } |
| writer.Key("driverName"); |
| writer.String(driver_name()); |
| writer.Key("driverVersion"); |
| writer.String(driver_version()); |
| contact_points(writer); |
| data_centers(writer); |
| writer.Key("initialControlConnection"); |
| writer.String(connection_->resolved_address().to_string(true).c_str()); |
| writer.Key("protocolVersion"); |
| writer.Int(connection_->protocol_version().value()); |
| writer.Key("localAddress"); |
| writer.String(get_local_address(connection_->handle()).c_str()); |
| writer.Key("hostName"); |
| writer.String(get_hostname().c_str()); |
| execution_profiles(writer); |
| pool_size_by_host_distance(writer); |
| writer.Key("heartbeatInterval"); |
| writer.Uint64(config_.connection_heartbeat_interval_secs() * 1000); // in milliseconds |
| writer.Key("compression"); |
| writer.String("NONE"); // TODO: Update once compression is added |
| reconnection_policy(writer); |
| ssl(writer); |
| auth_provider(writer); |
| other_options(writer); |
| platform_info(writer); |
| config_anti_patterns(writer); |
| writer.Key("periodicStatusInterval"); |
| writer.Uint(config_.monitor_reporting_interval_secs()); |
| |
| writer.EndObject(); |
| } |
| |
| void contact_points(ClientInsights::Writer& writer) { |
| writer.Key("contactPoints"); |
| writer.StartObject(); |
| |
| for (ResolvedHostMap::const_iterator map_it = contact_points_resolved_.begin(), |
| map_end = contact_points_resolved_.end(); |
| map_it != map_end; ++map_it) { |
| writer.Key(map_it->first.c_str()); |
| writer.StartArray(); |
| for (AddressSet::const_iterator vec_it = map_it->second.begin(), |
| vec_end = map_it->second.end(); |
| vec_it != vec_end; ++vec_it) { |
| writer.String(vec_it->to_string(true).c_str()); |
| } |
| writer.EndArray(); |
| } |
| |
| writer.EndObject(); |
| } |
| |
| void data_centers(ClientInsights::Writer& writer) { |
| writer.Key("dataCenters"); |
| writer.StartArray(); |
| |
| Set<String> data_centers; |
| for (HostMap::const_iterator it = hosts_.begin(), end = hosts_.end(); it != end; ++it) { |
| const String& data_center = it->second->dc(); |
| if (data_centers.insert(data_center).second) { |
| writer.String(data_center.c_str()); |
| } |
| } |
| |
| writer.EndArray(); |
| } |
| |
| void execution_profiles(ClientInsights::Writer& writer) { |
| writer.Key("executionProfiles"); |
| writer.StartObject(); |
| |
| const ExecutionProfile& default_profile = config_.default_profile(); |
| const ExecutionProfile::Map& profiles = config_.profiles(); |
| writer.Key("default"); |
| execution_profile_as_json(writer, default_profile); |
| for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end(); |
| it != end; ++it) { |
| writer.Key(it->first.c_str()); |
| execution_profile_as_json(writer, it->second, &default_profile); |
| } |
| |
| writer.EndObject(); |
| } |
| |
| void pool_size_by_host_distance(ClientInsights::Writer& writer) { |
| writer.Key("poolSizeByHostDistance"); |
| writer.StartObject(); |
| |
| writer.Key("local"); |
| writer.Uint(config_.core_connections_per_host() * hosts_.size()); |
| // NOTE: Remote does not pertain to DataStax C/C++ driver pool |
| writer.Key("remote"); |
| writer.Uint(0); |
| |
| writer.EndObject(); |
| } |
| |
| void reconnection_policy(ClientInsights::Writer& writer) { |
| writer.Key("reconnectionPolicy"); |
| writer.StartObject(); |
| |
| ReconnectionPolicy::Ptr reconnection_policy = config_.reconnection_policy(); |
| writer.Key("type"); |
| if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) { |
| writer.String("ConstantReconnectionPolicy"); |
| } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) { |
| writer.String("ExponentialReconnectionPolicy"); |
| } else { |
| assert(false && "Reconnection policy needs to be added"); |
| writer.String("UnknownReconnectionPolicy"); |
| } |
| writer.Key("options"); |
| writer.StartObject(); |
| if (reconnection_policy->type() == ReconnectionPolicy::CONSTANT) { |
| ConstantReconnectionPolicy::Ptr crp = |
| static_cast<ConstantReconnectionPolicy::Ptr>(reconnection_policy); |
| writer.Key("delayMs"); |
| writer.Uint(crp->delay_ms()); |
| } else if (reconnection_policy->type() == ReconnectionPolicy::EXPONENTIAL) { |
| ExponentialReconnectionPolicy::Ptr erp = |
| static_cast<ExponentialReconnectionPolicy::Ptr>(reconnection_policy); |
| writer.Key("baseDelayMs"); |
| writer.Uint(erp->base_delay_ms()); |
| writer.Key("maxDelayMs"); |
| writer.Uint(erp->max_delay_ms()); |
| } |
| writer.EndObject(); // options |
| |
| writer.EndObject(); |
| } |
| |
| void ssl(ClientInsights::Writer& writer) { |
| writer.Key("ssl"); |
| writer.StartObject(); |
| |
| const SslContext::Ptr& ssl_context = config_.ssl_context(); |
| writer.Key("enabled"); |
| if (ssl_context) { |
| writer.Bool(true); |
| } else { |
| writer.Bool(false); |
| } |
| writer.Key("certValidation"); |
| if (ssl_context) { |
| writer.Bool(ssl_context->is_cert_validation_enabled()); |
| } else { |
| writer.Bool(false); |
| } |
| |
| writer.EndObject(); |
| } |
| |
| void auth_provider(ClientInsights::Writer& writer) { |
| const AuthProvider::Ptr& auth_provider = config_.auth_provider(); |
| if (auth_provider) { |
| writer.Key("authProvider"); |
| writer.StartObject(); |
| |
| writer.Key("type"); |
| writer.String(auth_provider->name().c_str()); |
| |
| writer.EndObject(); |
| } |
| } |
| |
| void other_options(ClientInsights::Writer& writer) { |
| writer.Key("otherOptions"); |
| writer.StartObject(); |
| |
| writer.Key("configuration"); |
| writer.StartObject(); |
| writer.Key("protocolVersion"); |
| writer.Uint(config_.protocol_version().value()); |
| writer.Key("useBetaProtocol"); |
| writer.Bool(config_.use_beta_protocol_version()); |
| writer.Key("threadCountIo"); |
| writer.Uint(config_.thread_count_io()); |
| writer.Key("queueSizeIo"); |
| writer.Uint(config_.queue_size_io()); |
| writer.Key("coreConnectionsPerHost"); |
| writer.Uint(config_.core_connections_per_host()); |
| writer.Key("connectTimeoutMs"); |
| writer.Uint(config_.connect_timeout_ms()); |
| writer.Key("resolveTimeoutMs"); |
| writer.Uint(config_.resolve_timeout_ms()); |
| writer.Key("maxSchemaWaitTimeMs"); |
| writer.Uint(config_.max_schema_wait_time_ms()); |
| writer.Key("maxTracingWaitTimeMs"); |
| writer.Uint(config_.max_tracing_wait_time_ms()); |
| writer.Key("tracingConsistency"); |
| writer.String(cass_consistency_string(config_.tracing_consistency())); |
| writer.Key("coalesceDelayUs"); |
| writer.Uint64(config_.coalesce_delay_us()); |
| writer.Key("newRequestRatio"); |
| writer.Uint(config_.new_request_ratio()); |
| writer.Key("logLevel"); |
| writer.String(cass_log_level_string(config_.log_level())); |
| writer.Key("tcpNodelayEnable"); |
| writer.Bool(config_.tcp_nodelay_enable()); |
| writer.Key("tcpKeepaliveEnable"); |
| writer.Bool(config_.tcp_keepalive_enable()); |
| writer.Key("tcpKeepaliveDelaySecs"); |
| writer.Uint(config_.tcp_keepalive_delay_secs()); |
| writer.Key("connectionIdleTimeoutSecs"); |
| writer.Uint(config_.connection_idle_timeout_secs()); |
| writer.Key("useSchema"); |
| writer.Bool(config_.use_schema()); |
| writer.Key("useHostnameResolution"); |
| writer.Bool(config_.use_hostname_resolution()); |
| writer.Key("useRandomizedContactPoints"); |
| writer.Bool(config_.use_randomized_contact_points()); |
| writer.Key("maxReusableWriteObjects"); |
| writer.Uint(config_.max_reusable_write_objects()); |
| writer.Key("prepareOnAllHosts"); |
| writer.Bool(config_.prepare_on_all_hosts()); |
| writer.Key("prepareOnUpOrAddHost"); |
| writer.Bool(config_.prepare_on_up_or_add_host()); |
| writer.Key("noCompact"); |
| writer.Bool(config_.no_compact()); |
| writer.Key("cloudSecureConnectBundleLoaded"); |
| writer.Bool(config_.cloud_secure_connection_config().is_loaded()); |
| writer.Key("clusterMetadataResolver"); |
| writer.String(config_.cluster_metadata_resolver_factory()->name()); |
| writer.EndObject(); // configuration |
| |
| writer.EndObject(); // otherOptions |
| } |
| |
| void platform_info(ClientInsights::Writer& writer) { |
| writer.Key("platformInfo"); |
| writer.StartObject(); |
| |
| writer.Key("os"); |
| writer.StartObject(); |
| Os os = get_os(); |
| writer.Key("name"); |
| writer.String(os.name.c_str()); |
| writer.Key("version"); |
| writer.String(os.version.c_str()); |
| writer.Key("arch"); |
| writer.String(os.arch.c_str()); |
| writer.EndObject(); // os |
| |
| writer.Key("cpus"); |
| writer.StartObject(); |
| Cpus cpus = get_cpus(); |
| writer.Key("length"); |
| writer.Int(cpus.length); |
| writer.Key("model"); |
| writer.String(cpus.model.c_str()); |
| writer.EndObject(); // cpus |
| |
| writer.Key("runtime"); |
| writer.StartObject(); |
| #if defined(__clang__) || defined(__APPLE_CC__) |
| writer.Key("Clang/LLVM"); |
| writer.String(STRINGIFY(__clang_major__) "." STRINGIFY(__clang_minor__) "." STRINGIFY( |
| __clang_patchlevel__)); |
| #elif defined(__INTEL_COMPILER) |
| writer.Key("Intel ICC/ICPC"); |
| writer.String(STRINGIFY(__INTEL_COMPILER)); |
| #elif defined(__GNUC__) || defined(__GNUG__) |
| writer.Key("GNU GCC/G++"); |
| writer.String( |
| STRINGIFY(__GNUC__) "." STRINGIFY(__GNUC_MINOR__) "." STRINGIFY(__GNUC_PATCHLEVEL__)); |
| #elif defined(__HP_aCC) |
| writer.Key("Hewlett-Packard C/aC++"); |
| writer.String(STRINGIFY(__HP_aCC)); |
| #elif defined(__IBMCPP__) |
| writer.Key("IBM XL C/C++"); |
| writer.String(STRINGIFY(__xlc__)); |
| #elif defined(_MSC_VER) |
| writer.Key("Microsoft Visual Studio"); |
| writer.String(STRINGIFY(_MSC_FULL_VER)); |
| #elif defined(__PGI) |
| writer.Key("Portland Group PGCC/PGCPP"); |
| writer.String( |
| STRINGIFY(__PGIC__) "." STRINGIFY(__PGIC_MINOR__) "." STRINGIFY(__PGIC_PATCHLEVEL__)); |
| #elif defined(__SUNPRO_CC) |
| writer.Key("Oracle Solaris Studio"); |
| writer.String(STRINGIFY(__SUNPRO_CC)); |
| #else |
| writer.Key("Unknown"); |
| writer.String("Unknown"); |
| #endif |
| writer.Key("uv"); |
| writer.String(STRINGIFY(UV_VERSION_MAJOR) "." STRINGIFY(UV_VERSION_MINOR) "." STRINGIFY( |
| UV_VERSION_PATCH)); |
| writer.Key("openssl"); |
| #ifdef OPENSSL_VERSION_TEXT |
| writer.String(OPENSSL_VERSION_TEXT); |
| #else |
| #ifdef LIBRESSL_VERSION_NUMBER |
| writer.String("LibreSSL " STRINGIFY(LIBRESSL_VERSION_NUMBER)); |
| #else |
| writer.String("OpenSSL " STRINGIFY(OPENSSL_VERSION_NUMBER)); |
| #endif |
| #endif |
| writer.EndObject(); // runtime |
| |
| writer.EndObject(); // platformInfo |
| } |
| |
| void config_anti_patterns(ClientInsights::Writer& writer) { |
| StringPairVec config_anti_patterns = get_config_anti_patterns( |
| config_.default_profile(), config_.profiles(), initialized_policies_, hosts_, |
| config_.ssl_context(), config_.auth_provider()); |
| if (!config_anti_patterns.empty()) { |
| writer.Key("configAntiPatterns"); |
| writer.StartObject(); |
| |
| for (StringPairVec::const_iterator it = config_anti_patterns.begin(), |
| end = config_anti_patterns.end(); |
| it != end; ++it) { |
| writer.Key(it->first.c_str()); |
| writer.String(it->second.c_str()); |
| } |
| |
| writer.EndObject(); |
| } |
| } |
| |
| private: |
| // Startup message helper methods |
| void resolve_contact_points() { |
| const AddressVec& contact_points = config_.contact_points(); |
| const int port = config_.port(); |
| MultiResolver::Ptr resolver; |
| |
| for (AddressVec::const_iterator it = contact_points.begin(), end = contact_points.end(); |
| it != end; ++it) { |
| const Address& contact_point = *it; |
| // Attempt to parse the contact point string. If it's an IP address |
| // then immediately add it to our resolved contact points, otherwise |
| // attempt to resolve the string as a hostname. |
| if (contact_point.is_resolved()) { |
| AddressSet addresses; |
| addresses.insert(contact_point); |
| contact_points_resolved_[contact_point.hostname_or_address()] = addresses; |
| } else { |
| if (!resolver) { |
| inc_ref(); |
| resolver.reset( |
| new MultiResolver(bind_callback(&StartupMessageHandler::on_resolve, this))); |
| } |
| resolver->resolve(connection_->loop(), contact_point.hostname_or_address(), port, |
| config_.resolve_timeout_ms()); |
| } |
| } |
| |
| // NOTE: If no resolution is performed the startup message will be sent in |
| // the destructor |
| } |
| |
| void on_resolve(MultiResolver* resolver) { |
| const Resolver::Vec& resolvers = resolver->resolvers(); |
| for (Resolver::Vec::const_iterator it = resolvers.begin(), end = resolvers.end(); it != end; |
| ++it) { |
| const Resolver::Ptr resolver(*it); |
| AddressSet addresses; |
| if (resolver->is_success()) { |
| if (!resolver->addresses().empty()) { |
| for (AddressVec::const_iterator it = resolver->addresses().begin(), |
| end = resolver->addresses().end(); |
| it != end; ++it) { |
| addresses.insert(*it); |
| } |
| } |
| } |
| contact_points_resolved_[resolver->hostname()] = addresses; // Empty resolved addresses are OK |
| } |
| |
| dec_ref(); // Send startup message in destructor |
| } |
| |
| String get_local_address(const uv_tcp_t* tcp) const { |
| Address::SocketStorage name; |
| int namelen = sizeof(name); |
| if (uv_tcp_getsockname(tcp, name.addr(), &namelen) == 0) { |
| Address address(name.addr()); |
| if (address.is_valid_and_resolved()) { |
| return address.to_string(); |
| } |
| } |
| return "unknown"; |
| } |
| |
| void execution_profile_as_json(ClientInsights::Writer& writer, const ExecutionProfile& profile, |
| const ExecutionProfile* default_profile = NULL) { |
| writer.StartObject(); |
| |
| if (!default_profile || (default_profile && profile.request_timeout_ms() != |
| default_profile->request_timeout_ms())) { |
| writer.Key("requestTimeoutMs"); |
| writer.Uint64(profile.request_timeout_ms()); |
| } |
| if (!default_profile || |
| (default_profile && profile.consistency() != default_profile->consistency())) { |
| writer.Key("consistency"); |
| writer.String(cass_consistency_string(profile.consistency())); |
| } |
| if (!default_profile || (default_profile && profile.serial_consistency() != |
| default_profile->serial_consistency())) { |
| writer.Key("serialConsistency"); |
| writer.String(cass_consistency_string(profile.serial_consistency())); |
| } |
| if (!default_profile || |
| (default_profile && profile.retry_policy() != default_profile->retry_policy())) { |
| const RetryPolicy::Ptr& retry_policy = profile.retry_policy(); |
| if (retry_policy) { |
| writer.Key("retryPolicy"); |
| if (retry_policy->type() == RetryPolicy::DEFAULT) { |
| writer.String("DefaultRetryPolicy"); |
| } else if (retry_policy->type() == RetryPolicy::DOWNGRADING) { |
| writer.String("DowngradingConsistencyRetryPolicy"); |
| } else if (retry_policy->type() == RetryPolicy::FALLTHROUGH) { |
| writer.String("FallthroughRetryPolicy"); |
| } else if (retry_policy->type() == RetryPolicy::LOGGING) { |
| writer.String("LoggingRetryPolicy"); |
| } else { |
| LOG_DEBUG("Invalid retry policy: %d", retry_policy->type()); |
| writer.String("unknown"); |
| } |
| } |
| } |
| |
| if (profile.load_balancing_policy()) { |
| writer.Key("loadBalancing"); |
| writer.StartObject(); |
| writer.Key("type"); |
| LoadBalancingPolicy* current_lbp = profile.load_balancing_policy().get(); |
| do { |
| // NOTE: DCAware and RoundRobin are leaf policies (e.g. not chainable) |
| if (dynamic_cast<DCAwarePolicy*>(current_lbp)) { |
| writer.String("DCAwarePolicy"); |
| break; |
| } else if (dynamic_cast<RoundRobinPolicy*>(current_lbp)) { |
| writer.String("RoundRobinPolicy"); |
| break; |
| } |
| |
| if (ChainedLoadBalancingPolicy* chained_lbp = |
| dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) { |
| current_lbp = chained_lbp->child_policy().get(); |
| } else { |
| current_lbp = NULL; |
| } |
| } while (current_lbp); |
| writer.Key("options"); |
| writer.StartObject(); |
| if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) { |
| writer.Key("localDc"); |
| if (dc_lbp->local_dc().empty()) { |
| writer.Null(); |
| } else { |
| writer.String(dc_lbp->local_dc().c_str()); |
| } |
| writer.Key("usedHostsPerRemoteDc"); |
| writer.Uint64(dc_lbp->used_hosts_per_remote_dc()); |
| writer.Key("allowRemoteDcsForLocalCl"); |
| writer.Bool(!dc_lbp->skip_remote_dcs_for_local_cl()); |
| } |
| if (!profile.blacklist().empty()) { |
| writer.Key("blacklist"); |
| writer.String(implode(profile.blacklist()).c_str()); |
| } |
| if (!profile.blacklist_dc().empty()) { |
| writer.Key("blacklistDc"); |
| writer.String(implode(profile.blacklist_dc()).c_str()); |
| } |
| if (!profile.whitelist().empty()) { |
| writer.Key("whitelist"); |
| writer.String(implode(profile.whitelist()).c_str()); |
| } |
| if (!profile.whitelist_dc().empty()) { |
| writer.Key("whitelistDc"); |
| writer.String(implode(profile.whitelist_dc()).c_str()); |
| } |
| if (profile.token_aware_routing()) { |
| writer.Key("tokenAwareRouting"); |
| writer.StartObject(); |
| writer.Key("shuffleReplicas"); |
| writer.Bool(profile.token_aware_routing_shuffle_replicas()); |
| writer.EndObject(); // tokenAwareRouting |
| } |
| if (profile.latency_aware()) { |
| writer.Key("latencyAwareRouting"); |
| writer.StartObject(); |
| writer.Key("exclusionThreshold"); |
| writer.Double(profile.latency_aware_routing_settings().exclusion_threshold); |
| writer.Key("scaleNs"); |
| writer.Uint64(profile.latency_aware_routing_settings().scale_ns); |
| writer.Key("retryPeriodNs"); |
| writer.Uint64(profile.latency_aware_routing_settings().retry_period_ns); |
| writer.Key("updateRateMs"); |
| writer.Uint64(profile.latency_aware_routing_settings().update_rate_ms); |
| writer.Key("minMeasured"); |
| writer.Uint64(profile.latency_aware_routing_settings().min_measured); |
| writer.EndObject(); // latencyAwareRouting |
| } |
| writer.EndObject(); // options |
| |
| writer.EndObject(); // loadBalancingPolicy |
| } |
| |
| typedef ConstantSpeculativeExecutionPolicy CSEP; |
| CSEP* default_csep = |
| default_profile ? dynamic_cast<CSEP*>(default_profile->speculative_execution_policy().get()) |
| : NULL; |
| CSEP* csep = dynamic_cast<CSEP*>(profile.speculative_execution_policy().get()); |
| if (csep) { |
| if (!default_csep || |
| (default_csep->constant_delay_ms_ != csep->constant_delay_ms_ && |
| default_csep->max_speculative_executions_ != csep->max_speculative_executions_)) { |
| writer.Key("speculativeExecutionPolicy"); |
| writer.StartObject(); |
| writer.Key("type"); |
| writer.String("ConstantSpeculativeExecutionPolicy"); |
| |
| writer.Key("options"); |
| writer.StartObject(); |
| writer.Key("constantDelayMs"); |
| writer.Uint64(csep->constant_delay_ms_); |
| writer.Key("maxSpeculativeExecutions"); |
| writer.Int(csep->max_speculative_executions_); |
| writer.EndObject(); // options |
| |
| writer.EndObject(); // speculativeExecutionPolicy |
| } |
| } |
| |
| writer.EndObject(); // executionProfile |
| } |
| |
| typedef std::pair<String, String> StringPair; |
| typedef Vector<StringPair> StringPairVec; |
| StringPairVec get_config_anti_patterns(const ExecutionProfile& default_profile, |
| const ExecutionProfile::Map& profiles, |
| const LoadBalancingPolicy::Vec& policies, |
| const HostMap& hosts, const SslContext::Ptr& ssl_context, |
| const AuthProvider::Ptr& auth_provider) { |
| StringPairVec config_anti_patterns; |
| |
| if (is_contact_points_multiple_dcs(policies, hosts)) { |
| config_anti_patterns.push_back( |
| StringPair("contactPointsMultipleDCs", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS)); |
| LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_MULTI_DC_HOSTS); |
| } |
| |
| for (LoadBalancingPolicy::Vec::const_iterator it = policies.begin(), end = policies.end(); |
| it != end; ++it) { |
| DCAwarePolicy* dc_lbp = get_dc_aware_policy(*it); |
| if (dc_lbp && !dc_lbp->skip_remote_dcs_for_local_cl()) { |
| config_anti_patterns.push_back( |
| StringPair("useRemoteHosts", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS)); |
| LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_REMOTE_HOSTS); |
| break; |
| } |
| } |
| |
| bool is_downgrading_consistency_enabled = |
| is_downgrading_retry_anti_pattern(default_profile.retry_policy()); |
| if (!is_downgrading_consistency_enabled) { |
| for (ExecutionProfile::Map::const_iterator it = profiles.begin(), end = profiles.end(); |
| it != end; ++it) { |
| if (is_downgrading_retry_anti_pattern(it->second.retry_policy())) { |
| is_downgrading_consistency_enabled = true; |
| break; |
| } |
| } |
| } |
| if (is_downgrading_consistency_enabled) { |
| config_anti_patterns.push_back( |
| StringPair("downgradingConsistency", CONFIG_ANTIPATTERN_MSG_DOWNGRADING)); |
| LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_DOWNGRADING); |
| } |
| |
| if (ssl_context && !ssl_context->is_cert_validation_enabled()) { |
| config_anti_patterns.push_back( |
| StringPair("sslWithoutCertValidation", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION)); |
| LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_CERT_VALIDATION); |
| } |
| |
| if (auth_provider && auth_provider->name().find("PlainTextAuthProvider") != String::npos && |
| !ssl_context) { |
| config_anti_patterns.push_back( |
| StringPair("plainTextAuthWithoutSsl", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL)); |
| LOG_WARN("Configuration anti-pattern detected: %s", CONFIG_ANTIPATTERN_MSG_PLAINTEXT_NO_SSL); |
| } |
| |
| return config_anti_patterns; |
| } |
| |
| DCAwarePolicy* get_dc_aware_policy(const LoadBalancingPolicy::Ptr& policy) { |
| LoadBalancingPolicy* current_lbp = policy.get(); |
| do { |
| if (DCAwarePolicy* dc_lbp = dynamic_cast<DCAwarePolicy*>(current_lbp)) { |
| return dc_lbp; |
| } |
| if (ChainedLoadBalancingPolicy* chained_lbp = |
| dynamic_cast<ChainedLoadBalancingPolicy*>(current_lbp)) { |
| current_lbp = chained_lbp->child_policy().get(); |
| } else { |
| break; |
| } |
| } while (current_lbp); |
| |
| return NULL; |
| } |
| |
| bool is_contact_points_multiple_dcs(const LoadBalancingPolicy::Vec& policies, |
| const HostMap& hosts) { |
| // Get the DC aware load balancing policy if it is the only policy that |
| // exists. If found this policy will be used after the contact points have |
| // been resolved in order to determine if there are contacts points that exist |
| // in multiple DCs using a copy of the discovered hosts. |
| if (policies.size() == 1) { |
| DCAwarePolicy* policy = get_dc_aware_policy(policies[0]); |
| if (policy) { |
| // Loop through the resolved contacts, find the correct initialized host |
| // and if the contact point is a remote host identify as an anti-pattern |
| for (ResolvedHostMap::const_iterator resolved_it = contact_points_resolved_.begin(), |
| hosts_end = contact_points_resolved_.end(); |
| resolved_it != hosts_end; ++resolved_it) { |
| const AddressSet& addresses = resolved_it->second; |
| for (AddressSet::const_iterator addresses_it = addresses.begin(), |
| addresses_end = addresses.end(); |
| addresses_it != addresses_end; ++addresses_it) { |
| const Address& address = *addresses_it; |
| for (HostMap::const_iterator hosts_it = hosts.begin(), hosts_end = hosts.end(); |
| hosts_it != hosts_end; ++hosts_it) { |
| const Host::Ptr& host = hosts_it->second; |
| if (host->address() == address && |
| policy->distance(host) == CASS_HOST_DISTANCE_REMOTE) { |
| return true; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| bool is_downgrading_retry_anti_pattern(const RetryPolicy::Ptr& policy) { |
| if (policy && policy->type() == RetryPolicy::DOWNGRADING) { |
| return true; |
| } |
| return false; |
| } |
| |
| private: |
| const Connection::Ptr connection_; |
| const String client_id_; |
| const String session_id_; |
| const Config config_; |
| const HostMap hosts_; |
| const LoadBalancingPolicy::Vec initialized_policies_; |
| |
| private: |
| typedef Map<String, AddressSet> ResolvedHostMap; |
| ResolvedHostMap contact_points_resolved_; |
| }; |
| |
| ClientInsights::ClientInsights(const String& client_id, const String& session_id, |
| unsigned interval_secs) |
| : client_id_(client_id) |
| , session_id_(session_id) |
| , interval_ms_(interval_secs * 1000) {} |
| |
| uint64_t ClientInsights::interval_ms(const VersionNumber& dse_server_version) const { |
| // DSE v5.1.13+ (backported) |
| // DSE v6.0.5+ (backported) |
| // DSE v6.7.0 was the first to supported the Insights RPC call |
| if ((dse_server_version >= VersionNumber(5, 1, 13) && |
| dse_server_version < VersionNumber(6, 0, 0)) || |
| dse_server_version >= VersionNumber(6, 0, 5)) { |
| return interval_ms_; |
| } |
| return 0; |
| } |
| |
| void ClientInsights::send_startup_message(const Connection::Ptr& connection, const Config& config, |
| const HostMap& hosts, |
| const LoadBalancingPolicy::Vec& initialized_policies) { |
| StartupMessageHandler::Ptr handler = StartupMessageHandler::Ptr(new StartupMessageHandler( |
| connection, client_id_, session_id_, config, hosts, initialized_policies)); |
| handler->send_message(); |
| } |
| |
| void ClientInsights::send_status_message(const Connection::Ptr& connection, const HostMap& hosts) { |
| StringBuffer buffer; |
| Writer writer(buffer); |
| |
| writer.StartObject(); |
| metadata(writer, METADATA_STATUS_NAME); |
| |
| writer.Key("data"); |
| writer.StartObject(); |
| |
| writer.Key("clientId"); |
| writer.String(client_id_.c_str()); |
| writer.Key("sessionId"); |
| writer.String(session_id_.c_str()); |
| writer.Key("controlConnection"); |
| writer.String(connection->resolved_address().to_string(true).c_str()); |
| |
| writer.Key("conntectedNodes"); |
| writer.StartObject(); |
| for (HostMap::const_iterator it = hosts.begin(), end = hosts.end(); it != end; ++it) { |
| String address_with_port = it->first.to_string(true); |
| const Host::Ptr& host = it->second; |
| writer.Key(address_with_port.c_str()); |
| writer.StartObject(); |
| writer.Key("connections"); |
| writer.Int(host->connection_count()); |
| writer.Key("inFlightQueries"); |
| writer.Int(host->inflight_request_count()); |
| writer.EndObject(); // address_with_port |
| } |
| writer.EndObject(); // connectedNodes |
| |
| writer.EndObject(); // data |
| writer.EndObject(); |
| |
| assert(writer.IsComplete() && "Status JSON is incomplete"); |
| connection->write_and_flush(RequestCallback::Ptr( |
| new ClientInsightsRequestCallback(buffer.GetString(), METADATA_STATUS_NAME))); |
| } |
| |
| }}} // namespace datastax::internal::enterprise |