blob: 047c634c54e0beaba04c19578fb061c51c8e3a87 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/instance-server.h"
#include <iostream>
#include <map>
#include <string>
#include <unordered_set>
#include <vector>
#include "manager/checkpoint-gateway.h"
#include "util/neighbour-calculator.h"
#include "manager/stmgr.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/helper.h"
#include "config/heron-internals-config-reader.h"
#include "config/topology-config-helper.h"
#include "metrics/metrics.h"
namespace heron {
namespace stmgr {
using std::make_shared;
// Num data tuples sent to instances associated with this stream manager
const sp_string METRIC_DATA_TUPLES_TO_INSTANCES = "__tuples_to_workers";
// Num ack tuples sent to instances associated with this stream manager
const sp_string METRIC_ACK_TUPLES_TO_INSTANCES = "__ack_tuples_to_workers";
// Num fail tuples sent to instances associated with this stream manager
const sp_string METRIC_FAIL_TUPLES_TO_INSTANCES = "__fail_tuples_to_workers";
// Bytes sent to instances
const sp_string METRIC_BYTES_TO_INSTANCES = "__bytes_to_workers";
// Num data tuples received from instances associated with this stream manager
const sp_string METRIC_DATA_TUPLES_FROM_INSTANCES = "__tuples_from_workers";
// Num ack tuples received from instances associated with this stream manager
const sp_string METRIC_ACK_TUPLES_FROM_INSTANCES = "__ack_tuples_from_workers";
// Num fail tuples received from instances associated with this stream manager
const sp_string METRIC_FAIL_TUPLES_FROM_INSTANCES = "__fail_tuples_from_workers";
// Bytes received from instances
const sp_string METRIC_BYTES_FROM_INSTANCES = "__bytes_from_workers";
// Num data tuples dropped because the target instance is not connected
const sp_string METRIC_DATA_TUPLES_TO_INSTANCES_LOST = "__tuples_to_workers_lost";
// Num ack tuples dropped because the target instance is not connected
const sp_string METRIC_ACK_TUPLES_TO_INSTANCES_LOST = "__ack_tuples_to_workers_lost";
// Num fail tuples dropped because the target instance is not connected
const sp_string METRIC_FAIL_TUPLES_TO_INSTANCES_LOST = "__fail_tuples_to_workers_lost";
// Num bytes dropped because the target instance is not connected
const sp_string METRIC_BYTES_TO_INSTANCES_LOST = "__bytes_to_workers_lost";
// Time spent in back pressure caused by instances managed by this stmgr.
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE =
"__time_spent_back_pressure_by_local_instance";
// Time spent in back pressure aggregated - back pressure initiated by us +
// others
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_AGGR = "__server/__time_spent_back_pressure_aggr";
// Time spent in back pressure because of a component id. The comp id will be
// appended to the string below
const sp_string METRIC_TIME_SPENT_BACK_PRESSURE_COMPID = "__time_spent_back_pressure_by_compid/";
// Prefix for connection buffer's metrics
const sp_string CONNECTION_BUFFER_BY_INSTANCEID = "__connection_buffer_by_instanceid/";
// Prefix for connection buffer's length metrics. This is different
// from METRIC_DATA_TUPLES_TO_INSTANCES as that counts
// the tuples when they are sent to the instance -- this metric
// will be used to count the tuples as they are received
const sp_string CONNECTION_BUFFER_LENGTH_BY_INSTANCEID =
"__connection_buffer_length_by_instanceid/";
// Sampling period for the queue-metric timer (10 seconds, in microseconds).
// TODO(mfu): Read this value from config
const sp_int64 SYSTEM_METRICS_SAMPLE_INTERVAL_MICROSECOND = 10_s;
// Constructor. Wires up the request/message handlers used by local instances,
// registers the server-level and back-pressure metrics with the metrics
// manager, arms the periodic queue-metric sampling timer, and constructs the
// stateful checkpoint gateway that buffers/drains tuples around checkpoints.
InstanceServer::InstanceServer(
    shared_ptr<EventLoop> eventLoop,
    const NetworkOptions& _options,
    const sp_string& _topology_name, const sp_string& _topology_id,
    const sp_string& _stmgr_id,
    const std::vector<sp_string>& _expected_instances,
    StMgr* _stmgr,
    shared_ptr<heron::common::MetricsMgrSt> const& _metrics_manager_client,
    shared_ptr<NeighbourCalculator> _neighbour_calculator,
    bool _droptuples_upon_backpressure)
    : Server(eventLoop, _options),
      topology_name_(_topology_name),
      topology_id_(_topology_id),
      stmgr_id_(_stmgr_id),
      expected_instances_(_expected_instances),
      stmgr_(_stmgr),
      metrics_manager_client_(_metrics_manager_client),
      neighbour_calculator_(_neighbour_calculator),
      droptuples_upon_backpressure_(_droptuples_upon_backpressure) {
  // instance related handlers
  InstallRequestHandler(&InstanceServer::HandleRegisterInstanceRequest);
  InstallMessageHandler(&InstanceServer::HandleTupleSetMessage);
  InstallMessageHandler(&InstanceServer::HandleStoreInstanceStateCheckpointMessage);
  InstallMessageHandler(&InstanceServer::HandleRestoreInstanceStateResponse);
  // Server-wide counters and the aggregate back-pressure timer; both are
  // unregistered again in the destructor.
  instance_server_metrics_ = make_shared<heron::common::MultiCountMetric>();
  back_pressure_metric_aggr_ = make_shared<heron::common::TimeSpentMetric>();
  metrics_manager_client_->register_metric("__server", instance_server_metrics_);
  metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR,
                                           back_pressure_metric_aggr_);
  back_pressure_metric_caused_by_local_instances_ = make_shared<heron::common::TimeSpentMetric>();
  metrics_manager_client_->register_metric(METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE,
                                           back_pressure_metric_caused_by_local_instances_);
  spouts_under_back_pressure_ = false;
  // Update queue related metrics every 10 seconds
  CHECK_GT(eventLoop_->registerTimer([this](EventLoop::Status status) {
    this->UpdateQueueMetrics(status);
  }, true, SYSTEM_METRICS_SAMPLE_INTERVAL_MICROSECOND), 0);
  // The gateway drains buffered tuple sets / streams / checkpoint markers
  // through the three bound callbacks once it is safe to forward them.
  sp_uint64 drain_threshold_bytes =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrStatefulBufferSizeMb() * 1_MB;
  stateful_gateway_ = make_shared<CheckpointGateway>(drain_threshold_bytes, neighbour_calculator_,
      metrics_manager_client_,
      std::bind(&InstanceServer::DrainTupleSet, this, std::placeholders::_1, std::placeholders::_2),
      std::bind(&InstanceServer::DrainTupleStream, this, std::placeholders::_1),
      std::bind(&InstanceServer::DrainCheckpoint, this, std::placeholders::_1,
                std::placeholders::_2));
}
// Destructor. Stops the server, unregisters every per-instance metric that is
// still registered, unregisters the server-level metrics, and frees the
// per-task InstanceData records.
InstanceServer::~InstanceServer() {
  Stop();
  // Unregister and delete the metrics.
  // Note: erase() is performed in the loop's increment clause, so each entry
  // is removed exactly once after its metric has been unregistered.
  for (auto immIter = instance_metric_map_.begin();
       immIter != instance_metric_map_.end();
       immIter = instance_metric_map_.erase(immIter)) {
    const sp_string& instance_id = immIter->first;
    // Find the InstanceData whose instance id matches this metric entry; only
    // entries with a live connection still hold a registered metric.
    for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
      if (iter->second->instance_->instance_id() != instance_id) continue;
      InstanceData* data = iter->second;
      Connection* iConn = data->conn_;
      if (!iConn) break;
      sp_string metric_name = MakeBackPressureCompIdMetricName(instance_id);
      metrics_manager_client_->unregister_metric(metric_name);
    }
  }
  // Clean the connection_buffer_metric_map
  for (auto qmmIter = connection_buffer_metric_map_.begin();
       qmmIter != connection_buffer_metric_map_.end();) {
    const sp_string& instance_id = qmmIter->first;
    for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
      if (iter->second->instance_->instance_id() != instance_id) continue;
      InstanceData* data = iter->second;
      Connection* iConn = data->conn_;
      if (!iConn) break;
      sp_string metric_name = MakeQueueSizeCompIdMetricName(instance_id);
      metrics_manager_client_->unregister_metric(metric_name);
    }
    qmmIter = connection_buffer_metric_map_.erase(qmmIter);
  }
  // Clean the connection_buffer_length_metric_map
  // (these metrics stay registered even when the connection is gone, so no
  // connection check is needed here).
  for (auto qlmIter = connection_buffer_length_metric_map_.begin();
       qlmIter != connection_buffer_length_metric_map_.end();) {
    const sp_string& instance_id = qlmIter->first;
    sp_string metric_name = MakeQueueLengthCompIdMetricName(instance_id);
    metrics_manager_client_->unregister_metric(metric_name);
    qlmIter = connection_buffer_length_metric_map_.erase(qlmIter);
  }
  metrics_manager_client_->unregister_metric("__server");
  metrics_manager_client_->unregister_metric(METRIC_TIME_SPENT_BACK_PRESSURE_AGGR);
  metrics_manager_client_->unregister_metric(
      METRIC_TIME_SPENT_BACK_PRESSURE_CAUSED_BY_LOCAL_INSTANCE);
  // cleanup the instance info
  for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
    delete iter->second;
  }
}
// Appends the Instance proto of every known local instance to _return.
void InstanceServer::GetInstanceInfo(std::vector<proto::system::Instance*>& _return) {
  for (const auto& entry : instance_info_) {
    _return.push_back(entry.second->instance_);
  }
}
// Returns the Instance proto for the given task id, or NULL if the task id
// is not known to this stream manager.
proto::system::Instance* InstanceServer::GetInstanceInfo(sp_int32 _task_id) {
  auto found = instance_info_.find(_task_id);
  if (found == instance_info_.end()) return NULL;
  return found->second->instance_;
}
// Metric name for time spent in back pressure attributed to one instance.
sp_string InstanceServer::MakeBackPressureCompIdMetricName(const sp_string& instanceid) {
return METRIC_TIME_SPENT_BACK_PRESSURE_COMPID + instanceid;
}
// Metric name for the outstanding-bytes (queue size) metric of one instance.
sp_string InstanceServer::MakeQueueSizeCompIdMetricName(const sp_string& instanceid) {
return CONNECTION_BUFFER_BY_INSTANCEID + instanceid;
}
// Metric name for the received-tuple-count (queue length) metric of one instance.
sp_string InstanceServer::MakeQueueLengthCompIdMetricName(const sp_string& instanceid) {
return CONNECTION_BUFFER_LENGTH_BY_INSTANCEID + instanceid;
}
// Periodic timer callback: samples the outstanding bytes buffered on each
// active instance connection into that instance's queue-size metric.
void InstanceServer::UpdateQueueMetrics(EventLoop::Status) {
  for (const auto& entry : active_instances_) {
    Connection* conn = entry.first;
    sp_int32 tid = entry.second;
    const sp_string& name = instance_info_[tid]->instance_->instance_id();
    connection_buffer_metric_map_[name]->scope("bytes")->record(conn->getOutstandingBytes());
  }
}
// Called by the Server base class when a new TCP connection arrives.
// Registration happens later via HandleRegisterInstanceRequest, so this
// only logs the peer.
void InstanceServer::HandleNewConnection(Connection* _conn) {
  // There is nothing to be done here. Instead we wait
  // for the register/hello
  LOG(INFO) << "Instance Server got new connection " << _conn << " from "
            << _conn->getIPAddress() << ":" << _conn->getPort();
}
// Called when an instance connection drops. Undoes any back pressure that
// connection announced, tears down the per-instance metrics, detaches the
// connection from its InstanceData (the data itself survives for reconnects),
// and notifies the stream manager of the dead instance.
void InstanceServer::HandleConnectionClose(Connection* _conn, NetworkErrorCode) {
  LOG(INFO) << "Instance Server got connection close of " << _conn << " from "
            << _conn->getIPAddress() << ":" << _conn->getPort();
  // Did the instance ever announce back pressure to us
  if (remote_ends_who_caused_back_pressure_.find(GetInstanceName(_conn)) !=
      remote_ends_who_caused_back_pressure_.end()) {
    _conn->unsetCausedBackPressure();
    remote_ends_who_caused_back_pressure_.erase(GetInstanceName(_conn));
    auto instance_metric = instance_metric_map_[GetInstanceName(_conn)];
    instance_metric->Stop();
    // If nobody else is still causing back pressure, tell the other stmgrs.
    if (!stmgr_->DidAnnounceBackPressure()) {
      stmgr_->SendStopBackPressureToOtherStMgrs();
    }
  }
  // Note: Connections to other stream managers get handled in StmgrClient
  // Now attempt to stop the back pressure
  AttemptStopBackPressureFromSpouts();
  auto iiter = active_instances_.find(_conn);
  if (iiter != active_instances_.end()) {
    sp_int32 task_id = iiter->second;
    CHECK(instance_info_.find(task_id) != instance_info_.end());
    sp_string instance_id = instance_info_[task_id]->instance_->instance_id();
    LOG(INFO) << "Instance " << instance_id << " closed connection";
    // Remove the connection from active instances
    active_instances_.erase(_conn);
    // Set the connection to null. Do not delete the structure
    instance_info_[task_id]->set_connection(NULL);
    // Clean the instance_metric_map
    auto immiter = instance_metric_map_.find(instance_id);
    if (immiter != instance_metric_map_.end()) {
      metrics_manager_client_->unregister_metric(MakeBackPressureCompIdMetricName(instance_id));
      instance_metric_map_.erase(instance_id);
    }
    // Clean the connection_buffer_metric_map_
    auto qmmiter = connection_buffer_metric_map_.find(instance_id);
    if (qmmiter != connection_buffer_metric_map_.end()) {
      metrics_manager_client_->unregister_metric(MakeQueueSizeCompIdMetricName(instance_id));
      connection_buffer_metric_map_.erase(instance_id);
    }
    // Clean the connection_buffer_length_metric_map_
    auto qlmiter = connection_buffer_length_metric_map_.find(instance_id);
    if (qlmiter != connection_buffer_length_metric_map_.end()) {
      metrics_manager_client_->unregister_metric(MakeQueueLengthCompIdMetricName(instance_id));
      connection_buffer_length_metric_map_.erase(instance_id);
    }
    stmgr_->HandleDeadInstance(task_id);
  }
}
// Handles the register/hello request an instance sends after connecting.
// Validates the topology identity and that the instance is one we expect,
// rejects duplicate live registrations for the same task id, then records
// the connection, creates the per-instance metrics, replies with the current
// physical plan (if any), and applies rate limiting to the connection.
void InstanceServer::HandleRegisterInstanceRequest(REQID _reqid, Connection* _conn,
                               pool_unique_ptr<proto::stmgr::RegisterInstanceRequest> _request) {
  LOG(INFO) << "Got HandleRegisterInstanceRequest from connection " << _conn << " and instance "
            << _request->instance().instance_id();
  // Do some basic checks
  bool error = false;
  if (_request->topology_name() != topology_name_ || _request->topology_id() != topology_id_) {
    LOG(ERROR) << "Invalid topology name/id in register instance request"
               << " Found " << _request->topology_name() << " and " << _request->topology_id();
    error = true;
  }
  const sp_string& instance_id = _request->instance().instance_id();
  sp_int32 task_id = _request->instance().info().task_id();
  if (!error) {
    // The instance must be one of the instances this stmgr was told to expect.
    bool expected = false;
    for (size_t i = 0; i < expected_instances_.size(); ++i) {
      if (expected_instances_[i] == instance_id) {
        expected = true;
        break;
      }
    }
    if (!expected) {
      LOG(ERROR) << "Unexpected instance in register instance request " << instance_id;
      error = true;
    }
  }
  // A second registration for a task id that already has a live connection is
  // an error; the stale connection is closed and this request is rejected.
  if (instance_info_.find(task_id) != instance_info_.end() &&
      instance_info_[task_id]->conn_ != NULL) {
    LOG(ERROR) << "Instance with the same task id already exists in our map " << instance_id;
    LOG(ERROR) << "Closing the old connection";
    instance_info_[task_id]->conn_->closeConnection();
    error = true;
  }
  if (error) {
    proto::stmgr::RegisterInstanceResponse response;
    response.mutable_status()->set_status(proto::system::NOTOK);
    SendResponse(_reqid, _conn, response);
  } else {
    LOG(INFO) << "New instance registered with us " << instance_id;
    active_instances_[_conn] = task_id;
    // Reuse existing InstanceData on reconnect; take ownership of the
    // Instance proto only when this task id is seen for the first time.
    if (instance_info_.find(task_id) == instance_info_.end()) {
      instance_info_[task_id] = new InstanceData(_request->release_instance());
    }
    // Create a metric for this instance
    if (instance_metric_map_.find(instance_id) == instance_metric_map_.end()) {
      auto instance_metric = make_shared<heron::common::TimeSpentMetric>();
      metrics_manager_client_->register_metric(MakeBackPressureCompIdMetricName(instance_id),
                                               instance_metric);
      instance_metric_map_[instance_id] = instance_metric;
    }
    if (connection_buffer_metric_map_.find(instance_id) == connection_buffer_metric_map_.end()) {
      auto queue_metric = make_shared<heron::common::MultiMeanMetric>();
      metrics_manager_client_->register_metric(MakeQueueSizeCompIdMetricName(instance_id),
                                               queue_metric);
      connection_buffer_metric_map_[instance_id] = queue_metric;
    }
    if (connection_buffer_length_metric_map_.find(instance_id) ==
        connection_buffer_length_metric_map_.end()) {
      // Also remember the task_id -> instance_id mapping used by SendToInstance2.
      task_id_to_name[task_id] = instance_id;
      auto queue_metric = make_shared<heron::common::MultiCountMetric>();
      metrics_manager_client_->register_metric(MakeQueueLengthCompIdMetricName(instance_id),
                                               queue_metric);
      connection_buffer_length_metric_map_[instance_id] = queue_metric;
    }
    instance_info_[task_id]->set_connection(_conn);
    proto::stmgr::RegisterInstanceResponse response;
    response.mutable_status()->set_status(proto::system::OK);
    const shared_ptr<proto::system::PhysicalPlan> pplan = stmgr_->GetPhysicalPlan();
    if (pplan) {
      response.mutable_pplan()->CopyFrom(*pplan);
    }
    SendResponse(_reqid, _conn, response);
    // Apply rate limit to the connection
    if (pplan) {
      const std::string component = instance_info_[task_id]->instance_->info().component_name();
      SetRateLimit(*pplan, component, _conn);
    }
    // Have all the instances connected to us?
    if (HaveAllInstancesConnectedToUs()) {
      // Notify to stmgr so that it might want to connect to tmaster
      stmgr_->HandleAllInstancesConnected();
    }
  }
}
// Handles a tuple set arriving from a local instance: bumps the
// from-instances data/ack/fail counters and forwards the set to the
// stream manager for routing.
void InstanceServer::HandleTupleSetMessage(Connection* _conn,
                               pool_unique_ptr<proto::system::HeronTupleSet> _message) {
  auto found = active_instances_.find(_conn);
  if (found == active_instances_.end()) {
    LOG(ERROR) << "Received TupleSet from unknown instance connection. Dropping..";
    return;
  }
  const sp_int32 task_id = found->second;
  if (_message->has_data()) {
    instance_server_metrics_->scope(METRIC_DATA_TUPLES_FROM_INSTANCES)
        ->incr_by(_message->data().tuples_size());
  } else if (_message->has_control()) {
    instance_server_metrics_->scope(METRIC_ACK_TUPLES_FROM_INSTANCES)
        ->incr_by(_message->control().acks_size());
    instance_server_metrics_->scope(METRIC_FAIL_TUPLES_FROM_INSTANCES)
        ->incr_by(_message->control().fails_size());
  }
  stmgr_->HandleInstanceData(task_id, instance_info_[task_id]->local_spout_,
                             std::move(_message));
}
// Queues a serialized tuple stream for delivery to a local instance via the
// checkpoint gateway, counting the tuples in the instance's queue-length
// metric as they are received.
// Fix: the original looked up task_id_to_name with operator[], which
// default-inserts an empty name for an unknown task id, and then indexed
// connection_buffer_length_metric_map_ a second time after already finding
// the iterator. Both lookups now go through find() and the found iterator.
void InstanceServer::SendToInstance2(pool_unique_ptr<proto::stmgr::TupleStreamMessage> _message) {
  auto name_it = task_id_to_name.find(_message->task_id());
  if (name_it != task_id_to_name.end()) {
    auto metric_it = connection_buffer_length_metric_map_.find(name_it->second);
    if (metric_it != connection_buffer_length_metric_map_.end()) {
      metric_it->second->scope("packets")->incr_by(_message->num_tuples());
    }
  }
  stateful_gateway_->SendToInstance(std::move(_message));
}
// Gateway drain callback: writes a buffered serialized tuple stream to the
// destination instance's connection (or drops it if the instance is not
// connected), then returns the message to the protobuf pool.
void InstanceServer::DrainTupleStream(proto::stmgr::TupleStreamMessage* _message) {
  const sp_int32 task_id = _message->task_id();
  auto found = instance_info_.find(task_id);
  if (found != instance_info_.end() && found->second->conn_ != NULL) {
    const std::string& payload = _message->set();
    SendMessage(found->second->conn_, payload.size(), heron_tuple_set_2_, payload.c_str());
  } else {
    LOG_EVERY_N(ERROR, 100) << "task_id " << task_id << " has not yet connected to us. Dropping...";
  }
  __global_protobuf_pool_release__(_message);
}
// Queues a HeronTupleSet2 for delivery to a local instance via the checkpoint
// gateway, counting data tuples in the instance's queue-length metric.
// Fix: same as the TupleStreamMessage overload — avoid the operator[]
// default-insert on task_id_to_name for unknown task ids and reuse the found
// iterator instead of re-indexing connection_buffer_length_metric_map_.
void InstanceServer::SendToInstance2(sp_int32 _task_id,
                                     proto::system::HeronTupleSet2* _message) {
  if (_message->has_data()) {
    auto name_it = task_id_to_name.find(_task_id);
    if (name_it != task_id_to_name.end()) {
      auto metric_it = connection_buffer_length_metric_map_.find(name_it->second);
      if (metric_it != connection_buffer_length_metric_map_.end()) {
        metric_it->second->scope("packets")->incr_by(_message->data().tuples_size());
      }
    }
  }
  stateful_gateway_->SendToInstance(_task_id, _message);
}
// Gateway drain callback: delivers a buffered HeronTupleSet2 to its
// destination instance, or drops it (counting *_LOST metrics) when the
// instance is disconnected — or is back-pressured while
// droptuples_upon_backpressure is enabled. The message is returned to the
// protobuf pool on every path.
void InstanceServer::DrainTupleSet(sp_int32 _task_id,
                                   proto::system::HeronTupleSet2* _message) {
  TaskIdInstanceDataMap::iterator iter = instance_info_.find(_task_id);
  if (iter == instance_info_.end() || iter->second->conn_ == NULL
      || (droptuples_upon_backpressure_ && iter->second->conn_->hasCausedBackPressure())) {
    LOG_EVERY_N(ERROR, 100) << "task_id " << _task_id << " has not yet connected to us or is not "
                            << "keeping up and droptuples_upon_backpressure is set to true. "
                            << "Dropping...";
    // Account for the dropped tuples.
    if (_message->has_data()) {
      instance_server_metrics_->scope(METRIC_DATA_TUPLES_TO_INSTANCES_LOST)
          ->incr_by(_message->data().tuples_size());
    } else if (_message->has_control()) {
      instance_server_metrics_->scope(METRIC_ACK_TUPLES_TO_INSTANCES_LOST)
          ->incr_by(_message->control().acks_size());
      instance_server_metrics_->scope(METRIC_FAIL_TUPLES_TO_INSTANCES_LOST)
          ->incr_by(_message->control().fails_size());
    }
  } else {
    // Account for the delivered tuples, then forward to the instance.
    if (_message->has_data()) {
      instance_server_metrics_->scope(METRIC_DATA_TUPLES_TO_INSTANCES)
          ->incr_by(_message->data().tuples_size());
    } else if (_message->has_control()) {
      instance_server_metrics_->scope(METRIC_ACK_TUPLES_TO_INSTANCES)
          ->incr_by(_message->control().acks_size());
      instance_server_metrics_->scope(METRIC_FAIL_TUPLES_TO_INSTANCES)
          ->incr_by(_message->control().fails_size());
    }
    SendMessage(iter->second->conn_, *_message);
  }
  __global_protobuf_pool_release__(_message);
}
// Gateway drain callback: forwards an InitiateStatefulCheckpoint request to
// the destination task, or drops it when the task is not connected.
void InstanceServer::DrainCheckpoint(sp_int32 _task_id,
                       pool_unique_ptr<proto::ckptmgr::InitiateStatefulCheckpoint> _message) {
  auto found = instance_info_.find(_task_id);
  if (found != instance_info_.end() && found->second->conn_ != NULL) {
    LOG(INFO) << "Sending Initiate Checkpoint Message to local task " << _task_id;
    SendMessage(found->second->conn_, *_message);
  } else {
    LOG(ERROR) << "task_id " << _task_id << " has not yet connected to us. Dropping...";
  }
}
// Pushes a new physical plan to every connected instance: recomputes which
// local tasks are spouts, sends a NewInstanceAssignmentMessage carrying the
// plan, and refreshes each connection's rate limit from its component config.
void InstanceServer::BroadcastNewPhysicalPlan(const proto::system::PhysicalPlan& _pplan) {
  // TODO(vikasr) We do not handle any changes to our local assignment
  LOG(INFO) << "Broadcasting new PhysicalPlan:";
  config::TopologyConfigHelper::LogTopology(_pplan.topology());
  ComputeLocalSpouts(_pplan);
  proto::stmgr::NewInstanceAssignmentMessage assignment;
  assignment.mutable_pplan()->CopyFrom(_pplan);
  for (const auto& entry : active_instances_) {
    sp_int32 task_id = entry.second;
    LOG(INFO) << "Sending new physical plan to instance with task_id: " << task_id;
    SendMessage(entry.first, assignment);
    // Update connection's rate limiting base on component config
    const std::string& component = instance_info_[task_id]->instance_->info().component_name();
    SetRateLimit(_pplan, component, entry.first);
  }
}
// Notifies every connected instance that a consistent checkpoint has been
// durably saved.
void InstanceServer::BroadcastStatefulCheckpointSaved(
    const proto::ckptmgr::StatefulConsistentCheckpointSaved& _msg) {
  for (auto it = active_instances_.begin(); it != active_instances_.end(); ++it) {
    LOG(INFO) << "Sending checkpoint: " << _msg.consistent_checkpoint().checkpoint_id()
              << " saved message to instance with task_id: " << it->second;
    SendMessage(it->first, _msg);
  }
}
// Applies per-connection rate limiting derived from the component's
// configured output BPS, divided evenly across its parallelism. Disables
// rate limiting when the config is absent/invalid (non-positive parallelism
// or negative/overflowed rates).
void InstanceServer::SetRateLimit(const proto::system::PhysicalPlan& _pplan,
                                  const std::string& _component,
                                  Connection* _conn) const {
  const sp_int64 read_bps =
      config::TopologyConfigHelper::GetComponentOutputBPS(_pplan.topology(), _component);
  const sp_int32 parallelism =
      config::TopologyConfigHelper::GetComponentParallelism(_pplan.topology(), _component);
  // burst rate is 1.5 x of regular rate
  const sp_int64 burst_read_bps = read_bps + read_bps / 2;
  LOG(INFO) << "Parallelism of component " << _component << " is " << parallelism;
  LOG(INFO) << "Read BPS of component " << _component << " is " << read_bps;
  const bool limit_is_valid = parallelism > 0 && read_bps >= 0 && burst_read_bps >= 0;
  if (limit_is_valid) {
    LOG(INFO) << "Set rate limit in " << _component << " to " << read_bps << "/" << burst_read_bps;
    _conn->setRateLimit(read_bps / parallelism, burst_read_bps / parallelism);
  } else {
    LOG(INFO) << "Disable rate limit in " << _component;
    _conn->disableRateLimit();
  }
}
// Marks every InstanceData whose task id belongs to a spout running on this
// stream manager, per the given physical plan.
void InstanceServer::ComputeLocalSpouts(const proto::system::PhysicalPlan& _pplan) {
  std::unordered_set<sp_int32> local_spouts;
  config::PhysicalPlanHelper::GetLocalSpouts(_pplan, stmgr_id_, local_spouts);
  for (const auto& entry : instance_info_) {
    if (local_spouts.count(entry.first) > 0) {
      entry.second->set_local_spout();
    }
  }
}
// Returns the instance id registered on this connection, or "" when the
// connection has not (or no longer) registered.
sp_string InstanceServer::GetInstanceName(Connection* _connection) {
  auto found = active_instances_.find(_connection);
  if (found == active_instances_.end()) return "";
  return instance_info_[found->second]->instance_->instance_id();
}
// This function is called when the buffer in the connection is full (the instance is not consuming
// tuples fast enough). Marks the connection as a back-pressure source,
// propagates the back pressure to other stream managers (unless
// droptuples_upon_backpressure_ is set, in which case tuples are dropped
// instead), starts the per-instance and local-instance timers, and pauses
// reads from local spouts.
void InstanceServer::StartBackPressureConnectionCb(Connection* _connection) {
  // The connection will notify us when we can stop the back pressure
  _connection->setCausedBackPressure();
  // Find the instance this connection belongs to
  sp_string instance_name = GetInstanceName(_connection);
  CHECK_NE(instance_name, "");
  LOG(WARNING) << "We observe back pressure on sending data to instance " << instance_name;
  if (droptuples_upon_backpressure_) {
    // We will drop further tuples to this instance until the backpressure lets off
    LOG(WARNING) << "Backpressure mechanism not initiated since droptuples_upon_backpressure "
                 << "is set";
    return;
  }
  // Only the first local cause announces to other stmgrs / starts the timer.
  if (!stmgr_->DidAnnounceBackPressure()) {
    stmgr_->SendStartBackPressureToOtherStMgrs();
    // Start backpressure from local instances metric
    back_pressure_metric_caused_by_local_instances_->Start();
  }
  // Indicate which instance component had back pressure
  auto instance_metric = instance_metric_map_[instance_name];
  instance_metric->Start();
  remote_ends_who_caused_back_pressure_.insert(instance_name);
  StartBackPressureOnSpouts();
}
// This function is called when the buffer in the connection is empty (the tuples are drained).
// Mirror of StartBackPressureConnectionCb: clears the connection's
// back-pressure mark, stops the per-instance timer, tells other stream
// managers when no local cause remains, and resumes spout reads when safe.
void InstanceServer::StopBackPressureConnectionCb(Connection* _connection) {
  _connection->unsetCausedBackPressure();
  // Find the instance this connection belongs to
  sp_string instance_name = GetInstanceName(_connection);
  CHECK_NE(instance_name, "");
  LOG(WARNING) << "We don't observe back pressure now on sending data to instance "
               << instance_name;
  if (droptuples_upon_backpressure_) {
    // We had not initiated any kind of backpressure mechanism in this mode.
    LOG(WARNING) << "Backpressure mechanism not initiated since droptuples_upon_backpressure "
                 << "is set";
    return;
  }
  // Invariant: a stop callback must match a previously recorded start.
  CHECK(remote_ends_who_caused_back_pressure_.find(instance_name) !=
        remote_ends_who_caused_back_pressure_.end());
  remote_ends_who_caused_back_pressure_.erase(instance_name);
  // Indicate which instance component stopped back pressure
  auto instance_metric = instance_metric_map_[instance_name];
  instance_metric->Stop();
  if (!stmgr_->DidAnnounceBackPressure()) {
    stmgr_->SendStopBackPressureToOtherStMgrs();
    // Stop backpressure from local instances metric
    back_pressure_metric_caused_by_local_instances_->Stop();
  }
  AttemptStopBackPressureFromSpouts();
}
void InstanceServer::StartBackPressureOnSpouts() {
if (!spouts_under_back_pressure_) {
LOG(WARNING) << "Stopping reading from spouts to do back pressure";
spouts_under_back_pressure_ = true;
// Put back pressure on all spouts
for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) {
if (!iiter->second->local_spout_) continue;
if (!iiter->second->conn_) continue;
if (!iiter->second->conn_->isUnderBackPressure()) iiter->second->conn_->putBackPressure();
}
back_pressure_metric_aggr_->Start();
}
}
void InstanceServer::AttemptStopBackPressureFromSpouts() {
if (spouts_under_back_pressure_ && !stmgr_->DidAnnounceBackPressure() &&
!stmgr_->DidOthersAnnounceBackPressure()) {
LOG(INFO) << "Starting reading from spouts to relieve back pressure";
spouts_under_back_pressure_ = false;
// Remove backpressure from all pipes
for (auto iiter = instance_info_.begin(); iiter != instance_info_.end(); ++iiter) {
if (!iiter->second->local_spout_) continue;
if (!iiter->second->conn_) continue;
if (iiter->second->conn_->isUnderBackPressure()) iiter->second->conn_->removeBackPressure();
}
back_pressure_metric_aggr_->Stop();
}
}
// Sends an InitiateStatefulCheckpoint request carrying _checkpoint_tag to
// every connected local spout; checkpoint markers originate from spouts.
// Improvement: the request payload is identical for every spout, so it is
// now built once before the loop instead of allocating a fresh proto per
// instance (SendMessage takes the message by const reference).
void InstanceServer::InitiateStatefulCheckpoint(const sp_string& _checkpoint_tag) {
  auto message = make_unique<proto::ckptmgr::InitiateStatefulCheckpoint>();
  message->set_checkpoint_id(_checkpoint_tag);
  for (auto iter = instance_info_.begin(); iter != instance_info_.end(); ++iter) {
    // Checkpoint markers originate from spouts.
    if (iter->second->is_local_spout() && iter->second->conn_) {
      LOG(INFO) << "Propagating Initiate Stateful Checkpoint for "
                << _checkpoint_tag << " to local spout "
                << iter->second->instance_->info().component_name()
                << " with task_id " << iter->second->instance_->info().task_id();
      SendMessage(iter->second->conn_, *message);
    }
  }
}
// Handles an instance reporting its checkpointed state: validates the sender
// and hands the state to the stream manager, which persists it and forwards
// checkpoint markers downstream.
// Fix: corrected the typo "Instace" -> "Instance" in both error messages.
void InstanceServer::HandleStoreInstanceStateCheckpointMessage(Connection* _conn,
                       pool_unique_ptr<proto::ckptmgr::StoreInstanceStateCheckpoint> _message) {
  ConnectionTaskIdMap::iterator iter = active_instances_.find(_conn);
  if (iter == active_instances_.end()) {
    LOG(ERROR) << "Hmm.. Got InstanceStateCheckpoint Message from an unknown connection";
    return;
  }
  sp_int32 task_id = iter->second;
  TaskIdInstanceDataMap::iterator it = instance_info_.find(task_id);
  if (it == instance_info_.end()) {
    LOG(ERROR) << "Hmm.. Got InstanceStateCheckpoint Message from unknown task_id "
               << task_id;
    return;
  }
  // send the checkpoint message to all downstream task ids
  stmgr_->HandleStoreInstanceStateCheckpoint(_message->state(), *(it->second->instance_));
}
// Handles an instance's response to a state-restore request: validates the
// sender and relays the status and checkpoint id to the stream manager.
void InstanceServer::HandleRestoreInstanceStateResponse(Connection* _conn,
                       pool_unique_ptr<proto::ckptmgr::RestoreInstanceStateResponse> _message) {
  auto conn_iter = active_instances_.find(_conn);
  if (conn_iter == active_instances_.end()) {
    LOG(ERROR) << "Hmm.. Got RestoreInstanceStateResponse Message from an unknown connection";
    return;
  }
  const sp_int32 task_id = conn_iter->second;
  auto info_iter = instance_info_.find(task_id);
  if (info_iter == instance_info_.end()) {
    LOG(ERROR) << "Hmm.. Got RestoreInstanceStateResponse Message from unknown task_id "
               << task_id;
    return;
  }
  // Relay the restore outcome to the stream manager.
  stmgr_->HandleRestoreInstanceStateResponse(task_id, _message->status(),
                                             _message->checkpoint_id());
}
// Forwards a checkpoint marker received from an upstream task to the
// stateful gateway, which tracks marker arrival per destination task.
void InstanceServer::HandleCheckpointMarker(sp_int32 _src_task_id, sp_int32 _destination_task_id,
                                            const sp_string& _checkpoint_id) {
  // So received a checkpoint marker from an upstream task
  stateful_gateway_->HandleUpstreamMarker(_src_task_id, _destination_task_id, _checkpoint_id);
}
// Asks the given task to restore its state from _state. Returns true when
// the request was sent, false when the task is unknown or not connected.
bool InstanceServer::SendRestoreInstanceStateRequest(sp_int32 _task_id,
                       const proto::ckptmgr::InstanceStateCheckpoint& _state) {
  LOG(INFO) << "Sending RestoreInstanceState request to task " << _task_id;
  auto found = instance_info_.find(_task_id);
  Connection* conn = (found == instance_info_.end()) ? NULL : found->second->conn_;
  if (!conn) {
    LOG(WARNING) << "Cannot send RestoreInstanceState Request to task "
                 << _task_id << " because it is not connected to us";
    return false;
  }
  auto message = make_unique<proto::ckptmgr::RestoreInstanceStateRequest>();
  message->mutable_state()->CopyFrom(_state);
  SendMessage(conn, *message);
  return true;
}
// Tells every connected instance to begin stateful processing from the given
// checkpoint id; logs a warning for tasks that are not connected.
// Fix: iterate by const reference — `for (auto kv : instance_info_)` copied
// each map entry (a std::pair) on every iteration for no benefit.
void InstanceServer::SendStartInstanceStatefulProcessing(const std::string& _ckpt_id) {
  for (const auto& kv : instance_info_) {
    Connection* conn = kv.second->conn_;
    if (conn) {
      auto message = make_unique<proto::ckptmgr::StartInstanceStatefulProcessing>();
      message->set_checkpoint_id(_ckpt_id);
      SendMessage(conn, *message);
    } else {
      LOG(WARNING) << "Cannot send StartInstanceStatefulProcessing to task "
                   << kv.first << " because it is not connected to us";
    }
  }
}
// Discards everything the checkpoint gateway has buffered (e.g. when a
// restore invalidates in-flight tuples and markers).
void InstanceServer::ClearCache() {
  stateful_gateway_->Clear();
}
} // namespace stmgr
} // namespace heron