blob: ac2f6613f76e51340e6badf35723418ab765a7a1 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#ifndef DATASTAX_INTERNAL_REQUEST_CALLBACK_HPP
#define DATASTAX_INTERNAL_REQUEST_CALLBACK_HPP
#include "buffer.hpp"
#include "cassandra.h"
#include "constants.hpp"
#include "dense_hash_map.hpp"
#include "list.hpp"
#include "prepared.hpp"
#include "request.hpp"
#include "response.hpp"
#include "scoped_ptr.hpp"
#include "socket.hpp"
#include "string.hpp"
#include "timer.hpp"
#include "timestamp_generator.hpp"
#include "utils.hpp"
#include <uv.h>
namespace datastax { namespace internal { namespace core {
class Config;
class Connection;
class ExecutionProfile;
class Metrics;
class Pool;
class PooledConnection;
class PreparedMetadata;
class ResponseMessage;
class ResultResponse;
typedef Vector<uv_buf_t> UvBufVec;
/**
* A wrapper class for keeping a request's state grouped together with the
* request object. This is necessary because a request object is immutable
* when it's being executed.
*/
class RequestWrapper {
public:
RequestWrapper(const Request::ConstPtr& request,
uint64_t request_timeout_ms = CASS_DEFAULT_REQUEST_TIMEOUT_MS)
: request_(request)
, consistency_(CASS_DEFAULT_CONSISTENCY)
, serial_consistency_(CASS_DEFAULT_SERIAL_CONSISTENCY)
, request_timeout_ms_(request_timeout_ms)
, timestamp_(CASS_INT64_MIN) {}
void set_prepared_metadata(const PreparedMetadata::Entry::Ptr& entry);
void init(const ExecutionProfile& profile, TimestampGenerator* timestamp_generator);
const Request::ConstPtr& request() const { return request_; }
CassConsistency consistency() const {
if (request()->consistency() != CASS_CONSISTENCY_UNKNOWN) {
return request()->consistency();
}
return consistency_;
}
CassConsistency serial_consistency() const {
if (request()->serial_consistency() != CASS_CONSISTENCY_UNKNOWN) {
return request()->serial_consistency();
}
return serial_consistency_;
}
uint64_t request_timeout_ms() const {
if (request()->request_timeout_ms() != CASS_UINT64_MAX) {
return request()->request_timeout_ms();
}
return request_timeout_ms_;
}
int64_t timestamp() const {
if (request()->timestamp() != CASS_INT64_MIN) {
return request()->timestamp();
}
return timestamp_;
}
const RetryPolicy::Ptr& retry_policy() const {
if (request()->retry_policy()) {
return request()->retry_policy();
}
return retry_policy_;
}
const PreparedMetadata::Entry::Ptr& prepared_metadata_entry() const {
return prepared_metadata_entry_;
}
private:
Request::ConstPtr request_;
CassConsistency consistency_;
CassConsistency serial_consistency_;
uint64_t request_timeout_ms_;
int64_t timestamp_;
RetryPolicy::Ptr retry_policy_;
PreparedMetadata::Entry::Ptr prepared_metadata_entry_;
};
class RequestCallback
: public RefCounted<RequestCallback>
, public SocketRequest {
public:
typedef SharedRefPtr<RequestCallback> Ptr;
typedef Vector<Ptr> Vec;
enum State {
REQUEST_STATE_NEW,
REQUEST_STATE_WRITING,
REQUEST_STATE_READING,
REQUEST_STATE_READ_BEFORE_WRITE,
REQUEST_STATE_FINISHED
};
RequestCallback(const RequestWrapper& wrapper)
: wrapper_(wrapper)
, stream_(-1)
, state_(REQUEST_STATE_NEW)
, retry_consistency_(CASS_CONSISTENCY_UNKNOWN) {}
virtual ~RequestCallback() {}
void notify_write(Connection* connection, int stream);
public:
// Called to retry a request on a different connection
virtual void on_retry_current_host() = 0;
virtual void on_retry_next_host() = 0;
protected:
// Called right before a request is written to a connection
virtual void on_write(Connection* connection) = 0;
public:
// Called to finish a request
virtual void on_set(ResponseMessage* response) = 0;
virtual void on_error(CassError code, const String& message) = 0;
public:
const Request* request() const { return wrapper_.request().get(); }
bool skip_metadata() const;
CassConsistency consistency() {
// The retry consistency takes the highest priority
if (retry_consistency_ != CASS_CONSISTENCY_UNKNOWN) {
return retry_consistency_;
}
return wrapper_.consistency();
}
CassConsistency serial_consistency() { return wrapper_.serial_consistency(); }
uint64_t request_timeout_ms() { return wrapper_.request_timeout_ms(); }
int64_t timestamp() { return wrapper_.timestamp(); }
const RetryPolicy::Ptr& retry_policy() { return wrapper_.retry_policy(); }
const PreparedMetadata::Entry::Ptr& prepared_metadata_entry() const {
return wrapper_.prepared_metadata_entry();
}
void set_retry_consistency(CassConsistency cl) { retry_consistency_ = cl; }
int stream() const { return stream_; }
State state() const { return state_; }
void set_state(State next_state);
const char* state_string() const;
ResponseMessage* read_before_write_response() const { return read_before_write_response_.get(); }
void set_read_before_write_response(ResponseMessage* response) {
read_before_write_response_.reset(response);
}
private:
virtual int32_t encode(BufferVec* bufs);
virtual void on_close();
private:
const RequestWrapper wrapper_;
ProtocolVersion protocol_version_;
int stream_;
State state_;
CassConsistency retry_consistency_;
ScopedPtr<ResponseMessage> read_before_write_response_;
private:
DISALLOW_COPY_AND_ASSIGN(RequestCallback);
};
class SimpleRequestCallback : public RequestCallback {
public:
SimpleRequestCallback(const String& query,
uint64_t request_timeout_ms = CASS_DEFAULT_REQUEST_TIMEOUT_MS);
SimpleRequestCallback(const Request::ConstPtr& request,
uint64_t request_timeout_ms = CASS_DEFAULT_REQUEST_TIMEOUT_MS)
: RequestCallback(RequestWrapper(request, request_timeout_ms)) {}
SimpleRequestCallback(const RequestWrapper& wrapper)
: RequestCallback(wrapper) {}
protected:
virtual void on_internal_write(Connection* connection) {}
virtual void on_internal_set(ResponseMessage* response) = 0;
virtual void on_internal_error(CassError code, const String& message) = 0;
virtual void on_internal_timeout() = 0;
private:
virtual void on_retry_current_host();
virtual void on_retry_next_host();
virtual void on_write(Connection* connection);
protected:
virtual void on_set(ResponseMessage* response);
virtual void on_error(CassError code, const String& message);
private:
void on_timeout(Timer* timer);
private:
Timer timer_;
};
/**
* A request callback that chains multiple requests together as a single
* request.
*/
class ChainedRequestCallback : public SimpleRequestCallback {
public:
typedef SharedRefPtr<ChainedRequestCallback> Ptr;
class Map : public DenseHashMap<String, Response::Ptr> {
public:
Map() { set_empty_key(String()); }
};
/**
* Constructor for a simple query.
*
* @param key A key to map the response to the query.
* @param query The actual query to run.
* @param chain A request that's chained to this request. Don't use directly
* instead use the chain() method.
*/
ChainedRequestCallback(const String& key, const String& query, const Ptr& chain = Ptr());
/**
* Constructor for any type of request.
*
* @param key A key to map the response to the request.
* @param request The actual request to run.
* @param chain A request that's chained to this request. Don't use directly
* instead use the chain() method.
*/
ChainedRequestCallback(const String& key, const Request::ConstPtr& request,
const Ptr& chain = Ptr());
/**
* Add a chained query to this request callback.
*
* Note: The last request in the chain must be executed for all prior chained
* requests to execute properly.
*
* @param key A key to map the response to the query.
* @param query The actual query to chain.
* @return Returns the new chained request callback so that another request
* can be chained. e.g. callback->chain(...)->chain(...)
*/
ChainedRequestCallback::Ptr chain(const String& key, const String& query);
/**
* Add a chained request to this request callback.
*
* Note: The last request in the chain must be executed for all prior chained
* requests to execute properly.
*
* @param key A key to map the response to the request.
* @param request The actual request to run.
* @return Returns the new chained request callback so that another request
* can be chained. e.g. callback->chain(...)->chain(...)
*/
ChainedRequestCallback::Ptr chain(const String& key, const Request::ConstPtr& request);
/**
* The responses for the chained callbacks.
*
* @return A map of the responses by key.
*/
const Map& responses() const { return responses_; }
/**
* Get the result response for a key.
*
* @param The key the query/request was created with.
* @return The result response for the chained request, null if not a
* result response or if the key doesn't exist.
*/
ResultResponse::Ptr result(const String& key) const;
protected:
/**
* A callback for when the chained request is written to a connection.
*
* @param connection The connection the callback was written to.
*/
virtual void on_chain_write(Connection* connection) {}
/**
* A callback for when all responses have been received.
*/
virtual void on_chain_set() {}
/**
* A callback for when an error occurs. A single error causes the whole
* chain to fail.
*
* @param code The error code.
* @param message The error message.
*/
virtual void on_chain_error(CassError code, const String& message) {}
/**
* A callback for a request timeout. A single timeout causes the whole
* chain to fail.
*/
virtual void on_chain_timeout() {}
private:
virtual void on_internal_write(Connection* connection);
virtual void on_internal_set(ResponseMessage* response);
virtual void on_internal_error(CassError code, const String& message);
virtual void on_internal_timeout();
private:
void set_chain_responses(Map& responses);
bool is_finished() const;
void maybe_finish();
private:
const ChainedRequestCallback::Ptr chain_;
bool has_pending_;
bool has_error_or_timeout_;
String key_;
Response::Ptr response_;
Map responses_;
};
}}} // namespace datastax::internal::core
#endif