// blob: c3786b131e7d4dbf3406f34dca6bfc6ff75d6076 [file] [log] [blame]
/*
* Copyright 2015 Twitter, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "manager/stmgr-client.h"
#include <stdio.h>
#include <iostream>
#include <string>
#include "manager/stmgr-clientmgr.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
#include "metrics/metrics.h"
namespace heron {
namespace stmgr {
// Names of the metrics kept for traffic towards other stream managers.

// Count of data tuples sent to other stream managers.
const sp_string METRIC_DATA_TUPLES_TO_STMGRS{"__tuples_to_stmgrs"};
// Count of ack tuples sent to other stream managers.
const sp_string METRIC_ACK_TUPLES_TO_STMGRS{"__ack_tuples_to_stmgrs"};
// Count of fail tuples sent to other stream managers.
const sp_string METRIC_FAIL_TUPLES_TO_STMGRS{"__fail_tuples_to_stmgrs"};
// Count of data tuples lost while sending to other stream managers.
const sp_string METRIC_DATA_TUPLES_TO_STMGRS_LOST{"__tuples_to_stmgrs_lost"};
// Count of ack tuples lost while sending to other stream managers.
const sp_string METRIC_ACK_TUPLES_TO_STMGRS_LOST{"__ack_tuples_to_stmgrs_lost"};
// Count of fail tuples lost while sending to other stream managers.
const sp_string METRIC_FAIL_TUPLES_TO_STMGRS_LOST{"__fail_tuples_to_stmgrs_lost"};
// Bytes sent to other stream managers.
const sp_string METRIC_BYTES_TO_STMGRS{"__bytes_to_stmgrs"};
// Bytes lost while sending to other stream managers.
const sp_string METRIC_BYTES_TO_STMGRS_LOST{"__bytes_to_stmgrs_lost"};
// Count of hello messages sent to other stream managers.
const sp_string METRIC_HELLO_MESSAGES_TO_STMGRS{"__hello_messages_to_stmgrs"};
// Builds a client that talks to the stream manager identified by _other_id.
//
// Besides storing the identifiers and collaborators it is given, the
// constructor reads the reconnect interval and the maximum number of
// reconnect attempts from the heron internals config, installs the handlers
// for the hello response and for tuple stream messages, and registers a
// MultiCountMetric named "__client_<other stmgr id>" with the metrics manager.
StMgrClient::StMgrClient(EventLoop* eventLoop, const NetworkOptions& _options,
                         const sp_string& _topology_name, const sp_string& _topology_id,
                         const sp_string& _our_id, const sp_string& _other_id,
                         StMgrClientMgr* _client_manager,
                         heron::common::MetricsMgrSt* _metrics_manager_client)
    : Client(eventLoop, _options),
      topology_name_(_topology_name),
      topology_id_(_topology_id),
      our_stmgr_id_(_our_id),
      other_stmgr_id_(_other_id),
      quit_(false),
      client_manager_(_client_manager),
      metrics_manager_client_(_metrics_manager_client),
      ndropped_messages_(0),
      reconnect_attempts_(0),
      is_registered_(false) {
  // Reconnect policy comes from the heron internals configuration.
  reconnect_other_streammgrs_interval_sec_ =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrClientReconnectIntervalSec();
  reconnect_other_streammgrs_max_attempt_ =
      config::HeronInternalsConfigReader::Instance()->GetHeronStreammgrClientReconnectMaxAttempts();

  // Wire up the protocol handlers.
  InstallResponseHandler(new proto::stmgr::StrMgrHelloRequest(),
                         &StMgrClient::HandleHelloResponse);
  InstallMessageHandler(&StMgrClient::HandleTupleStreamMessage);

  // Per-peer metrics; the destructor unregisters the same name.
  stmgr_client_metrics_ = new heron::common::MultiCountMetric();
  const auto metric_name = "__client_" + other_stmgr_id_;
  metrics_manager_client_->register_metric(metric_name, stmgr_client_metrics_);
}
// Stops the client, then unregisters and frees the per-peer metric that the
// constructor created.
StMgrClient::~StMgrClient() {
  Stop();

  // The metric is unregistered before it is deleted -- presumably so the
  // metrics manager never holds a pointer to freed memory.
  const auto metric_name = "__client_" + other_stmgr_id_;
  metrics_manager_client_->unregister_metric(metric_name);
  delete stmgr_client_metrics_;
}
// Flags this client as quitting and then calls Stop().
// HandleConnect and HandleClose check quit_: once it is set they delete the
// object (or stop it) instead of scheduling another reconnect.
void StMgrClient::Quit() {
// Set the flag before Stop() so any handler that runs afterwards sees it.
quit_ = true;
Stop();
}
// Reacts to the outcome of a connection attempt to the other stream manager.
//
// On failure: if a quit was requested the client deletes itself, otherwise a
// timer is armed that runs OnReConnectTimer after the configured interval.
// On success: the reconnect counter is cleared and, unless a quit was
// requested (in which case the client is stopped), a hello request is sent.
void StMgrClient::HandleConnect(NetworkErrorCode _status) {
  if (_status != OK) {
    LOG(WARNING) << "Could not connect to stmgr " << other_stmgr_id_ << " running at "
                 << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
                 << " due to: " << _status << std::endl;
    if (!quit_) {
      LOG(INFO) << "Retrying again..." << std::endl;
      // Interval is configured in seconds; scaled by 10^6 for AddTimer
      // (presumably microseconds).
      AddTimer([this] { OnReConnectTimer(); },
               reconnect_other_streammgrs_interval_sec_ * 1000 * 1000);
      return;
    }
    LOG(ERROR) << "Instructed to quit. Quitting...";
    // No member may be touched after this point.
    delete this;
    return;
  }

  LOG(INFO) << "Connected to stmgr " << other_stmgr_id_ << " running at "
            << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
            << std::endl;
  // A successful connection resets the reconnect attempt counter.
  reconnect_attempts_ = 0;
  if (quit_) {
    Stop();
    return;
  }
  SendHelloRequest();
}
// Reacts to the connection with the other stream manager being closed.
//
// _code == OK is logged as a close initiated by us; any other code is logged
// as a close by the peer together with the code. Afterwards, if a quit was
// requested the client deletes itself; otherwise a timer is armed that runs
// OnReConnectTimer after reconnect_other_streammgrs_interval_sec_ seconds.
void StMgrClient::HandleClose(NetworkErrorCode _code) {
  if (_code == OK) {
    LOG(INFO) << "We closed our server connection with stmgr " << other_stmgr_id_ << " running at "
              << get_clientoptions().get_host() << ":" << get_clientoptions().get_port()
              << std::endl;
  } else {
    LOG(INFO) << "Stmgr " << other_stmgr_id_ << " running at " << get_clientoptions().get_host()
              << ":" << get_clientoptions().get_port() << " closed connection with code " << _code
              << std::endl;
  }
  if (quit_) {
    // No member may be touched after this point.
    delete this;
  } else {
    // Report the configured interval rather than a hard-coded "1 seconds",
    // so the log matches the delay actually passed to AddTimer below.
    LOG(INFO) << "Will try to reconnect again after " << reconnect_other_streammgrs_interval_sec_
              << " seconds" << std::endl;
    AddTimer([this]() { this->OnReConnectTimer(); },
             reconnect_other_streammgrs_interval_sec_ * 1000 * 1000);
  }
}
// Handles the response to the hello request sent by SendHelloRequest.
//
// The response object is always handed back through
// __global_protobuf_pool_release__. If either the network status or the
// status carried in the response is not OK, the failure is logged and the
// client is stopped. Otherwise, if the client manager reports that back
// pressure has been announced, a start-back-pressure message is sent.
void StMgrClient::HandleHelloResponse(void*, proto::stmgr::StrMgrHelloResponse* _response,
                                      NetworkErrorCode _status) {
  bool hello_accepted = false;
  if (_status != OK) {
    LOG(ERROR) << "NonOK network code " << _status << " for register response from stmgr "
               << other_stmgr_id_ << " running at " << get_clientoptions().get_host() << ":"
               << get_clientoptions().get_port();
  } else {
    // Only inspect the payload once the transport-level status is OK.
    proto::system::StatusCode status = _response->status().status();
    if (status == proto::system::OK) {
      hello_accepted = true;
    } else {
      LOG(ERROR) << "NonOK register response " << status << " from stmgr " << other_stmgr_id_
                 << " running at " << get_clientoptions().get_host() << ":"
                 << get_clientoptions().get_port();
    }
  }

  // Single release point for every outcome.
  __global_protobuf_pool_release__(_response);

  if (!hello_accepted) {
    Stop();
    return;
  }
  if (client_manager_->DidAnnounceBackPressure()) {
    SendStartBackPressureMessage();
  }
}
// Timer callback armed by HandleConnect/HandleClose. Counts the attempt and
// either starts the client again or, once the configured maximum number of
// attempts has been reached, logs at FATAL.
void StMgrClient::OnReConnectTimer() {
  ++reconnect_attempts_;
  if (reconnect_attempts_ >= reconnect_other_streammgrs_max_attempt_) {
    LOG(FATAL) << "Could not connect to stmgr " << other_stmgr_id_
               << " after reaching the max reconnect attempts. Quitting...";
    return;
  }
  Start();
}
// Sends a StrMgrHelloRequest carrying the topology name, the topology id and
// our own stream manager id, and bumps the hello-message counter. The reply is
// delivered to HandleHelloResponse, which the constructor installs as the
// response handler.
void StMgrClient::SendHelloRequest() {
  // NOTE(review): the request is allocated here and not freed in this
  // function -- presumably SendRequest takes ownership; confirm against Client.
  auto request = new proto::stmgr::StrMgrHelloRequest();
  request->set_topology_name(topology_name_);
  request->set_topology_id(topology_id_);
  request->set_stmgr(our_stmgr_id_);
  // No per-request context is needed; nullptr matches the rest of this file.
  SendRequest(request, nullptr);
  stmgr_client_metrics_->scope(METRIC_HELLO_MESSAGES_TO_STMGRS)->incr_by(1);
}
// Sends _msg to the other stream manager when the client is connected.
// When it is not connected the message is dropped; drops are counted and
// every 100th drop is logged.
void StMgrClient::SendTupleStreamMessage(proto::stmgr::TupleStreamMessage2& _msg) {
  if (IsConnected()) {
    SendMessage(_msg);
    return;
  }

  // Not connected: count the drop and only log on every 100th one.
  ++ndropped_messages_;
  if (ndropped_messages_ % 100 != 0) {
    return;
  }
  LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
            << other_stmgr_id_ << " because it is not connected";
}
// Message handler installed by the constructor for TupleStreamMessage2.
// The client is not expected to receive tuple messages: the message is handed
// back through __global_protobuf_pool_release__ and a FATAL log is emitted.
void StMgrClient::HandleTupleStreamMessage(proto::stmgr::TupleStreamMessage2* _message) {
// Release first; the FATAL log below is the last statement of the handler.
__global_protobuf_pool_release__(_message);
LOG(FATAL) << "We should not receive tuple messages in the client" << std::endl;
}
// Callback for the start of back pressure on _connection: marks the
// connection as having caused back pressure and notifies the client manager,
// passing the id of the stream manager this client talks to.
void StMgrClient::StartBackPressureConnectionCb(Connection* _connection) {
_connection->setCausedBackPressure();
// Ask the StMgrServer to stop consuming. The client does
// not consume anything
client_manager_->StartBackPressureOnServer(other_stmgr_id_);
}
// Callback for the end of back pressure on _connection: clears the
// connection's caused-back-pressure mark and notifies the client manager,
// passing the id of the stream manager this client talks to.
void StMgrClient::StopBackPressureConnectionCb(Connection* _connection) {
_connection->unsetCausedBackPressure();
// Call the StMgrServers removeBackPressure method
client_manager_->StopBackPressureOnServer(other_stmgr_id_);
}
void StMgrClient::SendStartBackPressureMessage() {
REQID_Generator generator;
REQID rand = generator.generate();
// generator.generate(rand);
proto::stmgr::StartBackPressureMessage* message = nullptr;
message = __global_protobuf_pool_acquire__(message);
message->set_topology_name(topology_name_);
message->set_topology_id(topology_id_);
message->set_stmgr(our_stmgr_id_);
message->set_message_id(rand.str());
SendMessage(*message);
__global_protobuf_pool_release__(message);
}
void StMgrClient::SendStopBackPressureMessage() {
REQID_Generator generator;
REQID rand = generator.generate();
// generator.generate(rand);
proto::stmgr::StopBackPressureMessage* message = nullptr;
message = __global_protobuf_pool_acquire__(message);
message->set_topology_name(topology_name_);
message->set_topology_id(topology_id_);
message->set_stmgr(our_stmgr_id_);
message->set_message_id(rand.str());
SendMessage(*message);
__global_protobuf_pool_release__(message);
}
} // namespace stmgr
} // namespace heron