| /* |
| * Copyright 2012 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // Author: morlovich@google.com (Maksim Orlovich) |
| // |
| // A fetcher that talks to mod_spdy for requests matching a certain |
| // domain (and passes the rest to fallthrough fetcher). |
| // |
| // Based in large part on mod_spdy's http_to_spdy_filter.cc and |
| // spdy_to_http_filter.cc |
| |
| #include "pagespeed/apache/mod_spdy_fetcher.h" |
| |
| #include "util_filter.h" |
| |
| #include <algorithm> |
| #include <cstddef> |
| |
| #include "base/logging.h" |
| #include "pagespeed/apache/interface_mod_spdy.h" |
| #include "pagespeed/apache/mod_spdy_fetch_controller.h" |
| #include "net/instaweb/http/public/async_fetch.h" |
| #include "net/instaweb/http/public/http_response_parser.h" |
| #include "net/instaweb/rewriter/public/rewrite_driver.h" |
| #include "pagespeed/kernel/base/statistics.h" |
| #include "pagespeed/kernel/base/string.h" |
| #include "pagespeed/kernel/base/string_util.h" |
| #include "pagespeed/kernel/base/string_writer.h" |
| #include "pagespeed/kernel/base/timer.h" |
| #include "pagespeed/kernel/http/google_url.h" |
| #include "pagespeed/kernel/http/request_headers.h" |
| |
| struct spdy_slave_connection; |
| |
| namespace net_instaweb { |
| |
| namespace { |
| |
| const char kFetchExecutionLatencyUsHistogram[] = |
| "fetch_from_mod_spdy_execution_latency_us"; |
| const int kFetchExecutionLatencyUsHistogramMaxValue = 500 * 1000; |
| |
| ap_filter_rec_t* apache_to_mps_filter_handle = NULL; |
| ap_filter_rec_t* mps_to_apache_filter_handle = NULL; |
| |
| struct MpsToApacheFilterContext { |
| MpsToApacheFilterContext(const GoogleString url, |
| AsyncFetch* in_fetch, |
| MessageHandler* in_handler) |
| : handler(in_handler), |
| request_headers(in_fetch->request_headers()), |
| message_body(request_headers->message_body()), |
| pos(0), |
| in_body(false) { |
| StringWriter writer(&request_str); |
| request_headers->WriteAsHttp(url, &writer, handler); |
| } |
| |
| MessageHandler* handler; |
| const RequestHeaders* request_headers; |
| GoogleString request_str; // request line + headers |
| |
| // We have to copy the message body since the AsyncFetch might actually |
| // be deleted by ::Done while our filter is still active. This is not worth |
| // optimizing since we do not actually use POST with our fetcher under |
| // normal conditions with MPS. |
| GoogleString message_body; |
| size_t pos; |
| bool in_body; |
| }; |
| |
| // This takes an HTTP request information (from an MpsToApacheFilterContext) |
| // and generates appropriate bucket brigade for it. |
| apr_status_t MpsToApacheFilter(ap_filter_t* filter, |
| apr_bucket_brigade* brigade, |
| ap_input_mode_t mode, |
| apr_read_type_e block, |
| apr_off_t readbytes) { |
| const size_t max_bytes = std::max(static_cast<apr_off_t>(1), readbytes); |
| |
| if (filter->next != NULL) { |
| LOG(WARNING) << "MpsToApacheFilter is not the last filter in the chain: " |
| << filter->next->frec->name; |
| } |
| |
| MpsToApacheFilterContext* context = |
| static_cast<MpsToApacheFilterContext*>(filter->ctx); |
| |
| if (!context) { |
| return APR_EOF; |
| } |
| |
| if (mode == AP_MODE_INIT) { |
| return APR_SUCCESS; |
| } |
| |
| // Switch over to body if we're done with headers. |
| if (!context->in_body && |
| context->pos >= context->request_str.size()) { |
| context->in_body = true; |
| context->pos = 0; |
| } |
| |
| // Which string are we going to be reading from? (We don't actually |
| // attempt to read from more than one). |
| const GoogleString& in = context->in_body ? |
| context->message_body : |
| context->request_str; |
| |
| |
| // Detect EOF. This is enough due to the "switch to body" bit above. |
| if (context->pos >= in.size()) { |
| return APR_EOF; |
| } |
| |
| size_t bytes_read = 0; |
| size_t bytes_available = in.size() - context->pos; |
| |
| // Synthesize an EOS bucket if needed. |
| if ((bytes_read == bytes_available) && |
| (context->in_body || context->request_headers->message_body().empty())) { |
| APR_BRIGADE_INSERT_TAIL(brigade, apr_bucket_eos_create( |
| brigade->bucket_alloc)); |
| filter->ctx = NULL; |
| delete context; |
| return APR_SUCCESS; |
| } |
| |
| const char* bytes = in.data() + context->pos; |
| // Byte read ops. |
| if (mode == AP_MODE_READBYTES || mode == AP_MODE_SPECULATIVE || |
| mode == AP_MODE_EXHAUSTIVE) { |
| bytes_read = (mode == AP_MODE_EXHAUSTIVE) ? |
| bytes_available : |
| std::min(bytes_available, max_bytes); |
| } else if (mode == AP_MODE_GETLINE) { |
| StringPiece remaining(bytes, bytes_available); |
| size_t eol_pos = remaining.find('\n'); |
| if (eol_pos != StringPiece::npos) { |
| bytes_read = eol_pos + 1; |
| } else { |
| bytes_read = bytes_available; |
| } |
| } else { |
| // Not doing AP_MODE_EATCRLF. See mod_spdy http_to_spdy_filter.cc |
| // for why. |
| DCHECK(mode == AP_MODE_EATCRLF); |
| LOG(WARNING) << "Unsupported read mode" << mode; |
| return APR_ENOTIMPL; |
| } |
| |
| // If we managed to read any data, put it into the brigade. We use a |
| // transient bucket (as opposed to a heap bucket) to avoid an extra string |
| // copy. |
| if (bytes_read > 0) { |
| APR_BRIGADE_INSERT_TAIL(brigade, apr_bucket_transient_create( |
| bytes, bytes_read, brigade->bucket_alloc)); |
| } |
| |
| // Advance position on non-speculative reads. |
| if (mode != AP_MODE_SPECULATIVE) { |
| context->pos += bytes_read; |
| } |
| return APR_SUCCESS; |
| } |
| |
| struct ApacheToMpsFilterContext { |
| ApacheToMpsFilterContext(AsyncFetch* in_fetch, MessageHandler* in_handler) |
| : target_fetch(in_fetch), handler(in_handler), |
| response_parser(in_fetch->response_headers(), in_fetch, handler), |
| ok(true) {} |
| |
| AsyncFetch* target_fetch; |
| MessageHandler* handler; |
| HttpResponseParser response_parser; |
| bool ok; |
| }; |
| |
| // TODO(sligocki): Perhaps we can merge this with instaweb_in_place_filter(). |
| apr_status_t ApacheToMpsFilter(ap_filter_t* filter, |
| apr_bucket_brigade* input_brigade) { |
| // mod_spdy fed us some bits through Apache --- direct them to our client. |
| |
| // This is a NETWORK-level filter, so there shouldn't be any filter after us. |
| if (filter->next != NULL) { |
| LOG(WARNING) << "ApacheToMpsFilter is not the last filter in the chain " |
| << "(it is followed by " << filter->next->frec->name << ")"; |
| } |
| |
| // According to the page at |
| // http://httpd.apache.org/docs/2.3/developer/output-filters.html |
| // we should never pass an empty brigade down the chain, but to be safe, we |
| // should be prepared to accept one and do nothing. |
| if (APR_BRIGADE_EMPTY(input_brigade)) { |
| LOG(INFO) << "ApacheToMpsFilter received an empty brigade."; |
| return APR_SUCCESS; |
| } |
| |
| ApacheToMpsFilterContext* context = |
| static_cast<ApacheToMpsFilterContext*>(filter->ctx); |
| AsyncFetch* target = (context != NULL) ? context->target_fetch : NULL; |
| |
| while (!APR_BRIGADE_EMPTY(input_brigade)) { |
| apr_bucket* bucket = APR_BRIGADE_FIRST(input_brigade); |
| |
| if (APR_BUCKET_IS_METADATA(bucket)) { |
| if (context == NULL) { |
| // Ignore metadata buckets after EOS. |
| } else if (APR_BUCKET_IS_EOS(bucket)) { |
| bool ok = context->ok; |
| |
| // EOS bucket -- there should be no more data buckets in this stream. |
| // We denote this by dropping the context and zeroing out the pointer |
| // to it in the filter. |
| filter->ctx = NULL; |
| delete context; |
| |
| target->Done(ok); |
| } else if (APR_BUCKET_IS_FLUSH(bucket)) { |
| target->Flush(context->handler); |
| } else { |
| // Unknown metadata bucket. This bucket has no meaning to us, and |
| // there's no further filter to pass it to, so we just ignore it. |
| } |
| } else if (context == NULL) { |
| // We shouldn't be getting any data buckets after an EOS (since this is a |
| // connection-level filter, we do sometimes see other metadata buckets |
| // after the EOS). If we do get them, ignore them. |
| LOG(INFO) << "ApacheToMpsFilter received " << bucket->type->name |
| << " bucket after an EOS (and ignored it)."; |
| } else { |
| // Data bucket -- get ready to read. |
| const char* data = NULL; |
| apr_size_t data_length = 0; |
| |
| // First, try a non-blocking read. |
| apr_status_t status = apr_bucket_read(bucket, &data, &data_length, |
| APR_NONBLOCK_READ); |
| |
| if (status == APR_SUCCESS) { |
| // All OK! (will write below) |
| } else if (APR_STATUS_IS_EAGAIN(status)) { |
| // Non-blocking read failed with EAGAIN, so try again with a blocking |
| // read. |
| status = apr_bucket_read(bucket, &data, &data_length, APR_BLOCK_READ); |
| if (status != APR_SUCCESS) { |
| LOG(ERROR) << "Blocking read failed with status " << status; |
| } |
| } |
| |
| // Send bytes to our client, if we're successful. |
| if (status == APR_SUCCESS) { |
| StringPiece piece(data, data_length); |
| HttpResponseParser& response_parser = context->response_parser; |
| |
| bool had_headers = response_parser.headers_complete(); |
| if (response_parser.ParseChunk(piece)) { |
| if (!had_headers && response_parser.headers_complete()) { |
| target->HeadersComplete(); |
| } |
| } else { |
| context->ok = false; |
| } |
| } else { |
| context->ok = false; |
| // Since we didn't successfully consume this bucket, don't delete it; |
| // rather, leave it (and any remaining buckets) in the brigade. |
| return status; // failure |
| } |
| } |
| |
| // We consumed this bucket successfully, so delete it and move on to the |
| // next. |
| apr_bucket_delete(bucket); |
| } |
| return APR_SUCCESS; |
| } |
| |
| } // namespace |
| |
| void ModSpdyFetcher::Initialize() { |
| mps_to_apache_filter_handle = ap_register_input_filter( |
| "MOD_PAGESPEED_TO_MOD_SPDY", // name |
| MpsToApacheFilter, // filter function |
| NULL, // init function (n/a in our case) |
| AP_FTYPE_NETWORK); // filter type |
| |
| apache_to_mps_filter_handle = ap_register_output_filter( |
| "MOD_SPDY_TO_MOD_PAGESPEED", // name |
| ApacheToMpsFilter, // filter function |
| NULL, // init function (n/a in our case) |
| AP_FTYPE_NETWORK); // filter type |
| } |
| |
| ModSpdyFetcher::ModSpdyFetcher(ModSpdyFetchController* controller, |
| StringPiece url, |
| RewriteDriver* driver, |
| spdy_slave_connection_factory* factory) |
| : controller_(controller), |
| fallback_fetcher_(driver->async_fetcher()), |
| stats_(driver->statistics()), |
| connection_factory_(factory) { |
| GoogleUrl gurl(url); |
| if (gurl.IsWebValid()) { |
| gurl.Origin().CopyToString(&own_origin_); |
| } |
| } |
| |
| ModSpdyFetcher::~ModSpdyFetcher() { |
| } |
| |
| void ModSpdyFetcher::InitStats(Statistics* statistics) { |
| Histogram* histo = |
| statistics->AddHistogram(kFetchExecutionLatencyUsHistogram); |
| histo->SetMaxValue(kFetchExecutionLatencyUsHistogramMaxValue); |
| ModSpdyFetchController::InitStats(statistics); |
| } |
| |
| bool ModSpdyFetcher::ShouldUseOn(request_rec* req) { |
| // We want to get involved for all HTTPS resources, so that any resources |
| // we generate for SPDY clients can also be served to HTTPS clients safely. |
| // |
| // It's not sufficient to just check is_https, however, since in case the |
| // top-level connection is SPDY, this will be a slave connection that will |
| // have mod_ssl off. |
| return mod_ssl_is_https(req->connection) || |
| (mod_spdy_get_spdy_version(req->connection) != 0); |
| } |
| |
| void ModSpdyFetcher::Fetch(const GoogleString& url, |
| MessageHandler* message_handler, |
| AsyncFetch* fetch) { |
| // Only fetch from mod_spdy if the hostname matches that of outgoing |
| // connection, and if we have access to appropriate mod_spdy exports. |
| GoogleUrl parsed_url(url); |
| if (connection_factory_ != NULL && |
| parsed_url.IsWebValid() && !own_origin_.empty() && |
| parsed_url.Origin() == own_origin_) { |
| controller_->ScheduleBlockingFetch(this, url, stats_, |
| message_handler, fetch); |
| } else { |
| fallback_fetcher_->Fetch(url, message_handler, fetch); |
| } |
| } |
| |
| void ModSpdyFetcher::BlockingFetch( |
| const GoogleString& url, ModSpdyFetchController* controller, |
| Statistics* stats, MessageHandler* message_handler, AsyncFetch* fetch) { |
| int64 start_time_us = controller->timer()->NowUs(); |
| |
| // These will normally be deleted by their filter functions |
| // (but we do cleanup if something went wrong) |
| MpsToApacheFilterContext* in_context = |
| new MpsToApacheFilterContext( |
| url, fetch, message_handler); |
| ApacheToMpsFilterContext* out_context = |
| new ApacheToMpsFilterContext(fetch, message_handler); |
| spdy_slave_connection* slave_connection = |
| mod_spdy_create_slave_connection( |
| connection_factory_, |
| mps_to_apache_filter_handle, in_context, |
| apache_to_mps_filter_handle, out_context); |
| if (slave_connection != NULL) { |
| mod_spdy_run_slave_connection(slave_connection); |
| mod_spdy_destroy_slave_connection(slave_connection); |
| |
| // Important: at this point 'this' may already be deleted, since any |
| // user callbacks may have run (potentially releasing the owning |
| // RewriteDriver), so we should not be accessing any member |
| // variables. |
| if (!controller->is_shut_down()) { |
| // Don't report stats once the controller is shut down, since |
| // things may be getting torn down right about now. |
| int64 end_time_us = controller->timer()->NowUs(); |
| stats->GetHistogram(kFetchExecutionLatencyUsHistogram)->Add( |
| end_time_us - start_time_us); |
| } |
| return; |
| } else { |
| delete in_context; |
| delete out_context; |
| fallback_fetcher_->Fetch(url, message_handler, fetch); |
| } |
| } |
| |
| } // namespace net_instaweb |