/**
 *   Copyright 2011-2015 Quickstep Technologies LLC.
 *   Copyright 2015-2016 Pivotal Software, Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 **/

#include "relational_operators/NestedLoopsJoinOperator.hpp"

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

#include "catalog/CatalogRelationSchema.hpp"
#include "expressions/predicate/Predicate.hpp"
#include "expressions/scalar/Scalar.hpp"
#include "query_execution/QueryContext.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
#include "storage/StorageBlockInfo.hpp"
#include "storage/StorageManager.hpp"
#include "storage/TupleStorageSubBlock.hpp"
#include "storage/ValueAccessor.hpp"
#include "types/containers/ColumnVectorsValueAccessor.hpp"

#include "glog/logging.h"

#include "tmb/id_typedefs.h"

using std::unique_ptr;
using std::vector;

namespace quickstep {

void NestedLoopsJoinOperator::feedInputBlocks(const relation_id rel_id,
                                              std::vector<block_id> *partially_filled_blocks) {
  for (std::vector<block_id>::const_iterator it = partially_filled_blocks->begin();
       it != partially_filled_blocks->end();
       ++it) {
    feedInputBlock(*it, rel_id);
  }
}

void NestedLoopsJoinOperator::feedInputBlock(const block_id input_block_id, const relation_id input_relation_id) {
  if (input_relation_id == left_input_relation_.getID()) {
    left_relation_block_ids_.push_back(input_block_id);
  } else if (input_relation_id == right_input_relation_.getID()) {
    right_relation_block_ids_.push_back(input_block_id);
  } else {
    FATAL_ERROR("The input block sent to the NestedLoopsJoinOperator belongs "
                "to a different relation than the left and right relations");
  }
}

bool NestedLoopsJoinOperator::getAllWorkOrders(
    WorkOrdersContainer *container,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id foreman_client_id,
    const tmb::client_id agent_client_id,
    tmb::MessageBus *bus) {
  if (left_relation_is_stored_ && right_relation_is_stored_) {
    // Make sure we generate workorders only once.
    if (!all_workorders_generated_) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        for (const block_id right_block_id : right_relation_block_ids_) {
          container->addNormalWorkOrder(
              new NestedLoopsJoinWorkOrder(left_input_relation_,
                                           right_input_relation_,
                                           left_block_id,
                                           right_block_id,
                                           query_context->getPredicate(join_predicate_index_),
                                           query_context->getScalarGroup(selection_index_),
                                           query_context->getInsertDestination(output_destination_index_),
                                           storage_manager),
              op_index_);
        }
      }
      all_workorders_generated_ = true;
    }
    return all_workorders_generated_;
  } else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
    // Both relations are not stored.
    std::vector<block_id>::size_type new_left_blocks
        = left_relation_block_ids_.size() - num_left_workorders_generated_;
    std::vector<block_id>::size_type new_right_blocks
        = right_relation_block_ids_.size() - num_right_workorders_generated_;

    std::size_t new_workorders = 0;
    if (new_left_blocks > 0 && new_right_blocks > 0) {
      // Blocks added to both left and right relations.
      // First generate (left + new_left_blocks) * (new_right_blocks).
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           0,
                                                           left_relation_block_ids_.size(),
                                                           num_right_workorders_generated_,
                                                           right_relation_block_ids_.size());

      // Now generate new_left_blocks * (right).
      new_workorders += getAllWorkOrdersHelperBothNotStored(container,
                                                            query_context,
                                                            storage_manager,
                                                            num_left_workorders_generated_,
                                                            left_relation_block_ids_.size(),
                                                            0,
                                                            num_right_workorders_generated_);
    } else if (new_left_blocks == 0 && new_right_blocks > 0) {
      // Only new right blocks are added. Generate left * new_right_blocks.
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           0,
                                                           left_relation_block_ids_.size(),
                                                           num_right_workorders_generated_,
                                                           right_relation_block_ids_.size());
    } else if (new_left_blocks > 0 && new_right_blocks == 0) {
      // Generate new_left_blocks * right
      new_workorders = getAllWorkOrdersHelperBothNotStored(container,
                                                           query_context,
                                                           storage_manager,
                                                           num_left_workorders_generated_,
                                                           left_relation_block_ids_.size(),
                                                           0,
                                                           right_relation_block_ids_.size());
    }
    if (new_workorders > 0) {
      num_left_workorders_generated_ = left_relation_block_ids_.size();
      num_right_workorders_generated_ = right_relation_block_ids_.size();
    }
    return done_feeding_left_relation_ && done_feeding_right_relation_;
  } else {
    // Only one relation is a stored relation.
    return getAllWorkOrdersHelperOneStored(container, query_context, storage_manager);
  }
}

