blob: 56d18e3c09249f4f7cbdd8374516e7a7879fd084 [file] [log] [blame]
/********************************************************************
* 2014 -
* open source under Apache License Version 2.0
********************************************************************/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "client/DataTransferProtocolSender.h"
#include "ReadShortCircuitInfo.h"
#include "server/Datanode.h"
#include "datatransfer.pb.h"
#include "Exception.h"
#include "ExceptionInternal.h"
#include "network/DomainSocket.h"
#include "SWCrc32c.h"
#include "HWCrc32c.h"
#include "StringUtil.h"
#include <inttypes.h>
#include <sstream>
#include <vector>
namespace Hdfs {
namespace Internal {
// Out-of-class definition of the builder's static cache of file descriptor
// holders. Entries are keyed by ReadShortCircuitInfoKey; release() inserts
// into it and fetchOrCreate() takes entries out of it.
ReadShortCircuitFDCacheType
ReadShortCircuitInfoBuilder::ReadShortCircuitFDCache;
// Out-of-class definition of the builder's static cache of BlockLocalPathInfo
// records, filled by getBlockLocalPathInfo() and pruned by
// invalidBlockLocalPathInfo().
BlockLocalPathInfoCacheType
ReadShortCircuitInfoBuilder::BlockLocalPathInfoCache;
/**
 * Drops the data and metadata file wrappers, then hands this object to
 * ReadShortCircuitInfoBuilder::release().
 *
 * Every exception raised along the way is swallowed so that nothing can
 * escape from the destructor.
 */
ReadShortCircuitInfo::~ReadShortCircuitInfo() {
  try {
    // Let go of the wrappers before the object is offered to release().
    dataFile.reset();
    metaFile.reset();
    ReadShortCircuitInfoBuilder::release(*this);
  } catch (...) {
    // Intentionally ignored: this is best-effort cleanup.
  }
}
/**
 * Closes the metadata descriptor and then the data descriptor. A value of -1
 * is treated as "no descriptor" and is skipped.
 */
ReadShortCircuitFDHolder::~ReadShortCircuitFDHolder() {
  const bool ownsMeta = (-1 != metafd);
  if (ownsMeta) {
    ::close(metafd);
  }

  const bool ownsData = (-1 != datafd);
  if (ownsData) {
    ::close(datafd);
  }
}
/**
 * Initializes the builder's dnInfo, auth and conf members from the arguments.
 *
 * @param dnInfo the datanode this builder talks to.
 * @param auth the authentication information used for datanode RPCs.
 * @param conf the session configuration consulted by the builder.
 */
ReadShortCircuitInfoBuilder::ReadShortCircuitInfoBuilder(
    const DatanodeInfo& dnInfo, const RpcAuth& auth, const SessionConfig& conf)
    : dnInfo(dnInfo),
      auth(auth),
      conf(conf) {
}
/**
 * Produces the short-circuit read information for a block.
 *
 * When the legacy local block reader is disabled, a cached pair of file
 * descriptors is tried first; if none is cached, or wrapping the cached pair
 * throws, new descriptors are requested and the descriptor cache's maximum
 * size is then refreshed from the configuration.
 *
 * When the legacy local block reader is enabled, the block's local paths are
 * looked up and the files are opened by path. This mode returns an empty
 * pointer (after logging a warning) if the authentication protocol is not
 * NONE or if the metadata path is not readable.
 *
 * @param block the block to read.
 * @param token the block's access token.
 * @return the short-circuit information, or an empty pointer as described.
 */
shared_ptr<ReadShortCircuitInfo> ReadShortCircuitInfoBuilder::fetchOrCreate(
    const ExtendedBlock& block, const Token token) {
  ReadShortCircuitInfoKey key(dnInfo.getXferPort(), block.getBlockId(),
                              block.getPoolId());

  if (!conf.isLegacyLocalBlockReader()) {
    shared_ptr<ReadShortCircuitFDHolder> cachedFds;

    // Prefer a pair of descriptors that is already sitting in the cache.
    if (ReadShortCircuitFDCache.findAndErase(key, &cachedFds)) {
      try {
        LOG(DEBUG1,
            "Get file descriptors from cache for block %s, cache size %zu",
            block.toString().c_str(), ReadShortCircuitFDCache.size());
        return createReadShortCircuitInfo(key, cachedFds);
      } catch (...) {
        // The cached descriptors could not be wrapped; fall through and
        // obtain new ones.
      }
    }

    // Nothing usable in the cache: obtain new descriptors.
    shared_ptr<ReadShortCircuitInfo> created =
        createReadShortCircuitInfo(key, block, token);
    ReadShortCircuitFDCache.setMaxSize(conf.getMaxFileDescriptorCacheSize());
    return created;
  }

  // Legacy mode from here on.
  if (auth.getProtocol() != AuthProtocol::NONE) {
    LOG(WARNING,
        "Legacy read-shortcircuit only works for simple "
        "authentication");
    return shared_ptr<ReadShortCircuitInfo>();
  }

  BlockLocalPathInfo info = getBlockLocalPathInfo(block, token);
  assert(block.getBlockId() == info.getBlock().getBlockId() &&
         block.getPoolId() == info.getBlock().getPoolId());

  const bool metaReadable = (0 == access(info.getLocalMetaPath(), R_OK));
  if (!metaReadable) {
    // Forget the path information so that it is looked up again next time.
    invalidBlockLocalPathInfo(block);
    LOG(WARNING,
        "Legacy read-shortcircuit is enabled but path:%s is not "
        "readable.",
        info.getLocalMetaPath());
    return shared_ptr<ReadShortCircuitInfo>();
  }

  return createReadShortCircuitInfo(key, info);
}
/**
 * Puts the file descriptor holder of a valid, non-legacy
 * ReadShortCircuitInfo into the descriptor cache. Does nothing for invalid
 * or legacy objects.
 *
 * @param info the object whose descriptors may be cached.
 */
void ReadShortCircuitInfoBuilder::release(const ReadShortCircuitInfo& info) {
  if (!info.isValid() || info.isLegacy()) {
    return;
  }

  ReadShortCircuitFDCache.insert(info.getKey(), info.getFdHolder());
  LOG(DEBUG1,
      "Inserted file descriptors into cache for block %s, cache size %zu",
      info.formatBlockInfo().c_str(), ReadShortCircuitFDCache.size());
}
/**
 * Returns the local path information of a block, consulting
 * BlockLocalPathInfoCache first and asking the datanode on a miss. A result
 * obtained from the datanode is inserted into the cache.
 *
 * @param block the block whose local paths are wanted.
 * @param token the block's access token, forwarded to the datanode call.
 * @return the block's local path information.
 * @throws HdfsIOException rethrown unchanged; any other HdfsException is
 *         rethrown nested inside an HdfsIOException.
 */
BlockLocalPathInfo ReadShortCircuitInfoBuilder::getBlockLocalPathInfo(
    const ExtendedBlock& block, const Token& token) {
  BlockLocalPathInfo pathInfo;
  ReadShortCircuitInfoKey cacheKey(dnInfo.getXferPort(), block.getBlockId(),
                                   block.getPoolId());

  try {
    if (BlockLocalPathInfoCache.find(cacheKey, &pathInfo)) {
      LOG(DEBUG1,
          "Get local block info from cache for block %s, cache size %zu",
          block.toString().c_str(), BlockLocalPathInfoCache.size());
    } else {
      // Work on private copies so the builder's own settings stay untouched.
      RpcAuth rpcAuth = auth;
      SessionConfig rpcConf = conf;
      rpcConf.setRpcMaxRetryOnConnect(1);

      // Only kerberos based authentication is allowed here, so no token is
      // added to the copied authentication information.
      shared_ptr<Datanode> datanode(
          new DatanodeImpl(dnInfo.getIpAddr().c_str(), dnInfo.getIpcPort(),
                           rpcConf, rpcAuth));
      datanode->getBlockLocalPathInfo(block, token, pathInfo);

      BlockLocalPathInfoCache.setMaxSize(conf.getMaxLocalBlockInfoCacheSize());
      BlockLocalPathInfoCache.insert(cacheKey, pathInfo);
      LOG(DEBUG1, "Inserted block %s to local block info cache, cache size %zu",
          block.toString().c_str(), BlockLocalPathInfoCache.size());
    }
  } catch (const HdfsIOException& e) {
    // Already the exception type callers expect.
    throw;
  } catch (const HdfsException& e) {
    NESTED_THROW(HdfsIOException,
                 "ReadShortCircuitInfoBuilder: Failed to get block local "
                 "path information.");
  }

  return pathInfo;
}
void ReadShortCircuitInfoBuilder::invalidBlockLocalPathInfo(
const ExtendedBlock& block) {
BlockLocalPathInfoCache.erase(ReadShortCircuitInfoKey(
dnInfo.getXferPort(), block.getBlockId(), block.getPoolId()));
}
/**
 * Builds a legacy ReadShortCircuitInfo by opening the block's metadata and
 * data files by path.
 *
 * Mapped file wrappers are used when the configuration asks for them,
 * C file wrappers otherwise. Both files are positioned at offset 0.
 *
 * @param key the cache key stored in the resulting object.
 * @param info the local paths of the block's data and metadata files.
 * @return the newly created object, flagged as legacy.
 * @throws HdfsIOException if either file cannot be opened.
 */
shared_ptr<ReadShortCircuitInfo>
ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
    const ReadShortCircuitInfoKey& key, const BlockLocalPathInfo& info) {
  std::string metaPath(info.getLocalMetaPath());
  std::string dataPath(info.getLocalBlockPath());

  shared_ptr<FileWrapper> dataWrapper;
  shared_ptr<FileWrapper> metaWrapper;

  if (conf.doUseMappedFile()) {
    metaWrapper.reset(new MappedFileWrapper);
    dataWrapper.reset(new MappedFileWrapper);
  } else {
    metaWrapper.reset(new CFileWrapper);
    dataWrapper.reset(new CFileWrapper);
  }

  // The metadata file is opened first, so its failure is reported first.
  if (!metaWrapper->open(metaPath)) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder cannot open metadata file \"%s\", %s",
          metaPath.c_str(), GetSystemErrorInfo(errno));
  }

  if (!dataWrapper->open(dataPath)) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder cannot open data file \"%s\", %s",
          dataPath.c_str(), GetSystemErrorInfo(errno));
  }

  dataWrapper->seek(0);
  metaWrapper->seek(0);

  // The second constructor argument marks the object as legacy.
  shared_ptr<ReadShortCircuitInfo> result(new ReadShortCircuitInfo(key, true));
  result->setDataFile(dataWrapper);
  result->setMetaFile(metaWrapper);
  return result;
}
/**
 * Derives the domain socket path for a datanode port by substituting the
 * port number for "_PORT" in the configured domain socket path.
 *
 * @param port the port number to substitute.
 * @return the configured path after substitution.
 * @throws HdfsIOException if the configured domain socket path is empty.
 */
