/**
 *   Copyright 2015-2016 Pivotal Software, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 **/

#ifndef QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
#define QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_

#include <cstddef>
#include <memory>

#include "catalog/CatalogDatabaseCache.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkerDirectory.hpp"
#include "threading/Thread.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"
#include "gtest/gtest_prod.h"

#include "tmb/address.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"

namespace quickstep {

class StorageManager;

namespace serialization {
class CatalogDatabase;
class QueryContext;
}  // namespace serialization

/** \addtogroup QueryExecution
 *  @{
 */

/**
 * @brief The Shiftboss accepts workorder protos from shiftboss, and assigns
 *        the workorders to workers.
 **/
class Shiftboss : public Thread {
 public:
  /**
   * @brief Constructor.
   *
   * @param bus A pointer to the TMB.
   * @param storage_manager The StorageManager to use.
   * @param workers A pointer to the WorkerDirectory.
   * @param cpu_id The ID of the CPU to which the Shiftboss thread can be pinned.
   *
   * @note If cpu_id is not specified, Shiftboss thread can be possibly moved
   *       around on different CPUs by the OS.
  **/
  Shiftboss(tmb::MessageBus *bus,
            StorageManager *storage_manager,
            WorkerDirectory *workers,
            const int cpu_id = -1)
      : bus_(DCHECK_NOTNULL(bus)),
        storage_manager_(DCHECK_NOTNULL(storage_manager)),
        workers_(DCHECK_NOTNULL(workers)),
        cpu_id_(cpu_id),
        shiftboss_client_id_(tmb::kClientIdNone),
        foreman_client_id_(tmb::kClientIdNone),
        max_msgs_per_worker_(1),
        start_worker_index_(0u) {
    // Check to have at least one Worker.
    DCHECK_GT(workers->getNumWorkers(), 0u);

    shiftboss_client_id_ = bus_->Connect();
    LOG(INFO) << "Shiftboss TMB client ID: " << shiftboss_client_id_;
    DCHECK_NE(shiftboss_client_id_, tmb::kClientIdNone);

    // Messages between Foreman and Shiftboss.
    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossRegistrationMessage);
    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossRegistrationResponseMessage);

    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kShiftbossInitiateMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kShiftbossInitiateResponseMessage);

    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kInitiateRebuildMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kInitiateRebuildResponseMessage);

    // Message sent to Worker.
    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderMessage);

    // Message sent to Foreman.
    bus_->RegisterClientAsSender(shiftboss_client_id_, kCatalogRelationNewBlockMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kDataPipelineMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrdersAvailableMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderFeedbackMessage);

    // Forward the following message types from Foreman to Workers.
    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderMessage);

    // Forward the following message types from Workers to Foreman.
    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kWorkOrderCompleteMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kWorkOrderCompleteMessage);

    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kRebuildWorkOrderCompleteMessage);

    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kQueryResultRelationMessage);
    bus_->RegisterClientAsSender(shiftboss_client_id_, kQueryResultRelationResponseMessage);

    // Stop itself.
    bus_->RegisterClientAsReceiver(shiftboss_client_id_, kPoisonMessage);
    // Stop all workers.
    bus_->RegisterClientAsSender(shiftboss_client_id_, kPoisonMessage);

    for (std::size_t i = 0; i < workers_->getNumWorkers(); ++i) {
      worker_addresses_.AddRecipient(workers_->getClientID(i));
    }

    registerWithForeman();
  }

  ~Shiftboss() override {
  }

  /**
   * @brief Get the TMB client ID of Shiftboss thread.
   *
   * @return TMB client ID of shiftboss thread.
   **/
  inline tmb::client_id getBusClientID() const {
    return shiftboss_client_id_;
  }

  /**
   * @brief Get the Work Order processing capacity of all Workers managed by
   *        Shiftboss during a single round of WorkOrder dispatch.
   **/
  inline std::size_t getWorkOrderCapacity() const {
    DCHECK_NE(max_msgs_per_worker_, 0u);
    return max_msgs_per_worker_ * workers_->getNumWorkers();
  }

  /**
   * @brief Get the Worker to assign WorkOrders for execution. Block to wait if
   *        all Workers have reached their capacity for queued WorkOrders.
   **/
  // TODO(zuyu): To achieve non-blocking, we need a queue to cache received
  // normal Work Order protos from Foreman and the generated rebuild Work Orders.
  inline std::size_t getSchedulableWorker();

  /**
   * @brief Set the maximum number of messages that should be allocated to each
   *        worker during a single round of WorkOrder dispatch.
   *
   * @param max_msgs_per_worker Maximum number of messages.
   **/
  inline void setMaxMessagesPerWorker(const std::size_t max_msgs_per_worker) {
    max_msgs_per_worker_ = max_msgs_per_worker;
  }

 protected:
  /**
   * @brief The shiftboss receives workorders, and based on the response it
   *        assigns workorders to workers.
   *
   * @note  The workers who get the messages from the Shiftboss execute and
   *        subsequently delete the WorkOrder contained in the message.
   **/
  void run() override;

 private:
  void registerWithForeman();

  /**
   * @brief Process the Shiftboss initiate message and ack back.
   *
   * @param query_id The given query id.
   * @param catalog_database_cache_proto The proto used to update
   *        CatalogDatabaseCache.
   * @param query_context_proto The QueryContext proto.
   **/
  void processShiftbossInitiateMessage(const std::size_t query_id,
                                       const serialization::CatalogDatabase &catalog_database_cache_proto,
                                       const serialization::QueryContext &query_context_proto);

  /**
   * @brief Process the RebuildWorkOrder initiate message and ack back.
   *
   * @param op_index The index of the operator for rebuild work orders.
   * @param dest_index The InsertDestination index in QueryContext to rebuild.
   * @param rel_id The relation that needs to generate rebuild work orders.
   **/
  void processInitiateRebuildMessage(const std::size_t op_index,
                                     const QueryContext::insert_destination_id dest_index,
                                     const relation_id rel_id);

  tmb::MessageBus *bus_;

  CatalogDatabaseCache database_cache_;
  StorageManager *storage_manager_;
  WorkerDirectory *workers_;

  // The ID of the CPU that the Shiftboss thread can optionally be pinned to.
  const int cpu_id_;

  tmb::client_id shiftboss_client_id_, foreman_client_id_;

  // TMB recipients for all workers managed by this Shiftboss.
  tmb::Address worker_addresses_;

  // During a single round of WorkOrder dispatch, a Worker should be allocated
  // at most these many WorkOrders.
  std::size_t max_msgs_per_worker_;

  // The worker index for scheduling Work Order.
  std::size_t start_worker_index_;

  // TODO(zuyu): Use query_id.
  std::unique_ptr<QueryContext> query_context_;

  DISALLOW_COPY_AND_ASSIGN(Shiftboss);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_QUERY_EXECUTION_SHIFTBOSS_HPP_
