| #include "lm/common/compare.hh" |
| #include "lm/common/model_buffer.hh" |
| #include "lm/common/ngram.hh" |
| #include "util/stream/chain.hh" |
| #include "util/stream/multi_stream.hh" |
| #include "util/stream/sort.hh" |
| #include "lm/interpolate/split_worker.hh" |
| |
| #include <boost/program_options.hpp> |
| #include <boost/version.hpp> |
| |
#if defined(_WIN32) || defined(_WIN64)

// Windows doesn't provide <unistd.h>, so define the standard file
// descriptor numbers we need here instead.
//
// Note: an object-like macro takes no '=' -- writing "#define X = 0" would
// make X expand to "= 0" and break every use of it.  The #ifndef guards keep
// this harmless on toolchains whose headers already define these names.
#ifndef STDIN_FILENO
#define STDIN_FILENO 0
#endif
#ifndef STDOUT_FILENO
#define STDOUT_FILENO 1
#endif
#else // Huzzah for POSIX!
#include <unistd.h>
#endif
| |
| /* |
| * This is a simple example program that takes in intermediate |
| * suffix-sorted ngram files and outputs two sets of files: one for backoff |
| * probability values (raw numbers, in suffix order) and one for |
| * probability values (ngram id and probability, in *context* order) |
| */ |
| int main(int argc, char *argv[]) { |
| using namespace lm::interpolate; |
| |
| const std::size_t ONE_GB = 1 << 30; |
| const std::size_t SIXTY_FOUR_MB = 1 << 26; |
| const std::size_t NUMBER_OF_BLOCKS = 2; |
| |
| std::string FILE_NAME = "ngrams"; |
| std::string CONTEXT_SORTED_FILENAME = "csorted-ngrams"; |
| std::string BACKOFF_FILENAME = "backoffs"; |
| std::string TMP_DIR = "/tmp/"; |
| |
| try { |
| namespace po = boost::program_options; |
| po::options_description options("canhazinterp Pass-3 options"); |
| |
| options.add_options() |
| ("help,h", po::bool_switch(), "Show this help message") |
| ("ngrams,n", po::value<std::string>(&FILE_NAME), "ngrams file") |
| ("csortngrams,c", po::value<std::string>(&CONTEXT_SORTED_FILENAME), "context sorted ngrams file") |
| ("backoffs,b", po::value<std::string>(&BACKOFF_FILENAME), "backoffs file") |
| ("tmpdir,t", po::value<std::string>(&TMP_DIR), "tmp dir"); |
| po::variables_map vm; |
| po::store(po::parse_command_line(argc, argv, options), vm); |
| |
| // Display help |
| if(vm["help"].as<bool>()) { |
| std::cerr << "Usage: " << options << std::endl; |
| return 1; |
| } |
| } |
| catch(const std::exception &e) { |
| |
| std::cerr << e.what() << std::endl; |
| return 1; |
| |
| } |
| |
| // The basic strategy here is to have three chains: |
| // - The first reads the ngram order inputs using ModelBuffer. Those are |
| // then stripped of their backoff values and fed into the third chain; |
| // the backoff values *themselves* are written to the second chain. |
| // |
| // - The second chain takes the backoff values and writes them out to a |
| // file (one for each order). |
| // |
| // - The third chain takes just the probability values and ngrams and |
| // writes them out, sorted in context-order, to a file (one for each |
| // order). |
| |
| // This will be used to read in the binary intermediate files. There is |
| // one file per order (e.g. ngrams.1, ngrams.2, ...) |
| lm::ModelBuffer buffer(FILE_NAME); |
| |
| // Create a separate chains for each ngram order for: |
| // - Input from the intermediate files |
| // - Output to the backoff file |
| // - Output to the (context-sorted) probability file |
| util::stream::Chains ngram_inputs(buffer.Order()); |
| util::stream::Chains backoff_chains(buffer.Order()); |
| util::stream::Chains prob_chains(buffer.Order()); |
| for (std::size_t i = 0; i < buffer.Order(); ++i) { |
| ngram_inputs.push_back(util::stream::ChainConfig( |
| lm::NGram<lm::ProbBackoff>::TotalSize(i + 1), NUMBER_OF_BLOCKS, ONE_GB)); |
| |
| backoff_chains.push_back( |
| util::stream::ChainConfig(sizeof(float), NUMBER_OF_BLOCKS, ONE_GB)); |
| |
| prob_chains.push_back(util::stream::ChainConfig( |
| sizeof(lm::WordIndex) * (i + 1) + sizeof(float), NUMBER_OF_BLOCKS, |
| ONE_GB)); |
| } |
| |
| // This sets the input for each of the ngram order chains to the |
| // appropriate file |
| buffer.Source(ngram_inputs); |
| |
| util::FixedArray<util::scoped_ptr<SplitWorker> > workers(buffer.Order()); |
| for (std::size_t i = 0; i < buffer.Order(); ++i) { |
| // Attach a SplitWorker to each of the ngram input chains, writing to the |
| // corresponding order's backoff and probability chains |
| workers.push_back( |
| new SplitWorker(i + 1, backoff_chains[i], prob_chains[i])); |
| ngram_inputs[i] >> boost::ref(*workers.back()); |
| } |
| |
| util::stream::SortConfig sort_cfg; |
| sort_cfg.temp_prefix = TMP_DIR; |
| sort_cfg.buffer_size = SIXTY_FOUR_MB; |
| sort_cfg.total_memory = ONE_GB; |
| |
| // This will parallel merge sort the individual order files, putting |
| // them in context-order instead of suffix-order. |
| // |
| // Two new threads will be running, each owned by the chains[i] object. |
| // - The first executes BlockSorter.Run() to sort the n-gram entries |
| // - The second executes WriteAndRecycle.Run() to write each sorted |
| // block to disk as a temporary file |
| util::stream::Sorts<lm::ContextOrder> sorts(buffer.Order()); |
| for (std::size_t i = 0; i < prob_chains.size(); ++i) { |
| sorts.push_back(prob_chains[i], sort_cfg, lm::ContextOrder(i + 1)); |
| } |
| |
| // Set the sort output to be on the same chain |
| for (std::size_t i = 0; i < prob_chains.size(); ++i) { |
| // The following call to Chain::Wait() |
| // joins the threads owned by chains[i]. |
| // |
| // As such the following call won't return |
| // until all threads owned by chains[i] have completed. |
| // |
| // The following call also resets chain[i] |
| // so that it can be reused |
| // (including free'ing the memory previously used by the chain) |
| prob_chains[i].Wait(); |
| |
| // In an ideal world (without memory restrictions) |
| // we could merge all of the previously sorted blocks |
| // by reading them all completely into memory |
| // and then running merge sort over them. |
| // |
| // In the real world, we have memory restrictions; |
| // depending on how many blocks we have, |
| // and how much memory we can use to read from each block |
| // (sort_config.buffer_size) |
| // it may be the case that we have insufficient memory |
| // to read sort_config.buffer_size of data from each block from disk. |
| // |
| // If this occurs, then it will be necessary to perform one or more rounds |
| // of merge sort on disk; |
| // doing so will reduce the number of blocks that we will eventually |
| // need to read from |
| // when performing the final round of merge sort in memory. |
| // |
| // So, the following call determines whether it is necessary |
| // to perform one or more rounds of merge sort on disk; |
| // if such on-disk merge sorting is required, such sorting is performed. |
| // |
| // Finally, the following method launches a thread that calls |
| // OwningMergingReader.Run() |
| // to perform the final round of merge sort in memory. |
| // |
| // Merge sort could have be invoked directly |
| // so that merge sort memory doesn't coexist with Chain memory. |
| sorts[i].Output(prob_chains[i]); |
| } |
| |
| // Create another model buffer for our output on e.g. csorted-ngrams.1, |
| // csorted-ngrams.2, ... |
| lm::ModelBuffer output_buf(CONTEXT_SORTED_FILENAME, true, false); |
| output_buf.Sink(prob_chains, buffer.Counts()); |
| |
| // Create a third model buffer for our backoff output on e.g. backoff.1, |
| // backoff.2, ... |
| lm::ModelBuffer boff_buf(BACKOFF_FILENAME, true, false); |
| boff_buf.Sink(backoff_chains, buffer.Counts()); |
| |
| // Joins all threads that chains owns, |
| // and does a for loop over each chain object in chains, |
| // calling chain.Wait() on each such chain object |
| ngram_inputs.Wait(true); |
| backoff_chains.Wait(true); |
| prob_chains.Wait(true); |
| |
| return 0; |
| } |