/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#include <climits>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "catalog/CatalogDatabase.hpp"
#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryContext.pb.h"
#include "query_execution/QueryExecutionState.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/QueryManagerSingleNode.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "query_execution/WorkerMessage.hpp"
#include "query_optimizer/QueryHandle.hpp"
#include "query_optimizer/QueryPlan.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/InsertDestination.pb.h"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "utility/DAG.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"
#include "gtest/gtest.h"

#include "tmb/id_typedefs.h"

namespace tmb { class MessageBus; }

using std::move;
using std::unique_ptr;
using std::vector;

using tmb::client_id;

namespace quickstep {

namespace {

const partition_id kPartitionId = 0;

}  // namespace

class WorkOrderProtosContainer;

class MockWorkOrder : public WorkOrder {
 public:
  explicit MockWorkOrder(const int op_index)
      : WorkOrder(0), op_index_(op_index) {}

  void execute() override {
    VLOG(3) << "WorkOrder[" << op_index_ << "] executing.";
  }

  inline QueryPlan::DAGNodeIndex getOpIndex() const {
    return op_index_;
  }

 private:
  const QueryPlan::DAGNodeIndex op_index_;

  DISALLOW_COPY_AND_ASSIGN(MockWorkOrder);
};

class MockOperator: public RelationalOperator {
 public:
  enum function_name {
    kFeedInputBlock = 0,
    kDoneFeedingInputBlocks,
    kGetAllWorkOrders
  };

  MockOperator(const bool produce_workorders,
               const bool has_streaming_input,
               const int max_getworkorder_iters = 1,
               const int max_workorders = INT_MAX)
      : RelationalOperator(0 /* Query Id */),
        produce_workorders_(produce_workorders),
        has_streaming_input_(has_streaming_input),
        max_workorders_(max_workorders),
        max_getworkorder_iters_(max_getworkorder_iters),
        num_calls_get_workorders_(0),
        num_workorders_generated_(0),
        num_calls_feedblock_(0),
        num_calls_donefeedingblocks_(0) {
  }

  OperatorType getOperatorType() const override {
    return kMockOperator;
  }

  std::string getName() const override {
    return "MockOperator";
  }

#define MOCK_OP_LOG(x) VLOG(x) << "Op[" << op_index_ << "]: " << __func__ << ": "

  // The methods below are used to check whether QueryManager calls the Relational
  // operator, how many times it calls a particular method etc.
  inline int getNumWorkOrders() const {
    return num_workorders_generated_;
  }

  inline int getNumCalls(const function_name fname) const {
    switch (fname) {
      case kFeedInputBlock:
        return num_calls_feedblock_;
      case kDoneFeedingInputBlocks:
        return num_calls_donefeedingblocks_;
      case kGetAllWorkOrders:
        return num_calls_get_workorders_;
      default:
        return -1;
    }
  }

  void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
    insert_destination_index_ = insert_destination_index;
  }

  // Mock to trigger doneFeedingInputBlocks for the dependent operators
  // in QueryManager::markOperatorFinished.
  void setOutputRelationID(const relation_id rel_id) {
    output_relation_id_ = rel_id;
  }

