| /** @file |
| |
| A brief file description |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "CacheVC.h" |
| #include "P_CacheDir.h" |
| #include "P_CacheDoc.h" |
| #include "P_CacheInternal.h" |
| #include "PreservationTable.h" |
| #include "Stripe.h" |
| |
| #include "tscore/hugepages.h" |
| #include "tscore/Random.h" |
| #include "ts/ats_probe.h" |
| #include "iocore/eventsystem/Tasks.h" |
| |
| #include <unordered_map> |
| |
| #ifdef LOOP_CHECK_MODE |
| #define DIR_LOOP_THRESHOLD 1000 |
| #endif |
| |
namespace
{

// File-local debug channels. Each DbgCtl enables the correspondingly
// tagged Dbg()/DDbg() output at runtime.
DbgCtl dbg_ctl_cache_dir_sync{"dir_sync"};
DbgCtl dbg_ctl_cache_check_dir{"cache_check_dir"};
DbgCtl dbg_ctl_dir_clean{"dir_clean"};

#ifdef DEBUG

// Channels used only by DDbg() calls, which are compiled out of release
// builds; keeping them under DEBUG avoids unused-variable noise.
DbgCtl dbg_ctl_cache_stats{"cache_stats"};
DbgCtl dbg_ctl_dir_probe_hit{"dir_probe_hit"};
DbgCtl dbg_ctl_dir_probe_tag{"dir_probe_tag"};
DbgCtl dbg_ctl_dir_probe_miss{"dir_probe_miss"};
DbgCtl dbg_ctl_dir_insert{"dir_insert"};
DbgCtl dbg_ctl_dir_overwrite{"dir_overwrite"};
DbgCtl dbg_ctl_dir_lookaside{"dir_lookaside"};

#endif

} // end anonymous namespace
| |
| // Globals |
| |
| ClassAllocator<OpenDirEntry, false> openDirEntryAllocator("openDirEntry"); |
| |
| // OpenDir |
| |
OpenDir::OpenDir()
{
  // Route all events delivered to this continuation to signal_readers,
  // which retries waking readers that previously failed to get their lock.
  SET_HANDLER(&OpenDir::signal_readers);
}
| |
| /* |
| If allow_if_writers is false, open_write fails if there are other writers. |
| max_writers sets the maximum number of concurrent writers that are |
| allowed. Only The first writer can set the max_writers. It is ignored |
| for later writers. |
| Returns 1 on success and 0 on failure. |
| */ |
| int |
| OpenDir::open_write(CacheVC *cont, int allow_if_writers, int max_writers) |
| { |
| ink_assert(cont->stripe->mutex->thread_holding == this_ethread()); |
| unsigned int h = cont->first_key.slice32(0); |
| int b = h % OPEN_DIR_BUCKETS; |
| for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) { |
| if (!(d->writers.head->first_key == cont->first_key)) { |
| continue; |
| } |
| if (allow_if_writers && d->num_writers < d->max_writers) { |
| d->writers.push(cont); |
| d->num_writers++; |
| cont->od = d; |
| cont->write_vector = &d->vector; |
| return 1; |
| } |
| return 0; |
| } |
| OpenDirEntry *od = THREAD_ALLOC(openDirEntryAllocator, cont->mutex->thread_holding); |
| od->readers.head = nullptr; |
| od->writers.push(cont); |
| od->num_writers = 1; |
| od->max_writers = max_writers; |
| od->vector.data.data = &od->vector.data.fast_data[0]; |
| od->dont_update_directory = false; |
| od->move_resident_alt = false; |
| od->reading_vec = false; |
| od->writing_vec = false; |
| dir_clear(&od->first_dir); |
| cont->od = od; |
| cont->write_vector = &od->vector; |
| bucket[b].push(od); |
| return 1; |
| } |
| |
// Wake readers parked on this OpenDir. Readers whose own lock cannot be
// taken right now are re-queued and a retry is scheduled. Always returns 0.
int
OpenDir::signal_readers(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  Queue<CacheVC, Link_CacheVC_opendir_link> newly_delayed_readers;
  EThread *t = mutex->thread_holding;
  CacheVC *c = nullptr;
  while ((c = delayed_readers.dequeue())) {
    CACHE_TRY_LOCK(lock, c->mutex, t);
    if (lock.is_locked()) {
      // Got the reader's lock: clear its timeout flag and resume it now.
      c->f.open_read_timeout = 0;
      c->handleEvent(EVENT_IMMEDIATE, nullptr);
      continue;
    }
    // Lock contention: keep this reader for the next retry pass.
    newly_delayed_readers.push(c);
  }
  if (newly_delayed_readers.head) {
    delayed_readers = newly_delayed_readers;
    // Schedule the retry on the thread holding the first blocked reader's
    // mutex, falling back to this continuation's own thread.
    EThread *t1 = newly_delayed_readers.head->mutex->thread_holding;
    if (!t1) {
      t1 = mutex->thread_holding;
    }
    t1->schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay));
  }
  return 0;
}
| |
// Remove cont from its open entry's writer list. When the last writer
// leaves, wake any waiting readers and free the entry. Always returns 0.
int
OpenDir::close_write(CacheVC *cont)
{
  ink_assert(cont->stripe->mutex->thread_holding == this_ethread());
  cont->od->writers.remove(cont);
  cont->od->num_writers--;
  if (!cont->od->writers.head) {
    // Last writer gone: unlink the entry from its hash bucket...
    unsigned int h = cont->first_key.slice32(0);
    int b = h % OPEN_DIR_BUCKETS;
    bucket[b].remove(cont->od);
    // ...move its readers onto the delayed queue and signal them...
    delayed_readers.append(cont->od->readers);
    signal_readers(0, nullptr);
    // ...then release the entry itself.
    cont->od->vector.clear();
    THREAD_FREE(cont->od, openDirEntryAllocator, cont->mutex->thread_holding);
  }
  cont->od = nullptr;
  return 0;
}
| |
| OpenDirEntry * |
| OpenDir::open_read(const CryptoHash *key) const |
| { |
| unsigned int h = key->slice32(0); |
| int b = h % OPEN_DIR_BUCKETS; |
| for (OpenDirEntry *d = bucket[b].head; d; d = d->link.next) { |
| if (d->writers.head->first_key == *key) { |
| return d; |
| } |
| } |
| return nullptr; |
| } |
| |
| // |
| // Cache Directory |
| // |
| |
| // return value 1 means no loop |
| // zero indicates loop |
| int |
| dir_bucket_loop_check(Dir *start_dir, Dir *seg) |
| { |
| if (start_dir == nullptr) { |
| return 1; |
| } |
| |
| Dir *p1 = start_dir; |
| Dir *p2 = start_dir; |
| |
| while (p2) { |
| // p1 moves by one entry per iteration |
| ink_assert(p1); |
| p1 = next_dir(p1, seg); |
| // p2 moves by two entries per iteration |
| p2 = next_dir(p2, seg); |
| if (p2) { |
| p2 = next_dir(p2, seg); |
| } else { |
| return 1; |
| } |
| |
| if (p2 == p1) { |
| return 0; // we have a loop |
| } |
| } |
| return 1; |
| } |
| |
| // adds all the directory entries |
| // in a segment to the segment freelist |
| void |
| dir_init_segment(int s, Directory *directory) |
| { |
| directory->header->freelist[s] = 0; |
| Dir *seg = directory->get_segment(s); |
| int l, b; |
| memset(static_cast<void *>(seg), 0, SIZEOF_DIR * DIR_DEPTH * directory->buckets); |
| for (l = 1; l < DIR_DEPTH; l++) { |
| for (b = 0; b < directory->buckets; b++) { |
| Dir *bucket = dir_bucket(b, seg); |
| directory->free_entry(dir_bucket_row(bucket, l), s); |
| } |
| } |
| } |
| |
| // break the infinite loop in directory entries |
| // Note : abuse of the token bit in dir entries |
| int |
| dir_bucket_loop_fix(Dir *start_dir, int s, Directory *directory) |
| { |
| if (!dir_bucket_loop_check(start_dir, directory->get_segment(s))) { |
| Warning("Dir loop exists, clearing segment %d", s); |
| dir_init_segment(s, directory); |
| return 1; |
| } |
| return 0; |
| } |
| |
| int |
| Directory::freelist_length(int s) |
| { |
| int free = 0; |
| Dir *seg = this->get_segment(s); |
| Dir *e = dir_from_offset(this->header->freelist[s], seg); |
| if (dir_bucket_loop_fix(e, s, this)) { |
| return (DIR_DEPTH - 1) * this->buckets; |
| } |
| while (e) { |
| free++; |
| e = next_dir(e, seg); |
| } |
| return free; |
| } |
| |
| int |
| Directory::bucket_length(Dir *b, int s) |
| { |
| Dir *e = b; |
| int i = 0; |
| Dir *seg = this->get_segment(s); |
| #ifdef LOOP_CHECK_MODE |
| if (dir_bucket_loop_fix(b, s, this)) |
| return 1; |
| #endif |
| while (e) { |
| i++; |
| if (i > 100) { |
| return -1; |
| } |
| e = next_dir(e, seg); |
| } |
| return i; |
| } |
| |
| int |
| Directory::check() |
| { |
| int i, s; |
| Dbg(dbg_ctl_cache_check_dir, "inside check dir"); |
| for (s = 0; s < this->segments; s++) { |
| Dir *seg = this->get_segment(s); |
| for (i = 0; i < this->buckets; i++) { |
| Dir *b = dir_bucket(i, seg); |
| if (!(this->bucket_length(b, s) >= 0)) { |
| return 0; |
| } |
| if (!(!dir_next(b) || dir_offset(b))) { |
| return 0; |
| } |
| if (!(dir_bucket_loop_check(b, seg))) { |
| return 0; |
| } |
| } |
| } |
| return 1; |
| } |
| |
| inline void |
| unlink_from_freelist(Dir *e, int s, Directory *directory) |
| { |
| Dir *seg = directory->get_segment(s); |
| Dir *p = dir_from_offset(dir_prev(e), seg); |
| if (p) { |
| dir_set_next(p, dir_next(e)); |
| } else { |
| directory->header->freelist[s] = dir_next(e); |
| } |
| Dir *n = dir_from_offset(dir_next(e), seg); |
| if (n) { |
| dir_set_prev(n, dir_prev(e)); |
| } |
| } |
| |
// Delete entry e from bucket chain position p (p is e's predecessor, or
// nullptr when e is the fixed bucket-head slot) in segment s.
// Non-head entries are unlinked and pushed onto the segment freelist; the
// head slot cannot move, so deleting it copies the second entry's data
// into the head and deletes that second entry instead ("shuffle").
// Returns the entry that followed e, or nullptr when the chain is empty.
inline Dir *
dir_delete_entry(Dir *e, Dir *p, int s, Directory *directory)
{
  Dir *seg = directory->get_segment(s);
  int no = dir_next(e);
  directory->header->dirty = 1;
  if (p) {
    // Non-head entry: unlink from the chain, clear it, and push it onto
    // the head of the freelist.
    unsigned int fo = directory->header->freelist[s];
    unsigned int eo = dir_to_offset(e, seg);
    dir_clear(e);
    dir_set_next(p, no);
    dir_set_next(e, fo);
    if (fo) {
      dir_set_prev(dir_from_offset(fo, seg), eo);
    }
    directory->header->freelist[s] = eo;
  } else {
    Dir *n = next_dir(e, seg);
    if (n) {
      // "Shuffle" here means that we're copying the second entry's data to the head entry's location, and removing the second entry
      // - because the head entry can't be moved.
      ATS_PROBE3(cache_dir_shuffle, s, dir_to_offset(e, seg), dir_to_offset(n, seg));
      dir_assign(e, n);
      // Recursion depth is bounded at 1: n now has a predecessor (e), so
      // the recursive call takes the p != nullptr branch.
      dir_delete_entry(n, e, s, directory);
      return e;
    } else {
      dir_clear(e);
      return nullptr;
    }
  }
  return dir_from_offset(no, seg);
}
| |
| inline void |
| dir_clean_bucket(Dir *b, int s, StripeSM *stripe) |
| { |
| Dir *e = b, *p = nullptr; |
| Dir *seg = stripe->directory.get_segment(s); |
| #ifdef LOOP_CHECK_MODE |
| int loop_count = 0; |
| #endif |
| do { |
| #ifdef LOOP_CHECK_MODE |
| loop_count++; |
| if (loop_count > DIR_LOOP_THRESHOLD) { |
| if (dir_bucket_loop_fix(b, s, vol->directory)) |
| return; |
| } |
| #endif |
| if (!stripe->dir_valid(e) || !dir_offset(e)) { |
| if (dbg_ctl_dir_clean.on()) { |
| Dbg(dbg_ctl_dir_clean, "cleaning Stripe:%s: %p tag %X boffset %" PRId64 " b %p p %p bucket len %d", stripe->hash_text.get(), |
| e, dir_tag(e), dir_offset(e), b, p, stripe->directory.bucket_length(b, s)); |
| } |
| if (dir_offset(e)) { |
| ts::Metrics::Gauge::decrement(cache_rsb.direntries_used); |
| ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used); |
| } |
| // Match cache_dir_remove arguments |
| ATS_PROBE7(cache_dir_remove_clean_bucket, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e), 0, 0); |
| e = dir_delete_entry(e, p, s, &stripe->directory); |
| continue; |
| } |
| p = e; |
| e = next_dir(e, seg); |
| } while (e); |
| } |
| |
| void |
| Directory::clean_segment(int s, StripeSM *stripe) |
| { |
| Dir *seg = this->get_segment(s); |
| for (int64_t i = 0; i < this->buckets; i++) { |
| dir_clean_bucket(dir_bucket(i, seg), s, stripe); |
| ink_assert(!dir_next(dir_bucket(i, seg)) || dir_offset(dir_bucket(i, seg))); |
| } |
| } |
| |
// Run dir_clean_bucket over every segment of this directory.
void
Directory::cleanup(StripeSM *stripe)
{
  for (int64_t i = 0; i < this->segments; i++) {
    this->clean_segment(i, stripe);
  }
  // NOTE(review): "d" is not a name in this scope, so CHECK_DIR presumably
  // expands to nothing (or ignores its argument) in this build
  // configuration -- confirm against the macro definition.
  CHECK_DIR(d);
}
| |
| void |
| Directory::clear_range(off_t start, off_t end, StripeSM *stripe) |
| { |
| for (off_t i = 0; i < this->entries(); i++) { |
| Dir *e = dir_index(stripe, i); |
| if (dir_offset(e) >= static_cast<int64_t>(start) && dir_offset(e) < static_cast<int64_t>(end)) { |
| ts::Metrics::Gauge::decrement(cache_rsb.direntries_used); |
| ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used); |
| dir_set_offset(e, 0); // delete |
| } |
| } |
| this->cleanup(stripe); |
| } |
| |
| void |
| check_bucket_not_contains(Dir *b, Dir *e, Dir *seg) |
| { |
| Dir *x = b; |
| do { |
| if (x == e) { |
| break; |
| } |
| x = next_dir(x, seg); |
| } while (x); |
| ink_assert(!x); |
| } |
| |
// Try to replenish segment s's freelist. A normal clean pass is attempted
// first; if the freelist is still empty the segment has genuinely
// overflowed, so roughly every tenth in-use head entry is evicted before
// cleaning again.
void
freelist_clean(int s, StripeSM *stripe)
{
  stripe->directory.clean_segment(s, stripe);
  if (stripe->directory.header->freelist[s]) {
    return;
  }
  Warning("cache directory overflow on '%s' segment %d, purging...", stripe->disk->path, s);
  int n = 0;
  Dir *seg = stripe->directory.get_segment(s);
  for (int bi = 0; bi < stripe->directory.buckets; bi++) {
    Dir *b = dir_bucket(bi, seg);
    for (int l = 0; l < DIR_DEPTH; l++) {
      Dir *e = dir_bucket_row(b, l);
      // Sacrifice every tenth head entry encountered to free up space.
      if (dir_head(e) && !(n++ % 10)) {
        ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
        ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
        dir_set_offset(e, 0); // delete
      }
    }
  }
  // Second pass unlinks the entries just marked deleted.
  stripe->directory.clean_segment(s, stripe);
}
| |
// Pop one entry off segment s's freelist.
// Returns nullptr when the list is empty (after kicking off a purge via
// freelist_clean) or corrupt (after reinitializing the segment); callers
// are expected to retry in either case.
inline Dir *
freelist_pop(int s, StripeSM *stripe)
{
  Dir *seg = stripe->directory.get_segment(s);
  Dir *e = dir_from_offset(stripe->directory.header->freelist[s], seg);
  if (!e) {
    freelist_clean(s, stripe);
    return nullptr;
  }
  stripe->directory.header->freelist[s] = dir_next(e);
  // if the freelist is bad (a "free" entry still claims data), punt.
  if (dir_offset(e)) {
    dir_init_segment(s, &stripe->directory);
    return nullptr;
  }
  // The new freelist head no longer has a predecessor.
  Dir *h = dir_from_offset(stripe->directory.header->freelist[s], seg);
  if (h) {
    dir_set_prev(h, 0);
  }
  return e;
}
| |
| void |
| Directory::free_entry(Dir *e, int s) |
| { |
| Dir *seg = this->get_segment(s); |
| unsigned int fo = this->header->freelist[s]; |
| unsigned int eo = dir_to_offset(e, seg); |
| dir_set_next(e, fo); |
| if (fo) { |
| dir_set_prev(dir_from_offset(fo, seg), eo); |
| } |
| this->header->freelist[s] = eo; |
| } |
| |
// Look up key in the directory. Scanning resumes after *last_collision
// (tag matches seen before it are skipped); on a hit the entry is copied
// into result and *last_collision is advanced to it. Invalid entries with
// a matching tag are deleted along the way. If the remembered collision
// entry has vanished from the chain, the scan restarts from the bucket
// head. Returns 1 on a hit, 0 on a miss.
int
Directory::probe(const CacheKey *key, StripeSM *stripe, Dir *result, Dir **last_collision)
{
  ink_assert(stripe->mutex->thread_holding == this_ethread());
  int s = key->slice32(0) % this->segments;
  int b = key->slice32(1) % this->buckets;
  Dir *seg = this->get_segment(s);
  Dir *e = nullptr, *p = nullptr, *collision = *last_collision;
  CHECK_DIR(d);
#ifdef LOOP_CHECK_MODE
  if (dir_bucket_loop_fix(dir_bucket(b, seg), s, this))
    return 0;
#endif
Lagain:
  e = dir_bucket(b, seg);
  // An unoccupied bucket head means the whole bucket is empty.
  if (dir_offset(e)) {
    do {
      if (dir_compare_tag(e, key)) {
        ink_assert(dir_offset(e));
        // Bug: 51680. Need to check collision before checking
        // dir_valid(). In case of a collision, if !dir_valid(), we
        // don't want to call dir_delete_entry.
        if (collision) {
          if (collision == e) {
            // Reached the remembered collision point; entries after this
            // one are fair game.
            collision = nullptr;
            // increment collision stat
            // Note: dir_probe could be called multiple times
            // for the same document and so the collision stat
            // may not accurately reflect the number of documents
            // having the same first_key
            DDbg(dbg_ctl_cache_stats, "Incrementing dir collisions");
            ts::Metrics::Counter::increment(cache_rsb.directory_collision);
            ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_collision);
          }
          goto Lcont;
        }
        if (stripe->dir_valid(e)) {
          DDbg(dbg_ctl_dir_probe_hit, "found %X %X vol %d bucket %d boffset %" PRId64 "", key->slice32(0), key->slice32(1),
               stripe->fd, b, dir_offset(e));
          dir_assign(result, e);
          *last_collision = e;
          ink_assert(dir_offset(e) * CACHE_BLOCK_SIZE < stripe->len);
          return 1;
        } else { // delete the invalid entry
          ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
          ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
          ATS_PROBE7(cache_dir_remove_invalid, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e),
                     key->slice64(0), key->slice64(1));
          // dir_delete_entry returns the successor; p stays put.
          e = dir_delete_entry(e, p, s, this);
          continue;
        }
      } else {
        DDbg(dbg_ctl_dir_probe_tag, "tag mismatch %p %X vs expected %X", e, dir_tag(e), key->slice32(3));
      }
    Lcont:
      p = e;
      e = next_dir(e, seg);
    } while (e);
  }
  if (collision) { // last collision no longer in the list, retry
    DDbg(dbg_ctl_cache_stats, "Incrementing dir collisions");
    ts::Metrics::Counter::increment(cache_rsb.directory_collision);
    ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_collision);
    collision = nullptr;
    goto Lagain;
  }
  DDbg(dbg_ctl_dir_probe_miss, "missed %X %X on vol %d bucket %d at %p", key->slice32(0), key->slice32(1), stripe->fd, b, seg);
  CHECK_DIR(d);
  return 0;
}
| |
// Insert a new directory entry for key describing the location in
// to_part. A free slot is taken from the bucket head, the bucket's own
// rows, or the segment freelist (purging if needed), and is linked at the
// tail of the bucket chain so a probe resuming from last_collision cannot
// miss it. Always returns 1.
int
Directory::insert(const CacheKey *key, StripeSM *stripe, Dir *to_part)
{
  ink_assert(stripe->mutex->thread_holding == this_ethread());
  int s = key->slice32(0) % this->segments, l;
  int bi = key->slice32(1) % this->buckets;
  ink_assert(dir_approx_size(to_part) <= MAX_FRAG_SIZE + sizeof(Doc));
  Dir *seg = this->get_segment(s);
  Dir *e = nullptr;
  Dir *b = dir_bucket(bi, seg);
#if defined(DEBUG) && defined(DO_CHECK_DIR_FAST)
  // Debug-only: verify no duplicate (tag, offset) already in the bucket.
  unsigned int t = DIR_MASK_TAG(key->slice32(2));
  Dir *col = b;
  while (col) {
    ink_assert((dir_tag(col) != t) || (dir_offset(col) != dir_offset(to_part)));
    col = next_dir(col, seg);
  }
#endif
  CHECK_DIR(d);

Lagain:
  // get from this row first
  e = b;
  if (dir_is_empty(e)) {
    // Bucket head is free: no linking required.
    goto Lfill;
  }
  // Next preference: a free row within this bucket.
  for (l = 1; l < DIR_DEPTH; l++) {
    e = dir_bucket_row(b, l);
    if (dir_is_empty(e)) {
      unlink_from_freelist(e, s, this);
      goto Llink;
    }
  }
  // get one from the freelist
  e = freelist_pop(s, stripe);
  if (!e) {
    // freelist_pop triggered a purge or segment rebuild; retry.
    goto Lagain;
  }
Llink:
  // dir_probe searches from head to tail of list and resumes from last_collision.
  // Need to insert at the tail of the list so that no entries can be inserted
  // before last_collision. This means walking the entire list on each insert,
  // but at least the lists are completely in memory and should be quite short
  Dir *prev, *last;

  l = 0;
  last = b;
  do {
    prev = last;
    last = next_dir(last, seg);
  } while (last && (++l <= this->buckets * DIR_DEPTH));

  dir_set_next(e, 0);
  dir_set_next(prev, dir_to_offset(e, seg));
Lfill:
  dir_assign_data(e, to_part);
  dir_set_tag(e, key->slice32(2));
  ink_assert(stripe->vol_offset(e) < (stripe->skip + stripe->len));
  DDbg(dbg_ctl_dir_insert, "insert %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0), stripe->fd,
       bi, e, key->slice32(1), dir_tag(e), dir_offset(e));
  ATS_PROBE7(cache_dir_insert, stripe->fd, s, dir_to_offset(e, seg), dir_offset(e), dir_approx_size(e), key->slice64(0),
             key->slice64(1));
  CHECK_DIR(d);
  stripe->directory.header->dirty = 1;
  ts::Metrics::Gauge::increment(cache_rsb.direntries_used);
  ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.direntries_used);

  return 1;
}
| |
// Update the directory entry for key that currently points at the same
// data offset as "overwrite", making it describe "dir" instead. When no
// matching entry exists: fail with 0 if must_overwrite is set, otherwise
// insert a fresh entry (same slot-allocation strategy as insert()).
// Returns 1 when an existing entry was overwritten, 0 otherwise.
int
Directory::overwrite(const CacheKey *key, StripeSM *stripe, Dir *dir, Dir *overwrite, bool must_overwrite)
{
  ink_assert(stripe->mutex->thread_holding == this_ethread());
  int s = key->slice32(0) % this->segments, l;
  int bi = key->slice32(1) % this->buckets;
  Dir *seg = this->get_segment(s);
  Dir *e = nullptr;
  Dir *b = dir_bucket(bi, seg);
  unsigned int t = DIR_MASK_TAG(key->slice32(2));
  int res = 1;
#ifdef LOOP_CHECK_MODE
  int loop_count = 0;
  bool loop_possible = true;
#endif
  CHECK_DIR(d);

  ink_assert(static_cast<unsigned int>(dir_approx_size(dir)) <=
             static_cast<unsigned int>((MAX_FRAG_SIZE + sizeof(Doc)))); // XXX - size should be unsigned
Lagain:
  // find entry to overwrite
  e = b;
  if (dir_offset(e)) {
    do {
#ifdef LOOP_CHECK_MODE
      loop_count++;
      if (loop_count > DIR_LOOP_THRESHOLD && loop_possible) {
        if (dir_bucket_loop_fix(b, s, this)) {
          // Only one fix attempt; a rebuilt segment cannot loop again.
          loop_possible = false;
          goto Lagain;
        }
      }
#endif
      // Match on both the key's tag and the old data offset.
      if (dir_tag(e) == t && dir_offset(e) == dir_offset(overwrite)) {
        goto Lfill;
      }
      e = next_dir(e, seg);
    } while (e);
  }
  if (must_overwrite) {
    return 0;
  }
  // Fall back to inserting a new entry; report 0 since nothing was
  // overwritten.
  res = 0;
  // get from this row first
  e = b;
  if (dir_is_empty(e)) {
    ts::Metrics::Gauge::increment(cache_rsb.direntries_used);
    ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.direntries_used);
    goto Lfill;
  }
  for (l = 1; l < DIR_DEPTH; l++) {
    e = dir_bucket_row(b, l);
    if (dir_is_empty(e)) {
      unlink_from_freelist(e, s, this);
      goto Llink;
    }
  }
  // get one from the freelist
  e = freelist_pop(s, stripe);
  if (!e) {
    // freelist_pop triggered a purge or segment rebuild; retry.
    goto Lagain;
  }
