// Copyright 2010 Google Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Author: jmarantz@google.com (Joshua Marantz)
// lsong@google.com (Libo Song)
// TODO(jmarantz): Avoid initiating fetches for resources already in flight.
// The challenge is that we would want to call all the callbacks that indicated
// interest in a particular URL once the fetch completed. Alternatively,
// this could be done in a level above the URL fetcher.
#include "pagespeed/system/serf_url_async_fetcher.h"
#include <cstddef>
#include <list>
#include <vector>
#include "apr.h"
#include "apr_strings.h"
#include "apr_pools.h"
#include "apr_thread_proc.h"
#include "base/logging.h"
#include "net/instaweb/http/public/async_fetch.h"
#include "net/instaweb/public/global_constants.h"
#include "net/instaweb/public/version.h"
#include "pagespeed/system/apr_thread_compatible_pool.h"
#include "pagespeed/kernel/base/abstract_mutex.h"
#include "pagespeed/kernel/base/basictypes.h"
#include "pagespeed/kernel/base/condvar.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/pool.h"
#include "pagespeed/kernel/base/pool_element.h"
#include "pagespeed/kernel/base/scoped_ptr.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/thread_system.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/http/google_url.h"
#include "pagespeed/kernel/http/http_names.h"
#include "pagespeed/kernel/http/request_headers.h"
#include "pagespeed/kernel/http/response_headers.h"
// This is an easy way to turn on lots of debug messages. Note that this
// is somewhat verbose.
#define SERF_DEBUG(x)
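// To enable them, change the definition to pass its argument through:
// #define SERF_DEBUG(x) x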
namespace {
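// Bitmask flags controlling HTTPS fetching. They are combined from the
// comma-separated keywords parsed in SerfUrlAsyncFetcher::ParseHttpsOptions().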
enum HttpsOptions {
kEnableHttps = 1 << 0,
kAllowSelfSigned = 1 << 1,
kAllowUnknownCertificateAuthority = 1 << 2,
kAllowCertificateNotYetValid = 1 << 3,
};
} // namespace
extern "C" {
// Declares new functions added to
// src/third_party/serf/instaweb_context.c
serf_bucket_t* serf_request_bucket_request_create_for_host(
serf_request_t *request,
const char *method,
const char *uri,
serf_bucket_t *body,
serf_bucket_alloc_t *allocator, const char* host);
int serf_connection_is_in_error_state(serf_connection_t* connection);
// Declares new functions added in instaweb_ssl_buckets.c
apr_status_t serf_ssl_set_certificates_directory(serf_ssl_context_t *ssl_ctx,
const char* path);
apr_status_t serf_ssl_set_certificates_file(serf_ssl_context_t *ssl_ctx,
const char* file);
int serf_ssl_check_host(const serf_ssl_certificate_t *cert,
const char* hostname);
} // extern "C"
namespace net_instaweb {
const char SerfStats::kSerfFetchRequestCount[] = "serf_fetch_request_count";
const char SerfStats::kSerfFetchByteCount[] = "serf_fetch_bytes_count";
const char SerfStats::kSerfFetchTimeDurationMs[] =
"serf_fetch_time_duration_ms";
const char SerfStats::kSerfFetchCancelCount[] = "serf_fetch_cancel_count";
const char SerfStats::kSerfFetchActiveCount[] =
"serf_fetch_active_count";
const char SerfStats::kSerfFetchTimeoutCount[] = "serf_fetch_timeout_count";
const char SerfStats::kSerfFetchFailureCount[] = "serf_fetch_failure_count";
const char SerfStats::kSerfFetchCertErrors[] = "serf_fetch_cert_errors";
GoogleString GetAprErrorString(apr_status_t status) {
char error_str[1024];
apr_strerror(status, error_str, sizeof(error_str));
return error_str;
}
SerfFetch::SerfFetch(const GoogleString& url,
AsyncFetch* async_fetch,
MessageHandler* message_handler,
Timer* timer)
: fetcher_(NULL),
timer_(timer),
str_url_(url),
async_fetch_(async_fetch),
parser_(async_fetch->response_headers()),
status_line_read_(false),
one_byte_read_(false),
has_saved_byte_(false),
saved_byte_('\0'),
message_handler_(message_handler),
pool_(NULL), // filled in once assigned to a thread, to use its pool.
bucket_alloc_(NULL),
host_header_(NULL),
sni_host_(NULL),
connection_(NULL),
bytes_received_(0),
fetch_start_ms_(0),
fetch_end_ms_(0),
using_https_(false),
ssl_context_(NULL),
ssl_error_message_(NULL) {
memset(&url_, 0, sizeof(url_));
}
SerfFetch::~SerfFetch() {
DCHECK(async_fetch_ == NULL);
if (connection_ != NULL) {
serf_connection_close(connection_);
}
if (pool_ != NULL) {
apr_pool_destroy(pool_);
}
}
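// Returns the URL being fetched, annotated with the host we are actually
// connecting to whenever an explicit Host: header overrides it.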
GoogleString SerfFetch::DebugInfo() {
if (host_header_ != NULL &&
url_.scheme != NULL &&
url_.hostinfo != NULL) {
GoogleUrl base(StrCat(url_.scheme, "://", host_header_));
if (base.IsWebValid()) {
const char* url_path = apr_uri_unparse(pool_, &url_,
APR_URI_UNP_OMITSITEPART);
GoogleUrl abs_url(base, url_path);
if (abs_url.IsWebValid()) {
GoogleString debug_info;
abs_url.Spec().CopyToString(&debug_info);
if (StringPiece(url_.hostinfo) != host_header_) {
StrAppend(&debug_info, " (connecting to:", url_.hostinfo, ")");
}
return debug_info;
}
}
}
return str_url_;
}
void SerfFetch::Cancel() {
if (connection_ != NULL) {
// We can get here either because we're canceling the connection ourselves
// or because Serf detected an error.
//
// If we canceled/timed out, we want to close the serf connection so it
// doesn't call us back, as we will detach from the async_fetch_ shortly.
//
// If Serf detected an error we also want to clean up as otherwise it will
// keep re-detecting it, which will interfere with other jobs getting
// handled (until we finally cleanup the old fetch and close things in
// ~SerfFetch).
serf_connection_close(connection_);
connection_ = NULL;
}
CallCallback(false);
}
void SerfFetch::CallCallback(bool success) {
if (ssl_error_message_ != NULL) {
success = false;
}
if (async_fetch_ != NULL) {
fetch_end_ms_ = timer_->NowMs();
fetcher_->ReportCompletedFetchStats(this);
CallbackDone(success);
fetcher_->FetchComplete(this);
} else if (ssl_error_message_ == NULL) {
LOG(FATAL) << "BUG: Serf callback called more than once on same fetch "
<< DebugInfo() << " (" << this << "). Please report this "
<< "at http://code.google.com/p/modpagespeed/issues/";
}
}
void SerfFetch::CallbackDone(bool success) {
// fetcher_==NULL if Start is called during shutdown.
if (fetcher_ != NULL) {
if (!success) {
fetcher_->failure_count_->Add(1);
}
if (fetcher_->track_original_content_length() &&
!async_fetch_->response_headers()->Has(
HttpAttributes::kXOriginalContentLength)) {
async_fetch_->extra_response_headers()->SetOriginalContentLength(
bytes_received_);
}
}
async_fetch_->Done(success);
// We should always NULL the async_fetch_ out after calling otherwise we
// could get weird double calling errors.
async_fetch_ = NULL;
}
void SerfFetch::CleanupIfError() {
if ((connection_ != NULL) &&
serf_connection_is_in_error_state(connection_)) {
message_handler_->Message(
kInfo, "Serf cleanup for error'd fetch of: %s", DebugInfo().c_str());
Cancel();
}
}
int64 SerfFetch::TimeDuration() const {
if ((fetch_start_ms_ != 0) && (fetch_end_ms_ != 0)) {
return fetch_end_ms_ - fetch_start_ms_;
} else {
return 0;
}
}
#if SERF_HTTPS_FETCHING
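// Trampolines from serf's C certificate-validation callbacks into
// HandleSSLCertValidation, which records any resulting error message.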
// static
apr_status_t SerfFetch::SSLCertValidate(void *data, int failures,
const serf_ssl_certificate_t *cert) {
return static_cast<SerfFetch*>(data)->HandleSSLCertValidation(
failures, 0, cert);
}
// static
apr_status_t SerfFetch::SSLCertChainValidate(
void *data, int failures, int error_depth,
const serf_ssl_certificate_t * const *certs,
apr_size_t certs_count) {
return static_cast<SerfFetch*>(data)->HandleSSLCertValidation(
failures, error_depth, NULL);
}
#endif
// static
apr_status_t SerfFetch::ConnectionSetup(
apr_socket_t* socket, serf_bucket_t **read_bkt, serf_bucket_t **write_bkt,
void* setup_baton, apr_pool_t* pool) {
SerfFetch* fetch = static_cast<SerfFetch*>(setup_baton);
*read_bkt = serf_bucket_socket_create(socket, fetch->bucket_alloc_);
#if SERF_HTTPS_FETCHING
apr_status_t status = APR_SUCCESS;
if (fetch->using_https_) {
*read_bkt = serf_bucket_ssl_decrypt_create(*read_bkt,
fetch->ssl_context_,
fetch->bucket_alloc_);
if (fetch->ssl_context_ == NULL) {
fetch->ssl_context_ = serf_bucket_ssl_decrypt_context_get(*read_bkt);
if (fetch->ssl_context_ == NULL) {
status = APR_EGENERAL;
} else {
SerfUrlAsyncFetcher* fetcher = fetch->fetcher_;
const GoogleString& certs_dir = fetcher->ssl_certificates_dir();
const GoogleString& certs_file = fetcher->ssl_certificates_file();
if (!certs_file.empty()) {
status = serf_ssl_set_certificates_file(
fetch->ssl_context_, certs_file.c_str());
}
if ((status == APR_SUCCESS) && !certs_dir.empty()) {
status = serf_ssl_set_certificates_directory(fetch->ssl_context_,
certs_dir.c_str());
}
// If no explicit file or directory is specified, then use the
// compiled-in default.
if (certs_dir.empty() && certs_file.empty()) {
status = serf_ssl_use_default_certificates(fetch->ssl_context_);
}
}
if (status != APR_SUCCESS) {
return status;
}
}
serf_ssl_server_cert_callback_set(
fetch->ssl_context_, SSLCertValidate, fetch);
serf_ssl_server_cert_chain_callback_set(
fetch->ssl_context_, SSLCertValidate, SSLCertChainValidate, fetch);
status = serf_ssl_set_hostname(fetch->ssl_context_, fetch->sni_host_);
if (status != APR_SUCCESS) {
LOG(INFO) << "Unable to set hostname from serf fetcher. Connection "
"setup failed";
return status;
}
*write_bkt = serf_bucket_ssl_encrypt_create(*write_bkt,
fetch->ssl_context_,
fetch->bucket_alloc_);
}
#endif
return APR_SUCCESS;
}
// static
void SerfFetch::ClosedConnection(serf_connection_t* conn,
void* closed_baton,
apr_status_t why,
apr_pool_t* pool) {
SerfFetch* fetch = static_cast<SerfFetch*>(closed_baton);
if (why != APR_SUCCESS) {
fetch->message_handler_->Warning(
fetch->DebugInfo().c_str(), 0, "Connection close (code=%d %s).",
why, GetAprErrorString(why).c_str());
}
// Connection is closed.
fetch->connection_ = NULL;
}
// static
serf_bucket_t* SerfFetch::AcceptResponse(serf_request_t* request,
serf_bucket_t* stream,
void* acceptor_baton,
apr_pool_t* pool) {
// Get the per-request bucket allocator.
serf_bucket_alloc_t* bucket_alloc = serf_request_get_alloc(request);
// Create a barrier so the response doesn't eat us!
// From the comment in Serf:
// ### the stream does not have a barrier, this callback should generally
// ### add a barrier around the stream before incorporating it into a
// ### response bucket stack.
// ... i.e. the passed bucket becomes owned rather than
// ### borrowed.
serf_bucket_t* bucket = serf_bucket_barrier_create(stream, bucket_alloc);
return serf_bucket_response_create(bucket, bucket_alloc);
}
// The handler MUST process data from the response bucket until the
// bucket's read function states it would block (APR_STATUS_IS_EAGAIN).
// The handler is invoked only when new data arrives. If no further data
// arrives and the handler has not processed all available data, the
// system can deadlock on the data that was read but never processed.
//
// static
apr_status_t SerfFetch::HandleResponse(serf_request_t* request,
serf_bucket_t* response,
void* handler_baton,
apr_pool_t* pool) {
SerfFetch* fetch = static_cast<SerfFetch*>(handler_baton);
return fetch->HandleResponse(response);
}
// static
bool SerfFetch::MoreDataAvailable(apr_status_t status) {
// This OR is structured like this to make debugging easier, as it's
// not obvious when looking at the status mask which of these conditions
// is hit.
if (APR_STATUS_IS_EAGAIN(status)) {
return true;
}
return APR_STATUS_IS_EINTR(status);
}
// static
bool SerfFetch::IsStatusOk(apr_status_t status) {
return ((status == APR_SUCCESS) ||
APR_STATUS_IS_EOF(status) ||
MoreDataAvailable(status));
}
#if SERF_HTTPS_FETCHING
apr_status_t SerfFetch::HandleSSLCertValidation(
int errors, int failure_depth, const serf_ssl_certificate_t *cert) {
// TODO(jmarantz): is there value in logging the errors and failure_depth
// formals here?
// Note that HandleSSLCertValidation can be called multiple times for a single
// request. As far as I can tell, there is value in recording only one of
// these. For now, I have set up the logic so only the last error will be
// printed lazily, in ReadHeaders.
if (((errors & SERF_SSL_CERT_SELF_SIGNED) != 0) &&
!fetcher_->allow_self_signed()) {
ssl_error_message_ = "SSL certificate is self-signed";
} else if (((errors & SERF_SSL_CERT_UNKNOWNCA) != 0) &&
!fetcher_->allow_unknown_certificate_authority()) {
ssl_error_message_ =
"SSL certificate has an unknown certificate authority";
} else if (((errors & SERF_SSL_CERT_NOTYETVALID) != 0) &&
!fetcher_->allow_certificate_not_yet_valid()) {
ssl_error_message_ = "SSL certificate is not yet valid";
} else if (errors & SERF_SSL_CERT_EXPIRED) {
ssl_error_message_ = "SSL certificate is expired";
} else if (errors & SERF_SSL_CERT_UNKNOWN_FAILURE) {
ssl_error_message_ = "SSL certificate has an unknown error";
}
if (ssl_error_message_ == NULL && async_fetch_ != NULL) {
if (// If cert is null that means we're being called via SSLCertChainValidate.
// We only need to check the host name matches when being called via
// SSLCertValidate, in which case cert won't be null.
cert != NULL &&
// No point in checking the host if we're allowing self-signed or a made
// up CA, since people can forge whatever they want and often don't
// bother to make the name match.
!fetcher_->allow_self_signed() &&
!fetcher_->allow_unknown_certificate_authority()) {
DCHECK(serf_ssl_cert_depth(cert) == 0) <<
"Serf should be filtering out intermediate certs before hitting us.";
// You would think we could do whatever serf_get.c does, but it turns
// out that does no checking at all. There's x509_check_host, added in
// 1.0.2, but when svn uses serf it rolls its own check because it wants
// to support older versions. We generally build with boringssl, which
// forked from 1.0.2 and has always had this function, but when we build
// with openssl we now require 1.0.2.
if (serf_ssl_check_host(cert, sni_host_) != 1) {
ssl_error_message_ = "Failed to match host.";
}
}
}
// Immediately call the fetch callback on a cert error. Note that
// HandleSSLCertValidation is called multiple times when there is an error, so
// check async_fetch before CallCallback.
if ((ssl_error_message_ != NULL) && (async_fetch_ != NULL)) {
fetcher_->cert_errors_->Add(1);
CallCallback(false); // sets async_fetch_ to null.
}
// TODO(jmarantz): I think the design of this system indicates
// that we should be returning APR_EGENERAL on failure. However I
// have found that doesn't work properly, at least for
// SERF_SSL_CERT_SELF_SIGNED. The request does not terminate
// quickly but instead times out. Thus we return APR_SUCCESS
// but change the status_code to 404, report an error, and suppress
// the output.
//
// TODO(jmarantz): consider aiding diagnosability by changing the
// 404 to a 401 (Unauthorized) or 418 (I'm a teapot), or 459 (nginx
// internal cert error code).
return APR_SUCCESS;
}
#endif
apr_status_t SerfFetch::HandleResponse(serf_bucket_t* response) {
if (response == NULL) {
message_handler_->Message(
kInfo, "serf HandlerReponse called with NULL response for %s",
DebugInfo().c_str());
CallCallback(false);
return APR_EGENERAL;
}
// The response-handling code must be robust to packets coming in all at once,
// one byte at a time, or anything in between. If we get EAGAIN we need to
// return it to our caller so it can do more work and call us again. See the
// serf example code in serf_get.c.
apr_status_t status = APR_EAGAIN;
while (MoreDataAvailable(status) && (async_fetch_ != NULL) &&
!parser_.headers_complete()) {
if (!status_line_read_) {
status = ReadStatusLine(response);
}
if (status_line_read_ && !one_byte_read_) {
status = ReadOneByteFromBody(response);
}
if (one_byte_read_ && !parser_.headers_complete()) {
status = ReadHeaders(response);
}
if (APR_STATUS_IS_EAGAIN(status)) {
return status;
}
}
if (parser_.headers_complete()) {
status = ReadBody(response);
}
if ((async_fetch_ != NULL) &&
((APR_STATUS_IS_EOF(status) && parser_.headers_complete()) ||
(status == APR_EGENERAL))) {
bool success = (IsStatusOk(status) && parser_.headers_complete());
if (!parser_.headers_complete() && (async_fetch_ != NULL)) {
// Be careful not to leave headers in inconsistent state in some error
// conditions.
async_fetch_->response_headers()->Clear();
}
CallCallback(success);
}
return status;
}
apr_status_t SerfFetch::ReadStatusLine(serf_bucket_t* response) {
serf_status_line status_line;
apr_status_t status = serf_bucket_response_status(response, &status_line);
if (status == APR_SUCCESS) {
ResponseHeaders* response_headers = async_fetch_->response_headers();
response_headers->SetStatusAndReason(
static_cast<HttpStatus::Code>(status_line.code));
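// Serf encodes the HTTP version as major * 1000 + minor, so HTTP/1.1
// arrives as 1001.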
response_headers->set_major_version(status_line.version / 1000);
response_headers->set_minor_version(status_line.version % 1000);
status_line_read_ = true;
}
return status;
}
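// Reads (and saves) a single byte of the response body before the headers are
// parsed; the saved byte is forwarded to the fetch's writer once the headers
// are complete (see ReadHeaders).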
apr_status_t SerfFetch::ReadOneByteFromBody(serf_bucket_t* response) {
apr_size_t len = 0;
const char* data = NULL;
apr_status_t status = serf_bucket_read(response, 1, &data, &len);
if (!APR_STATUS_IS_EINTR(status) && IsStatusOk(status)) {
one_byte_read_ = true;
if (len == 1) {
has_saved_byte_ = true;
saved_byte_ = data[0];
}
}
return status;
}
apr_status_t SerfFetch::ReadHeaders(serf_bucket_t* response) {
serf_bucket_t* headers = serf_bucket_response_get_headers(response);
const char* data = NULL;
apr_size_t len = 0;
apr_status_t status = serf_bucket_read(headers, SERF_READ_ALL_AVAIL,
&data, &len);
// Feed valid chunks to the header parser --- but skip empty ones,
// which can occur for value-less headers, since otherwise they'd
// look like parse errors.
if (IsStatusOk(status) && (len > 0)) {
if (parser_.ParseChunk(StringPiece(data, len), message_handler_)) {
if (parser_.headers_complete()) {
ResponseHeaders* response_headers = async_fetch_->response_headers();
if (ssl_error_message_ != NULL) {
response_headers->set_status_code(HttpStatus::kNotFound);
message_handler_->Message(kInfo, "%s: %s", DebugInfo().c_str(),
ssl_error_message_);
has_saved_byte_ = false;
}
if (fetcher_->track_original_content_length()) {
// Set X-Original-Content-Length, if Content-Length is available.
int64 content_length;
if (response_headers->FindContentLength(&content_length)) {
response_headers->SetOriginalContentLength(content_length);
}
}
// Stream the one byte read from ReadOneByteFromBody to writer.
if (has_saved_byte_) {
++bytes_received_;
if (!async_fetch_->Write(StringPiece(&saved_byte_, 1),
message_handler_)) {
status = APR_EGENERAL;
}
}
}
} else {
status = APR_EGENERAL;
}
}
return status;
}
apr_status_t SerfFetch::ReadBody(serf_bucket_t* response) {
apr_status_t status = APR_EAGAIN;
const char* data = NULL;
apr_size_t len = 0;
apr_size_t bytes_to_flush = 0;
while (MoreDataAvailable(status) && (async_fetch_ != NULL)) {
status = serf_bucket_read(response, SERF_READ_ALL_AVAIL, &data, &len);
if (APR_STATUS_IS_EAGAIN(status)) {
break;
}
bytes_received_ += len;
bytes_to_flush += len;
if (IsStatusOk(status) && (len != 0) &&
!async_fetch_->Write(StringPiece(data, len), message_handler_)) {
status = APR_EGENERAL;
}
}
if ((bytes_to_flush != 0) && !async_fetch_->Flush(message_handler_)) {
status = APR_EGENERAL;
}
return status;
}
void SerfFetch::FixUserAgent() {
// Supply a default user-agent if none is present, and in any case append
// a mod_pagespeed subrequest identifier if it is not already there.
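// The resulting value has the form
// "<original UA> (<subrequest token>/<version>-<revision>)".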
GoogleString user_agent;
ConstStringStarVector v;
RequestHeaders* request_headers = async_fetch_->request_headers();
if (request_headers->Lookup(HttpAttributes::kUserAgent, &v)) {
for (int i = 0, n = v.size(); i < n; ++i) {
if (i != 0) {
user_agent += " ";
}
if (v[i] != NULL) {
user_agent += *(v[i]);
}
}
request_headers->RemoveAll(HttpAttributes::kUserAgent);
}
if (user_agent.empty()) {
user_agent += "Serf/" SERF_VERSION_STRING;
}
GoogleString version = StrCat(
" (", kModPagespeedSubrequestUserAgent,
"/" MOD_PAGESPEED_VERSION_STRING "-" LASTCHANGE_STRING ")");
if (!StringPiece(user_agent).ends_with(version)) {
user_agent += version;
}
request_headers->Add(HttpAttributes::kUserAgent, user_agent);
}
// static
apr_status_t SerfFetch::SetupRequest(serf_request_t* request,
void* setup_baton,
serf_bucket_t** req_bkt,
serf_response_acceptor_t* acceptor,
void** acceptor_baton,
serf_response_handler_t* handler,
void** handler_baton,
apr_pool_t* pool) {
SerfFetch* fetch = static_cast<SerfFetch*>(setup_baton);
const char* url_path = apr_uri_unparse(pool, &fetch->url_,
APR_URI_UNP_OMITSITEPART);
// If there is an explicit Host header, then override the
// host field in the Serf structure, as we will not be able
// to override it after it is created; only append to it.
//
// Serf automatically populates the Host field based on the
// URL, and provides no mechanism to override it, except
// by hacking source. We hacked source.
//
// See src/third_party/serf/src/instaweb_context.c
fetch->FixUserAgent();
RequestHeaders* request_headers = fetch->async_fetch_->request_headers();
// Don't want to forward hop-by-hop stuff.
StringPieceVector names_to_sanitize =
HttpAttributes::SortedHopByHopHeaders();
request_headers->RemoveAllFromSortedArray(&names_to_sanitize[0],
names_to_sanitize.size());
// Also leave Content-Length to serf.
request_headers->RemoveAll(HttpAttributes::kContentLength);
serf_bucket_t* body_bkt = NULL;
const GoogleString& message_body = request_headers->message_body();
bool post_payload =
!message_body.empty() &&
(request_headers->method() == RequestHeaders::kPost);
if (post_payload) {
body_bkt = serf_bucket_simple_create(
message_body.data(), message_body.length(),
NULL /* no free function */, NULL /* no free baton*/,
serf_request_get_alloc(request));
}
*req_bkt = serf_request_bucket_request_create_for_host(
request, request_headers->method_string(),
url_path, body_bkt,
serf_request_get_alloc(request), fetch->host_header_);
serf_bucket_t* hdrs_bkt = serf_bucket_request_get_headers(*req_bkt);
// Add other headers from the caller's request. Skip the "Host:" header
// because it's set above.
for (int i = 0; i < request_headers->NumAttributes(); ++i) {
const GoogleString& name = request_headers->Name(i);
const GoogleString& value = request_headers->Value(i);
if (!(StringCaseEqual(name, HttpAttributes::kHost))) {
// Note: *_setn() stores a pointer to name and value instead of a
// copy of those values. So name and value must have long lifetimes.
// In this case, we depend on request_headers being unchanged for
// the lifetime of hdrs_bkt, which is a documented requirement of
// the UrlAsyncFetcher interface.
serf_bucket_headers_setn(hdrs_bkt, name.c_str(), value.c_str());
}
}
*acceptor = SerfFetch::AcceptResponse;
*acceptor_baton = fetch;
*handler = SerfFetch::HandleResponse;
*handler_baton = fetch;
return APR_SUCCESS;
}
bool SerfFetch::ParseUrl() {
apr_status_t status = 0;
status = apr_uri_parse(pool_, str_url_.c_str(), &url_);
if (status != APR_SUCCESS || url_.scheme == NULL) {
return false; // Failed to parse URL.
}
bool is_https = StringCaseEqual(url_.scheme, "https");
if (is_https && !fetcher_->allow_https()) {
return false;
}
if (!url_.port) {
url_.port = apr_uri_port_of_scheme(url_.scheme);
}
if (!url_.path) {
url_.path = apr_pstrdup(pool_, "/");
}
// Compute our host header. First see if there is an explicitly specified
// Host: in the fetch object.
RequestHeaders* request_headers = async_fetch_->request_headers();
const char* host = request_headers->Lookup1(HttpAttributes::kHost);
if (host == NULL) {
host = SerfUrlAsyncFetcher::ExtractHostHeader(url_, pool_);
}
host_header_ = apr_pstrdup(pool_, host);
if (is_https) {
// SNI hosts, unlike Host:, do not have a port number.
GoogleString sni_host =
SerfUrlAsyncFetcher::RemovePortFromHostHeader(host_header_);
sni_host_ = apr_pstrdup(pool_, sni_host.c_str());
}
return true;
}
class SerfThreadedFetcher : public SerfUrlAsyncFetcher {
public:
SerfThreadedFetcher(SerfUrlAsyncFetcher* parent, const char* proxy) :
SerfUrlAsyncFetcher(parent, proxy),
thread_id_(NULL),
initiate_mutex_(parent->thread_system()->NewMutex()),
initiate_fetches_(new SerfFetchPool()),
initiate_fetches_nonempty_(initiate_mutex_->NewCondvar()),
thread_finish_(false),
thread_started_(false) {
}
~SerfThreadedFetcher() {
// Let the thread terminate naturally by telling it to unblock,
// then waiting for it to finish its next active Poll operation.
{
// Indicate termination and unblock the worker thread so it can clean up.
ScopedMutex lock(initiate_mutex_.get());
if (thread_started_) {
thread_finish_ = true;
initiate_fetches_nonempty_->Signal();
} else {
LOG(INFO) << "Serf threaded not actually started, quick shutdown.";
return;
}
}
LOG(INFO) << "Waiting for threaded serf fetcher to terminate";
apr_status_t ignored_retval;
apr_thread_join(&ignored_retval, thread_id_);
// Under normal circumstances there shouldn't be any active fetches at
// this point. However, in practice we may have some lingering fetches that
// have timed out, and we need to clean those up properly before we can
// exit. We try to do this gracefully, but fall back to graceless cleanup
// if that fails.
// Before we can clean up, we must make sure we haven't initiated any
// fetches that haven't moved to the active pool yet. This should not
// happen, but we're exercising undue caution here. We do this by just
// moving them across. From this point, calls to InitiateFetch(...) are
// illegal, but we should be invoking this destructor from the only thread
// that could have called InitiateFetch anyhow.
TransferFetchesAndCheckDone(false);
// Although Cancel will be called in the base class destructor, we
// want to call it here as well, as it will make it easier for the
// thread to terminate.
CancelActiveFetches();
completed_fetches_.DeleteAll();
initiate_fetches_->DeleteAll();
}
void StartThread() {
CHECK_EQ(APR_SUCCESS,
apr_thread_create(&thread_id_, NULL, SerfThreadFn, this, pool_));
thread_started_ = true;
}
// Called from mainline to queue up a fetch for the thread. If the
// thread is idle then we can unlock it.
void InitiateFetch(SerfFetch* fetch) {
ScopedMutex lock(initiate_mutex_.get());
// We delay thread startup until we actually want to fetch something
// to avoid problems with ITK.
if (!thread_started_) {
StartThread();
}
// TODO(jmaessen): Consider adding an awaiting_nonempty_ flag to avoid
// spurious calls to Signal().
bool signal = initiate_fetches_->empty();
initiate_fetches_->Add(fetch);
if (signal) {
initiate_fetches_nonempty_->Signal();
}
}
void ShutDown() {
// See comments in the destructor above. The big difference is that
// because we set shutdown_ to true new jobs can't actually come in.
{
// Acquisition order is initiate_mutex_ before mutex_; see e.g.
// AnyPendingFetches().
ScopedMutex hold_initiate(initiate_mutex_.get());
ScopedMutex hold(mutex_);
set_shutdown(true);
if (!thread_started_) {
return;
}
}
TransferFetchesAndCheckDone(false);
CancelActiveFetches();
}
protected:
bool AnyPendingFetches() {
ScopedMutex lock(initiate_mutex_.get());
// NOTE: We must hold both mutexes to avoid the case where we miss a fetch
// in transit.
return !initiate_fetches_->empty() ||
SerfUrlAsyncFetcher::AnyPendingFetches();
}
private:
static void* APR_THREAD_FUNC SerfThreadFn(apr_thread_t* thread_id,
void* context) {
SerfThreadedFetcher* stc = static_cast<SerfThreadedFetcher*>(context);
CHECK_EQ(thread_id, stc->thread_id_);
stc->SerfThread();
return NULL;
}
// Transfer fetches from initiate_fetches_ to active_fetches_. If there are
// no new fetches to initiate, check whether the webserver thread is trying to
// shut down the worker thread, and return true to indicate "done". Doesn't
// do any work if initiate_fetches_ is empty, but in that case if
// block_on_empty is true it will perform a bounded wait for
// initiate_fetches_nonempty_. Called by worker thread and during thread
// cleanup.
bool TransferFetchesAndCheckDone(bool block_on_empty) {
// Use a temp to minimize the amount of time we hold the
// initiate_mutex_ lock, so that the parent thread doesn't get
// blocked trying to initiate fetches.
scoped_ptr<SerfFetchPool> xfer_fetches;
{
ScopedMutex lock(initiate_mutex_.get());
// We must do this checking under the initiate_mutex_ lock.
if (initiate_fetches_->empty()) {
// No new work to do now.
if (!block_on_empty || thread_finish_) {
return thread_finish_;
} else {
// Wait until some work shows up. Note that after the wait we still
// must actually check that there's some work to be done.
initiate_fetches_nonempty_->TimedWait(Timer::kSecondMs);
if (initiate_fetches_->empty()) {
// On timeout / false wakeup, return control to caller; we might be
// finished or have other things to attend to.
return thread_finish_;
}
}
}
xfer_fetches.reset(new SerfFetchPool());
// Take mutex_ before relinquishing initiate_mutex_. This guarantees that
// AnyPendingFetches cannot see us in the time between emptying
// initiate_fetches_ and inserting into active_fetches_. At that time, it
// can look as though no fetch work is occurring. Note that we obtain
// mutex_ before performing the swap (but after creating the new pool)
// because additional fetches might arrive in the mean time. This was
// causing problems with timeout in TestThreeThreaded under valgrind,
// because we'd block waiting for mutex_ after a single fetch had been
// initiated, but not obtain mutex_ until after several more fetches
// arrived (at which point we'd go into the poll loop without initiating
// all available fetches).
mutex_->Lock();
xfer_fetches.swap(initiate_fetches_);
}
// Now that we've unblocked the parent thread, we can leisurely
// queue up the fetches, employing the proper lock for the active_fetches_
// set. Actually we expect we will never have contention on this mutex
// from the thread.
while (!xfer_fetches->empty()) {
SerfFetch* fetch = xfer_fetches->RemoveOldest();
if (StartFetch(fetch)) {
SERF_DEBUG(LOG(INFO) << "Adding threaded fetch to url "
<< fetch->DebugInfo()
<< " (" << active_fetches_.size() << ")");
}
}
mutex_->Unlock();
return false;
}
void SerfThread() {
// Make sure we don't get yet another copy of the signals used by the
// webserver for shutdown here, to avoid double-free.
// TODO(morlovich): Port this to use ThreadSystem stuff, and have
// SystemThreadSystem take care of this automatically.
apr_setup_signal_thread();
// Initially there's no active fetch work to be done.
int num_active_fetches = 0;
while (!TransferFetchesAndCheckDone(num_active_fetches == 0)) {
// If initiate_fetches is empty, and there's no current active fetch
// work to do, we'll block in the above call. Otherwise the call will
// start initiated fetches (if any) without blocking.
// We set the poll interval to try to start new fetches promptly from the
// observer's perspective (i.e. 0.1s is perceptible, so we try to make sure
// new fetches are started after at most half that time). The downside is
// that we don't hand off control to serf / the OS for long periods when
// fetches are active but no data is arriving. We trust that doesn't
// happen often.
// TODO(jmaessen): Break out of Poll before timeout if work becomes
// available, so that we initiate new fetches as promptly as possible
// while continuing to serve the old ones. This would let us dial the
// poll interval up high (to multiple seconds). The classic trick here is
// to set up a pipe/FIFO/socket and add it to the set of things being
// read, then use a write to force wakeup. But will serf support this
// kind of thing?
const int64 kPollIntervalMs = Timer::kSecondMs / 20;
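// (Timer::kSecondMs / 20 is 50ms, i.e. half of the 0.1s threshold above.)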
// If active_fetches_ is empty, we will not do any work and won't block
// here. num_active_fetches will be 0, and we'll block in the next
// call to TransferFetchesAndCheckDone above.
num_active_fetches = Poll(kPollIntervalMs);
SERF_DEBUG(LOG(INFO) << "Finished polling from serf thread ("
<< this << ")");
}
}
apr_thread_t* thread_id_;
// protects initiate_fetches_, initiate_fetches_nonempty_, thread_finish_
// and thread_started_.
scoped_ptr<ThreadSystem::CondvarCapableMutex> initiate_mutex_;
// pushed in the main thread; popped by TransferFetchesAndCheckDone().
scoped_ptr<SerfFetchPool> initiate_fetches_;
// condvar that indicates that initiate_fetches_ has become nonempty. During
// normal operation, only the serf worker thread consumes initiated fetches
// (this can change during thread shutdown), but the usual condition variable
// caveats apply: Just because the condition variable indicates
// initiate_fetches_nonempty_ doesn't mean it's true, and a waiting thread
// must check initiate_fetches_ explicitly while holding initiate_mutex_.
scoped_ptr<ThreadSystem::Condvar> initiate_fetches_nonempty_;
// Flag to signal worker to finish working and terminate.
bool thread_finish_;
// True if we actually started the worker thread. Protected by
// initiate_mutex_.
bool thread_started_;
DISALLOW_COPY_AND_ASSIGN(SerfThreadedFetcher);
};
bool SerfFetch::Start(SerfUrlAsyncFetcher* fetcher) {
// Note: this is called in the thread's context, so this is when we do
// the pool ops.
fetcher_ = fetcher;
apr_pool_create(&pool_, fetcher_->pool());
bucket_alloc_ = serf_bucket_allocator_create(pool_, NULL, NULL);
fetch_start_ms_ = timer_->NowMs();
// Parse and validate the URL.
if (!ParseUrl()) {
return false;
}
using_https_ = StringCaseEqual("https", url_.scheme);
DCHECK(fetcher->allow_https() || !using_https_);
apr_status_t status = serf_connection_create2(&connection_,
fetcher_->serf_context(),
url_,
ConnectionSetup, this,
ClosedConnection, this,
pool_);
if (status != APR_SUCCESS) {
message_handler_->Error(DebugInfo().c_str(), 0,
"Error status=%d (%s) serf_connection_create2",
status, GetAprErrorString(status).c_str());
return false;
}
serf_connection_request_create(connection_, SetupRequest, this);
// Start the fetch. It will connect to the remote host, send the request,
// and accept the response, without blocking.
status = serf_context_run(
fetcher_->serf_context(), SERF_DURATION_NOBLOCK, fetcher_->pool());
if (status == APR_SUCCESS || APR_STATUS_IS_TIMEUP(status)) {
return true;
} else {
message_handler_->Error(DebugInfo().c_str(), 0,
"serf_context_run error status=%d (%s)",
status, GetAprErrorString(status).c_str());
return false;
}
}
void SerfFetch::ParseUrlForTesting(bool* status,
apr_uri_t** url,
const char** host_header,
const char** sni_host) {
*status = ParseUrl();
*url = &url_;
*host_header = host_header_;
*sni_host = sni_host_;
}
void SerfFetch::SetFetcherForTesting(SerfUrlAsyncFetcher* fetcher) {
fetcher_ = fetcher;
apr_pool_create(&pool_, fetcher_->pool());
}
// Set up the proxy for all the connections in the context. The proxy is in the
// format of hostname:port.
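// An empty or NULL proxy means no proxy is configured; a value such as
// "proxy.example.com:8080" (a made-up example) names the proxy to use.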
bool SerfUrlAsyncFetcher::SetupProxy(const char* proxy) {
apr_status_t status = 0;
if (proxy == NULL || *proxy == '\0') {
return true; // No proxy to be set.
}
apr_sockaddr_t* proxy_address = NULL;
apr_port_t proxy_port;
char* proxy_host;
char* proxy_scope;
status = apr_parse_addr_port(&proxy_host, &proxy_scope, &proxy_port, proxy,
pool_);
if (status != APR_SUCCESS || proxy_host == NULL || proxy_port == 0 ||
(status = apr_sockaddr_info_get(&proxy_address, proxy_host, APR_UNSPEC,
proxy_port, 0, pool_)) != APR_SUCCESS) {
return false;
}
serf_config_proxy(serf_context_, proxy_address);
return true;
}
SerfUrlAsyncFetcher::SerfUrlAsyncFetcher(const char* proxy, apr_pool_t* pool,
ThreadSystem* thread_system,
Statistics* statistics, Timer* timer,
int64 timeout_ms,
MessageHandler* message_handler)
: pool_(NULL),
thread_system_(thread_system),
timer_(timer),
mutex_(NULL),
serf_context_(NULL),
threaded_fetcher_(NULL),
active_count_(NULL),
request_count_(NULL),
byte_count_(NULL),
time_duration_ms_(NULL),
cancel_count_(NULL),
timeout_count_(NULL),
failure_count_(NULL),
cert_errors_(NULL),
timeout_ms_(timeout_ms),
shutdown_(false),
list_outstanding_urls_on_error_(false),
track_original_content_length_(false),
https_options_(0),
message_handler_(message_handler) {
CHECK(statistics != NULL);
request_count_ =
statistics->GetVariable(SerfStats::kSerfFetchRequestCount);
byte_count_ = statistics->GetVariable(SerfStats::kSerfFetchByteCount);
time_duration_ms_ =
statistics->GetVariable(SerfStats::kSerfFetchTimeDurationMs);
cancel_count_ = statistics->GetVariable(SerfStats::kSerfFetchCancelCount);
active_count_ = statistics->GetUpDownCounter(
SerfStats::kSerfFetchActiveCount);
timeout_count_ = statistics->GetVariable(SerfStats::kSerfFetchTimeoutCount);
failure_count_ = statistics->GetVariable(SerfStats::kSerfFetchFailureCount);
cert_errors_ = statistics->GetVariable(SerfStats::kSerfFetchCertErrors);
Init(pool, proxy);
threaded_fetcher_ = new SerfThreadedFetcher(this, proxy);
}
SerfUrlAsyncFetcher::SerfUrlAsyncFetcher(SerfUrlAsyncFetcher* parent,
const char* proxy)
: pool_(NULL),
thread_system_(parent->thread_system_),
timer_(parent->timer_),
mutex_(NULL),
serf_context_(NULL),
threaded_fetcher_(NULL),
active_count_(parent->active_count_),
request_count_(parent->request_count_),
byte_count_(parent->byte_count_),
time_duration_ms_(parent->time_duration_ms_),
cancel_count_(parent->cancel_count_),
timeout_count_(parent->timeout_count_),
failure_count_(parent->failure_count_),
cert_errors_(parent->cert_errors_),
timeout_ms_(parent->timeout_ms()),
shutdown_(false),
list_outstanding_urls_on_error_(parent->list_outstanding_urls_on_error_),
track_original_content_length_(parent->track_original_content_length_),
https_options_(parent->https_options_),
message_handler_(parent->message_handler_) {
Init(parent->pool(), proxy);
}
SerfUrlAsyncFetcher::~SerfUrlAsyncFetcher() {
CancelActiveFetches();
completed_fetches_.DeleteAll();
int orphaned_fetches = active_fetches_.size();
if (orphaned_fetches != 0) {
message_handler_->Message(
kError, "SerfFetcher destructed with %d orphaned fetches.",
orphaned_fetches);
if (active_count_ != NULL) {
active_count_->Add(-orphaned_fetches);
}
if (cancel_count_ != NULL) {
cancel_count_->Add(orphaned_fetches);
}
}
active_fetches_.DeleteAll();
if (threaded_fetcher_ != NULL) {
delete threaded_fetcher_;
}
delete mutex_;
apr_pool_destroy(pool_); // also calls apr_allocator_destroy on the allocator
}
void SerfUrlAsyncFetcher::ShutDown() {
// Note that we choose not to delete the threaded_fetcher_ to avoid worrying
// about races on its deletion.
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->ShutDown();
}
ScopedMutex lock(mutex_);
shutdown_ = true;
CancelActiveFetchesMutexHeld();
}
void SerfUrlAsyncFetcher::Init(apr_pool_t* parent_pool, const char* proxy) {
// Here, we give each of our Serf threads (main and worker) a separate pool
// with its own threadsafe allocator.
pool_ = AprCreateThreadCompatiblePool(parent_pool);
mutex_ = thread_system_->NewMutex();
serf_context_ = serf_context_create(pool_);
if (!SetupProxy(proxy)) {
message_handler_->Message(kError, "Proxy failed: %s", proxy);
}
}
void SerfUrlAsyncFetcher::CancelActiveFetches() {
ScopedMutex lock(mutex_);
CancelActiveFetchesMutexHeld();
}
void SerfUrlAsyncFetcher::CancelActiveFetchesMutexHeld() {
// If there are still active requests, cancel them.
int num_canceled = 0;
while (!active_fetches_.empty()) {
// Canceling a fetch requires that the fetch reside in active_fetches_,
// but can invalidate iterators pointing to the affected fetch. To avoid
// trouble, we simply ask for the oldest element, knowing it will go away.
SerfFetch* fetch = active_fetches_.oldest();
LOG(WARNING) << "Aborting fetch of " << fetch->DebugInfo();
fetch->Cancel();
++num_canceled;
}
if (num_canceled != 0) {
if (cancel_count_ != NULL) {
cancel_count_->Add(num_canceled);
}
}
}
bool SerfUrlAsyncFetcher::StartFetch(SerfFetch* fetch) {
active_fetches_.Add(fetch);
active_count_->Add(1);
bool started = !shutdown_ && fetch->Start(this);
if (!started) {
fetch->message_handler()->Message(kWarning, "Fetch failed to start: %s",
fetch->DebugInfo().c_str());
active_fetches_.Remove(fetch);
active_count_->Add(-1);
fetch->CallbackDone(false);
delete fetch;
}
return started;
}
void SerfUrlAsyncFetcher::Fetch(const GoogleString& url,
MessageHandler* message_handler,
AsyncFetch* async_fetch) {
async_fetch = EnableInflation(async_fetch);
SerfFetch* fetch = new SerfFetch(url, async_fetch, message_handler, timer_);
request_count_->Add(1);
threaded_fetcher_->InitiateFetch(fetch);
// TODO(morlovich): There is quite a bit of code related to doing work
// both on 'this' and threaded_fetcher_ that could use cleaning up.
}
void SerfUrlAsyncFetcher::PrintActiveFetches(
MessageHandler* handler) const {
ScopedMutex mutex(mutex_);
for (SerfFetchPool::const_iterator p = active_fetches_.begin(),
e = active_fetches_.end(); p != e; ++p) {
SerfFetch* fetch = *p;
handler->Message(kInfo, "Active fetch: %s",
fetch->DebugInfo().c_str());
}
}
// If active_fetches_ is empty, this does no work and returns 0.
int SerfUrlAsyncFetcher::Poll(int64 max_wait_ms) {
// Run serf polling for up to max_wait_ms milliseconds; serf_context_run()
// takes its duration in microseconds, hence the 1000x below.
ScopedMutex mutex(mutex_);
if (!active_fetches_.empty()) {
apr_status_t status =
serf_context_run(serf_context_, 1000*max_wait_ms, pool_);
completed_fetches_.DeleteAll();
if (APR_STATUS_IS_TIMEUP(status)) {
// Remove expired fetches from the front of the queue.
// This relies on the insertion-ordering guarantee
// provided by the Pool iterator.
int64 stale_cutoff = timer_->NowMs() - timeout_ms_;
// This loop calls Cancel, which deletes a fetch and thus invalidates
// iterators; we thus rely on retrieving oldest().
while (!active_fetches_.empty()) {
SerfFetch* fetch = active_fetches_.oldest();
if (fetch->fetch_start_ms() >= stale_cutoff) {
// This and subsequent fetches are still active, so we're done.
break;
}
message_handler_->Message(
kWarning, "Fetch timed out: %s (%ld) waiting for %ld ms",
fetch->DebugInfo().c_str(),
static_cast<long>(active_fetches_.size()), // NOLINT
static_cast<long>(max_wait_ms)); // NOLINT
// Note that canceling the fetch will ultimately call FetchComplete and
// delete it from the pool.
if (timeout_count_ != NULL) {
timeout_count_->Add(1);
}
fetch->Cancel();
}
}
bool success = ((status == APR_SUCCESS) || APR_STATUS_IS_TIMEUP(status));
// TODO(jmarantz): provide the success status to the caller if there is a
// need.
if (!success && !active_fetches_.empty()) {
// TODO(jmarantz): I have a new theory that we are getting
// behind when our self-directed URL fetches queue up multiple
// requests for the same URL, which might be sending the Serf
// library into an n^2 situation with its polling, even though
// we are using an rb_tree to hold the active fetches. We
// should fix this by keeping a map from url->SerfFetch, where
// we'd have to store lists of Callback*, ResponseHeader*, Writer* so
// all interested parties were updated if and when the fetch finally
// completed.
// NOTE(jmaessen): this is actually hard because all the above data is
// process-local, and the multiple requests are likely cross-process.
//
// In the meantime by putting more detail into the log here, we'll
// know whether we are accumulating active fetches to make the
// server fall over.
message_handler_->Message(
kError,
"Serf status %d(%s) polling for %ld %s fetches for %g seconds",
status, GetAprErrorString(status).c_str(),
static_cast<long>(active_fetches_.size()), // NOLINT
(threaded_fetcher_ == NULL) ? "threaded" : "non-blocking",
max_wait_ms/1.0e3);
if (list_outstanding_urls_on_error_) {
int64 now_ms = timer_->NowMs();
for (Pool<SerfFetch>::iterator p = active_fetches_.begin(),
e = active_fetches_.end(); p != e; ++p) {
SerfFetch* fetch = *p;
int64 age_ms = now_ms - fetch->fetch_start_ms();
message_handler_->Message(kError, "URL %s active for %ld ms",
fetch->DebugInfo().c_str(),
static_cast<long>(age_ms)); // NOLINT
}
}
CleanupFetchesWithErrors();
}
}
return active_fetches_.size();
}
void SerfUrlAsyncFetcher::FetchComplete(SerfFetch* fetch) {
// We do not have a ScopedMutex in FetchComplete, because it is only
// called from Poll and CancelActiveFetches, which have ScopedMutexes.
// Note that SerfFetch::Cancel is currently not exposed from outside this
// class.
active_fetches_.Remove(fetch);
completed_fetches_.Add(fetch);
}
void SerfUrlAsyncFetcher::ReportCompletedFetchStats(SerfFetch* fetch) {
if (time_duration_ms_) {
time_duration_ms_->Add(fetch->TimeDuration());
}
if (byte_count_) {
byte_count_->Add(fetch->bytes_received());
}
if (active_count_) {
active_count_->Add(-1);
}
}
bool SerfUrlAsyncFetcher::AnyPendingFetches() {
ScopedMutex lock(mutex_);
return !active_fetches_.empty();
}
int SerfUrlAsyncFetcher::ApproximateNumActiveFetches() {
ScopedMutex lock(mutex_);
return active_fetches_.size();
}
bool SerfUrlAsyncFetcher::WaitForActiveFetches(
int64 max_ms, MessageHandler* message_handler, WaitChoice wait_choice) {
bool ret = true;
if ((threaded_fetcher_ != NULL) && (wait_choice != kMainlineOnly)) {
ret &= threaded_fetcher_->WaitForActiveFetchesHelper(
max_ms, message_handler);
}
if (wait_choice != kThreadedOnly) {
ret &= WaitForActiveFetchesHelper(max_ms, message_handler);
}
return ret;
}
bool SerfUrlAsyncFetcher::WaitForActiveFetchesHelper(
int64 max_ms, MessageHandler* message_handler) {
bool any_pending_fetches = AnyPendingFetches();
if (any_pending_fetches) {
int64 now_ms = timer_->NowMs();
int64 end_ms = now_ms + max_ms;
while ((now_ms < end_ms) && any_pending_fetches) {
int64 remaining_ms = end_ms - now_ms;
SERF_DEBUG(LOG(INFO) << "Blocking process waiting " << remaining_ms
<< "ms for " << ApproximateNumActiveFetches()
<< " fetches to complete");
SERF_DEBUG(PrintActiveFetches(message_handler));
Poll(remaining_ms);
now_ms = timer_->NowMs();
any_pending_fetches = AnyPendingFetches();
}
if (any_pending_fetches) {
message_handler->Message(
kError, "Serf timeout waiting for fetches to complete:");
PrintActiveFetches(message_handler);
return false;
}
SERF_DEBUG(LOG(INFO) << "Serf successfully completed "
<< ApproximateNumActiveFetches() << " active fetches");
}
return true;
}
void SerfUrlAsyncFetcher::CleanupFetchesWithErrors() {
// Create a copy of list of active fetches, as we may have to cancel
// some failed ones, modifying the list.
std::vector<SerfFetch*> fetches;
for (SerfFetchPool::iterator i = active_fetches_.begin();
i != active_fetches_.end(); ++i) {
fetches.push_back(*i);
}
// Check each fetch to see if it needs cleanup because its serf connection
// got into an error state.
for (int i = 0, size = fetches.size(); i < size; ++i) {
fetches[i]->CleanupIfError();
}
}
void SerfUrlAsyncFetcher::InitStats(Statistics* statistics) {
statistics->AddVariable(SerfStats::kSerfFetchRequestCount);
statistics->AddVariable(SerfStats::kSerfFetchByteCount);
statistics->AddVariable(SerfStats::kSerfFetchTimeDurationMs);
statistics->AddVariable(SerfStats::kSerfFetchCancelCount);
statistics->AddUpDownCounter(SerfStats::kSerfFetchActiveCount);
statistics->AddVariable(SerfStats::kSerfFetchTimeoutCount);
statistics->AddVariable(SerfStats::kSerfFetchFailureCount);
statistics->AddVariable(SerfStats::kSerfFetchCertErrors);
}
void SerfUrlAsyncFetcher::set_list_outstanding_urls_on_error(bool x) {
list_outstanding_urls_on_error_ = x;
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->set_list_outstanding_urls_on_error(x);
}
}
void SerfUrlAsyncFetcher::set_track_original_content_length(bool x) {
track_original_content_length_ = x;
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->set_track_original_content_length(x);
}
}
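// Parses a comma-separated HTTPS directive, e.g. "enable,allow_self_signed",
// into a bitmask of the HttpsOptions flags defined at the top of this file.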
bool SerfUrlAsyncFetcher::ParseHttpsOptions(StringPiece directive,
uint32* options,
GoogleString* error_message) {
StringPieceVector keywords;
SplitStringPieceToVector(directive, ",", &keywords, true);
uint32 https_options = 0;
for (int i = 0, n = keywords.size(); i < n; ++i) {
StringPiece keyword = keywords[i];
if (keyword == "enable") {
https_options |= kEnableHttps;
} else if (keyword == "disable") {
https_options &= ~static_cast<uint32>(kEnableHttps);
} else if (keyword == "allow_self_signed") {
https_options |= kAllowSelfSigned;
} else if (keyword == "allow_unknown_certificate_authority") {
https_options |= kAllowUnknownCertificateAuthority;
} else if (keyword == "allow_certificate_not_yet_valid") {
https_options |= kAllowCertificateNotYetValid;
} else {
StrAppend(error_message,
"Invalid HTTPS keyword: ", keyword, ", legal options are: "
SERF_HTTPS_KEYWORDS);
return false;
}
}
*options = https_options;
return true;
}
const char* SerfUrlAsyncFetcher::ExtractHostHeader(
const apr_uri_t& uri, apr_pool_t* pool) {
// Construct it ourselves from the URL. Note that we shouldn't include the
// user info here, just host and any explicit port. The reason this is done
// with APR functions and not GoogleUrl is that APR URLs are what we have,
// as that's what Serf takes.
const char* host = apr_uri_unparse(pool, &uri,
APR_URI_UNP_OMITPATHINFO |
APR_URI_UNP_OMITUSERINFO);
// This still normally has the scheme, which we should drop.
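// e.g. "https://example.com:8443/x" unparses to "https://example.com:8443";
// dropping everything through the last '/' leaves "example.com:8443".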
stringpiece_ssize_type slash_pos = StringPiece(host).find_last_of('/');
if (slash_pos != StringPiece::npos) {
host += (slash_pos + 1);
}
return host;
}
GoogleString SerfUrlAsyncFetcher::RemovePortFromHostHeader(
const GoogleString& host) {
// SNI hosts, unlike Host:, do not have a port number, so remove it.
// Note that the input isn't a URL, so using GoogleUrl would be awkward and
// a bit of an overkill. We need to be a bit careful, however, since IPv6
// addresses also use ':', but inside '[]'.
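// Examples: "example.com:8443" becomes "example.com"; "example.com" and
// "[::1]" are returned unchanged; "[::1]:8080" becomes "[::1]".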
size_t colon_pos = StringPiece(host).find_last_of(':');
size_t bracket_pos = StringPiece(host).find_last_of(']');
if (colon_pos == std::string::npos ||
(bracket_pos != std::string::npos && colon_pos < bracket_pos)) {
return host;
} else {
return host.substr(0, colon_pos);
}
}
bool SerfUrlAsyncFetcher::SetHttpsOptions(StringPiece directive) {
GoogleString error_message;
if (!ParseHttpsOptions(directive, &https_options_, &error_message)) {
message_handler_->MessageS(kError, error_message);
return false;
}
#if !SERF_HTTPS_FETCHING
if (allow_https()) {
message_handler_->MessageS(kError, "HTTPS fetching has not been compiled "
"into the binary, so it has not been enabled.");
https_options_ = 0;
}
#endif
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->set_https_options(https_options_);
}
return true;
}
void SerfUrlAsyncFetcher::SetSslCertificatesDir(StringPiece dir) {
dir.CopyToString(&ssl_certificates_dir_);
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->SetSslCertificatesDir(dir);
}
}
void SerfUrlAsyncFetcher::SetSslCertificatesFile(StringPiece file) {
file.CopyToString(&ssl_certificates_file_);
if (threaded_fetcher_ != NULL) {
threaded_fetcher_->SetSslCertificatesFile(file);
}
}
bool SerfUrlAsyncFetcher::allow_https() const {
return ((https_options_ & kEnableHttps) != 0);
}
bool SerfUrlAsyncFetcher::allow_self_signed() const {
return ((https_options_ & kAllowSelfSigned) != 0);
}
bool SerfUrlAsyncFetcher::allow_unknown_certificate_authority() const {
return ((https_options_ & kAllowUnknownCertificateAuthority) != 0);
}
bool SerfUrlAsyncFetcher::allow_certificate_not_yet_valid() const {
return ((https_options_ & kAllowCertificateNotYetValid) != 0);
}
bool SerfUrlAsyncFetcher::SupportsHttps() const {
return allow_https();
}
} // namespace net_instaweb