Added a thread-local block checkout mechanism for insert destinations.
diff --git a/storage/InsertDestination.cpp b/storage/InsertDestination.cpp
index 354bed4..e7c3972 100644
--- a/storage/InsertDestination.cpp
+++ b/storage/InsertDestination.cpp
@@ -332,28 +332,43 @@
void BlockPoolInsertDestination::getPartiallyFilledBlocks(std::vector<MutableBlockReference> *partial_blocks) {
SpinMutexLock lock(mutex_);
- for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_.size(); ++i) {
- partial_blocks->push_back((std::move(available_block_refs_[i])));
+ for (auto it = available_block_refs_.begin(); it != available_block_refs_.end(); ++it) {
+ for (std::vector<MutableBlockReference>::size_type i = 0; i < it->second.size(); ++i) {
+ partial_blocks->push_back((std::move(it->second[i])));
+ }
+ it->second.clear();
}
available_block_refs_.clear();
}
MutableBlockReference BlockPoolInsertDestination::getBlockForInsertion() {
- SpinMutexLock lock(mutex_);
- if (available_block_refs_.empty()) {
- if (available_block_ids_.empty()) {
+ // Find out the thread which made the request.
+ {
+ SpinMutexLock lock(mutex_);
+ const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+ if (available_block_refs_.find(worker_thread_client_id) == available_block_refs_.end()) {
+ // Entry for the worker not present.
+ available_block_refs_[worker_thread_client_id];
+ available_block_ids_[worker_thread_client_id];
return createNewBlock();
+ }
+ if (available_block_refs_[worker_thread_client_id].empty()) {
+ // No available refs
+ if (!available_block_ids_[worker_thread_client_id].empty()) {
+ // Block IDs are available.
+ const block_id id = available_block_ids_[worker_thread_client_id].back();
+ available_block_ids_[worker_thread_client_id].pop_back();
+ MutableBlockReference retval = storage_manager_->getBlockMutable(id, relation_);
+ return retval;
+ }
} else {
- const block_id id = available_block_ids_.back();
- available_block_ids_.pop_back();
- MutableBlockReference retval = storage_manager_->getBlockMutable(id, relation_);
+ // Some block refs available.
+ MutableBlockReference retval = std::move(available_block_refs_[worker_thread_client_id].back());
+ available_block_refs_[worker_thread_client_id].pop_back();
return retval;
}
- } else {
- MutableBlockReference retval = std::move(available_block_refs_.back());
- available_block_refs_.pop_back();
- return retval;
}
+ return createNewBlock();
}
void BlockPoolInsertDestination::returnBlock(MutableBlockReference &&block, const bool full) {
@@ -362,7 +377,8 @@
if (full) {
done_block_ids_.push_back(block->getID());
} else {
- available_block_refs_.push_back(std::move(block));
+ const tmb::client_id worker_thread_client_id = thread_id_map_.getValue();
+ available_block_refs_[worker_thread_client_id].push_back(std::move(block));
return;
}
}
@@ -377,8 +393,11 @@
}
const std::vector<block_id>& BlockPoolInsertDestination::getTouchedBlocksInternal() {
- for (std::vector<MutableBlockReference>::size_type i = 0; i < available_block_refs_.size(); ++i) {
- done_block_ids_.push_back(available_block_refs_[i]->getID());
+ for (auto it = available_block_refs_.begin(); it != available_block_refs_.end(); ++it) {
+ for (std::vector<MutableBlockReference>::size_type i = 0; i < it->second.size(); ++i) {
+ done_block_ids_.push_back((it->second)[i]->getID());
+ }
+ it->second.clear();
}
available_block_refs_.clear();
diff --git a/storage/InsertDestination.hpp b/storage/InsertDestination.hpp
index 670cd6c..d6a96bc 100644
--- a/storage/InsertDestination.hpp
+++ b/storage/InsertDestination.hpp
@@ -366,7 +366,7 @@
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus)
: InsertDestination(relation, layout, storage_manager, relational_op_index, scheduler_client_id, bus),
- available_block_ids_(std::move(blocks)) {
+ global_block_ids_(std::move(blocks)) {
// TODO(chasseur): Once block fill statistics are available, replace this
// with something smarter.
}
@@ -389,12 +389,17 @@
FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);
FRIEND_TEST(QueryManagerTest, TwoNodesDAGPartiallyFilledBlocksTest);
- // A vector of references to blocks which are loaded in memory.
+ /*// A vector of references to blocks which are loaded in memory.
std::vector<MutableBlockReference> available_block_refs_;
// A vector of blocks from the relation that are not loaded in memory yet.
- std::vector<block_id> available_block_ids_;
+ std::vector<block_id> available_block_ids_;*/
// A vector of fully filled blocks.
std::vector<block_id> done_block_ids_;
+ std::unordered_map<tmb::client_id, std::vector<MutableBlockReference>> available_block_refs_;
+ std::unordered_map<tmb::client_id, std::vector<block_id>> available_block_ids_;
+
+ // A pool of globally available blocks.
+ std::vector<block_id> global_block_ids_;
DISALLOW_COPY_AND_ASSIGN(BlockPoolInsertDestination);
};