| /** @file |
| |
| A brief file description |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "P_Cache.h" |
| |
| // Cache Inspector and State Pages |
| #include "P_CacheTest.h" |
| #include "StatPages.h" |
| |
| #include "tscore/I_Layout.h" |
| #include "tscore/Filenames.h" |
| |
| #include "HttpTransactCache.h" |
| #include "HttpSM.h" |
| #include "HttpCacheSM.h" |
| #include "InkAPIInternal.h" |
| |
| #include "tscore/hugepages.h" |
| |
| #include <atomic> |
| |
// Cache database (on-disk format) version built from CACHE_DB_MAJOR_VERSION / CACHE_DB_MINOR_VERSION.
constexpr ts::VersionNumber CACHE_DB_VERSION(CACHE_DB_MAJOR_VERSION, CACHE_DB_MINOR_VERSION);

// Compilation Options
// When USELESS_REENABLES is defined, the "useless reenable" sanity assertions inside CacheVC::reenable()
// (guarded by #ifndef USELESS_REENABLES) are compiled out.
#define USELESS_REENABLES // allow them for now
// #define VERIFY_JTEST_DATA

// Multiplier applied to a stripe's directory length to size its RAM cache when the RAM cache size is
// AUTO_SIZE_RAM_CACHE (see CacheProcessor::cacheInitialized()).
static size_t DEFAULT_RAM_CACHE_MULTIPLIER = 10; // I.e. 10x 1MB per 1GB of disk.

// This is the oldest version number that is still usable.
static short int const CACHE_DB_MAJOR_VERSION_COMPATIBLE = 21;

// Reset both the sum and the count of raw stat `x` to 0. The expansion refers to a variable named `rsb`,
// which must be in scope wherever the macro is used.
// NOTE(review): the trailing ';' after `while (0)` makes `DOCACHE_CLEAR_DYN_STAT(x);` expand to two statements,
// which breaks an unbraced if/else around it. Callers are not visible here, so it is left unchanged.
#define DOCACHE_CLEAR_DYN_STAT(x)  \
  do {                             \
    RecSetRawStatSum(rsb, x, 0);   \
    RecSetRawStatCount(rsb, x, 0); \
  } while (0);
| |
// Configuration
//
// Compile-time defaults for the cache tunables. These variables have external linkage and may be referenced
// from other translation units. Presumably they are refreshed from the records configuration elsewhere —
// TODO confirm.

// Total RAM cache size; AUTO_SIZE_RAM_CACHE means "size each stripe's RAM cache from its directory length".
int64_t cache_config_ram_cache_size = AUTO_SIZE_RAM_CACHE;
// RAM cache algorithm (RAM_CACHE_ALGORITHM_CLFUS / RAM_CACHE_ALGORITHM_LRU); unknown values fall back to CLFUS.
int cache_config_ram_cache_algorithm = 1;
int cache_config_ram_cache_compress = 0;
int cache_config_ram_cache_compress_percent = 90;
int cache_config_ram_cache_use_seen_filter = 1;
int cache_config_http_max_alts = 3;
int cache_config_log_alternate_eviction = 0;
int cache_config_dir_sync_frequency = 60;
int cache_config_permit_pinning = 0;
int cache_config_select_alternate = 1;
// Must be 0 for read-while-writer to be enabled (see validate_rww()).
int cache_config_max_doc_size = 0;
int cache_config_min_average_object_size = ESTIMATED_OBJECT_SIZE;
int64_t cache_config_ram_cache_cutoff = AGG_SIZE;
int cache_config_max_disk_errors = 5;
int cache_config_hit_evacuate_percent = 10;
int cache_config_hit_evacuate_size_limit = 0;
// Lower bound applied to each span's hardware sector size in CacheProcessor::start_internal().
int cache_config_force_sector_size = 0;
int cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE;
int cache_config_agg_write_backlog = AGG_SIZE * 2;
int cache_config_enable_checksum = 0;
int cache_config_alt_rewrite_max_size = 4096;
// Effective read-while-writer setting; written by update_cache_config() after validation by validate_rww().
int cache_config_read_while_writer = 0;
int cache_config_mutex_retry_delay = 2;
int cache_read_while_writer_retry_delay = 50;
int cache_config_read_while_writer_max_retries = 10;
| |
// Globals
//
// Objects here that need dynamic initialization (e.g. cacheProcessor and the ClassAllocators) are initialized in
// declaration order within this translation unit, so the order of these definitions is preserved deliberately.

RecRawStatBlock *cache_rsb = nullptr; // global cache stat block (span online/offline stats, ...)
Cache *theCache = nullptr;            // the HTTP cache, created in CacheProcessor::diskInitialized()
CacheDisk **gdisks = nullptr;         // opened disks; bad ones are removed/compacted in diskInitialized()
int gndisks = 0;                      // number of valid entries in gdisks
std::atomic<int> initialize_disk = 0; // count of disks that have reported back to diskInitialized()
Cache *caches[NUM_CACHE_FRAG_TYPES] = {nullptr}; // per fragment-type cache pointer, filled in cacheInitialized()
CacheSync *cacheDirSync = nullptr;
Store theCacheStore; // configured storage spans
int CacheProcessor::initialized = CACHE_INITIALIZING;
uint32_t CacheProcessor::cache_ready = 0;
int CacheProcessor::start_done = 0;
bool CacheProcessor::clear = false;
bool CacheProcessor::fix = false;
bool CacheProcessor::check = false;
int CacheProcessor::start_internal_flags = 0;
int CacheProcessor::auto_clear_flag = 0;
CacheProcessor cacheProcessor;
Vol **gvol = nullptr;       // array of stripes
std::atomic<int> gnvol = 0; // number of stripes in gvol
ClassAllocator<CacheVC> cacheVConnectionAllocator("cacheVConnection");
ClassAllocator<EvacuationBlock> evacuationBlockAllocator("evacuationBlock");
ClassAllocator<CacheRemoveCont> cacheRemoveContAllocator("cacheRemoveCont");
ClassAllocator<EvacuationKey> evacuationKeyAllocator("evacuationKey");
int CacheVC::size_to_init = -1; // bytes zeroed by the CacheVC constructor (from `vio` to the end); set on first construction
CacheKey zero_key;
| |
| struct VolInitInfo { |
| off_t recover_pos; |
| AIOCallbackInternal vol_aio[4]; |
| char *vol_h_f; |
| |
| VolInitInfo() |
| { |
| recover_pos = 0; |
| vol_h_f = static_cast<char *>(ats_memalign(ats_pagesize(), 4 * STORE_BLOCK_SIZE)); |
| memset(vol_h_f, 0, 4 * STORE_BLOCK_SIZE); |
| } |
| |
| ~VolInitInfo() |
| { |
| for (auto &i : vol_aio) { |
| i.action = nullptr; |
| i.mutex.clear(); |
| } |
| free(vol_h_f); |
| } |
| }; |
| |
| #if AIO_MODE == AIO_MODE_NATIVE |
| struct VolInit : public Continuation { |
| Vol *vol; |
| char *path; |
| off_t blocks; |
| int64_t offset; |
| bool vol_clear; |
| |
| int |
| mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| vol->init(path, blocks, offset, vol_clear); |
| mutex.clear(); |
| delete this; |
| return EVENT_DONE; |
| } |
| |
| VolInit(Vol *v, char *p, off_t b, int64_t o, bool c) : Continuation(v->mutex), vol(v), path(p), blocks(b), offset(o), vol_clear(c) |
| { |
| SET_HANDLER(&VolInit::mainEvent); |
| } |
| }; |
| |
| struct DiskInit : public Continuation { |
| CacheDisk *disk; |
| char *s; |
| off_t blocks; |
| off_t askip; |
| int ahw_sector_size; |
| int fildes; |
| bool clear; |
| |
| int |
| mainEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| disk->open(s, blocks, askip, ahw_sector_size, fildes, clear); |
| ats_free(s); |
| mutex.clear(); |
| delete this; |
| return EVENT_DONE; |
| } |
| |
| DiskInit(CacheDisk *d, char *str, off_t b, off_t skip, int sector, int f, bool c) |
| : Continuation(d->mutex), disk(d), s(ats_strdup(str)), blocks(b), askip(skip), ahw_sector_size(sector), fildes(f), clear(c) |
| { |
| SET_HANDLER(&DiskInit::mainEvent); |
| } |
| }; |
| #endif |
// Forward declarations of the cache-volume list helpers and stat registration.
// The `static` ones are file-local and must be defined later in this file.
void cplist_init();
static void cplist_update();
int cplist_reconfigure();
static int create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp);
static void rebuild_host_table(Cache *cache);
void register_cache_stats(RecRawStatBlock *rsb, const char *prefix);

// Global list of the volumes created
Queue<CacheVol> cp_list;
int cp_list_len = 0;
// Parsed volume configuration; populated by read_config_file() in CacheProcessor::start_internal().
ConfigVolumes config_volumes;
| |
| #if TS_HAS_TESTS |
| void |
| force_link_CacheTestCaller() |
| { |
| force_link_CacheTest(); |
| } |
| #endif |
| |
| int64_t |
| cache_bytes_used(int volume) |
| { |
| uint64_t used = 0; |
| |
| for (int i = 0; i < gnvol; i++) { |
| if (!DISK_BAD(gvol[i]->disk) && (volume == -1 || gvol[i]->cache_vol->vol_number == volume)) { |
| if (!gvol[i]->header->cycle) { |
| used += gvol[i]->header->write_pos - gvol[i]->start; |
| } else { |
| used += gvol[i]->len - gvol[i]->dirlen() - EVACUATION_SIZE; |
| } |
| } |
| } |
| |
| return used; |
| } |
| |
| int |
| cache_stats_bytes_used_cb(const char *name, RecDataT data_type, RecData *data, RecRawStatBlock *rsb, int id) |
| { |
| int volume = -1; |
| char *p; |
| |
| // Well, there's no way to pass along the volume ID, so extracting it from the stat name. |
| p = strstr(const_cast<char *>(name), "volume_"); |
| if (p != nullptr) { |
| // I'm counting on the compiler to optimize out strlen("volume_"). |
| volume = strtol(p + strlen("volume_"), nullptr, 10); |
| } |
| |
| if (cacheProcessor.initialized == CACHE_INITIALIZED) { |
| int64_t used, total = 0; |
| float percent_full; |
| |
| used = cache_bytes_used(volume); |
| RecSetGlobalRawStatSum(rsb, id, used); |
| RecRawStatSyncSum(name, data_type, data, rsb, id); |
| RecGetGlobalRawStatSum(rsb, static_cast<int>(cache_bytes_total_stat), &total); |
| percent_full = static_cast<float>(used) / static_cast<float>(total) * 100; |
| // The percent_full float below gets rounded down |
| RecSetGlobalRawStatSum(rsb, static_cast<int>(cache_percent_full_stat), static_cast<int64_t>(percent_full)); |
| } |
| |
| return 1; |
| } |
| |
| static int |
| validate_rww(int new_value) |
| { |
| if (new_value) { |
| float http_bg_fill; |
| |
| REC_ReadConfigFloat(http_bg_fill, "proxy.config.http.background_fill_completed_threshold"); |
| if (http_bg_fill > 0.0) { |
| Note("to enable reading while writing a document, %s should be 0.0: read while writing disabled", |
| "proxy.config.http.background_fill_completed_threshold"); |
| return 0; |
| } |
| if (cache_config_max_doc_size > 0) { |
| Note("to enable reading while writing a document, %s should be 0: read while writing disabled", |
| "proxy.config.cache.max_doc_size"); |
| return 0; |
| } |
| return new_value; |
| } |
| return 0; |
| } |
| |
| static int |
| update_cache_config(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, |
| void * /* cookie ATS_UNUSED */) |
| { |
| int new_value = validate_rww(data.rec_int); |
| cache_config_read_while_writer = new_value; |
| |
| return 0; |
| } |
| |
| CacheVC::CacheVC() |
| { |
| size_to_init = sizeof(CacheVC) - (size_t) & ((CacheVC *)nullptr)->vio; |
| memset((void *)&vio, 0, size_to_init); |
| } |
| |
| HTTPInfo::FragOffset * |
| CacheVC::get_frag_table() |
| { |
| ink_assert(alternate.valid()); |
| return alternate.valid() ? alternate.get_frag_table() : nullptr; |
| } |
| |
| VIO * |
| CacheVC::do_io_read(Continuation *c, int64_t nbytes, MIOBuffer *abuf) |
| { |
| ink_assert(vio.op == VIO::READ); |
| vio.buffer.writer_for(abuf); |
| vio.set_continuation(c); |
| vio.ndone = 0; |
| vio.nbytes = nbytes; |
| vio.vc_server = this; |
| #ifdef DEBUG |
| ink_assert(!c || c->mutex->thread_holding); |
| #endif |
| if (c && !trigger && !recursive) { |
| trigger = c->mutex->thread_holding->schedule_imm_local(this); |
| } |
| return &vio; |
| } |
| |
| VIO * |
| CacheVC::do_io_pread(Continuation *c, int64_t nbytes, MIOBuffer *abuf, int64_t offset) |
| { |
| ink_assert(vio.op == VIO::READ); |
| vio.buffer.writer_for(abuf); |
| vio.set_continuation(c); |
| vio.ndone = 0; |
| vio.nbytes = nbytes; |
| vio.vc_server = this; |
| seek_to = offset; |
| #ifdef DEBUG |
| ink_assert(c->mutex->thread_holding); |
| #endif |
| if (!trigger && !recursive) { |
| trigger = c->mutex->thread_holding->schedule_imm_local(this); |
| } |
| return &vio; |
| } |
| |
| VIO * |
| CacheVC::do_io_write(Continuation *c, int64_t nbytes, IOBufferReader *abuf, bool owner) |
| { |
| ink_assert(vio.op == VIO::WRITE); |
| ink_assert(!owner); |
| vio.buffer.reader_for(abuf); |
| vio.set_continuation(c); |
| vio.ndone = 0; |
| vio.nbytes = nbytes; |
| vio.vc_server = this; |
| #ifdef DEBUG |
| ink_assert(!c || c->mutex->thread_holding); |
| #endif |
| if (c && !trigger && !recursive) { |
| trigger = c->mutex->thread_holding->schedule_imm_local(this); |
| } |
| return &vio; |
| } |
| |
| void |
| CacheVC::do_io_close(int alerrno) |
| { |
| ink_assert(mutex->thread_holding == this_ethread()); |
| int previous_closed = closed; |
| closed = (alerrno == -1) ? 1 : -1; // Stupid default arguments |
| DDebug("cache_close", "do_io_close %p %d %d", this, alerrno, closed); |
| if (!previous_closed && !recursive) { |
| die(); |
| } |
| } |
| |
| void |
| CacheVC::reenable(VIO *avio) |
| { |
| DDebug("cache_reenable", "reenable %p", this); |
| (void)avio; |
| #ifdef DEBUG |
| ink_assert(avio->mutex->thread_holding); |
| #endif |
| if (!trigger) { |
| #ifndef USELESS_REENABLES |
| if (vio.op == VIO::READ) { |
| if (vio.buffer.mbuf->max_read_avail() > vio.buffer.writer()->water_mark) |
| ink_assert(!"useless reenable of cache read"); |
| } else if (!vio.buffer.reader()->read_avail()) |
| ink_assert(!"useless reenable of cache write"); |
| #endif |
| trigger = avio->mutex->thread_holding->schedule_imm_local(this); |
| } |
| } |
| |
| void |
| CacheVC::reenable_re(VIO *avio) |
| { |
| DDebug("cache_reenable", "reenable_re %p", this); |
| (void)avio; |
| #ifdef DEBUG |
| ink_assert(avio->mutex->thread_holding); |
| #endif |
| if (!trigger) { |
| if (!is_io_in_progress() && !recursive) { |
| handleEvent(EVENT_NONE, (void *)nullptr); |
| } else { |
| trigger = avio->mutex->thread_holding->schedule_imm_local(this); |
| } |
| } |
| } |
| |
| bool |
| CacheVC::get_data(int i, void *data) |
| { |
| switch (i) { |
| case CACHE_DATA_HTTP_INFO: |
| *(static_cast<CacheHTTPInfo **>(data)) = &alternate; |
| return true; |
| case CACHE_DATA_RAM_CACHE_HIT_FLAG: |
| *(static_cast<int *>(data)) = !f.not_from_ram_cache; |
| return true; |
| default: |
| break; |
| } |
| return false; |
| } |
| |
| int64_t |
| CacheVC::get_object_size() |
| { |
| return (this)->doc_len; |
| } |
| |
| bool |
| CacheVC::set_data(int /* i ATS_UNUSED */, void * /* data */) |
| { |
| ink_assert(!"CacheVC::set_data should not be called!"); |
| return true; |
| } |
| |
| void |
| CacheVC::get_http_info(CacheHTTPInfo **ainfo) |
| { |
| *ainfo = &(this)->alternate; |
| } |
| |
| // set_http_info must be called before do_io_write |
| // cluster vc does an optimization where it calls do_io_write() before |
| // calling set_http_info(), but it guarantees that the info will |
| // be set before transferring any bytes |
| void |
| CacheVC::set_http_info(CacheHTTPInfo *ainfo) |
| { |
| ink_assert(!total_len); |
| if (f.update) { |
| ainfo->object_key_set(update_key); |
| ainfo->object_size_set(update_len); |
| } else { |
| ainfo->object_key_set(earliest_key); |
| // don't know the total len yet |
| } |
| |
| MIMEField *field = ainfo->m_alt->m_response_hdr.field_find(MIME_FIELD_CONTENT_LENGTH, MIME_LEN_CONTENT_LENGTH); |
| if (field && !field->value_get_int64()) { |
| f.allow_empty_doc = 1; |
| // Set the object size here to zero in case this is a cache replace where the new object |
| // length is zero but the old object was not. |
| ainfo->object_size_set(0); |
| } else { |
| f.allow_empty_doc = 0; |
| } |
| |
| alternate.copy_shallow(ainfo); |
| ainfo->clear(); |
| } |
| |
| bool |
| CacheVC::set_pin_in_cache(time_t time_pin) |
| { |
| if (total_len) { |
| ink_assert(!"should Pin the document before writing"); |
| return false; |
| } |
| if (vio.op != VIO::WRITE) { |
| ink_assert(!"Pinning only allowed while writing objects to the cache"); |
| return false; |
| } |
| pin_in_cache = time_pin; |
| return true; |
| } |
| |
| time_t |
| CacheVC::get_pin_in_cache() |
| { |
| return pin_in_cache; |
| } |
| |
| int |
| Vol::begin_read(CacheVC *cont) |
| { |
| ink_assert(cont->mutex->thread_holding == this_ethread()); |
| ink_assert(mutex->thread_holding == this_ethread()); |
| #ifdef CACHE_STAT_PAGES |
| ink_assert(!cont->stat_link.next && !cont->stat_link.prev); |
| stat_cache_vcs.enqueue(cont, cont->stat_link); |
| #endif |
| // no need for evacuation as the entire document is already in memory |
| if (cont->f.single_fragment) { |
| return 0; |
| } |
| int i = dir_evac_bucket(&cont->earliest_dir); |
| EvacuationBlock *b; |
| for (b = evacuate[i].head; b; b = b->link.next) { |
| if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) { |
| continue; |
| } |
| if (b->readers) { |
| b->readers = b->readers + 1; |
| } |
| return 0; |
| } |
| // we don't actually need to preserve this block as it is already in |
| // memory, but this is easier, and evacuations are rare |
| EThread *t = cont->mutex->thread_holding; |
| b = new_EvacuationBlock(t); |
| b->readers = 1; |
| b->dir = cont->earliest_dir; |
| b->evac_frags.key = cont->earliest_key; |
| evacuate[i].push(b); |
| return 1; |
| } |
| |
// Drop the evacuation reader reference associated with @a cont.
//
// Walks the evacuation bucket of cont->earliest_dir. For every block at the same directory offset that has
// readers, it decrements the count. When a count reaches zero, that block is removed, freed, and the walk stops.
// Always returns 1.
// NOTE(review): begin_read() only touches the first matching block, but this loop continues past a decrement
// that leaves readers non-zero, so several blocks at one offset could each be decremented — confirm that at most
// one block per offset can exist.
int
Vol::close_read(CacheVC *cont)
{
  EThread *t = cont->mutex->thread_holding;
  ink_assert(t == this_ethread());
  ink_assert(t == mutex->thread_holding);
  // Nothing to release for an empty directory entry.
  // NOTE(review): with CACHE_STAT_PAGES defined, this early return skips stat_cache_vcs.remove() below even
  // though begin_read() enqueues unconditionally — confirm this cannot leave a stale link.
  if (dir_is_empty(&cont->earliest_dir)) {
    return 1;
  }
  int i = dir_evac_bucket(&cont->earliest_dir);
  EvacuationBlock *b;
  for (b = evacuate[i].head; b;) {
    // Grab the successor first: b may be removed and freed below.
    EvacuationBlock *next = b->link.next;
    if (dir_offset(&b->dir) != dir_offset(&cont->earliest_dir)) {
      b = next;
      continue;
    }
    if (b->readers && !--b->readers) {
      evacuate[i].remove(b);
      free_EvacuationBlock(b, t);
      break;
    }
    b = next;
  }
#ifdef CACHE_STAT_PAGES
  stat_cache_vcs.remove(cont, cont->stat_link);
  ink_assert(!cont->stat_link.next && !cont->stat_link.prev);
#endif
  return 1;
}
| |
| // Cache Processor |
| |
| int |
| CacheProcessor::start(int, size_t) |
| { |
| return start_internal(0); |
| } |
| |
// Base open(2) flags for cache spans; start_internal() adds O_CREAT / O_DIRECT / O_DSYNC per span as needed.
static constexpr int DEFAULT_CACHE_OPTIONS = O_RDWR;
| |
| int |
| CacheProcessor::start_internal(int flags) |
| { |
| ink_assert((int)TS_EVENT_CACHE_OPEN_READ == (int)CACHE_EVENT_OPEN_READ); |
| ink_assert((int)TS_EVENT_CACHE_OPEN_READ_FAILED == (int)CACHE_EVENT_OPEN_READ_FAILED); |
| ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE == (int)CACHE_EVENT_OPEN_WRITE); |
| ink_assert((int)TS_EVENT_CACHE_OPEN_WRITE_FAILED == (int)CACHE_EVENT_OPEN_WRITE_FAILED); |
| ink_assert((int)TS_EVENT_CACHE_REMOVE == (int)CACHE_EVENT_REMOVE); |
| ink_assert((int)TS_EVENT_CACHE_REMOVE_FAILED == (int)CACHE_EVENT_REMOVE_FAILED); |
| ink_assert((int)TS_EVENT_CACHE_SCAN == (int)CACHE_EVENT_SCAN); |
| ink_assert((int)TS_EVENT_CACHE_SCAN_FAILED == (int)CACHE_EVENT_SCAN_FAILED); |
| ink_assert((int)TS_EVENT_CACHE_SCAN_OBJECT == (int)CACHE_EVENT_SCAN_OBJECT); |
| ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_BLOCKED == (int)CACHE_EVENT_SCAN_OPERATION_BLOCKED); |
| ink_assert((int)TS_EVENT_CACHE_SCAN_OPERATION_FAILED == (int)CACHE_EVENT_SCAN_OPERATION_FAILED); |
| ink_assert((int)TS_EVENT_CACHE_SCAN_DONE == (int)CACHE_EVENT_SCAN_DONE); |
| #if AIO_MODE == AIO_MODE_NATIVE |
| for (EThread *et : eventProcessor.active_group_threads(ET_NET)) { |
| et->diskHandler = new DiskHandler(); |
| et->schedule_imm(et->diskHandler); |
| } |
| #endif |
| start_internal_flags = flags; |
| clear = !!(flags & PROCESSOR_RECONFIGURE) || auto_clear_flag; |
| fix = !!(flags & PROCESSOR_FIX); |
| check = (flags & PROCESSOR_CHECK) != 0; |
| start_done = 0; |
| |
| /* read the config file and create the data structures corresponding |
| to the file */ |
| gndisks = theCacheStore.n_disks; |
| gdisks = static_cast<CacheDisk **>(ats_malloc(gndisks * sizeof(CacheDisk *))); |
| |
| // Temporaries to carry values between loops |
| char **paths = static_cast<char **>(alloca(sizeof(char *) * gndisks)); |
| memset(paths, 0, sizeof(char *) * gndisks); |
| int *fds = static_cast<int *>(alloca(sizeof(int) * gndisks)); |
| memset(fds, 0, sizeof(int) * gndisks); |
| int *sector_sizes = static_cast<int *>(alloca(sizeof(int) * gndisks)); |
| memset(sector_sizes, 0, sizeof(int) * gndisks); |
| Span **sds = static_cast<Span **>(alloca(sizeof(Span *) * gndisks)); |
| memset(sds, 0, sizeof(Span *) * gndisks); |
| |
| gndisks = 0; |
| ink_aio_set_callback(new AIO_Callback_handler()); |
| |
| config_volumes.read_config_file(); |
| |
| /* |
| create CacheDisk objects for each span in the configuration file and store in gdisks |
| */ |
| for (unsigned i = 0; i < theCacheStore.n_disks; i++) { |
| Span *sd = theCacheStore.disk[i]; |
| int opts = DEFAULT_CACHE_OPTIONS; |
| |
| if (!paths[gndisks]) { |
| paths[gndisks] = static_cast<char *>(alloca(PATH_NAME_MAX)); |
| } |
| ink_strlcpy(paths[gndisks], sd->pathname, PATH_NAME_MAX); |
| if (!sd->file_pathname) { |
| ink_strlcat(paths[gndisks], "/cache.db", PATH_NAME_MAX); |
| opts |= O_CREAT; |
| } |
| |
| #ifdef O_DIRECT |
| opts |= O_DIRECT; |
| #endif |
| #ifdef O_DSYNC |
| opts |= O_DSYNC; |
| #endif |
| if (check) { |
| opts &= ~O_CREAT; |
| opts |= O_RDONLY; |
| } |
| |
| int fd = open(paths[gndisks], opts, 0644); |
| int64_t blocks = sd->blocks; |
| |
| if (fd < 0 && (opts & O_CREAT)) { // Try without O_DIRECT if this is a file on filesystem, e.g. tmpfs. |
| fd = open(paths[gndisks], DEFAULT_CACHE_OPTIONS | O_CREAT, 0644); |
| } |
| |
| if (fd >= 0) { |
| bool diskok = true; |
| if (!sd->file_pathname) { |
| if (!check) { |
| if (ftruncate(fd, blocks * STORE_BLOCK_SIZE) < 0) { |
| Warning("unable to truncate cache file '%s' to %" PRId64 " blocks", paths[gndisks], blocks); |
| diskok = false; |
| } |
| } else { // read-only mode checks |
| struct stat sbuf; |
| if (-1 == fstat(fd, &sbuf)) { |
| fprintf(stderr, "Failed to stat cache file for directory %s\n", paths[gndisks]); |
| diskok = false; |
| } else if (blocks != sbuf.st_size / STORE_BLOCK_SIZE) { |
| fprintf(stderr, "Cache file for directory %s is %" PRId64 " bytes, expected %" PRId64 "\n", paths[gndisks], |
| sbuf.st_size, blocks * static_cast<int64_t>(STORE_BLOCK_SIZE)); |
| diskok = false; |
| } |
| } |
| } |
| if (diskok) { |
| int sector_size = sd->hw_sector_size; |
| |
| gdisks[gndisks] = new CacheDisk(); |
| if (check) { |
| gdisks[gndisks]->read_only_p = true; |
| } |
| gdisks[gndisks]->forced_volume_num = sd->forced_volume_num; |
| if (sd->hash_base_string) { |
| gdisks[gndisks]->hash_base_string = ats_strdup(sd->hash_base_string); |
| } |
| |
| if (sector_size < cache_config_force_sector_size) { |
| sector_size = cache_config_force_sector_size; |
| } |
| |
| // It's actually common that the hardware I/O size is larger than the store block size as |
| // storage systems increasingly want larger I/Os. For example, on macOS, the filesystem block |
| // size is always reported as 1MB. |
| if (sd->hw_sector_size <= 0 || sector_size > STORE_BLOCK_SIZE) { |
| Note("resetting hardware sector size from %d to %d", sector_size, STORE_BLOCK_SIZE); |
| sector_size = STORE_BLOCK_SIZE; |
| } |
| sector_sizes[gndisks] = sector_size; |
| fds[gndisks] = fd; |
| sds[gndisks] = sd; |
| fd = -1; |
| gndisks++; |
| } |
| } else { |
| if (errno == EINVAL) { |
| Warning("cache unable to open '%s': It must be placed on a file system that supports direct I/O.", paths[gndisks]); |
| } else { |
| Warning("cache unable to open '%s': %s", paths[gndisks], strerror(errno)); |
| } |
| } |
| if (fd >= 0) { |
| close(fd); |
| } |
| } |
| |
| // Before we kick off asynchronous operations, make sure sufficient disks are available and we don't just shutdown |
| // Exiting with background threads in operation will likely cause a seg fault |
| start_done = 1; |
| |
| if (gndisks == 0) { |
| CacheProcessor::initialized = CACHE_INIT_FAILED; |
| // Have to do this here because no IO events were scheduled and so @c diskInitialized() won't be called. |
| if (cb_after_init) { |
| cb_after_init(); |
| } |
| |
| if (this->waitForCache() > 1) { |
| Emergency("Cache initialization failed - no disks available but cache required"); |
| } else { |
| Warning("unable to open cache disk(s): Cache Disabled\n"); |
| return -1; // pointless, AFAICT this is ignored. |
| } |
| } else if (this->waitForCache() == 3 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) { |
| CacheProcessor::initialized = CACHE_INIT_FAILED; |
| if (cb_after_init) { |
| cb_after_init(); |
| } |
| Emergency("Cache initialization failed - only %d out of %d disks were valid and all were required.", gndisks, |
| theCacheStore.n_disks_in_config); |
| } else if (this->waitForCache() == 2 && static_cast<unsigned int>(gndisks) < theCacheStore.n_disks_in_config) { |
| Warning("Cache initialization incomplete - only %d out of %d disks were valid.", gndisks, theCacheStore.n_disks_in_config); |
| } |
| |
| // If we got here, we have enough disks to proceed |
| for (int j = 0; j < gndisks; j++) { |
| Span *sd = sds[j]; |
| ink_release_assert(sds[j] != nullptr); // Defeat clang-analyzer |
| off_t skip = ROUND_TO_STORE_BLOCK((sd->offset < START_POS ? START_POS + sd->alignment : sd->offset)); |
| int64_t blocks = sd->blocks - (skip >> STORE_BLOCK_SHIFT); |
| #if AIO_MODE == AIO_MODE_NATIVE |
| eventProcessor.schedule_imm(new DiskInit(gdisks[j], paths[j], blocks, skip, sector_sizes[j], fds[j], clear)); |
| #else |
| gdisks[j]->open(paths[j], blocks, skip, sector_sizes[j], fds[j], clear); |
| #endif |
| |
| Debug("cache_hosting", "Disk: %d:%s, blocks: %" PRId64 "", gndisks, paths[j], blocks); |
| } |
| |
| return 0; |
| } |
| |
| void |
| CacheProcessor::diskInitialized() |
| { |
| int n_init = initialize_disk++; |
| int bad_disks = 0; |
| int res = 0; |
| int i; |
| |
| // Wait for all the cache disks are initialized |
| if (n_init != gndisks - 1) { |
| return; |
| } |
| |
| // Check and remove bad disks from gdisks[] |
| for (i = 0; i < gndisks; i++) { |
| if (DISK_BAD(gdisks[i])) { |
| delete gdisks[i]; |
| gdisks[i] = nullptr; |
| bad_disks++; |
| } else if (bad_disks > 0) { |
| gdisks[i - bad_disks] = gdisks[i]; |
| gdisks[i] = nullptr; |
| } |
| } |
| if (bad_disks > 0) { |
| // Update the number of available cache disks |
| gndisks -= bad_disks; |
| // Check if this is a fatal error |
| if (this->waitForCache() == 3 || (0 == gndisks && this->waitForCache() == 2)) { |
| // This could be passed off to @c cacheInitialized (as with volume config problems) but I think |
| // the more specific error message here is worth the extra code. |
| CacheProcessor::initialized = CACHE_INIT_FAILED; |
| if (cb_after_init) { |
| cb_after_init(); |
| } |
| Emergency("Cache initialization failed - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config); |
| } else if (this->waitForCache() == 2) { |
| Warning("Cache initialization incomplete - only %d of %d disks were available.", gndisks, theCacheStore.n_disks_in_config); |
| } |
| } |
| |
| /* Practically just took all bad_disks offline so update the stats. */ |
| RecSetGlobalRawStatSum(cache_rsb, cache_span_offline_stat, bad_disks); |
| RecIncrGlobalRawStat(cache_rsb, cache_span_failing_stat, -bad_disks); |
| RecSetGlobalRawStatSum(cache_rsb, cache_span_online_stat, gndisks); |
| |
| /* create the cachevol list only if num volumes are greater than 0. */ |
| if (config_volumes.num_volumes == 0) { |
| /* if no volumes, default to just an http cache */ |
| res = cplist_reconfigure(); |
| } else { |
| // else |
| /* create the cachevol list. */ |
| cplist_init(); |
| /* now change the cachevol list based on the config file */ |
| res = cplist_reconfigure(); |
| } |
| |
| if (res == -1) { |
| /* problems initializing the volume.config. Punt */ |
| gnvol = 0; |
| cacheInitialized(); |
| return; |
| } else { |
| CacheVol *cp = cp_list.head; |
| for (; cp; cp = cp->link.next) { |
| cp->vol_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count)); |
| char vol_stat_str_prefix[256]; |
| snprintf(vol_stat_str_prefix, sizeof(vol_stat_str_prefix), "proxy.process.cache.volume_%d", cp->vol_number); |
| register_cache_stats(cp->vol_rsb, vol_stat_str_prefix); |
| } |
| } |
| |
| gvol = static_cast<Vol **>(ats_malloc(gnvol * sizeof(Vol *))); |
| memset(gvol, 0, gnvol * sizeof(Vol *)); |
| gnvol = 0; |
| for (i = 0; i < gndisks; i++) { |
| CacheDisk *d = gdisks[i]; |
| if (is_debug_tag_set("cache_hosting")) { |
| int j; |
| Debug("cache_hosting", "Disk: %d:%s: Vol Blocks: %u: Free space: %" PRIu64, i, d->path, d->header->num_diskvol_blks, |
| d->free_space); |
| for (j = 0; j < static_cast<int>(d->header->num_volumes); j++) { |
| Debug("cache_hosting", "\tVol: %d Size: %" PRIu64, d->disk_vols[j]->vol_number, d->disk_vols[j]->size); |
| } |
| for (j = 0; j < static_cast<int>(d->header->num_diskvol_blks); j++) { |
| Debug("cache_hosting", "\tBlock No: %d Size: %" PRIu64 " Free: %u", d->header->vol_info[j].number, |
| d->header->vol_info[j].len, d->header->vol_info[j].free); |
| } |
| } |
| if (!check) { |
| d->sync(); |
| } |
| } |
| if (config_volumes.num_volumes == 0) { |
| theCache = new Cache(); |
| theCache->scheme = CACHE_HTTP_TYPE; |
| theCache->open(clear, fix); |
| return; |
| } |
| if (config_volumes.num_http_volumes != 0) { |
| theCache = new Cache(); |
| theCache->scheme = CACHE_HTTP_TYPE; |
| theCache->open(clear, fix); |
| } |
| } |
| |
| void |
| CacheProcessor::cacheInitialized() |
| { |
| int i; |
| |
| if (theCache && (theCache->ready == CACHE_INITIALIZING)) { |
| return; |
| } |
| |
| int caches_ready = 0; |
| int cache_init_ok = 0; |
| /* allocate ram size in proportion to the disk space the |
| volume occupies */ |
| int64_t total_size = 0; // count in HTTP & MIXT |
| uint64_t total_cache_bytes = 0; // bytes that can used in total_size |
| uint64_t total_direntries = 0; // all the direntries in the cache |
| uint64_t used_direntries = 0; // and used |
| uint64_t vol_total_cache_bytes = 0; |
| uint64_t vol_total_direntries = 0; |
| uint64_t vol_used_direntries = 0; |
| Vol *vol; |
| |
| ProxyMutex *mutex = this_ethread()->mutex.get(); |
| |
| if (theCache) { |
| total_size += theCache->cache_size; |
| Debug("cache_init", "CacheProcessor::cacheInitialized - theCache, total_size = %" PRId64 " = %" PRId64 " MB", total_size, |
| total_size / ((1024 * 1024) / STORE_BLOCK_SIZE)); |
| if (theCache->ready == CACHE_INIT_FAILED) { |
| Debug("cache_init", "CacheProcessor::cacheInitialized - failed to initialize the cache for http: cache disabled"); |
| Warning("failed to initialize the cache for http: cache disabled\n"); |
| } else { |
| caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP); |
| caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE); |
| caches[CACHE_FRAG_TYPE_HTTP] = theCache; |
| caches[CACHE_FRAG_TYPE_NONE] = theCache; |
| } |
| } |
| |
| // Update stripe version data. |
| if (gnvol) { // start with whatever the first stripe is. |
| cacheProcessor.min_stripe_version = cacheProcessor.max_stripe_version = gvol[0]->header->version; |
| } |
| // scan the rest of the stripes. |
| for (i = 1; i < gnvol; i++) { |
| Vol *v = gvol[i]; |
| if (v->header->version < cacheProcessor.min_stripe_version) { |
| cacheProcessor.min_stripe_version = v->header->version; |
| } |
| if (cacheProcessor.max_stripe_version < v->header->version) { |
| cacheProcessor.max_stripe_version = v->header->version; |
| } |
| } |
| |
| if (caches_ready) { |
| Debug("cache_init", "CacheProcessor::cacheInitialized - caches_ready=0x%0X, gnvol=%d", (unsigned int)caches_ready, |
| gnvol.load()); |
| |
| int64_t ram_cache_bytes = 0; |
| |
| if (gnvol) { |
| // new ram_caches, with algorithm from the config |
| for (i = 0; i < gnvol; i++) { |
| switch (cache_config_ram_cache_algorithm) { |
| default: |
| case RAM_CACHE_ALGORITHM_CLFUS: |
| gvol[i]->ram_cache = new_RamCacheCLFUS(); |
| break; |
| case RAM_CACHE_ALGORITHM_LRU: |
| gvol[i]->ram_cache = new_RamCacheLRU(); |
| break; |
| } |
| } |
| // let us calculate the Size |
| if (cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE) { |
| Debug("cache_init", "CacheProcessor::cacheInitialized - cache_config_ram_cache_size == AUTO_SIZE_RAM_CACHE"); |
| for (i = 0; i < gnvol; i++) { |
| vol = gvol[i]; |
| |
| if (gvol[i]->cache_vol->ramcache_enabled) { |
| gvol[i]->ram_cache->init(vol->dirlen() * DEFAULT_RAM_CACHE_MULTIPLIER, vol); |
| ram_cache_bytes += gvol[i]->dirlen(); |
| Debug("cache_init", "CacheProcessor::cacheInitialized - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", ram_cache_bytes, |
| ram_cache_bytes / (1024 * 1024)); |
| CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)gvol[i]->dirlen()); |
| } |
| vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen(); |
| total_cache_bytes += vol_total_cache_bytes; |
| Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb", |
| total_cache_bytes, total_cache_bytes / (1024 * 1024)); |
| |
| CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes); |
| |
| vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH; |
| total_direntries += vol_total_direntries; |
| CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries); |
| |
| vol_used_direntries = dir_entries_used(gvol[i]); |
| CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries); |
| used_direntries += vol_used_direntries; |
| } |
| |
| } else { |
| // we got configured memory size |
| // TODO, should we check the available system memories, or you will |
| // OOM or swapout, that is not a good situation for the server |
| Debug("cache_init", "CacheProcessor::cacheInitialized - %" PRId64 " != AUTO_SIZE_RAM_CACHE", cache_config_ram_cache_size); |
| int64_t http_ram_cache_size = |
| (theCache) ? |
| static_cast<int64_t>((static_cast<double>(theCache->cache_size) / total_size) * cache_config_ram_cache_size) : |
| 0; |
| Debug("cache_init", "CacheProcessor::cacheInitialized - http_ram_cache_size = %" PRId64 " = %" PRId64 "Mb", |
| http_ram_cache_size, http_ram_cache_size / (1024 * 1024)); |
| int64_t stream_ram_cache_size = cache_config_ram_cache_size - http_ram_cache_size; |
| Debug("cache_init", "CacheProcessor::cacheInitialized - stream_ram_cache_size = %" PRId64 " = %" PRId64 "Mb", |
| stream_ram_cache_size, stream_ram_cache_size / (1024 * 1024)); |
| |
| // Dump some ram_cache size information in debug mode. |
| Debug("ram_cache", "config: size = %" PRId64 ", cutoff = %" PRId64 "", cache_config_ram_cache_size, |
| cache_config_ram_cache_cutoff); |
| |
| for (i = 0; i < gnvol; i++) { |
| vol = gvol[i]; |
| double factor; |
| if (gvol[i]->cache == theCache && gvol[i]->cache_vol->ramcache_enabled) { |
| ink_assert(gvol[i]->cache != nullptr); |
| factor = static_cast<double>(static_cast<int64_t>(gvol[i]->len >> STORE_BLOCK_SHIFT)) / theCache->cache_size; |
| Debug("cache_init", "CacheProcessor::cacheInitialized - factor = %f", factor); |
| gvol[i]->ram_cache->init(static_cast<int64_t>(http_ram_cache_size * factor), vol); |
| ram_cache_bytes += static_cast<int64_t>(http_ram_cache_size * factor); |
| CACHE_VOL_SUM_DYN_STAT(cache_ram_cache_bytes_total_stat, (int64_t)(http_ram_cache_size * factor)); |
| } else if (gvol[i]->cache_vol->ramcache_enabled) { |
| ink_release_assert(!"Unexpected non-HTTP cache volume"); |
| } |
| Debug("cache_init", "CacheProcessor::cacheInitialized[%d] - ram_cache_bytes = %" PRId64 " = %" PRId64 "Mb", i, |
| ram_cache_bytes, ram_cache_bytes / (1024 * 1024)); |
| vol_total_cache_bytes = gvol[i]->len - gvol[i]->dirlen(); |
| total_cache_bytes += vol_total_cache_bytes; |
| CACHE_VOL_SUM_DYN_STAT(cache_bytes_total_stat, vol_total_cache_bytes); |
| Debug("cache_init", "CacheProcessor::cacheInitialized - total_cache_bytes = %" PRId64 " = %" PRId64 "Mb", |
| total_cache_bytes, total_cache_bytes / (1024 * 1024)); |
| |
| vol_total_direntries = gvol[i]->buckets * gvol[i]->segments * DIR_DEPTH; |
| total_direntries += vol_total_direntries; |
| CACHE_VOL_SUM_DYN_STAT(cache_direntries_total_stat, vol_total_direntries); |
| |
| vol_used_direntries = dir_entries_used(gvol[i]); |
| CACHE_VOL_SUM_DYN_STAT(cache_direntries_used_stat, vol_used_direntries); |
| used_direntries += vol_used_direntries; |
| } |
| } |
| switch (cache_config_ram_cache_compress) { |
| default: |
| Fatal("unknown RAM cache compression type: %d", cache_config_ram_cache_compress); |
| case CACHE_COMPRESSION_NONE: |
| case CACHE_COMPRESSION_FASTLZ: |
| break; |
| case CACHE_COMPRESSION_LIBZ: |
| #ifndef HAVE_ZLIB_H |
| Fatal("libz not available for RAM cache compression"); |
| #endif |
| break; |
| case CACHE_COMPRESSION_LIBLZMA: |
| #ifndef HAVE_LZMA_H |
| Fatal("lzma not available for RAM cache compression"); |
| #endif |
| break; |
| } |
| |
| GLOBAL_CACHE_SET_DYN_STAT(cache_ram_cache_bytes_total_stat, ram_cache_bytes); |
| GLOBAL_CACHE_SET_DYN_STAT(cache_bytes_total_stat, total_cache_bytes); |
| GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_total_stat, total_direntries); |
| GLOBAL_CACHE_SET_DYN_STAT(cache_direntries_used_stat, used_direntries); |
| if (!check) { |
| dir_sync_init(); |
| } |
| cache_init_ok = 1; |
| } else { |
| Warning("cache unable to open any vols, disabled"); |
| } |
| } |
| if (cache_init_ok) { |
| // Initialize virtual cache |
| CacheProcessor::initialized = CACHE_INITIALIZED; |
| CacheProcessor::cache_ready = caches_ready; |
| Note("cache enabled"); |
| } else { |
| CacheProcessor::initialized = CACHE_INIT_FAILED; |
| Note("cache disabled"); |
| } |
| |
| // Fire callback to signal initialization finished. |
| if (cb_after_init) { |
| cb_after_init(); |
| } |
| |
| // TS-3848 |
| if (CACHE_INIT_FAILED == CacheProcessor::initialized && cacheProcessor.waitForCache() > 1) { |
| Emergency("Cache initialization failed with cache required, exiting."); |
| } |
| } |
| |
/**
 * Stop hook for the cache processor.
 *
 * Currently a no-op: nothing is torn down here.
 */
