blob: fc89341239f9938286cb19f4efddd1a95c565c88 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/es/es_scroll_query.h"
#include <glog/logging.h>
#include <rapidjson/encodings.h>
#include <rapidjson/rapidjson.h>
#include <stdlib.h>
#include <sstream>
#include "exec/es/es_scan_reader.h"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
namespace doris {
// The builder needs no initialization of its own.
ESScrollQueryBuilder::ESScrollQueryBuilder() = default;
// Nothing to release explicitly on destruction.
ESScrollQueryBuilder::~ESScrollQueryBuilder() = default;
std::string ESScrollQueryBuilder::build_next_scroll_body(const std::string& scroll_id,
const std::string& scroll) {
rapidjson::Document scroll_dsl;
rapidjson::Document::AllocatorType& allocator = scroll_dsl.GetAllocator();
scroll_dsl.SetObject();
rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator);
scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator);
rapidjson::Value scroll_value(scroll.c_str(), allocator);
scroll_dsl.AddMember("scroll", scroll_value, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
scroll_dsl.Accept(writer);
return buffer.GetString();
}
std::string ESScrollQueryBuilder::build_clear_scroll_body(const std::string& scroll_id) {
rapidjson::Document delete_scroll_dsl;
rapidjson::Document::AllocatorType& allocator = delete_scroll_dsl.GetAllocator();
delete_scroll_dsl.SetObject();
rapidjson::Value scroll_id_value(scroll_id.c_str(), allocator);
delete_scroll_dsl.AddMember("scroll_id", scroll_id_value, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
delete_scroll_dsl.Accept(writer);
return buffer.GetString();
}
std::string ESScrollQueryBuilder::build(const std::map<std::string, std::string>& properties,
const std::vector<std::string>& fields,
const std::map<std::string, std::string>& docvalue_context,
bool* doc_value_mode) {
rapidjson::Document es_query_dsl;
rapidjson::Document::AllocatorType& allocator = es_query_dsl.GetAllocator();
es_query_dsl.SetObject();
// generate the filter clause
rapidjson::Document scratch_document;
rapidjson::Value query_node(rapidjson::kObjectType);
// use fe generate dsl, it must be placed outside the if, otherwise it will cause problems in AddMember
rapidjson::Document fe_query_dsl;
DCHECK(properties.find(ESScanReader::KEY_QUERY_DSL) != properties.end());
auto query_dsl = properties.at(ESScanReader::KEY_QUERY_DSL);
es_query_dsl.AddMember("query", fe_query_dsl.Parse(query_dsl.c_str(), query_dsl.length()),
allocator);
// Doris FE already has checked docvalue-scan optimization
bool pure_docvalue = true;
if (properties.find(ESScanReader::KEY_DOC_VALUES_MODE) != properties.end()) {
pure_docvalue = atoi(properties.at(ESScanReader::KEY_DOC_VALUES_MODE).c_str());
} else {
// check docvalue scan optimization, used for compatibility
if (docvalue_context.size() == 0 || docvalue_context.size() < fields.size()) {
pure_docvalue = false;
} else {
for (auto& select_field : fields) {
if (docvalue_context.find(select_field) == docvalue_context.end()) {
pure_docvalue = false;
break;
}
}
}
}
*doc_value_mode = pure_docvalue;
rapidjson::Value source_node(rapidjson::kArrayType);
if (pure_docvalue) {
for (auto& select_field : fields) {
rapidjson::Value field(docvalue_context.at(select_field).c_str(), allocator);
source_node.PushBack(field, allocator);
}
} else {
for (auto& select_field : fields) {
rapidjson::Value field(select_field.c_str(), allocator);
source_node.PushBack(field, allocator);
}
}
// just filter the selected fields for reducing the network cost
if (pure_docvalue) {
es_query_dsl.AddMember("stored_fields", "_none_", allocator);
es_query_dsl.AddMember("docvalue_fields", source_node, allocator);
} else {
es_query_dsl.AddMember("_source", source_node, allocator);
}
int size;
if (properties.find(ESScanReader::KEY_TERMINATE_AFTER) != properties.end()) {
size = atoi(properties.at(ESScanReader::KEY_TERMINATE_AFTER).c_str());
} else {
size = atoi(properties.at(ESScanReader::KEY_BATCH_SIZE).c_str());
}
std::string shard_id;
if (properties.find(ESScanReader::KEY_SHARD) != properties.end()) {
shard_id = properties.at(ESScanReader::KEY_SHARD);
}
// To maintain consistency with the query, when shard_id is negative, do not add sort node in scroll request body.
if (!shard_id.empty() && std::stoi(shard_id) >= 0) {
rapidjson::Value sort_node(rapidjson::kArrayType);
// use the scroll-scan mode for scan index documents
rapidjson::Value field("_doc", allocator);
sort_node.PushBack(field, allocator);
es_query_dsl.AddMember("sort", sort_node, allocator);
}
// number of documents returned
es_query_dsl.AddMember("size", size, allocator);
rapidjson::StringBuffer buffer;
rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
es_query_dsl.Accept(writer);
std::string es_query_dsl_json = buffer.GetString();
LOG(INFO) << "Generated ES queryDSL [ " << es_query_dsl_json << " ]";
return es_query_dsl_json;
}
} // namespace doris