| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "ExpiryTaskManager.hpp" |
| |
| #include <future> |
| |
| #include "DistributedSystemImpl.hpp" |
| #include "util/Log.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| ExpiryTaskManager::ExpiryTaskManager() |
| : running_(false), |
| work_guard_(boost::asio::make_work_guard(io_context_)), |
| last_task_id_(0) {} |
| |
| ExpiryTaskManager::~ExpiryTaskManager() noexcept { |
| if (running_) { |
| stop(); |
| } |
| } |
| |
| void ExpiryTaskManager::start() { |
| if (running_) { |
| throw IllegalStateException( |
| "Tried to start ExpiryTaskManager when it was already running"); |
| } |
| |
| std::promise<bool> start_promise; |
| auto start_future = start_promise.get_future(); |
| runner_ = std::thread{[this, &start_promise] { |
| start_promise.set_value(true); |
| Log::setThreadName("NC ETM Thread"); |
| |
| LOGFINE("ExpiryTaskManager thread is running."); |
| io_context_.run(); |
| LOGFINE("ExpiryTaskManager thread has stopped."); |
| }}; |
| |
| running_ = start_future.get(); |
| } |
| |
| void ExpiryTaskManager::stop() { |
| { |
| std::unique_lock<std::mutex> lock(mutex_); |
| |
| if (!running_) { |
| throw IllegalStateException( |
| "Tried to stop ExpiryTaskManager when it was not running"); |
| } |
| |
| LOGDEBUG("Stopping ExpiryTaskManager..."); |
| |
| work_guard_.reset(); |
| running_ = false; |
| |
| cancel_all(); |
| } |
| |
| runner_.join(); |
| } |
| |
| ExpiryTask::id_t ExpiryTaskManager::schedule( |
| std::shared_ptr<ExpiryTask> task, const ExpiryTask::duration_t &delay, |
| const ExpiryTask::duration_t &interval) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| |
| if (!running_) { |
| LOGDEBUG("Tried to add a task while ExpiryTaskManager is not running"); |
| return ExpiryTask::invalid(); |
| } |
| |
| auto task_id = last_task_id_++; |
| if (task_id == ExpiryTask::invalid()) { |
| last_task_id_ = 0; |
| task_id = 0; |
| } |
| |
| task->id(task_id).interval(interval); |
| task_map_.emplace(task_id, std::move(task)).first->second->reset(delay); |
| |
| using apache::geode::internal::chrono::duration::to_string; |
| LOGDEBUG("Task %zu has been scheduled in %s with an interval of %s", task_id, |
| to_string(delay).c_str(), to_string(interval).c_str()); |
| return task_id; |
| } |
| |
| int32_t ExpiryTaskManager::reset(ExpiryTask::id_t task_id, |
| const ExpiryTask::duration_t &delay) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| auto &&iter = task_map_.find(task_id); |
| if (iter == task_map_.end()) { |
| return -1; |
| } |
| |
| auto n = iter->second->reset(delay); |
| return static_cast<int32_t>(n); |
| } |
| |
| int32_t ExpiryTaskManager::cancel(ExpiryTask::id_t task_id) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| auto &&iter = task_map_.find(task_id); |
| if (iter == task_map_.end()) { |
| return -1; |
| } |
| |
| auto n = iter->second->cancel(); |
| task_map_.erase(iter); |
| return static_cast<int32_t>(n); |
| } |
| |
| void ExpiryTaskManager::cancel_all() { |
| for (auto &&iter : task_map_) { |
| iter.second->cancel(); |
| } |
| |
| task_map_.clear(); |
| } |
| |
| void ExpiryTaskManager::remove(ExpiryTask::id_t task_id) { |
| std::unique_lock<std::mutex> lock(mutex_); |
| task_map_.erase(task_id); |
| } |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |