// blob: 727d369ce048af34524d9f0c200a74bd281585af [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <boost/scoped_ptr.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <boost/unordered_map.hpp>
#include <boost/uuid/uuid_generators.hpp>

#include "common/atomic.h"
#include "common/status.h"
#include "gen-cpp/StatestoreService.h"
#include "gen-cpp/StatestoreSubscriber.h"
#include "gen-cpp/Types_types.h"
#include "rpc/thrift-client.h"
#include "runtime/client-cache.h"
#include "runtime/timestamp-value.h"
#include "statestore/failure-detector.h"
#include "statestore/statestore-subscriber-client-wrapper.h"
#include "util/aligned-new.h"
#include "util/metrics-fwd.h"
#include "util/thread-pool.h"
#include "util/webserver.h"
namespace impala {
class Status;

/// Client cache specialised for the wrapper type used to talk to subscribers.
using StatestoreSubscriberClientCache = ClientCache<StatestoreSubscriberClientWrapper>;

/// Identifier handed out for a single registration of a subscriber; it distinguishes
/// an old registration from a new one made by the same subscriber.
using RegistrationId = TUniqueId;
/// The Statestore is a soft-state key-value store that maintains a set of Topics, which
/// are maps from string keys to byte array values.
///
/// Topics are subscribed to by subscribers, which are remote clients of the statestore
/// which express an interest in some set of Topics. The statestore sends topic updates to
/// subscribers via periodic 'update' messages, and also sends periodic 'heartbeat'
/// messages, which are used to detect the liveness of a subscriber. Updates for each
/// topic are delivered sequentially to each subscriber per subscription. E.g. if a
/// subscriber is subscribed to a topic "foo", the statestore will not deliver topic
/// updates for "foo" out-of-order or concurrently, but the updates may be sent
/// concurrently or out-of-order with "bar".
///
/// In response to 'update' messages, subscribers send topic updates to the statestore to
/// merge with the current topic. These updates are then sent to all other subscribers in
/// their next update message. The next message is scheduled update_frequency in the
/// future, unless the subscriber indicated that it skipped processing an update, in which
/// case the statestore will back off slightly before re-sending a new update. The
/// update frequency is determined by FLAGS_statestore_update_frequency_ms or
/// FLAGS_statestore_priority_update_frequency, depending on whether the topic is a
/// prioritized topic.
///
/// Prioritized topics are topics that are small but important to deliver in a timely
/// manner. Handling those topics in a separate threadpool prevents large updates of other
/// topics slowing or blocking dissemination of updates to prioritized topics.
///
/// Topic entries usually have human-readable keys, and values which are some serialised
/// representation of a data structure, e.g. a Thrift struct. The contents of a value's
/// byte string is opaque to the statestore, which maintains no information about how to
/// deserialise it. Subscribers must use convention to interpret each other's updates.
///
/// A subscriber may have marked some updates that it made as 'transient', which implies
/// that those entries should be deleted once the subscriber is no longer connected (this
/// is judged by the statestore's failure-detector, which will mark a subscriber as failed
/// when it has not responded to a number of successive heartbeat messages). Transience
/// is tracked per-topic-per-subscriber, so two different subscribers may treat the same
/// topic differently with respect to the transience of their updates.
///
/// The statestore tracks the history of updates to each topic, with each topic update
/// getting a sequentially increasing version number that is unique across the topic.
///
/// Subscribers also track the max version of each topic which they have successfully
/// processed. The statestore can use this information to send a delta of updates to a
/// subscriber, rather than all items in the topic. For non-delta updates, the statestore
/// will send an update that includes all values in the topic.
///
/// Subscribers may filter the keys within a subscribed topic by an optional prefix. If
/// a key filter prefix is specified, only entries matching that prefix will be sent to
/// the subscriber in updates. Note that this may result in empty updates being sent
/// to subscribers in the case that all updated keys have been excluded by the filter.
/// These empty updates are important so that subscribers can keep track of the current
/// version number and report back their progress in receiving the topic contents.
///
/// +================+
/// | Implementation |
/// +================+
///
/// Locking:
/// --------
/// The lock acquisition order is:
/// 1. 'subscribers_lock_'
/// 2. 'topics_map_lock_'
/// 3. Subscriber::transient_entry_lock_
/// 4. Topic::lock_ (terminal)
class Statestore : public CacheLineAligned {
public:
/// Unique identifier of a single subscriber. The subscriber itself supplies it at
/// registration time.
using SubscriberId = std::string;

/// Unique identifier of a single topic.
using TopicId = std::string;

/// Unique identifier of a single entry within a topic.
using TopicEntryKey = std::string;
/// The only constructor; initialises member variables only.
Statestore(MetricGroup* metrics);
/// Destructor, should not be called once the Statestore is initialized.
~Statestore();
/// Initialize and start the backing ThriftServer with port 'state_store_port'.
/// Initialize the ThreadPools used for updates and heartbeats. Returns an error if
/// any of the above initialization fails.
Status Init(int32_t state_store_port) WARN_UNUSED_RESULT;
/// Registers a new subscriber with the given unique subscriber ID, running a subscriber
/// service at the given location, with the provided list of topic subscriptions.
/// The registration_id output parameter is the unique ID for this registration, used to
/// distinguish old registrations from new ones for the same subscriber. On successful
/// registration, the subscriber is added to the update queue, with an immediate
/// schedule.
///
/// If a registration already exists for this subscriber, the old registration is
/// removed
/// and a new one is created. Subscribers may receive an update intended for the old
/// registration, since one may be in flight when a new RegisterSubscriber() is
/// received.
Status RegisterSubscriber(const SubscriberId& subscriber_id,
const TNetworkAddress& location,
const std::vector<TTopicRegistration>& topic_registrations,
RegistrationId* registration_id) WARN_UNUSED_RESULT;
void RegisterWebpages(Webserver* webserver);
/// The main processing loop. Runs infinitely.
void MainLoop();
/// Shut down some background threads. Only used for testing. Note that this is not
/// a clean shutdown because we can't correctly tear down 'thrift_server_', so
/// not all background threads are stopped and this object cannot be destroyed.
void ShutdownForTesting();
/// Returns the Thrift API interface that proxies requests onto the local Statestore.
const boost::shared_ptr<StatestoreServiceIf>& thrift_iface() const {
return thrift_iface_;
}
/// Names of prioritized topics that are handled in a separate threadpool. The topic
/// names are hardcoded here for expediency. Ideally we would have a more generic
/// interface for specifying prioritized topics, but for now we only have a small
/// fixed set of topics.
/// Topic tracking the set of live Impala daemon instances.
static const std::string IMPALA_MEMBERSHIP_TOPIC;
/// Topic tracking the state of admission control on all coordinators.
static const std::string IMPALA_REQUEST_QUEUE_TOPIC;
int32_t port() { return thrift_server_->port(); }
/// Amount of time in ms that it takes the statestore to decide that a executor is down
/// after it stops responding to heartbeats.
static int64_t FailedExecutorDetectionTimeMs();
private:
/// A TopicEntry is a single entry in a topic, and logically is a <string, byte string>
/// pair.
class TopicEntry {
 public:
  /// A Value is a string of bytes, for which std::string is a convenient representation.
  using Value = std::string;