bool NestedLoopsJoinOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
  if (left_relation_is_stored_ && right_relation_is_stored_) {
    // Make sure we generate workorders only once.
    if (all_workorders_generated_) {
      return true;
    }

    all_workorders_generated_ = true;

    for (const block_id left_block_id : left_relation_block_ids_) {
      for (const block_id right_block_id : right_relation_block_ids_) {
        container->addWorkOrderProto(createWorkOrderProto(left_block_id, right_block_id),
                                     op_index_);
      }
    }

    return true;
  } else if (!(left_relation_is_stored_ || right_relation_is_stored_)) {
    // Both relations are not stored.
    const std::vector<block_id>::size_type new_left_blocks
        = left_relation_block_ids_.size() - num_left_workorders_generated_;
    const std::vector<block_id>::size_type new_right_blocks
        = right_relation_block_ids_.size() - num_right_workorders_generated_;

    std::size_t new_workorders = 0;
    if (new_left_blocks > 0 && new_right_blocks > 0) {
      // Blocks added to both left and right relations.
      // First generate (left + new_left_blocks) * (new_right_blocks).
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   0,
                                                   left_relation_block_ids_.size(),
                                                   num_right_workorders_generated_,
                                                   right_relation_block_ids_.size());

      // Now generate new_left_blocks * (right).
      new_workorders +=
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   num_left_workorders_generated_,
                                                   left_relation_block_ids_.size(),
                                                   0,
                                                   num_right_workorders_generated_);
    } else if (new_left_blocks == 0 && new_right_blocks > 0) {
      // Only new right blocks are added. Generate left * new_right_blocks.
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   0,
                                                   left_relation_block_ids_.size(),
                                                   num_right_workorders_generated_,
                                                   right_relation_block_ids_.size());
    } else if (new_left_blocks > 0 && new_right_blocks == 0) {
      // Generate new_left_blocks * right
      new_workorders =
          getAllWorkOrderProtosHelperBothNotStored(container,
                                                   num_left_workorders_generated_,
                                                   left_relation_block_ids_.size(),
                                                   0,
                                                   right_relation_block_ids_.size());
    }
    if (new_workorders > 0) {
      num_left_workorders_generated_ = left_relation_block_ids_.size();
      num_right_workorders_generated_ = right_relation_block_ids_.size();
    }
    return done_feeding_left_relation_ && done_feeding_right_relation_;
  } else {
    // Only one relation is a stored relation.
    return getAllWorkOrderProtosHelperOneStored(container);
  }
}

std::size_t NestedLoopsJoinOperator::getAllWorkOrdersHelperBothNotStored(WorkOrdersContainer *container,
                                                                         QueryContext *query_context,
                                                                         StorageManager *storage_manager,
                                                                         std::vector<block_id>::size_type left_min,
                                                                         std::vector<block_id>::size_type left_max,
                                                                         std::vector<block_id>::size_type right_min,
                                                                         std::vector<block_id>::size_type right_max) {
  DCHECK(!(left_relation_is_stored_ || right_relation_is_stored_));
  DCHECK(left_min <= left_max);
  DCHECK(right_min <= right_max);

  for (std::vector<block_id>::size_type left_index = left_min;
       left_index < left_max;
       ++left_index) {
    for (std::vector<block_id>::size_type right_index = right_min;
         right_index < right_max;
         ++right_index) {
      container->addNormalWorkOrder(
          new NestedLoopsJoinWorkOrder(left_input_relation_,
                                       right_input_relation_,
                                       left_relation_block_ids_[left_index],
                                       right_relation_block_ids_[right_index],
                                       query_context->getPredicate(join_predicate_index_),
                                       query_context->getScalarGroup(selection_index_),
                                       query_context->getInsertDestination(output_destination_index_),
                                       storage_manager),
          op_index_);
    }
  }
  // Return the number of workorders produced.
  return (left_max - left_min) * (right_max - right_min);
}