  // Override methods from the base class.
  bool getAllWorkOrders(
      WorkOrdersContainer *container,
      QueryContext *query_context,
      StorageManager *storage_manager,
      const tmb::client_id foreman_client_id,
      tmb::MessageBus *bus) override {
    ++num_calls_get_workorders_;
    if (produce_workorders_) {
      if (has_streaming_input_) {
        if (num_calls_feedblock_ > 0 && (num_workorders_generated_ < max_workorders_)) {
          MOCK_OP_LOG(3) << "[stream] generate WorkOrder";
          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
          ++num_workorders_generated_;
        }
      } else {
        if (num_workorders_generated_ < max_workorders_) {
          MOCK_OP_LOG(3) << "[static] generate WorkOrder";
          container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
          ++num_workorders_generated_;
        }
      }
    }
    MOCK_OP_LOG(3) << "count(" << num_calls_get_workorders_ << ") "
                   << "return(" << (num_calls_get_workorders_ == max_getworkorder_iters_) << ")";
    return num_calls_get_workorders_ == max_getworkorder_iters_;
  }

  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override {
    return true;
  }

  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                      const partition_id part_id) override {
    ++num_calls_feedblock_;
    MOCK_OP_LOG(3) << "count(" << num_calls_feedblock_ << ")";
  }

  void doneFeedingInputBlocks(const relation_id rel_id) override {
    ++num_calls_donefeedingblocks_;
    MOCK_OP_LOG(3) << "count(" << num_calls_donefeedingblocks_ << ")";
  }

  QueryContext::insert_destination_id getInsertDestinationID() const override {
    return insert_destination_index_;
  }

  const relation_id getOutputRelationID() const override {
    return output_relation_id_;
  }

 private:
  const bool produce_workorders_;
  const bool has_streaming_input_;
  const int max_workorders_;
  const int max_getworkorder_iters_;

  int num_calls_get_workorders_;
  int num_workorders_generated_;
  int num_calls_feedblock_;
  int num_calls_donefeedingblocks_;

  QueryContext::insert_destination_id insert_destination_index_ = QueryContext::kInvalidInsertDestinationId;

  relation_id output_relation_id_ = -1;

#undef MOCK_OP_LOG

  DISALLOW_COPY_AND_ASSIGN(MockOperator);
};


class QueryManagerTest : public ::testing::Test {
 protected:
  virtual void SetUp() {
    db_.reset(new CatalogDatabase(nullptr /* catalog */, "database"));
    storage_manager_.reset(new StorageManager("./"));
    bus_.Initialize();
    query_handle_ = new QueryHandle(0 /* dummy query ID */, tmb::kClientIdNone /* cli_id */);
    query_plan_ = query_handle_->getQueryPlanMutable();
    query_handle_->getQueryContextProtoMutable()->set_query_id(query_handle_->query_id());
  }

  inline void constructQueryManager() {
    query_manager_.reset(new QueryManagerSingleNode(
        0, 1, query_handle_, db_.get(), storage_manager_.get(), &bus_));
  }

  inline const int getNumWorkOrdersInExecution(const QueryPlan::DAGNodeIndex index) const {
    return query_manager_->getQueryExecutionState().getNumQueuedWorkOrders(index);
  }

  inline const int getNumOperatorsFinished() const {
    return query_manager_->getQueryExecutionState().getNumOperatorsFinished();
  }

  inline bool getOperatorFinishedStatus(const QueryPlan::DAGNodeIndex index) const {
    return query_manager_->getQueryExecutionState().hasExecutionFinished(index);
  }

  inline bool placeDataPipelineMessage(const QueryPlan::DAGNodeIndex source_operator_index) {
    VLOG(3) << "Place DataPipeline message for Op[" << source_operator_index << "]";

    query_manager_->processDataPipelineMessage(source_operator_index,
                                               0 /* dummy block ID */,
                                               0 /* dummy relation ID */);
    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
  }

  inline bool placeWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
    VLOG(3) << "Place WorkOrderComplete message for Op[" << index << "]";

    query_manager_->processWorkOrderCompleteMessage(index, kPartitionId);
    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
  }

  inline bool placeRebuildWorkOrderCompleteMessage(const QueryPlan::DAGNodeIndex index) {
    VLOG(3) << "Place RebuildWorkOrderComplete message for Op[" << index << "]";

    query_manager_->processRebuildWorkOrderCompleteMessage(index, kPartitionId);
    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
  }

  inline bool placeOutputBlockMessage(const QueryPlan::DAGNodeIndex index) {
    VLOG(3) << "Place OutputBlock message for Op[" << index << "]";

    query_manager_->processDataPipelineMessage(index,
                                               0 /* dummy block ID */,
                                               0 /* dummy relation ID */);
    return query_manager_->getQueryExecutionState().hasQueryExecutionFinished();
  }

  unique_ptr<CatalogDatabase> db_;
  unique_ptr<StorageManager> storage_manager_;

  QueryPlan *query_plan_;  // Owned by 'query_handle_'.
  QueryHandle* query_handle_;  // Owned by 'query_manager_'.
  unique_ptr<QueryManagerSingleNode> query_manager_;

  MessageBusImpl bus_;

  client_id worker_client_id_;

  unique_ptr<WorkerDirectory> workers_;
};

