blob: a14cc28e423d7df8333b91f8c545a5aab98cab74 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "udf/python/python_udtf_client.h"
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/record_batch.h"
#include "arrow/type.h"
#include "common/status.h"
namespace doris {
// Factory for PythonUDTFClient.
//
// Allocates a new client, runs init() on it with `func_meta` and `process`
// (ownership of `process` is moved into init()), and on success publishes the
// client through `client`.
//
// Returns the status produced by init() if it fails; in that case `*client`
// is left untouched. Returns Status::OK() otherwise.
Status PythonUDTFClient::create(const PythonUDFMeta& func_meta, ProcessPtr process,
                                PythonUDTFClientPtr* client) {
    PythonUDTFClientPtr instance = std::make_shared<PythonUDTFClient>();
    // Only hand the client to the caller once it has been fully initialized.
    RETURN_IF_ERROR(instance->init(func_meta, std::move(process)));
    *client = std::move(instance);
    return Status::OK();
}
// Sends one input batch to the Python UDTF and returns its list-typed result.
//
// Steps:
//   1. begin_stream() is called with the schema of `input`.
//   2. `input` is written with write_batch().
//   3. One response batch is read with read_batch().
//   4. The response is validated: it must contain exactly one column and that
//      column must have Arrow type LIST.
//
// On success the response column is stored in `*list_array` as an
// arrow::ListArray and Status::OK() is returned.
//
// Errors:
//   - Any failure from begin_stream / write_batch / read_batch is returned
//     unchanged.
//   - Status::InternalError if no response batch was produced, if the column
//     count is not 1, or if the column is not of LIST type.
// `*list_array` is written only on success.
Status PythonUDTFClient::evaluate(const arrow::RecordBatch& input,
                                  std::shared_ptr<arrow::ListArray>* list_array) {
    RETURN_IF_ERROR(begin_stream(input.schema()));
    RETURN_IF_ERROR(write_batch(input));

    // Read the response (ListArray-based)
    std::shared_ptr<arrow::RecordBatch> response_batch;
    RETURN_IF_ERROR(read_batch(&response_batch));

    // read_batch() is defined elsewhere; Arrow readers can report success with
    // a null batch (e.g. at end of stream). Guard here so such a case surfaces
    // as an error Status instead of a null dereference below.
    if (response_batch == nullptr) {
        return Status::InternalError("Invalid UDTF response: no record batch received");
    }

    // Validate response structure: should have a single ListArray column
    if (response_batch->num_columns() != 1) {
        return Status::InternalError(
                fmt::format("Invalid UDTF response: expected 1 column (ListArray), got {}",
                            response_batch->num_columns()));
    }

    auto list_array_ptr = response_batch->column(0);
    if (list_array_ptr->type_id() != arrow::Type::LIST) {
        return Status::InternalError(
                fmt::format("Invalid UDTF response: expected ListArray, got type {}",
                            list_array_ptr->type()->ToString()));
    }

    // The type id was checked above, so the downcast to ListArray is safe.
    *list_array = std::static_pointer_cast<arrow::ListArray>(list_array_ptr);
    return Status::OK();
}
} // namespace doris