| /* |
| * Copyright (c) 2019-2022 ExpoLab, UC Davis |
| * |
| * Permission is hereby granted, free of charge, to any person |
| * obtaining a copy of this software and associated documentation |
| * files (the "Software"), to deal in the Software without |
| * restriction, including without limitation the rights to use, |
| * copy, modify, merge, publish, distribute, sublicense, and/or |
| * sell copies of the Software, and to permit persons to whom the |
| * Software is furnished to do so, subject to the following conditions: |
| * |
| * The above copyright notice and this permission notice shall be |
| * included in all copies or substantial portions of the Software. |
| * |
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, |
| * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES |
| * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND |
| * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
| * HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, |
| * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER |
| * DEALINGS IN THE SOFTWARE. |
| * |
| */ |
| |
| #include "service/http_server/crow_service.h" |
| |
| #include <glog/logging.h> |
| #include <rapidjson/document.h> |
| #include <rapidjson/stringbuffer.h> |
| #include <rapidjson/writer.h> |
| #include <stdint.h> |
| #include <stdlib.h> |
| #include <sys/stat.h> |
| #include <sys/types.h> |
| #include <unistd.h> |
| |
| #include <ctime> |
| #include <fstream> |
| #include <memory> |
| #include <string> |
| #include <thread> |
| |
| #include <unordered_set> |
| #include <mutex> |
| |
| |
| using crow::request; |
| using crow::response; |
| using resdb::ResDBConfig; |
| using resdb::BatchUserRequest; |
| |
| namespace sdk { |
| |
| CrowService::CrowService(ResDBConfig config, ResDBConfig server_config, |
| uint16_t port_num) |
| : config_(config), |
| server_config_(server_config), |
| port_num_(port_num), |
| kv_client_(config_), |
| txn_client_(server_config_) {} |
| |
| void CrowService::run() { |
| crow::SimpleApp app; |
| |
| // For adding and removing websocket connections |
| std::mutex mtx; |
| |
| // Get all values |
| CROW_ROUTE(app, "/v1/transactions/getall") |
| ([this](const crow::request& req, response& res) { |
| auto values = kv_client_.GetValues(); |
| if (values != nullptr) { |
| LOG(INFO) << "client getvalues value = " << values->c_str(); |
| |
| // Send updated blocks list to websocket |
| if (users.size() > 0) { |
| for (auto u : users) |
| u->send_text("Update blocks"); |
| } |
| |
| res.set_header("Content-Type", "application/json"); |
| res.end(std::string(values->c_str())); |
| } else { |
| res.code = internal_server_error_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("getvalues fail"); |
| } |
| }); |
| |
| // Get value of specific id |
| // <ip>:18000/v1/transactions/get?id='sampleid' |
| CROW_ROUTE(app, "/v1/transactions/get") |
| ([this](const crow::request& req, response& res) { |
| const std::string id = req.url_params.get("id"); |
| auto value = kv_client_.Get(id); |
| if (value != nullptr) { |
| LOG(INFO) << "client get value = " << value->c_str(); |
| |
| // Send updated blocks list to websocket |
| if (users.size() > 0) { |
| for (auto u : users) |
| u->send_text("Update blocks"); |
| } |
| |
| res.set_header("Content-Type", "application/json"); |
| res.end(std::string(value->c_str())); |
| } else { |
| res.code = internal_server_error_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("get value fail\n"); |
| } |
| }); |
| |
| // Get values based on key range |
| // <ip>:18000/v1/transactions/getrange?"min_id=<samplemin>&max_id=<samplemax>" |
| CROW_ROUTE(app, "/v1/transactions/getrange") |
| ([this](const crow::request& req, response& res) { |
| if (req.url_params.get("min_id") == nullptr) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("no min_id specified\nExample usage: <ip>:18000/v1/transactions/getrange?\"min_id=<samplemin>&max_id=<samplemax>\"\n"); |
| return; |
| } |
| if (req.url_params.get("max_id") == nullptr) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("no max_id specified\nExample usage: <ip>:18000/v1/transactions/getrange?\"min_id=<samplemin>&max_id=<samplemax>\"\n"); |
| return; |
| } |
| |
| const std::string min_id = req.url_params.get("min_id"); |
| const std::string max_id = req.url_params.get("max_id"); |
| |
| auto value = kv_client_.GetRange(min_id, max_id); |
| if (value != nullptr) { |
| LOG(INFO) << "client getrange value = " << value->c_str(); |
| |
| // Send updated blocks list to websocket |
| if (users.size() > 0) { |
| for (auto u : users) |
| u->send_text("Update blocks"); |
| } |
| |
| res.set_header("Content-Type", "application/json"); |
| res.end(std::string(value->c_str())); |
| } else { |
| res.code = internal_server_error_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("getrange fail\n"); |
| } |
| }); |
| |
| // Commit a key-value pair, extracting the id parameter from the JSON |
| // object and setting the value as the entire JSON object |
| CROW_ROUTE(app, "/v1/transactions/commit") |
| .methods("POST"_method)([this](const request& req) { |
| std::string body = req.body; |
| resdb::SDKTransaction transaction = resdb::ParseSDKTransaction(body); |
| const std::string id = transaction.id; |
| const std::string value = transaction.value; |
| |
| // Set key-value pair in kv server |
| int retval = kv_client_.Set(id, value); |
| |
| if (retval != 0) { |
| LOG(ERROR) << "Error when trying to commit id " << id; |
| response res(internal_server_error_code, "id: " + id); |
| res.set_header("Content-Type", "text/plain"); |
| return res; |
| } |
| LOG(INFO) << "Set " << id << " to " << value; |
| |
| // Send updated blocks list to websocket |
| if (users.size() > 0) { |
| for (auto u : users) |
| u->send_text("Update blocks"); |
| } |
| |
| response res(created_code, "id: " + id); // Created status code |
| res.set_header("Content-Type", "text/plain"); |
| return res; |
| }); |
| |
| CROW_ROUTE(app, "/v1/blocks/getall")([this](const crow::request& req, response& res) { |
| auto values = GetAllBlocks(1); |
| res.set_header("Content-Type", "application/json"); |
| res.end(values); |
| }); |
| |
| // Retrieve blocks in batches of size of the int parameter |
| CROW_ROUTE(app, "/v1/blocks/get") |
| ([this](const crow::request& req, response& res) { |
| if (req.url_params.get("batch_size") == nullptr) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("no batch_size specified\nExample usage: <ip>:18000/v1/blocks/get?batch_size=1>\"\n"); |
| return; |
| } |
| // TODO: catch conversion error |
| int batch_size = atoi(req.url_params.get("batch_size")); |
| auto values = GetAllBlocks(batch_size); |
| if (values == "") { |
| res.code = internal_server_error_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("get replica state fail\n"); |
| exit(1); |
| }; |
| res.set_header("Content-Type", "application/json"); |
| res.end(values); |
| }); |
| |
| // Retrieve blocks within a range |
| CROW_ROUTE(app, "/v1/blocks/getrange") |
| ([this](const crow::request& req, response& res) { |
| if (req.url_params.get("min_seq") == nullptr) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("no min_seq specified\nExample usage: <ip>:18000/v1/blocks/getrange?\"min_seq=1&max_seq=3\">\n"); |
| return; |
| } |
| if (req.url_params.get("max_seq") == nullptr) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("no max_seq specified\nExample usage: <ip>:18000/v1/blocks/getrange?\"min_seq=1&max_seq=3\">\n"); |
| return; |
| } |
| |
| char* pEnd; // dummy pointer used to pass to strtol |
| const uint64_t min_seq = strtol(req.url_params.get("min_seq"), &pEnd, 10); |
| const uint64_t max_seq = strtol(req.url_params.get("max_seq"), &pEnd, 10); |
| |
| if (min_seq == 0 || max_seq == 0 || min_seq > max_seq) { |
| res.code = bad_request_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("Invalid range query. Blocks are 1-indexed.\n"); |
| return; |
| } |
| auto resp = txn_client_.GetTxn(min_seq, max_seq); |
| absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( |
| uint64_t min_seq, uint64_t max_seq); |
| if (!resp.ok()) { |
| LOG(ERROR) << "get replica state fail"; |
| res.code = internal_server_error_code; |
| res.set_header("Content-Type", "text/plain"); |
| res.end("get replica state fail\n"); |
| exit(1); |
| } |
| |
| std::string values = "[\n"; |
| bool first_iteration = true; |
| for (auto& txn : *resp) { |
| BatchUserRequest request; |
| KVRequest kv_request; |
| |
| std::string cur_batch_str = ""; |
| if (request.ParseFromString(txn.second)) { |
| LOG(INFO) << request.DebugString(); |
| |
| if (!first_iteration) cur_batch_str.append(","); |
| first_iteration = false; |
| |
| // id |
| uint64_t local_id = request.local_id(); |
| cur_batch_str.append("{\"id\": " + std::to_string(local_id)); |
| |
| // number |
| cur_batch_str.append(", \"number\": \"" + std::to_string(local_id) + "\""); |
| |
| // transactions |
| cur_batch_str.append(", \"transactions\": ["); |
| bool first_transaction = true; |
| for (auto& sub_req : request.user_requests()) { |
| kv_request.ParseFromString(sub_req.request().data()); |
| std::string kv_request_json = ParseKVRequest(kv_request); |
| |
| if (!first_transaction) cur_batch_str.append(","); |
| first_transaction = false; |
| cur_batch_str.append(kv_request_json); |
| cur_batch_str.append("\n"); |
| } |
| cur_batch_str.append("]"); // close transactions list |
| |
| // size |
| cur_batch_str.append(", \"size\": " + std::to_string(request.ByteSizeLong())); |
| |
| // createdAt |
| uint64_t createtime = request.createtime(); |
| cur_batch_str.append(", \"createdAt\": \"" + ParseCreateTime(createtime) + "\""); |
| } |
| cur_batch_str.append("}\n"); |
| values.append(cur_batch_str); |
| } |
| values.append("]\n"); |
| res.set_header("Content-Type", "application/json"); |
| res.end(values); |
| }); |
| |
| CROW_ROUTE(app, "/blockupdatelistener") |
| .websocket() |
| .onopen([&](crow::websocket::connection& conn) { |
| LOG(INFO) << "Opened websocket"; |
| std::lock_guard<std::mutex> _(mtx); |
| users.insert(&conn); |
| }) |
| .onclose([&](crow::websocket::connection& conn, const std::string& reason) { |
| LOG(INFO) << "Closed websocket"; |
| std::lock_guard<std::mutex> _(mtx); |
| users.erase(&conn); |
| }) |
| .onmessage([&](crow::websocket::connection& /*conn*/, const std::string& data, bool is_binary){ |
| // do nothing |
| }); |
| |
| // For metadata table on the Explorer |
| CROW_ROUTE(app, "/populatetable") |
| ([this](const crow::request& req, response& res) { |
| std::vector<resdb::ReplicaInfo> replicas = config_.GetReplicaInfos(); |
| size_t replica_num = replicas[0].id() - 1; |
| uint32_t client_num = config_.GetReplicaNum(); |
| uint32_t worker_num = config_.GetWorkerNum(); |
| uint32_t client_batch_num = config_.ClientBatchNum(); |
| uint32_t max_process_txn = config_.GetMaxProcessTxn(); |
| uint32_t client_batch_wait_time = config_.ClientBatchWaitTimeMS(); |
| uint32_t input_worker_num = config_.GetInputWorkerNum(); |
| uint32_t output_worker_num = config_.GetOutputWorkerNum(); |
| int client_timeout_ms = config_.GetClientTimeoutMs(); |
| int min_data_receive_num = config_.GetMinDataReceiveNum(); |
| size_t max_malicious_replica_num = config_.GetMaxMaliciousReplicaNum(); |
| int checkpoint_water_mark = config_.GetCheckPointWaterMark(); |
| |
| std::string values = ""; |
| values.append("[{ \"replicaNum\": " + std::to_string(replica_num) |
| + ", \"clientNum\": " + std::to_string(client_num) |
| + ", \"workerNum\" : " + std::to_string(worker_num) |
| + ", \"clientBatchNum\" : " + std::to_string(client_batch_num) |
| + ", \"maxProcessTxn\" : " + std::to_string(max_process_txn) |
| + ", \"clientBatchWaitTime\" : " + std::to_string(client_batch_wait_time) |
| + ", \"inputWorkerNum\" : " + std::to_string(input_worker_num) |
| + ", \"outputWorkerNum\" : " + std::to_string(output_worker_num) |
| + ", \"clientTimeoutMs\" : " + std::to_string(client_timeout_ms) |
| + ", \"minDataReceiveNum\" : " + std::to_string(min_data_receive_num) |
| + ", \"maxMaliciousReplicaNum\" : " + std::to_string(max_malicious_replica_num) |
| + ", \"checkpointWaterMark\" : " + std::to_string(checkpoint_water_mark) |
| + "" "}]"); |
| LOG(INFO) << std::string(values.c_str()); |
| res.set_header("Content-Type", "application/json"); |
| res.end(std::string(values.c_str())); |
| }); |
| |
| // Run the Crow app |
| app.port(port_num_).multithreaded().run(); |
| } |
| |
| // If batch_size is 1, the function will not add the extra outer [] braces |
| // Otherwise, a list of lists of blocks will be returned |
| std::string CrowService::GetAllBlocks(int batch_size) { |
| int min_seq = 1; |
| bool full_batches = true; |
| |
| std::string values = "[\n"; |
| bool first_batch = true; |
| while (full_batches) { |
| std::string cur_batch_str = ""; |
| if (!first_batch) cur_batch_str.append(",\n"); |
| if (batch_size > 1) cur_batch_str.append("["); |
| first_batch = false; |
| |
| int max_seq = min_seq + batch_size - 1; |
| auto resp = txn_client_.GetTxn(min_seq, max_seq); |
| absl::StatusOr<std::vector<std::pair<uint64_t, std::string>>> GetTxn( |
| uint64_t min_seq, uint64_t max_seq); |
| if (!resp.ok()) { |
| LOG(ERROR) << "get replica state fail"; |
| return ""; |
| }; |
| |
| int cur_size = 0; |
| bool first_batch_element = true; |
| for (auto& txn : *resp) { |
| BatchUserRequest request; |
| KVRequest kv_request; |
| cur_size++; |
| if (request.ParseFromString(txn.second)) { |
| LOG(INFO) << request.DebugString(); |
| |
| if (!first_batch_element) cur_batch_str.append(","); |
| first_batch_element = false; |
| |
| // id |
| uint64_t local_id = request.local_id(); |
| cur_batch_str.append("{\"id\": " + std::to_string(local_id)); |
| |
| // number |
| cur_batch_str.append(", \"number\": \"" + std::to_string(local_id) + "\""); |
| |
| // transactions |
| cur_batch_str.append(", \"transactions\": ["); |
| bool first_transaction = true; |
| for (auto& sub_req : request.user_requests()) { |
| kv_request.ParseFromString(sub_req.request().data()); |
| std::string kv_request_json = ParseKVRequest(kv_request); |
| |
| if (!first_transaction) cur_batch_str.append(","); |
| first_transaction = false; |
| cur_batch_str.append(kv_request_json); |
| cur_batch_str.append("\n"); |
| } |
| cur_batch_str.append("]"); // close transactions list |
| |
| // size |
| cur_batch_str.append(", \"size\": " + std::to_string(request.ByteSizeLong())); |
| |
| // createdAt |
| uint64_t createtime = request.createtime(); |
| cur_batch_str.append(", \"createdAt\": \"" + ParseCreateTime(createtime) + "\""); |
| } |
| cur_batch_str.append("}\n"); |
| } |
| full_batches = cur_size == batch_size; |
| if (batch_size > 1) cur_batch_str.append("]"); |
| |
| if (cur_size > 0) values.append(cur_batch_str); |
| |
| min_seq += batch_size; |
| } |
| |
| values.append("\n]\n"); |
| |
| return values; |
| } |
| |
| // Helper function used by the blocks endpoints to create JSON strings |
| std::string CrowService::ParseKVRequest(const KVRequest& kv_request) { |
| rapidjson::Document doc; |
| if (kv_request.cmd() == 1) { // SET |
| doc.SetObject(); |
| rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
| rapidjson::Value val(rapidjson::kObjectType); |
| doc.AddMember("cmd", "SET", allocator); |
| doc.AddMember("key", kv_request.key(), allocator); |
| doc.AddMember("value", kv_request.value(), allocator); |
| } else if (kv_request.cmd() == 2) { // GET |
| doc.SetObject(); |
| rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
| rapidjson::Value val(rapidjson::kObjectType); |
| doc.AddMember("cmd", "GET", allocator); |
| doc.AddMember("key", kv_request.key(), allocator); |
| } else if (kv_request.cmd() == 3) { // GETVALUES |
| doc.SetObject(); |
| rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
| rapidjson::Value val(rapidjson::kObjectType); |
| doc.AddMember("cmd", "GETVALUES", allocator); |
| } else if (kv_request.cmd() == 4) { // GETRANGE |
| doc.SetObject(); |
| rapidjson::Document::AllocatorType& allocator = doc.GetAllocator(); |
| rapidjson::Value val(rapidjson::kObjectType); |
| doc.AddMember("cmd", "GETRANGE", allocator); |
| doc.AddMember("min_key", kv_request.key(), allocator); |
| doc.AddMember("max_key", kv_request.value(), allocator); |
| } |
| |
| rapidjson::StringBuffer buffer; |
| rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); |
| doc.Accept(writer); |
| return buffer.GetString(); |
| } |
| |
| std::string CrowService::ParseCreateTime(uint64_t createtime) { |
| std::string timestr = ""; |
| uint64_t sec = createtime / 1000000; // see resilientdb/common/utils/utils.cpp |
| |
| std::tm *tm_gmt = std::gmtime((time_t*) &sec); |
| int year = tm_gmt->tm_year + 1900; |
| int month = tm_gmt->tm_mon; // 0-indexed |
| int day = tm_gmt-> tm_mday; |
| |
| std::string months[12] = {"Jan", "Feb", "Mar", "Apr", "May", "Jun", "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"}; |
| |
| // Using date time string format to support the Explorer transaction chart |
| if (day < 10) timestr += "0"; |
| timestr += std::to_string(day) + " " + months[month] + " " + std::to_string(year) + " "; |
| timestr += std::to_string(tm_gmt->tm_hour) + ":" + std::to_string(tm_gmt->tm_min) + ":" + std::to_string(tm_gmt->tm_sec) + " GMT"; |
| |
| return timestr; |
| } |
| |
| |
| } // namespace resdb |