blob: b7e793012b207b42095e6008987728a87732305a [file] [log] [blame]
/** @file
A brief file description
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "iocore/cache/Cache.h"
#include "P_CacheDisk.h"
#include "P_CacheInternal.h"
#include "P_CacheVol.h"
#include "iocore/eventsystem/EThread.h"
#include "iocore/eventsystem/Lock.h"
#include "tsutil/DbgCtl.h"
#include "tscore/hugepages.h"
#include "tscore/ink_assert.h"
#include "tscore/ink_memory.h"
#include <cstring>
namespace
{
DbgCtl dbg_ctl_cache_dir_sync{"dir_sync"};
DbgCtl dbg_ctl_cache_init{"cache_init"};

// This is the oldest version number that is still usable.
short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;

/// qsort(3) comparator: orders unsigned shorts ascending.
int
compare_ushort(void const *a, void const *b)
{
  unsigned short const lhs = *static_cast<unsigned short const *>(a);
  unsigned short const rhs = *static_cast<unsigned short const *>(b);
  // Both operands promote to int, so the subtraction cannot overflow.
  return lhs - rhs;
}
} // namespace
// Scratch state used only while a stripe is being initialized / recovered.
// Owns a page-aligned scratch buffer shared by four AIO operations.
struct StripeInitInfo {
  off_t recover_pos;              // presumably the disk offset recovery has reached -- not used in this chunk; TODO confirm
  AIOCallbackInternal vol_aio[4]; // AIO ops for the header/footer reads of directory copies A and B
  char *vol_h_f;                  // backing store: 4 STORE_BLOCK_SIZE-sized slots, one per vol_aio entry

  StripeInitInfo()
  {
    recover_pos = 0;
    // Page-aligned so it can be used directly as an AIO buffer.
    vol_h_f = static_cast<char *>(ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE));
    memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE);
  }

  ~StripeInitInfo()
  {
    // Drop the AIO callbacks' references before releasing the shared buffer.
    for (auto &i : vol_aio) {
      i.action = nullptr;
      i.mutex.clear();
    }
    free(vol_h_f);
  }
};
////
// Stripe
//
int
Stripe::begin_read(CacheVC *cont) const
{
  ink_assert(cont->mutex->thread_holding == this_ethread());
  ink_assert(mutex->thread_holding == this_ethread());
  // A single-fragment document is entirely in memory already, so there is
  // nothing on disk that needs protection from being overwritten.
  if (cont->f.single_fragment) {
    return 0;
  }
  int bucket = dir_evac_bucket(&cont->earliest_dir);
  // If the fragment is already covered by an evacuation block, just join it.
  for (EvacuationBlock *blk = evacuate[bucket].head; blk != nullptr; blk = blk->link.next) {
    if (dir_offset(&blk->dir) == dir_offset(&cont->earliest_dir)) {
      if (blk->readers) {
        ++blk->readers;
      }
      return 0;
    }
  }
  // we don't actually need to preserve this block as it is already in
  // memory, but this is easier, and evacuations are rare
  EThread         *thread = cont->mutex->thread_holding;
  EvacuationBlock *blk    = new_EvacuationBlock(thread);
  blk->readers            = 1;
  blk->dir                = cont->earliest_dir;
  blk->evac_frags.key     = cont->earliest_key;
  evacuate[bucket].push(blk);
  return 1;
}
int
Stripe::close_read(CacheVC *cont) const
{
  EThread *thread = cont->mutex->thread_holding;
  ink_assert(thread == this_ethread());
  ink_assert(thread == mutex->thread_holding);
  // Nothing was registered for an empty directory entry.
  if (dir_is_empty(&cont->earliest_dir)) {
    return 1;
  }
  int              bucket = dir_evac_bucket(&cont->earliest_dir);
  EvacuationBlock *blk    = evacuate[bucket].head;
  while (blk != nullptr) {
    EvacuationBlock *next = blk->link.next;
    if (dir_offset(&blk->dir) == dir_offset(&cont->earliest_dir)) {
      // Drop our reader reference; once the count hits zero the block is
      // no longer needed and can be reclaimed.
      if (blk->readers && !--blk->readers) {
        evacuate[bucket].remove(blk);
        free_EvacuationBlock(blk, thread);
        break;
      }
    }
    blk = next;
  }
  return 1;
}
/**
Add AIO task to clear Dir.
*/
int
Stripe::clear_dir_aio()
{
  // Reset the in-memory directory first, then queue one AIO write that
  // pushes the whole cleared directory (copy A) out to disk.
  size_t const dir_len = this->dirlen();
  this->_clear_init();

  SET_HANDLER(&Stripe::handle_dir_clear);

  io.aiocb.aio_fildes = fd;
  io.aiocb.aio_buf    = raw_dir;
  io.aiocb.aio_nbytes = dir_len;
  io.aiocb.aio_offset = skip;
  io.action           = this;
  io.thread           = AIO_CALLBACK_THREAD_ANY;
  io.then             = nullptr;
  ink_assert(ink_aio_write(&io));

  return 0;
}
/**
Clear Dir directly. This is mainly used by unit tests. The clear_dir_aio() is the suitable function in most cases.
*/
int
Stripe::clear_dir()
{
  // Reset the in-memory directory, then synchronously write it out.
  // Returns 0 on success, -1 on failure.
  size_t dir_len = this->dirlen();
  this->_clear_init();

  // A short write also leaves the on-disk directory unusable, so anything
  // other than a complete write is treated as failure (the old check of
  // `< 0` silently accepted partial writes).
  if (pwrite(this->fd, this->raw_dir, dir_len, this->skip) != static_cast<ssize_t>(dir_len)) {
    Warning("unable to clear cache directory '%s'", this->hash_text.get());
    return -1;
  }

  return 0;
}
// Initialize this stripe over `blocks` STORE_BLOCK_SIZE blocks of the file /
// device named by `s`, starting `dir_skip` bytes in. If `clear` is set the
// on-disk directory is wiped; otherwise the stripe headers are read back via
// a chain of four AIO reads (handled by handle_header_read).
// Returns 0; completion continues asynchronously.
int
Stripe::init(char *s, off_t blocks, off_t dir_skip, bool clear)
{
  // Identity: hash the seed string plus the geometry so the same device with
  // a different span layout gets a different stripe id.
  char *seed_str              = disk->hash_base_string ? disk->hash_base_string : s;
  const size_t hash_seed_size = strlen(seed_str);
  const size_t hash_text_size = hash_seed_size + 32;

  hash_text = static_cast<char *>(ats_malloc(hash_text_size));
  ink_strlcpy(hash_text, seed_str, hash_text_size);
  snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
           static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
  CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));

  dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
  path     = ats_strdup(s);
  len      = blocks * STORE_BLOCK_SIZE;
  ink_assert(len <= MAX_STRIPE_SIZE);
  skip             = dir_skip;
  prev_recover_pos = 0;

  // successive approximation, directory/meta data eats up some storage
  start = dir_skip;
  this->_init_data();
  data_blocks         = (len - (start - skip)) / STORE_BLOCK_SIZE;
  hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;

  evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
  int evac_len  = evacuate_size * sizeof(DLL<EvacuationBlock>);
  evacuate      = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
  memset(static_cast<void *>(evacuate), 0, evac_len);

  Dbg(dbg_ctl_cache_init, "Vol %s: allocating %zu directory bytes for a %lld byte volume (%lf%%)", hash_text.get(), dirlen(),
      (long long)this->len, (double)dirlen() / (double)this->len * 100.0);

  // Directory memory: prefer huge pages, fall back to page-aligned malloc.
  raw_dir = nullptr;
  if (ats_hugepage_enabled()) {
    raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
  }
  if (raw_dir == nullptr) {
    raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->dirlen()));
  }

  // Carve header / entries / footer out of the raw directory buffer.
  dir    = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
  header = reinterpret_cast<StripteHeaderFooter *>(raw_dir);
  footer = reinterpret_cast<StripteHeaderFooter *>(raw_dir + this->dirlen() - ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter)));

  if (clear) {
    Note("clearing cache directory '%s'", hash_text.get());
    return clear_dir_aio();
  }

  init_info           = new StripeInitInfo();
  int footerlen       = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
  off_t footer_offset = this->dirlen() - footerlen;
  // try A
  off_t as = skip;

  Dbg(dbg_ctl_cache_init, "reading directory '%s'", hash_text.get());
  SET_HANDLER(&Stripe::handle_header_read);
  // Four reads: header+footer of copy A, then header+footer of copy B.
  init_info->vol_aio[0].aiocb.aio_offset = as;
  init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
  off_t bs                               = skip + this->dirlen();
  init_info->vol_aio[2].aiocb.aio_offset = bs;
  init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;

  for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
    AIOCallback *aio = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    aio->aiocb.aio_buf    = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
    aio->aiocb.aio_nbytes = footerlen;
    aio->action           = this;
    aio->thread           = AIO_CALLBACK_THREAD_ANY;
    // Chain the ops so they complete in order; the last has no successor.
    aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
  }
  ink_assert(ink_aio_read(init_info->vol_aio));
  return 0;
}
// AIO completion handler for clear_dir_aio(). The first completion is the
// full-directory write of copy A; it then reuses the same AIO op to clear
// just the header of copy B. The second completion finishes initialization.
int
Stripe::handle_dir_clear(int event, void *data)
{
  size_t dir_len = this->dirlen();
  AIOCallback *op;

  if (event == AIO_EVENT_DONE) {
    op = static_cast<AIOCallback *>(data);
    if (!op->ok()) {
      Warning("unable to clear cache directory '%s'", hash_text.get());
      disk->incrErrors(op);
    }

    // A full dir_len write means this was the copy-A pass; queue the copy-B
    // header clear next.
    if (op->aiocb.aio_nbytes == dir_len) {
      /* clear the header for directory B. We don't need to clear the
         whole of directory B. The header for directory B starts at
         skip + len */
      op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
      op->aiocb.aio_offset = skip + dir_len;
      ink_assert(ink_aio_write(op));
      return EVENT_DONE;
    }
    // Second completion: both copies handled; finish stripe bring-up.
    set_io_not_in_progress();
    SET_HANDLER(&Stripe::dir_init_done);
    dir_init_done(EVENT_IMMEDIATE, nullptr);
    /* mark the volume as bad */
  }
  return EVENT_DONE;
}
// AIO completion handler for the full directory read. Validates the magic
// numbers and version range; any inconsistency causes the directory to be
// cleared. On success, continues into data recovery.
int
Stripe::handle_dir_read(int event, void *data)
{
  AIOCallback *op = static_cast<AIOCallback *>(data);

  if (event == AIO_EVENT_DONE) {
    if (!op->ok()) {
      Note("Directory read failed: clearing cache directory %s", this->hash_text.get());
      clear_dir_aio();
      return EVENT_DONE;
    }
  }

  // The directory is usable only if both header and footer carry the stripe
  // magic and the on-disk major version is within the supported range.
  if (!(header->magic == STRIPE_MAGIC && footer->magic == STRIPE_MAGIC &&
        CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major && header->version._major <= CACHE_DB_MAJOR_VERSION)) {
    Warning("bad footer in cache directory for '%s', clearing", hash_text.get());
    Note("STRIPE_MAGIC %d\n header magic: %d\n footer_magic %d\n CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n"
         "CACHE_DB_MAJOR_VERSION %d\n",
         STRIPE_MAGIC, header->magic, footer->magic, CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major,
         CACHE_DB_MAJOR_VERSION);
    Note("clearing cache directory '%s'", hash_text.get());
    clear_dir_aio();
    return EVENT_DONE;
  }
  CHECK_DIR(this);

  sector_size = header->sector_size;

  return this->recover_data();
}
// Kick off data recovery; handle_recover_from_data drives the whole scan.
int
Stripe::recover_data()
{
  SET_HANDLER(&Stripe::handle_recover_from_data);
  return handle_recover_from_data(EVENT_IMMEDIATE, nullptr);
}
/*
Philosophy: The idea is to find the region of disk that could be
inconsistent and remove all directory entries pointing to that potentially
inconsistent region.
Start from a consistent position (the write_pos of the last directory
synced to disk) and scan forward. Two invariants for docs that were
written to the disk after the directory was synced:
1. doc->magic == DOC_MAGIC
The following two cases happen only when the previous generation
documents are aligned with the current ones.
2. All the docs written to the disk
after the directory was synced will have their sync_serial <=
header->sync_serial + 1, because the write aggregation can take
indeterminate amount of time to sync. The doc->sync_serial can be
equal to header->sync_serial + 1, because we increment the sync_serial
before we sync the directory to disk.
3. The doc->sync_serial will always increase. If doc->sync_serial
decreases, the document was written in the previous phase
If either of these conditions fail and we are not too close to the end
(see the next comment ) then we're done
We actually start from header->last_write_pos instead of header->write_pos
to make sure that we haven't wrapped around the whole disk without
syncing the directory. Since the sync serial is 60 seconds, it is
entirely possible to write through the whole cache without
once syncing the directory. In this case, we need to clear the
cache.The documents written right before we synced the
directory to disk should have the write_serial <= header->sync_serial.
*/
// Recovery state machine, driven by AIO read completions. Scans forward from
// the last synced write position validating Doc records by magic and sync
// serial, and clears directory entries covering the potentially-inconsistent
// region. See the philosophy comment above for the invariants.
int
Stripe::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
{
  uint32_t got_len = 0;
  uint32_t max_sync_serial = header->sync_serial;
  char *s, *e = nullptr;
  if (event == EVENT_IMMEDIATE) {
    // First entry: nothing has ever been synced, so there is nothing to
    // recover -- go straight to writing out the directory.
    if (header->sync_serial == 0) {
      io.aiocb.aio_buf = nullptr;
      SET_HANDLER(&Stripe::handle_recover_write_dir);
      return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
    }
    // initialize
    recover_wrapped   = false;
    last_sync_serial  = 0;
    last_write_serial = 0;
    recover_pos       = header->last_write_pos;
    if (recover_pos >= skip + len) {
      recover_wrapped = true;
      recover_pos     = start;
    }
    // Allocate the scan buffer and clamp the first read to the stripe end.
    io.aiocb.aio_buf    = static_cast<char *>(ats_memalign(ats_pagesize(), RECOVERY_SIZE));
    io.aiocb.aio_nbytes = RECOVERY_SIZE;
    if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
      io.aiocb.aio_nbytes = (skip + len) - recover_pos;
    }
  } else if (event == AIO_EVENT_DONE) {
    if (!io.ok()) {
      Warning("disk read error on recover '%s', clearing", hash_text.get());
      disk->incrErrors(&io);
      goto Lclear;
    }
    if (io.aiocb.aio_offset == header->last_write_pos) {
      /* check that we haven't wrapped around without syncing
         the directory. Start from last_write_serial (write pos the documents
         were written to just before syncing the directory) and make sure
         that all documents have write_serial <= header->write_serial.
       */
      uint32_t to_check = header->write_pos - header->last_write_pos;
      ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
      uint32_t done = 0;
      s = static_cast<char *>(io.aiocb.aio_buf);
      while (done < to_check) {
        Doc *doc = reinterpret_cast<Doc *>(s + done);
        if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
          Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
          goto Lclear;
        }
        done += round_to_approx_size(doc->len);
        // NOTE(review): comparing sync_serial against last_write_serial and
        // then updating last_sync_serial looks like mixed serials -- this
        // matches long-standing behavior, but verify against upstream intent.
        if (doc->sync_serial > last_write_serial) {
          last_sync_serial = doc->sync_serial;
        }
      }
      ink_assert(done == to_check);

      // Skip the region already validated above; scan the remainder.
      got_len      = io.aiocb.aio_nbytes - done;
      recover_pos += io.aiocb.aio_nbytes;
      s            = static_cast<char *>(io.aiocb.aio_buf) + done;
      e            = s + got_len;
    } else {
      got_len      = io.aiocb.aio_nbytes;
      recover_pos += io.aiocb.aio_nbytes;
      s            = static_cast<char *>(io.aiocb.aio_buf);
      e            = s + got_len;
    }
  }
  // examine what we got
  if (got_len) {
    Doc *doc = nullptr;

    // After wrapping, a bad first doc at the stripe start means we cannot
    // trust anything up to the evacuation zone at the end.
    if (recover_wrapped && start == io.aiocb.aio_offset) {
      doc = reinterpret_cast<Doc *>(s);
      if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
        recover_pos = skip + len - EVACUATION_SIZE;
        goto Ldone;
      }
    }

    // If execution reaches here, then @c got_len > 0 and e == s + got_len therefore s < e
    // clang analyzer can't figure this out, so be explicit.
    ink_assert(s < e);
    while (s < e) {
      doc = reinterpret_cast<Doc *>(s);

      if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
        if (doc->magic == DOC_MAGIC) {
          if (doc->sync_serial > header->sync_serial) {
            max_sync_serial = doc->sync_serial;
          }

          /*
            doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
            This might happen in the following situations
            1. We are starting off recovery. In this case the
            last_sync_serial == header->sync_serial, but the doc->sync_serial
            can be anywhere in the range (0, header->sync_serial + 1]
            If this is the case, update last_sync_serial and continue;

            2. A dir sync started between writing documents to the
            aggregation buffer and hence the doc->sync_serial went up.
            If the doc->sync_serial is greater than the last
            sync serial and less than (header->sync_serial + 2) then
            continue;

            3. If the position we are recovering from is within AGG_SIZE
            from the disk end, then we can't trust this document. The
            aggregation buffer might have been larger than the remaining space
            at the end and we decided to wrap around instead of writing
            anything at that point. In this case, wrap around and start
            from the beginning.

            If neither of these 3 cases happen, then we are indeed done.
          */

          // case 1
          // case 2
          if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
            last_sync_serial = doc->sync_serial;
            s += round_to_approx_size(doc->len);
            continue;
          }
          // case 3 - we have already recovered some data and
          // (doc->sync_serial < last_sync_serial) ||
          // (doc->sync_serial > header->sync_serial + 1).
          // if we are too close to the end, wrap around
          else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
            recover_wrapped     = true;
            recover_pos         = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;

            break;
          }
          // we are done. This doc was written in the earlier phase
          recover_pos -= e - s;
          goto Ldone;
        } else {
          // doc->magic != DOC_MAGIC
          // If we are in the danger zone - recover_pos is within AGG_SIZE
          // from the end, then wrap around
          recover_pos -= e - s;
          if (recover_pos > (skip + len) - AGG_SIZE) {
            recover_wrapped     = true;
            recover_pos         = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;

            break;
          }
          // we ar not in the danger zone
          goto Ldone;
        }
      }
      // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
      last_write_serial = doc->write_serial;
      s += round_to_approx_size(doc->len);
    }

    /* if (s > e) then we gone through RECOVERY_SIZE; we need to
       read more data off disk and continue recovering */
    if (s >= e) {
      /* In the last iteration, we increment s by doc->len...need to undo
         that change */
      if (s > e) {
        s -= round_to_approx_size(doc->len);
      }
      recover_pos -= e - s;
      if (recover_pos >= skip + len) {
        recover_wrapped = true;
        recover_pos     = start;
      }
      io.aiocb.aio_nbytes = RECOVERY_SIZE;
      if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
        io.aiocb.aio_nbytes = (skip + len) - recover_pos;
      }
    }
  }
  // No forward progress means the scan is stuck; bail out and clear.
  if (recover_pos == prev_recover_pos) { // this should never happen, but if it does break the loop
    goto Lclear;
  }
  prev_recover_pos    = recover_pos;
  io.aiocb.aio_offset = recover_pos;
  ink_assert(ink_aio_read(&io));
  return EVENT_CONT;