  /// A version is a monotonically increasing counter. Each update to a topic has its own
  /// unique version with the guarantee that sequentially later updates have larger
  /// version numbers.
  using Version = int64_t;

  /// The Version value used to initialize a new TopicEntry.
  static const Version TOPIC_ENTRY_INITIAL_VERSION = 1L;

  /// Creates a non-deleted entry with an empty value at TOPIC_ENTRY_INITIAL_VERSION.
  TopicEntry() : version_(TOPIC_ENTRY_INITIAL_VERSION), is_deleted_(false) {}

  /// Sets the value of this entry to the byte / length pair. The caller is responsible
  /// for ensuring, if required, that the version parameter is larger than the
  /// current version().
  /// TODO: Consider enforcing version monotonicity here.
  void SetValue(const Value& bytes, Version version);

  /// Sets a new version for this entry.
  void SetVersion(Version version) { version_ = version; }

  /// Sets the is_deleted_ flag for this entry.
  void SetDeleted(bool is_deleted) { is_deleted_ = is_deleted; }

  /// Returns the stored byte string.
  const Value& value() const { return value_; }

  /// Returns the entry's version. The return type is Version (signed), matching
  /// SetVersion() and the stored member, so callers never compare a signed version
  /// against a silently converted unsigned one.
  Version version() const { return version_; }

