/** @file
Cache read: the Cache::open_read entry points and the CacheVC read state machine.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_Cache.h"
#include "HttpCacheSM.h" //Added to get the scope of HttpCacheSM object.
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(type)) {
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[type] == this);
Vol *vol = key_to_vol(key, hostname, host_len);
Dir result, *last_collision = nullptr;
ProxyMutex *mutex = cont->mutex.get();
OpenDirEntry *od = nullptr;
CacheVC *c = nullptr;
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->first_key = c->key = c->earliest_key = *key;
c->vol = vol;
c->frag_type = type;
c->od = od;
}
if (!c) {
goto Lmiss;
}
if (!lock.is_locked()) {
CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
if (c->od) {
goto Lwriter;
}
c->dir = result;
c->last_collision = last_collision;
switch (c->do_read_call(&c->key)) {
case EVENT_DONE:
return ACTION_RESULT_DONE;
case EVENT_RETURN:
goto Lcallreturn;
default:
return &c->_action;
}
}
Lmiss:
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
return ACTION_RESULT_DONE;
Lwriter:
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
Lcallreturn:
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
}
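/*
HTTP read entry point. Same structure as the non-HTTP open_read above,
but carries the client request header and the overridable config so an
alternate can be selected, and flags read-while-write in progress on
the HttpCacheSM when a writer is found.
*/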
Action *
Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const OverridableHttpConfigParams *params,
CacheFragType type, const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(type)) {
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[type] == this);
Vol *vol = key_to_vol(key, hostname, host_len);
Dir result, *last_collision = nullptr;
ProxyMutex *mutex = cont->mutex.get();
OpenDirEntry *od = nullptr;
CacheVC *c = nullptr;
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked() || (od = vol->open_read(key)) || dir_probe(key, vol, &result, &last_collision)) {
c = new_CacheVC(cont);
c->first_key = c->key = c->earliest_key = *key;
c->vol = vol;
c->vio.op = VIO::READ;
c->base_stat = cache_read_active_stat;
CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE);
c->request.copy_shallow(request);
c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->params = params;
c->od = od;
}
if (!lock.is_locked()) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
if (!c) {
goto Lmiss;
}
if (c->od) {
goto Lwriter;
}
// hit
c->dir = c->first_dir = result;
c->last_collision = last_collision;
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead);
switch (c->do_read_call(&c->key)) {
case EVENT_DONE:
return ACTION_RESULT_DONE;
case EVENT_RETURN:
goto Lcallreturn;
default:
return &c->_action;
}
}
Lmiss:
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
return ACTION_RESULT_DONE;
Lwriter:
// this is a horrible violation of the interface and should be fixed (FIXME)
((HttpCacheSM *)cont)->set_readwhilewrite_inprogress(true);
SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter);
if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
Lcallreturn:
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
}
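/*
Unmarshals the alternate vector from a Doc's header. For documents
written by an older cache version (and not served from the ram cache,
which is always already fixed up), the MIME accelerators and presence
bits of each alternate's request and response headers are recomputed.
Returns the number of header bytes consumed.
*/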
uint32_t
CacheVC::load_http_info(CacheHTTPInfoVector *info, Doc *doc, RefCountObj *block_ptr)
{
uint32_t zret = info->get_handles(doc->hdr(), doc->hlen, block_ptr);
if (!this->f.doc_from_ram_cache && // ram cache is always already fixed up.
// If this is an old object, the object version will be old or 0, in either case this is
// correct. Forget the 4.2 compatibility, always update older versioned objects.
ts::VersionNumber(doc->v_major, doc->v_minor) < CACHE_DB_VERSION) {
for (int i = info->xcount - 1; i >= 0; --i) {
info->data(i).alternate.m_alt->m_response_hdr.m_mime->recompute_accelerators_and_presence_bits();
info->data(i).alternate.m_alt->m_request_hdr.m_mime->recompute_accelerators_and_presence_bits();
}
}
return zret;
}
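/*
Terminal state for a failed read-while-write: bumps the failure stats,
forwards the failure event to the caller, and frees this CacheVC.
*/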
int
CacheVC::openReadFromWriterFailure(int event, Event *e)
{
od = nullptr;
vector.clear(false);
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_failure_stat);
_action.continuation->handleEvent(event, e);
free_CacheVC(this);
return EVENT_DONE;
}
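/*
Picks a writer on the open directory entry for this reader to follow.
Returns EVENT_RETURN to fall back to a plain disk read, EVENT_CONT to
retry until the writers have valid alternates, EVENT_NONE with
write_vc set on success, or -err on failure. The volume and VC mutexes
must be held by this thread.
*/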
int
CacheVC::openReadChooseWriter(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
intptr_t err = ECACHE_DOC_BUSY;
CacheVC *w = nullptr;
ink_assert(vol->mutex->thread_holding == mutex->thread_holding && write_vc == nullptr);
if (!od) {
return EVENT_RETURN;
}
if (frag_type != CACHE_FRAG_TYPE_HTTP) {
ink_assert(od->num_writers == 1);
w = od->writers.head;
if (w->start_time > start_time || w->closed < 0) {
od = nullptr;
return EVENT_RETURN;
}
if (!w->closed) {
return -err;
}
write_vc = w;
} else {
write_vector = &od->vector;
int write_vec_cnt = write_vector->count();
for (int c = 0; c < write_vec_cnt; c++) {
vector.insert(write_vector->get(c));
}
// check if all the writers who came before this reader have
// set the http_info.
for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
if (w->start_time > start_time || w->closed < 0) {
continue;
}
if (!w->closed && !cache_config_read_while_writer) {
return -err;
}
if (w->alternate_index != CACHE_ALT_INDEX_DEFAULT) {
continue;
}
if (!w->closed && !w->alternate.valid()) {
od = nullptr;
ink_assert(!write_vc);
vector.clear(false);
return EVENT_CONT;
}
// construct the vector from the writers.
int alt_ndx = CACHE_ALT_INDEX_DEFAULT;
if (w->f.update) {
// all Update cases. Need to get the alternate index.
alt_ndx = get_alternate_index(&vector, w->update_key);
// if it's an alternate delete
if (!w->alternate.valid()) {
if (alt_ndx >= 0) {
vector.remove(alt_ndx, false);
}
continue;
}
}
if (w->alternate.valid()) {
vector.insert(&w->alternate, alt_ndx);
}
}
if (!vector.count()) {
if (od->reading_vec) {
// the writer(s) are reading the vector, so there is probably
// an old vector. Since this reader came before any of the
// current writers, we should return the old data
od = nullptr;
return EVENT_RETURN;
}
return -ECACHE_NO_DOC;
}
if (cache_config_select_alternate) {
alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
if (alternate_index < 0) {
return -ECACHE_ALT_MISS;
}
} else {
alternate_index = 0;
}
CacheHTTPInfo *obj = vector.get(alternate_index);
for (w = static_cast<CacheVC *>(od->writers.head); w; w = static_cast<CacheVC *>(w->opendir_link.next)) {
if (obj->m_alt == w->alternate.m_alt) {
write_vc = w;
break;
}
}
vector.clear(false);
if (!write_vc) {
DDebug("cache_read_agg", "%p: key: %X writer alternate different: %d", this, first_key.slice32(1), alternate_index);
od = nullptr;
return EVENT_RETURN;
}
DDebug("cache_read_agg", "%p: key: %X eKey: %d # alts: %d, ndx: %d, # writers: %d writer: %p", this, first_key.slice32(1),
write_vc->earliest_key.slice32(1), vector.count(), alternate_index, od->num_writers, write_vc);
}
return EVENT_NONE;
}
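/*
Read-while-write: try to attach this reader to an in-progress writer.
Falls back to openReadStartHead when there is no writer, or when the
writer has finished or aborted; retries while the writer has not yet
produced a usable fragment; otherwise copies enough writer state to
stream the document from the writer's buffers or from disk.
*/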
int
CacheVC::openReadFromWriter(int event, Event *e)
{
if (!f.read_from_writer_called) {
// The assignment to last_collision as nullptr was
// made conditional after INKqa08411
last_collision = nullptr;
// Let's restart the clock from here - the first time a reader
// gets in this state. It's possible that the open_read was called
// before the open_write, but the reader could not get the volume
// lock. If we don't reset the clock here, we won't choose any writer
// and hence fail the read request.
start_time = Thread::get_hrtime();
f.read_from_writer_called = 1;
}
cancel_trigger();
intptr_t err = ECACHE_DOC_BUSY;
DDebug("cache_read_agg", "%p: key: %X In openReadFromWriter", this, first_key.slice32(1));
if (_action.cancelled) {
od = nullptr; // only open for read so no need to close
return free_CacheVC(this);
}
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
od = vol->open_read(&first_key); // recheck in case the lock failed
if (!od) {
MUTEX_RELEASE(lock);
write_vc = nullptr;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(event, e);
} else {
ink_assert(od == vol->open_read(&first_key));
}
if (!write_vc) {
int ret = openReadChooseWriter(event, e);
if (ret < 0) {
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openReadFromWriterFailure);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<Event *>(ret));
} else if (ret == EVENT_RETURN) {
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(event, e);
} else if (ret == EVENT_CONT) {
ink_assert(!write_vc);
if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
VC_SCHED_WRITER_RETRY();
} else {
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
}
} else {
ink_assert(write_vc);
}
} else {
if (writer_done()) {
MUTEX_RELEASE(lock);
DDebug("cache_read_agg", "%p: key: %X writer %p has left, continuing as normal read", this, first_key.slice32(1), write_vc);
od = nullptr;
write_vc = nullptr;
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(event, e);
}
}
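// Stash the OpenDirEntry locally and clear od: this VC is only reading,
// so it must not keep holding (and later close) the writer's open entry.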
OpenDirEntry *cod = od;
od = nullptr;
// someone is currently writing the document
if (write_vc->closed < 0) {
MUTEX_RELEASE(lock);
write_vc = nullptr;
// writer aborted, continue as if there is no writer
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, nullptr);
}
// allow reading from unclosed writer for http requests only.
ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP || write_vc->closed);
if (!write_vc->closed && !write_vc->fragment) {
if (!cache_config_read_while_writer || frag_type != CACHE_FRAG_TYPE_HTTP ||
writer_lock_retry >= cache_config_read_while_writer_max_retries) {
MUTEX_RELEASE(lock);
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
}
DDebug("cache_read_agg", "%p: key: %X writer: closed:%d, fragment:%d, retry: %d", this, first_key.slice32(1), write_vc->closed,
write_vc->fragment, writer_lock_retry);
VC_SCHED_WRITER_RETRY();
}
CACHE_TRY_LOCK(writer_lock, write_vc->mutex, mutex->thread_holding);
if (!writer_lock.is_locked()) {
DDebug("cache_read_agg", "%p: key: %X lock miss", this, first_key.slice32(1));
VC_SCHED_LOCK_RETRY();
}
MUTEX_RELEASE(lock);
if (!write_vc->io.ok()) {
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
}
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
DDebug("cache_read_agg", "%p: key: %X http passed stage 1, closed: %d, frag: %d", this, first_key.slice32(1), write_vc->closed,
write_vc->fragment);
if (!write_vc->alternate.valid()) {
return openReadFromWriterFailure(CACHE_EVENT_OPEN_READ_FAILED, (Event *)-err);
}
alternate.copy(&write_vc->alternate);
vector.insert(&alternate);
alternate.object_key_get(&key);
write_vc->f.readers = 1;
if (!(write_vc->f.update && write_vc->total_len == 0)) {
key = write_vc->earliest_key;
if (!write_vc->closed) {
alternate.object_size_set(write_vc->vio.nbytes);
} else {
alternate.object_size_set(write_vc->total_len);
}
} else {
key = write_vc->update_key;
ink_assert(write_vc->closed);
DDebug("cache_read_agg", "%p: key: %X writer header update", this, first_key.slice32(1));
// Update case (b) : grab doc_len from the writer's alternate
doc_len = alternate.object_size_get();
if (write_vc->update_key == cod->single_doc_key && (cod->move_resident_alt || write_vc->f.rewrite_resident_alt) &&
write_vc->first_buf.get()) {
// the resident alternate is being updated and it's a
// header-only update. The first_buf of the writer has the
// document body.
Doc *doc = reinterpret_cast<Doc *>(write_vc->first_buf->data());
writer_buf = new_IOBufferBlock(write_vc->first_buf, doc->data_len(), doc->prefix_len());
MUTEX_RELEASE(writer_lock);
ink_assert(doc_len == doc->data_len());
length = doc_len;
f.single_fragment = 1;
doc_pos = 0;
earliest_key = key;
dir_clean(&first_dir);
dir_clean(&earliest_dir);
SET_HANDLER(&CacheVC::openReadFromWriterMain);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
return callcont(CACHE_EVENT_OPEN_READ);
}
// want to snarf the new headers from the writer
// and then continue as if nothing happened
last_collision = nullptr;
MUTEX_RELEASE(writer_lock);
SET_HANDLER(&CacheVC::openReadStartEarliest);
return openReadStartEarliest(event, e);
}
} else {
DDebug("cache_read_agg", "%p: key: %X non-http passed stage 1", this, first_key.slice32(1));
key = write_vc->earliest_key;
}
if (write_vc->fragment) {
doc_len = write_vc->vio.nbytes;
last_collision = nullptr;
DDebug("cache_read_agg", "%p: key: %X closed: %d, fragment: %d, len: %d starting first fragment", this, first_key.slice32(1),
write_vc->closed, write_vc->fragment, (int)doc_len);
MUTEX_RELEASE(writer_lock);
// either a header + body update or a new document
SET_HANDLER(&CacheVC::openReadStartEarliest);
return openReadStartEarliest(event, e);
}
writer_buf = write_vc->blocks;
writer_offset = write_vc->offset;
length = write_vc->length;
// copy the vector
f.single_fragment = !write_vc->fragment; // single fragment doc
doc_pos = 0;
earliest_key = write_vc->earliest_key;
ink_assert(earliest_key == key);
doc_len = write_vc->total_len;
dir_clean(&first_dir);
dir_clean(&earliest_dir);
DDebug("cache_read_agg", "%p: key: %X %X: single fragment read", this, first_key.slice32(1), key.slice32(0));
MUTEX_RELEASE(writer_lock);
SET_HANDLER(&CacheVC::openReadFromWriterMain);
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
return callcont(CACHE_EVENT_OPEN_READ);
}
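/*
Streams data to the user by cloning blocks out of the writer's buffer
chain: errors out on truncation, skips bytes the writer has already
retired, and signals EOS once doc_len bytes have been delivered.
*/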
int
CacheVC::openReadFromWriterMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
if (seek_to) {
vio.ndone = seek_to;
seek_to = 0;
}
IOBufferBlock *b = nullptr;
int64_t ntodo = vio.ntodo();
if (ntodo <= 0) {
return EVENT_CONT;
}
if (length < (static_cast<int64_t>(doc_len)) - vio.ndone) {
DDebug("cache_read_agg", "truncation %X", first_key.slice32(1));
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
Warning("Document %X truncated at %d of %d, reading from writer", first_key.slice32(1), (int)vio.ndone, (int)doc_len);
return calluser(VC_EVENT_ERROR);
}
/* it's possible that the user did a do_io_close before
openWriteWriteDone was called. */
if (length > (static_cast<int64_t>(doc_len)) - vio.ndone) {
int64_t skip_bytes = length - (doc_len - vio.ndone);
iobufferblock_skip(writer_buf.get(), &writer_offset, &length, skip_bytes);
}
int64_t bytes = length;
if (bytes > vio.ntodo()) {
bytes = vio.ntodo();
}
if (vio.ndone >= static_cast<int64_t>(doc_len)) {
ink_assert(bytes <= 0);
// reached the end of the document and the user still wants more
return calluser(VC_EVENT_EOS);
}
b = iobufferblock_clone(writer_buf.get(), writer_offset, bytes);
writer_buf = iobufferblock_skip(writer_buf.get(), &writer_offset, &length, bytes);
vio.buffer.writer()->append_block(b);
vio.ndone += bytes;
if (vio.ntodo() <= 0) {
return calluser(VC_EVENT_READ_COMPLETE);
} else {
return calluser(VC_EVENT_READ_READY);
}
}
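/*
Final state of a read VC: waits for any in-flight AIO, optionally marks
the head (and earliest) fragment for hit evacuation, then closes the
read on the volume and frees the VC.
*/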
int
CacheVC::openReadClose(int event, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE) {
return EVENT_CONT;
}
set_io_not_in_progress();
}
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (f.hit_evacuate && dir_valid(vol, &first_dir) && closed > 0) {
if (f.single_fragment) {
vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
} else if (dir_valid(vol, &earliest_dir)) {
vol->force_evacuate_head(&first_dir, dir_pinned(&first_dir));
vol->force_evacuate_head(&earliest_dir, dir_pinned(&earliest_dir));
}
}
vol->close_read(this);
return free_CacheVC(this);
}
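/*
Completion handler for a middle-fragment disk read. Validates the Doc
magic and key, re-probes the directory on a collision, waits on the
writer in the read-while-write case, and treats a missing fragment as
a truncated document.
*/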
int
CacheVC::openReadReadDone(int event, Event *e)
{
Doc *doc = nullptr;
cancel_trigger();
if (event == EVENT_IMMEDIATE) {
return EVENT_CONT;
}
set_io_not_in_progress();
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (event == AIO_EVENT_DONE && !io.ok()) {
goto Lerror;
}
if (last_collision && // no missed lock
dir_valid(vol, &dir)) // object still valid
{
doc = reinterpret_cast<Doc *>(buf->data());
if (doc->magic != DOC_MAGIC) {
char tmpstring[CRYPTO_HEX_SIZE];
if (doc->magic == DOC_CORRUPT) {
Warning("Middle: Doc checksum does not match for %s", key.toHexStr(tmpstring));
} else {
Warning("Middle: Doc magic does not match for %s", key.toHexStr(tmpstring));
}
goto Lerror;
}
if (doc->key == key) {
goto LreadMain;
}
}
if (last_collision && dir_offset(&dir) != dir_offset(last_collision)) {
last_collision = nullptr; // object has been/is being overwritten
}
if (dir_probe(&key, vol, &dir, &last_collision)) {
int ret = do_read_call(&key);
if (ret == EVENT_RETURN) {
goto Lcallreturn;
}
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = nullptr;
while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
DDebug("cache_read_agg", "%p: key: %X ReadRead complete: %d", this, first_key.slice32(1), (int)vio.ndone);
doc_len = vio.ndone;
goto Ldone;
}
}
DDebug("cache_read_agg", "%p: key: %X ReadRead writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
goto Lerror;
}
if (writer_lock_retry < cache_config_read_while_writer_max_retries) {
DDebug("cache_read_agg", "%p: key: %X ReadRead retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
VC_SCHED_WRITER_RETRY(); // wait for writer
} else {
DDebug("cache_read_agg", "%p: key: %X ReadRead retries exhausted, bailing..: %d", this, first_key.slice32(1),
(int)vio.ndone);
goto Ldone;
}
}
// fall through for truncated documents
}
Lerror : {
char tmpstring[CRYPTO_HEX_SIZE];
if (request.valid()) {
int url_length;
const char *url_text = request.url_get()->string_get_ref(&url_length);
Warning("Document %s truncated, url[%.*s] .. clearing", earliest_key.toHexStr(tmpstring), url_length, url_text);
} else {
Warning("Document %s truncated .. clearing", earliest_key.toHexStr(tmpstring));
}
dir_delete(&earliest_key, vol, &earliest_dir);
return calluser(VC_EVENT_ERROR);
}
Ldone:
return calluser(VC_EVENT_EOS);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr);
LreadMain:
fragment++;
doc_pos = doc->prefix_len();
next_CacheKey(&key, &key);
SET_HANDLER(&CacheVC::openReadMain);
return openReadMain(event, e);
}
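/*
Main read loop. Handles do_io_pread seeks through the alternate's
fragment offset table, appends document bytes from the current buffer
to the user's VIO, and issues the next fragment read (or waits on the
writer) once the buffered data is consumed.
*/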
int
CacheVC::openReadMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
Doc *doc = reinterpret_cast<Doc *>(buf->data());
int64_t ntodo = vio.ntodo();
int64_t bytes = doc->len - doc_pos;
IOBufferBlock *b = nullptr;
if (seek_to) { // handle do_io_pread
if (seek_to >= doc_len) {
vio.ndone = doc_len;
return calluser(VC_EVENT_EOS);
}
HTTPInfo::FragOffset *frags = alternate.get_frag_table();
if (is_debug_tag_set("cache_seek")) {
char b[CRYPTO_HEX_SIZE], c[CRYPTO_HEX_SIZE];
Debug("cache_seek", "Seek @ %" PRId64 " in %s from #%d @ %" PRId64 "/%d:%s", seek_to, first_key.toHexStr(b), fragment,
doc_pos, doc->len, doc->key.toHexStr(c));
}
/* Because single fragment objects can migrate to hang off an alt vector
they can appear to the VC as multi-fragment when they really are not.
The essential difference is the existence of a fragment table.
*/
if (frags) {
int target = 0;
HTTPInfo::FragOffset next_off = frags[target];
int lfi = static_cast<int>(alternate.get_frag_offset_count()) - 1;
ink_assert(lfi >= 0); // because it's not a single frag doc.
/* Note: frag[i].offset is the offset of the first byte past the
i'th fragment. So frag[0].offset is the offset of the first
byte of fragment 1. In addition the # of fragments is one
more than the fragment table length, the start of the last
fragment being the last offset in the table.
*/
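/* Illustration (hypothetical sizes): a document split into three
fragments has a two-entry table, e.g. frags[] = {1 MB, 2 MB};
fragment 0 covers [0, 1 MB), fragment 1 covers [1 MB, 2 MB), and
fragment 2, the last, starts at frags[1] = 2 MB; here lfi == 1.
*/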
if (fragment == 0 || seek_to < frags[fragment - 1] || (fragment <= lfi && frags[fragment] <= seek_to)) {
// search from frag 0 on to find the proper frag
while (seek_to >= next_off && target < lfi) {
next_off = frags[++target];
}
if (target == lfi && seek_to >= next_off) {
++target;
}
} else { // shortcut if we are in the fragment already
target = fragment;
}
if (target != fragment) {
// Lread will read the next fragment always, so if that
// is the one we want, we don't need to do anything
int cfi = fragment;
--target;
while (target > fragment) {
next_CacheKey(&key, &key);
++fragment;
}
while (target < fragment) {
prev_CacheKey(&key, &key);
--fragment;
}
if (is_debug_tag_set("cache_seek")) {
char target_key_str[CRYPTO_HEX_SIZE];
key.toHexStr(target_key_str);
Debug("cache_seek", "Seek #%d @ %" PRId64 " -> #%d @ %" PRId64 ":%s", cfi, doc_pos, target, seek_to, target_key_str);
}
goto Lread;
}
}
doc_pos = doc->prefix_len() + seek_to;
if (fragment && frags) {
doc_pos -= static_cast<int64_t>(frags[fragment - 1]);
}
vio.ndone = 0;
seek_to = 0;
ntodo = vio.ntodo();
bytes = doc->len - doc_pos;
if (is_debug_tag_set("cache_seek")) {
char target_key_str[CRYPTO_HEX_SIZE];
Debug("cache_seek", "Read # %d @ %" PRId64 "/%d for %" PRId64 " %s", fragment, doc_pos, doc->len, bytes,
key.toHexStr(target_key_str));
}
// This shouldn't happen for HTTP assets but it does
// occasionally in production. This is a temporary fix
// to clean up broken objects until the root cause can
// be found. It must be the case that either the fragment
// offsets are incorrect or a fragment table isn't being
// created when it should be.
if (frag_type == CACHE_FRAG_TYPE_HTTP && bytes < 0) {
char xt[CRYPTO_HEX_SIZE];
char yt[CRYPTO_HEX_SIZE];
int url_length = 0;
char const *url_text = nullptr;
if (request.valid()) {
url_text = request.url_get()->string_get_ref(&url_length);
}
int64_t prev_frag_size = 0;
if (fragment && frags) {
prev_frag_size = static_cast<int64_t>(frags[fragment - 1]);
}
Warning("cache_seek range request bug: read %s targ %s - %s frag # %d (prev_frag %" PRId64 ") @ %" PRId64 "/%d for %" PRId64
" tot %" PRId64 " url '%.*s'",
doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", fragment, prev_frag_size, doc_pos,
doc->len, bytes, doc->total_len, url_length, url_text);
doc->magic = DOC_CORRUPT;
CACHE_INCREMENT_DYN_STAT(cache_read_seek_fail_stat);
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
SET_HANDLER(&CacheVC::openReadDirDelete);
VC_SCHED_LOCK_RETRY();
}
dir_delete(&earliest_key, vol, &earliest_dir);
goto Lerror;
}
}
if (ntodo <= 0) {
return EVENT_CONT;
}
if (vio.buffer.writer()->max_read_avail() > vio.buffer.writer()->water_mark && vio.ndone) { // initiate read of first block
return EVENT_CONT;
}
if ((bytes <= 0) && vio.ntodo() >= 0) {
goto Lread;
}
if (bytes > vio.ntodo()) {
bytes = vio.ntodo();
}
b = new_IOBufferBlock(buf, bytes, doc_pos);
b->_buf_end = b->_end;
vio.buffer.writer()->append_block(b);
vio.ndone += bytes;
doc_pos += bytes;
if (vio.ntodo() <= 0) {
return calluser(VC_EVENT_READ_COMPLETE);
} else {
if (calluser(VC_EVENT_READ_READY) == EVENT_DONE) {
return EVENT_DONE;
}
// we have to keep reading until we give the user all the
// bytes it wanted or we hit the watermark.
if (vio.ntodo() > 0 && !vio.buffer.writer()->high_water()) {
goto Lread;
}
return EVENT_CONT;
}
Lread : {
if (vio.ndone >= static_cast<int64_t>(doc_len)) {
// reached the end of the document and the user still wants more
return calluser(VC_EVENT_EOS);
}
last_collision = nullptr;
writer_lock_retry = 0;
// if the state machine calls reenable on the callback from the cache,
// we set up a schedule_imm event. The openReadReadDone discards
// EVENT_IMMEDIATE events. So, we have to cancel that trigger and set
// a new EVENT_INTERVAL event.
cancel_trigger();
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
SET_HANDLER(&CacheVC::openReadMain);
VC_SCHED_LOCK_RETRY();
}
if (dir_probe(&key, vol, &dir, &last_collision)) {
SET_HANDLER(&CacheVC::openReadReadDone);
int ret = do_read_call(&key);
if (ret == EVENT_RETURN) {
goto Lcallreturn;
}
return EVENT_CONT;
} else if (write_vc) {
if (writer_done()) {
last_collision = nullptr;
while (dir_probe(&earliest_key, vol, &dir, &last_collision)) {
if (dir_offset(&dir) == dir_offset(&earliest_dir)) {
DDebug("cache_read_agg", "%p: key: %X ReadMain complete: %d", this, first_key.slice32(1), (int)vio.ndone);
doc_len = vio.ndone;
goto Leos;
}
}
DDebug("cache_read_agg", "%p: key: %X ReadMain writer aborted: %d", this, first_key.slice32(1), (int)vio.ndone);
goto Lerror;
}
DDebug("cache_read_agg", "%p: key: %X ReadMain retrying: %d", this, first_key.slice32(1), (int)vio.ndone);
SET_HANDLER(&CacheVC::openReadMain);
VC_SCHED_WRITER_RETRY();
}
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
Warning("Document %X truncated at %d of %d, missing fragment %X", first_key.slice32(1), (int)vio.ndone, (int)doc_len,
key.slice32(1));
// remove the directory entry
dir_delete(&earliest_key, vol, &earliest_dir);
}
Lerror:
return calluser(VC_EVENT_ERROR);
Leos:
return calluser(VC_EVENT_EOS);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr);
}
/*
This code follows CacheVC::openReadStartHead closely,
if you change this you might have to change that.
*/
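// Reads the earliest Doc, i.e. the start of the object body for the
// selected alternate. If the alternate turns out to be missing, the
// vector is rewritten (evacuated) to drop it.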
int
CacheVC::openReadStartEarliest(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
int ret = 0;
Doc *doc = nullptr;
cancel_trigger();
set_io_not_in_progress();
if (_action.cancelled) {
return free_CacheVC(this);
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (!buf) {
goto Lread;
}
if (!io.ok()) {
goto Ldone;
}
// an object needs to be outside the aggregation window in order
// to be evacuated as it is read
if (!dir_agg_valid(vol, &dir)) {
// a directory entry which is no longer valid may have been overwritten
if (!dir_valid(vol, &dir)) {
last_collision = nullptr;
}
goto Lread;
}
doc = reinterpret_cast<Doc *>(buf->data());
if (doc->magic != DOC_MAGIC) {
char tmpstring[CRYPTO_HEX_SIZE];
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
if (doc->magic == DOC_CORRUPT) {
Warning("Earliest: Doc checksum does not match for %s", key.toHexStr(tmpstring));
} else {
Warning("Earliest : Doc magic does not match for %s", key.toHexStr(tmpstring));
}
// remove the dir entry
dir_delete(&key, vol, &dir);
// try going through the directory entries again
// in case the dir entry we deleted doesn't correspond
// to the key we are looking for. This is possible
// because of directory collisions
last_collision = nullptr;
goto Lread;
}
if (!(doc->key == key)) { // collision
goto Lread;
}
// success
earliest_key = key;
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
vol->begin_read(this);
if (vol->within_hit_evacuate_window(&earliest_dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&earliest_dir),
vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
f.hit_evacuate = 1;
}
goto Lsuccess;
Lread:
if (dir_probe(&key, vol, &earliest_dir, &last_collision) || dir_lookaside_probe(&key, vol, &earliest_dir, nullptr)) {
dir = earliest_dir;
if ((ret = do_read_call(&key)) == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
// the read has detected that the alternate does not exist in the cache,
// so rewrite the vector.
if (!f.read_from_writer_called && frag_type == CACHE_FRAG_TYPE_HTTP) {
// don't want any writers while we are evacuating the vector
if (!vol->open_write(this, false, 1)) {
Doc *doc1 = reinterpret_cast<Doc *>(first_buf->data());
uint32_t len = this->load_http_info(write_vector, doc1);
ink_assert(len == doc1->hlen && write_vector->count() > 0);
write_vector->remove(alternate_index, true);
// if the vector had only one alternate, delete its directory entry
if (len != doc1->hlen || !write_vector->count()) {
// sometimes the delete fails when there is a race and another read
// finds that the directory entry has been overwritten
// (cannot assert on the return value)
dir_delete(&first_key, vol, &first_dir);
} else {
buf = nullptr;
last_collision = nullptr;
write_len = 0;
header_len = write_vector->marshal_length();
f.evac_vector = 1;
f.use_first_key = 1;
key = first_key;
// always use od->first_dir to overwrite a directory.
// If an evacuation happens while a vector is being updated
// the evacuator changes the od->first_dir to the new directory
// that it inserted
od->first_dir = first_dir;
od->writing_vec = true;
earliest_key = zero_key;
// set up this VC as an alternate-delete write_vc
vio.op = VIO::WRITE;
total_len = 0;
f.update = 1;
alternate_index = CACHE_ALT_REMOVED;
/////////////////////////////////////////////////////////////////
// change to create a directory entry for a resident alternate //
// when another alternate does not exist. //
/////////////////////////////////////////////////////////////////
if (doc1->total_len > 0) {
od->move_resident_alt = true;
od->single_doc_key = doc1->key;
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
}
SET_HANDLER(&CacheVC::openReadVecWrite);
if ((ret = do_write_call()) == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
}
}
// open write failure - another writer, so don't modify the vector
Ldone:
if (od) {
vol->close_write(this);
}
}
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_NO_DOC);
return free_CacheVC(this);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
if (write_vc) {
CACHE_INCREMENT_DYN_STAT(cache_read_busy_success_stat);
}
SET_HANDLER(&CacheVC::openReadMain);
return callcont(CACHE_EVENT_OPEN_READ);
}
// Create the directory entry after the vector has been evacuated.
// The volume lock has been taken when this function is called.
int
CacheVC::openReadVecWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
set_io_not_in_progress();
ink_assert(od);
od->writing_vec = false;
if (_action.cancelled) {
return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (io.ok()) {
ink_assert(f.evac_vector);
ink_assert(frag_type == CACHE_FRAG_TYPE_HTTP);
ink_assert(!buf);
f.evac_vector = false;
last_collision = nullptr;
f.update = 0;
alternate_index = CACHE_ALT_INDEX_DEFAULT;
f.use_first_key = 0;
vio.op = VIO::READ;
dir_overwrite(&first_key, vol, &dir, &od->first_dir);
if (od->move_resident_alt) {
dir_insert(&od->single_doc_key, vol, &od->single_doc_dir);
}
int alt_ndx = HttpTransactCache::SelectFromAlternates(write_vector, &request, params);
vol->close_write(this);
if (alt_ndx >= 0) {
vector.clear();
// we don't need to start all over again, since we already
// have the vector in memory. But this is simpler and this
// case is rare.
goto Lrestart;
}
} else {
vol->close_write(this);
}
}
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-ECACHE_ALT_MISS);
return free_CacheVC(this);
Lrestart:
SET_HANDLER(&CacheVC::openReadStartHead);
return openReadStartHead(EVENT_IMMEDIATE, nullptr);
}
/*
This code follows CacheVC::openReadStartEarliest closely,
if you change this you might have to change that.
*/
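// Reads and validates the head Doc, unmarshals the alternate vector for
// HTTP objects and selects an alternate, then either completes a single
// fragment read, chains to openReadStartEarliest, or attaches to a writer.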
int
CacheVC::openReadStartHead(int event, Event *e)
{
intptr_t err = ECACHE_NO_DOC;
Doc *doc = nullptr;
cancel_trigger();
set_io_not_in_progress();
if (_action.cancelled) {
return free_CacheVC(this);
}
{
CACHE_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (!buf) {
goto Lread;
}
if (!io.ok()) {
goto Ldone;
}
// an object needs to be outside the aggregation window in order
// to be evacuated as it is read
if (!dir_agg_valid(vol, &dir)) {
// a directory entry which is no longer valid may have been overwritten
if (!dir_valid(vol, &dir)) {
last_collision = nullptr;
}
goto Lread;
}
doc = reinterpret_cast<Doc *>(buf->data());
if (doc->magic != DOC_MAGIC) {
char tmpstring[CRYPTO_HEX_SIZE];
if (is_action_tag_set("cache")) {
ink_release_assert(false);
}
if (doc->magic == DOC_CORRUPT) {
Warning("Head: Doc checksum does not match for %s", key.toHexStr(tmpstring));
} else {
Warning("Head : Doc magic does not match for %s", key.toHexStr(tmpstring));
}
// remove the dir entry
dir_delete(&key, vol, &dir);
// try going through the directory entries again
// in case the dir entry we deleted doesn't correspond
// to the key we are looking for. This is possible
// because of directory collisions
last_collision = nullptr;
goto Lread;
}
if (!(doc->first_key == key)) {
goto Lread;
}
if (f.lookup) {
goto Lookup;
}
earliest_dir = dir;
CacheHTTPInfo *alternate_tmp;
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
uint32_t uml;
ink_assert(doc->hlen);
if (!doc->hlen) {
goto Ldone;
}
if ((uml = this->load_http_info(&vector, doc)) != doc->hlen) {
if (buf) {
HTTPCacheAlt *alt = reinterpret_cast<HTTPCacheAlt *>(doc->hdr());
int32_t alt_length = 0;
// count should be reasonable, as vector is initialized and unlikely to be too corrupted
// by bad disk data - count should be the number of successfully unmarshalled alts.
for (int32_t i = 0; i < vector.count(); ++i) {
CacheHTTPInfo *info = vector.get(i);
if (info && info->m_alt) {
alt_length += info->m_alt->m_unmarshal_len;
}
}
Note("OpenReadHead failed for cachekey %X : vector inconsistency - "
"unmarshalled %d expecting %d in %d (base=%zu, ver=%d:%d) "
"- vector n=%d size=%d"
"first alt=%d[%s]",
key.slice32(0), uml, doc->hlen, doc->len, sizeof(Doc), doc->v_major, doc->v_minor, vector.count(), alt_length,
alt->m_magic,
(CACHE_ALT_MAGIC_ALIVE == alt->m_magic ?
"alive" :
CACHE_ALT_MAGIC_MARSHALED == alt->m_magic ? "serial" : CACHE_ALT_MAGIC_DEAD == alt->m_magic ? "dead" : "bogus"));
dir_delete(&key, vol, &dir);
}
err = ECACHE_BAD_META_DATA;
goto Ldone;
}
if (cache_config_select_alternate) {
alternate_index = HttpTransactCache::SelectFromAlternates(&vector, &request, params);
if (alternate_index < 0) {
err = ECACHE_ALT_MISS;
goto Ldone;
}
} else {
alternate_index = 0;
}
alternate_tmp = vector.get(alternate_index);
if (!alternate_tmp->valid()) {
if (buf) {
Note("OpenReadHead failed for cachekey %X : alternate inconsistency", key.slice32(0));
dir_delete(&key, vol, &dir);
}
goto Ldone;
}
alternate.copy_shallow(alternate_tmp);
alternate.object_key_get(&key);
doc_len = alternate.object_size_get();
if (key == doc->key) { // is this my data?
f.single_fragment = doc->single_fragment();
ink_assert(f.single_fragment); // otherwise need to read earliest
ink_assert(doc->hlen);
doc_pos = doc->prefix_len();
next_CacheKey(&key, &doc->key);
} else {
f.single_fragment = false;
}
} else {
next_CacheKey(&key, &doc->key);
f.single_fragment = doc->single_fragment();
doc_pos = doc->prefix_len();
doc_len = doc->total_len;
}
if (is_debug_tag_set("cache_read")) { // amc debug
char xt[CRYPTO_HEX_SIZE], yt[CRYPTO_HEX_SIZE];
Debug("cache_read", "CacheReadStartHead - read %s target %s - %s %d of %" PRId64 " bytes, %d fragments",
doc->key.toHexStr(xt), key.toHexStr(yt), f.single_fragment ? "single" : "multi", doc->len, doc->total_len,
alternate.get_frag_offset_count());
}
// the first fragment might have been gc'ed. Make sure the first
// fragment is there before returning CACHE_EVENT_OPEN_READ
if (!f.single_fragment) {
goto Learliest;
}
if (vol->within_hit_evacuate_window(&dir) &&
(!cache_config_hit_evacuate_size_limit || doc_len <= static_cast<uint64_t>(cache_config_hit_evacuate_size_limit))) {
DDebug("cache_hit_evac", "dir: %" PRId64 ", write: %" PRId64 ", phase: %d", dir_offset(&dir),
vol->offset_to_vol_offset(vol->header->write_pos), vol->header->phase);
f.hit_evacuate = 1;
}
first_buf = buf;
vol->begin_read(this);
goto Lsuccess;
Lread:
// check for collision
// INKqa07684 - Cache::lookup returns CACHE_EVENT_OPEN_READ_FAILED.
// don't want to go through this BS of reading from a writer if
// it's a lookup. In this case the lookup will fail while the document is
// being written to the cache.
OpenDirEntry *cod = vol->open_read(&key);
if (cod && !f.read_from_writer_called) {
if (f.lookup) {
err = ECACHE_DOC_BUSY;
goto Ldone;
}
od = cod;
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openReadFromWriter);
return handleEvent(EVENT_IMMEDIATE, nullptr);
}
if (dir_probe(&key, vol, &dir, &last_collision)) {
first_dir = dir;
int ret = do_read_call(&key);
if (ret == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
}
Ldone:
if (!f.lookup) {
CACHE_INCREMENT_DYN_STAT(cache_read_failure_stat);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, (void *)-err);
} else {
CACHE_INCREMENT_DYN_STAT(cache_lookup_failure_stat);
_action.continuation->handleEvent(CACHE_EVENT_LOOKUP_FAILED, (void *)-err);
}
return free_CacheVC(this);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
Lsuccess:
SET_HANDLER(&CacheVC::openReadMain);
return callcont(CACHE_EVENT_OPEN_READ);
Lookup:
CACHE_INCREMENT_DYN_STAT(cache_lookup_success_stat);
_action.continuation->handleEvent(CACHE_EVENT_LOOKUP, nullptr);
return free_CacheVC(this);
Learliest:
first_buf = buf;
buf = nullptr;
earliest_key = key;
last_collision = nullptr;
SET_HANDLER(&CacheVC::openReadStartEarliest);
return openReadStartEarliest(event, e);
}
/*
Handle a directory delete event in case of some detected corruption.
*/
int
CacheVC::openReadDirDelete(int event, Event *e)
{
MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
dir_delete(&earliest_key, vol, &earliest_dir);
return calluser(VC_EVENT_ERROR);
}