void
CacheProcessor::stop()
{
}
| |
| int |
| CacheProcessor::dir_check(bool afix) |
| { |
| for (int i = 0; i < gnvol; i++) { |
| gvol[i]->dir_check(afix); |
| } |
| return 0; |
| } |
| |
| int |
| CacheProcessor::db_check(bool afix) |
| { |
| for (int i = 0; i < gnvol; i++) { |
| gvol[i]->db_check(afix); |
| } |
| return 0; |
| } |
| |
| Action * |
| CacheProcessor::lookup(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len) |
| { |
| return caches[frag_type]->lookup(cont, key, frag_type, hostname, host_len); |
| } |
| |
| Action * |
| CacheProcessor::open_read(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int hostlen) |
| { |
| return caches[frag_type]->open_read(cont, key, frag_type, hostname, hostlen); |
| } |
| |
| Action * |
| CacheProcessor::open_write(Continuation *cont, CacheKey *key, CacheFragType frag_type, int expected_size ATS_UNUSED, int options, |
| time_t pin_in_cache, char *hostname, int host_len) |
| { |
| return caches[frag_type]->open_write(cont, key, frag_type, options, pin_in_cache, hostname, host_len); |
| } |
| |
| Action * |
| CacheProcessor::remove(Continuation *cont, const CacheKey *key, CacheFragType frag_type, const char *hostname, int host_len) |
| { |
| Debug("cache_remove", "[CacheProcessor::remove] Issuing cache delete for %u", cache_hash(*key)); |
| return caches[frag_type]->remove(cont, key, frag_type, hostname, host_len); |
| } |
| |
| Action * |
| CacheProcessor::lookup(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type) |
| { |
| return lookup(cont, &key->hash, frag_type, key->hostname, key->hostlen); |
| } |
| |
| Action * |
| CacheProcessor::scan(Continuation *cont, char *hostname, int host_len, int KB_per_second) |
| { |
| return caches[CACHE_FRAG_TYPE_HTTP]->scan(cont, hostname, host_len, KB_per_second); |
| } |
| |
| int |
| CacheProcessor::IsCacheEnabled() |
| { |
| return CacheProcessor::initialized; |
| } |
| |
| bool |
| CacheProcessor::IsCacheReady(CacheFragType type) |
| { |
| if (IsCacheEnabled() != CACHE_INITIALIZED) { |
| return false; |
| } |
| return static_cast<bool>(cache_ready & (1 << type)); |
| } |
| |
| int |
| Vol::db_check(bool /* fix ATS_UNUSED */) |
| { |
| char tt[256]; |
| printf(" Data for [%s]\n", hash_text.get()); |
| printf(" Length: %" PRIu64 "\n", static_cast<uint64_t>(len)); |
| printf(" Write Position: %" PRIu64 "\n", static_cast<uint64_t>(header->write_pos - skip)); |
| printf(" Phase: %d\n", static_cast<int>(!!header->phase)); |
| ink_ctime_r(&header->create_time, tt); |
| tt[strlen(tt) - 1] = 0; |
| printf(" Create Time: %s\n", tt); |
| printf(" Sync Serial: %u\n", static_cast<unsigned int>(header->sync_serial)); |
| printf(" Write Serial: %u\n", static_cast<unsigned int>(header->write_serial)); |
| printf("\n"); |
| |
| return 0; |
| } |
| |
| static void |
| vol_init_data_internal(Vol *d) |
| { |
| // step1: calculate the number of entries. |
| off_t total_entries = (d->len - (d->start - d->skip)) / cache_config_min_average_object_size; |
| // step2: calculate the number of buckets |
| off_t total_buckets = total_entries / DIR_DEPTH; |
| // step3: calculate the number of segments, no segment has more than 16384 buckets |
| d->segments = (total_buckets + (((1 << 16) - 1) / DIR_DEPTH)) / ((1 << 16) / DIR_DEPTH); |
| // step4: divide total_buckets into segments on average. |
| d->buckets = (total_buckets + d->segments - 1) / d->segments; |
| // step5: set the start pointer. |
| d->start = d->skip + 2 * d->dirlen(); |
| } |
| |
| static void |
| vol_init_data(Vol *d) |
| { |
| // iteratively calculate start + buckets |
| vol_init_data_internal(d); |
| vol_init_data_internal(d); |
| vol_init_data_internal(d); |
| } |
| |
| void |
| vol_init_dir(Vol *d) |
| { |
| int b, s, l; |
| |
| for (s = 0; s < d->segments; s++) { |
| d->header->freelist[s] = 0; |
| Dir *seg = d->dir_segment(s); |
| for (l = 1; l < DIR_DEPTH; l++) { |
| for (b = 0; b < d->buckets; b++) { |
| Dir *bucket = dir_bucket(b, seg); |
| dir_free_entry(dir_bucket_row(bucket, l), s, d); |
| } |
| } |
| } |
| } |
| |
| void |
| vol_clear_init(Vol *d) |
| { |
| size_t dir_len = d->dirlen(); |
| memset(d->raw_dir, 0, dir_len); |
| vol_init_dir(d); |
| d->header->magic = VOL_MAGIC; |
| d->header->version._major = CACHE_DB_MAJOR_VERSION; |
| d->header->version._minor = CACHE_DB_MINOR_VERSION; |
| d->scan_pos = d->header->agg_pos = d->header->write_pos = d->start; |
| d->header->last_write_pos = d->header->write_pos; |
| d->header->phase = 0; |
| d->header->cycle = 0; |
| d->header->create_time = time(nullptr); |
| d->header->dirty = 0; |
| d->sector_size = d->header->sector_size = d->disk->hw_sector_size; |
| *d->footer = *d->header; |
| } |
| |
| int |
| vol_dir_clear(Vol *d) |
| { |
| size_t dir_len = d->dirlen(); |
| vol_clear_init(d); |
| |
| if (pwrite(d->fd, d->raw_dir, dir_len, d->skip) < 0) { |
| Warning("unable to clear cache directory '%s'", d->hash_text.get()); |
| return -1; |
| } |
| return 0; |
| } |
| |
| int |
| Vol::clear_dir() |
| { |
| size_t dir_len = this->dirlen(); |
| vol_clear_init(this); |
| |
| SET_HANDLER(&Vol::handle_dir_clear); |
| |
| io.aiocb.aio_fildes = fd; |
| io.aiocb.aio_buf = raw_dir; |
| io.aiocb.aio_nbytes = dir_len; |
| io.aiocb.aio_offset = skip; |
| io.action = this; |
| io.thread = AIO_CALLBACK_THREAD_ANY; |
| io.then = nullptr; |
| ink_assert(ink_aio_write(&io)); |
| return 0; |
| } |
| |
/**
 * Configure this volume and start loading its on-disk directory.
 *
 * @param s        volume path; also the hash seed unless disk->hash_base_string is set.
 * @param blocks   volume length in store blocks (len = blocks * STORE_BLOCK_SIZE).
 * @param dir_skip requested offset of directory copy A; raised to at least START_POS
 *                 and passed through ROUND_TO_STORE_BLOCK before it is used as skip.
 * @param clear    when true, clear the directory instead of reading it.
 * @return clear_dir()'s result when @a clear is set, otherwise 0. Either way the
 *         work completes asynchronously (handle_dir_clear() or handle_header_read()).
 */
