| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "utils/metrics.h" |
| |
| #include <absl/strings/string_view.h> |
| #include <boost/algorithm/string/join.hpp> |
| #include <boost/asio/basic_deadline_timer.hpp> |
| #include <boost/date_time/posix_time/posix_time_duration.hpp> |
| #include <boost/system/error_code.hpp> |
| #include <fmt/core.h> |
| #include <unistd.h> |
| #include <new> |
| |
| #include "http/http_method.h" |
| #include "http/http_status_code.h" |
| #include "runtime/api_layer1.h" |
| #include "runtime/rpc/rpc_address.h" |
| #include "runtime/rpc/rpc_engine.h" |
| #include "runtime/service_app.h" |
| #include "runtime/service_engine.h" |
| #include "runtime/task/task.h" |
| #include "utils/flags.h" |
| #include "utils/rand.h" |
| #include "utils/shared_io_service.h" |
| #include "utils/string_conv.h" |
| #include "utils/strings.h" |
| |
| DSN_DEFINE_uint64(metrics, |
| entity_retirement_delay_ms, |
| 10 * 60 * 1000, |
| "The retention interval (milliseconds) for an entity after it becomes stale."); |
| |
| DSN_DECLARE_string(cluster_name); |
| |
| METRIC_DEFINE_entity(server); |
| |
| dsn::metric_entity_ptr server_metric_entity() |
| { |
| static auto entity = METRIC_ENTITY_server.instantiate("server"); |
| return entity; |
| } |
| |
| namespace dsn { |
| |
| metric_entity::metric_entity(const metric_entity_prototype *prototype, |
| const std::string &id, |
| const attr_map &attrs) |
| : _prototype(prototype), _id(id), _attrs(attrs), _retire_time_ms(0) |
| { |
| } |
| |
| metric_entity::~metric_entity() |
| { |
| // We have to wait for all of close operations to be finished. Waiting for close operations to |
| // be finished in the destructor of each metirc may lead to memory leak detected in ASAN test |
| // for dsn_utils_test, since the percentile is also referenced by shared_io_service which is |
| // still alive without being destructed after ASAN test for dsn_utils_test is finished. |
| close(close_option::kWait); |
| } |
| |
| void metric_entity::close(close_option option) |
| { |
| utils::auto_write_lock l(_lock); |
| |
| // To close all metrics owned by an entity, it's more efficient to firstly issue an asynchronous |
| // close request to each metric; then, just wait for all of the close operations to be finished. |
| // It's inefficient to wait for each metric to be closed one by one. Therefore, the metric is |
| // not closed in its destructor. |
| for (auto &m : _metrics) { |
| if (m.second->prototype()->type() == metric_type::kPercentile) { |
| auto p = down_cast<closeable_metric *>(m.second.get()); |
| p->close(); |
| } |
| } |
| |
| if (option == close_option::kNoWait) { |
| return; |
| } |
| |
| // Wait for all of the close operations to be finished. |
| for (auto &m : _metrics) { |
| if (m.second->prototype()->type() == metric_type::kPercentile) { |
| auto p = down_cast<closeable_metric *>(m.second.get()); |
| p->wait(); |
| } |
| } |
| } |
| |
| metric_entity::attr_map metric_entity::attributes() const |
| { |
| utils::auto_read_lock l(_lock); |
| return _attrs; |
| } |
| |
| metric_entity::metric_map metric_entity::metrics() const |
| { |
| utils::auto_read_lock l(_lock); |
| return _metrics; |
| } |
| |
| void metric_entity::set_attributes(const attr_map &attrs) |
| { |
| utils::auto_write_lock l(_lock); |
| _attrs = attrs; |
| } |
| |
| void metric_entity::encode_type(metric_json_writer &writer) const |
| { |
| writer.Key(kMetricEntityTypeField.c_str()); |
| json::json_encode(writer, _prototype->name()); |
| } |
| |
| void metric_entity::encode_id(metric_json_writer &writer) const |
| { |
| writer.Key(kMetricEntityIdField.c_str()); |
| json::json_encode(writer, _id); |
| } |
| |
| namespace { |
| |
| void encode_attrs(dsn::metric_json_writer &writer, const dsn::metric_entity::attr_map &attrs) |
| { |
| // Empty attributes are allowed and will just be encoded as {}. |
| |
| writer.Key(dsn::kMetricEntityAttrsField.c_str()); |
| |
| writer.StartObject(); |
| for (const auto &attr : attrs) { |
| writer.Key(attr.first.c_str()); |
| dsn::json::json_encode(writer, attr.second); |
| } |
| writer.EndObject(); |
| } |
| |
| void encode_metrics(dsn::metric_json_writer &writer, |
| const dsn::metric_entity::metric_map &metrics, |
| const dsn::metric_filters &filters) |
| { |
| // We shouldn't reach here if no metric is chosen, thus just mark an assertion. |
| CHECK(!metrics.empty(), |
| "this entity should not be encoded into the response since no metric is chosen"); |
| |
| writer.Key(dsn::kMetricEntityMetricsField.c_str()); |
| |
| writer.StartArray(); |
| for (const auto &m : metrics) { |
| m.second->take_snapshot(writer, filters); |
| } |
| writer.EndArray(); |
| } |
| |
| } // anonymous namespace |
| |
| void metric_entity::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const |
| { |
| if (!filters.match_entity_type(_prototype->name())) { |
| return; |
| } |
| |
| if (!filters.match_entity_id(_id)) { |
| return; |
| } |
| |
| attr_map my_attrs; |
| metric_map target_metrics; |
| |
| { |
| utils::auto_read_lock l(_lock); |
| |
| if (!filters.match_entity_attrs(_attrs)) { |
| return; |
| } |
| |
| filters.extract_entity_metrics(_metrics, target_metrics); |
| if (target_metrics.empty()) { |
| // None of metrics is chosen, there is no need to take snapshot for |
| // this entity. |
| return; |
| } |
| |
| my_attrs = _attrs; |
| } |
| |
| // At least one metric of this entity has been chosen, thus take snapshot and encode |
| // this entity as json format. |
| writer.StartObject(); |
| encode_type(writer); |
| encode_id(writer); |
| encode_attrs(writer, my_attrs); |
| encode_metrics(writer, target_metrics, filters); |
| writer.EndObject(); |
| } |
| |
| bool metric_entity::is_stale() const |
| { |
| // Since this entity itself is still being accessed, its reference count should be 1 |
| // at least. |
| CHECK_GE(get_count(), 1); |
| |
| // This entity is considered stale once there is only one reference for it kept in the |
| // registry. |
| return get_count() == 1; |
| } |
| |
| void metric_filters::extract_entity_metrics(const metric_entity::metric_map &candidates, |
| metric_entity::metric_map &target_metrics) const |
| { |
| if (entity_metrics.empty()) { |
| target_metrics = candidates; |
| return; |
| } |
| |
| target_metrics.clear(); |
| for (const auto &candidate : candidates) { |
| if (match(candidate.first->name().data(), entity_metrics)) { |
| target_metrics.emplace(candidate.first, candidate.second); |
| } |
| } |
| } |
| |
| std::string metric_filters::to_query_string() const |
| { |
| #define COMBINE_FIELD_PAIR(name, container) \ |
| do { \ |
| if (container.empty()) { \ |
| break; \ |
| } \ |
| \ |
| std::string pair(#name); \ |
| pair += '='; \ |
| pair += boost::join(container, ","); \ |
| fields.push_back(std::move(pair)); \ |
| } while (0) |
| |
| std::vector<std::string> fields; |
| COMBINE_FIELD_PAIR(with_metric_fields, with_metric_fields); |
| COMBINE_FIELD_PAIR(types, entity_types); |
| COMBINE_FIELD_PAIR(ids, entity_ids); |
| COMBINE_FIELD_PAIR(attributes, entity_attrs); |
| COMBINE_FIELD_PAIR(metrics, entity_metrics); |
| |
| #undef COMBINE_FIELD_PAIR |
| |
| return boost::join(fields, "&"); |
| } |
| |
| metric_entity_ptr metric_entity_prototype::instantiate(const std::string &id, |
| const metric_entity::attr_map &attrs) const |
| { |
| return metric_registry::instance().find_or_create_entity(this, id, attrs); |
| } |
| |
| metric_entity_ptr metric_entity_prototype::instantiate(const std::string &id) const |
| { |
| return instantiate(id, {}); |
| } |
| |
| metric_entity_prototype::metric_entity_prototype(const char *name) : _name(name) {} |
| |
| metric_entity_prototype::~metric_entity_prototype() {} |
| |
| const std::string metrics_http_service::kMetricsRootPath(""); |
| const std::string metrics_http_service::kMetricsQuerySubPath("metrics"); |
| const std::string |
| metrics_http_service::kMetricsQueryPath('/' + metrics_http_service::kMetricsQuerySubPath); |
| |
| metrics_http_service::metrics_http_service(metric_registry *registry) : _registry(registry) |
| { |
| register_handler(kMetricsQuerySubPath, |
| std::bind(&metrics_http_service::get_metrics_handler, |
| this, |
| std::placeholders::_1, |
| std::placeholders::_2), |
| fmt::format("ip:port{}", kMetricsQueryPath)); |
| } |
| |
| namespace { |
| |
| template <typename Container> |
| void parse_as(const std::string &field_value, Container &container) |
| { |
| utils::split_args(field_value.c_str(), container, ','); |
| } |
| |
| inline void encode_error(dsn::metric_json_writer &writer, const char *error_message) |
| { |
| writer.StartObject(); |
| writer.Key("error_message"); |
| dsn::json::json_encode(writer, error_message); |
| writer.EndObject(); |
| } |
| |
| inline std::string encode_error_as_json(const char *error_message) |
| { |
| return encode_as_json( |
| [error_message](metric_json_writer &writer) { encode_error(writer, error_message); }); |
| } |
| |
| dsn::metric_filters::metric_fields_type get_brief_metric_fields() |
| { |
| dsn::metric_filters::metric_fields_type fields = {kMetricNameField, kMetricSingleValueField}; |
| for (const auto &kth : kAllKthPercentiles) { |
| fields.insert(kth.name); |
| } |
| return fields; |
| } |
| |
| const dsn::metric_filters::metric_fields_type kBriefMetricFields = get_brief_metric_fields(); |
| |
| } // anonymous namespace |
| |
| void metrics_http_service::get_metrics_handler(const http_request &req, http_response &resp) |
| { |
| if (req.method != http_method::GET) { |
| resp.body = encode_error_as_json("please use 'GET' method while querying for metrics"); |
| resp.status_code = http_status_code::kBadRequest; |
| return; |
| } |
| |
| metric_filters filters; |
| bool with_metric_fields = false; |
| bool detail = false; |
| for (const auto &field : req.query_args) { |
| if (field.first == "with_metric_fields") { |
| parse_as(field.second, filters.with_metric_fields); |
| with_metric_fields = true; |
| } else if (field.first == "types") { |
| parse_as(field.second, filters.entity_types); |
| } else if (field.first == "ids") { |
| parse_as(field.second, filters.entity_ids); |
| } else if (field.first == "attributes") { |
| parse_as(field.second, filters.entity_attrs); |
| if ((filters.entity_attrs.size() & 1) != 0) { |
| resp.body = |
| encode_error_as_json("the number of arguments for attributes should be even, " |
| "since each attribute name always pairs with a value"); |
| resp.status_code = http_status_code::kBadRequest; |
| return; |
| } |
| } else if (field.first == "metrics") { |
| parse_as(field.second, filters.entity_metrics); |
| } else if (field.first == "detail") { |
| if (!buf2bool(field.second, detail)) { |
| resp.body = encode_error_as_json("the value of detail should be a boolean value, " |
| "i.e. true or false"); |
| resp.status_code = http_status_code::kBadRequest; |
| return; |
| } |
| } else { |
| auto error_message = fmt::format("unknown field {}={}", field.first, field.second); |
| resp.body = encode_error_as_json(error_message.c_str()); |
| resp.status_code = http_status_code::kBadRequest; |
| return; |
| } |
| } |
| |
| // `with_metric_fields` takes precedence over `detail`: once `with_metric_fields` is |
| // specified, it will be considered firstly. |
| if (!with_metric_fields && !detail) { |
| filters.with_metric_fields = kBriefMetricFields; |
| } |
| |
| resp.body = take_snapshot_as_json(_registry, filters); |
| resp.status_code = http_status_code::kOk; |
| } |
| |
| metric_registry::metric_registry() : _http_service(this) |
| { |
| // We should ensure that metric_registry is destructed before shared_io_service is destructed. |
| // Once shared_io_service is destructed before metric_registry is destructed, |
| // boost::asio::io_service needed by metrics in metric_registry such as metric_timer will |
| // be released firstly, then will lead to heap-use-after-free error since percentiles in |
| // metric_registry are still running but the resources they needed have been released. |
| tools::shared_io_service::instance(); |
| |
| start_timer(); |
| } |
| |
| metric_registry::~metric_registry() |
| { |
| utils::auto_write_lock l(_lock); |
| |
| // Once the registery is chosen to be destructed, all of the entities and metrics owned by it |
| // will no longer be needed. |
| // |
| // The reason why each entity is closed in the registery rather than in the destructor of each |
| // entity is that close(kNoWait) for the entity will return immediately without waiting for any |
| // close operation to be finished. |
| // |
| // Thus, to close all entities owned by a registery, it's more efficient to firstly issue a |
| // close request for all entities; then, just wait for all of the close operations to be |
| // finished in the destructor of each entity. It's inefficient to wait for each entity to be |
| // closed one by one. |
| for (auto &entity : _entities) { |
| entity.second->close(metric_entity::close_option::kNoWait); |
| } |
| |
| stop_timer(); |
| } |
| |
| void metric_registry::on_close() {} |
| |
| void metric_registry::start_timer() |
| { |
| if (_timer) { |
| return; |
| } |
| |
| // Once an entity is considered stale, it will be retired after the retention interval, |
| // namely FLAGS_entity_retirement_delay_ms milliseconds. Therefore, if the interval of |
| // the timer is also set to FLAGS_entity_retirement_delay_ms, in the next round, it's |
| // just about time to retire this entity. |
| _timer.reset(new metric_timer(FLAGS_entity_retirement_delay_ms, |
| std::bind(&metric_registry::process_stale_entities, this), |
| std::bind(&metric_registry::on_close, this))); |
| } |
| |
| void metric_registry::stop_timer() |
| { |
| if (!_timer) { |
| return; |
| } |
| |
| // Close the timer synchronously. |
| _timer->close(); |
| _timer->wait(); |
| |
| // Reset the timer to mark that it has been stopped, now it could be started. |
| _timer.reset(); |
| } |
| |
| metric_registry::entity_map metric_registry::entities() const |
| { |
| utils::auto_read_lock l(_lock); |
| return _entities; |
| } |
| |
| metric_entity_ptr metric_registry::find_or_create_entity(const metric_entity_prototype *prototype, |
| const std::string &id, |
| const metric_entity::attr_map &attrs) |
| { |
| utils::auto_write_lock l(_lock); |
| |
| entity_map::const_iterator iter = _entities.find(id); |
| |
| metric_entity_ptr entity; |
| if (iter == _entities.end()) { |
| entity = new metric_entity(prototype, id, attrs); |
| _entities[id] = entity; |
| } else { |
| CHECK_STREQ_MSG( |
| prototype->name(), |
| iter->second->prototype()->name(), |
| "new prototype '{}' is inconsistent with old prototype '{}' for entity '{}'", |
| prototype->name(), |
| iter->second->prototype()->name(), |
| id); |
| |
| iter->second->set_attributes(attrs); |
| entity = iter->second; |
| } |
| |
| return entity; |
| } |
| |
| namespace { |
| |
| #define ENCODE_OBJ_VAL(cond, val) \ |
| do { \ |
| if (dsn_likely(cond)) { \ |
| dsn::json::json_encode(writer, val); \ |
| } else { \ |
| dsn::json::json_encode(writer, "unknown"); \ |
| } \ |
| } while (0) |
| |
| void encode_cluster(dsn::metric_json_writer &writer) |
| { |
| writer.Key(dsn::kMetricClusterField.c_str()); |
| |
| ENCODE_OBJ_VAL(!utils::is_empty(FLAGS_cluster_name), FLAGS_cluster_name); |
| } |
| |
| void encode_role(dsn::metric_json_writer &writer) |
| { |
| writer.Key(dsn::kMetricRoleField.c_str()); |
| |
| const auto *const node = dsn::task::get_current_node2(); |
| ENCODE_OBJ_VAL(node != nullptr, node->get_service_app_info().full_name); |
| } |
| |
| void encode_host(dsn::metric_json_writer &writer) |
| { |
| writer.Key(dsn::kMetricHostField.c_str()); |
| |
| char hostname[1024]; |
| ENCODE_OBJ_VAL(gethostname(hostname, sizeof(hostname)) == 0, hostname); |
| } |
| |
| void encode_port(dsn::metric_json_writer &writer) |
| { |
| writer.Key(dsn::kMetricPortField.c_str()); |
| |
| const auto *const rpc = dsn::task::get_current_rpc2(); |
| ENCODE_OBJ_VAL(rpc != nullptr, rpc->primary_address().port()); |
| } |
| |
| void encode_timestamp_ns(dsn::metric_json_writer &writer) |
| { |
| writer.Key(dsn::kMetricTimestampNsField.c_str()); |
| |
| ENCODE_OBJ_VAL(true, dsn_now_ns()); |
| } |
| |
| #undef ENCODE_OBJ_VAL |
| |
| } // anonymous namespace |
| |
| void metric_registry::encode_entities(metric_json_writer &writer, |
| const metric_filters &filters) const |
| { |
| writer.Key(dsn::kMetricEntitiesField.c_str()); |
| |
| writer.StartArray(); |
| |
| { |
| utils::auto_read_lock l(_lock); |
| |
| for (const auto &entity : _entities) { |
| entity.second->take_snapshot(writer, filters); |
| } |
| } |
| |
| writer.EndArray(); |
| } |
| |
| void metric_registry::take_snapshot(metric_json_writer &writer, const metric_filters &filters) const |
| { |
| writer.StartObject(); |
| encode_cluster(writer); |
| encode_role(writer); |
| encode_host(writer); |
| encode_port(writer); |
| encode_timestamp_ns(writer); |
| encode_entities(writer, filters); |
| writer.EndObject(); |
| } |
| |
| metric_registry::collected_entities_info metric_registry::collect_stale_entities() const |
| { |
| collected_entities_info collected_info; |
| |
| const auto now = dsn_now_ms(); |
| |
| utils::auto_read_lock l(_lock); |
| |
| for (const auto &entity : _entities) { |
| if (!entity.second->is_stale()) { |
| if (entity.second->_retire_time_ms > 0) { |
| // This entity had been scheduled to be retired. However, it was reemployed |
| // after that. It has been in use since then, therefore its scheduled time |
| // for retirement should be reset to 0. |
| collected_info.collected_entities.insert(entity.first); |
| } |
| continue; |
| } |
| |
| if (entity.second->_retire_time_ms > now) { |
| // This entity has been scheduled to be retired, however it is still within |
| // the retention interval. Thus do not collect it. |
| ++collected_info.num_scheduled_entities; |
| continue; |
| } |
| |
| collected_info.collected_entities.insert(entity.first); |
| } |
| |
| collected_info.num_all_entities = _entities.size(); |
| return collected_info; |
| } |
| |
| metric_registry::retired_entities_stat |
| metric_registry::retire_stale_entities(const collected_entity_list &collected_entities) |
| { |
| if (collected_entities.empty()) { |
| // Do not lock for empty list. |
| return retired_entities_stat(); |
| } |
| |
| retired_entities_stat retired_stat; |
| |
| const auto now = dsn_now_ms(); |
| |
| utils::auto_write_lock l(_lock); |
| |
| for (const auto &collected_entity : collected_entities) { |
| auto iter = _entities.find(collected_entity); |
| if (dsn_unlikely(iter == _entities.end())) { |
| // The entity has been removed from the registry for some unusual reason. |
| continue; |
| } |
| |
| if (!iter->second->is_stale()) { |
| if (iter->second->_retire_time_ms > 0) { |
| // For those entities which are reemployed, their scheduled time for retirement |
| // should be reset to 0 though previously they could have been scheduled to be |
| // retired. |
| iter->second->_retire_time_ms = 0; |
| ++retired_stat.num_reemployed_entities; |
| } |
| continue; |
| } |
| |
| if (dsn_unlikely(iter->second->_retire_time_ms > now)) { |
| // Since in collect_stale_entities() we've filtered the metrics which have been |
| // outside the retention interval, this is unlikely to happen. However, we still |
| // check here. |
| continue; |
| } |
| |
| if (iter->second->_retire_time_ms == 0) { |
| // The entity should be marked with a scheduled time for retirement, since it has |
| // already been considered stale. |
| iter->second->_retire_time_ms = now + FLAGS_entity_retirement_delay_ms; |
| ++retired_stat.num_recently_scheduled_entities; |
| continue; |
| } |
| |
| // Once the entity is outside the retention interval, retire it from the registry. |
| _entities.erase(iter); |
| ++retired_stat.num_retired_entities; |
| } |
| |
| return retired_stat; |
| } |
| |
| void metric_registry::process_stale_entities() |
| { |
| LOG_INFO("begin to process stale metric entities"); |
| |
| const auto &collected_info = collect_stale_entities(); |
| const auto &retired_stat = retire_stale_entities(collected_info.collected_entities); |
| |
| LOG_INFO("stat for metric entities: total={}, collected={}, retired={}, scheduled={}, " |
| "recently_scheduled={}, reemployed={}", |
| collected_info.num_all_entities, |
| collected_info.collected_entities.size(), |
| retired_stat.num_retired_entities, |
| collected_info.num_scheduled_entities, |
| retired_stat.num_recently_scheduled_entities, |
| retired_stat.num_reemployed_entities); |
| } |
| |
| metric_prototype::metric_prototype(const ctor_args &args) : _args(args) {} |
| |
| metric_prototype::~metric_prototype() {} |
| |
| metric::metric(const metric_prototype *prototype) : _prototype(prototype) {} |
| |
| closeable_metric::closeable_metric(const metric_prototype *prototype) : metric(prototype) {} |
| |
| uint64_t metric_timer::generate_initial_delay_ms(uint64_t interval_ms) |
| { |
| CHECK_GT(interval_ms, 0); |
| |
| if (interval_ms < 1000) { |
| return rand::next_u64() % interval_ms + 50; |
| } |
| |
| uint64_t interval_seconds = interval_ms / 1000; |
| return (rand::next_u64() % interval_seconds + 1) * 1000 + rand::next_u64() % 1000; |
| } |
| |
| metric_timer::metric_timer(uint64_t interval_ms, on_exec_fn on_exec, on_close_fn on_close) |
| : _initial_delay_ms(generate_initial_delay_ms(interval_ms)), |
| _interval_ms(interval_ms), |
| _on_exec(on_exec), |
| _on_close(on_close), |
| _state(state::kRunning), |
| _completed(), |
| _timer(new boost::asio::deadline_timer(tools::shared_io_service::instance().ios)) |
| { |
| _timer->expires_from_now(boost::posix_time::milliseconds(_initial_delay_ms)); |
| _timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1)); |
| } |
| |
| void metric_timer::close() |
| { |
| // If the timer has already expired when cancel() is called, then the handlers for asynchronous |
| // wait operations will: |
| // * have already been invoked; or |
| // * have been queued for invocation in the near future. |
| // |
| // These handlers can no longer be cancelled, and therefore are passed an error code that |
| // indicates the successful completion of the wait operation. Thus set the state of timer to |
| // kClosing to tell on_timer() that the timer should be closed even if it is not called with |
| // operation_canceled. |
| auto expected_state = state::kRunning; |
| if (_state.compare_exchange_strong(expected_state, state::kClosing)) { |
| _timer->cancel(); |
| } |
| } |
| |
| void metric_timer::wait() { _completed.wait(); } |
| |
| void metric_timer::on_close() |
| { |
| _on_close(); |
| _completed.notify(); |
| } |
| |
| void metric_timer::on_timer(const boost::system::error_code &ec) |
| { |
| // This macro is defined for the case that handlers for asynchronous wait operations are no |
| // longer cancelled. It just checks the internal state atomically (since close() can also be |
| // called simultaneously) for kClosing; once it's matched, it will stop the timer by not executing |
| // any future handler. |
| #define TRY_PROCESS_TIMER_CLOSING() \ |
| do { \ |
| auto expected_state = state::kClosing; \ |
| if (_state.compare_exchange_strong(expected_state, state::kClosed)) { \ |
| on_close(); \ |
| return; \ |
| } \ |
| } while (0) |
| |
| if (dsn_unlikely(!!ec)) { |
| CHECK_EQ_MSG(static_cast<int>(boost::system::errc::operation_canceled), |
| ec.value(), |
| "failed to exec on_timer with an error that cannot be handled: {}", |
| ec.message()); |
| |
| // Cancel can only be launched by close(). |
| auto expected_state = state::kClosing; |
| CHECK(_state.compare_exchange_strong(expected_state, state::kClosed), |
| "wrong state for metric_timer: {}, while expecting closing state", |
| static_cast<int>(expected_state)); |
| on_close(); |
| |
| return; |
| } |
| |
| TRY_PROCESS_TIMER_CLOSING(); |
| _on_exec(); |
| |
| TRY_PROCESS_TIMER_CLOSING(); |
| _timer->expires_from_now(boost::posix_time::milliseconds(_interval_ms)); |
| _timer->async_wait(std::bind(&metric_timer::on_timer, this, std::placeholders::_1)); |
| #undef TRY_PROCESS_TIMER_CLOSING |
| } |
| |
| } // namespace dsn |