blob: b65bc89c64eacfa8c03532e7689c5c2a4123e32c [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/delete_handler.h"
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <string>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/block_column_predicate.h"
#include "olap/olap_common.h"
#include "olap/predicate_creator.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/debug_points.h"
#include "vec/functions/cast/cast_parameters.h"
#include "vec/functions/cast/cast_to_boolean.h"
#include "vec/functions/cast/cast_to_date_or_datetime_impl.hpp"
#include "vec/functions/cast/cast_to_datetimev2_impl.hpp"
#include "vec/functions/cast/cast_to_datev2_impl.hpp"
#include "vec/functions/cast/cast_to_decimal.h"
#include "vec/functions/cast/cast_to_float.h"
#include "vec/functions/cast/cast_to_int.h"
#include "vec/functions/cast/cast_to_ip.h"
using apache::thrift::ThriftDebugString;
using std::vector;
using std::string;
using ::google::protobuf::RepeatedPtrField;
namespace doris {
template <PrimitiveType PType>
Status convert(const vectorized::DataTypePtr& data_type, const std::string& str,
vectorized::Arena& arena, typename PrimitiveTypeTraits<PType>::CppType& res) {
if constexpr (PType == TYPE_TINYINT || PType == TYPE_SMALLINT || PType == TYPE_INT ||
PType == TYPE_BIGINT || PType == TYPE_LARGEINT) {
vectorized::CastParameters parameters;
if (!vectorized::CastToInt::from_string({str.data(), str.size()}, res, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_FLOAT || PType == TYPE_DOUBLE) {
vectorized::CastParameters parameters;
if (!vectorized::CastToFloat::from_string({str.data(), str.size()}, res, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_DATE) {
vectorized::CastParameters parameters;
if (!vectorized::CastToDateOrDatetime::from_string<false>({str.data(), str.size()}, res,
nullptr, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_DATETIME) {
vectorized::CastParameters parameters;
if (!vectorized::CastToDateOrDatetime::from_string<true>({str.data(), str.size()}, res,
nullptr, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_DATEV2) {
vectorized::CastParameters parameters;
if (!vectorized::CastToDateV2::from_string({str.data(), str.size()}, res, nullptr,
parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_DATETIMEV2) {
vectorized::CastParameters parameters;
if (!vectorized::CastToDatetimeV2::from_string({str.data(), str.size()}, res, nullptr,
data_type->get_scale(), parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_TIMESTAMPTZ) {
vectorized::CastParameters parameters;
if (!vectorized::CastToTimstampTz::from_string({str.data(), str.size()}, res, parameters,
nullptr, data_type->get_scale())) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_CHAR) {
size_t target = assert_cast<const vectorized::DataTypeString*>(
vectorized::remove_nullable(data_type).get())
->len();
res = {str.data(), str.size()};
if (target > str.size()) {
char* buffer = arena.alloc(target);
memset(buffer, 0, target);
memcpy(buffer, str.data(), str.size());
res = {buffer, target};
}
return Status::OK();
}
if constexpr (PType == TYPE_STRING || PType == TYPE_VARCHAR) {
char* buffer = arena.alloc(str.size());
memcpy(buffer, str.data(), str.size());
res = {buffer, str.size()};
return Status::OK();
}
if constexpr (PType == TYPE_BOOLEAN) {
vectorized::CastParameters parameters;
vectorized::UInt8 tmp;
if (!vectorized::CastToBool::from_string({str.data(), str.size()}, tmp, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
res = tmp != 0;
return Status::OK();
}
if constexpr (PType == TYPE_IPV4) {
vectorized::CastParameters parameters;
if (!vectorized::CastToIPv4::from_string({str.data(), str.size()}, res, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_IPV6) {
vectorized::CastParameters parameters;
if (!vectorized::CastToIPv6::from_string({str.data(), str.size()}, res, parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
if constexpr (PType == TYPE_DECIMALV2) {
vectorized::CastParameters parameters;
vectorized::Decimal128V2 tmp;
if (!vectorized::CastToDecimal::from_string({str.data(), str.size()}, tmp,
data_type->get_precision(),
data_type->get_scale(), parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
res = DecimalV2Value(tmp.value);
return Status::OK();
} else if constexpr (is_decimal(PType)) {
vectorized::CastParameters parameters;
if (!vectorized::CastToDecimal::from_string({str.data(), str.size()}, res,
data_type->get_precision(),
data_type->get_scale(), parameters)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid {} string. str={}", type_to_string(data_type->get_primitive_type()),
str);
}
return Status::OK();
}
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unsupported data type in delete handler. type={}",
type_to_string(data_type->get_primitive_type()));
}
#define CONVERT_CASE(PType) \
case PType: { \
set = build_set<PType>(); \
for (const auto& s : str) { \
typename PrimitiveTypeTraits<PType>::CppType tmp; \
RETURN_IF_ERROR(convert<PType>(data_type, s, arena, tmp)); \
set->insert(reinterpret_cast<const void*>(&tmp)); \
} \
return Status::OK(); \
}
Status convert(const vectorized::DataTypePtr& data_type, const std::list<std::string>& str,
vectorized::Arena& arena, std::shared_ptr<HybridSetBase>& set) {
switch (data_type->get_primitive_type()) {
CONVERT_CASE(TYPE_TINYINT);
CONVERT_CASE(TYPE_SMALLINT);
CONVERT_CASE(TYPE_INT);
CONVERT_CASE(TYPE_BIGINT);
CONVERT_CASE(TYPE_LARGEINT);
CONVERT_CASE(TYPE_FLOAT);
CONVERT_CASE(TYPE_DOUBLE);
CONVERT_CASE(TYPE_DATE);
CONVERT_CASE(TYPE_DATETIME);
CONVERT_CASE(TYPE_DATEV2);
CONVERT_CASE(TYPE_DATETIMEV2);
CONVERT_CASE(TYPE_TIMESTAMPTZ);
CONVERT_CASE(TYPE_BOOLEAN);
CONVERT_CASE(TYPE_IPV4);
CONVERT_CASE(TYPE_IPV6);
CONVERT_CASE(TYPE_DECIMALV2);
CONVERT_CASE(TYPE_DECIMAL32);
CONVERT_CASE(TYPE_DECIMAL64);
CONVERT_CASE(TYPE_DECIMAL128I);
CONVERT_CASE(TYPE_DECIMAL256);
CONVERT_CASE(TYPE_CHAR);
CONVERT_CASE(TYPE_VARCHAR);
CONVERT_CASE(TYPE_STRING);
default:
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unsupported data type in delete handler. type={}",
type_to_string(data_type->get_primitive_type()));
}
return Status::OK();
}
#undef CONVERT_CASE
#define CONVERT_CASE(PType) \
case PType: { \
typename PrimitiveTypeTraits<PType>::CppType tmp; \
RETURN_IF_ERROR(convert<PType>(type, res.value_str.front(), arena, tmp)); \
v.data = reinterpret_cast<const char*>(&tmp); \
v.size = sizeof(tmp); \
switch (res.condition_op) { \
case PredicateType::EQ: \
predicate = \
create_comparison_predicate0<PredicateType::EQ>(index, type, v, true, arena); \
return Status::OK(); \
case PredicateType::NE: \
predicate = \
create_comparison_predicate0<PredicateType::NE>(index, type, v, true, arena); \
return Status::OK(); \
case PredicateType::GT: \
predicate = \
create_comparison_predicate0<PredicateType::GT>(index, type, v, true, arena); \
return Status::OK(); \
case PredicateType::GE: \
predicate = \
create_comparison_predicate0<PredicateType::GE>(index, type, v, true, arena); \
return Status::OK(); \
case PredicateType::LT: \
predicate = \
create_comparison_predicate0<PredicateType::LT>(index, type, v, true, arena); \
return Status::OK(); \
case PredicateType::LE: \
predicate = \
create_comparison_predicate0<PredicateType::LE>(index, type, v, true, arena); \
return Status::OK(); \
default: \
return Status::Error<ErrorCode::INVALID_ARGUMENT>( \
"invalid condition operator. operator={}", type_to_op_str(res.condition_op)); \
} \
}
Status parse_to_predicate(const uint32_t index, const vectorized::DataTypePtr& type,
DeleteHandler::ConditionParseResult& res, vectorized::Arena& arena,
std::shared_ptr<ColumnPredicate>& predicate) {
DCHECK_EQ(res.value_str.size(), 1);
if (res.condition_op == PredicateType::IS_NULL ||
res.condition_op == PredicateType::IS_NOT_NULL) {
predicate = NullPredicate::create_shared(
index, res.condition_op == PredicateType::IS_NOT_NULL, type->get_primitive_type());
return Status::OK();
}
StringRef v;
switch (type->get_primitive_type()) {
CONVERT_CASE(TYPE_TINYINT);
CONVERT_CASE(TYPE_SMALLINT);
CONVERT_CASE(TYPE_INT);
CONVERT_CASE(TYPE_BIGINT);
CONVERT_CASE(TYPE_LARGEINT);
CONVERT_CASE(TYPE_FLOAT);
CONVERT_CASE(TYPE_DOUBLE);
CONVERT_CASE(TYPE_DATE);
CONVERT_CASE(TYPE_DATETIME);
CONVERT_CASE(TYPE_DATEV2);
CONVERT_CASE(TYPE_DATETIMEV2);
CONVERT_CASE(TYPE_TIMESTAMPTZ);
CONVERT_CASE(TYPE_BOOLEAN);
CONVERT_CASE(TYPE_IPV4);
CONVERT_CASE(TYPE_IPV6);
CONVERT_CASE(TYPE_DECIMALV2);
CONVERT_CASE(TYPE_DECIMAL32);
CONVERT_CASE(TYPE_DECIMAL64);
CONVERT_CASE(TYPE_DECIMAL128I);
CONVERT_CASE(TYPE_DECIMAL256);
case TYPE_CHAR:
case TYPE_VARCHAR:
case TYPE_STRING: {
RETURN_IF_ERROR(convert<TYPE_STRING>(type, res.value_str.front(), arena, v));
switch (res.condition_op) {
case PredicateType::EQ:
predicate =
create_comparison_predicate0<PredicateType::EQ>(index, type, v, true, arena);
return Status::OK();
case PredicateType::NE:
predicate =
create_comparison_predicate0<PredicateType::NE>(index, type, v, true, arena);
return Status::OK();
case PredicateType::GT:
predicate =
create_comparison_predicate0<PredicateType::GT>(index, type, v, true, arena);
return Status::OK();
case PredicateType::GE:
predicate =
create_comparison_predicate0<PredicateType::GE>(index, type, v, true, arena);
return Status::OK();
case PredicateType::LT:
predicate =
create_comparison_predicate0<PredicateType::LT>(index, type, v, true, arena);
return Status::OK();
case PredicateType::LE:
predicate =
create_comparison_predicate0<PredicateType::LE>(index, type, v, true, arena);
return Status::OK();
default:
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid condition operator. operator={}", type_to_op_str(res.condition_op));
}
break;
}
default:
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"unsupported data type in delete handler. type={}",
type_to_string(type->get_primitive_type()));
}
return Status::OK();
#undef CONVERT_CASE
}
Status parse_to_in_predicate(const uint32_t index, const vectorized::DataTypePtr& type,
DeleteHandler::ConditionParseResult& res, vectorized::Arena& arena,
std::shared_ptr<ColumnPredicate>& predicate) {
DCHECK_GT(res.value_str.size(), 1);
switch (res.condition_op) {
case PredicateType::IN_LIST: {
std::shared_ptr<HybridSetBase> set;
RETURN_IF_ERROR(convert(type, res.value_str, arena, set));
predicate = create_in_list_predicate<PredicateType::IN_LIST>(index, type, set, true);
break;
}
case PredicateType::NOT_IN_LIST: {
std::shared_ptr<HybridSetBase> set;
RETURN_IF_ERROR(convert(type, res.value_str, arena, set));
predicate = create_in_list_predicate<PredicateType::NOT_IN_LIST>(index, type, set, true);
break;
}
default:
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition operator. operator={}",
type_to_op_str(res.condition_op));
}
return Status::OK();
}
// construct sub condition from TCondition
std::string construct_sub_predicate(const TCondition& condition) {
string op = condition.condition_op;
if (op == "<") {
op += "<";
} else if (op == ">") {
op += ">";
}
string condition_str;
if ("IS" == op) {
// ATTN: tricky! Surround IS with spaces to make it "special"
condition_str = condition.column_name + " IS " + condition.condition_values[0];
} else { // multi-elements IN expr has been processed with InPredicatePB
if (op == "*=") {
op = "=";
} else if (op == "!*=") {
op = "!=";
}
condition_str = condition.column_name + op + "'" + condition.condition_values[0] + "'";
}
return condition_str;
}
// make operators from FE adaptive to BE
std::string trans_op(const std::string& opt) {
std::string op = string(opt);
if (op == "<") {
op += "<";
} else if (op == ">") {
op += ">";
}
if ("IS" != op) {
if (op == "*=") {
op = "=";
} else if (op == "!*=") {
op = "!=";
}
}
return op;
}
Status DeleteHandler::generate_delete_predicate(const TabletSchema& schema,
const std::vector<TCondition>& conditions,
DeletePredicatePB* del_pred) {
DBUG_EXECUTE_IF("DeleteHandler::generate_delete_predicate.inject_failure", {
return Status::Error<false>(dp->param<int>("error_code"),
dp->param<std::string>("error_msg"));
})
if (conditions.empty()) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid parameters for store_cond. condition_size={}", conditions.size());
}
// Check whether the delete condition meets the requirements
for (const TCondition& condition : conditions) {
RETURN_IF_ERROR(check_condition_valid(schema, condition));
}
// Store delete condition
for (const TCondition& condition : conditions) {
if (condition.condition_values.size() > 1) {
InPredicatePB* in_pred = del_pred->add_in_predicates();
if (condition.__isset.column_unique_id) {
in_pred->set_column_unique_id(condition.column_unique_id);
}
in_pred->set_column_name(condition.column_name);
bool is_not_in = condition.condition_op == "!*=";
in_pred->set_is_not_in(is_not_in);
for (const auto& condition_value : condition.condition_values) {
in_pred->add_values(condition_value);
}
LOG(INFO) << "store one sub-delete condition. condition name=" << in_pred->column_name()
<< "condition size=" << in_pred->values().size();
} else {
// write sub predicate v1 for compactbility
std::string condition_str = construct_sub_predicate(condition);
VLOG_NOTICE << __PRETTY_FUNCTION__ << " condition_str: " << condition_str;
del_pred->add_sub_predicates(condition_str);
DeleteSubPredicatePB* sub_predicate = del_pred->add_sub_predicates_v2();
if (condition.__isset.column_unique_id) {
// only light schema change capable table set this field
sub_predicate->set_column_unique_id(condition.column_unique_id);
} else {
try {
[[maybe_unused]] auto parsed_cond = parse_condition(condition_str);
} catch (const Exception& e) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>(
"failed to parse condition_str, condition={}, error={}",
ThriftDebugString(condition), e.to_string());
}
}
sub_predicate->set_column_name(condition.column_name);
sub_predicate->set_op(trans_op(condition.condition_op));
sub_predicate->set_cond_value(condition.condition_values[0]);
LOG(INFO) << "store one sub-delete condition. condition="
<< fmt::format(" {} {} {}", condition.column_name, condition.condition_op,
condition.condition_values[0]);
}
}
del_pred->set_version(-1);
return Status::OK();
}
Status DeleteHandler::convert_to_sub_pred_v2(DeletePredicatePB* delete_pred,
TabletSchemaSPtr schema) {
if (!delete_pred->sub_predicates().empty() && delete_pred->sub_predicates_v2().empty()) {
for (const auto& condition_str : delete_pred->sub_predicates()) {
auto* sub_pred = delete_pred->add_sub_predicates_v2();
auto condition = parse_condition(condition_str);
const auto& column = *DORIS_TRY(schema->column(condition.column_name));
sub_pred->set_column_unique_id(column.unique_id());
sub_pred->set_column_name(condition.column_name);
sub_pred->set_op(type_to_op_str(condition.condition_op));
sub_pred->set_cond_value(condition.value_str.front());
}
}
auto* in_pred_list = delete_pred->mutable_in_predicates();
for (auto& in_pred : *in_pred_list) {
const auto& column = *DORIS_TRY(schema->column(in_pred.column_name()));
in_pred.set_column_unique_id(column.unique_id());
}
return Status::OK();
}
bool DeleteHandler::is_condition_value_valid(const TabletColumn& column,
const std::string& condition_op,
const string& value_str) {
if ("IS" == condition_op && ("NULL" == value_str || "NOT NULL" == value_str)) {
return true;
}
FieldType field_type = column.type();
switch (field_type) {
case FieldType::OLAP_FIELD_TYPE_TINYINT:
return valid_signed_number<int8_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_SMALLINT:
return valid_signed_number<int16_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_INT:
return valid_signed_number<int32_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_BIGINT:
return valid_signed_number<int64_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_LARGEINT:
return valid_signed_number<int128_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_TINYINT:
return valid_unsigned_number<uint8_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_SMALLINT:
return valid_unsigned_number<uint16_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT:
return valid_unsigned_number<uint32_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_UNSIGNED_BIGINT:
return valid_unsigned_number<uint64_t>(value_str);
case FieldType::OLAP_FIELD_TYPE_DECIMAL:
return valid_decimal(value_str, column.precision(), column.frac());
case FieldType::OLAP_FIELD_TYPE_DECIMAL32:
return valid_decimal(value_str, column.precision(), column.frac());
case FieldType::OLAP_FIELD_TYPE_DECIMAL64:
return valid_decimal(value_str, column.precision(), column.frac());
case FieldType::OLAP_FIELD_TYPE_DECIMAL128I:
return valid_decimal(value_str, column.precision(), column.frac());
case FieldType::OLAP_FIELD_TYPE_DECIMAL256:
return valid_decimal(value_str, column.precision(), column.frac());
case FieldType::OLAP_FIELD_TYPE_CHAR:
case FieldType::OLAP_FIELD_TYPE_VARCHAR:
return value_str.size() <= column.length();
case FieldType::OLAP_FIELD_TYPE_STRING:
return value_str.size() <= config::string_type_length_soft_limit_bytes;
case FieldType::OLAP_FIELD_TYPE_DATE:
case FieldType::OLAP_FIELD_TYPE_DATETIME:
case FieldType::OLAP_FIELD_TYPE_DATEV2:
case FieldType::OLAP_FIELD_TYPE_DATETIMEV2:
case FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ:
return valid_datetime(value_str, column.frac());
case FieldType::OLAP_FIELD_TYPE_BOOL:
return valid_bool(value_str);
case FieldType::OLAP_FIELD_TYPE_IPV4:
return valid_ipv4(value_str);
case FieldType::OLAP_FIELD_TYPE_IPV6:
return valid_ipv6(value_str);
default:
LOG(WARNING) << "unknown field type. [type=" << int(field_type) << "]";
}
return false;
}
Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TCondition& cond) {
// Check whether the column exists
int32_t field_index = schema.field_index(cond.column_name);
if (field_index < 0) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("field is not existent. [field_index={}]",
field_index);
}
// Delete condition should only applied on key columns or duplicate key table, and
// the condition column type should not be float or double.
const TabletColumn& column = schema.column(field_index);
if (column.type() == FieldType::OLAP_FIELD_TYPE_DOUBLE ||
column.type() == FieldType::OLAP_FIELD_TYPE_FLOAT) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("data type is float or double.");
}
// Check operator and operands size are matched.
if ("*=" != cond.condition_op && "!*=" != cond.condition_op &&
cond.condition_values.size() != 1) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value size. [size={}]",
cond.condition_values.size());
}
// Check each operand is valid
for (const auto& condition_value : cond.condition_values) {
if (!is_condition_value_valid(column, cond.condition_op, condition_value)) {
return Status::Error<ErrorCode::INVALID_ARGUMENT>("invalid condition value. [value={}]",
condition_value);
}
}
if (!cond.__isset.column_unique_id) {
LOG(WARNING) << "column=" << cond.column_name
<< " in predicate does not have uid, table id=" << schema.table_id();
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
return Status::OK();
}
if (schema.field_index(cond.column_unique_id) == -1) {
const auto& err_msg =
fmt::format("column id does not exists in table={}, schema version={},",
schema.table_id(), schema.schema_version());
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}
if (!iequal(schema.column_by_uid(cond.column_unique_id).name(), cond.column_name)) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which "
"column name={}, "
"delete_cond.column_name ={}",
cond.column_name, cond.column_unique_id,
schema.column_by_uid(cond.column_unique_id).name(), cond.column_name);
return Status::Error<ErrorCode::INVALID_ARGUMENT>(err_msg);
}
return Status::OK();
}
PredicateType DeleteHandler::parse_condition_op(const std::string& op_str,
const std::list<std::string>& cond_values) {
if (trim(to_lower(op_str)) == "=") {
return PredicateType::EQ;
} else if (trim(to_lower(op_str)) == "!=") {
return PredicateType::NE;
} else if (trim(to_lower(op_str)) == ">>") {
return PredicateType::GT;
} else if (trim(to_lower(op_str)) == "<<") {
return PredicateType::LT;
} else if (trim(to_lower(op_str)) == ">=") {
return PredicateType::GE;
} else if (trim(to_lower(op_str)) == "<=") {
return PredicateType::LE;
} else if (trim(to_lower(op_str)) == "*=") {
return cond_values.size() > 1 ? PredicateType::IN_LIST : PredicateType::EQ;
} else if (trim(to_lower(op_str)) == "!*=") {
return cond_values.size() > 1 ? PredicateType::NOT_IN_LIST : PredicateType::NE;
} else if (trim(to_lower(op_str)) == "is") {
return to_lower(cond_values.front()) == "null" ? PredicateType::IS_NULL
: PredicateType::IS_NOT_NULL;
} else {
throw Exception(Status::Error<ErrorCode::INVALID_ARGUMENT>(
"invalid condition operator. operator={}", op_str));
}
return PredicateType::UNKNOWN;
}
DeleteHandler::ConditionParseResult DeleteHandler::parse_condition(
const DeleteSubPredicatePB& sub_cond) {
ConditionParseResult res;
if (!sub_cond.has_column_name() || !sub_cond.has_op() || !sub_cond.has_cond_value()) {
throw Exception(Status::Error<ErrorCode::INVALID_ARGUMENT>(
"fail to parse condition. condition={} {} {}", sub_cond.column_name(),
sub_cond.op(), sub_cond.cond_value()));
}
if (sub_cond.has_column_unique_id()) {
res.col_unique_id = sub_cond.column_unique_id();
}
res.column_name = sub_cond.column_name();
res.value_str.push_back(sub_cond.cond_value());
res.condition_op = parse_condition_op(sub_cond.op(), res.value_str);
return res;
}
// clang-format off
// Condition string format, the format is (column_name)(op)(value)
// eg: condition_str="c1 = 1597751948193618247 and length(source)<1;\n;\n"
// column_name: matches "c1", must include FeNameFormat.java COLUMN_NAME_REGEX
// and compactible with any the lagacy
// operator: matches "="
// value: matches "1597751948193618247 and length(source)<1;\n;\n"
//
// For more info, see DeleteHandler::construct_sub_predicates
// FIXME(gavin): This is a tricky implementation, it should not be the final resolution, refactor it.
const char* const CONDITION_STR_PATTERN =
// .----------------- column-name --------------------------. .----------------------- operator ------------------------. .------------ value ----------.
R"(([_a-zA-Z@0-9\s/\p{L}][.a-zA-Z0-9_+-/?@#$%^&*"\s,:\p{L}]*)\s*((?:=)|(?:!=)|(?:>>)|(?:<<)|(?:>=)|(?:<=)|(?:\*=)|(?: IS ))\s*('((?:[\s\S]+)?)'|(?:[\s\S]+)?))";
// '----------------- group 1 ------------------------------' '--------------------- group 2 ---------------------------' | '-- group 4--' |
// match any of: = != >> << >= <= *= " IS " '----------- group 3 ---------'
// match **ANY THING** without(4)
// or with(3) single quote
// clang-format on
RE2 DELETE_HANDLER_REGEX(CONDITION_STR_PATTERN);
DeleteHandler::ConditionParseResult DeleteHandler::parse_condition(
const std::string& condition_str) {
ConditionParseResult res;
std::string col_name, op, value, g4;
bool matched = RE2::FullMatch(condition_str, DELETE_HANDLER_REGEX, &col_name, &op, &value,
&g4); // exact match
if (!matched) {
throw Exception(
Status::InvalidArgument("fail to sub condition. condition={}", condition_str));
}
res.column_name = col_name;
// match string with single quotes, a = b or a = 'b'
if (!g4.empty()) {
res.value_str.push_back(g4);
} else {
res.value_str.push_back(value);
}
res.condition_op = DeleteHandler::parse_condition_op(op, res.value_str);
VLOG_NOTICE << "parsed condition_str: col_name={" << col_name << "} op={" << op << "} val={"
<< res.value_str.back() << "}";
return res;
}
template <typename SubPredType>
requires(std::is_same_v<SubPredType, DeleteSubPredicatePB> or
std::is_same_v<SubPredType, std::string>)
Status DeleteHandler::_parse_column_pred(TabletSchemaSPtr complete_schema,
TabletSchemaSPtr delete_pred_related_schema,
const RepeatedPtrField<SubPredType>& sub_pred_list,
DeleteConditions* delete_conditions) {
for (const auto& sub_predicate : sub_pred_list) {
auto condition = parse_condition(sub_predicate);
int32_t col_unique_id = -1;
if constexpr (std::is_same_v<SubPredType, DeleteSubPredicatePB>) {
if (sub_predicate.has_column_unique_id()) [[likely]] {
col_unique_id = sub_predicate.column_unique_id();
}
}
if (col_unique_id < 0) {
const auto& column =
*DORIS_TRY(delete_pred_related_schema->column(condition.column_name));
col_unique_id = column.unique_id();
}
condition.col_unique_id = col_unique_id;
const auto& column = complete_schema->column_by_uid(col_unique_id);
uint32_t index = complete_schema->field_index(col_unique_id);
std::shared_ptr<ColumnPredicate> predicate;
RETURN_IF_ERROR(parse_to_predicate(index, column.get_vec_type(), condition,
_predicate_arena, predicate));
if (predicate != nullptr) {
delete_conditions->column_predicate_vec.push_back(predicate);
}
}
return Status::OK();
}
Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
const std::vector<RowsetMetaSharedPtr>& delete_preds, int64_t version) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
for (const auto& delete_pred : delete_preds) {
// Skip the delete condition with large version
if (delete_pred->version().first > version) {
continue;
}
// Need the tablet schema at the delete condition to parse the accurate column
const auto& delete_pred_related_schema = delete_pred->tablet_schema();
const auto& delete_condition = delete_pred->delete_predicate();
DeleteConditions temp;
temp.filter_version = delete_pred->version().first;
if (!delete_condition.sub_predicates_v2().empty()) {
RETURN_IF_ERROR(_parse_column_pred(tablet_schema, delete_pred_related_schema,
delete_condition.sub_predicates_v2(), &temp));
} else {
// make it compatible with the former versions
RETURN_IF_ERROR(_parse_column_pred(tablet_schema, delete_pred_related_schema,
delete_condition.sub_predicates(), &temp));
}
for (const auto& in_predicate : delete_condition.in_predicates()) {
ConditionParseResult condition;
condition.column_name = in_predicate.column_name();
int32_t col_unique_id = -1;
if (in_predicate.has_column_unique_id()) {
col_unique_id = in_predicate.column_unique_id();
} else {
// if upgrade from version 2.0.x, column_unique_id maybe not set
const auto& pre_column =
*DORIS_TRY(delete_pred_related_schema->column(condition.column_name));
col_unique_id = pre_column.unique_id();
}
if (col_unique_id == -1) {
return Status::Error<ErrorCode::DELETE_INVALID_CONDITION>(
"cannot get column_unique_id for column {}", condition.column_name);
}
condition.col_unique_id = col_unique_id;
condition.condition_op =
in_predicate.is_not_in() ? PredicateType::NOT_IN_LIST : PredicateType::IN_LIST;
for (const auto& value : in_predicate.values()) {
condition.value_str.push_back(value);
}
const auto& column = tablet_schema->column_by_uid(col_unique_id);
uint32_t index = tablet_schema->field_index(col_unique_id);
std::shared_ptr<ColumnPredicate> predicate;
RETURN_IF_ERROR(parse_to_in_predicate(index, column.get_vec_type(), condition,
_predicate_arena, predicate));
temp.column_predicate_vec.push_back(predicate);
}
_del_conds.emplace_back(std::move(temp));
}
_is_inited = true;
return Status::OK();
}
DeleteHandler::~DeleteHandler() {
if (!_is_inited) {
return;
}
_del_conds.clear();
_is_inited = false;
}
void DeleteHandler::get_delete_conditions_after_version(
int64_t version, AndBlockColumnPredicate* and_block_column_predicate_ptr,
std::unordered_map<int32_t, std::vector<std::shared_ptr<const ColumnPredicate>>>*
del_predicates_for_zone_map) const {
for (const auto& del_cond : _del_conds) {
if (del_cond.filter_version > version) {
// now, only query support delete column predicate operator
if (!del_cond.column_predicate_vec.empty()) {
if (del_cond.column_predicate_vec.size() == 1) {
auto single_column_block_predicate = SingleColumnBlockPredicate::create_unique(
del_cond.column_predicate_vec[0]);
and_block_column_predicate_ptr->add_column_predicate(
std::move(single_column_block_predicate));
if (del_predicates_for_zone_map->count(
del_cond.column_predicate_vec[0]->column_id()) < 1) {
del_predicates_for_zone_map->insert(
{del_cond.column_predicate_vec[0]->column_id(),
std::vector<std::shared_ptr<const ColumnPredicate>> {}});
}
(*del_predicates_for_zone_map)[del_cond.column_predicate_vec[0]->column_id()]
.push_back(del_cond.column_predicate_vec[0]);
} else {
auto or_column_predicate = OrBlockColumnPredicate::create_unique();
// build or_column_predicate
// when delete from where a = 1 and b = 2, we can not use del_predicates_for_zone_map to filter zone page,
// so here do not put predicate to del_predicates_for_zone_map,
// refer #17145 for more details.
// // TODO: need refactor design and code to use more version delete and more column delete to filter zone page.
std::for_each(del_cond.column_predicate_vec.cbegin(),
del_cond.column_predicate_vec.cend(),
[&or_column_predicate](
const std::shared_ptr<const ColumnPredicate> predicate) {
or_column_predicate->add_column_predicate(
SingleColumnBlockPredicate::create_unique(predicate));
});
and_block_column_predicate_ptr->add_column_predicate(
std::move(or_column_predicate));
}
}
}
}
}
} // namespace doris