// blob: 2d93c5f889ba4c29120623538a9394e635014b01 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "statestore/statestore.h"
#include <boost/lexical_cast.hpp>
#include <boost/thread.hpp>
#include <thrift/Thrift.h>
#include <gutil/strings/substitute.h>
#include "common/status.h"
#include "gen-cpp/StatestoreService_types.h"
#include "rpc/thrift-util.h"
#include "statestore/failure-detector.h"
#include "statestore/statestore-subscriber-client-wrapper.h"
#include "util/debug-util.h"
#include "util/logging-support.h"
#include "util/time.h"
#include "util/uid-util.h"
#include "util/webserver.h"
#include "common/names.h"
using namespace apache::thrift;
using namespace impala;
using namespace rapidjson;
using namespace strings;
// Command-line flags controlling heartbeat / topic-update threading, frequency and RPC
// timeouts. The help strings are user-facing (they are the flag descriptions).
DEFINE_int32(statestore_max_missed_heartbeats, 10, "Maximum number of consecutive "
    "heartbeat messages an impalad can miss before being declared failed by the "
    "statestore.");

DEFINE_int32(statestore_num_update_threads, 10, "(Advanced) Number of threads used to "
    "send topic updates in parallel to all registered subscribers.");
DEFINE_int32(statestore_update_frequency_ms, 2000, "(Advanced) Frequency (in ms) with"
    " which the statestore sends topic updates to subscribers.");

DEFINE_int32(statestore_num_heartbeat_threads, 10, "(Advanced) Number of threads used to "
    "send heartbeats in parallel to all registered subscribers.");
DEFINE_int32(statestore_heartbeat_frequency_ms, 1000, "(Advanced) Frequency (in ms) with"
    " which the statestore sends heartbeats to subscribers.");

DEFINE_int32(state_store_port, 24000, "port where StatestoreService is running");

DEFINE_int32(statestore_heartbeat_tcp_timeout_seconds, 3, "(Advanced) The time after "
    "which a heartbeat RPC to a subscriber will timeout. This setting protects against "
    "badly hung machines that are not able to respond to the heartbeat RPC in short "
    "order.");

// If this value is set too low, it's possible that UpdateState() might timeout during a
// working invocation, and only a restart of the statestore with a change in value would
// allow progress to be made. If set too high, a hung subscriber will waste an update
// thread for much longer than it needs to. We choose 5 minutes as a safe default because
// large catalogs can take a very long time to process, but rarely more than a minute. The
// loss of a single thread for five minutes should usually be invisible to the user; if
// there is a correlated set of machine hangs that exhausts most threads the cluster can
// already be said to be in a bad state. Note that the heartbeat mechanism will still
// evict those subscribers, so many queries will continue to operate.
DEFINE_int32(statestore_update_tcp_timeout_seconds, 300, "(Advanced) The time after "
    "which an update RPC to a subscriber will timeout. This setting protects against "
    "badly hung machines that are not able to respond to the update RPC in short "
    "order.");