  /// Returns the size of the value in bytes. The return type is 32 bits wide, so the
  /// narrowing from size_t is made explicit here; values of 4GB or more would wrap.
  uint32_t length() const { return static_cast<uint32_t>(value_.size()); }

  /// Returns true if this entry has been marked as deleted.
  bool is_deleted() const { return is_deleted_; }

 private:
  /// Byte string value, owned by this TopicEntry. The value is opaque to the
  /// statestore, and is interpreted only by subscribers.
  Value value_;

  /// The version of this entry. Every update is assigned a monotonically increasing
  /// version number so that only the minimal set of changes can be sent from the
  /// statestore to a subscriber.
  Version version_;

  /// Indicates if the entry has been deleted. If true, the entry will still be
  /// retained to track changes to send to subscribers.
  bool is_deleted_;
};
/// TopicEntry objects keyed by their TopicEntryKey; maintained by a Topic object.
using TopicEntryMap = boost::unordered_map<TopicEntryKey, TopicEntry>;

/// TopicEntryKeys keyed by Version; maintained by a Topic object. Because the map is
/// ordered by version it is effectively a log of the updates made to a Topic.
using TopicUpdateLog = std::map<TopicEntry::Version, TopicEntryKey>;
/// A Topic is logically a map from a string key to a sequence of bytes; each <string,
/// bytes> pair is a TopicEntry.
///
/// Every topic carries its own version counter that tracks the number of updates made
/// to it, which is what allows only the delta of changes to be sent on each update.
class Topic {
 public:
  /// Creates an empty topic named 'topic_id'. The three gauges are stored as given;
  /// the version counter and the byte totals start at zero.
  Topic(const TopicId& topic_id, IntGauge* key_size_metric,
      IntGauge* value_size_metric, IntGauge* topic_size_metric)
    : topic_id_(topic_id),
      key_size_metric_(key_size_metric),
      value_size_metric_(value_size_metric),
      topic_size_metric_(topic_size_metric) {}

  /// Adds entries with the given keys and values. An entry whose is_deleted is true is
  /// considered deleted and may be garbage collected in the future. The Topic assigns
  /// each entry a new version number, and those version numbers are returned.
  ///
  /// Safe to call concurrently from multiple threads (for different subscribers).
  /// Acquires an exclusive write lock for the topic.
  std::vector<TopicEntry::Version> Put(const std::vector<TTopicItem>& entries);

  /// Deletes all the topic entries and updates the topic metrics. 'last_version_' is
  /// deliberately not reset so that versions stay monotonically increasing.
  ///
  /// Safe to call concurrently from multiple threads (for different subscribers).
  /// Acquires an exclusive lock for the topic.
  void ClearAllEntries();

  /// Utility method to support removing transient entries. The version numbers of
  /// entries added by subscribers are tracked, and when a subscriber fails only entries
  /// that still carry the same version number are removed (the same entry may exist,
  /// but may have been updated by another subscriber giving it a new version number).
  ///
  /// Deletion means marking the entry as deleted and incrementing its version number.
  ///
  /// Safe to call concurrently from multiple threads (for different subscribers).
  /// Acquires an exclusive write lock for the topic.
  void DeleteIfVersionsMatch(TopicEntry::Version version, const TopicEntryKey& key);

  /// Builds a delta update to send to 'subscriber_id' containing the deltas greater
  /// than 'last_processed_version' (not inclusive). Only items whose keys start with
  /// 'filter_prefix' are included in the update.
  ///
  /// Safe to call concurrently from multiple threads (for different subscribers).
  /// Acquires a shared read lock for the topic.
  void BuildDelta(const SubscriberId& subscriber_id,
      TopicEntry::Version last_processed_version, const std::string& filter_prefix,
      TTopicDelta* delta);

  /// Adds entries representing the current topic state to 'topic_json'.
  void ToJson(rapidjson::Document* document, rapidjson::Value* topic_json);

