blob: bfeeeb5d560127f516c022c56d135eabbf5a4d4a [file] [log] [blame]
/**
* Copyright 2016 Yahoo Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "HandlerBase.h"
#include <boost/bind.hpp>
#include <cassert>
#include "LogUtils.h"
DECLARE_LOG_OBJECT()
namespace pulsar {
/**
 * Build a handler bound to `topic` and owned by `client`.
 *
 * The handler starts in the Pending state with no connection attached; a
 * connection is only requested once start() is called.
 *
 * @param client owning client; it is dereferenced here to read the operation
 *               timeout and to obtain a deadline timer, so it must not be null
 * @param topic  topic name later passed to the client when asking for a connection
 */
HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic)
: client_(client),
topic_(topic),
connection_(),
mutex_(),
// Creation time is taken from now(), i.e. the UTC clock.
creationTimestamp_(now()),
// NOTE(review): "operationTimeut_" looks like a misspelling of "operationTimeout_";
// the member is declared in the header, so it cannot be renamed from this file alone.
operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
state_(Pending),
// Presumably (initial delay, maximum delay) for reconnection attempts - confirm against Backoff.
backoff_(milliseconds(100), seconds(60)),
// Timer used by scheduleReconnection() to delay the next connection attempt.
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
}
/**
 * Tear down the handler.
 *
 * The reconnection timer is cancelled so that a wait armed by
 * scheduleReconnection() is not left pending on it.
 */
HandlerBase::~HandlerBase() {
    timer_->cancel();
}
/**
 * Kick off the handler by requesting its first connection.
 */
void HandlerBase::start() {
    grabCnx();
}
/**
 * Ask the client for a connection serving this handler's topic.
 *
 * Nothing happens when the handler is already attached to a live connection,
 * or when the owning client has already been destroyed. Otherwise the result
 * of the lookup is delivered asynchronously to handleNewConnection(), which
 * receives the value of get_weak_from_this() so it can detect a handler that
 * went away in the meantime.
 */
void HandlerBase::grabCnx() {
    if (connection_.lock()) {
        LOG_INFO(getName() << "Ignoring reconnection request since we're already connected");
        return;
    }

    LOG_INFO(getName() << "Getting connection from pool");

    // client_ is held weakly, so the client may be gone by the time a (re)connection
    // is attempted, e.g. from a reconnection timer firing during shutdown.
    // Dereferencing an empty pointer here would crash, so bail out instead.
    ClientImplPtr client = client_.lock();
    if (!client) {
        LOG_INFO(getName() << "Client is no longer valid, skipping connection request");
        return;
    }

    Future<Result, ClientConnectionWeakPtr> future = client->getConnection(topic_);
    future.addListener(
        boost::bind(&HandlerBase::handleNewConnection, _1, _2, get_weak_from_this()));
}
/**
 * Completion callback for the connection request issued by grabCnx().
 *
 * @param result      outcome of the connection request
 * @param connection  weak reference to the connection that was obtained
 * @param weakHandler weak reference to the handler that asked for the connection
 *
 * If the handler no longer exists the event is dropped. On success with a
 * still-valid connection, connectionOpened() is invoked. In every other case
 * connectionFailed() is invoked with `result` and a reconnection is scheduled.
 */
