| /******************************************************************** |
| * 2014 - |
| * open source under Apache License Version 2.0 |
| ********************************************************************/ |
| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "Exception.h" |
| #include "ExceptionInternal.h" |
| #include "FileSystemInter.h" |
| #include "InputStreamImpl.h" |
| #include "InputStreamInter.h" |
| #include "LocalBlockReader.h" |
| #include "Logger.h" |
| #include "RemoteBlockReader.h" |
| #include "server/Datanode.h" |
| #include "Thread.h" |
| |
| #include <algorithm> |
| #include <ifaddrs.h> |
| #include <inttypes.h> |
| #include <iostream> |
| #include <sys/socket.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| |
| namespace Hdfs { |
| namespace Internal { |
| |
| unordered_set<std::string> BuildLocalAddrSet() { |
| unordered_set<std::string> set; |
| struct ifaddrs * ifAddr = NULL; |
| struct ifaddrs * pifAddr = NULL; |
| struct sockaddr * addr; |
| |
| if (getifaddrs(&ifAddr)) { |
| THROW(HdfsNetworkException, |
| "InputStreamImpl: cannot get local network interface: %s", |
| GetSystemErrorInfo(errno)); |
| } |
| |
| try { |
| std::vector<char> host; |
| const char * pHost; |
| host.resize(INET6_ADDRSTRLEN + 1); |
| |
| for (pifAddr = ifAddr; pifAddr != NULL; pifAddr = pifAddr->ifa_next) { |
| addr = pifAddr->ifa_addr; |
| |
| if (!addr) { |
| continue; |
| } |
| |
| memset(&host[0], 0, INET6_ADDRSTRLEN + 1); |
| |
| if (addr->sa_family == AF_INET) { |
| pHost = |
| inet_ntop(addr->sa_family, |
| &(reinterpret_cast<struct sockaddr_in *>(addr))->sin_addr, |
| &host[0], INET6_ADDRSTRLEN); |
| } else if (addr->sa_family == AF_INET6) { |
| pHost = |
| inet_ntop(addr->sa_family, |
| &(reinterpret_cast<struct sockaddr_in6 *>(addr))->sin6_addr, |
| &host[0], INET6_ADDRSTRLEN); |
| } else { |
| continue; |
| } |
| |
| if (NULL == pHost) { |
| THROW(HdfsNetworkException, |
| "InputStreamImpl: cannot get convert network address to textual form: %s", |
| GetSystemErrorInfo(errno)); |
| } |
| |
| set.insert(pHost); |
| } |
| |
| /* |
| * add hostname. |
| */ |
| long hostlen = sysconf(_SC_HOST_NAME_MAX); |
| host.resize(hostlen + 1); |
| |
| if (gethostname(&host[0], host.size())) { |
| THROW(HdfsNetworkException, |
| "InputStreamImpl: cannot get hostname: %s", |
| GetSystemErrorInfo(errno)); |
| } |
| |
| set.insert(&host[0]); |
| } catch (...) { |
| if (ifAddr != NULL) { |
| freeifaddrs(ifAddr); |
| } |
| |
| throw; |
| } |
| |
| if (ifAddr != NULL) { |
| freeifaddrs(ifAddr); |
| } |
| |
| return set; |
| } |
| |
| InputStreamImpl::InputStreamImpl() : |
| closed(true), localRead(true), readFromUnderConstructedBlock(false), verify( |
| true), maxGetBlockInfoRetry(3), cursor(0), endOfCurBlock(0), lastBlockBeingWrittenLength( |
| 0), prefetchSize(0), peerCache(NULL) { |
| #ifdef MOCK |
| stub = NULL; |
| #endif |
| } |
| |
| InputStreamImpl::~InputStreamImpl() { |
| } |
| |
| void InputStreamImpl::checkStatus() { |
| if (closed) { |
| THROW(HdfsIOException, "InputStreamImpl: stream is not opened."); |
| } |
| |
| if (lastError != exception_ptr()) { |
| rethrow_exception(lastError); |
| } |
| } |
| |
| |
| int64_t InputStreamImpl::readBlockLength(const LocatedBlock & b) { |
| const std::vector<DatanodeInfo> & nodes = b.getLocations(); |
| int replicaNotFoundCount = nodes.size(); |
| |
| for (size_t i = 0; i < nodes.size(); ++i) { |
| try { |
| int64_t n = 0; |
| shared_ptr<Datanode> dn; |
| RpcAuth a = auth; |
| a.getUser().addToken(b.getToken()); |
| #ifdef MOCK |
| |
| if (stub) { |
| dn = stub->getDatanode(); |
| } else { |
| dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(), |
| nodes[i].getIpcPort(), *conf, a)); |
| } |
| |
| #else |
| dn = shared_ptr < Datanode > (new DatanodeImpl(nodes[i].getIpAddr().c_str(), |
| nodes[i].getIpcPort(), *conf, a)); |
| #endif |
| n = dn->getReplicaVisibleLength(b); |
| |
| if (n >= 0) { |
| return n; |
| } |
| } catch (const ReplicaNotFoundException & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s", |
| b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer)); |
| LOG(INFO, |
| "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode", |
| b.toString().c_str(), path.c_str()); |
| --replicaNotFoundCount; |
| } catch (const HdfsIOException & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to get block visible length for Block: %s file %s from Datanode: %s\n%s", |
| b.toString().c_str(), path.c_str(), nodes[i].formatAddress().c_str(), GetExceptionDetail(e, buffer)); |
| LOG(INFO, |
| "InputStreamImpl: retry get block visible length for Block: %s file %s from other datanode", |
| b.toString().c_str(), path.c_str()); |
| } |
| } |
| |
| // Namenode told us about these locations, but none know about the replica |
| // means that we hit the race between pipeline creation start and end. |
| // we require all 3 because some other exception could have happened |
| // on a DN that has it. we want to report that error |
| if (replicaNotFoundCount == 0) { |
| return 0; |
| } |
| |
| return -1; |
| } |
| |
| /** |
| * Getting blocks locations'information from namenode |
| */ |
| void InputStreamImpl::updateBlockInfos() { |
| int retry = maxGetBlockInfoRetry; |
| |
| for (int i = 0; i < retry; ++i) { |
| try { |
| if (!lbs) { |
| lbs = shared_ptr < LocatedBlocksImpl > (new LocatedBlocksImpl); |
| } |
| |
| filesystem->getBlockLocations(path, cursor, prefetchSize, *lbs); |
| |
| if (lbs->isLastBlockComplete()) { |
| lastBlockBeingWrittenLength = 0; |
| } else { |
| shared_ptr<LocatedBlock> last = lbs->getLastBlock(); |
| |
| if (!last) { |
| lastBlockBeingWrittenLength = 0; |
| } else { |
| lastBlockBeingWrittenLength = readBlockLength(*last); |
| |
| if (lastBlockBeingWrittenLength == -1) { |
| if (i + 1 >= retry) { |
| THROW(HdfsIOException, |
| "InputStreamImpl: failed to get block visible length for Block: %s from all Datanode.", |
| last->toString().c_str()); |
| } else { |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to get block visible length for Block: %s file %s from all Datanode.", |
| last->toString().c_str(), path.c_str()); |
| |
| try { |
| sleep_for(milliseconds(4000)); |
| } catch (...) { |
| } |
| |
| continue; |
| } |
| } |
| |
| last->setNumBytes(lastBlockBeingWrittenLength); |
| } |
| } |
| |
| return; |
| } catch (const HdfsRpcException & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to get block information for file %s, %s", |
| path.c_str(), GetExceptionDetail(e, buffer)); |
| |
| if (i + 1 >= retry) { |
| throw; |
| } |
| } |
| |
| LOG(INFO, |
| "InputStreamImpl: retry to get block information for file: %s, already tried %d time(s).", |
| path.c_str(), i + 1); |
| } |
| } |
| |
| int64_t InputStreamImpl::getFileLength() { |
| int64_t length = lbs->getFileLength(); |
| |
| if (!lbs->isLastBlockComplete()) { |
| length += lastBlockBeingWrittenLength; |
| } |
| |
| return length; |
| } |
| |
| void InputStreamImpl::seekToBlock(const LocatedBlock & lb) { |
| if (cursor >= lbs->getFileLength()) { |
| assert(!lbs->isLastBlockComplete()); |
| readFromUnderConstructedBlock = true; |
| } else { |
| readFromUnderConstructedBlock = false; |
| } |
| |
| assert(cursor >= lb.getOffset() |
| && cursor < lb.getOffset() + lb.getNumBytes()); |
| curBlock = shared_ptr < LocatedBlock > (new LocatedBlock(lb)); |
| int64_t blockSize = curBlock->getNumBytes(); |
| assert(blockSize > 0); |
| endOfCurBlock = blockSize + curBlock->getOffset(); |
| failedNodes.clear(); |
| blockReader.reset(); |
| } |
| |
| bool InputStreamImpl::choseBestNode() { |
| const std::vector<DatanodeInfo> & nodes = curBlock->getLocations(); |
| |
| for (size_t i = 0; i < nodes.size(); ++i) { |
| if (std::binary_search(failedNodes.begin(), failedNodes.end(), |
| nodes[i])) { |
| continue; |
| } |
| |
| curNode = nodes[i]; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| bool InputStreamImpl::isLocalNode() { |
| static const unordered_set<std::string> LocalAddrSet = BuildLocalAddrSet(); |
| bool retval = LocalAddrSet.find(curNode.getIpAddr()) != LocalAddrSet.end(); |
| return retval; |
| } |
| |
/**
 * Create blockReader for curBlock, positioned at the current cursor.
 *
 * Candidate datanodes come from choseBestNode(), which skips failedNodes.
 * For a node that isLocalNode() reports as local, a short-circuit
 * LocalBlockReader is tried first (unless disabled or the block is under
 * construction). If no short-circuit info is available, or the local setup
 * fails with HdfsIOException, the same node is retried with a
 * RemoteBlockReader. A failed remote setup adds the node to failedNodes and
 * the next candidate is tried.
 *
 * @param temporaryDisableLocalRead skip the local reader for this call
 *        (readOneBlock() sets it after a local reader failed to read).
 * @throw HdfsIOException when every candidate node has been tried; if an
 *        earlier setup failed, it is raised via NESTED_THROW (presumably
 *        attached as the nested cause - confirm against the macro).
 */
void InputStreamImpl::setupBlockReader(bool temporaryDisableLocalRead) {
    // True while the current node's latest attempt was a local (short-circuit) one.
    bool lastReadFromLocal = false;
    // Most recent setup failure, reported when no node is left.
    exception_ptr lastException;

    while (true) {
        if (!choseBestNode()) {
            // Rethrow inside a try so the catch below runs with the last failure
            // as the currently handled exception.
            try {
                if (lastException) {
                    rethrow_exception(lastException);
                }
            } catch (...) {
                NESTED_THROW(HdfsIOException,
                             "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                             curBlock->toString().c_str());
            }

            // No earlier failure recorded (e.g. the block has no untried locations).
            THROW(HdfsIOException,
                  "InputStreamImpl: all nodes have been tried and no valid replica can be read for Block: %s.",
                  curBlock->toString().c_str());
        }

        try {
            // offset: position of cursor inside the block; len: bytes left in the block from there.
            int64_t offset, len;
            offset = cursor - curBlock->getOffset();
            assert(offset >= 0);
            len = curBlock->getNumBytes() - offset;
            assert(len > 0);

            if (!temporaryDisableLocalRead && !lastReadFromLocal &&
                    !readFromUnderConstructedBlock && localRead && isLocalNode()) {
                // Mark before trying so a failure retries this node remotely.
                lastReadFromLocal = true;

                shared_ptr<ReadShortCircuitInfo> info;
                ReadShortCircuitInfoBuilder builder(curNode, auth, *conf);

                try {
                    info = builder.fetchOrCreate(*curBlock, curBlock->getToken());

                    if (!info) {
                        // No short-circuit info: loop again; lastReadFromLocal is
                        // set, so the same node is now tried with a remote reader.
                        continue;
                    }

                    assert(info->isValid());
                    blockReader = shared_ptr<BlockReader>(
                        new LocalBlockReader(info, *curBlock, offset, verify,
                                             *conf, localReaderBuffer));
                } catch (...) {
                    // Mark the fetched info invalid so it is not reused (presumably
                    // the builder caches it - confirm in ReadShortCircuitInfoBuilder).
                    if (info) {
                        info->setValid(false);
                    }

                    throw;
                }
            } else {
                const char * clientName = filesystem->getClientName();
                lastReadFromLocal = false;
                blockReader = shared_ptr<BlockReader>(new RemoteBlockReader(
                    *curBlock, curNode, *peerCache, offset, len,
                    curBlock->getToken(), clientName, verify, *conf));
            }

            break;
        } catch (const HdfsIOException & e) {
            lastException = current_exception();
            std::string buffer;

            if (lastReadFromLocal) {
                LOG(LOG_ERROR,
                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\n"
                    "retry the same node but disable read shortcircuit feature",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                /*
                 * do not add node into failedNodes since we will retry the same node but
                 * disable local block reading
                 */
            } else {
                LOG(LOG_ERROR,
                    "cannot setup block reader for Block: %s file %s on Datanode: %s.\n%s\nretry another node",
                    curBlock->toString().c_str(), path.c_str(),
                    curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer));
                failedNodes.push_back(curNode);
                // Keep failedNodes sorted for the binary search in choseBestNode().
                std::sort(failedNodes.begin(), failedNodes.end());
            }
        }
    }
}
| |
| void InputStreamImpl::open(shared_ptr<FileSystemInter> fs, const char * path, |
| bool verifyChecksum) { |
| if (NULL == path || 0 == strlen(path)) { |
| THROW(InvalidParameter, "path is invalid."); |
| } |
| |
| try { |
| openInternal(fs, path, verifyChecksum); |
| } catch (...) { |
| close(); |
| throw; |
| } |
| } |
| |
| void InputStreamImpl::openInternal(shared_ptr<FileSystemInter> fs, const char * path, |
| bool verifyChecksum) { |
| try { |
| filesystem = fs; |
| verify = verifyChecksum; |
| this->path = fs->getStandardPath(path); |
| LOG(DEBUG2, "%p, open file %s for read, verfyChecksum is %s", this, this->path.c_str(), (verifyChecksum ? "true" : "false")); |
| conf = shared_ptr < SessionConfig > (new SessionConfig(fs->getConf())); |
| this->auth = RpcAuth(fs->getUserInfo(), RpcAuth::ParseMethod(conf->getRpcAuthMethod())); |
| prefetchSize = conf->getDefaultBlockSize() * conf->getPrefetchSize(); |
| localRead = conf->isReadFromLocal(); |
| maxGetBlockInfoRetry = conf->getMaxGetBlockInfoRetry(); |
| peerCache = &fs->getPeerCache(); |
| updateBlockInfos(); |
| closed = false; |
| } catch (const HdfsCanceled & e) { |
| throw; |
| } catch (const FileNotFoundException & e) { |
| throw; |
| } catch (const HdfsException & e) { |
| NESTED_THROW(HdfsIOException, "InputStreamImpl: cannot open file: %s.", |
| this->path.c_str()); |
| } |
| } |
| |
| int32_t InputStreamImpl::read(char * buf, int32_t size) { |
| checkStatus(); |
| |
| try { |
| int64_t prvious = cursor; |
| int32_t done = readInternal(buf, size); |
| LOG(DEBUG3, "%p read file %s size is %d, offset %" PRId64 " done %d, next pos %" PRId64, this, path.c_str(), size, |
| prvious, done, cursor); |
| return done; |
| } catch (const HdfsEndOfStream & e) { |
| throw; |
| } catch (...) { |
| lastError = current_exception(); |
| throw; |
| } |
| } |
| |
| int32_t InputStreamImpl::readOneBlock(char * buf, int32_t size, bool shouldUpdateMetadataOnFailure) { |
| bool temporaryDisableLocalRead = false; |
| std::string buffer; |
| |
| while (true) { |
| try { |
| /* |
| * Setup block reader here and handle failure. |
| */ |
| if (!blockReader) { |
| setupBlockReader(temporaryDisableLocalRead); |
| temporaryDisableLocalRead = false; |
| } |
| } catch (const HdfsInvalidBlockToken & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.", |
| curBlock->toString().c_str(), path.c_str(), GetExceptionDetail(e, buffer)); |
| return -1; |
| } catch (const HdfsIOException & e) { |
| /* |
| * In setupBlockReader, we have tried all the replicas. |
| * We now update block informations once, and try again. |
| */ |
| if (shouldUpdateMetadataOnFailure) { |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to read Block: %s file %s, \n%s, retry after updating block informations.", |
| curBlock->toString().c_str(), path.c_str(), |
| GetExceptionDetail(e, buffer)); |
| return -1; |
| } else { |
| /* |
| * We have updated block informations and failed again. |
| */ |
| throw; |
| } |
| } |
| |
| /* |
| * Block reader has been setup, read from block reader. |
| */ |
| try { |
| int32_t todo = size; |
| todo = todo < endOfCurBlock - cursor ? |
| todo : static_cast<int32_t>(endOfCurBlock - cursor); |
| assert(blockReader); |
| todo = blockReader->read(buf, todo); |
| cursor += todo; |
| /* |
| * Exit the loop and function from here if success. |
| */ |
| return todo; |
| } catch (const HdfsIOException & e) { |
| /* |
| * Failed to read from current block reader, |
| * add the current datanode to invalid node list and try again. |
| */ |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, " |
| "retry read again from another Datanode.", |
| curBlock->toString().c_str(), path.c_str(), |
| curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); |
| |
| if (conf->doesNotRetryAnotherNode()) { |
| throw; |
| } |
| } catch (const ChecksumException & e) { |
| LOG(LOG_ERROR, |
| "InputStreamImpl: failed to read Block: %s file %s from Datanode: %s, \n%s, " |
| "retry read again from another Datanode.", |
| curBlock->toString().c_str(), path.c_str(), |
| curNode.formatAddress().c_str(), GetExceptionDetail(e, buffer)); |
| } |
| |
| /* |
| * Successfully create the block reader but failed to read. |
| * Disable the local block reader and try the same node again. |
| */ |
| if (!blockReader || dynamic_cast<LocalBlockReader *>(blockReader.get())) { |
| temporaryDisableLocalRead = true; |
| } else { |
| /* |
| * Remote block reader failed to read, try another node. |
| */ |
| LOG(INFO, "IntputStreamImpl: Add invalid datanode %s to failed datanodes and try another datanode again for file %s.", |
| curNode.formatAddress().c_str(), path.c_str()); |
| failedNodes.push_back(curNode); |
| std::sort(failedNodes.begin(), failedNodes.end()); |
| } |
| |
| blockReader.reset(); |
| } |
| } |
| |
/**
 * To read data from hdfs.
 *
 * Refreshes block information from the namenode when needed and seeks to the
 * block containing cursor. It then reads via readOneBlock(). When readOneBlock()
 * returns -1 (no replica could be set up), the cached block information is
 * dropped and one of the getMaxReadBlockRetry() refreshes is used up. The read
 * is then retried after a one-second pause.
 *
 * @param buf the buffer to fill.
 * @param size buffer size.
 * @return the number of bytes filled in the buffer; it may be less than size.
 * @throw HdfsEndOfStream if cursor is at or past the end of the file.
 * @throw HdfsCanceled propagated unchanged.
 * @throw HdfsIOException wrapping any other HdfsException.
 */