TEST_F(QueryManagerTest, SingleNodeDAGNoWorkOrdersTest) {
  // This test creates a DAG of a single node. No workorders are generated.
  query_plan_->addRelationalOperator(new MockOperator(false, false));

  const MockOperator &op = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(0));

  constructQueryManager();

  // We expect one call for op's getAllWorkOrders().
  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
}

TEST_F(QueryManagerTest, SingleNodeDAGStaticWorkOrdersTest) {
  // This test creates a DAG of a single node. Static workorders are generated.
  const QueryPlan::DAGNodeIndex id =
      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));

  const MockOperator &op = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id));

  constructQueryManager();

  // We expect one call for op's getAllWorkOrders().
  EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));

  // One workorder is generated.
  EXPECT_EQ(1, op.getNumWorkOrders());

  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));
  EXPECT_TRUE(worker_message != nullptr);

  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(0u, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
  EXPECT_EQ(0, getNumOperatorsFinished());

  // Send a message to QueryManager upon workorder completion.
  // Last event processed by QueryManager.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(id));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));
  EXPECT_EQ(1, getNumOperatorsFinished());
  EXPECT_TRUE(getOperatorFinishedStatus(id));
}

TEST_F(QueryManagerTest, SingleNodeDAGDynamicWorkOrdersTest) {
  // This test creates a DAG of a single node. WorkOrders are generated
  // dynamically as pending work orders complete execution, i.e.,
  // getAllWorkOrders() is called multiple times.  getAllWorkOrders() will be
  // called 3 times and 3 work orders will be returned, i.e., 2 calls to
  // getAllWorkOrders() insert 2 WorkOrder and return false, and the last will
  // insert 1 WorkOrder and return true.

  // TODO(shoban): This test can not be more robust than this because of fixed
  // scaffolding of mocking. If we use gMock, we can do much better.
  const QueryPlan::DAGNodeIndex id =
      query_plan_->addRelationalOperator(new MockOperator(true, false, 3, 3));

  const MockOperator &op = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id));

  constructQueryManager();

  for (int i = 0; i < 3; ++i) {
    // We expect one call for op's getAllWorkOrders().
    EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));

    // One workorder is generated.
    // EXPECT_EQ(1, getWorkerInputQueueSize());
    EXPECT_EQ(i + 1, op.getNumWorkOrders());

    unique_ptr<WorkerMessage> worker_message;
    worker_message.reset(query_manager_->getNextWorkerMessage(id, -1));

    ASSERT_TRUE(worker_message != nullptr);
    EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
              worker_message->getType());
    EXPECT_EQ(id, worker_message->getRelationalOpIndex());

    delete worker_message->getWorkOrder();

    EXPECT_EQ(1, getNumWorkOrdersInExecution(id));
    EXPECT_EQ(0, getNumOperatorsFinished());

    if (i < 2) {
      // Send a message to QueryManager upon workorder completion.
      EXPECT_FALSE(placeWorkOrderCompleteMessage(id));
      query_manager_->fetchNormalWorkOrders(id);
    } else {
      // Send a message to QueryManager upon workorder completion.
      // Last event.
      EXPECT_TRUE(placeWorkOrderCompleteMessage(id));
    }
  }

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id));

  EXPECT_EQ(1, getNumOperatorsFinished());
  EXPECT_TRUE(getOperatorFinishedStatus(id));

  // We place this check in the end, since it's true throughout the test.
  EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
}