Llink:
  ts::Metrics::Gauge::increment(cache_rsb.direntries_used);
  ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.direntries_used);
  // as with dir_insert above, need to insert new entries at the tail of the linked list
  Dir *prev, *last;

  l = 0;
  last = b;
  do {
    prev = last;
    last = next_dir(last, seg);
  } while (last && (++l <= this->buckets * DIR_DEPTH));

  dir_set_next(e, 0);
  dir_set_next(prev, dir_to_offset(e, seg));
Lfill:
  dir_assign_data(e, dir);
  dir_set_tag(e, t);
  ink_assert(stripe->vol_offset(e) < stripe->skip + stripe->len);
  DDbg(dbg_ctl_dir_overwrite, "overwrite %p %X into vol %d bucket %d at %p tag %X %X boffset %" PRId64 "", e, key->slice32(0),
       stripe->fd, bi, e, t, dir_tag(e), dir_offset(e));
  CHECK_DIR(d);
  this->header->dirty = 1;
  return res;
}
| |
// Delete the directory entry for key that points at the same data offset
// as del. Returns 1 when an entry was found and removed, 0 otherwise.
int
Directory::remove(const CacheKey *key, StripeSM *stripe, Dir *del)
{
  ink_assert(stripe->mutex->thread_holding == this_ethread());
  int s = key->slice32(0) % this->segments;
  int b = key->slice32(1) % this->buckets;
  Dir *seg = this->get_segment(s);
  Dir *e = nullptr, *p = nullptr;
#ifdef LOOP_CHECK_MODE
  int loop_count = 0;
#endif
  CHECK_DIR(vol);

  e = dir_bucket(b, seg);
  // An unoccupied bucket head means the whole bucket is empty.
  if (dir_offset(e)) {
    do {
#ifdef LOOP_CHECK_MODE
      loop_count++;
      if (loop_count > DIR_LOOP_THRESHOLD) {
        if (dir_bucket_loop_fix(dir_bucket(b, seg), s, this))
          return 0;
      }
#endif
      int64_t offset = dir_offset(e);
      // Match on both the key's tag and the exact data offset.
      if (dir_compare_tag(e, key) && offset == dir_offset(del)) {
        ts::Metrics::Gauge::decrement(cache_rsb.direntries_used);
        ts::Metrics::Gauge::decrement(stripe->cache_vol->vol_rsb.direntries_used);
        ATS_PROBE7(cache_dir_remove, stripe->fd, s, dir_to_offset(e, seg), offset, dir_approx_size(e), key->slice64(0),
                   key->slice64(1));
        dir_delete_entry(e, p, s, this);
        CHECK_DIR(d);
        return 1;
      }
      p = e;
      e = next_dir(e, seg);
    } while (e);
  }
  CHECK_DIR(vol);
  return 0;
}
| |
| // Lookaside Cache |
| |
| int |
| dir_lookaside_probe(const CacheKey *key, StripeSM *stripe, Dir *result, EvacuationBlock **eblock) |
| { |
| ink_assert(stripe->mutex->thread_holding == this_ethread()); |
| int i = key->slice32(3) % LOOKASIDE_SIZE; |
| EvacuationBlock *b = stripe->lookaside[i].head; |
| while (b) { |
| if (b->evac_frags.key == *key) { |
| if (stripe->dir_valid(&b->new_dir)) { |
| *result = b->new_dir; |
| DDbg(dbg_ctl_dir_lookaside, "probe %X success", key->slice32(0)); |
| if (eblock) { |
| *eblock = b; |
| } |
| return 1; |
| } |
| } |
| b = b->link.next; |
| } |
| DDbg(dbg_ctl_dir_lookaside, "probe %X failed", key->slice32(0)); |
| return 0; |
| } |
| |
| int |
| dir_lookaside_insert(EvacuationBlock *eblock, StripeSM *stripe, Dir *to) |
| { |
| CacheKey *key = &eblock->evac_frags.earliest_key; |
| DDbg(dbg_ctl_dir_lookaside, "insert %X %X, offset %" PRId64 " phase %d", key->slice32(0), key->slice32(1), dir_offset(to), |
| dir_phase(to)); |
| ink_assert(stripe->mutex->thread_holding == this_ethread()); |
| int i = key->slice32(3) % LOOKASIDE_SIZE; |
| EvacuationBlock *b = new_EvacuationBlock(); |
| b->evac_frags.key = *key; |
| b->evac_frags.earliest_key = *key; |
| b->earliest_evacuator = eblock->earliest_evacuator; |
| ink_assert(b->earliest_evacuator); |
| b->dir = eblock->dir; |
| b->new_dir = *to; |
| stripe->lookaside[i].push(b); |
| return 1; |
| } |
| |
// Promote a lookaside entry for key into the directory proper: overwrite
// the old directory location with the new one, fix up the ram cache's
// offsets, and free the lookaside entry. Returns the overwrite result,
// or 0 when the key is not in the table.
int
dir_lookaside_fixup(const CacheKey *key, StripeSM *stripe)
{
  ink_assert(stripe->mutex->thread_holding == this_ethread());
  int i = key->slice32(3) % LOOKASIDE_SIZE;
  EvacuationBlock *b = stripe->lookaside[i].head;
  while (b) {
    if (b->evac_frags.key == *key) {
      int res = stripe->directory.overwrite(key, stripe, &b->new_dir, &b->dir, false);
      DDbg(dbg_ctl_dir_lookaside, "fixup %X %X offset %" PRId64 " phase %d %d", key->slice32(0), key->slice32(1),
           dir_offset(&b->new_dir), dir_phase(&b->new_dir), res);
      // Tell the ram cache that the document moved from offset o to n.
      int64_t o = dir_offset(&b->dir), n = dir_offset(&b->new_dir);
      stripe->ram_cache->fixup(key, static_cast<uint64_t>(o), static_cast<uint64_t>(n));
      stripe->lookaside[i].remove(b);
      free_EvacuationBlock(b);
      return res;
    }
    b = b->link.next;
  }
  DDbg(dbg_ctl_dir_lookaside, "fixup %X %X failed", key->slice32(0), key->slice32(1));
  return 0;
}
| |
| void |
| dir_lookaside_cleanup(StripeSM *stripe) |
| { |
| ink_assert(stripe->mutex->thread_holding == this_ethread()); |
| for (auto &i : stripe->lookaside) { |
| EvacuationBlock *b = i.head; |
| while (b) { |
| if (!stripe->dir_valid(&b->new_dir)) { |
| EvacuationBlock *nb = b->link.next; |
| DDbg(dbg_ctl_dir_lookaside, "cleanup %X %X cleaned up", b->evac_frags.earliest_key.slice32(0), |
| b->evac_frags.earliest_key.slice32(1)); |
| i.remove(b); |
| free_CacheEvacuateDocVC(b->earliest_evacuator); |
| free_EvacuationBlock(b); |
| b = nb; |
| goto Lagain; |
| } |
| b = b->link.next; |
| Lagain:; |
| } |
| } |
| } |
| |
| void |
| dir_lookaside_remove(const CacheKey *key, StripeSM *stripe) |
| { |
| ink_assert(stripe->mutex->thread_holding == this_ethread()); |
| int i = key->slice32(3) % LOOKASIDE_SIZE; |
| EvacuationBlock *b = stripe->lookaside[i].head; |
| while (b) { |
| if (b->evac_frags.key == *key) { |
| DDbg(dbg_ctl_dir_lookaside, "remove %X %X offset %" PRId64 " phase %d", key->slice32(0), key->slice32(1), |
| dir_offset(&b->new_dir), dir_phase(&b->new_dir)); |
| stripe->lookaside[i].remove(b); |
| free_EvacuationBlock(b); |
| return; |
| } |
| b = b->link.next; |
| } |
| DDbg(dbg_ctl_dir_lookaside, "remove %X %X failed", key->slice32(0), key->slice32(1)); |
| return; |
| } |
| |
| // Cache Dir Sync |
| |
// Set up the periodic directory-sync machinery: group stripes by the disk
// they live on, distribute whole disks round-robin across the configured
// number of CacheSync tasks, and schedule each task on an ET_TASK thread.
// Guarded so repeated calls are ignored.
void
dir_sync_init()
{
  // Keep the CacheSync continuations alive for the process lifetime.
  static std::vector<std::unique_ptr<CacheSync>> cache_syncs;
  static bool initialized = false;
  std::unordered_map<CacheDisk *, std::vector<int>> drive_stripe_map;

  if (initialized) {
    Warning("dir_sync_init() called multiple times - ignoring");
    return;
  }
  initialized = true;

  // Group global stripe indices by their backing disk.
  for (int i = 0; i < gnstripes; i++) {
    drive_stripe_map[gstripes[i]->disk].push_back(i);
  }

  if (drive_stripe_map.empty()) {
    Dbg(dbg_ctl_cache_dir_sync, "No stripes to sync - dir_sync_init complete");
    return;
  }

  // -1 means one task per disk; otherwise use the configured count,
  // clamped to at least one task.
  int num_tasks = std::max(1, (cache_config_dir_sync_parallel_tasks == -1) ? static_cast<int>(drive_stripe_map.size()) :
                                                                             cache_config_dir_sync_parallel_tasks);

  cache_syncs.resize(num_tasks);
  for (int i = 0; i < num_tasks; i++) {
    cache_syncs[i] = std::make_unique<CacheSync>();
  }

  int task_idx = 0;

  // Assign all stripes of a disk to the same task (round-robin by disk) so
  // a task never syncs two disks' stripes concurrently with another task
  // on the same spindle.
  for (auto &[disk, indices] : drive_stripe_map) {
    int target_task = task_idx % num_tasks;

    Dbg(dbg_ctl_cache_dir_sync, "Disk %s: %zu stripe(s) assigned to task %d", disk->path, indices.size(), target_task);
    for (int stripe_idx : indices) {
      cache_syncs[target_task]->stripe_indices.push_back(stripe_idx);
    }
    task_idx++;
  }

  for (int i = 0; i < num_tasks; i++) {
    Dbg(dbg_ctl_cache_dir_sync, "Task %d: syncing %zu stripe(s)", i, cache_syncs[i]->stripe_indices.size());
    cache_syncs[i]->current_index = 0;
    cache_syncs[i]->trigger =
      eventProcessor.schedule_in(cache_syncs[i].get(), HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK);
  }
}
| |
| void |
| CacheSync::aio_write(int fd, char *b, int n, off_t o) |
| { |
| io.aiocb.aio_fildes = fd; |
| io.aiocb.aio_offset = o; |
| io.aiocb.aio_nbytes = n; |
| io.aiocb.aio_buf = b; |
| io.action = this; |
| io.thread = AIO_CALLBACK_THREAD_ANY; |
| ink_assert(ink_aio_write(&io) >= 0); |
| } |
| |
| uint64_t |
| Directory::entries_used() |
| { |
| uint64_t full = 0; |
| uint64_t sfull = 0; |
| for (int s = 0; s < this->segments; full += sfull, s++) { |
| Dir *seg = this->get_segment(s); |
| sfull = 0; |
| for (int b = 0; b < this->buckets; b++) { |
| Dir *e = dir_bucket(b, seg); |
| if (dir_bucket_loop_fix(e, s, this)) { |
| sfull = 0; |
| break; |
| } |
| while (e) { |
| if (dir_offset(e)) { |
| sfull++; |
| } |
| e = next_dir(e, seg); |
| if (!e) { |
| break; |
| } |
| } |
| } |
| } |
| return full; |
| } |
| |
| /* |
| * this function flushes the cache meta data to disk when |
| * the cache is shutdown. Must *NOT* be used during regular |
| * operation. |
| */ |
| |
void
sync_cache_dir_on_shutdown()
{
  Dbg(dbg_ctl_cache_dir_sync, "sync started");
  // Sentinel thread pointer passed to each stripe's shutdown() --
  // presumably never dereferenced on the shutdown path; confirm against
  // StripeSM::shutdown before relying on this.
  EThread *t = reinterpret_cast<EThread *>(0xdeadbeef);
  for (int i = 0; i < gnstripes; i++) {
    gstripes[i]->shutdown(t);
  }
  Dbg(dbg_ctl_cache_dir_sync, "sync done");
}
| |
| int |
| CacheSync::mainEvent(int event, Event * /* e ATS_UNUSED */) |
| { |
| if (trigger) { |
| trigger->cancel_action(); |
| trigger = nullptr; |
| } |
| |
| Lrestart: |
| if (current_index >= static_cast<int>(stripe_indices.size())) { |
| current_index = 0; |
| #if FREE_BUF_BETWEEN_CYCLES |
| // Free buffer between sync cycles to avoid holding large amounts of memory |
| if (buf) { |
| if (buf_huge) { |
| ats_free_hugepage(buf, buflen); |
| } else { |
| ats_free(buf); |
| } |
| buf = nullptr; |
| buflen = 0; |
| buf_huge = false; |
| } |
| #endif |
| Dbg(dbg_ctl_cache_dir_sync, "sync cycle done"); |
| trigger = eventProcessor.schedule_in(this, HRTIME_SECONDS(cache_config_dir_sync_frequency), ET_TASK); |
| return EVENT_CONT; |
| } |
| stripe_index = stripe_indices[current_index]; |
| current_index++; |
| |
| StripeSM *stripe = gstripes[stripe_index]; // must be named "vol" to make STAT macros work. |
| |
| if (event == AIO_EVENT_DONE) { |
| // AIO Thread |
| if (!io.ok()) { |
| Warning("vol write error during directory sync '%s'", gstripes[stripe_index]->hash_text.get()); |
| event = EVENT_NONE; |
| goto Ldone; |
| } |
| ts::Metrics::Counter::increment(cache_rsb.directory_sync_bytes, io.aio_result); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_sync_bytes, io.aio_result); |
| trigger = eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_dir_sync_delay)); |
| return EVENT_CONT; |
| } |
| { |
| CACHE_TRY_LOCK(lock, gstripes[stripe_index]->mutex, mutex->thread_holding); |
| if (!lock.is_locked()) { |
| trigger = eventProcessor.schedule_in(this, HRTIME_MSECONDS(cache_config_mutex_retry_delay)); |
| return EVENT_CONT; |
| } |
| |
| if (!stripe->dir_sync_in_progress) { |
| start_time = ink_get_hrtime(); |
| } |
| |
| stripe->recompute_hit_evacuate_window(); |
| |
| if (DISK_BAD(stripe->disk)) { |
| goto Ldone; |
| } |
| |
| int headerlen = ROUND_TO_STORE_BLOCK(sizeof(StripeHeaderFooter)); |
| size_t dirlen = stripe->dirlen(); |
| if (!writepos) { |
| // start |
| Dbg(dbg_ctl_cache_dir_sync, "sync started"); |
| /* Don't sync the directory to disk if its not dirty. Syncing the |
| clean directory to disk is also the cause of INKqa07151. Increasing |
| the serial number causes the cache to recover more data than necessary. |
| The dirty bit it set in dir_insert, overwrite and dir_delete_entry |
| */ |
| if (!stripe->directory.header->dirty) { |
| Dbg(dbg_ctl_cache_dir_sync, "Dir %s not dirty", stripe->hash_text.get()); |
| goto Ldone; |
| } |
| if (stripe->is_io_in_progress() || stripe->get_agg_buf_pos()) { |
| Dbg(dbg_ctl_cache_dir_sync, "Dir %s: waiting for agg buffer", stripe->hash_text.get()); |
| stripe->dir_sync_waiting = true; |
| stripe->waiting_dir_sync = this; |
| if (!stripe->is_io_in_progress()) { |
| stripe->aggWrite(EVENT_IMMEDIATE, nullptr); |
| } |
| return EVENT_CONT; |
| } |
| Dbg(dbg_ctl_cache_dir_sync, "pos: %" PRIu64 " Dir %s dirty...syncing to disk", stripe->directory.header->write_pos, |
| stripe->hash_text.get()); |
| stripe->directory.header->dirty = 0; |
| if (buflen < dirlen) { |
| if (buf) { |
| if (buf_huge) { |
| ats_free_hugepage(buf, buflen); |
| } else { |
| ats_free(buf); |
| } |
| buf = nullptr; |
| } |
| buflen = dirlen; |
| if (ats_hugepage_enabled()) { |
| buf = static_cast<char *>(ats_alloc_hugepage(buflen)); |
| buf_huge = true; |
| } |
| if (buf == nullptr) { |
| buf = static_cast<char *>(ats_memalign(ats_pagesize(), buflen)); |
| buf_huge = false; |
| } |
| } |
| stripe->directory.header->sync_serial++; |
| stripe->directory.footer->sync_serial = stripe->directory.header->sync_serial; |
| CHECK_DIR(d); |
| memcpy(buf, stripe->directory.raw_dir, dirlen); |
| stripe->dir_sync_in_progress = true; |
| } |
| size_t B = stripe->directory.header->sync_serial & 1; |
| off_t start = stripe->skip + (B ? dirlen : 0); |
| |
| if (!writepos) { |
| // write header |
| aio_write(stripe->fd, buf + writepos, headerlen, start + writepos); |
| writepos += headerlen; |
| } else if (writepos < static_cast<off_t>(dirlen) - headerlen) { |
| // write part of body |
| int l = cache_config_dir_sync_max_write; |
| if (writepos + l > static_cast<off_t>(dirlen) - headerlen) { |
| l = dirlen - headerlen - writepos; |
| } |
| aio_write(stripe->fd, buf + writepos, l, start + writepos); |
| writepos += l; |
| } else if (writepos < static_cast<off_t>(dirlen)) { |
| ink_assert(writepos == (off_t)dirlen - headerlen); |
| // write footer |
| aio_write(stripe->fd, buf + writepos, headerlen, start + writepos); |
| writepos += headerlen; |
| } else { |
| stripe->dir_sync_in_progress = false; |
| ts::Metrics::Counter::increment(cache_rsb.directory_sync_count); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_sync_count); |
| ts::Metrics::Counter::increment(cache_rsb.directory_sync_time, ink_get_hrtime() - start_time); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.directory_sync_time, ink_get_hrtime() - start_time); |
| start_time = 0; |
| goto Ldone; |
| } |
| return EVENT_CONT; |
| } |
| Ldone: |
| writepos = 0; |
| goto Lrestart; |
| } |
| |
| // |
| // Static Tables |
| // |
| |
// Byte-wise permutation table: a bijection on [0, 255] (every value appears
// exactly once). Indexing with a key byte yields the "next" byte;
// CacheKey_prev_table below is its inverse, so prev[next[i]] == i.
const uint8_t CacheKey_next_table[256] = {
  21,  53,  167, 51,  255, 126, 241, 151, 115, 66,  155, 174, 226, 215, 80,  188,
  12,  95,  8,   24,  162, 201, 46,  104, 79,  172, 39,  68,  56,  144, 142, 217,
  101, 62,  14,  108, 120, 90,  61,  47,  132, 199, 110, 166, 83,  125, 57,  65,
  19,  130, 148, 116, 228, 189, 170, 1,   71,  0,   252, 184, 168, 177, 88,  229,
  242, 237, 183, 55,  13,  212, 240, 81,  211, 74,  195, 205, 147, 93,  30,  87,
  86,  63,  135, 102, 233, 106, 118, 163, 107, 10,  243, 136, 160, 119, 43,  161,
  206, 141, 203, 78,  175, 36,  37,  140, 224, 197, 185, 196, 248, 84,  122, 73,
  152, 157, 18,  225, 219, 145, 45,  2,   171, 249, 173, 32,  143, 137, 69,  41,
  35,  89,  33,  98,  179, 214, 114, 231, 251, 123, 180, 194, 29,  3,   178, 31,
  192, 164, 15,  234, 26,  230, 91,  156, 5,   16,  23,  244, 58,  50,  4,   67,
  134, 165, 60,  235, 250, 7,   138, 216, 49,  139, 191, 154, 11,  52,  239, 59,
  111, 245, 9,   64,  25,  129, 247, 232, 190, 246, 109, 22,  112, 210, 221, 181,
  92,  169, 48,  100, 193, 77,  103, 133, 70,  220, 207, 223, 176, 204, 76,  186,
  200, 208, 158, 182, 227, 222, 131, 38,  187, 238, 6,   34,  253, 128, 146, 44,
  94,  127, 105, 153, 113, 20,  27,  124, 159, 17,  72,  218, 96,  149, 213, 42,
  28,  254, 202, 40,  117, 82,  97,  209, 54,  236, 121, 75,  85,  150, 99,  198,
};
| |
// Inverse byte-wise permutation table: a bijection on [0, 255] (every value
// appears exactly once). It undoes CacheKey_next_table above, so
// next[prev[i]] == i for every byte value i.
const uint8_t CacheKey_prev_table[256] = {
  57,  55,  119, 141, 158, 152, 218, 165, 18,  178, 89,  172, 16,  68,  34,  146,
  153, 233, 114, 48,  229, 0,   187, 154, 19,  180, 148, 230, 240, 140, 78,  143,
  123, 130, 219, 128, 101, 102, 215, 26,  243, 127, 239, 94,  223, 118, 22,  39,
  194, 168, 157, 3,   173, 1,   248, 67,  28,  46,  156, 175, 162, 38,  33,  81,
  179, 47,  9,   159, 27,  126, 200, 56,  234, 111, 73,  251, 206, 197, 99,  24,
  14,  71,  245, 44,  109, 252, 80,  79,  62,  129, 37,  150, 192, 77,  224, 17,
  236, 246, 131, 254, 195, 32,  83,  198, 23,  226, 85,  88,  35,  186, 42,  176,
  188, 228, 134, 8,   51,  244, 86,  93,  36,  250, 110, 137, 231, 45,  5,   225,
  221, 181, 49,  214, 40,  199, 160, 82,  91,  125, 166, 169, 103, 97,  30,  124,
  29,  117, 222, 76,  50,  237, 253, 7,   112, 227, 171, 10,  151, 113, 210, 232,
  92,  95,  20,  87,  145, 161, 43,  2,   60,  193, 54,  120, 25,  122, 11,  100,
  204, 61,  142, 132, 138, 191, 211, 66,  59,  106, 207, 216, 15,  53,  184, 170,
  144, 196, 139, 74,  107, 105, 255, 41,  208, 21,  242, 98,  205, 75,  96,  202,
  209, 247, 189, 72,  69,  238, 133, 13,  167, 31,  235, 116, 201, 190, 213, 203,
  104, 115, 12,  212, 52,  63,  149, 135, 183, 84,  147, 163, 249, 65,  217, 174,
  70,  6,   64,  90,  155, 177, 185, 182, 108, 121, 164, 136, 58,  220, 241, 4,
};