/** @file
Cache virtual connection (CacheVC): VConnection I/O operations, disk read
completion, document removal, and stripe scan support.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "iocore/cache/Cache.h"
#include "iocore/cache/CacheDefs.h"
#include "P_CacheDisk.h"
#include "P_CacheDoc.h"
#include "P_CacheHttp.h"
#include "P_CacheInternal.h"
#include "Stripe.h"
// must be included after the others
#include "CacheVC.h"
// hdrs
#include "proxy/hdrs/HTTP.h"
#include "proxy/hdrs/MIME.h"
// aio
#include "iocore/aio/AIO.h"
#include "tscore/InkErrno.h"
// tsapi
#include "tsutil/Metrics.h"
#include "tscore/Version.h"
// inkevent
#include "iocore/eventsystem/Continuation.h"
#include "iocore/eventsystem/EThread.h"
#include "iocore/eventsystem/Event.h"
#include "iocore/eventsystem/IOBuffer.h"
#include "iocore/eventsystem/Lock.h"
#include "iocore/eventsystem/VIO.h"
// tscore
#include "tscore/ink_assert.h"
#include "tscore/ink_hrtime.h"
#include "tscore/Ptr.h"
// ts
#include "tsutil/DbgCtl.h"
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <ctime>
namespace
{
DbgCtl dbg_ctl_cache_bc{"cache_bc"};
DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
DbgCtl dbg_ctl_cache_read{"cache_read"};
DbgCtl dbg_ctl_cache_scan{"cache_scan"};
DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"};
#ifdef DEBUG
DbgCtl dbg_ctl_cache_close{"cache_close"};
DbgCtl dbg_ctl_cache_reenable{"cache_reenable"};
#endif
} // end anonymous namespace
// Compilation Options
#define SCAN_BUF_SIZE RECOVERY_SIZE
#define SCAN_WRITER_LOCK_MAX_RETRY 5
#define STORE_COLLISION 1
#define USELESS_REENABLES // allow them for now
extern int64_t cache_config_ram_cache_cutoff;
/* Find the next block with some data in it in this stripe. Returns the end of the stripe if
 * there are no more in-use locations.
 *
 * stripe  - stripe to search
 * vol_map - precalculated map (see make_vol_map())
 * offset  - offset to start looking at (data at this location has not been read yet). */
static off_t
next_in_map(Stripe *stripe, char *vol_map, off_t offset)
{
off_t start_offset = stripe->vol_offset_to_offset(0);
off_t new_off = (offset - start_offset);
off_t vol_len = stripe->vol_relative_length(start_offset);
while (new_off < vol_len && !vol_map[new_off / SCAN_BUF_SIZE]) {
new_off += SCAN_BUF_SIZE;
}
if (new_off >= vol_len) {
return vol_len + start_offset;
}
return new_off + start_offset;
}
// Function in CacheDir.cc that we need for make_vol_map().
int dir_bucket_loop_fix(Dir *start_dir, int s, Directory *directory);
// TODO: If we used a bit vector, we could make a smaller map structure.
// TODO: If we saved a high water mark we could have a smaller buf, and avoid searching it
// when we are asked about the highest interesting offset.
/* Make a map of which blocks in the stripe are in use.
 *
 * stripe - Stripe to make a map of. */
static char *
make_vol_map(Stripe *stripe)
{
// Map will be one byte for each SCAN_BUF_SIZE bytes.
off_t start_offset = stripe->vol_offset_to_offset(0);
off_t vol_len = stripe->vol_relative_length(start_offset);
size_t map_len = (vol_len + (SCAN_BUF_SIZE - 1)) / SCAN_BUF_SIZE;
char *vol_map = static_cast<char *>(ats_malloc(map_len));
memset(vol_map, 0, map_len);
// Scan directories.
// Copied from dir_entries_used() and modified to fill in the map instead.
for (int s = 0; s < stripe->directory.segments; s++) {
Dir *seg = stripe->directory.get_segment(s);
for (int b = 0; b < stripe->directory.buckets; b++) {
Dir *e = dir_bucket(b, seg);
if (dir_bucket_loop_fix(e, s, &stripe->directory)) {
break;
}
while (e) {
if (dir_offset(e) && stripe->dir_valid(e) && stripe->dir_agg_valid(e) && dir_head(e)) {
off_t offset = stripe->vol_offset(e) - start_offset;
if (offset <= vol_len) {
vol_map[offset / SCAN_BUF_SIZE] = 1;
}
}
e = next_dir(e, seg);
if (!e) {
break;
}
}
}
}
return vol_map;
}
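// Number of bytes the constructor zero-initializes: everything from the vio member
// through the end of the object ("Region C").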
int CacheVC::size_to_init = -1;
CacheVC::CacheVC()
{
// Initialize Region C
size_to_init = sizeof(CacheVC) - reinterpret_cast<size_t>(&(static_cast<CacheVC *>(nullptr))->vio);
memset(reinterpret_cast<void *>(&vio), 0, size_to_init);
}
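// Set up a read VIO: record the destination buffer, continuation and byte count, then
// schedule this VC on the continuation's thread unless an event is already pending or we
// are inside a recursive call.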
VIO *
CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf)
{
ink_assert(vio.op == VIO::READ);
vio.set_writer(abuf);
vio.set_continuation(c);
vio.ndone = 0;
vio.nbytes = nbytes;
vio.vc_server = this;
#ifdef DEBUG
ink_assert(!c || c->mutex->thread_holding);
#endif
if (c && !trigger && !recursive) {
trigger = c->mutex->thread_holding->schedule_imm_local(this);
}
return &vio;
}
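// Same as do_io_read(), but also records the starting offset (seek_to) to position the
// read at before any data is returned.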
VIO *
CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset)
{
ink_assert(vio.op == VIO::READ);
vio.set_writer(abuf);
vio.set_continuation(c);
vio.ndone = 0;
vio.nbytes = nbytes;
vio.vc_server = this;
seek_to = offset;
#ifdef DEBUG
ink_assert(c->mutex->thread_holding);
#endif
if (!trigger && !recursive) {
trigger = c->mutex->thread_holding->schedule_imm_local(this);
}
return &vio;
}
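// Set up a write VIO: record the source reader, continuation and byte count, then schedule
// this VC unless an event is already pending. Transferring ownership of the reader is not
// supported (owner must be false).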
VIO *
CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner)
{
ink_assert(vio.op == VIO::WRITE);
ink_assert(!owner);
vio.set_reader(abuf);
vio.set_continuation(c);
vio.ndone = 0;
vio.nbytes = nbytes;
vio.vc_server = this;
#ifdef DEBUG
ink_assert(!c || c->mutex->thread_holding);
#endif
if (c && !trigger && !recursive) {
trigger = c->mutex->thread_holding->schedule_imm_local(this);
}
return &vio;
}
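// Mark this VC closed (1) or aborted (-1, when alerrno != -1) and begin tear-down via
// die(), unless the VC was already closed or we are inside a recursive handler call.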
void
CacheVC::do_io_close(int alerrno)
{
ink_assert(mutex->thread_holding == this_ethread());
int previous_closed = closed;
closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments
DDbg(dbg_ctl_cache_close, "do_io_close %p %d %d", this, alerrno, closed);
if (!previous_closed && !recursive) {
die();
}
}
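// Re-arm this VC: schedule it on the calling thread unless an event is already pending.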
void
CacheVC::reenable(VIO *avio)
{
DDbg(dbg_ctl_cache_reenable, "reenable %p", this);
#ifdef DEBUG
ink_assert(avio->mutex->thread_holding);
#endif
if (!trigger) {
#ifndef USELESS_REENABLES
if (vio.op == VIO::READ) {
if (vio.buffer.mbuf->max_read_avail() > vio.get_writer()->water_mark)
ink_assert(!"useless reenable of cache read");
} else if (!vio.get_reader()->read_avail())
ink_assert(!"useless reenable of cache write");
#endif
trigger = avio->mutex->thread_holding->schedule_imm_local(this);
}
}
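// Like reenable(), but when no I/O is in progress and we are not recursing, run the
// handler inline instead of scheduling an event.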
void
CacheVC::reenable_re(VIO *avio)
{
DDbg(dbg_ctl_cache_reenable, "reenable_re %p", this);
#ifdef DEBUG
ink_assert(avio->mutex->thread_holding);
#endif
if (!trigger) {
if (!is_io_in_progress() && !recursive) {
handleEvent(EVENT_NONE);
} else {
trigger = avio->mutex->thread_holding->schedule_imm_local(this);
}
}
}
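// Return out-of-band data about this VC: the alternate's HTTP info or the RAM cache hit
// flag, depending on the requested id.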
bool
CacheVC::get_data(int i, void *data)
{
switch (i) {
case CACHE_DATA_HTTP_INFO:
*(static_cast<CacheHTTPInfo **>(data)) = &alternate;
return true;
case CACHE_DATA_RAM_CACHE_HIT_FLAG:
*(static_cast<int *>(data)) = f.doc_from_ram_cache;
return true;
default:
break;
}
return false;
}
int64_t
CacheVC::get_object_size()
{
return this->doc_len;
}
bool
CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */)
{
ink_assert(!"CacheVC::set_data should not be called!");
return true;
}
int
CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
{
ink_assert(0);
return EVENT_DONE;
}
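// Unmarshal the serialized HTTP headers held in a Doc, in place. Docs written with an
// on-disk version older than CACHE_DB_VERSION use the previous layout and go through
// HTTPInfo::unmarshal_v24_1. On any failure, okay is cleared and unmarshaling stops.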
static void
unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay)
{
using UnmarshalFunc = int(char *buf, int len, RefCountObj *block_ref);
UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal;
ts::VersionNumber version(doc->v_major, doc->v_minor);
// introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version
// before and after #4847
if (version < CACHE_DB_VERSION) {
unmarshal_func = &HTTPInfo::unmarshal_v24_1;
}
char *tmp = doc->hdr();
int len = doc->hlen;
while (len > 0) {
int r = unmarshal_func(tmp, len, buf.get());
if (r < 0) {
ink_assert(!"CacheVC::handleReadDone unmarshal failed");
okay = 0;
break;
}
len -= r;
tmp += r;
}
}
// [amc] I think all disk reads from the cache funnel through here.
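// Completion handler for a cache disk read (also used for reads satisfied from the
// aggregation buffer): verify the directory entry and the Doc header (magic, version,
// optional checksum), unmarshal HTTP headers where required, optionally insert the
// fragment into the RAM cache, then pop the handler and re-dispatch AIO_EVENT_DONE.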
int
CacheVC::handleReadDone(int event, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
ink_assert(this_ethread() == mutex->thread_holding);
Doc *doc = nullptr;
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
if (DISK_BAD(stripe->disk)) {
io.aio_result = -1;
Error("Canceling cache read: disk %s is bad.", stripe->hash_text.get());
goto Ldone;
}
{
MUTEX_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if ((!stripe->dir_valid(&dir)) || (!io.ok())) {
if (!io.ok()) {
Dbg(dbg_ctl_cache_disk_error, "Read error on disk %s\n \
read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
stripe->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
(uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
}
goto Ldone;
}
doc = reinterpret_cast<Doc *>(buf->data());
ink_assert(stripe->mutex->nthread_holding < 1000);
ink_assert(doc->magic == DOC_MAGIC);
if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
// future version, count as corrupted
doc->magic = DOC_CORRUPT;
Dbg(dbg_ctl_cache_bc, "Object is future version %d:%d - disk %s - doc id = %" PRIx64 ":%" PRIx64 "", doc->v_major,
doc->v_minor, stripe->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
goto Ldone;
}
if (dbg_ctl_cache_read.on()) {
char xt[CRYPTO_HEX_SIZE];
Dbg(dbg_ctl_cache_read,
"Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64 " prefix=%d",
doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
}
// put into ram cache?
if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
int okay = 1;
if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
// verify that the checksum matches
uint32_t checksum = 0;
for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
checksum += *b;
}
ink_assert(checksum == doc->checksum);
if (checksum != doc->checksum) {
Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen, stripe->disk->path, (uint64_t)io.aiocb.aio_offset,
(size_t)io.aiocb.aio_nbytes);
doc->magic = DOC_CORRUPT;
okay = 0;
}
}
bool http_copy_hdr = false;
http_copy_hdr =
cache_config_ram_cache_compress && !f.doc_from_ram_cache && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
// If this is an HTTP doc we need to unmarshal the headers before putting it in the ram cache,
// unless it could be compressed
if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
unmarshal_helper(doc, buf, okay);
}
// Put the document in the ram cache only if this is an open_read or lookup
if (vio.op == VIO::READ && okay) {
bool cutoff_check;
// cutoff_check:
//   doc_len == 0 for the first fragment (it is set from the vector);
//   the decision on the first fragment is based on doc->total_len.
//   After that, the decision is based on doc_len (doc_len != 0).
//   (cache_config_ram_cache_cutoff == 0) means no cutoff.
cutoff_check =
((!doc_len && static_cast<int64_t>(doc->total_len) < cache_config_ram_cache_cutoff) ||
(doc_len && static_cast<int64_t>(doc_len) < cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
if (cutoff_check && !f.doc_from_ram_cache) {
uint64_t o = dir_offset(&dir);
stripe->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, o);
}
if (!doc_len) {
// Keep a pointer to it in case the state machine decides to update
// this document, so we don't have to read it back into memory again.
stripe->first_fragment_key = *read_key;
stripe->first_fragment_offset = dir_offset(&dir);
stripe->first_fragment_data = buf;
}
} // end VIO::READ check
// If it could be compressed, unmarshal after
if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
unmarshal_helper(doc, buf, okay);
}
} // end io.ok() check
}
Ldone:
POP_HANDLER;
return handleEvent(AIO_EVENT_DONE, nullptr);
}
int
CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
f.doc_from_ram_cache = false;
ink_assert(stripe->mutex->thread_holding == this_ethread());
if (load_from_ram_cache()) {
goto LramHit;
} else if (load_from_last_open_read_call()) {
goto LmemHit;
} else if (load_from_aggregation_buffer()) {
f.doc_from_ram_cache = true;
io.aio_result = io.aiocb.aio_nbytes;
SET_HANDLER(&CacheVC::handleReadDone);
return EVENT_RETURN;
}
ts::Metrics::Counter::increment(cache_rsb.all_mem_misses);
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.all_mem_misses);
io.aiocb.aio_fildes = stripe->fd;
io.aiocb.aio_offset = stripe->vol_offset(&dir);
if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(stripe->skip + stripe->len)) {
io.aiocb.aio_nbytes = stripe->skip + stripe->len - io.aiocb.aio_offset;
}
buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
io.aiocb.aio_buf = buf->data();
io.action = this;
io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding;
SET_HANDLER(&CacheVC::handleReadDone);
ink_assert(ink_aio_read(&io) >= 0);
// TODO: Why are these metrics debug-only?
#if DEBUG
ts::Metrics::Counter::increment(cache_rsb.pread_count);
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.pread_count);
#endif
return EVENT_CONT;
LramHit: {
f.doc_from_ram_cache = true;
io.aio_result = io.aiocb.aio_nbytes;
Doc *doc = reinterpret_cast<Doc *>(buf->data());
if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) {
SET_HANDLER(&CacheVC::handleReadDone);
return EVENT_RETURN;
}
}
LmemHit:
f.doc_from_ram_cache = true;
io.aio_result = io.aiocb.aio_nbytes;
POP_HANDLER;
return EVENT_RETURN; // allow the caller to release the volume lock
}
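// Try to satisfy the read from the stripe's RAM cache; records whether the cached copy is
// compressed.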
bool
CacheVC::load_from_ram_cache()
{
int64_t o = dir_offset(&this->dir);
int ram_hit_state = this->stripe->ram_cache->get(read_key, &this->buf, static_cast<uint64_t>(o));
f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0;
return ram_hit_state >= RAM_HIT_COMPRESS_NONE;
}
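// Reuse the first-fragment buffer kept from the last open_read on this stripe when both
// the key and the directory offset match.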
bool
CacheVC::load_from_last_open_read_call()
{
if (*this->read_key == this->stripe->first_fragment_key && dir_offset(&this->dir) == this->stripe->first_fragment_offset) {
this->buf = this->stripe->first_fragment_data;
ts::Metrics::Counter::increment(cache_rsb.last_open_read_hits);
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.last_open_read_hits);
return true;
}
return false;
}
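// Serve the read from the stripe's aggregation write buffer when the directory entry
// still points into it.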
bool
CacheVC::load_from_aggregation_buffer()
{
if (!this->stripe->dir_agg_buf_valid(&this->dir)) {
return false;
}
this->buf = new_IOBufferData(iobuffer_size_to_index(this->io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
char *doc = this->buf->data();
[[maybe_unused]] bool success = this->stripe->copy_from_aggregate_write_buffer(doc, dir, this->io.aiocb.aio_nbytes);
// We already confirmed that the directory entry points into the aggregation buffer, so the copy should not fail.
ink_assert(success);
ts::Metrics::Counter::increment(cache_rsb.agg_buffer_hits);
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.agg_buffer_hits);
return true;
}
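// State machine for document removal: probe the directory for the key, read the Doc to
// confirm it is ours, delete the directory entry, and report CACHE_EVENT_REMOVE on
// success or CACHE_EVENT_REMOVE_FAILED (-ECACHE_NO_DOC) on failure.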
int
CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
set_io_not_in_progress();
{
MUTEX_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_SCHED_LOCK_RETRY();
}
if (_action.cancelled) {
if (od) {
stripe->close_write(this);
od = nullptr;
}
goto Lfree;
}
if (!f.remove_aborted_writers) {
if (stripe->open_write(this, true, 1)) {
// writer exists
od = stripe->open_read(&key);
ink_release_assert(od);
od->dont_update_directory = true;
od = nullptr;
} else {
od->dont_update_directory = true;
}
f.remove_aborted_writers = 1;
}
Lread:
SET_HANDLER(&CacheVC::removeEvent);
if (!buf) {
goto Lcollision;
}
if (!stripe->dir_valid(&dir)) {
last_collision = nullptr;
goto Lcollision;
}
// Check that the read completed correctly. FIXME: remove bad vols
if (!io.ok()) {
goto Ldone;
}
{
// verify that this is our document
Doc *doc = reinterpret_cast<Doc *>(buf->data());
/* should be first_key not key..right?? */
if (doc->first_key == key) {
ink_assert(doc->magic == DOC_MAGIC);
if (stripe->directory.remove(&key, stripe, &dir) > 0) {
if (od) {
stripe->close_write(this);
}
od = nullptr;
goto Lremoved;
}
goto Ldone;
}
}
Lcollision:
// check for collision
if (stripe->directory.probe(&key, stripe, &dir, &last_collision) > 0) {
int ret = do_read_call(&key);
if (ret == EVENT_RETURN) {
goto Lread;
}
return ret;
}
Ldone:
ts::Metrics::Counter::increment(cache_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Remove)].failure);
if (od) {
stripe->close_write(this);
}
}
ink_assert(!stripe || this_ethread() != stripe->mutex->thread_holding);
_action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC));
goto Lfree;
Lremoved:
_action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
Lfree:
return free_CacheVC(this);
}
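// Scan driver: pick the first stripe of the matching host record (or advance to the next
// one) and schedule scanObject(); once every stripe has been visited, report
// CACHE_EVENT_SCAN_DONE.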
int
CacheVC::scanStripe(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
Dbg(dbg_ctl_cache_scan_truss, "%p", this);
if (_action.cancelled) {
return free_CacheVC(this);
}
ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&theCache->hosttable);
const CacheHostRecord *rec = &hosttable->gen_host_rec;
if (!hostname.empty()) {
CacheHostResult res;
hosttable->Match(hostname, &res);
if (res.record) {
rec = res.record;
}
}
if (!stripe) {
if (!rec->num_vols) {
goto Ldone;
}
stripe = rec->stripes[0];
} else {
for (int i = 0; i < rec->num_vols - 1; i++) {
if (stripe == rec->stripes[i]) {
stripe = rec->stripes[i + 1];
goto Lcont;
}
}
goto Ldone;
}
Lcont:
fragment = 0;
SET_HANDLER(&CacheVC::scanObject);
eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
return EVENT_CONT;
Ldone:
_action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, nullptr);
return free_CacheVC(this);
}
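// Core of the cache scan: read the stripe in SCAN_BUF_SIZE chunks, walk the Docs in each
// chunk, unmarshal the HTTP alternates, and hand each valid alternate to the caller via
// CACHE_EVENT_SCAN_OBJECT; the caller's return code drives deletion or update of the
// vector.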
int
CacheVC::scanObject(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanObject", this);
Doc *doc = nullptr;
void *result = nullptr;
int hlen = 0;
char hname[500];
bool hostinfo_copied = false;
off_t next_object_len = 0;
bool might_need_overlap_read = false;
cancel_trigger();
set_io_not_in_progress();
if (_action.cancelled) {
return free_CacheVC(this);
}
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
Dbg(dbg_ctl_cache_scan_truss, "delay %p:scanObject", this);
mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
if (!fragment) { // initialize for first read
fragment = 1;
scan_stripe_map = make_vol_map(stripe);
io.aiocb.aio_offset = next_in_map(stripe, scan_stripe_map, stripe->vol_offset_to_offset(0));
if (io.aiocb.aio_offset >= static_cast<off_t>(stripe->skip + stripe->len)) {
goto Lnext_vol;
}
io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
io.aiocb.aio_buf = buf->data();
io.action = this;
io.thread = AIO_CALLBACK_THREAD_ANY;
Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject", this);
goto Lread;
}
if (!io.ok()) {
result = reinterpret_cast<void *>(-ECACHE_READ_FAIL);
goto Ldone;
}
doc = reinterpret_cast<Doc *>(buf->data() + offset);
// If there is data in the buffer before the start that is from a partial object read previously,
// fix things up as if we had read it this time.
if (scan_fix_buffer_offset) {
io.aio_result += scan_fix_buffer_offset;
io.aiocb.aio_nbytes += scan_fix_buffer_offset;
io.aiocb.aio_offset -= scan_fix_buffer_offset;
io.aiocb.aio_buf = static_cast<char *>(io.aiocb.aio_buf) - scan_fix_buffer_offset;
scan_fix_buffer_offset = 0;
}
while (static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len <
static_cast<off_t>(io.aiocb.aio_nbytes)) {
might_need_overlap_read = false;
doc = reinterpret_cast<Doc *>(reinterpret_cast<char *>(doc) + next_object_len);
next_object_len = stripe->round_to_approx_size(doc->len);
int i;
bool changed;
if (doc->magic != DOC_MAGIC) {
next_object_len = CACHE_BLOCK_SIZE;
Dbg(dbg_ctl_cache_scan_truss, "blockskip %p:scanObject", this);
continue;
}
if (doc->doc_type != CACHE_FRAG_TYPE_HTTP || !doc->hlen) {
goto Lskip;
}
last_collision = nullptr;
while (true) {
if (!stripe->directory.probe(&doc->first_key, stripe, &dir, &last_collision)) {
goto Lskip;
}
if (!stripe->dir_agg_valid(&dir) || !dir_head(&dir) ||
(stripe->vol_offset(&dir) != io.aiocb.aio_offset + (reinterpret_cast<char *>(doc) - buf->data()))) {
continue;
}
break;
}
if (doc->data() - buf->data() > static_cast<int>(io.aiocb.aio_nbytes)) {
might_need_overlap_read = true;
goto Lskip;
}
{
char *tmp = doc->hdr();
int len = doc->hlen;
while (len > 0) {
int r = HTTPInfo::unmarshal(tmp, len, buf.get());
if (r < 0) {
ink_assert(!"CacheVC::scanObject unmarshal failed");
goto Lskip;
}
len -= r;
tmp += r;
}
}
if (this->load_http_info(&vector, doc) != doc->hlen) {
goto Lskip;
}
changed = false;
hostinfo_copied = false;
for (i = 0; i < vector.count(); i++) {
if (!vector.get(i)->valid()) {
goto Lskip;
}
if (!hostinfo_copied) {
auto host{vector.get(i)->request_get()->host_get()};
hlen = static_cast<int>(host.length());
memccpy(hname, host.data(), 0, 500);
hname[hlen] = 0;
Dbg(dbg_ctl_cache_scan, "hostname = '%s', hostlen = %d", hname, hlen);
hostinfo_copied = true;
}
vector.get(i)->object_key_get(&key);
alternate_index = i;
// verify that the earliest block exists, reducing 'false hit' callbacks
if (!(key == doc->key)) {
last_collision = nullptr;
if (!stripe->directory.probe(&key, stripe, &earliest_dir, &last_collision)) {
continue;
}
}
earliest_key = key;
int result1 = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OBJECT, vector.get(i));
switch (result1) {
case CACHE_SCAN_RESULT_CONTINUE:
continue;
case CACHE_SCAN_RESULT_DELETE:
changed = true;
vector.remove(i, true);
i--;
continue;
case CACHE_SCAN_RESULT_DELETE_ALL_ALTERNATES:
changed = true;
vector.clear();
i = 0;
break;
case CACHE_SCAN_RESULT_UPDATE:
ink_assert(alternate_index >= 0);
vector.insert(&alternate, alternate_index);
if (!vector.get(alternate_index)->valid()) {
continue;
}
changed = true;
continue;
case EVENT_DONE:
goto Lcancel;
default:
ink_assert(!"unexpected CACHE_SCAN_RESULT");
continue;
}
}
if (changed) {
if (!vector.count()) {
ink_assert(hostinfo_copied);
SET_HANDLER(&CacheVC::scanRemoveDone);
// force remove even if there is a writer
cacheProcessor.remove(this, &doc->first_key, CACHE_FRAG_TYPE_HTTP,
std::string_view{hname, static_cast<std::string_view::size_type>(hlen)});
return EVENT_CONT;
} else {
offset = reinterpret_cast<char *>(doc) - buf->data();
write_len = 0;
frag_type = CACHE_FRAG_TYPE_HTTP;
f.use_first_key = 1;
f.evac_vector = 1;
alternate_index = CACHE_ALT_REMOVED;
writer_lock_retry = 0;
first_key = key = doc->first_key;
earliest_key.clear();
SET_HANDLER(&CacheVC::scanOpenWrite);
return scanOpenWrite(EVENT_NONE, nullptr);
}
}
continue;
Lskip:;
}
vector.clear();
// If we had an object that went past the end of the buffer, and it is small enough to fix,
// fix it.
if (might_need_overlap_read &&
(static_cast<off_t>(reinterpret_cast<char *>(doc) - buf->data()) + next_object_len >
static_cast<off_t>(io.aiocb.aio_nbytes)) &&
next_object_len > 0) {
off_t partial_object_len = io.aiocb.aio_nbytes - (reinterpret_cast<char *>(doc) - buf->data());
// Copy partial object to beginning of the buffer.
memmove(buf->data(), reinterpret_cast<char *>(doc), partial_object_len);
io.aiocb.aio_offset += io.aiocb.aio_nbytes;
io.aiocb.aio_nbytes = SCAN_BUF_SIZE - partial_object_len;
io.aiocb.aio_buf = buf->data() + partial_object_len;
scan_fix_buffer_offset = partial_object_len;
} else { // Normal case, where we ended on an object boundary.
io.aiocb.aio_offset += (reinterpret_cast<char *>(doc) - buf->data()) + next_object_len;
Dbg(dbg_ctl_cache_scan_truss, "next %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
io.aiocb.aio_offset = next_in_map(stripe, scan_stripe_map, io.aiocb.aio_offset);
Dbg(dbg_ctl_cache_scan_truss, "next_in_map %p:scanObject %" PRId64, this, (int64_t)io.aiocb.aio_offset);
io.aiocb.aio_nbytes = SCAN_BUF_SIZE;
io.aiocb.aio_buf = buf->data();
scan_fix_buffer_offset = 0;
}
if (io.aiocb.aio_offset >= stripe->skip + stripe->len) {
Lnext_vol:
SET_HANDLER(&CacheVC::scanStripe);
eventProcessor.schedule_in(this, HRTIME_MSECONDS(scan_msec_delay));
return EVENT_CONT;
}
Lread:
io.aiocb.aio_fildes = stripe->fd;
if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(stripe->skip + stripe->len)) {
io.aiocb.aio_nbytes = stripe->skip + stripe->len - io.aiocb.aio_offset;
}
offset = 0;
ink_assert(ink_aio_read(&io) >= 0);
Dbg(dbg_ctl_cache_scan_truss, "read %p:scanObject %" PRId64 " %zu", this, (int64_t)io.aiocb.aio_offset,
(size_t)io.aiocb.aio_nbytes);
return EVENT_CONT;
Ldone:
Dbg(dbg_ctl_cache_scan_truss, "done %p:scanObject", this);
_action.continuation->handleEvent(CACHE_EVENT_SCAN_DONE, result);
Lcancel:
return free_CacheVC(this);
}
int
CacheVC::scanRemoveDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanRemoveDone", this);
Dbg(dbg_ctl_cache_scan, "remove done.");
alternate.destroy();
SET_HANDLER(&CacheVC::scanObject);
return handleEvent(EVENT_IMMEDIATE, nullptr);
}
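// The caller asked to modify the scanned object: take the stripe writer lock (retrying up
// to SCAN_WRITER_LOCK_MAX_RETRY times), verify the directory entry has not changed, and
// write the updated vector back out.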
int
CacheVC::scanOpenWrite(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanOpenWrite", this);
cancel_trigger();
// get volume lock
if (writer_lock_retry > SCAN_WRITER_LOCK_MAX_RETRY) {
int r = _action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_BLOCKED, nullptr);
Dbg(dbg_ctl_cache_scan, "still haven't got the writer lock, asking user..");
switch (r) {
case CACHE_SCAN_RESULT_RETRY:
writer_lock_retry = 0;
break;
case CACHE_SCAN_RESULT_CONTINUE:
SET_HANDLER(&CacheVC::scanObject);
return scanObject(EVENT_IMMEDIATE, nullptr);
}
}
int ret = 0;
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
Dbg(dbg_ctl_cache_scan, "stripe->mutex %p:scanOpenWrite", this);
VC_SCHED_LOCK_RETRY();
}
Dbg(dbg_ctl_cache_scan, "trying for writer lock");
if (stripe->open_write(this, false, 1)) {
writer_lock_retry++;
SET_HANDLER(&CacheVC::scanOpenWrite);
mutex->thread_holding->schedule_in_local(this, scan_msec_delay);
return EVENT_CONT;
}
ink_assert(this->od);
// put all the alternates in the open directory vector
int alt_count = vector.count();
for (int i = 0; i < alt_count; i++) {
write_vector->insert(vector.get(i));
}
od->writing_vec = true;
vector.clear(false);
// check that the directory entry was not overwritten
// if so return failure
Dbg(dbg_ctl_cache_scan, "got writer lock");
Dir *l = nullptr;
Dir d;
Doc *doc = reinterpret_cast<Doc *>(buf->data() + offset);
offset = reinterpret_cast<char *>(doc) - buf->data() + stripe->round_to_approx_size(doc->len);
// if the doc contains some data, then we need to create
// a new directory entry for this fragment. Remember the
// offset and the key in earliest_key
dir_assign(&od->first_dir, &dir);
if (doc->total_len) {
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, doc->key.slice32(2));
od->single_doc_key = doc->key;
od->move_resident_alt = true;
}
while (true) {
if (!stripe->directory.probe(&first_key, stripe, &d, &l)) {
stripe->close_write(this);
_action.continuation->handleEvent(CACHE_EVENT_SCAN_OPERATION_FAILED, nullptr);
SET_HANDLER(&CacheVC::scanObject);
return handleEvent(EVENT_IMMEDIATE, nullptr);
}
if (memcmp(&dir, &d, SIZEOF_DIR)) {
Dbg(dbg_ctl_cache_scan, "dir entry has changed");
continue;
}
break;
}
// the document was not modified
// we are safe from now on as we hold the
// writer lock on the doc
if (f.evac_vector) {
header_len = write_vector->marshal_length();
}
SET_HANDLER(&CacheVC::scanUpdateDone);
ret = do_write_call();
}
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return ret;
}
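// Completion of the scan-triggered rewrite: re-insert the directory entries for the
// updated document, release the writer lock, and resume scanning.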
int
CacheVC::scanUpdateDone(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
Dbg(dbg_ctl_cache_scan_truss, "inside %p:scanUpdateDone", this);
cancel_trigger();
// get volume lock
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (lock.is_locked()) {
// insert a directory entry for the previous fragment
stripe->directory.overwrite(&first_key, stripe, &dir, &od->first_dir, false);
if (od->move_resident_alt) {
stripe->directory.insert(&od->single_doc_key, stripe, &od->single_doc_dir);
}
ink_assert(stripe->open_read(&first_key));
ink_assert(this->od);
stripe->close_write(this);
SET_HANDLER(&CacheVC::scanObject);
return handleEvent(EVENT_IMMEDIATE, nullptr);
} else {
mutex->thread_holding->schedule_in_local(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
}
// set_http_info must be called before do_io_write
// cluster vc does an optimization where it calls do_io_write() before
// calling set_http_info(), but it guarantees that the info will
// be set before transferring any bytes
void
CacheVC::set_http_info(CacheHTTPInfo *ainfo)
{
ink_assert(!total_len);
if (f.update) {
ainfo->object_key_set(update_key);
ainfo->object_size_set(update_len);
} else {
ainfo->object_key_set(earliest_key);
// don't know the total len yet
}
MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(static_cast<std::string_view>(MIME_FIELD_CONTENT_LENGTH));
if ((field && !field->value_get_int64()) || ainfo->m_alt->m_response_hdr.status_get() == HTTPStatus::NO_CONTENT) {
f.allow_empty_doc = 1;
// Set the object size here to zero in case this is a cache replace where the new object
// length is zero but the old object was not.
ainfo->object_size_set(0);
} else {
f.allow_empty_doc = 0;
}
alternate.copy_shallow(ainfo);
ainfo->clear();
}
void
CacheVC::get_http_info(CacheHTTPInfo **ainfo)
{
*ainfo = &(this)->alternate;
}
HTTPInfo::FragOffset *
CacheVC::get_frag_table()
{
ink_assert(alternate.valid());
return alternate.valid() ? alternate.get_frag_table() : nullptr;
}
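// pread is only offered when this VC has not served data from a concurrent writer.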
bool
CacheVC::is_pread_capable()
{
return !f.read_from_writer_called;
}
bool
CacheVC::set_pin_in_cache(time_t time_pin)
{
if (total_len) {
ink_assert(!"should Pin the document before writing");
return false;
}
if (vio.op != VIO::WRITE) {
ink_assert(!"Pinning only allowed while writing objects to the cache");
return false;
}
pin_in_cache = time_pin;
return true;
}
time_t
CacheVC::get_pin_in_cache()
{
return pin_in_cache;
}