/** @file
Write path of the cache: aggregation buffer writes, evacuation, and the CacheVC write state machines.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "P_Cache.h"
#include "iocore/cache/AggregateWriteBuffer.h"
#include "iocore/cache/CacheEvacuateDocVC.h"
#define UINT_WRAP_LTE(_x, _y) (((_y) - (_x)) < INT_MAX) // exploit overflow
#define UINT_WRAP_GTE(_x, _y) (((_x) - (_y)) < INT_MAX) // exploit overflow
#define UINT_WRAP_LT(_x, _y) (((_x) - (_y)) >= INT_MAX) // exploit overflow
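// These macros compare serial numbers that may wrap around UINT_MAX (e.g.
// write_serial). For example, UINT_WRAP_LTE(UINT_MAX - 1, 1) computes
// 1 - (UINT_MAX - 1), which wraps to 3 (< INT_MAX), so the older serial
// still compares as "<=" the newer one across the wrap. The comparisons are
// valid as long as the two values are within INT_MAX of each other.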
namespace
{
DbgCtl dbg_ctl_cache_evac{"cache_evac"};
DbgCtl dbg_ctl_cache_update{"cache_update"};
DbgCtl dbg_ctl_cache_disk_error{"cache_disk_error"};
DbgCtl dbg_ctl_cache_update_alt{"cache_update_alt"};
#ifdef DEBUG
DbgCtl dbg_ctl_cache_agg{"cache_agg"};
DbgCtl dbg_ctl_cache_stats{"cache_stats"};
DbgCtl dbg_ctl_cache_write{"cache_write"};
DbgCtl dbg_ctl_cache_insert{"cache_insert"};
DbgCtl dbg_ctl_agg_read{"agg_read"};
#endif
} // end anonymous namespace
// Given a key, finds the index of the matching alternate.
// Used to get the alternate which is actually present in the document.
int
get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key)
{
int alt_count = cache_vector->count();
CacheHTTPInfo *obj;
if (!alt_count) {
return -1;
}
for (int i = 0; i < alt_count; i++) {
obj = cache_vector->get(i);
if (obj->compare_object_key(&key)) {
return i;
}
}
return -1;
}
// Adds/deletes an alternate to/from od->vector (write_vector). If the vector
// is empty, deletes the directory entry pointing to the vector. Each
// CacheVC must write the vector down to disk after making changes. If we
// wait till the last writer, that writer will have the responsibility of
// writing the vector even if the http state machine aborts. This
// makes it easier to handle situations where writers abort.
int
CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
if (od->reading_vec || od->writing_vec) {
VC_SCHED_LOCK_RETRY();
}
int ret = 0;
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked() || od->writing_vec) {
VC_SCHED_LOCK_RETRY();
}
int vec = alternate.valid();
if (f.update) {
// all Update cases. Need to get the alternate index.
alternate_index = get_alternate_index(write_vector, update_key);
Dbg(dbg_ctl_cache_update, "updating alternate index %d frags %d", alternate_index,
alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1);
// if it's an alternate delete
if (!vec) {
ink_assert(!total_len);
if (alternate_index >= 0) {
write_vector->remove(alternate_index, true);
alternate_index = CACHE_ALT_REMOVED;
if (!write_vector->count()) {
dir_delete(&first_key, stripe, &od->first_dir);
}
}
// The alternate is not there any more; somebody might have
// deleted it. Just close this writer.
if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) {
SET_HANDLER(&CacheVC::openWriteCloseDir);
return openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
}
}
if (update_key == od->single_doc_key && (total_len || f.allow_empty_doc || !vec)) {
od->move_resident_alt = false;
}
}
if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) {
if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0) {
od->move_resident_alt = false;
}
if (cache_config_log_alternate_eviction) {
// Initially there was an attempt to make alternate eviction a log
// field. However, it was discovered that this could not work because
// this code, in which alternates are evicted, runs during IO
// processing, which happens after transaction logs are emitted and
// after the HttpSM is destructed. Instead, alternate eviction logging
// was implemented for diags.log with the
// proxy.config.cache.log.alternate.eviction toggle.
CacheHTTPInfo *info = write_vector->get(0);
HTTPHdr *request = info->request_get();
if (request->valid()) {
// Marking the request's target as dirty will guarantee that the
// internal members of the request used for printing the URL will be
// coherent and valid by the time it is printed.
request->mark_target_dirty();
// In contrast to url_string_get, this url_print interface doesn't
// use HTTPHdr's m_heap, which is not valid at this point because the
// HttpSM is most likely gone.
int url_length = request->url_printed_length();
ats_scoped_mem<char> url_text;
url_text = static_cast<char *>(ats_malloc(url_length + 1));
int index = 0;
int offset = 0;
// url_print does not NULL terminate, so url_length instead of url_length + 1.
int ret = request->url_print(url_text.get(), url_length, &index, &offset);
url_text.get()[url_length] = '\0';
if (ret == 0) {
Note("Could not print URL of evicted alternate.");
} else {
Status("The maximum number of alternates was exceeded for a resource. "
"An alternate was evicted for URL: %.*s",
url_length, url_text.get());
}
}
}
write_vector->remove(0, true);
}
if (vec) {
/* Preserve fragment offset data from the old info. This path is
   taken iff the update is a header-only update, so the fragment
   data should remain valid.
*/
// If we are not in the header-only update case, don't copy fragments.
if (alternate_index >= 0 &&
((total_len == 0 && alternate.get_frag_offset_count() == 0) && !(f.allow_empty_doc && this->vio.nbytes == 0))) {
alternate.copy_frag_offsets_from(write_vector->get(alternate_index));
}
alternate_index = write_vector->insert(&alternate, alternate_index);
}
if (od->move_resident_alt && first_buf.get() && !od->has_multiple_writers()) {
Doc *doc = reinterpret_cast<Doc *>(first_buf->data());
int small_doc = static_cast<int64_t>(doc->data_len()) < static_cast<int64_t>(cache_config_alt_rewrite_max_size);
int have_res_alt = doc->key == od->single_doc_key;
// If the new alternate is not written with the vector,
// then move the old one with the vector.
// If it's a header-only update, move the resident alternate
// with the vector.
// We are sure that the body of the resident alternate that we are
// rewriting has not changed and the alternate is not being deleted,
// since we set od->move_resident_alt to false in that case
// (in updateVector).
if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) {
// For a multi-fragment document, we must have gone through
// CacheVC::openWriteCloseDataDone.
ink_assert(!fragment || f.data_done);
od->move_resident_alt = false;
f.rewrite_resident_alt = 1;
write_len = doc->data_len();
Dbg(dbg_ctl_cache_update_alt, "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0),
first_key.slice32(0));
}
}
header_len = write_vector->marshal_length();
od->writing_vec = true;
f.use_first_key = 1;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
ret = do_write_call();
}
if (ret == EVENT_RETURN) {
return handleEvent(AIO_EVENT_DONE, nullptr);
}
return ret;
}
/*
The following fields of the CacheVC are used when writing down a fragment.
Make sure that each of the fields is set to a valid value before calling
this function
- frag_type. Checked to see if a vector needs to be marshalled.
- f.use_first_key. To decide if the vector should be marshalled and to set
the doc->key to the appropriate key (first_key or earliest_key)
- f.evac_vector. If set, the writer is pushed to the front of the
agg queue. If !f.evac_vector && !f.update, the alternate->object_size
is set to vc->total_len.
- f.readers. If set, assumes that this is an evacuation, so the write
is not aborted even if vol->get_agg_todo_size() > agg_write_backlog
- f.evacuator. If this is an evacuation.
- f.rewrite_resident_alt. The resident alternate is rewritten.
- f.update. Used only if the write_vector needs to be written to disk.
Used to set the length of the alternate to total_len.
- write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP &&
(f.use_first_key || f.evac_vector) is set. Write_vector is written to disk
- alternate_index. Used only if write_vector needs to be written to disk.
Used to find out the VC's alternate in the write_vector and set its
length to total_len.
- write_len. The number of bytes for this fragment.
- total_len. The total number of bytes for the document so far.
Doc->total_len and alternate's total len is set to this value.
- first_key. Doc's first_key is set to this value.
- pin_in_cache. Doc's pinned value is set to this + ink_get_hrtime().
- earliest_key. If f.use_first_key, Doc's key is set to this value.
- key. If !f.use_first_key, Doc's key is set to this value.
- blocks. Used only if write_len is set. Data to be written
- offset. Used only if write_len is set. offset into the block to copy
the data from.
- buf. Used only if f.evacuator is set. Should point to the old document.
The function sets the length, offset, pinned, head and phase of vc->dir.
*/
int
CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */)
{
// plain write case
ink_assert(!trigger);
frag_len = 0;
set_agg_write_in_progress();
POP_HANDLER;
bool max_doc_error = (cache_config_max_doc_size && (cache_config_max_doc_size < vio.ndone ||
(vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes))));
// Make sure the size is correct for checking error conditions before calling add_writer(this).
agg_len = stripe->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc));
if (max_doc_error || !stripe->add_writer(this)) {
Metrics::Counter::increment(cache_rsb.write_backlog_failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.write_backlog_failure);
Metrics::Counter::increment(cache_rsb.status[op_type].failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure);
io.aio_result = AIO_SOFT_FAILURE;
if (event == EVENT_CALL) {
return EVENT_RETURN;
}
return handleEvent(AIO_EVENT_DONE, nullptr);
}
if (!stripe->is_io_in_progress()) {
return stripe->aggWrite(event, this);
}
return EVENT_CONT;
}
static char *
iobufferblock_memcpy(char *p, int len, IOBufferBlock *ab, int offset)
{
IOBufferBlock *b = ab;
while (b && len >= 0) {
char *start = b->_start;
char *end = b->_end;
int max_bytes = end - start;
max_bytes -= offset;
if (max_bytes <= 0) {
offset = -max_bytes;
b = b->next.get();
continue;
}
int bytes = len;
if (bytes >= max_bytes) {
bytes = max_bytes;
}
::memcpy(p, start + offset, bytes);
p += bytes;
len -= bytes;
b = b->next.get();
offset = 0;
}
return p;
}
EvacuationBlock *
Stripe::force_evacuate_head(Dir const *evac_dir, int pinned)
{
auto bucket = dir_evac_bucket(evac_dir);
if (!evac_bucket_valid(bucket)) {
DDbg(dbg_ctl_cache_evac, "dir_evac_bucket out of bounds, skipping evacuate: %" PRId64 "(%d), %d, %d", bucket, evacuate_size,
(int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
return nullptr;
}
// build an evacuation block for the object
EvacuationBlock *b = evacuation_block_exists(evac_dir, this);
// if we have already started evacuating this document, it's too late
// to evacuate the head...bad luck
if (b && b->f.done) {
return b;
}
if (!b) {
b = new_EvacuationBlock(mutex->thread_holding);
b->dir = *evac_dir;
DDbg(dbg_ctl_cache_evac, "force: %d, %d", (int)dir_offset(evac_dir), (int)dir_phase(evac_dir));
evacuate[bucket].push(b);
}
b->f.pinned = pinned;
b->f.evacuate_head = 1;
b->evac_frags.key.clear(); // ensure that the block gets evacuated no matter what
b->readers = 0; // ensure that the block does not disappear
return b;
}
void
Stripe::scan_for_pinned_documents()
{
if (cache_config_permit_pinning) {
// we can't evacuate anything between header->write_pos and
// header->write_pos + AGG_SIZE.
int ps = this->offset_to_vol_offset(header->write_pos + AGG_SIZE);
int pe = this->offset_to_vol_offset(header->write_pos + 2 * EVACUATION_SIZE + (len / PIN_SCAN_EVERY));
int vol_end_offset = this->offset_to_vol_offset(len + skip);
int before_end_of_vol = pe < vol_end_offset;
DDbg(dbg_ctl_cache_evac, "scan %d %d", ps, pe);
for (int i = 0; i < this->direntries(); i++) {
// is it a valid pinned object?
if (!dir_is_empty(&dir[i]) && dir_pinned(&dir[i]) && dir_head(&dir[i])) {
// select objects only within this PIN_SCAN region
int o = dir_offset(&dir[i]);
if (dir_phase(&dir[i]) == header->phase) {
if (before_end_of_vol || o >= (pe - vol_end_offset)) {
continue;
}
} else {
if (o < ps || o >= pe) {
continue;
}
}
force_evacuate_head(&dir[i], 1);
}
}
}
}
/* NOTE:: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
*/
int
Stripe::aggWriteDone(int event, Event *e)
{
cancel_trigger();
// ensure we have the cacheDirSync lock if we intend to call it later
// retaking the current mutex recursively is a NOOP
CACHE_TRY_LOCK(lock, dir_sync_waiting ? cacheDirSync->mutex : mutex, mutex->thread_holding);
if (!lock.is_locked()) {
eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
return EVENT_CONT;
}
if (io.ok()) {
header->last_write_pos = header->write_pos;
header->write_pos += io.aiocb.aio_nbytes;
ink_assert(header->write_pos >= start);
DDbg(dbg_ctl_cache_agg, "Dir %s, Write: %" PRIu64 ", last Write: %" PRIu64 "", hash_text.get(), header->write_pos,
header->last_write_pos);
ink_assert(header->write_pos == header->agg_pos);
if (header->write_pos + EVACUATION_SIZE > scan_pos) {
periodic_scan();
}
this->_write_buffer.reset_buffer_pos();
header->write_serial++;
} else {
// delete all the directory entries that we inserted
// for fragments in this aggregation buffer
Dbg(dbg_ctl_cache_disk_error, "Write error on disk %s\n \
write range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
(uint64_t)io.aiocb.aio_offset / CACHE_BLOCK_SIZE, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / CACHE_BLOCK_SIZE);
Dir del_dir;
dir_clear(&del_dir);
for (int done = 0; done < this->_write_buffer.get_buffer_pos();) {
Doc *doc = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer() + done);
dir_set_offset(&del_dir, header->write_pos + done);
dir_delete(&doc->key, this, &del_dir);
done += round_to_approx_size(doc->len);
}
this->_write_buffer.reset_buffer_pos();
}
set_io_not_in_progress();
// call back CacheVCs whose sync'd writes have been committed to disk
CacheVC *c = nullptr;
while ((c = sync.dequeue())) {
if (UINT_WRAP_LTE(c->write_serial + 2, header->write_serial)) {
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
} else {
sync.push(c); // put it back on the front
break;
}
}
if (dir_sync_waiting) {
dir_sync_waiting = false;
cacheDirSync->handleEvent(EVENT_IMMEDIATE, nullptr);
}
if (this->_write_buffer.get_pending_writers().head || sync.head) {
return aggWrite(event, e);
}
return EVENT_CONT;
}
CacheEvacuateDocVC *
new_DocEvacuator(int nbytes, Stripe *stripe)
{
CacheEvacuateDocVC *c = new_CacheEvacuateDocVC(stripe);
c->op_type = static_cast<int>(CacheOpType::Evacuate);
Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
c->buf = new_IOBufferData(iobuffer_size_to_index(nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED);
c->stripe = stripe;
c->f.evacuator = 1;
c->earliest_key.clear();
SET_CONTINUATION_HANDLER(c, &CacheEvacuateDocVC::evacuateDocDone);
return c;
}
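// Queue evacuation for all fragments found under key (typically the chain of
// fragments following a document head). Each directory hit either creates a
// new EvacuationBlock or chains another EvacuationKey onto the existing
// block. If force is set, the reader count is cleared so the block is
// evacuated regardless of readers. Returns the number of new blocks created.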
static int
evacuate_fragments(CacheKey *key, CacheKey *earliest_key, int force, Stripe *stripe)
{
Dir dir, *last_collision = nullptr;
int i = 0;
while (dir_probe(key, stripe, &dir, &last_collision)) {
// next fragment cannot be a head...if it is, it must have been a
// directory collision.
if (dir_head(&dir)) {
continue;
}
EvacuationBlock *b = evacuation_block_exists(&dir, stripe);
if (!b) {
b = new_EvacuationBlock(stripe->mutex->thread_holding);
b->dir = dir;
b->evac_frags.key = *key;
b->evac_frags.earliest_key = *earliest_key;
stripe->evacuate[dir_evac_bucket(&dir)].push(b);
i++;
} else {
ink_assert(dir_offset(&dir) == dir_offset(&b->dir));
ink_assert(dir_phase(&dir) == dir_phase(&b->dir));
EvacuationKey *evac_frag = evacuationKeyAllocator.alloc();
evac_frag->key = *key;
evac_frag->earliest_key = *earliest_key;
evac_frag->link.next = b->evac_frags.link.next;
b->evac_frags.link.next = evac_frag;
}
if (force) {
b->readers = 0;
}
DDbg(dbg_ctl_cache_evac, "next fragment %X Earliest: %X offset %d phase %d force %d", (int)key->slice32(0),
(int)earliest_key->slice32(0), (int)dir_offset(&dir), (int)dir_phase(&dir), force);
}
return i;
}
int
Stripe::evacuateWrite(CacheEvacuateDocVC *evacuator, int event, Event *e)
{
// push to front of aggregation write list, so it is written first
evacuator->agg_len = round_to_approx_size((reinterpret_cast<Doc *>(evacuator->buf->data()))->len);
this->_write_buffer.add_bytes_pending_aggregation(evacuator->agg_len);
/* insert the evacuator after all the other evacuators */
CacheVC *cur = static_cast<CacheVC *>(this->_write_buffer.get_pending_writers().head);
CacheVC *after = nullptr;
for (; cur && cur->f.evacuator; cur = (CacheVC *)cur->link.next) {
after = cur;
}
ink_assert(evacuator->agg_len <= AGG_SIZE);
this->_write_buffer.get_pending_writers().insert(evacuator, after);
return aggWrite(event, e);
}
int
Stripe::evacuateDocReadDone(int event, Event *e)
{
cancel_trigger();
if (event != AIO_EVENT_DONE) {
return EVENT_DONE;
}
ink_assert(is_io_in_progress());
set_io_not_in_progress();
ink_assert(mutex->thread_holding == this_ethread());
Doc *doc = reinterpret_cast<Doc *>(doc_evacuator->buf->data());
CacheKey next_key;
EvacuationBlock *b = nullptr;
auto bucket = dir_evac_bucket(&doc_evacuator->overwrite_dir);
if (doc->magic != DOC_MAGIC) {
Dbg(dbg_ctl_cache_evac, "DOC magic: %X %d", (int)dir_tag(&doc_evacuator->overwrite_dir),
(int)dir_offset(&doc_evacuator->overwrite_dir));
ink_assert(doc->magic == DOC_MAGIC);
goto Ldone;
}
DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone %X offset %d", (int)doc->key.slice32(0),
(int)dir_offset(&doc_evacuator->overwrite_dir));
if (evac_bucket_valid(bucket)) {
b = evacuate[bucket].head;
}
while (b) {
if (dir_offset(&b->dir) == dir_offset(&doc_evacuator->overwrite_dir)) {
break;
}
b = b->link.next;
}
if (!b) {
goto Ldone;
}
// coverity[Y2K38_SAFETY:FALSE]
if ((b->f.pinned && !b->readers) && doc->pinned < static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND)) {
goto Ldone;
}
if (dir_head(&b->dir) && b->f.evacuate_head) {
ink_assert(!b->evac_frags.key.fold());
// if it's a head (vector), evacuation is simple: we just
// need to write this vector down and overwrite the directory entry.
if (dir_compare_tag(&b->dir, &doc->first_key)) {
doc_evacuator->key = doc->first_key;
b->evac_frags.key = doc->first_key;
DDbg(dbg_ctl_cache_evac, "evacuating vector %X offset %d", (int)doc->first_key.slice32(0),
(int)dir_offset(&doc_evacuator->overwrite_dir));
b->f.unused = 57;
} else {
// If it's an earliest fragment (alternate) evacuation, things get
// a little tricky. We have to propagate the earliest key to the next
// fragments for this alternate. The last fragment to be evacuated
// fixes up the lookaside buffer.
doc_evacuator->key = doc->key;
doc_evacuator->earliest_key = doc->key;
b->evac_frags.key = doc->key;
b->evac_frags.earliest_key = doc->key;
b->earliest_evacuator = doc_evacuator;
DDbg(dbg_ctl_cache_evac, "evacuating earliest %X %X evac: %p offset: %d", (int)b->evac_frags.key.slice32(0),
(int)doc->key.slice32(0), doc_evacuator, (int)dir_offset(&doc_evacuator->overwrite_dir));
b->f.unused = 67;
}
} else {
// find which key matches the document
EvacuationKey *ek = &b->evac_frags;
for (; ek && !(ek->key == doc->key); ek = ek->link.next) {
;
}
if (!ek) {
b->f.unused = 77;
goto Ldone;
}
doc_evacuator->key = ek->key;
doc_evacuator->earliest_key = ek->earliest_key;
DDbg(dbg_ctl_cache_evac, "evacuateDocReadDone key: %X earliest: %X", (int)ek->key.slice32(0), (int)ek->earliest_key.slice32(0));
b->f.unused = 87;
}
// if the tag in the c->dir does not match the first_key in the
// document, then it has to be the earliest fragment. We guarantee that
// the first_key and the earliest_key will never collide (see
// Cache::open_write).
if (!dir_head(&b->dir) || !dir_compare_tag(&b->dir, &doc->first_key)) {
next_CacheKey(&next_key, &doc->key);
evacuate_fragments(&next_key, &doc_evacuator->earliest_key, !b->readers, this);
}
return evacuateWrite(doc_evacuator, event, e);
Ldone:
free_CacheEvacuateDocVC(doc_evacuator);
doc_evacuator = nullptr;
return aggWrite(event, e);
}
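// Scan the evacuation buckets covering [low, high) and, for the first bucket
// holding a not-yet-done block in range with the given phase, issue an AIO
// read of its lowest-offset block and return -1. Returns 0 if nothing in the
// range needs evacuation.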
int
Stripe::evac_range(off_t low, off_t high, int evac_phase)
{
off_t s = this->offset_to_vol_offset(low);
off_t e = this->offset_to_vol_offset(high);
int si = dir_offset_evac_bucket(s);
int ei = dir_offset_evac_bucket(e);
for (int i = si; i <= ei; i++) {
EvacuationBlock *b = evacuate[i].head;
EvacuationBlock *first = nullptr;
int64_t first_offset = INT64_MAX;
for (; b; b = b->link.next) {
int64_t offset = dir_offset(&b->dir);
int phase = dir_phase(&b->dir);
if (offset >= s && offset < e && !b->f.done && phase == evac_phase) {
if (offset < first_offset) {
first = b;
first_offset = offset;
}
}
}
if (first) {
first->f.done = 1;
io.aiocb.aio_fildes = fd;
io.aiocb.aio_nbytes = dir_approx_size(&first->dir);
io.aiocb.aio_offset = this->vol_offset(&first->dir);
if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
io.aiocb.aio_nbytes = skip + len - io.aiocb.aio_offset;
}
doc_evacuator = new_DocEvacuator(io.aiocb.aio_nbytes, this);
doc_evacuator->overwrite_dir = first->dir;
io.aiocb.aio_buf = doc_evacuator->buf->data();
io.action = this;
io.thread = AIO_CALLBACK_THREAD_ANY;
DDbg(dbg_ctl_cache_evac, "evac_range evacuating %X %d", (int)dir_tag(&first->dir), (int)dir_offset(&first->dir));
SET_HANDLER(&Stripe::evacuateDocReadDone);
ink_assert(ink_aio_read(&io) >= 0);
return -1;
}
}
return 0;
}
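// Copy one writer's fragment into the aggregation buffer at p, filling in
// its Doc header and updating the copy of its directory entry. Evacuated
// documents are copied wholesale with refreshed sync/write serials. Returns
// the number of bytes consumed in the aggregation buffer.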
static int
agg_copy(char *p, CacheVC *vc)
{
Stripe *stripe = vc->stripe;
off_t o = stripe->header->write_pos + stripe->get_agg_buf_pos();
if (!vc->f.evacuator) {
Doc *doc = reinterpret_cast<Doc *>(p);
IOBufferBlock *res_alt_blk = nullptr;
uint32_t len = vc->write_len + vc->header_len + vc->frag_len + sizeof(Doc);
ink_assert(vc->frag_type != CACHE_FRAG_TYPE_HTTP || len != sizeof(Doc));
ink_assert(stripe->round_to_approx_size(len) == vc->agg_len);
// update copy of directory entry for this document
dir_set_approx_size(&vc->dir, vc->agg_len);
dir_set_offset(&vc->dir, stripe->offset_to_vol_offset(o));
ink_assert(stripe->vol_offset(&vc->dir) < (stripe->skip + stripe->len));
dir_set_phase(&vc->dir, stripe->header->phase);
// fill in document header
doc->magic = DOC_MAGIC;
doc->len = len;
doc->hlen = vc->header_len;
doc->doc_type = vc->frag_type;
doc->v_major = CACHE_DB_MAJOR_VERSION;
doc->v_minor = CACHE_DB_MINOR_VERSION;
doc->unused = 0; // force this for forward compatibility.
doc->total_len = vc->total_len;
doc->first_key = vc->first_key;
doc->sync_serial = stripe->header->sync_serial;
vc->write_serial = doc->write_serial = stripe->header->write_serial;
doc->checksum = DOC_NO_CHECKSUM;
if (vc->pin_in_cache) {
dir_set_pinned(&vc->dir, 1);
// coverity[Y2K38_SAFETY:FALSE]
doc->pinned = static_cast<uint32_t>(ink_get_hrtime() / HRTIME_SECOND) + vc->pin_in_cache;
} else {
dir_set_pinned(&vc->dir, 0);
doc->pinned = 0;
}
if (vc->f.use_first_key) {
if (doc->data_len() || vc->f.allow_empty_doc) {
doc->key = vc->earliest_key;
} else { // the vector is being written by itself
if (vc->earliest_key.is_zero()) {
do {
rand_CacheKey(&doc->key);
} while (DIR_MASK_TAG(doc->key.slice32(2)) == DIR_MASK_TAG(vc->first_key.slice32(2)));
} else {
prev_CacheKey(&doc->key, &vc->earliest_key);
}
}
dir_set_head(&vc->dir, true);
} else {
doc->key = vc->key;
dir_set_head(&vc->dir, !vc->fragment);
}
if (vc->f.rewrite_resident_alt) {
ink_assert(vc->f.use_first_key);
Doc *res_doc = reinterpret_cast<Doc *>(vc->first_buf->data());
res_alt_blk = new_IOBufferBlock(vc->first_buf, res_doc->data_len(), sizeof(Doc) + res_doc->hlen);
doc->key = res_doc->key;
doc->total_len = res_doc->data_len();
}
// update the new_info object_key, total_len, and dirinfo
if (vc->header_len) {
ink_assert(vc->f.use_first_key);
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP) {
ink_assert(vc->write_vector->count() > 0);
if (!vc->f.update && !vc->f.evac_vector) {
ink_assert(!(vc->first_key.is_zero()));
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
http_info->object_size_set(vc->total_len);
}
// update + data_written => Update case (b)
// need to change the old alternate's object length
if (vc->f.update && vc->total_len) {
CacheHTTPInfo *http_info = vc->write_vector->get(vc->alternate_index);
http_info->object_size_set(vc->total_len);
}
ink_assert(!(((uintptr_t)&doc->hdr()[0]) & HDR_PTR_ALIGNMENT_MASK));
ink_assert(vc->header_len == vc->write_vector->marshal(doc->hdr(), vc->header_len));
} else {
memcpy(doc->hdr(), vc->header_to_write, vc->header_len);
}
// The single fragment flag is not used in the write call;
// setting it here for completeness.
vc->f.single_fragment = doc->single_fragment();
}
// move data
if (vc->write_len) {
{
ProxyMutex *mutex ATS_UNUSED = vc->stripe->mutex.get();
ink_assert(mutex->thread_holding == this_ethread());
#ifdef DEBUG
Metrics::Counter::increment(cache_rsb.write_bytes, vc->write_len);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.write_bytes, vc->write_len);
#endif
}
if (vc->f.rewrite_resident_alt) {
iobufferblock_memcpy(doc->data(), vc->write_len, res_alt_blk, 0);
} else {
iobufferblock_memcpy(doc->data(), vc->write_len, vc->blocks.get(), vc->offset);
}
#ifdef VERIFY_JTEST_DATA
if (f.use_first_key && header_len) {
int ib = 0, xd = 0;
char xx[500];
new_info.request_get().url_get().print(xx, 500, &ib, &xd);
char *x = xx;
for (int q = 0; q < 3; q++)
x = strchr(x + 1, '/');
ink_assert(!memcmp(doc->hdr(), x, ib - (x - xx)));
}
#endif
}
if (cache_config_enable_checksum) {
doc->checksum = 0;
for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
doc->checksum += *b;
}
}
if (vc->frag_type == CACHE_FRAG_TYPE_HTTP && vc->f.single_fragment) {
ink_assert(doc->hlen);
}
if (res_alt_blk) {
res_alt_blk->free();
}
return vc->agg_len;
} else {
// for evacuated documents, copy the data, and update directory
Doc *doc = reinterpret_cast<Doc *>(vc->buf->data());
int l = vc->stripe->round_to_approx_size(doc->len);
#ifdef DEBUG
Metrics::Counter::increment(cache_rsb.gc_frags_evacuated);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.gc_frags_evacuated);
#endif
doc->sync_serial = vc->stripe->header->sync_serial;
doc->write_serial = vc->stripe->header->write_serial;
memcpy(p, doc, doc->len);
vc->dir = vc->overwrite_dir;
dir_set_offset(&vc->dir, vc->stripe->offset_to_vol_offset(o));
dir_set_phase(&vc->dir, vc->stripe->header->phase);
return l;
}
}
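// Free evacuation blocks in bucket i that are marked done and whose disk
// region is no longer at risk from the advancing write head, accounting for
// the phase flip at volume wrap.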
inline void
Stripe::evacuate_cleanup_blocks(int i)
{
EvacuationBlock *b = evac_bucket_valid(i) ? evacuate[i].head : nullptr;
while (b) {
if (b->f.done && ((header->phase != dir_phase(&b->dir) && header->write_pos > this->vol_offset(&b->dir)) ||
(header->phase == dir_phase(&b->dir) && header->write_pos <= this->vol_offset(&b->dir)))) {
EvacuationBlock *x = b;
DDbg(dbg_ctl_cache_evac, "evacuate cleanup free %X offset %d", (int)b->evac_frags.key.slice32(0), (int)dir_offset(&b->dir));
b = b->link.next;
evacuate[i].remove(x);
free_EvacuationBlock(x, mutex->thread_holding);
continue;
}
b = b->link.next;
}
}
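// Walk the evacuation buckets just behind the current write position and
// release blocks that have finished evacuating, wrapping around to the tail
// of the bucket array when the write head is near the start of the volume.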
void
Stripe::evacuate_cleanup()
{
int64_t eo = ((header->write_pos - start) / CACHE_BLOCK_SIZE) + 1;
int64_t e = dir_offset_evac_bucket(eo);
int64_t sx = e - (evacuate_size / PIN_SCAN_EVERY) - 1;
int64_t s = sx;
int i;
if (e > evacuate_size) {
e = evacuate_size;
}
if (sx < 0) {
s = 0;
}
for (i = s; i < e; i++) {
evacuate_cleanup_blocks(i);
}
// if we have wrapped, handle the end bit
if (sx <= 0) {
s = evacuate_size + sx - 2;
if (s < 0) {
s = 0;
}
for (i = s; i < evacuate_size; i++) {
evacuate_cleanup_blocks(i);
}
}
}
void
Stripe::periodic_scan()
{
evacuate_cleanup();
scan_for_pinned_documents();
if (header->write_pos == start) {
scan_pos = start;
}
scan_pos += len / PIN_SCAN_EVERY;
}
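// The write head has reached the end of the volume: wrap back to the start,
// flip the phase bit (so directory entries from the previous pass can be
// told apart from newly written ones), and scan the region ahead.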
void
Stripe::agg_wrap()
{
header->write_pos = start;
header->phase = !header->phase;
header->cycle++;
header->agg_pos = header->write_pos;
dir_lookaside_cleanup(this);
dir_clean_vol(this);
{
Stripe *stripe = this;
Metrics::Counter::increment(cache_rsb.directory_wrap);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_wrap);
Note("Cache volume %d on disk '%s' wraps around", stripe->cache_vol->vol_number, stripe->hash_text.get());
}
periodic_scan();
}
/* NOTE: This state can be called by an AIO thread, so DON'T DON'T
DON'T schedule any events on this thread using VC_SCHED_XXX or
mutex->thread_holding->schedule_xxx_local(). ALWAYS use
eventProcessor.schedule_xxx().
Also, make sure that any functions called by this also use
the eventProcessor to schedule events
*/
int
Stripe::aggWrite(int event, void * /* e ATS_UNUSED */)
{
ink_assert(!is_io_in_progress());
Que(CacheVC, link) tocall;
CacheVC *c;
cancel_trigger();
Lagain:
this->aggregate_pending_writes(tocall);
// if we got nothing...
if (this->_write_buffer.is_empty()) {
if (!this->_write_buffer.get_pending_writers().head && !sync.head) { // nothing to get
return EVENT_CONT;
}
if (header->write_pos == start) {
// write aggregation too long, bad bad, punt on everything.
Note("write aggregation exceeds vol size");
ink_assert(!tocall.head);
ink_assert(false);
while ((c = this->get_pending_writers().dequeue())) {
this->_write_buffer.add_bytes_pending_aggregation(-c->agg_len);
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
return EVENT_CONT;
}
// start back at the beginning of the volume
if (this->get_pending_writers().head) {
agg_wrap();
goto Lagain;
}
}
// evacuate space
off_t end = header->write_pos + this->_write_buffer.get_buffer_pos() + EVACUATION_SIZE;
if (evac_range(header->write_pos, end, !header->phase) < 0) {
goto Lwait;
}
if (end > skip + len) {
if (evac_range(start, start + (end - (skip + len)), header->phase) < 0) {
goto Lwait;
}
}
// If get_pending_writers().head is set, then we are near the end of the disk,
// so write down the aggregation at whatever size it is.
if (this->_write_buffer.get_buffer_pos() < AGG_HIGH_WATER && !this->_write_buffer.get_pending_writers().head && !sync.head &&
!dir_sync_waiting) {
goto Lwait;
}
// write sync marker
if (this->_write_buffer.is_empty()) {
ink_assert(sync.head);
int l = round_to_approx_size(sizeof(Doc));
this->_write_buffer.seek(l);
Doc *d = reinterpret_cast<Doc *>(this->_write_buffer.get_buffer());
memset(static_cast<void *>(d), 0, sizeof(Doc));
d->magic = DOC_MAGIC;
d->len = l;
d->sync_serial = header->sync_serial;
d->write_serial = header->write_serial;
}
// set write limit
header->agg_pos = header->write_pos + this->_write_buffer.get_buffer_pos();
io.aiocb.aio_fildes = fd;
io.aiocb.aio_offset = header->write_pos;
io.aiocb.aio_buf = this->_write_buffer.get_buffer();
io.aiocb.aio_nbytes = this->_write_buffer.get_buffer_pos();
io.action = this;
/*
Callback on the AIO thread so that we can issue a new write ASAP,
as all writes are serialized in the volume. This is not necessary
for reads, which proceed independently.
*/
io.thread = AIO_CALLBACK_THREAD_AIO;
SET_HANDLER(&Stripe::aggWriteDone);
ink_aio_write(&io);
Lwait:
int ret = EVENT_CONT;
while ((c = tocall.dequeue())) {
if (event == EVENT_CALL && c->mutex->thread_holding == mutex->thread_holding) {
ret = EVENT_RETURN;
} else {
eventProcessor.schedule_imm(c, ET_CALL, AIO_EVENT_DONE);
}
}
return ret;
}
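// Drain pending writers into the aggregation buffer until it is full or the
// end of the volume would be overrun. Writers carrying a sync marker are
// parked on the sync queue ordered by write_serial, evacuators are called
// back directly, and ordinary writers are collected in tocall for the
// caller to notify.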
void
Stripe::aggregate_pending_writes(Queue<CacheVC, Continuation::Link_link> &tocall)
{
for (auto *c = static_cast<CacheVC *>(this->_write_buffer.get_pending_writers().head); c;) {
int writelen = c->agg_len;
// [amc] this is checked in multiple places; only here was it strictly less.
ink_assert(writelen <= AGG_SIZE);
if (this->_write_buffer.get_buffer_pos() + writelen > AGG_SIZE ||
this->header->write_pos + this->_write_buffer.get_buffer_pos() + writelen > (this->skip + this->len)) {
break;
}
DDbg(dbg_ctl_agg_read, "copying: %d, %" PRIu64 ", key: %d", this->_write_buffer.get_buffer_pos(),
this->header->write_pos + this->_write_buffer.get_buffer_pos(), c->first_key.slice32(0));
int wrotelen = agg_copy(this->_write_buffer.get_buffer() + this->_write_buffer.get_buffer_pos(), c);
ink_assert(writelen == wrotelen);
this->_write_buffer.add_bytes_pending_aggregation(-writelen);
this->_write_buffer.add_buffer_pos(writelen);
CacheVC *n = (CacheVC *)c->link.next;
this->_write_buffer.get_pending_writers().dequeue();
if (c->f.sync && c->f.use_first_key) {
CacheVC *last = this->sync.tail;
while (last && UINT_WRAP_LT(c->write_serial, last->write_serial)) {
last = (CacheVC *)last->link.prev;
}
this->sync.insert(c, last);
} else if (c->f.evacuator) {
c->handleEvent(AIO_EVENT_DONE, nullptr);
} else {
tocall.enqueue(c);
}
c = n;
}
}
int
CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
SET_HANDLER(&CacheVC::openWriteCloseDir);
ink_assert(!is_io_in_progress());
VC_SCHED_LOCK_RETRY();
}
stripe->close_write(this);
if (closed < 0 && fragment) {
dir_delete(&earliest_key, stripe, &earliest_dir);
}
}
if (dbg_ctl_cache_update.on()) {
if (f.update && closed > 0) {
if (!total_len && !f.allow_empty_doc && alternate_index != CACHE_ALT_REMOVED) {
Dbg(dbg_ctl_cache_update, "header only %d (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
update_key.b[1]);
} else if ((total_len || f.allow_empty_doc) && alternate_index != CACHE_ALT_REMOVED) {
Dbg(dbg_ctl_cache_update, "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")",
DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]);
} else if (!total_len && alternate_index == CACHE_ALT_REMOVED) {
Dbg(dbg_ctl_cache_update, "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0],
update_key.b[1]);
}
}
}
// Update the appropriate stat variable. These variables may not give
// the current number of documents with one, two, and three or more
// fragments, because for updates we don't decrement the variable
// corresponding to the old size of the document.
if ((closed == 1) && (total_len > 0 || f.allow_empty_doc)) {
DDbg(dbg_ctl_cache_stats, "Fragment = %d", fragment);
Metrics::Counter::increment(cache_rsb.fragment_document_count[std::clamp(fragment, 0, 2)]);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.fragment_document_count[std::clamp(fragment, 0, 2)]);
}
if (f.close_complete) {
recursive++;
ink_assert(!stripe || this_ethread() != stripe->mutex->thread_holding);
vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, (void *)&vio);
recursive--;
}
return free_CacheVC(this);
}
int
CacheVC::openWriteCloseHeadDone(int event, Event *e)
{
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
od->writing_vec = false;
if (!io.ok()) {
goto Lclose;
}
ink_assert(f.use_first_key);
if (!od->dont_update_directory) {
if (dir_is_empty(&od->first_dir)) {
dir_insert(&first_key, stripe, &dir);
} else {
// multiple fragment vector write
dir_overwrite(&first_key, stripe, &dir, &od->first_dir, false);
// insert moved resident alternate
if (od->move_resident_alt) {
if (dir_valid(stripe, &od->single_doc_dir)) {
dir_insert(&od->single_doc_key, stripe, &od->single_doc_dir);
}
od->move_resident_alt = false;
}
}
od->first_dir = dir;
if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) {
// fragment is tied to the vector
od->move_resident_alt = true;
if (!f.rewrite_resident_alt) {
od->single_doc_key = earliest_key;
}
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
}
}
}
Lclose:
return openWriteCloseDir(event, e);
}
int
CacheVC::openWriteCloseHead(int event, Event *e)
{
cancel_trigger();
f.use_first_key = 1;
if (io.ok()) {
ink_assert(fragment || (length == (int64_t)total_len));
} else {
return openWriteCloseDir(event, e);
}
if (f.data_done) {
write_len = 0;
} else {
write_len = length;
}
if (frag_type == CACHE_FRAG_TYPE_HTTP) {
SET_HANDLER(&CacheVC::updateVector);
return updateVector(EVENT_IMMEDIATE, nullptr);
} else {
header_len = header_to_write_len;
SET_HANDLER(&CacheVC::openWriteCloseHeadDone);
return do_write_lock();
}
}
int
CacheVC::openWriteCloseDataDone(int event, Event *e)
{
int ret = 0;
cancel_trigger();
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
if (!io.ok()) {
return openWriteCloseDir(event, e);
}
{
CACHE_TRY_LOCK(lock, stripe->mutex, this_ethread());
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
if (!fragment) {
ink_assert(key == earliest_key);
earliest_dir = dir;
} else {
// Store the offset only if there is a table.
// Currently there is no alt (and hence no table) for non-HTTP.
if (alternate.valid()) {
alternate.push_frag_offset(write_pos);
}
}
fragment++;
write_pos += write_len;
dir_insert(&key, stripe, &dir);
blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
next_CacheKey(&key, &key);
if (length) {
write_len = length;
if (write_len > MAX_FRAG_SIZE) {
write_len = MAX_FRAG_SIZE;
}
if ((ret = do_write_call()) == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
f.data_done = 1;
return openWriteCloseHead(event, e); // must be called under vol lock from here
}
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr);
}
int
CacheVC::openWriteClose(int event, Event *e)
{
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE) {
return EVENT_CONT;
}
set_io_not_in_progress();
if (!io.ok()) {
return openWriteCloseDir(event, e);
}
}
if (closed > 0 || f.allow_empty_doc) {
if (total_len == 0) {
if (f.update || f.allow_empty_doc) {
return updateVector(event, e);
} else {
// If we've been CLOSE'd but nothing has been written then
// this close is transformed into an abort.
closed = -1;
return openWriteCloseDir(event, e);
}
}
if (length && (fragment || length > static_cast<int>(MAX_FRAG_SIZE))) {
SET_HANDLER(&CacheVC::openWriteCloseDataDone);
write_len = length;
if (write_len > MAX_FRAG_SIZE) {
write_len = MAX_FRAG_SIZE;
}
return do_write_lock_call();
} else {
return openWriteCloseHead(event, e);
}
} else {
return openWriteCloseDir(event, e);
}
}
int
CacheVC::openWriteWriteDone(int event, Event *e)
{
cancel_trigger();
if (event == AIO_EVENT_DONE) {
set_io_not_in_progress();
} else if (is_io_in_progress()) {
return EVENT_CONT;
}
// In the event of VC_EVENT_ERROR, the cont must do an io_close
if (!io.ok()) {
if (closed) {
closed = -1;
return die();
}
SET_HANDLER(&CacheVC::openWriteMain);
return calluser(VC_EVENT_ERROR);
}
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
// store the earliest directory. Need to remove the earliest dir
// in case the writer aborts.
if (!fragment) {
ink_assert(key == earliest_key);
earliest_dir = dir;
} else {
// Store the offset only if there is a table.
// Currently there is no alt (and hence no table) for non-HTTP.
if (alternate.valid()) {
alternate.push_frag_offset(write_pos);
}
}
++fragment;
write_pos += write_len;
dir_insert(&key, stripe, &dir);
DDbg(dbg_ctl_cache_insert, "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len);
blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len);
next_CacheKey(&key, &key);
}
if (closed) {
return die();
}
SET_HANDLER(&CacheVC::openWriteMain);
return openWriteMain(event, e);
}
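// Usable payload per fragment: the configured target fragment size less the
// Doc header that precedes the data on disk.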
static inline int
target_fragment_size()
{
uint64_t value = cache_config_target_fragment_size - sizeof(Doc);
ink_release_assert(value <= MAX_FRAG_SIZE);
return value;
}
int
CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
cancel_trigger();
int called_user = 0;
ink_assert(!is_io_in_progress());
Lagain:
if (!vio.buffer.writer()) {
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
return EVENT_DONE;
}
if (!vio.buffer.writer()) {
return EVENT_CONT;
}
}
if (vio.ntodo() <= 0) {
called_user = 1;
if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE) {
return EVENT_DONE;
}
ink_assert(!f.close_complete || !"close expected after write COMPLETE");
if (vio.ntodo() <= 0) {
return EVENT_CONT;
}
}
int64_t ntodo = static_cast<int64_t>(vio.ntodo() + length);
int64_t total_avail = vio.buffer.reader()->read_avail();
int64_t avail = total_avail;
int64_t towrite = avail + length;
if (towrite > ntodo) {
avail -= (towrite - ntodo);
towrite = ntodo;
}
if (towrite > static_cast<int>(MAX_FRAG_SIZE)) {
avail -= (towrite - MAX_FRAG_SIZE);
towrite = MAX_FRAG_SIZE;
}
if (!blocks && towrite) {
blocks = vio.buffer.reader()->block;
offset = vio.buffer.reader()->start_offset;
}
if (avail > 0) {
vio.buffer.reader()->consume(avail);
vio.ndone += avail;
total_len += avail;
}
length = static_cast<uint64_t>(towrite);
if (length > target_fragment_size() && (length < target_fragment_size() + target_fragment_size() / 4)) {
write_len = target_fragment_size();
} else {
write_len = length;
}
bool not_writing = towrite != ntodo && towrite < target_fragment_size();
if (!called_user) {
if (not_writing) {
called_user = 1;
if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) {
return EVENT_DONE;
}
goto Lagain;
} else if (vio.ntodo() <= 0) {
goto Lagain;
}
}
if (not_writing) {
return EVENT_CONT;
}
if (towrite == ntodo && f.close_complete) {
closed = 1;
SET_HANDLER(&CacheVC::openWriteClose);
return openWriteClose(EVENT_NONE, nullptr);
}
SET_HANDLER(&CacheVC::openWriteWriteDone);
return do_write_lock_call();
}
// begin overwrite
int
CacheVC::openWriteOverwrite(int event, Event *e)
{
cancel_trigger();
if (event != AIO_EVENT_DONE) {
if (event == EVENT_IMMEDIATE) {
last_collision = nullptr;
}
} else {
Doc *doc = nullptr;
set_io_not_in_progress();
if (_action.cancelled) {
return openWriteCloseDir(event, e);
}
if (!io.ok()) {
goto Ldone;
}
doc = reinterpret_cast<Doc *>(buf->data());
if (!(doc->first_key == first_key)) {
goto Lcollision;
}
od->first_dir = dir;
first_buf = buf;
goto Ldone;
}
Lcollision: {
CACHE_TRY_LOCK(lock, stripe->mutex, this_ethread());
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
int res = dir_probe(&first_key, stripe, &dir, &last_collision);
if (res > 0) {
if ((res = do_read_call(&first_key)) == EVENT_RETURN) {
goto Lcallreturn;
}
return res;
}
}
Ldone:
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}
// openWriteStartDone handles vector read (addition of alternates)
// and lock misses
int
CacheVC::openWriteStartDone(int event, Event *e)
{
intptr_t err = ECACHE_NO_DOC;
cancel_trigger();
if (is_io_in_progress()) {
if (event != AIO_EVENT_DONE) {
return EVENT_CONT;
}
set_io_not_in_progress();
}
{
CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding);
if (!lock.is_locked()) {
VC_LOCK_RETRY_EVENT();
}
if (_action.cancelled && (!od || !od->has_multiple_writers())) {
goto Lcancel;
}
if (event == AIO_EVENT_DONE) { // vector read done
Doc *doc = reinterpret_cast<Doc *>(buf->data());
if (!io.ok()) {
err = ECACHE_READ_FAIL;
goto Lfailure;
}
/* INKqa07123.
A directory entry which is no longer valid may have been overwritten.
We need to start afresh from the beginning by setting last_collision
to nullptr.
*/
if (!dir_valid(stripe, &dir)) {
DDbg(dbg_ctl_cache_write, "openWriteStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64,
(int64_t)stripe->offset_to_vol_offset(stripe->header->write_pos), dir_offset(&dir));
last_collision = nullptr;
goto Lcollision;
}
if (!(doc->first_key == first_key)) {
goto Lcollision;
}
if (doc->magic != DOC_MAGIC || !doc->hlen || this->load_http_info(write_vector, doc, buf.object()) != doc->hlen) {
err = ECACHE_BAD_META_DATA;
goto Lfailure;
}
ink_assert(write_vector->count() > 0);
od->first_dir = dir;
first_dir = dir;
if (doc->single_fragment()) {
// fragment is tied to the vector
od->move_resident_alt = true;
od->single_doc_key = doc->key;
dir_assign(&od->single_doc_dir, &dir);
dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2));
}
first_buf = buf;
goto Lsuccess;
}
Lcollision:
int if_writers = ((uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES);
if (!od) {
if ((err = stripe->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
goto Lfailure;
}
if (od->has_multiple_writers()) {
MUTEX_RELEASE(lock);
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
// check for collision
if (dir_probe(&first_key, stripe, &dir, &last_collision)) {
od->reading_vec = true;
int ret = do_read_call(&first_key);
if (ret == EVENT_RETURN) {
goto Lcallreturn;
}
return ret;
}
if (f.update) {
// fail update because vector has been GC'd
goto Lfailure;
}
}
Lsuccess:
od->reading_vec = false;
if (_action.cancelled) {
goto Lcancel;
}
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
Lfailure:
Metrics::Counter::increment(cache_rsb.status[op_type].failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
Lcancel:
if (od) {
od->reading_vec = false;
return openWriteCloseDir(event, e);
} else {
return free_CacheVC(this);
}
Lcallreturn:
return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call
}
// handle lock failures from main Cache::open_write entry points below
int
CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
intptr_t err;
cancel_trigger();
if (_action.cancelled) {
return free_CacheVC(this);
}
if (((err = stripe->open_write_lock(this, false, 1)) > 0)) {
Metrics::Counter::increment(cache_rsb.status[op_type].failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure);
free_CacheVC(this);
_action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
return EVENT_DONE;
}
if (err < 0) {
VC_SCHED_LOCK_RETRY();
}
if (f.overwrite) {
SET_HANDLER(&CacheVC::openWriteOverwrite);
return openWriteOverwrite(EVENT_IMMEDIATE, nullptr);
} else {
// write by key
SET_HANDLER(&CacheVC::openWriteMain);
return callcont(CACHE_EVENT_OPEN_WRITE);
}
}
// main entry point for writing of non-http documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options, time_t apin_in_cache,
const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(frag_type)) {
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[frag_type] == this);
intptr_t res = 0;
CacheVC *c = new_CacheVC(cont);
SCOPED_MUTEX_LOCK(lock, c->mutex, this_ethread());
c->vio.op = VIO::WRITE;
c->op_type = static_cast<int>(CacheOpType::Write);
c->stripe = key_to_stripe(key, hostname, host_len);
Stripe *stripe = c->stripe;
Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
c->first_key = c->key = *key;
c->frag_type = frag_type;
/*
The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if it's the first_key
or the earliest_key based on the dir_tag.
*/
do {
rand_CacheKey(&c->key);
} while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
c->earliest_key = c->key;
c->info = nullptr;
c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0;
c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0;
c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC;
// coverity[Y2K38_SAFETY:FALSE]
c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);
if ((res = c->stripe->open_write_lock(c, false, 1)) > 0) {
// document currently being written, abort
Metrics::Counter::increment(cache_rsb.status[c->op_type].failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[c->op_type].failure);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-res);
free_CacheVC(c);
return ACTION_RESULT_DONE;
}
if (res < 0) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin);
c->trigger = CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
if (!c->f.overwrite) {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
} else {
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite);
if (c->openWriteOverwrite(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
} else {
return &c->_action;
}
}
}
// main entry point for writing of http documents
Action *
Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache,
const CacheKey * /* key1 ATS_UNUSED */, CacheFragType type, const char *hostname, int host_len)
{
if (!CacheProcessor::IsCacheReady(type)) {
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-ECACHE_NOT_READY);
return ACTION_RESULT_DONE;
}
ink_assert(caches[type] == this);
intptr_t err = 0;
int if_writers = (uintptr_t)info == CACHE_ALLOW_MULTIPLE_WRITES;
CacheVC *c = new_CacheVC(cont);
c->vio.op = VIO::WRITE;
c->first_key = *key;
/*
The transition from single fragment document to a multi-fragment document
would cause a problem if the key and the first_key collide. In case of
a collision, old vector data could be served to HTTP. Need to avoid that.
Also, when evacuating a fragment, we have to decide if it's the first_key
or the earliest_key based on the dir_tag.
*/
do {
rand_CacheKey(&c->key);
} while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2)));
c->earliest_key = c->key;
c->frag_type = CACHE_FRAG_TYPE_HTTP;
c->stripe = key_to_stripe(key, hostname, host_len);
Stripe *stripe = c->stripe;
c->info = info;
if (c->info && (uintptr_t)info != CACHE_ALLOW_MULTIPLE_WRITES) {
/*
Update has the following code paths :
a) Update alternate header only :
In this case the vector has to be rewritten. The content
length (update_len) and the key for the document are set in the
new_info in the set_http_info call.
HTTP OPERATIONS
open_write with info set
set_http_info new_info
(total_len == 0)
close
b) Update alternate and data
In this case both the vector and the data need to be rewritten.
This case is similar to the standard write of a document case except
that the new_info is inserted into the vector at the alternate_index
(overwriting the old alternate) rather than the end of the vector.
HTTP OPERATIONS
open_write with info set
set_http_info new_info
do_io_write => (total_len > 0)
close
c) Delete an alternate
The vector may need to be deleted (if there was only one alternate) or
rewritten (if there were more than one alternate).
HTTP OPERATIONS
open_write with info set
close
*/
c->f.update = 1;
c->op_type = static_cast<int>(CacheOpType::Update);
DDbg(dbg_ctl_cache_update, "Update called");
info->object_key_get(&c->update_key);
ink_assert(!(c->update_key.is_zero()));
c->update_len = info->object_size_get();
} else {
c->op_type = static_cast<int>(CacheOpType::Write);
}
Metrics::Gauge::increment(cache_rsb.status[c->op_type].active);
Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active);
// coverity[Y2K38_SAFETY:FALSE]
c->pin_in_cache = static_cast<uint32_t>(apin_in_cache);
{
CACHE_TRY_LOCK(lock, c->stripe->mutex, cont->mutex->thread_holding);
if (lock.is_locked()) {
if ((err = c->stripe->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) {
goto Lfailure;
}
// If there are multiple writers, then this one cannot be an update.
// Only the first writer can do an update. If that's the case, we can
// return success to the state machine now.
if (c->od->has_multiple_writers()) {
goto Lmiss;
}
if (!dir_probe(key, c->stripe, &c->dir, &c->last_collision)) {
if (c->f.update) {
// fail update because vector has been GC'd
// This situation can also arise in openWriteStartDone
err = ECACHE_NO_DOC;
goto Lfailure;
}
// document doesn't exist, begin write
goto Lmiss;
} else {
c->od->reading_vec = true;
// document exists, read vector
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
switch (c->do_read_call(&c->first_key)) {
case EVENT_DONE:
return ACTION_RESULT_DONE;
case EVENT_RETURN:
goto Lcallreturn;
default:
return &c->_action;
}
}
}
// missed lock
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone);
CONT_SCHED_LOCK_RETRY(c);
return &c->_action;
}
Lmiss:
SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain);
c->callcont(CACHE_EVENT_OPEN_WRITE);
return ACTION_RESULT_DONE;
Lfailure:
Metrics::Counter::increment(cache_rsb.status[c->op_type].failure);
Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[c->op_type].failure);
cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, (void *)-err);
if (c->od) {
c->openWriteCloseDir(EVENT_IMMEDIATE, nullptr);
return ACTION_RESULT_DONE;
}
free_CacheVC(c);
return ACTION_RESULT_DONE;
Lcallreturn:
if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) {
return ACTION_RESULT_DONE;
}
return &c->_action;
}