blob: cff181864d32de3dbddf22a4304ce803a89ff8e2 [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_Cache.h"
#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_x, _y) (((_x) - (_y)) >= INT_MAX) // exploit overflow
// Given a key, finds the index of the alternate which matches
// used to get the alternate which is actually present in the document
int
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
{
int alt_count = cache_vector->count();
CacheHTTPInfo *obj;
if (!alt_count) {
return -1;
}
for (int i = 0; i < alt_count; i++) {
obj = cache_vector->get(i);
if (obj->compare_object_key(&key)) {
// Debug("cache_key", "Resident alternate key %X", key.slice32(0));
return i;
}
}
return -1;
}
// Adds/Deletes alternate to the od->vector (write_vector). If the vector
// is empty, deletes the directory entry pointing to the vector. Each
// CacheVC must write the vector down to disk after making changes. If we
// wait till the last writer, that writer will have the responsibility of
// of writing the vector even if the http state machine aborts. This
// makes it easier to handle situations where writers abort.
int
CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
if (od->reading_vec || od->writing_vec) {
VC_SCHED_LOCK_RETRY();
}
int ret = 0;
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || od->writing_vec) {
VC_SCHED_LOCK_RETRY();
}
int vec = alternate.valid();
if (f.update) {
// all Update cases. Need to get the alternate index.
alternate_index = get_alternate_index(write_vector, update_key);
Debug("cache_update", "updating alternate index %d frags %d", alternate_index,
alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
// if its an alternate delete
if (!vec) {
ink_assert(!total_len);
if (alternate_index >= 0) {
write_vector->remove(alternate_index, true);
alternate_index = CACHE_ALT_REMOVED;
if (!write_vector->count()) {
dir_delete(&first_key, vol, &od->first_dir);
}
}
// the alternate is not there any more. somebody might have
// deleted it. Just close this writer
if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
SET_HANDLER(&CacheVC::openWriteCloseDir);
return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
}
}
if (update_key == od->single_doc_key && (total_len || f.allow_empty_doc || !vec)) {
od->move_resident_alt = false;
}
}
if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0) {
od->move_resident_alt = false;
}
write_vector->remove(0, true);
}
if (vec) {
/* preserve fragment offset data from old info. This method is
called iff the update is a header only update so the fragment
data should remain valid.
*/
// If we are not in header only updating case. Don't copy fragments.
if (alternate_index >= 0 &&
((total_len == 0 && alternate.get_frag_offset_count() == 0) && !(f.allow_empty_doc && this->vio.nbytes == 0))) {
alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
}
alternate_index = write_vector->insert(&alternate, alternate_index);
}
if (od->move_resident_alt && first_buf.get() && !od->has_multiple_writers()) {
Doc *doc = reinterpret_cast<Doc *>(first_buf->data());
int small_doc = static_cast<int64_t>(doc->data_len()) < static_cast<int64_t>(cache_config_alt_rewrite_max_size);
int have_res_alt = doc->key == od->single_doc_key;
// if the new alternate is not written with the vector
// then move the old one with the vector
// if its a header only update move the resident alternate
// with the vector.
// We are sure that the body of the resident alternate that we are
// rewriting has not changed and the alternate is not being deleted,
// since we set od->move_resident_alt to 0 in that case
// (in updateVector)
if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
// for multiple fragment document, we must have done
// CacheVC:openWriteCloseDataDone
ink_assert(!fragment || f.data_done);
od->move_resident_alt = false;
f.rewrite_resident_alt = 1;
write_len = doc->data_len();
Debug("cache_update_alt", "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0),
first_key.slice32(0));
}
}
header_len = write_vector->marshal_length();
od->writing_vec = true;
f.use_first_key = 1;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
ret = do_write_call();
}
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return ret;
}
/*
The following fields of the CacheVC are used when writing down a fragment.
Make sure that each of the fields is set to a valid value before calling
this function
- frag_type. Checked to see if a vector needs to be marshalled.
- f.use_first_key. To decide if the vector should be marshalled and to set
the doc->key to the appropriate key (first_key or earliest_key)
- f.evac_vector. If set, the writer is pushed in the beginning of the
agg queue. And if !f.evac_vector && !f.update the alternate->object_size
is set to vc->total_len
- f.readers. If set, assumes that this is an evacuation, so the write
is not aborted even if vol->agg_todo_size > agg_write_backlog
- f.evacuator. If this is an evacuation.
- f.rewrite_resident_alt. The resident alternate is rewritten.
- f.update. Used only if the write_vector needs to be written to disk.
Used to set the length of the alternate to total_len.
- write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
(f.use_fist_key || f.evac_vector) is set. Write_vector is written to disk
- alternate_index. Used only if write_vector needs to be written to disk.
Used to find out the VC's alternate in the write_vector and set its
length to tatal_len.
- write_len. The number of bytes for this fragment.
- total_len. The total number of bytes for the document so far.
Doc->total_len and alternate's total len is set to this value.
- first_key. Doc's first_key is set to this value.
- pin_in_cache. Doc's pinned value is set to this + Thread::get_hrtime().
- earliest_key. If f.use_first_key, Doc's key is set to this value.
- key. If !f.use_first_key, Doc's key is set to this value.
- blocks. Used only if write_len is set. Data to be written
- offset. Used only if write_len is set. offset into the block to copy
the data from.
- buf. Used only if f.evacuator is set. Should point to the old document.
The functions sets the length, offset, pinned, head and phase of vc->dir.
*/
int
CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
{
// plain write case
ink_assert(!trigger);
frag_len = 0;
set_agg_write_in_progress();
POP_HANDLER;
agg_len = vol->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc));
vol->agg_todo_size += agg_len;
bool agg_error = (agg_len > AGG_SIZE || header_len + sizeof(Doc) > MAX_FRAG_SIZE ||
(!f.readers && (vol->agg_todo_size > cache_config_agg_write_backlog + AGG_SIZE) && write_len));
#ifdef CACHE_AGG_FAIL_RATE
agg_error = agg_error || ((uint32_t)mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif
bool max_doc_error = (cache_config_max_doc_size && (cache_config_max_doc_size < vio.ndone ||
(vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
if (agg_error || max_doc_error) {
CACHE_INCREMENT_DYN_STAT(cache_write_backlog_failure_stat);
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
vol->agg_todo_size -= agg_len;
io.aio_result = AIO_SOFT_FAILURE;
if (event == EVENT_CALL) {
return EVENT_RETURN;
}
return handleEvent(AIO_EVENT_DONE, nullptr);
}
ink_assert(agg_len <= AGG_SIZE);
if (f.evac_vector) {
vol->agg.push(this);
} else {
vol->agg.enqueue(this);
}
if (!vol->is_io_in_progress()) {
return vol->aggWrite(event, this);
}
return EVENT_CONT;
}
static char *
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
{
IOBufferBlock *b = ab;
while (b && len >= 0) {
char *start = b->_start;
char *end = b->_end;
int max_bytes = end - start;
max_bytes -= offset;
if (max_bytes <= 0) {
offset = -max_bytes;
b = b->next.get();
continue;
}
int bytes = len;
if (bytes >= max_bytes) {
bytes = max_bytes;
}
::memcpy(p, start + offset, bytes);
p += bytes;
len -= bytes;
b = b->next.get();
offset = 0;
}
return p;
}
EvacuationBlock *
Vol::force_evacuate_head(Dir *evac_dir, int pinned)
{
// build an evacuation block for the object
EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
// if we have already started evacuating this document, its too late
// to evacuate the head...bad luck
if (b && b->f.done) {
return b;
}
if (!b) {
b = new_EvacuationBlock(mutex->thread_holding);
b->dir = *evac_dir;
DDebug("cache_evac", "force: %d, %d", (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
evacuate[dir_evac_bucket(evac_dir)].push(b);
}
b->f.pinned = pinned;
b->f.evacuate_head = 1;
b->evac_frags.key = zero_key; // ensure that the block gets
// evacuated no matter what
b->readers = 0; // ensure that the block does not disappear
return b;
}
void
Vol::scan_for_pinned_documents()
{
if (cache_config_permit_pinning) {
// we can't evacuate anything between header->write_pos and
// header->write_pos + AGG_SIZE.
int ps = this->offset_to_vol_offset(header->write_pos + AGG_SIZE);
int pe = this->offset_to_vol_offset(header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
int vol_end_offset = this->offset_to_vol_offset(len + skip);
int before_end_of_vol = pe < vol_end_offset;
DDebug("cache_evac", "scan %d %d", ps, pe);
for (int i = 0; i < this->direntries(); i++) {
// is it a valid pinned object?
if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
// select objects only within this PIN_SCAN region
int o = dir_offset(&dir[i]);
if (dir_phase(&dir[i]) == header->phase) {
if (before_end_of_vol || o >= (pe - vol_end_offset)) {
continue;
}
} else {
if (o < ps || o >= pe) {
continue;
}
}
force_evacuate_head(&dir[i], 1);
// DDebug("cache_evac", "scan pinned at offset %d %d %d %d %d %d",
// (int)dir_offset(&b->dir), ps, o , pe, i, (int)b->f.done);
}
}
}
}
/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
*/
int
Vol::aggWriteDone(int event, Event *e)
{
cancel_trigger();
// ensure we have the cacheDirSync lock if we intend to call it later
// retaking the current mutex recursively is a NOOP
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
if (!lock.is_locked()) {
eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
if (io.ok()) {
header->last_write_pos = header->write_pos;
header->write_pos += io.aiocb.aio_nbytes;
ink_assert(header->write_pos >= start);
DDebug("cache_agg", "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "", hash_text.get(), header->write_pos,
header->last_write_pos);
ink_assert(header->write_pos == header->agg_pos);
if (header->write_pos + EVACUATION_SIZE > scan_pos) {
periodic_scan();
}
agg_buf_pos = 0;
header->write_serial++;
} else {
// delete all the directory entries that we inserted
// for fragments is this aggregation buffer
Debug("cache_disk_error", "Write error on disk %s\n \
write range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
(uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE,
(uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
Dir del_dir;
dir_clear(&del_dir);
for (int done = 0; done < agg_buf_pos;) {
Doc *doc = reinterpret_cast<Doc *>(agg_buffer + done);
dir_set_offset(&del_dir, header->write_pos + done);
dir_delete(&doc->key, this, &del_dir);
done += round_to_approx_size(doc->len);
}
agg_buf_pos = 0;
}
set_io_not_in_progress();
// callback ready sync CacheVCs
CacheVC *c = nullptr;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
} else {
sync.push(c); // put it back on the front
break;
}
}
if (dir_sync_waiting) {
dir_sync_waiting = false;
cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
}
if (agg.head || sync.head) {
return aggWrite(event, e);
}
return EVENT_CONT;
}
CacheVC *
new_DocEvacuator(int nbytes, Vol *vol)
{
CacheVC *c = new_CacheVC(vol);
ProxyMutex *mutex = vol->mutex.get();
c->base_stat = cache_evacuate_active_stat;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
c->vol = vol;
c->f.evacuator = 1;
c->earliest_key = zero_key;
SET_CONTINUATION_HANDLER(c, &CacheVC::evacuateDocDone);
return c;
}
int
CacheVC::evacuateReadHead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
// The evacuator vc shares the lock with the volition mutex
ink_assert(vol->mutex->thread_holding == this_ethread());
cancel_trigger();
Doc *doc = reinterpret_cast<Doc *>(buf->data());
CacheHTTPInfo *alternate_tmp = nullptr;
if (!io.ok()) {
goto Ldone;
}
// a directory entry which is no longer valid may have been overwritten
if (!dir_valid(vol, &dir)) {
last_collision = nullptr;
goto Lcollision;
}
if (doc->magic != DOC_MAGIC || !(doc->first_key == first_key)) {
goto Lcollision;
}
alternate_tmp = nullptr;
if (doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
// its an http document
if (this->load_http_info(&vector, doc) != doc->hlen) {
Note("bad vector detected during evacuation");
goto Ldone;
}
alternate_index = get_alternate_index(&vector, earliest_key);
if (alternate_index < 0) {
goto Ldone;
}
alternate_tmp = vector.get(alternate_index);
doc_len = alternate_tmp->object_size_get();
Debug("cache_evac", "evacuateReadHead http earliest %X first: %X len: %" PRId64, first_key.slice32(0), earliest_key.slice32(0),
doc_len);
} else {
// non-http document
CacheKey next_key;
next_CacheKey(&next_key, &doc->key);
if (!(next_key == earliest_key)) {
goto Ldone;
}
doc_len = doc->total_len;
DDebug("cache_evac", "evacuateReadHead non-http earliest %X first: %X len: %" PRId64, first_key.slice32(0),
earliest_key.slice32(0), doc_len);
}
if (doc_len == total_len) {
// the whole document has been evacuated. Insert the directory
// entry in the directory.
dir_lookaside_fixup(&earliest_key, vol);
return free_CacheVC(this);
}
return EVENT_CONT;
Lcollision:
if (dir_probe(&first_key, vol, &dir, &last_collision)) {
int ret = do_read_call(&first_key);
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return ret;
}
Ldone:
dir_lookaside_remove(&earliest_key, vol);
return free_CacheVC(this);
}
int
CacheVC::evacuateDocDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
ink_assert(vol->mutex->thread_holding == this_ethread());
Doc *doc = reinterpret_cast<Doc *>(buf->data());
DDebug("cache_evac", "evacuateDocDone %X o %d p %d new_o %d new_p %d", (int)key.slice32(0), (int)dir_offset(&overwrite_dir),
(int)dir_phase(&overwrite_dir), (int)dir_offset(&dir), (int)dir_phase(&dir));
int i = dir_evac_bucket(&overwrite_dir);
// nasty beeping race condition, need to have the EvacuationBlock here
EvacuationBlock *b = vol->evacuate[i].head;
for (; b; b = b->link.next) {
if (dir_offset(&b->dir) == dir_offset(&overwrite_dir)) {
// If the document is single fragment (although not tied to the vector),
// then we don't have to put the directory entry in the lookaside
// buffer. But, we have no way of finding out if the document is
// single fragment. doc->single_fragment() can be true for a multiple
// fragment document since total_len and doc->len could be equal at
// the time we write the fragment down. To be on the safe side, we
// only overwrite the entry in the directory if its not a head.
if (!dir_head(&overwrite_dir)) {
// find the earliest key
EvacuationKey *evac = &b->evac_frags;
for (; evac && !(evac->key == doc->key); evac = evac->link.next) {
;
}
ink_assert(evac);
if (!evac) {
break;
}
if (evac->earliest_key.fold()) {
DDebug("cache_evac", "evacdocdone: evacuating key %X earliest %X", evac->key.slice32(0), evac->earliest_key.slice32(0));
EvacuationBlock *eblock = nullptr;
Dir dir_tmp;
dir_lookaside_probe(&evac->earliest_key, vol, &dir_tmp, &eblock);
if (eblock) {
CacheVC *earliest_evac = eblock->earliest_evacuator;
earliest_evac->total_len += doc->data_len();
if (earliest_evac->total_len == earliest_evac->doc_len) {
dir_lookaside_fixup(&evac->earliest_key, vol);
free_CacheVC(earliest_evac);
}
}
}
dir_overwrite(&doc->key, vol, &dir, &overwrite_dir);
}
// if the tag in the overwrite_dir matches the first_key in the
// document, then it has to be the vector. We guarantee that
// the first_key and the earliest_key will never collide (see
// Cache::open_write). Once we know its the vector, we can
// safely overwrite the first_key in the directory.
if (dir_head(&overwrite_dir) && b->f.evacuate_head) {
DDebug("cache_evac", "evacuateDocDone evacuate_head %X %X hlen %d offset %d", (int)key.slice32(0), (int)doc->key.slice32(0),
doc->hlen, (int)dir_offset(&overwrite_dir));
if (dir_compare_tag(&overwrite_dir, &doc->first_key)) {
OpenDirEntry *cod;
DDebug("cache_evac", "evacuating vector: %X %d", (int)doc->first_key.slice32(0), (int)dir_offset(&overwrite_dir));
if ((cod = vol->open_read(&doc->first_key))) {
// writer exists
DDebug("cache_evac", "overwriting the open directory %X %d %d", (int)doc->first_key.slice32(0),
(int)dir_offset(&cod->first_dir), (int)dir_offset(&dir));
cod->first_dir = dir;
}
if (dir_overwrite(&doc->first_key, vol, &dir, &overwrite_dir)) {
int64_t o = dir_offset(&overwrite_dir), n = dir_offset(&dir);
vol->ram_cache->fixup(&doc->first_key, static_cast<uint32_t>(o >> 32), static_cast<uint32_t>(o),
static_cast<uint32_t>(n >> 32), static_cast<uint32_t>(n));
}
} else {
DDebug("cache_evac", "evacuating earliest: %X %d", (int)doc->key.slice32(0), (int)dir_offset(&overwrite_dir));
ink_assert(dir_compare_tag(&overwrite_dir, &doc->key));
ink_assert(b->earliest_evacuator == this);
total_len += doc->data_len();
first_key = doc->first_key;
earliest_dir = dir;
if (dir_probe(&first_key, vol, &dir, &last_collision) > 0) {
dir_lookaside_insert(b, vol, &earliest_dir);
// read the vector
SET_HANDLER(&CacheVC::evacuateReadHead);
int ret = do_read_call(&first_key);
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return ret;
}
}
}
break;
}
}
return free_CacheVC(this);
}
static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Vol *vol)
{
Dir dir, *last_collision = nullptr;
int i = 0;
while (dir_probe(key, vol, &dir, &last_collision)) {
// next fragment cannot be a head...if it is, it must have been a
// directory collision.
if (dir_head(&dir)) {
continue;
}
EvacuationBlock *b = evacuation_block_exists(&dir, vol);
if (!b) {
b = new_EvacuationBlock(vol->mutex->thread_holding);
b->dir = dir;
b->evac_frags.key = *key;
b->evac_frags.earliest_key = *earliest_key;
vol->evacuate[dir_evac_bucket(&dir)].push(b);
i++;
} else {
ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
evac_frag->key = *key;
evac_frag->earliest_key = *earliest_key;
evac_frag->link.next = b->evac_frags.link.next;
b->evac_frags.link.next = evac_frag;
}
if (force) {
b->readers = 0;
}
DDebug("cache_evac", "next fragment %X Earliest: %X offset %d phase %d force %d", (int)key->slice32(0),
(int)earliest_key->slice32(0), (int)dir_offset(&dir), (int)dir_phase(&dir), force);
}
return i;
}
int
Vol::evacuateWrite(CacheVC *evacuator, int event, Event *e)
{
// push to front of aggregation write list, so it is written first
evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc *>(evacuator->buf->data()))->len);
agg_todo_size += evacuator->agg_len;
/* insert the evacuator after all the other evacuators */
CacheVC *cur = static_cast<CacheVC *>(agg.head);
CacheVC *after = nullptr;
for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
after = cur;
}
ink_assert(evacuator->agg_len <= AGG_SIZE);
agg.insert(evacuator, after);
return aggWrite(event, e);
}
int
Vol::evacuateDocReadDone(int event, Event *e)
{
cancel_trigger();
if (event != AIO_EVENT_DONE) {
return EVENT_DONE;
}
ink_assert(is_io_in_progress());
set_io_not_in_progress();
ink_assert(mutex->thread_holding == this_ethread());
Doc *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
CacheKey next_key;
EvacuationBlock *b = nullptr;
if (doc->magic != DOC_MAGIC) {
Debug("cache_evac", "DOC magic: %X %d", (int)dir_tag(&doc_evacuator->overwrite_dir),
(int)dir_offset(&doc_evacuator->overwrite_dir));
ink_assert(doc->magic == DOC_MAGIC);
goto Ldone;
}
DDebug("cache_evac", "evacuateDocReadDone %X offset %d", (int)doc->key.slice32(0),
(int)dir_offset(&doc_evacuator->overwrite_dir));
b = evacuate[dir_evac_bucket(&doc_evacuator->overwrite_dir)].head;
while (b) {
if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
break;
}
b = b->link.next;
}
if (!b) {
goto Ldone;
}
if ((b->f.pinned && !b->readers) && doc->pinned < static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND)) {
goto Ldone;
}
if (dir_head(&b->dir) && b->f.evacuate_head) {
ink_assert(!b->evac_frags.key.fold());
// if its a head (vector), evacuation is real simple...we just
// need to write this vector down and overwrite the directory entry.
if (dir_compare_tag(&b->dir, &doc->first_key)) {
doc_evacuator->key = doc->first_key;
b->evac_frags.key = doc->first_key;
DDebug("cache_evac", "evacuating vector %X offset %d", (int)doc->first_key.slice32(0),
(int)dir_offset(&doc_evacuator->overwrite_dir));
b->f.unused = 57;
} else {
// if its an earliest fragment (alternate) evacuation, things get
// a little tricky. We have to propagate the earliest key to the next
// fragments for this alternate. The last fragment to be evacuated
// fixes up the lookaside buffer.
doc_evacuator->key = doc->key;
doc_evacuator->earliest_key = doc->key;
b->evac_frags.key = doc->key;
b->evac_frags.earliest_key = doc->key;
b->earliest_evacuator = doc_evacuator;
DDebug("cache_evac", "evacuating earliest %X %X evac: %p offset: %d", (int)b->evac_frags.key.slice32(0),
(int)doc->key.slice32(0), doc_evacuator, (int)dir_offset(&doc_evacuator->overwrite_dir));
b->f.unused = 67;
}
} else {
// find which key matches the document
EvacuationKey *ek = &b->evac_frags;
for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
;
}
if (!ek) {
b->f.unused = 77;
goto Ldone;
}
doc_evacuator->key = ek->key;
doc_evacuator->earliest_key = ek->earliest_key;
DDebug("cache_evac", "evacuateDocReadDone key: %X earliest: %X", (int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
b->f.unused = 87;
}
// if the tag in the c->dir does match the first_key in the
// document, then it has to be the earliest fragment. We guarantee that
// the first_key and the earliest_key will never collide (see
// Cache::open_write).
if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
next_CacheKey(&next_key, &doc->key);
evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
}
return evacuateWrite(doc_evacuator, event, e);
Ldone:
free_CacheVC(doc_evacuator);
doc_evacuator = nullptr;
return aggWrite(event, e);
}
int
Vol::evac_range(off_t low, off_t high, int evac_phase)
{
off_t s = this->offset_to_vol_offset(low);
off_t e = this->offset_to_vol_offset(high);
int si = dir_offset_evac_bucket(s);
int ei = dir_offset_evac_bucket(e);
for (int i = si; i <= ei; i++) {
EvacuationBlock *b = evacuate[i].head;
EvacuationBlock *first = nullptr;
int64_t first_offset = INT64_MAX;
for (; b; b = b->link.next) {
int64_t offset = dir_offset(&b->dir);
int phase = dir_phase(&b->dir);
if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
if (offset < first_offset) {
first = b;
first_offset = offset;
}
}
}
if (first) {
first->f.done = 1;
io.aiocb.aio_fildes = fd;
io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
io.aiocb.aio_offset = this->vol_offset(&first->dir);
if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
}
doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
doc_evacuator->overwrite_dir = first->dir;
io.aiocb.aio_buf = doc_evacuator->buf->data();
io.action = this;
io.thread = AIO_CALLBACK_THREAD_ANY;
DDebug("cache_evac", "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
SET_HANDLER(&Vol::evacuateDocReadDone);
ink_assert(ink_aio_read(&io) >= 0);
return -1;
}
}
return 0;
}
static int
agg_copy(char *p, CacheVC *vc)
{
Vol *vol = vc->vol;
off_t o = vol->header->write_pos + vol->agg_buf_pos;
if (!vc->f.evacuator) {
Doc *doc = reinterpret_cast<Doc *>(p);
IOBufferBlock *res_alt_blk = nullptr;
uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
ink_assert(vol->round_to_approx_size(len) == vc->agg_len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, vc->agg_len);
dir_set_offset(&vc->dir, vol->offset_to_vol_offset(o));
ink_assert(vol->vol_offset(&vc->dir) < (vol->skip + vol->len));
dir_set_phase(&vc->dir, vol->header->phase);
// fill in document header
doc->magic = DOC_MAGIC;
doc->len = len;
doc->hlen = vc->header_len;
doc->doc_type = vc->frag_type;
doc->v_major = CACHE_DB_MAJOR_VERSION;
doc->v_minor = CACHE_DB_MINOR_VERSION;
doc->unused = 0; // force this for forward compatibility.
doc->total_len = vc->total_len;
doc->first_key = vc->first_key;
doc->sync_serial = vol->header->sync_serial;
vc->write_serial = doc->write_serial = vol->header->write_serial;
doc->checksum = DOC_NO_CHECKSUM;
if (vc->pin_in_cache) {
dir_set_pinned(&vc->dir, 1);
doc->pinned = static_cast<uint32_t>(Thread::get_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
} else {
dir_set_pinned(&vc->dir, 0);
doc->pinned = 0;
}
if (vc->f.use_first_key) {
if (doc->data_len() || vc->f.allow_empty_doc) {
doc->key = vc->earliest_key;
} else { // the vector is being written by itself
if (vc->earliest_key == zero_key) {
do {
rand_CacheKey(&doc->key, vc->vol->mutex);
} while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2)));
} else {
prev_CacheKey(&doc->key, &vc->earliest_key);
}
}
dir_set_head(&vc->dir, true);
} else {
doc->key = vc->key;
dir_set_head(&vc->dir, !vc->fragment);
}
if (vc->f.rewrite_resident_alt) {
ink_assert(vc->f.use_first_key);
Doc *res_doc = reinterpret_cast<Doc *>(vc->first_buf->data());
res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen);
doc->key = res_doc->key;
doc->total_len = res_doc->data_len();
}
// update the new_info object_key, and total_len and dirinfo
if (vc->header_len) {
ink_assert(vc->f.use_first_key);
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
ink_assert(vc->write_vector->count() > 0);
if (!vc->f.update && !vc->f.evac_vector) {
ink_assert(!(vc->first_key == zero_key));
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
http_info->object_size_set(vc->total_len);
}
// update + data_written => Update case (b)
// need to change the old alternate's object length
if (vc->f.update && vc->total_len) {
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
http_info->object_size_set(vc->total_len);
}
ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
} else {
memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
}
// the single fragment flag is not used in the write call.
// putting it in for completeness.
vc->f.single_fragment = doc->single_fragment();
}
// move data
if (vc->write_len) {
{
ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
ink_assert(mutex->thread_holding == this_ethread());
CACHE_DEBUG_SUM_DYN_STAT(cache_write_bytes_stat, vc->write_len);
}
if (vc->f.rewrite_resident_alt) {
iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
} else {
iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset);
}
#ifdef VERIFY_JTEST_DATA
if (f.use_first_key && header_len) {
int ib = 0, xd = 0;
char xx[500];
new_info.request_get().url_get().print(xx, 500, &ib, &xd);
char *x = xx;
for (int q = 0; q < 3; q++)
x = strchr(x + 1, '/');
ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
}
#endif
}
if (cache_config_enable_checksum) {
doc->checksum = 0;
for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
doc->checksum += *b;
}
}
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
ink_assert(doc->hlen);
}
if (res_alt_blk) {
res_alt_blk->free();
}
return vc->agg_len;
} else {
// for evacuated documents, copy the data, and update directory
Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
int l = vc->vol->round_to_approx_size(doc->len);
{
ProxyMutex *mutex ATS_UNUSED = vc->vol->mutex.get();
ink_assert(mutex->thread_holding == this_ethread());
CACHE_DEBUG_INCREMENT_DYN_STAT(cache_gc_frags_evacuated_stat);
CACHE_DEBUG_SUM_DYN_STAT(cache_gc_bytes_evacuated_stat, l);
}
doc->sync_serial = vc->vol->header->sync_serial;
doc->write_serial = vc->vol->header->write_serial;
memcpy(p, doc, doc->len);
vc->dir = vc->overwrite_dir;
dir_set_offset(&vc->dir, vc->vol->offset_to_vol_offset(o));
dir_set_phase(&vc->dir, vc->vol->header->phase);
return l;
}
}
inline void
Vol::evacuate_cleanup_blocks(int i)
{
EvacuationBlock *b = evacuate[i].head;
while (b) {
if (b->f.done && ((header->phase != dir_phase(&b->dir) && header->write_pos > this->vol_offset(&b->dir)) ||
(header->phase == dir_phase(&b->dir) && header->write_pos <= this->vol_offset(&b->dir)))) {
EvacuationBlock *x = b;
DDebug("cache_evac", "evacuate cleanup free %X offset %d", (int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
b = b->link.next;
evacuate[i].remove(x);
free_EvacuationBlock(x, mutex->thread_holding);
continue;
}
b = b->link.next;
}
}
void
Vol::evacuate_cleanup()
{
int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
int64_t e = dir_offset_evac_bucket(eo);
int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
int64_t s = sx;
int i;
if (e > evacuate_size) {
e = evacuate_size;
}
if (sx < 0) {
s = 0;
}
for (i = s; i < e; i++) {
evacuate_cleanup_blocks(i);
}
// if we have wrapped, handle the end bit
if (sx <= 0) {
s = evacuate_size + sx - 2;
if (s < 0) {
s = 0;
}
for (i = s; i < evacuate_size; i++) {
evacuate_cleanup_blocks(i);
}
}
}
void
Vol::periodic_scan()
{
evacuate_cleanup();
scan_for_pinned_documents();
if (header->write_pos == start) {
scan_pos = start;
}
scan_pos += len / PIN_SCAN_EVERY;
}
void
Vol::agg_wrap()
{
header->write_pos = start;
header->phase = !header->phase;
header->cycle++;
header->agg_pos = header->write_pos;
dir_lookaside_cleanup(this);
dir_clean_vol(this);
{
Vol *vol = this;
CACHE_INCREMENT_DYN_STAT(cache_directory_wrap_stat);
Note("Cache volume %d on disk '%s' wraps around", vol->cache_vol->vol_number, vol->hash_text.get());
}
periodic_scan();
}
/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
Also, make sure that any functions called by this also use
the eventProcessor to schedule events
*/
int
Vol::aggWrite(int event, void * /* e ATS_UNUSED */)
{
ink_assert(!is_io_in_progress());
Que(CacheVC, link) tocall;
CacheVC *c;
cancel_trigger();
Lagain:
// calculate length of aggregated write
for (c = static_cast<CacheVC *>(agg.head); c;) {
int writelen = c->agg_len;
// [amc] this is checked multiple places, on here was it strictly less.
ink_assert(writelen <= AGG_SIZE);
if (agg_buf_pos + writelen > AGG_SIZE || header->write_pos + agg_buf_pos + writelen > (skip + len)) {
break;
}
DDebug("agg_read", "copying: %d, %" PRIu64 ", key: %d", agg_buf_pos, header->write_pos + agg_buf_pos, c->first_key.slice32(0));
int wrotelen = agg_copy(agg_buffer + agg_buf_pos, c);
ink_assert(writelen == wrotelen);
agg_todo_size -= writelen;
agg_buf_pos += writelen;
CacheVC *n = (CacheVC *)c->link.next;
agg.dequeue();
if (c->f.sync && c->f.use_first_key) {
CacheVC *last = sync.tail;
while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
last = (CacheVC *)last->link.prev;
}
sync.insert(c, last);
} else if (c->f.evacuator) {
c->handleEvent(AIO_EVENT_DONE, nullptr);
} else {
tocall.enqueue(c);
}
c = n;
}
// if we got nothing...
if (!agg_buf_pos) {
if (!agg.head && !sync.head) { // nothing to get
return EVENT_CONT;
}
if (header->write_pos == start) {
// write aggregation too long, bad bad, punt on everything.
Note("write aggregation exceeds vol size");
ink_assert(!tocall.head);
ink_assert(false);
while ((c = agg.dequeue())) {
agg_todo_size -= c->agg_len;
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
// start back
if (agg.head) {
agg_wrap();
goto Lagain;
}
}
// evacuate space
off_t end = header->write_pos + agg_buf_pos + EVACUATION_SIZE;
if (evac_range(header->write_pos, end, !header->phase) < 0) {
goto Lwait;
}
if (end > skip + len) {
if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
goto Lwait;
}
}
// if agg.head, then we are near the end of the disk, so
// write down the aggregation in whatever size it is.
if (agg_buf_pos < AGG_HIGH_WATER && !agg.head && !sync.head && !dir_sync_waiting) {
goto Lwait;
}
// write sync marker
if (!agg_buf_pos) {
ink_assert(sync.head);
int l = round_to_approx_size(sizeof(Doc));
agg_buf_pos = l;
Doc *d = reinterpret_cast<Doc *>(agg_buffer);
memset(static_cast<void *>(d), 0, sizeof(Doc));
d->magic = DOC_MAGIC;
d->len = l;
d->sync_serial = header->sync_serial;
d->write_serial = header->write_serial;
}
// set write limit
header->agg_pos = header->write_pos + agg_buf_pos;
io.aiocb.aio_fildes = fd;
io.aiocb.aio_offset = header->write_pos;
io.aiocb.aio_buf = agg_buffer;
io.aiocb.aio_nbytes = agg_buf_pos;
io.action = this;
/*
Callback on AIO thread so that we can issue a new write ASAP
as all writes are serialized in the volume. This is not necessary
for reads proceed independently.
*/
io.thread = AIO_CALLBACK_THREAD_AIO;
SET_HANDLER(&Vol::aggWriteDone);
ink_aio_write(&io);
Lwait:
int ret = EVENT_CONT;
while ((c = tocall.dequeue())) {
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
ret = EVENT_RETURN;
} else {
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
}
return ret;
}
int
CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
SET_HANDLER(&CacheVC::openWriteCloseDir);
ink_assert(!is_io_in_progress());
VC_SCHED_LOCK_RETRY();
}
vol->close_write(this);
if (closed < 0 && fragment) {
dir_delete(&earliest_key, vol, &earliest_dir);
}
}
if (is_debug_tag_set("cache_update")) {
if (f.update && closed > 0) {
if (!total_len && !f.allow_empty_doc && alternate_index != CACHE_ALT_REMOVED) {
Debug("cache_update", "header only %d (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
update_key.b[1]);
} else if ((total_len || f.allow_empty_doc) && alternate_index != CACHE_ALT_REMOVED) {
Debug("cache_update", "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")",
DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
} else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
Debug("cache_update", "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
update_key.b[1]);
}
}
}
// update the appropriate stat variable
// These variables may not give the current no of documents with
// one, two and three or more fragments. This is because for
// updates we dont decrement the variable corresponding the old
// size of the document
if ((closed == 1) && (total_len > 0 || f.allow_empty_doc)) {
DDebug("cache_stats", "Fragment = %d", fragment);
switch (fragment) {
case 0:
CACHE_INCREMENT_DYN_STAT(cache_single_fragment_document_count_stat);
break;
case 1:
CACHE_INCREMENT_DYN_STAT(cache_two_fragment_document_count_stat);
break;
default:
CACHE_INCREMENT_DYN_STAT(cache_three_plus_plus_fragment_document_count_stat);
break;
}
}
if (f.close_complete) {
recursive++;
ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *)&vio);
recursive--;
}
return free_CacheVC(this);
}
int
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
od->writing_vec = false;
if (!io.ok()) {
goto Lclose;
}
ink_assert(f.use_first_key);
if (!od->dont_update_directory) {
if (dir_is_empty(&od->first_dir)) {
dir_insert(&first_key, vol, &dir);
} else {
// multiple fragment vector write
dir_overwrite(&first_key, vol, &dir, &od->first_dir, false);
// insert moved resident alternate
if (od->move_resident_alt) {
if (dir_valid(vol, &od->single_doc_dir)) {
dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
}
od->move_resident_alt = false;
}
}
od->first_dir = dir;
if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
// fragment is tied to the vector
od->move_resident_alt = true;
if (!f.rewrite_resident_alt) {
od->single_doc_key = earliest_key;
}
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
}
}
}
Lclose:
return openWriteCloseDir(event, e);
}
int
CacheVC::openWriteCloseHead(int event, Event *e)
{
cancel_trigger();
f.use_first_key = 1;
if (io.ok()) {
ink_assert(fragment || (length == (int64_t)total_len));
} else {
return openWriteCloseDir(event, e);
}
if (f.data_done) {
write_len = 0;
} else {
write_len = length;
}
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
SET_HANDLER(&CacheVC::updateVector);
return updateVector(EVENT_IMMEDIATE, nullptr);
} else {
header_len = header_to_write_len;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
return do_write_lock();
}
}
int
CacheVC::openWriteCloseDataDone(int event, Event *e)
{
int ret = 0;
cancel_trigger();
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
if (!io.ok()) {
return openWriteCloseDir(event, e);
}
{
CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
if (!fragment) {
ink_assert(key == earliest_key);
earliest_dir = dir;
} else {
// Store the offset only if there is a table.
// Currently there is no alt (and thence no table) for non-HTTP.
if (alternate.valid()) {
alternate.push_frag_offset(write_pos);
}
}
fragment++;
write_pos += write_len;
dir_insert(&key, vol, &dir);
blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
next_CacheKey(&key, &key);
if (length) {
write_len = length;
if (write_len > MAX_FRAG_SIZE) {
write_len = MAX_FRAG_SIZE;
}
if ((ret = do_write_call()) == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
f.data_done = 1;
return openWriteCloseHead(event, e); // must be called under vol lock from here
}
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr);
}
int
CacheVC::openWriteClose(int event, Event *e)
{
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE) {
return EVENT_CONT;
}
set_io_not_in_progress();
if (!io.ok()) {
return openWriteCloseDir(event, e);
}
}
if (closed > 0 || f.allow_empty_doc) {
if (total_len == 0) {
if (f.update || f.allow_empty_doc) {
return updateVector(event, e);
} else {
// If we've been CLOSE'd but nothing has been written then
// this close is transformed into an abort.
closed = -1;
return openWriteCloseDir(event, e);
}
}
if (length && (fragment || length > static_cast<int>(MAX_FRAG_SIZE))) {
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
write_len = length;
if (write_len > MAX_FRAG_SIZE) {
write_len = MAX_FRAG_SIZE;
}
return do_write_lock_call();
} else {
return openWriteCloseHead(event, e);
}
} else {
return openWriteCloseDir(event, e);
}
}
int
CacheVC::openWriteWriteDone(int event, Event *e)
{
cancel_trigger();
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
// In the event of VC_EVENT_ERROR, the cont must do an io_close
if (!io.ok()) {
if (closed) {
closed = -1;
return die();
}
SET_HANDLER(&CacheVC::openWriteMain);
return calluser(VC_EVENT_ERROR);
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
// store the earliest directory. Need to remove the earliest dir
// in case the writer aborts.
if (!fragment) {
ink_assert(key == earliest_key);
earliest_dir = dir;
} else {
// Store the offset only if there is a table.
// Currently there is no alt (and thence no table) for non-HTTP.
if (alternate.valid()) {
alternate.push_frag_offset(write_pos);
}
}
++fragment;
write_pos += write_len;
dir_insert(&key, vol, &dir);
DDebug("cache_insert", "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
next_CacheKey(&key, &key);
}
if (closed) {
return die();
}
SET_HANDLER(&CacheVC::openWriteMain);
return openWriteMain(event, e);
}
static inline int
target_fragment_size()
{
uint64_t value = cache_config_target_fragment_size - sizeof(Doc);
ink_release_assert(value <= MAX_FRAG_SIZE);
return value;
}
int
CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
int called_user = 0;
ink_assert(!is_io_in_progress());
Lagain:
if (!vio.buffer.writer()) {
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
return EVENT_DONE;
}
if (!vio.buffer.writer()) {
return EVENT_CONT;
}
}
if (vio.ntodo() <= 0) {
called_user = 1;
if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE) {
return EVENT_DONE;
}
ink_assert(!f.close_complete || !"close expected after write COMPLETE");
if (vio.ntodo() <= 0) {
return EVENT_CONT;
}
}
int64_t ntodo = static_cast<int64_t>(vio.ntodo() + length);
int64_t total_avail = vio.buffer.reader()->read_avail();
int64_t avail = total_avail;
int64_t towrite = avail + length;
if (towrite > ntodo) {
avail -= (towrite - ntodo);
towrite = ntodo;
}
if (towrite > static_cast<int>(MAX_FRAG_SIZE)) {
avail -= (towrite - MAX_FRAG_SIZE);
towrite = MAX_FRAG_SIZE;
}
if (!blocks && towrite) {
blocks = vio.buffer.reader()->block;
offset = vio.buffer.reader()->start_offset;
}
if (avail > 0) {
vio.buffer.reader()->consume(avail);
vio.ndone += avail;
total_len += avail;
}
length = static_cast<uint64_t>(towrite);
if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4)) {
write_len = target_fragment_size();
} else {
write_len = length;
}
bool not_writing = towrite != ntodo && towrite < target_fragment_size();
if (!called_user) {
if (not_writing) {
called_user = 1;
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
return EVENT_DONE;
}
goto Lagain;
} else if (vio.ntodo() <= 0) {
goto Lagain;
}
}
if (not_writing) {
return EVENT_CONT;
}
if (towrite == ntodo && f.close_complete) {
closed = 1;
SET_HANDLER(&CacheVC::openWriteClose);
return openWriteClose(EVENT_NONE, nullptr);
}
SET_HANDLER(&CacheVC::openWriteWriteDone);
return do_write_lock_call();
}
// begin overwrite
int
CacheVC::openWriteOverwrite(int event, Event *e)
{
cancel_trigger();
if (event != AIO_EVENT_DONE) {
if (event == EVENT_IMMEDIATE) {
last_collision = nullptr;
}
} else {
Doc *doc = nullptr;
set_io_not_in_progress();
if (_action.cancelled) {
return openWriteCloseDir(event, e);
}
if (!io.ok()) {
goto Ldone;
}
doc = reinterpret_cast<Doc *>(buf->data());
if (!(doc->first_key == first_key)) {
goto Lcollision;
}
od->first_dir = dir;
first_buf = buf;
goto Ldone;
}
Lcollision : {
CACHE_TRY_LOCK(lock, vol->mutex, this_ethread());
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
int res = dir_probe(&first_key, vol, &dir, &last_collision);
if (res > 0) {
if ((res = do_read_call(&first_key)) == EVENT_RETURN) {
goto Lcallreturn;
}
return res;
}
}
Ldone:
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}
// openWriteStartDone handles vector read (addition of alternates)
// and lock misses
int
CacheVC::openWriteStartDone(int event, Event *e)
{
intptr_t err = ECACHE_NO_DOC;
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE) {
return EVENT_CONT;
}
set_io_not_in_progress();
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
if (_action.cancelled && (!od || !od->has_multiple_writers())) {
goto Lcancel;
}
if (event == AIO_EVENT_DONE) { // vector read done
Doc *doc = reinterpret_cast<Doc *>(buf->data());
if (!io.ok()) {
err = ECACHE_READ_FAIL;
goto Lfailure;
}
/* INKqa07123.
A directory entry which is no longer valid may have been overwritten.
We need to start afresh from the beginning by setting last_collision
to nullptr.
*/
if (!dir_valid(vol, &dir)) {
DDebug("cache_write", "OpenReadStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
(int64_t)vol->offset_to_vol_offset(vol->header->write_pos), dir_offset(&dir));
last_collision = nullptr;
goto Lcollision;
}
if (!(doc->first_key == first_key)) {
goto Lcollision;
}
if (doc->magic != DOC_MAGIC || !doc->hlen || this->load_http_info(write_vector, doc, buf.object()) != doc->hlen) {
err = ECACHE_BAD_META_DATA;
goto Lfailure;
}
ink_assert(write_vector->count() > 0);
od->first_dir = dir;
first_dir = dir;
if (doc->single_fragment()) {
// fragment is tied to the vector
od->move_resident_alt = true;
od->single_doc_key = doc->key;
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
}
first_buf = buf;
goto Lsuccess;
}
Lcollision:
int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
if (!od) {
if ((err = vol->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
goto Lfailure;
}
if (od->has_multiple_writers()) {
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
// check for collision
if (dir_probe(&first_key, vol, &dir, &last_collision)) {
od->reading_vec = true;
int ret = do_read_call(&first_key);
if (ret == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
if (f.update) {
// fail update because vector has been GC'd
goto Lfailure;
}
}
Lsuccess:
od->reading_vec = false;
if (_action.cancelled) {
goto Lcancel;
}
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lfailure:
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
Lcancel:
if (od) {
od->reading_vec = false;
return openWriteCloseDir(event, e);
} else {
return free_CacheVC(this);
}
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}
// handle lock failures from main Cache::open_write entry points below
int
CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
intptr_t err;
cancel_trigger();
if (_action.cancelled) {
return free_CacheVC(this);
}
if (((err = vol->open_write_lock(this, false, 1)) > 0)) {
CACHE_INCREMENT_DYN_STAT(base_stat + CACHE_STAT_FAILURE);
free_CacheVC(this);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
return EVENT_DONE;
}
if (err < 0) {
VC_SCHED_LOCK_RETRY();
}
if (f.overwrite) {
SET_HANDLER(&CacheVC::openWriteOverwrite);
return openWriteOverwrite(EVENT_IMMEDIATE, nullptr);
} else {
// write by key
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
// main entry point for writing of of non-http documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options, time_t apin_in_cache,
const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(frag_type)) {
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[frag_type] == this);
intptr_t res = 0;
CacheVC *c = new_CacheVC(cont);
ProxyMutex *mutex = cont->mutex.get();
SCOPED_MUTEX_LOCK(lock, c->mutex, this_ethread());
c->vio.op = VIO::WRITE;
c->base_stat = cache_write_active_stat;
c->vol = key_to_vol(key, hostname, host_len);
Vol *vol = c->vol;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->first_key = c->key = *key;
c->frag_type = frag_type;
/*
The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if its the first_key
or the earliest_key based on the dir_tag.
*/
do {
rand_CacheKey(&c->key, cont->mutex);
} while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
c->earliest_key = c->key;
c->info = nullptr;
c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);
if ((res = c->vol->open_write_lock(c, false, 1)) > 0) {
// document currently being written, abort
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
free_CacheVC(c);
return ACTION_RESULT_DONE;
}
if (res < 0) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
c->trigger = CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
if (!c->f.overwrite) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
} else {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
if (c->openWriteOverwrite(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
} else {
return &c->_action;
}
}
}
// main entry point for writing of http documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
const CacheKey * /* key1 ATS_UNUSED */, CacheFragType type, const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(type)) {
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[type] == this);
intptr_t err = 0;
int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
CacheVC *c = new_CacheVC(cont);
ProxyMutex *mutex = cont->mutex.get();
c->vio.op = VIO::WRITE;
c->first_key = *key;
/*
The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if its the first_key
or the earliest_key based on the dir_tag.
*/
do {
rand_CacheKey(&c->key, cont->mutex);
} while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
c->earliest_key = c->key;
c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->vol = key_to_vol(key, hostname, host_len);
Vol *vol = c->vol;
c->info = info;
if (c->info && (uintptr_t)info != CACHE_ALLOW_MULTIPLE_WRITES) {
/*
Update has the following code paths :
a) Update alternate header only :
In this case the vector has to be rewritten. The content
length(update_len) and the key for the document are set in the
new_info in the set_http_info call.
HTTP OPERATIONS
open_write with info set
set_http_info new_info
(total_len == 0)
close
b) Update alternate and data
In this case both the vector and the data needs to be rewritten.
This case is similar to the standard write of a document case except
that the new_info is inserted into the vector at the alternate_index
(overwriting the old alternate) rather than the end of the vector.
HTTP OPERATIONS
open_write with info set
set_http_info new_info
do_io_write => (total_len > 0)
close
c) Delete an alternate
The vector may need to be deleted (if there was only one alternate) or
rewritten (if there were more than one alternate). The deletion of the
vector is done in openWriteRemoveVector.
HTTP OPERATIONS
open_write with info set
close
*/
c->f.update = 1;
c->base_stat = cache_update_active_stat;
DDebug("cache_update", "Update called");
info->object_key_get(&c->update_key);
ink_assert(!(c->update_key == zero_key));
c->update_len = info->object_size_get();
} else {
c->base_stat = cache_write_active_stat;
}
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);
{
CACHE_TRY_LOCK(lock, c->vol->mutex, cont->mutex->thread_holding);
if (lock.is_locked()) {
if ((err = c->vol->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
goto Lfailure;
}
// If there are multiple writers, then this one cannot be an update.
// Only the first writer can do an update. If that's the case, we can
// return success to the state machine now.;
if (c->od->has_multiple_writers()) {
goto Lmiss;
}
if (!dir_probe(key, c->vol, &c->dir, &c->last_collision)) {
if (c->f.update) {
// fail update because vector has been GC'd
// This situation can also arise in openWriteStartDone
err = ECACHE_NO_DOC;
goto Lfailure;
}
// document doesn't exist, begin write
goto Lmiss;
} else {
c->od->reading_vec = true;
// document exists, read vector
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
switch (c->do_read_call(&c->first_key)) {
case EVENT_DONE:
return ACTION_RESULT_DONE;
case EVENT_RETURN:
goto Lcallreturn;
default:
return &c->_action;
}
}
}
// missed lock
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
Lmiss:
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
Lfailure:
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_FAILURE);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
if (c->od) {
c->openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
return ACTION_RESULT_DONE;
}
free_CacheVC(c);
return ACTION_RESULT_DONE;
Lcallreturn:
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
}