blob: d4ba61834a9a14bfb02674f5b6e51d36cba9d91b [file]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "InputStream.hh"
#include "orc/Exceptions.hh"
#include <algorithm>
#include <iomanip>
namespace orc {
void printBuffer(std::ostream& out, const char* buffer, uint64_t length) {
const uint64_t width = 24;
out << std::hex;
for (uint64_t line = 0; line < (length + width - 1) / width; ++line) {
out << std::setfill('0') << std::setw(7) << (line * width);
for (uint64_t byte = 0; byte < width && line * width + byte < length; ++byte) {
out << " " << std::setfill('0') << std::setw(2)
<< static_cast<uint64_t>(0xff & buffer[line * width + byte]);
}
out << "\n";
}
out << std::dec;
}
PositionProvider::PositionProvider(const std::list<uint64_t>& posns) {
position_ = posns.begin();
}
uint64_t PositionProvider::next() {
uint64_t result = *position_;
++position_;
return result;
}
uint64_t PositionProvider::current() {
return *position_;
}
SeekableInputStream::~SeekableInputStream() {
// PASS
}
SeekableArrayInputStream::~SeekableArrayInputStream() {
// PASS
}
SeekableArrayInputStream::SeekableArrayInputStream(const unsigned char* values, uint64_t size,
uint64_t blkSize)
: data_(reinterpret_cast<const char*>(values)) {
length_ = size;
position_ = 0;
blockSize_ = blkSize == 0 ? length_ : static_cast<uint64_t>(blkSize);
}
SeekableArrayInputStream::SeekableArrayInputStream(const char* values, uint64_t size,
uint64_t blkSize)
: data_(values) {
length_ = size;
position_ = 0;
blockSize_ = blkSize == 0 ? length_ : static_cast<uint64_t>(blkSize);
}
bool SeekableArrayInputStream::Next(const void** buffer, int* size) {
uint64_t currentSize = std::min(length_ - position_, blockSize_);
if (currentSize > 0) {
*buffer = data_ + position_;
*size = static_cast<int>(currentSize);
position_ += currentSize;
return true;
}
*size = 0;
return false;
}
void SeekableArrayInputStream::BackUp(int count) {
if (count >= 0) {
uint64_t unsignedCount = static_cast<uint64_t>(count);
if (unsignedCount <= blockSize_ && unsignedCount <= position_) {
position_ -= unsignedCount;
} else {
throw std::logic_error("Can't backup that much!");
}
}
}
bool SeekableArrayInputStream::Skip(int count) {
if (count >= 0) {
uint64_t unsignedCount = static_cast<uint64_t>(count);
if (unsignedCount <= length_ - position_) {
position_ += unsignedCount;
return true;
} else {
position_ = length_;
}
}
return false;
}
int64_t SeekableArrayInputStream::ByteCount() const {
return static_cast<int64_t>(position_);
}
void SeekableArrayInputStream::seek(PositionProvider& seekPosition) {
position_ = seekPosition.next();
}
std::string SeekableArrayInputStream::getName() const {
std::ostringstream result;
result << "SeekableArrayInputStream " << position_ << " of " << length_;
return result.str();
}
static uint64_t computeBlock(uint64_t request, uint64_t length) {
return std::min(length, request == 0 ? 256 * 1024 : request);
}
SeekableFileInputStream::SeekableFileInputStream(InputStream* stream, uint64_t offset,
uint64_t byteCount, MemoryPool& pool,
uint64_t blockSize)
: pool_(pool),
input_(stream),
start_(offset),
length_(byteCount),
blockSize_(computeBlock(blockSize, length_)) {
position_ = 0;
buffer_.reset(new DataBuffer<char>(pool_));
pushBack_ = 0;
}
SeekableFileInputStream::~SeekableFileInputStream() {
// PASS
}
bool SeekableFileInputStream::Next(const void** data, int* size) {
uint64_t bytesRead;
if (pushBack_ != 0) {
*data = buffer_->data() + (buffer_->size() - pushBack_);
bytesRead = pushBack_;
} else {
bytesRead = std::min(length_ - position_, blockSize_);
buffer_->resize(bytesRead);
if (bytesRead > 0) {
input_->read(buffer_->data(), bytesRead, start_ + position_);
*data = static_cast<void*>(buffer_->data());
}
}
position_ += bytesRead;
pushBack_ = 0;
*size = static_cast<int>(bytesRead);
return bytesRead != 0;
}
void SeekableFileInputStream::BackUp(int signedCount) {
if (signedCount < 0) {
throw std::logic_error("can't backup negative distances");
}
uint64_t count = static_cast<uint64_t>(signedCount);
if (pushBack_ > 0) {
throw std::logic_error("can't backup unless we just called Next");
}
if (count > blockSize_ || count > position_) {
throw std::logic_error("can't backup that far");
}
pushBack_ = static_cast<uint64_t>(count);
position_ -= pushBack_;
}
bool SeekableFileInputStream::Skip(int signedCount) {
if (signedCount < 0) {
return false;
}
uint64_t count = static_cast<uint64_t>(signedCount);
if (count > length_ - position_) {
position_ = length_;
} else {
position_ += count;
}
pushBack_ = 0;
return position_ < length_;
}
int64_t SeekableFileInputStream::ByteCount() const {
return static_cast<int64_t>(position_);
}
void SeekableFileInputStream::seek(PositionProvider& location) {
position_ = location.next();
if (position_ > length_) {
position_ = length_;
throw std::logic_error("seek too far");
}
pushBack_ = 0;
}
std::string SeekableFileInputStream::getName() const {
std::ostringstream result;
result << input_->getName() << " from " << start_ << " for " << length_;
return result.str();
}
} // namespace orc