// Metric keys
// TODO: Replace 'backend' with 'subscriber' when we can coordinate a change with CM
const string STATESTORE_LIVE_SUBSCRIBERS = "statestore.live-backends";
const string STATESTORE_LIVE_SUBSCRIBERS_LIST = "statestore.live-backends.list";
const string STATESTORE_TOTAL_KEY_SIZE_BYTES = "statestore.total-key-size-bytes";
const string STATESTORE_TOTAL_VALUE_SIZE_BYTES = "statestore.total-value-size-bytes";
const string STATESTORE_TOTAL_TOPIC_SIZE_BYTES = "statestore.total-topic-size-bytes";
const string STATESTORE_UPDATE_DURATION = "statestore.topic-update-durations";
const string STATESTORE_HEARTBEAT_DURATION = "statestore.heartbeat-durations";
// Value held by an entry that has been deleted: Topic::DeleteIfVersionsMatch() assigns it,
// and GatherTopicUpdates() treats entries holding it as deletions rather than as items.
const Statestore::TopicEntry::Value Statestore::TopicEntry::NULL_VALUE = "";
// Initial version for each Topic registered by a Subscriber. Generally, the Topic will
// have a Version that is the MAX() of all entries in the Topic, but this initial
// value needs to be less than TopicEntry::TOPIC_ENTRY_INITIAL_VERSION to distinguish
// between the case where a Topic is empty and the case where the Topic only contains
// an item with the initial version.
const Statestore::TopicEntry::Version Statestore::Subscriber::TOPIC_INITIAL_VERSION = 0;
// Used to control the maximum size of the pending topic-update queue, in which there is
// at most one entry per subscriber. Also used as the queue-size limit checked by
// OfferUpdate() for both the topic-update and heartbeat thread pools.
const int32_t STATESTORE_MAX_SUBSCRIBERS = 10000;
// Heartbeats that miss their deadline by more than this many milliseconds are logged as
// warnings by DoSubscriberUpdate(). Late topic updates are deliberately not warned about.
const uint32_t DEADLINE_MISS_THRESHOLD_MS = 2000;
// Client connection type used for the UpdateState and Heartbeat RPCs sent to subscribers.
typedef ClientConnection<StatestoreSubscriberClientWrapper> StatestoreSubscriberConn;
class StatestoreThriftIf : public StatestoreServiceIf {
public:
StatestoreThriftIf(Statestore* statestore)
: statestore_(statestore) {
DCHECK(statestore_ != NULL);
}
virtual void RegisterSubscriber(TRegisterSubscriberResponse& response,
const TRegisterSubscriberRequest& params) {
RegistrationId registration_id;
Status status = statestore_->RegisterSubscriber(params.subscriber_id,
params.subscriber_location, params.topic_registrations, &registration_id);
status.ToThrift(&response.status);
response.__set_registration_id(registration_id);
}
private:
Statestore* statestore_;
};
// Overwrites this entry's value with 'bytes' and stamps it with 'version'.
void Statestore::TopicEntry::SetValue(const Statestore::TopicEntry::Value& bytes,
    TopicEntry::Version version) {
  DCHECK(bytes.size() > 0 || bytes == Statestore::TopicEntry::NULL_VALUE);
  version_ = version;
  value_ = bytes;
}
// Inserts or overwrites the entry for 'key' with 'bytes', assigning it the next topic
// version. Keeps the update log, the per-topic byte totals and the size metrics in step
// with the change. Returns the version assigned to the entry.
Statestore::TopicEntry::Version Statestore::Topic::Put(const string& key,
    const Statestore::TopicEntry::Value& bytes) {
  int64_t key_bytes_added = 0;
  int64_t value_bytes_added = bytes.size();

  TopicEntryMap::iterator pos = entries_.find(key);
  if (pos != entries_.end()) {
    // Overwrite: drop the entry's previous record from the update log. An entry has at
    // most one record in the log at any time, so erasing by its version is sufficient.
    topic_update_log_.erase(pos->second.version());
    value_bytes_added -= pos->second.value().size();
  } else {
    // First time this key is seen: only now does the key itself count towards the size.
    pos = entries_.insert(make_pair(key, TopicEntry())).first;
    key_bytes_added += key.size();
  }

  TopicEntry& entry = pos->second;
  entry.SetValue(bytes, ++last_version_);
  topic_update_log_.insert(make_pair(entry.version(), key));

  total_key_size_bytes_ += key_bytes_added;
  total_value_size_bytes_ += value_bytes_added;
  DCHECK_GE(total_key_size_bytes_, static_cast<int64_t>(0));
  DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));

  key_size_metric_->Increment(key_bytes_added);
  value_size_metric_->Increment(value_bytes_added);
  topic_size_metric_->Increment(key_bytes_added + value_bytes_added);
  return entry.version();
}
// Marks the entry for 'key' as deleted, but only if its current version equals 'version'
// (i.e. nobody has overwritten it since the caller recorded that version). The entry is
// kept in the map with NULL_VALUE and a fresh version so the deletion shows up in the
// update log. Does nothing if the key is absent or the versions differ.
void Statestore::Topic::DeleteIfVersionsMatch(TopicEntry::Version version,
    const Statestore::TopicEntryKey& key) {
  TopicEntryMap::iterator entry_it = entries_.find(key);
  if (entry_it == entries_.end() || entry_it->second.version() != version) return;

  // Replace the entry's old record in the update log with one for this deletion.
  topic_update_log_.erase(version);
  topic_update_log_.insert(make_pair(++last_version_, key));

  // The value is about to be replaced by the empty NULL_VALUE, so its bytes leave the
  // topic. The key stays in 'entries_', so key-size accounting is untouched.
  const int64_t removed_value_bytes =
      static_cast<int64_t>(entry_it->second.value().size());
  total_value_size_bytes_ -= removed_value_bytes;
  DCHECK_GE(total_value_size_bytes_, static_cast<int64_t>(0));

  // The metrics must move in the same direction as 'total_value_size_bytes_' (as they do
  // in Put()); previously they were incremented here, inflating the gauges on deletion.
  value_size_metric_->Increment(-removed_value_bytes);
  topic_size_metric_->Increment(-removed_value_bytes);

  entry_it->second.SetValue(Statestore::TopicEntry::NULL_VALUE, last_version_);
}
// Builds a subscriber record and gives every topic in 'subscribed_topics' an initial
// state: the transient flag requested at registration and TOPIC_INITIAL_VERSION as the
// last processed version.
Statestore::Subscriber::Subscriber(const SubscriberId& subscriber_id,
    const RegistrationId& registration_id, const TNetworkAddress& network_address,
    const vector<TTopicRegistration>& subscribed_topics)
  : subscriber_id_(subscriber_id),
    registration_id_(registration_id),
    network_address_(network_address) {
  for (const TTopicRegistration& registration : subscribed_topics) {
    TopicState initial_state;
    initial_state.last_version = TOPIC_INITIAL_VERSION;
    initial_state.is_transient = registration.is_transient;
    // If a topic name is listed more than once, the last registration wins.
    subscribed_topics_[registration.topic_name] = initial_state;
  }
}
void Statestore::Subscriber::AddTransientUpdate(const TopicId& topic_id,
const TopicEntryKey& topic_key, TopicEntry::Version version) {
// Only record the update if the topic is transient
const Topics::const_iterator topic_it = subscribed_topics_.find(topic_id);
DCHECK(topic_it != subscribed_topics_.end());
if (topic_it->second.is_transient == true) {
transient_entries_[make_pair(topic_id, topic_key)] = version;
}
}
// Returns the last version of 'topic_id' this subscriber is recorded as having processed,
// or TOPIC_INITIAL_VERSION if the subscriber is not subscribed to that topic.
Statestore::TopicEntry::Version Statestore::Subscriber::LastTopicVersionProcessed(
    const TopicId& topic_id) const {
  Topics::const_iterator subscription = subscribed_topics_.find(topic_id);
  if (subscription != subscribed_topics_.end()) {
    return subscription->second.last_version;
  }
  return TOPIC_INITIAL_VERSION;
}
void Statestore::Subscriber::SetLastTopicVersionProcessed(const TopicId& topic_id,
TopicEntry::Version version) {
subscribed_topics_[topic_id].last_version = version;
}
// Constructs the statestore: sets up the topic-update and heartbeat thread pools, one
// client cache per RPC type, the Thrift service adapter and the failure detector, then
// registers all statestore metrics with 'metrics', which must not be NULL.
Statestore::Statestore(MetricGroup* metrics)
: exit_flag_(false),
// Both pools run DoSubscriberUpdate(); the bound bool selects the message type
// (false = topic update, true = heartbeat). Each pool's queue is capped at
// STATESTORE_MAX_SUBSCRIBERS.
subscriber_topic_update_threadpool_("statestore-update",
"subscriber-update-worker",
FLAGS_statestore_num_update_threads,
STATESTORE_MAX_SUBSCRIBERS,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, false, _1, _2)),
subscriber_heartbeat_threadpool_("statestore-heartbeat",
"subscriber-heartbeat-worker",
FLAGS_statestore_num_heartbeat_threads,
STATESTORE_MAX_SUBSCRIBERS,
bind<void>(mem_fn(&Statestore::DoSubscriberUpdate), this, true, _1, _2)),
// Separate client caches so that update and heartbeat RPCs can use different timeouts.
// The flags are in seconds and are scaled by 1000 here - presumably the cache expects
// milliseconds; confirm against the StatestoreSubscriberClientCache constructor.
update_state_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_update_tcp_timeout_seconds * 1000,
FLAGS_statestore_update_tcp_timeout_seconds * 1000, "",
EnableInternalSslConnections())),
heartbeat_client_cache_(new StatestoreSubscriberClientCache(1, 0,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000,
FLAGS_statestore_heartbeat_tcp_timeout_seconds * 1000, "",
EnableInternalSslConnections())),
thrift_iface_(new StatestoreThriftIf(this)),
// NOTE(review): the second argument is half of the max-missed-heartbeats flag; its
// meaning is defined by MissedHeartbeatFailureDetector - confirm there.
failure_detector_(new MissedHeartbeatFailureDetector(
FLAGS_statestore_max_missed_heartbeats,
FLAGS_statestore_max_missed_heartbeats / 2)) {
DCHECK(metrics != NULL);
// Subscriber count and the set of live subscriber ids.
num_subscribers_metric_ =
metrics->AddGauge<int64_t>(STATESTORE_LIVE_SUBSCRIBERS, 0);
subscriber_set_metric_ = SetMetric<string>::CreateAndRegister(metrics,
STATESTORE_LIVE_SUBSCRIBERS_LIST, set<string>());
// Size gauges; these are handed to every Topic created in RegisterSubscriber().
key_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_KEY_SIZE_BYTES, 0);
value_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_VALUE_SIZE_BYTES, 0);
topic_size_metric_ = metrics->AddGauge<int64_t>(STATESTORE_TOTAL_TOPIC_SIZE_BYTES, 0);
// RPC duration statistics, updated by SendTopicUpdate() and SendHeartbeat().
topic_update_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_UPDATE_DURATION);
heartbeat_duration_metric_ =
StatsMetric<double>::CreateAndRegister(metrics, STATESTORE_HEARTBEAT_DURATION);
update_state_client_cache_->InitMetrics(metrics, "subscriber-update-state");
heartbeat_client_cache_->InitMetrics(metrics, "subscriber-heartbeat");
}
// Initializes the topic-update thread pool and then the heartbeat thread pool. Returns
// the first error encountered; the heartbeat pool is not initialized if the topic-update
// pool fails.
Status Statestore::Init() {
  RETURN_IF_ERROR(subscriber_topic_update_threadpool_.Init());
  return subscriber_heartbeat_threadpool_.Init();
}
// Registers the statestore's debug pages ("/topics" and "/subscribers") and the log-level
// callbacks with 'webserver'. The callbacks capture 'this'.
void Statestore::RegisterWebpages(Webserver* webserver) {
  Webserver::UrlCallback topics_page =
      [this](const Webserver::ArgumentMap& args, Document* document) {
        TopicsHandler(args, document);
      };
  webserver->RegisterUrlCallback("/topics", "statestore_topics.tmpl", topics_page);

  Webserver::UrlCallback subscribers_page =
      [this](const Webserver::ArgumentMap& args, Document* document) {
        SubscribersHandler(args, document);
      };
  webserver->RegisterUrlCallback(
      "/subscribers", "statestore_subscribers.tmpl", subscribers_page);

  RegisterLogLevelCallbacks(webserver, false);
}
// Webserver callback for "/topics". Adds a "topics" array to 'document' with one object
// per topic: id, entry count, latest version, the lowest version processed by any
// subscriber (and which subscriber that is), and pretty-printed key / value / total
// sizes. 'args' is unused. Holds subscribers_lock_ and topic_lock_ (in that order) while
// building the response.
void Statestore::TopicsHandler(const Webserver::ArgumentMap& args,
    Document* document) {
  lock_guard<mutex> l(subscribers_lock_);
  lock_guard<mutex> t(topic_lock_);
  auto& allocator = document->GetAllocator();

  Value topic_list(kArrayType);
  for (const TopicMap::value_type& topic_pair : topics_) {
    const Topic& topic = topic_pair.second;
    Value row(kObjectType);

    Value id_json(topic.id().c_str(), allocator);
    row.AddMember("topic_id", id_json, allocator);
    row.AddMember(
        "num_entries", static_cast<uint64_t>(topic.entries().size()), allocator);
    row.AddMember("version", topic.last_version(), allocator);

    // Lowest version any subscriber of this topic has processed; 'slowest_subscriber'
    // stays empty if no subscriber lowered the minimum.
    SubscriberId slowest_subscriber;
    TopicEntry::Version slowest_version =
        GetMinSubscriberTopicVersion(topic_pair.first, &slowest_subscriber);
    row.AddMember("oldest_version", slowest_version, allocator);
    Value slowest_json(slowest_subscriber.c_str(), allocator);
    row.AddMember("oldest_id", slowest_json, allocator);

    const int64_t key_bytes = topic.total_key_size_bytes();
    const int64_t value_bytes = topic.total_value_size_bytes();
    Value key_bytes_json(
        PrettyPrinter::Print(key_bytes, TUnit::BYTES).c_str(), allocator);
    row.AddMember("key_size", key_bytes_json, allocator);
    Value value_bytes_json(
        PrettyPrinter::Print(value_bytes, TUnit::BYTES).c_str(), allocator);
    row.AddMember("value_size", value_bytes_json, allocator);
    Value total_bytes_json(
        PrettyPrinter::Print(key_bytes + value_bytes, TUnit::BYTES).c_str(), allocator);
    row.AddMember("total_size", total_bytes_json, allocator);

    topic_list.PushBack(row, allocator);
  }
  document->AddMember("topics", topic_list, allocator);
}
// Webserver callback for "/subscribers". Adds a "subscribers" array to 'document' with
// one object per registered subscriber: id, network address, number of subscribed topics,
// number of transient entries and the printed registration id. 'args' is unused. Holds
// subscribers_lock_ while building the response.
void Statestore::SubscribersHandler(const Webserver::ArgumentMap& args,
    Document* document) {
  lock_guard<mutex> l(subscribers_lock_);
  auto& allocator = document->GetAllocator();

  Value subscriber_list(kArrayType);
  for (const SubscriberMap::value_type& entry : subscribers_) {
    Subscriber* sub = entry.second.get();
    Value row(kObjectType);

    Value id_json(sub->id().c_str(), allocator);
    row.AddMember("id", id_json, allocator);

    Value address_json(
        lexical_cast<string>(sub->network_address()).c_str(), allocator);
    row.AddMember("address", address_json, allocator);

    row.AddMember("num_topics",
        static_cast<uint64_t>(sub->subscribed_topics().size()), allocator);
    row.AddMember("num_transient",
        static_cast<uint64_t>(sub->transient_entries().size()), allocator);

    Value registration_json(PrintId(sub->registration_id()).c_str(), allocator);
    row.AddMember("registration_id", registration_json, allocator);

    subscriber_list.PushBack(row, allocator);
  }
  document->AddMember("subscribers", subscriber_list, allocator);
}
// Queues 'update' on 'threadpool'. If the pool's queue already holds
// STATESTORE_MAX_SUBSCRIBERS items, or the pool refuses the item, the subscriber named in
// 'update' is removed from subscribers_ and an error status is returned.
//
// Locking: the failure path acquires subscribers_lock_. Callers should therefore not hold
// that lock when calling this (assuming 'mutex' is non-recursive - confirm).
Status Statestore::OfferUpdate(const ScheduledSubscriberUpdate& update,
    ThreadPool<ScheduledSubscriberUpdate>* threadpool) {
  // The queue size is checked first so that Offer() is never attempted on a full queue.
  if (threadpool->GetQueueSize() < STATESTORE_MAX_SUBSCRIBERS
      && threadpool->Offer(update)) {
    return Status::OK();
  }

  stringstream ss;
  ss << "Maximum subscriber limit reached: " << STATESTORE_MAX_SUBSCRIBERS;
  {
    lock_guard<mutex> l(subscribers_lock_);
    SubscriberMap::iterator subscriber_it = subscribers_.find(update.subscriber_id);
    DCHECK(subscriber_it != subscribers_.end());
    // When DCHECKs are compiled out, a missing subscriber must not reach erase(): erasing
    // the end iterator is undefined behaviour.
    if (subscriber_it != subscribers_.end()) subscribers_.erase(subscriber_it);
  }
  LOG(ERROR) << ss.str();
  return Status(ss.str());
}
// Registers (or re-registers) the subscriber 'subscriber_id' located at 'location' for
// the topics in 'topic_registrations'.
//
// - Topics that do not exist yet are created.
// - An existing registration under the same id is unregistered first.
// - A fresh registration id is generated and written to '*registration_id'.
// - An immediate topic update and an immediate heartbeat are scheduled.
//
// Returns an error if 'subscriber_id' is empty or if either message could not be queued.
Status Statestore::RegisterSubscriber(const SubscriberId& subscriber_id,
    const TNetworkAddress& location,
    const vector<TTopicRegistration>& topic_registrations,
    RegistrationId* registration_id) {
  if (subscriber_id.empty()) return Status("Subscriber ID cannot be empty string");

  // Create any new topics first, so that when the subscriber is first sent a topic update
  // by the worker threads its topics are guaranteed to exist.
  {
    lock_guard<mutex> l(topic_lock_);
    for (const TTopicRegistration& topic: topic_registrations) {
      if (topics_.find(topic.topic_name) != topics_.end()) continue;
      LOG(INFO) << "Creating new topic: '" << topic.topic_name
                << "' on behalf of subscriber: '" << subscriber_id << "'";
      topics_.insert(make_pair(topic.topic_name, Topic(topic.topic_name,
          key_size_metric_, value_size_metric_, topic_size_metric_)));
    }
  }

  LOG(INFO) << "Registering: " << subscriber_id;
  {
    lock_guard<mutex> l(subscribers_lock_);
    // A subscriber that registers again replaces its previous registration.
    SubscriberMap::iterator subscriber_it = subscribers_.find(subscriber_id);
    if (subscriber_it != subscribers_.end()) {
      UnregisterSubscriber(subscriber_it->second.get());
    }

    UUIDToTUniqueId(subscriber_uuid_generator_(), registration_id);
    shared_ptr<Subscriber> current_registration(
        new Subscriber(subscriber_id, *registration_id, location, topic_registrations));
    subscribers_.insert(make_pair(subscriber_id, current_registration));
    // Seed the failure detector with a successful heartbeat for the new registration.
    failure_detector_->UpdateHeartbeat(
        PrintId(current_registration->registration_id()), true);
    num_subscribers_metric_->set_value(subscribers_.size());
    subscriber_set_metric_->Add(subscriber_id);
  }

  // Add the subscriber to the update queue, with an immediate schedule (deadline 0).
  ScheduledSubscriberUpdate update(0, subscriber_id, *registration_id);
  RETURN_IF_ERROR(OfferUpdate(update, &subscriber_topic_update_threadpool_));
  RETURN_IF_ERROR(OfferUpdate(update, &subscriber_heartbeat_threadpool_));
  LOG(INFO) << "Subscriber '" << subscriber_id << "' registered (registration id: "
            << PrintId(*registration_id) << ")";
  return Status::OK();
}
bool Statestore::FindSubscriber(const SubscriberId& subscriber_id,
const RegistrationId& registration_id, shared_ptr<Subscriber>* subscriber) {
DCHECK(subscriber != nullptr);
lock_guard<mutex> l(subscribers_lock_);
SubscriberMap::iterator it = subscribers_.find(subscriber_id);
if (it == subscribers_.end() ||
it->second->registration_id() != registration_id) return false;
*subscriber = it->second;
return true;
}
// Sends one UpdateState RPC to 'subscriber' containing everything it has not yet seen for
// its subscribed topics, then applies whatever updates the subscriber sends back.
//
// '*update_skipped' is set to true only when the RPC succeeded and the subscriber reported
// that it skipped the update; it is false on every other return path, including errors.
//
// Returns the connection / RPC error, or the status carried in the response. The duration
// metric is updated only when the RPC itself completed.
Status Statestore::SendTopicUpdate(Subscriber* subscriber, bool* update_skipped) {
  // Give the out-parameter a defined value up front: several early returns below happen
  // before the response is inspected, and the caller reads the flag unconditionally.
  *update_skipped = false;

  // Time any successful RPCs (i.e. those for which UpdateState() completed, even though
  // it may have returned an error.)
  MonotonicStopWatch sw;
  sw.Start();

  // First thing: make a list of updates to send
  TUpdateStateRequest update_state_request;
  GatherTopicUpdates(*subscriber, &update_state_request);

  // Set the expected registration ID, so that the subscriber can reject this update if
  // they have moved on to a new registration instance.
  update_state_request.__set_registration_id(subscriber->registration_id());

  // Second: try and send it
  Status status;
  StatestoreSubscriberConn client(update_state_client_cache_.get(),
      subscriber->network_address(), &status);
  RETURN_IF_ERROR(status);

  TUpdateStateResponse response;
  RETURN_IF_ERROR(client.DoRpc(
      &StatestoreSubscriberClientWrapper::UpdateState, update_state_request, &response));

  status = Status(response.status);
  if (!status.ok()) {
    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
    return status;
  }

  *update_skipped = (response.__isset.skipped && response.skipped);
  if (*update_skipped) {
    // The subscriber skipped processing this update. We don't consider this a failure
    // - subscribers can decide what they do with any update - so, return OK and set
    // update_skipped so the caller can compensate.
    topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
    return Status::OK();
  }

  // At this point the updates are assumed to have been successfully processed by the
  // subscriber. Update the subscriber's max version of each topic.
  map<TopicEntryKey, TTopicDelta>::const_iterator topic_delta =
      update_state_request.topic_deltas.begin();
  for (; topic_delta != update_state_request.topic_deltas.end(); ++topic_delta) {
    subscriber->SetLastTopicVersionProcessed(topic_delta->first,
        topic_delta->second.to_version);
  }

  // Thirdly: perform any / all updates returned by the subscriber
  {
    lock_guard<mutex> l(topic_lock_);
    for (const TTopicDelta& update: response.topic_updates) {
      TopicMap::iterator topic_it = topics_.find(update.topic_name);
      if (topic_it == topics_.end()) {
        VLOG(1) << "Received update for unexpected topic:" << update.topic_name;
        continue;
      }

      VLOG_RPC << "Received update for topic " << update.topic_name
               << " from " << subscriber->id() << ", number of entries: "
               << update.topic_entries.size();

      // The subscriber sent back their from_version which indicates that they want to
      // reset their max version for this topic to this value. The next update sent will
      // be from this version.
      if (update.__isset.from_version) {
        LOG(INFO) << "Received request for different delta base of topic: "
                  << update.topic_name << " from: " << subscriber->id()
                  << " subscriber from_version: " << update.from_version;
        subscriber->SetLastTopicVersionProcessed(topic_it->first, update.from_version);
      }

      Topic* topic = &topic_it->second;
      for (const TTopicItem& item: update.topic_entries) {
        TopicEntry::Version version = topic->Put(item.key, item.value);
        subscriber->AddTransientUpdate(update.topic_name, item.key, version);
      }

      // Deletions are stored as entries holding NULL_VALUE.
      for (const string& key: update.topic_deletions) {
        TopicEntry::Version version =
            topic->Put(key, Statestore::TopicEntry::NULL_VALUE);
        subscriber->AddTransientUpdate(update.topic_name, key, version);
      }
    }
  }
  topic_update_duration_metric_->Update(sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
  return Status::OK();
}
void Statestore::GatherTopicUpdates(const Subscriber& subscriber,
TUpdateStateRequest* update_state_request) {
{
lock_guard<mutex> l(topic_lock_);
for (const Subscriber::Topics::value_type& subscribed_topic:
subscriber.subscribed_topics()) {
TopicMap::const_iterator topic_it = topics_.find(subscribed_topic.first);
DCHECK(topic_it != topics_.end());
TopicEntry::Version last_processed_version =
subscriber.LastTopicVersionProcessed(topic_it->first);
const Topic& topic = topic_it->second;
TTopicDelta& topic_delta =
update_state_request->topic_deltas[subscribed_topic.first];
topic_delta.topic_name = subscribed_topic.first;
// If the subscriber version is > 0, send this update as a delta. Otherwise, this is
// a new subscriber so send them a non-delta update that includes all items in the
// topic.
topic_delta.is_delta = last_processed_version > Subscriber::TOPIC_INITIAL_VERSION;
topic_delta.__set_from_version(last_processed_version);
TopicUpdateLog::const_iterator next_update =
topic.topic_update_log().upper_bound(last_processed_version);
int64_t deleted_key_size_bytes = 0;
for (; next_update != topic.topic_update_log().end(); ++next_update) {
TopicEntryMap::const_iterator itr = topic.entries().find(next_update->second);
DCHECK(itr != topic.entries().end());
const TopicEntry& topic_entry = itr->second;
if (topic_entry.value() == Statestore::TopicEntry::NULL_VALUE) {
if (!topic_delta.is_delta) {
deleted_key_size_bytes += itr->first.size();
continue;
}
topic_delta.topic_deletions.push_back(itr->first);
} else {
topic_delta.topic_entries.push_back(TTopicItem());
TTopicItem& topic_item = topic_delta.topic_entries.back();
topic_item.key = itr->first;
// TODO: Does this do a needless copy?
topic_item.value = topic_entry.value();
}
}
if (!topic_delta.is_delta &&
topic.last_version() > Subscriber::TOPIC_INITIAL_VERSION) {
int64_t topic_size = topic.total_key_size_bytes() - deleted_key_size_bytes
+ topic.total_value_size_bytes();
VLOG_QUERY << "Preparing initial " << topic_delta.topic_name
<< " topic update for " << subscriber.id() << ". Size = "
<< PrettyPrinter::Print(topic_size, TUnit::BYTES);
}
if (topic.topic_update_log().size() > 0) {
// The largest version for this topic will be the last item in the version history
// map.
topic_delta.__set_to_version(topic.topic_update_log().rbegin()->first);
} else {
// There are no updates in the version history
topic_delta.__set_to_version(Subscriber::TOPIC_INITIAL_VERSION);
}
}
}
// Fill in the min subscriber topic version. This must be done after releasing
// topic_lock_.
lock_guard<mutex> l(subscribers_lock_);
typedef map<TopicId, TTopicDelta> TopicDeltaMap;
for (TopicDeltaMap::value_type& topic_delta: update_state_request->topic_deltas) {
topic_delta.second.__set_min_subscriber_topic_version(
GetMinSubscriberTopicVersion(topic_delta.first));
}
}
// Returns the lowest "last processed" version of 'topic_id' across all subscribers that
// are subscribed to it, or TOPIC_INITIAL_VERSION if nobody is subscribed. If
// 'subscriber_id' is non-null it receives the id of a subscriber holding that minimum.
// Iterates subscribers_ without taking subscribers_lock_; the callers in this file hold
// it.
Statestore::TopicEntry::Version Statestore::GetMinSubscriberTopicVersion(
    const TopicId& topic_id, SubscriberId* subscriber_id) {
  bool has_subscriber = false;
  TopicEntry::Version lowest_version = numeric_limits<int64_t>::max();

  for (const SubscriberMap::value_type& entry : subscribers_) {
    const shared_ptr<Subscriber>& candidate = entry.second;
    if (candidate->subscribed_topics().find(topic_id) ==
        candidate->subscribed_topics().end()) {
      continue;  // Not subscribed to this topic.
    }
    has_subscriber = true;

    TopicEntry::Version processed = candidate->LastTopicVersionProcessed(topic_id);
    if (processed >= lowest_version) continue;
    lowest_version = processed;
    if (subscriber_id != nullptr) *subscriber_id = candidate->id();
  }

  if (!has_subscriber) return Subscriber::TOPIC_INITIAL_VERSION;
  return lowest_version;
}
bool Statestore::ShouldExit() {
lock_guard<mutex> l(exit_flag_lock_);
return exit_flag_;
}
void Statestore::SetExitFlag() {
lock_guard<mutex> l(exit_flag_lock_);
exit_flag_ = true;
subscriber_topic_update_threadpool_.Shutdown();
}
// Sends a single Heartbeat RPC, tagged with the subscriber's registration id, to
// 'subscriber'. Returns the connection or RPC error if either fails. The heartbeat
// duration metric is updated only when the RPC completed.
Status Statestore::SendHeartbeat(Subscriber* subscriber) {
  MonotonicStopWatch rpc_timer;
  rpc_timer.Start();

  Status connect_status;
  StatestoreSubscriberConn connection(heartbeat_client_cache_.get(),
      subscriber->network_address(), &connect_status);
  RETURN_IF_ERROR(connect_status);

  THeartbeatRequest heartbeat;
  heartbeat.__set_registration_id(subscriber->registration_id());
  THeartbeatResponse reply;
  RETURN_IF_ERROR(connection.DoRpc(
      &StatestoreSubscriberClientWrapper::Heartbeat, heartbeat, &reply));

  heartbeat_duration_metric_->Update(
      rpc_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
  return Status::OK();
}
// Thread-pool work function shared by the heartbeat and topic-update pools.
//
// For the scheduled 'update' it:
//   1. drops the work item if the subscriber is gone or has re-registered;
//   2. sleeps until the item's deadline (a deadline of 0 means "send immediately");
//   3. sends either a heartbeat ('is_heartbeat' true) or a topic update;
//   4. consults the failure detector and either unregisters a failed subscriber
//      (heartbeat path only) or schedules the next message.
//
// 'thread_id' is unused.
void Statestore::DoSubscriberUpdate(bool is_heartbeat, int thread_id,
    const ScheduledSubscriberUpdate& update) {
  int64_t update_deadline = update.deadline;
  shared_ptr<Subscriber> subscriber;
  // Check if the subscriber has re-registered, in which case we can ignore
  // this scheduled update.
  if (!FindSubscriber(update.subscriber_id, update.registration_id, &subscriber)) {
    return;
  }

  const string hb_type = is_heartbeat ? "heartbeat" : "topic update";
  if (update_deadline != 0) {
    // Wait until deadline.
    int64_t diff_ms = update_deadline - UnixMillis();
    while (diff_ms > 0) {
      SleepForMs(diff_ms);
      diff_ms = update_deadline - UnixMillis();
    }
    // The subscriber can potentially reconnect by the time this thread wakes
    // up. In such case, we can ignore this update.
    if (UNLIKELY(!FindSubscriber(
        subscriber->id(), subscriber->registration_id(), &subscriber))) {
      return;
    }
    // 'diff_ms' is <= 0 here; its magnitude is how late we are.
    diff_ms = std::abs(diff_ms);
    VLOG(3) << "Sending " << hb_type << " message to: " << update.subscriber_id
            << " (deadline accuracy: " << diff_ms << "ms)";

    if (diff_ms > DEADLINE_MISS_THRESHOLD_MS && is_heartbeat) {
      const string& msg = Substitute(
          "Missed subscriber ($0) $1 deadline by $2ms, "
          "consider increasing --statestore_heartbeat_frequency_ms (currently $3) on "
          "this Statestore, and --statestore_subscriber_timeout_seconds "
          "on subscribers (Impala daemons and the Catalog Server)",
          update.subscriber_id, hb_type, diff_ms,
          FLAGS_statestore_heartbeat_frequency_ms);
      LOG(WARNING) << msg;
    }
    // Don't warn for topic updates - they can be slow and still correct. Recommending an
    // increase in update period will just confuse (as will increasing the thread pool
    // size) because it's hard for users to pick a good value, they may still see these
    // messages and it won't be a correctness problem.
  } else {
    // The first update is scheduled immediately and has a deadline of 0. There's no need
    // to wait.
    VLOG(3) << "Initial " << hb_type << " message for: " << update.subscriber_id;
  }

  // Send the right message type, and compute the next deadline
  int64_t deadline_ms = 0;
  Status status;
  if (is_heartbeat) {
    status = SendHeartbeat(subscriber.get());
    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
      // Add details to status to make it more useful, while preserving the stack
      status.AddDetail(Substitute(
          "Subscriber $0 timed-out during heartbeat RPC. Timeout is $1s.",
          subscriber->id(), FLAGS_statestore_heartbeat_tcp_timeout_seconds));
    }

    deadline_ms = UnixMillis() + FLAGS_statestore_heartbeat_frequency_ms;
  } else {
    // Must start out false: SendTopicUpdate() can return an error before it ever writes
    // the flag, and the flag is read unconditionally below.
    bool update_skipped = false;
    status = SendTopicUpdate(subscriber.get(), &update_skipped);
    if (status.code() == TErrorCode::RPC_RECV_TIMEOUT) {
      // Add details to status to make it more useful, while preserving the stack
      status.AddDetail(Substitute(
          "Subscriber $0 timed-out during topic-update RPC. Timeout is $1s.",
          subscriber->id(), FLAGS_statestore_update_tcp_timeout_seconds));
    }
    // If the subscriber responded that it skipped the last update sent, we assume that
    // it was busy doing something else, and back off slightly before sending another.
    int64_t update_interval = update_skipped ?
        (2 * FLAGS_statestore_update_frequency_ms) :
        FLAGS_statestore_update_frequency_ms;
    deadline_ms = UnixMillis() + update_interval;
  }

  {
    lock_guard<mutex> l(subscribers_lock_);
    // Check again if this registration has been removed while we were processing the
    // message.
    SubscriberMap::iterator it = subscribers_.find(update.subscriber_id);
    if (it == subscribers_.end() ||
        it->second->registration_id() != update.registration_id) return;
    if (!status.ok()) {
      LOG(INFO) << "Unable to send " << hb_type << " message to subscriber "
                << update.subscriber_id << ", received error: " << status.GetDetail();
    }

    // Use the same failure-detector key as RegisterSubscriber() and
    // UnregisterSubscriber(): the printed registration id. Keying by subscriber id here
    // would create entries that are never evicted and that could carry a stale failure
    // state over to a later registration under the same subscriber id.
    const string peer_key = PrintId(update.registration_id);
    FailureDetector::PeerState state = is_heartbeat ?
        failure_detector_->UpdateHeartbeat(peer_key, status.ok()) :
        failure_detector_->GetPeerState(peer_key);

    if (state == FailureDetector::FAILED) {
      // Only the heartbeat path evicts; a topic-update worker that sees FAILED simply
      // stops rescheduling.
      if (is_heartbeat) {
        // TODO: Consider if a metric to track the number of failures would be useful.
        LOG(INFO) << "Subscriber '" << subscriber->id() << "' has failed, disconnected "
                  << "or re-registered (last known registration ID: "
                  << update.registration_id << ")";
        UnregisterSubscriber(subscriber.get());
      }
    } else {
      // Schedule the next message.
      VLOG(3) << "Next " << (is_heartbeat ? "heartbeat" : "update") << " deadline for: "
              << subscriber->id() << " is in " << deadline_ms << "ms";
      // NOTE(review): OfferUpdate() acquires subscribers_lock_ itself when the offer
      // fails, and that lock is held here. Unless 'mutex' is recursive this would
      // self-deadlock on that failure path - confirm, and restructure if so.
      status = OfferUpdate(ScheduledSubscriberUpdate(deadline_ms, subscriber->id(),
          subscriber->registration_id()), is_heartbeat ?
          &subscriber_heartbeat_threadpool_ : &subscriber_topic_update_threadpool_);
      if (!status.ok()) {
        LOG(INFO) << "Unable to send next " << (is_heartbeat ? "heartbeat" : "update")
                  << " message to subscriber '" << subscriber->id() << "': "
                  << status.GetDetail();
      }
    }
  }
}
// Removes 'subscriber' from the statestore, provided it is still the current registration
// for its id:
//   - closes its cached client connections;
//   - evicts it from the failure detector;
//   - deletes the transient topic entries it wrote that have not been overwritten since;
//   - updates the subscriber metrics and erases it from subscribers_.
// Does nothing if the subscriber has already been removed or replaced. Accesses
// subscribers_ without locking; the callers in this file hold subscribers_lock_.
// '*subscriber' may be destroyed by the final erase.
void Statestore::UnregisterSubscriber(Subscriber* subscriber) {
  SubscriberMap::const_iterator it = subscribers_.find(subscriber->id());
  if (it == subscribers_.end() ||
      it->second->registration_id() != subscriber->registration_id()) {
    // Already failed and / or replaced with a new registration
    return;
  }

  // Close all active clients so that the next attempt to use them causes a Reopen()
  update_state_client_cache_->CloseConnections(subscriber->network_address());
  heartbeat_client_cache_->CloseConnections(subscriber->network_address());

  // Prevent the failure detector from growing without bound
  failure_detector_->EvictPeer(PrintId(subscriber->registration_id()));

  // Delete all transient entries
  lock_guard<mutex> topic_lock(topic_lock_);
  // Iterate by const reference: each element holds a (topic id, key) pair of strings that
  // would otherwise be copied on every iteration.
  for (const Statestore::Subscriber::TransientEntryMap::value_type& entry:
       subscriber->transient_entries()) {
    Statestore::TopicMap::iterator topic_it = topics_.find(entry.first.first);
    DCHECK(topic_it != topics_.end());
    topic_it->second.DeleteIfVersionsMatch(entry.second, // version
        entry.first.second); // key
  }
  num_subscribers_metric_->Increment(-1L);
  subscriber_set_metric_->Remove(subscriber->id());
  // Erase through the iterator found above. This avoids a second lookup and avoids
  // handing erase() a key that lives inside the element being destroyed.
  subscribers_.erase(it);
}
// Blocks the calling thread by joining the topic-update thread pool. SetExitFlag() is the
// only place in this file that shuts that pool down.
void Statestore::MainLoop() {
subscriber_topic_update_threadpool_.Join();
}