std::string ReadShortCircuitInfoBuilder::buildDomainSocketAddress(
    uint32_t port) {
  std::string socketPath = conf.getDomainSocketPath();

  if (socketPath.empty()) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder: \"dfs.domain.socket.path\" is not "
          "set");
  }

  // Format the port with the classic locale so that no locale-specific
  // digit grouping can end up in the path.
  std::ostringstream portText;
  portText.imbue(std::locale::classic());
  portText << port;

  StringReplaceAll(socketPath, "_PORT", portText.str());
  return socketPath;
}
/**
 * Builds a ReadShortCircuitInfo from descriptors requested over the
 * datanode's domain socket.
 *
 * Connects to the socket address derived from the key's datanode port, sends
 * a short-circuit descriptor request for the block, receives the descriptors
 * and wraps them.
 *
 * @param key the cache key; its dnPort selects the domain socket address.
 * @param block the block whose descriptors are requested.
 * @param token the block's access token.
 * @return the newly created object.
 */
shared_ptr<ReadShortCircuitInfo>
ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
    const ReadShortCircuitInfoKey& key, const ExtendedBlock& block,
    const Token& token) {
  std::string socketAddr = buildDomainSocketAddress(key.dnPort);

  DomainSocketImpl domainSock;
  domainSock.connect(socketAddr.c_str(), 0, conf.getInputConnTimeout());

  DataTransferProtocolSender requester(domainSock, conf.getInputWriteTimeout(),
                                       socketAddr);
  requester.requestShortCircuitFds(block, token, MaxReadShortCircuitVersion);

  shared_ptr<ReadShortCircuitFDHolder> received =
      receiveReadShortCircuitFDs(domainSock, block);
  return createReadShortCircuitInfo(key, received);
}
/**
 * Reads the datanode's reply to a short-circuit descriptor request and
 * receives the data and metadata file descriptors that follow it.
 *
 * The reply is a varint32 length followed by a serialized
 * BlockOpResponseProto. After a successful reply the two descriptors are
 * received over the domain socket.
 *
 * @param sock the socket the request was sent on; it must actually be a
 *        DomainSocketImpl.
 * @param block the block the descriptors belong to (used in error messages).
 * @return a holder owning the received data and metadata descriptors.
 * @throws HdfsInvalidBlockToken if the datanode reports an access token
 *         error.
 * @throws HdfsIOException if the reply size is out of range, the reply
 *         cannot be parsed, the datanode reports any other error, the socket
 *         is not a domain socket, or a descriptor is missing.
 */
