| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #ifndef LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ |
| #define LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ |
| |
| #include "civetweb.h" |
| #include "CivetServer.h" |
| #include "concurrentqueue.h" |
| #include "CivetStream.h" |
| #include "io/CRCStream.h" |
| #include "rapidjson/document.h" |
| #include <cinttypes> |
| #include <utility> |
| #include "HTTPUtils.h" |
| #include "ServerAwareHandler.h" |
| |
| static std::atomic<int> transaction_id; |
| static std::atomic<int> transaction_id_output; |
| |
| struct FlowObj { |
| FlowObj() = default; |
| |
| FlowObj(FlowObj &&other) noexcept |
| : total_size(other.total_size), |
| attributes(std::move(other.attributes)), |
| data(std::move(other.data)) |
| { } |
| |
| uint64_t total_size{0}; |
| std::map<std::string, std::string> attributes; |
| std::vector<uint8_t> data; |
| }; |
| |
| class SiteToSiteLocationResponder : public ServerAwareHandler { |
| public: |
| explicit SiteToSiteLocationResponder(bool isSecure) |
| : isSecure(isSecure) { |
| } |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override { |
| std::string site2site_rest_resp = "{" |
| "\"revision\": {" |
| "\"clientId\": \"483d53eb-53ec-4e93-b4d4-1fc3d23dae6f\"" |
| "}," |
| "\"controller\": {" |
| "\"id\": \"fe4a3a42-53b6-4af1-a80d-6fdfe60de97f\"," |
| "\"name\": \"NiFi Flow\"," |
| "\"siteToSiteSecure\": "; |
| site2site_rest_resp += (isSecure ? "true" : "false"); |
| site2site_rest_resp += "}}"; |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " |
| "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", |
| site2site_rest_resp.length()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| protected: |
| bool isSecure; |
| }; |
| |
| class PeerResponder : public ServerAwareHandler { |
| public: |
| |
| explicit PeerResponder(std::string base_url) { |
| std::string scheme; |
| assert(parse_http_components(base_url, port, scheme, path)); |
| } |
| |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override { |
| |
| #ifdef WIN32 |
| std::string hostname = org::apache::nifi::minifi::io::Socket::getMyHostName(); |
| #else |
| std::string hostname = "localhost"; |
| #endif |
| std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": " + port + ", \"secure\": false, \"flowFileCount\" : 0 }] }"; |
| std::stringstream headers; |
| headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; |
| mg_printf(conn, "%s", headers.str().c_str()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| protected: |
| std::string base_url; |
| std::string port; |
| std::string path; |
| }; |
| |
| class SiteToSiteBaseResponder : public ServerAwareHandler { |
| public: |
| |
| explicit SiteToSiteBaseResponder(std::string base_url) |
| : base_url(std::move(base_url)) { |
| } |
| |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override { |
| std::string site2site_rest_resp = |
| "{\"controller\":{\"id\":\"96dab149-0162-1000-7924-ed3122d6ea2b\",\"name\":\"NiFi Flow\",\"comments\":\"\",\"runningCount\":3,\"stoppedCount\":6,\"invalidCount\":1,\"disabledCount\":0,\"inputPortCount\":1,\"outputPortCount\":1,\"remoteSiteListeningPort\":10443,\"siteToSiteSecure\":false,\"instanceId\":\"13881505-0167-1000-be72-aa29341a3e9a\",\"inputPorts\":[{\"id\":\"471deef6-2a6e-4a7d-912a-81cc17e3a204\",\"name\":\"RPGIN\",\"comments\":\"\",\"state\":\"RUNNING\"}],\"outputPorts\":[{\"id\":\"9cf15a63-0166-1000-1b29-027406d96013\",\"name\":\"ddsga\",\"comments\":\"\",\"state\":\"STOPPED\"}]}}"; |
| std::stringstream headers; |
| headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; |
| mg_printf(conn, "%s", headers.str().c_str()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| protected: |
| std::string base_url; |
| }; |
| |
| class TransactionResponder : public ServerAwareHandler { |
| public: |
| |
| explicit TransactionResponder(std::string base_url, std::string port_id, bool input_port, bool wrong_uri = false, bool empty_transaction_uri = false) |
| : base_url(std::move(base_url)), |
| wrong_uri(wrong_uri), |
| empty_transaction_uri(empty_transaction_uri), |
| input_port(input_port), |
| port_id(std::move(port_id)), |
| flow_files_feed_(nullptr) { |
| |
| if (input_port) { |
| transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de96"; |
| transaction_id_str += std::to_string(transaction_id.load()); |
| transaction_id++; |
| } else { |
| transaction_id_str = "fe4a3a42-53b6-4af1-a80d-6fdfe60de95"; |
| transaction_id_str += std::to_string(transaction_id_output.load()); |
| transaction_id_output++; |
| } |
| } |
| |
| bool handlePost(CivetServer *server, struct mg_connection *conn) override { |
| std::string site2site_rest_resp; |
| std::stringstream headers; |
| headers << "HTTP/1.1 201 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nX-Location-Uri-Intent: "; |
| if (wrong_uri) |
| headers << "ohstuff\r\n"; |
| else |
| headers << "transaction-url\r\n"; |
| |
| std::string port_type; |
| |
| if (input_port) |
| port_type = "input-ports"; |
| else |
| port_type = "output-ports"; |
| if (!empty_transaction_uri) |
| headers << "locAtion: " << base_url << "/site-to-site/" << port_type << "/" << port_id << "/transactions/" << transaction_id_str << "\r\n"; |
| headers << "Connection: close\r\n\r\n"; |
| mg_printf(conn, "%s", headers.str().c_str()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { |
| flow_files_feed_ = feed; |
| } |
| |
| std::string getTransactionId() { |
| return transaction_id_str; |
| } |
| protected: |
| std::string base_url; |
| std::string transaction_id_str; |
| bool wrong_uri; |
| bool empty_transaction_uri; |
| bool input_port; |
| std::string port_id; |
| moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; |
| }; |
| |
| class FlowFileResponder : public ServerAwareHandler { |
| public: |
| |
| explicit FlowFileResponder(bool input_port, bool wrong_uri = false, bool invalid_checksum = false) |
| : wrong_uri(wrong_uri), |
| input_port(input_port), |
| invalid_checksum(invalid_checksum), |
| flow_files_feed_(nullptr) { |
| } |
| |
| moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *getFlows() { |
| return &flow_files_; |
| } |
| |
| void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { |
| flow_files_feed_ = feed; |
| } |
| |
| bool handlePost(CivetServer *server, struct mg_connection *conn) override { |
| std::string site2site_rest_resp; |
| std::stringstream headers; |
| |
| if (!wrong_uri) { |
| minifi::io::CivetStream civet_stream(conn); |
| minifi::io::CRCStream < minifi::io::CivetStream > stream(&civet_stream); |
| uint32_t num_attributes; |
| int read; |
| uint64_t total_size = 0; |
| read = stream.read(num_attributes); |
| if(!isServerRunning())return false; |
| assert(read > 0); total_size += read; |
| |
| auto flow = std::make_shared<FlowObj>(); |
| |
| for (int i = 0; i < num_attributes; i++) { |
| std::string name, value; |
| read = stream.readUTF(name, true); |
| if(!isServerRunning())return false; |
| assert(read > 0); total_size += read; |
| read = stream.readUTF(value, true); |
| if(!isServerRunning())return false; |
| assert(read > 0); total_size += read; |
| flow->attributes[name] = value; |
| } |
| uint64_t length; |
| read = stream.read(length); |
| if(!isServerRunning())return false; |
| assert(read > 0); total_size += read; |
| |
| total_size += length; |
| flow->data.resize(length); |
| flow->total_size = total_size; |
| |
| read = stream.readData(flow->data.data(), length); |
| if(!isServerRunning())return false; |
| assert(read == length); |
| |
| assert(flow->attributes["path"] == "."); |
| assert(!flow->attributes["uuid"].empty()); |
| assert(!flow->attributes["filename"].empty()); |
| |
| if (!invalid_checksum) { |
| site2site_rest_resp = std::to_string(stream.getCRC()); |
| flow_files_.enqueue(flow); |
| } else { |
| site2site_rest_resp = "Imawrongchecksumshortandstout"; |
| } |
| |
| headers << "HTTP/1.1 202 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; |
| } else { |
| headers << "HTTP/1.1 404\r\nConnection: close\r\n\r\n"; |
| } |
| |
| mg_printf(conn, "%s", headers.str().c_str()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override { |
| |
| if (flow_files_feed_->size_approx() > 0) { |
| std::shared_ptr<FlowObj> flowobj; |
| uint8_t buf[1]; |
| std::vector<std::shared_ptr<FlowObj>> flows; |
| uint64_t total = 0; |
| |
| while (flow_files_feed_->try_dequeue(flowobj)) { |
| flows.push_back(flowobj); |
| total += flowobj->total_size; |
| } |
| mg_printf(conn, |
| "HTTP/1.1 200 OK\r\n" |
| "Content-Length: %" PRIu64 "\r\n" |
| "Content-Type: application/octet-stream\r\n" |
| "Connection: close\r\n\r\n", |
| total); |
| minifi::io::BaseStream serializer; |
| minifi::io::CRCStream < minifi::io::BaseStream > stream(&serializer); |
| for (const auto& flow : flows) { |
| uint32_t num_attributes = flow->attributes.size(); |
| stream.write(num_attributes); |
| for (const auto& entry : flow->attributes) { |
| stream.writeUTF(entry.first); |
| stream.writeUTF(entry.second); |
| } |
| uint64_t length = flow->data.size(); |
| stream.write(length); |
| stream.writeData(flow->data.data(), length); |
| } |
| } else { |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nConnection: " |
| "close\r\nContent-Length: 0\r\n"); |
| mg_printf(conn, "Content-Type: text/plain\r\n\r\n"); |
| |
| } |
| return true; |
| } |
| |
| void setFlowUrl(std::string flowUrl) { |
| base_url = std::move(flowUrl); |
| } |
| |
| protected: |
| // base url |
| std::string base_url; |
| // set the wrong url |
| bool wrong_uri; |
| // we are running an input port |
| bool input_port; |
| // invalid checksum is returned. |
| bool invalid_checksum; |
| moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> flow_files_; |
| moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; |
| }; |
| |
| class DeleteTransactionResponder : public ServerAwareHandler { |
| public: |
| |
| explicit DeleteTransactionResponder(std::string base_url, std::string response_code, int expected_resp_code) |
| : flow_files_feed_(nullptr), |
| base_url(std::move(base_url)), |
| response_code(std::move(response_code)) { |
| expected_resp_code_str = std::to_string(expected_resp_code); |
| } |
| |
| explicit DeleteTransactionResponder(std::string base_url, std::string response_code, moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) |
| : flow_files_feed_(feed), |
| base_url(std::move(base_url)), |
| response_code(std::move(response_code)) { |
| } |
| |
| bool handleDelete(CivetServer *server, struct mg_connection *conn) override { |
| std::string site2site_rest_resp; |
| std::stringstream headers; |
| std::string resp; |
| CivetServer::getParam(conn, "responseCode", resp); |
| headers << "HTTP/1.1 " << response_code << "\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\n"; |
| headers << "Connection: close\r\n\r\n"; |
| mg_printf(conn, "%s", headers.str().c_str()); |
| mg_printf(conn, "%s", site2site_rest_resp.c_str()); |
| return true; |
| } |
| |
| void setFeed(moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *feed) { |
| flow_files_feed_ = feed; |
| } |
| |
| protected: |
| moodycamel::ConcurrentQueue<std::shared_ptr<FlowObj>> *flow_files_feed_; |
| std::string base_url; |
| std::string expected_resp_code_str; |
| std::string response_code; |
| }; |
| |
| std::string readPayload(struct mg_connection *conn) { |
| std::string response; |
| int readBytes; |
| |
| std::array<char, 1024> buffer; |
| while ((readBytes = mg_read(conn, buffer.data(), buffer.size())) > 0) { |
| response.append(buffer.data(), readBytes); |
| } |
| return response; |
| } |
| |
| class HeartbeatHandler : public ServerAwareHandler { |
| public: |
| void sendStopOperation(struct mg_connection *conn) { |
| std::string resp = "{\"operation\" : \"heartbeat\", \"requested_operations\" : [{ \"operationid\" : 41, \"operation\" : \"stop\", \"operand\" : \"invoke\" }, " |
| "{ \"operationid\" : 42, \"operation\" : \"stop\", \"operand\" : \"FlowController\" } ]}"; |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " |
| "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", |
| resp.length()); |
| mg_printf(conn, "%s", resp.c_str()); |
| } |
| |
| void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operationId, struct mg_connection * conn) { |
| std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" |
| "\"operation\" : \"" + operation + "\"," |
| "\"operationid\" : \"" + operationId + "\"," |
| "\"operand\": \"" + operand + "\"}]}"; |
| |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " |
| "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", |
| heartbeat_response.length()); |
| mg_printf(conn, "%s", heartbeat_response.c_str()); |
| } |
| |
| void verifyJsonHasAgentManifest(const rapidjson::Document& root) { |
| bool found = false; |
| assert(root.HasMember("agentInfo")); |
| assert(root["agentInfo"].HasMember("agentManifest")); |
| assert(root["agentInfo"]["agentManifest"].HasMember("bundles")); |
| |
| for (auto &bundle : root["agentInfo"]["agentManifest"]["bundles"].GetArray()) { |
| assert(bundle.HasMember("artifact")); |
| std::string str = bundle["artifact"].GetString(); |
| if (str == "minifi-standard-processors") { |
| |
| std::vector<std::string> classes; |
| for (auto &proc : bundle["componentManifest"]["processors"].GetArray()) { |
| classes.push_back(proc["type"].GetString()); |
| } |
| |
| auto group = minifi::BuildDescription::getClassDescriptions(str); |
| for (const auto& proc : group.processors_) { |
| assert(std::find(classes.begin(), classes.end(), proc.class_name_) != std::end(classes)); |
| found = true; |
| } |
| |
| } |
| } |
| assert(found); |
| } |
| |
| virtual void handleHeartbeat(const rapidjson::Document& root, struct mg_connection *) { |
| verifyJsonHasAgentManifest(root); |
| } |
| |
| virtual void handleAcknowledge(const rapidjson::Document&) { |
| } |
| |
| void verify(struct mg_connection *conn) { |
| auto post_data = readPayload(conn); |
| //std::cerr << post_data << std::endl; |
| if (!IsNullOrEmpty(post_data)) { |
| rapidjson::Document root; |
| rapidjson::ParseResult ok = root.Parse(post_data.data(), post_data.size()); |
| assert(ok); |
| std::string operation = root["operation"].GetString(); |
| if (operation == "heartbeat") { |
| handleHeartbeat(root, conn); |
| } else if (operation == "acknowledge") { |
| handleAcknowledge(root); |
| } else { |
| throw std::runtime_error("operation not supported " + operation); |
| } |
| } |
| } |
| |
| bool handlePost(CivetServer *, struct mg_connection *conn) override { |
| verify(conn); |
| sendStopOperation(conn); |
| return true; |
| } |
| }; |
| |
| class C2UpdateHandler : public ServerAwareHandler { |
| public: |
| explicit C2UpdateHandler(const std::string& test_file_location) |
| : test_file_location_(test_file_location) { |
| } |
| |
| bool handlePost(CivetServer *server, struct mg_connection *conn) override { |
| calls_++; |
| if (!response_.empty()) { |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " |
| "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", |
| response_.length()); |
| mg_printf(conn, "%s", response_.c_str()); |
| response_.clear(); |
| } else { |
| mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); |
| } |
| |
| return true; |
| } |
| |
| bool handleGet(CivetServer *server, struct mg_connection *conn) override { |
| std::ifstream myfile(test_file_location_.c_str(), std::ios::in | std::ios::binary); |
| if (myfile.good()) { |
| std::string str((std::istreambuf_iterator<char>(myfile)), (std::istreambuf_iterator<char>())); |
| mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " |
| "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", |
| str.length()); |
| mg_printf(conn, "%s", str.c_str()); |
| } else { |
| mg_printf(conn, "HTTP/1.1 500 Internal Server Error\r\n"); |
| } |
| |
| return true; |
| } |
| |
| virtual void setC2RestResponse(const std::string& url, const std::string& name) { |
| response_ = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ {" |
| "\"operation\" : \"update\", " |
| "\"operationid\" : \"8675309\", " |
| "\"name\": \"" + name + "\", \"content\": { \"location\": \"" + url + "\"}}]}"; |
| } |
| |
| std::atomic<size_t> calls_{0}; |
| private: |
| std::string test_file_location_; |
| std::string response_; |
| }; |
| |
| class C2FailedUpdateHandler : public C2UpdateHandler { |
| public: |
| explicit C2FailedUpdateHandler(const std::string& test_file_location) |
| : C2UpdateHandler(test_file_location) { |
| } |
| |
| bool handlePost(CivetServer *server, struct mg_connection *conn) override { |
| calls_++; |
| const auto data = readPayload(conn); |
| |
| if (data.find("operationState") != std::string::npos) { |
| assert(data.find("state\": \"NOT_APPLIED") != std::string::npos); |
| } |
| |
| return C2UpdateHandler::handlePost(server, conn); |
| } |
| }; |
| |
| class InvokeHTTPCouldNotConnectHandler : public ServerAwareHandler { |
| }; |
| |
| class InvokeHTTPResponseOKHandler : public ServerAwareHandler { |
| public: |
| bool handlePost(CivetServer *, struct mg_connection *conn) override { |
| mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); |
| return true; |
| } |
| }; |
| |
| class InvokeHTTPResponse404Handler : public ServerAwareHandler { |
| public: |
| bool handlePost(CivetServer *, struct mg_connection *conn) override { |
| mg_printf(conn, "HTTP/1.1 404 Not Found\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); |
| return true; |
| } |
| }; |
| |
| class InvokeHTTPResponse501Handler : public ServerAwareHandler { |
| public: |
| bool handlePost(CivetServer *, struct mg_connection *conn) override { |
| mg_printf(conn, "HTTP/1.1 501 Not Implemented\r\nContent-Type: text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); |
| return true; |
| } |
| }; |
| |
| class TimeoutingHTTPHandler : public ServerAwareHandler { |
| public: |
| TimeoutingHTTPHandler(std::vector<std::chrono::milliseconds> wait_times) |
| : wait_times_(wait_times) { |
| } |
| bool handlePost(CivetServer *, struct mg_connection *conn) override { |
| respond(conn); |
| return true; |
| } |
| bool handleGet(CivetServer *, struct mg_connection *conn) override { |
| respond(conn); |
| return true; |
| } |
| bool handleDelete(CivetServer *, struct mg_connection *conn) override { |
| respond(conn); |
| return true; |
| } |
| bool handlePut(CivetServer *, struct mg_connection *conn) override { |
| respond(conn); |
| return true; |
| } |
| private: |
| void respond(struct mg_connection *conn) { |
| if (!wait_times_.empty() && wait_times_[0] > std::chrono::seconds(0)) { |
| sleep_for(wait_times_[0]); |
| } |
| int chunk_count = std::max(static_cast<int>(wait_times_.size()) - 1, 0); |
| mg_printf(conn, "HTTP/1.1 201 OK\r\nContent-Type: text/plain\r\nContent-Length: %d\r\nConnection: close\r\n\r\n", chunk_count); |
| for (int chunkIdx = 0; chunkIdx < chunk_count; ++chunkIdx) { |
| mg_printf(conn, "a"); |
| if (wait_times_[chunkIdx + 1].count() > 0) { |
| sleep_for(wait_times_[chunkIdx + 1]); |
| } |
| } |
| } |
| std::vector<std::chrono::milliseconds> wait_times_; |
| }; |
| #endif /* LIBMINIFI_TEST_CURL_TESTS_SITETOSITEHTTP_HTTPHANDLERS_H_ */ |