Refactored the data exchange operation.
Unlike the original implementation, which needed two rounds of messages, the
redesigned block pull now needs only one RPC round, unless an RPC fails.
The network address of each peer StorageManager is cached locally, and the
cache is updated once a block is created by a newly added peer.
diff --git a/query_execution/BlockLocator.cpp b/query_execution/BlockLocator.cpp
index 89c1c00..1c8c690 100644
--- a/query_execution/BlockLocator.cpp
+++ b/query_execution/BlockLocator.cpp
@@ -130,11 +130,8 @@
}
break;
}
- case kGetPeerDomainNetworkAddressesMessage: {
- serialization::BlockMessage proto;
- CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- processGetPeerDomainNetworkAddressesMessage(sender, proto.block_id());
+ case kGetAllDomainNetworkAddressesMessage: {
+ processGetAllDomainNetworkAddressesMessage(sender);
break;
}
case kBlockDomainUnregistrationMessage: {
@@ -193,32 +190,28 @@
move(message)));
}
-void BlockLocator::processGetPeerDomainNetworkAddressesMessage(const client_id receiver,
- const block_id block) {
- serialization::GetPeerDomainNetworkAddressesResponseMessage proto;
+void BlockLocator::processGetAllDomainNetworkAddressesMessage(const client_id receiver) {
+ serialization::GetAllDomainNetworkAddressesResponseMessage proto;
// NOTE(zuyu): We don't need to protect here, as all the writers are in the
// single thread.
- for (const block_id_domain domain : block_locations_[block]) {
- proto.add_domain_network_addresses(domain_network_addresses_[domain]);
+ for (const auto &domain_network_address_pair : domain_network_addresses_) {
+ (*proto.mutable_domain_network_addresses())[domain_network_address_pair.first] =
+ domain_network_address_pair.second;
}
const int proto_length = proto.ByteSize();
char *proto_bytes = static_cast<char*>(malloc(proto_length));
CHECK(proto.SerializeToArray(proto_bytes, proto_length));
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kGetPeerDomainNetworkAddressesResponseMessage);
+ TaggedMessage message(static_cast<const void*>(proto_bytes), proto_length,
+ kGetAllDomainNetworkAddressesResponseMessage);
free(proto_bytes);
DLOG(INFO) << "BlockLocator with Client " << locator_client_id_
- << " sent GetPeerDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
+ << " sent GetAllDomainNetworkAddressesResponseMessage to StorageManager with Client " << receiver;
CHECK(tmb::MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(bus_,
- locator_client_id_,
- receiver,
- move(message)));
+ QueryExecutionUtil::SendTMBMessage(bus_, locator_client_id_, receiver, move(message)));
}
} // namespace quickstep
diff --git a/query_execution/BlockLocator.hpp b/query_execution/BlockLocator.hpp
index f5572ca..82c28ae 100644
--- a/query_execution/BlockLocator.hpp
+++ b/query_execution/BlockLocator.hpp
@@ -74,8 +74,8 @@
bus_->RegisterClientAsReceiver(locator_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kDeleteBlockLocationMessage);
- bus_->RegisterClientAsReceiver(locator_client_id_, kGetPeerDomainNetworkAddressesMessage);
- bus_->RegisterClientAsSender(locator_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
+ bus_->RegisterClientAsReceiver(locator_client_id_, kGetAllDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsSender(locator_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kBlockDomainUnregistrationMessage);
bus_->RegisterClientAsReceiver(locator_client_id_, kPoisonMessage);
@@ -146,7 +146,7 @@
private:
void processBlockDomainRegistrationMessage(const tmb::client_id receiver, const std::string &network_address);
- void processGetPeerDomainNetworkAddressesMessage(const tmb::client_id receiver, const block_id block);
+ void processGetAllDomainNetworkAddressesMessage(const tmb::client_id receiver);
tmb::MessageBus *bus_;
diff --git a/query_execution/QueryExecutionMessages.proto b/query_execution/QueryExecutionMessages.proto
index c70b339..9c59985 100644
--- a/query_execution/QueryExecutionMessages.proto
+++ b/query_execution/QueryExecutionMessages.proto
@@ -164,6 +164,6 @@
required fixed64 block_id = 1;
}
-message GetPeerDomainNetworkAddressesResponseMessage {
- repeated string domain_network_addresses = 1;
+message GetAllDomainNetworkAddressesResponseMessage {
+ map<uint32, string> domain_network_addresses = 1;
}
diff --git a/query_execution/QueryExecutionTypedefs.hpp b/query_execution/QueryExecutionTypedefs.hpp
index c56bcfd..80da7c5 100644
--- a/query_execution/QueryExecutionTypedefs.hpp
+++ b/query_execution/QueryExecutionTypedefs.hpp
@@ -115,8 +115,8 @@
kBlockDomainToShiftbossIndexMessage, // From StorageManager to BlockLocator.
kAddBlockLocationMessage, // From StorageManager to BlockLocator.
kDeleteBlockLocationMessage, // From StorageManager to BlockLocator.
- kGetPeerDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
- kGetPeerDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
+ kGetAllDomainNetworkAddressesMessage, // From StorageManager to BlockLocator.
+ kGetAllDomainNetworkAddressesResponseMessage, // From BlockLocator to StorageManager.
kBlockDomainUnregistrationMessage, // From StorageManager to BlockLocator.
#endif
};
diff --git a/query_execution/QueryExecutionUtil.hpp b/query_execution/QueryExecutionUtil.hpp
index 1388426..e7aa512 100644
--- a/query_execution/QueryExecutionUtil.hpp
+++ b/query_execution/QueryExecutionUtil.hpp
@@ -83,8 +83,8 @@
case kBlockDomainToShiftbossIndexMessage: return "BlockDomainToShiftbossIndexMessage";
case kAddBlockLocationMessage: return "AddBlockLocationMessage";
case kDeleteBlockLocationMessage: return "DeleteBlockLocationMessage";
- case kGetPeerDomainNetworkAddressesMessage: return "GetPeerDomainNetworkAddressesMessage";
- case kGetPeerDomainNetworkAddressesResponseMessage: return "GetPeerDomainNetworkAddressesResponseMessage";
+ case kGetAllDomainNetworkAddressesMessage: return "GetAllDomainNetworkAddressesMessage";
+ case kGetAllDomainNetworkAddressesResponseMessage: return "GetAllDomainNetworkAddressesResponseMessage";
case kBlockDomainUnregistrationMessage: return "BlockDomainUnregistrationMessage";
#endif // QUICKSTEP_DISTRIBUTED
default:
diff --git a/query_execution/tests/BlockLocator_unittest.cpp b/query_execution/tests/BlockLocator_unittest.cpp
index 1a7cf17..1f0418a 100644
--- a/query_execution/tests/BlockLocator_unittest.cpp
+++ b/query_execution/tests/BlockLocator_unittest.cpp
@@ -106,21 +106,20 @@
}
void checkLoaded(const block_id block) {
- const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
- EXPECT_EQ(1u, peer_domain_network_addresses.size());
- EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_addresses[0].data());
+ unordered_set<block_id_domain> domains;
+ do {
+ domains = locator_->getBlockDomains(block);
+ } while (domains.empty());
- const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
EXPECT_EQ(1u, domains.size());
EXPECT_EQ(1u, domains.count(block_domain_));
}
void checkEvicted(const block_id block) {
- const vector<string> peer_domain_network_addresses = storage_manager_->getPeerDomainNetworkAddresses(block);
- EXPECT_TRUE(peer_domain_network_addresses.empty());
-
- const unordered_set<block_id_domain> domains = locator_->getBlockDomains(block);
- EXPECT_TRUE(domains.empty());
+ unordered_set<block_id_domain> domains;
+ do {
+ domains = locator_->getBlockDomains(block);
+ } while (!domains.empty());
}
tmb::client_id worker_client_id_;
@@ -146,6 +145,10 @@
storage_manager_->createBlock(relation, relation.getDefaultStorageBlockLayout());
checkLoaded(block);
+ const string peer_domain_network_address =
+ storage_manager_->getPeerDomainNetworkAddress(BlockIdUtil::Domain(block));
+ EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_address.data());
+
ASSERT_TRUE(storage_manager_->saveBlockOrBlob(block));
storage_manager_->evictBlockOrBlob(block);
checkEvicted(block);
@@ -163,6 +166,10 @@
const block_id blob = storage_manager_->createBlob(kDefaultBlockSizeInSlots);
checkLoaded(blob);
+ const string peer_domain_network_address =
+ storage_manager_->getPeerDomainNetworkAddress(BlockIdUtil::Domain(blob));
+ EXPECT_STREQ(kDomainNetworkAddress, peer_domain_network_address.data());
+
ASSERT_TRUE(storage_manager_->saveBlockOrBlob(blob));
storage_manager_->evictBlockOrBlob(blob);
checkEvicted(blob);
diff --git a/storage/StorageManager.cpp b/storage/StorageManager.cpp
index c70eafa..b7d87e7 100644
--- a/storage/StorageManager.cpp
+++ b/storage/StorageManager.cpp
@@ -224,13 +224,14 @@
if (bus_) {
storage_manager_client_id_ = bus_->Connect();
- bus_->RegisterClientAsSender(storage_manager_client_id_, kGetPeerDomainNetworkAddressesMessage);
- bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetPeerDomainNetworkAddressesResponseMessage);
-
bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainToShiftbossIndexMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kAddBlockLocationMessage);
bus_->RegisterClientAsSender(storage_manager_client_id_, kDeleteBlockLocationMessage);
+
+ bus_->RegisterClientAsSender(storage_manager_client_id_, kGetAllDomainNetworkAddressesMessage);
+ bus_->RegisterClientAsReceiver(storage_manager_client_id_, kGetAllDomainNetworkAddressesResponseMessage);
+
bus_->RegisterClientAsSender(storage_manager_client_id_, kBlockDomainUnregistrationMessage);
LOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
@@ -548,10 +549,8 @@
return false;
}
- if (!response.is_valid()) {
- LOG(INFO) << "The pulling block not found in all the peers";
- return false;
- }
+ CHECK(response.is_valid())
+ << "The pulling block not found in all the peers";
const size_t num_slots = response.num_slots();
DCHECK_NE(num_slots, 0u);
@@ -577,46 +576,54 @@
return nullptr;
}
-vector<string> StorageManager::getPeerDomainNetworkAddresses(const block_id block) {
- serialization::BlockMessage proto;
- proto.set_block_id(block);
-
- const int proto_length = proto.ByteSize();
- char *proto_bytes = static_cast<char*>(malloc(proto_length));
- CHECK(proto.SerializeToArray(proto_bytes, proto_length));
-
- TaggedMessage message(static_cast<const void*>(proto_bytes),
- proto_length,
- kGetPeerDomainNetworkAddressesMessage);
- free(proto_bytes);
-
- DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
- << " sent GetPeerDomainNetworkAddressesMessage to BlockLocator";
-
- DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
- DCHECK(bus_ != nullptr);
- CHECK(MessageBus::SendStatus::kOK ==
- QueryExecutionUtil::SendTMBMessage(bus_,
- storage_manager_client_id_,
- block_locator_client_id_,
- move(message)));
-
- const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
- const TaggedMessage &tagged_message = annotated_message.tagged_message;
- CHECK_EQ(block_locator_client_id_, annotated_message.sender);
- CHECK_EQ(kGetPeerDomainNetworkAddressesResponseMessage, tagged_message.message_type());
- DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
- << " received GetPeerDomainNetworkAddressesResponseMessage from BlockLocator";
-
- serialization::GetPeerDomainNetworkAddressesResponseMessage response_proto;
- CHECK(response_proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
-
- vector<string> domain_network_addresses;
- for (int i = 0; i < response_proto.domain_network_addresses_size(); ++i) {
- domain_network_addresses.push_back(response_proto.domain_network_addresses(i));
+string StorageManager::getPeerDomainNetworkAddress(const block_id_domain block_domain) {
+ {
+ SpinSharedMutexSharedLock<false> read_lock(block_domain_network_addresses_shared_mutex_);
+ const auto cit = block_domain_network_addresses_.find(block_domain);
+ if (cit != block_domain_network_addresses_.end()) {
+ return cit->second;
+ }
}
- return domain_network_addresses;
+ {
+ SpinSharedMutexExclusiveLock<false> write_lock(block_domain_network_addresses_shared_mutex_);
+
+ // Check one more time if the block domain network info got set up by someone else.
+ auto cit = block_domain_network_addresses_.find(block_domain);
+ if (cit != block_domain_network_addresses_.end()) {
+ return cit->second;
+ }
+
+ DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " sent GetAllDomainNetworkAddressesMessage to BlockLocator";
+
+ DCHECK_NE(block_locator_client_id_, tmb::kClientIdNone);
+ DCHECK(bus_ != nullptr);
+ CHECK(MessageBus::SendStatus::kOK ==
+ QueryExecutionUtil::SendTMBMessage(bus_, storage_manager_client_id_, block_locator_client_id_,
+ TaggedMessage(kGetAllDomainNetworkAddressesMessage)));
+
+ const tmb::AnnotatedMessage annotated_message(bus_->Receive(storage_manager_client_id_, 0, true));
+ const TaggedMessage &tagged_message = annotated_message.tagged_message;
+ CHECK_EQ(block_locator_client_id_, annotated_message.sender);
+ CHECK_EQ(kGetAllDomainNetworkAddressesResponseMessage, tagged_message.message_type());
+ DLOG(INFO) << "StorageManager with Client " << storage_manager_client_id_
+ << " received GetAllDomainNetworkAddressesResponseMessage from BlockLocator";
+
+ serialization::GetAllDomainNetworkAddressesResponseMessage proto;
+ CHECK(proto.ParseFromArray(tagged_message.message(), tagged_message.message_bytes()));
+
+ for (const auto &domain_network_address_pair : proto.domain_network_addresses()) {
+ const block_id_domain block_domain = domain_network_address_pair.first;
+ if (block_domain_network_addresses_.find(block_domain) == block_domain_network_addresses_.end()) {
+ block_domain_network_addresses_.emplace(block_domain, domain_network_address_pair.second);
+ }
+ }
+
+ cit = block_domain_network_addresses_.find(block_domain);
+ DCHECK(cit != block_domain_network_addresses_.end());
+ return cit->second;
+ }
}
void StorageManager::sendBlockLocationMessage(const block_id block,
@@ -663,37 +670,34 @@
// already loaded before this function gets called.
BlockHandle loaded_handle;
-#ifdef QUICKSTEP_DISTRIBUTED
// TODO(quickstep-team): Use a cost model to determine whether to load from
// a remote peer or the disk.
- if (BlockIdUtil::Domain(block) != block_domain_) {
- DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from a remote peer";
- const vector<string> peer_domain_network_addresses = getPeerDomainNetworkAddresses(block);
- for (const string &peer_domain_network_address : peer_domain_network_addresses) {
- DataExchangerClientAsync client(
- grpc::CreateChannel(peer_domain_network_address, grpc::InsecureChannelCredentials()),
- this);
+ const size_t num_slots = file_manager_->numSlots(block);
+ if (num_slots != 0) {
+ void *block_buffer = allocateSlots(num_slots, numa_node);
- if (client.Pull(block, numa_node, &loaded_handle)) {
- sendBlockLocationMessage(block, kAddBlockLocationMessage);
- return loaded_handle;
- }
+ const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
+ CHECK(status) << "Failed to read block from persistent storage: " << block;
+
+ loaded_handle.block_memory = block_buffer;
+ loaded_handle.block_memory_size = num_slots;
+ } else {
+ bool pull_succeeded = false;
+
+#ifdef QUICKSTEP_DISTRIBUTED
+ const string domain_network_address = getPeerDomainNetworkAddress(BlockIdUtil::Domain(block));
+ DLOG(INFO) << "Pulling Block " << BlockIdUtil::ToString(block) << " from " << domain_network_address;
+ DataExchangerClientAsync client(
+ grpc::CreateChannel(domain_network_address, grpc::InsecureChannelCredentials()), this);
+ while (!client.Pull(block, numa_node, &loaded_handle)) {
+ LOG(INFO) << "Retry pulling Block " << BlockIdUtil::ToString(block) << " from " << domain_network_address;
}
- DLOG(INFO) << "Failed to pull Block " << BlockIdUtil::ToString(block)
- << " from remote peers, so try to load from disk.";
- }
+ pull_succeeded = true;
#endif
- const size_t num_slots = file_manager_->numSlots(block);
- DEBUG_ASSERT(num_slots != 0);
- void *block_buffer = allocateSlots(num_slots, numa_node);
-
- const bool status = file_manager_->readBlockOrBlob(block, block_buffer, kSlotSizeBytes * num_slots);
- CHECK(status) << "Failed to read block from persistent storage: " << block;
-
- loaded_handle.block_memory = block_buffer;
- loaded_handle.block_memory_size = num_slots;
+ CHECK(pull_succeeded) << "Failed to pull Block " << BlockIdUtil::ToString(block) << " from remote peers.";
+ }
#ifdef QUICKSTEP_DISTRIBUTED
if (bus_) {
diff --git a/storage/StorageManager.hpp b/storage/StorageManager.hpp
index dc4b7e8..eb40891 100644
--- a/storage/StorageManager.hpp
+++ b/storage/StorageManager.hpp
@@ -446,14 +446,13 @@
};
/**
- * @brief Get the network info of all the remote StorageManagers which may
- * load the given block in the buffer pool.
+ * @brief Get the network info of the given block domain.
*
- * @param block The block or blob to pull.
+ * @param block_domain The domain of block or blob to pull.
*
- * @return The network info of all the possible peers to pull.
+ * @return The network info of the given block domain.
**/
- std::vector<std::string> getPeerDomainNetworkAddresses(const block_id block);
+ std::string getPeerDomainNetworkAddress(const block_id_domain block_domain);
/**
* @brief Update the block location info in BlockLocator.
@@ -615,6 +614,10 @@
std::unordered_map<block_id, BlockHandle> blocks_;
alignas(kCacheLineBytes) mutable SpinSharedMutex<false> blocks_shared_mutex_;
+ // Used to pull a remote block.
+ std::unordered_map<block_id_domain, std::string> block_domain_network_addresses_;
+ alignas(kCacheLineBytes) mutable SpinSharedMutex<false> block_domain_network_addresses_shared_mutex_;
+
// This lock manager is used with the following contract:
// (1) A block cannot be evicted unless an exclusive lock is held on its
// lock shard.