| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #pragma once |
| |
| #include <pulsar/Result.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <chrono> |
| #include <functional> |
| #include <memory> |
| |
| #include "Backoff.h" |
| #include "ExecutorService.h" |
| #include "Future.h" |
| #include "LogUtils.h" |
| #include "ResultUtils.h" |
| #include "TimeUtils.h" |
| |
| namespace pulsar { |
| |
| template <typename T> |
| class RetryableOperation : public std::enable_shared_from_this<RetryableOperation<T>> { |
| struct PassKey { |
| explicit PassKey() {} |
| }; |
| |
| RetryableOperation(const std::string& name, std::function<Future<Result, T>()>&& func, |
| TimeDuration timeout, DeadlineTimerPtr timer) |
| : name_(name), |
| func_(std::move(func)), |
| timeout_(timeout), |
| backoff_(std::chrono::milliseconds(100), timeout_ + timeout_, std::chrono::milliseconds(0)), |
| timer_(timer) {} |
| |
| public: |
| template <typename... Args> |
| explicit RetryableOperation(PassKey, Args&&... args) : RetryableOperation(std::forward<Args>(args)...) {} |
| |
| template <typename... Args> |
| static std::shared_ptr<RetryableOperation<T>> create(Args&&... args) { |
| return std::make_shared<RetryableOperation<T>>(PassKey{}, std::forward<Args>(args)...); |
| } |
| |
| Future<Result, T> run() { |
| bool expected = false; |
| if (!started_.compare_exchange_strong(expected, true)) { |
| return promise_.getFuture(); |
| } |
| return runImpl(timeout_); |
| } |
| |
| void cancel() { |
| promise_.setFailed(ResultDisconnected); |
| ASIO_ERROR ec; |
| timer_->cancel(ec); |
| } |
| |
| private: |
| const std::string name_; |
| std::function<Future<Result, T>()> func_; |
| const TimeDuration timeout_; |
| Backoff backoff_; |
| Promise<Result, T> promise_; |
| std::atomic_bool started_{false}; |
| DeadlineTimerPtr timer_; |
| |
| // Fix the "declared with greater visibility" error for GCC <= 7 |
| #ifdef __GNUC__ |
| __attribute__((visibility("hidden"))) |
| #endif |
| Future<Result, T> |
| runImpl(TimeDuration remainingTime) { |
| std::weak_ptr<RetryableOperation<T>> weakSelf{this->shared_from_this()}; |
| func_().addListener([this, weakSelf, remainingTime](Result result, const T& value) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| if (result == ResultOk) { |
| promise_.setValue(value); |
| return; |
| } |
| if (!isResultRetryable(result)) { |
| promise_.setFailed(result); |
| return; |
| } |
| if (toMillis(remainingTime) <= 0) { |
| promise_.setFailed(ResultTimeout); |
| return; |
| } |
| |
| auto delay = std::min(backoff_.next(), remainingTime); |
| timer_->expires_from_now(delay); |
| |
| auto nextRemainingTime = remainingTime - delay; |
| LOG_INFO("Reschedule " << name_ << " for " << toMillis(delay) |
| << " ms, remaining time: " << toMillis(nextRemainingTime) << " ms"); |
| timer_->async_wait([this, weakSelf, nextRemainingTime](const ASIO_ERROR& ec) { |
| auto self = weakSelf.lock(); |
| if (!self) { |
| return; |
| } |
| if (ec) { |
| if (ec == ASIO::error::operation_aborted) { |
| LOG_DEBUG("Timer for " << name_ << " is cancelled"); |
| promise_.setFailed(ResultTimeout); |
| } else { |
| LOG_WARN("Timer for " << name_ << " failed: " << ec.message()); |
| } |
| } else { |
| LOG_DEBUG("Run operation " << name_ << ", remaining time: " << toMillis(nextRemainingTime) |
| << " ms"); |
| runImpl(nextRemainingTime); |
| } |
| }); |
| }); |
| return promise_.getFuture(); |
| } |
| |
| DECLARE_LOG_OBJECT() |
| }; |
| |
| } // namespace pulsar |