blob: 75468dc77ba5feb7ce63fa8e57edc17770b3ddce [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>
#include "tmb/id_typedefs.h"
#include "tmb/message_bus.h"
#include "tmb/priority.h"
#include "tmb/tagged_message.h"
#include "tmb/internal/lock_free_garbage_collector.h"
#include "tmb/internal/persistent_bus_state_interface.h"
#include "tmb/internal/zookeeper_receiver_context.h"
struct _zhandle;
struct String_vector;
namespace tmb {
namespace internal {
class ContainerPusher;
template <typename T> class IteratorAdapter;
class MemoryMirrorDeleteBatch;
class QueuedMessage;
template <typename T> class ThreadsafeSet;
class ZookeeperTransactionBatch;
} // namespace internal
class Address;
class CancellationToken;
class MessageStyle;
/** \addtogroup TMB
* @{
* @brief An implementation of MessageBus backed by an Apache Zookeeper server
* or servers. Suitable for communication across networks.
class ZookeeperMessageBus : public MessageBus,
public internal::PersistentBusStateInterface {
* @brief Constructor.
* @param path_prefix The path in the Zookeeper data tree where a particular
* TMB instance should store its data. Multiple TMB instances can
* coexist with each other, and with other applications, sharing the
* same Zookeeper instance, so long as they have different paths.
explicit ZookeeperMessageBus(const std::string &path_prefix);
void AddServer(const std::string &hostname, const std::uint16_t port)
bool Initialize() override;
void ResetBus() override;
client_id Connect() override;
bool Disconnect(const client_id client) override;
bool RegisterClientAsSender(const client_id sender_id,
const message_type_id message_type) override;
bool RegisterClientAsReceiver(const client_id receiver_id,
const message_type_id message_type) override;
MessageBus::SendStatus Send(
const client_id sender_id,
const Address &destination_address,
const MessageStyle &style,
TaggedMessage &&message, // NOLINT(whitespace/operators)
const Priority priority = kDefaultPriority,
CancellationToken *cancellation_token = nullptr) override;
void CancelMessage(const client_id sender_id,
const CancellationToken &cancellation_token) override;
std::size_t CountQueuedMessagesForClient(const client_id receiver_id)
// PersistentBusStateInterface implementations.
void ResetBusUnchecked() override {
client_id ConnectClientUnchecked() override {
return Connect();
bool DisconnectRequiresQueueDrain() const override {
return true;
void DisconnectClientUnchecked(
const client_id client,
const internal::ThreadsafeSet<message_type_id> &sendable_set,
const internal::ThreadsafeSet<message_type_id> &receivable_set,
const internal::MemoryMirrorDeleteBatch *queued_message_batch) override;
void InsertSendableUnchecked(const client_id client,
const message_type_id message_type) override {
RegisterClientAsSender(client, message_type);
void InsertMessageTypeUnchecked(
const message_type_id message_type) override;
void InsertReceivableUnchecked(
const client_id client,
const message_type_id message_type,
const internal::ThreadsafeSet<message_type_id> &receivable_set) override;
// This implementation ignores '*sender_message_counter'.
std::int64_t SendUnchecked(
const internal::QueuedMessage &message,
const std::vector<client_id> &verified_receivers,
CancellationToken *cancellation_token,
std::uint32_t *sender_message_counter) override;
internal::MemoryMirrorDeleteBatch* CreateDeleteBatch(
const client_id receiver_id) const override;
void DeleteMessagesUnchecked(
const internal::MemoryMirrorDeleteBatch &delete_batch) override;
void CancelMessageUnchecked(
const CancellationToken &cancellation_token) override;
void CancelMessagesUnchecked(
internal::IteratorAdapter<const AnnotatedMessage> *adapter) override;
std::unordered_map<client_id, TempClientHandle>* LoadState() override;
std::size_t ReceiveImpl(const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher) override;
std::size_t ReceiveIfAvailableImpl(const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
internal::ContainerPusher *pusher)
void DeleteImpl(const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter)
void CancelImpl(const client_id receiver_id,
internal::IteratorAdapter<const AnnotatedMessage> *adapter)
// When sorting children of a queue node, if N nodes are the minimum
// required, then sort N + (N >> kExtraSortShift) at first so that there are
// some "extra" sorted nodes in case of expired or cancelled messages. Note
// that this is purely a heuristic optimization, and in case all of the
// sorted children are exhausted before N is satisfied, then additional
// children will be sorted as needed.
static const std::uint8_t kExtraSortShift = 3;
// Callback for asynchronous events related to the zookeeper connection
// itself.
static void HandleConnectionEvent(_zhandle *connection_handle,
int event_type,
int connection_state,
const char *node_path,
void *watcher_context);
// Zookeeper-compatible callback which casts '*watcher_context' to a
// ZookeeperReceiverContext and invokes HandleQueueChange().
static void QueueWatchCallback(_zhandle *connection_handle,
int event_type,
int connection_state,
const char *node_path,
void *watcher_context);
// Add operations to '*batch' to recursively delete 'path' and all of its
// children. Deletion will not actually occur until '*batch' is committed.
void RecursivelyDeletePath(const std::string &path,
internal::ZookeeperTransactionBatch *batch);
// Add operations to '*batch' to delete any entries for 'client' in the
// receiver directory.
void RemoveReceiverDirectoryEntries(
const client_id client,
internal::ZookeeperTransactionBatch *batch);
// Helper method for ReceiveImpl(), ReceiveIfAvailableImpl(), and
// HandleQueueChange() which sorts and parses '*queue_node_children' under
// '*queue_node_path' and attempts to fetch valid messages, optionally
// deleting them according to 'delete_immediately'. Returns the number of
// messages successfully received and pushed onto '*pusher'. Also deallocates
// all strings in '*queue_node_children'.
std::size_t ReceiveInternal(const client_id receiver_id,
const Priority minimum_priority,
const std::size_t max_messages,
const bool delete_immediately,
const char *queue_node_path,
internal::ContainerPusher *pusher,
String_vector *queue_node_children);
// Invoked by a callback when the set of children under '*queue_node_path'
// changes, which may possibly indicate that a receivable message has been
// enqueued. This method will invoke ReceiveInternal and wake up the thread
// waiting in ReceiveImpl() if at least one message has successfully been
// received.
void HandleQueueChange(const char *queue_node_path,
internal::ZookeeperReceiverContext *context);
// Retrieves the set of receivers for the message as '*message_path' and
// attempts to cancel the message by removing it from each receiver's queue.
// Effectively a no-op if the message was not originally created as
// cancellable, or was already cancelled. Note that this merely adds delete
// operations to '*batch' which will not actually take place until '*batch'
// is committed.
void CancelInternal(const char *message_path,
internal::ZookeeperTransactionBatch *batch);
const std::string path_prefix_;
std::string client_basepath_;
std::string receiver_directory_basepath_;
std::string message_basepath_;
std::string queue_basepath_;
std::string host_string_;
_zhandle *zookeeper_handle_;
// A set of "leftover" contexts that still have a spurious watch set for
// them, and should be deleted to avoid leaking memory if the watch callbacks
// do not fire before this ZookeeperMessageBus is destroyed.
// TODO(chasseur): Once support for removing watches comes to the Zookeeper
// C API, revisit this.
// Disallow copy and assign:
ZookeeperMessageBus(const ZookeeperMessageBus &orig) = delete;
ZookeeperMessageBus& operator=(const ZookeeperMessageBus &rhs) = delete;
/** @} */
} // namespace tmb