TEST_F(QueryManagerTest, TwoNodesDAGBlockingLinkTest) {
  // We use two nodes in the DAG with a blocking link between them.
  // There is no streaming of data involved in this test.
  const QueryPlan::DAGNodeIndex id1 =
      query_plan_->addRelationalOperator(new MockOperator(true, false));
  const QueryPlan::DAGNodeIndex id2 =
      query_plan_->addRelationalOperator(new MockOperator(true, false));

  // Create a blocking link.
  query_plan_->addDirectDependency(id2, id1, true);

  static_cast<MockOperator *>(
      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
          ->setOutputRelationID(0xdead);

  const MockOperator &op1 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id1));
  const MockOperator &op2 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id2));

  constructQueryManager();

  // Only op1 should receive a call to getAllWorkOrders initially.
  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));

  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));

  // Only op1 should produce a workorder.
  EXPECT_EQ(1, op1.getNumWorkOrders());
  EXPECT_EQ(0, op2.getNumWorkOrders());

  // Foreman hasn't yet got workorder completion response for the workorder.
  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_EQ(1, getNumWorkOrdersInExecution(id1));
  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
  EXPECT_EQ(0, getNumOperatorsFinished());

  // Send a message to Foreman upon workorder (generated by op1) completion.
  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
  // op1 is over now, op2 still to go.
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
  EXPECT_EQ(1, getNumOperatorsFinished());

  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
  EXPECT_FALSE(getOperatorFinishedStatus(id2));

  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));

  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  // op2 should get first call of getAllWorkOrders() when op1 is over.
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));

  EXPECT_EQ(1, op2.getNumWorkOrders());

  // Send a message to QueryManager upon workorder (generated by op2) completion.
  // Note that the worker hasn't yet popped the workorder. Usually this won't
  // happen as workers pop workorders first, execute and then send the response.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));

  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));

  EXPECT_EQ(2, getNumOperatorsFinished());
  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_TRUE(getOperatorFinishedStatus(id2));

  // Expect no additional calls to getAllWorkOrders.
  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
}

TEST_F(QueryManagerTest, TwoNodesDAGPipeLinkTest) {
  // We use two nodes in the DAG with a non-blocking link between them.
  // We stream output of op1 to op2. Sequeuce of events is as follows:
  // 1. op1 creates a workorder.
  // 2. We send a "block full" (from op1) to QueryManager.
  // 3. op2 creates a workorder because of step 2.
  const QueryPlan::DAGNodeIndex id1 =
      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
  const QueryPlan::DAGNodeIndex id2 =
      query_plan_->addRelationalOperator(new MockOperator(true, true, 2));

  // Create a non-blocking link.
  query_plan_->addDirectDependency(id2, id1, false);

  static_cast<MockOperator *>(
      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1))
      ->setOutputRelationID(0xdead);

  const MockOperator &op1 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id1));
  const MockOperator &op2 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id2));

  constructQueryManager();

  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, op1.getNumWorkOrders());
  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));

  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  // op2 will generate workorder only after receiving a streaming input.
  EXPECT_EQ(0, op2.getNumWorkOrders());
  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kFeedInputBlock));

  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // Send a message to QueryManager upon block getting full (output of op1).
  EXPECT_FALSE(placeOutputBlockMessage(id1));

  // op1 is not finished yet because the response of workorder completion hasn't
  // been received yet by the QueryManager.
  EXPECT_FALSE(getOperatorFinishedStatus(id1));
  EXPECT_FALSE(getOperatorFinishedStatus(id2));

  // No additional call to op1's getAllWorkOrders.
  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));

  // Output from op1 should be fed to op2.
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kFeedInputBlock));

  // A call to op2's getAllWorkOrders because of the streamed input.
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, op2.getNumWorkOrders());

  // Place a message of a workorder completion of op1 on Foreman's input queue.
  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));
  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));

  // An additional call to op2's getAllWorkOrders because of completion of op1.
  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(2, op2.getNumWorkOrders());

  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // Place a message of a workorder completion of op2 on Foreman's input queue.
  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));

  EXPECT_TRUE(getOperatorFinishedStatus(id1));

  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
  EXPECT_FALSE(getOperatorFinishedStatus(id2));

  // Send a message to Foreman upon workorder (generated by op2) completion.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));

  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));
  EXPECT_TRUE(getOperatorFinishedStatus(id2));
}

