| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include "HandlerBase.h" |
| |
| #include "Backoff.h" |
| #include "ClientConnection.h" |
| #include "ClientImpl.h" |
| #include "ExecutorService.h" |
| #include "LogUtils.h" |
| #include "ResultUtils.h" |
| #include "TimeUtils.h" |
| |
| DECLARE_LOG_OBJECT() |
| |
| namespace pulsar { |
| |
| HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic, const Backoff& backoff) |
| : topic_(std::make_shared<std::string>(topic)), |
| client_(client), |
| connectionKeySuffix_(client->getConnectionPool().generateRandomIndex()), |
| executor_(client->getIOExecutorProvider()->get()), |
| mutex_(), |
| creationTimestamp_(TimeUtils::now()), |
| operationTimeut_(std::chrono::seconds(client->conf().getOperationTimeoutSeconds())), |
| state_(NotStarted), |
| backoff_(backoff), |
| epoch_(0), |
| timer_(executor_->createDeadlineTimer()), |
| creationTimer_(executor_->createDeadlineTimer()), |
| reconnectionPending_(false), |
| redirectedClusterURI_("") {} |
| |
| HandlerBase::~HandlerBase() { |
| ASIO_ERROR ignored; |
| timer_->cancel(ignored); |
| creationTimer_->cancel(ignored); |
| } |
| |
| void HandlerBase::start() { |
| // guard against concurrent state changes such as closing |
| State state = NotStarted; |
| if (state_.compare_exchange_strong(state, Pending)) { |
| grabCnx(); |
| } |
| creationTimer_->expires_from_now(operationTimeut_); |
| std::weak_ptr<HandlerBase> weakSelf{shared_from_this()}; |
| creationTimer_->async_wait([this, weakSelf](const ASIO_ERROR& error) { |
| auto self = weakSelf.lock(); |
| if (self && !error) { |
| connectionFailed(ResultTimeout); |
| ASIO_ERROR ignored; |
| timer_->cancel(ignored); |
| } |
| }); |
| } |
| |
| ClientConnectionWeakPtr HandlerBase::getCnx() const { |
| Lock lock(connectionMutex_); |
| return connection_; |
| } |
| |
| void HandlerBase::setCnx(const ClientConnectionPtr& cnx) { |
| Lock lock(connectionMutex_); |
| auto previousCnx = connection_.lock(); |
| if (previousCnx) { |
| beforeConnectionChange(*previousCnx); |
| } |
| connection_ = cnx; |
| } |
| |
| void HandlerBase::grabCnx() { grabCnx(boost::none); } |
| |
| Future<Result, ClientConnectionPtr> HandlerBase::getConnection( |
| const ClientImplPtr& client, const boost::optional<std::string>& assignedBrokerUrl) { |
| if (assignedBrokerUrl && client->getLookupCount() > 0) { |
| return client->connect(getRedirectedClusterURI(), assignedBrokerUrl.get(), connectionKeySuffix_); |
| } else { |
| return client->getConnection(getRedirectedClusterURI(), topic(), connectionKeySuffix_); |
| } |
| } |
| |
| void HandlerBase::grabCnx(const boost::optional<std::string>& assignedBrokerUrl) { |
| bool expectedState = false; |
| if (!reconnectionPending_.compare_exchange_strong(expectedState, true)) { |
| LOG_INFO(getName() << "Ignoring reconnection attempt since there's already a pending reconnection"); |
| return; |
| } |
| |
| if (getCnx().lock()) { |
| LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); |
| reconnectionPending_ = false; |
| return; |
| } |
| |
| LOG_INFO(getName() << "Getting connection from pool"); |
| ClientImplPtr client = client_.lock(); |
| if (!client) { |
| LOG_WARN(getName() << "Client is invalid when calling grabCnx()"); |
| connectionFailed(ResultAlreadyClosed); |
| reconnectionPending_ = false; |
| return; |
| } |
| auto self = shared_from_this(); |
| auto cnxFuture = getConnection(client, assignedBrokerUrl); |
| cnxFuture.addListener([this, self](Result result, const ClientConnectionPtr& cnx) { |
| if (result == ResultOk) { |
| LOG_DEBUG(getName() << "Connected to broker: " << cnx->cnxString()); |
| connectionOpened(cnx).addListener([this, self](Result result, bool) { |
| // Do not use bool, only Result. |
| reconnectionPending_ = false; |
| if (result != ResultOk && isResultRetryable(result)) { |
| scheduleReconnection(); |
| } |
| }); |
| } else { |
| connectionFailed(result); |
| reconnectionPending_ = false; |
| scheduleReconnection(); |
| } |
| }); |
| } |
| |
| void HandlerBase::handleDisconnection(Result result, const ClientConnectionPtr& cnx) { |
| State state = state_; |
| |
| ClientConnectionPtr currentConnection = getCnx().lock(); |
| if (currentConnection && cnx.get() != currentConnection.get()) { |
| LOG_WARN( |
| getName() << "Ignoring connection closed since we are already attached to a newer connection"); |
| return; |
| } |
| |
| resetCnx(); |
| |
| if (isResultRetryable(result)) { |
| scheduleReconnection(); |
| return; |
| } |
| |
| switch (state) { |
| case Pending: |
| case Ready: |
| scheduleReconnection(); |
| break; |
| |
| case NotStarted: |
| case Closing: |
| case Closed: |
| case Producer_Fenced: |
| case Failed: |
| LOG_DEBUG(getName() << "Ignoring connection closed event since the handler is not used anymore"); |
| break; |
| } |
| } |
| void HandlerBase::scheduleReconnection() { scheduleReconnection(boost::none); } |
| void HandlerBase::scheduleReconnection(const boost::optional<std::string>& assignedBrokerUrl) { |
| const auto state = state_.load(); |
| |
| if (state == Pending || state == Ready) { |
| TimeDuration delay = assignedBrokerUrl ? std::chrono::milliseconds(0) : backoff_.next(); |
| |
| LOG_INFO(getName() << "Schedule reconnection in " << (toMillis(delay) / 1000.0) << " s"); |
| timer_->expires_from_now(delay); |
| // passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled |
| // so we will not run into the case where grabCnx is invoked on out of scope handler |
| auto name = getName(); |
| std::weak_ptr<HandlerBase> weakSelf{shared_from_this()}; |
| timer_->async_wait([name, weakSelf, assignedBrokerUrl](const ASIO_ERROR& ec) { |
| auto self = weakSelf.lock(); |
| if (self) { |
| self->handleTimeout(ec, assignedBrokerUrl); |
| } else { |
| LOG_WARN(name << "Cancel the reconnection since the handler is destroyed"); |
| } |
| }); |
| } |
| } |
| |
| void HandlerBase::handleTimeout(const ASIO_ERROR& ec, const boost::optional<std::string>& assignedBrokerUrl) { |
| if (ec) { |
| LOG_DEBUG(getName() << "Ignoring timer cancelled event, code[" << ec << "]"); |
| return; |
| } else { |
| epoch_++; |
| grabCnx(assignedBrokerUrl); |
| } |
| } |
| |
| Result HandlerBase::convertToTimeoutIfNecessary(Result result, ptime startTimestamp) const { |
| if (isResultRetryable(result) && (TimeUtils::now() - startTimestamp >= operationTimeut_)) { |
| return ResultTimeout; |
| } else { |
| return result; |
| } |
| } |
| |
| void HandlerBase::setRedirectedClusterURI(const std::string& serviceUrl) { |
| Lock lock(mutex_); |
| redirectedClusterURI_ = serviceUrl; |
| } |
| const std::string& HandlerBase::getRedirectedClusterURI() { |
| Lock lock(mutex_); |
| return redirectedClusterURI_; |
| } |
| |
| } // namespace pulsar |