blob: 130a5eab9a5643c4fbd753d2dd0fa2b5da048979 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
// TODO(chasseur): Better error handling in place of asserts (also handle
// exceptions thrown by VoltDB).
#include "tmb/voltdb_message_bus.h"
#include <cassert>
#include <chrono> // NOLINT(build/c++11)
#include <cstdint>
#include <limits>
#include <memory>
#include <string>
#include <utility>
#include <vector>
#include "InvocationResponse.hpp" // NOLINT(build/include)
#include "ParameterSet.hpp" // NOLINT(build/include)
#include "Procedure.hpp" // NOLINT(build/include)
#include "Row.hpp" // NOLINT(build/include)
#include "Table.h" // NOLINT(build/include)
#include "TableIterator.h" // NOLINT(build/include)
#include "tmb/address.h"
#include "tmb/cancellation_token.h"
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/message_style.h"
#include "tmb/priority.h"
#include "tmb/tagged_message.h"
#include "tmb/internal/c_string_buffer.h"
#include "tmb/internal/container_pusher.h"
#include "tmb/internal/memory_mirror_delete_batch.h"
#include "tmb/internal/persistent_bus_state_interface.h"
#include "tmb/internal/queued_message.h"
#include "tmb/internal/shared_bool.h"
#include "tmb/internal/voltdb_connection_pool.h"
#include "tmb/internal/voltdb_procedure_warehouse.h"
namespace {
inline void AddTimestampParam(
const std::chrono::time_point<std::chrono::high_resolution_clock> &time,
voltdb::ParameterSet *parameter_set) {
if (time.time_since_epoch().count() == 0) {
parameter_set->addNull();
} else {
// VoltDB timestamps are represented as an int64_t counting microseconds
// since the epoch.
parameter_set->addTimestamp(
std::chrono::duration_cast<std::chrono::microseconds>(
time.time_since_epoch()).count());
}
}
class VoltDBMemoryMirrorDeleteBatch
: public tmb::internal::MemoryMirrorDeleteBatch {
public:
explicit VoltDBMemoryMirrorDeleteBatch(const tmb::client_id receiver_id)
: tmb::internal::MemoryMirrorDeleteBatch(receiver_id) {
}
~VoltDBMemoryMirrorDeleteBatch() {
}
inline void AddMessage(
const tmb::internal::QueuedMessage &message) override {
message_ids_.push_back(message.GetMessageID());
}
private:
std::vector<std::int64_t> message_ids_;
friend class tmb::VoltDBMessageBus;
};
} // namespace
namespace tmb {
void VoltDBMessageBus::ResetBus() {
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(warehouse_.GetProcedure(
ProcedureID::kResetBus).get()));
assert(response.success());
}
client_id VoltDBMessageBus::Connect() {
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(warehouse_.GetProcedure(
ProcedureID::kConnectClient).get()));
while (response.statusCode() == voltdb::STATUS_CODE_GRACEFUL_FAILURE) {
// Probably an accidental collision on client_id, so retry.
response = connection_pool_.InvokeProcedure(warehouse_.GetProcedure(
internal::VoltDBProcedureWarehouse::ProcedureID::kConnectClient)
.get());
}
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 1);
assert(data[0].rowCount() == 1);
assert(data[0].columnCount() == 1);
voltdb::TableIterator table_it = data[0].iterator();
assert(table_it.hasNext());
voltdb::Row row = table_it.next();
return row.getInt64(0);
}
bool VoltDBMessageBus::Disconnect(const client_id client) {
ProcedureHandle disconnect_procedure(warehouse_.GetProcedure(
ProcedureID::kDisconnectClient));
disconnect_procedure->params()->addInt32(client);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(disconnect_procedure.get()));
disconnect_procedure.release();
if (response.success()) {
return true;
} else {
// Expect that transaction was aborted because client was not connected.
assert(response.statusCode() == voltdb::STATUS_CODE_USER_ABORT);
return false;
}
}
bool VoltDBMessageBus::RegisterClientAsSender(
const client_id sender_id,
const message_type_id message_type) {
ProcedureHandle register_sender_procedure(warehouse_.GetProcedure(
ProcedureID::kRegisterSender));
register_sender_procedure->params()->addInt32(sender_id)
.addInt32(message_type);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(register_sender_procedure.get()));
register_sender_procedure.release();
if (response.success()) {
return true;
} else {
// STATUS_CODE_USER_ABORT occurs if the client was not connected.
// STATUS_CODE_GRACEFUL_FAILURE occurs if the client is already registered
// as a sender of the given message type.
assert((response.statusCode() == voltdb::STATUS_CODE_USER_ABORT)
|| (response.statusCode() == voltdb::STATUS_CODE_GRACEFUL_FAILURE));
return false;
}
}
bool VoltDBMessageBus::RegisterClientAsReceiver(
const client_id receiver_id,
const message_type_id message_type) {
ProcedureHandle register_receiver_procedure(warehouse_.GetProcedure(
ProcedureID::kRegisterReceiver));
register_receiver_procedure->params()->addInt32(receiver_id)
.addInt32(message_type);
voltdb::InvocationResponse response(connection_pool_.InvokeProcedure(
register_receiver_procedure.get()));
register_receiver_procedure.release();
if (response.success()) {
return true;
} else {
// STATUS_CODE_USER_ABORT occurs if the client was not connected.
// STATUS_CODE_GRACEFUL_FAILURE occurs if the client is already registered
// as a receiver of the given message type.
assert((response.statusCode() == voltdb::STATUS_CODE_USER_ABORT)
|| (response.statusCode() == voltdb::STATUS_CODE_GRACEFUL_FAILURE));
return false;
}
}
MessageBus::SendStatus VoltDBMessageBus::Send(
const client_id sender_id,
const Address &destination_address,
const MessageStyle &style,
TaggedMessage &&message, // NOLINT(whitespace/operators)
const Priority priority,
CancellationToken *cancellation_token) {
ProcedureHandle send_procedure;
if (destination_address.send_to_all_) {
send_procedure = warehouse_.GetProcedure(ProcedureID::kSendToAny);
voltdb::ParameterSet *params = send_procedure->params();
params->addInt32(sender_id);
AddTimestampParam(style.expiration_time_, params);
params->addInt16(priority);
params->addInt32(message.message_type());
params->addBytes(message.message_bytes(),
static_cast<const std::uint8_t*>(message.message()));
params->addInt8(style.broadcast_ ? 1 : 0);
params->addInt8(cancellation_token == nullptr ? 0 : 1);
} else if ((destination_address.explicit_recipients_.size() == 1)
&& (cancellation_token == nullptr)) {
send_procedure = warehouse_.GetProcedure(
ProcedureID::kSendToSingleExplicitReceiver);
voltdb::ParameterSet *params = send_procedure->params();
params->addInt32(sender_id);
params->addInt32(*(destination_address.explicit_recipients_.begin()));
AddTimestampParam(style.expiration_time_, params);
params->addInt16(priority);
params->addInt32(message.message_type());
params->addBytes(message.message_bytes(),
static_cast<const std::uint8_t*>(message.message()));
} else {
send_procedure = warehouse_.GetProcedure(
ProcedureID::kSendToExplicitReceivers);
voltdb::ParameterSet *params = send_procedure->params();
params->addInt32(sender_id);
std::vector<std::int32_t> receivers_vec(
destination_address.explicit_recipients_.begin(),
destination_address.explicit_recipients_.end());
params->addInt32(receivers_vec);
AddTimestampParam(style.expiration_time_, params);
params->addInt16(priority);
params->addInt32(message.message_type());
params->addBytes(
message.message_bytes(),
static_cast<const std::uint8_t*>(message.message()));
params->addInt8(style.broadcast_ ? 1 : 0);
params->addInt8(cancellation_token == nullptr ? 0 : 1);
}
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(send_procedure.get()));
send_procedure.release();
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 1);
assert(data[0].rowCount() == 1);
assert((data[0].columnCount() == 1) || (data[0].columnCount() == 2));
voltdb::TableIterator table_it = data[0].iterator();
assert(table_it.hasNext());
voltdb::Row row = table_it.next();
if ((row.getInt64(0) == 0) && (cancellation_token != nullptr)) {
cancellation_token->message_id_ = row.getInt64(1);
}
return static_cast<MessageBus::SendStatus>(row.getInt64(0));
}
void VoltDBMessageBus::CancelMessage(
const client_id sender_id,
const CancellationToken &cancellation_token) {
if (cancellation_token.message_id_ == -1) {
return;
}
std::vector<std::int64_t> message_ids(1, cancellation_token.message_id_);
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
std::size_t VoltDBMessageBus::CountQueuedMessagesForClient(
const client_id receiver_id) {
ProcedureHandle count_queued_messages_for_client_procedure(
warehouse_.GetProcedure(ProcedureID::kCountQueuedMessagesForClient));
count_queued_messages_for_client_procedure->params()->addInt32(receiver_id);
voltdb::InvocationResponse response(connection_pool_.InvokeProcedure(
count_queued_messages_for_client_procedure.get()));
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 1);
assert(data[0].rowCount() == 1);
assert(data[0].columnCount() == 1);
voltdb::TableIterator table_it = data[0].iterator();
assert(table_it.hasNext());
voltdb::Row row = table_it.next();
return row.getInt64(0);
}
void VoltDBMessageBus::DeleteById(
const client_id receiver_id,
const std::vector<std::int64_t> &message_ids) {
ProcedureHandle delete_procedure(warehouse_.GetProcedure(
ProcedureID::kDeleteMessagesUnchecked));
delete_procedure->params()->addInt32(receiver_id)
.addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(delete_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::SenderCancelById(
const client_id sender_id,
const std::vector<std::int64_t> &message_ids) {
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::ReceiverCancelById(
const client_id receiver_id,
const std::vector<std::int64_t> &message_ids) {
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::InsertSendableUnchecked(
const client_id client,
const message_type_id message_type) {
ProcedureHandle register_sender_unchecked_procedure(warehouse_.GetProcedure(
ProcedureID::kRegisterSenderUnchecked));
register_sender_unchecked_procedure->params()->addInt32(client)
.addInt32(message_type);
voltdb::InvocationResponse response(connection_pool_.InvokeProcedure(
register_sender_unchecked_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::InsertReceivableUnchecked(
const client_id client,
const message_type_id message_type,
const internal::ThreadsafeSet<message_type_id> &receivable_set) {
ProcedureHandle register_receiver_unchecked_procedure(
warehouse_.GetProcedure(ProcedureID::kRegisterReceiverUnchecked));
register_receiver_unchecked_procedure->params()->addInt32(client)
.addInt32(message_type);
voltdb::InvocationResponse response(connection_pool_.InvokeProcedure(
register_receiver_unchecked_procedure.get()));
assert(response.success());
}
std::int64_t VoltDBMessageBus::SendUnchecked(
const internal::QueuedMessage &message,
const std::vector<client_id> &verified_receivers,
CancellationToken *cancellation_token,
std::uint32_t *sender_message_counter) {
ProcedureHandle send_procedure;
if ((cancellation_token == nullptr) && (verified_receivers.size() == 1)) {
send_procedure = warehouse_.GetProcedure(
ProcedureID::kSendToSingleExplicitReceiverUnchecked);
voltdb::ParameterSet *params = send_procedure->params();
params->addInt32(message.GetAnnotatedMessage().sender);
params->addInt32(verified_receivers.front());
AddTimestampParam(message.GetAnnotatedMessage().send_time, params);
AddTimestampParam(message.GetExpirationTime(), params);
params->addInt16(message.GetPriority());
params->addInt32(
message.GetAnnotatedMessage().tagged_message.message_type());
params->addBytes(
message.GetAnnotatedMessage().tagged_message.message_bytes(),
static_cast<const std::uint8_t*>(
message.GetAnnotatedMessage().tagged_message.message()));
} else {
send_procedure = warehouse_.GetProcedure(
ProcedureID::kSendToExplicitReceiversUnchecked);
voltdb::ParameterSet *params = send_procedure->params();
params->addInt32(message.GetAnnotatedMessage().sender);
std::vector<std::int32_t> receivers_vec(verified_receivers.begin(),
verified_receivers.end());
params->addInt32(receivers_vec);
AddTimestampParam(message.GetAnnotatedMessage().send_time, params);
AddTimestampParam(message.GetExpirationTime(), params);
params->addInt16(message.GetPriority());
params->addInt32(
message.GetAnnotatedMessage().tagged_message.message_type());
params->addBytes(
message.GetAnnotatedMessage().tagged_message.message_bytes(),
static_cast<const std::uint8_t*>(
message.GetAnnotatedMessage().tagged_message.message()));
params->addInt8(cancellation_token == nullptr ? 0 : 1);
}
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(send_procedure.get()));
send_procedure.release();
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 1);
assert(data[0].rowCount() == 1);
assert(data[0].columnCount() == 1);
voltdb::TableIterator table_it = data[0].iterator();
assert(table_it.hasNext());
voltdb::Row row = table_it.next();
std::int64_t message_id = row.getInt64(0);
if (cancellation_token != nullptr) {
cancellation_token->message_id_ = message_id;
}
return message_id;
}
internal::MemoryMirrorDeleteBatch* VoltDBMessageBus::CreateDeleteBatch(
const client_id receiver_id) const {
return new VoltDBMemoryMirrorDeleteBatch(receiver_id);
}
void VoltDBMessageBus::DeleteMessagesUnchecked(
const internal::MemoryMirrorDeleteBatch &delete_batch) {
const VoltDBMemoryMirrorDeleteBatch &voltdb_delete_batch
= static_cast<const VoltDBMemoryMirrorDeleteBatch&>(delete_batch);
ProcedureHandle delete_procedure(warehouse_.GetProcedure(
ProcedureID::kDeleteMessagesUnchecked));
delete_procedure->params()->addInt32(voltdb_delete_batch.receiver_id_)
.addInt64(voltdb_delete_batch.message_ids_);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(delete_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::CancelMessageUnchecked(
const CancellationToken &cancellation_token) {
std::vector<std::int64_t> message_ids(1, cancellation_token.message_id_);
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::CancelMessagesUnchecked(
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
std::vector<std::int64_t> message_ids;
while (adapter->Valid()) {
if ((*adapter)->in_memory_cancel_flag.Valid()) {
message_ids.push_back((*adapter)->deletion_token.message_id);
}
adapter->Next();
}
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
std::unordered_map<client_id,
internal::PersistentBusStateInterface::TempClientHandle>*
VoltDBMessageBus::LoadState() {
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(warehouse_.GetProcedure(
ProcedureID::kLoadState).get()));
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 5);
std::unique_ptr<std::unordered_map<client_id, TempClientHandle>> client_map(
new std::unordered_map<client_id, TempClientHandle>());
// Create client records.
voltdb::TableIterator client_it = data[0].iterator();
while (client_it.hasNext()) {
voltdb::Row client_row = client_it.next();
client_map->emplace(client_row.getInt32(0), TempClientHandle());
}
// Load sendable types.
voltdb::TableIterator sendable_it = data[1].iterator();
while (sendable_it.hasNext()) {
voltdb::Row sendable_row = sendable_it.next();
client_map->find(sendable_row.getInt32(0))->second.sendable->emplace(
sendable_row.getInt32(1));
}
// Load receivable types.
voltdb::TableIterator receivable_it = data[2].iterator();
while (receivable_it.hasNext()) {
voltdb::Row receivable_row = receivable_it.next();
client_map->find(receivable_row.getInt32(0))->second.receivable->emplace(
receivable_row.getInt32(1));
}
// Reconstruct cancellation tokens for cancellable messages.
std::unordered_map<std::int64_t, internal::SharedBool> cancellation_flags;
voltdb::TableIterator cancellable_it = data[3].iterator();
while (cancellable_it.hasNext()) {
voltdb::Row cancellable_row = cancellable_it.next();
std::int64_t message_id = cancellable_row.getInt64(0);
cancellation_flags.emplace(message_id, internal::SharedBool(false));
}
// Load messages.
voltdb::TableIterator message_it = data[4].iterator();
while (message_it.hasNext()) {
voltdb::Row message_row = message_it.next();
client_id receiver_id = message_row.getInt32(0);
std::int64_t message_id = message_row.getInt64(1);
std::unordered_map<client_id, TempClientHandle>::iterator client_it
= client_map->find(receiver_id);
assert(client_it != client_map->end());
int payload_bytes = message_row.getInt32(8);
internal::CStringBuffer payload_buffer(payload_bytes);
int bytes_written;
bool written = message_row.getVarbinary(
7,
payload_bytes,
reinterpret_cast<std::uint8_t*>(payload_buffer.Get()),
&bytes_written);
assert(written);
assert(bytes_written == payload_bytes);
TaggedMessage msg;
msg.acquire_message(payload_buffer.Release(),
payload_bytes,
message_row.getInt32(6));
std::chrono::time_point<std::chrono::high_resolution_clock> send_time(
std::chrono::microseconds(message_row.getTimestamp(3)));
std::chrono::time_point<std::chrono::high_resolution_clock>
expiration_time;
if (message_row.getTimestamp(4)
!= std::numeric_limits<std::int64_t>::max()) {
expiration_time
= std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::microseconds(message_row.getTimestamp(4)));
}
internal::SharedBool cancellation_flag;
std::unordered_map<std::int64_t, internal::SharedBool>::const_iterator
cancellation_flag_it = cancellation_flags.find(message_id);
if (cancellation_flag_it != cancellation_flags.end()) {
cancellation_flag = cancellation_flag_it->second;
}
internal::QueuedMessage queued_msg(message_row.getInt32(2),
message_row.getInt16(5),
send_time,
expiration_time,
cancellation_flag,
std::move(msg));
queued_msg.SetMessageID(message_id);
client_it->second.queued_messages.emplace_back(std::move(queued_msg));
}
return client_map.release();
}
std::size_t VoltDBMessageBus::ReceiveIfAvailableImpl(
const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher) {
ProcedureHandle receive_procedure(warehouse_.GetProcedure(
delete_immediately ? ProcedureID::kReceiveAndDelete
: ProcedureID::kReceive));
receive_procedure->params()->addInt32(receiver_id)
.addInt16(minimum_priority)
.addInt64(max_messages);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(receive_procedure.get()));
receive_procedure.release();
assert(response.success());
std::vector<voltdb::Table> data = response.results();
assert(data.size() == 1);
if (data[0].rowCount() > 0) {
voltdb::TableIterator table_it = data[0].iterator();
while (table_it.hasNext()) {
AnnotatedMessage message;
voltdb::Row message_row = table_it.next();
message.deletion_token.message_id = message_row.getInt64(0);
message.sender = message_row.getInt32(1);
message.send_time
= std::chrono::time_point<std::chrono::high_resolution_clock>(
std::chrono::microseconds(message_row.getTimestamp(2)));
int payload_bytes = message_row.getInt32(5);
internal::CStringBuffer payload_buffer(payload_bytes);
int bytes_written;
bool written = message_row.getVarbinary(
4,
payload_bytes,
reinterpret_cast<std::uint8_t*>(payload_buffer.Get()),
&bytes_written);
assert(written);
assert(bytes_written == payload_bytes);
message.tagged_message.acquire_message(payload_buffer.Release(),
payload_bytes,
message_row.getInt32(3));
pusher->Push(std::move(message));
}
}
return data[0].rowCount();
}
void VoltDBMessageBus::DeleteImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
std::vector<std::int64_t> message_ids;
while (adapter->Valid()) {
message_ids.push_back((*adapter)->deletion_token.message_id);
adapter->Next();
}
if (message_ids.empty()) {
return;
}
ProcedureHandle delete_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kDeleteMessages));
delete_messages_procedure->params()->addInt32(receiver_id)
.addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(delete_messages_procedure.get()));
assert(response.success());
}
void VoltDBMessageBus::CancelImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) {
std::vector<std::int64_t> message_ids;
while (adapter->Valid()) {
message_ids.push_back((*adapter)->deletion_token.message_id);
adapter->Next();
}
if (message_ids.empty()) {
return;
}
ProcedureHandle cancel_messages_procedure(warehouse_.GetProcedure(
ProcedureID::kCancelMessages));
cancel_messages_procedure->params()->addInt64(message_ids);
voltdb::InvocationResponse response(
connection_pool_.InvokeProcedure(cancel_messages_procedure.get()));
assert(response.success());
}
} // namespace tmb