Ldone: {
  /* if we come back to the starting position, then we don't have to recover anything */
  if (recover_pos == header->write_pos && recover_wrapped) {
    SET_HANDLER(&Stripe::handle_recover_write_dir);
    if (dbg_ctl_cache_init.on()) {
      Note("recovery wrapped around. nothing to clear\n");
    }
    return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
  }

  recover_pos += EVACUATION_SIZE; // safely cover the max write size
  if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
    Dbg(dbg_ctl_cache_init, "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos,
        recover_wrapped);
    Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
    goto Lclear;
  }

  if (recover_pos > skip + len) {
    recover_pos -= skip + len;
  }
  // bump sync number so it is different from that in the Doc structs
  uint32_t next_sync_serial = max_sync_serial + 1;
  // make that the next sync does not overwrite our good copy!
  if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
    next_sync_serial++;
  }
  // clear effected portion of the cache
  off_t clear_start = this->offset_to_vol_offset(header->write_pos);
  off_t clear_end   = this->offset_to_vol_offset(recover_pos);
  if (clear_start <= clear_end) {
    dir_clear_range(clear_start, clear_end, this);
  } else {
    // The cleared region wraps around the end of the stripe.
    dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
    dir_clear_range(1, clear_end, this);
  }

  Note("recovery clearing offsets of Stripe %s : [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", hash_text.get(),
       header->write_pos, recover_pos, header->sync_serial, next_sync_serial);

  footer->sync_serial = header->sync_serial = next_sync_serial;

  // Write the repaired directory out in three chunks: header, entries, footer.
  for (int i = 0; i < 3; i++) {
    AIOCallback *aio      = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    aio->action           = this;
    aio->thread           = AIO_CALLBACK_THREAD_ANY;
    aio->then             = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
  }
  int footerlen = ROUND_TO_STORE_BLOCK(sizeof(StripteHeaderFooter));
  size_t dirlen = this->dirlen();
  int B         = header->sync_serial & 1;
  off_t ss      = skip + (B ? dirlen : 0);

  init_info->vol_aio[0].aiocb.aio_buf    = raw_dir;
  init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[0].aiocb.aio_offset = ss;
  init_info->vol_aio[1].aiocb.aio_buf    = raw_dir + footerlen;
  init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
  init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
  init_info->vol_aio[2].aiocb.aio_buf    = raw_dir + dirlen - footerlen;
  init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;

  SET_HANDLER(&Stripe::handle_recover_write_dir);
  ink_assert(ink_aio_write(init_info->vol_aio));
  return EVENT_CONT;
}

