// Source blob: 726e2a72d59f3968c0c8db3d98a72aa7f3c31e4e
#include "util/stream/rewindable_stream.hh"
#include "util/pcqueue.hh"
#include <iostream>
namespace util {
namespace stream {
// Build a stream that is not attached to any chain yet: the entry pointer and
// both queue pointers are null, and the stream starts out flagged as poisoned.
// Init() attaches it later; Init() treats a non-null in_ as "already
// initialised", which is why in_ has to start out null here.
RewindableStream::RewindableStream()
  : current_(NULL),
    in_(NULL),
    out_(NULL),
    poisoned_(true) {}
// Attach this stream to a chain position and load the first block.
//
// position supplies the input and output queues and the progress tracker
// directly, and the entry size, block size and block count through
// GetChain().
//
// Raises through UTIL_THROW_IF2 when the stream has already been initialised
// (in_ is non-null) or when the chain reports fewer than two blocks.  Both
// checks run before any member is written, so a rejected call leaves the
// object exactly as it was instead of attached to the queues and flagged as
// not poisoned while holding no block.
//
// The final AppendBlock() call consumes from the input queue; if the first
// thing it receives is poison, the stream is poisoned straight away.
void RewindableStream::Init(const ChainPosition &position) {
  UTIL_THROW_IF2(in_, "RewindableStream::Init twice");
  UTIL_THROW_IF2(position.GetChain().BlockCount() < 2, "RewindableStream needs block_count at least two");

  in_ = position.in_;
  out_ = position.out_;
  hit_poison_ = false;
  poisoned_ = false;
  progress_ = position.progress_;
  entry_size_ = position.GetChain().EntrySize();
  block_size_ = position.GetChain().BlockSize();
  block_count_ = position.GetChain().BlockCount();
  blocks_it_ = 0;
  marked_ = NULL;

  AppendBlock();
}
// Advance to the next entry and return *this.
//
// current_ moves forward by entry_size_.  When that lands exactly on
// block_end_, the stream steps to the next block already held in blocks_, or,
// if there is none, obtains one through AppendBlock().  With no mark set,
// every held block is first passed to Flush(); with a mark set, the held
// blocks are kept and the new block is appended after them.
//
// If AppendBlock() leaves the stream poisoned, the function returns at once
// and current_ / block_end_ keep the values they had at the end of the
// finished block.
RewindableStream &RewindableStream::operator++() {
// Preconditions; they are only checked when assert() is active.
assert(*this);
assert(current_ < block_end_);
assert(current_);
assert(blocks_it_ < blocks_.size());
current_ += entry_size_;
if (UTIL_UNLIKELY(current_ == block_end_)) {
// Fetch another block if necessary.
// The pre-increment moves on to the following block; equality with size()
// means nothing is buffered beyond the block that was just finished.
if (++blocks_it_ == blocks_.size()) {
if (!marked_) {
// No mark to come back to.  blocks_it_ equals blocks_.size() here, so the
// range handed to Flush() is the whole deque; Flush() erases what it sends,
// hence the index is reset to 0.
Flush(blocks_.begin() + blocks_it_);
blocks_it_ = 0;
}
// AppendBlock() either appends a block and points current_, block_end_ and
// blocks_it_ at it, or sets poisoned_ and leaves them alone.  It aborts the
// process if blocks_ already holds block_count_ blocks.
AppendBlock();
assert(poisoned_ || (blocks_it_ == blocks_.size() - 1));
if (poisoned_) return *this;
}
// Position on the start of the block at blocks_it_.  This is what makes the
// step work when the next block was already buffered (blocks_it_ can be
// behind the back of the deque, e.g. after Rewind() reset it to 0).
Block &cur_block = blocks_[blocks_it_];
current_ = static_cast<uint8_t*>(cur_block.Get());
block_end_ = current_ + cur_block.ValidSize();
}
// Postconditions: current_ points inside the valid part of the block at
// blocks_it_ and block_end_ is that block's ValidEnd().
assert(current_);
assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
assert(current_ < block_end_);
assert(block_end_ == blocks_[blocks_it_].ValidEnd());
return *this;
}
void RewindableStream::Mark() {
marked_ = current_;
Flush(blocks_.begin() + blocks_it_);
blocks_it_ = 0;
}
void RewindableStream::Rewind() {
if (current_ != marked_) {
poisoned_ = false;
}
blocks_it_ = 0;
current_ = marked_;
block_end_ = static_cast<const uint8_t*>(blocks_[blocks_it_].ValidEnd());
assert(current_);
assert(current_ >= static_cast<uint8_t*>(blocks_[blocks_it_].Get()));
assert(current_ < block_end_);
assert(block_end_ == blocks_[blocks_it_].ValidEnd());
}
void RewindableStream::Poison() {
if (blocks_.empty()) return;
assert(*this);
assert(blocks_it_ == blocks_.size() - 1);
// Produce all buffered blocks.
blocks_.back().SetValidSize(current_ - static_cast<uint8_t*>(blocks_.back().Get()));
Flush(blocks_.end());
blocks_it_ = 0;
Block poison;
if (!hit_poison_) {
in_->Consume(poison);
}
poison.SetToPoison();
out_->Produce(poison);
hit_poison_ = true;
poisoned_ = true;
}
void RewindableStream::AppendBlock() {
if (UTIL_UNLIKELY(blocks_.size() >= block_count_)) {
std::cerr << "RewindableStream trying to use more blocks than available" << std::endl;
abort();
}
if (UTIL_UNLIKELY(hit_poison_)) {
poisoned_ = true;
return;
}
Block get;
// The loop is needed since it is *feasible* that we're given 0 sized but
// valid blocks
do {
in_->Consume(get);
if (UTIL_LIKELY(get)) {
blocks_.push_back(get);
} else {
hit_poison_ = true;
poisoned_ = true;
return;
}
} while (UTIL_UNLIKELY(get.ValidSize() == 0));
current_ = static_cast<uint8_t*>(blocks_.back().Get());
block_end_ = static_cast<const uint8_t*>(blocks_.back().ValidEnd());
blocks_it_ = blocks_.size() - 1;
}
void RewindableStream::Flush(std::deque<Block>::iterator to) {
for (std::deque<Block>::iterator i = blocks_.begin(); i != to; ++i) {
out_->Produce(*i);
progress_ += i->ValidSize();
}
blocks_.erase(blocks_.begin(), to);
}
}
}