blob: 707a98c7faa5b7c9c6ba4d3cc6ad3db1f3a30802 [file] [log] [blame]
#ifndef LM_BUILDER_MULTI_STREAM__
#define LM_BUILDER_MULTI_STREAM__
#include "lm/builder/ngram_stream.hh"
#include "util/scoped.hh"
#include "util/stream/chain.hh"
#include <cstddef>
#include <new>
#include <assert.h>
#include <stdlib.h>
namespace lm { namespace builder {
/* Array with a capacity fixed at Init time.  Raw storage for the requested
 * number of elements is obtained with malloc; elements are then constructed in
 * place, one after another, at end().  No bounds checking is performed: the
 * caller must not construct more elements than were requested from Init.
 */
template <class T> class FixedArray {
  public:
    // Allocate storage for count elements.  None are constructed yet.
    explicit FixedArray(std::size_t count) : newed_end_(NULL) {
      Init(count);
    }

    // Start without storage; call Init before adding elements.
    FixedArray() : newed_end_(NULL) {}

    // Allocate storage for count elements.  Must only be called while no
    // storage is held.  Throws std::bad_alloc if the request cannot be met.
    void Init(std::size_t count) {
      assert(!block_.get());
      // Reject counts whose byte size would wrap around std::size_t and
      // silently under-allocate.
      if (count > static_cast<std::size_t>(-1) / sizeof(T)) throw std::bad_alloc();
      const std::size_t bytes = sizeof(T) * count;
      block_.reset(malloc(bytes));
      // malloc(0) is permitted to return NULL, so a NULL result is only a
      // failure when memory was actually requested.
      if (bytes && !block_.get()) throw std::bad_alloc();
      newed_end_ = begin();
    }

    // Copy-construct every element of from into freshly allocated storage.
    FixedArray(const FixedArray &from) : newed_end_(NULL) {
      const std::size_t size = from.size();
      Init(size);
      try {
        for (std::size_t i = 0; i < size; ++i) {
          new (end()) T(from[i]);
          Constructed();
        }
      } catch (...) {
        // The destructor does not run for an object whose constructor threw,
        // so destroy the elements copied so far before propagating.
        clear();
        throw;
      }
    }

    // Destroys the constructed elements; block_ releases the storage.
    ~FixedArray() { clear(); }

    T *begin() { return static_cast<T*>(block_.get()); }
    const T *begin() const { return static_cast<const T*>(block_.get()); }

    // One past the last constructed element, which is also where the next
    // element is to be constructed.
    // Always call Constructed after successful completion of new.
    T *end() { return newed_end_; }
    const T *end() const { return newed_end_; }

    // Last constructed element.  The array must not be empty.
    T &back() { return *(end() - 1); }
    const T &back() const { return *(end() - 1); }

    // Number of constructed elements (not the allocated capacity).
    std::size_t size() const { return end() - begin(); }
    bool empty() const { return begin() == end(); }

    // Unchecked element access.
    T &operator[](std::size_t i) { return begin()[i]; }
    const T &operator[](std::size_t i) const { return begin()[i]; }

    // Construct a new T from c at end().  C may be any type T is
    // constructible from.
    template <class C> void push_back(const C &c) {
      new (end()) T(c);
      Constructed();
    }

    // Destroy every constructed element.  The storage is kept.
    void clear() {
      for (T *i = begin(); i != end(); ++i)
        i->~T();
      newed_end_ = begin();
    }

  protected:
    // Record that one more element has been constructed at end().
    void Constructed() {
      ++newed_end_;
    }

  private:
    // Owns the raw storage.
    util::scoped_malloc block_;
    // One past the last constructed element.
    T *newed_end_;
};
class Chains;
// FixedArray of util::stream::ChainPosition that is filled from a Chains
// object by Init.
class ChainPositions : public FixedArray<util::stream::ChainPosition> {
  public:
    // Holds nothing until Init is called.
    ChainPositions() {}

    // Construct and fill from chains in one step.
    explicit ChainPositions(Chains &chains) {
      this->Init(chains);
    }