Lclear:
  // Recovery failed: release the scan buffer and init state, then wipe the
  // directory from scratch.
  free(static_cast<char *>(io.aiocb.aio_buf));
  delete init_info;
  init_info = nullptr;
  clear_dir_aio();
  return EVENT_CONT;
}
// Completion handler after the repaired directory has been written (or when
// there was nothing to recover). Releases recovery resources, starts the
// periodic scan from the current write position, and finishes bring-up.
int
Stripe::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
{
  // The recovery scan buffer is only allocated when recovery actually ran.
  if (io.aiocb.aio_buf) {
    free(static_cast<char *>(io.aiocb.aio_buf));
  }
  delete init_info;
  init_info = nullptr;
  set_io_not_in_progress();
  scan_pos = header->write_pos;
  periodic_scan();
  SET_HANDLER(&Stripe::dir_init_done);
  return dir_init_done(EVENT_IMMEDIATE, nullptr);
}
// Completion handler for the four chained header/footer reads issued by
// init(): hf[0]/hf[1] are copy A's header/footer, hf[2]/hf[3] copy B's.
// Picks the copy whose header and footer sync serials agree (preferring A),
// then reads that copy's full directory; clears if neither copy is sound.
int
Stripe::handle_header_read(int event, void *data)
{
  AIOCallback *op;
  StripteHeaderFooter *hf[4];
  switch (event) {
  case AIO_EVENT_DONE:
    op = static_cast<AIOCallback *>(data);
    // Walk the completion chain and collect the four buffers.
    for (auto &i : hf) {
      ink_assert(op != nullptr);
      i = static_cast<StripteHeaderFooter *>(op->aiocb.aio_buf);
      if (!op->ok()) {
        Note("Header read failed: clearing cache directory %s", this->hash_text.get());
        clear_dir_aio();
        return EVENT_DONE;
      }
      op = op->then;
    }

    io.aiocb.aio_fildes = fd;
    io.aiocb.aio_nbytes = this->dirlen();
    io.aiocb.aio_buf    = raw_dir;
    io.action           = this;
    io.thread           = AIO_CALLBACK_THREAD_ANY;
    io.then             = nullptr;

    // Prefer copy A when it is self-consistent and at least as new as B,
    // or when B is itself inconsistent.
    if (hf[0]->sync_serial == hf[1]->sync_serial &&
        (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) {
      SET_HANDLER(&Stripe::handle_dir_read);
      if (dbg_ctl_cache_init.on()) {
        Note("using directory A for '%s'", hash_text.get());
      }
      io.aiocb.aio_offset = skip;
      ink_assert(ink_aio_read(&io));
    }
    // try B
    else if (hf[2]->sync_serial == hf[3]->sync_serial) {
      SET_HANDLER(&Stripe::handle_dir_read);
      if (dbg_ctl_cache_init.on()) {
        Note("using directory B for '%s'", hash_text.get());
      }
      io.aiocb.aio_offset = skip + this->dirlen();
      ink_assert(ink_aio_read(&io));
    } else {
      Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get());
      Note("Header A: %d\nFooter A: %d\n Header B: %d\n Footer B %d\n", hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial,
           hf[3]->sync_serial);
      clear_dir_aio();
      delete init_info;
      init_info = nullptr;
    }
    return EVENT_DONE;
  default:
    ink_assert(!"not reach here");
  }
  return EVENT_DONE;
}
int
Stripe::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */)
{
  if (cache->cache_read_done) {
    // Cache-wide read phase is complete: publish this stripe into the global
    // table and switch to the aggregation-write handler.
    int idx = gnstripes++;
    ink_assert(!gstripes[idx]);
    gstripes[idx] = this;
    SET_HANDLER(&Stripe::aggWrite);
    cache->vol_initialized(fd != -1);
    return EVENT_DONE;
  }
  // Other stripes are still reading; poll again shortly.
  eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL);
  return EVENT_CONT;
}
// Diagnostic dump of the directory: per-segment entry/bucket usage, chain
// cycles and duplicates, stripe-wide totals, and a fragment-size histogram.
// Output goes to stdout. The `fix` parameter is currently unused.
int
Stripe::dir_check(bool /* fix ATS_UNUSED */) // TODO: we should eliminate this parameter ?
{
  static int const SEGMENT_HISTOGRAM_WIDTH = 16;
  int hist[SEGMENT_HISTOGRAM_WIDTH + 1]    = {0}; // chain-length histogram, last slot counts >= WIDTH
  unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
  int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
  uint64_t total_buckets = buckets * segments;
  uint64_t total_entries = total_buckets * DIR_DEPTH;
  int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES];

  int j;
  int stale = 0, in_use = 0, empty = 0;
  int free = 0, head = 0, buckets_in_use = 0;

  int max_chain_length = 0;
  int64_t bytes_in_use = 0;

  ink_zero(frag_demographics);

  printf("Stripe '[%s]'\n", hash_text.get());
  printf(" Directory Bytes: %" PRIu64 "\n", total_buckets * SIZEOF_DIR);
  printf(" Segments: %d\n", segments);
  printf(" Buckets per segment: %" PRIu64 "\n", buckets);
  printf(" Entries: %" PRIu64 "\n", total_entries);

  for (int s = 0; s < segments; s++) {
    Dir *seg               = this->dir_segment(s);
    int seg_chain_max      = 0;
    int seg_empty          = 0;
    int seg_in_use         = 0;
    int seg_stale          = 0;
    int seg_bytes_in_use   = 0;
    int seg_dups           = 0;
    int seg_buckets_in_use = 0;

    ink_zero(chain_tag);
    memset(chain_mark, -1, sizeof(chain_mark));

    for (int b = 0; b < buckets; b++) {
      Dir *root     = dir_bucket(b, seg);
      int h         = 0; // chain length starting in this bucket
      int chain_idx = 0;
      int mark      = 0;
      ++seg_buckets_in_use;
      // Walk the chain starting in this bucket
      for (Dir *e = root; e; e = next_dir(e, seg)) {
        if (!dir_offset(e)) {
          ++seg_empty;
          --seg_buckets_in_use;
          // this should only happen on the first dir in a bucket
          ink_assert(nullptr == next_dir(e, seg));
          break;
        } else {
          int e_idx = e - seg; // index of the entry within this segment
          ++h;
          chain_tag[chain_idx++] = dir_tag(e);
          // chain_mark detects entries reachable from two chains or a cycle
          // within one chain.
          if (chain_mark[e_idx] == mark) {
            printf(" - Cycle of length %d detected for bucket %d\n", h, b);
          } else if (chain_mark[e_idx] >= 0) {
            printf(" - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
          } else {
            chain_mark[e_idx] = mark;
          }
          if (!dir_valid(this, e)) {
            ++seg_stale;
          } else {
            uint64_t size = dir_approx_size(e);
            if (dir_head(e)) {
              ++head;
            }
            ++seg_in_use;
            seg_bytes_in_use += size;
            ++frag_demographics[dir_size(e)][dir_big(e)];
          }
        }
      }

      // Check for duplicates (identical tags in the same bucket).
      if (h > 1) {
        unsigned short last;
        qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
        last = chain_tag[0];
        for (int k = 1; k < h; ++k) {
          if (last == chain_tag[k]) {
            ++seg_dups;
          }
          last = chain_tag[k];
        }
      }

      ++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
      seg_chain_max = std::max(seg_chain_max, h);
    }
    int fl_size       = dir_freelist_length(this, s);
    in_use           += seg_in_use;
    empty            += seg_empty;
    stale            += seg_stale;
    free             += fl_size;
    buckets_in_use   += seg_buckets_in_use;
    max_chain_length  = std::max(max_chain_length, seg_chain_max);
    bytes_in_use     += seg_bytes_in_use;

    printf(" - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
           s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
           seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
  }

  printf(" - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
         max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);

  printf(" Chain lengths: ");
  for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
    printf(" %d=%d ", j, hist[j]);
  }
  printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);

  char tt[256];
  printf(" Total Size: %" PRIu64 "\n", static_cast<uint64_t>(len));
  printf(" Bytes in Use: %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * (static_cast<float>(bytes_in_use) / len));
  printf(" Objects: %d\n", head);
  printf(" Average Size: %" PRIu64 "\n", head ? (bytes_in_use / head) : 0);
  printf(" Average Frags: %.2f\n", head ? static_cast<float>(in_use) / head : 0);
  printf(" Write Position: %" PRIu64 "\n", header->write_pos - start);
  printf(" Wrap Count: %d\n", header->cycle);
  printf(" Phase: %s\n", header->phase ? "true" : "false");
  ink_ctime_r(&header->create_time, tt);
  tt[strlen(tt) - 1] = 0; // strip ctime's trailing newline
  printf(" Sync Serial: %u\n", header->sync_serial);
  printf(" Write Serial: %u\n", header->write_serial);
  printf(" Create Time: %s\n", tt);
  printf("\n");
  printf(" Fragment size demographics\n");
  for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
    int block_size = DIR_BLOCK_SIZE(b);
    int s          = 0;
    while (s < 1 << DIR_SIZE_WIDTH) {
      for (int j = 0; j < 8; ++j, ++s) { // NOTE: inner j shadows the outer j
        // The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
        // base block sizes should never be used. Such entries should use the next smaller base block size.
        if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
          ink_assert(frag_demographics[s][b] == 0);
          continue;
        }
        printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
      }
      printf("\n");
    }
  }
  printf("\n");

  return 0;
}
// Reset the in-memory directory and stripe header/footer to a pristine,
// freshly-created state. Does not touch the disk.
void
Stripe::_clear_init()
{
  size_t dir_len = this->dirlen();
  memset(this->raw_dir, 0, dir_len);
  this->_init_dir();
  this->header->magic          = STRIPE_MAGIC;
  this->header->version._major = CACHE_DB_MAJOR_VERSION;
  this->header->version._minor = CACHE_DB_MINOR_VERSION;
  // All write cursors begin at the start of the data region.
  this->scan_pos = this->header->agg_pos = this->header->write_pos = this->start;
  this->header->last_write_pos                                     = this->header->write_pos;
  this->header->phase                                              = 0;
  this->header->cycle                                              = 0;
  this->header->create_time                                        = time(nullptr);
  this->header->dirty                                              = 0;
  this->sector_size = this->header->sector_size = this->disk->hw_sector_size;
  // The footer must mirror the header; copy it last, after all fields are set.
  *this->footer = *this->header;
}
void
Stripe::_init_dir()
{
int b, s, l;
for (s = 0; s < this->segments; s++) {
this->header->freelist[s] = 0;
Dir *seg = this->dir_segment(s);
for (l = 1; l < DIR_DEPTH; l++) {
for (b = 0; b < this->buckets; b++) {
Dir *bucket = dir_bucket(b, seg);
dir_free_entry(dir_bucket_row(bucket, l), s, this);
}
}
}
}
void
Stripe::_init_data_internal()
{
// step1: calculate the number of entries.
off_t total_entries = (this->len - (this->start - this->skip)) / cache_config_min_average_object_size;
// step2: calculate the number of buckets
off_t total_buckets = total_entries / DIR_DEPTH;
// step3: calculate the number of segments, no segment has more than 16384 buckets
this->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
// step4: divide total_buckets into segments on average.
this->buckets = (total_buckets + this->segments - 1) / this->segments;
// step5: set the start pointer.
this->start = this->skip + 2 * this->dirlen();
}
void
Stripe::_init_data()
{
  // `start` depends on dirlen(), which depends on buckets/segments, which in
  // turn depend on `start`; three rounds of successive approximation settle
  // the geometry (same as the original triple call).
  for (int pass = 0; pass < 3; ++pass) {
    this->_init_data_internal();
  }
}
bool
Stripe::add_writer(CacheVC *vc)
{
  ink_assert(vc);
  // Tentatively account for this writer's bytes; rolled back on rejection.
  this->_write_buffer.add_bytes_pending_aggregation(vc->agg_len);

  // Reject writers that are oversized, or plain writes arriving while the
  // aggregation backlog is already over its limit.
  bool const oversized  = vc->agg_len > AGG_SIZE || vc->header_len + sizeof(Doc) > MAX_FRAG_SIZE;
  bool const backlogged = !vc->f.readers &&
                          (this->_write_buffer.get_bytes_pending_aggregation() > cache_config_agg_write_backlog + AGG_SIZE) &&
                          vc->write_len;
  bool agg_error = oversized || backlogged;
#ifdef CACHE_AGG_FAIL_RATE
  agg_error = agg_error || ((uint32_t)vc->mutex->thread_holding->generator.random() < (uint32_t)(UINT_MAX * CACHE_AGG_FAIL_RATE));
#endif

  if (agg_error) {
    // Undo the pending-bytes accounting done above.
    this->_write_buffer.add_bytes_pending_aggregation(-vc->agg_len);
    return false;
  }

  ink_assert(vc->agg_len <= AGG_SIZE);
  if (vc->f.evac_vector) {
    // Evacuation writes go to the front of the queue.
    this->get_pending_writers().push(vc);
  } else {
    this->get_pending_writers().enqueue(vc);
  }
  return true;
}
// Blocking shutdown path: flush any aggregated writes and synchronously
// write the directory out. Intentionally holds the stripe lock until exit
// (the process is going down; another aggWrite may be in flight).
void
Stripe::shutdown(EThread *shutdown_thread)
{
  // the process is going down, do a blocking call
  // dont release the volume's lock, there could
  // be another aggWrite in progress
  MUTEX_TAKE_LOCK(this->mutex, shutdown_thread);
  if (DISK_BAD(this->disk)) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- bad disk", this->hash_text.get());
    return;
  }
  size_t dirlen = this->dirlen();
  ink_assert(dirlen > 0); // make clang happy - if not > 0 the vol is seriously messed up
  if (!this->header->dirty && !this->dir_sync_in_progress) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: ignoring -- not dirty", this->hash_text.get());
    return;
  }
  // recompute hit_evacuate_window
  this->hit_evacuate_window = (this->data_blocks * cache_config_hit_evacuate_percent) / 100;

  // check if we have data in the agg buffer
  // dont worry about the cachevc s in the agg queue
  // directories have not been inserted for these writes
  if (!this->_write_buffer.is_empty()) {
    Dbg(dbg_ctl_cache_dir_sync, "Dir %s: flushing agg buffer first", this->hash_text.get());
    this->flush_aggregate_write_buffer();
  }

  // We already asserted that dirlen > 0.
  if (!this->dir_sync_in_progress) {
    this->header->sync_serial++;
  } else {
    Dbg(dbg_ctl_cache_dir_sync, "Periodic dir sync in progress -- overwriting");
  }
  this->footer->sync_serial = this->header->sync_serial;

  // Fixed: was CHECK_DIR(d) -- 'd' is not declared in this scope; every other
  // use in this file checks 'this' (see handle_dir_read).
  CHECK_DIR(this);
  // Even serials sync to copy A, odd to copy B, so a crash mid-write always
  // leaves the other copy intact.
  size_t B    = this->header->sync_serial & 1;
  off_t start = this->skip + (B ? dirlen : 0);

  // Keep the signed pwrite result in a signed variable instead of punning a
  // possible -1 through an unsigned reuse of B.
  ssize_t written = pwrite(this->fd, this->raw_dir, dirlen, start);
  ink_assert(written == static_cast<ssize_t>(dirlen));
  Dbg(dbg_ctl_cache_dir_sync, "done syncing dir for vol %s", this->hash_text.get());
}
// Write the aggregation buffer to disk at the current write position and
// advance the write cursors. Returns false if the write failed (cursors are
// then left unchanged).
bool
Stripe::flush_aggregate_write_buffer()
{
  // set write limit
  this->header->agg_pos = this->header->write_pos + this->_write_buffer.get_buffer_pos();

  if (!this->_write_buffer.flush(this->fd, this->header->write_pos)) {
    return false;
  }
  // Only advance the cursors after the data is safely on disk.
  this->header->last_write_pos  = this->header->write_pos;
  this->header->write_pos      += this->_write_buffer.get_buffer_pos();
  ink_assert(this->header->write_pos == this->header->agg_pos);
  this->_write_buffer.reset_buffer_pos();
  this->header->write_serial++;

  return true;
}
bool
Stripe::copy_from_aggregate_write_buffer(char *dest, Dir const &dir, size_t nbytes) const
{
  // The fragment must still be resident in the aggregation buffer.
  if (!dir_agg_buf_valid(this, &dir)) {
    return false;
  }
  // Translate the directory's volume offset into an offset inside the buffer.
  int const offset_in_buffer = this->vol_offset(&dir) - this->header->write_pos;
  this->_write_buffer.copy_from(dest, offset_in_buffer, nbytes);
  return true;
}