| /** @file |
| |
| A brief file description |
| |
| @section license License |
| |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| #include "CacheVC.h" |
| #include "P_CacheDoc.h" |
| #include "P_CacheHttp.h" |
| #include "P_CacheInternal.h" |
| #include "iocore/cache/Cache.h" |
| #include "tscore/InkErrno.h" |
| #include "tsutil/DbgCtl.h" |
| |
// Debug controls used by the cache write handlers in this file. They live in
// an anonymous namespace, so they have internal linkage.
| namespace |
| { |
| |
// Tags passed to Dbg() by the vector-update and close handlers below.
| DbgCtl dbg_ctl_cache_update{"cache_update"}; |
| DbgCtl dbg_ctl_cache_update_alt{"cache_update_alt"}; |
| |
// The tags below are only referenced through DDbg() in the visible code, so
// they are defined only when DEBUG is set.
// NOTE(review): presumably DDbg() expands to nothing in non-DEBUG builds, which
// is why these definitions are guarded - confirm against the DDbg definition.
| #ifdef DEBUG |
| |
| DbgCtl dbg_ctl_cache_stats{"cache_stats"}; |
| DbgCtl dbg_ctl_cache_write{"cache_write"}; |
| DbgCtl dbg_ctl_cache_insert{"cache_insert"}; |
| |
| #endif |
| |
| } // end anonymous namespace |
| |
| // Given a key, finds the index of the alternate which matches |
| // used to get the alternate which is actually present in the document |
| int |
| get_alternate_index(CacheHTTPInfoVector *cache_vector, CacheKey key) |
| { |
| int alt_count = cache_vector->count(); |
| CacheHTTPInfo *obj; |
| if (!alt_count) { |
| return -1; |
| } |
| for (int i = 0; i < alt_count; i++) { |
| obj = cache_vector->get(i); |
| if (obj->compare_object_key(&key)) { |
| return i; |
| } |
| } |
| return -1; |
| } |
| |
| // Adds/Deletes alternate to the od->vector (write_vector). If the vector |
| // is empty, deletes the directory entry pointing to the vector. Each |
| // CacheVC must write the vector down to disk after making changes. If we |
| // wait till the last writer, that writer will have the responsibility of |
| // of writing the vector even if the http state machine aborts. This |
| // makes it easier to handle situations where writers abort. |
| int |
| CacheVC::updateVector(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| cancel_trigger(); |
| if (od->reading_vec || od->writing_vec) { |
| VC_SCHED_LOCK_RETRY(); |
| } |
| int ret = 0; |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked() || od->writing_vec) { |
| VC_SCHED_LOCK_RETRY(); |
| } |
| |
| int vec = alternate.valid(); |
| if (f.update) { |
| // all Update cases. Need to get the alternate index. |
| alternate_index = get_alternate_index(write_vector, update_key); |
| Dbg(dbg_ctl_cache_update, "updating alternate index %d frags %d", alternate_index, |
| alternate_index >= 0 ? write_vector->get(alternate_index)->get_frag_offset_count() : -1); |
| // if its an alternate delete |
| if (!vec) { |
| ink_assert(!total_len); |
| if (alternate_index >= 0) { |
| write_vector->remove(alternate_index, true); |
| alternate_index = CACHE_ALT_REMOVED; |
| if (!write_vector->count()) { |
| stripe->directory.remove(&first_key, stripe, &od->first_dir); |
| } |
| } |
| // the alternate is not there any more. somebody might have |
| // deleted it. Just close this writer |
| if (alternate_index != CACHE_ALT_REMOVED || !write_vector->count()) { |
| SET_HANDLER(&CacheVC::openWriteCloseDir); |
| return openWriteCloseDir(EVENT_IMMEDIATE, nullptr); |
| } |
| } |
| if (update_key == od->single_doc_key && (total_len || f.allow_empty_doc || !vec)) { |
| od->move_resident_alt = false; |
| } |
| } |
| if (cache_config_http_max_alts > 1 && write_vector->count() >= cache_config_http_max_alts && alternate_index < 0) { |
| if (od->move_resident_alt && get_alternate_index(write_vector, od->single_doc_key) == 0) { |
| od->move_resident_alt = false; |
| } |
| if (cache_config_log_alternate_eviction) { |
| // Initially there was an attempt to make alternate eviction a log |
| // field. However it was discovered this could not work because this |
| // code, in which alternates are evicted, happens during the processing |
| // of IO which happens after transaction logs are emitted and after the |
| // HttpSM is destructed. Instead, therefore, alternate eviction logging |
| // was implemented for diags.log with the |
| // proxy.config.cache.log.alternate.eviction toggle. |
| CacheHTTPInfo *info = write_vector->get(0); |
| HTTPHdr *request = info->request_get(); |
| if (request->valid()) { |
| // Marking the request's target as dirty will guarantee that the |
| // internal members of the request used for printing the URL will be |
| // coherent and valid by the time it is printed. |
| request->mark_target_dirty(); |
| // In contrast to url_string_get, this url_print interface doesn't |
| // use HTTPHdr's m_heap which is not valid at this point because the |
| // HttpSM is most likely gone. |
| int url_length = request->url_printed_length(); |
| ats_scoped_mem<char> url_text; |
| url_text = static_cast<char *>(ats_malloc(url_length + 1)); |
| int index = 0; |
| int offset = 0; |
| // url_print does not NULL terminate, so url_length instead of url_length + 1. |
| int ret = request->url_print(url_text.get(), url_length, &index, &offset); |
| url_text.get()[url_length] = '\0'; |
| if (ret == 0) { |
| Note("Could not print URL of evicted alternate."); |
| } else { |
| Status("The maximum number of alternates was exceeded for a resource. " |
| "An alternate was evicted for URL: %.*s", |
| url_length, url_text.get()); |
| } |
| } |
| } |
| write_vector->remove(0, true); |
| } |
| if (vec) { |
| /* preserve fragment offset data from old info. This method is |
| called iff the update is a header only update so the fragment |
| data should remain valid. |
| */ |
| // If we are not in header only updating case. Don't copy fragments. |
| if (alternate_index >= 0 && |
| ((total_len == 0 && alternate.get_frag_offset_count() == 0) && !(f.allow_empty_doc && this->vio.nbytes == 0))) { |
| alternate.copy_frag_offsets_from(write_vector->get(alternate_index)); |
| } |
| alternate_index = write_vector->insert(&alternate, alternate_index); |
| } |
| |
| if (od->move_resident_alt && first_buf.get() && !od->has_multiple_writers()) { |
| Doc *doc = reinterpret_cast<Doc *>(first_buf->data()); |
| int small_doc = static_cast<int64_t>(doc->data_len()) < static_cast<int64_t>(cache_config_alt_rewrite_max_size); |
| int have_res_alt = doc->key == od->single_doc_key; |
| // if the new alternate is not written with the vector |
| // then move the old one with the vector |
| // if its a header only update move the resident alternate |
| // with the vector. |
| // We are sure that the body of the resident alternate that we are |
| // rewriting has not changed and the alternate is not being deleted, |
| // since we set od->move_resident_alt to 0 in that case |
| // (in updateVector) |
| if (small_doc && have_res_alt && (fragment || (f.update && !total_len))) { |
| // for multiple fragment document, we must have done |
| // CacheVC:openWriteCloseDataDone |
| ink_assert(!fragment || f.data_done); |
| od->move_resident_alt = false; |
| f.rewrite_resident_alt = 1; |
| write_len = doc->data_len(); |
| Dbg(dbg_ctl_cache_update_alt, "rewriting resident alt size: %d key: %X, first_key: %X", write_len, doc->key.slice32(0), |
| first_key.slice32(0)); |
| } |
| } |
| header_len = write_vector->marshal_length(); |
| od->writing_vec = true; |
| f.use_first_key = 1; |
| SET_HANDLER(&CacheVC::openWriteCloseHeadDone); |
| ret = do_write_call(); |
| } |
| if (ret == EVENT_RETURN) { |
| return handleEvent(AIO_EVENT_DONE, nullptr); |
| } |
| return ret; |
| } |
| /* |
| The following fields of the CacheVC are used when writing down a fragment. |
| Make sure that each of the fields is set to a valid value before calling |
| this function |
| - frag_type. Checked to see if a vector needs to be marshalled. |
| - f.use_first_key. To decide if the vector should be marshalled and to set |
| the doc->key to the appropriate key (first_key or earliest_key) |
| - f.evac_vector. If set, the writer is pushed in the beginning of the |
| agg queue. And if !f.evac_vector && !f.update the alternate->object_size |
| is set to vc->total_len |
| - f.readers. If set, assumes that this is an evacuation, so the write |
| is not aborted even if |
| stripe->_write_buffer.get_bytes_pending_aggregation() > agg_write_backlog |
| - f.evacuator. If this is an evacuation. |
| - f.rewrite_resident_alt. The resident alternate is rewritten. |
| - f.update. Used only if the write_vector needs to be written to disk. |
| Used to set the length of the alternate to total_len. |
| - write_vector. Used only if frag_type == CACHE_FRAG_TYPE_HTTP && |
| (f.use_first_key || f.evac_vector) is set. Write_vector is written to disk |
| - alternate_index. Used only if write_vector needs to be written to disk. |
| Used to find out the VC's alternate in the write_vector and set its |
| length to total_len. |
| - write_len. The number of bytes for this fragment. |
| - total_len. The total number of bytes for the document so far. |
| Doc->total_len and alternate's total len is set to this value. |
| - first_key. Doc's first_key is set to this value. |
| - pin_in_cache. Doc's pinned value is set to this + ink_get_hrtime(). |
| - earliest_key. If f.use_first_key, Doc's key is set to this value. |
| - key. If !f.use_first_key, Doc's key is set to this value. |
| - blocks. Used only if write_len is set. Data to be written |
| - offset. Used only if write_len is set. offset into the block to copy |
| the data from. |
| - buf. Used only if f.evacuator is set. Should point to the old document. |
| The function sets the length, offset, pinned, head and phase of vc->dir. |
| */ |
| |
| int |
| CacheVC::handleWrite(int event, Event * /* e ATS_UNUSED */) |
| { |
| // plain write case |
| ink_assert(!trigger); |
| frag_len = 0; |
| |
| set_agg_write_in_progress(); |
| POP_HANDLER; |
| |
| bool max_doc_error = (cache_config_max_doc_size && (cache_config_max_doc_size < vio.ndone || |
| (vio.nbytes != INT64_MAX && (cache_config_max_doc_size < vio.nbytes)))); |
| // Make sure the size is correct for checking error conditions before calling add_writer(this). |
| agg_len = stripe->round_to_approx_size(write_len + header_len + frag_len + sizeof(Doc)); |
| if (max_doc_error || !stripe->add_writer(this)) { |
| ts::Metrics::Counter::increment(cache_rsb.write_backlog_failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.write_backlog_failure); |
| ts::Metrics::Counter::increment(cache_rsb.status[op_type].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure); |
| io.aio_result = AIO_SOFT_FAILURE; |
| if (event == EVENT_CALL) { |
| return EVENT_RETURN; |
| } |
| return handleEvent(AIO_EVENT_DONE, nullptr); |
| } |
| if (!stripe->is_io_in_progress()) { |
| return stripe->aggWrite(event, this); |
| } |
| return EVENT_CONT; |
| } |
| |
| int |
| CacheVC::openWriteCloseDir(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| cancel_trigger(); |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked()) { |
| SET_HANDLER(&CacheVC::openWriteCloseDir); |
| ink_assert(!is_io_in_progress()); |
| VC_SCHED_LOCK_RETRY(); |
| } |
| stripe->close_write(this); |
| if (closed < 0 && fragment) { |
| stripe->directory.remove(&earliest_key, stripe, &earliest_dir); |
| } |
| } |
| if (dbg_ctl_cache_update.on()) { |
| if (f.update && closed > 0) { |
| if (!total_len && !f.allow_empty_doc && alternate_index != CACHE_ALT_REMOVED) { |
| Dbg(dbg_ctl_cache_update, "header only %d (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], |
| update_key.b[1]); |
| |
| } else if ((total_len || f.allow_empty_doc) && alternate_index != CACHE_ALT_REMOVED) { |
| Dbg(dbg_ctl_cache_update, "header body, %d, (%" PRIu64 ", %" PRIu64 "), (%" PRIu64 ", %" PRIu64 ")", |
| DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], update_key.b[1], earliest_key.b[0], earliest_key.b[1]); |
| } else if (!total_len && alternate_index == CACHE_ALT_REMOVED) { |
| Dbg(dbg_ctl_cache_update, "alt delete, %d, (%" PRIu64 ", %" PRIu64 ")", DIR_MASK_TAG(first_key.slice32(2)), update_key.b[0], |
| update_key.b[1]); |
| } |
| } |
| } |
| // update the appropriate stat variable |
| // These variables may not give the current no of documents with |
| // one, two and three or more fragments. This is because for |
| // updates we dont decrement the variable corresponding the old |
| // size of the document |
| if ((closed == 1) && (total_len > 0 || f.allow_empty_doc)) { |
| DDbg(dbg_ctl_cache_stats, "Fragment = %d", fragment); |
| |
| ts::Metrics::Counter::increment(cache_rsb.fragment_document_count[std::clamp(fragment, 0, 2)]); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.fragment_document_count[std::clamp(fragment, 0, 2)]); |
| } |
| if (f.close_complete) { |
| recursive++; |
| ink_assert(!stripe || this_ethread() != stripe->mutex->thread_holding); |
| vio.cont->handleEvent(VC_EVENT_WRITE_COMPLETE, reinterpret_cast<void *>(&vio)); |
| recursive--; |
| } |
| return free_CacheVC(this); |
| } |
| |
| int |
| CacheVC::openWriteCloseHeadDone(int event, Event *e) |
| { |
| if (event == AIO_EVENT_DONE) { |
| set_io_not_in_progress(); |
| } else if (is_io_in_progress()) { |
| return EVENT_CONT; |
| } |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked()) { |
| VC_LOCK_RETRY_EVENT(); |
| } |
| od->writing_vec = false; |
| if (!io.ok()) { |
| goto Lclose; |
| } |
| ink_assert(f.use_first_key); |
| if (!od->dont_update_directory) { |
| if (dir_is_empty(&od->first_dir)) { |
| stripe->directory.insert(&first_key, stripe, &dir); |
| } else { |
| // multiple fragment vector write |
| stripe->directory.overwrite(&first_key, stripe, &dir, &od->first_dir, false); |
| // insert moved resident alternate |
| if (od->move_resident_alt) { |
| if (stripe->dir_valid(&od->single_doc_dir)) { |
| stripe->directory.insert(&od->single_doc_key, stripe, &od->single_doc_dir); |
| } |
| od->move_resident_alt = false; |
| } |
| } |
| od->first_dir = dir; |
| if (frag_type == CACHE_FRAG_TYPE_HTTP && f.single_fragment) { |
| // fragment is tied to the vector |
| od->move_resident_alt = true; |
| if (!f.rewrite_resident_alt) { |
| od->single_doc_key = earliest_key; |
| } |
| dir_assign(&od->single_doc_dir, &dir); |
| dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2)); |
| } |
| } |
| } |
| Lclose: |
| return openWriteCloseDir(event, e); |
| } |
| |
| int |
| CacheVC::openWriteCloseHead(int event, Event *e) |
| { |
| cancel_trigger(); |
| f.use_first_key = 1; |
| if (io.ok()) { |
| ink_assert(fragment || (length == static_cast<int64_t>(total_len))); |
| } else { |
| return openWriteCloseDir(event, e); |
| } |
| if (f.data_done) { |
| write_len = 0; |
| } else { |
| write_len = length; |
| } |
| if (frag_type == CACHE_FRAG_TYPE_HTTP) { |
| SET_HANDLER(&CacheVC::updateVector); |
| return updateVector(EVENT_IMMEDIATE, nullptr); |
| } else { |
| header_len = header_to_write_len; |
| SET_HANDLER(&CacheVC::openWriteCloseHeadDone); |
| return do_write_lock(); |
| } |
| } |
| |
| int |
| CacheVC::openWriteCloseDataDone(int event, Event *e) |
| { |
| int ret = 0; |
| cancel_trigger(); |
| |
| if (event == AIO_EVENT_DONE) { |
| set_io_not_in_progress(); |
| } else if (is_io_in_progress()) { |
| return EVENT_CONT; |
| } |
| if (!io.ok()) { |
| return openWriteCloseDir(event, e); |
| } |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, this_ethread()); |
| if (!lock.is_locked()) { |
| VC_LOCK_RETRY_EVENT(); |
| } |
| if (!fragment) { |
| ink_assert(key == earliest_key); |
| earliest_dir = dir; |
| } else { |
| // Store the offset only if there is a table. |
| // Currently there is no alt (and thence no table) for non-HTTP. |
| if (alternate.valid()) { |
| alternate.push_frag_offset(write_pos); |
| } |
| } |
| fragment++; |
| write_pos += write_len; |
| stripe->directory.insert(&key, stripe, &dir); |
| blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len); |
| next_CacheKey(&key, &key); |
| if (length) { |
| write_len = length; |
| if (write_len > MAX_FRAG_SIZE) { |
| write_len = MAX_FRAG_SIZE; |
| } |
| if ((ret = do_write_call()) == EVENT_RETURN) { |
| goto Lcallreturn; |
| } |
| return ret; |
| } |
| f.data_done = 1; |
| return openWriteCloseHead(event, e); // must be called under vol lock from here |
| } |
| Lcallreturn: |
| return handleEvent(AIO_EVENT_DONE, nullptr); |
| } |
| |
| int |
| CacheVC::openWriteClose(int event, Event *e) |
| { |
| cancel_trigger(); |
| if (is_io_in_progress()) { |
| if (event != AIO_EVENT_DONE) { |
| return EVENT_CONT; |
| } |
| set_io_not_in_progress(); |
| if (!io.ok()) { |
| return openWriteCloseDir(event, e); |
| } |
| } |
| if (closed > 0 || f.allow_empty_doc) { |
| if (total_len == 0) { |
| if (f.update || f.allow_empty_doc) { |
| return updateVector(event, e); |
| } else { |
| // If we've been CLOSE'd but nothing has been written then |
| // this close is transformed into an abort. |
| closed = -1; |
| return openWriteCloseDir(event, e); |
| } |
| } |
| if (length && (fragment || length > static_cast<int>(MAX_FRAG_SIZE))) { |
| SET_HANDLER(&CacheVC::openWriteCloseDataDone); |
| write_len = length; |
| if (write_len > MAX_FRAG_SIZE) { |
| write_len = MAX_FRAG_SIZE; |
| } |
| return do_write_lock_call(); |
| } else { |
| return openWriteCloseHead(event, e); |
| } |
| } else { |
| return openWriteCloseDir(event, e); |
| } |
| } |
| |
| int |
| CacheVC::openWriteWriteDone(int event, Event *e) |
| { |
| cancel_trigger(); |
| if (event == AIO_EVENT_DONE) { |
| set_io_not_in_progress(); |
| } else if (is_io_in_progress()) { |
| return EVENT_CONT; |
| } |
| // In the event of VC_EVENT_ERROR, the cont must do an io_close |
| if (!io.ok()) { |
| if (closed) { |
| closed = -1; |
| return die(); |
| } |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return calluser(VC_EVENT_ERROR); |
| } |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked()) { |
| VC_LOCK_RETRY_EVENT(); |
| } |
| // store the earliest directory. Need to remove the earliest dir |
| // in case the writer aborts. |
| if (!fragment) { |
| ink_assert(key == earliest_key); |
| earliest_dir = dir; |
| } else { |
| // Store the offset only if there is a table. |
| // Currently there is no alt (and thence no table) for non-HTTP. |
| if (alternate.valid()) { |
| alternate.push_frag_offset(write_pos); |
| } |
| } |
| ++fragment; |
| write_pos += write_len; |
| stripe->directory.insert(&key, stripe, &dir); |
| DDbg(dbg_ctl_cache_insert, "WriteDone: %X, %X, %d", key.slice32(0), first_key.slice32(0), write_len); |
| blocks = iobufferblock_skip(blocks.get(), &offset, &length, write_len); |
| next_CacheKey(&key, &key); |
| } |
| if (closed) { |
| return die(); |
| } |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return openWriteMain(event, e); |
| } |
| |
| static inline int |
| target_fragment_size(int target_frag_size) |
| { |
| uint64_t value = (target_frag_size > 0 ? target_frag_size : cache_config_target_fragment_size) - sizeof(Doc); |
| |
| ink_release_assert(value <= MAX_FRAG_SIZE); |
| return value; |
| } |
| |
| int |
| CacheVC::openWriteMain(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| cancel_trigger(); |
| int called_user = 0; |
| ink_assert(!is_io_in_progress()); |
| Lagain: |
| if (!vio.get_writer()) { |
| if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) { |
| return EVENT_DONE; |
| } |
| if (!vio.get_writer()) { |
| return EVENT_CONT; |
| } |
| } |
| if (vio.ntodo() <= 0) { |
| called_user = 1; |
| if (calluser(VC_EVENT_WRITE_COMPLETE) == EVENT_DONE) { |
| return EVENT_DONE; |
| } |
| ink_assert(!f.close_complete || !"close expected after write COMPLETE"); |
| if (vio.ntodo() <= 0) { |
| return EVENT_CONT; |
| } |
| } |
| int64_t ntodo = static_cast<int64_t>(vio.ntodo() + length); |
| int64_t total_avail = vio.get_reader()->read_avail(); |
| int64_t avail = total_avail; |
| int64_t towrite = avail + length; |
| int frag_size = target_fragment_size(stripe->frag_size); |
| |
| if (towrite > ntodo) { |
| avail -= (towrite - ntodo); |
| towrite = ntodo; |
| } |
| if (towrite > static_cast<int>(MAX_FRAG_SIZE)) { |
| avail -= (towrite - MAX_FRAG_SIZE); |
| towrite = MAX_FRAG_SIZE; |
| } |
| if (!blocks && towrite) { |
| blocks = vio.get_reader()->block; |
| offset = vio.get_reader()->start_offset; |
| } |
| if (avail > 0) { |
| vio.get_reader()->consume(avail); |
| vio.ndone += avail; |
| total_len += avail; |
| } |
| length = static_cast<uint64_t>(towrite); |
| if (length > frag_size && (length < frag_size + frag_size / 4)) { |
| write_len = frag_size; |
| } else { |
| write_len = length; |
| } |
| bool not_writing = towrite != ntodo && towrite < frag_size; |
| if (!called_user) { |
| if (not_writing) { |
| called_user = 1; |
| if (calluser(VC_EVENT_WRITE_READY) == EVENT_DONE) { |
| return EVENT_DONE; |
| } |
| goto Lagain; |
| } else if (vio.ntodo() <= 0) { |
| goto Lagain; |
| } |
| } |
| if (not_writing) { |
| return EVENT_CONT; |
| } |
| if (towrite == ntodo && f.close_complete) { |
| closed = 1; |
| SET_HANDLER(&CacheVC::openWriteClose); |
| return openWriteClose(EVENT_NONE, nullptr); |
| } |
| SET_HANDLER(&CacheVC::openWriteWriteDone); |
| return do_write_lock_call(); |
| } |
| |
| // begin overwrite |
| int |
| CacheVC::openWriteOverwrite(int event, Event *e) |
| { |
| cancel_trigger(); |
| if (event != AIO_EVENT_DONE) { |
| if (event == EVENT_IMMEDIATE) { |
| last_collision = nullptr; |
| } |
| } else { |
| Doc *doc = nullptr; |
| set_io_not_in_progress(); |
| if (_action.cancelled) { |
| return openWriteCloseDir(event, e); |
| } |
| if (!io.ok()) { |
| goto Ldone; |
| } |
| doc = reinterpret_cast<Doc *>(buf->data()); |
| if (!(doc->first_key == first_key)) { |
| goto Lcollision; |
| } |
| od->first_dir = dir; |
| first_buf = buf; |
| goto Ldone; |
| } |
| Lcollision: { |
| CACHE_TRY_LOCK(lock, stripe->mutex, this_ethread()); |
| if (!lock.is_locked()) { |
| VC_LOCK_RETRY_EVENT(); |
| } |
| int res = stripe->directory.probe(&first_key, stripe, &dir, &last_collision); |
| if (res > 0) { |
| if ((res = do_read_call(&first_key)) == EVENT_RETURN) { |
| goto Lcallreturn; |
| } |
| return res; |
| } |
| } |
| Ldone: |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return callcont(CACHE_EVENT_OPEN_WRITE); |
| Lcallreturn: |
| return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call |
| } |
| |
| // openWriteStartDone handles vector read (addition of alternates) |
| // and lock misses |
| int |
| CacheVC::openWriteStartDone(int event, Event *e) |
| { |
| intptr_t err = ECACHE_NO_DOC; |
| cancel_trigger(); |
| if (is_io_in_progress()) { |
| if (event != AIO_EVENT_DONE) { |
| return EVENT_CONT; |
| } |
| set_io_not_in_progress(); |
| } |
| { |
| CACHE_TRY_LOCK(lock, stripe->mutex, mutex->thread_holding); |
| if (!lock.is_locked()) { |
| VC_LOCK_RETRY_EVENT(); |
| } |
| |
| if (_action.cancelled && (!od || !od->has_multiple_writers())) { |
| goto Lcancel; |
| } |
| |
| if (event == AIO_EVENT_DONE) { // vector read done |
| Doc *doc = reinterpret_cast<Doc *>(buf->data()); |
| if (!io.ok()) { |
| err = ECACHE_READ_FAIL; |
| goto Lfailure; |
| } |
| |
| /* INKqa07123. |
| A directory entry which is no longer valid may have been overwritten. |
| We need to start afresh from the beginning by setting last_collision |
| to nullptr. |
| */ |
| if (!stripe->dir_valid(&dir)) { |
| DDbg(dbg_ctl_cache_write, "OpenReadStartDone: Dir not valid: Write Head: %" PRId64 ", Dir: %" PRId64, |
| (int64_t)stripe->offset_to_vol_offset(stripe->directory.header->write_pos), dir_offset(&dir)); |
| last_collision = nullptr; |
| goto Lcollision; |
| } |
| if (!(doc->first_key == first_key)) { |
| goto Lcollision; |
| } |
| |
| if (doc->magic != DOC_MAGIC || !doc->hlen || this->load_http_info(write_vector, doc, buf.object()) != doc->hlen) { |
| err = ECACHE_BAD_META_DATA; |
| goto Lfailure; |
| } |
| ink_assert(write_vector->count() > 0); |
| od->first_dir = dir; |
| first_dir = dir; |
| if (doc->single_fragment()) { |
| // fragment is tied to the vector |
| od->move_resident_alt = true; |
| od->single_doc_key = doc->key; |
| dir_assign(&od->single_doc_dir, &dir); |
| dir_set_tag(&od->single_doc_dir, od->single_doc_key.slice32(2)); |
| } |
| first_buf = buf; |
| goto Lsuccess; |
| } |
| |
| Lcollision: |
| int if_writers = (reinterpret_cast<uintptr_t>(info) == CACHE_ALLOW_MULTIPLE_WRITES); |
| if (!od) { |
| if ((err = stripe->open_write(this, if_writers, cache_config_http_max_alts > 1 ? cache_config_http_max_alts : 0)) > 0) { |
| goto Lfailure; |
| } |
| if (od->has_multiple_writers()) { |
| MUTEX_RELEASE(lock); |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return callcont(CACHE_EVENT_OPEN_WRITE); |
| } |
| } |
| // check for collision |
| if (stripe->directory.probe(&first_key, stripe, &dir, &last_collision)) { |
| od->reading_vec = true; |
| int ret = do_read_call(&first_key); |
| if (ret == EVENT_RETURN) { |
| goto Lcallreturn; |
| } |
| return ret; |
| } |
| if (f.update) { |
| // fail update because vector has been GC'd |
| goto Lfailure; |
| } |
| } |
| Lsuccess: |
| od->reading_vec = false; |
| if (_action.cancelled) { |
| goto Lcancel; |
| } |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return callcont(CACHE_EVENT_OPEN_WRITE); |
| |
| Lfailure: |
| ts::Metrics::Counter::increment(cache_rsb.status[op_type].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure); |
| _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-err)); |
| Lcancel: |
| if (od) { |
| od->reading_vec = false; |
| return openWriteCloseDir(event, e); |
| } else { |
| return free_CacheVC(this); |
| } |
| Lcallreturn: |
| return handleEvent(AIO_EVENT_DONE, nullptr); // hopefully a tail call |
| } |
| |
| // handle lock failures from main Cache::open_write entry points below |
| int |
| CacheVC::openWriteStartBegin(int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) |
| { |
| intptr_t err; |
| cancel_trigger(); |
| if (_action.cancelled) { |
| return free_CacheVC(this); |
| } |
| if (((err = stripe->open_write_lock(this, false, 1)) > 0)) { |
| ts::Metrics::Counter::increment(cache_rsb.status[op_type].failure); |
| ts::Metrics::Counter::increment(stripe->cache_vol->vol_rsb.status[op_type].failure); |
| free_CacheVC(this); |
| _action.continuation->handleEvent(CACHE_EVENT_OPEN_WRITE_FAILED, reinterpret_cast<void *>(-err)); |
| return EVENT_DONE; |
| } |
| if (err < 0) { |
| VC_SCHED_LOCK_RETRY(); |
| } |
| if (f.overwrite) { |
| SET_HANDLER(&CacheVC::openWriteOverwrite); |
| return openWriteOverwrite(EVENT_IMMEDIATE, nullptr); |
| } else { |
| // write by key |
| SET_HANDLER(&CacheVC::openWriteMain); |
| return callcont(CACHE_EVENT_OPEN_WRITE); |
| } |
| } |