blob: bcd9687d9a41b652c623c87a0d14fa1f268fd72c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "metrics/metricsmgr-client.h"
#include <stdio.h>
#include <iostream>
#include <string>
#include "proto/messages.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
namespace heron {
namespace common {
using std::shared_ptr;
MetricsMgrClient::MetricsMgrClient(const sp_string& _hostname, sp_int32 _port,
const sp_string& _component_name, const sp_string& _instance_id,
int _instance_index,
shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options)
: Client(eventLoop, _options),
hostname_(_hostname),
port_(_port),
component_name_(_component_name),
instance_id_(_instance_id),
instance_index_(_instance_index),
tmaster_location_(NULL),
metricscache_location_(NULL),
registered_(false) {
InstallResponseHandler(make_unique<proto::system::MetricPublisherRegisterRequest>(),
&MetricsMgrClient::HandleRegisterResponse);
Start();
}
MetricsMgrClient::~MetricsMgrClient() { delete tmaster_location_; delete metricscache_location_; }
void MetricsMgrClient::HandleConnect(NetworkErrorCode _status) {
if (_status == OK) {
LOG(INFO) << "Connected to metrics manager" << std::endl;
SendRegisterRequest();
} else {
LOG(ERROR) << "Could not connect to metrics mgr. Will Retry in 1 second\n";
AddTimer([this]() { this->ReConnect(); }, 1000000);
}
}
void MetricsMgrClient::HandleClose(NetworkErrorCode) {
LOG(ERROR) << "Metrics Mgr closed connection on us. Will Retry in 1 second\n";
AddTimer([this]() { this->ReConnect(); }, 1000000);
registered_ = false;
}
void MetricsMgrClient::ReConnect() { Start(); }
void MetricsMgrClient::SendRegisterRequest() {
auto request = make_unique<proto::system::MetricPublisherRegisterRequest>();
proto::system::MetricPublisher* publisher = request->mutable_publisher();
publisher->set_hostname(hostname_);
publisher->set_port(port_);
publisher->set_component_name(component_name_);
publisher->set_instance_id(instance_id_);
publisher->set_instance_index(instance_index_);
SendRequest(std::move(request), nullptr);
}
void MetricsMgrClient::HandleRegisterResponse(
void*, pool_unique_ptr<proto::system::MetricPublisherRegisterResponse> _response,
NetworkErrorCode _status) {
if (_status == OK && _response->status().status() != proto::system::OK) {
// What the heck we explicitly got a non ok response
LOG(ERROR) << "Recieved a non-ok status from metrics mgr" << std::endl;
::exit(1);
} else {
LOG(INFO) << "Successfully registered ourselves to the metricsmgr";
registered_ = true;
}
// Check if we need to send tmaster location
if (tmaster_location_) {
LOG(INFO) << "Sending TMaster Location to metricsmgr";
InternalSendTMasterLocation();
} else {
LOG(INFO) << "Do not have a TMasterLocation yet";
}
// Check if we need to send metricscache location
if (metricscache_location_) {
LOG(INFO) << "Sending MetricsCache Location to metricsmgr";
InternalSendMetricsCacheLocation();
} else {
LOG(INFO) << "Do not have a MetricsCacheLocation yet";
}
}
void MetricsMgrClient::SendTMasterLocation(const proto::tmaster::TMasterLocation& location) {
if (tmaster_location_) {
delete tmaster_location_;
}
tmaster_location_ = new proto::tmaster::TMasterLocation(location);
if (registered_) {
LOG(INFO) << "Sending TMaster Location to metricsmgr";
InternalSendTMasterLocation();
} else {
LOG(INFO) << "We have not yet registered to metricsmgr."
<< " Holding off sending TMasterLocation";
}
}
void MetricsMgrClient::SendMetricsCacheLocation(
const proto::tmaster::MetricsCacheLocation& location) {
if (metricscache_location_) {
delete metricscache_location_;
}
metricscache_location_ = new proto::tmaster::MetricsCacheLocation(location);
if (registered_) {
LOG(INFO) << "Sending MetricsCache Location to metricsmgr";
InternalSendMetricsCacheLocation();
} else {
LOG(INFO) << "We have not yet registered to metricsmgr."
<< " Holding off sending MetricsCacheLocation";
}
}
void MetricsMgrClient::SendMetrics(proto::system::MetricPublisherPublishMessage* _message) {
SendMessage(*_message);
delete _message;
}
void MetricsMgrClient::InternalSendTMasterLocation() {
CHECK(tmaster_location_);
proto::system::TMasterLocationRefreshMessage* m =
new proto::system::TMasterLocationRefreshMessage();
m->mutable_tmaster()->CopyFrom(*tmaster_location_);
SendMessage(*m);
delete m;
}
void MetricsMgrClient::InternalSendMetricsCacheLocation() {
CHECK(metricscache_location_);
proto::system::MetricsCacheLocationRefreshMessage* m =
new proto::system::MetricsCacheLocationRefreshMessage();
m->mutable_metricscache()->CopyFrom(*metricscache_location_);
SendMessage(*m);
delete m;
}
} // namespace common
} // namespace heron