// blob: 8ecb2a0518dbba2695f9bca97d6624d8a4c5bc65 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <fstream>
#include "Stream.hh"
#ifndef _WIN32
#include "unistd.h"
#include "fcntl.h"
#include "errno.h"
#ifndef O_BINARY
#define O_BINARY 0
#endif
#else
#include "Windows.h"
#ifdef min
#undef min
#endif
#endif
using std::unique_ptr;
using std::istream;
using std::ostream;
namespace avro {
namespace {
/**
 * Abstract byte source consumed by the buffered input stream in this file.
 * Concrete sources in this file wrap a file or a std::istream.
 */
struct BufferCopyIn {
    virtual ~BufferCopyIn() = default;

    /**
     * Advances the source by len bytes relative to its current position.
     */
    virtual void seek(size_t len) = 0;

    /**
     * Reads up to toRead bytes into b.
     * On a true return, actual holds the number of bytes stored in b.
     * A false return tells the caller that no data was delivered.
     */
    virtual bool read(uint8_t* b, size_t toRead, size_t& actual) = 0;
};
/**
 * BufferCopyIn that reads from a file through the native OS API:
 * a Win32 HANDLE on Windows, a POSIX file descriptor elsewhere.
 * The file is opened by the constructor and closed by the destructor.
 */
struct FileBufferCopyIn : public BufferCopyIn {
#ifdef _WIN32
    HANDLE h_;

    /**
     * Opens filename for reading; throws Exception when the handle is invalid.
     */
    // NOTE(review): OPEN_ALWAYS creates the file when it does not exist,
    // whereas the POSIX branch below fails in that case — confirm whether
    // OPEN_EXISTING was intended.
    FileBufferCopyIn(const char* filename) :
        h_(::CreateFileA(filename, GENERIC_READ, 0, NULL, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
        if (h_ == INVALID_HANDLE_VALUE) {
            throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
        }
    }

    ~FileBufferCopyIn() {
        ::CloseHandle(h_);
    }

    /**
     * Moves the file pointer by len bytes relative to its current position.
     * Throws Exception on failure.
     */
    void seek(size_t len) {
        if (::SetFilePointer(h_, len, NULL, FILE_CURRENT) == INVALID_SET_FILE_POINTER && ::GetLastError() != NO_ERROR) {
            throw Exception(boost::format("Cannot skip file: %1%") % ::GetLastError());
        }
    }

    /**
     * Reads up to toRead bytes into b and stores the count in actual.
     * Returns false when zero bytes were read; throws Exception on error.
     */
    bool read(uint8_t* b, size_t toRead, size_t& actual) {
        DWORD dw = 0;
        if (! ::ReadFile(h_, b, toRead, &dw, NULL)) {
            throw Exception(boost::format("Cannot read file: %1%") % ::GetLastError());
        }
        actual = static_cast<size_t>(dw);
        return actual != 0;
    }
#else
    const int fd_;

    /**
     * Opens filename read-only; throws Exception carrying strerror(errno)
     * when open() fails.
     */
    FileBufferCopyIn(const char* filename) :
        fd_(open(filename, O_RDONLY | O_BINARY)) {
        if (fd_ < 0) {
            throw Exception(boost::format("Cannot open file: %1%") %
                ::strerror(errno));
        }
    }

    ~FileBufferCopyIn() {
        ::close(fd_);
    }

    /**
     * Moves the file offset by len bytes relative to its current position.
     * Throws Exception when lseek() fails.
     */
    void seek(size_t len) {
        off_t r = ::lseek(fd_, len, SEEK_CUR);
        if (r == static_cast<off_t>(-1)) {
            throw Exception(boost::format("Cannot skip file: %1%") %
                strerror(errno));
        }
    }

    /**
     * Reads up to toRead bytes into b and stores the count in actual.
     * Returns false at end of file (zero bytes read).
     * Throws Exception on a read error, matching the Windows branch, so that
     * an I/O failure is not mistaken for end of file.
     */
    bool read(uint8_t* b, size_t toRead, size_t& actual) {
        // ssize_t is the actual return type of ::read; int could truncate it.
        ssize_t n;
        do {
            n = ::read(fd_, b, toRead);
        } while (n < 0 && errno == EINTR);  // retry calls interrupted by a signal
        if (n < 0) {
            throw Exception(boost::format("Cannot read file: %1%") %
                ::strerror(errno));
        }
        actual = static_cast<size_t>(n);
        return n > 0;
    }
#endif
};
/**
 * BufferCopyIn backed by a std::istream that supports seekg().
 * The stream is held by reference, so it has to outlive this object.
 */
struct IStreamBufferCopyIn : public BufferCopyIn {
    istream& is_;

