blob: cf747de60cf894b13d929d8a6ee47a498e088c86 [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#pragma once
#include "tscore/ink_platform.h"
#include "tscore/InkErrno.h"
#include "HTTP.h"
#include "P_CacheHttp.h"
struct EvacuationBlock;
// Compilation Options
#define ALTERNATES 1
// #define CACHE_LOCK_FAIL_RATE 0.001
// #define CACHE_AGG_FAIL_RATE 0.005
// #define CACHE_INSPECTOR_PAGES
#define MAX_CACHE_VCS_PER_THREAD 500
#define INTEGRAL_FRAGS 4
#ifdef CACHE_INSPECTOR_PAGES
#ifdef DEBUG
#define CACHE_STAT_PAGES
#endif
#endif
#ifdef DEBUG
#define DDebug(tag, fmt, ...) Debug(tag, fmt, ##__VA_ARGS__)
#else
#define DDebug(tag, fmt, ...)
#endif
#define AIO_SOFT_FAILURE -100000
#ifndef CACHE_LOCK_FAIL_RATE
#define CACHE_TRY_LOCK(_l, _m, _t) MUTEX_TRY_LOCK(_l, _m, _t)
#else
#define CACHE_TRY_LOCK(_l, _m, _t) \
MUTEX_TRY_LOCK(_l, _m, _t); \
if ((uint32_t)_t->generator.random() < (uint32_t)(UINT_MAX * CACHE_LOCK_FAIL_RATE)) \
CACHE_MUTEX_RELEASE(_l)
#endif
#define VC_LOCK_RETRY_EVENT() \
do { \
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay), event); \
return EVENT_CONT; \
} while (0)
#define VC_SCHED_LOCK_RETRY() \
do { \
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
return EVENT_CONT; \
} while (0)
#define CONT_SCHED_LOCK_RETRY_RET(_c) \
do { \
_c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); \
return EVENT_CONT; \
} while (0)
#define CONT_SCHED_LOCK_RETRY(_c) _c->mutex->thread_holding->schedule_in_local(_c, HRTIME_MSECONDS(cache_config_mutex_retry_delay))
#define VC_SCHED_WRITER_RETRY() \
do { \
ink_assert(!trigger); \
writer_lock_retry++; \
ink_hrtime _t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay); \
if (writer_lock_retry > 2) \
_t = HRTIME_MSECONDS(cache_read_while_writer_retry_delay) * 2; \
trigger = mutex->thread_holding->schedule_in_local(this, _t); \
return EVENT_CONT; \
} while (0)
// cache stats definitions
enum {
cache_bytes_used_stat,
cache_bytes_total_stat,
cache_ram_cache_bytes_stat,
cache_ram_cache_bytes_total_stat,
cache_direntries_total_stat,
cache_direntries_used_stat,
cache_ram_cache_hits_stat,
cache_ram_cache_misses_stat,
cache_pread_count_stat,
cache_percent_full_stat,
cache_lookup_active_stat,
cache_lookup_success_stat,
cache_lookup_failure_stat,
cache_read_active_stat,
cache_read_success_stat,
cache_read_failure_stat,
cache_read_seek_fail_stat,
cache_write_active_stat,
cache_write_success_stat,
cache_write_failure_stat,
cache_write_backlog_failure_stat,
cache_update_active_stat,
cache_update_success_stat,
cache_update_failure_stat,
cache_remove_active_stat,
cache_remove_success_stat,
cache_remove_failure_stat,
cache_evacuate_active_stat,
cache_evacuate_success_stat,
cache_evacuate_failure_stat,
cache_scan_active_stat,
cache_scan_success_stat,
cache_scan_failure_stat,
cache_directory_collision_count_stat,
cache_single_fragment_document_count_stat,
cache_two_fragment_document_count_stat,
cache_three_plus_plus_fragment_document_count_stat,
cache_read_busy_success_stat,
cache_read_busy_failure_stat,
cache_gc_bytes_evacuated_stat,
cache_gc_frags_evacuated_stat,
cache_write_bytes_stat,
cache_hdr_vector_marshal_stat,
cache_hdr_marshal_stat,
cache_hdr_marshal_bytes_stat,
cache_directory_wrap_stat,
cache_directory_sync_count_stat,
cache_directory_sync_time_stat,
cache_directory_sync_bytes_stat,
/* AIO read/write error counters */
cache_span_errors_read_stat,
cache_span_errors_write_stat,
/* Span related gauges. A span "moves" from "online" (errors==0)
* to "failing" (errors > 0 && errors < proxy.config.cache.max_disk_errors)
* to "offline"(errors >= proxy.config.cache.max_disk_errors.
* "failing" + "offline" + "online" = total number of spans */
cache_span_offline_stat,
cache_span_online_stat,
cache_span_failing_stat,
cache_stat_count
};
extern RecRawStatBlock *cache_rsb;
#define GLOBAL_CACHE_SET_DYN_STAT(x, y) RecSetGlobalRawStatSum(cache_rsb, (x), (y))
#define CACHE_SET_DYN_STAT(x, y) \
RecSetGlobalRawStatSum(cache_rsb, (x), (y)) RecSetGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y))
#define CACHE_INCREMENT_DYN_STAT(x) \
do { \
RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), 1); \
RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), 1); \
} while (0);
#define CACHE_DECREMENT_DYN_STAT(x) \
do { \
RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), -1); \
RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), -1); \
} while (0);
#define CACHE_VOL_SUM_DYN_STAT(x, y) RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)y);
#define CACHE_SUM_DYN_STAT(x, y) \
do { \
RecIncrRawStat(cache_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); \
RecIncrRawStat(vol->cache_vol->vol_rsb, mutex->thread_holding, (int)(x), (int64_t)(y)); \
} while (0);
#define CACHE_SUM_DYN_STAT_THREAD(x, y) \
do { \
RecIncrRawStat(cache_rsb, this_ethread(), (int)(x), (int64_t)(y)); \
RecIncrRawStat(vol->cache_vol->vol_rsb, this_ethread(), (int)(x), (int64_t)(y)); \
} while (0);
#define GLOBAL_CACHE_SUM_GLOBAL_DYN_STAT(x, y) RecIncrGlobalRawStatSum(cache_rsb, (x), (y))
#define CACHE_SUM_GLOBAL_DYN_STAT(x, y) \
RecIncrGlobalRawStatSum(cache_rsb, (x), (y)) RecIncrGlobalRawStatSum(vol->cache_vol->vol_rsb, (x), (y))
#define CACHE_CLEAR_DYN_STAT(x) \
do { \
RecSetRawStatSum(cache_rsb, (x), 0); \
RecSetRawStatCount(cache_rsb, (x), 0); \
RecSetRawStatSum(vol->cache_vol->vol_rsb, (x), 0); \
RecSetRawStatCount(vol->cache_vol->vol_rsb, (x), 0); \
} while (0);
// Configuration
extern int cache_config_dir_sync_frequency;
extern int cache_config_http_max_alts;
extern int cache_config_log_alternate_eviction;
extern int cache_config_permit_pinning;
extern int cache_config_select_alternate;
extern int cache_config_max_doc_size;
extern int cache_config_min_average_object_size;
extern int cache_config_agg_write_backlog;
extern int cache_config_enable_checksum;
extern int cache_config_alt_rewrite_max_size;
extern int cache_config_read_while_writer;
extern int cache_config_agg_write_backlog;
extern int cache_config_ram_cache_compress;
extern int cache_config_ram_cache_compress_percent;
extern int cache_config_ram_cache_use_seen_filter;
extern int cache_config_hit_evacuate_percent;
extern int cache_config_hit_evacuate_size_limit;
extern int cache_config_force_sector_size;
extern int cache_config_target_fragment_size;
extern int cache_config_mutex_retry_delay;
extern int cache_read_while_writer_retry_delay;
extern int cache_config_read_while_writer_max_retries;
// CacheVC
struct CacheVC : public CacheVConnection {
CacheVC();
VIO *do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *buf) override;
VIO *do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *buf, int64_t offset) override;
VIO *do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *buf, bool owner = false) override;
void do_io_close(int lerrno = -1) override;
void reenable(VIO *avio) override;
void reenable_re(VIO *avio) override;
bool get_data(int i, void *data) override;
bool set_data(int i, void *data) override;
bool
is_ram_cache_hit() const override
{
ink_assert(vio.op == VIO::READ);
return !f.not_from_ram_cache;
}
int
get_header(void **ptr, int *len) override
{
if (first_buf) {
Doc *doc = (Doc *)first_buf->data();
*ptr = doc->hdr();
*len = doc->hlen;
return 0;
}
return -1;
}
int
set_header(void *ptr, int len) override
{
header_to_write = ptr;
header_to_write_len = len;
return 0;
}
int
get_single_data(void **ptr, int *len) override
{
if (first_buf) {
Doc *doc = (Doc *)first_buf->data();
if (doc->data_len() == doc->total_len) {
*ptr = doc->data();
*len = doc->data_len();
return 0;
}
}
return -1;
}
int
get_volume_number() const override
{
if (vol && vol->cache_vol) {
return vol->cache_vol->vol_number;
}
return -1;
}
const char *
get_disk_path() const override
{
if (vol && vol->disk) {
return vol->disk->path;
}
return nullptr;
}
bool
is_compressed_in_ram() const override
{
ink_assert(vio.op == VIO::READ);
return f.compressed_in_ram;
}
bool writer_done();
int calluser(int event);
int callcont(int event);
int die();
int dead(int event, Event *e);
int handleReadDone(int event, Event *e);
int handleRead(int event, Event *e);
int do_read_call(CacheKey *akey);
int handleWrite(int event, Event *e);
int handleWriteLock(int event, Event *e);
int do_write_call();
int do_write_lock();
int do_write_lock_call();
int do_sync(uint32_t target_write_serial);
int openReadClose(int event, Event *e);
int openReadReadDone(int event, Event *e);
int openReadMain(int event, Event *e);
int openReadStartEarliest(int event, Event *e);
int openReadVecWrite(int event, Event *e);
int openReadStartHead(int event, Event *e);
int openReadFromWriter(int event, Event *e);
int openReadFromWriterMain(int event, Event *e);
int openReadFromWriterFailure(int event, Event *);
int openReadChooseWriter(int event, Event *e);
int openReadDirDelete(int event, Event *e);
int openWriteCloseDir(int event, Event *e);
int openWriteCloseHeadDone(int event, Event *e);
int openWriteCloseHead(int event, Event *e);
int openWriteCloseDataDone(int event, Event *e);
int openWriteClose(int event, Event *e);
int openWriteRemoveVector(int event, Event *e);
int openWriteWriteDone(int event, Event *e);
int openWriteOverwrite(int event, Event *e);
int openWriteMain(int event, Event *e);
int openWriteStartDone(int event, Event *e);
int openWriteStartBegin(int event, Event *e);
int updateVector(int event, Event *e);
int updateReadDone(int event, Event *e);
int updateVecWrite(int event, Event *e);
int removeEvent(int event, Event *e);
int linkWrite(int event, Event *e);
int derefRead(int event, Event *e);
int scanVol(int event, Event *e);
int scanObject(int event, Event *e);
int scanUpdateDone(int event, Event *e);
int scanOpenWrite(int event, Event *e);
int scanRemoveDone(int event, Event *e);
int
is_io_in_progress()
{
return io.aiocb.aio_fildes != AIO_NOT_IN_PROGRESS;
}
void
set_io_not_in_progress()
{
io.aiocb.aio_fildes = AIO_NOT_IN_PROGRESS;
}
void
set_agg_write_in_progress()
{
io.aiocb.aio_fildes = AIO_AGG_WRITE_IN_PROGRESS;
}
int evacuateDocDone(int event, Event *e);
int evacuateReadHead(int event, Event *e);
void cancel_trigger();
int64_t get_object_size() override;
void set_http_info(CacheHTTPInfo *info) override;
void get_http_info(CacheHTTPInfo **info) override;
/** Get the fragment table.
@return The address of the start of the fragment table,
or @c nullptr if there is no fragment table.
*/
virtual HTTPInfo::FragOffset *get_frag_table();
/** Load alt pointers and do fixups if needed.
@return Length of header data used for alternates.
*/
virtual uint32_t load_http_info(CacheHTTPInfoVector *info, struct Doc *doc, RefCountObj *block_ptr = nullptr);
bool is_pread_capable() override;
bool set_pin_in_cache(time_t time_pin) override;
time_t get_pin_in_cache() override;
// offsets from the base stat
#define CACHE_STAT_ACTIVE 0
#define CACHE_STAT_SUCCESS 1
#define CACHE_STAT_FAILURE 2
// number of bytes to memset to 0 in the CacheVC when we free
// it. All member variables starting from vio are memset to 0.
// This variable is initialized in CacheVC constructor.
static int size_to_init;
// Start Region A
// This set of variables are not reset when the cacheVC is freed.
// A CacheVC must set these to the correct values whenever needed
// These are variables that are always set to the correct values
// before being used by the CacheVC
CacheKey key, first_key, earliest_key, update_key;
Dir dir, earliest_dir, overwrite_dir, first_dir;
// end Region A
// Start Region B
// These variables are individually cleared or reset when the
// CacheVC is freed. All these variables must be reset/cleared
// in free_CacheVC.
Action _action;
CacheHTTPHdr request;
CacheHTTPInfoVector vector;
CacheHTTPInfo alternate;
Ptr<IOBufferData> buf;
Ptr<IOBufferData> first_buf;
Ptr<IOBufferBlock> blocks; // data available to write
Ptr<IOBufferBlock> writer_buf;
OpenDirEntry *od = nullptr;
AIOCallbackInternal io;
int alternate_index = CACHE_ALT_INDEX_DEFAULT; // preferred position in vector
LINK(CacheVC, opendir_link);
#ifdef CACHE_STAT_PAGES
LINK(CacheVC, stat_link);
#endif
// end Region B
// Start Region C
// These variables are memset to 0 when the structure is freed.
// The size of this region is size_to_init which is initialized
// in the CacheVC constructor. It assumes that vio is the start
// of this region.
// NOTE: NOTE: NOTE: If vio is NOT the start, then CHANGE the
// size_to_init initialization
VIO vio;
CacheFragType frag_type;
CacheHTTPInfo *info;
CacheHTTPInfoVector *write_vector;
const OverridableHttpConfigParams *params;
int header_len; // for communicating with agg_copy
int frag_len; // for communicating with agg_copy
uint32_t write_len; // for communicating with agg_copy
uint32_t agg_len; // for communicating with aggWrite
uint32_t write_serial; // serial of the final write for SYNC
Vol *vol;
Dir *last_collision;
Event *trigger;
CacheKey *read_key;
ContinuationHandler save_handler;
uint32_t pin_in_cache;
ink_hrtime start_time;
int base_stat;
int recursive;
int closed;
uint64_t seek_to; // pread offset
int64_t offset; // offset into 'blocks' of data to write
int64_t writer_offset; // offset of the writer for reading from a writer
int64_t length; // length of data available to write
int64_t doc_pos; // read position in 'buf'
uint64_t write_pos; // length written
uint64_t total_len; // total length written and available to write
uint64_t doc_len; // total_length (of the selected alternate for HTTP)
uint64_t update_len;
int fragment;
int scan_msec_delay;
CacheVC *write_vc;
char *hostname;
int host_len;
int header_to_write_len;
void *header_to_write;
short writer_lock_retry;
union {
uint32_t flags;
struct {
unsigned int use_first_key : 1;
unsigned int overwrite : 1; // overwrite first_key Dir if it exists
unsigned int close_complete : 1; // WRITE_COMPLETE is final
unsigned int sync : 1; // write to be committed to durable storage before WRITE_COMPLETE
unsigned int evacuator : 1;
unsigned int single_fragment : 1;
unsigned int evac_vector : 1;
unsigned int lookup : 1;
unsigned int update : 1;
unsigned int remove : 1;
unsigned int remove_aborted_writers : 1;
unsigned int open_read_timeout : 1; // UNUSED
unsigned int data_done : 1;
unsigned int read_from_writer_called : 1;
unsigned int not_from_ram_cache : 1; // entire object was from ram cache
unsigned int rewrite_resident_alt : 1;
unsigned int readers : 1;
unsigned int doc_from_ram_cache : 1;
unsigned int hit_evacuate : 1;
unsigned int compressed_in_ram : 1; // compressed state in ram cache
unsigned int allow_empty_doc : 1; // used for cache empty http document
} f;
};
// BTF optimization used to skip reading stuff in cache partition that doesn't contain any
// dir entries.
char *scan_vol_map;
// BTF fix to handle objects that overlapped over two different reads,
// this is how much we need to back up the buffer to get the start of the overlapping object.
off_t scan_fix_buffer_offset;
// end region C
};
#define PUSH_HANDLER(_x) \
do { \
ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
save_handler = handler; \
handler = (ContinuationHandler)(_x); \
} while (0)
#define POP_HANDLER \
do { \
handler = save_handler; \
ink_assert(handler != (ContinuationHandler)(&CacheVC::dead)); \
} while (0)
struct CacheRemoveCont : public Continuation {
int event_handler(int event, void *data);
CacheRemoveCont() : Continuation(nullptr) {}
};
// Global Data
extern ClassAllocator<CacheVC> cacheVConnectionAllocator;
extern CacheKey zero_key;
extern CacheSync *cacheDirSync;
// Function Prototypes
int cache_write(CacheVC *, CacheHTTPInfoVector *);
int get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key);
CacheVC *new_DocEvacuator(int nbytes, Vol *d);
// inline Functions
TS_INLINE CacheVC *
new_CacheVC(Continuation *cont)
{
EThread *t = cont->mutex->thread_holding;
CacheVC *c = THREAD_ALLOC(cacheVConnectionAllocator, t);
c->vector.data.data = &c->vector.data.fast_data[0];
c->_action = cont;
c->mutex = cont->mutex;
c->start_time = Thread::get_hrtime();
c->setThreadAffinity(t);
ink_assert(c->trigger == nullptr);
Debug("cache_new", "new %p", c);
#ifdef CACHE_STAT_PAGES
ink_assert(!c->stat_link.next);
ink_assert(!c->stat_link.prev);
#endif
dir_clear(&c->dir);
return c;
}
TS_INLINE int
free_CacheVC(CacheVC *cont)
{
Debug("cache_free", "free %p", cont);
ProxyMutex *mutex = cont->mutex.get();
Vol *vol = cont->vol;
if (vol) {
CACHE_DECREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_ACTIVE);
if (cont->closed > 0) {
CACHE_INCREMENT_DYN_STAT(cont->base_stat + CACHE_STAT_SUCCESS);
} // else abort,cancel
}
ink_assert(mutex->thread_holding == this_ethread());
if (cont->trigger) {
cont->trigger->cancel();
}
ink_assert(!cont->is_io_in_progress());
ink_assert(!cont->od);
/* calling cont->io.action = nullptr causes compile problem on 2.6 solaris
release build....weird??? For now, null out continuation and mutex
of the action separately */
cont->io.action.continuation = nullptr;
cont->io.action.mutex = nullptr;
cont->io.mutex.clear();
cont->io.aio_result = 0;
cont->io.aiocb.aio_nbytes = 0;
cont->request.reset();
cont->vector.clear();
cont->vio.buffer.clear();
cont->vio.mutex.clear();
if (cont->vio.op == VIO::WRITE && cont->alternate_index == CACHE_ALT_INDEX_DEFAULT) {
cont->alternate.destroy();
} else {
cont->alternate.clear();
}
cont->_action.cancelled = 0;
cont->_action.mutex.clear();
cont->mutex.clear();
cont->buf.clear();
cont->first_buf.clear();
cont->blocks.clear();
cont->writer_buf.clear();
cont->alternate_index = CACHE_ALT_INDEX_DEFAULT;
ats_free(cont->scan_vol_map);
memset((char *)&cont->vio, 0, cont->size_to_init);
#ifdef CACHE_STAT_PAGES
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
#ifdef DEBUG
SET_CONTINUATION_HANDLER(cont, &CacheVC::dead);
#endif
THREAD_FREE(cont, cacheVConnectionAllocator, this_thread());
return EVENT_DONE;
}
TS_INLINE int
CacheVC::calluser(int event)
{
recursive++;
ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
vio.cont->handleEvent(event, (void *)&vio);
recursive--;
if (closed) {
die();
return EVENT_DONE;
}
return EVENT_CONT;
}
TS_INLINE int
CacheVC::callcont(int event)
{
recursive++;
ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
_action.continuation->handleEvent(event, this);
recursive--;
if (closed) {
die();
} else if (vio.vc_server) {
handleEvent(EVENT_IMMEDIATE, nullptr);
}
return EVENT_DONE;
}
TS_INLINE int
CacheVC::do_read_call(CacheKey *akey)
{
doc_pos = 0;
read_key = akey;
io.aiocb.aio_nbytes = dir_approx_size(&dir);
PUSH_HANDLER(&CacheVC::handleRead);
return handleRead(EVENT_CALL, nullptr);
}
TS_INLINE int
CacheVC::do_write_call()
{
PUSH_HANDLER(&CacheVC::handleWrite);
return handleWrite(EVENT_CALL, nullptr);
}
TS_INLINE void
CacheVC::cancel_trigger()
{
if (trigger) {
trigger->cancel_action();
trigger = nullptr;
}
}
TS_INLINE int
CacheVC::die()
{
if (vio.op == VIO::WRITE) {
if (f.update && total_len) {
alternate.object_key_set(earliest_key);
}
if (!is_io_in_progress()) {
SET_HANDLER(&CacheVC::openWriteClose);
if (!recursive) {
openWriteClose(EVENT_NONE, nullptr);
}
} // else catch it at the end of openWriteWriteDone
return EVENT_CONT;
} else {
if (is_io_in_progress()) {
save_handler = (ContinuationHandler)&CacheVC::openReadClose;
} else {
SET_HANDLER(&CacheVC::openReadClose);
if (!recursive) {
openReadClose(EVENT_NONE, nullptr);
}
}
return EVENT_CONT;
}
}
TS_INLINE int
CacheVC::handleWriteLock(int /* event ATS_UNUSED */, Event *e)
{
cancel_trigger();
int ret = 0;
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
set_agg_write_in_progress();
trigger = mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
ret = handleWrite(EVENT_CALL, e);
}
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return EVENT_CONT;
}
TS_INLINE int
CacheVC::do_write_lock()
{
PUSH_HANDLER(&CacheVC::handleWriteLock);
return handleWriteLock(EVENT_NONE, nullptr);
}
TS_INLINE int
CacheVC::do_write_lock_call()
{
PUSH_HANDLER(&CacheVC::handleWriteLock);
return handleWriteLock(EVENT_CALL, nullptr);
}
TS_INLINE bool
CacheVC::writer_done()
{
OpenDirEntry *cod = od;
if (!cod) {
cod = vol->open_read(&first_key);
}
CacheVC *w = (cod) ? cod->writers.head : nullptr;
// If the write vc started after the reader, then its not the
// original writer, since we never choose a writer that started
// after the reader. The original writer was deallocated and then
// reallocated for the same first_key
for (; w && (w != write_vc || w->start_time > start_time); w = (CacheVC *)w->opendir_link.next) {
;
}
if (!w) {
return true;
}
return false;
}
TS_INLINE int
Vol::close_write(CacheVC *cont)
{
#ifdef CACHE_STAT_PAGES
ink_assert(stat_cache_vcs.head);
stat_cache_vcs.remove(cont, cont->stat_link);
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
return open_dir.close_write(cont);
}
// Returns 0 on success or a positive error code on failure
TS_INLINE int
Vol::open_write(CacheVC *cont, int allow_if_writers, int max_writers)
{
Vol *vol = this;
bool agg_error = false;
if (!cont->f.remove) {
agg_error = (!cont->f.update && agg_todo_size > cache_config_agg_write_backlog);
#ifdef CACHE_AGG_FAIL_RATE
agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
}
if (agg_error) {
CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
return ECACHE_WRITE_FAIL;
}
if (open_dir.open_write(cont, allow_if_writers, max_writers)) {
#ifdef CACHE_STAT_PAGES
ink_assert(cont->mutex->thread_holding == this_ethread());
ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
stat_cache_vcs.enqueue(cont, cont->stat_link);
#endif
return 0;
}
return ECACHE_DOC_BUSY;
}
TS_INLINE int
Vol::close_write_lock(CacheVC *cont)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return close_write(cont);
}
TS_INLINE int
Vol::open_write_lock(CacheVC *cont, int allow_if_writers, int max_writers)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return open_write(cont, allow_if_writers, max_writers);
}
TS_INLINE OpenDirEntry *
Vol::open_read_lock(CryptoHash *key, EThread *t)
{
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return nullptr;
}
return open_dir.open_read(key);
}
TS_INLINE int
Vol::begin_read_lock(CacheVC *cont)
{
// no need for evacuation as the entire document is already in memory
#ifndef CACHE_STAT_PAGES
if (cont->f.single_fragment) {
return 0;
}
#endif
// VC is enqueued in stat_cache_vcs in the begin_read call
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return begin_read(cont);
}
TS_INLINE int
Vol::close_read_lock(CacheVC *cont)
{
EThread *t = cont->mutex->thread_holding;
CACHE_TRY_LOCK(lock, mutex, t);
if (!lock.is_locked()) {
return -1;
}
return close_read(cont);
}
TS_INLINE int
dir_delete_lock(CacheKey *key, Vol *d, ProxyMutex *m, Dir *del)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, d->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_delete(key, d, del);
}
TS_INLINE int
dir_insert_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, d->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_insert(key, d, to_part);
}
TS_INLINE int
dir_overwrite_lock(CacheKey *key, Vol *d, Dir *to_part, ProxyMutex *m, Dir *overwrite, bool must_overwrite = true)
{
EThread *thread = m->thread_holding;
CACHE_TRY_LOCK(lock, d->mutex, thread);
if (!lock.is_locked()) {
return -1;
}
return dir_overwrite(key, d, to_part, overwrite, must_overwrite);
}
void TS_INLINE
rand_CacheKey(CacheKey *next_key, Ptr<ProxyMutex> &mutex)
{
next_key->b[0] = mutex->thread_holding->generator.random();
next_key->b[1] = mutex->thread_holding->generator.random();
}
extern uint8_t CacheKey_next_table[];
void TS_INLINE
next_CacheKey(CacheKey *next_key, CacheKey *key)
{
uint8_t *b = (uint8_t *)next_key;
uint8_t *k = (uint8_t *)key;
b[0] = CacheKey_next_table[k[0]];
for (int i = 1; i < 16; i++) {
b[i] = CacheKey_next_table[(b[i - 1] + k[i]) & 0xFF];
}
}
extern uint8_t CacheKey_prev_table[];
void TS_INLINE
prev_CacheKey(CacheKey *prev_key, CacheKey *key)
{
uint8_t *b = (uint8_t *)prev_key;
uint8_t *k = (uint8_t *)key;
for (int i = 15; i > 0; i--) {
b[i] = 256 + CacheKey_prev_table[k[i]] - k[i - 1];
}
b[0] = CacheKey_prev_table[k[0]];
}
TS_INLINE unsigned int
next_rand(unsigned int *p)
{
unsigned int seed = *p;
seed = 1103515145 * seed + 12345;
*p = seed;
return seed;
}
extern ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator;
TS_INLINE CacheRemoveCont *
new_CacheRemoveCont()
{
CacheRemoveCont *cache_rm = cacheRemoveContAllocator.alloc();
cache_rm->mutex = new_ProxyMutex();
SET_CONTINUATION_HANDLER(cache_rm, &CacheRemoveCont::event_handler);
return cache_rm;
}
TS_INLINE void
free_CacheRemoveCont(CacheRemoveCont *cache_rm)
{
cache_rm->mutex = nullptr;
cacheRemoveContAllocator.free(cache_rm);
}
TS_INLINE int
CacheRemoveCont::event_handler(int event, void *data)
{
(void)event;
(void)data;
free_CacheRemoveCont(this);
return EVENT_DONE;
}
int64_t cache_bytes_used();
int64_t cache_bytes_total();
#ifdef DEBUG
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x) CACHE_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y) CACHE_SUM_DYN_STAT(_x, _y)
#else
#define CACHE_DEBUG_INCREMENT_DYN_STAT(_x)
#define CACHE_DEBUG_SUM_DYN_STAT(_x, _y)
#endif
struct CacheHostRecord;
struct Vol;
class CacheHostTable;
struct Cache {
int cache_read_done = 0;
int total_good_nvol = 0;
int total_nvol = 0;
int ready = CACHE_INITIALIZING;
int64_t cache_size = 0; // in store block size
CacheHostTable *hosttable = nullptr;
int total_initialized_vol = 0;
CacheType scheme = CACHE_NONE_TYPE;
int open(bool reconfigure, bool fix);
int close();
Action *lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);
Action *open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int len);
Action *open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options = 0,
time_t pin_in_cache = (time_t)0, const char *hostname = nullptr, int host_len = 0);
Action *remove(Continuation *cont, const CacheKey *key, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
int host_len = 0);
Action *scan(Continuation *cont, const char *hostname = nullptr, int host_len = 0, int KB_per_second = 2500);
Action *open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
CacheFragType type, const char *hostname, int host_len);
Action *open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *old_info, time_t pin_in_cache = (time_t)0,
const CacheKey *key1 = nullptr, CacheFragType type = CACHE_FRAG_TYPE_HTTP, const char *hostname = nullptr,
int host_len = 0);
static void generate_key(CryptoHash *hash, CacheURL *url);
static void generate_key(HttpCacheKey *hash, CacheURL *url, cache_generation_t generation = -1);
Action *link(Continuation *cont, const CacheKey *from, const CacheKey *to, CacheFragType type, const char *hostname,
int host_len);
Action *deref(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len);
void vol_initialized(bool result);
int open_done();
Vol *key_to_vol(const CacheKey *key, const char *hostname, int host_len);
Cache() {}
};
extern Cache *theCache;
extern Cache *caches[NUM_CACHE_FRAG_TYPES];
TS_INLINE void
Cache::generate_key(CryptoHash *hash, CacheURL *url)
{
url->hash_get(hash);
}
TS_INLINE void
Cache::generate_key(HttpCacheKey *key, CacheURL *url, cache_generation_t generation)
{
key->hostname = url->host_get(&key->hostlen);
url->hash_get(&key->hash, generation);
}
TS_INLINE unsigned int
cache_hash(const CryptoHash &hash)
{
uint64_t f = hash.fold();
unsigned int mhash = (unsigned int)(f >> 32);
return mhash;
}
LINK_DEFINITION(CacheVC, opendir_link)