blob: 73fba429124ee9273541cf73c9f6ed890a17843c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "manager/tcontroller.h"
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>
#include "basics/basics.h"
#include "basics/strutils.h"
#include "cereal/external/base64.hpp"
#include "config/topology-config-helper.h"
#include "errors/errors.h"
#include "manager/tmanager.h"
#include "network/network.h"
#include "proto/messages.h"
#include "threads/threads.h"
namespace heron {
namespace tmanager {
/*
 * HTTP service controller.
 *
 * The constructor creates the HTTP server on the given event loop and binds
 * each control URL to the member function that serves it. The server is not
 * started here; see Start().
 */
TController::TController(std::shared_ptr<EventLoop> eventLoop, const NetworkOptions& options,
                         TManager* tmanager)
    : tmanager_(tmanager) {
  http_server_ = make_unique<HTTPServer>(eventLoop, options);

  // Topology lifecycle: activate / deactivate.
  http_server_->InstallCallBack(
      "/activate", [this](IncomingHTTPRequest* req) { this->HandleActivateRequest(req); });
  http_server_->InstallCallBack(
      "/deactivate", [this](IncomingHTTPRequest* req) { this->HandleDeActivateRequest(req); });

  // Stateful processing: drop every stored checkpoint.
  http_server_->InstallCallBack(
      "/clean_all_stateful_checkpoints",
      [this](IncomingHTTPRequest* req) { this->HandleCleanStatefulCheckpointRequest(req); });

  // Runtime configuration updates.
  http_server_->InstallCallBack(
      "/runtime_config/update",
      [this](IncomingHTTPRequest* req) { this->HandleUpdateRuntimeConfigRequest(req); });

  // Read-only view of the current physical plan.
  http_server_->InstallCallBack(
      "/get_current_physical_plan",
      [this](IncomingHTTPRequest* req) { this->HandleGetCurPPlanRequest(req); });
}
// Nothing to release by hand; members clean up after themselves.
TController::~TController() = default;
// Starts the underlying HTTP server and forwards its return value to the caller.
sp_int32 TController::Start() {
  return http_server_->Start();
}
// Serves "/activate".
// The request is rejected when topology validation fails (with the code and message produced
// by ValidateTopology) or when the topology is not currently PAUSED (400). Otherwise the
// activation is handed to TManager and the reply is sent from HandleActivateRequestDone.
void TController::HandleActivateRequest(IncomingHTTPRequest* request) {
  LOG(INFO) << "Got a activate topology request from " << request->GetRemoteHost() << ":"
            << request->GetRemotePort();

  // Validation
  ValidationResult validation;
  const bool valid = ValidateTopology(request, validation);
  if (!valid) {
    http_server_->SendErrorReply(request, validation.GetCode(), validation.GetMessage());
    delete request;
    return;
  }

  // Only a paused topology can be activated.
  const bool paused = (tmanager_->GetTopologyState() == proto::api::PAUSED);
  if (!paused) {
    LOG(ERROR) << "Topology not in paused state";
    http_server_->SendErrorReply(request, 400);
    delete request;
    return;
  }

  // From here on the completion handler owns `request` and deletes it.
  tmanager_->ActivateTopology([request, this](proto::system::StatusCode status) {
    this->HandleActivateRequestDone(request, status);
  });
}
// Completion handler for an activate request.
// Replies 200 with a confirmation text when _status is OK, 500 otherwise, and then deletes
// the request in both cases.
void TController::HandleActivateRequestDone(IncomingHTTPRequest* request,
                                            proto::system::StatusCode _status) {
  if (_status == proto::system::OK) {
    const std::string reply_text = "Topology successfully activated";
    LOG(INFO) << reply_text;
    auto reply = make_unique<OutgoingHTTPResponse>(request);
    reply->AddResponse(reply_text);
    http_server_->SendReply(request, 200, std::move(reply));
  } else {
    LOG(ERROR) << "Unable to Activate topology " << _status;
    http_server_->SendErrorReply(request, 500);
  }
  delete request;
}
// Serves "/deactivate".
// The request is rejected when topology validation fails (with the code and message produced
// by ValidateTopology) or when the topology is not currently RUNNING (400). Otherwise the
// deactivation is handed to TManager and the reply is sent from HandleDeActivateRequestDone.
void TController::HandleDeActivateRequest(IncomingHTTPRequest* request) {
  LOG(INFO) << "Got a deactivate topology request from " << request->GetRemoteHost() << ":"
            << request->GetRemotePort();

  ValidationResult validation;
  const bool valid = ValidateTopology(request, validation);
  if (!valid) {
    http_server_->SendErrorReply(request, validation.GetCode(), validation.GetMessage());
    delete request;
    return;
  }

  // Only a running topology can be deactivated.
  const bool running = (tmanager_->GetTopologyState() == proto::api::RUNNING);
  if (!running) {
    LOG(ERROR) << "Topology not in running state";
    http_server_->SendErrorReply(request, 400);
    delete request;
    return;
  }

  // From here on the completion handler owns `request` and deletes it.
  tmanager_->DeActivateTopology([request, this](proto::system::StatusCode status) {
    this->HandleDeActivateRequestDone(request, status);
  });
}
// Completion handler for a deactivate request.
// Replies 200 with a confirmation text when _status is OK, 500 otherwise, and then deletes
// the request in both cases.
void TController::HandleDeActivateRequestDone(IncomingHTTPRequest* request,
                                              proto::system::StatusCode _status) {
  if (_status == proto::system::OK) {
    const std::string reply_text = "Topology successfully deactivated";
    LOG(INFO) << reply_text;
    auto reply = make_unique<OutgoingHTTPResponse>(request);
    reply->AddResponse(reply_text);
    http_server_->SendReply(request, 200, std::move(reply));
  } else {
    LOG(ERROR) << "Unable to DeActivate topology " << _status;
    http_server_->SendErrorReply(request, 500);
  }
  delete request;
}
// Serves "/clean_all_stateful_checkpoints".
// After topology validation, the request is refused with 400 if a previous clean request
// still has its callback stored. Otherwise a callback bound to this request is stored in
// clean_stateful_checkpoint_cb_ and TManager is asked to clean all stateful checkpoints.
void TController::HandleCleanStatefulCheckpointRequest(IncomingHTTPRequest* request) {
  LOG(INFO) << "Got a CleanStatefulCheckpoint request from " << request->GetRemoteHost() << ":"
            << request->GetRemotePort();

  ValidationResult validation;
  const bool valid = ValidateTopology(request, validation);
  if (!valid) {
    http_server_->SendErrorReply(request, validation.GetCode(), validation.GetMessage());
    delete request;
    return;
  }

  // A stored callback means an earlier clean request has not been answered yet.
  const bool clean_in_flight = static_cast<bool>(clean_stateful_checkpoint_cb_);
  if (clean_in_flight) {
    LOG(ERROR) << "Another clean request is already pending";
    http_server_->SendErrorReply(request, 400);
    delete request;
    return;
  }

  // Remember how to answer this request; HandleCleanStatefulCheckpointResponse fires it.
  clean_stateful_checkpoint_cb_ = [request, this](proto::system::StatusCode status) {
    this->HandleCleanStatefulCheckpointRequestDone(request, status);
  };
  tmanager_->CleanAllStatefulCheckpoint();
}
// Delivers _status to the pending clean-checkpoint callback, if one is stored, and then
// clears it so that a new clean request can be accepted. Does nothing when no callback is
// stored.
// NOTE(review): presumably invoked once the checkpoint cleanup has finished - confirm
// against the caller.
void TController::HandleCleanStatefulCheckpointResponse(proto::system::StatusCode _status) {
  if (!clean_stateful_checkpoint_cb_) {
    return;
  }
  clean_stateful_checkpoint_cb_(_status);
  clean_stateful_checkpoint_cb_ = nullptr;
}
// Completion handler for a clean-checkpoint request.
// Logs the outcome, replies 200 with a confirmation text when _status is OK or 500
// otherwise, and then deletes the request in both cases.
void TController::HandleCleanStatefulCheckpointRequestDone(IncomingHTTPRequest* request,
                                                           proto::system::StatusCode _status) {
  LOG(INFO) << "Done with CleanStatefulCheckpoint Request with " << _status;
  if (_status != proto::system::OK) {
    // Keep a space before the status code so it does not run into the message text,
    // matching the error logs of the other *Done handlers.
    LOG(ERROR) << "Unable to CleanStatefulCheckpoint " << _status;
    http_server_->SendErrorReply(request, 500);
  } else {
    const std::string msg = "Checkpoints successfully cleaned";
    LOG(INFO) << msg;
    auto response = make_unique<OutgoingHTTPResponse>(request);
    response->AddResponse(msg);
    http_server_->SendReply(request, 200, std::move(response));
  }
  delete request;
}
// Serves "/runtime_config/update".
// Steps, each of which answers with an error and deletes the request on failure:
//   1. validate the topology (code and message from ValidateTopology)
//   2. collect every "runtime-config" value from the request (400 if none)
//   3. parse them into a component -> (config -> value) map (400 on bad format)
//   4. let TManager validate the parsed map (400)
//   5. ask TManager to apply it (400 if the call returns false)
// On success the reply is sent from HandleUpdateRuntimeConfigRequestDone.
void TController::HandleUpdateRuntimeConfigRequest(IncomingHTTPRequest* request) {
  LOG(INFO) << "Got a RuntimeConfig request from " << request->GetRemoteHost() << ":"
            << request->GetRemotePort();

  ValidationResult validation;
  const bool valid = ValidateTopology(request, validation);
  if (!valid) {
    http_server_->SendErrorReply(request, validation.GetCode(), validation.GetMessage());
    delete request;
    return;
  }

  // Look for runtime-config parameters
  std::vector<std::string> raw_configs;
  const bool has_configs = request->GetAllValues("runtime-config", raw_configs);
  if (!has_configs) {
    LOG(ERROR) << "No runtime config is found";
    http_server_->SendErrorReply(request, 400, "No runtime config is found."
        " Usage: runtime-config=[COMPONENT:]<CONFIG>:<VALUE>.");
    delete request;
    return;
  }
  LOG(INFO) << "Found " << raw_configs.size() << " configs in request.";

  // Parse new configs in request
  std::map<std::string, std::map<std::string, std::string>> parsed;
  const bool parsed_ok = ParseRuntimeConfig(raw_configs, parsed);
  if (!parsed_ok) {
    http_server_->SendErrorReply(request, 400, "Failed to parse runtime configs."
        " Possibly bad format. The expected format is [COMPONENT:]<CONFIG>:<VALUE>.");
    delete request;
    return;
  }

  // Validate them before applying
  const bool acceptable = tmanager_->ValidateRuntimeConfig(parsed);
  if (!acceptable) {
    http_server_->SendErrorReply(request, 400, "Failed to validate runtime configs");
    delete request;
    return;
  }

  // NOTE(review): assumes UpdateRuntimeConfig never invokes the callback when it returns
  // false; otherwise `request` would be answered and deleted twice - confirm.
  auto on_done = [request, this](proto::system::StatusCode status) {
    this->HandleUpdateRuntimeConfigRequestDone(request, status);
  };
  const bool accepted = tmanager_->UpdateRuntimeConfig(parsed, std::move(on_done));
  if (!accepted) {
    http_server_->SendErrorReply(request, 400, "Failed to update runtime configs");
    delete request;
  }
}
// Completion handler for a runtime-config update.
// Replies 200 with a confirmation text when _status is OK; otherwise replies 500 with an
// error text that ends in the numeric status code. The request is deleted in both cases.
void TController::HandleUpdateRuntimeConfigRequestDone(IncomingHTTPRequest* request,
                                                       proto::system::StatusCode _status) {
  if (_status != proto::system::OK) {
    // Format the code as decimal text. Appending the enum directly with `+=` selects the
    // `char` overload and adds one raw byte to the message instead of the number.
    const std::string error =
        "Failed to update runtime configs " + std::to_string(static_cast<int>(_status));
    LOG(ERROR) << error;
    http_server_->SendErrorReply(request, 500, error);
  } else {
    const std::string message("Runtime config updated");
    LOG(INFO) << message;
    auto response = make_unique<OutgoingHTTPResponse>(request);
    response->AddResponse(message);
    http_server_->SendReply(request, 200, std::move(response));
  }
  delete request;
}
// Serves "/get_current_physical_plan".
// Replies 400 when any stream manager is reported absent or when no physical plan is
// available, 500 when the plan cannot be serialized, and otherwise 200 with the serialized
// plan encoded as base64. The request is deleted on every path.
void TController::HandleGetCurPPlanRequest(IncomingHTTPRequest* request) {
  LOG(INFO) << "Got a GetCurPPlan request from " << request->GetRemoteHost() << ":"
            << request->GetRemotePort();

  // Make sure all the stream managers are alive: when a container has failed, a physical
  // plan is still available at TManager but it is not a valid one.
  if (tmanager_->GetStmgrsRegSummary()->absent_stmgrs_size() != 0) {
    http_server_->SendErrorReply(request, 400);
    delete request;
    return;
  }

  if (tmanager_->getPhysicalPlan() == nullptr) {
    http_server_->SendErrorReply(request, 400);
    delete request;
    return;
  }

  // Do not answer 200 with a partial or empty payload if serialization fails.
  std::string pplanString;
  if (!tmanager_->getPhysicalPlan()->SerializeToString(&pplanString)) {
    LOG(ERROR) << "Failed to serialize the current physical plan";
    http_server_->SendErrorReply(request, 500);
    delete request;
    return;
  }

  // SerializeToString() produces binary data, which needs to be encoded before it is put
  // into the response body.
  const unsigned char* rawBytes = reinterpret_cast<const unsigned char*>(pplanString.data());
  const std::string pplanStringFixed = cereal::base64::encode(rawBytes, pplanString.size());

  const std::string message("Get current physical plan");
  LOG(INFO) << message;
  auto response = make_unique<OutgoingHTTPResponse>(request);
  response->AddResponse(pplanStringFixed);
  http_server_->SendReply(request, 200, std::move(response));
  delete request;
}
/*
 * Validate the topology addressed by a request.
 * Checks, in order:
 * - the request carries a non-empty 'topologyid' argument (400 otherwise)
 * - that id equals the id reported by TManager (400 otherwise)
 * - TManager has a physical plan, i.e. it is initialized (500 otherwise)
 * Returns true when every check passes. On the first failing check the HTTP code and
 * message are stored in the result object and false is returned.
 */
bool TController::ValidateTopology(const IncomingHTTPRequest* request, ValidationResult& result) {
  const std::string& requested_id = request->GetValue("topologyid");
  bool accepted = false;

  if (requested_id.empty()) {
    LOG(ERROR) << "Argument 'topologyid' not specified in the request";
    result.SetResult(400, "Missing 'topologyid' argument in the request");
  } else if (requested_id != tmanager_->GetTopologyId()) {
    LOG(ERROR) << "Topology id does not match";
    result.SetResult(400, "Topology id does not match");
  } else if (tmanager_->getPhysicalPlan() == NULL) {
    LOG(ERROR) << "Tmanager still not initialized (physical plan is not available)";
    result.SetResult(500, "Tmanager still not initialized (physical plan is not available)");
  } else {
    accepted = true;
  }

  return accepted;
}
bool TController::ParseRuntimeConfig(const std::vector<std::string>& paramters,
std::map<std::string, std::map<std::string, std::string>>& retval) {
// Configs are in the followingconfigMap format: [component:]config:value.
// It seems the regex implementation in c++ 11 is not very stable (The regex
// pattern("^([\\w\\.-]+:)?[\\w\\.-]+:[\\w\\.-]+$") runs ok in osx but throws exception in
// linux). Therefore we are manually searching ':' here (there should be 1 or 2 ':'s) instead of
// using regex matching.
std::vector<std::string>::const_iterator iter;
for (iter = paramters.begin(); iter != paramters.end(); ++iter) {
// Split using ':'
std::vector<std::string> segments = StrUtils::split(*iter, ":");
if (segments.size() == 2) {
// Topology level config
const char* topology_key = config::TopologyConfigHelper::GetReservedTopologyConfigKey();
retval[topology_key][segments[0]] = segments[1];
} else if (segments.size() == 3) {
// Component level config
retval[segments[0]][segments[1]] = segments[2];
} else {
// Wrong number of ':'s
LOG(ERROR) << "Bad format detected in " << *iter << "."
<< " Each config should be in this format: [component:]config:value";
return false;
}
}
return true;
}
} // namespace tmanager
} // namespace heron