int32_t InputStreamImpl::readInternal(char * buf, int32_t size) {
    // Remaining block-information refreshes allowed after all replicas failed.
    int updateMetadataOnFailure = conf->getMaxReadBlockRetry();

    try {
        do {
            const LocatedBlock * lb = NULL;

            /*
             * Check if we have got the block information we need: refresh when
             * nothing is cached, the cursor is past the known length, or the
             * cursor left the current block and its block is not cached.
             */
            if (!lbs || cursor >= getFileLength()
                    || (cursor >= endOfCurBlock && !(lb = lbs->findBlock(cursor)))) {
                /*
                 * Get block information from namenode.
                 * Do RPC failover work in updateBlockInfos.
                 */
                updateBlockInfos();

                /*
                 * We already have the up-to-date block information,
                 * Check if we reach the end of file.
                 */
                if (cursor >= getFileLength()) {
                    THROW(HdfsEndOfStream,
                          "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %d, from file: %s",
                          cursor, size, path.c_str());
                }
            }

            /*
             * If we reach the end of block or the block information has just updated,
             * seek to the right block to read.
             */
            if (cursor >= endOfCurBlock) {
                lb = lbs->findBlock(cursor);

                if (!lb) {
                    THROW(HdfsIOException,
                          "InputStreamImpl: cannot find block information at position: %" PRId64 " for file: %s",
                          cursor, path.c_str());
                }

                /*
                 * Seek to the right block, setup all needed variable,
                 * but do not setup block reader, setup it latter.
                 */
                seekToBlock(*lb);
            }

            int32_t retval = readOneBlock(buf, size, updateMetadataOnFailure > 0);

            /*
             * Now we have tried all replicas and failed.
             * We will update metadata once and try again.
             * Resetting lbs and endOfCurBlock forces a refresh and re-seek on
             * the next iteration.
             */
            if (retval < 0) {
                lbs.reset();
                endOfCurBlock = 0;
                --updateMetadataOnFailure;

                try {
                    sleep_for(seconds(1));
                } catch (...) {
                }

                continue;
            }

            return retval;
        } while (true);
    } catch (const HdfsCanceled & e) {
        throw;
    } catch (const HdfsEndOfStream & e) {
        throw;
    } catch (const HdfsException & e) {
        /*
         * wrap the underlying error and rethrow.
         */
        NESTED_THROW(HdfsIOException,
                     "InputStreamImpl: cannot read file: %s, from position %" PRId64 ", size: %d.",
                     path.c_str(), cursor, size);
    }
}
| |
| /** |
| * To read data from hdfs, block until get the given size of bytes. |
| * @param buf the buffer used to filled. |
| * @param size the number of bytes to be read. |
| */ |
| void InputStreamImpl::readFully(char * buf, int64_t size) { |
| LOG(DEBUG3, "readFully file %s size is %" PRId64 ", offset %" PRId64, path.c_str(), size, cursor); |
| checkStatus(); |
| |
| try { |
| return readFullyInternal(buf, size); |
| } catch (const HdfsEndOfStream & e) { |
| throw; |
| } catch (...) { |
| lastError = current_exception(); |
| throw; |
| } |
| } |
| |
| void InputStreamImpl::readFullyInternal(char * buf, int64_t size) { |
| int32_t done; |
| int64_t pos = cursor, todo = size; |
| |
| try { |
| while (todo > 0) { |
| done = todo < std::numeric_limits<int32_t>::max() ? |
| static_cast<int32_t>(todo) : |
| std::numeric_limits<int32_t>::max(); |
| done = readInternal(buf + (size - todo), done); |
| todo -= done; |
| } |
| } catch (const HdfsCanceled & e) { |
| throw; |
| } catch (const HdfsEndOfStream & e) { |
| THROW(HdfsEndOfStream, |
| "InputStreamImpl: read over EOF, current position: %" PRId64 ", read size: %" PRId64 ", from file: %s", |
| pos, size, path.c_str()); |
| } catch (const HdfsException & e) { |
| NESTED_THROW(HdfsIOException, |
| "InputStreamImpl: cannot read fully from file: %s, from position %" PRId64 ", size: %" PRId64 ".", |
| path.c_str(), pos, size); |
| } |
| } |
| |
| int64_t InputStreamImpl::available() { |
| checkStatus(); |
| |
| try { |
| if (blockReader) { |
| return blockReader->available(); |
| } |
| } catch (...) { |
| lastError = current_exception(); |
| throw; |
| } |
| |
| return 0; |
| } |
| |
| /** |
| * To move the file point to the given position. |
| * @param size the given position. |
| */ |
| void InputStreamImpl::seek(int64_t pos) { |
| LOG(DEBUG2, "%p seek file %s to %" PRId64 ", offset %" PRId64, this, path.c_str(), pos, cursor); |
| checkStatus(); |
| |
| try { |
| seekInternal(pos); |
| } catch (...) { |
| lastError = current_exception(); |
| throw; |
| } |
| } |
| |
| void InputStreamImpl::seekInternal(int64_t pos) { |
| if (cursor == pos) { |
| return; |
| } |
| |
| if (!lbs || pos > getFileLength()) { |
| updateBlockInfos(); |
| |
| if (pos > getFileLength()) { |
| THROW(HdfsEndOfStream, |
| "InputStreamImpl: seek over EOF, current position: %" PRId64 ", seek target: %" PRId64 ", in file: %s", |
| cursor, pos, path.c_str()); |
| } |
| } |
| |
| try { |
| if (blockReader && pos > cursor && pos < endOfCurBlock) { |
| blockReader->skip(pos - cursor); |
| cursor = pos; |
| return; |
| } |
| } catch (const HdfsIOException & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s", |
| pos - cursor, path.c_str(), GetExceptionDetail(e, buffer)); |
| LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str()); |
| } catch (const ChecksumException & e) { |
| std::string buffer; |
| LOG(LOG_ERROR, "InputStreamImpl: failed to skip %" PRId64 " bytes in current block reader for file %s\n%s", |
| pos - cursor, path.c_str(), GetExceptionDetail(e, buffer)); |
| LOG(INFO, "InputStreamImpl: retry to seek to position %" PRId64 " for file %s", pos, path.c_str()); |
| } |
| |
| /** |
| * the seek target exceed the current block or skip failed in current block reader. |
| * reset current block reader and set the cursor to the target position to seek. |
| */ |
| endOfCurBlock = 0; |
| blockReader.reset(); |
| cursor = pos; |
| } |
| |
| /** |
| * To get the current file point position. |
| * @return the position of current file point. |
| */ |
| int64_t InputStreamImpl::tell() { |
| checkStatus(); |
| LOG(DEBUG2, "tell file %s at %" PRId64, path.c_str(), cursor); |
| return cursor; |
| } |
| |
| /** |
| * Close the stream. |
| */ |
| void InputStreamImpl::close() { |
| LOG(DEBUG2, "%p close file %s for read", this, path.c_str()); |
| closed = true; |
| localRead = true; |
| readFromUnderConstructedBlock = false; |
| verify = true; |
| filesystem.reset(); |
| cursor = 0; |
| endOfCurBlock = 0; |
| lastBlockBeingWrittenLength = 0; |
| prefetchSize = 0; |
| blockReader.reset(); |
| curBlock.reset(); |
| lbs.reset(); |
| conf.reset(); |
| failedNodes.clear(); |
| path.clear(); |
| localReaderBuffer.resize(0); |
| lastError = exception_ptr(); |
| } |
| |
| std::string InputStreamImpl::toString() { |
| if (path.empty()) { |
| return std::string("InputStream for path ") + path; |
| } else { |
| return std::string("InputStream (not opened)"); |
| } |
| } |
| |
| } |
| } |