    IStreamBufferCopyIn(istream& is) : is_(is) { }

    /**
     * Seeks len bytes forward from the current get position.
     * Throws Exception when the stream reports failure afterwards.
     */
    void seek(size_t len) {
        is_.seekg(len, std::ios_base::cur);
        if (is_.fail()) {
            throw Exception("Cannot skip stream");
        }
    }

    /**
     * Reads up to toRead bytes into b and stores the count in actual.
     * Returns false when the stream is bad, or when it hit end-of-file
     * without delivering any bytes.
     */
    bool read(uint8_t* b, size_t toRead, size_t& actual) {
        char* const dest = reinterpret_cast<char*>(b);
        is_.read(dest, toRead);
        if (is_.bad()) {
            return false;
        }
        actual = static_cast<size_t>(is_.gcount());
        const bool exhausted = is_.eof() && actual == 0;
        return ! exhausted;
    }
};
/**
 * Variant of IStreamBufferCopyIn for streams that cannot seekg():
 * skipping is done by reading the bytes and throwing them away.
 */
struct NonSeekableIStreamBufferCopyIn : public IStreamBufferCopyIn {
    NonSeekableIStreamBufferCopyIn(istream& is) : IStreamBufferCopyIn(is) { }

    /**
     * Discards exactly len bytes from the stream.
     * Throws Exception when the stream goes bad or ends before len bytes
     * have been consumed.
     */
    void seek(size_t len) {
        const size_t bufSize = 4096;
        uint8_t buf[bufSize];
        while (len > 0) {
            const size_t n = std::min(len, bufSize);
            is_.read(reinterpret_cast<char*>(buf), n);
            if (is_.bad()) {
                throw Exception("Cannot skip stream");
            }
            const size_t actual = static_cast<size_t>(is_.gcount());
            // No progress means the stream is exhausted or failed; stop
            // instead of looping or pretending the skip succeeded.
            if (actual == 0) {
                throw Exception("Cannot skip stream");
            }
            // Count only the bytes really consumed, so a short read at
            // end-of-stream is not reported as a complete skip.
            len -= actual;
        }
    }
};
}
class BufferCopyInInputStream : public SeekableInputStream {
const size_t bufferSize_;
uint8_t* const buffer_;
unique_ptr<BufferCopyIn> in_;
size_t byteCount_;
uint8_t* next_;
size_t available_;
bool next(const uint8_t** data, size_t *size) {
if (available_ == 0 && ! fill()) {
return false;
}
*data = next_;
*size = available_;
next_ += available_;
byteCount_ += available_;
available_ = 0;
return true;
}
void backup(size_t len) {
next_ -= len;
available_ += len;
byteCount_ -= len;
}
void skip(size_t len) {
while (len > 0) {
if (available_ == 0) {
in_->seek(len);
byteCount_ += len;
return;
}
size_t n = std::min(available_, len);
available_ -= n;
next_ += n;
len -= n;
byteCount_ += n;
}
}
size_t byteCount() const { return byteCount_; }
bool fill() {
size_t n = 0;
if (in_->read(buffer_, bufferSize_, n)) {
next_ = buffer_;
available_ = n;
return true;
}
return false;
}
void seek(int64_t position) {
// BufferCopyIn::seek is relative to byteCount_, whereas position is
// absolute.
in_->seek(position - byteCount_ - available_);
byteCount_ = position;
available_ = 0;
}
public:
BufferCopyInInputStream(unique_ptr<BufferCopyIn> in, size_t bufferSize) :
bufferSize_(bufferSize),
buffer_(new uint8_t[bufferSize]),
in_(std::move(in)),
byteCount_(0),
next_(buffer_),
available_(0) { }
~BufferCopyInInputStream() {
delete[] buffer_;
}
};
namespace {
/**
 * Abstract byte sink used by the buffered output stream in this file.
 * Concrete sinks in this file wrap a file or a std::ostream.
 */
struct BufferCopyOut {
    virtual ~BufferCopyOut() = default;

