blob: 5db50bd179cdfcb109f14f9a71dda74a97443ba7 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/es/es_scroll_parser.h"
#include <absl/strings/substitute.h>
#include <cctz/time_zone.h>
#include <glog/logging.h>
#include <rapidjson/allocators.h>
#include <rapidjson/encodings.h>
#include <stdint.h>
#include <string.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <cstdlib>
#include <ostream>
#include <string>
#include "common/status.h"
#include "rapidjson/document.h"
#include "rapidjson/rapidjson.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/jsonb_value.h"
#include "runtime/primitive_type.h"
#include "runtime/types.h"
#include "util/binary_cast.hpp"
#include "util/string_parser.hpp"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/runtime/vdatetime_value.h"
namespace doris {
static const char* FIELD_SCROLL_ID = "_scroll_id";
static const char* FIELD_HITS = "hits";
static const char* FIELD_INNER_HITS = "hits";
static const char* FIELD_SOURCE = "_source";
static const char* FIELD_ID = "_id";
// get the original json data type
std::string json_type_to_string(rapidjson::Type type) {
switch (type) {
case rapidjson::kNumberType:
return "Number";
case rapidjson::kStringType:
return "Varchar/Char";
case rapidjson::kArrayType:
return "Array";
case rapidjson::kObjectType:
return "Object";
case rapidjson::kNullType:
return "Null Type";
case rapidjson::kFalseType:
case rapidjson::kTrueType:
return "True/False";
default:
return "Unknown Type";
}
}
// transfer rapidjson::Value to string representation
std::string json_value_to_string(const rapidjson::Value& value) {
rapidjson::StringBuffer scratch_buffer;
rapidjson::Writer<rapidjson::StringBuffer> temp_writer(scratch_buffer);
value.Accept(temp_writer);
return scratch_buffer.GetString();
}
static const std::string ERROR_INVALID_COL_DATA =
"Data source returned inconsistent column data. "
"Expected value of type {} based on column metadata. This likely indicates a "
"problem with the data source library.";
static const std::string ERROR_MEM_LIMIT_EXCEEDED =
"DataSourceScanNode::$0() failed to allocate "
"$1 bytes for $2.";
static const std::string ERROR_COL_DATA_IS_ARRAY =
"Data source returned an array for the type $0"
"based on column metadata.";
static const std::string INVALID_NULL_VALUE =
"Invalid null value occurs: Non-null column `$0` contains NULL";
#define RETURN_ERROR_IF_COL_IS_ARRAY(col, type, is_array) \
do { \
if (col.IsArray() == is_array) { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Document slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} \
} while (false)
#define RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type) \
do { \
if (!col.IsString()) { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Document source slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} \
} while (false)
#define RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col, type) \
do { \
if (!col.IsNumber()) { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Document value is: " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} \
} while (false)
#define RETURN_ERROR_IF_PARSING_FAILED(result, col, type) \
do { \
if (result != StringParser::PARSE_SUCCESS) { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Document source slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} \
} while (false)
#define RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type) \
do { \
std::stringstream ss; \
ss << "Expected value of type: " << type_to_string(type) \
<< "; but found type: " << json_type_to_string(col.GetType()) \
<< "; Document slice is : " << json_value_to_string(col); \
return Status::RuntimeError(ss.str()); \
} while (false)
template <typename T>
Status get_int_value(const rapidjson::Value& col, PrimitiveType type, void* slot,
bool pure_doc_value) {
if (col.IsNumber()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col.GetInt() : col.GetInt64());
return Status::OK();
}
if (pure_doc_value && col.IsArray() && !col.Empty()) {
RETURN_ERROR_IF_COL_IS_NOT_NUMBER(col[0], type);
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
return Status::OK();
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
StringParser::ParseResult result;
const std::string& val = col.GetString();
size_t len = col.GetStringLength();
T v = StringParser::string_to_int<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
if (sizeof(T) < 16) {
*reinterpret_cast<T*>(slot) = v;
} else {
DCHECK(sizeof(T) == 16);
memcpy(slot, &v, sizeof(v));
}
return Status::OK();
}
template <PrimitiveType T>
Status get_date_value_int(const rapidjson::Value& col, PrimitiveType type, bool is_date_str,
typename PrimitiveTypeTraits<T>::ColumnItemType* slot,
const cctz::time_zone& time_zone) {
constexpr bool is_datetime_v1 = T == TYPE_DATE || T == TYPE_DATETIME;
typename PrimitiveTypeTraits<T>::CppType dt_val;
if (is_date_str) {
const std::string str_date = col.GetString();
int str_length = col.GetStringLength();
bool success = false;
if (str_length > 19) {
std::chrono::system_clock::time_point tp;
// time_zone suffix pattern
// Z/+08:00/-04:30
RE2 time_zone_pattern(R"([+-]\d{2}:\d{2}|Z)");
bool ok = false;
std::string fmt;
re2::StringPiece value;
if (time_zone_pattern.Match(str_date, 0, str_date.size(), RE2::UNANCHORED, &value, 1)) {
// with time_zone info
// YYYY-MM-DDTHH:MM:SSZ or YYYY-MM-DDTHH:MM:SS+08:00
// or 2022-08-08T12:10:10.000Z or YYYY-MM-DDTHH:MM:SS-08:00
fmt = "%Y-%m-%dT%H:%M:%E*S%Ez";
cctz::time_zone ctz;
// find time_zone by time_zone suffix string
TimezoneUtils::find_cctz_time_zone(value.as_string(), ctz);
ok = cctz::parse(fmt, str_date, ctz, &tp);
} else {
// without time_zone info
// 2022-08-08T12:10:10.000
fmt = "%Y-%m-%dT%H:%M:%E*S";
// If the time without time_zone info, ES will assume it is UTC time.
// So we parse it in Doris with UTC time zone.
ok = cctz::parse(fmt, str_date, cctz::utc_time_zone(), &tp);
}
if (ok) {
// The local time zone can change by session variable `time_zone`
// We should use the user specified time zone, not the actual system local time zone.
success = true;
dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
}
} else if (str_length == 19) {
// YYYY-MM-DDTHH:MM:SS
if (*(str_date.c_str() + 10) == 'T') {
std::chrono::system_clock::time_point tp;
const bool ok =
cctz::parse("%Y-%m-%dT%H:%M:%S", str_date, cctz::utc_time_zone(), &tp);
if (ok) {
success = true;
dt_val.from_unixtime(std::chrono::system_clock::to_time_t(tp), time_zone);
}
} else {
// YYYY-MM-DD HH:MM:SS
success = dt_val.from_date_str(str_date.c_str(), str_length);
}
} else if (str_length == 13) {
// string long like "1677895728000"
int64_t time_long = std::atol(str_date.c_str());
if (time_long > 0) {
success = true;
dt_val.from_unixtime(time_long / 1000, time_zone);
}
} else {
// YYYY-MM-DD or others
success = dt_val.from_date_str(str_date.c_str(), str_length);
}
if (!success) {
RETURN_ERROR_IF_CAST_FORMAT_ERROR(col, type);
}
} else {
dt_val.from_unixtime(col.GetInt64() / 1000, time_zone);
}
if constexpr (is_datetime_v1) {
if (type == TYPE_DATE) {
dt_val.cast_to_date();
} else {
dt_val.to_datetime();
}
}
*reinterpret_cast<typename PrimitiveTypeTraits<T>::ColumnItemType*>(slot) =
binary_cast<typename PrimitiveTypeTraits<T>::CppType,
typename PrimitiveTypeTraits<T>::ColumnItemType>(
*reinterpret_cast<typename PrimitiveTypeTraits<T>::CppType*>(&dt_val));
return Status::OK();
}
template <PrimitiveType T>
Status get_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
typename PrimitiveTypeTraits<T>::ColumnItemType* slot,
const cctz::time_zone& time_zone) {
// this would happend just only when `enable_docvalue_scan = false`, and field has timestamp format date from _source
if (col.IsNumber()) {
// ES process date/datetime field would use millisecond timestamp for index or docvalue
// processing date type field, if a number is encountered, Doris On ES will force it to be processed according to ms
// Doris On ES needs to be consistent with ES, so just divided by 1000 because the unit for from_unixtime is seconds
return get_date_value_int<T>(col, type, false, slot, time_zone);
} else if (col.IsArray() && pure_doc_value && !col.Empty()) {
// this would happened just only when `enable_docvalue_scan = true`
// ES add default format for all field after ES 6.4, if we not provided format for `date` field ES would impose
// a standard date-format for date field as `2020-06-16T00:00:00.000Z`
// At present, we just process this string format date. After some PR were merged into Doris, we would impose `epoch_mills` for
// date field's docvalue
if (col[0].IsString()) {
return get_date_value_int<T>(col[0], type, true, slot, time_zone);
}
// ES would return millisecond timestamp for date field, divided by 1000 because the unit for from_unixtime is seconds
return get_date_value_int<T>(col[0], type, false, slot, time_zone);
} else {
// this would happened just only when `enable_docvalue_scan = false`, and field has string format date from _source
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
return get_date_value_int<T>(col, type, true, slot, time_zone);
}
}
template <PrimitiveType T>
Status fill_date_int(const rapidjson::Value& col, PrimitiveType type, bool pure_doc_value,
vectorized::IColumn* col_ptr, const cctz::time_zone& time_zone) {
typename PrimitiveTypeTraits<T>::ColumnItemType data;
RETURN_IF_ERROR((get_date_int<T>(col, type, pure_doc_value, &data, time_zone)));
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&data)), 0);
return Status::OK();
}
template <typename T>
Status get_float_value(const rapidjson::Value& col, PrimitiveType type, void* slot,
bool pure_doc_value) {
static_assert(sizeof(T) == 4 || sizeof(T) == 8);
if (col.IsNumber()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble());
return Status::OK();
}
if (pure_doc_value && col.IsArray() && !col.Empty()) {
*reinterpret_cast<T*>(slot) = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
return Status::OK();
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
StringParser::ParseResult result;
const std::string& val = col.GetString();
size_t len = col.GetStringLength();
T v = StringParser::string_to_float<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
*reinterpret_cast<T*>(slot) = v;
return Status::OK();
}
template <typename T>
Status insert_float_value(const rapidjson::Value& col, PrimitiveType type,
vectorized::IColumn* col_ptr, bool pure_doc_value, bool nullable) {
static_assert(sizeof(T) == 4 || sizeof(T) == 8);
if (col.IsNumber() && nullable) {
T value = (T)(sizeof(T) == 4 ? col.GetFloat() : col.GetDouble());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
}
if (pure_doc_value && col.IsArray() && !col.Empty() && nullable) {
T value = (T)(sizeof(T) == 4 ? col[0].GetFloat() : col[0].GetDouble());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
StringParser::ParseResult result;
const std::string& val = col.GetString();
size_t len = col.GetStringLength();
T v = StringParser::string_to_float<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, col, type);
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&v)), 0);
return Status::OK();
}
template <typename T>
Status insert_int_value(const rapidjson::Value& col, PrimitiveType type,
vectorized::IColumn* col_ptr, bool pure_doc_value, bool nullable) {
if (col.IsNumber()) {
T value;
// ES allows inserting float and double in int/long types.
// To parse these numbers in Doris, we direct cast them to int types.
if (col.IsDouble()) {
value = static_cast<T>(col.GetDouble());
} else if (col.IsFloat()) {
value = static_cast<T>(col.GetFloat());
} else {
value = (T)(sizeof(T) < 8 ? col.GetInt() : col.GetInt64());
}
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
}
auto parse_and_insert_data = [&](const rapidjson::Value& col_value) -> Status {
StringParser::ParseResult result;
std::string val = col_value.GetString();
// ES allows inserting numbers and characters containing decimals in numeric types.
// To parse these numbers in Doris, we remove the decimals here.
size_t pos = val.find('.');
if (pos != std::string::npos) {
val = val.substr(0, pos);
}
size_t len = val.length();
T v = StringParser::string_to_int<T>(val.c_str(), len, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, col_value, type);
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&v)), 0);
return Status::OK();
};
if (pure_doc_value && col.IsArray() && !col.Empty()) {
if (col[0].IsNumber()) {
T value = (T)(sizeof(T) < 8 ? col[0].GetInt() : col[0].GetInt64());
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&value)), 0);
return Status::OK();
} else {
RETURN_ERROR_IF_COL_IS_ARRAY(col[0], type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col[0], type);
return parse_and_insert_data(col[0]);
}
}
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
RETURN_ERROR_IF_COL_IS_NOT_STRING(col, type);
return parse_and_insert_data(col);
}
template <PrimitiveType T>
Status handle_value(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
typename PrimitiveTypeTraits<T>::ColumnItemType& val) {
if constexpr (T == TYPE_TINYINT || T == TYPE_SMALLINT || T == TYPE_INT || T == TYPE_BIGINT ||
T == TYPE_LARGEINT) {
RETURN_IF_ERROR(get_int_value<typename PrimitiveTypeTraits<T>::ColumnItemType>(
col, sub_type, &val, pure_doc_value));
return Status::OK();
}
if constexpr (T == TYPE_FLOAT) {
RETURN_IF_ERROR(get_float_value<float>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}
if constexpr (T == TYPE_DOUBLE) {
RETURN_IF_ERROR(get_float_value<double>(col, sub_type, &val, pure_doc_value));
return Status::OK();
}
if constexpr (T == TYPE_STRING || T == TYPE_CHAR || T == TYPE_VARCHAR) {
RETURN_ERROR_IF_COL_IS_ARRAY(col, sub_type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
return Status::OK();
}
if constexpr (T == TYPE_BOOLEAN) {
if (col.IsBool()) {
val = col.GetBool();
return Status::OK();
}
if (col.IsNumber()) {
val = col.GetInt();
return Status::OK();
}
bool is_nested_str = false;
if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
val = col[0].GetBool();
return Status::OK();
} else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}
const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
const std::string& str_val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
val = StringParser::string_to_bool(str_val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, sub_type);
return Status::OK();
}
throw Exception(ErrorCode::INTERNAL_ERROR, "Un-supported type: {}", type_to_string(T));
}
template <PrimitiveType T>
Status process_single_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
typename PrimitiveTypeTraits<T>::ColumnItemType val;
RETURN_IF_ERROR(handle_value<T>(col, sub_type, pure_doc_value, val));
array.push_back(vectorized::Field::create_field<T>(val));
return Status::OK();
}
template <PrimitiveType T>
Status process_column_array(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array) {
for (const auto& sub_col : col.GetArray()) {
RETURN_IF_ERROR(process_single_column<T>(sub_col, sub_type, pure_doc_value, array));
}
return Status::OK();
}
template <PrimitiveType T>
Status process_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array) {
if (!col.IsArray()) {
return process_single_column<T>(col, sub_type, pure_doc_value, array);
} else {
return process_column_array<T>(col, sub_type, pure_doc_value, array);
}
}
template <PrimitiveType T>
Status process_date_column(const rapidjson::Value& col, PrimitiveType sub_type, bool pure_doc_value,
vectorized::Array& array, const cctz::time_zone& time_zone) {
if (!col.IsArray()) {
typename PrimitiveTypeTraits<T>::ColumnItemType data;
RETURN_IF_ERROR((get_date_int<T>(col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(vectorized::Field::create_field<T>(data));
} else {
for (const auto& sub_col : col.GetArray()) {
typename PrimitiveTypeTraits<T>::ColumnItemType data;
RETURN_IF_ERROR((get_date_int<T>(sub_col, sub_type, pure_doc_value, &data, time_zone)));
array.push_back(vectorized::Field::create_field<T>(data));
}
}
return Status::OK();
}
Status ScrollParser::parse_column(const rapidjson::Value& col, PrimitiveType sub_type,
bool pure_doc_value, vectorized::Array& array,
const cctz::time_zone& time_zone) {
switch (sub_type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING:
return process_column<TYPE_STRING>(col, sub_type, pure_doc_value, array);
case TYPE_TINYINT:
return process_column<TYPE_TINYINT>(col, sub_type, pure_doc_value, array);
case TYPE_SMALLINT:
return process_column<TYPE_SMALLINT>(col, sub_type, pure_doc_value, array);
case TYPE_INT:
return process_column<TYPE_INT>(col, sub_type, pure_doc_value, array);
case TYPE_BIGINT:
return process_column<TYPE_BIGINT>(col, sub_type, pure_doc_value, array);
case TYPE_LARGEINT:
return process_column<TYPE_LARGEINT>(col, sub_type, pure_doc_value, array);
case TYPE_FLOAT:
return process_column<TYPE_FLOAT>(col, sub_type, pure_doc_value, array);
case TYPE_DOUBLE:
return process_column<TYPE_DOUBLE>(col, sub_type, pure_doc_value, array);
case TYPE_BOOLEAN:
return process_column<TYPE_BOOLEAN>(col, sub_type, pure_doc_value, array);
// date/datetime v2 is the default type for catalog table,
// see https://github.com/apache/doris/pull/16304
// No need to support date and datetime types.
case TYPE_DATEV2: {
return process_date_column<TYPE_DATEV2>(col, sub_type, pure_doc_value, array, time_zone);
}
case TYPE_DATETIMEV2: {
return process_date_column<TYPE_DATETIMEV2>(col, sub_type, pure_doc_value, array,
time_zone);
}
default:
LOG(ERROR) << "Do not support Array type: " << sub_type;
return Status::InternalError("Unsupported type");
}
}
ScrollParser::ScrollParser(bool doc_value_mode) : _size(0), _line_index(0) {}
ScrollParser::~ScrollParser() = default;
Status ScrollParser::parse(const std::string& scroll_result, bool exactly_once) {
// rely on `_size !=0 ` to determine whether scroll ends
_size = 0;
_document_node.Parse(scroll_result.c_str(), scroll_result.length());
if (_document_node.HasParseError()) {
return Status::InternalError("Parsing json error, json is: {}", scroll_result);
}
if (!exactly_once && !_document_node.HasMember(FIELD_SCROLL_ID)) {
LOG(WARNING) << "Document has not a scroll id field scroll response:" << scroll_result;
return Status::InternalError("Document has not a scroll id field");
}
if (!exactly_once) {
const rapidjson::Value& scroll_node = _document_node[FIELD_SCROLL_ID];
_scroll_id = scroll_node.GetString();
}
// { hits: { total : 2, "hits" : [ {}, {}, {} ]}}
const rapidjson::Value& outer_hits_node = _document_node[FIELD_HITS];
// if has no inner hits, there has no data in this index
if (!outer_hits_node.HasMember(FIELD_INNER_HITS)) {
return Status::OK();
}
const rapidjson::Value& inner_hits_node = outer_hits_node[FIELD_INNER_HITS];
// this happened just the end of scrolling
if (!inner_hits_node.IsArray()) {
return Status::OK();
}
_inner_hits_node.CopyFrom(inner_hits_node, _document_node.GetAllocator());
// how many documents contains in this batch
_size = _inner_hits_node.Size();
return Status::OK();
}
int ScrollParser::get_size() const {
return _size;
}
const std::string& ScrollParser::get_scroll_id() {
return _scroll_id;
}
Status ScrollParser::fill_columns(const TupleDescriptor* tuple_desc,
std::vector<vectorized::MutableColumnPtr>& columns,
bool* line_eof,
const std::map<std::string, std::string>& docvalue_context,
const cctz::time_zone& time_zone) {
*line_eof = true;
if (_size <= 0 || _line_index >= _size) {
return Status::OK();
}
const rapidjson::Value& obj = _inner_hits_node[_line_index++];
bool pure_doc_value = false;
if (obj.HasMember("fields")) {
pure_doc_value = true;
}
// obj may be neither have `_source` nor `fields` field.
const rapidjson::Value* line = nullptr;
if (obj.HasMember(FIELD_SOURCE)) {
line = &obj[FIELD_SOURCE];
} else if (obj.HasMember("fields")) {
line = &obj["fields"];
}
for (int i = 0; i < tuple_desc->slots().size(); ++i) {
const SlotDescriptor* slot_desc = tuple_desc->slots()[i];
auto col_ptr = columns[i].get();
if (!slot_desc->is_materialized()) {
continue;
}
if (slot_desc->col_name() == FIELD_ID) {
// actually this branch will not be reached, this is guaranteed by Doris FE.
if (pure_doc_value) {
return Status::RuntimeError("obtain `_id` is not supported in doc_values mode");
}
// obj[FIELD_ID] must not be NULL
std::string _id = obj[FIELD_ID].GetString();
size_t len = _id.length();
col_ptr->insert_data(const_cast<const char*>(_id.data()), len);
continue;
}
const char* col_name = pure_doc_value ? docvalue_context.at(slot_desc->col_name()).c_str()
: slot_desc->col_name().c_str();
if (line == nullptr || line->FindMember(col_name) == line->MemberEnd()) {
if (slot_desc->is_nullable()) {
auto* nullable_column = reinterpret_cast<vectorized::ColumnNullable*>(col_ptr);
nullable_column->insert_data(nullptr, 0);
continue;
} else {
std::string details = absl::Substitute(INVALID_NULL_VALUE, col_name);
return Status::RuntimeError(details);
}
}
const rapidjson::Value& col = (*line)[col_name];
auto type = slot_desc->type()->get_primitive_type();
// when the column value is null, the subsequent type casting will report an error
if (col.IsNull() && slot_desc->is_nullable()) {
col_ptr->insert_data(nullptr, 0);
continue;
} else if (col.IsNull() && !slot_desc->is_nullable()) {
std::string details = absl::Substitute(INVALID_NULL_VALUE, col_name);
return Status::RuntimeError(details);
}
switch (type) {
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
// sometimes elasticsearch user post some not-string value to Elasticsearch Index.
// because of reading value from _source, we can not process all json type and then just transfer the value to original string representation
// this may be a tricky, but we can work around this issue
std::string val;
if (pure_doc_value) {
if (col.Empty()) {
break;
} else if (!col[0].IsString()) {
val = json_value_to_string(col[0]);
} else {
val = col[0].GetString();
}
} else {
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
}
size_t val_size = val.length();
col_ptr->insert_data(const_cast<const char*>(val.data()), val_size);
break;
}
case TYPE_TINYINT: {
RETURN_IF_ERROR(insert_int_value<int8_t>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_SMALLINT: {
RETURN_IF_ERROR(insert_int_value<int16_t>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_INT: {
RETURN_IF_ERROR(insert_int_value<int32_t>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_BIGINT: {
RETURN_IF_ERROR(insert_int_value<int64_t>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_LARGEINT: {
RETURN_IF_ERROR(insert_int_value<__int128>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_DOUBLE: {
RETURN_IF_ERROR(insert_float_value<double>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_FLOAT: {
RETURN_IF_ERROR(insert_float_value<float>(col, type, col_ptr, pure_doc_value,
slot_desc->is_nullable()));
break;
}
case TYPE_BOOLEAN: {
if (col.IsBool()) {
int8_t val = col.GetBool();
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&val)), 0);
break;
}
if (col.IsNumber()) {
int8_t val = col.GetInt();
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&val)), 0);
break;
}
bool is_nested_str = false;
if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsBool()) {
int8_t val = col[0].GetBool();
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&val)), 0);
break;
} else if (pure_doc_value && col.IsArray() && !col.Empty() && col[0].IsString()) {
is_nested_str = true;
} else if (pure_doc_value && col.IsArray()) {
return Status::InternalError(ERROR_INVALID_COL_DATA, "BOOLEAN");
}
const rapidjson::Value& str_col = is_nested_str ? col[0] : col;
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
const std::string& val = str_col.GetString();
size_t val_size = str_col.GetStringLength();
StringParser::ParseResult result;
bool b = StringParser::string_to_bool(val.c_str(), val_size, &result);
RETURN_ERROR_IF_PARSING_FAILED(result, str_col, type);
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&b)), 0);
break;
}
case TYPE_DECIMALV2: {
DecimalV2Value data;
if (col.IsDouble()) {
data.assign_from_double(col.GetDouble());
} else {
std::string val;
if (pure_doc_value) {
if (col.Empty()) {
break;
} else if (!col[0].IsString()) {
val = json_value_to_string(col[0]);
} else {
val = col[0].GetString();
}
} else {
RETURN_ERROR_IF_COL_IS_ARRAY(col, type, true);
if (!col.IsString()) {
val = json_value_to_string(col);
} else {
val = col.GetString();
}
}
data.parse_from_str(val.data(), val.length());
}
col_ptr->insert_data(const_cast<const char*>(reinterpret_cast<char*>(&data)), 0);
break;
}
case TYPE_DATE:
RETURN_IF_ERROR(
fill_date_int<TYPE_DATE>(col, type, pure_doc_value, col_ptr, time_zone));
break;
case TYPE_DATETIME:
RETURN_IF_ERROR(
fill_date_int<TYPE_DATETIME>(col, type, pure_doc_value, col_ptr, time_zone));
break;
case TYPE_DATEV2:
RETURN_IF_ERROR(
fill_date_int<TYPE_DATEV2>(col, type, pure_doc_value, col_ptr, time_zone));
break;
case TYPE_DATETIMEV2: {
RETURN_IF_ERROR(
fill_date_int<TYPE_DATETIMEV2>(col, type, pure_doc_value, col_ptr, time_zone));
break;
}
case TYPE_ARRAY: {
vectorized::Array array;
const auto& sub_type =
assert_cast<const vectorized::DataTypeArray*>(
vectorized::remove_nullable(tuple_desc->slots()[i]->type()).get())
->get_nested_type()
->get_primitive_type();
RETURN_IF_ERROR(parse_column(col, sub_type, pure_doc_value, array, time_zone));
col_ptr->insert(vectorized::Field::create_field<TYPE_ARRAY>(array));
break;
}
case TYPE_JSONB: {
JsonBinaryValue jsonb_value;
RETURN_IF_ERROR(jsonb_value.from_json_string(json_value_to_string(col)));
vectorized::JsonbField json(jsonb_value.value(), jsonb_value.size());
col_ptr->insert(vectorized::Field::create_field<TYPE_JSONB>(json));
break;
}
default: {
LOG(ERROR) << "Unsupported data type: " << type_to_string(type);
DCHECK(false);
break;
}
}
}
*line_eof = false;
return Status::OK();
}
} // namespace doris