/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#ifndef QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_
#define QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

#include "catalog/CatalogRelation.hpp"
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionScheme.hpp"
#include "catalog/PartitionSchemeHeader.hpp"
#include "query_execution/QueryContext.hpp"
#include "relational_operators/RelationalOperator.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "utility/Macros.hpp"

#include "glog/logging.h"

#include "tmb/id_typedefs.h"

namespace tmb { class MessageBus; }

namespace quickstep {

class CatalogRelationSchema;
class InsertDestination;
class Predicate;
class Scalar;
class StorageManager;
class TupleStorageSubBlock;
class WorkOrderProtosContainer;
class WorkOrdersContainer;

namespace serialization { class WorkOrder; }

/** \addtogroup RelationalOperators
 *  @{
 */

/**
 * @brief An operator which performs a simple (SLOW) nested-loops join on two
 *        relations.
 **/
class NestedLoopsJoinOperator : public RelationalOperator {
 public:
  /**
   * @brief Constructor.
   *
   * @param query_id The ID of the query to which this operator belongs.
   * @param nested_loops_join_index The ID of this operator.
   * @param left_input_relation The first relation in the join (order is not
   *        actually important).
   * @param right_input_relation The second relation in the join (order is not
   *        actually important).
   * @param num_partitions The number of partitions.
   * @param has_repartition Whether this operator does repartition.
   * @param output_relation The output relation.
   * @param output_destination_index The index of the InsertDestination in the
   *        QueryContext to insert the join results.
   * @param join_predicate_index The index of join predicate in QueryContext to
   *        evaluate for each pair of tuples in the input relations.
   *        (cannot be kInvalidPredicateId).
   * @param selection_index The group index of Scalars in QueryContext,
   *        corresponding to the attributes of the relation referred by
   *        output_relation_id. Each Scalar is evaluated for the joined tuples,
   *        and the resulting value is inserted into the join result.
   * @param left_relation_is_stored If left_input_relation is a stored relation.
   * @param right_relation_is_stored If right_input_relation is a stored
   *                                 relation.
   **/
  NestedLoopsJoinOperator(
      const std::size_t query_id,
      const std::size_t nested_loops_join_index,
      const CatalogRelation &left_input_relation,
      const CatalogRelation &right_input_relation,
      const std::size_t num_partitions,
      const bool has_repartition,
      const CatalogRelation &output_relation,
      const QueryContext::insert_destination_id output_destination_index,
      const QueryContext::predicate_id join_predicate_index,
      const QueryContext::scalar_group_id selection_index,
      const bool left_relation_is_stored,
      const bool right_relation_is_stored)
      : RelationalOperator(query_id, num_partitions, has_repartition, output_relation.getNumPartitions()),
        nested_loops_join_index_(nested_loops_join_index),
        left_input_relation_(left_input_relation),
        right_input_relation_(right_input_relation),
        output_relation_(output_relation),
        output_destination_index_(output_destination_index),
        join_predicate_index_(join_predicate_index),
        selection_index_(selection_index),
        left_relation_is_stored_(left_relation_is_stored),
        right_relation_is_stored_(right_relation_is_stored),
        left_relation_block_ids_(num_partitions),
        right_relation_block_ids_(num_partitions),
        num_left_workorders_generated_(num_partitions),
        num_right_workorders_generated_(num_partitions),
        done_feeding_left_relation_(false),
        done_feeding_right_relation_(false),
        all_workorders_generated_(false) {
    DCHECK_NE(join_predicate_index_, QueryContext::kInvalidPredicateId);

    if (left_relation_is_stored) {
      if (left_input_relation_.hasPartitionScheme()) {
        const PartitionScheme &part_scheme = *left_input_relation_.getPartitionScheme();
        DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions());
        for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
          left_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
        }
      } else {
        DCHECK_EQ(1u, num_partitions_);
        left_relation_block_ids_[0] = left_input_relation_.getBlocksSnapshot();
      }
    }