 private:
  /// Unique identifier for this topic. Should be human-readable.
  const TopicId topic_id_;

  /// Reader-writer lock protecting the state below. This is a terminal lock - no other
  /// locks should be acquired while holding this one. boost::shared_mutex gives writers
  /// priority over readers in acquiring the lock, which prevents starvation.
  boost::shared_mutex lock_;

  /// Map from topic entry key to topic entry.
  TopicEntryMap entries_;

  /// The last version that was assigned to an entry in this Topic. Incremented every
  /// time an entry is added in Put() so each TopicEntry is tagged with a unique version
  /// value.
  TopicEntry::Version last_version_ = 0L;

  /// History of updates to this Topic: each key is a Version and each value is a
  /// TopicEntryKey. Used to look up the TopicEntry in the entries_ map corresponding to
  /// each version update.
  ///
  /// TODO: Looking up a TopicEntry from the topic_update_log_ requires two reads - one
  /// to get the TopicEntryKey and another to use that key to look up the corresponding
  /// TopicEntry in the entries_ map. Based on performance needs, we may need to revisit
  /// this to find a way to do the look up using a single read.
  TopicUpdateLog topic_update_log_;

  /// Total memory occupied by the key strings, in bytes.
  int64_t total_key_size_bytes_ = 0L;

  /// Total memory occupied by the value byte strings, in bytes.
  int64_t total_value_size_bytes_ = 0L;

  /// Metrics shared across all topics to sum the size in bytes of keys, values and both.
  IntGauge* key_size_metric_;
  IntGauge* value_size_metric_;
  IntGauge* topic_size_metric_;
};
/// Guards the 'topics_' map. Hold it shared while reading the map or while holding a
/// reference to one of its entries, and exclusively while modifying the map. See the
/// class comment for the lock acquisition order. boost::shared_mutex gives writers
/// priority over readers in acquiring the lock, which prevents starvation.
boost::shared_mutex topics_map_lock_;

/// The entire set of topics tracked by the statestore, keyed by TopicId.
using TopicMap = boost::unordered_map<TopicId, Topic>;
TopicMap topics_;
/// The statestore-side representation of an individual subscriber client, which tracks
/// a variety of bookkeeping information: the list of subscribed topics (and whether
/// updates to them should be deleted on failure), the list of updates made by this
/// subscriber (so that they can be deleted efficiently on failure), and the
/// subscriber's ID and network location.
class Subscriber {
 public:
  Subscriber(const SubscriberId& subscriber_id, const RegistrationId& registration_id,
      const TNetworkAddress& network_address,
      const std::vector<TTopicRegistration>& subscribed_topics);

  /// Information about a subscriber's subscription to a specific topic.
  struct TopicSubscription {
    TopicSubscription(bool is_transient, bool populate_min_subscriber_topic_version,
        std::string filter_prefix)
      : is_transient(is_transient),
        populate_min_subscriber_topic_version(populate_min_subscriber_topic_version),
        filter_prefix(std::move(filter_prefix)) {}

    /// Whether entries written by this subscriber should be considered transient.
    const bool is_transient;

    /// Whether min_subscriber_topic_version needs to be filled in for this
    /// subscription.
    const bool populate_min_subscriber_topic_version;

    /// The prefix for which the subscriber wants to see updates.
    const std::string filter_prefix;

    /// The last topic entry version successfully processed by this subscriber. Only
    /// written by a single thread at a time but can be read concurrently.
    AtomicInt64 last_version{TOPIC_INITIAL_VERSION};

    /// Map from the key to the version of a transient update made by this subscriber.
    /// Protected by Subscriber::'transient_entry_lock_'.
    boost::unordered_map<TopicEntryKey, TopicEntry::Version> transient_entries_;
  };

  /// The set of topics subscribed to, and the current state (as seen by this
  /// subscriber) of each topic.
  using Topics = boost::unordered_map<TopicId, TopicSubscription>;

  /// The Version value used to initialize new Topic subscriptions for this Subscriber.
  static const TopicEntry::Version TOPIC_INITIAL_VERSION;

  const Topics& non_priority_subscribed_topics() const {
    return non_priority_subscribed_topics_;
  }

