// blob: 678236e1c4926f41cb9a562acd5c15dc1544d6b2 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "statestore/statestore-subscriber.h"
#include <sstream>
#include <utility>
#include <boost/algorithm/string/join.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/thread/shared_mutex.hpp>
#include <gutil/strings/substitute.h>
#include "common/logging.h"
#include "common/status.h"
#include "statestore/failure-detector.h"
#include "gen-cpp/StatestoreService_types.h"
#include "rpc/rpc-trace.h"
#include "rpc/thrift-util.h"
#include "statestore/statestore-service-client-wrapper.h"
#include "util/time.h"
#include "util/debug-util.h"
#include "common/names.h"
using boost::posix_time::seconds;
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace strings;
// Subscriber tuning flags.
// - statestore_subscriber_timeout_seconds is handed to the TimeoutFailureDetector in the
//   constructor (the full value and half of it).
// - The cnxn_attempts / cnxn_retry_interval_ms pair configures the StatestoreClientCache
//   that Register() uses to reach the statestore.
DEFINE_int32(statestore_subscriber_timeout_seconds, 30, "The amount of time (in seconds)"
" that may elapse before the connection with the statestore is considered lost.");
DEFINE_int32(statestore_subscriber_cnxn_attempts, 10, "The number of times to retry an "
"RPC connection to the statestore. A setting of 0 means retry indefinitely");
DEFINE_int32(statestore_subscriber_cnxn_retry_interval_ms, 3000, "The interval, in ms, "
"to wait between attempts to make an RPC connection to the statestore.");
// SSL flags defined elsewhere.
// - The server-side ones (certificate, key, password command, cipher list, minimum
//   version) configure the heartbeat server in Start().
// - ssl_client_ca_certificate is consulted in the constructor. Presumably it enables SSL
//   on client connections to the statestore when non-empty; confirm against
//   StatestoreClientCache.
DECLARE_string(ssl_client_ca_certificate);
DECLARE_string(ssl_server_certificate);
DECLARE_string(ssl_private_key);
DECLARE_string(ssl_private_key_password_cmd);
DECLARE_string(ssl_cipher_list);
DECLARE_string(ssl_minimum_version);
namespace impala {
// Peer key under which the statestore's liveness is tracked in 'failure_detector_'.
const string STATESTORE_ID = "STATESTORE";

// Name template for the per-topic callback processing-time metric. AddTopic() passes it
// together with the topic id, which presumably replaces '$0'.
const string CALLBACK_METRIC_PATTERN = "statestore-subscriber.topic-$0.processing-time-s";

// Milliseconds RecoveryModeChecker() sleeps, both between polls of the failure detector
// and between consecutive re-registration attempts while in recovery mode.
const int32_t SLEEP_INTERVAL_MS = 5000;

// Connection to the statestore service obtained from 'client_cache_' (see Register()).
using StatestoreServiceConn = ClientConnection<StatestoreServiceClientWrapper>;
// Proxy class for the subscriber heartbeat thrift API, which
// translates RPCs into method calls on the local subscriber object.
class StatestoreSubscriberThriftIf : public StatestoreSubscriberIf {
public:
StatestoreSubscriberThriftIf(StatestoreSubscriber* subscriber)
: subscriber_(subscriber) { DCHECK(subscriber != NULL); }
virtual void UpdateState(TUpdateStateResponse& response,
const TUpdateStateRequest& params) {
TUniqueId registration_id;
if (params.__isset.registration_id) {
registration_id = params.registration_id;
}
subscriber_->UpdateState(params.topic_deltas, registration_id,
&response.topic_updates, &response.skipped).ToThrift(&response.status);
// Make sure Thrift thinks the field is set.
response.__set_skipped(response.skipped);
}
virtual void Heartbeat(THeartbeatResponse& response, const THeartbeatRequest& request) {
subscriber_->Heartbeat(request.registration_id);
}
private:
StatestoreSubscriber* subscriber_;
};
// Builds a subscriber without starting it.
// - 'subscriber_id' and 'heartbeat_address' are what Register() reports to the
//   statestore at 'statestore_address'.
// - The heartbeat server is created and started later, in Start(), on the port of
//   'heartbeat_address'.
// - Metrics are registered both on 'metrics' and on its "statestore-subscriber" child
//   group ('metrics_').
StatestoreSubscriber::StatestoreSubscriber(const std::string& subscriber_id,
const TNetworkAddress& heartbeat_address, const TNetworkAddress& statestore_address,
MetricGroup* metrics)
: subscriber_id_(subscriber_id), heartbeat_address_(heartbeat_address),
statestore_address_(statestore_address),
thrift_iface_(new StatestoreSubscriberThriftIf(this)),
// The timeout flag and half of it are passed to the detector. The second argument is
// presumably an earlier "suspect" threshold; confirm against TimeoutFailureDetector.
failure_detector_(new TimeoutFailureDetector(
seconds(FLAGS_statestore_subscriber_timeout_seconds),
seconds(FLAGS_statestore_subscriber_timeout_seconds / 2))),
is_registered_(false),
// Connection attempts and retry interval come from the flags above.
// NOTE(review): the meaning of the two 0s and "" is not visible here.
// SSL is presumably enabled when a client CA certificate is configured.
client_cache_(new StatestoreClientCache(FLAGS_statestore_subscriber_cnxn_attempts,
FLAGS_statestore_subscriber_cnxn_retry_interval_ms, 0, 0, "",
!FLAGS_ssl_client_ca_certificate.empty())),
metrics_(metrics->GetOrCreateChildGroup("statestore-subscriber")) {
connected_to_statestore_metric_ =
metrics_->AddProperty("statestore-subscriber.connected", false);
last_recovery_duration_metric_ = metrics_->AddGauge(
"statestore-subscriber.last-recovery-duration", 0.0);
last_recovery_time_metric_ = metrics_->AddProperty<string>(
"statestore-subscriber.last-recovery-time", "N/A");
// NOTE(review): the metrics below are registered on the parent 'metrics' group, not the
// 'metrics_' child group used above. Confirm this is intentional; changing it would move
// where the metrics are published.
topic_update_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-interval-time");
topic_update_duration_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.topic-update-duration");
heartbeat_interval_metric_ = StatsMetric<double>::CreateAndRegister(metrics,
"statestore-subscriber.heartbeat-interval-time");
registration_id_metric_ = metrics->AddProperty<string>(
"statestore-subscriber.registration-id", "N/A");
client_cache_->InitMetrics(metrics, "statestore-subscriber.statestore");
}
// Subscribes 'callback' to 'topic_id'.
// - 'is_transient' is reported to the statestore at registration. If the same topic is
//   added more than once, the last value wins.
// - Several callbacks may be attached to one topic.
// - Fails once the subscriber has registered successfully, because the topic list is
//   only sent to the statestore by Register().
Status StatestoreSubscriber::AddTopic(const Statestore::TopicId& topic_id,
    bool is_transient, const UpdateCallback& callback) {
  lock_guard<mutex> guard(lock_);
  if (is_registered_) {
    return Status("Subscriber already started, can't add new topic");
  }

  Callbacks& topic_callbacks = update_callbacks_[topic_id];
  topic_callbacks.callbacks.push_back(callback);
  // The processing-time metric is per topic, so only the first callback creates it.
  if (topic_callbacks.processing_time_metric == nullptr) {
    topic_callbacks.processing_time_metric = StatsMetric<double>::CreateAndRegister(
        metrics_, CALLBACK_METRIC_PATTERN, topic_id);
  }

  topic_registrations_[topic_id] = is_transient;
  return Status::OK();
}
// Sends one RegisterSubscriber RPC to the statestore, covering every topic that has
// callbacks.
// - Returns the connection/RPC error if the call could not be made; otherwise returns the
//   status reported by the statestore.
// - A registration id in the response is recorded, even alongside a non-OK status.
// - The topic-update and heartbeat interval timers are restarted on every completed RPC.
// - Callers in this file (Start(), RecoveryModeChecker()) hold lock_.
Status StatestoreSubscriber::Register() {
  Status conn_status;
  StatestoreServiceConn statestore_client(
      client_cache_.get(), statestore_address_, &conn_status);
  RETURN_IF_ERROR(conn_status);

  TRegisterSubscriberRequest registration_request;
  registration_request.topic_registrations.reserve(update_callbacks_.size());
  for (const auto& entry : update_callbacks_) {
    TTopicRegistration registration;
    registration.topic_name = entry.first;
    registration.is_transient = topic_registrations_[entry.first];
    registration_request.topic_registrations.push_back(registration);
  }
  registration_request.subscriber_location = heartbeat_address_;
  registration_request.subscriber_id = subscriber_id_;

  TRegisterSubscriberResponse registration_response;
  RETURN_IF_ERROR(statestore_client.DoRpc(
      &StatestoreServiceClientWrapper::RegisterSubscriber, registration_request,
      &registration_response));

  Status result(registration_response.status);
  if (result.ok()) connected_to_statestore_metric_->set_value(true);

  if (!registration_response.__isset.registration_id) {
    VLOG(1) << "No subscriber registration ID received from statestore";
  } else {
    // registration_id_ is read concurrently by CheckRegistrationId().
    lock_guard<mutex> id_guard(registration_id_lock_);
    registration_id_ = registration_response.registration_id;
    const string id_text = PrintId(registration_id_);
    registration_id_metric_->set_value(id_text);
    VLOG(1) << "Subscriber registration ID: " << id_text;
  }

  topic_update_interval_timer_.Start();
  heartbeat_interval_timer_.Start();
  return result;
}
// Starts the heartbeat server, registers with the statestore and spawns the recovery
// thread.
// - Returns early with an error if the server cannot be built or started.
// - Otherwise returns the registration status.
// - The recovery thread is started even if registration failed; it keeps retrying
//   registration once the failure detector declares the statestore failed.
Status StatestoreSubscriber::Start() {
  Status registration_status;
  {
    // Hold lock_ for the whole of start-up. Suppose a topic update arrives while we are
    // still registering, e.g. Register() has succeeded on the statestore but client-side
    // state is not set up yet. UpdateState() then fails to take lock_ and skips it.
    lock_guard<mutex> startup_guard(lock_);
    LOG(INFO) << "Starting statestore subscriber";

    // Backend must be started before registration
    boost::shared_ptr<TProcessor> subscriber_processor(
        new StatestoreSubscriberProcessor(thrift_iface_));
    boost::shared_ptr<TProcessorEventHandler> rpc_handler(
        new RpcEventHandler("statestore-subscriber", metrics_));
    subscriber_processor->setEventHandler(rpc_handler);

    ThriftServerBuilder server_builder(
        "StatestoreSubscriber", subscriber_processor, heartbeat_address_.port);
    if (EnableInternalSslConnections()) {
      SSLProtocol min_ssl_protocol;
      RETURN_IF_ERROR(SSLProtoVersions::StringToProtocol(
          FLAGS_ssl_minimum_version, &min_ssl_protocol));
      LOG(INFO) << "Enabling SSL for Statestore subscriber";
      server_builder.ssl(FLAGS_ssl_server_certificate, FLAGS_ssl_private_key)
          .pem_password_cmd(FLAGS_ssl_private_key_password_cmd)
          .ssl_version(min_ssl_protocol)
          .cipher_list(FLAGS_ssl_cipher_list);
    }

    ThriftServer* built_server = nullptr;
    RETURN_IF_ERROR(server_builder.Build(&built_server));
    heartbeat_server_.reset(built_server);
    RETURN_IF_ERROR(heartbeat_server_->Start());

    LOG(INFO) << "Registering with statestore";
    registration_status = Register();
    if (!registration_status.ok()) {
      LOG(INFO) << "statestore registration unsuccessful: "
                << registration_status.GetDetail();
    } else {
      is_registered_ = true;
      LOG(INFO) << "statestore registration successful";
    }
  }

  // Registration is over, so lock_ is released before the recovery thread starts.
  RETURN_IF_ERROR(Thread::Create("statestore-subscriber", "recovery-mode-thread",
      &StatestoreSubscriber::RecoveryModeChecker, this, &recovery_mode_thread_));
  return registration_status;
}
void StatestoreSubscriber::RecoveryModeChecker() {
failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
// Every few seconds, wake up and check if the failure detector has determined
// that the statestore has failed from our perspective. If so, enter recovery
// mode and try to reconnect, followed by reregistering all subscriptions.
while (true) {
if (failure_detector_->GetPeerState(STATESTORE_ID) == FailureDetector::FAILED) {
// When entering recovery mode, the class-wide lock_ is taken to
// ensure mutual exclusion with any operations in flight.
lock_guard<mutex> l(lock_);
MonotonicStopWatch recovery_timer;
recovery_timer.Start();
connected_to_statestore_metric_->set_value(false);
LOG(INFO) << subscriber_id_
<< ": Connection with statestore lost, entering recovery mode";
uint32_t attempt_count = 1;
while (true) {
LOG(INFO) << "Trying to re-register with statestore, attempt: "
<< attempt_count++;
Status status = Register();
if (status.ok()) {
// Make sure to update failure detector so that we don't immediately fail on the
// next loop while we're waiting for heartbeat messages to resume.
failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
LOG(INFO) << "Reconnected to statestore. Exiting recovery mode";
// Break out of enclosing while (true) to top of outer-scope loop.
break;
} else {
// Don't exit recovery mode, continue
LOG(WARNING) << "Failed to re-register with statestore: "
<< status.GetDetail();
SleepForMs(SLEEP_INTERVAL_MS);
}
last_recovery_duration_metric_->set_value(
recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
}
// When we're successful in re-registering, we don't do anything
// to re-send our updates to the statestore. It is the
// responsibility of individual clients to post missing updates
// back to the statestore. This saves a lot of complexity where
// we would otherwise have to cache updates here.
last_recovery_duration_metric_->set_value(
recovery_timer.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
last_recovery_time_metric_->set_value(CurrentTimeString());
}
SleepForMs(SLEEP_INTERVAL_MS);
}
}
// Returns OK when a message tagged with 'registration_id' is acceptable for the current
// registration; otherwise returns an error naming both ids.
// - If no id has been recorded yet (registration_id_ is still the default 0:0
//   TUniqueId), every message is accepted. This tolerates an eager statestore that sends
//   updates before Register() stores the id.
// - Once an id is known, messages aimed at another (e.g. previous) registration are
//   rejected.
Status StatestoreSubscriber::CheckRegistrationId(const RegistrationId& registration_id) {
  lock_guard<mutex> id_guard(registration_id_lock_);
  const bool have_registration_id = registration_id_ != TUniqueId();
  if (have_registration_id && registration_id != registration_id_) {
    return Status(Substitute("Unexpected registration ID: $0, was expecting $1",
        PrintId(registration_id), PrintId(registration_id_)));
  }
  return Status::OK();
}
void StatestoreSubscriber::Heartbeat(const RegistrationId& registration_id) {
const Status& status = CheckRegistrationId(registration_id);
if (status.ok()) {
heartbeat_interval_metric_->Update(
heartbeat_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
failure_detector_->UpdateHeartbeat(STATESTORE_ID, true);
} else {
VLOG_RPC << "Heartbeat: " << status.GetDetail();
}
}
// Applies a batch of topic deltas from the statestore.
//
// 'incoming_topic_deltas': the deltas, keyed by topic.
// 'registration_id': the registration the statestore addressed the update to. A
//     mismatch with the current registration returns an error.
// 'subscriber_topic_updates': output. Receives updates produced by the callbacks and
//     any re-send requests (entries with 'from_version' set).
// 'skipped': output. Set to true when lock_ is busy so the statestore retries later;
//     false otherwise.
//
// Before invoking callbacks, every delta is checked against the version last applied
// for its topic. If any delta does not start where we left off, no callback runs. The
// statestore is then asked to re-send every known topic in the batch from the version
// we last applied.
Status StatestoreSubscriber::UpdateState(const TopicDeltaMap& incoming_topic_deltas,
    const RegistrationId& registration_id, vector<TTopicDelta>* subscriber_topic_updates,
    bool* skipped) {
  // We don't want to block here because this is an RPC, and delaying the return causes
  // the statestore to delay sending further messages. The only time that lock_ might be
  // taken concurrently is if:
  //
  // a) another update is still being processed (i.e. is still in UpdateState()). This
  // could happen only when the subscriber has re-registered, and the statestore is still
  // sending an update for the previous registration. In this case, return OK but set
  // *skipped = true to tell the statestore to retry this update in the future.
  //
  // b) the subscriber is recovering, and has the lock held during
  // RecoveryModeChecker(). Similarly, we set *skipped = true.
  // TODO: Consider returning an error in this case so that the statestore will eventually
  // stop sending updates even if re-registration fails.
  try_mutex::scoped_try_lock l(lock_);
  if (!l) {
    *skipped = true;
    return Status::OK();
  }
  *skipped = false;
  RETURN_IF_ERROR(CheckRegistrationId(registration_id));

  // Only record updates received when not in recovery mode
  topic_update_interval_metric_->Update(
      topic_update_interval_timer_.Reset() / (1000.0 * 1000.0 * 1000.0));
  MonotonicStopWatch update_sw;
  update_sw.Start();

  // Pass 1 checks every delta's version range without modifying any state. A topic with
  // no recorded version has not had an update applied yet, so there is nothing to check
  // it against.
  bool found_unexpected_delta = false;
  for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
    TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
    if (itr == current_topic_versions_.end()) continue;
    if (delta.second.is_delta && delta.second.from_version != itr->second) {
      LOG(ERROR) << "Unexpected delta update to topic '" << delta.first << "' of "
                 << "version range (" << delta.second.from_version << ":"
                 << delta.second.to_version << "]. Expected delta start version: "
                 << itr->second;
      found_unexpected_delta = true;
    }
  }

  if (found_unexpected_delta) {
    // Callbacks receive the whole batch, so a single bad delta means none of it is
    // applied. Ask for every known topic in the batch to be re-sent from the version we
    // last applied. Asking only for the mismatched topics would let the statestore
    // advance the others past deltas our callbacks never saw.
    for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
      TopicVersionMap::const_iterator itr = current_topic_versions_.find(delta.first);
      if (itr == current_topic_versions_.end()) continue;
      subscriber_topic_updates->push_back(TTopicDelta());
      TTopicDelta& update = subscriber_topic_updates->back();
      update.topic_name = delta.second.topic_name;
      update.__set_from_version(itr->second);
    }
  } else {
    // Pass 2 applies the batch. Versions are committed only here, once the callbacks are
    // certain to run. This includes topics seen for the first time, which arms the
    // check above for later updates.
    for (const TopicDeltaMap::value_type& delta: incoming_topic_deltas) {
      current_topic_versions_[delta.first] = delta.second.to_version;
    }

    const size_t num_prior_updates = subscriber_topic_updates->size();
    for (const UpdateCallbacks::value_type& callbacks: update_callbacks_) {
      MonotonicStopWatch callback_sw;
      callback_sw.Start();
      for (const UpdateCallback& callback: callbacks.second.callbacks) {
        // TODO: Consider filtering the topics to only send registered topics to
        // callbacks
        callback(incoming_topic_deltas, subscriber_topic_updates);
      }
      callbacks.second.processing_time_metric->Update(
          callback_sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
    }

    // A callback may ask the statestore to re-send a topic from some version by setting
    // 'from_version', the same mechanism pass 1 uses. Track that version so the next
    // delta, which will start from it, is not reported as unexpected.
    for (size_t i = num_prior_updates; i < subscriber_topic_updates->size(); ++i) {
      const TTopicDelta& update = (*subscriber_topic_updates)[i];
      if (!update.__isset.from_version) continue;
      TopicVersionMap::iterator itr = current_topic_versions_.find(update.topic_name);
      if (itr != current_topic_versions_.end()) itr->second = update.from_version;
    }
  }

  update_sw.Stop();
  topic_update_duration_metric_->Update(
      update_sw.ElapsedTime() / (1000.0 * 1000.0 * 1000.0));
  return Status::OK();
}
}