blob: 1ad11199883e5879530ed5c0f317444052c5d11f [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include "iocore/eventsystem/Continuation.h"
#include "tscore/ink_platform.h"
#include "tscore/InkErrno.h"
#include "proxy/hdrs/HTTP.h"
#include "P_CacheHttp.h"
#include "P_CacheHosting.h"
#include "tsutil/Metrics.h"
#include "iocore/cache/CacheVC.h"
#include "iocore/cache/CacheEvacuateDocVC.h"
using ts::Metrics;
struct EvacuationBlock;
// Compilation Options
#define ALTERNATES 1
// #define CACHE_LOCK_FAIL_RATE 0.001
// #define CACHE_AGG_FAIL_RATE 0.005
#define MAX_CACHE_VCS_PER_THREAD 500
#define INTEGRAL_FRAGS 4
#ifdef DEBUG
#define DDbg(dbg_ctl, fmt, ...) Dbg(dbg_ctl, fmt, ##__VA_ARGS__)
#else
#define DDbg(dbg_ctl, fmt, ...)
#endif
#define AIO_SOFT_FAILURE -100000
#ifndef CACHE_LOCK_FAIL_RATE
#define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
#else
#define CACHE_TRY_LOCK(_l, _m, _t) \
MUTEX_TRY_LOCK(_l, _m, _t); \
if ((uint32_t)_t->generator.random() < (uint32_t)(UINT_MAX * CACHE_LOCK_FAIL_RATE)) \
CACHE_MUTEX_RELEASE(_l)
#endif
#define VC_LOCK_RETRY_EVENT() \
do { \
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
return EVENT_CONT; \
} while (0)
#define VC_SCHED_LOCK_RETRY() \
do { \
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
return EVENT_CONT; \
} while (0)
#define CONT_SCHED_LOCK_RETRY_RET(_c) \
do { \
_c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
return EVENT_CONT; \
} while (0)
#define CONT_SCHED_LOCK_RETRY(_c) _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))
#define VC_SCHED_WRITER_RETRY() \
do { \
ink_assert(!trigger); \
writer_lock_retry++; \
ink_hrtime _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay); \
if (writer_lock_retry > 2) \
_t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay) * 2; \
trigger = mutex->thread_holding->schedule_in_local(this, _t); \
return EVENT_CONT; \
} while (0)
extern CacheStatsBlock cache_rsb;
// Configuration
extern int cache_config_dir_sync_frequency;
extern int cache_config_http_max_alts;
extern int cache_config_log_alternate_eviction;
extern int cache_config_permit_pinning;
extern int cache_config_select_alternate;
extern int cache_config_max_doc_size;
extern int cache_config_min_average_object_size;
extern int cache_config_agg_write_backlog;
extern int cache_config_enable_checksum;
extern int cache_config_alt_rewrite_max_size;
extern int cache_config_read_while_writer;
extern int cache_config_agg_write_backlog;
extern int cache_config_ram_cache_compress;
extern int cache_config_ram_cache_compress_percent;
extern int cache_config_ram_cache_use_seen_filter;
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
extern int cache_config_force_sector_size;
extern int cache_config_target_fragment_size;
extern int cache_config_mutex_retry_delay;
extern int cache_read_while_writer_retry_delay;
extern int cache_config_read_while_writer_max_retries;
#define PUSH_HANDLER(_x) \
do { \
ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
save_handler = handler; \
handler = (ContinuationHandler)(_x); \
} while (0)
#define POP_HANDLER \
do { \
handler = save_handler; \
ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
} while (0)
struct CacheRemoveCont : public Continuation {
int event_handler(int event, void *data);
CacheRemoveCont() : Continuation(nullptr) {}
};
// Global Data
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
extern ClassAllocator<CacheEvacuateDocVC> cacheEvacuateDocVConnectionAllocator;
extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
CacheEvacuateDocVC *new_DocEvacuator(int nbytes, Stripe *stripe);
// inline Functions
inline CacheVC *
new_CacheVC(Continuation *cont)
{
EThread *t = cont->mutex->thread_holding;
CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
c->vector.data.data = &c->vector.data.fast_data[0];
c->_action = cont;
c->mutex = cont->mutex;
c->start_time = ink_get_hrtime();
c->setThreadAffinity(t);
ink_assert(c->trigger == nullptr);
static DbgCtl dbg_ctl{"cache_new"};
Dbg(dbg_ctl, "new %p", c);
dir_clear(&c->dir);
return c;
}
inline int
free_CacheVCCommon(CacheVC *cont)
{
static DbgCtl dbg_ctl{"cache_free"};
Dbg(dbg_ctl, "free %p", cont);
ProxyMutex *mutex = cont->mutex.get();
Stripe *stripe = cont->stripe;
if (stripe) {
Metrics::Gauge::decrement(cache_rsb.status[cont->op_type].active);
Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.status[cont->op_type].active);
if (cont->closed > 0) {
Metrics::Counter::increment(cache_rsb.status[cont->op_type].success);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[cont->op_type].success);
} // else abort,cancel
}
ink_assert(mutex->thread_holding == this_ethread());
if (cont->trigger) {
cont->trigger->cancel();
}
ink_assert(!cont->is_io_in_progress());
ink_assert(!cont->od);
cont->io.action = nullptr;
cont->io.mutex.clear();
cont->io.aio_result = 0;
cont->io.aiocb.aio_nbytes = 0;
cont->request.reset();
cont->vector.clear();
cont->vio.buffer.clear();
cont->vio.mutex.clear();
if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT) {
cont->alternate.destroy();
} else {
cont->alternate.clear();
}
cont->_action.cancelled = 0;
cont->_action.mutex.clear();
cont->mutex.clear();
cont->buf.clear();
cont->first_buf.clear();
cont->blocks.clear();
cont->writer_buf.clear();
cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
ats_free(cont->scan_stripe_map);
memset((char *)&cont->vio, 0, cont->size_to_init);
#ifdef DEBUG
SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
#endif
return EVENT_DONE;
}
inline int
free_CacheVC(CacheVC *cont)
{
free_CacheVCCommon(cont);
THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
return EVENT_DONE;
}
inline int
free_CacheEvacuateDocVC(CacheEvacuateDocVC *cont)
{
free_CacheVCCommon(cont);
THREAD_FREE(cont, cacheEvacuateDocVConnectionAllocator, this_thread());
return EVENT_DONE;
}
inline int
CacheVC::calluser(int event)
{
recursive++;
ink_assert(!stripe || this_ethread() != stripe->mutex->thread_holding);
vio.cont->handleEvent(event, (void *)&vio);
recursive--;
if (closed) {
die();
return EVENT_DONE;
}
return EVENT_CONT;
}
inline int
CacheVC::callcont(int event)
{
recursive++;
ink_assert(!stripe || this_ethread() != stripe->mutex->thread_holding);
_action.continuation->handleEvent(event, this);
recursive--;
if (closed) {
die();
} else if (vio.vc_server) {
handleEvent(EVENT_IMMEDIATE, nullptr);
}
return EVENT_DONE;
}
inline int
CacheVC::do_read_call(CacheKey *akey)
{
doc_pos = 0;
read_key = akey;
io.aiocb.aio_nbytes = dir_approx_size(&dir);
PUSH_HANDLER(&CacheVC::handleRead);
return handleRead(EVENT_CALL, nullptr);
}
inline int
CacheVC::do_write_call()
{
PUSH_HANDLER(&CacheVC::handleWrite);
return handleWrite(EVENT_CALL, nullptr);
}
inline void
CacheVC::cancel_trigger()
{
if (trigger) {
trigger->cancel_action();
trigger = nullptr;
}
}
inline int
CacheVC::die()
{
if (vio.op == VIO::WRITE) {
if (f.update && total_len) {
alternate.object_key_set(earliest_key);
}
if (!is_io_in_progress()) {
SET_HANDLER(&CacheVC::openWriteClose);
if (!recursive) {
openWriteClose(EVENT_NONE, nullptr);
}
} // else catch it at the end of openWriteWriteDone
return EVENT_CONT;
} else {
if (is_io_in_progress()) {
save_handler = (ContinuationHandler)&CacheVC::openReadClose;
} else {
SET_HANDLER(&CacheVC::openReadClose);
if (!recursive) {
openReadClose(EVENT_NONE, nullptr);
}
}
return EVENT_CONT;
}
}
inline int
CacheVC::handleWriteLock(int /* event ATS_UNUSED */, Event *e)
{
cancel_trigger();
int ret = 0;
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
set_agg_write_in_progress();
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
ret = handleWrite(EVENT_CALL, e);
}
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return EVENT_CONT;
}
inline int
CacheVC::do_write_lock()
{
PUSH_HANDLER(&CacheVC::handleWriteLock);
return handleWriteLock(EVENT_NONE, nullptr);
}
inline int
CacheVC::do_write_lock_call()
{
PUSH_HANDLER(&CacheVC::handleWriteLock);
return handleWriteLock(EVENT_CALL, nullptr);
}
inline bool
CacheVC::writer_done()
{
OpenDirEntry *cod = od;
if (!cod) {
cod = stripe->open_read(&first_key);
}
CacheVC *w = (cod) ? cod->writers.head : nullptr;
// If the write vc started after the reader, then its not the
// original writer, since we never choose a writer that started
// after the reader. The original writer was deallocated and then
// reallocated for the same first_key
for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *)w->opendir_link.next) {
;
}
if (!w) {
return true;
}
return false;
}
inline int
Stripe::close_write(CacheVC *cont)
{
return open_dir.close_write(cont);
}
// Returns 0 on success or a positive error code on failure
inline int
Stripe::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
{
Stripe *stripe = this;
bool agg_error = false;
if (!cont->f.remove) {
agg_error = (!cont->f.update && this->get_agg_todo_size() > cache_config_agg_write_backlog);
#ifdef CACHE_AGG_FAIL_RATE
agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
}
if (agg_error) {
Metrics::Counter::increment(cache_rsb.write_backlog_failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.write_backlog_failure);
return ECACHE_WRITE_FAIL;
}
if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
return 0;
}
return ECACHE_DOC_BUSY;
}
inline int
Stripe::close_write_lock(CacheVC *cont)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return close_write(cont);
}
inline int
Stripe::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return open_write(cont, allow_if_writers, max_writers);
}
inline OpenDirEntry *
Stripe::open_read_lock(CryptoHash *key, EThread *t)
{
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return nullptr;
}
return open_dir.open_read(key);
}
inline int
Stripe::begin_read_lock(CacheVC *cont)
{
// no need for evacuation as the entire document is already in memory
if (cont->f.single_fragment) {
return 0;
}
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return begin_read(cont);
}
inline int
Stripe::close_read_lock(CacheVC *cont)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return close_read(cont);
}
inline int
dir_delete_lock(CacheKey *key, Stripe *stripe, ProxyMutex *m, Dir *del)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, stripe->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_delete(key, stripe, del);
}
inline int
dir_insert_lock(CacheKey *key, Stripe *stripe, Dir *to_part, ProxyMutex *m)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, stripe->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_insert(key, stripe, to_part);
}
inline int
dir_overwrite_lock(CacheKey *key, Stripe *stripe, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, stripe->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_overwrite(key, stripe, to_part, overwrite, must_overwrite);
}
void inline rand_CacheKey(CacheKey *next_key)
{
EThread *ethread = this_ethread();
next_key->b[0] = ethread->generator.random();
next_key->b[1] = ethread->generator.random();
}
extern const uint8_t CacheKey_next_table[256];
void inline next_CacheKey(CacheKey *next, const CacheKey *key)
{
next->u8[0] = CacheKey_next_table[key->u8[0]];
for (int i = 1; i < 16; i++) {
next->u8[i] = CacheKey_next_table[(next->u8[i - 1] + key->u8[i]) & 0xFF];
}
}
extern const uint8_t CacheKey_prev_table[256];
void inline prev_CacheKey(CacheKey *prev, const CacheKey *key)
{
for (int i = 15; i > 0; i--) {
prev->u8[i] = 256 + CacheKey_prev_table[key->u8[i]] - key->u8[i - 1];
}
prev->u8[0] = CacheKey_prev_table[key->u8[0]];
}
inline unsigned int
next_rand(unsigned int *p)
{
unsigned int seed = *p;
seed = 1103515145 * seed + 12345;
*p = seed;
return seed;
}
extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
inline CacheRemoveCont *
new_CacheRemoveCont()
{
CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
cache_rm->mutex = new_ProxyMutex();
SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
return cache_rm;
}
inline void
free_CacheRemoveCont(CacheRemoveCont *cache_rm)
{
cache_rm->mutex = nullptr;
cacheRemoveContAllocator.free(cache_rm);
}
inline int
CacheRemoveCont::event_handler(int event, void *data)
{
(void)event;
(void)data;
free_CacheRemoveCont(this);
return EVENT_DONE;
}
struct CacheHostRecord;
class Stripe;
class CacheHostTable;
struct Cache {
int cache_read_done = 0;
int total_good_nvol = 0;
int total_nvol = 0;
int ready = CACHE_INITIALIZING;
int64_t cache_size = 0; // in store block size
int total_initialized_vol = 0;
CacheType scheme = CACHE_NONE_TYPE;
ReplaceablePtr<CacheHostTable> hosttable;
int open(bool reconfigure, bool fix);
int close();
Action *lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);
Action *open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int len);
Action *open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options = 0,
time_t pin_in_cache = (time_t)0, const char *hostname = nullptr, int host_len = 0);
Action *remove(Continuation *cont, const CacheKey *key, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
int host_len = 0);
Action *scan(Continuation *cont, const char *hostname = nullptr, int host_len = 0, int KB_per_second = 2500);
Action *open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params,
CacheFragType type, const char *hostname, int host_len);
Action *open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0,
const CacheKey *key1 = nullptr, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
int host_len = 0);
static void generate_key(CryptoHash *hash, CacheURL *url);
static void generate_key(HttpCacheKey *hash, CacheURL *url, bool ignore_query = false, cache_generation_t generation = -1);
void vol_initialized(bool result);
int open_done();
Stripe *key_to_stripe(const CacheKey *key, const char *hostname, int host_len);
Cache() {}
};
extern Cache *theCache;
extern Cache *caches[NUM_CACHE_FRAG_TYPES];
inline void
Cache::generate_key(CryptoHash *hash, CacheURL *url)
{
url->hash_get(hash);
}
inline void
Cache::generate_key(HttpCacheKey *key, CacheURL *url, bool ignore_query, cache_generation_t generation)
{
key->hostname = url->host_get(&key->hostlen);
url->hash_get(&key->hash, ignore_query, generation);
}
inline unsigned int
cache_hash(const CryptoHash &hash)
{
uint64_t f = hash.fold();
unsigned int mhash = (unsigned int)(f >> 32);
return mhash;
}