/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 **/

#include "relational_operators/NestedLoopsJoinOperator.hpp"

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

#include "catalog/CatalogRelationSchema.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"
#include "utility/ColumnVectorCache.hpp"

#include "glog/logging.h"

#include "tmb/id_typedefs.h"

using std::unique_ptr;
using std::vector;

namespace quickstep {

bool NestedLoopsJoinOperator::getAllWorkOrders(
    WorkOrdersContainer *container,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id scheduler_client_id,
    tmb::MessageBus *bus) {
  if (left_relation_is_stored_ && right_relation_is_stored_) {
    // Make sure we generate workorders only once.
    if (!all_workorders_generated_) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        for (const block_id right_block_id : right_relation_block_ids_) {
          container->addNormalWorkOrder(
              new NestedLoopsJoinWorkOrder(
                  query_id_,
                  left_input_relation_,
                  right_input_relation_,
                  left_block_id,
                  right_block_id,
                  query_context->getPredicate(join_predicate_index_),
                  query_context->getScalarGroup(selection_index_),
                  query_context->getInsertDestination(
                      output_destination_index_),
                  storage_manager),
              op_index_);
        }
      }
      all_workorders_generated_ = true;
    }
    return all_workorders_generated_;
  } else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
    // Both relations are not stored.
    std::vector<block_id>::size_type new_left_blocks
        = left_relation_block_ids_.size() - num_left_workorders_generated_;
    std::vector<block_id>::size_type new_right_blocks
        = right_relation_block_ids_.size() - num_right_workorders_generated_;

    std::size_t new_workorders = 0;
    if (new_left_blocks > 0 && new_right_blocks > 0) {
      // Blocks added to both left and right relations.
      // First generate (left + new_left_blocks) * (new_right_blocks).
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           0,
                                                           left_relation_block_ids_.size(),
                                                           num_right_workorders_generated_,
                                                           right_relation_block_ids_.size());

      // Now generate new_left_blocks * (right).
      new_workorders += getAllWorkOrdersHelperBothNotStored(container,
                                                            query_context,
                                                            storage_manager,
                                                            num_left_workorders_generated_,
                                                            left_relation_block_ids_.size(),
                                                            0,
                                                            num_right_workorders_generated_);
    } else if (new_left_blocks == 0 && new_right_blocks > 0) {
      // Only new right blocks are added. Generate left * new_right_blocks.
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           0,
                                                           left_relation_block_ids_.size(),
                                                           num_right_workorders_generated_,
                                                           right_relation_block_ids_.size());
    } else if (new_left_blocks > 0 && new_right_blocks == 0) {
      // Generate new_left_blocks * right
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           num_left_workorders_generated_,
                                                           left_relation_block_ids_.size(),
                                                           0,
                                                           right_relation_block_ids_.size());
    }
    if (new_workorders > 0) {
      num_left_workorders_generated_ = left_relation_block_ids_.size();
      num_right_workorders_generated_ = right_relation_block_ids_.size();
    }
    return done_feeding_left_relation_ && done_feeding_right_relation_;
  } else {
    // Only one relation is a stored relation.
    return getAllWorkOrdersHelperOneStored(container, query_context, storage_manager);
  }
}

bool NestedLoopsJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
  if (left_relation_is_stored_ && right_relation_is_stored_) {
    // Make sure we generate workorders only once.
    if (!all_workorders_generated_) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        for (const block_id right_block_id : right_relation_block_ids_) {
          container->addWorkOrderProto(createWorkOrderProto(left_block_id, right_block_id),
                                       op_index_);
        }
      }
      all_workorders_generated_ = true;
    }
    return true;
  } else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
    // Both relations are not stored.
    const std::vector<block_id>::size_type new_left_blocks
        = left_relation_block_ids_.size() - num_left_workorders_generated_;
    const std::vector<block_id>::size_type new_right_blocks
        = right_relation_block_ids_.size() - num_right_workorders_generated_;

    std::size_t new_workorders = 0;
    if (new_left_blocks > 0 && new_right_blocks > 0) {
      // Blocks added to both left and right relations.
      // First generate (left + new_left_blocks) * (new_right_blocks).
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   0,
                                                   left_relation_block_ids_.size(),
                                                   num_right_workorders_generated_,
                                                   right_relation_block_ids_.size());

      // Now generate new_left_blocks * (right).
      new_workorders +=
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   num_left_workorders_generated_,
                                                   left_relation_block_ids_.size(),
                                                   0,
                                                   num_right_workorders_generated_);
    } else if (new_left_blocks == 0 && new_right_blocks > 0) {
      // Only new right blocks are added. Generate left * new_right_blocks.
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   0,
                                                   left_relation_block_ids_.size(),
                                                   num_right_workorders_generated_,
                                                   right_relation_block_ids_.size());
    } else if (new_left_blocks > 0 && new_right_blocks == 0) {
      // Generate new_left_blocks * right
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   num_left_workorders_generated_,
                                                   left_relation_block_ids_.size(),
                                                   0,
                                                   right_relation_block_ids_.size());
    }
    if (new_workorders > 0) {
      num_left_workorders_generated_ = left_relation_block_ids_.size();
      num_right_workorders_generated_ = right_relation_block_ids_.size();
    }
    return done_feeding_left_relation_ && done_feeding_right_relation_;
  } else {
    // Only one relation is a stored relation.
    return getAllWorkOrderProtosHelperOneStored(container);
  }
}

