blob: acb754e738abc004c0353192469ee61a66ecdf4e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "namenode_tracker.h"
#include "common/logging.h"
#include "common/libhdfs_events_impl.h"
#include "common/util.h"
namespace hdfs {
// Renders a list of endpoints as a single comma-separated string,
// e.g. "10.0.0.1:8020, 10.0.0.2:8020". An empty list yields "".
static std::string format_endpoints(const std::vector<boost::asio::ip::tcp::endpoint> &pts) {
  std::stringstream rendered;
  const char *separator = "";  // nothing before the first element
  for(const auto &endpoint : pts) {
    rendered << separator << endpoint;
    separator = ", ";
  }
  return rendered.str();
}
// Sets up failover tracking between the first two namenodes in `servers`.
//
// With fewer than two entries HA stays disabled (enabled_ == false).
// Entries past the second are ignored, with a warning. servers[0] is
// provisionally treated as active and servers[1] as standby; the real
// active node is not known until a connection has been made.
// resolved_ is set when at least one of the two nodes already has endpoints.
HANamenodeTracker::HANamenodeTracker(const std::vector<ResolvedNamenodeInfo> &servers,
                                     std::shared_ptr<IoService> ioservice,
                                     std::shared_ptr<LibhdfsEvents> event_handlers)
    : enabled_(false), resolved_(false),
      ioservice_(ioservice), event_handlers_(event_handlers)
{
  LOG_TRACE(kRPC, << "HANamenodeTracker got the following nodes");
  for(const ResolvedNamenodeInfo &server : servers) {
    LOG_TRACE(kRPC, << server.str());
  }

  if(servers.size() >= 2) {
    LOG_TRACE(kRPC, << "Creating HA namenode tracker");
    if(servers.size() > 2) {
      LOG_WARN(kRPC, << "Nameservice declares more than two nodes. Some won't be used.");
    }

    active_info_ = servers[0];
    standby_info_ = servers[1];
    // Terminated with ';' like every other LOG_* call; don't rely on the
    // macro expansion supplying it.
    LOG_INFO(kRPC, << "HA enabled. Using the following namenodes from the configuration."
                   << "\nNote: Active namenode cannot be determined until a connection has been made.");
    LOG_INFO(kRPC, << "First namenode url = " << active_info_.uri.str());
    LOG_INFO(kRPC, << "Second namenode url = " << standby_info_.uri.str());

    enabled_ = true;
    if(!active_info_.endpoints.empty() || !standby_info_.endpoints.empty()) {
      resolved_ = true;
    }
  }
}
// Nothing to release explicitly; members clean up after themselves.
HANamenodeTracker::~HANamenodeTracker() = default;
// Picks the namenode to fail over to, given the endpoints of the current
// connection. Only current_endpoints[0] is examined.
//
// - If it matches the node tracked as active, active/standby are swapped and
//   the former standby is returned.
// - If it matches the standby, the tracked active node is returned unchanged.
// - If it matches neither, or current_endpoints is empty, an error is logged
//   and false is returned with `out` untouched.
//
// On success `out` receives a copy of the new active node's info and true is
// returned. If that node has no endpoints, one more resolution is attempted
// and the result is stored in the tracker, so later calls can recognise the
// node. A failed re-resolution is logged but still returns true.
// swap_lock_ is held for the whole call, including the resolution attempt.
bool HANamenodeTracker::GetFailoverAndUpdate(const std::vector<boost::asio::ip::tcp::endpoint>& current_endpoints,
                                             ResolvedNamenodeInfo& out)
{
  mutex_guard swap_lock(swap_lock_);

  // Cannot look up without a key.
  if(current_endpoints.empty()) {
    // Guarded like the failover notification below; event_handlers_ may be null.
    if(event_handlers_)
      event_handlers_->call(FS_NN_EMPTY_ENDPOINTS_EVENT, active_info_.nameservice.c_str(),
                            0 /*Not much to say about context without endpoints*/);
    LOG_ERROR(kRPC, << "HANamenodeTracker@" << this << "::GetFailoverAndUpdate requires at least 1 endpoint.");
    return false;
  }

  const boost::asio::ip::tcp::endpoint &current = current_endpoints[0];
  LOG_TRACE(kRPC, << "Swapping from endpoint " << current);

  if(IsCurrentActive_locked(current)) {
    // Connected to the node we believed was active: the other one takes over.
    std::swap(active_info_, standby_info_);
  } else if(!IsCurrentStandby_locked(current)) {
    // Invalid state (or a NIC was added that didn't show up during DNS)
    std::stringstream errorMsg; // asio specializes endpoint operator<< for stringstream
    errorMsg << "Unable to find RPC connection in config. Looked for " << current << " in\n"
             << format_endpoints(active_info_.endpoints) << " and\n"
             << format_endpoints(standby_info_.endpoints) << '\n';
    LOG_ERROR(kRPC, << errorMsg.str());
    return false;
  }
  // Otherwise we were connected to the standby, so active_info_ already names
  // the node to fail over to.

  if(event_handlers_) {
    // Keep the URI string alive in a named local for the duration of the call.
    const std::string active_uri = active_info_.uri.str();
    event_handlers_->call(FS_NN_FAILOVER_EVENT, active_info_.nameservice.c_str(),
                          reinterpret_cast<int64_t>(active_uri.c_str()));
  }

  // Extra DNS on the target node to try and get EPs if it didn't already have
  // them. Resolve into active_info_ itself, not only the caller's copy, so the
  // endpoints are cached and IsCurrentActive_locked can match this node on the
  // next failover.
  if(active_info_.endpoints.empty()) {
    LOG_WARN(kRPC, << "No endpoints for node " << active_info_.uri.str() << " attempting to resolve again");
    if(!ResolveInPlace(ioservice_, active_info_)) {
      // Stuck retrying against the same NN that was able to be resolved in this case
      LOG_ERROR(kRPC, << "Fallback endpoint resolution for node " << active_info_.uri.str()
                      << " failed. Please make sure your configuration is up to date.");
    }
  }

  out = active_info_;
  return true;
}
// Returns true if ep's IP address equals the address of any entry in
// `candidates`. Ports are not required to match: on an address match with a
// different port, a warning is logged and the entry still counts as a match.
// Shared by IsCurrentActive_locked / IsCurrentStandby_locked, which differ
// only in which node's endpoint list they scan.
static bool endpoint_address_matches(const boost::asio::ip::tcp::endpoint &ep,
                                     const std::vector<boost::asio::ip::tcp::endpoint> &candidates) {
  for(const auto &candidate : candidates) {
    if(ep.address() == candidate.address()) {
      if(ep.port() != candidate.port())
        LOG_WARN(kRPC, << "Port mismatch: " << ep << " vs " << candidate << " trying anyway..");
      return true;
    }
  }
  return false;
}

// True if ep matches (by address) an endpoint of the node tracked as active.
// The "_locked" suffix indicates the caller is expected to hold swap_lock_.
bool HANamenodeTracker::IsCurrentActive_locked(const boost::asio::ip::tcp::endpoint &ep) const {
  return endpoint_address_matches(ep, active_info_.endpoints);
}

// True if ep matches (by address) an endpoint of the node tracked as standby.
// The "_locked" suffix indicates the caller is expected to hold swap_lock_.
bool HANamenodeTracker::IsCurrentStandby_locked(const boost::asio::ip::tcp::endpoint &ep) const {
  return endpoint_address_matches(ep, standby_info_.endpoints);
}
} // end namespace hdfs