blob: 85428b4033bbb5db08bda00fdc37217bc2976617 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_udf_meta.h"
#include <arrow/util/base64.h>
#include <fmt/core.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include <sstream>
#include "common/status.h"
#include "util/arrow/utils.h"
#include "util/string_util.h"
namespace doris {
// Builds an Arrow schema with one field per entry of `types`.
//
// Fields are named "arg0", "arg1", ... in input order. Each field's Arrow type
// comes from convert_to_arrow_type() (called with `timezone`), and its
// nullability mirrors the source type's is_nullable().
//
// @param types    source types; asserted to be non-empty
// @param timezone forwarded to convert_to_arrow_type()
// @param schema   receives the finished schema on success
// @return OK on success, otherwise the first conversion / builder error
Status PythonUDFMeta::convert_types_to_schema(const vectorized::DataTypes& types,
                                              const std::string& timezone,
                                              std::shared_ptr<arrow::Schema>* schema) {
    assert(!types.empty());

    arrow::SchemaBuilder schema_builder;
    size_t position = 0;
    for (const auto& source_type : types) {
        std::shared_ptr<arrow::DataType> field_type;
        RETURN_IF_ERROR(convert_to_arrow_type(source_type, &field_type, timezone));

        // Positional field name: "arg<index>".
        const std::string field_name = "arg" + std::to_string(position);
        ++position;

        auto arg_field = std::make_shared<arrow::Field>(field_name, field_type,
                                                        source_type->is_nullable());
        RETURN_DORIS_STATUS_IF_ERROR(schema_builder.AddField(arg_field));
    }

    RETURN_DORIS_STATUS_IF_RESULT_ERROR(schema, schema_builder.Finish());
    return Status::OK();
}
// Serializes `schema` with arrow::ipc::SerializeSchema, allocating from
// Arrow's default memory pool.
//
// @param schema schema to serialize; dereferenced unconditionally
// @param out    receives the buffer holding the serialized schema on success
// @return OK on success, otherwise the Arrow error converted to a Doris Status
Status PythonUDFMeta::serialize_arrow_schema(const std::shared_ptr<arrow::Schema>& schema,
                                             std::shared_ptr<arrow::Buffer>* out) {
    auto* pool = arrow::default_memory_pool();
    RETURN_DORIS_STATUS_IF_RESULT_ERROR(out, arrow::ipc::SerializeSchema(*schema, pool));
    return Status::OK();
}
/*
 * Serializes this UDF description to a JSON object and stores the text in
 * `*json_str`. `*json_str` is only written on success.
 *
 * Layout of the produced object (members are added in this order):
 * {
 *     "name": "xxx",
 *     "symbol": "xxx",
 *     "location": "xxx",
 *     "udf_load_type": 0 or 1,
 *     "client_type": 0 (UDF) or 1 (UDAF) or 2 (UDTF),
 *     "runtime_version": "x.xx.xx",
 *     "always_nullable": true,
 *     "inline_code": "base64_inline_code",
 *     "input_types": "base64_input_types",
 *     "return_type": "base64_return_type"
 * }
 *
 * "inline_code" is the base64 of the raw inline code. "input_types" and
 * "return_type" are each the base64 of an Arrow schema serialized with
 * serialize_arrow_schema(); the return type is encoded as a one-field schema.
 *
 * Returns the first error reported while building or serializing a schema.
 */
Status PythonUDFMeta::serialize_to_json(std::string* json_str) const {
    rapidjson::Document doc;
    doc.SetObject();
    auto& alloc = doc.GetAllocator();

    // Copies `text` (up to its first NUL, via c_str()) into the document and
    // stores it under `key`.
    auto put_string = [&](const auto& key, const std::string& text) {
        rapidjson::Value value;
        value.SetString(text.c_str(), alloc);
        doc.AddMember(key, value, alloc);
    };

    // Stores an integer under `key`.
    auto put_int = [&](const auto& key, int number) {
        rapidjson::Value value;
        value.SetInt(number);
        doc.AddMember(key, value, alloc);
    };

    // Turns `types` into an Arrow schema, serializes it and base64-encodes
    // the resulting bytes into `*encoded`.
    auto encode_types = [&](const vectorized::DataTypes& types, std::string* encoded) -> Status {
        std::shared_ptr<arrow::Schema> arrow_schema;
        RETURN_IF_ERROR(convert_types_to_schema(types, TimezoneUtils::default_time_zone,
                                                &arrow_schema));
        std::shared_ptr<arrow::Buffer> schema_bytes;
        RETURN_IF_ERROR(serialize_arrow_schema(arrow_schema, &schema_bytes));
        *encoded = arrow::util::base64_encode(
                {schema_bytes->data_as<char>(), static_cast<size_t>(schema_bytes->size())});
        return Status::OK();
    };

    // Plain scalar members.
    put_string("name", name);
    put_string("symbol", symbol);
    put_string("location", location);
    put_int("udf_load_type", static_cast<int>(type));
    put_int("client_type", static_cast<int>(client_type));
    put_string("runtime_version", runtime_version);
    {
        rapidjson::Value nullable_value;
        nullable_value.SetBool(always_nullable);
        doc.AddMember("always_nullable", nullable_value, alloc);
    }

    // Inline code is base64-encoded before being embedded.
    put_string("inline_code", arrow::util::base64_encode(inline_code));

    // Argument types, then the return type wrapped in a single-element list.
    std::string encoded_input_types;
    RETURN_IF_ERROR(encode_types(input_types, &encoded_input_types));
    put_string("input_types", encoded_input_types);

    std::string encoded_return_type;
    RETURN_IF_ERROR(encode_types({return_type}, &encoded_return_type));
    put_string("return_type", encoded_return_type);

    // Render the document as text.
    rapidjson::StringBuffer sink;
    rapidjson::Writer<rapidjson::StringBuffer> json_writer(sink);
    doc.Accept(json_writer);
    json_str->assign(sink.GetString(), sink.GetSize());
    return Status::OK();
}
std::string PythonUDFMeta::to_string() const {
std::stringstream input_types_ss;
input_types_ss << "<";
for (size_t i = 0; i < input_types.size(); ++i) {
input_types_ss << input_types[i]->get_name();
if (i != input_types.size() - 1) {
input_types_ss << ", ";
}
}
input_types_ss << ">";
return fmt::format(
"[name: {}, symbol: {}, location: {}, runtime_version: {}, always_nullable: {}, "
"inline_code: {}][input_types: {}][return_type: {}]",
name, symbol, location, runtime_version, always_nullable, inline_code,
input_types_ss.str(), return_type->get_name());
}
// Validates that this UDF metadata is complete.
//
// Checks run in a fixed order and the first failure is returned as
// InvalidArgument: name, symbol, runtime version, input types, return type,
// load type. For the MODULE load type, location and checksum are additionally
// required.
//
// @return OK when every check passes
Status PythonUDFMeta::check() const {
    // A text field counts as missing when trim() leaves nothing behind.
    const auto is_blank = [](const auto& text) { return trim(text).empty(); };

    if (is_blank(name)) {
        return Status::InvalidArgument("Python UDF name is empty");
    }
    if (is_blank(symbol)) {
        return Status::InvalidArgument("Python UDF symbol is empty");
    }
    if (is_blank(runtime_version)) {
        return Status::InvalidArgument("Python UDF runtime version is empty");
    }
    if (input_types.empty()) {
        return Status::InvalidArgument("Python UDF input types is empty");
    }
    if (!return_type) {
        return Status::InvalidArgument("Python UDF return type is empty");
    }
    if (type == PythonUDFLoadType::UNKNOWN) {
        return Status::InvalidArgument(
                "Python UDF load type is invalid, please check inline code or file path");
    }

    // Only the MODULE load type has further requirements.
    if (type != PythonUDFLoadType::MODULE) {
        return Status::OK();
    }
    if (is_blank(location)) {
        return Status::InvalidArgument("Non-inline Python UDF location is empty");
    }
    if (is_blank(checksum)) {
        return Status::InvalidArgument("Non-inline Python UDF checksum is empty");
    }
    return Status::OK();
}
} // namespace doris