blob: 8649ebec1a8f2b4e23977adaaed53643035b1223 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/es/es_scan_reader.h"
#include <gtest/gtest.h>
#include <map>
#include <string>
#include <vector>
#include "common/logging.h"
#include "exec/es/es_scroll_query.h"
#include "http/ev_http_server.h"
#include "http/http_channel.h"
#include "http/http_handler.h"
#include "http/http_request.h"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
namespace doris {
class RestSearchAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
req->add_output_header(HttpHeaders::CONTENT_TYPE, "application/json");
if (req->method() == HttpMethod::POST) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
int size = 1;
if (post_doc.HasMember("size")) {
rapidjson::Value& size_value = post_doc["size"];
size = size_value.GetInt();
}
std::string _scroll_id(std::to_string(size));
rapidjson::Document search_result;
rapidjson::Document::AllocatorType& allocator = search_result.GetAllocator();
search_result.SetObject();
rapidjson::Value scroll_id_value(_scroll_id.c_str(), allocator);
search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 10, allocator);
rapidjson::Value inner_hits(rapidjson::kArrayType);
rapidjson::Value source_document(rapidjson::kObjectType);
source_document.AddMember("id", 1, allocator);
rapidjson::Value value_node("1", allocator);
source_document.AddMember("value", value_node, allocator);
inner_hits.PushBack(source_document, allocator);
outer_hits.AddMember("hits", inner_hits, allocator);
search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
search_result.Accept(writer);
//send DELETE scroll post request
std::string search_result_json = buffer.GetString();
HttpChannel::send_reply(req, search_result_json);
} else {
std::string response = "test1";
HttpChannel::send_reply(req, response);
}
}
};
class RestSearchScrollAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
if (req->method() == HttpMethod::POST) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
std::string scroll_id;
if (!post_doc.HasMember("scroll_id")) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "invalid scroll request");
return;
} else {
rapidjson::Value& scroll_id_value = post_doc["scroll_id"];
scroll_id = scroll_id_value.GetString();
int offset = atoi(scroll_id.c_str());
if (offset > 10) {
rapidjson::Document end_search_result;
rapidjson::Document::AllocatorType& allocator =
end_search_result.GetAllocator();
end_search_result.SetObject();
rapidjson::Value scroll_id_value("11", allocator);
end_search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 0, allocator);
end_search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
end_search_result.Accept(writer);
//send DELETE scroll post request
std::string end_search_result_json = buffer.GetString();
HttpChannel::send_reply(req, end_search_result_json);
return;
} else {
int start = offset + 1;
rapidjson::Document search_result;
rapidjson::Document::AllocatorType& allocator = search_result.GetAllocator();
search_result.SetObject();
rapidjson::Value scroll_id_value(std::to_string(start).c_str(), allocator);
search_result.AddMember("_scroll_id", scroll_id_value, allocator);
rapidjson::Value outer_hits(rapidjson::kObjectType);
outer_hits.AddMember("total", 1, allocator);
rapidjson::Value inner_hits(rapidjson::kArrayType);
rapidjson::Value source_document(rapidjson::kObjectType);
source_document.AddMember("id", start, allocator);
rapidjson::Value value_node(std::to_string(start).c_str(), allocator);
source_document.AddMember("value", value_node, allocator);
inner_hits.PushBack(source_document, allocator);
outer_hits.AddMember("hits", inner_hits, allocator);
search_result.AddMember("hits", outer_hits, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
search_result.Accept(writer);
//send DELETE scroll post request
std::string search_result_json = buffer.GetString();
HttpChannel::send_reply(req, search_result_json);
return;
}
}
}
}
};
class RestClearScrollAction : public HttpHandler {
public:
void handle(HttpRequest* req) override {
std::string user;
std::string passwd;
if (!parse_basic_auth(*req, &user, &passwd) || user != "root") {
HttpChannel::send_basic_challenge(req, "abc");
return;
}
if (req->method() == HttpMethod::DELETE) {
std::string post_body = req->get_request_body();
rapidjson::Document post_doc;
post_doc.Parse<0>(post_body.c_str());
std::string scroll_id;
if (!post_doc.HasMember("scroll_id")) {
HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, "invalid scroll request");
return;
} else {
rapidjson::Document clear_scroll_result;
rapidjson::Document::AllocatorType& allocator = clear_scroll_result.GetAllocator();
clear_scroll_result.SetObject();
clear_scroll_result.AddMember("succeeded", true, allocator);
clear_scroll_result.AddMember("num_freed", 1, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
clear_scroll_result.Accept(writer);
std::string clear_scroll_result_json = buffer.GetString();
HttpChannel::send_reply(req, clear_scroll_result_json);
return;
}
}
}
};
static RestSearchAction rest_search_action = RestSearchAction();
static RestSearchScrollAction rest_search_scroll_action = RestSearchScrollAction();
static RestClearScrollAction rest_clear_scroll_action = RestClearScrollAction();
static EvHttpServer* mock_es_server = nullptr;
static int real_port = 0;
class MockESServerTest : public testing::Test {
public:
MockESServerTest() {}
~MockESServerTest() override {}
static void SetUpTestCase() {
mock_es_server = new EvHttpServer(0);
mock_es_server->register_handler(POST, "/{index}/{type}/_search", &rest_search_action);
mock_es_server->register_handler(POST, "/_search/scroll", &rest_search_scroll_action);
mock_es_server->register_handler(DELETE, "/_search/scroll", &rest_clear_scroll_action);
mock_es_server->start();
real_port = mock_es_server->get_real_port();
ASSERT_NE(0, real_port);
}
static void TearDownTestCase() { delete mock_es_server; }
};
TEST_F(MockESServerTest, workflow) {
std::string target = "http://127.0.0.1:" + std::to_string(real_port);
std::vector<std::string> fields = {"id", "value"};
std::map<std::string, std::string> props;
props[ESScanReader::KEY_INDEX] = "tindex";
props[ESScanReader::KEY_TYPE] = "doc";
props[ESScanReader::KEY_USER_NAME] = "root";
props[ESScanReader::KEY_PASS_WORD] = "root";
props[ESScanReader::KEY_SHARD] = "0";
props[ESScanReader::KEY_BATCH_SIZE] = "1";
std::vector<EsPredicate*> predicates;
std::map<std::string, std::string> docvalue_context;
bool doc_value_mode = false;
props[ESScanReader::KEY_QUERY] = ESScrollQueryBuilder::build(props, fields, predicates,
docvalue_context, &doc_value_mode);
ESScanReader reader(target, props, doc_value_mode);
auto st = reader.open();
ASSERT_TRUE(st.ok());
bool eos = false;
std::unique_ptr<ScrollParser> parser = nullptr;
while (!eos) {
st = reader.get_next(&eos, parser);
ASSERT_TRUE(st.ok());
if (eos) {
break;
}
}
auto cst = reader.close();
ASSERT_TRUE(cst.ok());
}
} // namespace doris
int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}