void HandlerBase::handleNewConnection(Result result, ClientConnectionWeakPtr connection,
                                      HandlerBaseWeakPtr weakHandler) {
    HandlerBasePtr self = weakHandler.lock();
    if (!self) {
        LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
        return;
    }

    if (result == ResultOk) {
        if (ClientConnectionPtr cnx = connection.lock()) {
            LOG_DEBUG(self->getName() << "Connected to broker: " << cnx->cnxString());
            self->connectionOpened(cnx);
            return;
        }
        // TODO - look deeper into why the connection is null while the result is ResultOk
        LOG_INFO(self->getName() << "ClientConnectionPtr is no longer valid");
    }

    // Either the request failed or the connection vanished before it could be used.
    self->connectionFailed(result);
    scheduleReconnection(self);
}
/**
 * Callback invoked when a connection used by a handler has been closed.
 *
 * @param result      reason reported for the disconnection (not inspected here)
 * @param connection  weak reference to the connection that was closed
 * @param weakHandler weak reference to the handler that was using it
 *
 * The event is ignored when the handler no longer exists, or when the handler
 * is currently attached to a different connection than the one that closed.
 * Otherwise the handler is detached from the connection and, if it is still
 * Pending or Ready, a reconnection is scheduled. The handler's mutex is held
 * for the duration of the call.
 */
void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr connection,
                                      HandlerBaseWeakPtr weakHandler) {
    HandlerBasePtr handler = weakHandler.lock();
    if (!handler) {
        LOG_DEBUG("HandlerBase Weak reference is not valid anymore");
        return;
    }

    Lock lock(handler->mutex_);
    State state = handler->state_;

    ClientConnectionPtr currentConnection = handler->connection_.lock();
    if (currentConnection && connection.lock().get() != currentConnection.get()) {
        LOG_WARN(
            handler->getName() << "Ignoring connection closed since we are already attached to a newer connection");
        return;
    }

    // Detach the handler itself from the closed connection. Resetting only the local
    // shared_ptr copy would leave connection_ pointing at the old connection, and
    // grabCnx() would then treat the handler as still connected and never reconnect
    // for as long as that connection object stays alive.
    handler->connection_.reset();

    switch (state) {
        case Pending:
        case Ready:
            scheduleReconnection(handler);
            break;

        case Closing:
        case Closed:
        case Failed:
            LOG_DEBUG(handler->getName() <<
                      "Ignoring connection closed event since the handler is not used anymore");
            break;
    }
}
/**
 * Tell whether an operation that failed with `result` may be retried.
 *
 * @param result the failure code to classify
 * @return true only for ResultServiceUnitNotReady, false for every other code
 */
bool HandlerBase::isRetriableError(Result result) {
    return result == ResultServiceUnitNotReady;
}
/**
 * Arm the handler's timer so that a new connection attempt is made after the
 * next backoff delay.
 *
 * @param handler the handler to reconnect; nothing is scheduled unless it is
 *                in the Pending or Ready state
 */
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
    if (handler->state_ != Pending && handler->state_ != Ready) {
        return;
    }

    TimeDuration delay = handler->backoff_.next();
    LOG_INFO(
        handler->getName() << "Schedule reconnection in " << (delay.total_milliseconds() / 1000.0) << " s");

    handler->timer_->expires_from_now(delay);
    // The shared_ptr is bound into the callback on purpose: timer_ is destroyed together
    // with the handler, which cancels the pending task, so grabCnx() is never invoked on
    // a handler that has gone out of scope.
    handler->timer_->async_wait(boost::bind(&HandlerBase::handleTimeout, _1, handler));
}
/**
 * Completion callback for the reconnection timer armed by scheduleReconnection().
 *
 * @param ec      status of the timer wait; any error means the wait did not
 *                complete normally and no reconnection is attempted
 * @param handler the handler that scheduled the reconnection
 */
void HandlerBase::handleTimeout(const boost::system::error_code& ec, HandlerBasePtr handler) {
    if (!ec) {
        // The delay elapsed normally: try to obtain a connection again.
        handler->grabCnx();
        return;
    }

    LOG_DEBUG(handler->getName() << "Ignoring timer cancelled event, code[" << ec <<"]");
}
/**
 * @return the current UTC time as reported by microsec_clock
 */
ptime now() {
    const ptime current = microsec_clock::universal_time();
    return current;
}
int64_t currentTimeMillis() {
static ptime time_t_epoch(boost::gregorian::date(1970, 1, 1));
time_duration diff = now() - time_t_epoch;
return diff.total_milliseconds();
}
}