int
Vol::init(char *s, off_t blocks, off_t dir_skip, bool clear)
{
  // hash_text is "<seed> <dir_skip>:<blocks>", built from the caller's dir_skip
  // *before* it is rounded below; hash_id is the hash of this text.
  char *seed_str = disk->hash_base_string ? disk->hash_base_string : s;
  const size_t hash_seed_size = strlen(seed_str);
  // NOTE(review): 32 spare bytes is less than two 20-digit uint64 values plus
  // separators need; snprintf truncates (no overflow) in that extreme case.
  const size_t hash_text_size = hash_seed_size + 32;

  hash_text = static_cast<char *>(ats_malloc(hash_text_size));
  ink_strlcpy(hash_text, seed_str, hash_text_size);
  snprintf(hash_text + hash_seed_size, (hash_text_size - hash_seed_size), " %" PRIu64 ":%" PRIu64 "",
           static_cast<uint64_t>(dir_skip), static_cast<uint64_t>(blocks));
  CryptoContext().hash_immediate(hash_id, hash_text, strlen(hash_text));

  // Directory A starts at dir_skip: at least START_POS, store-block rounded.
  dir_skip = ROUND_TO_STORE_BLOCK((dir_skip < START_POS ? START_POS : dir_skip));
  path = ats_strdup(s);
  len = blocks * STORE_BLOCK_SIZE;
  ink_assert(len <= MAX_VOL_SIZE);
  skip = dir_skip;
  prev_recover_pos = 0;

  // successive approximation, directory/meta data eats up some storage
  start = dir_skip;
  vol_init_data(this);
  data_blocks = (len - (start - skip)) / STORE_BLOCK_SIZE;
  hit_evacuate_window = (data_blocks * cache_config_hit_evacuate_percent) / 100;

  // One evacuation list per EVACUATION_BUCKET_SIZE of volume, plus 2, zero-filled.
  evacuate_size = static_cast<int>(len / EVACUATION_BUCKET_SIZE) + 2;
  int evac_len = evacuate_size * sizeof(DLL<EvacuationBlock>);
  evacuate = static_cast<DLL<EvacuationBlock> *>(ats_malloc(evac_len));
  memset(static_cast<void *>(evacuate), 0, evac_len);

  Debug("cache_init", "Vol %s: allocating %zu directory bytes for a %lld byte volume (%lf%%)", hash_text.get(), dirlen(),
        (long long)this->len, (double)dirlen() / (double)this->len * 100.0);

  // Directory buffer: huge pages when enabled and available, else page-aligned memory.
  raw_dir = nullptr;
  if (ats_hugepage_enabled()) {
    raw_dir = static_cast<char *>(ats_alloc_hugepage(this->dirlen()));
  }
  if (raw_dir == nullptr) {
    raw_dir = static_cast<char *>(ats_memalign(ats_pagesize(), this->dirlen()));
  }

  // raw_dir layout: header at offset 0, Dir entries after headerlen(), and the
  // footer in the last ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)) bytes.
  dir = reinterpret_cast<Dir *>(raw_dir + this->headerlen());
  header = reinterpret_cast<VolHeaderFooter *>(raw_dir);
  footer = reinterpret_cast<VolHeaderFooter *>(raw_dir + this->dirlen() - ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)));

  if (clear) {
    Note("clearing cache directory '%s'", hash_text.get());
    return clear_dir();
  }

  // Read the header and footer of both directory copies (A at skip, B at
  // skip + dirlen()) with four chained AIO reads; handle_header_read() picks one.
  init_info = new VolInitInfo();
  int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
  off_t footer_offset = this->dirlen() - footerlen;
  // try A
  off_t as = skip;

  Debug("cache_init", "reading directory '%s'", hash_text.get());
  SET_HANDLER(&Vol::handle_header_read);
  init_info->vol_aio[0].aiocb.aio_offset = as;
  init_info->vol_aio[1].aiocb.aio_offset = as + footer_offset;
  off_t bs = skip + this->dirlen();
  init_info->vol_aio[2].aiocb.aio_offset = bs;
  init_info->vol_aio[3].aiocb.aio_offset = bs + footer_offset;

  for (unsigned i = 0; i < countof(init_info->vol_aio); i++) {
    AIOCallback *aio = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    // Each read lands in its own STORE_BLOCK_SIZE slot of vol_h_f.
    // NOTE(review): assumes footerlen <= STORE_BLOCK_SIZE so the slots don't overlap.
    aio->aiocb.aio_buf = &(init_info->vol_h_f[i * STORE_BLOCK_SIZE]);
    aio->aiocb.aio_nbytes = footerlen;
    aio->action = this;
    aio->thread = AIO_CALLBACK_THREAD_ANY;
    // Chain vol_aio[0] -> [1] -> [2] -> [3].
    aio->then = (i < 3) ? &(init_info->vol_aio[i + 1]) : nullptr;
  }
