blob: 1ad637b15030f48cfa5ecf3d81fcb8b898cef5ca [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "HTTPHandlers.h"
#include <algorithm>
#include "CivetStream.h"
#include "io/CRCStream.h"
#include "io/BufferStream.h"
#include "rapidjson/error/en.h"
#include "minifi-cpp/utils/gsl.h"
#include "agent/build_description.h"
#include "range/v3/algorithm/contains.hpp"
#include "range/v3/view/filter.hpp"
#include "range/v3/view/view.hpp"
#include "utils/net/DNS.h"
#include "sitetosite/HttpSiteToSiteClient.h"
#include "utils/StringUtils.h"
namespace org::apache::nifi::minifi::test {
bool SiteToSiteLocationResponder::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
std::string site2site_rest_resp = "{"
"\"revision\": {"
"\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\""
"},"
"\"controller\": {"
"\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\","
"\"name\": \"NiFi Flow\","
"\"siteToSiteSecure\": ";
site2site_rest_resp += (isSecure ? "true" : "false");
site2site_rest_resp += "}}";
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
site2site_rest_resp.length());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
bool PeerResponder::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
#ifdef WIN32
std::string hostname = org::apache::nifi::minifi::utils::net::getMyHostName();
#else
std::string hostname = "localhost";
#endif
std::string site2site_rest_resp = R"({"peers" : [{ "hostname": ")" + hostname + R"(", "port": )" + port + R"(, "secure": false, "flowFileCount" : 0 }] })";
std::stringstream headers;
headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
mg_printf(conn, "%s", headers.str().c_str());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
bool SiteToSiteBaseResponder::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
std::string site2site_rest_resp =
"{\"controller\":{\"id\":\"96dab149-0162-1000-7924-ed3122d6ea2b\",\"name\":\"NiFi Flow\",\"comments\":\"\",\"runningCount\":3,\"stoppedCount\":6,\"invalidCount\":1,\"disabledCount\":0,\"inputPortCount\":1,\"outputPortCount\":1,\"remoteSiteListeningPort\":10443,\"siteToSiteSecure\":false,\"instanceId\":\"13881505-0167-1000-be72-aa29341a3e9a\",\"inputPorts\":[{\"id\":\"471deef6-2a6e-4a7d-912a-81cc17e3a204\",\"name\":\"RPGIN\",\"comments\":\"\",\"state\":\"RUNNING\"}],\"outputPorts\":[{\"id\":\"9cf15a63-0166-1000-1b29-027406d96013\",\"name\":\"ddsga\",\"comments\":\"\",\"state\":\"STOPPED\"}]}}"; // NOLINT line length
std::stringstream headers;
headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
mg_printf(conn, "%s", headers.str().c_str());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
TransactionResponder::TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri, bool empty_transaction_uri)
: base_url(std::move(base_url)),
wrong_uri(wrong_uri),
empty_transaction_uri(empty_transaction_uri),
input_port(input_port),
port_id(std::move(port_id)),
flow_files_feed_(nullptr) {
if (input_port) {
transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96";
transaction_id_str += std::to_string(transaction_id.load());
transaction_id++;
} else {
transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95";
transaction_id_str += std::to_string(transaction_id_output.load());
transaction_id_output++;
}
}
bool TransactionResponder::handlePost(CivetServer* /*server*/, struct mg_connection *conn) {
auto req_info = mg_get_request_info(conn);
static const std::unordered_map<std::string, std::string> expected_headers {
{std::string{minifi::sitetosite::HttpSiteToSiteClient::PROTOCOL_VERSION_HEADER}, "1"},
{std::string{minifi::sitetosite::HttpSiteToSiteClient::HANDSHAKE_PROPERTY_USE_COMPRESSION}, "false"},
{std::string{minifi::sitetosite::HttpSiteToSiteClient::HANDSHAKE_PROPERTY_REQUEST_EXPIRATION}, "20000"},
{std::string{minifi::sitetosite::HttpSiteToSiteClient::HANDSHAKE_PROPERTY_BATCH_COUNT}, "5"},
{std::string{minifi::sitetosite::HttpSiteToSiteClient::HANDSHAKE_PROPERTY_BATCH_SIZE}, "100"},
{std::string{minifi::sitetosite::HttpSiteToSiteClient::HANDSHAKE_PROPERTY_BATCH_DURATION}, "30000"}
};
std::unordered_map<std::string, std::string> received_headers;
for (int i = 0; i < req_info->num_headers; ++i) {
auto header = &req_info->http_headers[i];
received_headers[std::string(header->name)] = std::string(header->value);
}
for (const auto& header : expected_headers) {
CHECK(received_headers.contains(header.first));
CHECK(received_headers[header.first] == header.second);
}
std::string site2site_rest_resp;
std::stringstream headers;
headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nX-Location-Uri-Intent: ";
if (wrong_uri)
headers << "ohstuff\r\n";
else
headers << "transaction-url\r\n";
std::string port_type;
if (input_port)
port_type = "input-ports";
else
port_type = "output-ports";
if (!empty_transaction_uri)
headers << "locAtion: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n";
headers << "Connection: close\r\n\r\n";
mg_printf(conn, "%s", headers.str().c_str());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
bool FlowFileResponder::handlePost(CivetServer* /*server*/, struct mg_connection *conn) {
std::string site2site_rest_resp;
std::stringstream headers;
if (!wrong_uri) {
minifi::io::CivetStream civet_stream(conn);
minifi::io::CRCStream < minifi::io::CivetStream > stream(gsl::make_not_null(&civet_stream));
uint32_t num_attributes = 0;
uint64_t total_size = 0;
{
const auto read = stream.read(num_attributes);
if (!isServerRunning()) return false;
REQUIRE(read > 0);
total_size += read;
}
const auto flow = std::make_shared<FlowObj>();
for (uint32_t i = 0; i < num_attributes; i++) {
std::string name;
std::string value;
{
const auto read = stream.read(name, true);
if (!isServerRunning()) return false;
REQUIRE(read > 0);
total_size += read;
}
{
const auto read = stream.read(value, true);
if (!isServerRunning()) return false;
REQUIRE(read > 0);
total_size += read;
}
flow->attributes[name] = value;
}
uint64_t length{};
{
const auto read = stream.read(length);
if (!isServerRunning()) return false;
REQUIRE(read > 0);
total_size += read;
}
total_size += length;
flow->data.resize(gsl::narrow<size_t>(length));
flow->total_size = total_size;
{
const auto read = stream.read(flow->data);
if (!isServerRunning()) return false;
(void)read;
REQUIRE(read == length);
}
if (!invalid_checksum) {
site2site_rest_resp = std::to_string(stream.getCRC());
flow_files_.enqueue(flow);
} else {
site2site_rest_resp = "Imawrongchecksumshortandstout";
}
headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n";
} else {
headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n";
}
mg_printf(conn, "%s", headers.str().c_str());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
bool FlowFileResponder::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
if (flow_files_feed_->size_approx() > 0) {
std::shared_ptr<FlowObj> flowobj;
std::vector<std::shared_ptr<FlowObj>> flows;
uint64_t total = 0;
while (flow_files_feed_->try_dequeue(flowobj)) {
flows.push_back(flowobj);
total += flowobj->total_size;
}
mg_printf(conn,
"HTTP/1.1 200 OK\r\n"
"Content-Length: %" PRIu64 "\r\n"
"Content-Type: application/octet-stream\r\n"
"Connection: close\r\n\r\n",
total);
minifi::io::BufferStream serializer;
minifi::io::CRCStream <minifi::io::OutputStream> stream(gsl::make_not_null(&serializer));
for (const auto& flow : flows) {
auto num_attributes = gsl::narrow<uint32_t>(flow->attributes.size());
stream.write(num_attributes);
for (const auto& entry : flow->attributes) {
stream.write(entry.first);
stream.write(entry.second);
}
uint64_t length = flow->data.size();
stream.write(length);
stream.write(flow->data);
}
} else {
mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: "
"close\r\nContent-Length: 0\r\n");
mg_printf(conn, "Content-Type: text/plain\r\n\r\n");
}
return true;
}
bool DeleteTransactionResponder::handleDelete(CivetServer* /*server*/, struct mg_connection *conn) {
std::string site2site_rest_resp;
std::stringstream headers;
std::string resp;
CivetServer::getParam(conn, "responseCode", resp);
headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n";
headers << "Connection: close\r\n\r\n";
mg_printf(conn, "%s", headers.str().c_str());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
void HeartbeatHandler::sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn) {
rapidjson::Document hb_obj{rapidjson::kObjectType};
hb_obj.AddMember("operation", "heartbeat", hb_obj.GetAllocator());
hb_obj.AddMember("requested_operations", rapidjson::kArrayType, hb_obj.GetAllocator());
for (const auto& c2_operation : operations) {
rapidjson::Value op{rapidjson::kObjectType};
op.AddMember("operation", c2_operation.operation, hb_obj.GetAllocator());
op.AddMember("operationid", c2_operation.operation_id, hb_obj.GetAllocator());
op.AddMember("operand", c2_operation.operand, hb_obj.GetAllocator());
if (!c2_operation.args.empty()) {
rapidjson::Value args{rapidjson::kObjectType};
for (auto& [arg_name, arg_val] : c2_operation.args) {
rapidjson::Value json_arg_val;
if (auto* json_val = arg_val.json()) {
json_arg_val.CopyFrom(*json_val, hb_obj.GetAllocator());
} else {
json_arg_val.SetString(arg_val.to_string(), hb_obj.GetAllocator());
}
args.AddMember(rapidjson::StringRef(arg_name), json_arg_val, hb_obj.GetAllocator());
}
op.AddMember("args", args, hb_obj.GetAllocator());
}
hb_obj["requested_operations"].PushBack(op, hb_obj.GetAllocator());
}
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
hb_obj.Accept(writer);
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
buffer.GetLength());
mg_printf(conn, "%s", buffer.GetString());
}
void HeartbeatHandler::verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components, const std::vector<std::string>& disallowed_properties) {
bool found = false;
REQUIRE(root.HasMember("agentInfo"));
REQUIRE(root["agentInfo"].HasMember("agentManifest"));
REQUIRE(root["agentInfo"]["agentManifest"].HasMember("bundles"));
REQUIRE(root["agentInfo"].HasMember("agentManifestHash"));
const std::string manifestHash = root["agentInfo"]["agentManifestHash"].GetString();
REQUIRE(manifestHash.length() == 128);
// throws if not a valid hexadecimal hash
const auto hashVec = minifi::utils::string::from_hex(manifestHash);
REQUIRE(hashVec.size() == 64);
for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) {
REQUIRE(bundle.HasMember("artifact"));
std::string str = bundle["artifact"].GetString();
if (str == "minifi-standard-processors") {
std::vector<std::string> classes;
for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) {
classes.push_back(proc["type"].GetString());
}
auto group = minifi::BuildDescription{}.getClassDescriptions(str);
for (const auto& proc : group.processors_) {
REQUIRE(std::find(classes.begin(), classes.end(), proc.full_name_) != std::end(classes));
(void)proc;
found = true;
}
}
}
REQUIRE(found);
verifySupportedOperations(root, verify_components, disallowed_properties);
}
void HeartbeatHandler::verify(struct mg_connection *conn) {
auto post_data = readPayload(conn);
if (!isServerRunning()) {
return;
}
if (!IsNullOrEmpty(post_data)) {
rapidjson::Document root;
rapidjson::ParseResult result = root.Parse(post_data.data(), post_data.size());
if (!result) {
throw std::runtime_error(fmt::format("JSON parse error: {0}\n JSON data: {1}", std::string(rapidjson::GetParseError_En(result.Code())), post_data));
}
std::string operation = root["operation"].GetString();
if (operation == "heartbeat") {
handleHeartbeat(root, conn);
} else if (operation == "acknowledge") {
handleAcknowledge(root);
} else {
throw std::runtime_error("operation not supported " + operation);
}
}
}
std::set<std::string> HeartbeatHandler::getOperandsOfProperties(const rapidjson::Value& operation_node) {
std::set<std::string> operands;
REQUIRE(operation_node.HasMember("properties"));
const auto& properties_node = operation_node["properties"];
for (auto it = properties_node.MemberBegin(); it != properties_node.MemberEnd(); ++it) {
operands.insert(it->name.GetString());
}
return operands;
}
void HeartbeatHandler::verifyMetadata(const rapidjson::Value& operation_node, const std::unordered_map<std::string, Metadata>& operand_with_metadata) {
std::unordered_map<std::string, Metadata> operand_with_metadata_found;
const auto& properties_node = operation_node["properties"];
for (auto prop_it = properties_node.MemberBegin(); prop_it != properties_node.MemberEnd(); ++prop_it) {
if (prop_it->value.ObjectEmpty()) {
continue;
}
Metadata metadata_item;
for (auto metadata_it = prop_it->value.MemberBegin(); metadata_it != prop_it->value.MemberEnd(); ++metadata_it) {
std::vector<std::unordered_map<std::string, std::string>> values;
for (const auto& value : metadata_it->value.GetArray()) {
std::unordered_map<std::string, std::string> value_item;
for (auto value_it = value.MemberBegin(); value_it != value.MemberEnd(); ++value_it) {
value_item.emplace(value_it->name.GetString(), value_it->value.GetString());
}
values.push_back(value_item);
}
metadata_item.emplace(metadata_it->name.GetString(), values);
}
operand_with_metadata_found.emplace(prop_it->name.GetString(), metadata_item);
}
REQUIRE(operand_with_metadata_found == operand_with_metadata);
}
void HeartbeatHandler::verifyProperties(const rapidjson::Value& operation_node, minifi::c2::Operation operation,
const std::vector<std::string>& verify_components, const std::vector<std::string>& disallowed_properties) {
switch (operation) {
case minifi::c2::Operation::describe: {
verifyOperands<minifi::c2::DescribeOperand>(operation_node);
break;
}
case minifi::c2::Operation::update: {
std::vector<std::unordered_map<std::string, std::string>> config_properties;
const auto prop_reader = [this](const std::string& sensitive_props) { return configuration_->getString(sensitive_props); };
const auto sensitive_props = minifi::Configuration::getSensitiveProperties(prop_reader);
auto allowed_not_sensitive_configuration_properties = minifi::Configuration::CONFIGURATION_PROPERTIES | ranges::views::filter([&](const auto& configuration_property) {
const auto& configuration_property_name = configuration_property.first;
return !ranges::contains(sensitive_props, configuration_property_name) && !ranges::contains(disallowed_properties, configuration_property_name);
});
for (const auto& [property_name, property_validator] : allowed_not_sensitive_configuration_properties) {
std::unordered_map<std::string, std::string> config_property;
config_property.emplace("propertyName", property_name);
if (auto value = configuration_->getRawValue(std::string(property_name))) {
config_property.emplace("propertyValue", *value);
}
if (const auto nifi_standard_validator = property_validator->getEquivalentNifiStandardValidatorName()) {
config_property.emplace("validator", *nifi_standard_validator);
}
config_properties.push_back(config_property);
}
Metadata metadata;
metadata.emplace("availableProperties", config_properties);
std::unordered_map<std::string, Metadata> operand_with_metadata;
operand_with_metadata.emplace("properties", metadata);
verifyOperands<minifi::c2::UpdateOperand>(operation_node, operand_with_metadata);
break;
}
case minifi::c2::Operation::transfer: {
verifyOperands<minifi::c2::TransferOperand>(operation_node);
break;
}
case minifi::c2::Operation::clear: {
verifyOperands<minifi::c2::ClearOperand>(operation_node);
break;
}
case minifi::c2::Operation::start:
case minifi::c2::Operation::stop: {
auto operands = getOperandsOfProperties(operation_node);
REQUIRE(operands.contains("c2"));
// FlowController is also present, but this handler has no way of knowing its UUID to test it
for (const auto& component : verify_components) {
REQUIRE(operands.contains(component));
}
break;
}
default:
break;
}
}
void HeartbeatHandler::verifySupportedOperations(const rapidjson::Document& root, const std::vector<std::string>& verify_components, const std::vector<std::string>& disallowed_properties) {
auto& agent_manifest = root["agentInfo"]["agentManifest"];
REQUIRE(agent_manifest.HasMember("supportedOperations"));
std::set<std::string> operations;
for (const auto& operation_node : agent_manifest["supportedOperations"].GetArray()) {
REQUIRE(operation_node.HasMember("type"));
operations.insert(operation_node["type"].GetString());
verifyProperties(operation_node, minifi::utils::enumCast<minifi::c2::Operation>(operation_node["type"].GetString(), true), verify_components, disallowed_properties);
}
REQUIRE(operations == std::set<std::string>(magic_enum::enum_names<minifi::c2::Operation>().begin(), magic_enum::enum_names<minifi::c2::Operation>().end()));
}
void StoppingHeartbeatHandler::sendStartStopOperation(struct mg_connection *conn) {
std::lock_guard<std::mutex> lock(start_stop_send_mutex_);
std::string requested_operation;
if (post_count_ == 0) {
requested_operation = R"({ "operationid" : 41, "operation" : "stop", "operand" : "2438e3c8-015a-1000-79ca-83af40ec1991" }, )"
R"({ "operationid" : 42, "operation" : "stop", "operand" : "FlowController" })";
} else if (post_count_ == 1) {
requested_operation = R"({ "operationid" : 43, "operation" : "start", "operand" : "2438e3c8-015a-1000-79ca-83af40ec1991" }, )"
R"({ "operationid" : 44, "operation" : "start", "operand" : "FlowController" })";
} else if (post_count_ == 2) {
requested_operation = R"({ "identifier" : 45, "operation" : "STOP", "operand" : "PROCESSOR", "args" : { "processorId" : "2438e3c8-015a-1000-79ca-83af40ec1992" } }, )"
R"({ "identifier" : 46, "operation" : "STOP", "operand" : "FLOW" })";
} else if (post_count_ == 3) {
requested_operation = R"({ "identifier" : 47, "operation" : "START", "operand" : "PROCESSOR", "args" : { "processorId" : "2438e3c8-015a-1000-79ca-83af40ec1992" } }, )"
R"({ "identifier" : 48, "operation" : "START", "operand" : "FLOW" })";
} else {
requested_operation = R"({ "identifier" : 49, "operation" : "STOP", "operand" : "PROCESSOR", "args" : { "processorId" : "9998e3c8-015a-1000-79ca-83af40ec1999" } }, )"
R"({ "identifier" : 50, "operation" : "STOP", "operand" : "PROCESSOR" })";
}
std::string resp = R"({"operation" : "heartbeat", "requested_operations" : [ )" + requested_operation + " ]}";
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
resp.length());
mg_printf(conn, "%s", resp.c_str());
++post_count_;
}
bool C2FlowProvider::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
std::ifstream myfile(test_file_location_.c_str(), std::ios::in | std::ios::binary);
if (myfile.good()) {
std::string str((std::istreambuf_iterator<char>(myfile)), (std::istreambuf_iterator<char>()));
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
str.length());
mg_printf(conn, "%s", str.c_str());
} else {
mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
}
return true;
}
bool C2UpdateHandler::handlePost(CivetServer* /*server*/, struct mg_connection *conn) {
calls_++;
if (!response_.empty()) {
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
response_.length());
mg_printf(conn, "%s", response_.c_str());
response_.clear();
} else {
mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n");
}
return true;
}
void C2UpdateHandler::setC2RestResponse(const std::string& url, const std::string& name, const std::optional<std::string>& persist) {
std::string content = R"({"location": ")" + url + "\"";
if (persist) {
content += R"(, "persist": ")" + *persist + "\"";
}
content += "}";
response_ =
"{\"operation\" : \"heartbeat\", "
"\"requested_operations\": [ {"
"\"operation\" : \"update\", "
"\"operationid\" : \"8675309\", "
"\"name\": \"" + name + "\", "
"\"content\": " + content + "}]}";
}
bool C2FailedUpdateHandler::handlePost(CivetServer *server, struct mg_connection *conn) {
calls_++;
const auto data = readPayload(conn);
if (data.find("operationState") != std::string::npos) {
REQUIRE(data.find("state\": \"NOT_APPLIED") != std::string::npos);
}
return C2UpdateHandler::handlePost(server, conn);
}
void TimeoutingHTTPHandler::respond(struct mg_connection *conn) {
if (!wait_times_.empty() && wait_times_[0] > std::chrono::seconds(0)) {
sleep_for(wait_times_[0]);
}
int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0);
mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count);
for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) {
mg_printf(conn, "a");
if (wait_times_[chunkIdx + 1].count() > 0) {
sleep_for(wait_times_[chunkIdx + 1]);
}
}
}
bool HttpGetResponder::handleGet(CivetServer* /*server*/, struct mg_connection *conn) {
puts("handle get");
static const std::string site2site_rest_resp = "hi this is a get test";
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n",
site2site_rest_resp.length());
mg_printf(conn, "%s", site2site_rest_resp.c_str());
return true;
}
bool C2AcknowledgeHandler::handlePost(CivetServer* /*server*/, struct mg_connection* conn) {
std::string req = readPayload(conn);
rapidjson::Document root;
root.Parse(req.data(), req.size());
std::string result_state;
std::string details;
if (root.IsObject() && root.HasMember("operationState")) {
if (root["operationState"].IsObject()) {
if (root["operationState"].HasMember("state")) {
result_state = root["operationState"]["state"].GetString();
std::lock_guard<std::mutex> guard(apply_count_mtx_);
++apply_count_[result_state];
}
if (root["operationState"].HasMember("details")) {
details = root["operationState"]["details"].GetString();
}
}
}
if (root.IsObject() && root.HasMember("operationId")) {
std::lock_guard<std::mutex> guard(ack_operations_mtx_);
acknowledged_operations_.insert({root["operationId"].GetString(), OpResult{result_state, details}});
}
mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: "
"text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n");
return true;
}
} // namespace org::apache::nifi::minifi::test