| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/es/es_scan_reader.h" |
| |
| #include <stdlib.h> |
| |
| #include <map> |
| #include <sstream> |
| #include <string> |
| |
| #include "common/config.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/es/es_scroll_parser.h" |
| #include "exec/es/es_scroll_query.h" |
| #include "http/http_method.h" |
| |
| namespace doris { |
| |
// `filter_path` used when documents are read from `_source`.
// `hits.hits._id` is requested so that the ES document `_id` can be obtained.
const std::string SOURCE_SCROLL_SEARCH_FILTER_PATH(
        "filter_path=_scroll_id,hits.hits._source,hits.total,hits.hits._id");
// `filter_path` used in doc-value mode.
// `hits.hits._score` is requested to handle fields that do not exist in a batch.
const std::string DOCVALUE_SCROLL_SEARCH_FILTER_PATH(
        "filter_path=_scroll_id,hits.total,hits.hits._score,hits.hits.fields");

// Query-string fragment selecting shards; note that it already carries a leading '&'.
const std::string REQUEST_PREFERENCE_PREFIX("&preference=_shards:");
// Path of the scroll endpoint, appended directly to the target address.
const std::string REQUEST_SEARCH_SCROLL_PATH("/_search/scroll");
// Separator placed between URL path segments.
const std::string REQUEST_SEPARATOR("/");
| |
| ESScanReader::ESScanReader(const std::string& target, |
| const std::map<std::string, std::string>& props, bool doc_value_mode) |
| : _scroll_keep_alive(config::es_scroll_keepalive), |
| _http_timeout_ms(config::es_http_timeout_ms), |
| _doc_value_mode(doc_value_mode) { |
| _target = target; |
| _index = props.at(KEY_INDEX); |
| if (props.find(KEY_TYPE) != props.end()) { |
| _type = props.at(KEY_TYPE); |
| } |
| if (props.find(KEY_USER_NAME) != props.end()) { |
| _user_name = props.at(KEY_USER_NAME); |
| } |
| if (props.find(KEY_PASS_WORD) != props.end()) { |
| _passwd = props.at(KEY_PASS_WORD); |
| } |
| if (props.find(KEY_SHARD) != props.end()) { |
| _shards = props.at(KEY_SHARD); |
| } |
| if (props.find(KEY_QUERY) != props.end()) { |
| _query = props.at(KEY_QUERY); |
| } |
| if (props.find(KEY_HTTP_SSL_ENABLED) != props.end()) { |
| std::istringstream(props.at(KEY_HTTP_SSL_ENABLED)) >> std::boolalpha >> _use_ssl_client; |
| } |
| |
| std::string batch_size_str = props.at(KEY_BATCH_SIZE); |
| _batch_size = atoi(batch_size_str.c_str()); |
| std::string filter_path = |
| _doc_value_mode ? DOCVALUE_SCROLL_SEARCH_FILTER_PATH : SOURCE_SCROLL_SEARCH_FILTER_PATH; |
| |
| // When shard_id is negative(-1), the request will be sent to ES without shard preference. |
| int32_t shard_id = std::stoi(_shards); |
| if (props.find(KEY_TERMINATE_AFTER) != props.end()) { |
| _exactly_once = true; |
| std::stringstream scratch; |
| // just send a normal search against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect |
| if (_type.empty()) { |
| if (shard_id < 0) { |
| scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" << filter_path; |
| } else { |
| // `terminate_after` and `size` can not be used together in scroll request of ES 8.x |
| scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" |
| << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path; |
| } |
| } else { |
| if (shard_id < 0) { |
| scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type |
| << "/_search?" |
| << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) << "&" |
| << filter_path; |
| } else { |
| scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type |
| << "/_search?" |
| << "terminate_after=" << props.at(KEY_TERMINATE_AFTER) |
| << REQUEST_PREFERENCE_PREFIX << _shards << "&" << filter_path; |
| } |
| } |
| _search_url = scratch.str(); |
| } else { |
| _exactly_once = false; |
| std::stringstream scratch; |
| // scroll request for scanning |
| // add terminate_after for the first scroll to avoid decompress all postings list |
| if (_type.empty()) { |
| if (shard_id < 0) { |
| scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" |
| << "scroll=" << _scroll_keep_alive << "&" << filter_path; |
| } else { |
| // `terminate_after` and `size` can not be used together in scroll request of ES 8.x |
| scratch << _target << REQUEST_SEPARATOR << _index << "/_search?" |
| << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards |
| << "&" << filter_path; |
| } |
| } else { |
| if (shard_id < 0) { |
| scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type |
| << "/_search?" |
| << "scroll=" << _scroll_keep_alive << "&" << filter_path |
| << "&terminate_after=" << batch_size_str; |
| } else { |
| scratch << _target << REQUEST_SEPARATOR << _index << REQUEST_SEPARATOR << _type |
| << "/_search?" |
| << "scroll=" << _scroll_keep_alive << REQUEST_PREFERENCE_PREFIX << _shards |
| << "&" << filter_path << "&terminate_after=" << batch_size_str; |
| } |
| } |
| _init_scroll_url = scratch.str(); |
| _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path; |
| } |
| _eos = false; |
| } |
| |
| ESScanReader::~ESScanReader() {} |
| |
| Status ESScanReader::open() { |
| _is_first = true; |
| // we do not enable set_fail_on_error for ES http request to get more detail error messages |
| bool set_fail_on_error = false; |
| if (_exactly_once) { |
| RETURN_IF_ERROR(_network_client.init(_search_url, set_fail_on_error)); |
| LOG(INFO) << "search request URL: " << _search_url; |
| } else { |
| RETURN_IF_ERROR(_network_client.init(_init_scroll_url, set_fail_on_error)); |
| LOG(INFO) << "First scroll request URL: " << _init_scroll_url; |
| } |
| _network_client.set_basic_auth(_user_name, _passwd); |
| _network_client.set_content_type("application/json"); |
| _network_client.set_timeout_ms(_http_timeout_ms); |
| if (_use_ssl_client) { |
| _network_client.use_untrusted_ssl(); |
| } |
| // phase open, we cached the first response for `get_next` phase |
| Status status = _network_client.execute_post_request(_query, &_cached_response); |
| if (!status.ok() || _network_client.get_http_status() != 200) { |
| std::stringstream ss; |
| ss << "Failed to connect to ES server, errmsg is: " << status |
| << ", response: " << _cached_response; |
| LOG(WARNING) << ss.str(); |
| return Status::InternalError(ss.str()); |
| } |
| VLOG_CRITICAL << "open _cached response: " << _cached_response; |
| return Status::OK(); |
| } |
| |
| Status ESScanReader::get_next(bool* scan_eos, std::unique_ptr<ScrollParser>& scroll_parser) { |
| std::string response; |
| // if is first scroll request, should return the cached response |
| *scan_eos = true; |
| if (_eos) { |
| return Status::OK(); |
| } |
| |
| if (_is_first) { |
| response = _cached_response; |
| _is_first = false; |
| } else { |
| if (_exactly_once) { |
| return Status::OK(); |
| } |
| // we do not enable set_fail_on_error for ES http request to get more detail error messages |
| bool set_fail_on_error = false; |
| RETURN_IF_ERROR(_network_client.init(_next_scroll_url, set_fail_on_error)); |
| _network_client.set_basic_auth(_user_name, _passwd); |
| _network_client.set_content_type("application/json"); |
| _network_client.set_timeout_ms(_http_timeout_ms); |
| if (_use_ssl_client) { |
| _network_client.use_untrusted_ssl(); |
| } |
| RETURN_IF_ERROR(_network_client.execute_post_request( |
| ESScrollQueryBuilder::build_next_scroll_body(_scroll_id, _scroll_keep_alive), |
| &response)); |
| long status = _network_client.get_http_status(); |
| if (status == 404) { |
| LOG(WARNING) << "request scroll search failure 404[" |
| << ", response: " << (response.empty() ? "empty response" : response) |
| << "]"; |
| return Status::InternalError("No search context found for {}", _scroll_id); |
| } |
| if (status != 200) { |
| LOG(WARNING) << "request scroll search failure[" |
| << "http status: " << status |
| << ", response: " << (response.empty() ? "empty response" : response) |
| << "]"; |
| return Status::InternalError("request scroll search failure: {}", |
| (response.empty() ? "empty response" : response)); |
| } |
| } |
| |
| scroll_parser.reset(new ScrollParser(_doc_value_mode)); |
| VLOG_CRITICAL << "get_next request ES, returned response: " << response; |
| Status status = scroll_parser->parse(response, _exactly_once); |
| if (!status.ok()) { |
| _eos = true; |
| LOG(WARNING) << status; |
| return status; |
| } |
| |
| // request ES just only once |
| if (_exactly_once) { |
| _eos = true; |
| } else { |
| _scroll_id = scroll_parser->get_scroll_id(); |
| if (scroll_parser->get_size() == 0) { |
| _eos = true; |
| return Status::OK(); |
| } |
| |
| _eos = scroll_parser->get_size() < _batch_size; |
| } |
| *scan_eos = false; |
| return Status::OK(); |
| } |
| |
| Status ESScanReader::close() { |
| if (_scroll_id.empty()) { |
| return Status::OK(); |
| } |
| |
| std::string scratch_target = _target + REQUEST_SEARCH_SCROLL_PATH; |
| // we do not enable set_fail_on_error for ES http request to get more detail error messages |
| bool set_fail_on_error = false; |
| RETURN_IF_ERROR(_network_client.init(scratch_target, set_fail_on_error)); |
| _network_client.set_basic_auth(_user_name, _passwd); |
| _network_client.set_method(DELETE); |
| _network_client.set_content_type("application/json"); |
| _network_client.set_timeout_ms(_http_timeout_ms); |
| if (_use_ssl_client) { |
| _network_client.use_untrusted_ssl(); |
| } |
| std::string response; |
| RETURN_IF_ERROR(_network_client.execute_delete_request( |
| ESScrollQueryBuilder::build_clear_scroll_body(_scroll_id), &response)); |
| long status = _network_client.get_http_status(); |
| if (status == 200) { |
| return Status::OK(); |
| } else { |
| LOG(WARNING) << "es_scan_reader delete scroll context failure[" |
| << "http status: " << status |
| << ", response: " << (response.empty() ? "empty response" : response) << "]"; |
| return Status::InternalError("es_scan_reader delete scroll context failure"); |
| } |
| } |
| } // namespace doris |