  const Topics& priority_subscribed_topics() const {
    return priority_subscribed_topics_;
  }

  const TNetworkAddress& network_address() const { return network_address_; }

  const SubscriberId& id() const { return subscriber_id_; }

  const RegistrationId& registration_id() const { return registration_id_; }

  /// Returns the time elapsed (in seconds) since the last heartbeat.
  double SecondsSinceHeartbeat() const {
    const auto elapsed_ms = MonotonicMillis() - last_heartbeat_ts_.Load();
    return static_cast<double>(elapsed_ms) / 1000.0;
  }

  /// Get the Topics map that would be used to store 'topic_id'.
  const Topics& GetTopicsMapForId(const TopicId& topic_id) const {
    if (IsPrioritizedTopic(topic_id)) return priority_subscribed_topics_;
    return non_priority_subscribed_topics_;
  }

  /// Mutable counterpart of the overload above; returns a pointer to the same map.
  Topics* GetTopicsMapForId(const TopicId& topic_id) {
    if (IsPrioritizedTopic(topic_id)) return &priority_subscribed_topics_;
    return &non_priority_subscribed_topics_;
  }

  /// Records the fact that updates to this topic are owned by this subscriber. The
  /// version number of each update (which must be at the corresponding index in
  /// 'entry_versions') is saved so that only those updates which are made most recently
  /// by this subscriber - and not overwritten by another subscriber - are deleted on
  /// failure. If the topic each entry belongs to is not marked as transient, no update
  /// will be recorded. Should not be called concurrently from multiple threads for a
  /// given 'topic_id'.
  ///
  /// Returns false if DeleteAllTransientEntries() was called and 'topic_id' entries
  /// are transient, in which case the caller should delete the entries themselves.
  bool AddTransientEntries(const TopicId& topic_id,
      const std::vector<TTopicItem>& entries,
      const std::vector<TopicEntry::Version>& entry_versions) WARN_UNUSED_RESULT;

  /// Delete all transient topic entries for this subscriber from 'global_topics'.
  ///
  /// Statestore::topics_map_lock_ (in shared mode) must be held by the caller.
  void DeleteAllTransientEntries(TopicMap* global_topics);

  /// Returns the number of transient entries.
  int64_t NumTransientEntries();

  /// Returns the last version of the topic which this subscriber has successfully
  /// processed. Will never decrease.
  TopicEntry::Version LastTopicVersionProcessed(const TopicId& topic_id) const;

  /// Sets the subscriber's last processed version of the topic to the given value. This
  /// should only be set once a subscriber has successfully processed the update
  /// corresponding to this version. Should not be called concurrently from multiple
  /// threads for a given 'topic_id'.
  void SetLastTopicVersionProcessed(const TopicId& topic_id,
      TopicEntry::Version version);

  /// Refresh the subscriber's last heartbeat timestamp to the current monotonic time.
  void RefreshLastHeartbeatTimestamp();

 private:
  /// Unique human-readable identifier for this subscriber, set by the subscriber itself
  /// on a Register call.
  const SubscriberId subscriber_id_;

  /// Unique identifier for the current registration of this subscriber. A new
  /// registration ID is handed out every time a subscriber successfully calls
  /// RegisterSubscriber() to distinguish between distinct connections from subscribers
  /// with the same subscriber_id_.
  const RegistrationId registration_id_;

  /// The location of the subscriber service that this subscriber runs.
  const TNetworkAddress network_address_;

  /// Maps of topic subscriptions to current TopicSubscription, with separate maps for
  /// priority and non-priority topics. The state describes whether updates on the
  /// topic are 'transient' (i.e., to be deleted upon subscriber failure) or not
  /// and contains the version number of the last update processed by this Subscriber
  /// on the topic. The set of keys is not modified after construction.
  Topics priority_subscribed_topics_;
  Topics non_priority_subscribed_topics_;

  /// The timestamp of the last successful heartbeat in milliseconds. A timestamp much
  /// older than the heartbeat frequency implies an unresponsive subscriber.
  AtomicInt64 last_heartbeat_ts_{0};

  /// Lock held when adding or deleting transient entries. See class comment for lock
  /// acquisition order.
  boost::mutex transient_entry_lock_;

