| #ifndef UTIL_STREAM_CHAIN_H |
| #define UTIL_STREAM_CHAIN_H |
| |
| #include "util/stream/block.hh" |
| #include "util/stream/config.hh" |
| #include "util/stream/multi_progress.hh" |
| #include "util/scoped.hh" |
| |
| #include <boost/ptr_container/ptr_vector.hpp> |
| #include <boost/thread/thread.hpp> |
| |
| #include <cstddef> |
| #include <cassert> |
| |
| namespace util { |
| template <class T> class PCQueue; |
| namespace stream { |
| |
| class ChainConfigException : public Exception { |
| public: |
| ChainConfigException() throw(); |
| ~ChainConfigException() throw(); |
| }; |
| |
| class Chain; |
| class RewindableStream; |
| |
| /** |
| * Encapsulates a @ref PCQueue "producer queue" and a @ref PCQueue "consumer queue" within a @ref Chain "chain". |
| * |
| * Specifies position in chain for Link constructor. |
| */ |
| class ChainPosition { |
| public: |
| const Chain &GetChain() const { return *chain_; } |
| private: |
| friend class Chain; |
| friend class Link; |
| friend class RewindableStream; |
| ChainPosition(PCQueue<Block> &in, PCQueue<Block> &out, Chain *chain, MultiProgress &progress) |
| : in_(&in), out_(&out), chain_(chain), progress_(progress.Add()) {} |
| |
| PCQueue<Block> *in_, *out_; |
| |
| Chain *chain_; |
| |
| WorkerProgress progress_; |
| }; |
| |
| |
| /** |
| * Encapsulates a worker thread processing data at a given position in the chain. |
| * |
| * Each instance of this class owns one boost thread in which the worker is Run(). |
| */ |
| class Thread { |
| public: |
| |
| /** |
| * Constructs a new Thread in which the provided Worker is Run(). |
| * |
| * Position is usually ChainPosition but if there are multiple streams involved, this can be ChainPositions. |
| * |
| * After a call to this constructor, the provided worker will be running within a boost thread owned by the newly constructed Thread object. |
| */ |
| template <class Position, class Worker> Thread(const Position &position, const Worker &worker) |
| : thread_(boost::ref(*this), position, worker) {} |
| |
| ~Thread(); |
| |
| /** |
| * Launches the provided worker in this object's boost thread. |
| * |
| * This method is called automatically by this class's @ref Thread() "constructor". |
| */ |
| template <class Position, class Worker> void operator()(const Position &position, Worker &worker) { |
| // try { |
| worker.Run(position); |
| // } catch (const std::exception &e) { |
| // UnhandledException(e); |
| // } |
| } |
| |
| private: |
| void UnhandledException(const std::exception &e); |
| |
| boost::thread thread_; |
| }; |
| |
| /** |
| * This resets blocks to full valid size. Used to close the loop in Chain by recycling blocks. |
| */ |
| class Recycler { |
| public: |
| /** |
| * Resets the blocks in the chain such that the blocks' respective valid sizes match the chain's block size. |
| * |
| * @see Block::SetValidSize() |
| * @see Chain::BlockSize() |
| */ |
| void Run(const ChainPosition &position); |
| }; |
| |
| extern const Recycler kRecycle; |
| class WriteAndRecycle; |
| class PWriteAndRecycle; |
| |
| /** |
| * Represents a sequence of workers, through which @ref Block "blocks" can pass. |
| */ |
| class Chain { |
| private: |
| template <class T, void (T::*ptr)(const ChainPosition &) = &T::Run> struct CheckForRun { |
| typedef Chain type; |
| }; |
| |
| public: |
| |
| /** |
| * Constructs a configured Chain. |
| * |
| * @param config Specifies how to configure the Chain. |
| */ |
| explicit Chain(const ChainConfig &config); |
| |
| /** |
| * Destructs a Chain. |
| * |
| * This method waits for the chain's threads to complete, |
| * and frees the memory held by this chain. |
| */ |
| ~Chain(); |
| |
| void ActivateProgress() { |
| assert(!Running()); |
| progress_.Activate(); |
| } |
| |
| void SetProgressTarget(uint64_t target) { |
| progress_.SetTarget(target); |
| } |
| |
| /** |
| * Gets the number of bytes in each record of a Block. |
| * |
| * @see ChainConfig::entry_size |
| */ |
| std::size_t EntrySize() const { |
| return config_.entry_size; |
| } |
| |
| /** |
| * Gets the inital @ref Block::ValidSize "valid size" for @ref Block "blocks" in this chain. |
| * |
| * @see Block::ValidSize |
| */ |
| std::size_t BlockSize() const { |
| return block_size_; |
| } |
| |
| /** |
| * Number of blocks going through the Chain. |
| */ |
| std::size_t BlockCount() const { |
| return config_.block_count; |
| } |
| |
| /** Two ways to add to the chain: Add() or operator>>. */ |
| ChainPosition Add(); |
| |
| /** |
| * Adds a new worker to this chain, |
| * and runs that worker in a new Thread owned by this chain. |
| * |
| * The worker must have a Run method that accepts a position argument. |
| * |
| * @see Thread::operator()() |
| */ |
| template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) { |
| assert(!complete_called_); |
| threads_.push_back(new Thread(Add(), worker)); |
| return *this; |
| } |
| |
| /** |
| * Adds a new worker to this chain (but avoids copying that worker), |
| * and runs that worker in a new Thread owned by this chain. |
| * |
| * The worker must have a Run method that accepts a position argument. |
| * |
| * @see Thread::operator()() |
| */ |
| template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) { |
| assert(!complete_called_); |
| threads_.push_back(new Thread(Add(), worker)); |
| return *this; |
| } |
| |
| // Note that Link and Stream also define operator>> outside this class. |
| |
| // To complete the loop, call CompleteLoop(), >> kRecycle, or the destructor. |
| void CompleteLoop() { |
| threads_.push_back(new Thread(Complete(), kRecycle)); |
| } |
| |
| /** |
| * Adds a Recycler worker to this chain, |
| * and runs that worker in a new Thread owned by this chain. |
| */ |
| Chain &operator>>(const Recycler &) { |
| CompleteLoop(); |
| return *this; |
| } |
| |
| /** |
| * Adds a WriteAndRecycle worker to this chain, |
| * and runs that worker in a new Thread owned by this chain. |
| */ |
| Chain &operator>>(const WriteAndRecycle &writer); |
| Chain &operator>>(const PWriteAndRecycle &writer); |
| |
| // Chains are reusable. Call Wait to wait for everything to finish and free memory. |
| void Wait(bool release_memory = true); |
| |
| // Waits for the current chain to complete (if any) then starts again. |
| void Start(); |
| |
| bool Running() const { return !queues_.empty(); } |
| |
| private: |
| ChainPosition Complete(); |
| |
| ChainConfig config_; |
| |
| std::size_t block_size_; |
| |
| scoped_malloc memory_; |
| |
| boost::ptr_vector<PCQueue<Block> > queues_; |
| |
| bool complete_called_; |
| |
| boost::ptr_vector<Thread> threads_; |
| |
| MultiProgress progress_; |
| }; |
| |
| // Create the link in the worker thread using the position token. |
| /** |
| * Represents a C++ style iterator over @ref Block "blocks". |
| */ |
| class Link { |
| public: |
| |
| // Either default construct and Init or just construct all at once. |
| |
| /** |
| * Constructs an @ref Init "initialized" link. |
| * |
| * @see Init |
| */ |
| explicit Link(const ChainPosition &position); |
| |
| /** |
| * Constructs a link that must subsequently be @ref Init "initialized". |
| * |
| * @see Init |
| */ |
| Link(); |
| |
| /** |
| * Initializes the link with the input @ref PCQueue "consumer queue" and output @ref PCQueue "producer queue" at a given @ref ChainPosition "position" in the @ref Chain "chain". |
| * |
| * @see Link() |
| */ |
| void Init(const ChainPosition &position); |
| |
| /** |
| * Destructs the link object. |
| * |
| * If necessary, this method will pass a poison block |
| * to this link's output @ref PCQueue "producer queue". |
| * |
| * @see Block::SetToPoison() |
| */ |
| ~Link(); |
| |
| /** |
| * Gets a reference to the @ref Block "block" at this link. |
| */ |
| Block &operator*() { return current_; } |
| |
| /** |
| * Gets a const reference to the @ref Block "block" at this link. |
| */ |
| const Block &operator*() const { return current_; } |
| |
| /** |
| * Gets a pointer to the @ref Block "block" at this link. |
| */ |
| Block *operator->() { return ¤t_; } |
| |
| /** |
| * Gets a const pointer to the @ref Block "block" at this link. |
| */ |
| const Block *operator->() const { return ¤t_; } |
| |
| /** |
| * Gets the link at the next @ref ChainPosition "position" in the @ref Chain "chain". |
| */ |
| Link &operator++(); |
| |
| /** |
| * Returns true if the @ref Block "block" at this link encapsulates a valid (non-NULL) block of memory. |
| * |
| * This method is a user-defined implicit conversion function to boolean; |
| * among other things, this method enables bare instances of this class |
| * to be used as the condition of an if statement. |
| */ |
| operator bool() const { return current_; } |
| |
| /** |
| * @ref Block::SetToPoison() "Poisons" the @ref Block "block" at this link, |
| * and passes this now-poisoned block to this link's output @ref PCQueue "producer queue". |
| * |
| * @see Block::SetToPoison() |
| */ |
| void Poison(); |
| |
| private: |
| Block current_; |
| PCQueue<Block> *in_, *out_; |
| |
| bool poisoned_; |
| |
| WorkerProgress progress_; |
| }; |
| |
| inline Chain &operator>>(Chain &chain, Link &link) { |
| link.Init(chain.Add()); |
| return chain; |
| } |
| |
| } // namespace stream |
| } // namespace util |
| |
| #endif // UTIL_STREAM_CHAIN_H |