TEST_F(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest) {
  // In this test, we create a 2-node DAG with a non-blocking link between them.
  // There is no streaming of data from op1 to op2 during the execution of op1.
  // op1 produces a partially filled block at the end of its execution which is
  // rebuilt and then fed to op2.
  const QueryPlan::DAGNodeIndex id1 =
      query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
  const QueryPlan::DAGNodeIndex id2 =
      query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));

  // Create a non-blocking link.
  query_plan_->addDirectDependency(id2, id1, false);

  // Create a relation, owned by db_.*/
  CatalogRelation *relation =
      new CatalogRelation(nullptr /* catalog_database */, "test_relation");
  const relation_id output_relation_id = db_->addRelation(relation);

  // Setup the InsertDestination proto in the query context proto.
  serialization::QueryContext *query_context_proto =
      query_handle_->getQueryContextProtoMutable();

  const QueryContext::insert_destination_id insert_destination_index =
      query_context_proto->insert_destinations_size();
  serialization::InsertDestination *insert_destination_proto =
      query_context_proto->add_insert_destinations();

  insert_destination_proto->set_insert_destination_type(
      serialization::InsertDestinationType::BLOCK_POOL);
  insert_destination_proto->set_relation_id(output_relation_id);
  insert_destination_proto->set_relational_op_index(id1);

  MockOperator *op1_mutable = static_cast<MockOperator *>(
      query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(id1));
  op1_mutable->setInsertDestinationID(insert_destination_index);
  op1_mutable->setOutputRelationID(output_relation_id);

  const MockOperator &op1 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id1));
  const MockOperator &op2 = static_cast<const MockOperator &>(
      query_plan_->getQueryPlanDAG().getNodePayload(id2));

  constructQueryManager();

  // NOTE(zuyu): An operator generally has no ideas about partially filled
  // blocks, but InsertDestination in QueryContext does.
  // Mock to add partially filled blocks in the InsertDestination.
  InsertDestination *insert_destination =
      query_manager_->getQueryContextMutable()->getInsertDestination(
          insert_destination_index);
  DCHECK(insert_destination != nullptr);
  MutableBlockReference block_ref;
  static_cast<BlockPoolInsertDestination *>(insert_destination)
      ->available_block_refs_.push_back(move(block_ref));

  EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, op1.getNumWorkOrders());

  EXPECT_EQ(0, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(0, op2.getNumWorkOrders());

  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());
  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // Send a message to QueryManager upon workorder (generated by op1) completion.
  EXPECT_FALSE(placeWorkOrderCompleteMessage(id1));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id1));

  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));
  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kRebuildWorkOrder,
            worker_message->getType());

  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // op1 generates a rebuild workorder. The block is rebuilt and streamed
  // to Foreman.
  EXPECT_FALSE(placeDataPipelineMessage(id1));

  EXPECT_FALSE(placeRebuildWorkOrderCompleteMessage(id1));
  // Based on the streamed input, op2's getAllWorkOrders should produce a
  // workorder.
  EXPECT_EQ(2, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, op2.getNumWorkOrders());

  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());

  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_EQ(1, op2.getNumCalls(MockOperator::kDoneFeedingInputBlocks));
  EXPECT_FALSE(getOperatorFinishedStatus(id2));
  EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));

  // Send a message to QueryManager upon workorder (generated by op2) completion.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(id2));

  EXPECT_EQ(0, getNumWorkOrdersInExecution(id2));

  EXPECT_TRUE(getOperatorFinishedStatus(id2));
}