#if AIO_MODE == AIO_MODE_NATIVE
  ink_assert(ink_aio_readv(init_info->vol_aio));
#else
  ink_assert(ink_aio_read(init_info->vol_aio));
#endif
  return 0;
}
| |
| int |
| Vol::handle_dir_clear(int event, void *data) |
| { |
| size_t dir_len = this->dirlen(); |
| AIOCallback *op; |
| |
| if (event == AIO_EVENT_DONE) { |
| op = static_cast<AIOCallback *>(data); |
| if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) { |
| Warning("unable to clear cache directory '%s'", hash_text.get()); |
| disk->incrErrors(op); |
| fd = -1; |
| } |
| |
| if (op->aiocb.aio_nbytes == dir_len) { |
| /* clear the header for directory B. We don't need to clear the |
| whole of directory B. The header for directory B starts at |
| skip + len */ |
| op->aiocb.aio_nbytes = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter)); |
| op->aiocb.aio_offset = skip + dir_len; |
| ink_assert(ink_aio_write(op)); |
| return EVENT_DONE; |
| } |
| set_io_not_in_progress(); |
| SET_HANDLER(&Vol::dir_init_done); |
| dir_init_done(EVENT_IMMEDIATE, nullptr); |
| /* mark the volume as bad */ |
| } |
| return EVENT_DONE; |
| } |
| |
| int |
| Vol::handle_dir_read(int event, void *data) |
| { |
| AIOCallback *op = static_cast<AIOCallback *>(data); |
| |
| if (event == AIO_EVENT_DONE) { |
| if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) { |
| Note("Directory read failed: clearing cache directory %s", this->hash_text.get()); |
| clear_dir(); |
| return EVENT_DONE; |
| } |
| } |
| |
| if (!(header->magic == VOL_MAGIC && footer->magic == VOL_MAGIC && CACHE_DB_MAJOR_VERSION_COMPATIBLE <= header->version._major && |
| header->version._major <= CACHE_DB_MAJOR_VERSION)) { |
| Warning("bad footer in cache directory for '%s', clearing", hash_text.get()); |
| Note("VOL_MAGIC %d\n header magic: %d\n footer_magic %d\n CACHE_DB_MAJOR_VERSION_COMPATIBLE %d\n major version %d\n" |
| "CACHE_DB_MAJOR_VERSION %d\n", |
| VOL_MAGIC, header->magic, footer->magic, CACHE_DB_MAJOR_VERSION_COMPATIBLE, header->version._major, |
| CACHE_DB_MAJOR_VERSION); |
| Note("clearing cache directory '%s'", hash_text.get()); |
| clear_dir(); |
| return EVENT_DONE; |
| } |
| CHECK_DIR(this); |
| |
| sector_size = header->sector_size; |
| |
| return this->recover_data(); |
| } |
| |
| int |
| Vol::recover_data() |
| { |
| SET_HANDLER(&Vol::handle_recover_from_data); |
| return handle_recover_from_data(EVENT_IMMEDIATE, nullptr); |
| } |
| |
/*
  Philosophy: The idea is to find the region of disk that could be
  inconsistent and remove all directory entries pointing to that potentially
  inconsistent region.
  Start from a consistent position (the write_pos of the last directory
  synced to disk) and scan forward. The following invariants hold for docs
  that were written to the disk after the directory was synced:

  1. doc->magic == DOC_MAGIC

  The following two cases happen only when the previous generation
  documents are aligned with the current ones.

  2. All the docs written to the disk
  after the directory was synced will have their sync_serial <=
  header->sync_serial + 1, because the write aggregation can take an
  indeterminate amount of time to sync. The doc->sync_serial can be
  equal to header->sync_serial + 1, because we increment the sync_serial
  before we sync the directory to disk.

  3. The doc->sync_serial will always increase. If doc->sync_serial
  decreases, the document was written in the previous phase.

  If any of these conditions fails and we are not too close to the end
  (see the next comment), then we're done.

  We actually start from header->last_write_pos instead of header->write_pos
  to make sure that we haven't wrapped around the whole disk without
  syncing the directory. Since the directory sync interval is 60 seconds, it is
  entirely possible to write through the whole cache without
  once syncing the directory. In this case, we need to clear the
  cache. The documents written right before we synced the
  directory to disk should have write_serial <= header->write_serial.

*/
| |
/**
 * Recovery state machine: scan the data written since the last directory sync
 * and clear directory entries that may point at inconsistent data.
 *
 * First called with EVENT_IMMEDIATE (from recover_data()) to set up the scan,
 * then with AIO_EVENT_DONE for each read of up to RECOVERY_SIZE bytes.
 * Finishes in one of three ways:
 *   - handle_recover_write_dir() directly: nothing to recover (sync_serial == 0,
 *     or the scan wrapped back to header->write_pos);
 *   - an AIO write of the repaired directory, completed by handle_recover_write_dir();
 *   - Lclear: free the read buffer and init_info and clear the whole directory.
 * Returns EVENT_CONT while reads are outstanding.
 */
int
Vol::handle_recover_from_data(int event, void * /* data ATS_UNUSED */)
{
  uint32_t got_len = 0;
  // NOTE(review): this is a local, reset to header->sync_serial on every callback,
  // so only sync serials seen in the current buffer feed next_sync_serial at Ldone.
  uint32_t max_sync_serial = header->sync_serial;
  char *s, *e = nullptr;
  if (event == EVENT_IMMEDIATE) {
    // sync_serial 0: the directory was never synced, so there is nothing to recover.
    if (header->sync_serial == 0) {
      io.aiocb.aio_buf = nullptr;
      SET_HANDLER(&Vol::handle_recover_write_dir);
      return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
    }
    // initialize
    recover_wrapped = false;
    last_sync_serial = 0;
    last_write_serial = 0;
    recover_pos = header->last_write_pos;
    if (recover_pos >= skip + len) {
      recover_wrapped = true;
      recover_pos = start;
    }
    io.aiocb.aio_buf = static_cast<char *>(ats_memalign(ats_pagesize(), RECOVERY_SIZE));
    io.aiocb.aio_nbytes = RECOVERY_SIZE;
    // Never read past the end of the volume.
    if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
      io.aiocb.aio_nbytes = (skip + len) - recover_pos;
    }
  } else if (event == AIO_EVENT_DONE) {
    if (io.aiocb.aio_nbytes != static_cast<size_t>(io.aio_result)) {
      Warning("disk read error on recover '%s', clearing", hash_text.get());
      disk->incrErrors(&io);
      goto Lclear;
    }
    if (io.aiocb.aio_offset == header->last_write_pos) {
      /* check that we haven't wrapped around without syncing
         the directory. Start from last_write_pos (the write position the documents
         were written to just before syncing the directory) and make sure
         that all documents have write_serial <= header->write_serial.
       */
      uint32_t to_check = header->write_pos - header->last_write_pos;
      ink_assert(to_check && to_check < (uint32_t)io.aiocb.aio_nbytes);
      uint32_t done = 0;
      s = static_cast<char *>(io.aiocb.aio_buf);
      while (done < to_check) {
        Doc *doc = reinterpret_cast<Doc *>(s + done);
        if (doc->magic != DOC_MAGIC || doc->write_serial > header->write_serial) {
          Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
          goto Lclear;
        }
        done += round_to_approx_size(doc->len);
        // NOTE(review): compares against last_write_serial (reset to 0 above) but
        // updates last_sync_serial, so this keeps the last doc's sync_serial rather
        // than the largest; confirm whether `> last_sync_serial` was intended.
        if (doc->sync_serial > last_write_serial) {
          last_sync_serial = doc->sync_serial;
        }
      }
      ink_assert(done == to_check);

      // Scan the remainder of this buffer, past the already-verified region.
      got_len = io.aiocb.aio_nbytes - done;
      recover_pos += io.aiocb.aio_nbytes;
      s = static_cast<char *>(io.aiocb.aio_buf) + done;
      e = s + got_len;
    } else {
      got_len = io.aiocb.aio_nbytes;
      recover_pos += io.aiocb.aio_nbytes;
      s = static_cast<char *>(io.aiocb.aio_buf);
      e = s + got_len;
    }
  }
  // examine what we got
  if (got_len) {
    Doc *doc = nullptr;

    // Just wrapped to the start of the data area: a doc that is invalid or older
    // than the last one accepted ends the scan near the end of the volume.
    if (recover_wrapped && start == io.aiocb.aio_offset) {
      doc = reinterpret_cast<Doc *>(s);
      if (doc->magic != DOC_MAGIC || doc->write_serial < last_write_serial) {
        recover_pos = skip + len - EVACUATION_SIZE;
        goto Ldone;
      }
    }

    // If execution reaches here, then @c got_len > 0 and e == s + got_len therefore s < e
    // clang analyzer can't figure this out, so be explicit.
    ink_assert(s < e);
    while (s < e) {
      doc = reinterpret_cast<Doc *>(s);

      if (doc->magic != DOC_MAGIC || doc->sync_serial != last_sync_serial) {
        if (doc->magic == DOC_MAGIC) {
          if (doc->sync_serial > header->sync_serial) {
            max_sync_serial = doc->sync_serial;
          }

          /*
            doc->magic == DOC_MAGIC, but doc->sync_serial != last_sync_serial
            This might happen in the following situations
            1. We are starting off recovery. In this case the
            last_sync_serial == header->sync_serial, but the doc->sync_serial
            can be anywhere in the range (0, header->sync_serial + 1]
            If this is the case, update last_sync_serial and continue;

            2. A dir sync started between writing documents to the
            aggregation buffer and hence the doc->sync_serial went up.
            If the doc->sync_serial is greater than the last
            sync serial and less than (header->sync_serial + 2) then
            continue;

            3. If the position we are recovering from is within AGG_SIZE
            from the disk end, then we can't trust this document. The
            aggregation buffer might have been larger than the remaining space
            at the end and we decided to wrap around instead of writing
            anything at that point. In this case, wrap around and start
            from the beginning.

            If none of these 3 cases applies, then we are indeed done.

          */

          // case 1
          // case 2
          if (doc->sync_serial > last_sync_serial && doc->sync_serial <= header->sync_serial + 1) {
            last_sync_serial = doc->sync_serial;
            s += round_to_approx_size(doc->len);
            continue;
          }
          // case 3 - we have already recovered some data and
          // (doc->sync_serial < last_sync_serial) ||
          // (doc->sync_serial > header->sync_serial + 1).
          // if we are too close to the end, wrap around
          else if (recover_pos - (e - s) > (skip + len) - AGG_SIZE) {
            recover_wrapped = true;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;

            break;
          }
          // we are done. This doc was written in the earlier phase
          recover_pos -= e - s;
          goto Ldone;
        } else {
          // doc->magic != DOC_MAGIC
          // If we are in the danger zone - recover_pos is within AGG_SIZE
          // from the end, then wrap around
          recover_pos -= e - s;
          if (recover_pos > (skip + len) - AGG_SIZE) {
            recover_wrapped = true;
            recover_pos = start;
            io.aiocb.aio_nbytes = RECOVERY_SIZE;

            break;
          }
          // we are not in the danger zone
          goto Ldone;
        }
      }
      // doc->magic == DOC_MAGIC && doc->sync_serial == last_sync_serial
      last_write_serial = doc->write_serial;
      s += round_to_approx_size(doc->len);
    }

    /* if (s >= e) then we have gone through the whole buffer; we need to
       read more data off disk and continue recovering */
    if (s >= e) {
      /* In the last iteration, we increment s by doc->len...need to undo
         that change, so the doc that straddles the buffer end is re-read */
      if (s > e) {
        s -= round_to_approx_size(doc->len);
      }
      recover_pos -= e - s;
      if (recover_pos >= skip + len) {
        recover_wrapped = true;
        recover_pos = start;
      }
      io.aiocb.aio_nbytes = RECOVERY_SIZE;
      if (static_cast<off_t>(recover_pos + io.aiocb.aio_nbytes) > static_cast<off_t>(skip + len)) {
        io.aiocb.aio_nbytes = (skip + len) - recover_pos;
      }
    }
  }
  if (recover_pos == prev_recover_pos) { // this should never happen, but if it does break the loop
    goto Lclear;
  }
  prev_recover_pos = recover_pos;
  io.aiocb.aio_offset = recover_pos;
  ink_assert(ink_aio_read(&io));
  return EVENT_CONT;

Ldone : {
  /* if we come back to the starting position, then we don't have to recover anything */
  if (recover_pos == header->write_pos && recover_wrapped) {
    SET_HANDLER(&Vol::handle_recover_write_dir);
    if (is_debug_tag_set("cache_init")) {
      Note("recovery wrapped around. nothing to clear\n");
    }
    return handle_recover_write_dir(EVENT_IMMEDIATE, nullptr);
  }

  recover_pos += EVACUATION_SIZE; // safely cover the max write size
  if (recover_pos < header->write_pos && (recover_pos + EVACUATION_SIZE >= header->write_pos)) {
    Debug("cache_init", "Head Pos: %" PRIu64 ", Rec Pos: %" PRIu64 ", Wrapped:%d", header->write_pos, recover_pos, recover_wrapped);
    Warning("no valid directory found while recovering '%s', clearing", hash_text.get());
    goto Lclear;
  }

  // NOTE(review): wrapping subtracts skip + len, which yields an offset measured
  // from 0 rather than from start; confirm this is the intended wrap target.
  if (recover_pos > skip + len) {
    recover_pos -= skip + len;
  }
  // bump sync number so it is different from that in the Doc structs
  uint32_t next_sync_serial = max_sync_serial + 1;
  // make sure that the next sync does not overwrite our good copy: give
  // next_sync_serial the opposite parity of the current header->sync_serial.
  if (!(header->sync_serial & 1) == !(next_sync_serial & 1)) {
    next_sync_serial++;
  }
  // clear affected portion of the cache: [write_pos, recover_pos], split in two
  // ranges when it wraps past the end of the volume.
  off_t clear_start = this->offset_to_vol_offset(header->write_pos);
  off_t clear_end = this->offset_to_vol_offset(recover_pos);
  if (clear_start <= clear_end) {
    dir_clear_range(clear_start, clear_end, this);
  } else {
    dir_clear_range(clear_start, DIR_OFFSET_MAX, this);
    dir_clear_range(1, clear_end, this);
  }

  Note("recovery clearing offsets of Vol %s : [%" PRIu64 ", %" PRIu64 "] sync_serial %d next %d\n", hash_text.get(),
       header->write_pos, recover_pos, header->sync_serial, next_sync_serial);

  footer->sync_serial = header->sync_serial = next_sync_serial;

  // Reuse three of init_info's AIO slots as a chained write: header, Dir body, footer.
  for (int i = 0; i < 3; i++) {
    AIOCallback *aio = &(init_info->vol_aio[i]);
    aio->aiocb.aio_fildes = fd;
    aio->action = this;
    aio->thread = AIO_CALLBACK_THREAD_ANY;
    aio->then = (i < 2) ? &(init_info->vol_aio[i + 1]) : nullptr;
  }
  int footerlen = ROUND_TO_STORE_BLOCK(sizeof(VolHeaderFooter));
  size_t dirlen = this->dirlen();
  // Odd new sync_serial -> write copy B (skip + dirlen), even -> copy A (skip).
  int B = header->sync_serial & 1;
  off_t ss = skip + (B ? dirlen : 0);

  init_info->vol_aio[0].aiocb.aio_buf = raw_dir;
  init_info->vol_aio[0].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[0].aiocb.aio_offset = ss;
  init_info->vol_aio[1].aiocb.aio_buf = raw_dir + footerlen;
  init_info->vol_aio[1].aiocb.aio_nbytes = dirlen - 2 * footerlen;
  init_info->vol_aio[1].aiocb.aio_offset = ss + footerlen;
  init_info->vol_aio[2].aiocb.aio_buf = raw_dir + dirlen - footerlen;
  init_info->vol_aio[2].aiocb.aio_nbytes = footerlen;
  init_info->vol_aio[2].aiocb.aio_offset = ss + dirlen - footerlen;

  SET_HANDLER(&Vol::handle_recover_write_dir);
#if AIO_MODE == AIO_MODE_NATIVE
  ink_assert(ink_aio_writev(init_info->vol_aio));
#else
  ink_assert(ink_aio_write(init_info->vol_aio));
#endif
  return EVENT_CONT;
}