bool NestedLoopsJoinOperator::getAllWorkOrdersHelperOneStored(WorkOrdersContainer *container,
                                                              QueryContext *query_context,
                                                              StorageManager *storage_manager) {
  DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_);
  DCHECK(query_context != nullptr);

  const Predicate *join_predicate = query_context->getPredicate(join_predicate_index_);
  const vector<unique_ptr<const Scalar>> &selection =
      query_context->getScalarGroup(selection_index_);
  InsertDestination *output_destination =
      query_context->getInsertDestination(output_destination_index_);

  if (left_relation_is_stored_) {
    for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
         right_index < right_relation_block_ids_.size();
         ++right_index) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        container->addNormalWorkOrder(
            new NestedLoopsJoinWorkOrder(
                left_input_relation_,
                right_input_relation_,
                left_block_id,
                right_relation_block_ids_[right_index],
                join_predicate,
                selection,
                output_destination,
                storage_manager),
            op_index_);
      }
    }
    num_right_workorders_generated_ = right_relation_block_ids_.size();
    return done_feeding_right_relation_;
  } else {
    for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
         left_index < left_relation_block_ids_.size();
         ++left_index) {
      for (const block_id right_block_id : right_relation_block_ids_) {
        container->addNormalWorkOrder(
            new NestedLoopsJoinWorkOrder(left_input_relation_,
                                         right_input_relation_,
                                         left_relation_block_ids_[left_index],
                                         right_block_id,
                                         join_predicate,
                                         selection,
                                         output_destination,
                                         storage_manager),
            op_index_);
      }
    }
    num_left_workorders_generated_ = left_relation_block_ids_.size();
    return done_feeding_left_relation_;
  }
}

std::size_t NestedLoopsJoinOperator::getAllWorkOrderProtosHelperBothNotStored(
    WorkOrderProtosContainer *container,
    const std::vector<block_id>::size_type left_min,
    const std::vector<block_id>::size_type left_max,
    const std::vector<block_id>::size_type right_min,
    const std::vector<block_id>::size_type right_max) {
  DCHECK(!(left_relation_is_stored_ || right_relation_is_stored_));
  DCHECK_LE(left_min, left_max);
  DCHECK_LE(right_min, right_max);

  for (std::vector<block_id>::size_type left_index = left_min;
       left_index < left_max;
       ++left_index) {
    for (std::vector<block_id>::size_type right_index = right_min;
         right_index < right_max;
         ++right_index) {
      container->addWorkOrderProto(
          createWorkOrderProto(left_relation_block_ids_[left_index], right_relation_block_ids_[right_index]),
          op_index_);
    }
  }
  // Return the number of workorders produced.
  return (left_max - left_min) * (right_max - right_min);
}
bool NestedLoopsJoinOperator::getAllWorkOrderProtosHelperOneStored(WorkOrderProtosContainer *container) {
  DCHECK(left_relation_is_stored_ ^ right_relation_is_stored_);

  if (left_relation_is_stored_) {
    for (std::vector<block_id>::size_type right_index = num_right_workorders_generated_;
         right_index < right_relation_block_ids_.size();
         ++right_index) {
      for (const block_id left_block_id : left_relation_block_ids_) {
        container->addWorkOrderProto(
            createWorkOrderProto(left_block_id, right_relation_block_ids_[right_index]),
            op_index_);
      }
    }
    num_right_workorders_generated_ = right_relation_block_ids_.size();
    return done_feeding_right_relation_;
  } else {
    for (std::vector<block_id>::size_type left_index = num_left_workorders_generated_;
         left_index < left_relation_block_ids_.size();
         ++left_index) {
      for (const block_id right_block_id : right_relation_block_ids_) {
        container->addWorkOrderProto(
            createWorkOrderProto(left_relation_block_ids_[left_index], right_block_id),
            op_index_);
      }
    }
    num_left_workorders_generated_ = left_relation_block_ids_.size();
    return done_feeding_left_relation_;
  }
}

