blob: de444c6a084d760622c639fa87577da9a4b61b93 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "pooled_connection.hpp"
#include "connection_pool_manager.hpp"
#include "event_loop.hpp"
#include "query_request.hpp"
using namespace datastax;
using namespace datastax::internal::core;
/**
* A request callback that sets the keyspace then runs the original request
* callback. This happens when the current keyspace wasn't set or has been
* changed.
*/
class ChainedSetKeyspaceCallback : public SimpleRequestCallback {
public:
ChainedSetKeyspaceCallback(Connection* connection, const String& keyspace,
const RequestCallback::Ptr& chained_callback);
private:
class SetKeyspaceRequest : public QueryRequest {
public:
SetKeyspaceRequest(const String& keyspace, uint64_t request_timeout_ms)
: QueryRequest("USE " + keyspace) {
set_request_timeout_ms(request_timeout_ms);
}
};
private:
virtual void on_internal_set(ResponseMessage* response);
virtual void on_internal_error(CassError code, const String& message);
virtual void on_internal_timeout();
void on_result_response(ResponseMessage* response);
private:
Connection* connection_;
RequestCallback::Ptr chained_callback_;
};
ChainedSetKeyspaceCallback::ChainedSetKeyspaceCallback(Connection* connection,
const String& keyspace,
const RequestCallback::Ptr& chained_callback)
: SimpleRequestCallback(Request::ConstPtr(
new SetKeyspaceRequest(keyspace, chained_callback->request_timeout_ms())))
, connection_(connection)
, chained_callback_(chained_callback) {}
void ChainedSetKeyspaceCallback::on_internal_set(ResponseMessage* response) {
switch (response->opcode()) {
case CQL_OPCODE_RESULT:
on_result_response(response);
break;
case CQL_OPCODE_ERROR:
connection_->defunct();
chained_callback_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, "Unable to set keyspace");
break;
default:
connection_->defunct();
chained_callback_->on_error(CASS_ERROR_LIB_UNEXPECTED_RESPONSE, "Unexpected response");
break;
}
}
void ChainedSetKeyspaceCallback::on_internal_error(CassError code, const String& message) {
connection_->defunct();
chained_callback_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, "Unable to set keyspace");
}
void ChainedSetKeyspaceCallback::on_internal_timeout() { chained_callback_->on_retry_next_host(); }
void ChainedSetKeyspaceCallback::on_result_response(ResponseMessage* response) {
ResultResponse* result = static_cast<ResultResponse*>(response->response_body().get());
if (result->kind() == CASS_RESULT_KIND_SET_KEYSPACE) {
if (connection_->write_and_flush(chained_callback_) < 0) {
// Try on the same host but a different connection
chained_callback_->on_retry_current_host();
}
} else {
connection_->defunct();
chained_callback_->on_error(CASS_ERROR_LIB_UNABLE_TO_SET_KEYSPACE, "Unable to set keyspace");
}
}
PooledConnection::PooledConnection(ConnectionPool* pool, const Connection::Ptr& connection)
: connection_(connection)
, pool_(pool)
// If the user data is set then use it to start the I/O timer.
, event_loop_(static_cast<EventLoop*>(pool->loop()->data)) {
inc_ref(); // Reference for the connection's lifetime
connection_->set_listener(this);
}
int32_t PooledConnection::write(RequestCallback* callback) {
int32_t result;
const String& keyspace(pool_->keyspace());
if (keyspace != connection_->keyspace()) {
LOG_DEBUG("Setting keyspace %s on connection(%p) pool(%p)", keyspace.c_str(),
static_cast<void*>(connection_.get()), static_cast<void*>(pool_));
result = connection_->write(RequestCallback::Ptr(new ChainedSetKeyspaceCallback(
connection_.get(), keyspace, RequestCallback::Ptr(callback))));
} else {
result = connection_->write(RequestCallback::Ptr(callback));
}
if (result > 0) {
pool_->requires_flush(this, ConnectionPool::Protected());
}
return result;
}
void PooledConnection::flush() {
size_t bytes_flushed = connection_->flush();
#ifdef CASS_INTERNAL_DIAGNOSTICS
if (bytes_flushed > 0) {
pool_->manager()->flush_bytes().record_value(bytes_flushed);
}
#else
UNUSED_(bytes_flushed);
#endif
}
void PooledConnection::close() { connection_->close(); }
int PooledConnection::inflight_request_count() const {
return connection_->inflight_request_count();
}
bool PooledConnection::is_closing() const { return connection_->is_closing(); }
void PooledConnection::on_read() {
if (event_loop_) {
event_loop_->maybe_start_io_time();
}
}
void PooledConnection::on_write() {
if (event_loop_) {
event_loop_->maybe_start_io_time();
}
}
void PooledConnection::on_close(Connection* connection) {
pool_->close_connection(this, ConnectionPool::Protected());
dec_ref();
}