/*!
 * Copyright (c) 2017 by Contributors
 * \file threaded_engine_test.cc
 * \brief threaded engine tests
*/
#include <time.h>
#include <unistd.h>
#include <dmlc/logging.h>
#include <gtest/gtest.h>
#include <mxnet/engine.h>
#include <dmlc/timer.h>
#include <cstdio>
#include <thread>
#include <chrono>
#include <vector>

#include "../src/engine/engine_impl.h"

/**
 * present the following workload
 *  n = reads.size()
 *  data[write] = (data[reads[0]] + ... data[reads[n]]) / n
 *  std::this_thread::sleep_for(std::chrono::microsecons(time));
 */
struct Workload {
  std::vector<int> reads;
  int write;
  int time;
};

static u_int32_t seed_ = 0xdeadbeef;

/**
 * generate a list of workloads
 */
void GenerateWorkload(int num_workloads, int num_var,
                      int min_read, int max_read,
                      int min_time, int max_time,
                      std::vector<Workload>* workloads) {
  workloads->clear();
  workloads->resize(num_workloads);
  for (int i = 0; i < num_workloads; ++i) {
    auto& wl = workloads->at(i);
    wl.write = rand_r(&seed_) % num_var;
    int r = rand_r(&seed_);
    int num_read = min_read + (r % (max_read - min_read));
    for (int j = 0; j < num_read; ++j) {
      wl.reads.push_back(rand_r(&seed_) % num_var);
    }
    wl.time = min_time + rand_r(&seed_) % (max_time - min_time);
  }
}

/**
 * evaluate a single workload
 */
void EvaluateWorload(const Workload& wl, std::vector<double>* data) {
  double tmp = 0;
  for (int i : wl.reads) tmp += data->at(i);
  data->at(wl.write) = tmp / (wl.reads.size() + 1);
  if (wl.time > 0) {
    std::this_thread::sleep_for(std::chrono::microseconds(wl.time));
  }
}

/**
 * evaluate a list of workload, return the time used
 */
double EvaluateWorloads(const std::vector<Workload>& workloads,
                      mxnet::Engine* engine,
                      std::vector<double>* data) {
  using namespace mxnet;
  double t = dmlc::GetTime();
  std::vector<Engine::VarHandle> vars;
  if (engine) {
    for (size_t i = 0; i < data->size(); ++i) {
      vars.push_back(engine->NewVariable());
    }
  }

  for (const auto& wl : workloads) {
    if (wl.reads.size() == 0) continue;
    if (engine == NULL) {
      EvaluateWorload(wl, data);
    } else {
      auto func = [wl, data](RunContext ctx, Engine::CallbackOnComplete cb) {
        EvaluateWorload(wl, data); cb();
      };
      std::vector<Engine::VarHandle> reads;
      for (auto i : wl.reads) {
        if (i != wl.write) reads.push_back(vars[i]);
      }
      engine->PushAsync(func, Context::CPU(), reads, {vars[wl.write]});
    }
  }

  if (engine) {
    engine->WaitForAll();
  }
  return dmlc::GetTime() - t;
}

TEST(Engine, RandSumExpr) {
  std::vector<Workload> workloads;
  int num_repeat = 5;
  const int num_engine = 4;

  std::vector<double> t(num_engine, 0.0);
  std::vector<mxnet::Engine*> engine(num_engine);

  engine[0] = NULL;
  engine[1] = mxnet::engine::CreateNaiveEngine();
  engine[2] = mxnet::engine::CreateThreadedEnginePooled();
  engine[3] = mxnet::engine::CreateThreadedEnginePerDevice();

  for (int repeat = 0; repeat < num_repeat; ++repeat) {
    srand(time(NULL) + repeat);
    int num_var = 100;
    GenerateWorkload(10000, num_var, 2, 20, 1, 10, &workloads);
    std::vector<std::vector<double>> data(num_engine);
    for (int i = 0; i < num_engine; ++i) {
      data[i].resize(num_var, 1.0);
      t[i] += EvaluateWorloads(workloads, engine[i], &data[i]);
    }

    for (int i = 1; i < num_engine; ++i) {
      for (int j = 0; j < num_var; ++j) EXPECT_EQ(data[0][j], data[i][j]);
    }
    LOG(INFO) << "data: " << data[0][1] << " " << data[0][2] << "...";
  }


  LOG(INFO) << "baseline\t\t"  << t[0] << " sec";
  LOG(INFO) << "NaiveEngine\t\t"  << t[1] << " sec";
  LOG(INFO) << "ThreadedEnginePooled\t" << t[2] << " sec";
  LOG(INFO) << "ThreadedEnginePerDevice\t" << t[3] << " sec";
}