Lclear:
  // Unrecoverable: release the recovery buffer and init_info, then clear the directory.
  free(static_cast<char *>(io.aiocb.aio_buf));
  delete init_info;
  init_info = nullptr;
  clear_dir();
  return EVENT_CONT;
}
| |
| int |
| Vol::handle_recover_write_dir(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */) |
| { |
| if (io.aiocb.aio_buf) { |
| free(static_cast<char *>(io.aiocb.aio_buf)); |
| } |
| delete init_info; |
| init_info = nullptr; |
| set_io_not_in_progress(); |
| scan_pos = header->write_pos; |
| periodic_scan(); |
| SET_HANDLER(&Vol::dir_init_done); |
| return dir_init_done(EVENT_IMMEDIATE, nullptr); |
| } |
| |
| int |
| Vol::handle_header_read(int event, void *data) |
| { |
| AIOCallback *op; |
| VolHeaderFooter *hf[4]; |
| switch (event) { |
| case AIO_EVENT_DONE: |
| op = static_cast<AIOCallback *>(data); |
| for (auto &i : hf) { |
| ink_assert(op != nullptr); |
| i = static_cast<VolHeaderFooter *>(op->aiocb.aio_buf); |
| if (static_cast<size_t>(op->aio_result) != op->aiocb.aio_nbytes) { |
| Note("Header read failed: clearing cache directory %s", this->hash_text.get()); |
| clear_dir(); |
| return EVENT_DONE; |
| } |
| op = op->then; |
| } |
| |
| io.aiocb.aio_fildes = fd; |
| io.aiocb.aio_nbytes = this->dirlen(); |
| io.aiocb.aio_buf = raw_dir; |
| io.action = this; |
| io.thread = AIO_CALLBACK_THREAD_ANY; |
| io.then = nullptr; |
| |
| if (hf[0]->sync_serial == hf[1]->sync_serial && |
| (hf[0]->sync_serial >= hf[2]->sync_serial || hf[2]->sync_serial != hf[3]->sync_serial)) { |
| SET_HANDLER(&Vol::handle_dir_read); |
| if (is_debug_tag_set("cache_init")) { |
| Note("using directory A for '%s'", hash_text.get()); |
| } |
| io.aiocb.aio_offset = skip; |
| ink_assert(ink_aio_read(&io)); |
| } |
| // try B |
| else if (hf[2]->sync_serial == hf[3]->sync_serial) { |
| SET_HANDLER(&Vol::handle_dir_read); |
| if (is_debug_tag_set("cache_init")) { |
| Note("using directory B for '%s'", hash_text.get()); |
| } |
| io.aiocb.aio_offset = skip + this->dirlen(); |
| ink_assert(ink_aio_read(&io)); |
| } else { |
| Note("no good directory, clearing '%s' since sync_serials on both A and B copies are invalid", hash_text.get()); |
| Note("Header A: %d\nFooter A: %d\n Header B: %d\n Footer B %d\n", hf[0]->sync_serial, hf[1]->sync_serial, hf[2]->sync_serial, |
| hf[3]->sync_serial); |
| clear_dir(); |
| delete init_info; |
| init_info = nullptr; |
| } |
| return EVENT_DONE; |
| default: |
| ink_assert(!"not reach here"); |
| } |
| return EVENT_DONE; |
| } |
| |
| int |
| Vol::dir_init_done(int /* event ATS_UNUSED */, void * /* data ATS_UNUSED */) |
| { |
| if (!cache->cache_read_done) { |
| eventProcessor.schedule_in(this, HRTIME_MSECONDS(5), ET_CALL); |
| return EVENT_CONT; |
| } else { |
| int vol_no = gnvol++; |
| ink_assert(!gvol[vol_no]); |
| gvol[vol_no] = this; |
| SET_HANDLER(&Vol::aggWrite); |
| cache->vol_initialized(fd != -1); |
| return EVENT_DONE; |
| } |
| } |
| |
/// One random point used by build_vol_hash_table(): a pseudo-random value and
/// the index of the (usable) volume it was generated for.
struct rtable_pair {
  unsigned int rval; ///< relative value, used to sort.
  unsigned int idx;  ///< volume mapping table index.
};

/**
 * qsort() comparator ordering rtable_pair entries by ascending rval.
 *
 * @return -1, 0 or 1 as the first rval is less than, equal to or greater than the second.
 */
