| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| |
| #include "rocksdb/env.h" |
| #include "hdfs/env_hdfs.h" |
| |
| #ifdef USE_HDFS |
| #ifndef ROCKSDB_HDFS_FILE_C |
| #define ROCKSDB_HDFS_FILE_C |
| |
#include <stdio.h>
#include <sys/time.h>
#include <time.h>
#include <algorithm>
#include <cerrno>
#include <cstring>
#include <iostream>
#include <limits>
#include <sstream>
#include "rocksdb/status.h"
#include "util/string_util.h"
| |
| #define HDFS_EXISTS 0 |
| #define HDFS_DOESNT_EXIST -1 |
| #define HDFS_SUCCESS 0 |
| |
| // |
| // This file defines an HDFS environment for rocksdb. It uses the libhdfs |
| // api to access HDFS. All HDFS files created by one instance of rocksdb |
| // will reside on the same HDFS cluster. |
| // |
| |
| namespace rocksdb { |
| |
| namespace { |
| |
| // Log error message |
| static Status IOError(const std::string& context, int err_number) { |
| return (err_number == ENOSPC) ? |
| Status::NoSpace(context, strerror(err_number)) : |
| Status::IOError(context, strerror(err_number)); |
| } |
| |
| // assume that there is one global logger for now. It is not thread-safe, |
| // but need not be because the logger is initialized at db-open time. |
| static Logger* mylog = nullptr; |
| |
| // Used for reading a file from HDFS. It implements both sequential-read |
| // access methods as well as random read access methods. |
| class HdfsReadableFile : virtual public SequentialFile, |
| virtual public RandomAccessFile { |
| private: |
| hdfsFS fileSys_; |
| std::string filename_; |
| hdfsFile hfile_; |
| |
| public: |
| HdfsReadableFile(hdfsFS fileSys, const std::string& fname) |
| : fileSys_(fileSys), filename_(fname), hfile_(nullptr) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile opening file %s\n", |
| filename_.c_str()); |
| hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_RDONLY, 0, 0, 0); |
| ROCKS_LOG_DEBUG(mylog, |
| "[hdfs] HdfsReadableFile opened file %s hfile_=0x%p\n", |
| filename_.c_str(), hfile_); |
| } |
| |
| virtual ~HdfsReadableFile() { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closing file %s\n", |
| filename_.c_str()); |
| hdfsCloseFile(fileSys_, hfile_); |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile closed file %s\n", |
| filename_.c_str()); |
| hfile_ = nullptr; |
| } |
| |
| bool isValid() { |
| return hfile_ != nullptr; |
| } |
| |
| // sequential access, read data at current offset in file |
| virtual Status Read(size_t n, Slice* result, char* scratch) { |
| Status s; |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile reading %s %ld\n", |
| filename_.c_str(), n); |
| |
| char* buffer = scratch; |
| size_t total_bytes_read = 0; |
| tSize bytes_read = 0; |
| tSize remaining_bytes = (tSize)n; |
| |
| // Read a total of n bytes repeatedly until we hit error or eof |
| while (remaining_bytes > 0) { |
| bytes_read = hdfsRead(fileSys_, hfile_, buffer, remaining_bytes); |
| if (bytes_read <= 0) { |
| break; |
| } |
| assert(bytes_read <= remaining_bytes); |
| |
| total_bytes_read += bytes_read; |
| remaining_bytes -= bytes_read; |
| buffer += bytes_read; |
| } |
| assert(total_bytes_read <= n); |
| |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile read %s\n", |
| filename_.c_str()); |
| |
| if (bytes_read < 0) { |
| s = IOError(filename_, errno); |
| } else { |
| *result = Slice(scratch, total_bytes_read); |
| } |
| |
| return s; |
| } |
| |
| // random access, read data from specified offset in file |
| virtual Status Read(uint64_t offset, size_t n, Slice* result, |
| char* scratch) const { |
| Status s; |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile preading %s\n", |
| filename_.c_str()); |
| ssize_t bytes_read = hdfsPread(fileSys_, hfile_, offset, |
| (void*)scratch, (tSize)n); |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile pread %s\n", |
| filename_.c_str()); |
| *result = Slice(scratch, (bytes_read < 0) ? 0 : bytes_read); |
| if (bytes_read < 0) { |
| // An error: return a non-ok status |
| s = IOError(filename_, errno); |
| } |
| return s; |
| } |
| |
| virtual Status Skip(uint64_t n) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile skip %s\n", |
| filename_.c_str()); |
| // get current offset from file |
| tOffset current = hdfsTell(fileSys_, hfile_); |
| if (current < 0) { |
| return IOError(filename_, errno); |
| } |
| // seek to new offset in file |
| tOffset newoffset = current + n; |
| int val = hdfsSeek(fileSys_, hfile_, newoffset); |
| if (val < 0) { |
| return IOError(filename_, errno); |
| } |
| return Status::OK(); |
| } |
| |
| private: |
| |
| // returns true if we are at the end of file, false otherwise |
| bool feof() { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile feof %s\n", |
| filename_.c_str()); |
| if (hdfsTell(fileSys_, hfile_) == fileSize()) { |
| return true; |
| } |
| return false; |
| } |
| |
| // the current size of the file |
| tOffset fileSize() { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsReadableFile fileSize %s\n", |
| filename_.c_str()); |
| hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, filename_.c_str()); |
| tOffset size = 0L; |
| if (pFileInfo != nullptr) { |
| size = pFileInfo->mSize; |
| hdfsFreeFileInfo(pFileInfo, 1); |
| } else { |
| throw HdfsFatalException("fileSize on unknown file " + filename_); |
| } |
| return size; |
| } |
| }; |
| |
| // Appends to an existing file in HDFS. |
| class HdfsWritableFile: public WritableFile { |
| private: |
| hdfsFS fileSys_; |
| std::string filename_; |
| hdfsFile hfile_; |
| |
| public: |
| HdfsWritableFile(hdfsFS fileSys, const std::string& fname) |
| : fileSys_(fileSys), filename_(fname) , hfile_(nullptr) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opening %s\n", |
| filename_.c_str()); |
| hfile_ = hdfsOpenFile(fileSys_, filename_.c_str(), O_WRONLY, 0, 0, 0); |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile opened %s\n", |
| filename_.c_str()); |
| assert(hfile_ != nullptr); |
| } |
| virtual ~HdfsWritableFile() { |
| if (hfile_ != nullptr) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", |
| filename_.c_str()); |
| hdfsCloseFile(fileSys_, hfile_); |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", |
| filename_.c_str()); |
| hfile_ = nullptr; |
| } |
| } |
| |
| // If the file was successfully created, then this returns true. |
| // Otherwise returns false. |
| bool isValid() { |
| return hfile_ != nullptr; |
| } |
| |
| // The name of the file, mostly needed for debug logging. |
| const std::string& getName() { |
| return filename_; |
| } |
| |
| virtual Status Append(const Slice& data) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Append %s\n", |
| filename_.c_str()); |
| const char* src = data.data(); |
| size_t left = data.size(); |
| size_t ret = hdfsWrite(fileSys_, hfile_, src, left); |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Appended %s\n", |
| filename_.c_str()); |
| if (ret != left) { |
| return IOError(filename_, errno); |
| } |
| return Status::OK(); |
| } |
| |
| virtual Status Flush() { |
| return Status::OK(); |
| } |
| |
| virtual Status Sync() { |
| Status s; |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Sync %s\n", |
| filename_.c_str()); |
| if (hdfsFlush(fileSys_, hfile_) == -1) { |
| return IOError(filename_, errno); |
| } |
| if (hdfsHSync(fileSys_, hfile_) == -1) { |
| return IOError(filename_, errno); |
| } |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile Synced %s\n", |
| filename_.c_str()); |
| return Status::OK(); |
| } |
| |
| // This is used by HdfsLogger to write data to the debug log file |
| virtual Status Append(const char* src, size_t size) { |
| if (hdfsWrite(fileSys_, hfile_, src, size) != (tSize)size) { |
| return IOError(filename_, errno); |
| } |
| return Status::OK(); |
| } |
| |
| virtual Status Close() { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closing %s\n", |
| filename_.c_str()); |
| if (hdfsCloseFile(fileSys_, hfile_) != 0) { |
| return IOError(filename_, errno); |
| } |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsWritableFile closed %s\n", |
| filename_.c_str()); |
| hfile_ = nullptr; |
| return Status::OK(); |
| } |
| }; |
| |
| // The object that implements the debug logs to reside in HDFS. |
| class HdfsLogger : public Logger { |
| private: |
| HdfsWritableFile* file_; |
| uint64_t (*gettid_)(); // Return the thread id for the current thread |
| |
| public: |
| HdfsLogger(HdfsWritableFile* f, uint64_t (*gettid)()) |
| : file_(f), gettid_(gettid) { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger opened %s\n", |
| file_->getName().c_str()); |
| } |
| |
| virtual ~HdfsLogger() { |
| ROCKS_LOG_DEBUG(mylog, "[hdfs] HdfsLogger closed %s\n", |
| file_->getName().c_str()); |
| delete file_; |
| if (mylog != nullptr && mylog == this) { |
| mylog = nullptr; |
| } |
| } |
| |
| virtual void Logv(const char* format, va_list ap) { |
| const uint64_t thread_id = (*gettid_)(); |
| |
| // We try twice: the first time with a fixed-size stack allocated buffer, |
| // and the second time with a much larger dynamically allocated buffer. |
| char buffer[500]; |
| for (int iter = 0; iter < 2; iter++) { |
| char* base; |
| int bufsize; |
| if (iter == 0) { |
| bufsize = sizeof(buffer); |
| base = buffer; |
| } else { |
| bufsize = 30000; |
| base = new char[bufsize]; |
| } |
| char* p = base; |
| char* limit = base + bufsize; |
| |
| struct timeval now_tv; |
| gettimeofday(&now_tv, nullptr); |
| const time_t seconds = now_tv.tv_sec; |
| struct tm t; |
| localtime_r(&seconds, &t); |
| p += snprintf(p, limit - p, |
| "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ", |
| t.tm_year + 1900, |
| t.tm_mon + 1, |
| t.tm_mday, |
| t.tm_hour, |
| t.tm_min, |
| t.tm_sec, |
| static_cast<int>(now_tv.tv_usec), |
| static_cast<long long unsigned int>(thread_id)); |
| |
| // Print the message |
| if (p < limit) { |
| va_list backup_ap; |
| va_copy(backup_ap, ap); |
| p += vsnprintf(p, limit - p, format, backup_ap); |
| va_end(backup_ap); |
| } |
| |
| // Truncate to available space if necessary |
| if (p >= limit) { |
| if (iter == 0) { |
| continue; // Try again with larger buffer |
| } else { |
| p = limit - 1; |
| } |
| } |
| |
| // Add newline if necessary |
| if (p == base || p[-1] != '\n') { |
| *p++ = '\n'; |
| } |
| |
| assert(p <= limit); |
| file_->Append(base, p-base); |
| file_->Flush(); |
| if (base != buffer) { |
| delete[] base; |
| } |
| break; |
| } |
| } |
| }; |
| |
| } // namespace |
| |
| // Finally, the hdfs environment |
| |
| const std::string HdfsEnv::kProto = "hdfs://"; |
| const std::string HdfsEnv::pathsep = "/"; |
| |
| // open a file for sequential reading |
| Status HdfsEnv::NewSequentialFile(const std::string& fname, |
| unique_ptr<SequentialFile>* result, |
| const EnvOptions& options) { |
| result->reset(); |
| HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); |
| if (f == nullptr || !f->isValid()) { |
| delete f; |
| *result = nullptr; |
| return IOError(fname, errno); |
| } |
| result->reset(dynamic_cast<SequentialFile*>(f)); |
| return Status::OK(); |
| } |
| |
| // open a file for random reading |
| Status HdfsEnv::NewRandomAccessFile(const std::string& fname, |
| unique_ptr<RandomAccessFile>* result, |
| const EnvOptions& options) { |
| result->reset(); |
| HdfsReadableFile* f = new HdfsReadableFile(fileSys_, fname); |
| if (f == nullptr || !f->isValid()) { |
| delete f; |
| *result = nullptr; |
| return IOError(fname, errno); |
| } |
| result->reset(dynamic_cast<RandomAccessFile*>(f)); |
| return Status::OK(); |
| } |
| |
| // create a new file for writing |
| Status HdfsEnv::NewWritableFile(const std::string& fname, |
| unique_ptr<WritableFile>* result, |
| const EnvOptions& options) { |
| result->reset(); |
| Status s; |
| HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); |
| if (f == nullptr || !f->isValid()) { |
| delete f; |
| *result = nullptr; |
| return IOError(fname, errno); |
| } |
| result->reset(dynamic_cast<WritableFile*>(f)); |
| return Status::OK(); |
| } |
| |
| class HdfsDirectory : public Directory { |
| public: |
| explicit HdfsDirectory(int fd) : fd_(fd) {} |
| ~HdfsDirectory() {} |
| |
| virtual Status Fsync() { return Status::OK(); } |
| |
| private: |
| int fd_; |
| }; |
| |
| Status HdfsEnv::NewDirectory(const std::string& name, |
| unique_ptr<Directory>* result) { |
| int value = hdfsExists(fileSys_, name.c_str()); |
| switch (value) { |
| case HDFS_EXISTS: |
| result->reset(new HdfsDirectory(0)); |
| return Status::OK(); |
| default: // fail if the directory doesn't exist |
| ROCKS_LOG_FATAL(mylog, "NewDirectory hdfsExists call failed"); |
| throw HdfsFatalException("hdfsExists call failed with error " + |
| ToString(value) + " on path " + name + |
| ".\n"); |
| } |
| } |
| |
| Status HdfsEnv::FileExists(const std::string& fname) { |
| int value = hdfsExists(fileSys_, fname.c_str()); |
| switch (value) { |
| case HDFS_EXISTS: |
| return Status::OK(); |
| case HDFS_DOESNT_EXIST: |
| return Status::NotFound(); |
| default: // anything else should be an error |
| ROCKS_LOG_FATAL(mylog, "FileExists hdfsExists call failed"); |
| return Status::IOError("hdfsExists call failed with error " + |
| ToString(value) + " on path " + fname + ".\n"); |
| } |
| } |
| |
| Status HdfsEnv::GetChildren(const std::string& path, |
| std::vector<std::string>* result) { |
| int value = hdfsExists(fileSys_, path.c_str()); |
| switch (value) { |
| case HDFS_EXISTS: { // directory exists |
| int numEntries = 0; |
| hdfsFileInfo* pHdfsFileInfo = 0; |
| pHdfsFileInfo = hdfsListDirectory(fileSys_, path.c_str(), &numEntries); |
| if (numEntries >= 0) { |
| for(int i = 0; i < numEntries; i++) { |
| char* pathname = pHdfsFileInfo[i].mName; |
| char* filename = std::rindex(pathname, '/'); |
| if (filename != nullptr) { |
| result->push_back(filename+1); |
| } |
| } |
| if (pHdfsFileInfo != nullptr) { |
| hdfsFreeFileInfo(pHdfsFileInfo, numEntries); |
| } |
| } else { |
| // numEntries < 0 indicates error |
| ROCKS_LOG_FATAL(mylog, "hdfsListDirectory call failed with error "); |
| throw HdfsFatalException( |
| "hdfsListDirectory call failed negative error.\n"); |
| } |
| break; |
| } |
| case HDFS_DOESNT_EXIST: // directory does not exist, exit |
| return Status::NotFound(); |
| default: // anything else should be an error |
| ROCKS_LOG_FATAL(mylog, "GetChildren hdfsExists call failed"); |
| throw HdfsFatalException("hdfsExists call failed with error " + |
| ToString(value) + ".\n"); |
| } |
| return Status::OK(); |
| } |
| |
| Status HdfsEnv::DeleteFile(const std::string& fname) { |
| if (hdfsDelete(fileSys_, fname.c_str(), 1) == 0) { |
| return Status::OK(); |
| } |
| return IOError(fname, errno); |
| }; |
| |
| Status HdfsEnv::CreateDir(const std::string& name) { |
| if (hdfsCreateDirectory(fileSys_, name.c_str()) == 0) { |
| return Status::OK(); |
| } |
| return IOError(name, errno); |
| }; |
| |
| Status HdfsEnv::CreateDirIfMissing(const std::string& name) { |
| const int value = hdfsExists(fileSys_, name.c_str()); |
| // Not atomic. state might change b/w hdfsExists and CreateDir. |
| switch (value) { |
| case HDFS_EXISTS: |
| return Status::OK(); |
| case HDFS_DOESNT_EXIST: |
| return CreateDir(name); |
| default: // anything else should be an error |
| ROCKS_LOG_FATAL(mylog, "CreateDirIfMissing hdfsExists call failed"); |
| throw HdfsFatalException("hdfsExists call failed with error " + |
| ToString(value) + ".\n"); |
| } |
| }; |
| |
| Status HdfsEnv::DeleteDir(const std::string& name) { |
| return DeleteFile(name); |
| }; |
| |
| Status HdfsEnv::GetFileSize(const std::string& fname, uint64_t* size) { |
| *size = 0L; |
| hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); |
| if (pFileInfo != nullptr) { |
| *size = pFileInfo->mSize; |
| hdfsFreeFileInfo(pFileInfo, 1); |
| return Status::OK(); |
| } |
| return IOError(fname, errno); |
| } |
| |
| Status HdfsEnv::GetFileModificationTime(const std::string& fname, |
| uint64_t* time) { |
| hdfsFileInfo* pFileInfo = hdfsGetPathInfo(fileSys_, fname.c_str()); |
| if (pFileInfo != nullptr) { |
| *time = static_cast<uint64_t>(pFileInfo->mLastMod); |
| hdfsFreeFileInfo(pFileInfo, 1); |
| return Status::OK(); |
| } |
| return IOError(fname, errno); |
| |
| } |
| |
| // The rename is not atomic. HDFS does not allow a renaming if the |
| // target already exists. So, we delete the target before attempting the |
| // rename. |
| Status HdfsEnv::RenameFile(const std::string& src, const std::string& target) { |
| hdfsDelete(fileSys_, target.c_str(), 1); |
| if (hdfsRename(fileSys_, src.c_str(), target.c_str()) == 0) { |
| return Status::OK(); |
| } |
| return IOError(src, errno); |
| } |
| |
| Status HdfsEnv::LockFile(const std::string& fname, FileLock** lock) { |
| // there isn's a very good way to atomically check and create |
| // a file via libhdfs |
| *lock = nullptr; |
| return Status::OK(); |
| } |
| |
| Status HdfsEnv::UnlockFile(FileLock* lock) { |
| return Status::OK(); |
| } |
| |
| Status HdfsEnv::NewLogger(const std::string& fname, |
| shared_ptr<Logger>* result) { |
| HdfsWritableFile* f = new HdfsWritableFile(fileSys_, fname); |
| if (f == nullptr || !f->isValid()) { |
| delete f; |
| *result = nullptr; |
| return IOError(fname, errno); |
| } |
| HdfsLogger* h = new HdfsLogger(f, &HdfsEnv::gettid); |
| result->reset(h); |
| if (mylog == nullptr) { |
| // mylog = h; // uncomment this for detailed logging |
| } |
| return Status::OK(); |
| } |
| |
| // The factory method for creating an HDFS Env |
| Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { |
| *hdfs_env = new HdfsEnv(fsname); |
| return Status::OK(); |
| } |
| } // namespace rocksdb |
| |
| #endif // ROCKSDB_HDFS_FILE_C |
| |
| #else // USE_HDFS |
| |
| // dummy placeholders used when HDFS is not available |
| namespace rocksdb { |
| Status HdfsEnv::NewSequentialFile(const std::string& fname, |
| unique_ptr<SequentialFile>* result, |
| const EnvOptions& options) { |
| return Status::NotSupported("Not compiled with hdfs support"); |
| } |
| |
| Status NewHdfsEnv(Env** hdfs_env, const std::string& fsname) { |
| return Status::NotSupported("Not compiled with hdfs support"); |
| } |
| } |
| |
| #endif |