| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
#include "jni_connector.h"

#include <glog/logging.h>

#include <exception>
#include <sstream>
#include <variant>

#include "jni.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
#include "runtime/runtime_state.h"
#include "util/jni-util.h"
#include "vec/columns/column_array.h"
#include "vec/columns/column_map.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/columns/column_varbinary.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
#include "vec/data_types/data_type_varbinary.h"
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeProfile; |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| |
// X-macro table of the fixed-width types handled by the JNI connector.
// Each row is (PrimitiveType tag, Doris column class, element C++ type).
// The caller passes a three-argument macro, which is expanded once per row.
#define FOR_FIXED_LENGTH_TYPES(APPLY)                                  \
    APPLY(PrimitiveType::TYPE_TINYINT, ColumnInt8, Int8)               \
    APPLY(PrimitiveType::TYPE_BOOLEAN, ColumnUInt8, UInt8)             \
    APPLY(PrimitiveType::TYPE_SMALLINT, ColumnInt16, Int16)            \
    APPLY(PrimitiveType::TYPE_INT, ColumnInt32, Int32)                 \
    APPLY(PrimitiveType::TYPE_BIGINT, ColumnInt64, Int64)              \
    APPLY(PrimitiveType::TYPE_LARGEINT, ColumnInt128, Int128)          \
    APPLY(PrimitiveType::TYPE_FLOAT, ColumnFloat32, Float32)           \
    APPLY(PrimitiveType::TYPE_DOUBLE, ColumnFloat64, Float64)          \
    APPLY(PrimitiveType::TYPE_DECIMALV2, ColumnDecimal128V2, Int128)   \
    APPLY(PrimitiveType::TYPE_DECIMAL128I, ColumnDecimal128V3, Int128) \
    APPLY(PrimitiveType::TYPE_DECIMAL32, ColumnDecimal32, Int32)       \
    APPLY(PrimitiveType::TYPE_DECIMAL64, ColumnDecimal64, Int64)       \
    APPLY(PrimitiveType::TYPE_DATE, ColumnDate, Int64)                 \
    APPLY(PrimitiveType::TYPE_DATEV2, ColumnDateV2, UInt32)            \
    APPLY(PrimitiveType::TYPE_DATETIME, ColumnDateTime, Int64)         \
    APPLY(PrimitiveType::TYPE_DATETIMEV2, ColumnDateTimeV2, UInt64)    \
    APPLY(PrimitiveType::TYPE_IPV4, ColumnIPv4, IPv4)                  \
    APPLY(PrimitiveType::TYPE_IPV6, ColumnIPv6, IPv6)
