blob: 8b09dbec97e1bd6c7887ba4f32c6b7ac463cfd9a [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_REQUEST_PROCESSOR_HPP
#define DATASTAX_INTERNAL_REQUEST_PROCESSOR_HPP
#include "atomic.hpp"
#include "config.hpp"
#include "connection_pool_manager.hpp"
#include "event_loop.hpp"
#include "histogram_wrapper.hpp"
#include "host.hpp"
#include "loop_watcher.hpp"
#include "micro_timer.hpp"
#include "mpmc_queue.hpp"
#include "prepare_host_handler.hpp"
#include "random.hpp"
#include "schema_agreement_handler.hpp"
#include "scoped_ptr.hpp"
#include "timer.hpp"
#include "token_map.hpp"
namespace datastax { namespace internal { namespace core {
class ConnectionPoolManagerInitializer;
class RequestProcessor;
class RequestProcessorManager;
class Session;
/**
* A wrapper around a keyspace change response that makes sure the final
* processing for the request happens on the original event loop. This
* needs to be reference counted so that the last processing thread triggers
* setting the response on the request's future.
*/
class KeyspaceChangedHandler : public RefCounted<KeyspaceChangedHandler> {
public:
typedef SharedRefPtr<KeyspaceChangedHandler> Ptr;
KeyspaceChangedHandler(EventLoop* event_loop, const KeyspaceChangedResponse& response)
: event_loop_(event_loop)
, response_(response) {}
~KeyspaceChangedHandler() { event_loop_->add(new Task(response_)); }
private:
/**
* An internal task that keeps the original keyspace change handler
* alive so that processing happens on the original event loop.
*/
class Task : public core::Task {
public:
Task(const KeyspaceChangedResponse& response)
: response_(response) {}
virtual void run(EventLoop* event_loop) { response_.set_response(); }
private:
KeyspaceChangedResponse response_;
};
private:
EventLoop* const event_loop_;
KeyspaceChangedResponse response_;
};
class KeyspaceChangedListener {
public:
virtual ~KeyspaceChangedListener() {}
virtual void on_keyspace_changed(const String& keyspace,
const KeyspaceChangedHandler::Ptr& handler) = 0;
};
class RequestProcessorListener
: public ConnectionPoolStateListener
, public PreparedMetadataListener
, public KeyspaceChangedListener {
public:
/**
* A callback that's called when the processor connects.
*
* Note: This is mostly for testing.
*
* @param processor The processor object.
*/
virtual void on_connect(RequestProcessor* processor) {}
/**
* A callback that's called when the processor has closed.
*
* @param processor The processor object.
*/
virtual void on_close(RequestProcessor* processor) = 0;
};
struct RequestProcessorSettings {
RequestProcessorSettings();
RequestProcessorSettings(const Config& config);
ConnectionPoolSettings connection_pool_settings;
unsigned max_schema_wait_time_ms;
bool prepare_on_all_hosts;
TimestampGenerator::Ptr timestamp_generator;
ExecutionProfile default_profile;
ExecutionProfile::Map profiles;
unsigned request_queue_size;
uint64_t coalesce_delay_us;
int new_request_ratio;
uint64_t max_tracing_wait_time_ms;
uint64_t retry_tracing_wait_time_ms;
CassConsistency tracing_consistency;
AddressFactory::Ptr address_factory;
};
/**
* Request processor for processing client session request(s). This processor
* will fetch a request from the queue and process them accordingly by applying
* the load balancing policy, executing and routing the request to the
* appropriate node and perform the callback to the client.
*/
class RequestProcessor
: public RefCounted<RequestProcessor>
, public ConnectionPoolManagerListener
, public RequestListener
, public SchemaAgreementListener {
public:
typedef SharedRefPtr<RequestProcessor> Ptr;
typedef Vector<Ptr> Vec;
/**
* Create the request processor; Don't use directly use the request processor
* manager initializer
*
* @param listener A listener that handles the events for the processor.
* @param event_loop The event loop the request process is running on.
* @param connection_pool_manager A manager of connection pools for handling requests.
* @param connected_host The currently connected control connection host.
* @param hosts A mapping of the currently available hosts.
* @param token_map The current token map.
* @param settings The current settings for the request processor.
* @param random A RNG for randomizing hosts in the load balancing policies.
* @param local_dc The local datacenter for initializing the load balancing policies.
*/
RequestProcessor(RequestProcessorListener* listener, EventLoop* event_loop,
const ConnectionPoolManager::Ptr& connection_pool_manager,
const Host::Ptr& connected_host, const HostMap& hosts,
const TokenMap::Ptr& token_map, const RequestProcessorSettings& settings,
Random* random, const String& local_dc);
/**
* Close/Terminate the request request processor (thread-safe).
*/
void close();
/**
* Set the listener that will handle events for the processor
* (*NOT* thread-safe).
*
* @param listener The processor listener.
*/
void set_listener(RequestProcessorListener* listener = NULL);
/**
* Set the current keyspace being used for requests
* (thread-safe, asynchronous).
*
* @param keyspace New current keyspace to utilize
* @param handler A keyspace handler to trigger a response to a "USE" query.
*/
void set_keyspace(const String& keyspace, const KeyspaceChangedHandler::Ptr& handler);
/**
* Notify that a host has been added to the cluster
* (thread-safe, asynchronous).
*
* @param host The host that's been added.
*/
void notify_host_added(const Host::Ptr& host);
/**
* Notify that a host has been removed from the cluster
* (thread-safe, asynchronous).
*
* @param host The host that's been removed.
*/
void notify_host_removed(const Host::Ptr& host);
/**
* Notify that a host is now available to handle queries. This has no effect
* if the host has no connections available.
*
* (thread-safe, asynchronous).
*
* @param host A host that's ready to be marked up.
*/
void notify_host_ready(const Host::Ptr& host);
/**
* Notify that a host might be available. This expedites the reconnection
* process for the provided host.
*
* (thread-safe, asynchronous).
*
* @param address An address of the host might be UP.
*/
void notify_host_maybe_up(const Address& address);
/**
* Notify that the token map has been updated
* (thread-safe, asynchronous).
*
* @param token_map The updated token map.
*/
void notify_token_map_updated(const TokenMap::Ptr& token_map);
/**
* Enqueue a request to be processed.
* (thread-safe, asynchronous)).
*
* @param request_handler
*/
void process_request(const RequestHandler::Ptr& request_handler);
/**
* Get the number of requests the processor is handling
*
* @return Request count
*/
int request_count() const { return request_count_.load(MEMORY_ORDER_RELAXED); }
public:
class Protected {
friend class RequestProcessorInitializer;
Protected() {}
Protected(Protected const&) {}
};
/**
* Initialize the async flushing mechanism for the request processor
*
* @param A key to restrict access to the method
*/
int init(Protected);
private:
// Connection pool manager listener methods
virtual void on_pool_up(const Address& address);
virtual void on_pool_down(const Address& address);
virtual void on_pool_critical_error(const Address& address, Connector::ConnectionError code,
const String& message);
virtual void on_requires_flush();
virtual void on_close(ConnectionPoolManager* manager);
private:
// Request listener methods
virtual void on_prepared_metadata_changed(const String& id,
const PreparedMetadata::Entry::Ptr& entry);
virtual void on_keyspace_changed(const String& keyspace, KeyspaceChangedResponse handler);
virtual bool on_wait_for_tracing_data(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const Response::Ptr& response);
virtual bool on_wait_for_schema_agreement(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host,
const Response::Ptr& response);
virtual bool on_prepare_all(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host, const Response::Ptr& response);
virtual void on_done();
private:
// Schema agreement listener methods
virtual bool on_is_host_up(const Address& address);
private:
void on_timeout(MicroTimer* timer);
private:
void internal_close();
void internal_pool_down(const Address& address);
const ExecutionProfile* execution_profile(const String& name) const;
const LoadBalancingPolicy::Vec& load_balancing_policies() const;
private:
friend class ProcessorRunClose;
friend class ProcessorNotifyHostAdd;
friend class ProcessorNotifyHostRemove;
friend class ProcessorNotifyHostReady;
friend class ProcessorNotifyMaybeHostUp;
friend class ProcessorNotifyTokenMapUpdate;
private:
void internal_host_add(const Host::Ptr& host);
void internal_host_remove(const Host::Ptr& host);
void internal_host_ready(const Host::Ptr& host);
void internal_host_maybe_up(const Address& address);
void start_coalescing();
void on_async(Async* async);
void on_prepare(Prepare* prepare);
void maybe_close(int request_count);
int process_requests(uint64_t processing_time);
bool write_wait_callback(const RequestHandler::Ptr& request_handler,
const Host::Ptr& current_host, const RequestCallback::Ptr& callback);
private:
ConnectionPoolManager::Ptr connection_pool_manager_;
String connect_keyspace_;
RequestProcessorListener* listener_;
EventLoop* const event_loop_;
LoadBalancingPolicy::Vec load_balancing_policies_;
const RequestProcessorSettings settings_;
ExecutionProfile default_profile_;
ExecutionProfile::Map profiles_;
Atomic<int> request_count_;
ScopedPtr<MPMCQueue<RequestHandler*> > const request_queue_;
TokenMap::Ptr token_map_;
bool is_closing_;
Atomic<bool> is_processing_;
int attempts_without_requests_;
uint64_t io_time_during_coalesce_;
Async async_;
Prepare prepare_;
MicroTimer timer_;
#ifdef CASS_INTERNAL_DIAGNOSTICS
int reads_during_coalesce_;
int writes_during_coalesce_;
HistogramWrapper writes_per_;
HistogramWrapper reads_per_;
#endif
};
}}} // namespace datastax::internal::core
#endif // DATASTAX_INTERNAL_REQUEST_PROCESSOR_HPP