serialization::WorkOrder* NestedLoopsJoinOperator::createWorkOrderProto(const block_id left_block,
                                                                        const block_id right_block) {
  serialization::WorkOrder *proto = new serialization::WorkOrder;
  proto->set_work_order_type(serialization::NESTED_LOOP_JOIN);

  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id, left_input_relation_.getID());
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id, right_input_relation_.getID());
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id, left_block);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id, right_block);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index,
                      output_destination_index_);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index, join_predicate_index_);
  proto->SetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index, selection_index_);

  return proto;
}


template <bool LEFT_PACKED, bool RIGHT_PACKED>
void NestedLoopsJoinWorkOrder::executeHelper(const TupleStorageSubBlock &left_store,
                                             const TupleStorageSubBlock &right_store) {
  const relation_id left_input_relation_id = left_input_relation_.getID();
  const relation_id right_input_relation_id = right_input_relation_.getID();

  const tuple_id left_max_tid = left_store.getMaxTupleID();
  const tuple_id right_max_tid = right_store.getMaxTupleID();

  std::unique_ptr<ValueAccessor> left_accessor(left_store.createValueAccessor());
  std::unique_ptr<ValueAccessor> right_accessor(right_store.createValueAccessor());

  // Check each pair of tuples in the two blocks and build up a list of
  // matches.
  //
  // TODO(chasseur): Vectorize evaluation of this join predicate.
  std::vector<std::pair<tuple_id, tuple_id>> joined_tuple_ids;
  for (tuple_id left_tid = 0; left_tid <= left_max_tid; ++left_tid) {
    if (LEFT_PACKED || left_store.hasTupleWithID(left_tid)) {
      // For each tuple in the left block...
      for (tuple_id right_tid = 0; right_tid <= right_max_tid; ++right_tid) {
        if (RIGHT_PACKED || right_store.hasTupleWithID(right_tid)) {
          // For each tuple in the right block...
          if (join_predicate_->matchesForJoinedTuples(*left_accessor,
                                                      left_input_relation_id,
                                                      left_tid,
                                                      *right_accessor,
                                                      right_input_relation_id,
                                                      right_tid)) {
            joined_tuple_ids.emplace_back(left_tid, right_tid);
          }
        }
      }
    }
  }

  if (!joined_tuple_ids.empty()) {
    // TODO(chasseur): If all the output expressions are ScalarAttributes,
    // we could implement a similar fast-path to StorageBlock::selectSimple()
    // that avoids a copy.
    //
    // TODO(chasseur): Depending on the selectivity of the join predicate,
    // 'joined_tuple_ids' might be very long (up to the full cross-product of
    // tuples in a pair of blocks in the most extreme case). The temporary
    // result could then wind up taking a great deal of memory (significantly
    // more than what would fit in a typical StorageBlock). It may be
    // worthwhile to implement a heuristic where we only materialize temporary
    // results N tuples at a time instead of all at once (where N is some
    // number tuned high enough to good benefits from vectorized expression
    // evaluation and data movement, but low enough that temporary memory
    // requirements don't get out of hand).
    ColumnVectorsValueAccessor temp_result;
    for (vector<unique_ptr<const Scalar>>::const_iterator selection_cit = selection_.begin();
         selection_cit != selection_.end();
         ++selection_cit) {
      temp_result.addColumn((*selection_cit)->getAllValuesForJoin(left_input_relation_id,
                                                                  left_accessor.get(),
                                                                  right_input_relation_id,
                                                                  right_accessor.get(),
                                                                  joined_tuple_ids));
    }

    output_destination_->bulkInsertTuples(&temp_result);
  }
}

void NestedLoopsJoinWorkOrder::execute() {
  BlockReference left(
      storage_manager_->getBlock(left_block_id_, left_input_relation_));
  BlockReference right(
      storage_manager_->getBlock(right_block_id_, right_input_relation_));

  const TupleStorageSubBlock &left_store = left->getTupleStorageSubBlock();
  const TupleStorageSubBlock &right_store = right->getTupleStorageSubBlock();

  if (left_store.isPacked()) {
    if (right_store.isPacked()) {
      executeHelper<true, true>(left_store, right_store);
    } else {
      executeHelper<true, false>(left_store, right_store);
    }
  } else {
    if (right_store.isPacked()) {
      executeHelper<false, true>(left_store, right_store);
    } else {
      executeHelper<false, false>(left_store, right_store);
    }
  }
}

}  // namespace quickstep