shared_ptr<ReadShortCircuitFDHolder>
ReadShortCircuitInfoBuilder::receiveReadShortCircuitFDs(
    Socket& sock, const ExtendedBlock& block) {
  std::vector<char> respBuffer;
  int readTimeout = conf.getInputReadTimeout();
  shared_ptr<BufferedSocketReader> in(
      new BufferedSocketReaderImpl(sock, 0));  // disable buffer
  int32_t respSize = in->readVarint32(readTimeout);

  // Reject empty replies and anything above 10 MiB before allocating.
  if (respSize <= 0 || respSize > 10 * 1024 * 1024) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder get a invalid response size: %d, "
          "Block: %s, "
          "from Datanode: %s",
          respSize, block.toString().c_str(), dnInfo.formatAddress().c_str());
  }

  respBuffer.resize(respSize);
  in->readFully(&respBuffer[0], respSize, readTimeout);
  BlockOpResponseProto resp;

  if (!resp.ParseFromArray(&respBuffer[0], respBuffer.size())) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder cannot parse BlockOpResponseProto "
          "from "
          "Datanode response, "
          "Block: %s, from Datanode: %s",
          block.toString().c_str(), dnInfo.formatAddress().c_str());
  }

  if (resp.status() != Status::DT_PROTO_SUCCESS) {
    std::string msg;

    if (resp.has_message()) {
      msg = resp.message();
    }

    if (resp.status() == Status::DT_PROTO_ERROR_ACCESS_TOKEN) {
      THROW(HdfsInvalidBlockToken,
            "ReadShortCircuitInfoBuilder: block's token is invalid. "
            "Datanode: %s, Block: %s",
            dnInfo.formatAddress().c_str(), block.toString().c_str());
    } else if (resp.status() == Status::DT_PROTO_ERROR_UNSUPPORTED) {
      THROW(HdfsIOException,
            "short-circuit read access is disabled for "
            "DataNode %s. reason: %s",
            dnInfo.formatAddress().c_str(),
            (msg.empty() ? "check Datanode's log for more information"
                         : msg.c_str()));
    } else {
      THROW(HdfsIOException,
            "ReadShortCircuitInfoBuilder: Datanode return an error when "
            "sending read request to Datanode: %s, Block: %s, %s.",
            dnInfo.formatAddress().c_str(), block.toString().c_str(),
            (msg.empty() ? "check Datanode's log for more information"
                         : msg.c_str()));
    }
  }

  DomainSocketImpl* domainSocket = dynamic_cast<DomainSocketImpl*>(&sock);

  if (NULL == domainSocket) {
    THROW(HdfsIOException, "Read short-circuit only works with Domain Socket");
  }

  shared_ptr<ReadShortCircuitFDHolder> fds(new ReadShortCircuitFDHolder);
  std::vector<int> tempFds(2, -1);
  // A one byte buffer is handed over together with the descriptor array.
  respBuffer.resize(1);
  domainSocket->receiveFileDescriptors(&tempFds[0], tempFds.size(),
                                       &respBuffer[0], respBuffer.size());

  // Transfer ownership to the holder before validating, so that its
  // destructor closes whichever descriptor did arrive if we throw below.
  fds->datafd = tempFds[0];
  fds->metafd = tempFds[1];

  // These used to be assert()s, which vanish under NDEBUG and would let a
  // missing (-1) descriptor reach the caller in release builds.
  if (-1 == fds->datafd) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder failed to receive data file "
          "descriptor, Block: %s, from Datanode: %s",
          block.toString().c_str(), dnInfo.formatAddress().c_str());
  }

  if (-1 == fds->metafd) {
    THROW(HdfsIOException,
          "ReadShortCircuitInfoBuilder failed to receive metadata file "
          "descriptor, Block: %s, from Datanode: %s",
          block.toString().c_str(), dnInfo.formatAddress().c_str());
  }

  return fds;
}
/**
 * Builds a non-legacy ReadShortCircuitInfo around an existing pair of file
 * descriptors.
 *
 * Mapped file wrappers are used when the configuration asks for them,
 * C file wrappers otherwise. Both wrappers are positioned at offset 0 and
 * the descriptor holder is stored in the resulting object.
 *
 * @param key the cache key stored in the resulting object.
 * @param fds the holder of the data and metadata descriptors.
 * @return the newly created object.
 */
shared_ptr<ReadShortCircuitInfo>
ReadShortCircuitInfoBuilder::createReadShortCircuitInfo(
    const ReadShortCircuitInfoKey& key,
    const shared_ptr<ReadShortCircuitFDHolder>& fds) {
  shared_ptr<FileWrapper> dataWrapper;
  shared_ptr<FileWrapper> metaWrapper;

  if (conf.doUseMappedFile()) {
    metaWrapper.reset(new MappedFileWrapper);
    dataWrapper.reset(new MappedFileWrapper);
  } else {
    metaWrapper.reset(new CFileWrapper);
    dataWrapper.reset(new CFileWrapper);
  }

  // NOTE(review): unlike the path-based overload, the results of these
  // open() calls are not inspected -- confirm that failures surface as
  // exceptions.
  metaWrapper->open(fds->metafd, false);
  dataWrapper->open(fds->datafd, false);
  dataWrapper->seek(0);
  metaWrapper->seek(0);

  // The second constructor argument marks the object as non-legacy.
  shared_ptr<ReadShortCircuitInfo> result(
      new ReadShortCircuitInfo(key, false));
  result->setFdHolder(fds);
  result->setDataFile(dataWrapper);
  result->setMetaFile(metaWrapper);
  return result;
}
}
}