  /// True once DeleteAllTransientEntries() has been called during subscriber
  /// unregistration. Protected by 'transient_entry_lock_'.
  bool unregistered_ = false;
};
/// Guards 'subscribers_' and 'subscriber_uuid_generator_'. See the class comment for the
/// lock acquisition order.
boost::mutex subscribers_lock_;

/// Map of subscribers currently connected; upon failure their entry is removed from
/// this map. Subscribers must only be removed by UnregisterSubscriber(), which ensures
/// that the correct cleanup is done. If a subscriber re-registers, it must be
/// unregistered prior to re-entry into this map.
///
/// Subscribers are held in shared_ptrs so that RegisterSubscriber() may overwrite their
/// entry in this map while UpdateSubscriber() tries to update an existing registration
/// without risk of use-after-free.
using SubscriberMap = boost::unordered_map<SubscriberId, std::shared_ptr<Subscriber>>;
SubscriberMap subscribers_;

/// Used to generate unique IDs for each new registration.
boost::uuids::random_generator subscriber_uuid_generator_;
/// Work item passed to the subscriber update threads; it names the registration that a
/// message is intended for and the earliest time the message may be sent.
struct ScheduledSubscriberUpdate {
  /// *Earliest* time (in Unix time) that the next message should be sent. Initialised
  /// to 0 so that a default-constructed work item never holds an indeterminate value.
  int64_t deadline = 0;

  /// SubscriberId and RegistrationId of the registered subscriber instance this message
  /// is intended for.
  SubscriberId subscriber_id;
  RegistrationId registration_id;

  /// Creates an empty work item: deadline 0 and default-constructed IDs.
  ScheduledSubscriberUpdate() = default;

