| // Copyright 2010 Google Inc. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
// TODO(jmarantz): Avoid initiating fetches for resources already in flight.
// The challenge is that we would want to call all the callbacks that
// indicated interest in a particular URL once the fetch completed.
// Alternatively, this could be done in a layer above the URL fetcher.
| |
| #include "net/instaweb/apache/serf_url_async_fetcher.h" |
| |
#include <unistd.h>  // for sleep()

#include <algorithm>
| #include <string> |
| #include <vector> |
| |
| #include "apr_atomic.h" |
| #include "apr_strings.h" |
| #include "apr_thread_proc.h" |
| #include "apr_version.h" |
| #include "base/basictypes.h" |
| #include "base/stl_util-inl.h" |
| #include "net/instaweb/apache/apr_mutex.h" |
| #include "net/instaweb/public/version.h" |
| #include "net/instaweb/util/public/message_handler.h" |
| #include "net/instaweb/util/public/meta_data.h" |
| #include "net/instaweb/util/public/simple_meta_data.h" |
| #include "net/instaweb/util/public/statistics.h" |
| #include "net/instaweb/util/public/string_util.h" |
| #include "net/instaweb/util/public/timer.h" |
| #include "net/instaweb/util/public/writer.h" |
| #include "third_party/serf/src/serf.h" |
| #include "third_party/serf/src/serf_bucket_util.h" |
| |
| // Until this fetcher has some mileage on it, it is useful to keep around |
| // an easy way to turn on lots of debug messages. But they do get a bit chatty |
| // when things are working well. |
| #define SERF_DEBUG(x) |
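// To turn them on, redefine the macro to expand its argument:
//   #define SERF_DEBUG(x) x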
| |
| namespace { |
| const int kBufferSize = 2048; |
| const char kFetchMethod[] = "GET"; |
| } // namespace |
| |
| extern "C" { |
| // Declares a new function added to |
| // src/third_party/serf/instaweb_context.c |
| serf_bucket_t* serf_request_bucket_request_create_for_host( |
| serf_request_t *request, |
| const char *method, |
| const char *uri, |
| serf_bucket_t *body, |
| serf_bucket_alloc_t *allocator, const char* host); |
| } |
| |
| namespace net_instaweb { |
| |
| const char SerfStats::kSerfFetchRequestCount[] = "serf_fetch_request_count"; |
| const char SerfStats::kSerfFetchByteCount[] = "serf_fetch_bytes_count"; |
| const char SerfStats::kSerfFetchTimeDurationMs[] = |
| "serf_fetch_time_duration_ms"; |
| const char SerfStats::kSerfFetchCancelCount[] = "serf_fetch_cancel_count"; |
| const char SerfStats::kSerfFetchOutstandingCount[] = |
| "serf_fetch_outstanding_count"; |
| const char SerfStats::kSerfFetchTimeoutCount[] = "serf_fetch_timeout_count"; |
| |
| std::string GetAprErrorString(apr_status_t status) { |
| char error_str[1024]; |
| apr_strerror(status, error_str, sizeof(error_str)); |
| return error_str; |
| } |
| |
| // TODO(lsong): Move this to a separate file. Necessary? |
| class SerfFetch { |
| public: |
| // TODO(lsong): make use of request_headers. |
| SerfFetch(apr_pool_t* pool, |
| const std::string& url, |
| const MetaData& request_headers, |
| MetaData* response_headers, |
| Writer* fetched_content_writer, |
| MessageHandler* message_handler, |
| UrlAsyncFetcher::Callback* callback, |
| Timer* timer) |
| : fetcher_(NULL), |
| timer_(timer), |
| str_url_(url), |
| response_headers_(response_headers), |
| fetched_content_writer_(fetched_content_writer), |
| message_handler_(message_handler), |
| callback_(callback), |
| connection_(NULL), |
| byte_received_(0), |
| fetch_start_ms_(0), |
| fetch_end_ms_(0) { |
| request_headers_.CopyFrom(request_headers); |
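    // Create a sub-pool tied to this fetch; the destructor destroys it,
    // releasing the bucket allocator along with it.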
| apr_pool_create(&pool_, pool); |
| bucket_alloc_ = serf_bucket_allocator_create(pool_, NULL, NULL); |
| } |
| |
| ~SerfFetch() { |
| if (connection_ != NULL) { |
| serf_connection_close(connection_); |
| } |
| apr_pool_destroy(pool_); |
| } |
| |
| // Start the fetch. It returns immediately. This can only be run when |
| // locked with fetcher->mutex_. |
| bool Start(SerfUrlAsyncFetcher* fetcher); |
| |
| const char* str_url() { return str_url_.c_str(); } |
| |
| // This must be called while holding SerfUrlAsyncFetcher's mutex_. |
| void Cancel() { |
| CallCallback(false); |
| } |
| |
| // Calls the callback supplied by the user. This needs to happen |
| // exactly once. In some error cases it appears that Serf calls |
| // HandleResponse multiple times on the same object. |
| // |
| // This must be called while holding SerfUrlAsyncFetcher's mutex_. |
| void CallCallback(bool success) { |
| if (callback_ == NULL) { |
| LOG(INFO) << "Serf callback more than once on same fetch " << str_url() |
| << " (" << this << ")"; |
| } else { |
| UrlAsyncFetcher::Callback* callback = callback_; |
| callback_ = NULL; |
| response_headers_ = NULL; |
| callback->Done(success); |
| fetch_end_ms_ = timer_->NowMs(); |
| fetcher_->FetchComplete(this); |
| } |
| } |
| |
| int64 TimeDuration() const { |
| if ((fetch_start_ms_ != 0) && (fetch_end_ms_ != 0)) { |
| return fetch_end_ms_ - fetch_start_ms_; |
| } else { |
| return 0; |
| } |
| } |
| int64 fetch_start_ms() const { return fetch_start_ms_; } |
| |
| size_t byte_received() const { return byte_received_; } |
| MessageHandler* message_handler() { return message_handler_; } |
| |
| private: |
| |
| // Static functions used in callbacks. |
| static serf_bucket_t* ConnectionSetup( |
| apr_socket_t* socket, void* setup_baton, apr_pool_t* pool) { |
| SerfFetch* fetch = static_cast<SerfFetch*>(setup_baton); |
| return serf_bucket_socket_create(socket, fetch->bucket_alloc_); |
| } |
| |
| static void ClosedConnection(serf_connection_t* conn, |
| void* closed_baton, |
| apr_status_t why, |
| apr_pool_t* pool) { |
| SerfFetch* fetch = static_cast<SerfFetch*>(closed_baton); |
| if (why != APR_SUCCESS) { |
| fetch->message_handler_->Warning( |
| fetch->str_url_.c_str(), 0, "Connection close (code=%d %s).", |
| why, GetAprErrorString(why).c_str()); |
| } |
| // Connection is closed. |
| fetch->connection_ = NULL; |
| } |
| |
| static serf_bucket_t* AcceptResponse(serf_request_t* request, |
| serf_bucket_t* stream, |
| void* acceptor_baton, |
| apr_pool_t* pool) { |
| // Get the per-request bucket allocator. |
| serf_bucket_alloc_t* bucket_alloc = serf_request_get_alloc(request); |
| // Create a barrier so the response doesn't eat us! |
| // From the comment in Serf: |
| // ### the stream does not have a barrier, this callback should generally |
| // ### add a barrier around the stream before incorporating it into a |
| // ### response bucket stack. |
| // ... i.e. the passed bucket becomes owned rather than |
| // ### borrowed. |
| serf_bucket_t* bucket = serf_bucket_barrier_create(stream, bucket_alloc); |
| return serf_bucket_response_create(bucket, bucket_alloc); |
| } |
| |
| static apr_status_t HandleResponse(serf_request_t* request, |
| serf_bucket_t* response, |
| void* handler_baton, |
| apr_pool_t* pool) { |
| SerfFetch* fetch = static_cast<SerfFetch*>(handler_baton); |
| return fetch->HandleResponse(request, response); |
| } |
| |
  // The handler MUST process data from the response bucket until the
  // bucket's read function states it would block (APR_STATUS_IS_EAGAIN).
  // The handler is invoked only when new data arrives.  If no further data
  // arrives and the handler has not processed all available data, the
  // system can deadlock on the unprocessed, but already-read, data.
| apr_status_t HandleResponse(serf_request_t* request, |
| serf_bucket_t* response) { |
| apr_status_t status = APR_EGENERAL; |
| if (response_headers_ == NULL) { |
| LOG(INFO) << "HandleResponse called on URL " << str_url() |
| << "(" << this << "), which is already erased"; |
| return status; |
| } |
| |
| serf_status_line status_line; |
| if ((response != NULL) && |
| ((status = serf_bucket_response_status(response, &status_line)) |
| == APR_SUCCESS)) { |
| response_headers_->SetStatusAndReason( |
| static_cast<HttpStatus::Code>(status_line.code)); |
| response_headers_->set_major_version(status_line.version / 1000); |
| response_headers_->set_minor_version(status_line.version % 1000); |
| const char* data = NULL; |
| apr_size_t len = 0; |
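      // Drain the response bucket until it reports EOF, would block
      // (EAGAIN), or fails.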
| while ((status = serf_bucket_read(response, kBufferSize, &data, &len)) |
| == APR_SUCCESS || APR_STATUS_IS_EOF(status) || |
| APR_STATUS_IS_EAGAIN(status)) { |
| byte_received_ += len; |
| if (len > 0 && |
| !fetched_content_writer_->Write( |
| StringPiece(data, len), message_handler_)) { |
| status = APR_EGENERAL; |
| break; |
| } |
| if (status != APR_SUCCESS) { |
| break; |
| } |
| } |
      // We could read the headers earlier, but then we would have to check
      // whether they had been fully received.  At EOF of the response the
      // headers are all available, so read them now.
| if (APR_STATUS_IS_EOF(status)) { |
| status = ReadHeaders(response); |
| } |
| } |
| if (!APR_STATUS_IS_EAGAIN(status)) { |
| bool success = APR_STATUS_IS_EOF(status); |
| CallCallback(success); |
| } |
| return status; |
| } |
| |
| apr_status_t ReadHeaders(serf_bucket_t* response) { |
| apr_status_t status = APR_SUCCESS; |
| serf_bucket_t* headers = serf_bucket_response_get_headers(response); |
| const char* data = NULL; |
| apr_size_t num_bytes = 0; |
| while ((status = serf_bucket_read(headers, kBufferSize, &data, &num_bytes)) |
| == APR_SUCCESS || APR_STATUS_IS_EOF(status) || |
| APR_STATUS_IS_EAGAIN(status)) { |
| if (response_headers_->headers_complete()) { |
| status = APR_EGENERAL; |
| message_handler_->Info(str_url_.c_str(), 0, |
| "headers complete but more data coming"); |
| } else { |
| StringPiece str_piece(data, num_bytes); |
| apr_size_t parsed_len = |
| response_headers_->ParseChunk(str_piece, message_handler_); |
| if (parsed_len != num_bytes) { |
| status = APR_EGENERAL; |
| message_handler_->Error(str_url_.c_str(), 0, |
| "unexpected bytes at end of header"); |
| } |
| } |
| if (status != APR_SUCCESS) { |
| break; |
| } |
| } |
| if (APR_STATUS_IS_EOF(status) |
| && !response_headers_->headers_complete()) { |
| message_handler_->Error(str_url_.c_str(), 0, |
| "eof on incomplete headers code=%d %s", |
| status, GetAprErrorString(status).c_str()); |
| status = APR_EGENERAL; |
| } |
| return status; |
| } |
| |
| // Ensures that a user-agent string is included, and that the mod_pagespeed |
| // version is appended. |
| void FixUserAgent() { |
    // Supply a default user-agent if none is present, and in any case
    // append a mod_pagespeed suffix.
| std::string user_agent; |
| CharStarVector v; |
| if (request_headers_.Lookup(HttpAttributes::kUserAgent, &v)) { |
| for (int i = 0, n = v.size(); i < n; ++i) { |
| if (i != 0) { |
| user_agent += " "; |
| } |
| user_agent += v[i]; |
| } |
| request_headers_.RemoveAll(HttpAttributes::kUserAgent); |
| } |
| if (user_agent.empty()) { |
| user_agent += "Serf/" SERF_VERSION_STRING; |
| } |
| StringPiece version(" mod_pagespeed/" MOD_PAGESPEED_VERSION_STRING "-" |
| LASTCHANGE_STRING); |
| if (!StringPiece(user_agent).ends_with(version)) { |
| user_agent.append(version.data(), version.size()); |
| } |
| request_headers_.Add(HttpAttributes::kUserAgent, user_agent); |
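    // At this point a request that arrived with no User-Agent header will
    // be sent upstream with something like
    //   User-Agent: Serf/<serf version> mod_pagespeed/<version>-<lastchange>
    // with the exact values determined by the version macros above.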
| } |
| |
| static apr_status_t SetupRequest(serf_request_t* request, |
| void* setup_baton, |
| serf_bucket_t** req_bkt, |
| serf_response_acceptor_t* acceptor, |
| void** acceptor_baton, |
| serf_response_handler_t* handler, |
| void** handler_baton, |
| apr_pool_t* pool) { |
| SerfFetch* fetch = static_cast<SerfFetch*>(setup_baton); |
| const char* url_path = apr_uri_unparse(pool, &fetch->url_, |
| APR_URI_UNP_OMITSITEPART); |
| |
| // If there is an explicit Host header, then override the |
| // host field in the Serf structure, as we will not be able |
| // to override it after it is created; only append to it. |
| // |
| // Serf automatically populates the Host field based on the |
| // URL, and provides no mechanism to override it, except |
| // by hacking source. We hacked source. |
| // |
| // See src/third_party/serf/src/instaweb_context.c |
| CharStarVector v; |
| const char* host = NULL; |
| if (fetch->request_headers_.Lookup(HttpAttributes::kHost, &v) && |
| (v.size() == 1)) { |
| host = v[0]; |
| } |
| |
| fetch->FixUserAgent(); |
| |
| *req_bkt = serf_request_bucket_request_create_for_host( |
| request, kFetchMethod, |
| url_path, NULL, |
| serf_request_get_alloc(request), host); |
| serf_bucket_t* hdrs_bkt = serf_bucket_request_get_headers(*req_bkt); |
| |
| for (int i = 0; i < fetch->request_headers_.NumAttributes(); ++i) { |
| const char* name = fetch->request_headers_.Name(i); |
| const char* value = fetch->request_headers_.Value(i); |
| if ((strcasecmp(name, HttpAttributes::kUserAgent) == 0) || |
| (strcasecmp(name, HttpAttributes::kAcceptEncoding) == 0) || |
| (strcasecmp(name, HttpAttributes::kReferer) == 0)) { |
| serf_bucket_headers_setn(hdrs_bkt, name, value); |
| } |
| } |
| |
| // TODO(jmarantz): add accept-encoding:gzip even if not requested by |
| // the caller, but then decompress in the output handler. |
| |
| *acceptor = SerfFetch::AcceptResponse; |
| *acceptor_baton = fetch; |
| *handler = SerfFetch::HandleResponse; |
| *handler_baton = fetch; |
| return APR_SUCCESS; |
| } |
| |
| bool ParseUrl() { |
    apr_status_t status = apr_uri_parse(pool_, str_url_.c_str(), &url_);
| if (status != APR_SUCCESS) { |
| return false; // Failed to parse URL. |
| } |
| |
    // TODO(lsong): We do not handle HTTPS for now.  HTTPS requires
    // certificate verification, authentication, etc.
| if (strcasecmp(url_.scheme, "https") == 0) { |
| return false; |
| } |
| if (!url_.port) { |
| url_.port = apr_uri_port_of_scheme(url_.scheme); |
| } |
| if (!url_.path) { |
| url_.path = apr_pstrdup(pool_, "/"); |
| } |
| return true; |
| } |
| |
| SerfUrlAsyncFetcher* fetcher_; |
| Timer* timer_; |
| const std::string str_url_; |
| SimpleMetaData request_headers_; |
| MetaData* response_headers_; |
| Writer* fetched_content_writer_; |
| MessageHandler* message_handler_; |
| UrlAsyncFetcher::Callback* callback_; |
| |
| apr_pool_t* pool_; |
| serf_bucket_alloc_t* bucket_alloc_; |
| apr_uri_t url_; |
| serf_connection_t* connection_; |
| size_t byte_received_; |
| int64 fetch_start_ms_; |
| int64 fetch_end_ms_; |
| |
| |
| DISALLOW_COPY_AND_ASSIGN(SerfFetch); |
| }; |
| |
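// A SerfUrlAsyncFetcher that runs on its own thread: fetches are queued
// from the main thread via InitiateFetch, then started and polled from the
// worker thread in SerfThread.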
| class SerfThreadedFetcher : public SerfUrlAsyncFetcher { |
| public: |
| SerfThreadedFetcher(SerfUrlAsyncFetcher* parent, const char* proxy) : |
| SerfUrlAsyncFetcher(parent, proxy), |
| initiate_mutex_(pool_), |
| terminate_mutex_(pool_), |
| thread_done_(false) { |
| terminate_mutex_.Lock(); |
| CHECK_EQ(APR_SUCCESS, |
| apr_thread_create(&thread_id_, NULL, SerfThreadFn, this, pool_)); |
| } |
| |
| ~SerfThreadedFetcher() { |
    // Although CancelOutstandingFetches will be called in the base class
    // destructor, we want to call it here as well, as it makes it easier
    // for the thread to terminate.
| CancelOutstandingFetches(); |
| STLDeleteElements(&completed_fetches_); |
| STLDeleteElements(&initiate_fetches_); |
| |
| // Let the thread terminate naturally by unlocking its mutexes. |
| thread_done_ = true; |
| mutex_->Unlock(); |
| LOG(INFO) << "Waiting for threaded serf fetcher to terminate"; |
| terminate_mutex_.Lock(); |
| terminate_mutex_.Unlock(); |
| } |
| |
  // Called from mainline to queue up a fetch for the worker thread, which
  // picks it up on its next pass through SerfThread.
| void InitiateFetch(SerfFetch* fetch) { |
| ScopedMutex lock(&initiate_mutex_); |
| initiate_fetches_.push_back(fetch); |
| } |
| |
| private: |
| static void* SerfThreadFn(apr_thread_t* thread_id, void* context) { |
| SerfThreadedFetcher* stc = static_cast<SerfThreadedFetcher*>(context); |
| CHECK_EQ(thread_id, stc->thread_id_); |
| stc->SerfThread(); |
| return NULL; |
| } |
| |
  // Called from the worker thread to transfer fetches from the
  // initiate_fetches_ vector to the active_fetches_ queue.  Does no work
  // if initiate_fetches_ is empty.
| void TransferFetches() { |
    // Use a temp to minimize the amount of time we hold the
    // initiate_mutex_ lock, so that the parent thread doesn't get
    // blocked trying to initiate fetches.
| FetchVector xfer_fetches; |
| { |
| ScopedMutex lock(&initiate_mutex_); |
| xfer_fetches.swap(initiate_fetches_); |
| } |
| |
    // Now that we've unblocked the parent thread, we can leisurely
    // queue up the fetches, employing the proper lock for the
    // active_fetches_ set.  In practice we expect never to have
    // contention on this mutex from the parent thread.
| if (!xfer_fetches.empty()) { |
| int num_started = 0; |
| ScopedMutex lock(mutex_); |
| for (int i = 0, n = xfer_fetches.size(); i < n; ++i) { |
| SerfFetch* fetch = xfer_fetches[i]; |
| if (fetch->Start(this)) { |
| SERF_DEBUG(LOG(INFO) << "Adding threaded fetch to url " |
| << fetch->str_url() |
| << " (" << active_fetches_.size() << ")"); |
| active_fetches_.push_back(fetch); |
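          // Remember the fetch's queue position so that FetchComplete can
          // erase it without a linear scan.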
| active_fetch_map_[fetch] = --active_fetches_.end(); |
| ++num_started; |
| } else { |
| delete fetch; |
| } |
| } |
| if ((num_started != 0) && (outstanding_count_ != NULL)) { |
| outstanding_count_->Add(num_started); |
| } |
| } |
| } |
| |
| void SerfThread() { |
| while (!thread_done_) { |
      // If initiate_fetches_ is empty, we will not do any work.
| TransferFetches(); |
| |
| const int64 kPollIntervalUs = 500000; |
| SERF_DEBUG(LOG(INFO) << "Polling from serf thread (" << this << ")"); |
| // If active_fetches_ is empty, we will not do any work. |
| int num_outstanding_fetches = Poll(kPollIntervalUs); |
| SERF_DEBUG(LOG(INFO) << "Finished polling from serf thread (" |
| << this << ")"); |
| // We don't want to spin busily waiting for new fetches. We could use a |
| // semaphore, but we're not really concerned with latency here, so we can |
| // just check every once in a while. |
| if (num_outstanding_fetches == 0) { |
| sleep(1); |
| } |
| } |
| terminate_mutex_.Unlock(); |
| } |
| |
| apr_thread_t* thread_id_; |
| |
  // Protects initiate_fetches_.
  AprMutex initiate_mutex_;
  // Pushed in the main thread; popped in the Serf thread.
  std::vector<SerfFetch*> initiate_fetches_;

  // Allows the parent to block until the thread exits.
  AprMutex terminate_mutex_;
  bool thread_done_;
| |
| DISALLOW_COPY_AND_ASSIGN(SerfThreadedFetcher); |
| }; |
| |
| bool SerfFetch::Start(SerfUrlAsyncFetcher* fetcher) { |
| fetch_start_ms_ = timer_->NowMs(); |
| fetcher_ = fetcher; |
| // Parse and validate the URL. |
| if (!ParseUrl()) { |
| return false; |
| } |
| |
| apr_status_t status = serf_connection_create2(&connection_, |
| fetcher_->serf_context(), |
| url_, |
| ConnectionSetup, this, |
| ClosedConnection, this, |
| pool_); |
| if (status != APR_SUCCESS) { |
| message_handler_->Error(str_url_.c_str(), 0, |
| "Error status=%d (%s) serf_connection_create2", |
| status, GetAprErrorString(status).c_str()); |
| return false; |
| } |
| serf_connection_request_create(connection_, SetupRequest, this); |
| |
| // Start the fetch. It will connect to the remote host, send the request, |
| // and accept the response, without blocking. |
| status = serf_context_run(fetcher_->serf_context(), 0, fetcher_->pool()); |
| |
| if (status == APR_SUCCESS || APR_STATUS_IS_TIMEUP(status)) { |
| return true; |
| } else { |
| message_handler_->Error(str_url_.c_str(), 0, |
| "serf_context_run error status=%d (%s)", |
| status, GetAprErrorString(status).c_str()); |
| return false; |
| } |
| } |
| |
| |
// Set up the proxy for all the connections in the context.  The proxy is
// specified in the format hostname:port.
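// For example, a proxy value of "127.0.0.1:8080" routes all fetches
// through a proxy listening on localhost port 8080.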
| bool SerfUrlAsyncFetcher::SetupProxy(const char* proxy) { |
| apr_status_t status = 0; |
| if (proxy == NULL || *proxy == '\0') { |
| return true; // No proxy to be set. |
| } |
| |
| apr_sockaddr_t* proxy_address = NULL; |
| apr_port_t proxy_port; |
| char* proxy_host; |
| char* proxy_scope; |
| status = apr_parse_addr_port(&proxy_host, &proxy_scope, &proxy_port, proxy, |
| pool_); |
| if (status != APR_SUCCESS || proxy_host == NULL || proxy_port == 0 || |
| (status = apr_sockaddr_info_get(&proxy_address, proxy_host, APR_UNSPEC, |
| proxy_port, 0, pool_)) != APR_SUCCESS) { |
| return false; |
| } |
| serf_config_proxy(serf_context_, proxy_address); |
| return true; |
| } |
| |
| SerfUrlAsyncFetcher::SerfUrlAsyncFetcher(const char* proxy, apr_pool_t* pool, |
| Statistics* statistics, Timer* timer, |
| int64 timeout_ms) |
| : pool_(pool), |
| timer_(timer), |
| mutex_(NULL), |
| serf_context_(NULL), |
| threaded_fetcher_(NULL), |
| outstanding_count_(NULL), |
| request_count_(NULL), |
| byte_count_(NULL), |
| time_duration_ms_(NULL), |
| cancel_count_(NULL), |
| timeout_count_(NULL), |
| timeout_ms_(timeout_ms) { |
| if (statistics != NULL) { |
| request_count_ = |
| statistics->GetVariable(SerfStats::kSerfFetchRequestCount); |
| byte_count_ = statistics->GetVariable(SerfStats::kSerfFetchByteCount); |
| time_duration_ms_ = |
| statistics->GetVariable(SerfStats::kSerfFetchTimeDurationMs); |
| cancel_count_ = statistics->GetVariable(SerfStats::kSerfFetchCancelCount); |
| outstanding_count_ = statistics->GetVariable( |
| SerfStats::kSerfFetchOutstandingCount); |
| timeout_count_ = statistics->GetVariable( |
| SerfStats::kSerfFetchTimeoutCount); |
| } |
| mutex_ = new AprMutex(pool_); |
| serf_context_ = serf_context_create(pool_); |
| threaded_fetcher_ = new SerfThreadedFetcher(this, proxy); |
| if (!SetupProxy(proxy)) { |
| LOG(WARNING) << "Proxy failed: " << proxy; |
| } |
| } |
| |
| SerfUrlAsyncFetcher::SerfUrlAsyncFetcher(SerfUrlAsyncFetcher* parent, |
| const char* proxy) |
| : pool_(parent->pool_), |
| timer_(parent->timer_), |
| mutex_(NULL), |
| serf_context_(NULL), |
| threaded_fetcher_(NULL), |
| outstanding_count_(parent->outstanding_count_), |
| request_count_(parent->request_count_), |
| byte_count_(parent->byte_count_), |
| time_duration_ms_(parent->time_duration_ms_), |
| cancel_count_(parent->cancel_count_), |
| timeout_count_(parent->timeout_count_), |
| timeout_ms_(parent->timeout_ms()) { |
| mutex_ = new AprMutex(pool_); |
| serf_context_ = serf_context_create(pool_); |
| threaded_fetcher_ = NULL; |
| if (!SetupProxy(proxy)) { |
| LOG(WARNING) << "Proxy failed: " << proxy; |
| } |
| } |
| |
| SerfUrlAsyncFetcher::~SerfUrlAsyncFetcher() { |
| CancelOutstandingFetches(); |
| STLDeleteElements(&completed_fetches_); |
| int orphaned_fetches = active_fetches_.size(); |
| if (orphaned_fetches != 0) { |
| LOG(ERROR) << "SerfFecher destructed with " << orphaned_fetches |
| << " orphaned fetches."; |
| if (outstanding_count_ != NULL) { |
| outstanding_count_->Add(-orphaned_fetches); |
| } |
| if (cancel_count_ != NULL) { |
| cancel_count_->Add(orphaned_fetches); |
| } |
| } |
| |
| STLDeleteElements(&active_fetches_); |
| active_fetch_map_.clear(); |
| if (threaded_fetcher_ != NULL) { |
| delete threaded_fetcher_; |
| } |
| delete mutex_; |
| } |
| |
| void SerfUrlAsyncFetcher::CancelOutstandingFetches() { |
| // If there are still active requests, cancel them. |
| int num_canceled = 0; |
| { |
| ScopedMutex lock(mutex_); |
| while (!active_fetches_.empty()) { |
| FetchQueueEntry p = active_fetches_.begin(); |
| SerfFetch* fetch = *p; |
| LOG(WARNING) << "Aborting fetch of " << fetch->str_url(); |
| fetch->Cancel(); |
| ++num_canceled; |
| } |
| } |
| if (num_canceled != 0) { |
| if (cancel_count_ != NULL) { |
| cancel_count_->Add(num_canceled); |
| } |
| if (outstanding_count_ != NULL) { |
| outstanding_count_->Add(-num_canceled); |
| } |
| } |
| } |
| |
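// Note that this always returns false; in the UrlAsyncFetcher interface
// this appears to indicate that the fetch was not satisfied synchronously,
// and that the supplied callback will be invoked when it completes.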
| bool SerfUrlAsyncFetcher::StreamingFetch(const std::string& url, |
| const MetaData& request_headers, |
| MetaData* response_headers, |
| Writer* fetched_content_writer, |
| MessageHandler* message_handler, |
| UrlAsyncFetcher::Callback* callback) { |
| SerfFetch* fetch = new SerfFetch( |
| pool_, url, request_headers, response_headers, fetched_content_writer, |
| message_handler, callback, timer_); |
| if (request_count_ != NULL) { |
| request_count_->Add(1); |
| } |
| if (callback->EnableThreaded()) { |
| message_handler->Message(kInfo, "Initiating async fetch for %s", |
| url.c_str()); |
| threaded_fetcher_->InitiateFetch(fetch); |
| } else { |
| message_handler->Message(kInfo, "Initiating blocking fetch for %s", |
| url.c_str()); |
| bool started = false; |
| { |
| ScopedMutex mutex(mutex_); |
| started = fetch->Start(this); |
| if (started) { |
| active_fetches_.push_back(fetch); |
| active_fetch_map_[fetch] = --active_fetches_.end(); |
| if (outstanding_count_ != NULL) { |
| outstanding_count_->Add(1); |
| } |
| } else { |
| delete fetch; |
| } |
| } |
| } |
| return false; |
| } |
| |
| void SerfUrlAsyncFetcher::PrintOutstandingFetches( |
| MessageHandler* handler) const { |
| ScopedMutex mutex(mutex_); |
| for (FetchQueue::const_iterator p = active_fetches_.begin(), |
| e = active_fetches_.end(); p != e; ++p) { |
| SerfFetch* fetch = *p; |
| handler->Message(kInfo, "Outstanding fetch: %s", |
| fetch->str_url()); |
| } |
| } |
| |
| // If active_fetches_ is empty, this does no work and returns 0. |
| int SerfUrlAsyncFetcher::Poll(int64 microseconds) { |
| // Run serf polling up to microseconds. |
| ScopedMutex mutex(mutex_); |
| if (!active_fetches_.empty()) { |
| apr_status_t status = serf_context_run(serf_context_, microseconds, pool_); |
| STLDeleteElements(&completed_fetches_); |
| if (APR_STATUS_IS_TIMEUP(status)) { |
| // Remove expired fetches from the front of the queue. |
| int64 stale_cutoff = timer_->NowMs() - timeout_ms_; |
| FetchQueueEntry p = active_fetches_.begin(), e = active_fetches_.end(); |
| int timeouts = 0; |
| while ((p != e) && ((*p)->fetch_start_ms() < stale_cutoff)) { |
| SerfFetch* fetch = *p; |
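        // Advance the iterator before Cancel(), which erases the fetch
        // from active_fetches_ and invalidates its entry.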
| ++p; |
| LOG(WARNING) << "Fetch timed out: " << fetch->str_url(); |
| timeouts++; |
| fetch->Cancel(); |
| } |
| if ((timeouts > 0) && (timeout_count_ != NULL)) { |
| timeout_count_->Add(timeouts); |
| } |
| } |
| bool success = ((status == APR_SUCCESS) || APR_STATUS_IS_TIMEUP(status)); |
| // TODO(jmarantz): provide the success status to the caller if there is a |
| // need. |
| if (!success && !active_fetches_.empty()) { |
| // TODO(jmarantz): I have a new theory that we are getting |
| // behind when our self-directed URL fetches queue up multiple |
| // requests for the same URL, which might be sending the Serf |
| // library into an n^2 situation with its polling, even though |
| // we are using an rb_tree to hold the outstanding fetches. We |
| // should fix this by keeping a map from url->SerfFetch, where |
| // we'd have to store lists of Callback*, ResponseHeader*, Writer* so |
| // all interested parties were updated if and when the fetch finally |
| // completed. |
| // |
| // In the meantime by putting more detail into the log here, we'll |
| // know whether we are accumulating outstanding fetches to make the |
| // server fall over. |
| LOG(ERROR) << "Serf status " << status << " (" |
| << GetAprErrorString(status) << " ) polling for " |
| << active_fetches_.size() |
               << ((threaded_fetcher_ == NULL) ? ": (threaded)"
                                               : ": (blocking)")
| << " (" << this << ") for " << microseconds/1.0e6 |
| << " seconds"; |
| } |
| } |
| return active_fetches_.size(); |
| } |
| |
| void SerfUrlAsyncFetcher::FetchComplete(SerfFetch* fetch) { |
| // We do not have a ScopedMutex in FetchComplete, because it is only |
| // called from Poll and CancelOutstandingFetches, which have ScopedMutexes. |
| // Note that SerfFetch::Cancel is currently not exposed from outside this |
| // class. |
| LOG(WARNING) << "FetchComplete(" << fetch->str_url() << ", " << fetch << ")"; |
| FetchMapEntry map_entry = active_fetch_map_.find(fetch); |
| CHECK(map_entry != active_fetch_map_.end()); |
| active_fetches_.erase(map_entry->second); |
| active_fetch_map_.erase(map_entry); |
| completed_fetches_.push_back(fetch); |
| fetch->message_handler()->Message(kInfo, "Fetch complete: %s", |
| fetch->str_url()); |
| if (time_duration_ms_) { |
| time_duration_ms_->Add(fetch->TimeDuration()); |
| } |
| if (byte_count_) { |
| byte_count_->Add(fetch->byte_received()); |
| } |
| if (outstanding_count_) { |
| outstanding_count_->Add(-1); |
| } |
| } |
| |
| size_t SerfUrlAsyncFetcher::NumActiveFetches() { |
| ScopedMutex lock(mutex_); |
| return active_fetches_.size(); |
| } |
| |
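// Blocks for up to max_ms waiting for fetches to drain, covering the
// threaded fetcher, the mainline fetcher, or both, depending on
// wait_choice.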
| bool SerfUrlAsyncFetcher::WaitForInProgressFetches( |
| int64 max_ms, MessageHandler* message_handler, WaitChoice wait_choice) { |
| bool ret = true; |
| if ((threaded_fetcher_ != NULL) && (wait_choice != kMainlineOnly)) { |
| ret &= threaded_fetcher_->WaitForInProgressFetchesHelper( |
| max_ms, message_handler); |
| } |
| if (wait_choice != kThreadedOnly) { |
| ret &= WaitForInProgressFetchesHelper(max_ms, message_handler); |
| } |
| return ret; |
| } |
| |
| bool SerfUrlAsyncFetcher::WaitForInProgressFetchesHelper( |
| int64 max_ms, MessageHandler* message_handler) { |
| int num_active_fetches = NumActiveFetches(); |
| if (num_active_fetches != 0) { |
| int64 now_ms = timer_->NowMs(); |
| int64 end_ms = now_ms + max_ms; |
| while ((now_ms < end_ms) && (num_active_fetches != 0)) { |
| int64 remaining_ms = end_ms - now_ms; |
| SERF_DEBUG(LOG(INFO) << "Blocking process waiting " << remaining_ms |
| << "ms for " << active_fetches_.size() << " to complete"); |
| SERF_DEBUG(PrintOutstandingFetches(message_handler)); |
| Poll(1000 * remaining_ms); |
| now_ms = timer_->NowMs(); |
| num_active_fetches = NumActiveFetches(); |
| } |
| if (!active_fetches_.empty()) { |
      message_handler->Message(
          kError, "Serf timeout waiting for %d fetches to complete",
          num_active_fetches);
| return false; |
| } |
| SERF_DEBUG(LOG(INFO) << "Serf successfully completed outstanding fetches"); |
| } |
| return true; |
| } |

void SerfUrlAsyncFetcher::Initialize(Statistics* statistics) {
| if (statistics != NULL) { |
| statistics->AddVariable(SerfStats::kSerfFetchRequestCount); |
| statistics->AddVariable(SerfStats::kSerfFetchByteCount); |
| statistics->AddVariable(SerfStats::kSerfFetchTimeDurationMs); |
| statistics->AddVariable(SerfStats::kSerfFetchCancelCount); |
| statistics->AddVariable(SerfStats::kSerfFetchOutstandingCount); |
| statistics->AddVariable(SerfStats::kSerfFetchTimeoutCount); |
| } |
| } |
| |
| } // namespace net_instaweb |