blob: 0ee7fab6fbb9374c2ce919be7cc65965322e49fc [file] [log] [blame]
#ifndef UTIL_STREAM_MULTI_STREAM_H
#define UTIL_STREAM_MULTI_STREAM_H
#include "util/fixed_array.hh"
#include "util/scoped.hh"
#include "util/stream/chain.hh"
#include "util/stream/stream.hh"
#include <cstddef>
#include <new>
#include <assert.h>
#include <stdlib.h>
namespace util { namespace stream {
// Forward declaration: ChainPositions (next) takes a Chains & before Chains is defined.
class Chains;
/**
 * Fixed-size array of ChainPosition objects obtained from a Chains collection.
 * Init() (defined after Chains) stores the result of Add() for each chain.
 */
class ChainPositions : public util::FixedArray<util::stream::ChainPosition> {
  public:
    // Default construction populates nothing; use Init() to fill the array.
    ChainPositions() {}

    // Shorthand for default construction followed by Init(chains).
    explicit ChainPositions(Chains &chains) {
      Init(chains);
    }

    // Fill this array from chains, one position per chain.
    void Init(Chains &chains);
};
/**
 * Fixed-size array of Chain objects that are driven together: workers,
 * recyclers and Wait() are applied across the chains held in the array.
 */
class Chains : public util::FixedArray<util::stream::Chain> {
  private:
    // Compile-time gate used as the return type of the worker operator>>
    // overloads.  The default template argument only substitutes when T has
    // a non-const member function `void Run(const ChainPositions &)`, so
    // types without such a member do not match those overloads.
    template <class T, void (T::*ptr)(const ChainPositions &) = &T::Run> struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Default construction does not size the array: Init must be called.
    Chains() {}

    // Forwards limit to the FixedArray constructor.
    explicit Chains(std::size_t limit) : util::FixedArray<util::stream::Chain>(limit) {}

    // Attach a worker: a new Thread is created from a fresh ChainPositions
    // for these chains plus the worker, and is owned by threads_.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      StartThread(worker);
      return *this;
    }

    // Same as above for a worker wrapped in boost::reference_wrapper; the
    // wrapper itself is what gets handed to Thread.
    // NOTE(review): presumably this lets the caller keep ownership of the
    // worker instead of having it copied -- confirm against Thread.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      StartThread(worker);
      return *this;
    }

    // Apply `>> recycler` to every chain in the array.
    Chains &operator>>(const util::stream::Recycler &recycler) {
      for (util::stream::Chain *chain = begin(); chain != end(); ++chain) {
        *chain >> recycler;
      }
      return *this;
    }

    // Discard all Thread objects held by this class, then call
    // Wait(release_memory) on each chain in order.
    // NOTE(review): destroying a Thread presumably joins it -- confirm in chain.hh.
    void Wait(bool release_memory = true) {
      threads_.clear();
      for (util::stream::Chain *chain = begin(); chain != end(); ++chain) {
        chain->Wait(release_memory);
      }
    }

  private:
    // Shared body of the two worker operator>> overloads: build the
    // positions for these chains and store a new Thread running `runnable`.
    template <class Runnable> void StartThread(const Runnable &runnable) {
      threads_.push_back(new util::stream::Thread(ChainPositions(*this), runnable));
    }

    // Threads created by operator>>; owned by this container.
    boost::ptr_vector<util::stream::Thread> threads_;

    // Copying is disabled: both members are private and not defined here.
    Chains(const Chains &);
    void operator=(const Chains &);
};
// Size this array to chains.size(), then construct one ChainPosition per
// chain, in order, from the value returned by that chain's Add().
inline void ChainPositions::Init(Chains &chains) {
  util::FixedArray<util::stream::ChainPosition>::Init(chains.size());
  util::stream::Chain *chain = chains.begin();
  while (chain != chains.end()) {
    // Placement new: build the ChainPosition directly in the next free slot
    // of the already-allocated storage, then record that it now exists.
    new (end()) util::stream::ChainPosition(chain->Add());
    Constructed();
    ++chain;
  }
}
// Stream-style shorthand for positions.Init(chains); returns chains so that
// further `>>` operations can be chained onto the same expression.
inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}
/**
 * Fixed-size array of T in which each element is constructed from a
 * ChainPosition.  T must be constructible from a ChainPosition, and
 * InitWithDummy() additionally requires T to be default-constructible.
 */
template <class T> class GenericStreams : public util::FixedArray<T> {
  private:
    typedef util::FixedArray<T> P;

  public:
    // Default construction populates nothing; call one of the Init functions.
    GenericStreams() {}

    // Builds one T per position.
    // NOTE(review): not marked explicit, so a ChainPositions converts
    // implicitly to GenericStreams<T>; left as-is for caller compatibility.
    GenericStreams(const ChainPositions &positions) {
      Init(positions);
    }

    // Allocates room for positions.size() + 1 elements and puts a
    // default-constructed dummy T at index 0 (useful to algorithms that need
    // to reference something at the beginning), followed by one T per position.
    void InitWithDummy(const ChainPositions &positions) {
      P::Init(positions.size() + 1);
      // Placement new: construct the dummy in the already-allocated first slot.
      new (P::end()) T();
      P::Constructed();
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
        P::push_back(*i);
      }
    }

    // Builds one T for each of positions[0, limit).
    // Precondition: limit <= positions.size().  Violating it would walk past
    // the end of positions, so it is checked in debug builds.
    void Init(const ChainPositions &positions, std::size_t limit) {
      assert(limit <= positions.size());
      P::Init(limit);
      const util::stream::ChainPosition *const stop = positions.begin() + limit;
      for (const util::stream::ChainPosition *i = positions.begin(); i != stop; ++i) {
        P::push_back(*i);
      }
    }

    // Builds one T for every position.
    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }
};
// Stream-style shorthand: collect a fresh set of positions from chains and
// initialize streams from them.  Returns chains to allow further `>>` use.
template <class T> inline Chains &operator>>(Chains &chains, GenericStreams<T> &streams) {
  // Constructing from chains runs ChainPositions::Init(chains).
  ChainPositions fresh(chains);
  streams.Init(fresh);
  return chains;
}
// Convenience name for GenericStreams instantiated with Stream.
typedef GenericStreams<Stream> Streams;
}} // namespaces
#endif // UTIL_STREAM_MULTI_STREAM_H