void Foo(mxnet::RunContext, int i) { printf("The fox says %d\n", i); }

TEST(Engine, basics) {
  auto&& engine = mxnet::Engine::Get();
  auto&& var = engine->NewVariable();
  std::vector<mxnet::Engine::OprHandle> oprs;

  // Test #1
  printf("============= Test #1 ==============\n");
  for (int i = 0; i < 10; ++i) {
    oprs.push_back(engine->NewOperator(
        [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) {
          Foo(ctx, i);
          std::this_thread::sleep_for(std::chrono::seconds{1});
          cb();
        },
        {var}, {}));
    engine->Push(oprs.at(i), mxnet::Context{});
  }
  engine->WaitForAll();
  printf("Going to push delete\n");
  // std::this_thread::sleep_for(std::chrono::seconds{1});
  for (auto&& i : oprs) {
    engine->DeleteOperator(i);
  }
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context{}, var);
  engine->WaitForAll();

  printf("============= Test #2 ==============\n");
  var = engine->NewVariable();
  oprs.clear();
  for (int i = 0; i < 10; ++i) {
    oprs.push_back(engine->NewOperator(
        [i](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) {
          Foo(ctx, i);
          std::this_thread::sleep_for(std::chrono::milliseconds{500});
          cb();
        },
        {}, {var}));
    engine->Push(oprs.at(i), mxnet::Context{});
  }
  // std::this_thread::sleep_for(std::chrono::seconds{1});
  engine->WaitForAll();
  for (auto&& i : oprs) {
    engine->DeleteOperator(i);
  }
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context{}, var);

  printf("============= Test #3 ==============\n");
  var = engine->NewVariable();
  oprs.clear();
  engine->WaitForVar(var);
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context{}, var);
  engine->WaitForAll();

  printf("============= Test #4 ==============\n");
  var = engine->NewVariable();
  oprs.clear();
  oprs.push_back(engine->NewOperator(
      [](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) {
        std::this_thread::sleep_for(std::chrono::seconds{2});
        Foo(ctx, 42);
        cb();
      },
      {}, {var}, mxnet::FnProperty::kCopyFromGPU));
  engine->Push(oprs.at(0), mxnet::Context{});
  LOG(INFO) << "IO operator pushed, should wait for 2 seconds.";
  engine->WaitForVar(var);
  LOG(INFO) << "OK, here I am.";
  for (auto&& i : oprs) {
    engine->DeleteOperator(i);
  }
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context{}, var);
  engine->WaitForAll();

  printf("============= Test #5 ==============\n");
  var = engine->NewVariable();
  oprs.clear();
  oprs.push_back(engine->NewOperator(
      [](mxnet::RunContext ctx, mxnet::Engine::CallbackOnComplete cb) {
        Foo(ctx, 42);
        std::this_thread::sleep_for(std::chrono::seconds{2});
        cb();
      },
      {var}, {}));
  engine->Push(oprs.at(0), mxnet::Context{});
  LOG(INFO) << "Operator pushed, should not wait.";
  engine->WaitForVar(var);
  LOG(INFO) << "OK, here I am.";
  engine->WaitForAll();
  LOG(INFO) << "That was 2 seconds.";
  for (auto&& i : oprs) {
    engine->DeleteOperator(i);
  }
  engine->DeleteVariable([](mxnet::RunContext) {}, mxnet::Context{}, var);
  engine->WaitForAll();
  var = nullptr;
  oprs.clear();
  LOG(INFO) << "All pass";
}