static int
cmprtable(const void *aa, const void *bb)
{
  const rtable_pair *lhs = static_cast<const rtable_pair *>(aa);
  const rtable_pair *rhs = static_cast<const rtable_pair *>(bb);
  // Three-way compare without subtraction, which could overflow for unsigned values.
  return static_cast<int>(lhs->rval > rhs->rval) - static_cast<int>(lhs->rval < rhs->rval);
}
| |
| void |
| build_vol_hash_table(CacheHostRecord *cp) |
| { |
| int num_vols = cp->num_vols; |
| unsigned int *mapping = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols)); |
| Vol **p = static_cast<Vol **>(ats_malloc(sizeof(Vol *) * num_vols)); |
| |
| memset(mapping, 0, num_vols * sizeof(unsigned int)); |
| memset(p, 0, num_vols * sizeof(Vol *)); |
| uint64_t total = 0; |
| int bad_vols = 0; |
| int map = 0; |
| uint64_t used = 0; |
| // initialize number of elements per vol |
| for (int i = 0; i < num_vols; i++) { |
| if (DISK_BAD(cp->vols[i]->disk)) { |
| bad_vols++; |
| continue; |
| } |
| mapping[map] = i; |
| p[map++] = cp->vols[i]; |
| total += (cp->vols[i]->len >> STORE_BLOCK_SHIFT); |
| } |
| |
| num_vols -= bad_vols; |
| |
| if (!num_vols || !total) { |
| // all the disks are corrupt, |
| if (cp->vol_hash_table) { |
| new_Freer(cp->vol_hash_table, CACHE_MEM_FREE_TIMEOUT); |
| } |
| cp->vol_hash_table = nullptr; |
| ats_free(mapping); |
| ats_free(p); |
| return; |
| } |
| |
| unsigned int *forvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols)); |
| unsigned int *gotvol = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols)); |
| unsigned int *rnd = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols)); |
| unsigned short *ttable = static_cast<unsigned short *>(ats_malloc(sizeof(unsigned short) * VOL_HASH_TABLE_SIZE)); |
| unsigned short *old_table; |
| unsigned int *rtable_entries = static_cast<unsigned int *>(ats_malloc(sizeof(unsigned int) * num_vols)); |
| unsigned int rtable_size = 0; |
| |
| // estimate allocation |
| for (int i = 0; i < num_vols; i++) { |
| forvol[i] = (VOL_HASH_TABLE_SIZE * (p[i]->len >> STORE_BLOCK_SHIFT)) / total; |
| used += forvol[i]; |
| rtable_entries[i] = p[i]->len / VOL_HASH_ALLOC_SIZE; |
| rtable_size += rtable_entries[i]; |
| gotvol[i] = 0; |
| } |
| // spread around the excess |
| int extra = VOL_HASH_TABLE_SIZE - used; |
| for (int i = 0; i < extra; i++) { |
| forvol[i % num_vols]++; |
| } |
| // seed random number generator |
| for (int i = 0; i < num_vols; i++) { |
| uint64_t x = p[i]->hash_id.fold(); |
| rnd[i] = static_cast<unsigned int>(x); |
| } |
| // initialize table to "empty" |
| for (int i = 0; i < VOL_HASH_TABLE_SIZE; i++) { |
| ttable[i] = VOL_HASH_EMPTY; |
| } |
| // generate random numbers proportional to allocation |
| rtable_pair *rtable = static_cast<rtable_pair *>(ats_malloc(sizeof(rtable_pair) * rtable_size)); |
| int rindex = 0; |
| for (int i = 0; i < num_vols; i++) { |
| for (int j = 0; j < static_cast<int>(rtable_entries[i]); j++) { |
| rtable[rindex].rval = next_rand(&rnd[i]); |
| rtable[rindex].idx = i; |
| rindex++; |
| } |
| } |
| ink_assert(rindex == (int)rtable_size); |
| // sort (rand #, vol $ pairs) |
| qsort(rtable, rtable_size, sizeof(rtable_pair), cmprtable); |
| unsigned int width = (1LL << 32) / VOL_HASH_TABLE_SIZE; |
| unsigned int pos; // target position to allocate |
| // select vol with closest random number for each bucket |
| int i = 0; // index moving through the random numbers |
| for (int j = 0; j < VOL_HASH_TABLE_SIZE; j++) { |
| pos = width / 2 + j * width; // position to select closest to |
| while (pos > rtable[i].rval && i < static_cast<int>(rtable_size) - 1) { |
| i++; |
| } |
| ttable[j] = mapping[rtable[i].idx]; |
| gotvol[rtable[i].idx]++; |
| } |
| for (int i = 0; i < num_vols; i++) { |
| Debug("cache_init", "build_vol_hash_table index %d mapped to %d requested %d got %d", i, mapping[i], forvol[i], gotvol[i]); |
| } |
| // install new table |
| if (nullptr != (old_table = ink_atomic_swap(&(cp->vol_hash_table), ttable))) { |
| new_Freer(old_table, CACHE_MEM_FREE_TIMEOUT); |
| } |
| ats_free(mapping); |
| ats_free(p); |
| ats_free(forvol); |
| ats_free(gotvol); |
| ats_free(rnd); |
| ats_free(rtable_entries); |
| ats_free(rtable); |
| } |
| |
| void |
| Cache::vol_initialized(bool result) |
| { |
| if (result) { |
| ink_atomic_increment(&total_good_nvol, 1); |
| } |
| if (total_nvol == ink_atomic_increment(&total_initialized_vol, 1) + 1) { |
| open_done(); |
| } |
| } |
| |
| /** Set the state of a disk programmatically. |
| */ |
| bool |
| CacheProcessor::mark_storage_offline(CacheDisk *d, ///< Target disk |
| bool admin) |
| { |
| bool zret; // indicates whether there's any online storage left. |
| int p; |
| uint64_t total_bytes_delete = 0; |
| uint64_t total_dir_delete = 0; |
| uint64_t used_dir_delete = 0; |
| |
| /* Don't mark it again, it will invalidate the stats! */ |
| if (!d->online) { |
| return this->has_online_storage(); |
| } |
| |
| d->online = false; |
| |
| if (!DISK_BAD(d)) { |
| SET_DISK_BAD(d); |
| } |
| |
| for (p = 0; p < gnvol; p++) { |
| if (d->fd == gvol[p]->fd) { |
| total_dir_delete += gvol[p]->buckets * gvol[p]->segments * DIR_DEPTH; |
| used_dir_delete += dir_entries_used(gvol[p]); |
| total_bytes_delete += gvol[p]->len - gvol[p]->dirlen(); |
| } |
| } |
| |
| RecIncrGlobalRawStat(cache_rsb, cache_bytes_total_stat, -total_bytes_delete); |
| RecIncrGlobalRawStat(cache_rsb, cache_direntries_total_stat, -total_dir_delete); |
| RecIncrGlobalRawStat(cache_rsb, cache_direntries_used_stat, -used_dir_delete); |
| |
| /* Update the span metrics, if failing then move the span from "failing" to "offline" bucket |
| * if operator took it offline, move it from "online" to "offline" bucket */ |
| RecIncrGlobalRawStat(cache_rsb, admin ? cache_span_online_stat : cache_span_failing_stat, -1); |
| RecIncrGlobalRawStat(cache_rsb, cache_span_offline_stat, 1); |
| |
| if (theCache) { |
| rebuild_host_table(theCache); |
| } |
| |
| zret = this->has_online_storage(); |
| if (!zret) { |
| Warning("All storage devices offline, cache disabled"); |
| CacheProcessor::cache_ready = 0; |
| } else { // check cache types specifically |
| if (theCache && !theCache->hosttable->gen_host_rec.vol_hash_table) { |
| unsigned int caches_ready = 0; |
| caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_HTTP); |
| caches_ready = caches_ready | (1 << CACHE_FRAG_TYPE_NONE); |
| caches_ready = ~caches_ready; |
| CacheProcessor::cache_ready &= caches_ready; |
| Warning("all volumes for http cache are corrupt, http cache disabled"); |
| } |
| } |
| |
| return zret; |
| } |
| |
| bool |
| CacheProcessor::has_online_storage() const |
| { |
| CacheDisk **dptr = gdisks; |
| for (int disk_no = 0; disk_no < gndisks; ++disk_no, ++dptr) { |
| if (!DISK_BAD(*dptr) && (*dptr)->online) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| int |
| AIO_Callback_handler::handle_disk_failure(int /* event ATS_UNUSED */, void *data) |
| { |
| /* search for the matching file descriptor */ |
| if (!CacheProcessor::cache_ready) { |
| return EVENT_DONE; |
| } |
| int disk_no = 0; |
| AIOCallback *cb = static_cast<AIOCallback *>(data); |
| |
| for (; disk_no < gndisks; disk_no++) { |
| CacheDisk *d = gdisks[disk_no]; |
| |
| if (d->fd == cb->aiocb.aio_fildes) { |
| char message[256]; |
| d->incrErrors(cb); |
| |
| if (!DISK_BAD(d)) { |
| snprintf(message, sizeof(message), "Error accessing Disk %s [%d/%d]", d->path, d->num_errors, cache_config_max_disk_errors); |
| Warning("%s", message); |
| RecSignalManager(REC_SIGNAL_CACHE_WARNING, message); |
| } else if (!DISK_BAD_SIGNALLED(d)) { |
| snprintf(message, sizeof(message), "too many errors accessing disk %s [%d/%d]: declaring disk bad", d->path, d->num_errors, |
| cache_config_max_disk_errors); |
| Warning("%s", message); |
| RecSignalManager(REC_SIGNAL_CACHE_ERROR, message); |
| cacheProcessor.mark_storage_offline(d); // take it out of service |
| } |
| break; |
| } |
| } |
| |
| delete cb; |
| return EVENT_DONE; |
| } |
| |
| int |
| Cache::open_done() |
| { |
| Action *register_ShowCache(Continuation * c, HTTPHdr * h); |
| Action *register_ShowCacheInternal(Continuation * c, HTTPHdr * h); |
| statPagesManager.register_http("cache", register_ShowCache); |
| statPagesManager.register_http("cache-internal", register_ShowCacheInternal); |
| |
| if (total_good_nvol == 0) { |
| ready = CACHE_INIT_FAILED; |
| cacheProcessor.cacheInitialized(); |
| return 0; |
| } |
| |
| hosttable = new CacheHostTable(this, scheme); |
| hosttable->register_config_callback(&hosttable); |
| |
| if (hosttable->gen_host_rec.num_cachevols == 0) { |
| ready = CACHE_INIT_FAILED; |
| } else { |
| ready = CACHE_INITIALIZED; |
| } |
| |
| // TS-3848 |
| if (ready == CACHE_INIT_FAILED && cacheProcessor.waitForCache() >= 2) { |
| Emergency("Failed to initialize cache host table"); |
| } |
| |
| cacheProcessor.cacheInitialized(); |
| |
| return 0; |
| } |
| |
| int |
| Cache::open(bool clear, bool /* fix ATS_UNUSED */) |
| { |
| int i; |
| off_t blocks = 0; |
| cache_read_done = 0; |
| total_initialized_vol = 0; |
| total_nvol = 0; |
| total_good_nvol = 0; |
| |
| REC_EstablishStaticConfigInt32(cache_config_min_average_object_size, "proxy.config.cache.min_average_object_size"); |
| Debug("cache_init", "Cache::open - proxy.config.cache.min_average_object_size = %d", (int)cache_config_min_average_object_size); |
| |
| CacheVol *cp = cp_list.head; |
| for (; cp; cp = cp->link.next) { |
| if (cp->scheme == scheme) { |
| cp->vols = static_cast<Vol **>(ats_malloc(cp->num_vols * sizeof(Vol *))); |
| int vol_no = 0; |
| for (i = 0; i < gndisks; i++) { |
| if (cp->disk_vols[i] && !DISK_BAD(cp->disk_vols[i]->disk)) { |
| DiskVolBlockQueue *q = cp->disk_vols[i]->dpb_queue.head; |
| for (; q; q = q->link.next) { |
| cp->vols[vol_no] = new Vol(); |
| CacheDisk *d = cp->disk_vols[i]->disk; |
| cp->vols[vol_no]->disk = d; |
| cp->vols[vol_no]->fd = d->fd; |
| cp->vols[vol_no]->cache = this; |
| cp->vols[vol_no]->cache_vol = cp; |
| blocks = q->b->len; |
| |
| bool vol_clear = clear || d->cleared || q->new_block; |
| #if AIO_MODE == AIO_MODE_NATIVE |
| eventProcessor.schedule_imm(new VolInit(cp->vols[vol_no], d->path, blocks, q->b->offset, vol_clear)); |
| #else |
| cp->vols[vol_no]->init(d->path, blocks, q->b->offset, vol_clear); |
| #endif |
| vol_no++; |
| cache_size += blocks; |
| } |
| } |
| } |
| total_nvol += vol_no; |
| } |
| } |
| if (total_nvol == 0) { |
| return open_done(); |
| } |
| cache_read_done = 1; |
| return 0; |
| } |
| |
| int |
| Cache::close() |
| { |
| return -1; |
| } |
| |
/** Handler that is never expected to be reached.

    Asserts unconditionally and then reports EVENT_DONE. NOTE(review): presumably
    installed on CacheVCs that must not receive further events, so that a stray
    event is caught; confirm where it is set.
 */
int
CacheVC::dead(int /* event ATS_UNUSED */, Event * /*e ATS_UNUSED */)
{
  ink_assert(0);
  return EVENT_DONE;
}
| |
| bool |
| CacheVC::is_pread_capable() |
| { |
| return !f.read_from_writer_called; |
| } |
| |
// Non-zero constant OR-ed into the "fragment key matches read_key" test in
// CacheVC::handleReadDone. Because it is 1, that test always passes, so a
// successfully read fragment is considered for the RAM cache regardless of
// whether its first_key/key matched read_key.
#define STORE_COLLISION 1
| |
| static void |
| unmarshal_helper(Doc *doc, Ptr<IOBufferData> &buf, int &okay) |
| { |
| using UnmarshalFunc = int(char *buf, int len, RefCountObj *block_ref); |
| UnmarshalFunc *unmarshal_func = &HTTPInfo::unmarshal; |
| ts::VersionNumber version(doc->v_major, doc->v_minor); |
| |
| // introduced by https://github.com/apache/trafficserver/pull/4874, this is used to distinguish the doc version |
| // before and after #4847 |
| if (version < CACHE_DB_VERSION) { |
| unmarshal_func = &HTTPInfo::unmarshal_v24_1; |
| } |
| |
| char *tmp = doc->hdr(); |
| int len = doc->hlen; |
| while (len > 0) { |
| int r = unmarshal_func(tmp, len, buf.get()); |
| if (r < 0) { |
| ink_assert(!"CacheVC::handleReadDone unmarshal failed"); |
| okay = 0; |
| break; |
| } |
| len -= r; |
| tmp += r; |
| } |
| } |
| |
/** Completion handler for a fragment read started by CacheVC::handleRead.

    handleRead installs this handler for AIO disk reads, for copies out of the
    aggregation buffer, and for compressed HTTP RAM-cache hits, so fragment reads
    funnel through here. Under the volume lock it:
      - rejects invalid directory entries and failed I/O;
      - marks documents from a newer cache DB version as corrupt;
      - optionally verifies the checksum;
      - unmarshals HTTP headers (before or after the RAM-cache insert, depending
        on whether the RAM cache may compress the buffer);
      - for reads (VIO::READ), inserts the fragment into the RAM cache subject to
        the cutoff and remembers the first fragment on the volume.
    Finally it pops the handler and re-dispatches AIO_EVENT_DONE.
 */
int
CacheVC::handleReadDone(int event, Event *e)
{
  cancel_trigger();
  ink_assert(this_ethread() == mutex->thread_holding);

  Doc *doc = nullptr;
  // Only the AIO completion clears the in-progress flag; any other event that
  // arrives while I/O is still outstanding is ignored.
  if (event == AIO_EVENT_DONE) {
    set_io_not_in_progress();
  } else if (is_io_in_progress()) {
    return EVENT_CONT;
  }
  {
    // The volume lock is required below; if it is not available the macro
    // reschedules this handler (lock retry).
    MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    if ((!dir_valid(vol, &dir)) || (!io.ok())) {
      if (!io.ok()) {
        Debug("cache_disk_error", "Read error on disk %s\n \
    read range : [%" PRIu64 " - %" PRIu64 " bytes] [%" PRIu64 " - %" PRIu64 " blocks] \n",
              vol->hash_text.get(), (uint64_t)io.aiocb.aio_offset, (uint64_t)io.aiocb.aio_offset + io.aiocb.aio_nbytes,
              (uint64_t)io.aiocb.aio_offset / 512, (uint64_t)(io.aiocb.aio_offset + io.aiocb.aio_nbytes) / 512);
      }
      goto Ldone;
    }

    doc = reinterpret_cast<Doc *>(buf->data());
    ink_assert(vol->mutex->nthread_holding < 1000);
    ink_assert(doc->magic == DOC_MAGIC);

    // A document written by a newer cache format cannot be interpreted here.
    if (ts::VersionNumber(doc->v_major, doc->v_minor) > CACHE_DB_VERSION) {
      // future version, count as corrupted
      doc->magic = DOC_CORRUPT;
      Debug("cache_bc", "Object is future version %d:%d - disk %s - doc id = %" PRIx64 ":%" PRIx64 "", doc->v_major, doc->v_minor,
            vol->hash_text.get(), read_key->slice64(0), read_key->slice64(1));
      goto Ldone;
    }

#ifdef VERIFY_JTEST_DATA
    char xx[500];
    if (read_key && *read_key == doc->key && request.valid() && !dir_head(&dir) && !vio.ndone) {
      int ib = 0, xd = 0;
      request.url_get()->print(xx, 500, &ib, &xd);
      char *x = xx;
      for (int q = 0; q < 3; q++)
        x = strchr(x + 1, '/');
      ink_assert(!memcmp(doc->data(), x, ib - (x - xx)));
    }
#endif

    if (is_debug_tag_set("cache_read")) {
      char xt[CRYPTO_HEX_SIZE];
      Debug("cache_read", "Read complete on fragment %s. Length: data payload=%d this fragment=%d total doc=%" PRId64 " prefix=%d",
            doc->key.toHexStr(xt), doc->data_len(), doc->len, doc->total_len, doc->prefix_len());
    }

    // put into ram cache?
    // STORE_COLLISION is 1, so the key comparison never excludes a fragment here.
    if (io.ok() && ((doc->first_key == *read_key) || (doc->key == *read_key) || STORE_COLLISION) && doc->magic == DOC_MAGIC) {
      int okay = 1;
      if (!f.doc_from_ram_cache) {
        f.not_from_ram_cache = 1;
      }
      if (cache_config_enable_checksum && doc->checksum != DOC_NO_CHECKSUM) {
        // verify that the checksum matches
        // (byte-wise sum of everything from the header area to the end of the fragment)
        uint32_t checksum = 0;
        for (char *b = doc->hdr(); b < reinterpret_cast<char *>(doc) + doc->len; b++) {
          checksum += *b;
        }
        ink_assert(checksum == doc->checksum);
        if (checksum != doc->checksum) {
          Note("cache: checksum error for [%" PRIu64 " %" PRIu64 "] len %d, hlen %d, disk %s, offset %" PRIu64 " size %zu",
               doc->first_key.b[0], doc->first_key.b[1], doc->len, doc->hlen, vol->path, (uint64_t)io.aiocb.aio_offset,
               (size_t)io.aiocb.aio_nbytes);
          doc->magic = DOC_CORRUPT;
          okay       = 0;
        }
      }
      (void)e; // Avoid compiler warnings
      // When the RAM cache may compress, the buffer is inserted with its headers
      // still marshalled (http_copy_hdr) and unmarshalled only afterwards.
      bool http_copy_hdr = false;
      http_copy_hdr =
        cache_config_ram_cache_compress && !f.doc_from_ram_cache && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen;
      // If http doc we need to unmarshal the headers before putting in the ram cache
      // unless it could be compressed
      if (!http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
        unmarshal_helper(doc, buf, okay);
      }
      // Put the request in the ram cache only if its a open_read or lookup
      if (vio.op == VIO::READ && okay) {
        bool cutoff_check;
        // cutoff_check :
        // doc_len == 0 for the first fragment (it is set from the vector)
        //                The decision on the first fragment is based on
        //                doc->total_len
        // After that, the decision is based of doc_len (doc_len != 0)
        // (cache_config_ram_cache_cutoff == 0) : no cutoffs
        cutoff_check =
          ((!doc_len && static_cast<int64_t>(doc->total_len) < cache_config_ram_cache_cutoff) ||
           (doc_len && static_cast<int64_t>(doc_len) < cache_config_ram_cache_cutoff) || !cache_config_ram_cache_cutoff);
        if (cutoff_check && !f.doc_from_ram_cache) {
          uint64_t o = dir_offset(&dir);
          vol->ram_cache->put(read_key, buf.get(), doc->len, http_copy_hdr, o);
        }
        if (!doc_len) {
          // keep a pointer to it. In case the state machine decides to
          // update this document, we don't have to read it back in memory
          // again
          vol->first_fragment_key    = *read_key;
          vol->first_fragment_offset = dir_offset(&dir);
          vol->first_fragment_data   = buf;
        }
      } // end VIO::READ check
      // If it could be compressed, unmarshal after
      if (http_copy_hdr && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen && okay) {
        unmarshal_helper(doc, buf, okay);
      }
    } // end io.ok() check
  }
Ldone:
  // NOTE(review): POP_HANDLER presumably restores the handler that was active
  // before the read; the completion is then delivered to it as AIO_EVENT_DONE.
  POP_HANDLER;
  return handleEvent(AIO_EVENT_DONE, nullptr);
}
| |
| int |
| CacheVC::handleRead(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| cancel_trigger(); |
| |
| f.doc_from_ram_cache = false; |
| |
| // check ram cache |
| ink_assert(vol->mutex->thread_holding == this_ethread()); |
| int64_t o = dir_offset(&dir); |
| int ram_hit_state = vol->ram_cache->get(read_key, &buf, static_cast<uint64_t>(o)); |
| f.compressed_in_ram = (ram_hit_state > RAM_HIT_COMPRESS_NONE) ? 1 : 0; |
| if (ram_hit_state >= RAM_HIT_COMPRESS_NONE) { |
| goto LramHit; |
| } |
| |
| // check if it was read in the last open_read call |
| if (*read_key == vol->first_fragment_key && dir_offset(&dir) == vol->first_fragment_offset) { |
| buf = vol->first_fragment_data; |
| goto LmemHit; |
| } |
| // see if its in the aggregation buffer |
| if (dir_agg_buf_valid(vol, &dir)) { |
| int agg_offset = vol->vol_offset(&dir) - vol->header->write_pos; |
| buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED); |
| ink_assert((agg_offset + io.aiocb.aio_nbytes) <= (unsigned)vol->agg_buf_pos); |
| char *doc = buf->data(); |
| char *agg = vol->agg_buffer + agg_offset; |
| memcpy(doc, agg, io.aiocb.aio_nbytes); |
| io.aio_result = io.aiocb.aio_nbytes; |
| SET_HANDLER(&CacheVC::handleReadDone); |
| return EVENT_RETURN; |
| } |
| |
| io.aiocb.aio_fildes = vol->fd; |
| io.aiocb.aio_offset = vol->vol_offset(&dir); |
| if (static_cast<off_t>(io.aiocb.aio_offset + io.aiocb.aio_nbytes) > static_cast<off_t>(vol->skip + vol->len)) { |
| io.aiocb.aio_nbytes = vol->skip + vol->len - io.aiocb.aio_offset; |
| } |
| buf = new_IOBufferData(iobuffer_size_to_index(io.aiocb.aio_nbytes, MAX_BUFFER_SIZE_INDEX), MEMALIGNED); |
| io.aiocb.aio_buf = buf->data(); |
| io.action = this; |
| io.thread = mutex->thread_holding->tt == DEDICATED ? AIO_CALLBACK_THREAD_ANY : mutex->thread_holding; |
| SET_HANDLER(&CacheVC::handleReadDone); |
| ink_assert(ink_aio_read(&io) >= 0); |
| CACHE_DEBUG_INCREMENT_DYN_STAT(cache_pread_count_stat); |
| return EVENT_CONT; |
| |
| LramHit : { |
| f.doc_from_ram_cache = true; |
| io.aio_result = io.aiocb.aio_nbytes; |
| Doc *doc = reinterpret_cast<Doc *>(buf->data()); |
| if (cache_config_ram_cache_compress && doc->doc_type == CACHE_FRAG_TYPE_HTTP && doc->hlen) { |
| SET_HANDLER(&CacheVC::handleReadDone); |
| return EVENT_RETURN; |
| } |
| } |
| LmemHit: |
| f.doc_from_ram_cache = true; |
| io.aio_result = io.aiocb.aio_nbytes; |
| POP_HANDLER; |
| return EVENT_RETURN; // allow the caller to release the volume lock |
| } |
| |
| Action * |
| Cache::lookup(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len) |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| cont->handleEvent(CACHE_EVENT_LOOKUP_FAILED, nullptr); |
| return ACTION_RESULT_DONE; |
| } |
| |
| Vol *vol = key_to_vol(key, hostname, host_len); |
| ProxyMutex *mutex = cont->mutex.get(); |
| CacheVC *c = new_CacheVC(cont); |
| SET_CONTINUATION_HANDLER(c, &CacheVC::openReadStartHead); |
| c->vio.op = VIO::READ; |
| c->base_stat = cache_lookup_active_stat; |
| CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE); |
| c->first_key = c->key = *key; |
| c->frag_type = type; |
| c->f.lookup = 1; |
| c->vol = vol; |
| c->last_collision = nullptr; |
| |
| if (c->handleEvent(EVENT_INTERVAL, nullptr) == EVENT_CONT) { |
| return &c->_action; |
| } else { |
| return ACTION_RESULT_DONE; |
| } |
| } |
| |
/** State machine that removes the document identified by @c key.

    Runs under the volume lock (retrying if it cannot be taken). On the first
    pass it deals with any writer for the key. It then walks the directory
    collision chain, reading each candidate and deleting the directory entry whose
    document's first_key matches @c key. The continuation receives
    CACHE_EVENT_REMOVE on success or CACHE_EVENT_REMOVE_FAILED (-ECACHE_NO_DOC)
    otherwise. The VC is freed on every terminal path, including cancellation.
 */