    // Fill from chains.  Declared only here because the definition needs the
    // complete Chains type.
    void Init(Chains &chains);
};
// FixedArray of util::stream::Chain plus the threads attached to them through
// operator>>.
class Chains : public FixedArray<util::stream::Chain> {
  private:
    // CheckForRun<W>::type only names a type when &W::Run is usable as a
    // void (W::*)(const ChainPositions &).  It serves as the return type of
    // the worker overloads of operator>> below, which is intended to restrict
    // them to such workers.
    template <class Candidate, void (Candidate::*run)(const ChainPositions &) = &Candidate::Run> struct CheckForRun {
      typedef Chains type;
    };

  public:
    // Reserve room for up to limit chains.
    explicit Chains(std::size_t limit) : FixedArray<util::stream::Chain>(limit) {}

    // Create a util::stream::Thread from a fresh ChainPositions over these
    // chains and the given worker; the thread object is kept in threads_.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const Worker &worker) {
      util::stream::Thread *started = new util::stream::Thread(ChainPositions(*this), worker);
      threads_.push_back(started);
      return *this;
    }

    // Same as above for a worker passed through boost::reference_wrapper.
    template <class Worker> typename CheckForRun<Worker>::type &operator>>(const boost::reference_wrapper<Worker> &worker) {
      util::stream::Thread *started = new util::stream::Thread(ChainPositions(*this), worker);
      threads_.push_back(started);
      return *this;
    }

    // Apply the recycler to every chain, in order.
    Chains &operator>>(const util::stream::Recycler &recycler) {
      for (std::size_t index = 0; index < size(); ++index) {
        (*this)[index] >> recycler;
      }
      return *this;
    }

    // Drop all Thread objects first, then call Wait(release_memory) on every
    // chain.
    void Wait(bool release_memory = true) {
      threads_.clear();
      for (std::size_t index = 0; index < size(); ++index) {
        (*this)[index].Wait(release_memory);
      }
    }

  private:
    // Threads created by the worker overloads of operator>>.
    boost::ptr_vector<util::stream::Thread> threads_;

    // Copying is disabled: declared but never defined.
    Chains(const Chains &);
    void operator=(const Chains &);
};
// Allocate one slot per chain, then construct a ChainPosition in each slot
// from the value returned by that chain's Add().
inline void ChainPositions::Init(Chains &chains) {
  typedef util::stream::ChainPosition Position;
  FixedArray<Position>::Init(chains.size());
  util::stream::Chain *const stop = chains.end();
  for (util::stream::Chain *chain = chains.begin(); chain != stop; ++chain) {
    new (end()) Position(chain->Add());
    Constructed();
  }
}
// Stream-style shorthand for positions.Init(chains).  Returns chains so that
// further >> operations can be appended.
inline Chains &operator>>(Chains &chains, ChainPositions &positions) {
  positions.Init(chains);
  return chains;
}
// FixedArray of NGramStream, each constructed from a ChainPosition.
class NGramStreams : public FixedArray<NGramStream> {
  public:
    // Holds nothing until one of the Init functions is called.
    NGramStreams() {}

    // This puts a dummy NGramStream at the beginning (useful to algorithms that need to reference something at the beginning).
    // The dummy is default-constructed; one stream per position follows it.
    void InitWithDummy(const ChainPositions &positions) {
      FixedArray<NGramStream>::Init(positions.size() + 1);
      new (end()) NGramStream(); Constructed();
      for (const util::stream::ChainPosition *i = positions.begin(); i != positions.end(); ++i) {
        push_back(*i);
      }
    }

    // Limit restricts to positions[0,limit)
    // limit must not exceed positions.size().
    void Init(const ChainPositions &positions, std::size_t limit) {
      // A larger limit would walk past the end of positions.
      assert(limit <= positions.size());
      FixedArray<NGramStream>::Init(limit);
      const util::stream::ChainPosition *const stop = positions.begin() + limit;
      for (const util::stream::ChainPosition *i = positions.begin(); i != stop; ++i) {
        push_back(*i);
      }
    }

    // Build one stream for every position.
    void Init(const ChainPositions &positions) {
      Init(positions, positions.size());
    }

    // Construct and Init from every position in one step.
    NGramStreams(const ChainPositions &positions) {
      Init(positions);
    }
};
// Take a fresh set of positions from chains and initialize streams with one
// NGramStream per position.  Returns chains so that further >> operations can
// be appended.
inline Chains &operator>>(Chains &chains, NGramStreams &streams) {
  ChainPositions fresh(chains);
  streams.Init(fresh);
  return chains;
}
}} // namespaces
#endif // LM_BUILDER_MULTI_STREAM__