blob: 4b2086d47b2e42dee92652ab1a04935b30faa06c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tmaster/src/cpp/manager/stats-interface.h"
#include <iostream>
#include <string>
#include "manager/tmaster.h"
#include "manager/tmetrics-collector.h"
#include "metrics/tmaster-metrics.h"
#include "basics/basics.h"
#include "errors/errors.h"
#include "threads/threads.h"
#include "network/network.h"
#include "proto/tmaster.pb.h"
namespace heron {
namespace tmaster {
StatsInterface::StatsInterface(EventLoop* eventLoop, const NetworkOptions& _options,
TMetricsCollector* _collector, TMaster* _tmaster)
: metrics_collector_(_collector), tmaster_(_tmaster) {
http_server_ = new HTTPServer(eventLoop, _options);
// Install the handlers
auto cbHandleStats = [this](IncomingHTTPRequest* request) { this->HandleStatsRequest(request); };
auto cbHandleException = [this](IncomingHTTPRequest* request) {
this->HandleExceptionRequest(request);
};
auto cbHandleExceptionSummary = [this](IncomingHTTPRequest* request) {
this->HandleExceptionSummaryRequest(request);
};
auto cbHandleStmgrsRegistrationSummary = [this](IncomingHTTPRequest* request) {
this->HandleStmgrsRegistrationSummaryRequest(request);
};
auto cbHandleUnknown = [this](IncomingHTTPRequest* request) {
this->HandleUnknownRequest(request);
};
http_server_->InstallCallBack("/stats", std::move(cbHandleStats));
http_server_->InstallCallBack("/exceptions", std::move(cbHandleException));
http_server_->InstallCallBack("/exceptionsummary", std::move(cbHandleExceptionSummary));
http_server_->InstallCallBack("/stmgrsregistrationsummary",
std::move(cbHandleStmgrsRegistrationSummary));
http_server_->InstallGenericCallBack(std::move(cbHandleUnknown));
CHECK(http_server_->Start() == SP_OK);
}
StatsInterface::~StatsInterface() { delete http_server_; }
void StatsInterface::HandleStatsRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Got a stats request " << _request->GetQuery();
// get the entire stuff
unsigned char* pb = _request->ExtractFromPostData(0, _request->GetPayloadSize());
proto::tmaster::MetricRequest req;
if (!req.ParseFromArray(pb, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in StatsRequest";
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
proto::tmaster::MetricResponse* res =
metrics_collector_->GetMetrics(req, tmaster_->getInitialTopology());
sp_string response_string;
CHECK(res->SerializeToString(&response_string));
OutgoingHTTPResponse* response = new OutgoingHTTPResponse(_request);
response->AddHeader("Content-Type", "application/octet-stream");
response->AddHeader("Content-Length", std::to_string(response_string.size()));
response->AddResponse(response_string);
http_server_->SendReply(_request, 200, response);
delete res;
delete _request;
LOG(INFO) << "Done with stats request ";
}
void StatsInterface::HandleExceptionRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for exceptions" << _request->GetQuery();
// Get the Exception request proto.
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmaster::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
heron::proto::tmaster::ExceptionLogResponse* exception_response =
metrics_collector_->GetExceptions(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, http_response);
delete exception_response;
delete _request;
LOG(INFO) << "Returned exceptions response";
}
void StatsInterface::HandleExceptionSummaryRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for exception summary " << _request->GetQuery();
unsigned char* request_data = _request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmaster::ExceptionLogRequest exception_request;
if (!exception_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in ExceptionRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
heron::proto::tmaster::ExceptionLogResponse* exception_response =
metrics_collector_->GetExceptionsSummary(exception_request);
sp_string response_string;
CHECK(exception_response->SerializeToString(&response_string));
OutgoingHTTPResponse* http_response = new OutgoingHTTPResponse(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, http_response);
delete exception_response;
delete _request;
LOG(INFO) << "Returned exceptions response";
}
void StatsInterface::HandleStmgrsRegistrationSummaryRequest(IncomingHTTPRequest* _request) {
LOG(INFO) << "Request for stream managers registration summary " << _request->GetQuery();
unsigned char* request_data =
_request->ExtractFromPostData(0, _request->GetPayloadSize());
heron::proto::tmaster::StmgrsRegistrationSummaryRequest stmgrs_reg_request;
if (!stmgrs_reg_request.ParseFromArray(request_data, _request->GetPayloadSize())) {
LOG(ERROR) << "Unable to deserialize post data specified in" <<
"StmgrsRegistrationSummaryRequest" << std::endl;
http_server_->SendErrorReply(_request, 400);
delete _request;
return;
}
auto stmgrs_reg_summary_response = tmaster_->GetStmgrsRegSummary();
sp_string response_string;
CHECK(stmgrs_reg_summary_response->SerializeToString(&response_string));
auto http_response = new OutgoingHTTPResponse(_request);
http_response->AddHeader("Content-Type", "application/octet-stream");
http_response->AddHeader("Content-Length", std::to_string(response_string.size()));
http_response->AddResponse(response_string);
http_server_->SendReply(_request, 200, http_response);
delete _request;
LOG(INFO) << "Returned stream managers registration summary response";
}
void StatsInterface::HandleUnknownRequest(IncomingHTTPRequest* _request) {
LOG(WARNING) << "Got an unknown request " << _request->GetQuery();
http_server_->SendErrorReply(_request, 400);
delete _request;
}
} // namespace tmaster
} // namespace heron