| |
| Status JniConnector::open(RuntimeState* state, RuntimeProfile* profile) { |
| _state = state; |
| _profile = profile; |
| ADD_TIMER(_profile, _connector_name.c_str()); |
| _open_scanner_time = ADD_CHILD_TIMER(_profile, "OpenScannerTime", _connector_name.c_str()); |
| _java_scan_time = ADD_CHILD_TIMER(_profile, "JavaScanTime", _connector_name.c_str()); |
| _java_append_data_time = |
| ADD_CHILD_TIMER(_profile, "JavaAppendDataTime", _connector_name.c_str()); |
| _java_create_vector_table_time = |
| ADD_CHILD_TIMER(_profile, "JavaCreateVectorTableTime", _connector_name.c_str()); |
| _fill_block_time = ADD_CHILD_TIMER(_profile, "FillBlockTime", _connector_name.c_str()); |
| _max_time_split_weight_counter = _profile->add_conditition_counter( |
| "MaxTimeSplitWeight", TUnit::UNIT, [](int64_t _c, int64_t c) { return c > _c; }, |
| _connector_name.c_str()); |
| _java_scan_watcher = 0; |
| // cannot put the env into fields, because frames in an env object is limited |
| // to avoid limited frames in a thread, we should get local env in a method instead of in whole object. |
| JNIEnv* env = nullptr; |
| int batch_size = 0; |
| if (!_is_table_schema) { |
| batch_size = _state->batch_size(); |
| } |
| RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); |
| SCOPED_RAW_TIMER(&_jni_scanner_open_watcher); |
| _scanner_params.emplace("time_zone", _state->timezone()); |
| RETURN_IF_ERROR(_init_jni_scanner(env, batch_size)); |
| // Call org.apache.doris.common.jni.JniScanner#open |
| env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_open); |
| RETURN_ERROR_IF_EXC(env); |
| _scanner_opened = true; |
| return Status::OK(); |
| } |
| |
| Status JniConnector::init( |
| const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { |
| // TODO: This logic need to be changed. |
| // See the comment of "predicates" field in JniScanner.java |
| |
| // _generate_predicates(colname_to_value_range); |
| // if (_predicates_length != 0 && _predicates != nullptr) { |
| // int64_t predicates_address = (int64_t)_predicates.get(); |
| // // We can call org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates to parse the |
| // // serialized predicates in java side. |
| // _scanner_params.emplace("push_down_predicates", std::to_string(predicates_address)); |
| // } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
| // Call org.apache.doris.common.jni.JniScanner#getNextBatchMeta |
| // return the address of meta information |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); |
| long meta_address = 0; |
| { |
| SCOPED_RAW_TIMER(&_java_scan_watcher); |
| meta_address = env->CallLongMethod(_jni_scanner_obj, _jni_scanner_get_next_batch); |
| } |
| RETURN_ERROR_IF_EXC(env); |
| if (meta_address == 0) { |
| // Address == 0 when there's no data in scanner |
| *read_rows = 0; |
| *eof = true; |
| return Status::OK(); |
| } |
| _set_meta(meta_address); |
| long num_rows = _table_meta.next_meta_as_long(); |
| if (num_rows == 0) { |
| *read_rows = 0; |
| *eof = true; |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(_fill_block(block, num_rows)); |
| *read_rows = num_rows; |
| *eof = false; |
| env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); |
| RETURN_ERROR_IF_EXC(env); |
| _has_read += num_rows; |
| return Status::OK(); |
| } |
| |
| Status JniConnector::get_table_schema(std::string& table_schema_str) { |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); |
| |
| jstring jstr = (jstring)env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_table_schema); |
| RETURN_ERROR_IF_EXC(env); |
| |
| const char* cstr = env->GetStringUTFChars(jstr, nullptr); |
| RETURN_ERROR_IF_EXC(env); |
| |
| if (cstr == nullptr) { |
| return Status::RuntimeError("GetStringUTFChars returned null"); |
| } |
| |
| table_schema_str = std::string(cstr); // copy to std::string |
| env->ReleaseStringUTFChars(jstr, cstr); |
| env->DeleteLocalRef(jstr); |
| return Status::OK(); |
| } |
| |
| Status JniConnector::get_statistics(JNIEnv* env, std::map<std::string, std::string>* result) { |
| result->clear(); |
| jobject metrics = env->CallObjectMethod(_jni_scanner_obj, _jni_scanner_get_statistics); |
| jthrowable exc = (env)->ExceptionOccurred(); |
| if (exc != nullptr) { |
| LOG(WARNING) << "get_statistics has error: " |
| << JniUtil::GetJniExceptionMsg(env).to_string(); |
| env->DeleteLocalRef(metrics); |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(JniUtil::convert_to_cpp_map(env, metrics, result)); |
| env->DeleteLocalRef(metrics); |
| return Status::OK(); |
| } |
| |
| Status JniConnector::close() { |
| if (!_closed) { |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); |
| if (_scanner_opened && _jni_scanner_obj != nullptr) { |
| COUNTER_UPDATE(_open_scanner_time, _jni_scanner_open_watcher); |
| COUNTER_UPDATE(_fill_block_time, _fill_block_watcher); |
| |
| RETURN_ERROR_IF_EXC(env); |
| int64_t _append = (int64_t)env->CallLongMethod(_jni_scanner_obj, |
| _jni_scanner_get_append_data_time); |
| RETURN_ERROR_IF_EXC(env); |
| COUNTER_UPDATE(_java_append_data_time, _append); |
| |
| int64_t _create = (int64_t)env->CallLongMethod( |
| _jni_scanner_obj, _jni_scanner_get_create_vector_table_time); |
| RETURN_ERROR_IF_EXC(env); |
| COUNTER_UPDATE(_java_create_vector_table_time, _create); |
| |
| COUNTER_UPDATE(_java_scan_time, _java_scan_watcher - _append - _create); |
| |
| _max_time_split_weight_counter->conditional_update( |
| _jni_scanner_open_watcher + _fill_block_watcher + _java_scan_watcher, |
| _self_split_weight); |
| |
| // _fill_block may be failed and returned, we should release table in close. |
| // org.apache.doris.common.jni.JniScanner#releaseTable is idempotent |
| env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table); |
| RETURN_ERROR_IF_EXC(env); |
| env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_close); |
| RETURN_ERROR_IF_EXC(env); |
| env->DeleteGlobalRef(_jni_scanner_obj); |
| RETURN_ERROR_IF_EXC(env); |
| } |
| if (_jni_scanner_cls != nullptr) { |
| // _jni_scanner_cls may be null if init connector failed |
| env->DeleteGlobalRef(_jni_scanner_cls); |
| } |
| _closed = true; |
| jthrowable exc = (env)->ExceptionOccurred(); |
| if (exc != nullptr) { |
| // Ensure successful resource release |
| throw Exception(Status::FatalError("Failed to release jni resource: {}", |
| JniUtil::GetJniExceptionMsg(env).to_string())); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_init_jni_scanner(JNIEnv* env, int batch_size) { |
| RETURN_IF_ERROR( |
| JniUtil::get_jni_scanner_class(env, _connector_class.c_str(), &_jni_scanner_cls)); |
| if (_jni_scanner_cls == nullptr) [[unlikely]] { |
| if (env->ExceptionOccurred()) { |
| env->ExceptionDescribe(); |
| } |
| return Status::InternalError("Fail to get JniScanner class."); |
| } |
| RETURN_ERROR_IF_EXC(env); |
| |
| jmethodID scanner_constructor = |
| env->GetMethodID(_jni_scanner_cls, "<init>", "(ILjava/util/Map;)V"); |
| RETURN_ERROR_IF_EXC(env); |
| |
| // prepare constructor parameters |
| jobject hashmap_object; |
| RETURN_IF_ERROR(JniUtil::convert_to_java_map(env, _scanner_params, &hashmap_object)); |
| jobject jni_scanner_obj = |
| env->NewObject(_jni_scanner_cls, scanner_constructor, batch_size, hashmap_object); |
| |
| RETURN_ERROR_IF_EXC(env); |
| |
| // prepare constructor parameters |
| env->DeleteGlobalRef(hashmap_object); |
| RETURN_ERROR_IF_EXC(env); |
| |
| _jni_scanner_open = env->GetMethodID(_jni_scanner_cls, "open", "()V"); |
| RETURN_ERROR_IF_EXC(env); |
| _jni_scanner_get_next_batch = env->GetMethodID(_jni_scanner_cls, "getNextBatchMeta", "()J"); |
| RETURN_ERROR_IF_EXC(env); |
| _jni_scanner_get_append_data_time = |
| env->GetMethodID(_jni_scanner_cls, "getAppendDataTime", "()J"); |
| RETURN_ERROR_IF_EXC(env); |
| _jni_scanner_get_create_vector_table_time = |
| env->GetMethodID(_jni_scanner_cls, "getCreateVectorTableTime", "()J"); |
| RETURN_ERROR_IF_EXC(env); |
| _jni_scanner_get_table_schema = |
| env->GetMethodID(_jni_scanner_cls, "getTableSchema", "()Ljava/lang/String;"); |
| RETURN_ERROR_IF_EXC(env); |
| _jni_scanner_close = env->GetMethodID(_jni_scanner_cls, "close", "()V"); |
| _jni_scanner_release_column = env->GetMethodID(_jni_scanner_cls, "releaseColumn", "(I)V"); |
| _jni_scanner_release_table = env->GetMethodID(_jni_scanner_cls, "releaseTable", "()V"); |
| _jni_scanner_get_statistics = |
| env->GetMethodID(_jni_scanner_cls, "getStatistics", "()Ljava/util/Map;"); |
| RETURN_ERROR_IF_EXC(env); |
| RETURN_IF_ERROR(JniUtil::LocalToGlobalRef(env, jni_scanner_obj, &_jni_scanner_obj)); |
| env->DeleteLocalRef(jni_scanner_obj); |
| RETURN_ERROR_IF_EXC(env); |
| return Status::OK(); |
| } |
| |
| Status JniConnector::fill_block(Block* block, const ColumnNumbers& arguments, long table_address) { |
| if (table_address == 0) { |
| return Status::InternalError("table_address is 0"); |
| } |
| TableMetaAddress table_meta(table_address); |
| long num_rows = table_meta.next_meta_as_long(); |
| for (size_t i : arguments) { |
| if (block->get_by_position(i).column.get() == nullptr) { |
| auto return_type = block->get_data_type(i); |
| bool result_nullable = return_type->is_nullable(); |
| ColumnUInt8::MutablePtr null_col = nullptr; |
| if (result_nullable) { |
| return_type = remove_nullable(return_type); |
| null_col = ColumnUInt8::create(); |
| } |
| auto res_col = return_type->create_column(); |
| if (result_nullable) { |
| block->replace_by_position( |
| i, ColumnNullable::create(std::move(res_col), std::move(null_col))); |
| } else { |
| block->replace_by_position(i, std::move(res_col)); |
| } |
| } else if (is_column_const(*(block->get_by_position(i).column))) { |
| auto doris_column = block->get_by_position(i).column->convert_to_full_column_if_const(); |
| bool is_nullable = block->get_by_position(i).type->is_nullable(); |
| block->replace_by_position(i, is_nullable ? make_nullable(doris_column) : doris_column); |
| } |
| auto& column_with_type_and_name = block->get_by_position(i); |
| auto& column_ptr = column_with_type_and_name.column; |
| auto& column_type = column_with_type_and_name.type; |
| RETURN_IF_ERROR(_fill_column(table_meta, column_ptr, column_type, num_rows)); |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_fill_block(Block* block, size_t num_rows) { |
| SCOPED_RAW_TIMER(&_fill_block_watcher); |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); |
| for (int i = 0; i < _column_names.size(); ++i) { |
| auto& column_with_type_and_name = block->get_by_name(_column_names[i]); |
| auto& column_ptr = column_with_type_and_name.column; |
| auto& column_type = column_with_type_and_name.type; |
| RETURN_IF_ERROR(_fill_column(_table_meta, column_ptr, column_type, num_rows)); |
| // Column is not released when _fill_column failed. It will be released when releasing table. |
| env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_column, i); |
| RETURN_ERROR_IF_EXC(env); |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_fill_column(TableMetaAddress& address, ColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows) { |
| auto logical_type = data_type->get_primitive_type(); |
| void* null_map_ptr = address.next_meta_as_ptr(); |
| if (null_map_ptr == nullptr) { |
| // org.apache.doris.common.jni.vec.ColumnType.Type#UNSUPPORTED will set column address as 0 |
| return Status::InternalError("Unsupported type {} in java side", data_type->get_name()); |
| } |
| MutableColumnPtr data_column; |
| if (doris_column->is_nullable()) { |
| auto* nullable_column = |
| reinterpret_cast<vectorized::ColumnNullable*>(doris_column->assume_mutable().get()); |
| data_column = nullable_column->get_nested_column_ptr(); |
| NullMap& null_map = nullable_column->get_null_map_data(); |
| size_t origin_size = null_map.size(); |
| null_map.resize(origin_size + num_rows); |
| memcpy(null_map.data() + origin_size, static_cast<bool*>(null_map_ptr), num_rows); |
| } else { |
| data_column = doris_column->assume_mutable(); |
| } |
| // Date and DateTime are deprecated and not supported. |
| switch (logical_type) { |
| //FIXME: in Doris we check data then insert. jdbc external table may have some data invalid for doris. |
| // should add check otherwise it may break some of our assumption now. |
| #define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE) \ |
| case TYPE_INDEX: \ |
| return _fill_fixed_length_column<COLUMN_TYPE, CPP_TYPE>( \ |
| data_column, reinterpret_cast<CPP_TYPE*>(address.next_meta_as_ptr()), num_rows); |
| FOR_FIXED_LENGTH_TYPES(DISPATCH) |
| #undef DISPATCH |
| case PrimitiveType::TYPE_STRING: |
| [[fallthrough]]; |
| case PrimitiveType::TYPE_CHAR: |
| [[fallthrough]]; |
| case PrimitiveType::TYPE_VARCHAR: |
| return _fill_string_column(address, data_column, num_rows); |
| case PrimitiveType::TYPE_ARRAY: |
| return _fill_array_column(address, data_column, data_type, num_rows); |
| case PrimitiveType::TYPE_MAP: |
| return _fill_map_column(address, data_column, data_type, num_rows); |
| case PrimitiveType::TYPE_STRUCT: |
| return _fill_struct_column(address, data_column, data_type, num_rows); |
| case PrimitiveType::TYPE_VARBINARY: |
| return _fill_varbinary_column(address, data_column, num_rows); |
| default: |
| return Status::InvalidArgument("Unsupported type {} in jni scanner", data_type->get_name()); |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_fill_varbinary_column(TableMetaAddress& address, |
| MutableColumnPtr& doris_column, size_t num_rows) { |
| auto* meta_base = reinterpret_cast<char*>(address.next_meta_as_ptr()); |
| auto& varbinary_col = assert_cast<ColumnVarbinary&>(*doris_column); |
| // Java side writes per-row metadata as 16 bytes: [len: long][addr: long] |
| for (size_t i = 0; i < num_rows; ++i) { |
| // Read length (first 8 bytes) |
| int64_t len = 0; |
| memcpy(&len, meta_base + 16 * i, sizeof(len)); |
| if (len <= 0) { |
| varbinary_col.insert_default(); |
| } else { |
| // Read address (next 8 bytes) |
| uint64_t addr_u = 0; |
| memcpy(&addr_u, meta_base + 16 * i + 8, sizeof(addr_u)); |
| const char* src = reinterpret_cast<const char*>(addr_u); |
| varbinary_col.insert_data(src, static_cast<size_t>(len)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_fill_string_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| size_t num_rows) { |
| auto& string_col = static_cast<ColumnString&>(*doris_column); |
| ColumnString::Chars& string_chars = string_col.get_chars(); |
| ColumnString::Offsets& string_offsets = string_col.get_offsets(); |
| int* offsets = reinterpret_cast<int*>(address.next_meta_as_ptr()); |
| char* chars = reinterpret_cast<char*>(address.next_meta_as_ptr()); |
| |
| // This judgment is necessary, otherwise the following statement `offsets[num_rows - 1]` out of bounds |
| // What's more, This judgment must be placed after `address.next_meta_as_ptr()` |
| // because `address.next_meta_as_ptr` will make `address._meta_index` plus 1 |
| if (num_rows == 0) { |
| return Status::OK(); |
| } |
| |
| size_t origin_chars_size = string_chars.size(); |
| string_chars.resize(origin_chars_size + offsets[num_rows - 1]); |
| memcpy(string_chars.data() + origin_chars_size, chars, offsets[num_rows - 1]); |
| |
| size_t origin_offsets_size = string_offsets.size(); |
| size_t start_offset = string_offsets[origin_offsets_size - 1]; |
| string_offsets.resize(origin_offsets_size + num_rows); |
| for (size_t i = 0; i < num_rows; ++i) { |
| string_offsets[origin_offsets_size + i] = |
| static_cast<unsigned int>(offsets[i] + start_offset); |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::_fill_array_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows) { |
| ColumnPtr& element_column = static_cast<ColumnArray&>(*doris_column).get_data_ptr(); |
| const DataTypePtr& element_type = |
| (assert_cast<const DataTypeArray*>(remove_nullable(data_type).get())) |
| ->get_nested_type(); |
| ColumnArray::Offsets64& offsets_data = static_cast<ColumnArray&>(*doris_column).get_offsets(); |
| |
| int64_t* offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr()); |
| size_t origin_size = offsets_data.size(); |
| offsets_data.resize(origin_size + num_rows); |
| size_t start_offset = offsets_data[origin_size - 1]; |
| for (size_t i = 0; i < num_rows; ++i) { |
| offsets_data[origin_size + i] = offsets[i] + start_offset; |
| } |
| |
| // offsets[num_rows - 1] == offsets_data[origin_size + num_rows - 1] - start_offset |
| // but num_row equals 0 when there are all empty arrays |
| return _fill_column(address, element_column, element_type, |
| offsets_data[origin_size + num_rows - 1] - start_offset); |
| } |
| |
| Status JniConnector::_fill_map_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows) { |
| auto& map = static_cast<ColumnMap&>(*doris_column); |
| const DataTypePtr& key_type = |
| reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get())->get_key_type(); |
| const DataTypePtr& value_type = |
| reinterpret_cast<const DataTypeMap*>(remove_nullable(data_type).get()) |
| ->get_value_type(); |
| ColumnPtr& key_column = map.get_keys_ptr(); |
| ColumnPtr& value_column = map.get_values_ptr(); |
| ColumnArray::Offsets64& map_offsets = map.get_offsets(); |
| |
| int64_t* offsets = reinterpret_cast<int64_t*>(address.next_meta_as_ptr()); |
| size_t origin_size = map_offsets.size(); |
| map_offsets.resize(origin_size + num_rows); |
| size_t start_offset = map_offsets[origin_size - 1]; |
| for (size_t i = 0; i < num_rows; ++i) { |
| map_offsets[origin_size + i] = offsets[i] + start_offset; |
| } |
| |
| RETURN_IF_ERROR(_fill_column(address, key_column, key_type, |
| map_offsets[origin_size + num_rows - 1] - start_offset)); |
| RETURN_IF_ERROR(_fill_column(address, value_column, value_type, |
| map_offsets[origin_size + num_rows - 1] - start_offset)); |
| return map.deduplicate_keys(); |
| } |
| |
| Status JniConnector::_fill_struct_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows) { |
| auto& doris_struct = static_cast<ColumnStruct&>(*doris_column); |
| const DataTypeStruct* doris_struct_type = |
| reinterpret_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
| for (int i = 0; i < doris_struct.tuple_size(); ++i) { |
| ColumnPtr& struct_field = doris_struct.get_column_ptr(i); |
| const DataTypePtr& field_type = doris_struct_type->get_element(i); |
| RETURN_IF_ERROR(_fill_column(address, struct_field, field_type, num_rows)); |
| } |
| return Status::OK(); |
| } |
| |
| void JniConnector::_generate_predicates( |
| const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { |
| if (colname_to_value_range == nullptr) { |
| return; |
| } |
| for (auto& kv : *colname_to_value_range) { |
| const std::string& column_name = kv.first; |
| const ColumnValueRangeType& col_val_range = kv.second; |
| std::visit([&](auto&& range) { _parse_value_range(range, column_name); }, col_val_range); |
| } |
| } |
| |
| std::string JniConnector::get_jni_type(const DataTypePtr& data_type) { |
| DataTypePtr type = remove_nullable(data_type); |
| std::ostringstream buffer; |
| switch (type->get_primitive_type()) { |
| case TYPE_BOOLEAN: |
| return "boolean"; |
| case TYPE_TINYINT: |
| return "tinyint"; |
| case TYPE_SMALLINT: |
| return "smallint"; |
| case TYPE_INT: |
| return "int"; |
| case TYPE_BIGINT: |
| return "bigint"; |
| case TYPE_LARGEINT: |
| return "largeint"; |
| case TYPE_FLOAT: |
| return "float"; |
| case TYPE_DOUBLE: |
| return "double"; |
| case TYPE_IPV4: |
| return "ipv4"; |
| case TYPE_IPV6: |
| return "ipv6"; |
| case TYPE_VARCHAR: |
| [[fallthrough]]; |
| case TYPE_CHAR: |
| [[fallthrough]]; |
| case TYPE_STRING: |
| return "string"; |
| case TYPE_DATE: |
| return "datev1"; |
| case TYPE_DATEV2: |
| return "datev2"; |
| case TYPE_DATETIME: |
| return "datetimev1"; |
| case TYPE_DATETIMEV2: |
| [[fallthrough]]; |
| case TYPE_TIMEV2: { |
| buffer << "datetimev2(" << type->get_scale() << ")"; |
| return buffer.str(); |
| } |
| case TYPE_BINARY: |
| return "binary"; |
| case TYPE_DECIMALV2: { |
| buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL32: { |
| buffer << "decimal32(" << type->get_precision() << "," << type->get_scale() << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL64: { |
| buffer << "decimal64(" << type->get_precision() << "," << type->get_scale() << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL128I: { |
| buffer << "decimal128(" << type->get_precision() << "," << type->get_scale() << ")"; |
| return buffer.str(); |
| } |
| case TYPE_STRUCT: { |
| const DataTypeStruct* struct_type = reinterpret_cast<const DataTypeStruct*>(type.get()); |
| buffer << "struct<"; |
| for (int i = 0; i < struct_type->get_elements().size(); ++i) { |
| if (i != 0) { |
| buffer << ","; |
| } |
| buffer << struct_type->get_element_names()[i] << ":" |
| << get_jni_type(struct_type->get_element(i)); |
| } |
| buffer << ">"; |
| return buffer.str(); |
| } |
| case TYPE_ARRAY: { |
| const DataTypeArray* array_type = reinterpret_cast<const DataTypeArray*>(type.get()); |
| buffer << "array<" << get_jni_type(array_type->get_nested_type()) << ">"; |
| return buffer.str(); |
| } |
| case TYPE_MAP: { |
| const DataTypeMap* map_type = reinterpret_cast<const DataTypeMap*>(type.get()); |
| buffer << "map<" << get_jni_type(map_type->get_key_type()) << "," |
| << get_jni_type(map_type->get_value_type()) << ">"; |
| return buffer.str(); |
| } |
| case TYPE_VARBINARY: |
| return "varbinary"; |
| default: |
| return "unsupported"; |
| } |
| } |
| |
| std::string JniConnector::get_jni_type_with_different_string(const DataTypePtr& data_type) { |
| DataTypePtr type = remove_nullable(data_type); |
| std::ostringstream buffer; |
| switch (data_type->get_primitive_type()) { |
| case TYPE_BOOLEAN: |
| return "boolean"; |
| case TYPE_TINYINT: |
| return "tinyint"; |
| case TYPE_SMALLINT: |
| return "smallint"; |
| case TYPE_INT: |
| return "int"; |
| case TYPE_BIGINT: |
| return "bigint"; |
| case TYPE_LARGEINT: |
| return "largeint"; |
| case TYPE_FLOAT: |
| return "float"; |
| case TYPE_DOUBLE: |
| return "double"; |
| case TYPE_IPV4: |
| return "ipv4"; |
| case TYPE_IPV6: |
| return "ipv6"; |
| case TYPE_VARCHAR: { |
| buffer << "varchar(" |
| << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len() |
| << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DATE: |
| return "datev1"; |
| case TYPE_DATEV2: |
| return "datev2"; |
| case TYPE_DATETIME: |
| return "datetimev1"; |
| case TYPE_DATETIMEV2: |
| [[fallthrough]]; |
| case TYPE_TIMEV2: { |
| buffer << "datetimev2(" << data_type->get_scale() << ")"; |
| return buffer.str(); |
| } |
| case TYPE_BINARY: |
| return "binary"; |
| case TYPE_CHAR: { |
| buffer << "char(" |
| << assert_cast<const DataTypeString*>(remove_nullable(data_type).get())->len() |
| << ")"; |
| return buffer.str(); |
| } |
| case TYPE_STRING: |
| return "string"; |
| case TYPE_VARBINARY: |
| buffer << "varbinary(" |
| << assert_cast<const DataTypeVarbinary*>(remove_nullable(data_type).get())->len() |
| << ")"; |
| return buffer.str(); |
| case TYPE_DECIMALV2: { |
| buffer << "decimalv2(" << DecimalV2Value::PRECISION << "," << DecimalV2Value::SCALE << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL32: { |
| buffer << "decimal32(" << data_type->get_precision() << "," << data_type->get_scale() |
| << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL64: { |
| buffer << "decimal64(" << data_type->get_precision() << "," << data_type->get_scale() |
| << ")"; |
| return buffer.str(); |
| } |
| case TYPE_DECIMAL128I: { |
| buffer << "decimal128(" << data_type->get_precision() << "," << data_type->get_scale() |
| << ")"; |
| return buffer.str(); |
| } |
| case TYPE_STRUCT: { |
| const auto* type_struct = |
| assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
| buffer << "struct<"; |
| for (int i = 0; i < type_struct->get_elements().size(); ++i) { |
| if (i != 0) { |
| buffer << ","; |
| } |
| buffer << type_struct->get_element_name(i) << ":" |
| << get_jni_type_with_different_string(type_struct->get_element(i)); |
| } |
| buffer << ">"; |
| return buffer.str(); |
| } |
| case TYPE_ARRAY: { |
| const auto* type_arr = assert_cast<const DataTypeArray*>(remove_nullable(data_type).get()); |
| buffer << "array<" << get_jni_type_with_different_string(type_arr->get_nested_type()) |
| << ">"; |
| return buffer.str(); |
| } |
| case TYPE_MAP: { |
| const auto* type_map = assert_cast<const DataTypeMap*>(remove_nullable(data_type).get()); |
| buffer << "map<" << get_jni_type_with_different_string(type_map->get_key_type()) << "," |
| << get_jni_type_with_different_string(type_map->get_value_type()) << ">"; |
| return buffer.str(); |
| } |
| default: |
| return "unsupported"; |
| } |
| } |
| |
| Status JniConnector::_fill_column_meta(const ColumnPtr& doris_column, const DataTypePtr& data_type, |
| std::vector<long>& meta_data) { |
| auto logical_type = data_type->get_primitive_type(); |
| const IColumn* column = nullptr; |
| // insert const flag |
| if (is_column_const(*doris_column)) { |
| meta_data.emplace_back((long)1); |
| const auto& const_column = assert_cast<const ColumnConst&>(*doris_column); |
| column = &(const_column.get_data_column()); |
| } else { |
| meta_data.emplace_back((long)0); |
| column = &(*doris_column); |
| } |
| |
| // insert null map address |
| const IColumn* data_column = nullptr; |
| if (column->is_nullable()) { |
| const auto& nullable_column = assert_cast<const vectorized::ColumnNullable&>(*column); |
| data_column = &(nullable_column.get_nested_column()); |
| const auto& null_map = nullable_column.get_null_map_data(); |
| meta_data.emplace_back((long)null_map.data()); |
| } else { |
| meta_data.emplace_back(0); |
| data_column = column; |
| } |
| switch (logical_type) { |
| #define DISPATCH(TYPE_INDEX, COLUMN_TYPE, CPP_TYPE) \ |
| case TYPE_INDEX: { \ |
| meta_data.emplace_back(_get_fixed_length_column_address<COLUMN_TYPE>(*data_column)); \ |
| break; \ |
| } |
| FOR_FIXED_LENGTH_TYPES(DISPATCH) |
| #undef DISPATCH |
| case PrimitiveType::TYPE_STRING: |
| [[fallthrough]]; |
| case PrimitiveType::TYPE_CHAR: |
| [[fallthrough]]; |
| case PrimitiveType::TYPE_VARCHAR: { |
| const auto& string_column = assert_cast<const ColumnString&>(*data_column); |
| // inert offsets |
| meta_data.emplace_back((long)string_column.get_offsets().data()); |
| meta_data.emplace_back((long)string_column.get_chars().data()); |
| break; |
| } |
| case PrimitiveType::TYPE_ARRAY: { |
| const auto& element_column = assert_cast<const ColumnArray&>(*data_column).get_data_ptr(); |
| meta_data.emplace_back( |
| (long)assert_cast<const ColumnArray&>(*data_column).get_offsets().data()); |
| const auto& element_type = assert_cast<const DataTypePtr&>( |
| (assert_cast<const DataTypeArray*>(remove_nullable(data_type).get())) |
| ->get_nested_type()); |
| RETURN_IF_ERROR(_fill_column_meta(element_column, element_type, meta_data)); |
| break; |
| } |
| case PrimitiveType::TYPE_STRUCT: { |
| const auto& doris_struct = assert_cast<const ColumnStruct&>(*data_column); |
| const auto* doris_struct_type = |
| assert_cast<const DataTypeStruct*>(remove_nullable(data_type).get()); |
| for (int i = 0; i < doris_struct.tuple_size(); ++i) { |
| const auto& struct_field = doris_struct.get_column_ptr(i); |
| const auto& field_type = |
| assert_cast<const DataTypePtr&>(doris_struct_type->get_element(i)); |
| RETURN_IF_ERROR(_fill_column_meta(struct_field, field_type, meta_data)); |
| } |
| break; |
| } |
| case PrimitiveType::TYPE_MAP: { |
| const auto& map = assert_cast<const ColumnMap&>(*data_column); |
| const auto& key_type = assert_cast<const DataTypePtr&>( |
| assert_cast<const DataTypeMap*>(remove_nullable(data_type).get())->get_key_type()); |
| const auto& value_type = assert_cast<const DataTypePtr&>( |
| assert_cast<const DataTypeMap*>(remove_nullable(data_type).get()) |
| ->get_value_type()); |
| const auto& key_column = map.get_keys_ptr(); |
| const auto& value_column = map.get_values_ptr(); |
| meta_data.emplace_back((long)map.get_offsets().data()); |
| RETURN_IF_ERROR(_fill_column_meta(key_column, key_type, meta_data)); |
| RETURN_IF_ERROR(_fill_column_meta(value_column, value_type, meta_data)); |
| break; |
| } |
| case PrimitiveType::TYPE_VARBINARY: { |
| const auto& varbinary_col = assert_cast<const ColumnVarbinary&>(*data_column); |
| meta_data.emplace_back( |
| (long)assert_cast<const ColumnVarbinary&>(varbinary_col).get_data().data()); |
| break; |
| } |
| default: |
| return Status::InternalError("Unsupported type: {}", data_type->get_name()); |
| } |
| return Status::OK(); |
| } |
| |
| Status JniConnector::to_java_table(Block* block, std::unique_ptr<long[]>& meta) { |
| ColumnNumbers arguments; |
| for (size_t i = 0; i < block->columns(); ++i) { |
| arguments.emplace_back(i); |
| } |
| return to_java_table(block, block->rows(), arguments, meta); |
| } |
| |
| Status JniConnector::to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments, |
| std::unique_ptr<long[]>& meta) { |
| std::vector<long> meta_data; |
| // insert number of rows |
| meta_data.emplace_back(num_rows); |
| for (size_t i : arguments) { |
| auto& column_with_type_and_name = block->get_by_position(i); |
| RETURN_IF_ERROR(_fill_column_meta(column_with_type_and_name.column, |
| column_with_type_and_name.type, meta_data)); |
| } |
| |
| meta.reset(new long[meta_data.size()]); |
| memcpy(meta.get(), &meta_data[0], meta_data.size() * 8); |
| return Status::OK(); |
| } |
| |
| std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block, |
| const ColumnNumbers& arguments, |
| bool ignore_column_name) { |
| // prepare table schema |
| std::ostringstream required_fields; |
| std::ostringstream columns_types; |
| for (int i = 0; i < arguments.size(); ++i) { |
| // column name maybe empty or has special characters |
| // std::string field = block->get_by_position(i).name; |
| std::string type = JniConnector::get_jni_type(block->get_by_position(arguments[i]).type); |
| if (i == 0) { |
| if (ignore_column_name) { |
| required_fields << "_col_" << arguments[i]; |
| } else { |
| required_fields << block->get_by_position(arguments[i]).name; |
| } |
| columns_types << type; |
| } else { |
| if (ignore_column_name) { |
| required_fields << "," |
| << "_col_" << arguments[i]; |
| } else { |
| required_fields << "," << block->get_by_position(arguments[i]).name; |
| } |
| columns_types << "#" << type; |
| } |
| } |
| return std::make_pair(required_fields.str(), columns_types.str()); |
| } |
| |
| std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* block) { |
| ColumnNumbers arguments; |
| for (size_t i = 0; i < block->columns(); ++i) { |
| arguments.emplace_back(i); |
| } |
| return parse_table_schema(block, arguments, true); |
| } |
| |
| void JniConnector::_collect_profile_before_close() { |
| if (_scanner_opened && _profile != nullptr) { |
| JNIEnv* env = nullptr; |
| Status st = JniUtil::GetJNIEnv(&env); |
| if (!st) { |
| LOG(WARNING) << "failed to get jni env when collect profile: " << st; |
| return; |
| } |
| // update scanner metrics |
| std::map<std::string, std::string> statistics_result; |
| st = get_statistics(env, &statistics_result); |
| if (!st) { |
| LOG(WARNING) << "failed to get_statistics when collect profile: " << st; |
| return; |
| } |
| |
| for (const auto& metric : statistics_result) { |
| std::vector<std::string> type_and_name = split(metric.first, ":"); |
| if (type_and_name.size() != 2) { |
| LOG(WARNING) << "Name of JNI Scanner metric should be pattern like " |
| << "'metricType:metricName'"; |
| continue; |
| } |
| long metric_value = std::stol(metric.second); |
| RuntimeProfile::Counter* scanner_counter; |
| if (type_and_name[0] == "timer") { |
| scanner_counter = |
| ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str()); |
| } else if (type_and_name[0] == "counter") { |
| scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT, |
| _connector_name.c_str()); |
| } else if (type_and_name[0] == "bytes") { |
| scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES, |
| _connector_name.c_str()); |
| } else { |
| LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes"; |
| continue; |
| } |
| COUNTER_UPDATE(scanner_counter, metric_value); |
| } |
| } |
| } |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |