blob: a4ca52587d498cfcf4ce22574356c8e4040cf390 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
**/
#ifndef TMB_MESSAGE_BUS_H_
#define TMB_MESSAGE_BUS_H_
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>
#include "tmb/id_typedefs.h"
#include "tmb/priority.h"
#include "tmb/tagged_message.h"
#include "tmb/internal/container_pusher.h"
#include "tmb/internal/iterator_adapter.h"
namespace tmb {
class Address;
class CancellationToken;
class MessageStyle;
namespace internal { class NetMessageRemovalInterface; }
/** \addtogroup TMB
* @{
*/
/**
* @brief A bus which allows clients to connect and send messages to each other.
* This class defines the core interface for all MessageBus
* implementations.
**/
class MessageBus {
public:
/**
* @brief The outcome reported by a call to Send().
**/
enum class SendStatus {
  // The message was sent successfully: at least one recipient which was
  // connected at the time of the send has the message queued for delivery.
  // The message will eventually be delivered provided that the recipient
  // does not disconnect and the message timeout (if any) does not expire.
  // kOK is still returned when some of the specified recipients had already
  // disconnected at the time of the send, so long as at least one recipient
  // was still connected.
  kOK = 0,

  // The set of receivers specified by the Address is empty. This may be
  // returned if all of the specified recipients had already disconnected at
  // the time the message was sent.
  kNoReceivers,

  // Error: the sender is not connected to this bus.
  kSenderNotConnected,

  // Error: the sender is not registered as a sender for the specified
  // message type.
  kSenderNotRegisteredForMessageType,

