blob: 7de17ab5da17e4d4123e36369016e07c1e3a2c98 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#include "relational_operators/SortMergeRunOperator.hpp"
#include <cstddef>
#include <cstdlib>
#include <utility>
#include <vector>
#include "query_execution/QueryExecutionTypedefs.hpp"
#include "query_execution/WorkOrderProtosContainer.hpp"
#include "query_execution/WorkOrdersContainer.hpp"
#include "relational_operators/SortMergeRunOperator.pb.h"
#include "relational_operators/SortMergeRunOperatorHelpers.hpp"
#include "relational_operators/WorkOrder.pb.h"
#include "threading/ThreadIDBasedMap.hpp"
#include "glog/logging.h"
#include "tmb/id_typedefs.h"
namespace quickstep {
class InsertDestination;
using merge_run_operator::Run;
using merge_run_operator::RunMerger;
using merge_run_operator::MergeTree;
// Sets up the merge tree the first time it is possible to do so, then
// delegates to generateWorkOrders() and returns its result.
bool SortMergeRunOperator::getAllWorkOrders(
    WorkOrdersContainer *container,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id scheduler_client_id,
    tmb::MessageBus *bus) {
  // A stored input (only possible when the base relation is stored sorted)
  // can always be started. A pipelined input is only started from here while
  // its input stream has not finished yet.
  const bool can_start_now = input_relation_is_stored_ || !input_stream_done_;

  if (!started_ && can_start_now) {
    if (input_relation_is_stored_) {
      // All input runs are known up front, so the tree is built completely.
      merge_tree_.initializeTree(input_relation_block_ids_.size());
    } else {
      // Runs are pipelined from the sorted run generation operator; start
      // the tree in pipeline mode.
      merge_tree_.initializeForPipeline();
    }
    started_ = true;
    initializeInputRuns();
  }

  // Emit work orders for the merge jobs the tree currently offers.
  return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus);
}
// Protobuf counterpart of getAllWorkOrders(): sets up the merge tree on first
// use, then adds one serialized work order per merge job to `container`.
// Returns the value reported by MergeTree::getMergeJobs().
bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
  // A stored input (only possible when the base relation is stored sorted)
  // can always be started. A pipelined input is only started from here while
  // its input stream has not finished yet.
  const bool can_start_now = input_relation_is_stored_ || !input_stream_done_;

  if (!started_ && can_start_now) {
    if (input_relation_is_stored_) {
      // All input runs are known up front, so the tree is built completely.
      merge_tree_.initializeTree(input_relation_block_ids_.size());
    } else {
      // Runs are pipelined from the sorted run generation operator; start
      // the tree in pipeline mode.
      merge_tree_.initializeForPipeline();
    }
    started_ = true;
    initializeInputRuns();
  }

  // Pull the merge jobs that are currently available from the merge tree.
  std::vector<MergeTree::MergeJob> pending_jobs;
  const bool done_generating = merge_tree_.getMergeJobs(&pending_jobs);

  // One work order proto per merge job.
  for (MergeTree::MergeJob &merge_job : pending_jobs) {
    serialization::WorkOrder *work_order_proto = createWorkOrderProto(&merge_job);
    container->addWorkOrderProto(work_order_proto, op_index_);
  }

  return done_generating;
}
// Builds the serialized form of a SORT_MERGE_RUN work order for `job`.
// The returned proto is allocated with `new` and is not released here.
serialization::WorkOrder* SortMergeRunOperator::createWorkOrderProto(
    merge_run_operator::MergeTree::MergeJob *job) {
  DCHECK(job != nullptr);
  DCHECK(!job->runs.empty());

  // Jobs at level > 0 read from run_relation_; level 0 reads input_relation_.
  const auto source_relation_id =
      job->level > 0 ? run_relation_.getID() : input_relation_.getID();
  // Only the final level writes to the operator's output destination.
  const auto destination_index =
      job->is_final_level ? output_destination_index_ : run_block_destination_index_;

  serialization::WorkOrder *proto = new serialization::WorkOrder;
  proto->set_work_order_type(serialization::SORT_MERGE_RUN);
  proto->set_query_id(query_id_);

  proto->SetExtension(serialization::SortMergeRunWorkOrder::operator_index, op_index_);
  proto->SetExtension(serialization::SortMergeRunWorkOrder::sort_config_index, sort_config_index_);

  // Each input run becomes one repeated `runs` entry listing its blocks.
  for (const merge_run_operator::Run &input_run : job->runs) {
    serialization::Run *serialized_run =
        proto->AddExtension(serialization::SortMergeRunWorkOrder::runs);
    for (const block_id run_block : input_run) {
      serialized_run->add_blocks(run_block);
    }
  }

  proto->SetExtension(serialization::SortMergeRunWorkOrder::top_k, top_k_);
  proto->SetExtension(serialization::SortMergeRunWorkOrder::merge_level, job->level);
  proto->SetExtension(serialization::SortMergeRunWorkOrder::relation_id, source_relation_id);
  proto->SetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index,
                      destination_index);

  return proto;
}
// Builds a SortMergeRunWorkOrder for `job`.
//
// `job` must be non-null and hold at least one run; `query_context` must be
// non-null. The job's runs are moved into the work order, so `job->runs` must
// not be relied upon after this call. The returned work order is allocated
// with `new` and is not released here.
WorkOrder *SortMergeRunOperator::createWorkOrder(
    merge_run_operator::MergeTree::MergeJob *job,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id scheduler_client_id,
    tmb::MessageBus *bus) {
  // Validate the pointer before it is dereferenced below, matching the
  // precondition checks in createWorkOrderProto().
  DCHECK(job != nullptr);
  DCHECK(!job->runs.empty());
  DCHECK(query_context != nullptr);

  // Only the final level writes to the operator's output destination; every
  // other level writes to the run block destination.
  InsertDestination *output_destination =
      query_context->getInsertDestination(job->is_final_level
                                              ? output_destination_index_
                                              : run_block_destination_index_);

  // Create a work order from the merge job from merge tree. Jobs at
  // level > 0 read from run_relation_; level 0 reads input_relation_.
  return new SortMergeRunWorkOrder(
      query_id_,
      query_context->getSortConfig(sort_config_index_),
      job->level > 0 ? run_relation_ : input_relation_,
      std::move(job->runs),
      top_k_,
      job->level,
      output_destination,
      storage_manager,
      op_index_,
      scheduler_client_id,
      bus);
}
// Fetches the merge jobs currently offered by the merge tree and adds one
// normal work order per job to `container`. Returns the value reported by
// MergeTree::getMergeJobs().
bool SortMergeRunOperator::generateWorkOrders(
    WorkOrdersContainer *container,
    QueryContext *query_context,
    StorageManager *storage_manager,
    const tmb::client_id scheduler_client_id,
    tmb::MessageBus *bus) {
  // Pull the available merge jobs out of the merge tree.
  std::vector<MergeTree::MergeJob> pending_jobs;
  const bool done_generating = merge_tree_.getMergeJobs(&pending_jobs);

  // One work order per merge job.
  for (MergeTree::MergeJob &merge_job : pending_jobs) {
    WorkOrder *work_order = createWorkOrder(&merge_job,
                                            query_context,
                                            storage_manager,
                                            scheduler_client_id,
                                            bus);
    container->addNormalWorkOrder(work_order, op_index_);
  }

  return done_generating;
}
void SortMergeRunOperator::initializeInputRuns() {
DCHECK(started_);
if (input_relation_is_stored_) {
// Input blocks are from base relation; add all blocks.
merge_tree_.addInputBlocks(input_relation_block_ids_);
} else {
// Input blocks are pipelined; add only newly available blocks.
if (num_input_workorders_generated_ < input_relation_block_ids_.size()) {
std::vector<block_id> new_blocks(
input_relation_block_ids_.begin() + num_input_workorders_generated_,
input_relation_block_ids_.end());
merge_tree_.addInputBlocks(new_blocks);
num_input_workorders_generated_ += new_blocks.size();
}
}
}
// Called once no further input blocks will be fed. A no-op for stored input.
// For pipelined input, the merge tree is finalized now that the total number
// of input blocks is known.
void SortMergeRunOperator::doneFeedingInputBlocks(const relation_id input_relation_id) {
  if (!input_relation_is_stored_) {
    // All input blocks are known at this point; compute the merge tree.
    merge_tree_.initializeTree(input_relation_block_ids_.size());

    // Hand over any remaining input blocks for the final time.
    // NOTE(review): initializeInputRuns() DCHECKs started_, which this
    // function does not set -- confirm callers guarantee the operator has
    // already started when the input stream ends.
    initializeInputRuns();

    // If the final merge was already scheduled, fix it to write to the
    // correct output destination.
    merge_tree_.checkAndFixFinalMerge();
  }
}
namespace {
/**
* @brief Object to store the output run of a merge step with methods to
* serialize and deserialize from its corresponding protobuf.
**/
class SortMergeRunOutput {
public:
/**
* @brief Constructor with move semantics for creating the object from a
* vector of block_ids.
*
* @param merge_level Level of the current merge.
* @param blocks Rvalue of the vector of blocks in the output.
**/
SortMergeRunOutput(std::size_t merge_level, std::vector<block_id> &&blocks)
: merge_level_(merge_level), blocks_(std::move(blocks)) {}
/**
* @brief Constructor from protobuf object.
*
* @param proto Protobuf of merged run.
**/
explicit SortMergeRunOutput(serialization::SortMergeRunOutput &proto)
: merge_level_(static_cast<std::size_t>(proto.merge_level())) {
blocks_.reserve(proto.blocks_size());
for (const auto block : proto.blocks()) {
blocks_.emplace_back(block);
}
}
/**
* @brief Method to serialize the object.
*
* @return Pair with pointer serialized blob and size of the blob.
**/
std::pair<void *, std::size_t> serialize() const {
serialization::SortMergeRunOutput msg;
msg.set_merge_level(merge_level_);
for (const block_id block : blocks_) {
msg.add_blocks(block);
}
const std::size_t msg_size = msg.ByteSize();
char *msg_bytes = static_cast<char *>(std::malloc(msg_size));
CHECK(msg.SerializeToArray(msg_bytes, msg_size));
return std::make_pair(msg_bytes, msg_size);
}
/**
* @brief Get merge level.
*
* @return Merge level.
**/
std::size_t getMergeLevel() const { return merge_level_; }
/**
* @brief Get mutable pointer to the blocks in the run.
*
* @return Pointer to vector of block_ids in the run.
**/
std::vector<block_id>* getBlocksMutable() { return &blocks_; }
private:
std::size_t merge_level_;
std::vector<block_id> blocks_;
};
} // namespace
// Handles the completion message sent by a merge run work order: the payload
// is parsed into a SortMergeRunOutput and the run it describes is written to
// the merge tree. Any other message type, or an unparsable payload, fails a
// CHECK.
void SortMergeRunOperator::receiveFeedbackMessage(
    const WorkOrder::FeedbackMessage &msg) {
  CHECK(SortMergeRunOperator::kRunOutputMessage == msg.type());

  // Decode the payload into its protobuf form.
  serialization::SortMergeRunOutput run_output_proto;
  CHECK(run_output_proto.ParseFromArray(msg.payload(), msg.payload_size()));

  // Rebuild the output run and pass its level and blocks to the merge tree.
  SortMergeRunOutput run_output(run_output_proto);
  const std::size_t finished_level = run_output.getMergeLevel();
  std::vector<block_id> *finished_blocks = run_output.getBlocksMutable();
  merge_tree_.writeOutputRun(finished_level, finished_blocks);
}
void SortMergeRunWorkOrder::execute() {
// Merge input runs.
merge_run_operator::RunMerger run_merger(sort_config_,
std::move(input_runs_),
top_k_,
run_relation_,
output_destination_,
merge_level_,
storage_manager_);
run_merger.doMerge();
// Serialize completion message with output run.
SortMergeRunOutput output(run_merger.getMergeLevel(),
std::move(*run_merger.getOutputRunMutable()));
std::pair<void *, std::size_t> serialized_output = output.serialize();
// Send completion message to operator.
FeedbackMessage msg(SortMergeRunOperator::kRunOutputMessage,
getQueryID(),
operator_index_,
serialized_output.first,
serialized_output.second);
SendFeedbackMessage(
bus_, ClientIDMap::Instance()->getValue(), scheduler_client_id_, msg);
}
} // namespace quickstep