| /** |
| * Copyright 2016 Yahoo Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "HandlerBase.h" |
| #include <boost/bind.hpp> |
| |
| #include <cassert> |
| |
| #include "LogUtils.h" |
| |
| DECLARE_LOG_OBJECT() |
| |
| namespace pulsar { |
| |
| HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic) |
| : client_(client), |
| topic_(topic), |
| connection_(), |
| mutex_(), |
| creationTimestamp_(now()), |
| operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())), |
| state_(Pending), |
| backoff_(milliseconds(100), seconds(60)), |
| timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) { |
| } |
| |
| HandlerBase::~HandlerBase() { |
| timer_->cancel(); |
| } |
| |
| void HandlerBase::start() { |
| grabCnx(); |
| } |
| |
| void HandlerBase::grabCnx() { |
| if (connection_.lock()) { |
| LOG_INFO(getName() << "Ignoring reconnection request since we're already connected"); |
| return; |
| } |
| |
| LOG_INFO(getName() << "Getting connection from pool"); |
| ClientImplPtr client = client_.lock(); |
| Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_); |
| future.addListener( |
| boost::bind(&HandlerBase::handleNewConnection, _1, _2, get_weak_from_this())); |
| } |
| |
| void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr connection, |
| HandlerBaseWeakPtr weakHandler) { |
| HandlerBasePtr handler = weakHandler.lock(); |
| if (!handler) { |
| LOG_DEBUG("HandlerBase Weak reference is not valid anymore"); |
| return; |
| } |
| if (result == ResultOk) { |
| ClientConnectionPtr conn = connection.lock(); |
| if (conn) { |
| LOG_DEBUG(handler->getName() << "Connected to broker: " << conn->cnxString()); |
| handler->connectionOpened(conn); |
| return; |
| } |
| // TODO - look deeper into why the connection is null while the result is ResultOk |
| LOG_INFO(handler->getName() << "ClientConnectionPtr is no longer valid"); |
| } |
| handler->connectionFailed(result); |
| scheduleReconnection(handler); |
| } |
| |
| void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr connection, |
| HandlerBaseWeakPtr weakHandler) { |
| HandlerBasePtr handler = weakHandler.lock(); |
| if (!handler) { |
| LOG_DEBUG("HandlerBase Weak reference is not valid anymore"); |
| return; |
| } |
| |
| Lock lock(handler->mutex_); |
| State state = handler->state_; |
| |
| ClientConnectionPtr currentConnection = handler->connection_.lock(); |
| if (currentConnection && connection.lock().get() != currentConnection.get()) { |
| LOG_WARN( |
| handler->getName() << "Ignoring connection closed since we are already attached to a newer connection"); |
| return; |
| } |
| |
| if (currentConnection) { |
| currentConnection.reset(); |
| } |
| |
| switch (state) { |
| case Pending: |
| case Ready: |
| scheduleReconnection(handler); |
| break; |
| |
| case Closing: |
| case Closed: |
| case Failed: |
| LOG_DEBUG(handler->getName() << |
| "Ignoring connection closed event since the handler is not used anymore"); |
| break; |
| } |
| } |
| |
| bool HandlerBase::isRetriableError(Result result) { |
| switch (result) { |
| case ResultServiceUnitNotReady: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| void HandlerBase::scheduleReconnection(HandlerBasePtr handler) { |
| if (handler->state_ == Pending || handler->state_ == Ready) { |
| TimeDuration delay = handler->backoff_.next(); |
| |
| LOG_INFO( |
| handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s"); |
| handler->timer_->expires_from_now(delay); |
| //passing shared_ptr here since time_ will get destroyed, so tasks will be cancelled |
| //so we will not run into the case where grabCnx is invoked on out of scope handler |
| handler->timer_->async_wait(boost::bind(&HandlerBase::handleTimeout, _1, handler)); |
| } |
| } |
| |
| void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBasePtr handler) { |
| if (ec) { |
| LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec <<"]"); |
| return; |
| } else { |
| handler->grabCnx(); |
| } |
| } |
| |
| ptime now() { |
| return microsec_clock::universal_time(); |
| } |
| |
| int64_t currentTimeMillis() { |
| static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1)); |
| |
| time_duration diff = now() - time_t_epoch; |
| return diff.total_milliseconds(); |
| } |
| |
| } |