Minor refactored SortMergeRunOperator.
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 7de17ab..3d2428e 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -50,22 +50,21 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (input_relation_is_stored_) {
- // Input blocks (or runs) are from base relation. Only possible when base
- // relation is stored sorted.
- if (!started_) {
+ if (!started_) {
+ started_ = true;
+ if (input_relation_is_stored_) {
+ // Input blocks (or runs) are from base relation. Only possible when base
+ // relation is stored sorted.
// Initialize merge tree completely, since all input runs are known.
merge_tree_.initializeTree(input_relation_block_ids_.size());
- started_ = true;
- initializeInputRuns();
- }
- } else {
- // Input blocks (or runs) are pipelined from the sorted run generation
- // operator.
- if (!started_ && !input_stream_done_) {
+
+ // Input blocks are from base relation; add all blocks.
+ merge_tree_.addInputBlocks(input_relation_block_ids_);
+ } else {
+ // Input blocks (or runs) are pipelined from the sorted run generation
+ // operator.
// Initialize merge tree for first pipeline mode.
merge_tree_.initializeForPipeline();
- started_ = true;
initializeInputRuns();
}
}
@@ -74,22 +73,20 @@
}
bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (input_relation_is_stored_) {
- // Input blocks (or runs) are from base relation. Only possible when base
- // relation is stored sorted.
- if (!started_) {
+ if (!started_) {
+ started_ = true;
+ if (input_relation_is_stored_) {
+ // Input blocks (or runs) are from base relation. Only possible when base
+ // relation is stored sorted.
// Initialize merge tree completely, since all input runs are known.
merge_tree_.initializeTree(input_relation_block_ids_.size());
- started_ = true;
- initializeInputRuns();
- }
- } else {
- // Input blocks (or runs) are pipelined from the sorted run generation
- // operator.
- if (!started_ && !input_stream_done_) {
+ // Input blocks are from base relation; add all blocks.
+ merge_tree_.addInputBlocks(input_relation_block_ids_);
+ } else {
+ // Input blocks (or runs) are pipelined from the sorted run generation
+ // operator.
// Initialize merge tree for first pipeline mode.
merge_tree_.initializeForPipeline();
- started_ = true;
initializeInputRuns();
}
}
@@ -196,25 +193,20 @@
void SortMergeRunOperator::initializeInputRuns() {
DCHECK(started_);
- if (input_relation_is_stored_) {
- // Input blocks are from base relation; add all blocks.
- merge_tree_.addInputBlocks(input_relation_block_ids_);
- } else {
- // Input blocks are pipelined; add only newly available blocks.
- if (num_input_workorders_generated_ < input_relation_block_ids_.size()) {
- std::vector<block_id> new_blocks(
- input_relation_block_ids_.begin() + num_input_workorders_generated_,
- input_relation_block_ids_.end());
- merge_tree_.addInputBlocks(new_blocks);
- num_input_workorders_generated_ += new_blocks.size();
- }
+ DCHECK(!input_relation_is_stored_);
+
+ // Input blocks are pipelined; add only newly available blocks.
+ if (num_input_workorders_generated_ < input_relation_block_ids_.size()) {
+ const std::vector<block_id> new_blocks(
+ input_relation_block_ids_.begin() + num_input_workorders_generated_,
+ input_relation_block_ids_.end());
+ merge_tree_.addInputBlocks(new_blocks);
+ num_input_workorders_generated_ = input_relation_block_ids_.size();
}
}
void SortMergeRunOperator::doneFeedingInputBlocks(const relation_id input_relation_id) {
- if (input_relation_is_stored_) {
- return;
- }
+ DCHECK(!input_relation_is_stored_);
// Now we know all the input blocks; compute the merge tree.
merge_tree_.initializeTree(input_relation_block_ids_.size());
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index 6287068..79b0442 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -123,7 +123,6 @@
run_relation_(run_relation),
run_block_destination_index_(run_block_destination_index),
input_relation_is_stored_(input_relation_is_stored),
- input_stream_done_(input_relation_is_stored),
started_(false) {
DCHECK_EQ(1u, input_relation.getNumPartitions());
DCHECK_EQ(1u, output_relation.getNumPartitions());
@@ -214,7 +213,6 @@
const QueryContext::insert_destination_id run_block_destination_index_;
const bool input_relation_is_stored_;
- const bool input_stream_done_;
bool started_;