blob: bb130415ae28f4427c2773be57ce946fb60c46cf [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exprs/rpc_fn.h"
#include <fmt/format.h>
#include "exprs/rpc_fn_comm.h"
#include "runtime/fragment_mgr.h"
#include "util/brpc_client_cache.h"
#include "vec/columns/column.h"
#include "vec/columns/column_vector.h"
#include "vec/core/block.h"
#include "vec/core/column_numbers.h"
#include "vec/data_types/data_type_decimal.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris {
RPCFn::RPCFn(RuntimeState* state, const TFunction& fn, int fn_ctx_id, bool is_agg)
: _state(state), _fn(fn), _fn_ctx_id(fn_ctx_id), _is_agg(is_agg) {
_client = ExecEnv::GetInstance()->brpc_function_client_cache()->get_client(_server_addr);
if (!_is_agg) {
_function_name = _fn.scalar_fn.symbol;
_server_addr = _fn.hdfs_location;
_signature = fmt::format("{}: [{}/{}]", _fn.name.function_name, _fn.hdfs_location,
_fn.scalar_fn.symbol);
}
}
RPCFn::RPCFn(const TFunction& fn, bool is_agg) : RPCFn(nullptr, fn, -1, is_agg) {}
RPCFn::RPCFn(RuntimeState* state, const TFunction& fn, AggregationStep step, bool is_agg)
: RPCFn(nullptr, fn, -1, is_agg) {
_step = step;
DCHECK(is_agg) << "Only used for agg fns";
switch (_step) {
case INIT: {
_function_name = _fn.aggregate_fn.init_fn_symbol;
_server_addr = _fn.hdfs_location;
_signature = fmt::format("{}: [{}/{}]", _fn.name.function_name, _fn.hdfs_location,
_fn.aggregate_fn.init_fn_symbol);
break;
}
case UPDATE: {
_function_name = _fn.aggregate_fn.init_fn_symbol;
break;
}
case MERGE: {
_function_name = _fn.aggregate_fn.merge_fn_symbol;
break;
}
case SERIALIZE: {
_function_name = _fn.aggregate_fn.serialize_fn_symbol;
break;
}
case GET_VALUE: {
_function_name = _fn.aggregate_fn.get_value_fn_symbol;
break;
}
case FINALIZE: {
_function_name = _fn.aggregate_fn.finalize_fn_symbol;
break;
}
case REMOVE: {
_function_name = _fn.aggregate_fn.remove_fn_symbol;
break;
}
default:
CHECK(false) << "invalid AggregationStep: " << _step;
break;
}
_server_addr = _fn.hdfs_location;
_signature = fmt::format("{}: [{}/{}]", _fn.name.function_name, _server_addr, _function_name);
}
Status RPCFn::call_internal(ExprContext* context, TupleRow* row, PFunctionCallResponse* response,
const std::vector<Expr*>& exprs) {
FunctionContext* fn_ctx = context->fn_context(_fn_ctx_id);
PFunctionCallRequest request;
request.set_function_name(_function_name);
for (int i = 0; i < exprs.size(); ++i) {
PValues* arg = request.add_args();
void* src_slot = context->get_value(exprs[i], row);
PGenericType* ptype = arg->mutable_type();
if (src_slot == nullptr) {
arg->set_has_null(true);
arg->add_null_map(true);
} else {
arg->set_has_null(false);
}
switch (exprs[i]->type().type) {
case TYPE_BOOLEAN: {
ptype->set_id(PGenericType::BOOLEAN);
arg->add_bool_value(*(bool*)src_slot);
break;
}
case TYPE_TINYINT: {
ptype->set_id(PGenericType::INT8);
arg->add_int32_value(*(int8_t*)src_slot);
break;
}
case TYPE_SMALLINT: {
ptype->set_id(PGenericType::INT16);
arg->add_int32_value(*(int16_t*)src_slot);
break;
}
case TYPE_INT: {
ptype->set_id(PGenericType::INT32);
arg->add_int32_value(*(int*)src_slot);
break;
}
case TYPE_BIGINT: {
ptype->set_id(PGenericType::INT64);
arg->add_int64_value(*(int64_t*)src_slot);
break;
}
case TYPE_LARGEINT: {
ptype->set_id(PGenericType::INT128);
char buffer[sizeof(__int128)];
memcpy(buffer, src_slot, sizeof(__int128));
arg->add_bytes_value(buffer, sizeof(__int128));
break;
}
case TYPE_DOUBLE: {
ptype->set_id(PGenericType::DOUBLE);
arg->add_double_value(*(double*)src_slot);
break;
}
case TYPE_FLOAT: {
ptype->set_id(PGenericType::FLOAT);
arg->add_float_value(*(float*)src_slot);
break;
}
case TYPE_VARCHAR:
case TYPE_STRING:
case TYPE_CHAR: {
ptype->set_id(PGenericType::STRING);
StringValue value = *reinterpret_cast<StringValue*>(src_slot);
arg->add_string_value(value.ptr, value.len);
break;
}
case TYPE_HLL: {
ptype->set_id(PGenericType::HLL);
StringValue value = *reinterpret_cast<StringValue*>(src_slot);
arg->add_string_value(value.ptr, value.len);
break;
}
case TYPE_OBJECT: {
ptype->set_id(PGenericType::BITMAP);
StringValue value = *reinterpret_cast<StringValue*>(src_slot);
arg->add_string_value(value.ptr, value.len);
break;
}
case TYPE_DECIMALV2: {
ptype->set_id(PGenericType::DECIMAL128);
ptype->mutable_decimal_type()->set_precision(exprs[i]->type().precision);
ptype->mutable_decimal_type()->set_scale(exprs[i]->type().scale);
char buffer[sizeof(__int128)];
memcpy(buffer, src_slot, sizeof(__int128));
arg->add_bytes_value(buffer, sizeof(__int128));
break;
}
case TYPE_DATE: {
ptype->set_id(PGenericType::DATE);
const auto* time_val = (const DateTimeValue*)(src_slot);
PDateTime* date_time = arg->add_datetime_value();
date_time->set_day(time_val->day());
date_time->set_month(time_val->month());
date_time->set_year(time_val->year());
break;
}
case TYPE_DATETIME: {
ptype->set_id(PGenericType::DATETIME);
const auto* time_val = (const DateTimeValue*)(src_slot);
PDateTime* date_time = arg->add_datetime_value();
date_time->set_day(time_val->day());
date_time->set_month(time_val->month());
date_time->set_year(time_val->year());
date_time->set_hour(time_val->hour());
date_time->set_minute(time_val->minute());
date_time->set_second(time_val->second());
date_time->set_microsecond(time_val->microsecond());
break;
}
case TYPE_TIME: {
ptype->set_id(PGenericType::DATETIME);
const auto* time_val = (const DateTimeValue*)(src_slot);
PDateTime* date_time = arg->add_datetime_value();
date_time->set_hour(time_val->hour());
date_time->set_minute(time_val->minute());
date_time->set_second(time_val->second());
date_time->set_microsecond(time_val->microsecond());
break;
}
default: {
std::string error_msg =
fmt::format("data time not supported: {}", exprs[i]->type().type);
fn_ctx->set_error(error_msg.c_str());
cancel(error_msg);
break;
}
}
}
brpc::Controller cntl;
_client->fn_call(&cntl, &request, response, nullptr);
if (cntl.Failed()) {
std::string error_msg =
fmt::format("call rpc function {} failed: {}", _signature, cntl.ErrorText());
fn_ctx->set_error(error_msg.c_str());
cancel(error_msg);
return Status::InternalError(error_msg);
}
if (!response->has_status() || response->result_size() == 0) {
std::string error_msg =
fmt::format("call rpc function {} failed: status or result is not set: {}",
_signature, response->status().DebugString());
fn_ctx->set_error(error_msg.c_str());
cancel(error_msg);
return Status::InternalError(error_msg);
}
if (response->status().status_code() != 0) {
std::string error_msg = fmt::format("call rpc function {} failed: {}", _signature,
response->status().DebugString());
fn_ctx->set_error(error_msg.c_str());
cancel(error_msg);
return Status::InternalError(error_msg);
}
return Status::OK();
}
void RPCFn::cancel(const std::string& msg) {
_state->exec_env()->fragment_mgr()->cancel(_state->fragment_instance_id(),
PPlanFragmentCancelReason::CALL_RPC_ERROR, msg);
}
Status RPCFn::vec_call(FunctionContext* context, vectorized::Block& block,
const vectorized::ColumnNumbers& arguments, size_t result,
size_t input_rows_count) {
PFunctionCallRequest request;
PFunctionCallResponse response;
request.set_function_name(_function_name);
convert_block_to_proto(block, arguments, input_rows_count, &request);
brpc::Controller cntl;
_client->fn_call(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
return Status::InternalError("call to rpc function {} failed: {}", _signature,
cntl.ErrorText());
}
if (!response.has_status() || response.result_size() == 0) {
return Status::InternalError("call rpc function {} failed: status or result is not set.",
_signature);
}
if (response.status().status_code() != 0) {
return Status::InternalError("call to rpc function {} failed: {}", _signature,
response.status().DebugString());
}
convert_to_block(block, response.result(0), result);
return Status::OK();
}
} // namespace doris