  // Error: an explicitly-specified recipient has not been registered as a
  // receiver for the specified message type.
  kReceiverNotRegisteredForMessageType
};
MessageBus() {
}
// Virtual destructor, so that a concrete bus can be destroyed through a
// pointer to MessageBus. Nothing needs to be released at this level.
virtual ~MessageBus() = default;
/**
* @brief Add a server to the set of remote servers which this bus will
*        connect to. Only MessageBus implementations which talk to remote
*        servers (currently VoltDBMessageBus and ZookeeperMessageBus) act on
*        this call; it has no effect for other MessageBus implementations.
*        All servers specified should be in the same cluster.
* @warning To set up a VoltDBMessageBus or ZookeeperMessageBus, this must be
*          called at least once BEFORE calling Initialize() below. Calling
*          this method after Initialize() has no effect.
* @warning Unlike other methods, this is not necessarily threadsafe.
*
* @param hostname The hostname or IP address of the server to connect to.
* @param port The TCP/IP port number to connect to on the server.
**/
virtual void AddServer(const std::string &hostname,
                       const std::uint16_t port) {
  // Intentionally empty: the base-class version ignores both arguments.
}
/**
* @brief Complete initialization of this MessageBus object and make it
* usable. Depending on the particular MessageBus implementation,
* calling this method may open database files, open connections to
* remote servers, etc.
* @note This is a pure virtual method; each concrete MessageBus provides its
* own implementation.
* @warning This method MUST be called successfully before using any of the
* other methods below.
* @warning This method is not necessarily threadsafe, and should be called
* only once.
*
* @return true on success, false if some error occurred.
**/
virtual bool Initialize() = 0;
/**
* @brief Reset the message bus and prepare the underlying storage, wiping
* out any existing state.
* @note Some MessageBus implementations, currently SQLiteMessageBus and
* ZookeeperMessageBus, must have this method called once before using
* the bus in order to set up certain persistent data structures for
* the bus.
* @note This is a pure virtual method; each concrete MessageBus provides its
* own implementation.
* @warning This is NOT threadsafe, and must be called only once, AFTER
* Initialize() but before any calls to the methods below.
**/
virtual void ResetBus() = 0;
/**
* @brief Connect a client (an entity which can send and/or receive messages)
* to this bus.
* @note The returned ID is what the other methods of this class take as
* their client, sender_id, or receiver_id argument.
*
* @return An ID which uniquely identifies the client among all the clients
* of this MessageBus.
**/
virtual client_id Connect() = 0;
/**
* @brief Disconnect a client from this bus, so that it may no longer send
* or receive messages.
* @note If there are messages pending for the client, they will not be
* delivered.
*
* @param client The ID of the client, originally provided by Connect(), to
* disconnect from this bus.
* @return true on success, false if some error is encountered (e.g.
* attempting to disconnect a client which is not actually
* connected).
**/
virtual bool Disconnect(const client_id client) = 0;
/**
* @brief Register a connected client as a sender of a particular type of
* message, allowing it to use Send() with the specified message type.
* @note A Send() call by a sender which has not been registered for the
* message type reports SendStatus::kSenderNotRegisteredForMessageType.
*
* @param sender_id The ID of the sender, originally provided by Connect().
* @param message_type The application-specific type ID of the messages which
* will be sent.
* @return true on success, false if some error occurred (e.g. there is no
* connected client with sender_id, or the specified sender is
* already registered as a sender of the specified message_type).
**/
virtual bool RegisterClientAsSender(const client_id sender_id,
const message_type_id message_type) = 0;
/**
* @brief Register a connected client as a receiver of a particular type of
* message, allowing it to start receiving messages of the specified
* type.
* @note A Send() call which explicitly addresses a recipient that has not
* been registered for the message type reports
* SendStatus::kReceiverNotRegisteredForMessageType.
*
* @param receiver_id The ID of the receiver to deliver messages to.
* @param message_type The application-specific type ID of the messages to
* start receiving.
* @return true on success, false if some error occurred (e.g. there is no
* connected client with receiver_id, or the specified receiver is
* already registered as a receiver of the specified message_type).
**/
virtual bool RegisterClientAsReceiver(
const client_id receiver_id,
const message_type_id message_type) = 0;
/**
* @brief Send a message asynchronously.
* @note This virtual method declares default arguments. C++ selects default
* arguments from the static type used at the call site, so overriding
* implementations should declare the same defaults to avoid surprises.
*
* @param sender_id The ID of the client which is sending the message.
* @param destination_address The Address describing which recipients should
* receive the message.
* @param style The style of messaging for this message.
* @param message The message to send (will be "moved from").
* @param priority The priority of the message. Defaults to kDefaultPriority.
* @param cancellation_token If non-null, make this message cancellable and
* overwrite with a token that can be used by the sender to cancel the
* message. Note that clients which receive the message will also be
* able to cancel it, possibly preventing the message from being
* received by their peers. Defaults to nullptr.
* @return The result of the send. See SendStatus for details.
**/
virtual SendStatus Send(
const client_id sender_id,
const Address &destination_address,
const MessageStyle &style,
TaggedMessage &&message, // NOLINT(whitespace/operators)
const Priority priority = kDefaultPriority,
CancellationToken *cancellation_token = nullptr) = 0;
/**
* @brief Wait for a pending asynchronous message and return it.
* @note The caller is blocked until a message can be delivered. See also
*       ReceiveIfAvailable() for a variant which does not wait.
* @note Receiving messages in a batch can be more efficient than receiving
*       them one-at-a-time. See also ReceiveBatch().
*
* @param receiver_id The ID of the receiver which calls this method to get a
*        message.
* @param minimum_priority The minimum priority of messages to receive.
* @param delete_immediately If true, delete the received message from the
*        bus immediately during this call. Otherwise, the message should be
*        explicitly deleted by calling DeleteMessage().
* @return The next pending message for the specified receiver.
**/
AnnotatedMessage Receive(const client_id receiver_id,
                         const Priority minimum_priority = 0,
                         const bool delete_immediately = false) {
  // Exactly one message is requested from ReceiveImpl(); the pusher is
  // handed the address of 'received' as its destination.
  const std::size_t kSingleMessage = 1;
  AnnotatedMessage received;
  internal::SingleElementPusher single_sink(&received);
  ReceiveImpl(receiver_id,
              minimum_priority,
              kSingleMessage,
              delete_immediately,
              &single_sink);
  return received;
}
/**
* @brief Receive a batch of pending asynchronous messages.
* @note This method blocks the caller until at least one message can be
* delivered. See also ReceiveBatchIfAvailable().
*
* @param receiver_id The ID of the receiver which calls this method to get a
* message.
* @param message_container A pointer to a container with a
* push_back(const AnnotatedMessage&) or push_back(AnnotatedMessage&&)
* method (e.g. std::vector<AnnotatedMessage>,
* std::list<AnnotatedMessage>, or std::deque<AnnotatedMessage>) which
* messages will be inserted into.
* @param minimum_priority The minimum priority of messages to receive.
* @param max_messages The maximum number of messages to deliver in the batch
* (0 for no limit).
* @param delete_immediately If true, delete the received messages from the
* bus immediately during this call. Otherwise, messages should be
* explicitly deleted later by calling DeleteMessages().
* @return The number of messages actually received in the batch (i.e. the
* number of messages inserted into message_container).
**/
template <typename ContainerT>
std::size_t ReceiveBatch(const client_id receiver_id,
ContainerT *message_container,
const Priority minimum_priority = 0,
const std::size_t max_messages = 0,
const bool delete_immediately = false) {
internal::ContainerPusherImpl<ContainerT> pusher(message_container);
return ReceiveImpl(receiver_id,
minimum_priority,
max_messages,
delete_immediately,
&pusher);
}
/**
* @brief Receive a pending asynchronous message if any is available when
* this method is called.
* @note Unlike Receive(), this method will not block indefinitely while
* waiting for a message.
* @note It can be more efficient to receive messages in a batch rather than
* one-at-a-time. See also ReceiveBatchIfAvailable().
*
* @param receiver_id The ID of the receiver which calls this method to get a
* message.
* @param message_ptr A pointer to an uninitialized AnnotatedMessage which
* will be overwritten by the received message, if any.
* @param minimum_priority The minimum priority of messages to receive.
* @param delete_immediately If true, delete the received message (if any)
* from the bus immediately during this call. Otherwise, message
* should be explicitly deleted later by calling DeleteMessage().
* @return true if a message was delivered (overwriting *message_ptr), false
* if there were no pending messages for the specified Receiver at
* the time of the call.
**/
bool ReceiveIfAvailable(const client_id receiver_id,
AnnotatedMessage *message_ptr,
const Priority minimum_priority = 0,
const bool delete_immediately = false) {
internal::SingleElementPusher pusher(message_ptr);
return ReceiveIfAvailableImpl(receiver_id,
minimum_priority,
1,
delete_immediately,
&pusher)
> 0;
}
/**
* @brief Receive a batch of pending asynchronous messages if any are
* available when this method is called.
* @note Unlike ReceiveBatch(), this method will not block indefinitely while
* waiting for a message.
*
* @param receiver_id The ID of the receiver which calls this method to get a
* message.
* @param message_container A pointer to a container with a
* push_back(const AnnotatedMessage&) or push_back(AnnotatedMessage&&)
* method (e.g. std::vector<AnnotatedMessage>,
* std::list<AnnotatedMessage>, or std::deque<AnnotatedMessage>) which
* messages will be inserted into.
* @param minimum_priority The minimum priority of messages to receive.
* @param max_messages The maximum number of messages to deliver in the batch
* (0 for no limit).
* @param delete_immediately If true, delete the received messages (if any)
* from the bus immediately during this call. Otherwise, messages
* should be explicitly deleted later by calling DeleteMessages().
* @return The number of messages actually received in the batch (i.e. the
* number of messages inserted into '*message_container').
**/
template <typename ContainerT>
std::size_t ReceiveBatchIfAvailable(const client_id receiver_id,
ContainerT *message_container,
const Priority minimum_priority = 0,
const std::size_t max_messages = 0,
const bool delete_immediately = false) {
internal::ContainerPusherImpl<ContainerT> pusher(message_container);
return ReceiveIfAvailableImpl(receiver_id,
minimum_priority,
max_messages,
delete_immediately,
&pusher);
}
/**
* @brief Delete a single message received by a call to one of the Receive
* methods.
*
* @param receiver_id The ID of the receiver which calls this method to
* delete a message.
* @param message A message previously received by the client.
**/
void DeleteMessage(const client_id receiver_id,
const AnnotatedMessage &message) {
internal::IteratorAdapterImpl<const AnnotatedMessage*>
adapter(&message, &message + 1);
DeleteImpl(receiver_id, &adapter);
}
/**
* @brief Delete a batch of messages received by a call to one of the Receive
* methods.
*
* @param receiver_id The ID of the receiver which calls this method to
* delete messages.
* @param begin An iterator to the first AnnotatedMessage to be deleted.
* @param end An iterator to one-past-the-end of the AnnotatedMessages to
* be deleted.
**/
template <typename IteratorT>
void DeleteMessages(const client_id receiver_id,
const IteratorT &begin,
const IteratorT &end) {
internal::IteratorAdapterImpl<IteratorT> adapter(begin, end);
DeleteImpl(receiver_id, &adapter);
}
/**
* @brief Cancel a previously sent message.
* @note This does NOT guarantee that clients will not receive the cancelled
* message, as they could have received it before it was cancelled. It
* will prevent the message from being received after the call is
* complete, however.
* @note This is the sender-side overload, which identifies the message by
* its CancellationToken. A receiver cancels a message it has received
* with the overload taking an AnnotatedMessage.
*
* @param sender_id The ID of the client which originally sent the message,
* and is now cancelling it.
* @param cancellation_token The cancellation token generated by the call to
* Send() which originally sent the message.
**/
virtual void CancelMessage(const client_id sender_id,
const CancellationToken &cancellation_token) = 0;
/**
* @brief Cancel a previously received message, preventing other clients from
* receiving it in the future.
* @note This does NOT guarantee that clients will not receive the cancelled
* message, as they could have received it before it was cancelled. It
* will prevent the message from being received after the call is
* complete, however.
* @note A message can only be cancelled if the sender originally made it
* cancellable. Attempting to cancel a non-cancellable message is a
* silent no-op.
*
* @param receiver_id The ID of the client which received the message and is
* now cancelling it.
* @param received_message The previously received message which is now being
* cancelled.
**/
void CancelMessage(const client_id receiver_id,
const AnnotatedMessage &received_message) {
internal::IteratorAdapterImpl<const AnnotatedMessage*>
adapter(&received_message, &received_message + 1);
CancelImpl(receiver_id, &adapter);
}
/**
* @brief Cancel a batch of previously received messages, preventing other
* clients from receiving them in the future.
* @note This does NOT guarantee that clients will not receive a cancelled
* message, as they could have received it before it was cancelled. It
* will prevent messages from being received after the call is
* complete, however.
* @note A message can only be cancelled if the sender originally made it
* cancellable. Attempting to cancel a non-cancellable message is a
* silent no-op.
*
* @param receiver_id The ID of the client which received the messages and is
* now cancelling them.
* @param begin An iterator to the first AnnotatedMessage to be cancelled.
* @param end An iterator to one-past-the-end of the AnnotatedMessages to
* be cancelled.
**/
template <typename IteratorT>
void CancelMessages(const client_id receiver_id,
const IteratorT &begin,
const IteratorT &end) {
internal::IteratorAdapterImpl<IteratorT> adapter(begin, end);
CancelImpl(receiver_id, &adapter);
}
/**
* @brief Count the number of pending messages queued for a particular
* client.
* @note This method may count messages that are already expired or
* cancelled, and therefore will never be delivered to the specified
* receiver. The result is therefore an upper bound on the number of
* messages which can still be delivered from the current queue.
*
* @param receiver_id The ID of the client to count pending queued messages
* for.
* @return The number of messages queued for the specified client.
**/
virtual std::size_t CountQueuedMessagesForClient(
const client_id receiver_id) = 0;
/**
* @brief Report whether it is safe to call GetNetMessageRemovalInterface()
*        on this MessageBus.
* @warning This is intended only for internal use by TMB network servers.
*
* @return true if GetNetMessageRemovalInterface() can be called safely.
**/
virtual bool SupportsNetMessageRemovalInterface() const {
  // The base-class answer is "no"; the method is virtual, so a derived bus
  // can report otherwise.
  return false;
}
/**
* @brief Return a pointer to this MessageBus cast to
* NetMessageRemovalInterface.
* @note This method is only declared here; its base-class definition is not
* part of this header.
* @warning This is intended only for internal use by TMB network servers,
* and can only be called if SupportsNetMessageRemovalInterface()
* returns true.
*
* @return A pointer to this MessageBus cast to NetMessageRemovalInterface.
**/
virtual internal::NetMessageRemovalInterface*
GetNetMessageRemovalInterface();
protected:
// Blocking receive which backs the public Receive() and ReceiveBatch()
// methods. Messages are handed to the caller through '*pusher', and the
// return value is what ReceiveBatch() reports as the number of messages
// received. Receive() passes 1 for 'max_messages'; ReceiveBatch() forwards
// its own 'max_messages' argument (documented as 0 for no limit).
//
// Default implementation does active polling. Implementations should
// override this if they can do something smarter.
virtual std::size_t ReceiveImpl(const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher);
// Receive hook which backs the public ReceiveIfAvailable() and
// ReceiveBatchIfAvailable() methods, and which must be implemented by every
// concrete MessageBus. Messages are handed to the caller through '*pusher'.
// The return value is what ReceiveBatchIfAvailable() reports as the number
// of messages received; ReceiveIfAvailable() treats a nonzero value as "a
// message was delivered". ReceiveIfAvailable() passes 1 for 'max_messages';
// ReceiveBatchIfAvailable() forwards its own 'max_messages' argument
// (documented as 0 for no limit).
virtual std::size_t ReceiveIfAvailableImpl(
const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher) = 0;
// Deletion hook which backs the public DeleteMessage() and DeleteMessages()
// methods, and which must be implemented by every concrete MessageBus.
// '*adapter' wraps the range of previously received messages which
// 'receiver_id' is deleting.
virtual void DeleteImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) = 0;
// Cancellation hook which backs the receiver-side CancelMessage() overload
// and CancelMessages(), and which must be implemented by every concrete
// MessageBus. '*adapter' wraps the range of previously received messages
// which 'receiver_id' is cancelling.
virtual void CancelImpl(
const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter) = 0;
private:
// The number of milliseconds to sleep between calls to
// ReceiveIfAvailableImpl() in the default active-polling implementation of
// ReceiveImpl(). That default implementation is defined outside this header.
static const unsigned int kReceivePollIntervalMS = 100;
// Disallow copy and assign: a MessageBus can be neither copy-constructed nor
// copy-assigned. Both operations are explicitly deleted, so any attempt to
// use them is a compile-time error.
MessageBus(const MessageBus &orig) = delete;
MessageBus& operator=(const MessageBus &rhs) = delete;
};
/** @} */
} // namespace tmb
#endif // TMB_MESSAGE_BUS_H_