blob: f2d0abd758fe13d5e8a0a79c083e4b2b1038b751 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tmanager/src/cpp/manager/stats-interface.h"
#include <iostream>
#include <string>
#include "manager/tmanager.h"
#include "manager/tmetrics-collector.h"
#include "metrics/tmanager-metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "proto/tmanager.pb.h"
namespace heron {
namespace tmanager {
StatsInterface::StatsInterface(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& _options,
shared_ptr<TMetricsCollector> _collector, TManager* _tmanager)
: metrics_collector_(_collector), tmanager_(_tmanager) {
http_server_ = make_unique<HTTPServer>(eventLoop, _options);
// Install the handlers
auto cbHandleStats = [this](IncomingHTTPRequest* request) { this->HandleStatsRequest(request); };
auto cbHandleException = [this](IncomingHTTPRequest* request) {
this->HandleExceptionRequest(request);
};
auto cbHandleExceptionSummary = [this](IncomingHTTPRequest* request) {
this->HandleExceptionSummaryRequest(request);
};
auto cbHandleStmgrsRegistrationSummary = [this](IncomingHTTPRequest* request) {
this->HandleStmgrsRegistrationSummaryRequest(request);
};
auto cbHandleUnknown = [this](IncomingHTTPRequest* request) {
this->HandleUnknownRequest(request);
};
http_server_->InstallCallBack("/stats", std::move(cbHandleStats));
http_server_->InstallCallBack("/exceptions", std::move(cbHandleException));
http_server_->InstallCallBack("/exceptionsummary", std::move(cbHandleExceptionSummary));
http_server_->InstallCallBack("/stmgrsregistrationsummary",
std::move(cbHandleStmgrsRegistrationSummary));
http_server_->InstallGenericCallBack(std::move(cbHandleUnknown));
CHECK(http_server_->Start() == SP_OK);
}
StatsInterface::~StatsInterface() {}
void StatsInterface::HandleStatsRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Got a stats request " << _request->GetQuery();
// get the entire stuff
unsigned char* pb = _request->ExtractFromPostData(0, _request->GetPayloadSize());
proto::tmanager::MetricRequest req;
if (!req.ParseFromArray(pb, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in StatsRequest";
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
auto res = metrics_collector_->GetMetrics(req, tmanager_->getInitialTopology());
sp_string response_string;
CHECK(res->SerializeToString(&response_string));
auto response = make_unique<OutgoingHTTPResponse>(_request);
response->AddHeader("Content-Type", "application/octet-stream");
response->AddHeader("Content-Length", std::to_string(response_string.size()));
response->AddResponse(response_string);
http_server_->SendReply(_request, 200, std::move(response));
delete _request;
LOG(INFO) << "Done with stats request ";
}
void StatsInterface::HandleExceptionRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for exceptions" << _request->GetQuery();
// Get the Exception request proto.
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmanager::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
unique_ptr<heron::proto::tmanager::ExceptionLogResponse> exception_response =
metrics_collector_->GetExceptions(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned exceptions response";
}
void StatsInterface::HandleExceptionSummaryRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for exception summary " << _request->GetQuery();
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmanager::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
auto exception_response = metrics_collector_->GetExceptionsSummary(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned exceptions response";
}
void StatsInterface::HandleStmgrsRegistrationSummaryRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for stream managers registration summary " << _request->GetQuery();
unsigned char* request_data =
_request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmanager::StmgrsRegistrationSummaryRequest stmgrs_reg_request;
if (!stmgrs_reg_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in" <<
"StmgrsRegistrationSummaryRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
auto stmgrs_reg_summary_response = tmanager_->GetStmgrsRegSummary();
sp_string response_string;
CHECK(stmgrs_reg_summary_response->SerializeToString(&response_string));
auto http_response = make_unique<OutgoingHTTPResponse>(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, std::move(http_response));
delete _request;
LOG(INFO) << "Returned stream managers registration summary response";
}
void StatsInterface::HandleUnknownRequest(IncomingHTTPRequest* _request) {
LOG(WARNING) << "Got an unknown request " << _request->GetQuery();
http_server_->SendErrorReply(_request, 400);
delete _request;
}
} // namespace tmanager
} // namespace heron