// blob: 5fe7adc9ba8a4e7c4c595ebc72e7a1d6ea0ffd8d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <stdio.h>
#include <iostream>
#include <string>
#include "gateway/stmgr-client.h"
#include "proto/messages.h"
#include "basics/basics.h"
#include "threads/threads.h"
#include "network/network.h"
#include "config/heron-internals-config-reader.h"
namespace heron {
namespace instance {
// Constructs the client used by an instance to talk to its stream manager.
//
// Remembers the topology name/id, the instance descriptor, the metrics sink
// and the two watcher callbacks (physical plan and incoming tuples), reads the
// reconnect interval and retry limit from the heron-internals configuration,
// and installs the handlers for the register response, physical plan messages
// and tuple messages.
StMgrClient::StMgrClient(
    std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
    const std::string& topologyName, const std::string& topologyId,
    const proto::system::Instance& instanceProto,
    std::shared_ptr<GatewayMetrics> gatewayMetrics,
    std::function<void(pool_unique_ptr<proto::system::PhysicalPlan>)> pplanWatcher,
    std::function<void(pool_unique_ptr<proto::system::HeronTupleSet2>)> tupleWatcher)
    : Client(eventLoop, options),
      topologyName_(topologyName),
      topologyId_(topologyId),
      instanceProto_(instanceProto),
      gatewayMetrics_(gatewayMetrics),
      pplanWatcher_(std::move(pplanWatcher)),
      tupleWatcher_(std::move(tupleWatcher)),
      ndropped_messages_(0) {
  // Reconnect policy comes from the process-wide internals configuration.
  auto* internalsConfig = config::HeronInternalsConfigReader::Instance();
  reconnect_interval_ = internalsConfig->GetHeronInstanceReconnectStreammgrIntervalSec();
  max_reconnect_times_ = internalsConfig->GetHeronInstanceReconnectStreammgrTimes();
  reconnect_attempts_ = 0;

  // One request/response pair (registration) and two one-way message types.
  InstallResponseHandler(make_unique<proto::stmgr::RegisterInstanceRequest>(),
                         &StMgrClient::HandleRegisterResponse);
  InstallMessageHandler(&StMgrClient::HandlePhysicalPlan);
  InstallMessageHandler(&StMgrClient::HandleTupleMessage);
}
// Stops the client as part of destruction.
StMgrClient::~StMgrClient() {
  this->Stop();
}
// Callback invoked with the outcome of a connection attempt.
//
// On success the retry counter is cleared and a register request is sent.
// On failure the attempt is counted; once the counter exceeds
// max_reconnect_times_ a FATAL message is logged, otherwise (and, as far as
// this function is concerned, afterwards) a reconnect timer is scheduled.
void StMgrClient::HandleConnect(NetworkErrorCode status) {
  const auto& stmgrId = instanceProto_.stmgr_id();
  const auto& clientOptions = get_clientoptions();

  if (status != OK) {
    LOG(WARNING) << "Could not connect to stmgr " << stmgrId << " running at "
                 << clientOptions.get_host() << ":" << clientOptions.get_port()
                 << " due to: " << status << std::endl;

    reconnect_attempts_ += 1;
    if (reconnect_attempts_ > max_reconnect_times_) {
      // NOTE(review): LOG(FATAL) is expected to terminate the process, so the
      // retry below presumably only runs while attempts remain - confirm.
      LOG(FATAL) << "Could not connect to stmgr " << stmgrId << " running at "
                 << clientOptions.get_host() << ":" << clientOptions.get_port()
                 << " after repeated attempts. Dying...";
    }

    LOG(INFO) << "Retrying again..." << std::endl;
    // reconnect_interval_ is read from an "...IntervalSec" setting; the scaling
    // by 1000 * 1000 presumably converts it to microseconds for AddTimer.
    const auto retryDelay = reconnect_interval_ * 1000 * 1000;
    AddTimer([this] { OnReconnectTimer(); }, retryDelay);
    return;
  }

  LOG(INFO) << "Connected to stmgr " << stmgrId << " running at "
            << clientOptions.get_host() << ":" << clientOptions.get_port()
            << std::endl;
  reconnect_attempts_ = 0;
  SendRegisterRequest();
}
// Callback invoked when the connection to the stream manager is closed.
//
// Logs who closed the connection (an OK code is reported as a close initiated
// on our side, anything else as a close by the stmgr with that code) and then
// always schedules a reconnect timer.
void StMgrClient::HandleClose(NetworkErrorCode code) {
  const auto& stmgrId = instanceProto_.stmgr_id();
  const auto& clientOptions = get_clientoptions();

  if (code != OK) {
    LOG(INFO) << "Stmgr " << stmgrId << " running at "
              << clientOptions.get_host()
              << ":" << clientOptions.get_port() << " closed connection with code " << code;
  } else {
    LOG(INFO) << "We closed our server connection with stmgr " << stmgrId
              << " running at " << clientOptions.get_host()
              << ":" << clientOptions.get_port();
  }

  LOG(INFO) << "Will try to reconnect again" << std::endl;
  // Same delay expression as the connect-failure path: interval * 1000 * 1000.
  const auto retryDelay = reconnect_interval_ * 1000 * 1000;
  AddTimer([this] { OnReconnectTimer(); }, retryDelay);
}
void StMgrClient::HandleRegisterResponse(
void*,
pool_unique_ptr<proto::stmgr::RegisterInstanceResponse> response,
NetworkErrorCode status) {
if (status != OK) {
LOG(ERROR) << "NonOK network code " << status << " for register response from stmgr "
<< instanceProto_.stmgr_id() << " running at " << get_clientoptions().get_host()
<< ":" << get_clientoptions().get_port();
Stop();
return;
}
proto::system::StatusCode stat = response->status().status();
if (stat != proto::system::OK) {
LOG(ERROR) << "NonOK register response " << stat << " from stmgr "
<< instanceProto_.stmgr_id() << " running at "
<< get_clientoptions().get_host() << ":" << get_clientoptions().get_port();
Stop();
return;
}
LOG(INFO) << "Registered with our stmgr " << instanceProto_.stmgr_id() << " running at "
<< get_clientoptions().get_host() << ":" << get_clientoptions().get_port();
if (response->has_pplan()) {
LOG(INFO) << "Registration response had a pplan";
using std::move;
pplanWatcher_(move(pool_unique_ptr<proto::system::PhysicalPlan>(response->release_pplan())));
}
}
// Timer callback scheduled by HandleConnect/HandleClose: starts the client again.
void StMgrClient::OnReconnectTimer() {
  Start();
}
// Builds a RegisterInstanceRequest describing this instance (topology name,
// topology id and a copy of the instance descriptor) and sends it. The request
// context passed to SendRequest is nullptr.
void StMgrClient::SendRegisterRequest() {
  auto registerRequest = make_unique<proto::stmgr::RegisterInstanceRequest>();
  registerRequest->mutable_instance()->CopyFrom(instanceProto_);
  registerRequest->set_topology_id(topologyId_);
  registerRequest->set_topology_name(topologyName_);
  SendRequest(std::move(registerRequest), nullptr);
}
// Handles a NewInstanceAssignmentMessage: logs the event, takes the physical
// plan out of the message and hands it to the pplan watcher.
void StMgrClient::HandlePhysicalPlan(
    pool_unique_ptr<proto::stmgr::NewInstanceAssignmentMessage> msg) {
  const auto& clientOptions = get_clientoptions();
  LOG(INFO) << "Got a Physical Plan from our stmgr " << instanceProto_.stmgr_id() << " running at "
            << clientOptions.get_host() << ":" << clientOptions.get_port();

  // release_pplan() detaches the plan from the message before it is wrapped.
  pool_unique_ptr<proto::system::PhysicalPlan> plan(msg->release_pplan());
  pplanWatcher_(std::move(plan));
}
// Handles an incoming tuple set: records one received packet and its byte
// size in the gateway metrics, then forwards the message to the tuple watcher.
void StMgrClient::HandleTupleMessage(pool_unique_ptr<proto::system::HeronTupleSet2> msg) {
  // Measure the message before ownership is moved to the watcher.
  const auto packetBytes = msg->ByteSize();
  gatewayMetrics_->updateReceivedPacketsCount(1);
  gatewayMetrics_->updateReceivedPacketsSize(packetBytes);
  tupleWatcher_(std::move(msg));
}
// Sends a tuple set to the stream manager when connected.
//
// While disconnected the message is dropped: the dropped-packet metrics are
// updated and every 100th dropped message is logged. When connected the
// sent-packet metrics are updated and the message is sent.
void StMgrClient::SendTupleMessage(const proto::system::HeronTupleSet& msg) {
  if (!IsConnected()) {
    gatewayMetrics_->updateDroppedPacketsCount(1);
    gatewayMetrics_->updateDroppedPacketsSize(msg.ByteSize());

    ++ndropped_messages_;
    // Log only every 100th drop.
    const bool shouldLog = (ndropped_messages_ % 100 == 0);
    if (shouldLog) {
      LOG(INFO) << "Dropping " << ndropped_messages_ << "th tuple message to stmgr "
                << instanceProto_.stmgr_id() << " because it is not connected";
    }
    return;
  }

  gatewayMetrics_->updateSentPacketsCount(1);
  gatewayMetrics_->updateSentPacketsSize(msg.ByteSize());
  SendMessage(msg);
}
// Asks the current connection to stop reading from the stream manager, unless
// it is already under back pressure.
//
// Does nothing when there is no connection object. NOTE(review): this assumes
// the base class leaves conn_ null while the client is not connected - confirm
// against Client. Without the guard that state would dereference a null
// pointer; SendTupleMessage already treats "not connected" as a normal case.
void StMgrClient::putBackPressure() {
  auto conn = static_cast<Connection*>(conn_);
  if (conn == nullptr) {
    return;
  }
  if (!conn->isUnderBackPressure()) {
    LOG(INFO) << "Buffer to Slave Thread at maximum capacity. Clamping down on reads from Stmgr";
    conn->putBackPressure();
  }
}
// Asks the current connection to resume reading from the stream manager if it
// is currently under back pressure.
//
// Does nothing when there is no connection object. NOTE(review): this assumes
// the base class leaves conn_ null while the client is not connected - confirm
// against Client. Without the guard that state would dereference a null
// pointer; SendTupleMessage already treats "not connected" as a normal case.
void StMgrClient::removeBackPressure() {
  auto conn = static_cast<Connection*>(conn_);
  if (conn == nullptr) {
    return;
  }
  if (conn->isUnderBackPressure()) {
    LOG(INFO) << "Buffer to Slave Thread less than capacity. Resuming reads from stmgr";
    conn->removeBackPressure();
  }
}
} // namespace instance
} // namespace heron