blob: 092ce7211e90e53c1cd3feabb7ef3da18e9c1eea [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifdef BUILD_RUST_READERS
#include "format/lance/lance_rust_reader.h"
#include <arrow/c/bridge.h>
#include <arrow/record_batch.h>
#include <arrow/result.h>
#include <arrow/type.h>
#include <rapidjson/document.h>
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>
#include "core/block/block.h"
#include "core/block/column_with_type_and_name.h"
#include "core/data_type/data_type_array.h"
#include "core/data_type/data_type_date_or_datetime_v2.h"
#include "core/data_type/data_type_decimal.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/primitive_type.h"
#include "format/lance/lance_ffi.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "util/timezone_utils.h"
namespace doris {
#include "common/compile_check_avoid_begin.h"
// Shared, always-empty slot list. The schema-only constructor binds
// _file_slot_descs to it because no file slots are supplied in that mode.
const std::vector<SlotDescriptor*> LanceRustReader::_empty_slot_descs {};
// Constructs a reader used for data scans.
//
// @param file_slot_descs  slots whose column names are projected when the reader is opened
// @param state            runtime state; consulted for the query batch size on open
// @param range            scan range describing the Lance path (and optional lance params)
// @param range_params     scan range params; their properties feed the storage options
//
// The RuntimeProfile argument is unused.
LanceRustReader::LanceRustReader(const std::vector<SlotDescriptor*>& file_slot_descs,
                                 RuntimeState* state, RuntimeProfile* /*profile*/,
                                 const TFileRangeDesc& range,
                                 const TFileScanRangeParams* range_params)
        : _file_slot_descs(file_slot_descs),
          _state(state),
          _range(range),
          _params(range_params) {
    // Resolve the default time zone into _ctzz; it is handed to the Arrow -> Doris
    // column conversion when batches are read.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
// Constructs a reader used only for schema discovery.
//
// There are no file slots and no runtime state in this mode: the slot list is
// bound to the shared empty list, _state is null and _schema_only is set.
// The IOContext argument is unused.
//
// @param params  scan range params; their address is stored, so they must outlive the reader
// @param range   scan range describing the Lance path (and optional lance params)
LanceRustReader::LanceRustReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
                                 io::IOContext* /*io_ctx*/)
        : _file_slot_descs(_empty_slot_descs),
          _state(nullptr),
          _range(range),
          _params(&params),
          _schema_only(true) {
    // Resolve the default time zone into _ctzz, same as the data-scan constructor.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
// Releases the native reader handle, if one is still open.
LanceRustReader::~LanceRustReader() {
    // A destructor has no way to report failure, so the status is deliberately dropped.
    [[maybe_unused]] const Status ignored_status = close();
}
// Opens the native reader for a data scan (column projection and fragment
// restriction are applied by _open_with_json).
//
// @return the status of _open_with_json(false)
Status LanceRustReader::init_reader() {
    constexpr bool kSchemaOnly = false;
    return _open_with_json(kSchemaOnly);
}
// Opens the native reader for schema discovery only (no column projection,
// no fragment restriction).
//
// @return the status of _open_with_json(true)
Status LanceRustReader::init_schema_reader() {
    constexpr bool kSchemaOnly = true;
    return _open_with_json(kSchemaOnly);
}
// Builds the JSON configuration consumed by the Rust FFI and opens the Lance reader.
//
// The configuration object contains:
//   "uri"             dataset root (lance_params.dataset_uri wins over the scan range path)
//   "fragment_file"   path of the fragment relative to the dataset root (data reads only)
//   "columns"         projected column names (left empty for schema-only opens)
//   "batch_size"      1 for schema-only opens, else the query batch size (default 4096)
//   "version"         dataset version from lance_params, 0 when not set
//   "storage_options" scan range properties, with Doris S3 keys renamed
//
// @param schema_only  true to open for schema discovery only
// @return InvalidArgument when the scan range path is empty, an InternalError carrying
//         the FFI error text when the native open fails, OK otherwise. On success
//         _reader_handle holds the newly opened handle.
Status LanceRustReader::_open_with_json(bool schema_only) {
    if (_range.path.empty()) {
        return Status::InvalidArgument("Lance reader: dataset URI is empty");
    }
    std::string uri = _range.path;

    // Lance datasets are directories (e.g., data.lance/).
    // The TVF path may point to a file inside (e.g., data.lance/data/xxx.lance).
    // Strip back to the .lance dataset root and extract the fragment file name
    // so we only read the specific fragment assigned to this scan range.
    //
    // The search is for ".lance/" rather than ".lance": an earlier path component
    // that merely starts with ".lance" (such as ".lancedb/") must not stop the
    // dataset root from being detected further along the path.
    std::string fragment_file;
    constexpr size_t kLanceExtLen = 6; // length of ".lance"
    const auto root_pos = uri.find(".lance/");
    if (root_pos != std::string::npos) {
        const auto root_end = root_pos + kLanceExtLen;
        // Relative path after the dataset root (e.g., "data/xxx.lance")
        fragment_file = uri.substr(root_end + 1);
        uri.erase(root_end);
    }

    const bool has_lance_params = _range.__isset.table_format_params &&
                                  _range.table_format_params.__isset.lance_params;

    // An explicit, non-empty dataset_uri in lance_params replaces the derived root.
    if (has_lance_params && _range.table_format_params.lance_params.__isset.dataset_uri) {
        const auto& lance_uri = _range.table_format_params.lance_params.dataset_uri;
        if (!lance_uri.empty()) {
            uri = lance_uri;
        }
    }

    // Build JSON config for the Rust FFI
    rapidjson::Document doc(rapidjson::kObjectType);
    auto& alloc = doc.GetAllocator();
    doc.AddMember("uri", rapidjson::Value(uri.c_str(), alloc), alloc);

    // Pass fragment file name so Rust reads only this fragment, not all
    if (!fragment_file.empty() && !schema_only) {
        doc.AddMember("fragment_file", rapidjson::Value(fragment_file.c_str(), alloc), alloc);
    }

    // Columns (only for data reads, not schema-only)
    rapidjson::Value cols(rapidjson::kArrayType);
    if (!schema_only) {
        for (const auto* slot : _file_slot_descs) {
            cols.PushBack(rapidjson::Value(slot->col_name().c_str(), alloc), alloc);
        }
    }
    doc.AddMember("columns", cols, alloc);

    // Batch size. The sign is checked before converting to size_t so that a
    // negative option cannot wrap around into a huge batch size.
    size_t batch_size = schema_only ? 1 : 4096;
    if (!schema_only && _state != nullptr) {
        const auto requested = _state->query_options().batch_size;
        if (requested > 0) {
            batch_size = static_cast<size_t>(requested);
        }
    }
    doc.AddMember("batch_size", static_cast<uint64_t>(batch_size), alloc);

    // Version (time travel) from TLanceFileDesc
    uint64_t version = 0;
    if (has_lance_params && _range.table_format_params.lance_params.__isset.version) {
        version = static_cast<uint64_t>(_range.table_format_params.lance_params.version);
    }
    doc.AddMember("version", version, alloc);

    // Storage options (S3 credentials from scan range params properties).
    // Doris S3 property keys are renamed to the lance/object_store keys below;
    // every other key is passed through unchanged.
    struct S3KeyMapping {
        const char* doris_key;
        const char* lance_key;
    };
    static constexpr S3KeyMapping kS3KeyMapping[] = {
            {"AWS_ACCESS_KEY", "aws_access_key_id"},
            {"AWS_SECRET_KEY", "aws_secret_access_key"},
            {"AWS_TOKEN", "aws_session_token"},
            {"AWS_ENDPOINT", "aws_endpoint"},
            {"AWS_REGION", "aws_region"},
    };
    rapidjson::Value storage_opts(rapidjson::kObjectType);
    if (_params && _params->__isset.properties) {
        for (const auto& [prop_key, prop_value] : _params->properties) {
            // Empty values are dropped for mapped and unmapped keys alike, so that
            // e.g. an unset session token is not forwarded as an empty credential.
            if (prop_value.empty()) {
                continue;
            }
            const char* target_key = prop_key.c_str();
            for (const auto& mapping : kS3KeyMapping) {
                if (prop_key == mapping.doris_key) {
                    target_key = mapping.lance_key;
                    break;
                }
            }
            storage_opts.AddMember(rapidjson::Value(target_key, alloc),
                                   rapidjson::Value(prop_value.c_str(), alloc), alloc);
        }
    }
    doc.AddMember("storage_options", storage_opts, alloc);

    // Serialize to JSON string
    rapidjson::StringBuffer buffer;
    rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
    doc.Accept(writer);
    const std::string config_json(buffer.GetString(), buffer.GetSize());

    LanceReaderHandle handle = nullptr;
    const int32_t rc = lance_reader_open_json(
            reinterpret_cast<const uint8_t*>(config_json.data()), config_json.size(), &handle);
    if (rc != lance_ffi::LANCE_FFI_OK) {
        return _get_ffi_error(rc);
    }

    // If the reader was already open, release the old handle instead of leaking it.
    if (_reader_handle) {
        lance_reader_close(_reader_handle);
    }
    _reader_handle = handle;
    return Status::OK();
}
// Reads the next Arrow batch from the native reader and appends it to `block`.
//
// Arrow fields are matched to block columns by name; fields that have no
// matching block column are skipped.
//
// @param block      destination block; its name -> position map is cached on first use
// @param read_rows  set to the number of rows in the batch, or 0 at end of data
// @param eof        set to true when the native reader reports end of data
// @return InternalError if the reader is not open, the FFI call fails, the Arrow
//         import fails or a column conversion throws; OK otherwise.
Status LanceRustReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
    if (!_reader_handle) {
        return Status::InternalError("Lance reader is not initialized");
    }
    if (_col_name_to_block_idx.empty()) {
        _col_name_to_block_idx = block->get_name_to_pos_map();
    }

    ArrowSchema c_schema {};
    ArrowArray c_array {};
    // The Arrow C Data Interface makes the consumer responsible for invoking the
    // release callback of every structure the producer filled in. Both structs
    // start zeroed, so a non-null callback means the FFI exported something that
    // would otherwise leak on the early-return paths below.
    const auto release_exported = [&c_schema, &c_array]() {
        if (c_array.release != nullptr) {
            c_array.release(&c_array);
        }
        if (c_schema.release != nullptr) {
            c_schema.release(&c_schema);
        }
    };

    bool is_eof = false;
    int64_t batch_bytes = 0;
    const int32_t rc =
            lance_reader_next_batch(_reader_handle, &c_schema, &c_array, &is_eof, &batch_bytes);
    if (rc == lance_ffi::LANCE_FFI_EOF || is_eof) {
        release_exported();
        *read_rows = 0;
        *eof = true;
        return Status::OK();
    }
    if (rc != lance_ffi::LANCE_FFI_OK) {
        // Fetch the error text before touching the exported structs.
        Status ffi_status = _get_ffi_error(rc);
        release_exported();
        return ffi_status;
    }

    // Import Arrow C Data Interface into C++ RecordBatch.
    // ImportRecordBatch takes ownership of both structs (also when it fails),
    // so they must not be released here afterwards.
    arrow::Result<std::shared_ptr<arrow::RecordBatch>> import_result =
            arrow::ImportRecordBatch(&c_array, &c_schema);
    if (!import_result.ok()) {
        return Status::InternalError("Failed to import Lance arrow batch: {}",
                                     import_result.status().message());
    }
    auto record_batch = std::move(import_result).ValueUnsafe();
    const auto num_rows = static_cast<size_t>(record_batch->num_rows());
    const auto num_columns = record_batch->num_columns();

    // Convert Arrow columns to Doris Block columns (same pattern as PaimonCppReader)
    auto columns_guard = block->mutate_columns_scoped();
    auto& columns = columns_guard.mutable_columns();
    for (int c = 0; c < num_columns; ++c) {
        const auto& field = record_batch->schema()->field(c);
        auto it = _col_name_to_block_idx.find(field->name());
        if (it == _col_name_to_block_idx.end()) {
            // The block has no column with this name.
            continue;
        }
        const auto block_pos = it->second;
        try {
            RETURN_IF_ERROR(columns_guard.get_datatype_by_position(block_pos)
                                    ->get_serde()
                                    ->read_column_from_arrow(*columns[block_pos],
                                                             record_batch->column(c).get(), 0,
                                                             num_rows, _ctzz));
        } catch (const Exception& e) {
            return Status::InternalError("Failed to convert Lance arrow to block: {}", e.what());
        }
    }
    *read_rows = num_rows;
    *eof = false;
    return Status::OK();
}
// Fills `name_to_type` with the column names and types this reader exposes.
//
// With an open handle in schema-only mode the columns come from the Lance
// schema, each mapped to a nullable Doris type. Otherwise they come from the
// file slot descriptors given at construction.
//
// @param name_to_type  output map; entries are added with emplace, so names that
//                      are already present keep their existing type
// @return an FFI / import error when the Lance schema cannot be obtained, OK otherwise
Status LanceRustReader::_get_columns_impl(
        std::unordered_map<std::string, DataTypePtr>* name_to_type) {
    const bool use_lance_schema = _schema_only && _reader_handle;
    if (!use_lance_schema) {
        for (const auto* slot_desc : _file_slot_descs) {
            name_to_type->emplace(slot_desc->col_name(), slot_desc->type());
        }
        return Status::OK();
    }

    // Schema-only mode: ask the native reader for the Lance schema.
    ArrowSchema ffi_schema {};
    const int32_t status_code = lance_reader_get_schema(_reader_handle, &ffi_schema);
    if (status_code != lance_ffi::LANCE_FFI_OK) {
        return _get_ffi_error(status_code);
    }

    auto imported = arrow::ImportSchema(&ffi_schema);
    if (!imported.ok()) {
        return Status::InternalError("Failed to import Lance schema: {}",
                                     imported.status().message());
    }

    const auto arrow_schema = std::move(imported).ValueUnsafe();
    for (const auto& field : arrow_schema->fields()) {
        // Fields whose type has no Doris mapping are left out.
        if (auto mapped_type = _arrow_type_to_doris_type(field->type())) {
            name_to_type->emplace(field->name(), make_nullable(mapped_type));
        }
    }
    return Status::OK();
}
// Appends the Lance schema to the two output vectors as parallel entries:
// one name and one nullable Doris type per field, in schema order.
//
// @param col_names  receives the field names
// @param col_types  receives the matching nullable Doris types
// @return InternalError if the reader is not open or the schema cannot be
//         fetched / imported; OK otherwise
Status LanceRustReader::get_parsed_schema(std::vector<std::string>* col_names,
                                          std::vector<DataTypePtr>* col_types) {
    if (!_reader_handle) {
        return Status::InternalError("Lance reader is not initialized");
    }

    ArrowSchema ffi_schema {};
    const int32_t status_code = lance_reader_get_schema(_reader_handle, &ffi_schema);
    if (status_code != lance_ffi::LANCE_FFI_OK) {
        return _get_ffi_error(status_code);
    }

    auto imported = arrow::ImportSchema(&ffi_schema);
    if (!imported.ok()) {
        return Status::InternalError("Failed to import Lance schema: {}",
                                     imported.status().message());
    }

    const auto arrow_schema = std::move(imported).ValueUnsafe();
    for (const auto& field : arrow_schema->fields()) {
        auto mapped_type = _arrow_type_to_doris_type(field->type());
        if (!mapped_type) {
            // No Doris mapping for this field: skip both the name and the type.
            continue;
        }
        col_names->push_back(field->name());
        col_types->push_back(make_nullable(mapped_type));
    }
    return Status::OK();
}
// Closes the native reader handle if one is open. Calling it again afterwards
// is a no-op because the handle is reset to null.
//
// @return always OK
Status LanceRustReader::close() {
    if (!_reader_handle) {
        return Status::OK();
    }
    lance_reader_close(_reader_handle);
    _reader_handle = nullptr;
    return Status::OK();
}
// Converts the last error recorded by the Lance FFI into a Doris Status.
//
// @param status_code  FFI return code; only used in the message when the FFI
//                     provides no error text
// @return InternalError of the form "Rust Lance reader: <message>"
Status LanceRustReader::_get_ffi_error(int32_t status_code) const {
    constexpr size_t kBufSize = 1024;
    uint8_t buf[kBufSize];
    size_t len = lance_reader_last_error(buf, kBufSize);
    // Never trust the reported length beyond what the buffer can hold: if the FFI
    // reports the full (untruncated) message length, reading `len` bytes would run
    // past the end of `buf`.
    if (len > kBufSize) {
        len = kBufSize;
    }
    std::string msg;
    if (len > 0) {
        msg.assign(reinterpret_cast<const char*>(buf), len);
    } else {
        // No message available; fall back to the numeric code.
        msg = fmt::format("Lance FFI error code: {}", status_code);
    }
    return Status::InternalError("Rust Lance reader: {}", msg);
}
// Maps an Arrow data type to the Doris data type used to expose it.
//
// Mapping summary:
//   - BOOL                      -> UInt8
//   - signed / unsigned ints    -> the signed Doris integer of the same bit width
//     NOTE(review): an unsigned value above the signed maximum is not representable
//     in the same-width signed type - confirm whether widening was intended.
//   - HALF_FLOAT, FLOAT         -> Float32; DOUBLE -> Float64
//   - string and binary kinds   -> String
//   - DATE32 / DATE64           -> DateV2; TIMESTAMP -> DateTimeV2
//   - DECIMAL128                -> decimal with the Arrow precision and scale
//   - list kinds                -> Array of the nullable converted element type
//   - anything else             -> String
//
// @param arrow_type  Arrow type to convert
// @return the Doris type; nullptr only if a list element type converts to nullptr
DataTypePtr LanceRustReader::_arrow_type_to_doris_type(
        const std::shared_ptr<arrow::DataType>& arrow_type) {
    using ArrowId = arrow::Type;
    switch (arrow_type->id()) {
    // ---- booleans and integers ----
    case ArrowId::BOOL:
        return std::make_shared<DataTypeUInt8>();
    case ArrowId::UINT8:
    case ArrowId::INT8:
        return std::make_shared<DataTypeInt8>();
    case ArrowId::UINT16:
    case ArrowId::INT16:
        return std::make_shared<DataTypeInt16>();
    case ArrowId::UINT32:
    case ArrowId::INT32:
        return std::make_shared<DataTypeInt32>();
    case ArrowId::UINT64:
    case ArrowId::INT64:
        return std::make_shared<DataTypeInt64>();

    // ---- floating point ----
    case ArrowId::FLOAT:
    case ArrowId::HALF_FLOAT:
        return std::make_shared<DataTypeFloat32>();
    case ArrowId::DOUBLE:
        return std::make_shared<DataTypeFloat64>();

    // ---- text and raw bytes ----
    case ArrowId::STRING:
    case ArrowId::LARGE_STRING:
    case ArrowId::BINARY:
    case ArrowId::LARGE_BINARY:
        return std::make_shared<DataTypeString>();

    // ---- temporal ----
    case ArrowId::DATE32:
    case ArrowId::DATE64:
        return std::make_shared<DataTypeDateV2>();
    case ArrowId::TIMESTAMP:
        return std::make_shared<DataTypeDateTimeV2>();

    // ---- decimal ----
    case ArrowId::DECIMAL128: {
        const auto decimal128 = std::static_pointer_cast<arrow::Decimal128Type>(arrow_type);
        return create_decimal(decimal128->precision(), decimal128->scale(), false);
    }

    // ---- nested lists ----
    case ArrowId::LIST:
    case ArrowId::LARGE_LIST: {
        const auto list = std::static_pointer_cast<arrow::BaseListType>(arrow_type);
        auto element = _arrow_type_to_doris_type(list->value_type());
        if (!element) {
            return nullptr;
        }
        return std::make_shared<DataTypeArray>(make_nullable(element));
    }
    case ArrowId::FIXED_SIZE_LIST: {
        const auto fixed_list = std::static_pointer_cast<arrow::FixedSizeListType>(arrow_type);
        auto element = _arrow_type_to_doris_type(fixed_list->value_type());
        if (!element) {
            return nullptr;
        }
        return std::make_shared<DataTypeArray>(make_nullable(element));
    }

    default:
        // Unsupported types fall back to string
        return std::make_shared<DataTypeString>();
    }
}
#include "common/compile_check_avoid_end.h"
} // namespace doris
#endif // BUILD_RUST_READERS