blob: f0f488ee5246db92e9f8895ddb163e811375b244 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "http_client.hpp"
#include "constants.hpp"
#include "driver_info.hpp"
using namespace datastax;
using namespace datastax::internal::core;
namespace datastax { namespace internal { namespace core {
class HttpClientSocketHandler : public SocketHandler {
public:
HttpClientSocketHandler(HttpClient* client)
: client_(client) {}
virtual void on_read(Socket* socket, ssize_t nread, const uv_buf_t* buf) {
client_->on_read(buf->base, nread);
free_buffer(buf);
}
virtual void on_write(Socket* socket, int status, SocketRequest* request) { delete request; }
virtual void on_close() { client_->finish(); }
private:
HttpClient* client_;
};
class HttpClientSslSocketHandler : public SslSocketHandler {
public:
HttpClientSslSocketHandler(SslSession* ssl_session, HttpClient* client)
: SslSocketHandler(ssl_session)
, client_(client) {}
virtual void on_ssl_read(Socket* socket, char* buf, size_t size) { client_->on_read(buf, size); }
virtual void on_write(Socket* socket, int status, SocketRequest* request) { delete request; }
virtual void on_close() { client_->finish(); }
private:
HttpClient* client_;
};
}}} // namespace datastax::internal::core
HttpClient::HttpClient(const Address& address, const String& path, const Callback& callback)
: error_code_(HTTP_CLIENT_OK)
, address_(address)
, path_(path)
, callback_(callback)
, socket_connector_(
new SocketConnector(address, bind_callback(&HttpClient::on_socket_connect, this)))
, request_timeout_ms_(CASS_DEFAULT_CONNECT_TIMEOUT_MS)
, status_code_(0) {
http_parser_init(&parser_, HTTP_RESPONSE);
http_parser_settings_init(&parser_settings_);
parser_.data = this;
parser_settings_.on_status = on_status;
parser_settings_.on_header_field = on_header_field;
parser_settings_.on_header_value = on_header_value;
parser_settings_.on_body = on_body;
parser_settings_.on_message_complete = on_message_complete;
}
HttpClient* HttpClient::with_settings(const SocketSettings& settings) {
socket_connector_->with_settings(settings);
return this;
}
HttpClient* HttpClient::with_request_timeout_ms(uint64_t request_timeout_ms) {
request_timeout_ms_ = request_timeout_ms;
return this;
}
void HttpClient::request(uv_loop_t* loop) {
inc_ref();
socket_connector_->connect(loop);
if (request_timeout_ms_ > 0) {
request_timer_.start(loop, request_timeout_ms_, bind_callback(&HttpClient::on_timeout, this));
}
}
void HttpClient::cancel() {
error_code_ = HTTP_CLIENT_CANCELED;
socket_connector_->cancel();
if (socket_) socket_->close();
request_timer_.stop();
}
void HttpClient::on_socket_connect(SocketConnector* connector) {
if (connector->is_ok()) {
socket_ = connector->release_socket();
if (connector->ssl_session()) {
socket_->set_handler(
new HttpClientSslSocketHandler(connector->ssl_session().release(), this));
} else {
socket_->set_handler(new HttpClientSocketHandler(this));
}
OStringStream ss;
ss << "GET " << path_ << " HTTP/1.0\r\n" // HTTP/1.0 ensures chunked responses are not sent
<< "Host: " << address_.to_string(false) << "\r\n"
<< "User-Agent: cpp-driver/" << driver_version() << "\r\nAccept: */*\r\n\r\n";
String request = ss.str();
socket_->write_and_flush(new BufferSocketRequest(Buffer(request.c_str(), request.size())));
} else {
if (!connector->is_canceled()) {
error_code_ = HTTP_CLIENT_ERROR_SOCKET;
error_message_ = "Failed to establish HTTP connection: " + connector->error_message();
}
finish();
}
}
void HttpClient::on_read(char* buf, ssize_t nread) {
if (is_canceled()) return;
if (nread > 0) {
size_t parsed = http_parser_execute(&parser_, &parser_settings_, buf, nread);
if (parsed < static_cast<size_t>(nread)) {
error_code_ = HTTP_CLIENT_ERROR_PARSING;
OStringStream ss;
enum http_errno err = HTTP_PARSER_ERRNO(&parser_);
ss << "HTTP parsing error (" << http_errno_name(err) << "):" << http_errno_description(err);
error_message_ = ss.str();
socket_->close();
}
} else if (is_ok() && status_code_ == 0) { // Make sure there wasn't an existing error
error_code_ = HTTP_CLIENT_ERROR_CLOSED;
error_message_ = "HTTP connection prematurely closed";
}
}
void HttpClient::on_timeout(Timer* timer) {
error_code_ = HTTP_CLIENT_ERROR_TIMEOUT;
OStringStream ss;
ss << "HTTP request timed out after " << request_timeout_ms_ << " ms";
error_message_ = ss.str();
socket_connector_->cancel();
if (socket_) socket_->close();
}
int HttpClient::on_status(http_parser* parser, const char* buf, size_t len) {
HttpClient* self = static_cast<HttpClient*>(parser->data);
return self->handle_status(parser->status_code);
}
int HttpClient::handle_status(unsigned status_code) {
if (status_code < 200 || status_code > 299) {
error_code_ = HTTP_CLIENT_ERROR_HTTP_STATUS;
}
status_code_ = status_code;
return 0;
}
int HttpClient::on_header_field(http_parser* parser, const char* buf, size_t len) {
HttpClient* self = static_cast<HttpClient*>(parser->data);
return self->handle_header_field(buf, len);
}
int HttpClient::handle_header_field(const char* buf, size_t len) {
current_header_.assign(buf, len);
return 0;
}
int HttpClient::on_header_value(http_parser* parser, const char* buf, size_t len) {
HttpClient* self = static_cast<HttpClient*>(parser->data);
return self->handle_header_value(buf, len);
}
int HttpClient::handle_header_value(const char* buf, size_t len) {
if (StringRef(current_header_).iequals("content-type")) {
content_type_.assign(buf, len);
}
return 0;
}
int HttpClient::on_body(http_parser* parser, const char* buf, size_t len) {
HttpClient* self = static_cast<HttpClient*>(parser->data);
return self->handle_body(buf, len);
}
int HttpClient::handle_body(const char* buf, size_t len) {
response_body_.assign(buf, len);
return 0;
}
int HttpClient::on_message_complete(http_parser* parser) {
HttpClient* self = static_cast<HttpClient*>(parser->data);
return self->handle_message_complete();
}
int HttpClient::handle_message_complete() {
socket_->close();
return 0;
}
void HttpClient::finish() {
request_timer_.stop();
if (callback_) callback_(this);
dec_ref();
}