| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <mutex> |
| #include <string> |
| |
| #include <boost/scoped_ptr.hpp> |
| #include <boost/shared_ptr.hpp> |
| #include <boost/thread/shared_mutex.hpp> |
| |
| #include "gen-cpp/StatestoreService.h" |
| #include "gen-cpp/StatestoreSubscriber.h" |
| #include "rpc/thrift-client.h" |
| #include "rpc/thrift-util.h" |
| #include "statestore/statestore.h" |
| #include "statestore/statestore-service-client-wrapper.h" |
| #include "util/stopwatch.h" |
| #include "gen-cpp/StatestoreService.h" |
| #include "gen-cpp/StatestoreSubscriber.h" |
| |
| namespace impala { |
| |
| class Status; |
| class TimeoutFailureDetector; |
| class Thread; |
| class ThriftServer; |
| class TNetworkAddress; |
| |
| typedef ClientCache<StatestoreServiceClientWrapper> StatestoreClientCache; |
| |
| /// A StatestoreSubscriber communicates with a statestore periodically through the exchange |
| /// of topic update messages. These messages contain updates from the statestore to a list |
| /// of 'topics' that the subscriber is interested in; in response the subscriber sends a |
| /// list of changes that it wishes to make to a topic. The statestore also sends more |
| /// frequent 'heartbeat' messages that confirm the connection between statestore and |
| /// subscriber is still active. |
| // |
| /// Clients of the subscriber register topics of interest, and a function to call once an |
| /// update has been received. Each callback may optionally add one or more updates to a |
| /// list of topic updates to be sent back to the statestore. See AddTopic for the |
| /// requirements placed on these callbacks. |
| // |
| /// Topics must be subscribed to before the subscriber is connected to the statestore: |
| /// there is no way to add a new subscription after the subscriber has successfully |
| /// registered. |
| // |
| /// If the subscriber does not receive heartbeats from the statestore within a configurable |
| /// period of time, the subscriber enters 'recovery mode', where it continually attempts to |
| /// re-register with the statestore. Recovery mode is not triggered if a heartbeat takes a |
| /// long time to process locally. |
| class StatestoreSubscriber { |
| public: |
| /// Only constructor. |
| /// subscriber_id - should be unique across the cluster, identifies this subscriber |
| // |
| /// heartbeat_address - the local address on which the heartbeat service which |
| /// communicates with the statestore should be started. |
| // |
| /// statestore_address - the address of the statestore to register with |
| StatestoreSubscriber(const std::string& subscriber_id, |
| const TNetworkAddress& heartbeat_address, |
| const TNetworkAddress& statestore_address, |
| MetricGroup* metrics); |
| |
| /// A TopicDeltaMap is passed to each callback. See UpdateCallback for more details. |
| typedef std::map<Statestore::TopicId, TTopicDelta> TopicDeltaMap; |
| |
| /// Function called to update a service with new state. Called in a |
| /// separate thread to the one in which it is registered. |
| // |
| /// Every UpdateCallback is invoked every time that an update for the |
| /// topic is received from the statestore. Therefore the callback should |
| /// not assume that the TopicDeltaMap contains an entry for their |
| /// particular topic of interest. |
| // |
| /// If a delta for a particular topic does not have the 'is_delta' |
| /// flag set, clients should assume that the delta contains the |
| /// entire known state for that topic. This occurs particularly after |
| /// statestore failure, and usually clients will need to republish |
| /// any local state that is missing. |
| // |
| /// Callbacks may publish new updates to any topic via the |
| /// topic_updates parameter, although updates for unknown topics |
| /// (i.e. those with no subscribers) will be ignored. |
| typedef boost::function<void (const TopicDeltaMap& state, |
| std::vector<TTopicDelta>* topic_updates)> UpdateCallback; |
| |
| /// Adds a topic to the set of topics that updates will be received |
| /// for. When a topic update is received, the supplied UpdateCallback |
| /// will be invoked. Therefore clients should ensure that it is safe |
| /// to invoke callback for the entire lifetime of the subscriber; in |
| /// particular this means that the subscriber should be torn-down |
| /// before any objects that own callbacks. |
| // |
| /// Must be called before Start(), in which case it will return |
| /// Status::OK. Otherwise an error will be returned. |
| Status AddTopic(const Statestore::TopicId& topic_id, bool is_transient, |
| bool populate_min_subscriber_topic_version, std::string filter_prefix, |
| const UpdateCallback& callback); |
| |
| /// Registers this subscriber with the statestore, and starts the |
| /// heartbeat service, as well as a thread to check for failure and |
| /// initiate recovery mode. |
| // |
| /// Returns OK unless some error occurred, like a failure to connect. |
| Status Start(); |
| |
| /// Return the port that the subscriber is listening on. |
| int heartbeat_port() const { return heartbeat_address_.port; } |
| |
| const std::string& id() const { return subscriber_id_; } |
| |
| /// Returns true if the statestore has recovered and the configurable post-recovery |
| /// grace period has not yet elapsed. |
| bool IsInPostRecoveryGracePeriod() const; |
| |
| private: |
| /// Unique, but opaque, identifier for this subscriber. |
| const std::string subscriber_id_; |
| |
| /// Address of the statestore |
| TNetworkAddress statestore_address_; |
| |
| /// Implementation of the heartbeat thrift interface, which proxies |
| /// calls onto this object. |
| boost::shared_ptr<StatestoreSubscriberIf> thrift_iface_; |
| |
| /// Container for the heartbeat server. |
| std::shared_ptr<ThriftServer> heartbeat_server_; |
| |
| /// Failure detector that tracks heartbeat messages from the statestore. |
| boost::scoped_ptr<impala::TimeoutFailureDetector> failure_detector_; |
| |
| /// Thread in which RecoveryModeChecker runs. |
| std::unique_ptr<Thread> recovery_mode_thread_; |
| |
| /// statestore client cache - only one client is ever used. Initialized in constructor. |
| /// The StatestoreClientCache is created with num_retries = 1 and wait_ms = 0. |
| /// Connections are still retried, but the retry mechanism is driven by DoRpcWithRetry. |
| /// Clients should always use DoRpcWithRetry rather than DoRpc to ensure that both RPCs |
| /// and connections are retried. |
| boost::scoped_ptr<StatestoreClientCache> client_cache_; |
| |
| /// MetricGroup instance that all metrics are registered in. Not owned by this class. |
| MetricGroup* metrics_; |
| |
| /// Metric to indicate if we are successfully registered with the statestore |
| BooleanProperty* connected_to_statestore_metric_; |
| |
| /// Metric to count the total number of connection failures to the statestore |
| IntCounter* connection_failure_metric_; |
| |
| /// Amount of time last spent in recovery mode |
| DoubleGauge* last_recovery_duration_metric_; |
| |
| /// When the last recovery happened. |
| StringProperty* last_recovery_time_metric_; |
| |
| /// Accumulated statistics on the frequency of topic-update messages, including samples |
| /// from all topics. |
| StatsMetric<double>* topic_update_interval_metric_; |
| |
| /// Accumulated statistics on the time taken to process each topic-update message from |
| /// the statestore (that is, to call all callbacks) |
| StatsMetric<double>* topic_update_duration_metric_; |
| |
| /// Accumulated statistics on the frequency of heartbeat messages |
| StatsMetric<double>* heartbeat_interval_metric_; |
| |
| /// Tracks the time between heartbeat messages. Only updated by Heartbeat(), which |
| /// should not run concurrently with itself. Use a ConcurrentStopWatch because the |
| /// watch is started in one thread, but read in another. |
| ConcurrentStopWatch heartbeat_interval_timer_; |
| |
| /// Current registration ID, in string form. |
| StringProperty* registration_id_metric_; |
| |
| /// Object-wide lock that protects the below members. Must be held exclusively when |
| /// modifying the members, except when modifying TopicRegistrations - see |
| /// TopicRegistration::update_lock for details of locking there. Held in shared mode |
| /// when processing topic updates to prevent concurrent updates to other state. Most |
| /// private methods must be called holding this lock; this is noted in the method |
| /// comments. |
| boost::shared_mutex lock_; |
| |
| /// Address that the heartbeat service should be started on. Initialised in constructor, |
| /// updated in Start() with the actual port if the wildcard port 0 was specified. |
| /// If FLAGS_statestore_subscriber_use_resolved_address is true, this is set to the |
| /// resolved IP address in Start(). |
| TNetworkAddress heartbeat_address_; |
| |
| /// Set to true after Register(...) is successful, after which no |
| /// more topics may be subscribed to. |
| bool is_registered_; |
| |
| /// Protects registration_id_. Must be taken after lock_ if both are to be taken |
| /// together. |
| std::mutex registration_id_lock_; |
| |
| /// Set during Register(), this is the unique ID of the current registration with the |
| /// statestore. If this subscriber must recover, or disconnects and then reconnects, the |
| /// registration_id_ will change after Register() is called again. This allows the |
| /// subscriber to reject communication from the statestore that pertains to a previous |
| /// registration. |
| RegistrationId registration_id_; |
| |
| /// Monotonic timestamp of the last successful registration. |
| AtomicInt64 last_registration_ms_{0}; |
| |
| struct TopicRegistration { |
| /// Held when processing a topic update. 'StatestoreSubscriber::lock_' must be held in |
| /// shared mode before acquiring this lock. If taking multiple update locks, they must |
| /// be acquired in ascending order of topic name. |
| std::mutex update_lock; |
| |
| /// Whether the subscriber considers this topic to be "transient", that is any updates |
| /// it makes will be deleted upon failure or disconnection. |
| bool is_transient = false; |
| |
| /// Whether this subscriber needs the min_subscriber_topic_version field to be filled |
| /// in on updates. |
| bool populate_min_subscriber_topic_version = false; |
| |
| /// Only subscribe to keys with the provided prefix. |
| std::string filter_prefix; |
| |
| /// The last version of the topic this subscriber processed. |
| /// -1 if no updates have been processed yet. |
| int64_t current_topic_version = -1; |
| |
| /// Owned by the MetricGroup instance. Tracks how long callbacks took to process this |
| /// topic. |
| StatsMetric<double>* processing_time_metric = nullptr; |
| |
| /// Tracks the time between topic-update messages to update 'update_interval_metric'. |
| MonotonicStopWatch update_interval_timer; |
| |
| /// Owned by the MetricGroup instances. Tracks the time between the end of the last |
| /// update RPC for this topic and the start of the next. |
| StatsMetric<double>* update_interval_metric = nullptr; |
| |
| /// Callback for all services that have registered for updates. |
| std::vector<UpdateCallback> callbacks; |
| }; |
| |
| /// One entry for every topic subscribed to. 'lock_' must be held exclusively to add or |
| /// remove entries from the map or held as a shared lock to lookup entries in the map. |
| /// Modifications to the contents of each TopicRegistration is protected by |
| /// TopicRegistration::update_lock. |
| boost::unordered_map<Statestore::TopicId, TopicRegistration> topic_registrations_; |
| |
| /// Subscriber thrift implementation, needs to access UpdateState |
| friend class StatestoreSubscriberThriftIf; |
| |
| /// Called when the statestore sends a topic update. Each registered callback is called |
| /// in turn with the given map of incoming_topic_deltas from the statestore. Each |
| /// TTopicDelta sent from the statestore to the subscriber will contain the topic name, a |
| /// list of additions to the topic, a list of deletions from the topic, and the version |
| /// range the update covers. A from_version of 0 indicates a non-delta update. In |
| /// response, any updates to the topic by the subscriber are aggregated in |
| /// subscriber_topic_updates and returned to the statestore. Each update is a TTopicDelta |
| /// that contains a list of additions to the topic and a list of deletions from the |
| /// topic. Additionally, if a subscriber has received an unexpected delta update version |
| /// range, they can request a new delta update by setting the "from_version" field of the |
| /// TTopicDelta response. The next statestore update will be based off the version the |
| /// subscriber responds with. If the subscriber is in recovery mode, this method returns |
| /// immediately. |
| // |
| /// Returns an error if some error was encountered (e.g. the supplied registration ID was |
| /// unexpected), and OK otherwise. The output parameter 'skipped' is set to true if the |
| /// subscriber chose not to process this topic-update (if, for example, a concurrent |
| /// update was being processed, or if the subscriber currently believes it is |
| /// recovering). Doing so indicates that no topics were updated during this call. |
| Status UpdateState(const TopicDeltaMap& incoming_topic_deltas, |
| const RegistrationId& registration_id, |
| std::vector<TTopicDelta>* subscriber_topic_updates, bool* skipped); |
| |
| /// Called when the statestore sends a heartbeat message. Updates the failure detector. |
| void Heartbeat(const RegistrationId& registration_id); |
| |
| /// Run in a separate thread. In a loop, check failure_detector_ to see if the statestore |
| /// is still sending heartbeat messages. If not, enter 'recovery mode' where a |
| /// reconnection is repeatedly attempted. Once reconnected, all existing subscriptions |
| /// and services are reregistered and normal operation resumes. |
| // |
| /// During recovery mode, any public methods that are started will block on lock_, which |
| /// is only released when recovery finishes. In practice, all registrations are made |
| /// early in the life of an impalad before the statestore could be detected as failed. |
| [[noreturn]] void RecoveryModeChecker(); |
| |
| /// Creates a client of the remote statestore and sends a list of |
| /// topics to register for. Returns OK unless there is some problem |
| /// connecting, or the statestore reports an error. |
| Status Register(); |
| |
| /// Returns OK if registration_id == registration_id_, or if registration_id_ is not yet |
| /// set, an error otherwise. Used to confirm that RPCs from the statestore are intended |
| /// for the current registration epoch. |
| Status CheckRegistrationId(const RegistrationId& registration_id); |
| |
| int64_t MilliSecondsSinceLastRegistration() const { |
| return MonotonicMillis() - last_registration_ms_.Load(); |
| } |
| }; |
| } |