| /** @file |
| |
| Cache configuration, initialization, and the top-level entry points for cache operations (lookup, open_read, open_write, remove, scan). |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Cache Inspector and State Pages |
| |
| #include "CacheEvacuateDocVC.h" |
| #include "CacheVC.h" |
| #include "P_CacheDoc.h" |
| #include "P_CacheInternal.h" |
| #include "P_CacheTest.h" |
| #include "Stripe.h" |
| #include "StripeSM.h" |
| #include "iocore/cache/Cache.h" |
| #include "tscore/Filenames.h" |
| #include "tscore/InkErrno.h" |
| #include "tscore/Layout.h" |
| |
| #ifdef AIO_FAULT_INJECTION |
| #include "iocore/aio/AIO_fault_injection.h" |
| #endif |
| |
| #include <atomic> |
| #include <unordered_set> |
| #include <fstream> |
| #include <string> |
| #include <filesystem> |
| |
| #define SCAN_BUF_SIZE RECOVERY_SIZE |
| #define SCAN_WRITER_LOCK_MAX_RETRY 5 |
| |
| extern void register_cache_stats(CacheStatsBlock *rsb, const std::string &prefix); |
| |
| constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION, CACHE_DB_MINOR_VERSION); |
| |
| // Configuration |
| |
| int64_t cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE; |
| int cache_config_ram_cache_algorithm = 1; |
| int cache_config_ram_cache_compress = 0; |
| int cache_config_ram_cache_compress_percent = 90; |
| int cache_config_ram_cache_use_seen_filter = 1; |
| int cache_config_http_max_alts = 3; |
| int cache_config_log_alternate_eviction = 0; |
| int cache_config_dir_sync_frequency = 60; |
| int cache_config_dir_sync_delay = 500; |
| int cache_config_dir_sync_max_write = (2 * 1024 * 1024); |
| int cache_config_permit_pinning = 0; |
| int cache_config_select_alternate = 1; |
| int cache_config_max_doc_size = 0; |
| int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE; |
| int64_t cache_config_ram_cache_cutoff = AGG_SIZE; |
| int cache_config_max_disk_errors = 5; |
| int cache_config_hit_evacuate_percent = 10; |
| int cache_config_hit_evacuate_size_limit = 0; |
| int cache_config_force_sector_size = 0; |
| int cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE; |
| int cache_config_agg_write_backlog = AGG_SIZE * 2; |
| int cache_config_enable_checksum = 0; |
| int cache_config_alt_rewrite_max_size = 4096; |
| int cache_config_read_while_writer = 0; |
| int cache_config_mutex_retry_delay = 2; |
| int cache_read_while_writer_retry_delay = 50; |
| int cache_config_read_while_writer_max_retries = 10; |
| int cache_config_persist_bad_disks = false; |
| |
| // Globals |
| |
| CacheStatsBlock cache_rsb; |
| Cache *theCache = nullptr; |
| std::vector<std::unique_ptr<CacheDisk>> gdisks; |
| int gndisks = 0; |
| Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr}; |
| CacheSync *cacheDirSync = nullptr; |
| Store theCacheStore; |
| StripeSM **gstripes = nullptr; |
| std::atomic<int> gnstripes = 0; |
| ClassAllocator<CacheVC, false> cacheVConnectionAllocator("cacheVConnection"); |
| ClassAllocator<CacheEvacuateDocVC, false> cacheEvacuateDocVConnectionAllocator("cacheEvacuateDocVC"); |
| ClassAllocator<EvacuationBlock, false> evacuationBlockAllocator("evacuationBlock"); |
| ClassAllocator<CacheRemoveCont, false> cacheRemoveContAllocator("cacheRemoveCont"); |
| ClassAllocator<EvacuationKey, false> evacuationKeyAllocator("evacuationKey"); |
| std::unordered_set<std::string> known_bad_disks; |
| |
| namespace |
| { |
| |
| DbgCtl dbg_ctl_cache_scan_truss{"cache_scan_truss"}; |
| DbgCtl dbg_ctl_cache_init{"cache_init"}; |
| DbgCtl dbg_ctl_cache_hosting{"cache_hosting"}; |
| DbgCtl dbg_ctl_cache_update{"cache_update"}; |
| |
| } // end anonymous namespace |
| |
| // Global list of the volumes created |
| Queue<CacheVol> cp_list; |
| int cp_list_len = 0; |
| ConfigVolumes config_volumes; |
| |
| #if TS_HAS_TESTS |
| void |
| force_link_CacheTestCaller() |
| { |
| force_link_CacheTest(); |
| } |
| #endif |
| |
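| // Read-while-writer cannot be combined with background fill (a nonzero completed threshold) or a max_doc_size limit; force it off in those cases. |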
| static int |
| validate_rww(int new_value) |
| { |
| if (new_value) { |
| auto http_bg_fill{RecGetRecordFloat("proxy.config.http.background_fill_completed_threshold").value_or(0)}; |
| if (http_bg_fill > 0.0) { |
| Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled", |
| "proxy.config.http.background_fill_completed_threshold"); |
| return 0; |
| } |
| if (cache_config_max_doc_size > 0) { |
| Note("to enable reading while writing a document, %s should be 0: read while writing disabled", |
| "proxy.config.cache.max_doc_size"); |
| return 0; |
| } |
| return new_value; |
| } |
| return 0; |
| } |
| |
| static int |
| update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, |
| void * /* cookie ATS_UNUSED */) |
| { |
| int new_value = validate_rww(data.rec_int); |
| cache_config_read_while_writer = new_value; |
| |
| return 0; |
| } |
| |
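| // The API-level TS_EVENT_CACHE_* values must stay numerically identical to the internal CACHE_EVENT_* values. |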
| static_assert(static_cast<int>(TS_EVENT_CACHE_OPEN_READ) == static_cast<int>(CACHE_EVENT_OPEN_READ)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_OPEN_READ_FAILED) == static_cast<int>(CACHE_EVENT_OPEN_READ_FAILED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_OPEN_WRITE) == static_cast<int>(CACHE_EVENT_OPEN_WRITE)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_OPEN_WRITE_FAILED) == static_cast<int>(CACHE_EVENT_OPEN_WRITE_FAILED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_REMOVE) == static_cast<int>(CACHE_EVENT_REMOVE)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_REMOVE_FAILED) == static_cast<int>(CACHE_EVENT_REMOVE_FAILED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN) == static_cast<int>(CACHE_EVENT_SCAN)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN_FAILED) == static_cast<int>(CACHE_EVENT_SCAN_FAILED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN_OBJECT) == static_cast<int>(CACHE_EVENT_SCAN_OBJECT)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED) == static_cast<int>(CACHE_EVENT_SCAN_OPERATION_BLOCKED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN_OPERATION_FAILED) == static_cast<int>(CACHE_EVENT_SCAN_OPERATION_FAILED)); |
| static_assert(static_cast<int>(TS_EVENT_CACHE_SCAN_DONE) == static_cast<int>(CACHE_EVENT_SCAN_DONE)); |
| |
| void |
| Cache::vol_initialized(bool result) |
| { |
| if (result) { |
| ink_atomic_increment(&total_good_nvol, 1); |
| } |
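| // Whichever volume finishes initializing last, successfully or not, triggers open_done(). |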
| if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1) { |
| open_done(); |
| } |
| } |
| |
| int |
| AIO_failure_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data) |
| { |
| /* search for the matching file descriptor */ |
| if (!CacheProcessor::cache_ready) { |
| return EVENT_DONE; |
| } |
| int disk_no = 0; |
| AIOCallback *cb = static_cast<AIOCallback *>(data); |
| |
| for (; disk_no < gndisks; disk_no++) { |
| CacheDisk *d = gdisks[disk_no].get(); |
| |
| if (d->fd == cb->aiocb.aio_fildes) { |
| char message[256]; |
| d->incrErrors(cb); |
| |
| if (!DISK_BAD(d)) { |
| snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors); |
| Warning("%s", message); |
| } else if (!DISK_BAD_SIGNALLED(d)) { |
| snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors, |
| cache_config_max_disk_errors); |
| Warning("%s", message); |
| cacheProcessor.mark_storage_offline(d); // take it out of service |
| } |
| break; |
| } |
| } |
| |
| delete cb; |
| return EVENT_DONE; |
| } |
| |
| int |
| Cache::open_done() |
| { |
| Action *register_ShowCache(Continuation * c, HTTPHdr * h); |
| Action *register_ShowCacheInternal(Continuation * c, HTTPHdr * h); |
| |
| if (total_good_nvol == 0) { |
| ready = CacheInitState::FAILED; |
| cacheProcessor.cacheInitialized(); |
| return 0; |
| } |
| |
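| // Build the host-to-volume table and register its configuration-update callback. |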
| { |
| CacheHostTable *hosttable_raw = new CacheHostTable(this, scheme); |
| hosttable.reset(hosttable_raw); |
| hosttable_raw->register_config_callback(&hosttable); |
| } |
| |
| ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&this->hosttable); |
| if (hosttable->gen_host_rec.num_cachevols == 0) { |
| ready = CacheInitState::FAILED; |
| } else { |
| ready = CacheInitState::INITIALIZED; |
| } |
| |
| // TS-3848 |
| if (ready == CacheInitState::FAILED && cacheProcessor.waitForCache() >= 2) { |
| Emergency("Failed to initialize cache host table"); |
| } |
| |
| cacheProcessor.cacheInitialized(); |
| |
| return 0; |
| } |
| |
| int |
| Cache::open(bool clear, bool /* fix ATS_UNUSED */) |
| { |
| int i; |
| off_t blocks = 0; |
| cache_read_done = 0; |
| total_initialized_vol = 0; |
| total_nvol = 0; |
| total_good_nvol = 0; |
| |
| RecEstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size"); |
| Dbg(dbg_ctl_cache_init, "Cache::open - proxy.config.cache.min_average_object_size = %d", cache_config_min_average_object_size); |
| |
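| // Create a StripeSM for every disk block assigned to this scheme's volumes, skipping disks that have been marked bad. |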
| CacheVol *cp = cp_list.head; |
| for (; cp; cp = cp->link.next) { |
| if (cp->scheme == scheme) { |
| cp->stripes = static_cast<StripeSM **>(ats_malloc(cp->num_vols * sizeof(StripeSM *))); |
| int vol_no = 0; |
| for (i = 0; i < gndisks; i++) { |
| if (cp->disk_stripes[i] && !DISK_BAD(cp->disk_stripes[i]->disk)) { |
| DiskStripeBlockQueue *q = cp->disk_stripes[i]->dpb_queue.head; |
| for (; q; q = q->link.next) { |
| blocks = q->b->len; |
| CacheDisk *d = cp->disk_stripes[i]->disk; |
| cp->stripes[vol_no] = new StripeSM(d, blocks, q->b->offset, cp->avg_obj_size, cp->fragment_size); |
| cp->stripes[vol_no]->cache = this; |
| cp->stripes[vol_no]->cache_vol = cp; |
| |
| bool vol_clear = clear || d->cleared || q->new_block; |
| cp->stripes[vol_no]->init(vol_clear); |
| vol_no++; |
| cache_size += blocks; |
| } |
| } |
| } |
| total_nvol += vol_no; |
| } |
| } |
| if (total_nvol == 0) { |
| return open_done(); |
| } |
| cache_read_done = 1; |
| return 0; |
| } |
| |
| int |
| Cache::close() |
| { |
| return -1; |
| } |
| |
| Action * |
| Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type, std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, nullptr); |
| return ACTION_RESULT_DONE; |
| } |
| |
| StripeSM *stripe = key_to_stripe(key, hostname); |
| CacheVC *c = new_CacheVC(cont); |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); |
| c->vio.op = VIO::READ; |
| c->op_type = static_cast<int>(CacheOpType::Lookup); |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| c->first_key = c->key = *key; |
| c->frag_type = type; |
| c->f.lookup = 1; |
| c->stripe = stripe; |
| c->last_collision = nullptr; |
| |
| if (c->handleEvent(EVENT_INTERVAL, nullptr) == EVENT_CONT) { |
| return &c->_action; |
| } else { |
| return ACTION_RESULT_DONE; |
| } |
| } |
| |
| Action * |
| Cache::open_read(Continuation *cont, const CacheKey *key, CacheFragType type, std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NOT_READY)); |
| return ACTION_RESULT_DONE; |
| } |
| ink_assert(caches[type] == this); |
| |
| StripeSM *stripe = key_to_stripe(key, hostname); |
| Dir result, *last_collision = nullptr; |
| ProxyMutex *mutex = cont->mutex.get(); |
| OpenDirEntry *od = nullptr; |
| CacheVC *c = nullptr; |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
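| // Proceed with a CacheVC if the stripe lock was missed, an open writer exists for this key, or the directory probe finds an entry; otherwise it is a miss. |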
| if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) { |
| c = new_CacheVC(cont); |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); |
| c->vio.op = VIO::READ; |
| c->op_type = static_cast<int>(CacheOpType::Read); |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| c->first_key = c->key = c->earliest_key = *key; |
| c->stripe = stripe; |
| c->frag_type = type; |
| c->od = od; |
| } |
| if (!c) { |
| goto Lmiss; |
| } |
| if (!lock.is_locked()) { |
| CONT_SCHED_LOCK_RETRY(c); |
| return &c->_action; |
| } |
| if (c->od) { |
| goto Lwriter; |
| } |
| c->dir = result; |
| c->last_collision = last_collision; |
| switch (c->do_read_call(&c->key)) { |
| case EVENT_DONE: |
| return ACTION_RESULT_DONE; |
| case EVENT_RETURN: |
| goto Lcallreturn; |
| default: |
| return &c->_action; |
| } |
| } |
| Lmiss: |
| ts::Metrics::Counter::increment(cache_rsb.status[static_cast<int>(CacheOpType::Read)].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Read)].failure); |
| cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC)); |
| return ACTION_RESULT_DONE; |
| Lwriter: |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter); |
| if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } |
| return &c->_action; |
| Lcallreturn: |
| if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } |
| return &c->_action; |
| } |
| |
| // Main entry point for writing non-HTTP documents. |
| Action * |
| Cache::open_write(Continuation *cont, const CacheKey *key, CacheFragType frag_type, int options, time_t apin_in_cache, |
| std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(frag_type)) { |
| cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-ECACHE_NOT_READY)); |
| return ACTION_RESULT_DONE; |
| } |
| |
| ink_assert(caches[frag_type] == this); |
| |
| intptr_t res = 0; |
| CacheVC *c = new_CacheVC(cont); |
| SCOPED_MUTEX_LOCK(lock, c->mutex, this_ethread()); |
| c->vio.op = VIO::WRITE; |
| c->op_type = static_cast<int>(CacheOpType::Write); |
| c->stripe = key_to_stripe(key, hostname); |
| StripeSM *stripe = c->stripe; |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| c->first_key = c->key = *key; |
| c->frag_type = frag_type; |
| /* |
| The transition from a single-fragment document to a multi-fragment document |
| would cause a problem if the key and the first_key collide. In case of |
| a collision, old vector data could be served to HTTP. Need to avoid that. |
| Also, when evacuating a fragment, we have to decide whether it is the first_key |
| or the earliest_key based on the dir_tag. |
| */ |
| do { |
| rand_CacheKey(&c->key); |
| } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2))); |
| c->earliest_key = c->key; |
| c->info = nullptr; |
| c->f.overwrite = (options & CACHE_WRITE_OPT_OVERWRITE) != 0; |
| c->f.close_complete = (options & CACHE_WRITE_OPT_CLOSE_COMPLETE) != 0; |
| c->f.sync = (options & CACHE_WRITE_OPT_SYNC) == CACHE_WRITE_OPT_SYNC; |
| // coverity[Y2K38_SAFETY:FALSE] |
| c->pin_in_cache = static_cast<uint32_t>(apin_in_cache); |
| |
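| // A positive result from open_write_lock() means the document is already being written (fail); a negative result means the stripe lock was missed (retry later); zero proceeds with the write. |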
| if ((res = c->stripe->open_write_lock(c, false, 1)) > 0) { |
| // document currently being written, abort |
| ts::Metrics::Counter::increment(cache_rsb.status[c->op_type].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[c->op_type].failure); |
| cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-res)); |
| free_CacheVC(c); |
| return ACTION_RESULT_DONE; |
| } |
| if (res < 0) { |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartBegin); |
| c->trigger = CONT_SCHED_LOCK_RETRY(c); |
| return &c->_action; |
| } |
| if (!c->f.overwrite) { |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain); |
| c->callcont(CACHE_EVENT_OPEN_WRITE); |
| return ACTION_RESULT_DONE; |
| } else { |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteOverwrite); |
| if (c->openWriteOverwrite(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } else { |
| return &c->_action; |
| } |
| } |
| } |
| |
| Action * |
| Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type, std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| if (cont) { |
| cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, nullptr); |
| } |
| return ACTION_RESULT_DONE; |
| } |
| |
| Ptr<ProxyMutex> mutex; |
| if (!cont) { |
| cont = new_CacheRemoveCont(); |
| } |
| |
| CACHE_TRY_LOCK(lock, cont->mutex, this_ethread()); |
| ink_assert(lock.is_locked()); |
| StripeSM *stripe = key_to_stripe(key, hostname); |
| // coverity[var_decl] |
| Dir result; |
| dir_clear(&result); // initialized here; set result empty so we can recognize a missed lock |
| mutex = cont->mutex; |
| |
| CacheVC *c = new_CacheVC(cont); |
| c->vio.op = VIO::NONE; |
| c->frag_type = type; |
| c->op_type = static_cast<int>(CacheOpType::Remove); |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| c->first_key = c->key = *key; |
| c->stripe = stripe; |
| c->dir = result; |
| c->f.remove = 1; |
| |
| SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent); |
| int ret = c->removeEvent(EVENT_IMMEDIATE, nullptr); |
| if (ret == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } else { |
| return &c->_action; |
| } |
| } |
| |
| Action * |
| Cache::scan(Continuation *cont, std::string_view hostname, int KB_per_second) const |
| { |
| Dbg(dbg_ctl_cache_scan_truss, "inside scan"); |
| if (!CacheProcessor::IsCacheReady(CACHE_FRAG_TYPE_HTTP)) { |
| cont->handleEvent(CACHE_EVENT_SCAN_FAILED, nullptr); |
| return ACTION_RESULT_DONE; |
| } |
| |
| CacheVC *c = new_CacheVC(cont); |
| c->stripe = nullptr; |
| /* do we need to make a copy of hostname? */ |
| c->hostname = hostname; |
| c->op_type = static_cast<int>(CacheOpType::Scan); |
| c->buf = new_IOBufferData(BUFFER_SIZE_FOR_XMALLOC(SCAN_BUF_SIZE), MEMALIGNED); |
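| // Pace the scan: reading one SCAN_BUF_SIZE buffer per delay interval approximates KB_per_second kilobytes per second. |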
| c->scan_msec_delay = (SCAN_BUF_SIZE / KB_per_second); |
| c->offset = 0; |
| SET_CONTINUATION_HANDLER(c, &CacheVC::scanStripe); |
| eventProcessor.schedule_in(c, HRTIME_MSECONDS(c->scan_msec_delay)); |
| cont->handleEvent(CACHE_EVENT_SCAN, c); |
| return &c->_action; |
| } |
| |
| Action * |
| Cache::open_read(Continuation *cont, const CacheKey *key, CacheHTTPHdr *request, const HttpConfigAccessor *params, |
| CacheFragType type, std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NOT_READY)); |
| return ACTION_RESULT_DONE; |
| } |
| ink_assert(caches[type] == this); |
| |
| StripeSM *stripe = key_to_stripe(key, hostname); |
| Dir result, *last_collision = nullptr; |
| ProxyMutex *mutex = cont->mutex.get(); |
| OpenDirEntry *od = nullptr; |
| CacheVC *c = nullptr; |
| |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked() || (od = stripe->open_read(key)) || stripe->directory.probe(key, stripe, &result, &last_collision)) { |
| c = new_CacheVC(cont); |
| c->first_key = c->key = c->earliest_key = *key; |
| c->stripe = stripe; |
| c->vio.op = VIO::READ; |
| c->op_type = static_cast<int>(CacheOpType::Read); |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| c->request.copy_shallow(request); |
| c->frag_type = CACHE_FRAG_TYPE_HTTP; |
| c->params = params; |
| c->od = od; |
| } |
| if (!lock.is_locked()) { |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); |
| CONT_SCHED_LOCK_RETRY(c); |
| return &c->_action; |
| } |
| if (!c) { |
| goto Lmiss; |
| } |
| if (c->od) { |
| goto Lwriter; |
| } |
| // hit |
| c->dir = c->first_dir = result; |
| c->last_collision = last_collision; |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); |
| switch (c->do_read_call(&c->key)) { |
| case EVENT_DONE: |
| return ACTION_RESULT_DONE; |
| case EVENT_RETURN: |
| goto Lcallreturn; |
| default: |
| return &c->_action; |
| } |
| } |
| Lmiss: |
| ts::Metrics::Counter::increment(cache_rsb.status[static_cast<int>(CacheOpType::Read)].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[static_cast<int>(CacheOpType::Read)].failure); |
| cont->handleEvent(CACHE_EVENT_OPEN_READ_FAILED, reinterpret_cast<void *>(-ECACHE_NO_DOC)); |
| return ACTION_RESULT_DONE; |
| Lwriter: |
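| // A writer currently holds this object: announce read-while-writer to the caller and continue from the writer's data. |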
| cont->handleEvent(CACHE_EVENT_OPEN_READ_RWW, nullptr); |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadFromWriter); |
| if (c->handleEvent(EVENT_IMMEDIATE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } |
| return &c->_action; |
| Lcallreturn: |
| if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } |
| return &c->_action; |
| } |
| |
| // Main entry point for writing HTTP documents. |
| Action * |
| Cache::open_write(Continuation *cont, const CacheKey *key, CacheHTTPInfo *info, time_t apin_in_cache, CacheFragType type, |
| std::string_view hostname) const |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-ECACHE_NOT_READY)); |
| return ACTION_RESULT_DONE; |
| } |
| |
| ink_assert(caches[type] == this); |
| intptr_t err = 0; |
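| // info may be the CACHE_ALLOW_MULTIPLE_WRITES sentinel rather than a real alternate; in that case allow concurrent writers and skip update handling. |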
| int if_writers = reinterpret_cast<uintptr_t>(info) == CACHE_ALLOW_MULTIPLE_WRITES; |
| CacheVC *c = new_CacheVC(cont); |
| c->vio.op = VIO::WRITE; |
| c->first_key = *key; |
| /* |
| The transition from a single-fragment document to a multi-fragment document |
| would cause a problem if the key and the first_key collide. In case of |
| a collision, old vector data could be served to HTTP. Need to avoid that. |
| Also, when evacuating a fragment, we have to decide whether it is the first_key |
| or the earliest_key based on the dir_tag. |
| */ |
| do { |
| rand_CacheKey(&c->key); |
| } while (DIR_MASK_TAG(c->key.slice32(2)) == DIR_MASK_TAG(c->first_key.slice32(2))); |
| c->earliest_key = c->key; |
| c->frag_type = CACHE_FRAG_TYPE_HTTP; |
| c->stripe = key_to_stripe(key, hostname); |
| StripeSM *stripe = c->stripe; |
| c->info = info; |
| if (c->info && reinterpret_cast<uintptr_t>(info) != CACHE_ALLOW_MULTIPLE_WRITES) { |
| /* |
| Update has the following code paths : |
| a) Update alternate header only : |
| In this case the vector has to be rewritten. The content |
| length(update_len) and the key for the document are set in the |
| new_info in the set_http_info call. |
| HTTP OPERATIONS |
| open_write with info set |
| set_http_info new_info |
| (total_len == 0) |
| close |
| b) Update alternate and data |
| In this case both the vector and the data needs to be rewritten. |
| This case is similar to the standard write of a document case except |
| that the new_info is inserted into the vector at the alternate_index |
| (overwriting the old alternate) rather than the end of the vector. |
| HTTP OPERATIONS |
| open_write with info set |
| set_http_info new_info |
| do_io_write => (total_len > 0) |
| close |
| c) Delete an alternate |
| The vector may need to be deleted (if there was only one alternate) or |
| rewritten (if there were more than one alternate). |
| HTTP OPERATIONS |
| open_write with info set |
| close |
| */ |
| c->f.update = 1; |
| c->op_type = static_cast<int>(CacheOpType::Update); |
| DDbg(dbg_ctl_cache_update, "Update called"); |
| info->object_key_get(&c->update_key); |
| ink_assert(!(c->update_key.is_zero())); |
| c->update_len = info->object_size_get(); |
| } else { |
| c->op_type = static_cast<int>(CacheOpType::Write); |
| } |
| |
| ts::Metrics::Gauge::increment(cache_rsb.status[c->op_type].active); |
| ts::Metrics::Gauge::increment(stripe->cache_vol->vol_rsb.status[c->op_type].active); |
| // coverity[Y2K38_SAFETY:FALSE] |
| c->pin_in_cache = static_cast<uint32_t>(apin_in_cache); |
| |
| { |
| CACHE_TRY_LOCK(lock, c->stripe->mutex, cont->mutex->thread_holding); |
| if (lock.is_locked()) { |
| if ((err = c->stripe->open_write(c, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) { |
| goto Lfailure; |
| } |
| // If there are multiple writers, then this one cannot be an update. |
| // Only the first writer can do an update. If that's the case, we can |
| // return success to the state machine now. |
| if (c->od->has_multiple_writers()) { |
| goto Lmiss; |
| } |
| if (!c->stripe->directory.probe(key, c->stripe, &c->dir, &c->last_collision)) { |
| if (c->f.update) { |
| // fail update because vector has been GC'd |
| // This situation can also arise in openWriteStartDone |
| err = ECACHE_NO_DOC; |
| goto Lfailure; |
| } |
| // document doesn't exist, begin write |
| goto Lmiss; |
| } else { |
| c->od->reading_vec = true; |
| // document exists, read vector |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone); |
| switch (c->do_read_call(&c->first_key)) { |
| case EVENT_DONE: |
| return ACTION_RESULT_DONE; |
| case EVENT_RETURN: |
| goto Lcallreturn; |
| default: |
| return &c->_action; |
| } |
| } |
| } |
| // missed lock |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteStartDone); |
| CONT_SCHED_LOCK_RETRY(c); |
| return &c->_action; |
| } |
| |
| Lmiss: |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openWriteMain); |
| c->callcont(CACHE_EVENT_OPEN_WRITE); |
| return ACTION_RESULT_DONE; |
| |
| Lfailure: |
| ts::Metrics::Counter::increment(cache_rsb.status[c->op_type].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[c->op_type].failure); |
| cont->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-err)); |
| if (c->od) { |
| c->openWriteCloseDir(EVENT_IMMEDIATE, nullptr); |
| return ACTION_RESULT_DONE; |
| } |
| free_CacheVC(c); |
| return ACTION_RESULT_DONE; |
| |
| Lcallreturn: |
| if (c->handleEvent(AIO_EVENT_DONE, nullptr) == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } |
| return &c->_action; |
| } |
| |
| // CacheVConnection |
| CacheVConnection::CacheVConnection() : VConnection(nullptr) {} |
| |
| // if gen_host_rec.stripes == nullptr, what do we do??? |
| StripeSM * |
| Cache::key_to_stripe(const CacheKey *key, std::string_view hostname) const |
| { |
| ReplaceablePtr<CacheHostTable>::ScopedReader hosttable(&this->hosttable); |
| |
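| // Hash the key into the stripe assignment table, using key bits above the directory tag. |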
| uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % STRIPE_HASH_TABLE_SIZE; |
| unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table; |
| const CacheHostRecord *host_rec = &hosttable->gen_host_rec; |
| |
| if (hosttable->m_numEntries > 0 && !hostname.empty()) { |
| CacheHostResult res; |
| hosttable->Match(hostname, &res); |
| if (res.record) { |
| unsigned short *host_hash_table = res.record->vol_hash_table; |
| if (host_hash_table) { |
| if (dbg_ctl_cache_hosting.on()) { |
| char format_str[50]; |
| snprintf(format_str, sizeof(format_str), "Volume: %%xd for host: %%.%ds", static_cast<int>(hostname.length())); |
| Dbg(dbg_ctl_cache_hosting, format_str, res.record, hostname.data()); |
| } |
| return res.record->stripes[host_hash_table[h]]; |
| } |
| } |
| } |
| if (hash_table) { |
| if (dbg_ctl_cache_hosting.on()) { |
| char format_str[50]; |
| snprintf(format_str, sizeof(format_str), "Generic volume: %%xd for host: %%.%ds", static_cast<int>(hostname.length())); |
| Dbg(dbg_ctl_cache_hosting, format_str, host_rec, hostname.data()); |
| } |
| return host_rec->stripes[hash_table[h]]; |
| } else { |
| return host_rec->stripes[0]; |
| } |
| } |
| |
| int |
| FragmentSizeUpdateCb(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, |
| void * /* cookie ATS_UNUSED */) |
| { |
| if (sizeof(Doc) >= static_cast<size_t>(data.rec_int) || static_cast<size_t>(data.rec_int) - sizeof(Doc) > MAX_FRAG_SIZE) { |
| Warning("The fragments size exceed the limitation, ignore: %" PRId64 ", %d", data.rec_int, cache_config_target_fragment_size); |
| return 0; |
| } |
| |
| cache_config_target_fragment_size = data.rec_int; |
| return 0; |
| } |
| |
| void |
| ink_cache_init(ts::ModuleVersion v) |
| { |
| ink_release_assert(v.check(CACHE_MODULE_VERSION)); |
| |
| RecEstablishStaticConfigInt(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_size, |
| cache_config_ram_cache_size / (1024 * 1024)); |
| |
| RecEstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm"); |
| RecEstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress"); |
| RecEstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent"); |
| cache_config_ram_cache_use_seen_filter = RecGetRecordInt("proxy.config.cache.ram_cache.use_seen_filter").value_or(0); |
| |
| RecEstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts); |
| |
| RecEstablishStaticConfigInt32(cache_config_log_alternate_eviction, "proxy.config.cache.log.alternate.eviction"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.log.alternate.eviction = %d", cache_config_log_alternate_eviction); |
| |
| RecEstablishStaticConfigInt(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff"); |
| Dbg(dbg_ctl_cache_init, "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_cutoff, |
| cache_config_ram_cache_cutoff / (1024 * 1024)); |
| |
| RecEstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning); |
| |
| RecEstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency); |
| |
| RecEstablishStaticConfigInt32(cache_config_dir_sync_delay, "proxy.config.cache.dir.sync_delay"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_delay = %d", cache_config_dir_sync_delay); |
| |
| RecEstablishStaticConfigInt32(cache_config_dir_sync_max_write, "proxy.config.cache.dir.sync_max_write"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.dir.sync_max_write = %d", cache_config_dir_sync_max_write); |
| |
| RecEstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.select_alternate = %d", cache_config_select_alternate); |
| |
| RecEstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.max_doc_size = %d = %dMb", cache_config_max_doc_size, |
| cache_config_max_doc_size / (1024 * 1024)); |
| |
| RecEstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay); |
| |
| RecEstablishStaticConfigInt32(cache_config_read_while_writer_max_retries, "proxy.config.cache.read_while_writer.max_retries"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.read_while_writer.max_retries = %d", cache_config_read_while_writer_max_retries); |
| |
| RecEstablishStaticConfigInt32(cache_read_while_writer_retry_delay, "proxy.config.cache.read_while_writer_retry.delay"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.read_while_writer_retry.delay = %dms", cache_read_while_writer_retry_delay); |
| |
| RecEstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent); |
| |
| RecEstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit); |
| |
| RecEstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size"); |
| |
| ink_assert(RecRegisterConfigUpdateCb("proxy.config.cache.target_fragment_size", FragmentSizeUpdateCb, nullptr) != REC_ERR_FAIL); |
| cache_config_target_fragment_size = RecGetRecordInt("proxy.config.cache.target_fragment_size").value_or(0); |
| |
| if (cache_config_target_fragment_size == 0) { |
| cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE; |
| } else if (cache_config_target_fragment_size - sizeof(Doc) > MAX_FRAG_SIZE) { |
| Warning("The fragments size exceed the limitation, setting to MAX_FRAG_SIZE (%ld)", MAX_FRAG_SIZE + sizeof(Doc)); |
| cache_config_target_fragment_size = MAX_FRAG_SIZE + sizeof(Doc); |
| } |
| |
| RecEstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors); |
| |
| RecEstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog); |
| |
| RecEstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum); |
| |
| RecEstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size); |
| |
| RecEstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer"); |
| cache_config_read_while_writer = validate_rww(cache_config_read_while_writer); |
| RecRegisterConfigUpdateCb("proxy.config.cache.enable_read_while_writer", update_cache_config, nullptr); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer); |
| |
| register_cache_stats(&cache_rsb, "proxy.process.cache"); |
| |
| cacheProcessor.wait_for_cache = RecGetRecordInt("proxy.config.http.wait_for_cache").value_or(0); |
| |
| RecEstablishStaticConfigInt32(cache_config_persist_bad_disks, "proxy.config.cache.persist_bad_disks"); |
| Dbg(dbg_ctl_cache_init, "proxy.config.cache.persist_bad_disks = %d", cache_config_persist_bad_disks); |
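| // Load the set of disks previously recorded as bad so they are not re-added to the cache. |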
| if (cache_config_persist_bad_disks) { |
| std::filesystem::path localstatedir{Layout::get()->localstatedir}; |
| std::filesystem::path bad_disks_path{localstatedir / ts::filename::BAD_DISKS}; |
| std::fstream bad_disks_file{bad_disks_path.c_str(), std::ios_base::in}; |
| if (bad_disks_file.good()) { |
| for (std::string line; std::getline(bad_disks_file, line);) { |
| if (bad_disks_file.fail()) { |
| Error("Failed while trying to read known bad disks file: %s", bad_disks_path.c_str()); |
| break; |
| } |
| if (!line.empty()) { |
| known_bad_disks.insert(std::move(line)); |
| } |
| } |
| } |
| // not having a bad disks file is not an error. |
| |
| unsigned long known_bad_count = known_bad_disks.size(); |
| Warning("%lu previously known bad disks were recorded in %s. They will not be added to the cache.", known_bad_count, |
| bad_disks_path.c_str()); |
| } |
| |
| Result result = theCacheStore.read_config(); |
| if (result.failed()) { |
| Fatal("Failed to read cache configuration %s: %s", ts::filename::STORAGE, result.message()); |
| } |
| } |