std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container,
                                                                         QueryContext *query_context,
                                                                         StorageManager *storage_manager,
                                                                         std::vector<block_id>::size_type left_min,
                                                                         std::vector<block_id>::size_type left_max,
                                                                         std::vector<block_id>::size_type right_min,
                                                                         std::vector<block_id>::size_type right_max) {
  DCHECK(!(left_relation_is_stored_ || right_relation_is_stored_));
  DCHECK(left_min <= left_max);
  DCHECK(right_min <= right_max);

  for (std::vector<block_id>::size_type left_index = left_min;
       left_index < left_max;
       ++left_index) {
    for (std::vector<block_id>::size_type right_index = right_min;
         right_index < right_max;
         ++right_index) {
      container->addNormalWorkOrder(
          new NestedLoopsJoinWorkOrder(
              query_id_,
              left_input_relation_,
              right_input_relation_,
              left_relation_block_ids_[left_index],
              right_relation_block_ids_[right_index],
              query_context->getPredicate(join_predicate_index_),
              query_context->getScalarGroup(selection_index_),
              query_context->getInsertDestination(output_destination_index_),
              storage_manager),
          op_index_);
    }
  }
  // Return the number of workorders produced.
  return (left_max - left_min) * (right_max - right_min);
}

bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContainer *container,
                                                              QueryContext *query_context,
                                                              StorageManager *storage_manager) {
  DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_);
  DCHECK(query_context != nullptr);

  const Predicate *join_predicate = query_context->getPredicate(join_predicate_index_);
  const vector<unique_ptr<const Scalar>> &selection =
      query_context->getScalarGroup(selection_index_);
  InsertDestination *output_destination =
      query_context->getInsertDestination(output_destination_index_);

  if (left_relation_is_stored_) {
    for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
         right_index < right_relation_block_ids_.size();
         ++right_index) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        container->addNormalWorkOrder(
            new NestedLoopsJoinWorkOrder(
                query_id_,
                left_input_relation_,
                right_input_relation_,
                left_block_id,
                right_relation_block_ids_[right_index],
                join_predicate,
                selection,
                output_destination,
                storage_manager),
            op_index_);
      }
    }
    num_right_workorders_generated_ = right_relation_block_ids_.size();
    return done_feeding_right_relation_;
  } else {
    for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
         left_index < left_relation_block_ids_.size();
         ++left_index) {
      for (const block_id right_block_id : right_relation_block_ids_) {
        container->addNormalWorkOrder(
            new NestedLoopsJoinWorkOrder(query_id_,
                                         left_input_relation_,
                                         right_input_relation_,
                                         left_relation_block_ids_[left_index],
                                         right_block_id,
                                         join_predicate,
                                         selection,
                                         output_destination,
                                         storage_manager),
            op_index_);
      }
    }
    num_left_workorders_generated_ = left_relation_block_ids_.size();
    return done_feeding_left_relation_;
  }
}

std::size_t NestedLoopsJoinOperator::getAllWorkOrderProtosHelperBothNotStored(
    WorkOrderProtosContainer *container,
    const std::vector<block_id>::size_type left_min,
    const std::vector<block_id>::size_type left_max,
    const std::vector<block_id>::size_type right_min,
    const std::vector<block_id>::size_type right_max) {
  DCHECK(!(left_relation_is_stored_ || right_relation_is_stored_));
  DCHECK_LE(left_min, left_max);
  DCHECK_LE(right_min, right_max);

  for (std::vector<block_id>::size_type left_index = left_min;
       left_index < left_max;
       ++left_index) {
    for (std::vector<block_id>::size_type right_index = right_min;
         right_index < right_max;
         ++right_index) {
      container->addWorkOrderProto(
          createWorkOrderProto(left_relation_block_ids_[left_index], right_relation_block_ids_[right_index]),
          op_index_);
    }
  }
  // Return the number of workorders produced.
  return (left_max - left_min) * (right_max - right_min);
}