    if (right_relation_is_stored) {
      if (right_input_relation_.hasPartitionScheme()) {
        const PartitionScheme &part_scheme = *right_input_relation_.getPartitionScheme();
        DCHECK_EQ(num_partitions_, part_scheme.getPartitionSchemeHeader().getNumPartitions());
        for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
          right_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
        }
      } else {
        // Broadcast right (smaller) side upon partitioned nlj.
        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
          right_relation_block_ids_[part_id] = right_input_relation_.getBlocksSnapshot();
        }
      }
    }
  }

  ~NestedLoopsJoinOperator() override {}

  OperatorType getOperatorType() const override {
    return kNestedLoopsJoin;
  }

  std::string getName() const override {
    return "NestedLoopsJoinOperator";
  }

  bool getAllWorkOrders(WorkOrdersContainer *container,
                        QueryContext *query_context,
                        StorageManager *storage_manager,
                        const tmb::client_id scheduler_client_id,
                        tmb::MessageBus *bus) override;

  bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;

  void doneFeedingInputBlocks(const relation_id rel_id) override {
    if (rel_id == left_input_relation_.getID()) {
      done_feeding_left_relation_ = true;
    } else if (rel_id == right_input_relation_.getID()) {
      done_feeding_right_relation_ = true;
    } else {
      FATAL_ERROR("Wrong relation ID in doneFeedingInputBlocks method.");
    }
  }

  void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                      const partition_id part_id) override {
    if (input_relation_id == left_input_relation_.getID()) {
      left_relation_block_ids_[part_id].push_back(input_block_id);
    } else if (input_relation_id == right_input_relation_.getID()) {
      // Broadcast right (smaller) side upon partitioned nlj.
      for (partition_id input_part_id = 0; input_part_id < num_partitions_; ++input_part_id) {
        right_relation_block_ids_[input_part_id].push_back(input_block_id);
      }
    } else {
      LOG(FATAL) << "The input block sent to the NestedLoopsJoinOperator belongs "
                 << "to a different relation than the left and right relations";
    }
  }

  QueryContext::insert_destination_id getInsertDestinationID() const override {
    return output_destination_index_;
  }

  const relation_id getOutputRelationID() const override {
    return output_relation_.getID();
  }

 private:
  /**
   * @brief Pairs block IDs from left and right relation block IDs and generates
   *        NestedLoopsJoinWorkOrders and pushes them to the WorkOrdersContainer
   *        when both relations are not stored relations.
   *
   * @param container A pointer to the WorkOrdersContainer to store the
   *                  resulting WorkOrders.
   * @param query_context The QueryContext that stores query execution states.
   * @param storage_manager The StorageManager to use.
   * @param part_id The partition ID.
   * @param left_min The starting index in left_relation_block_ids_ from where
   *                 we begin generating NestedLoopsJoinWorkOrders.
   * @param left_max The index in left_relation_block_ids_ until which we
   *                 generate NestedLoopsJoinWorkOrders (excluding left_max).
   * @param right_min The starting index in right_relation_block_ids_ from where
   *                  we begin generating NestedLoopsJoinWorkOrders.
   * @param right_max The index in right_relation_block_ids_ until which we
   *                  generate NestedLoopsJoinWorkOrders. (excluding right_max).
   *
   * @return The number of workorders generated during the execution of this
   *         function.
   **/
  std::size_t getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container,
                                                  QueryContext *query_context,
                                                  StorageManager *storage_manager,
                                                  const partition_id part_id,
                                                  std::vector<block_id>::size_type left_min,
                                                  std::vector<block_id>::size_type left_max,
                                                  std::vector<block_id>::size_type right_min,
                                                  std::vector<block_id>::size_type right_max);

  /**
   * @brief Pairs block IDs from left and right relation block IDs and generates
   *        NestedLoopsJoinWorkOrders and pushes them to the WorkOrdersContainer
   *        when only one relation is a stored relation.
   *
   * @param container A pointer to the WorkOrdersContainer to store the
   *                  resulting WorkOrders.
   * @param query_context The QueryContext that stores query execution states.
   * @param storage_manager The StorageManager to use.
   *
   * @return Whether all work orders have been generated.
   **/
  bool getAllWorkOrdersHelperOneStored(WorkOrdersContainer *container,
                                       QueryContext *query_context,
                                       StorageManager *storage_manager);

  /**
   * @brief Pairs block IDs from left and right relation block IDs and generates
   *        NestedLoopsJoinWorkOrder protos and pushes them to the
   *        WorkOrderProtosContainer when both relations are not stored
   *        relations.
   *
   * @param container A pointer to the WorkOrderProtosContainer to store the
   *                  resulting WorkOrder protos.
   * @param left_min The starting index in left_relation_block_ids_ from where
   *                 we begin generating NestedLoopsJoinWorkOrders.
   * @param left_max The index in left_relation_block_ids_ until which we
   *                 generate NestedLoopsJoinWorkOrders (excluding left_max).
   * @param right_min The starting index in right_relation_block_ids_ from where
   *                  we begin generating NestedLoopsJoinWorkOrders.
   * @param right_max The index in right_relation_block_ids_ until which we
   *                  generate NestedLoopsJoinWorkOrders. (excluding right_max).
   *
   * @return The number of workorder protos generated during the execution of this
   *         function.
   **/
  std::size_t getAllWorkOrderProtosHelperBothNotStored(WorkOrderProtosContainer *container,
                                                       const partition_id part_id,
                                                       const std::vector<block_id>::size_type left_min,
                                                       const std::vector<block_id>::size_type left_max,
                                                       const std::vector<block_id>::size_type right_min,
                                                       const std::vector<block_id>::size_type right_max);

  /**
   * @brief Pairs block IDs from left and right relation block IDs and generates
   *        NestedLoopsJoinWorkOrder protos and pushes them to the
   *        WorkOrderProtosContainer when only one relation is a stored relation.
   *
   * @param container A pointer to the WorkOrderProtosContainer to store the
   *                  resulting WorkOrder protos.
   *
   * @return Whether all work orders have been generated.
   **/
  bool getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container);

  serialization::WorkOrder* createWorkOrderProto(const partition_id part_id,
                                                 const block_id left_block,
                                                 const block_id right_block);

  const std::size_t nested_loops_join_index_;

  const CatalogRelation &left_input_relation_;
  const CatalogRelation &right_input_relation_;

  const CatalogRelation &output_relation_;
  const QueryContext::insert_destination_id output_destination_index_;

  const QueryContext::predicate_id join_predicate_index_;
  const QueryContext::scalar_group_id selection_index_;

  const bool left_relation_is_stored_;
  const bool right_relation_is_stored_;

  std::vector<BlocksInPartition> left_relation_block_ids_;
  std::vector<BlocksInPartition> right_relation_block_ids_;

  // At a given point of time, we have paired num_left_workorders_generated[part_id]
  // number of blocks from the left relation with num_right_workorders_generated[part_id]
  // number of blocks from the right relation for a given 'part_id'.
  std::vector<std::size_t> num_left_workorders_generated_;
  std::vector<std::size_t> num_right_workorders_generated_;

  bool done_feeding_left_relation_;
  bool done_feeding_right_relation_;

  // Applicable only when both the relations are stored relations.
  bool all_workorders_generated_;

  DISALLOW_COPY_AND_ASSIGN(NestedLoopsJoinOperator);
};

