/** @file
Main program file for Cache Tool.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "CacheDefs.h"
#include <iostream>
#include <fcntl.h>
using namespace std;
using namespace ts;
using ts::Errata;
namespace ts
{
std::ostream &
operator<<(std::ostream &s, Bytes const &n)
{
return s << n.count() << " bytes";
}
std::ostream &
operator<<(std::ostream &s, Kilobytes const &n)
{
return s << n.count() << " KB";
}
std::ostream &
operator<<(std::ostream &s, Megabytes const &n)
{
return s << n.count() << " MB";
}
std::ostream &
operator<<(std::ostream &s, Gigabytes const &n)
{
return s << n.count() << " GB";
}
std::ostream &
operator<<(std::ostream &s, Terabytes const &n)
{
return s << n.count() << " TB";
}
std::ostream &
operator<<(std::ostream &s, CacheStripeBlocks const &n)
{
return s << n.count() << " stripe blocks";
}
std::ostream &
operator<<(std::ostream &s, CacheStoreBlocks const &n)
{
return s << n.count() << " store blocks";
}
std::ostream &
operator<<(std::ostream &s, CacheDataBlocks const &n)
{
return s << n.count() << " data blocks";
}
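// Parse the scheme and host out of @a URI. Parsing is currently a stub: nothing is recorded and an
// empty Errata is always returned.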
Errata
URLparser::parseURL(TextView URI)
{
Errata zret;
static const TextView HTTP("http");
static const TextView HTTPS("https");
TextView scheme = URI.take_prefix_at(':');
if ((strcasecmp(scheme, HTTP) == 0) || (strcasecmp(scheme, HTTPS) == 0)) {
TextView hostname = URI.take_prefix_at(':');
if (!hostname) // i.e. port not present
{
}
}
return zret;
}
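// Return the port for @a fullURL: the explicit port if present (also setting @a port_ptr and
// @a port_len to its location and length), otherwise the scheme default (80/443), or -1 if no
// scheme is recognized. Note that ":@" is inserted after the scheme, modifying @a fullURL in place.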
int
URLparser::getPort(std::string &fullURL, int &port_ptr, int &port_len)
{
url_matcher matcher;
int n_port = -1;
int u_pos = -1;
if (fullURL.find("https") == 0) {
u_pos = 8;
n_port = 443;
} else if (fullURL.find("http") == 0) {
u_pos = 7;
n_port = 80;
}
if (u_pos != -1) {
fullURL.insert(u_pos, ":@");
TextView url(fullURL.data(), static_cast<int>(fullURL.size()));
url += 9;
TextView hostPort = url.take_prefix_at(':');
if (!hostPort.empty()) // i.e. port is present
{
TextView port = url.take_prefix_at('/');
if (port.empty()) { // i.e. no slash present, so the rest of the URL must be just the port
port = url;
}
if (matcher.portmatch(port.data(), port.size())) {
TextView text;
n_port = svtoi(port, &text);
if (text == port) {
port_ptr = fullURL.find(':', 9);
port_len = port.size();
return n_port;
}
}
}
return n_port;
} else {
std::cout << "No scheme provided for: " << fullURL << std::endl;
return -1;
}
}
uint32_t
Doc::prefix_len()
{
return sizeof(Doc) + hlen;
}
uint32_t
Doc::data_len()
{
return len - sizeof(Doc) - hlen;
}
int
Doc::single_fragment()
{
return data_len() == total_len;
}
char *
Doc::hdr()
{
return reinterpret_cast<char *>(this) + sizeof(Doc);
}
char *
Doc::data()
{
return this->hdr() + hlen;
}
} // end namespace ts
int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
CacheStoreBlocks Vol_hash_alloc_size(1024);
// Default this to read-only; only enable write if specifically required.
int OPEN_RW_FLAG = O_RDONLY;
namespace ct
{
bool
Stripe::validate_sync_serial()
{
// check if A's sync serials match and A is at least as up to date as B
return (_meta[0][0].sync_serial == _meta[0][1].sync_serial &&
(_meta[0][0].sync_serial >= _meta[1][0].sync_serial ||
_meta[1][0].sync_serial != _meta[1][1].sync_serial)) || // OR check if B's sync_serials match
(_meta[1][0].sync_serial == _meta[1][1].sync_serial);
}
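// Zero out the header and footer stripe blocks on disk for both metadata copies (A and B).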
Errata
Stripe::clear()
{
Errata zret;
alignas(512) static char zero[CacheStoreBlocks::SCALE]; // should be all zero, it's static.
for (auto i : {A, B}) {
for (auto j : {HEAD, FOOT}) {
ssize_t n = pwrite(_span->_fd, zero, CacheStoreBlocks::SCALE, this->_meta_pos[i][j]);
if (n < CacheStoreBlocks::SCALE) {
std::cout << "Failed to clear stripe header" << std::endl;
}
}
}
return zret;
}
Stripe::Chunk::~Chunk()
{
this->clear();
}
void
Stripe::Chunk::append(MemSpan<void> m)
{
_chain.push_back(m);
}
void
Stripe::Chunk::clear()
{
for (auto &m : _chain) {
free(const_cast<void *>(m.data()));
}
_chain.clear();
}
Stripe::Stripe(Span *span, const Bytes &start, const CacheStoreBlocks &len) : _span(span), _start(start), _len(len)
{
ts::bwprint(hashText, "{} {}:{}", span->_path.view(), _start.count(), _len.count());
CryptoContext().hash_immediate(hash_id, hashText.data(), static_cast<int>(hashText.size()));
printf("hash id of stripe is hash of %.*s\n", static_cast<int>(hashText.size()), hashText.data());
}
bool
Stripe::isFree() const
{
return 0 == _vol_idx;
}
// TODO: Implement the whole logic
Errata
Stripe::InitializeMeta()
{
Errata zret;
// memset(this->raw_dir, 0, dir_len);
for (auto &i : _meta) {
for (auto &j : i) {
j.magic = StripeMeta::MAGIC;
j.version._major = ts::CACHE_DB_MAJOR_VERSION;
j.version._minor = ts::CACHE_DB_MINOR_VERSION;
j.agg_pos = j.last_write_pos = j.write_pos = this->_content;
j.phase = j.cycle = j.sync_serial = j.write_serial = j.dirty = 0;
j.create_time = time(nullptr);
j.sector_size = DEFAULT_HW_SECTOR_SIZE;
}
}
if (!freelist) // freelist is not allocated yet
{
freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t))); // segments has already been calculated
}
if (!dir) // for new spans, this will likely be nullptr as we don't need to read the stripe meta from disk
{
char *raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->vol_dirlen()));
dir = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
}
init_dir();
return zret;
}
// Needs to be a bit more robust at some point.
bool
Stripe::validateMeta(StripeMeta const *meta)
{
// Needs to be a bit more robust at some point.
return StripeMeta::MAGIC == meta->magic && meta->version._major <= ts::CACHE_DB_MAJOR_VERSION &&
meta->version._minor <= 2 // This may have always been zero, actually.
;
}
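// Scan @a mem at stripe-block boundaries for a valid StripeMeta, optionally matching the version of
// @a base_meta. On success @a mem is left positioned at the match.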
bool
Stripe::probeMeta(MemSpan<void> &mem, StripeMeta const *base_meta)
{
while (mem.size() >= sizeof(StripeMeta)) {
StripeMeta const *meta = static_cast<StripeMeta *>(mem.data());
if (this->validateMeta(meta) && (base_meta == nullptr || // no base version to check against.
(meta->version == base_meta->version) // need more checks here I think.
)) {
return true;
}
// The meta data is stored aligned on a stripe block boundary, so only need to check there.
mem.remove_prefix(CacheStoreBlocks::SCALE);
}
return false;
}
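// Recompute the stripe layout and initialize fresh metadata, then write the header (with freelist),
// directory entries, and footer for both copies (A and B) to disk. Writing requires --write (OPEN_RW_FLAG).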
Errata
Stripe::updateHeaderFooter()
{
Errata zret;
this->vol_init_data();
int64_t hdr_size = this->vol_headerlen();
int64_t dir_size = this->vol_dirlen();
Bytes footer_offset = Bytes(dir_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta)));
_meta_pos[A][HEAD] = round_down(_start);
_meta_pos[A][FOOT] = round_down(_start + footer_offset);
_meta_pos[B][HEAD] = round_down(this->_start + Bytes(dir_size));
_meta_pos[B][FOOT] = round_down(this->_start + Bytes(dir_size) + footer_offset);
std::cout << "updating header " << _meta_pos[0][0] << std::endl;
std::cout << "updating header " << _meta_pos[0][1] << std::endl;
std::cout << "updating header " << _meta_pos[1][0] << std::endl;
std::cout << "updating header " << _meta_pos[1][1] << std::endl;
InitializeMeta();
if (!OPEN_RW_FLAG) {
zret.push(0, 1, "Writing not enabled. Please use --write to enable writing to disk.");
return zret;
}
char *meta_t = static_cast<char *>(ats_memalign(ats_pagesize(), dir_size));
// copy headers
for (auto i : {A, B}) {
// copy header
memcpy(meta_t, &_meta[i][HEAD], sizeof(StripeMeta));
// copy freelist
memcpy(meta_t + sizeof(StripeMeta) - sizeof(uint16_t), this->freelist, this->_segments * sizeof(uint16_t));
ssize_t n = pwrite(_span->_fd, meta_t, hdr_size, _meta_pos[i][HEAD]);
if (n < hdr_size) {
std::cout << "problem writing header to disk: " << strerror(errno) << ":"
<< " " << n << "<" << hdr_size << std::endl;
zret = Errata::Message(0, errno, "Failed to write stripe header ");
ats_free(meta_t);
return zret;
}
// copy dir entries; the directory region excludes the header and footer blocks.
// Recompute from vol_dirlen() each iteration so copy B is not written with a doubly reduced size.
int64_t dir_entries_size = this->vol_dirlen() - hdr_size - ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
memcpy(meta_t, (char *)dir, dir_entries_size);
n = pwrite(_span->_fd, meta_t, dir_entries_size, _meta_pos[i][HEAD] + hdr_size);
if (n < dir_entries_size) {
std::cout << "problem writing dir to disk: " << strerror(errno) << ":"
<< " " << n << "<" << dir_entries_size << std::endl;
zret = Errata::Message(0, errno, "Failed to write stripe directory ");
ats_free(meta_t);
return zret;
}
// copy footer,
memcpy(meta_t, &_meta[i][FOOT], sizeof(StripeMeta));
int64_t footer_size = ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
n = pwrite(_span->_fd, meta_t, footer_size, _meta_pos[i][FOOT]);
if (n < footer_size) {
std::cout << "problem writing footer to disk: " << strerror(errno) << ":"
<< " " << n << "<" << footer_size << std::endl;
zret = Errata::Message(0, errno, "Failed to write stripe footer ");
ats_free(meta_t);
return zret;
}
}
ats_free(meta_t);
return zret;
}
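// Size in bytes of one copy of the stripe metadata on disk: header (including the segment freelist),
// directory entries, and footer, each rounded up to store blocks.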
size_t
Stripe::vol_dirlen()
{
return vol_headerlen() + ROUND_TO_STORE_BLOCK(((size_t)this->_buckets) * DIR_DEPTH * this->_segments * SIZEOF_DIR) +
ROUND_TO_STORE_BLOCK(sizeof(StripeMeta));
}
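// One pass of the buckets/segments/content-offset calculation. Called iteratively (see vol_init_data)
// because _content depends on the directory size, which in turn depends on the bucket count.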
void
Stripe::vol_init_data_internal()
{
this->_buckets =
((this->_len.count() * 8192 - (this->_content - this->_start)) / cache_config_min_average_object_size) / DIR_DEPTH;
this->_segments = (this->_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH);
this->_buckets = (this->_buckets + this->_segments - 1) / this->_segments;
this->_content = this->_start + Bytes(2 * vol_dirlen());
}
void
Stripe::vol_init_data()
{
// iteratively calculate start + buckets
this->vol_init_data_internal();
this->vol_init_data_internal();
this->vol_init_data_internal();
}
void
Stripe::updateLiveData(enum Copy c)
{
// CacheStoreBlocks delta{_meta_pos[c][FOOT] - _meta_pos[c][HEAD]};
CacheStoreBlocks header_len(0);
// int64_t n_buckets;
// int64_t n_segments;
/*
* COMMENTING THIS SECTION FOR NOW TO USE THE EXACT LOGIC USED IN ATS TO CALCULATE THE NUMBER OF SEGMENTS AND BUCKETS
// Past the header is the segment free list heads which if sufficiently long (> ~4K) can take
// more than 1 store block. Start with a guess of 1 and adjust upwards as needed. A 2TB stripe
// with an AOS of 8000 has roughly 3700 segments meaning that for even 10TB drives this loop
// should only be a few iterations.
do {
++header_len;
n_buckets = Bytes(delta - header_len) / (sizeof(CacheDirEntry) * ts::ENTRIES_PER_BUCKET);
n_segments = n_buckets / ts::MAX_BUCKETS_PER_SEGMENT;
// This should never be more than one loop, usually none.
while ((n_buckets / n_segments) > ts::MAX_BUCKETS_PER_SEGMENT)
++n_segments;
} while ((sizeof(StripeMeta) + sizeof(uint16_t) * n_segments) > static_cast<size_t>(header_len));
_buckets = n_buckets / n_segments;
_segments = n_segments;
*/
_directory._skip = header_len;
}
bool
dir_compare_tag(const CacheDirEntry *e, const CryptoHash *key)
{
return (dir_tag(e) == DIR_MASK_TAG(key->slice32(2)));
}
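// A directory entry is valid "in phase" if it points before the current write position (including the
// aggregation buffer), and valid "out of phase" if it points at or beyond the last aggregation write position.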
int
vol_in_phase_valid(Stripe *d, CacheDirEntry *e)
{
return (dir_offset(e) - 1 < ((d->_meta[0][0].write_pos + d->agg_buf_pos - d->_start) / CACHE_BLOCK_SIZE));
}
int
vol_out_of_phase_valid(Stripe *d, CacheDirEntry *e)
{
return (dir_offset(e) - 1 >= ((d->_meta[0][0].agg_pos - d->_start) / CACHE_BLOCK_SIZE));
}
bool
Stripe::dir_valid(CacheDirEntry *_e)
{
return (this->_meta[0][0].phase == dir_phase(_e) ? vol_in_phase_valid(this, _e) : vol_out_of_phase_valid(this, _e));
}
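// Convert the block offset stored in a directory entry into the absolute byte offset of the fragment
// within the stripe (offset 1 maps to the start of the content area).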
Bytes
Stripe::stripe_offset(CacheDirEntry *e)
{
return this->_content + Bytes((dir_offset(e) * CACHE_BLOCK_SIZE) - CACHE_BLOCK_SIZE);
}
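// Look up @a key in the directory: hash to a segment and bucket, then walk the bucket chain comparing
// tags. On a valid hit the fragment is read from disk and its contents are printed. Returns 0 in all cases.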
int
Stripe::dir_probe(CryptoHash *key, CacheDirEntry *result, CacheDirEntry **last_collision)
{
int segment = key->slice32(0) % this->_segments;
int bucket = key->slice32(1) % this->_buckets;
CacheDirEntry *seg = this->dir_segment(segment);
CacheDirEntry *e = nullptr;
e = dir_bucket(bucket, seg);
char *stripe_buff2 = nullptr;
Doc *doc = nullptr;
// TODO: collision handling is pending; look at the main ATS code. Assuming no collision for now.
if (dir_offset(e)) {
do {
if (dir_compare_tag(e, key)) {
if (dir_valid(e)) {
stripe_buff2 = static_cast<char *>(ats_memalign(ats_pagesize(), dir_approx_size(e)));
std::cout << "dir_probe hit: found seg: " << segment << " bucket: " << bucket << " offset: " << dir_offset(e)
<< "size: " << dir_approx_size(e) << std::endl;
break;
} else {
// let's skip deleting for now
// e = dir_delete_entry(e, p ,segment);
// continue;
}
}
e = next_dir(e, seg);
} while (e);
if (e == nullptr) {
std::cout << "No directory entry found matching the URL key" << std::endl;
return 0;
}
int fd = _span->_fd;
Bytes offset = stripe_offset(e);
int64_t size = dir_approx_size(e);
ssize_t n = pread(fd, stripe_buff2, size, offset);
if (n < size) {
std::cout << "Failed to read content from the Stripe:" << strerror(errno) << std::endl;
}
doc = reinterpret_cast<Doc *>(stripe_buff2);
std::string hdr(doc->hdr(), doc->hlen);
std::string data_(doc->data(), doc->data_len());
std::cout << "DATA\n" << data_ << std::endl;
} else {
std::cout << "Not found in the Cache" << std::endl;
}
free(stripe_buff2);
return 0; // Why does this have a non-void return?
}
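// Remove @a e from its bucket chain in segment @a s (with @a p as its predecessor) and put the freed
// slot on the segment freelist. For the bucket head (no predecessor) the next entry's contents are
// copied into the head slot instead. Returns the entry now occupying this chain position, or nullptr
// if the bucket is empty.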
CacheDirEntry *
Stripe::dir_delete_entry(CacheDirEntry *e, CacheDirEntry *p, int s)
{
CacheDirEntry *seg = this->dir_segment(s);
int no = dir_next(e);
this->_meta[0][0].dirty = 1;
if (p) {
unsigned int fo = this->freelist[s];
unsigned int eo = dir_to_offset(e, seg);
dir_clear(e);
dir_set_next(p, no);
dir_set_next(e, fo);
if (fo) {
dir_set_prev(dir_from_offset(fo, seg), eo);
}
this->freelist[s] = eo;
} else {
CacheDirEntry *n = next_dir(e, seg);
if (n) {
dir_assign(e, n);
dir_delete_entry(n, e, s);
return e;
} else {
dir_clear(e);
return nullptr;
}
}
return dir_from_offset(no, seg);
}
void
Stripe::walk_all_buckets()
{
for (int s = 0; s < this->_segments; s++) {
if (walk_bucket_chain(s)) {
std::cout << "Loop present in Segment " << s << std::endl;
}
}
}
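// Walk every bucket chain in segment @a s, deleting invalid or zero-offset entries and reporting any
// entry offset seen twice (which indicates a corrupted chain). Currently always returns false.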
bool
Stripe::walk_bucket_chain(int s)
{
CacheDirEntry *seg = this->dir_segment(s);
std::bitset<65536> b_bitset;
b_bitset.reset();
for (int b = 0; b < this->_buckets; b++) {
CacheDirEntry *p = nullptr;
auto *dir_b = dir_bucket(b, seg);
CacheDirEntry *e = dir_b;
int len = 0;
while (e) {
len++;
int i = dir_to_offset(e, seg);
if (b_bitset.test(i)) {
std::cout << "bit already set in "
<< "seg " << s << " bucket " << b << std::endl;
}
if (i > 0) { // i.e., not the first dir in the segment
b_bitset[i] = true;
}
#if 1
if (!dir_valid(e) || !dir_offset(e)) {
// std::cout<<"dir_clean in segment "<<s<<" =>cleaning "<<e<<" tag"<<dir_tag(e)<<" boffset"<< dir_offset(e)<< " bucket:
// "<<dir_b<< " bucket len: "<<dir_bucket_length(dir_b, s)<<std::endl;
e = dir_delete_entry(e, p, s);
continue;
}
#endif
p = e;
e = next_dir(e, seg);
}
// std::cout<<"dir len in this bucket "<<len<<std::endl;
}
return false;
}
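// Push directory entry @a e onto the head of the freelist for segment @a s.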
void
Stripe::dir_free_entry(CacheDirEntry *e, int s)
{
CacheDirEntry *seg = this->dir_segment(s);
unsigned int fo = this->freelist[s];
unsigned int eo = dir_to_offset(e, seg);
dir_set_next(e, fo);
if (fo) {
dir_set_prev(dir_from_offset(fo, seg), eo);
}
this->freelist[s] = eo;
}
// adds all the directory entries
// in a segment to the segment freelist
void
Stripe::dir_init_segment(int s)
{
this->freelist[s] = 0;
CacheDirEntry *seg = this->dir_segment(s);
int l, b;
memset(seg, 0, SIZEOF_DIR * DIR_DEPTH * this->_buckets);
for (l = 1; l < DIR_DEPTH; l++) {
for (b = 0; b < this->_buckets; b++) {
CacheDirEntry *bucket = dir_bucket(b, seg);
this->dir_free_entry(dir_bucket_row(bucket, l), s);
}
}
}
void
Stripe::init_dir()
{
for (int s = 0; s < this->_segments; s++) {
this->freelist[s] = 0;
CacheDirEntry *seg = this->dir_segment(s);
int l, b;
for (l = 1; l < DIR_DEPTH; l++) {
for (b = 0; b < this->_buckets; b++) {
CacheDirEntry *bucket = dir_bucket(b, seg);
this->dir_free_entry(dir_bucket_row(bucket, l), s);
// std::cout<<"freelist"<<this->freelist[s]<<std::endl;
}
}
}
}
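// Read this stripe's raw directory from disk into freshly allocated, page-aligned memory; @a dir is
// pointed at the entries just past the header.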
Errata
Stripe::loadDir()
{
Errata zret;
int64_t dirlen = this->vol_dirlen();
char *raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), dirlen));
dir = reinterpret_cast<CacheDirEntry *>(raw_dir + this->vol_headerlen());
// read directory
ssize_t n = pread(this->_span->_fd, raw_dir, dirlen, this->_start);
if (n < dirlen) {
std::cout << "Failed to read Dir from stripe @" << this->hashText;
}
return zret;
}
//
// Cache Directory
//
#if 0
// return value 1 means no loop
// zero indicates loop
int
dir_bucket_loop_check(CacheDirEntry *start_dir, CacheDirEntry *seg)
{
if (start_dir == nullptr) {
return 1;
}
CacheDirEntry *p1 = start_dir;
CacheDirEntry *p2 = start_dir;
while (p2) {
// p1 moves by one entry per iteration
assert(p1);
p1 = next_dir(p1, seg);
// p2 moves by two entries per iteration
p2 = next_dir(p2, seg);
if (p2) {
p2 = next_dir(p2, seg);
} else {
return 1;
}
if (p2 == p1) {
return 0; // we have a loop
}
}
return 1;
}
#endif
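// Count the entries on the freelist of segment @a s. If a loop is detected the segment is reinitialized
// and the maximum possible free count is returned.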
int
Stripe::dir_freelist_length(int s)
{
int free = 0;
CacheDirEntry *seg = this->dir_segment(s);
CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
if (this->check_loop(s)) {
return (DIR_DEPTH - 1) * this->_buckets;
}
while (e) {
free++;
e = next_dir(e, seg);
}
return free;
}
int
Stripe::check_loop(int s)
{
// look for loop in the segment
// rewrite the freelist if loop is present
CacheDirEntry *seg = this->dir_segment(s);
CacheDirEntry *e = dir_from_offset(this->freelist[s], seg);
std::bitset<65536> f_bitset;
f_bitset.reset();
while (e) {
int i = dir_next(e);
if (f_bitset.test(i)) {
// bit was set in a previous round so a loop is present
std::cout << "<check_loop> Loop present in Span" << this->_span->_path.string() << " Stripe: " << this->hashText
<< "Segment: " << s << std::endl;
this->dir_init_segment(s);
return 1;
}
f_bitset[i] = true;
e = dir_from_offset(i, seg);
}
return 0;
}
int
compare_ushort(void const *a, void const *b)
{
return *static_cast<unsigned short const *>(a) - *static_cast<unsigned short const *>(b);
}
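// Load the stripe metadata and directory, then walk every segment and bucket to report entry usage,
// stale entries, chain lengths, duplicate tags, and fragment size demographics.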
void
Stripe::dir_check()
{
static int const SEGMENT_HISTOGRAM_WIDTH = 16;
int hist[SEGMENT_HISTOGRAM_WIDTH + 1] = {0};
unsigned short chain_tag[MAX_ENTRIES_PER_SEGMENT];
int32_t chain_mark[MAX_ENTRIES_PER_SEGMENT];
this->loadMeta();
this->loadDir();
// uint64_t total_buckets = _segments * _buckets;
// uint64_t total_entries = total_buckets * DIR_DEPTH;
int frag_demographics[1 << DIR_SIZE_WIDTH][DIR_BLOCK_SIZES] = {}; // zero-initialize the per-size counters
int j;
int stale = 0, in_use = 0, empty = 0;
int free = 0, head = 0, buckets_in_use = 0;
int max_chain_length = 0;
int64_t bytes_in_use = 0;
std::cout << "Stripe '[" << hashText << "]'" << std::endl;
std::cout << " Directory Bytes: " << _segments * _buckets * SIZEOF_DIR << std::endl;
std::cout << " Segments: " << _segments << std::endl;
std::cout << " Buckets per segment: " << _buckets << std::endl;
std::cout << " Entries: " << _segments * _buckets * DIR_DEPTH << std::endl;
for (int s = 0; s < _segments; s++) {
CacheDirEntry *seg = this->dir_segment(s);
int seg_chain_max = 0;
int seg_empty = 0;
int seg_in_use = 0;
int seg_stale = 0;
int seg_bytes_in_use = 0;
int seg_dups = 0;
int seg_buckets_in_use = 0;
ink_zero(chain_tag);
memset(chain_mark, -1, sizeof(chain_mark));
for (int b = 0; b < _buckets; b++) {
CacheDirEntry *root = dir_bucket(b, seg);
int h = 0;
int chain_idx = 0;
int mark = 0;
++seg_buckets_in_use;
// walking through the directories
for (CacheDirEntry *e = root; e; e = next_dir(e, seg)) {
if (!dir_offset(e)) {
++seg_empty;
--seg_buckets_in_use;
// this should only happen on the first dir in a bucket
assert(nullptr == next_dir(e, seg));
break;
} else {
int e_idx = e - seg;
++h;
chain_tag[chain_idx++] = dir_tag(e);
if (chain_mark[e_idx] == mark) {
printf(" - Cycle of length %d detected for bucket %d\n", h, b);
} else if (chain_mark[e_idx] >= 0) {
printf(" - Entry %d is in chain %d and %d", e_idx, chain_mark[e_idx], mark);
} else {
chain_mark[e_idx] = mark;
}
if (!dir_valid(e)) {
++seg_stale;
} else {
uint64_t size = dir_approx_size(e);
if (dir_head(e)) {
++head;
}
++seg_in_use;
seg_bytes_in_use += size;
++frag_demographics[dir_size(e)][dir_big(e)];
}
}
}
// Check for duplicates (identical tags in the same bucket).
if (h > 1) {
unsigned short last;
qsort(chain_tag, h, sizeof(chain_tag[0]), &compare_ushort);
last = chain_tag[0];
for (int k = 1; k < h; ++k) {
if (last == chain_tag[k]) {
++seg_dups;
}
last = chain_tag[k];
}
}
++hist[std::min(h, SEGMENT_HISTOGRAM_WIDTH)];
seg_chain_max = std::max(seg_chain_max, h);
}
int fl_size = dir_freelist_length(s);
in_use += seg_in_use;
empty += seg_empty;
stale += seg_stale;
free += fl_size;
buckets_in_use += seg_buckets_in_use;
max_chain_length = std::max(max_chain_length, seg_chain_max);
bytes_in_use += seg_bytes_in_use;
printf(" - Segment-%d | Entries: used=%d stale=%d free=%d disk-bytes=%d Buckets: used=%d empty=%d max=%d avg=%.2f dups=%d\n",
s, seg_in_use, seg_stale, fl_size, seg_bytes_in_use, seg_buckets_in_use, seg_empty, seg_chain_max,
seg_buckets_in_use ? static_cast<float>(seg_in_use + seg_stale) / seg_buckets_in_use : 0.0, seg_dups);
}
//////////////////
printf(" - Stripe | Entries: in-use=%d stale=%d free=%d Buckets: empty=%d max=%d avg=%.2f\n", in_use, stale, free, empty,
max_chain_length, buckets_in_use ? static_cast<float>(in_use + stale) / buckets_in_use : 0);
printf(" Chain lengths: ");
for (j = 0; j < SEGMENT_HISTOGRAM_WIDTH; ++j) {
printf(" %d=%d ", j, hist[j]);
}
printf(" %d>=%d\n", SEGMENT_HISTOGRAM_WIDTH, hist[SEGMENT_HISTOGRAM_WIDTH]);
char tt[256];
printf(" Total Size: %" PRIu64 "\n", static_cast<uint64_t>(_len.count()));
printf(" Bytes in Use: %" PRIu64 " [%0.2f%%]\n", bytes_in_use, 100.0 * (static_cast<float>(bytes_in_use) / _len.count()));
printf(" Objects: %d\n", head);
printf(" Average Size: %" PRIu64 "\n", head ? (bytes_in_use / head) : 0);
printf(" Average Frags: %.2f\n", head ? static_cast<float>(in_use) / head : 0);
printf(" Write Position: %" PRIu64 "\n", _meta[0][0].write_pos - _content.count());
printf(" Wrap Count: %d\n", _meta[0][0].cycle);
printf(" Phase: %s\n", _meta[0][0].phase ? "true" : "false");
ctime_r(&_meta[0][0].create_time, tt);
tt[strlen(tt) - 1] = 0;
printf(" Sync Serial: %u\n", _meta[0][0].sync_serial);
printf(" Write Serial: %u\n", _meta[0][0].write_serial);
printf(" Create Time: %s\n", tt);
printf("\n");
printf(" Fragment size demographics\n");
for (int b = 0; b < DIR_BLOCK_SIZES; ++b) {
int block_size = DIR_BLOCK_SIZE(b);
int s = 0;
while (s < 1 << DIR_SIZE_WIDTH) {
for (int j = 0; j < 8; ++j, ++s) {
// The size markings are redundant. Low values (less than DIR_SHIFT_WIDTH) for larger
// base block sizes should never be used. Such entries should use the next smaller base block size.
if (b > 0 && s < 1 << DIR_BLOCK_SHIFT(1)) {
assert(frag_demographics[s][b] == 0);
continue;
}
printf(" %8d[%2d:%1d]:%06d", (s + 1) * block_size, s, b, frag_demographics[s][b]);
}
printf("\n");
}
}
printf("\n");
////////////////
}
Errata
Stripe::loadMeta()
{
// Read from disk in chunks of this size. This needs to be a multiple of both the
// store block size and the directory entry size so neither goes across read boundaries.
// Beyond that the value should be in the ~10MB range for what I guess is best performance
// vs. blocking production disk I/O on a live system.
constexpr static int64_t N = (1 << 8) * CacheStoreBlocks::SCALE * sizeof(CacheDirEntry);
Errata zret;
int fd = _span->_fd;
Bytes n;
bool found;
MemSpan<void> data; // The current view of the read buffer.
Bytes delta;
Bytes pos = _start;
// Avoid searching the entire span, because some of it must be content. Assume that AOS is more than 160
// which means at most 10/160 (1/16) of the span can be directory/header.
Bytes limit = pos + _len / 16;
size_t io_align = _span->_geometry.blocksz;
StripeMeta const *meta;
std::unique_ptr<char> bulk_buff; // Buffer for bulk reads.
static const size_t SBSIZE = CacheStoreBlocks::SCALE; // save some typing.
alignas(SBSIZE) char stripe_buff[SBSIZE]; // Use when reading a single stripe block.
alignas(SBSIZE) char stripe_buff2[SBSIZE]; // use to save the stripe freelist
if (io_align > SBSIZE) {
return Errata::Message(0, 1, "Cannot load stripe ", _idx, " on span ", _span->_path.string(),
" because the I/O block alignment ", io_align, " is larger than the buffer alignment ", SBSIZE);
}
_directory._start = pos;
// Header A must be at the start of the stripe block.
// Todo: really need to check pread() for failure.
ssize_t headerbyteCount = pread(fd, stripe_buff2, SBSIZE, pos);
n.assign(headerbyteCount);
data.assign(stripe_buff2, n);
meta = static_cast<StripeMeta *>(data.data());
// TODO:: We need to read more data at this point to populate dir
if (this->validateMeta(meta)) {
delta = Bytes(data.rebind<char>().data() - stripe_buff2);
_meta[A][HEAD] = *meta;
_meta_pos[A][HEAD] = round_down(pos + Bytes(delta));
pos += round_up(SBSIZE);
_directory._skip = Bytes(SBSIZE); // first guess, updated in @c updateLiveData when the header length is computed.
// Search for Footer A. Nothing for it except to grub through the disk.
// The searched data is cached so it's available for directory parsing later if needed.
while (pos < limit) {
char *buff = static_cast<char *>(ats_memalign(io_align, N));
bulk_buff.reset(buff);
n.assign(pread(fd, buff, N, pos));
data.assign(buff, n);
found = this->probeMeta(data, &_meta[A][HEAD]);
if (found) {
ptrdiff_t diff = data.rebind<char>().data() - buff;
_meta[A][FOOT] = static_cast<StripeMeta *>(data.data())[0];
_meta_pos[A][FOOT] = round_down(pos + Bytes(diff));
// don't bother attaching block if the footer is at the start
if (diff > 0) {
_directory._clip = Bytes(N - diff);
_directory.append({bulk_buff.release(), N});
}
data.remove_prefix(SBSIZE); // skip footer for checking on B copy.
break;
} else {
_directory.append({bulk_buff.release(), N});
pos += round_up(N);
}
}
} else {
zret.push(0, 1, "Header A not found");
}
pos = _meta_pos[A][FOOT];
// Technically if Copy A is valid, Copy B is not needed. But at this point it's cheap to retrieve
// (as the exact offset is computable).
if (_meta_pos[A][FOOT] > 0) {
delta = _meta_pos[A][FOOT] - _meta_pos[A][HEAD];
// Header B should be immediately after Footer A. If at the end of the last read,
// do another read.
pos = this->_start + Bytes(vol_dirlen());
meta = static_cast<StripeMeta *>(data.data());
if (this->validateMeta(meta)) {
_meta[B][HEAD] = *meta;
_meta_pos[B][HEAD] = round_down(pos);
// Footer B must be at the same relative offset to Header B as Footer A -> Header A.
pos += delta;
n = Bytes(pread(fd, stripe_buff, ts::CacheStoreBlocks::SCALE, pos));
data.assign(stripe_buff, n);
meta = static_cast<StripeMeta *>(data.data());
if (this->validateMeta(meta)) {
_meta[B][FOOT] = *meta;
_meta_pos[B][FOOT] = round_down(pos);
}
}
}
if (_meta_pos[A][FOOT] > 0) {
if (_meta[A][HEAD].sync_serial == _meta[A][FOOT].sync_serial &&
(0 == _meta_pos[B][FOOT] || _meta[B][HEAD].sync_serial != _meta[B][FOOT].sync_serial ||
_meta[A][HEAD].sync_serial >= _meta[B][HEAD].sync_serial)) {
this->updateLiveData(A);
} else if (_meta_pos[B][FOOT] > 0 && _meta[B][HEAD].sync_serial == _meta[B][FOOT].sync_serial) {
this->updateLiveData(B);
} else {
zret.push(0, 1, "Invalid stripe data - candidates found but sync serial data not valid. ", _meta[A][HEAD].sync_serial, ":",
_meta[A][FOOT].sync_serial, ":", _meta[B][HEAD].sync_serial, ":", _meta[B][FOOT].sync_serial);
}
}
n.assign(headerbyteCount);
data.assign(stripe_buff2, n);
meta = static_cast<StripeMeta *>(data.data());
// copy freelist
freelist = static_cast<uint16_t *>(malloc(_segments * sizeof(uint16_t)));
for (int i = 0; i < _segments; i++) {
freelist[i] = meta->freelist[i];
}
if (!zret) {
_directory.clear();
}
return zret;
}
} // namespace ct