bool NestedLoopsJoinOperator::getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container) {
  DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_);

  if (left_relation_is_stored_) {
    for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
         right_index < right_relation_block_ids_.size();
         ++right_index) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        container->addWorkOrderProto(
            createWorkOrderProto(left_block_id, right_relation_block_ids_[right_index]),
            op_index_);
      }
    }
    num_right_workorders_generated_ = right_relation_block_ids_.size();
    return done_feeding_right_relation_;
  } else {
    for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
         left_index < left_relation_block_ids_.size();
         ++left_index) {
      for (const block_id right_block_id : right_relation_block_ids_) {
        container->addWorkOrderProto(
            createWorkOrderProto(left_relation_block_ids_[left_index], right_block_id),
            op_index_);
      }
    }
    num_left_workorders_generated_ = left_relation_block_ids_.size();
    return done_feeding_left_relation_;
  }
}

serialization::WorkOrder* NestedLoopsJoinOperator::createWorkOrderProto(const block_id left_block,
                                                                        const block_id right_block) {
  serialization::WorkOrder *proto = new serialization::WorkOrder;
  proto->set_work_order_type(serialization::NESTED_LOOP_JOIN);
  proto->set_query_id(query_id_);

  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id, left_input_relation_.getID());
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id, right_input_relation_.getID());
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id, left_block);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id, right_block);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index,
                      output_destination_index_);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index, join_predicate_index_);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index, selection_index_);

  return proto;
}


template <bool LEFT_PACKED, bool RIGHT_PACKED>
void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_store,
                                             const TupleStorageSubBlock &right_store) {
  const relation_id left_input_relation_id = left_input_relation_.getID();
  const relation_id right_input_relation_id = right_input_relation_.getID();

  const tuple_id left_max_tid = left_store.getMaxTupleID();
  const tuple_id right_max_tid = right_store.getMaxTupleID();

  std::unique_ptr<ValueAccessor> left_accessor(left_store.createValueAccessor());
  std::unique_ptr<ValueAccessor> right_accessor(right_store.createValueAccessor());

  // Check each pair of tuples in the two blocks and build up a list of
  // matches.
  //
  // TODO(chasseur): Vectorize evaluation of this join predicate.
  std::vector<std::pair<tuple_id, tuple_id>> joined_tuple_ids;
  for (tuple_id left_tid = 0; left_tid <= left_max_tid; ++left_tid) {
    if (LEFT_PACKED || left_store.hasTupleWithID(left_tid)) {
      // For each tuple in the left block...
      for (tuple_id right_tid = 0; right_tid <= right_max_tid; ++right_tid) {
        if (RIGHT_PACKED || right_store.hasTupleWithID(right_tid)) {
          // For each tuple in the right block...
          if (join_predicate_->matchesForJoinedTuples(*left_accessor,
                                                      left_input_relation_id,
                                                      left_tid,
                                                      *right_accessor,
                                                      right_input_relation_id,
                                                      right_tid)) {
            joined_tuple_ids.emplace_back(left_tid, right_tid);
          }
        }
      }
    }
  }

  if (!joined_tuple_ids.empty()) {
    // TODO(chasseur): If all the output expressions are ScalarAttributes,
    // we could implement a similar fast-path to StorageBlock::selectSimple()
    // that avoids a copy.
    //
    // TODO(chasseur): Depending on the selectivity of the join predicate,
    // 'joined_tuple_ids' might be very long (up to the full cross-product of
    // tuples in a pair of blocks in the most extreme case). The temporary
    // result could then wind up taking a great deal of memory (significantly
    // more than what would fit in a typical StorageBlock). It may be
    // worthwhile to implement a heuristic where we only materialize temporary
    // results N tuples at a time instead of all at once (where N is some
    // number tuned high enough to good benefits from vectorized expression
    // evaluation and data movement, but low enough that temporary memory
    // requirements don't get out of hand).
    ColumnVectorsValueAccessor temp_result;
    std::unique_ptr<ColumnVectorCache> cv_cache = std::make_unique<ColumnVectorCache>();
    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
         selection_cit != selection_.end();
         ++selection_cit) {
      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(left_input_relation_id,
                                                                  left_accessor.get(),
                                                                  right_input_relation_id,
                                                                  right_accessor.get(),
                                                                  joined_tuple_ids,
                                                                  cv_cache.get()));
    }
    cv_cache.reset();

    output_destination_->bulkInsertTuples(&temp_result);
  }
}

void NestedLoopsJoinWorkOrder::execute() {
  BlockReference left(
      storage_manager_->getBlock(left_block_id_, left_input_relation_));
  BlockReference right(
      storage_manager_->getBlock(right_block_id_, right_input_relation_));

  const TupleStorageSubBlock &left_store = left->getTupleStorageSubBlock();
  const TupleStorageSubBlock &right_store = right->getTupleStorageSubBlock();

  if (left_store.isPacked()) {
    if (right_store.isPacked()) {
      executeHelper<true, true>(left_store, right_store);
    } else {
      executeHelper<true, false>(left_store, right_store);
    }
  } else {
    if (right_store.isPacked()) {
      executeHelper<false, true>(left_store, right_store);
    } else {
      executeHelper<false, false>(left_store, right_store);
    }
  }
}

}  // namespace quickstep