TEST_F(QueryManagerTest, MultipleNodesNoOutputTest) {
  // When an operator produces workorders but no output, the QueryManager should
  // check the dependents of this operator to make progress.
  const QueryPlan::DAGNodeIndex kNumNodes = 5;

  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
    if (i == 0) {
      query_plan_->addRelationalOperator(new MockOperator(true, false));
    } else {
      query_plan_->addRelationalOperator(new MockOperator(true, true));
    }
    VLOG(3) << i;
  }

  /**
   * The DAG looks like this:
   *
   * op1 -> op2 -> op3 -> op4 -> op5
   *
   **/
  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes - 1; ++i) {
    query_plan_->addDirectDependency(i + 1, i, false);
    static_cast<MockOperator*>(query_plan_->getQueryPlanDAGMutable()->getNodePayloadMutable(i))
        ->setOutputRelationID(0xdead);
  }

  std::vector<const MockOperator*> operators;
  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
    operators.push_back(static_cast<const MockOperator*>(&query_plan_->getQueryPlanDAG().getNodePayload(i)));
  }

  constructQueryManager();

  // operators[0] should have produced a workorder by now.
  EXPECT_EQ(1, operators[0]->getNumCalls(MockOperator::kGetAllWorkOrders));
  EXPECT_EQ(1, operators[0]->getNumWorkOrders());

  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(0, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());

  EXPECT_EQ(0, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  EXPECT_EQ(1, getNumWorkOrdersInExecution(0));
  EXPECT_FALSE(getOperatorFinishedStatus(0));

  for (QueryPlan::DAGNodeIndex i = 1; i < kNumNodes; ++i) {
    EXPECT_EQ(0, operators[i]->getNumCalls(MockOperator::kGetAllWorkOrders));
  }

  // Send a message to QueryManager upon workorder (generated by operators[0])
  // completion.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(0));

  for (QueryPlan::DAGNodeIndex i = 0; i < kNumNodes; ++i) {
    EXPECT_EQ(0, getNumWorkOrdersInExecution(i));
    EXPECT_TRUE(getOperatorFinishedStatus(i));
    if (i < kNumNodes - 1) {
      EXPECT_EQ(1, operators[i + 1]->getNumCalls(MockOperator::kDoneFeedingInputBlocks));
    }
  }
}

TEST_F(QueryManagerTest, OutOfOrderWorkOrderCompletionTest) {
  // Consider two operators, both generate one workorder each. The dependent's
  // workorder finishes before dependency's workorder.
  const QueryPlan::DAGNodeIndex id1 = query_plan_->addRelationalOperator(new MockOperator(true, false, 1));
  const QueryPlan::DAGNodeIndex id2 = query_plan_->addRelationalOperator(new MockOperator(true, true, 2, 1));

  // Create a non-blocking link.
  query_plan_->addDirectDependency(id2, id1, false);

  constructQueryManager();

  unique_ptr<WorkerMessage> worker_message;
  worker_message.reset(query_manager_->getNextWorkerMessage(id1, -1));

  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());

  EXPECT_EQ(id1, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // Send a message to QueryManager upon a block (output of op1) getting full.
  EXPECT_FALSE(placeOutputBlockMessage(id1));

  // op1 is not finished yet because the response of workorder completion hasn't
  // been received yet.
  EXPECT_FALSE(getOperatorFinishedStatus(id1));
  EXPECT_FALSE(getOperatorFinishedStatus(id2));

  worker_message.reset(query_manager_->getNextWorkerMessage(id2, -1));
  EXPECT_TRUE(worker_message != nullptr);
  EXPECT_EQ(WorkerMessage::WorkerMessageType::kWorkOrder,
            worker_message->getType());

  EXPECT_EQ(id2, worker_message->getRelationalOpIndex());

  delete worker_message->getWorkOrder();

  // As mentioned earlier, op2 finishes before op1.
  EXPECT_FALSE(placeWorkOrderCompleteMessage(id2));

  // op1's workorder execution is over.
  EXPECT_TRUE(placeWorkOrderCompleteMessage(id1));

  EXPECT_TRUE(getOperatorFinishedStatus(id1));
  EXPECT_TRUE(getOperatorFinishedStatus(id2));
}

}  // namespace quickstep