/**
 * @brief A WorkOrder produced by NestedLoopsJoinOperator
 **/
class NestedLoopsJoinWorkOrder : public WorkOrder {
 public:
  /**
   * @brief Constructor.
   *
   * @param query_id The ID of the query to which this operator belongs.
   * @param left_input_relation The first relation in the join (order is not
   *        actually important).
   * @param right_input_relation The second relation in the join (order is not
   *        actually important).
   * @param part_id The partition id.
   * @param left_block_id The block id of the first relation.
   * @param right_block_id The block id of the second relation.
   * @param join_predicate The join predicate to evaluate for each pair of
   *        tuples in the input relations. (cannot be NULL).
   * @param selection A list of Scalars corresponding to the relation attributes
   *        in \c output_destination. Each Scalar is evaluated for the joined
   *        tuples, and the resulting value is inserted into the join result.
   * @param output_destination The InsertDestination to insert the join results.
   * @param storage_manager The StorageManager to use.
   **/
  NestedLoopsJoinWorkOrder(
      const std::size_t query_id,
      const CatalogRelationSchema &left_input_relation,
      const CatalogRelationSchema &right_input_relation,
      const partition_id part_id,
      const block_id left_block_id,
      const block_id right_block_id,
      const Predicate *join_predicate,
      const std::vector<std::unique_ptr<const Scalar>> &selection,
      InsertDestination *output_destination,
      StorageManager *storage_manager)
      : WorkOrder(query_id, part_id),
        left_input_relation_(left_input_relation),
        right_input_relation_(right_input_relation),
        left_block_id_(left_block_id),
        right_block_id_(right_block_id),
        join_predicate_(DCHECK_NOTNULL(join_predicate)),
        selection_(selection),
        output_destination_(DCHECK_NOTNULL(output_destination)),
        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}

  ~NestedLoopsJoinWorkOrder() override {}

  /**
   * @exception TupleTooLargeForBlock A tuple produced by this join was too
   *            large to insert into an empty block provided by
   *            output_destination_. Join may be partially complete (with
   *            some tuples inserted into the destination) when this exception
   *            is thrown, causing potential inconsistency.
   **/
  void execute() override;

 private:
  template <bool LEFT_PACKED, bool RIGHT_PACKED>
  void executeHelper(const TupleStorageSubBlock &left_store,
                     const TupleStorageSubBlock &right_store);

  const CatalogRelationSchema &left_input_relation_, &right_input_relation_;
  const block_id left_block_id_, right_block_id_;
  const Predicate *join_predicate_;
  const std::vector<std::unique_ptr<const Scalar>> &selection_;

  InsertDestination *output_destination_;
  StorageManager *storage_manager_;

  DISALLOW_COPY_AND_ASSIGN(NestedLoopsJoinWorkOrder);
};

/** @} */

}  // namespace quickstep

#endif  // QUICKSTEP_RELATIONAL_OPERATORS_NESTED_LOOPS_JOIN_OPERATOR_HPP_
