blob: b7a3a2810e4f232fd67357e3974052e8c81bd24c [file] [log] [blame]
/*
* Copyright 2012 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Author: jmarantz@google.com (Joshua Marantz)
#include "pagespeed/system/apr_mem_cache.h"
#include <memory>
#include "apr_pools.h" // NOLINT
#include "base/logging.h"
#include "pagespeed/system/apr_thread_compatible_pool.h"
#include "pagespeed/kernel/base/hasher.h"
#include "pagespeed/kernel/base/hostname_util.h"
#include "pagespeed/kernel/base/message_handler.h"
#include "pagespeed/kernel/base/shared_string.h"
#include "pagespeed/kernel/base/stack_buffer.h"
#include "pagespeed/kernel/base/statistics.h"
#include "pagespeed/kernel/base/string_util.h"
#include "pagespeed/kernel/base/timer.h"
#include "pagespeed/kernel/cache/cache_interface.h"
#include "pagespeed/kernel/cache/key_value_codec.h"
#include "third_party/aprutil/apr_memcache2.h"
namespace net_instaweb {
namespace {
// Defaults copied from Apache 2.4 src distribution:
// src/modules/cache/mod_socache_memcache.c
const int kDefaultMemcachedPort = 11211;
const int kDefaultServerMin = 0; // minimum # client sockets to open
const int kDefaultServerSmax = 1; // soft max # client connections to open
const char kMemCacheTimeouts[] = "memcache_timeouts";
const char kLastErrorCheckpointMs[] = "memcache_last_error_checkpoint_ms";
const char kErrorBurstSize[] = "memcache_error_burst_size";
// time-to-live of a client connection. There is a bug in the APR
// implementation, where the TTL argument to apr_memcache2_server_create was
// being interpreted in microseconds, rather than seconds.
//
// See: http://mail-archives.apache.org/mod_mbox/apr-dev/201209.mbox/browser
// and: http://svn.apache.org/viewvc?view=revision&revision=1390530
//
// TODO(jmarantz): figure out somehow if that fix is applied, and if so,
// do not multiply by 1M.
const int kDefaultServerTtlUs = 600*1000*1000;
const int kTimeoutUnset = -1;
} // namespace
AprMemCache::AprMemCache(const StringPiece& servers, int thread_limit,
Hasher* hasher, Statistics* statistics,
Timer* timer, MessageHandler* handler)
: valid_server_spec_(false),
thread_limit_(thread_limit),
timeout_us_(kTimeoutUnset),
pool_(NULL),
memcached_(NULL),
hasher_(hasher),
timer_(timer),
timeouts_(statistics->GetVariable(kMemCacheTimeouts)),
last_error_checkpoint_ms_(statistics->GetUpDownCounter(
kLastErrorCheckpointMs)),
error_burst_size_(statistics->GetUpDownCounter(kErrorBurstSize)),
is_machine_local_(true),
message_handler_(handler) {
servers.CopyToString(&server_spec_);
pool_ = AprCreateThreadCompatiblePool(NULL);
// Get our hostname for the is_machine_local_ analysis below.
GoogleString hostname(GetHostname());
// Don't try to connect on construction; we don't want to bother creating
// connections to the memcached servers in the root process. But do parse
// the server spec so we can determine its validity.
//
// TODO(jmarantz): consider doing an initial connect/disconnect during
// config parsing to get better error reporting on Apache startup.
StringPieceVector server_vector;
SplitStringPieceToVector(servers, ",", &server_vector, true);
bool success = true;
for (int i = 0, n = server_vector.size(); i < n; ++i) {
StringPieceVector host_port;
int port = kDefaultMemcachedPort;
SplitStringPieceToVector(server_vector[i], ":", &host_port, true);
bool ok = false;
if (host_port.size() == 1) {
ok = true;
} else if (host_port.size() == 2) {
ok = StringToInt(host_port[1], &port);
}
if (ok) {
// If any host isn't "localhost" then the machine isn't local.
is_machine_local_ &= IsLocalhost(host_port[0], hostname);
host_port[0].CopyToString(StringVectorAdd(&hosts_));
ports_.push_back(port);
} else {
message_handler_->Message(kError, "Invalid memcached sever: %s",
server_vector[i].as_string().c_str());
success = false;
}
}
valid_server_spec_ = success && !server_vector.empty();
}
AprMemCache::~AprMemCache() {
apr_pool_destroy(pool_);
}
void AprMemCache::InitStats(Statistics* statistics) {
statistics->AddVariable(kMemCacheTimeouts);
statistics->AddUpDownCounter(kLastErrorCheckpointMs);
statistics->AddUpDownCounter(kErrorBurstSize);
}
bool AprMemCache::Connect() {
apr_status_t status =
apr_memcache2_create(pool_, hosts_.size(), 0, &memcached_);
bool success = false;
if ((status == APR_SUCCESS) && !hosts_.empty()) {
success = true;
CHECK_EQ(hosts_.size(), ports_.size());
for (int i = 0, n = hosts_.size(); i < n; ++i) {
apr_memcache2_server_t* server = NULL;
status = apr_memcache2_server_create(
pool_, hosts_[i].c_str(), ports_[i],
kDefaultServerMin, kDefaultServerSmax,
thread_limit_, kDefaultServerTtlUs, &server);
if ((status != APR_SUCCESS) ||
((status = apr_memcache2_add_server(memcached_, server) !=
APR_SUCCESS))) {
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
message_handler_->Message(
kError, "Failed to attach memcached server %s:%d %s (%d)",
hosts_[i].c_str(), ports_[i], buf, status);
success = false;
} else {
if (timeout_us_ != kTimeoutUnset) {
apr_memcache2_set_timeout_microseconds(memcached_, timeout_us_);
}
servers_.push_back(server);
}
}
}
return success;
}
void AprMemCache::DecodeValueMatchingKeyAndCallCallback(
const GoogleString& key, const char* data, size_t data_len,
const char* calling_method, Callback* callback) {
SharedString key_and_value;
key_and_value.Assign(data, data_len);
GoogleString actual_key;
if (key_value_codec::Decode(&key_and_value, &actual_key, callback->value())) {
if (key == actual_key) {
ValidateAndReportResult(actual_key, CacheInterface::kAvailable, callback);
} else {
message_handler_->Message(
kError, "AprMemCache::%s key collision %s != %s",
calling_method, key.c_str(), actual_key.c_str());
ValidateAndReportResult(key, CacheInterface::kNotFound, callback);
}
} else {
message_handler_->Message(
kError, "AprMemCache::%s decoding error on key %s",
calling_method, key.c_str());
ValidateAndReportResult(key, CacheInterface::kNotFound, callback);
}
}
void AprMemCache::Get(const GoogleString& key, Callback* callback) {
if (!IsHealthy()) {
ValidateAndReportResult(key, CacheInterface::kNotFound, callback);
return;
}
apr_pool_t* data_pool;
apr_pool_create(&data_pool, pool_);
CHECK(data_pool != NULL) << "apr_pool_t data_pool allocation failure";
GoogleString hashed_key = hasher_->Hash(key);
char* data;
apr_size_t data_len;
apr_status_t status = apr_memcache2_getp(
memcached_, data_pool, hashed_key.c_str(), &data, &data_len, NULL);
if (status == APR_SUCCESS) {
DecodeValueMatchingKeyAndCallCallback(key, data, data_len, "Get", callback);
} else {
if (status != APR_NOTFOUND) {
RecordError();
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
message_handler_->Message(
kError, "AprMemCache::Get error: %s (%d) on key %s",
buf, status, key.c_str());
if (status == APR_TIMEUP) {
timeouts_->Add(1);
}
}
ValidateAndReportResult(key, CacheInterface::kNotFound, callback);
}
apr_pool_destroy(data_pool);
}
void AprMemCache::MultiGet(MultiGetRequest* request) {
if (!IsHealthy()) {
ReportMultiGetNotFound(request);
return;
}
// apr_memcache2_multgetp documentation indicates it may clear the
// temp_pool inside the function. Thus it is risky to pass the same
// pool for both temp_pool and data_pool, as we need to read the
// data after the call.
apr_pool_t* data_pool;
apr_pool_create(&data_pool, pool_);
CHECK(data_pool != NULL) << "apr_pool_t data_pool allocation failure";
apr_pool_t* temp_pool = NULL;
apr_pool_create(&temp_pool, pool_);
CHECK(temp_pool != NULL) << "apr_pool_t temp_pool allocation failure";
apr_hash_t* hash_table = apr_hash_make(data_pool);
StringVector hashed_keys;
for (int i = 0, n = request->size(); i < n; ++i) {
GoogleString hashed_key = hasher_->Hash((*request)[i].key);
hashed_keys.push_back(hashed_key);
apr_memcache2_add_multget_key(data_pool, hashed_key.c_str(), &hash_table);
}
apr_status_t status = apr_memcache2_multgetp(memcached_, temp_pool, data_pool,
hash_table);
apr_pool_destroy(temp_pool);
bool error_recorded = false;
if (status == APR_SUCCESS) {
for (int i = 0, n = request->size(); i < n; ++i) {
CacheInterface::KeyCallback* key_callback = &(*request)[i];
const GoogleString& key = key_callback->key;
Callback* callback = key_callback->callback;
const GoogleString& hashed_key = hashed_keys[i];
apr_memcache2_value_t* value = static_cast<apr_memcache2_value_t*>(
apr_hash_get(hash_table, hashed_key.data(), hashed_key.size()));
if (value == NULL) {
status = APR_NOTFOUND;
} else {
status = value->status;
}
if (status == APR_SUCCESS) {
DecodeValueMatchingKeyAndCallCallback(key, value->data, value->len,
"MultiGet", callback);
} else {
if (status != APR_NOTFOUND) {
if (!error_recorded) {
// Only count 1 error towards threshold on MultiGet failure.
error_recorded = true;
RecordError();
}
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
message_handler_->Message(
kError, "AprMemCache::MultiGet error: %s (%d) on key %s",
buf, status, key.c_str());
if (status == APR_TIMEUP) {
timeouts_->Add(1);
}
}
ValidateAndReportResult(key, CacheInterface::kNotFound, callback);
}
}
delete request;
} else {
RecordError();
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
message_handler_->Message(
kError, "AprMemCache::MultiGet error: %s (%d) on %d keys",
buf, status, static_cast<int>(request->size()));
ReportMultiGetNotFound(request);
}
apr_pool_destroy(data_pool);
}
void AprMemCache::PutHelper(const GoogleString& key,
SharedString* key_and_value) {
// I believe apr_memcache2_set erroneously takes a char* for the value.
// Hence we const_cast.
GoogleString hashed_key = hasher_->Hash(key);
apr_status_t status = apr_memcache2_set(
memcached_, hashed_key.c_str(),
const_cast<char*>(key_and_value->data()), key_and_value->size(),
0, 0);
if (status != APR_SUCCESS) {
RecordError();
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
int value_size = key_value_codec::GetValueSizeFromKeyAndKeyValue(
key, *key_and_value);
message_handler_->Message(
kError, "AprMemCache::Put error: %s (%d) on key %s, value-size %d",
buf, status, key.c_str(), value_size);
if (status == APR_TIMEUP) {
timeouts_->Add(1);
}
}
}
void AprMemCache::PutWithKeyInValue(const GoogleString& key,
SharedString* key_and_value) {
if (!IsHealthy()) {
return;
}
PutHelper(key, key_and_value);
}
void AprMemCache::Put(const GoogleString& key, SharedString* value) {
if (!IsHealthy()) {
return;
}
SharedString key_and_value;
if (key_value_codec::Encode(key, value, &key_and_value)) {
PutHelper(key, &key_and_value);
} else {
message_handler_->Message(
kError, "AprMemCache::Put error: key size %d too large, first "
"100 bytes of key is: %s",
static_cast<int>(key.size()), key.substr(0, 100).c_str());
}
}
void AprMemCache::Delete(const GoogleString& key) {
if (!IsHealthy()) {
return;
}
// Note that deleting a key whose value exceeds our size threshold
// will not actually remove it from the fallback cache. However, it
// will remove our sentinel indicating that it's in the fallback cache,
// and therefore it will be functionally deleted.
//
// TODO(jmarantz): determine whether it's better to defensively delete
// it from the fallback cache even though most data will not be, thus
// incurring file system overhead for small data deleted from memcached.
//
// Another option would be to issue a Get before the Delete to see
// if it's in the fallback cache, but that would send more load to
// memcached, possibly transferring significant amounts of data that
// will be tossed.
GoogleString hashed_key = hasher_->Hash(key);
apr_status_t status = apr_memcache2_delete(memcached_, hashed_key.c_str(), 0);
if ((status != APR_SUCCESS) && (status != APR_NOTFOUND)) {
RecordError();
char buf[kStackBufferSize];
apr_strerror(status, buf, sizeof(buf));
message_handler_->Message(
kError, "AprMemCache::Delete error: %s (%d) on key %s", buf, status,
key.c_str());
if (status == APR_TIMEUP) {
timeouts_->Add(1);
}
}
}
bool AprMemCache::GetStatus(GoogleString* buffer) {
apr_pool_t* temp_pool = NULL;
apr_pool_create(&temp_pool, pool_);
CHECK(temp_pool != NULL) << "apr_pool_t allocation failure";
bool ret = true;
for (int i = 0, n = servers_.size(); i < n; ++i) {
apr_memcache2_stats_t* stats;
apr_status_t status = apr_memcache2_stats(servers_[i], temp_pool, &stats);
if (status == APR_SUCCESS) {
StrAppend(buffer, "memcached server ", hosts_[i], ":",
IntegerToString(ports_[i]), " version ", stats->version);
StrAppend(buffer, " pid ", IntegerToString(stats->pid), " up ",
IntegerToString(stats->uptime), " seconds \n");
StrAppend(buffer, "bytes: ",
Integer64ToString(stats->bytes), "\n");
StrAppend(buffer, "bytes_read: ",
Integer64ToString(stats->bytes_read), "\n");
StrAppend(buffer, "bytes_written: ",
Integer64ToString(stats->bytes_written), "\n");
StrAppend(buffer, "cmd_get: ",
IntegerToString(stats->cmd_get), "\n");
StrAppend(buffer, "cmd_set: ",
IntegerToString(stats->cmd_set), "\n");
StrAppend(buffer, "connection_structures: ",
IntegerToString(stats->connection_structures), "\n");
StrAppend(buffer, "curr_connections: ",
IntegerToString(stats->curr_connections), "\n");
StrAppend(buffer, "curr_items: ",
IntegerToString(stats->curr_items), "\n");
StrAppend(buffer, "evictions: ",
Integer64ToString(stats->evictions), "\n");
StrAppend(buffer, "get_hits: ",
IntegerToString(stats->get_hits), "\n");
StrAppend(buffer, "get_misses: ",
IntegerToString(stats->get_misses), "\n");
StrAppend(buffer, "limit_maxbytes: ",
IntegerToString(stats->limit_maxbytes), "\n");
StrAppend(buffer, "pointer_size: ",
IntegerToString(stats->pointer_size), "\n");
StrAppend(buffer, "rusage_system: ",
Integer64ToString(stats->rusage_system), "\n");
StrAppend(buffer, "rusage_user: ",
Integer64ToString(stats->pointer_size), "\n");
StrAppend(buffer, "threads: ",
IntegerToString(stats->threads), "\n");
StrAppend(buffer, "total_connections: ",
IntegerToString(stats->total_connections), "\n");
StrAppend(buffer, "total_items: ",
IntegerToString(stats->total_items), "\n");
StrAppend(buffer, "\n");
// TODO(jmarantz): add the rest of the stats from http://apr.apache.org
// /docs/apr-util/1.4/structapr__memcache__stats__t.html
} else {
ret = false;
}
}
apr_pool_destroy(temp_pool);
return ret;
}
void AprMemCache::RecordError() {
// Note that we are sharing state with other Apache child processes,
// and we use Statistics Variables to determine our current health
// status. In Apache those are implemented via shared memory.
int64 time_ms = timer_->NowMs();
int64 last_error_checkpoint_ms = last_error_checkpoint_ms_->Get();
int64 delta_ms = time_ms - last_error_checkpoint_ms;
// The first time we catch an error we'll set the time of the error.
// We'll keep counting errors for 30 seconds declaring sickness when
// we reach 4. That's an approximation because there will be
// cross-process races between accesses of the time & counts.
//
// When we get to 30 seconds since the start of the error burst we
// clear everything & start counting again.
if (delta_ms > kHealthCheckpointIntervalMs) {
last_error_checkpoint_ms_->Set(time_ms);
error_burst_size_->Set(1);
} else {
error_burst_size_->Add(1);
}
}
bool AprMemCache::IsHealthy() const {
if (shutdown_.value()) {
return false;
}
int64 time_ms = timer_->NowMs();
int64 last_error_checkpoint_ms = last_error_checkpoint_ms_->Get();
int64 delta_ms = time_ms - last_error_checkpoint_ms;
int64 error_burst_size = error_burst_size_->Get();
if (delta_ms > kHealthCheckpointIntervalMs) {
if (error_burst_size >= kMaxErrorBurst) {
// We were sick, but now it seems enough time has expired to
// see whether we've recovered.
message_handler_->Message(
kInfo, "AprMemCache::IsHealthy error: Attempting to recover");
}
error_burst_size_->Set(0);
return true;
}
return error_burst_size < kMaxErrorBurst;
}
void AprMemCache::ShutDown() {
shutdown_.set_value(true);
}
void AprMemCache::set_timeout_us(int timeout_us) {
timeout_us_ = timeout_us;
if ((memcached_ != NULL) && (timeout_us != kTimeoutUnset)) {
apr_memcache2_set_timeout_microseconds(memcached_, timeout_us);
}
}
} // namespace net_instaweb