blob: aa058433884f6cf90762313adc2ec5930e12f141 [file] [log] [blame]
/** @file
ConfigReloadTrace — reload progress checker and task timeout detection.
@section license License
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
#include "mgmt/config/ConfigReloadTrace.h"
#include "mgmt/config/ConfigContext.h"
#include "records/RecCore.h"
#include <algorithm>
#include "tsutil/Metrics.h"
#include "tsutil/ts_time_parser.h"
namespace
{
DbgCtl dbg_ctl_config{"config.reload"};
/// Helper to read a time duration from records configuration.
[[nodiscard]] std::chrono::milliseconds
read_time_record(std::string_view record_name, std::string_view default_value, std::chrono::milliseconds fallback,
std::chrono::milliseconds minimum = std::chrono::milliseconds{0})
{
// record_name / default_value are compile-time string_view constants, always null-terminated.
char str[128] = {0};
auto result = RecGetRecordString(record_name.data(), str, sizeof(str));
std::string_view value = (result.has_value() && !result->empty()) ? result.value() : default_value;
auto [duration, errata] = ts::time_parser(value);
if (!errata.is_ok()) {
Dbg(dbg_ctl_config, "Failed to parse '%.*s' value '%.*s': using fallback", static_cast<int>(record_name.size()),
record_name.data(), static_cast<int>(value.size()), value.data());
return fallback;
}
auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(duration);
// Enforce minimum if specified
if (minimum.count() > 0 && ms < minimum) {
Dbg(dbg_ctl_config, "'%.*s' value %lldms below minimum, using %lldms", static_cast<int>(record_name.size()), record_name.data(),
static_cast<long long>(ms.count()), static_cast<long long>(minimum.count()));
return minimum;
}
return ms;
}
} // namespace
std::chrono::milliseconds
ConfigReloadProgress::get_configured_timeout()
{
return read_time_record(RECORD_TIMEOUT, DEFAULT_TIMEOUT, std::chrono::hours{1});
}
std::chrono::milliseconds
ConfigReloadProgress::get_configured_check_interval()
{
return read_time_record(RECORD_CHECK_INTERVAL, DEFAULT_CHECK_INTERVAL, std::chrono::seconds{2},
std::chrono::milliseconds{MIN_CHECK_INTERVAL_MS});
}
ConfigContext
ConfigReloadTask::add_child(std::string_view description)
{
std::unique_lock<std::shared_mutex> lock(_mutex);
// Read token directly - can't call get_token() as it would deadlock (tries to acquire shared_lock on same mutex)
auto trace = std::make_shared<ConfigReloadTask>(_info.token, description, false, shared_from_this());
_info.sub_tasks.push_back(trace);
return ConfigContext{trace, description};
}
ConfigReloadTask &
ConfigReloadTask::log(std::string const &text)
{
std::unique_lock<std::shared_mutex> lock(_mutex);
_info.logs.push_back(text);
return *this;
}
void
ConfigReloadTask::add_sub_task(ConfigReloadTaskPtr sub_task)
{
{
std::unique_lock<std::shared_mutex> lock(_mutex);
Dbg(dbg_ctl_config, "Adding subtask %s to task %s", sub_task->get_description().c_str(), _info.description.c_str());
_info.sub_tasks.push_back(sub_task);
}
// Re-aggregate only if the parent prematurely reached SUCCESS. Record-triggered
// handlers register asynchronously (via config_update_cont, ~3s timer), so a
// new CREATED subtask can arrive after all previously known subtasks completed.
// Guarding on SUCCESS avoids redundant work when the parent is still IN_PROGRESS
// and avoids overwriting a TIMEOUT set by the progress checker.
if (get_state() == State::SUCCESS) {
aggregate_status();
}
}
bool
ConfigReloadTask::has_subtask_for_key(std::string_view key) const
{
std::shared_lock<std::shared_mutex> lock(_mutex);
return std::any_of(_info.sub_tasks.begin(), _info.sub_tasks.end(),
[&key](const auto &t) { return t->_info.config_key == key; }); // config_key is immutable once in sub_tasks
}
ConfigReloadTaskPtr
ConfigReloadTask::find_subtask_by_key(std::string_view key) const
{
std::shared_lock<std::shared_mutex> lock(_mutex);
auto it =
std::find_if(_info.sub_tasks.begin(), _info.sub_tasks.end(), [&key](const auto &t) { return t->_info.config_key == key; });
return it != _info.sub_tasks.end() ? *it : nullptr;
}
void
ConfigReloadTask::set_in_progress()
{
this->set_state_and_notify(State::IN_PROGRESS);
}
void
ConfigReloadTask::set_completed()
{
this->set_state_and_notify(State::SUCCESS);
}
void
ConfigReloadTask::set_failed()
{
this->set_state_and_notify(State::FAIL);
}
/// @note Main-task only. Does not call notify_parent() — do not call on subtasks.
void
ConfigReloadTask::mark_as_bad_state(std::string_view reason)
{
std::unique_lock<std::shared_mutex> lock(_mutex);
// Once a task reaches SUCCESS, FAIL, or TIMEOUT, reject further transitions.
if (is_terminal(_info.state)) {
Warning("ConfigReloadTask '%s': ignoring mark_as_bad_state from %.*s — already terminal.", _info.description.c_str(),
static_cast<int>(state_to_string(_info.state).size()), state_to_string(_info.state).data());
return;
}
_info.state = State::TIMEOUT;
_atomic_last_updated_ms.store(now_ms(), std::memory_order_release);
if (!reason.empty()) {
// Push directly to avoid deadlock (log() would try to acquire same mutex)
_info.logs.emplace_back(reason);
}
}
void
ConfigReloadTask::notify_parent()
{
Dbg(dbg_ctl_config, "parent null =%s , parent main task? %s", _parent ? "false" : "true",
(_parent && _parent->is_main_task()) ? "true" : "false");
if (_parent) {
_parent->aggregate_status();
}
}
void
ConfigReloadTask::set_state_and_notify(State state)
{
{
std::unique_lock<std::shared_mutex> lock(_mutex);
if (_info.state == state) {
return;
}
// Once a task reaches a terminal state, reject further transitions.
if (is_terminal(_info.state)) {
auto const cur_str = state_to_string(_info.state);
auto const new_str = state_to_string(state);
Dbg(dbg_ctl_config, "ConfigReloadTask '%s': ignoring transition from %.*s to %.*s — already terminal.",
_info.description.c_str(), static_cast<int>(cur_str.size()), cur_str.data(), static_cast<int>(new_str.size()),
new_str.data());
return;
}
Dbg(dbg_ctl_config, "State changed to %.*s for task %s", static_cast<int>(state_to_string(state).size()),
state_to_string(state).data(), _info.description.c_str());
_info.state = state;
_atomic_last_updated_ms.store(now_ms(), std::memory_order_release);
}
// Now that the lock is released, we can safely notify the parent.
this->notify_parent();
}
void
ConfigReloadTask::aggregate_status()
{
// Use unique_lock throughout to avoid TOCTOU race and data races
std::unique_lock<std::shared_mutex> lock(_mutex);
if (_info.sub_tasks.empty()) {
// No subtasks - keep current state (don't change to CREATED)
return;
}
bool any_failed = false;
bool any_in_progress = false;
bool all_success = true;
bool all_created = true;
for (const auto &sub_task : _info.sub_tasks) {
State sub_state = sub_task->get_state();
switch (sub_state) {
case State::FAIL:
case State::TIMEOUT: // Treat TIMEOUT as failure
any_failed = true;
all_success = false;
all_created = false;
break;
case State::IN_PROGRESS: // Handle IN_PROGRESS explicitly!
any_in_progress = true;
all_success = false;
all_created = false;
break;
case State::SUCCESS:
all_created = false;
break;
case State::CREATED:
all_success = false;
break;
case State::INVALID:
default:
// Unknown state - treat as not success, not created
all_success = false;
all_created = false;
break;
}
}
// Determine new parent state based on children
// Priority: FAIL/TIMEOUT > IN_PROGRESS > SUCCESS > CREATED
State new_state;
if (any_failed) {
new_state = State::FAIL;
} else if (any_in_progress) {
// If any subtask is still working, parent is IN_PROGRESS
new_state = State::IN_PROGRESS;
} else if (all_success) {
Dbg(dbg_ctl_config, "Setting %s task '%s' to SUCCESS (all subtasks succeeded)", _info.main_task ? "main" : "sub",
_info.description.c_str());
new_state = State::SUCCESS;
} else if (all_created && !_info.main_task) {
Dbg(dbg_ctl_config, "Setting %s task '%s' to CREATED (all subtasks created)", _info.main_task ? "main" : "sub",
_info.description.c_str());
new_state = State::CREATED;
} else {
// Mixed state or main task with created subtasks - keep as IN_PROGRESS
Dbg(dbg_ctl_config, "Setting %s task '%s' to IN_PROGRESS (mixed state)", _info.main_task ? "main" : "sub",
_info.description.c_str());
new_state = State::IN_PROGRESS;
}
// Only update if state actually changed
if (_info.state != new_state) {
_info.state = new_state;
_atomic_last_updated_ms.store(now_ms(), std::memory_order_release);
}
// Release lock before notifying parent to avoid potential deadlock
lock.unlock();
if (_parent) {
_parent->aggregate_status();
}
}
int64_t
ConfigReloadTask::get_last_updated_time_ms() const
{
int64_t last_time_ms = _atomic_last_updated_ms.load(std::memory_order_acquire);
std::shared_lock<std::shared_mutex> lock(_mutex);
for (const auto &sub_task : _info.sub_tasks) {
int64_t sub_time_ms = sub_task->get_own_last_updated_time_ms();
if (sub_time_ms > last_time_ms) {
last_time_ms = sub_time_ms;
}
}
return last_time_ms;
}
std::time_t
ConfigReloadTask::get_last_updated_time() const
{
return static_cast<std::time_t>(
std::chrono::duration_cast<std::chrono::seconds>(std::chrono::milliseconds{get_last_updated_time_ms()}).count());
}
void
ConfigReloadTask::start_progress_checker()
{
std::unique_lock<std::shared_mutex> lock(_mutex);
if (!_reload_progress_checker_started && _info.main_task && _info.state == State::IN_PROGRESS) { // can only start once
auto *checker = new ConfigReloadProgress(shared_from_this());
eventProcessor.schedule_in(checker, HRTIME_MSECONDS(checker->get_check_interval().count()), ET_TASK);
_reload_progress_checker_started = true;
}
}
// reload progress checker
int
ConfigReloadProgress::check_progress(int /* etype */, void * /* data */)
{
Dbg(dbg_ctl_config, "Checking progress for reload task %s - descr: %s", _reload ? _reload->get_token().c_str() : "null",
_reload ? _reload->get_description().c_str() : "null");
if (_reload == nullptr) {
return EVENT_DONE;
}
auto const current_state = _reload->get_state();
if (ConfigReloadTask::is_terminal(current_state)) {
auto const state_str = ConfigReloadTask::state_to_string(current_state);
if (_awaiting_terminal_confirmation) {
// Confirmed terminal — safe to stop.
Dbg(dbg_ctl_config, "Reload task %s confirmed %.*s after grace period, stopping progress check.",
_reload->get_token().c_str(), static_cast<int>(state_str.size()), state_str.data());
return EVENT_DONE;
}
// First observation of terminal state — reschedule once more to confirm
// it isn't pulled back to IN_PROGRESS by a late reserve_subtask().
_awaiting_terminal_confirmation = true;
Dbg(dbg_ctl_config, "Reload task %s reached %.*s, scheduling confirmation in %lldms.", _reload->get_token().c_str(),
static_cast<int>(state_str.size()), state_str.data(), static_cast<long long>(TERMINAL_CONFIRMATION_DELAY.count()));
eventProcessor.schedule_in(this, HRTIME_MSECONDS(TERMINAL_CONFIRMATION_DELAY.count()), ET_TASK);
return EVENT_CONT;
}
// State was pulled back from terminal (e.g., SUCCESS -> IN_PROGRESS) — reset and continue.
_awaiting_terminal_confirmation = false;
// Get configured timeout (read dynamically to allow runtime changes)
// Returns 0ms if disabled (timeout string is "0" or empty)
auto max_running_time = get_configured_timeout();
// Check if timeout is disabled (0ms means disabled)
if (max_running_time.count() == 0) {
Dbg(dbg_ctl_config, "Timeout disabled - task %s will run indefinitely until completion or manual cancellation",
_reload->get_token().c_str());
// Still reschedule to detect completion, but don't timeout
eventProcessor.schedule_in(this, HRTIME_MSECONDS(_every.count()), ET_TASK);
return EVENT_CONT;
}
// ok, it's running, should we keep it running?
auto ct = std::chrono::system_clock::from_time_t(_reload->get_created_time());
auto lut = std::chrono::system_clock::from_time_t(_reload->get_last_updated_time());
std::string buf;
if (lut + max_running_time < std::chrono::system_clock::now()) {
if (_reload->contains_dependents()) {
swoc::bwprint(buf, "Task {} timed out after {}ms with no reload action (no config to reload). Last state: {}",
_reload->get_token(), max_running_time.count(), ConfigReloadTask::state_to_string(current_state));
} else {
swoc::bwprint(buf, "Reload task {} timed out after {}ms. Previous state: {}.", _reload->get_token(), max_running_time.count(),
ConfigReloadTask::state_to_string(current_state));
}
_reload->mark_as_bad_state(buf);
Dbg(dbg_ctl_config, "%s", buf.c_str());
return EVENT_DONE;
}
swoc::bwprint(buf,
"Reload task {} ongoing with state {}, created at {} and last update at {}. Timeout in {}ms. Will check again.",
_reload->get_token(), ConfigReloadTask::state_to_string(current_state),
swoc::bwf::Date(std::chrono::system_clock::to_time_t(ct)),
swoc::bwf::Date(std::chrono::system_clock::to_time_t(lut)), max_running_time.count());
Dbg(dbg_ctl_config, "%s", buf.c_str());
eventProcessor.schedule_in(this, HRTIME_MSECONDS(_every.count()), ET_TASK);
return EVENT_CONT;
}
ConfigReloadProgress::ConfigReloadProgress(ConfigReloadTaskPtr reload)
: Continuation(new_ProxyMutex()), _reload{reload}, _every{get_configured_check_interval()}
{
SET_HANDLER(&ConfigReloadProgress::check_progress);
}