int
CacheVC::removeEvent(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */)
{
  cancel_trigger();
  set_io_not_in_progress();
  {
    MUTEX_TRY_LOCK(lock, vol->mutex, mutex->thread_holding);
    if (!lock.is_locked()) {
      VC_SCHED_LOCK_RETRY();
    }
    // Cancelled by the caller: release writer state (if any) and free the VC silently.
    if (_action.cancelled) {
      if (od) {
        vol->close_write(this);
        od = nullptr;
      }
      goto Lfree;
    }
    // Done once per removal: mark the open directory entry for this key so it
    // does not update the directory.
    if (!f.remove_aborted_writers) {
      if (vol->open_write(this, true, 1)) {
        // writer exists
        od = vol->open_read(&key);
        ink_release_assert(od);
        od->dont_update_directory = true;
        od                        = nullptr;
      } else {
        // NOTE(review): relies on open_write having set `od` when it returns 0; confirm.
        od->dont_update_directory = true;
      }
      f.remove_aborted_writers = 1;
    }
  Lread:
    SET_HANDLER(&CacheVC::removeEvent);
    // Nothing read yet: go probe the directory for a candidate.
    if (!buf) {
      goto Lcollision;
    }
    if (!dir_valid(vol, &dir)) {
      last_collision = nullptr;
      goto Lcollision;
    }
    // check read completed correct FIXME: remove bad vols
    if (static_cast<size_t>(io.aio_result) != io.aiocb.aio_nbytes) {
      goto Ldone;
    }
    {
      // verify that this is our document
      Doc *doc = reinterpret_cast<Doc *>(buf->data());
      /* should be first_key not key..right?? */
      if (doc->first_key == key) {
        ink_assert(doc->magic == DOC_MAGIC);
        if (dir_delete(&key, vol, &dir) > 0) {
          if (od) {
            vol->close_write(this);
          }
          od = nullptr;
          goto Lremoved;
        }
        goto Ldone;
      }
    }
  Lcollision:
    // check for collision
    // Read the next candidate; a synchronous read (EVENT_RETURN) is checked immediately.
    if (dir_probe(&key, vol, &dir, &last_collision) > 0) {
      int ret = do_read_call(&key);
      if (ret == EVENT_RETURN) {
        goto Lread;
      }
      return ret;
    }
  Ldone:
    CACHE_INCREMENT_DYN_STAT(cache_remove_failure_stat);
    if (od) {
      vol->close_write(this);
    }
  }
  // The volume lock scope has ended; the callback must not run while holding it.
  ink_assert(!vol || this_ethread() != vol->mutex->thread_holding);
  _action.continuation->handleEvent(CACHE_EVENT_REMOVE_FAILED, (void *)-ECACHE_NO_DOC);
  goto Lfree;
Lremoved:
  _action.continuation->handleEvent(CACHE_EVENT_REMOVE, nullptr);
Lfree:
  return free_CacheVC(this);
}
| |
| Action * |
| Cache::remove(Continuation *cont, const CacheKey *key, CacheFragType type, const char *hostname, int host_len) |
| { |
| if (!CacheProcessor::IsCacheReady(type)) { |
| if (cont) { |
| cont->handleEvent(CACHE_EVENT_REMOVE_FAILED, nullptr); |
| } |
| return ACTION_RESULT_DONE; |
| } |
| |
| Ptr<ProxyMutex> mutex; |
| if (!cont) { |
| cont = new_CacheRemoveCont(); |
| } |
| |
| CACHE_TRY_LOCK(lock, cont->mutex, this_ethread()); |
| ink_assert(lock.is_locked()); |
| Vol *vol = key_to_vol(key, hostname, host_len); |
| // coverity[var_decl] |
| Dir result; |
| dir_clear(&result); // initialized here, set result empty so we can recognize missed lock |
| mutex = cont->mutex; |
| |
| CacheVC *c = new_CacheVC(cont); |
| c->vio.op = VIO::NONE; |
| c->frag_type = type; |
| c->base_stat = cache_remove_active_stat; |
| CACHE_INCREMENT_DYN_STAT(c->base_stat + CACHE_STAT_ACTIVE); |
| c->first_key = c->key = *key; |
| c->vol = vol; |
| c->dir = result; |
| c->f.remove = 1; |
| |
| SET_CONTINUATION_HANDLER(c, &CacheVC::removeEvent); |
| int ret = c->removeEvent(EVENT_IMMEDIATE, nullptr); |
| if (ret == EVENT_DONE) { |
| return ACTION_RESULT_DONE; |
| } else { |
| return &c->_action; |
| } |
| } |
| // CacheVConnection |
| |
| CacheVConnection::CacheVConnection() : VConnection(nullptr) {} |
| |
| void |
| cplist_init() |
| { |
| cp_list_len = 0; |
| for (int i = 0; i < gndisks; i++) { |
| CacheDisk *d = gdisks[i]; |
| DiskVol **dp = d->disk_vols; |
| for (unsigned int j = 0; j < d->header->num_volumes; j++) { |
| ink_assert(dp[j]->dpb_queue.head); |
| CacheVol *p = cp_list.head; |
| while (p) { |
| if (p->vol_number == dp[j]->vol_number) { |
| ink_assert(p->scheme == (int)dp[j]->dpb_queue.head->b->type); |
| p->size += dp[j]->size; |
| p->num_vols += dp[j]->num_volblocks; |
| p->disk_vols[i] = dp[j]; |
| break; |
| } |
| p = p->link.next; |
| } |
| if (!p) { |
| // did not find a volume in the cache vol list...create |
| // a new one |
| CacheVol *new_p = new CacheVol(); |
| new_p->vol_number = dp[j]->vol_number; |
| new_p->num_vols = dp[j]->num_volblocks; |
| new_p->size = dp[j]->size; |
| new_p->scheme = dp[j]->dpb_queue.head->b->type; |
| new_p->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *))); |
| memset(new_p->disk_vols, 0, gndisks * sizeof(DiskVol *)); |
| new_p->disk_vols[i] = dp[j]; |
| cp_list.enqueue(new_p); |
| cp_list_len++; |
| } |
| } |
| } |
| } |
| |
| static int fillExclusiveDisks(CacheVol *cp); |
| |
| void |
| cplist_update() |
| { |
| /* go through cplist and delete volumes that are not in the volume.config */ |
| CacheVol *cp = cp_list.head; |
| ConfigVol *config_vol; |
| |
| while (cp) { |
| for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) { |
| if (config_vol->number == cp->vol_number) { |
| if (cp->scheme == config_vol->scheme) { |
| cp->ramcache_enabled = config_vol->ramcache_enabled; |
| config_vol->cachep = cp; |
| } else { |
| /* delete this volume from all the disks */ |
| int d_no; |
| int clearCV = 1; |
| |
| for (d_no = 0; d_no < gndisks; d_no++) { |
| if (cp->disk_vols[d_no]) { |
| if (cp->disk_vols[d_no]->disk->forced_volume_num == cp->vol_number) { |
| clearCV = 0; |
| config_vol->cachep = cp; |
| } else { |
| cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number); |
| cp->disk_vols[d_no] = nullptr; |
| } |
| } |
| } |
| if (clearCV) { |
| config_vol = nullptr; |
| } |
| } |
| break; |
| } |
| } |
| |
| if (!config_vol) { |
| // did not find a matching volume in the config file. |
| // Delete the volume from the cache vol list |
| int d_no; |
| for (d_no = 0; d_no < gndisks; d_no++) { |
| if (cp->disk_vols[d_no]) { |
| cp->disk_vols[d_no]->disk->delete_volume(cp->vol_number); |
| } |
| } |
| CacheVol *temp_cp = cp; |
| cp = cp->link.next; |
| cp_list.remove(temp_cp); |
| cp_list_len--; |
| delete temp_cp; |
| continue; |
| } else { |
| cp = cp->link.next; |
| } |
| } |
| |
| // Look for (exclusive) spans forced to a specific volume but not yet referenced by any volumes in cp_list, |
| // if found then create a new volume. This also makes sure new exclusive disk volumes are created first |
| // before any other new volumes to assure proper span free space calculation and proper volume block distribution. |
| for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) { |
| if (nullptr == config_vol->cachep) { |
| // Find out if this is a forced volume assigned exclusively to a span which was cleared (hence not referenced in cp_list). |
| // Note: non-exclusive cleared spans are not handled here, only the "exclusive" |
| bool forced_volume = false; |
| for (int d_no = 0; d_no < gndisks; d_no++) { |
| if (gdisks[d_no]->forced_volume_num == config_vol->number) { |
| forced_volume = true; |
| } |
| } |
| |
| if (forced_volume) { |
| CacheVol *new_cp = new CacheVol(); |
| if (nullptr != new_cp) { |
| new_cp->disk_vols = static_cast<decltype(new_cp->disk_vols)>(ats_malloc(gndisks * sizeof(DiskVol *))); |
| if (nullptr != new_cp->disk_vols) { |
| memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *)); |
| new_cp->vol_number = config_vol->number; |
| new_cp->scheme = config_vol->scheme; |
| config_vol->cachep = new_cp; |
| fillExclusiveDisks(config_vol->cachep); |
| cp_list.enqueue(new_cp); |
| } else { |
| delete new_cp; |
| } |
| } |
| } |
| } else { |
| // Fill if this is exclusive disk. |
| fillExclusiveDisks(config_vol->cachep); |
| } |
| } |
| } |
| |
| static int |
| fillExclusiveDisks(CacheVol *cp) |
| { |
| int diskCount = 0; |
| int volume_number = cp->vol_number; |
| |
| Debug("cache_init", "volume %d", volume_number); |
| for (int i = 0; i < gndisks; i++) { |
| if (gdisks[i]->forced_volume_num != volume_number) { |
| continue; |
| } |
| |
| /* OK, this should be an "exclusive" disk (span). */ |
| diskCount++; |
| |
| /* There should be a single "forced" volume and no other volumes should exist on this "exclusive" disk (span) */ |
| bool found_nonforced_volumes = false; |
| for (int j = 0; j < static_cast<int>(gdisks[i]->header->num_volumes); j++) { |
| if (volume_number != gdisks[i]->disk_vols[j]->vol_number) { |
| found_nonforced_volumes = true; |
| break; |
| } |
| } |
| |
| if (found_nonforced_volumes) { |
| /* The user had created several volumes before - clear the disk and create one volume for http */ |
| Note("Clearing Disk: %s", gdisks[i]->path); |
| gdisks[i]->delete_all_volumes(); |
| } else if (1 == gdisks[i]->header->num_volumes) { |
| /* "Forced" volumes take the whole disk (span) hence nothing more to do for this span. */ |
| continue; |
| } |
| |
| /* Now, volumes have been either deleted or did not exist to begin with so we need to create them. */ |
| |
| int64_t size_diff = gdisks[i]->num_usable_blocks; |
| DiskVolBlock *dpb; |
| do { |
| dpb = gdisks[i]->create_volume(volume_number, size_diff, cp->scheme); |
| if (dpb) { |
| if (!cp->disk_vols[i]) { |
| cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number); |
| } |
| size_diff -= dpb->len; |
| cp->size += dpb->len; |
| cp->num_vols++; |
| } else { |
| Debug("cache_init", "create_volume failed"); |
| break; |
| } |
| } while ((size_diff > 0)); |
| } |
| |
| /* Report back the number of disks (spans) that were assigned to volume specified by volume_number. */ |
| return diskCount; |
| } |
| |
| int |
| cplist_reconfigure() |
| { |
| int64_t size; |
| int volume_number; |
| off_t size_in_blocks; |
| ConfigVol *config_vol; |
| |
| gnvol = 0; |
| if (config_volumes.num_volumes == 0) { |
| /* only the http cache */ |
| CacheVol *cp = new CacheVol(); |
| cp->vol_number = 0; |
| cp->scheme = CACHE_HTTP_TYPE; |
| cp->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *))); |
| memset(cp->disk_vols, 0, gndisks * sizeof(DiskVol *)); |
| cp_list.enqueue(cp); |
| cp_list_len++; |
| for (int i = 0; i < gndisks; i++) { |
| if (gdisks[i]->header->num_volumes != 1 || gdisks[i]->disk_vols[0]->vol_number != 0) { |
| /* The user had created several volumes before - clear the disk |
| and create one volume for http */ |
| Note("Clearing Disk: %s", gdisks[i]->path); |
| gdisks[i]->delete_all_volumes(); |
| } |
| if (gdisks[i]->cleared) { |
| uint64_t free_space = gdisks[i]->free_space * STORE_BLOCK_SIZE; |
| int vols = (free_space / MAX_VOL_SIZE) + 1; |
| for (int p = 0; p < vols; p++) { |
| off_t b = gdisks[i]->free_space / (vols - p); |
| Debug("cache_hosting", "blocks = %" PRId64, (int64_t)b); |
| DiskVolBlock *dpb = gdisks[i]->create_volume(0, b, CACHE_HTTP_TYPE); |
| ink_assert(dpb && dpb->len == (uint64_t)b); |
| } |
| ink_assert(gdisks[i]->free_space == 0); |
| } |
| |
| ink_assert(gdisks[i]->header->num_volumes == 1); |
| DiskVol **dp = gdisks[i]->disk_vols; |
| gnvol += dp[0]->num_volblocks; |
| cp->size += dp[0]->size; |
| cp->num_vols += dp[0]->num_volblocks; |
| cp->disk_vols[i] = dp[0]; |
| } |
| |
| } else { |
| for (int i = 0; i < gndisks; i++) { |
| if (gdisks[i]->header->num_volumes == 1 && gdisks[i]->disk_vols[0]->vol_number == 0) { |
| /* The user had created several volumes before - clear the disk |
| and create one volume for http */ |
| Note("Clearing Disk: %s", gdisks[i]->path); |
| gdisks[i]->delete_all_volumes(); |
| } |
| } |
| |
| /* change percentages in the config partitions to absolute value */ |
| off_t tot_space_in_blks = 0; |
| off_t blocks_per_vol = VOL_BLOCK_SIZE / STORE_BLOCK_SIZE; |
| /* sum up the total space available on all the disks. |
| round down the space to 128 megabytes */ |
| for (int i = 0; i < gndisks; i++) { |
| // Exclude exclusive disks (with forced volumes) from the following total space calculation, |
| // in such a way forced volumes will not impact volume percentage calculations. |
| if (-1 == gdisks[i]->forced_volume_num) { |
| tot_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol; |
| } |
| } |
| |
| double percent_remaining = 100.00; |
| for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) { |
| if (config_vol->in_percent) { |
| if (config_vol->percent > percent_remaining) { |
| Warning("total volume sizes added up to more than 100%%!"); |
| Warning("no volumes created"); |
| return -1; |
| } |
| |
| // Find if the volume is forced and if it is then calculate the total forced volume size. |
| // Forced volumes take the whole span (disk) also sum all disk space this volume is forced to. |
| int64_t tot_forced_space_in_blks = 0; |
| for (int i = 0; i < gndisks; i++) { |
| if (config_vol->number == gdisks[i]->forced_volume_num) { |
| tot_forced_space_in_blks += (gdisks[i]->num_usable_blocks / blocks_per_vol) * blocks_per_vol; |
| } |
| } |
| |
| int64_t space_in_blks = 0; |
| if (0 == tot_forced_space_in_blks) { |
| // Calculate the space as percentage of total space in blocks. |
| space_in_blks = static_cast<int64_t>(((config_vol->percent / percent_remaining)) * tot_space_in_blks); |
| } else { |
| // Forced volumes take all disk space, so no percentage calculations here. |
| space_in_blks = tot_forced_space_in_blks; |
| } |
| |
| space_in_blks = space_in_blks >> (20 - STORE_BLOCK_SHIFT); |
| /* round down to 128 megabyte multiple */ |
| space_in_blks = (space_in_blks >> 7) << 7; |
| config_vol->size = space_in_blks; |
| |
| if (0 == tot_forced_space_in_blks) { |
| tot_space_in_blks -= space_in_blks << (20 - STORE_BLOCK_SHIFT); |
| percent_remaining -= (config_vol->size < 128) ? 0 : config_vol->percent; |
| } |
| } |
| if (config_vol->size < 128) { |
| Warning("the size of volume %d (%" PRId64 ") is less than the minimum required volume size %d", config_vol->number, |
| (int64_t)config_vol->size, 128); |
| Warning("volume %d is not created", config_vol->number); |
| } |
| Debug("cache_hosting", "Volume: %d Size: %" PRId64 " Ramcache: %d", config_vol->number, (int64_t)config_vol->size, |
| config_vol->ramcache_enabled); |
| } |
| cplist_update(); |
| |
| /* go through volume config and grow and create volumes */ |
| for (config_vol = config_volumes.cp_queue.head; config_vol; config_vol = config_vol->link.next) { |
| size = config_vol->size; |
| if (size < 128) { |
| continue; |
| } |
| |
| volume_number = config_vol->number; |
| |
| size_in_blocks = (static_cast<off_t>(size) * 1024 * 1024) / STORE_BLOCK_SIZE; |
| |
| if (config_vol->cachep && config_vol->cachep->num_vols > 0) { |
| gnvol += config_vol->cachep->num_vols; |
| continue; |
| } |
| |
| if (!config_vol->cachep) { |
| // we did not find a corresponding entry in cache vol...create one |
| |
| CacheVol *new_cp = new CacheVol(); |
| new_cp->disk_vols = static_cast<DiskVol **>(ats_malloc(gndisks * sizeof(DiskVol *))); |
| memset(new_cp->disk_vols, 0, gndisks * sizeof(DiskVol *)); |
| if (create_volume(config_vol->number, size_in_blocks, config_vol->scheme, new_cp)) { |
| ats_free(new_cp->disk_vols); |
| new_cp->disk_vols = nullptr; |
| delete new_cp; |
| return -1; |
| } |
| cp_list.enqueue(new_cp); |
| cp_list_len++; |
| config_vol->cachep = new_cp; |
| gnvol += new_cp->num_vols; |
| continue; |
| } |
| // else |
| CacheVol *cp = config_vol->cachep; |
| ink_assert(cp->size <= size_in_blocks); |
| if (cp->size == size_in_blocks) { |
| gnvol += cp->num_vols; |
| continue; |
| } |
| // else the size is greater... |
| /* search the cp_list */ |
| |
| int *sorted_vols = new int[gndisks]; |
| for (int i = 0; i < gndisks; i++) { |
| sorted_vols[i] = i; |
| } |
| for (int i = 0; i < gndisks - 1; i++) { |
| int smallest = sorted_vols[i]; |
| int smallest_ndx = i; |
| for (int j = i + 1; j < gndisks; j++) { |
| int curr = sorted_vols[j]; |
| DiskVol *dvol = cp->disk_vols[curr]; |
| if (gdisks[curr]->cleared) { |
| ink_assert(!dvol); |
| // disks that are cleared should be filled first |
| smallest = curr; |
| smallest_ndx = j; |
| } else if (!dvol && cp->disk_vols[smallest]) { |
| smallest = curr; |
| smallest_ndx = j; |
| } else if (dvol && cp->disk_vols[smallest] && (dvol->size < cp->disk_vols[smallest]->size)) { |
| smallest = curr; |
| smallest_ndx = j; |
| } |
| } |
| sorted_vols[smallest_ndx] = sorted_vols[i]; |
| sorted_vols[i] = smallest; |
| } |
| |
| int64_t size_to_alloc = size_in_blocks - cp->size; |
| int disk_full = 0; |
| for (int i = 0; (i < gndisks) && size_to_alloc; i++) { |
| int disk_no = sorted_vols[i]; |
| ink_assert(cp->disk_vols[sorted_vols[gndisks - 1]]); |
| int largest_vol = cp->disk_vols[sorted_vols[gndisks - 1]]->size; |
| |
| /* allocate storage on new disk. Find the difference |
| between the biggest volume on any disk and |
| the volume on this disk and try to make |
| them equal */ |
| int64_t size_diff = (cp->disk_vols[disk_no]) ? largest_vol - cp->disk_vols[disk_no]->size : largest_vol; |
| size_diff = (size_diff < size_to_alloc) ? size_diff : size_to_alloc; |
| /* if size_diff == 0, then the disks have volumes of the |
| same sizes, so we don't need to balance the disks */ |
| if (size_diff == 0) { |
| break; |
| } |
| |
| DiskVolBlock *dpb; |
| do { |
| dpb = gdisks[disk_no]->create_volume(volume_number, size_diff, cp->scheme); |
| if (dpb) { |
| if (!cp->disk_vols[disk_no]) { |
| cp->disk_vols[disk_no] = gdisks[disk_no]->get_diskvol(volume_number); |
| } |
| size_diff -= dpb->len; |
| cp->size += dpb->len; |
| cp->num_vols++; |
| } else { |
| break; |
| } |
| } while ((size_diff > 0)); |
| |
| if (!dpb) { |
| disk_full++; |
| } |
| |
| size_to_alloc = size_in_blocks - cp->size; |
| } |
| |
| delete[] sorted_vols; |
| |
| if (size_to_alloc) { |
| if (create_volume(volume_number, size_to_alloc, cp->scheme, cp)) { |
| return -1; |
| } |
| } |
| gnvol += cp->num_vols; |
| } |
| } |
| return 0; |
| } |
| |
| // This is some really bad code, and needs to be rewritten! |
| int |
| create_volume(int volume_number, off_t size_in_blocks, int scheme, CacheVol *cp) |
| { |
| static int curr_vol = 0; // FIXME: this will not reinitialize correctly |
| off_t to_create = size_in_blocks; |
| off_t blocks_per_vol = VOL_BLOCK_SIZE >> STORE_BLOCK_SHIFT; |
| int full_disks = 0; |
| |
| cp->vol_number = volume_number; |
| cp->scheme = scheme; |
| if (fillExclusiveDisks(cp)) { |
| Debug("cache_init", "volume successfully filled from forced disks: volume_number=%d", volume_number); |
| return 0; |
| } |
| |
| int *sp = new int[gndisks]; |
| memset(sp, 0, gndisks * sizeof(int)); |
| |
| int i = curr_vol; |
| while (size_in_blocks > 0) { |
| if (gdisks[i]->free_space >= (sp[i] + blocks_per_vol)) { |
| sp[i] += blocks_per_vol; |
| size_in_blocks -= blocks_per_vol; |
| full_disks = 0; |
| } else { |
| full_disks += 1; |
| if (full_disks == gndisks) { |
| char config_file[PATH_NAME_MAX]; |
| REC_ReadConfigString(config_file, "proxy.config.cache.volume_filename", PATH_NAME_MAX); |
| if (cp->size) { |
| Warning("not enough space to increase volume: [%d] to size: [%" PRId64 "]", volume_number, |
| (int64_t)((to_create + cp->size) >> (20 - STORE_BLOCK_SHIFT))); |
| } else { |
| Warning("not enough space to create volume: [%d], size: [%" PRId64 "]", volume_number, |
| (int64_t)(to_create >> (20 - STORE_BLOCK_SHIFT))); |
| } |
| |
| Note("edit the %s file and restart traffic_server", config_file); |
| delete[] sp; |
| return -1; |
| } |
| } |
| i = (i + 1) % gndisks; |
| } |
| cp->vol_number = volume_number; |
| cp->scheme = scheme; |
| curr_vol = i; |
| for (i = 0; i < gndisks; i++) { |
| if (sp[i] > 0) { |
| while (sp[i] > 0) { |
| DiskVolBlock *p = gdisks[i]->create_volume(volume_number, sp[i], scheme); |
| ink_assert(p && (p->len >= (unsigned int)blocks_per_vol)); |
| sp[i] -= p->len; |
| cp->num_vols++; |
| cp->size += p->len; |
| } |
| if (!cp->disk_vols[i]) { |
| cp->disk_vols[i] = gdisks[i]->get_diskvol(volume_number); |
| } |
| } |
| } |
| delete[] sp; |
| return 0; |
| } |
| |
| void |
| rebuild_host_table(Cache *cache) |
| { |
| build_vol_hash_table(&cache->hosttable->gen_host_rec); |
| if (cache->hosttable->m_numEntries != 0) { |
| CacheHostMatcher *hm = cache->hosttable->getHostMatcher(); |
| CacheHostRecord *h_rec = hm->getDataArray(); |
| int h_rec_len = hm->getNumElements(); |
| int i; |
| for (i = 0; i < h_rec_len; i++) { |
| build_vol_hash_table(&h_rec[i]); |
| } |
| } |
| } |
| |
| // if generic_host_rec.vols == nullptr, what do we do??? |
| Vol * |
| Cache::key_to_vol(const CacheKey *key, const char *hostname, int host_len) |
| { |
| uint32_t h = (key->slice32(2) >> DIR_TAG_WIDTH) % VOL_HASH_TABLE_SIZE; |
| unsigned short *hash_table = hosttable->gen_host_rec.vol_hash_table; |
| CacheHostRecord *host_rec = &hosttable->gen_host_rec; |
| |
| if (hosttable->m_numEntries > 0 && host_len) { |
| CacheHostResult res; |
| hosttable->Match(hostname, host_len, &res); |
| if (res.record) { |
| unsigned short *host_hash_table = res.record->vol_hash_table; |
| if (host_hash_table) { |
| if (is_debug_tag_set("cache_hosting")) { |
| char format_str[50]; |
| snprintf(format_str, sizeof(format_str), "Volume: %%xd for host: %%.%ds", host_len); |
| Debug("cache_hosting", format_str, res.record, hostname); |
| } |
| return res.record->vols[host_hash_table[h]]; |
| } |
| } |
| } |
| if (hash_table) { |
| if (is_debug_tag_set("cache_hosting")) { |
| char format_str[50]; |
| snprintf(format_str, sizeof(format_str), "Generic volume: %%xd for host: %%.%ds", host_len); |
| Debug("cache_hosting", format_str, host_rec, hostname); |
| } |
| return host_rec->vols[hash_table[h]]; |
| } else { |
| return host_rec->vols[0]; |
| } |
| } |
| |
| static void |
| reg_int(const char *str, int stat, RecRawStatBlock *rsb, const char *prefix, RecRawStatSyncCb sync_cb = RecRawStatSyncSum) |
| { |
| char stat_str[256]; |
| snprintf(stat_str, sizeof(stat_str), "%s.%s", prefix, str); |
| RecRegisterRawStat(rsb, RECT_PROCESS, stat_str, RECD_INT, RECP_NON_PERSISTENT, stat, sync_cb); |
| DOCACHE_CLEAR_DYN_STAT(stat) |
| } |
| #define REG_INT(_str, _stat) reg_int(_str, (int)_stat, rsb, prefix) |
| |
| // Register Stats |
| void |
| register_cache_stats(RecRawStatBlock *rsb, const char *prefix) |
| { |
| // Special case for this sucker, since it uses its own aggregator. |
| reg_int("bytes_used", cache_bytes_used_stat, rsb, prefix, cache_stats_bytes_used_cb); |
| |
| REG_INT("bytes_total", cache_bytes_total_stat); |
| REG_INT("ram_cache.total_bytes", cache_ram_cache_bytes_total_stat); |
| REG_INT("ram_cache.bytes_used", cache_ram_cache_bytes_stat); |
| REG_INT("ram_cache.hits", cache_ram_cache_hits_stat); |
| REG_INT("ram_cache.misses", cache_ram_cache_misses_stat); |
| REG_INT("pread_count", cache_pread_count_stat); |
| REG_INT("percent_full", cache_percent_full_stat); |
| REG_INT("lookup.active", cache_lookup_active_stat); |
| REG_INT("lookup.success", cache_lookup_success_stat); |
| REG_INT("lookup.failure", cache_lookup_failure_stat); |
| REG_INT("read.active", cache_read_active_stat); |
| REG_INT("read.success", cache_read_success_stat); |
| REG_INT("read.failure", cache_read_failure_stat); |
| REG_INT("read.seek.failure", cache_read_seek_fail_stat); |
| REG_INT("write.active", cache_write_active_stat); |
| REG_INT("write.success", cache_write_success_stat); |
| REG_INT("write.failure", cache_write_failure_stat); |
| REG_INT("write.backlog.failure", cache_write_backlog_failure_stat); |
| REG_INT("update.active", cache_update_active_stat); |
| REG_INT("update.success", cache_update_success_stat); |
| REG_INT("update.failure", cache_update_failure_stat); |
| REG_INT("remove.active", cache_remove_active_stat); |
| REG_INT("remove.success", cache_remove_success_stat); |
| REG_INT("remove.failure", cache_remove_failure_stat); |
| REG_INT("evacuate.active", cache_evacuate_active_stat); |
| REG_INT("evacuate.success", cache_evacuate_success_stat); |
| REG_INT("evacuate.failure", cache_evacuate_failure_stat); |
| REG_INT("scan.active", cache_scan_active_stat); |
| REG_INT("scan.success", cache_scan_success_stat); |
| REG_INT("scan.failure", cache_scan_failure_stat); |
| REG_INT("direntries.total", cache_direntries_total_stat); |
| REG_INT("direntries.used", cache_direntries_used_stat); |
| REG_INT("directory_collision", cache_directory_collision_count_stat); |
| REG_INT("frags_per_doc.1", cache_single_fragment_document_count_stat); |
| REG_INT("frags_per_doc.2", cache_two_fragment_document_count_stat); |
| REG_INT("frags_per_doc.3+", cache_three_plus_plus_fragment_document_count_stat); |
| REG_INT("read_busy.success", cache_read_busy_success_stat); |
| REG_INT("read_busy.failure", cache_read_busy_failure_stat); |
| REG_INT("write_bytes_stat", cache_write_bytes_stat); |
| REG_INT("vector_marshals", cache_hdr_vector_marshal_stat); |
| REG_INT("hdr_marshals", cache_hdr_marshal_stat); |
| REG_INT("hdr_marshal_bytes", cache_hdr_marshal_bytes_stat); |
| REG_INT("gc_bytes_evacuated", cache_gc_bytes_evacuated_stat); |
| REG_INT("gc_frags_evacuated", cache_gc_frags_evacuated_stat); |
| REG_INT("wrap_count", cache_directory_wrap_stat); |
| REG_INT("sync.count", cache_directory_sync_count_stat); |
| REG_INT("sync.bytes", cache_directory_sync_bytes_stat); |
| REG_INT("sync.time", cache_directory_sync_time_stat); |
| REG_INT("span.errors.read", cache_span_errors_read_stat); |
| REG_INT("span.errors.write", cache_span_errors_write_stat); |
| REG_INT("span.failing", cache_span_failing_stat); |
| REG_INT("span.offline", cache_span_offline_stat); |
| REG_INT("span.online", cache_span_online_stat); |
| } |
| |
| int |
| FragmentSizeUpdateCb(const char * /* name ATS_UNUSED */, RecDataT /* data_type ATS_UNUSED */, RecData data, void *cookie) |
| { |
| if (sizeof(Doc) >= static_cast<size_t>(data.rec_int) || static_cast<size_t>(data.rec_int) - sizeof(Doc) > MAX_FRAG_SIZE) { |
| Warning("The fragments size exceed the limitation, ignore: %" PRId64 ", %d", data.rec_int, cache_config_target_fragment_size); |
| return 0; |
| } |
| |
| cache_config_target_fragment_size = data.rec_int; |
| return 0; |
| } |
| |
| void |
| ink_cache_init(ts::ModuleVersion v) |
| { |
| ink_release_assert(v.check(CACHE_MODULE_VERSION)); |
| |
| cache_rsb = RecAllocateRawStatBlock(static_cast<int>(cache_stat_count)); |
| |
| REC_EstablishStaticConfigInteger(cache_config_ram_cache_size, "proxy.config.cache.ram_cache.size"); |
| Debug("cache_init", "proxy.config.cache.ram_cache.size = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_size, |
| cache_config_ram_cache_size / (1024 * 1024)); |
| |
| REC_EstablishStaticConfigInt32(cache_config_ram_cache_algorithm, "proxy.config.cache.ram_cache.algorithm"); |
| REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress, "proxy.config.cache.ram_cache.compress"); |
| REC_EstablishStaticConfigInt32(cache_config_ram_cache_compress_percent, "proxy.config.cache.ram_cache.compress_percent"); |
| REC_ReadConfigInt32(cache_config_ram_cache_use_seen_filter, "proxy.config.cache.ram_cache.use_seen_filter"); |
| |
| REC_EstablishStaticConfigInt32(cache_config_http_max_alts, "proxy.config.cache.limits.http.max_alts"); |
| Debug("cache_init", "proxy.config.cache.limits.http.max_alts = %d", cache_config_http_max_alts); |
| |
| REC_EstablishStaticConfigInt32(cache_config_log_alternate_eviction, "proxy.config.cache.log.alternate.eviction"); |
| Debug("cache_init", "proxy.config.cache.log.alternate.eviction = %d", cache_config_log_alternate_eviction); |
| |
| REC_EstablishStaticConfigInteger(cache_config_ram_cache_cutoff, "proxy.config.cache.ram_cache_cutoff"); |
| Debug("cache_init", "cache_config_ram_cache_cutoff = %" PRId64 " = %" PRId64 "Mb", cache_config_ram_cache_cutoff, |
| cache_config_ram_cache_cutoff / (1024 * 1024)); |
| |
| REC_EstablishStaticConfigInt32(cache_config_permit_pinning, "proxy.config.cache.permit.pinning"); |
| Debug("cache_init", "proxy.config.cache.permit.pinning = %d", cache_config_permit_pinning); |
| |
| REC_EstablishStaticConfigInt32(cache_config_dir_sync_frequency, "proxy.config.cache.dir.sync_frequency"); |
| Debug("cache_init", "proxy.config.cache.dir.sync_frequency = %d", cache_config_dir_sync_frequency); |
| |
| REC_EstablishStaticConfigInt32(cache_config_select_alternate, "proxy.config.cache.select_alternate"); |
| Debug("cache_init", "proxy.config.cache.select_alternate = %d", cache_config_select_alternate); |
| |
| REC_EstablishStaticConfigInt32(cache_config_max_doc_size, "proxy.config.cache.max_doc_size"); |
| Debug("cache_init", "proxy.config.cache.max_doc_size = %d = %dMb", cache_config_max_doc_size, |
| cache_config_max_doc_size / (1024 * 1024)); |
| |
| REC_EstablishStaticConfigInt32(cache_config_mutex_retry_delay, "proxy.config.cache.mutex_retry_delay"); |
| Debug("cache_init", "proxy.config.cache.mutex_retry_delay = %dms", cache_config_mutex_retry_delay); |
| |
| REC_EstablishStaticConfigInt32(cache_config_read_while_writer_max_retries, "proxy.config.cache.read_while_writer.max_retries"); |
| Debug("cache_init", "proxy.config.cache.read_while_writer.max_retries = %d", cache_config_read_while_writer_max_retries); |
| |
| REC_EstablishStaticConfigInt32(cache_read_while_writer_retry_delay, "proxy.config.cache.read_while_writer_retry.delay"); |
| Debug("cache_init", "proxy.config.cache.read_while_writer_retry.delay = %dms", cache_read_while_writer_retry_delay); |
| |
| REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_percent, "proxy.config.cache.hit_evacuate_percent"); |
| Debug("cache_init", "proxy.config.cache.hit_evacuate_percent = %d", cache_config_hit_evacuate_percent); |
| |
| REC_EstablishStaticConfigInt32(cache_config_hit_evacuate_size_limit, "proxy.config.cache.hit_evacuate_size_limit"); |
| Debug("cache_init", "proxy.config.cache.hit_evacuate_size_limit = %d", cache_config_hit_evacuate_size_limit); |
| |
| REC_EstablishStaticConfigInt32(cache_config_force_sector_size, "proxy.config.cache.force_sector_size"); |
| |
| ink_assert(REC_RegisterConfigUpdateFunc("proxy.config.cache.target_fragment_size", FragmentSizeUpdateCb, nullptr) != |
| REC_ERR_FAIL); |
| REC_ReadConfigInt32(cache_config_target_fragment_size, "proxy.config.cache.target_fragment_size"); |
| |
| if (cache_config_target_fragment_size == 0 || cache_config_target_fragment_size - sizeof(Doc) > MAX_FRAG_SIZE) { |
| cache_config_target_fragment_size = DEFAULT_TARGET_FRAGMENT_SIZE; |
| } |
| |
| REC_EstablishStaticConfigInt32(cache_config_max_disk_errors, "proxy.config.cache.max_disk_errors"); |
| Debug("cache_init", "proxy.config.cache.max_disk_errors = %d", cache_config_max_disk_errors); |
| |
| REC_EstablishStaticConfigInt32(cache_config_agg_write_backlog, "proxy.config.cache.agg_write_backlog"); |
| Debug("cache_init", "proxy.config.cache.agg_write_backlog = %d", cache_config_agg_write_backlog); |
| |
| REC_EstablishStaticConfigInt32(cache_config_enable_checksum, "proxy.config.cache.enable_checksum"); |
| Debug("cache_init", "proxy.config.cache.enable_checksum = %d", cache_config_enable_checksum); |
| |
| REC_EstablishStaticConfigInt32(cache_config_alt_rewrite_max_size, "proxy.config.cache.alt_rewrite_max_size"); |
| Debug("cache_init", "proxy.config.cache.alt_rewrite_max_size = %d", cache_config_alt_rewrite_max_size); |
| |
| REC_EstablishStaticConfigInt32(cache_config_read_while_writer, "proxy.config.cache.enable_read_while_writer"); |
| cache_config_read_while_writer = validate_rww(cache_config_read_while_writer); |
| REC_RegisterConfigUpdateFunc("proxy.config.cache.enable_read_while_writer", update_cache_config, nullptr); |
| Debug("cache_init", "proxy.config.cache.enable_read_while_writer = %d", cache_config_read_while_writer); |
| |
| register_cache_stats(cache_rsb, "proxy.process.cache"); |
| |
| REC_ReadConfigInteger(cacheProcessor.wait_for_cache, "proxy.config.http.wait_for_cache"); |
| |
| Result result = theCacheStore.read_config(); |
| if (result.failed()) { |
| Fatal("Failed to read cache configuration %s: %s", ts::filename::STORAGE, result.message()); |
| } |
| } |
| |
| //---------------------------------------------------------------------------- |
| Action * |
| CacheProcessor::open_read(Continuation *cont, const HttpCacheKey *key, CacheHTTPHdr *request, |
| const OverridableHttpConfigParams *params, time_t pin_in_cache, CacheFragType type) |
| { |
| return caches[type]->open_read(cont, &key->hash, request, params, type, key->hostname, key->hostlen); |
| } |
| |
| //---------------------------------------------------------------------------- |
| Action * |
| CacheProcessor::open_write(Continuation *cont, int expected_size, const HttpCacheKey *key, CacheHTTPHdr *request, |
| CacheHTTPInfo *old_info, time_t pin_in_cache, CacheFragType type) |
| { |
| return caches[type]->open_write(cont, &key->hash, old_info, pin_in_cache, nullptr /* key1 */, type, key->hostname, key->hostlen); |
| } |
| |
| //---------------------------------------------------------------------------- |
| // Note: this should not be called from the cluster processor, or bad |
| // recursion could occur. This is merely a convenience wrapper. |
| Action * |
| CacheProcessor::remove(Continuation *cont, const HttpCacheKey *key, CacheFragType frag_type) |
| { |
| return caches[frag_type]->remove(cont, &key->hash, frag_type, key->hostname, key->hostlen); |
| } |
| |
| CacheDisk * |
| CacheProcessor::find_by_path(const char *path, int len) |
| { |
| if (CACHE_INITIALIZED == initialized) { |
| // If no length is passed in, assume it's null terminated. |
| if (0 >= len && 0 != *path) { |
| len = strlen(path); |
| } |
| |
| for (int i = 0; i < gndisks; ++i) { |
| if (0 == strncmp(path, gdisks[i]->path, len)) { |
| return gdisks[i]; |
| } |
| } |
| } |
| |
| return nullptr; |
| } |