  /// Creates a work item for registration 'r_id' of subscriber 's_id' that is due no
  /// earlier than 'next_update_time'. The IDs are taken by value and moved into place,
  /// so each costs at most one copy at the call site.
  ScheduledSubscriberUpdate(int64_t next_update_time, SubscriberId s_id,
      RegistrationId r_id)
    : deadline(next_update_time),
      subscriber_id(std::move(s_id)),
      registration_id(std::move(r_id)) {}
};
/// The statestore has three pools of threads that send messages to subscribers
/// one-by-one. One pool deals with 'heartbeat' messages that update failure detection
/// state, and the remaining pools send 'topic update' messages that contain the
/// actual topic data that a subscriber does not yet have, with one pool dedicated to
/// a set of special "prioritized" topics.
///
/// Each message is scheduled for some time in the future and each worker thread
/// will sleep until that time has passed to rate-limit messages. Subscribers are
/// placed back into the queue once they have been processed. A subscriber may have many
/// entries in a queue, but no more than one for each registration associated with that
/// subscriber. Since at most one registration is considered 'live' per subscriber, this
/// guarantees that there are at most subscribers_.size() - 1 'live' subscribers ahead of
/// any subscriber in the queue.
///
/// Messages may be delayed for any number of reasons, including scheduler
/// interference, lock unfairness when submitting to the thread pool and head-of-line
/// blocking when threads are occupied sending messages to slow subscribers
/// (subscribers are not guaranteed to be in the queue in next-update order).
///
/// Delays for heartbeat messages can result in the subscriber that is kept waiting
/// assuming that the statestore has failed. Correct configuration of heartbeat message
/// frequency and subscriber timeout is therefore very important, and depends upon the
/// cluster size. See --statestore_heartbeat_frequency_ms and
/// --statestore_subscriber_timeout_seconds. We expect that the provided defaults will
/// work up to clusters of several hundred nodes.
///
/// Subscribers are therefore not processed in lock-step, and one subscriber may have
/// seen many more messages than another during the same interval (if the second
/// subscriber runs slow for any reason).
///
/// UpdateKind identifies which of the three kinds of message a work item is for.
enum class UpdateKind {
/// A topic update covering the subscriber's non-priority topics.
TOPIC_UPDATE,
/// A topic update covering the subscriber's prioritized topics.
PRIORITY_TOPIC_UPDATE,
/// A heartbeat message, used for failure detection.
HEARTBEAT
};
/// Thread pools that process ScheduledSubscriberUpdate work items, one pool for each
/// UpdateKind; GetThreadPool() returns the pool for a given kind.
ThreadPool<ScheduledSubscriberUpdate> subscriber_topic_update_threadpool_;
ThreadPool<ScheduledSubscriberUpdate> subscriber_priority_topic_update_threadpool_;
ThreadPool<ScheduledSubscriberUpdate> subscriber_heartbeat_threadpool_;
/// Thread that monitors the heartbeats of all subscribers.
std::unique_ptr<Thread> heartbeat_monitoring_thread_;
/// Flag to indicate that the statestore has been initialized.
bool initialized_ = false;
/// Cache of subscriber clients used for UpdateState() RPCs. Only one client per
/// subscriber should be used, but the cache helps with the client lifecycle on failure.
boost::scoped_ptr<StatestoreSubscriberClientCache> update_state_client_cache_;
/// Cache of subscriber clients used for Heartbeat() RPCs. Separate from
/// update_state_client_cache_ because we enable TCP-level timeouts for these calls,
/// whereas they are not safe for UpdateState() RPCs which can take an unbounded amount
/// of time.
boost::scoped_ptr<StatestoreSubscriberClientCache> heartbeat_client_cache_;
/// Container for the internal statestore service.
boost::scoped_ptr<ThriftServer> thrift_server_;
/// Pointer to the MetricGroup for this statestore. Not owned.
MetricGroup* metrics_;
/// Thrift API implementation which proxies requests onto this Statestore
boost::shared_ptr<StatestoreServiceIf> thrift_iface_;
/// Failure detector for subscribers. If a subscriber misses a configurable number of
/// consecutive heartbeat messages, it is considered failed and a) its transient topic
/// entries are removed and b) its entry in the subscriber map is erased. The
/// subscriber ID is used to identify peers for failure detection purposes. Subscriber
/// state is evicted from the failure detector when the subscriber is unregistered,
/// so old subscribers do not occupy memory and the failure detection state does not
/// carry over to any new registrations of the previous subscriber.
boost::scoped_ptr<MissedHeartbeatFailureDetector> failure_detector_;
/// Metrics that track the registered, non-failed subscribers.
IntGauge* num_subscribers_metric_;
SetMetric<std::string>* subscriber_set_metric_;
/// Metrics shared across all topics to sum the size in bytes of keys, values and both
IntGauge* key_size_metric_;
IntGauge* value_size_metric_;
IntGauge* topic_size_metric_;
/// Tracks the distribution of topic-update durations for regular and prioritized topic
/// updates. This measures the time spent in calling the UpdateState() RPC which
/// includes network transmission cost and subscriber-side processing time.
StatsMetric<double>* topic_update_duration_metric_;
StatsMetric<double>* priority_topic_update_duration_metric_;
/// Same as above, but for SendHeartbeat() RPCs.
StatsMetric<double>* heartbeat_duration_metric_;
/// Utility method to add an update to the given thread pool, and to fail if the thread
/// pool is already at capacity. Assumes that subscribers_lock_ is held by the caller.
Status OfferUpdate(const ScheduledSubscriberUpdate& update,
ThreadPool<ScheduledSubscriberUpdate>* thread_pool) WARN_UNUSED_RESULT;
/// Sends either a heartbeat or topic update message to the subscriber in 'update' at
/// the closest possible time to the first member of 'update' (its 'deadline'). If
/// 'update_kind' is HEARTBEAT, sends a heartbeat update, otherwise the set of
/// priority/non-priority pending topic updates is sent. Once complete, the next update
/// is scheduled and added to the appropriate queue.
void DoSubscriberUpdate(UpdateKind update_kind, int thread_id,
const ScheduledSubscriberUpdate& update);
/// Does the work of updating a single subscriber, by calling UpdateState() on the client
/// to send a list of topic deltas to the subscriber. If that call fails (either because
/// the RPC could not be completed, or the subscriber indicated an error), this method
/// returns a non-OK status immediately without further processing.
///
/// The subscriber may indicate that it skipped processing the message, either because
/// it was not ready to do so or because it was busy. In that case, the UpdateState() RPC
/// will return OK (since there was no error) and the output parameter update_skipped is
/// set to true. Otherwise, any updates returned by the subscriber are applied to their
/// target topics.
Status SendTopicUpdate(Subscriber* subscriber, UpdateKind update_kind,
bool* update_skipped) WARN_UNUSED_RESULT;
/// Sends a heartbeat message to subscriber. Returns a non-OK status if there was some
/// error performing the RPC.
Status SendHeartbeat(Subscriber* subscriber) WARN_UNUSED_RESULT;
/// Returns true (and sets subscriber to the corresponding Subscriber object) if a
/// registered subscriber exists in the subscribers_ map with the given subscriber_id
/// and registration_id. False otherwise.
bool FindSubscriber(const SubscriberId& subscriber_id,
const RegistrationId& registration_id, std::shared_ptr<Subscriber>* subscriber)
WARN_UNUSED_RESULT;
/// Unregister a subscriber, removing all of its transient entries and evicting it from
/// the subscriber map. Callers must hold subscribers_lock_ prior to calling this
/// method.
void UnregisterSubscriber(Subscriber* subscriber);
/// Populates a TUpdateStateRequest with the update state for this subscriber. Iterates
/// over all updates in all priority or non-priority subscribed topics, based on
/// 'update_kind'. The given TUpdateStateRequest object is populated with the
/// changes to the subscribed topics. Takes the topics_map_lock_ and subscribers_lock_.
void GatherTopicUpdates(const Subscriber& subscriber, UpdateKind update_kind,
TUpdateStateRequest* update_state_request);
/// Returns the minimum last processed topic version across all subscribers for the
/// given topic ID. Calculated by enumerating all subscribers and looking at their
/// LastTopicVersionProcessed() for this topic. The value returned will always be <=
/// topics_[topic_id].last_version_. Returns TOPIC_INITIAL_VERSION if no subscribers are
/// registered to the topic. The subscriber ID to whom the min version belongs can also
/// be retrieved using the optional subscriber_id output parameter (pass nullptr, the
/// default, if it is not needed). If multiple subscribers have the same min version,
/// the subscriber_id may be set to any one of the matching subscribers.
///
/// Must be called holding 'subscribers_lock_'.
///
/// TODO: Update the min subscriber version only when a topic is updated, rather than
/// each time a subscriber is updated. One way to do this would be to keep a priority
/// queue in Topic of each subscriber's last processed version of the topic.
TopicEntry::Version GetMinSubscriberTopicVersion(
    const TopicId& topic_id, SubscriberId* subscriber_id = nullptr);
/// Returns true if this topic should be handled by the priority pool.
static bool IsPrioritizedTopic(const std::string& topic);
/// Return human-readable name for 'kind'.
static const char* GetUpdateKindName(UpdateKind kind);
/// Return the thread pool to process updates of 'kind'.
ThreadPool<ScheduledSubscriberUpdate>* GetThreadPool(UpdateKind kind);
/// Webpage handler: upon return, 'document' will contain a list of topics as follows:
/// "topics": [
/// {
/// "topic_id": "catalog-update",
/// "num_entries": 1165,
/// "version": 2476,
/// "oldest_version": 2476,
/// "oldest_id": "henry-impala:26000",
/// "key_size": "42.94 KB",
/// "value_size": "9.54 MB",
/// "total_size": "9.58 MB"
/// }, ]
void TopicsHandler(const Webserver::WebRequest& req, rapidjson::Document* document);
/// Webpage handler: upon return 'document' will contain a list of subscribers as
/// follows:
/// "subscribers": [
/// {
/// "id": "henry-impala:26000",
/// "address": "henry-impala:23020",
/// "num_topics": 1,
/// "num_transient": 0,
/// "registration_id": "414d28c84930d987:abcffd70b3346fb7"
/// }
/// ]
void SubscribersHandler(const Webserver::WebRequest& req,
rapidjson::Document* document);
/// Monitors the heartbeats of all subscribers every
/// FLAGS_heartbeat_monitoring_frequency_ms milliseconds. If a subscriber's
/// last_heartbeat_ts_ has not been updated in that interval, it logs the subscriber's
/// id. Declared [[noreturn]]: this function never returns to its caller.
[[noreturn]] void MonitorSubscriberHeartbeat();
};
} // namespace impala