    /**
     * Writes the len bytes starting at b to the sink.
     */
    virtual void write(const uint8_t* b, size_t len) = 0;
};
/**
 * BufferCopyOut that writes to a file through the native OS API:
 * a Win32 HANDLE on Windows, a POSIX file descriptor elsewhere.
 * The file is created or truncated by the constructor and closed by the
 * destructor.
 */
struct FileBufferCopyOut : public BufferCopyOut {
#ifdef _WIN32
    HANDLE h_;

    /**
     * Creates (or overwrites) filename for writing; throws Exception when
     * the handle is invalid.
     */
    FileBufferCopyOut(const char* filename) :
        h_(::CreateFileA(filename, GENERIC_WRITE, 0, NULL, CREATE_ALWAYS, FILE_ATTRIBUTE_NORMAL, NULL)) {
        if (h_ == INVALID_HANDLE_VALUE) {
            throw Exception(boost::format("Cannot open file: %1%") % ::GetLastError());
        }
    }

    ~FileBufferCopyOut() {
        ::CloseHandle(h_);
    }

    /**
     * Writes all len bytes starting at b, looping over partial writes.
     * Throws Exception when WriteFile fails.
     */
    void write(const uint8_t* b, size_t len) {
        while (len > 0) {
            DWORD dw = 0;
            if (! ::WriteFile(h_, b, len, &dw, NULL)) {
                throw Exception(boost::format("Cannot write file: %1%") % ::GetLastError());
            }
            b += dw;
            len -= dw;
        }
    }
#else
    const int fd_;

    /**
     * Opens filename write-only, creating it with mode 0644 or truncating
     * it; throws Exception carrying strerror(errno) when open() fails.
     */
    FileBufferCopyOut(const char* filename) :
        fd_(::open(filename, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, 0644)) {
        if (fd_ < 0) {
            throw Exception(boost::format("Cannot open file: %1%") %
                ::strerror(errno));
        }
    }

    ~FileBufferCopyOut() {
        ::close(fd_);
    }

    /**
     * Writes all len bytes starting at b.
     * ::write may accept fewer bytes than requested, so the call is repeated
     * until everything is written, as the Windows branch does.
     * Throws Exception when ::write fails.
     */
    void write(const uint8_t* b, size_t len) {
        while (len > 0) {
            const ssize_t n = ::write(fd_, b, len);
            if (n < 0) {
                if (errno == EINTR) {
                    continue;  // interrupted by a signal before writing; retry
                }
                throw Exception(boost::format("Cannot write file: %1%") %
                    ::strerror(errno));
            }
            b += n;
            len -= static_cast<size_t>(n);
        }
    }
#endif
};
/**
 * BufferCopyOut that forwards bytes to a std::ostream.
 * The stream is held by reference, so it has to outlive this object.
 */
struct OStreamBufferCopyOut : public BufferCopyOut {
    ostream& os_;

    OStreamBufferCopyOut(ostream& os) : os_(os) { }

