blob: be8672432943f27553ff532559af520d15af1ac9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "orc/OrcFile.hh"
#include "Adaptor.hh"
#include "Utils.hh"
#include "orc/Exceptions.hh"
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#ifdef _MSC_VER
#include <io.h>
#define S_IRUSR _S_IREAD
#define S_IWUSR _S_IWRITE
#define stat _stat64
#define fstat _fstat64
#define fsync _commit
#else
#include <unistd.h>
#define O_BINARY 0
#endif
namespace orc {
DIAGNOSTIC_PUSH
#ifdef __clang__
DIAGNOSTIC_IGNORE("-Wunused-private-field")
#endif
class FileInputStream : public InputStream {
private:
std::string filename_;
int file_;
uint64_t totalLength_;
ReaderMetrics* metrics_;
public:
FileInputStream(std::string filename, ReaderMetrics* metrics)
: filename_(filename), metrics_(metrics) {
file_ = open(filename_.c_str(), O_BINARY | O_RDONLY);
if (file_ == -1) {
throw ParseError("Can't open " + filename_);
}
struct stat fileStat;
if (fstat(file_, &fileStat) == -1) {
throw ParseError("Can't stat " + filename_);
}
totalLength_ = static_cast<uint64_t>(fileStat.st_size);
}
~FileInputStream() override;
uint64_t getLength() const override {
return totalLength_;
}
uint64_t getNaturalReadSize() const override {
return 128 * 1024;
}
void read(void* buf, uint64_t length, uint64_t offset) override {
SCOPED_STOPWATCH(metrics_, IOBlockingLatencyUs, IOCount);
if (!buf) {
throw ParseError("Buffer is null");
}
ssize_t bytesRead = pread(file_, buf, length, static_cast<off_t>(offset));
if (bytesRead == -1) {
throw ParseError("Bad read of " + filename_);
}
if (static_cast<uint64_t>(bytesRead) != length) {
throw ParseError("Short read of " + filename_);
}
}
const std::string& getName() const override {
return filename_;
}
};
FileInputStream::~FileInputStream() {
close(file_);
}
std::unique_ptr<InputStream> readFile(const std::string& path, ReaderMetrics* metrics) {
#ifdef BUILD_LIBHDFSPP
if (strncmp(path.c_str(), "hdfs://", 7) == 0) {
return orc::readHdfsFile(std::string(path), metrics);
} else {
#endif
return orc::readLocalFile(std::string(path), metrics);
#ifdef BUILD_LIBHDFSPP
}
#endif
}
DIAGNOSTIC_POP
std::unique_ptr<InputStream> readLocalFile(const std::string& path, ReaderMetrics* metrics) {
return std::make_unique<FileInputStream>(path, metrics);
}
OutputStream::~OutputStream(){
// PASS
};
class FileOutputStream : public OutputStream {
private:
std::string filename_;
int file_;
uint64_t bytesWritten_;
bool closed_;
public:
FileOutputStream(std::string filename) {
bytesWritten_ = 0;
filename_ = filename;
closed_ = false;
file_ = open(filename_.c_str(), O_BINARY | O_CREAT | O_WRONLY | O_TRUNC, S_IRUSR | S_IWUSR);
if (file_ == -1) {
throw ParseError("Can't open " + filename_);
}
}
~FileOutputStream() override;
uint64_t getLength() const override {
return bytesWritten_;
}
uint64_t getNaturalWriteSize() const override {
return 128 * 1024;
}
void write(const void* buf, size_t length) override {
if (closed_) {
throw std::logic_error("Cannot write to closed stream.");
}
ssize_t bytesWrite = ::write(file_, buf, length);
if (bytesWrite == -1) {
throw ParseError("Bad write of " + filename_);
}
if (static_cast<uint64_t>(bytesWrite) != length) {
throw ParseError("Short write of " + filename_);
}
bytesWritten_ += static_cast<uint64_t>(bytesWrite);
}
const std::string& getName() const override {
return filename_;
}
void close() override {
if (!closed_) {
::close(file_);
closed_ = true;
}
}
void flush() override {
if (!closed_) {
::fsync(file_);
}
}
};
FileOutputStream::~FileOutputStream() {
if (!closed_) {
::close(file_);
closed_ = true;
}
}
std::unique_ptr<OutputStream> writeLocalFile(const std::string& path) {
return std::make_unique<FileOutputStream>(path);
}
} // namespace orc