blob: 13358ba737a70f16bc14631f456678915a3190cc [file] [log] [blame]
/*
* Copyright 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: jefftk@google.com (Jeff Kaufman)
#include "ngx_base_fetch.h"
#include "ngx_list_iterator.h"
#include "ngx_pagespeed.h"
#include "net/instaweb/http/public/response_headers.h"
#include "net/instaweb/rewriter/public/rewrite_stats.h"
#include "net/instaweb/util/public/google_message_handler.h"
#include "net/instaweb/util/public/message_handler.h"
namespace net_instaweb {
NgxBaseFetch::NgxBaseFetch(ngx_http_request_t* r, int pipe_fd,
NgxServerContext* server_context,
const RequestContextPtr& request_ctx,
PreserveCachingHeaders preserve_caching_headers)
: AsyncFetch(request_ctx),
request_(r),
server_context_(server_context),
done_called_(false),
last_buf_sent_(false),
pipe_fd_(pipe_fd),
references_(2),
ipro_lookup_(false),
preserve_caching_headers_(preserve_caching_headers) {
if (pthread_mutex_init(&mutex_, NULL)) CHECK(0);
}
NgxBaseFetch::~NgxBaseFetch() {
pthread_mutex_destroy(&mutex_);
}
void NgxBaseFetch::Lock() {
pthread_mutex_lock(&mutex_);
}
void NgxBaseFetch::Unlock() {
pthread_mutex_unlock(&mutex_);
}
bool NgxBaseFetch::HandleWrite(const StringPiece& sp,
MessageHandler* handler) {
Lock();
buffer_.append(sp.data(), sp.size());
Unlock();
return true;
}
// should only be called in nginx thread
ngx_int_t NgxBaseFetch::CopyBufferToNginx(ngx_chain_t** link_ptr) {
CHECK(!(done_called_ && last_buf_sent_))
<< "CopyBufferToNginx() was called after the last buffer was sent";
// there is no buffer to send
if (!done_called_ && buffer_.empty()) {
*link_ptr = NULL;
return NGX_AGAIN;
}
int rc = string_piece_to_buffer_chain(
request_->pool, buffer_, link_ptr, done_called_ /* send_last_buf */);
if (rc != NGX_OK) {
return rc;
}
// Done with buffer contents now.
buffer_.clear();
if (done_called_) {
last_buf_sent_ = true;
return NGX_OK;
}
return NGX_AGAIN;
}
// There may also be a race condition if this is called between the last Write()
// and Done() such that we're sending an empty buffer with last_buf set, which I
// think nginx will reject.
ngx_int_t NgxBaseFetch::CollectAccumulatedWrites(ngx_chain_t** link_ptr) {
ngx_int_t rc;
Lock();
rc = CopyBufferToNginx(link_ptr);
Unlock();
return rc;
}
ngx_int_t NgxBaseFetch::CollectHeaders(ngx_http_headers_out_t* headers_out) {
const ResponseHeaders* pagespeed_headers = response_headers();
if (content_length_known()) {
headers_out->content_length = NULL;
headers_out->content_length_n = content_length();
}
return copy_response_headers_to_ngx(request_, *pagespeed_headers,
preserve_caching_headers_);
}
void NgxBaseFetch::RequestCollection() {
int rc;
char c = 'A'; // What byte we write is arbitrary.
while (true) {
rc = write(pipe_fd_, &c, 1);
if (rc == 1) {
break;
} else if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO(jefftk): is this rare enough that spinning isn't a problem? Could
// we get into a case where the pipe fills up and we spin forever?
} else {
perror("NgxBaseFetch::RequestCollection");
break;
}
}
}
void NgxBaseFetch::HandleHeadersComplete() {
int status_code = response_headers()->status_code();
bool status_ok = (status_code != 0) && (status_code < 400);
if (!ipro_lookup_ || status_ok) {
// If this is a 404 response we need to count it in the stats.
if (response_headers()->status_code() == HttpStatus::kNotFound) {
server_context_->rewrite_stats()->resource_404_count()->Add(1);
}
}
// For the IPRO lookup, suppress notification of the nginx side here.
// If we send both this event and the one from done, nasty stuff will happen
// if we loose the race with with the nginx side destructing this base fetch
// instance (and thereby clearing the byte and its pending extraneous event.
if (!ipro_lookup_) {
RequestCollection(); // Headers available.
}
}
bool NgxBaseFetch::HandleFlush(MessageHandler* handler) {
RequestCollection(); // A new part of the response body is available.
return true;
}
void NgxBaseFetch::Release() {
DecrefAndDeleteIfUnreferenced();
}
void NgxBaseFetch::DecrefAndDeleteIfUnreferenced() {
// Creates a full memory barrier.
if (__sync_add_and_fetch(&references_, -1) == 0) {
delete this;
}
}
void NgxBaseFetch::HandleDone(bool success) {
// TODO(jefftk): it's possible that instead of locking here we can just modify
// CopyBufferToNginx to only read done_called_ once.
Lock();
done_called_ = true;
Unlock();
close(pipe_fd_); // Indicates to nginx that we're done with the rewrite.
pipe_fd_ = -1;
DecrefAndDeleteIfUnreferenced();
}
} // namespace net_instaweb