    /**
     * Writes the len bytes starting at b to the wrapped stream.
     */
    void write(const uint8_t* b, size_t len) {
        const char* const bytes = reinterpret_cast<const char*>(b);
        os_.write(bytes, len);
    }
};
}
class BufferCopyOutputStream : public OutputStream {
size_t bufferSize_;
uint8_t* const buffer_;
unique_ptr<BufferCopyOut> out_;
uint8_t* next_;
size_t available_;
size_t byteCount_;
// Invaiant: byteCount_ == byteswritten + bufferSize_ - available_;
bool next(uint8_t** data, size_t* len) {
if (available_ == 0) {
flush();
}
*data = next_;
*len = available_;
next_ += available_;
byteCount_ += available_;
available_ = 0;
return true;
}
void backup(size_t len) {
available_ += len;
next_ -= len;
byteCount_ -= len;
}
uint64_t byteCount() const {
return byteCount_;
}
void flush() {
out_->write(buffer_, bufferSize_ - available_);
next_ = buffer_;
available_ = bufferSize_;
}
public:
BufferCopyOutputStream(unique_ptr<BufferCopyOut> out, size_t bufferSize) :
bufferSize_(bufferSize),
buffer_(new uint8_t[bufferSize]),
out_(std::move(out)),
next_(buffer_),
available_(bufferSize_), byteCount_(0) { }
~BufferCopyOutputStream() {
delete[] buffer_;
}
};
/**
 * Creates an InputStream that reads the named file through an internal
 * buffer of bufferSize bytes.
 * Throws Exception when the file cannot be opened.
 */
unique_ptr<InputStream> fileInputStream(const char* filename,
    size_t bufferSize)
{
    unique_ptr<BufferCopyIn> source(new FileBufferCopyIn(filename));
    unique_ptr<InputStream> stream(
        new BufferCopyInInputStream(std::move(source), bufferSize));
    return stream;
}
/**
 * Creates a SeekableInputStream that reads the named file through an
 * internal buffer of bufferSize bytes.
 * Throws Exception when the file cannot be opened.
 */
unique_ptr<SeekableInputStream> fileSeekableInputStream(const char* filename,
    size_t bufferSize)
{
    unique_ptr<BufferCopyIn> source(new FileBufferCopyIn(filename));
    unique_ptr<SeekableInputStream> stream(
        new BufferCopyInInputStream(std::move(source), bufferSize));
    return stream;
}
/**
 * Creates an InputStream that reads from is through an internal buffer of
 * bufferSize bytes. Skipping uses seekg() on the stream.
 * The stream is referenced, not copied, so it must outlive the result.
 */
unique_ptr<InputStream> istreamInputStream(istream& is, size_t bufferSize)
{
    unique_ptr<BufferCopyIn> source(new IStreamBufferCopyIn(is));
    unique_ptr<InputStream> stream(
        new BufferCopyInInputStream(std::move(source), bufferSize));
    return stream;
}
/**
 * Creates an InputStream that reads from is through an internal buffer of
 * bufferSize bytes. Skipping reads and discards data instead of seeking.
 * The stream is referenced, not copied, so it must outlive the result.
 */
unique_ptr<InputStream> nonSeekableIstreamInputStream(
    istream& is, size_t bufferSize)
{
    unique_ptr<BufferCopyIn> source(new NonSeekableIStreamBufferCopyIn(is));
    unique_ptr<InputStream> stream(
        new BufferCopyInInputStream(std::move(source), bufferSize));
    return stream;
}
/**
 * Creates an OutputStream that writes to the named file through an internal
 * buffer of bufferSize bytes. The file is created or truncated.
 * Throws Exception when the file cannot be opened.
 */
unique_ptr<OutputStream> fileOutputStream(const char* filename,
    size_t bufferSize)
{
    unique_ptr<BufferCopyOut> sink(new FileBufferCopyOut(filename));
    unique_ptr<OutputStream> stream(
        new BufferCopyOutputStream(std::move(sink), bufferSize));
    return stream;
}
/**
 * Creates an OutputStream that writes to os through an internal buffer of
 * bufferSize bytes.
 * The stream is referenced, not copied, so it must outlive the result.
 */
unique_ptr<OutputStream> ostreamOutputStream(ostream& os,
    size_t bufferSize)
{
    unique_ptr<BufferCopyOut> sink(new OStreamBufferCopyOut(os));
    unique_ptr<OutputStream> stream(
        new BufferCopyOutputStream(std::move(sink), bufferSize));
    return stream;
}
} // namespace avro