| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <jni.h> |
| #include <string.h> |
| |
| #include <map> |
| #include <memory> |
| #include <string> |
| #include <unordered_map> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/status.h" |
| #include "exec/olap_common.h" |
| #include "exec/olap_utils.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/primitive_type.h" |
| #include "runtime/types.h" |
| #include "util/jni-util.h" |
| #include "util/profile_collector.h" |
| #include "util/runtime_profile.h" |
| #include "util/string_util.h" |
| #include "vec/aggregate_functions/aggregate_function.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/data_types/data_type.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeState; |
| |
| namespace vectorized { |
| class Block; |
| template <PrimitiveType T> |
| class ColumnDecimal; |
| template <PrimitiveType T> |
| class ColumnVector; |
| } // namespace vectorized |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| |
| /** |
| * Connector to java jni scanner, which should extend org.apache.doris.common.jni.JniScanner |
| */ |
| class JniConnector : public ProfileCollector { |
| public: |
/**
 * Sequential reader over an array of `long` slots located at a raw address.
 * Every next_meta_* call returns the slot under the cursor and moves the cursor
 * forward by one. No length is tracked, so the caller has to know how many slots
 * the array holds.
 */
class TableMetaAddress {
public:
    // No array attached yet; reading before set_meta() would dereference a null pointer.
    TableMetaAddress() = default;

    // Attaches the array found at `meta_addr`, cursor on the first slot.
    TableMetaAddress(long meta_addr) : _meta_ptr(reinterpret_cast<long*>(meta_addr)) {}

    // Re-points the reader to the array at `meta_addr` and rewinds the cursor.
    void set_meta(long meta_addr) {
        _meta_index = 0;
        _meta_ptr = reinterpret_cast<long*>(meta_addr);
    }

    // Returns the current slot as an integer, then advances the cursor.
    long next_meta_as_long() {
        const long slot = _meta_ptr[_meta_index];
        ++_meta_index;
        return slot;
    }

    // Returns the current slot reinterpreted as an address, then advances the cursor.
    void* next_meta_as_ptr() { return reinterpret_cast<void*>(next_meta_as_long()); }

private:
    long* _meta_ptr = nullptr;
    int _meta_index = 0;
};
| |
| /** |
| * The predicates that can be pushed down to java side. |
| * Reference to java class org.apache.doris.common.jni.vec.ScanPredicate |
| */ |
| template <typename CppType> |
| struct ScanPredicate { |
| ScanPredicate() = default; |
| ~ScanPredicate() = default; |
| std::string column_name; |
| SQLFilterOp op; |
| std::vector<const CppType*> values; |
| int scale; |
| |
| ScanPredicate(const std::string column_name) : column_name(std::move(column_name)) {} |
| |
| ScanPredicate(const ScanPredicate& other) |
| : column_name(other.column_name), op(other.op), scale(other.scale) { |
| for (auto v : other.values) { |
| values.emplace_back(v); |
| } |
| } |
| |
| int length() { |
| // name_length(4) + column_name + operator(4) + scale(4) + num_values(4) |
| int len = 4 + static_cast<int>(column_name.size()) + 4 + 4 + 4; |
| if constexpr (std::is_same_v<CppType, StringRef>) { |
| for (const StringRef* s : values) { |
| // string_length(4) + string |
| len += static_cast<int>(4 + s->size); |
| } |
| } else { |
| int type_len = sizeof(CppType); |
| // value_length(4) + value |
| len += static_cast<int>((4 + type_len) * values.size()); |
| } |
| return len; |
| } |
| |
| /** |
| * The value ranges can be stored as byte array as following format: |
| * number_filters(4) | length(4) | column_name | op(4) | scale(4) | num_values(4) | value_length(4) | value | ... |
| * The read method is implemented in org.apache.doris.common.jni.vec.ScanPredicate#parseScanPredicates |
| */ |
| int write(std::unique_ptr<char[]>& predicates, int origin_length) { |
| int num_filters = 0; |
| if (origin_length != 0) { |
| num_filters = *reinterpret_cast<int*>(predicates.get()); |
| } else { |
| origin_length = 4; |
| } |
| num_filters += 1; |
| int new_length = origin_length + length(); |
| char* new_bytes = new char[new_length]; |
| if (origin_length != 4) { |
| memcpy(new_bytes, predicates.get(), origin_length); |
| } |
| *reinterpret_cast<int*>(new_bytes) = num_filters; |
| |
| char* char_ptr = new_bytes + origin_length; |
| *reinterpret_cast<int*>(char_ptr) = static_cast<int>(column_name.size()); |
| char_ptr += 4; |
| memcpy(char_ptr, column_name.data(), column_name.size()); |
| char_ptr += static_cast<int>(column_name.size()); |
| *reinterpret_cast<int*>(char_ptr) = op; |
| char_ptr += 4; |
| *reinterpret_cast<int*>(char_ptr) = scale; |
| char_ptr += 4; |
| *reinterpret_cast<int*>(char_ptr) = static_cast<int>(values.size()); |
| char_ptr += 4; |
| if constexpr (std::is_same_v<CppType, StringRef>) { |
| for (const StringRef* s : values) { |
| *reinterpret_cast<int*>(char_ptr) = static_cast<int>(s->size); |
| char_ptr += 4; |
| memcpy(char_ptr, s->data, s->size); |
| char_ptr += static_cast<int>(s->size); |
| } |
| } else { |
| // FIXME: it can not handle decimal type correctly. |
| // but this logic is deprecated and not used. |
| // so may be deleted or fixed later. |
| for (const CppType* v : values) { |
| int type_len = sizeof(CppType); |
| *reinterpret_cast<int*>(char_ptr) = type_len; |
| char_ptr += 4; |
| *reinterpret_cast<CppType*>(char_ptr) = *v; |
| char_ptr += type_len; |
| } |
| } |
| |
| predicates.reset(new_bytes); |
| return new_length; |
| } |
| }; |
| |
| /** |
| * Use configuration map to provide scan information. The java side should determine how the parameters |
| * are parsed. For example, using "required_fields=col0,col1,...,colN" to provide the scan fields. |
| * @param connector_class Java scanner class |
| * @param scanner_params Provided configuration map |
| * @param column_names Fields to read, also the required_fields in scanner_params |
| */ |
| JniConnector(std::string connector_class, std::map<std::string, std::string> scanner_params, |
| std::vector<std::string> column_names, int64_t self_split_weight = -1) |
| : _connector_class(std::move(connector_class)), |
| _scanner_params(std::move(scanner_params)), |
| _column_names(std::move(column_names)), |
| _self_split_weight(static_cast<int32_t>(self_split_weight)) { |
| // Use java class name as connector name |
| _connector_name = split(_connector_class, "/").back(); |
| } |
| |
| /** |
| * Just use to get the table schema. |
| * @param connector_class Java scanner class |
| * @param scanner_params Provided configuration map |
| */ |
| JniConnector(std::string connector_class, std::map<std::string, std::string> scanner_params) |
| : _connector_class(std::move(connector_class)), |
| _scanner_params(std::move(scanner_params)) { |
| _is_table_schema = true; |
| } |
| |
| ~JniConnector() override = default; |
| |
| /** |
| * Open java scanner, and get the following scanner methods by jni: |
| * 1. getNextBatchMeta: read next batch and return the address of meta information |
| * 2. close: close java scanner, and release jni resources |
| * 3. releaseColumn: release a single column |
| * 4. releaseTable: release current batch, which will also release columns and meta information |
| */ |
| Status open(RuntimeState* state, RuntimeProfile* profile); |
| |
/**
 * Must be called before open(). Parses the pushed-down filters. The value ranges are stored as a byte array on the heap:
 * number_filters(4) | length(4) | column_name | op(4) | scale(4) | num_values(4) | value_length(4) | value | ...
 * The address of the byte array is then passed in the configuration map, like "push_down_predicates=${address}"
 */
| Status init(); |
| |
| /** |
| * Call java side function JniScanner.getNextBatchMeta. The columns information are stored as long array: |
| * | number of rows | |
| * | null indicator start address of fixed length column-A | |
| * | data column start address of the fixed length column-A | |
| * | ... | |
| * | null indicator start address of variable length column-B | |
| * | offset column start address of the variable length column-B | |
| * | data column start address of the variable length column-B | |
| * | ... | |
| */ |
| Status get_next_block(Block* block, size_t* read_rows, bool* eof); |
| |
| /** |
| * Get performance metrics from java scanner |
| */ |
| Status get_statistics(JNIEnv* env, std::map<std::string, std::string>* result); |
| |
/**
 * Call java side function JniScanner.getTableSchema.
 *
 * The schema information is stored in json format.
 */
| Status get_table_schema(std::string& table_schema_str); |
| |
| /** |
| * Close scanner and release jni resources. |
| */ |
| Status close(); |
| |
| /** |
| * Set column name to block index map from FileScanner to avoid repeated map creation. |
| */ |
| void set_col_name_to_block_idx( |
| const std::unordered_map<std::string, uint32_t>* col_name_to_block_idx) { |
| _col_name_to_block_idx = col_name_to_block_idx; |
| } |
| |
| static std::string get_jni_type(const DataTypePtr& data_type); |
| static std::string get_jni_type_with_different_string(const DataTypePtr& data_type); |
| |
| static Status to_java_table(Block* block, size_t num_rows, const ColumnNumbers& arguments, |
| std::unique_ptr<long[]>& meta); |
| |
| static Status to_java_table(Block* block, std::unique_ptr<long[]>& meta); |
| |
| static std::pair<std::string, std::string> parse_table_schema(Block* block, |
| const ColumnNumbers& arguments, |
| bool ignore_column_name = true); |
| |
| static std::pair<std::string, std::string> parse_table_schema(Block* block); |
| |
| static Status fill_block(Block* block, const ColumnNumbers& arguments, long table_address); |
| |
| protected: |
| void _collect_profile_before_close() override; |
| |
| private: |
| std::string _connector_name; |
| std::string _connector_class; |
| std::map<std::string, std::string> _scanner_params; |
| std::vector<std::string> _column_names; |
| int32_t _self_split_weight; |
| bool _is_table_schema = false; |
| |
| RuntimeState* _state = nullptr; |
| RuntimeProfile* _profile = nullptr; |
| RuntimeProfile::Counter* _open_scanner_time = nullptr; |
| RuntimeProfile::Counter* _java_scan_time = nullptr; |
| RuntimeProfile::Counter* _java_append_data_time = nullptr; |
| RuntimeProfile::Counter* _java_create_vector_table_time = nullptr; |
| RuntimeProfile::Counter* _fill_block_time = nullptr; |
| std::map<std::string, RuntimeProfile::Counter*> _scanner_profile; |
| RuntimeProfile::ConditionCounter* _max_time_split_weight_counter = nullptr; |
| |
| int64_t _jni_scanner_open_watcher = 0; |
| int64_t _java_scan_watcher = 0; |
| int64_t _fill_block_watcher = 0; |
| |
| size_t _has_read = 0; |
| |
| bool _closed = false; |
| bool _scanner_opened = false; |
| |
| Jni::GlobalClass _jni_scanner_cls; |
| Jni::GlobalObject _jni_scanner_obj; |
| Jni::MethodId _jni_scanner_open; |
| Jni::MethodId _jni_scanner_get_append_data_time; |
| Jni::MethodId _jni_scanner_get_create_vector_table_time; |
| Jni::MethodId _jni_scanner_get_next_batch; |
| Jni::MethodId _jni_scanner_get_table_schema; |
| Jni::MethodId _jni_scanner_close; |
| Jni::MethodId _jni_scanner_release_column; |
| Jni::MethodId _jni_scanner_release_table; |
| Jni::MethodId _jni_scanner_get_statistics; |
| |
| TableMetaAddress _table_meta; |
| |
| int _predicates_length = 0; |
| std::unique_ptr<char[]> _predicates; |
| |
| // Column name to block index map, passed from FileScanner to avoid repeated map creation |
| const std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr; |
| |
| /** |
| * Set the address of meta information, which is returned by org.apache.doris.common.jni.JniScanner#getNextBatchMeta |
| */ |
| void _set_meta(long meta_addr) { _table_meta.set_meta(meta_addr); } |
| |
| Status _init_jni_scanner(JNIEnv* env, int batch_size); |
| |
| Status _fill_block(Block* block, size_t num_rows); |
| |
| static Status _fill_column(TableMetaAddress& address, ColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows); |
| |
| static Status _fill_string_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| size_t num_rows); |
| |
| static Status _fill_varbinary_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| size_t num_rows); |
| |
| static Status _fill_map_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows); |
| |
| static Status _fill_array_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows); |
| |
| static Status _fill_struct_column(TableMetaAddress& address, MutableColumnPtr& doris_column, |
| const DataTypePtr& data_type, size_t num_rows); |
| |
| static Status _fill_column_meta(const ColumnPtr& doris_column, const DataTypePtr& data_type, |
| std::vector<long>& meta_data); |
| |
| template <typename COLUMN_TYPE, typename CPP_TYPE> |
| requires(!std::is_same_v<COLUMN_TYPE, ColumnDecimal128V2> && |
| !std::is_same_v<COLUMN_TYPE, ColumnDate> && |
| !std::is_same_v<COLUMN_TYPE, ColumnDateTime> && |
| !std::is_same_v<COLUMN_TYPE, ColumnDateV2> && |
| !std::is_same_v<COLUMN_TYPE, ColumnDateTimeV2> && |
| !std::is_same_v<COLUMN_TYPE, ColumnTimeStampTz>) |
| static Status _fill_fixed_length_column(MutableColumnPtr& doris_column, CPP_TYPE* ptr, |
| size_t num_rows) { |
| auto& column_data = assert_cast<COLUMN_TYPE&>(*doris_column).get_data(); |
| size_t origin_size = column_data.size(); |
| column_data.resize(origin_size + num_rows); |
| memcpy(column_data.data() + origin_size, ptr, sizeof(CPP_TYPE) * num_rows); |
| return Status::OK(); |
| } |
| |
| template <typename COLUMN_TYPE, typename CPP_TYPE> |
| requires(std::is_same_v<COLUMN_TYPE, ColumnDate> || |
| std::is_same_v<COLUMN_TYPE, ColumnDateTime>) |
| static Status _fill_fixed_length_column(MutableColumnPtr& doris_column, CPP_TYPE* ptr, |
| size_t num_rows) { |
| auto& column_data = assert_cast<COLUMN_TYPE&>(*doris_column).get_data(); |
| size_t origin_size = column_data.size(); |
| column_data.resize(origin_size + num_rows); |
| memcpy((int64_t*)column_data.data() + origin_size, ptr, sizeof(CPP_TYPE) * num_rows); |
| return Status::OK(); |
| } |
| |
| template <typename COLUMN_TYPE, typename CPP_TYPE> |
| requires(std::is_same_v<COLUMN_TYPE, ColumnDateV2>) |
| static Status _fill_fixed_length_column(MutableColumnPtr& doris_column, CPP_TYPE* ptr, |
| size_t num_rows) { |
| auto& column_data = assert_cast<COLUMN_TYPE&>(*doris_column).get_data(); |
| size_t origin_size = column_data.size(); |
| column_data.resize(origin_size + num_rows); |
| memcpy((uint32_t*)column_data.data() + origin_size, ptr, sizeof(CPP_TYPE) * num_rows); |
| return Status::OK(); |
| } |
| |
| template <typename COLUMN_TYPE, typename CPP_TYPE> |
| requires(std::is_same_v<COLUMN_TYPE, ColumnDateTimeV2> || |
| std::is_same_v<COLUMN_TYPE, ColumnTimeStampTz>) |
| static Status _fill_fixed_length_column(MutableColumnPtr& doris_column, CPP_TYPE* ptr, |
| size_t num_rows) { |
| auto& column_data = assert_cast<COLUMN_TYPE&>(*doris_column).get_data(); |
| size_t origin_size = column_data.size(); |
| column_data.resize(origin_size + num_rows); |
| memcpy((uint64_t*)column_data.data() + origin_size, ptr, sizeof(CPP_TYPE) * num_rows); |
| return Status::OK(); |
| } |
| |
| template <typename COLUMN_TYPE, typename CPP_TYPE> |
| requires(std::is_same_v<COLUMN_TYPE, ColumnDecimal128V2>) |
| static Status _fill_fixed_length_column(MutableColumnPtr& doris_column, CPP_TYPE* ptr, |
| size_t num_rows) { |
| auto& column_data = assert_cast<COLUMN_TYPE&>(*doris_column).get_data(); |
| size_t origin_size = column_data.size(); |
| column_data.resize(origin_size + num_rows); |
| for (size_t i = 0; i < num_rows; i++) { |
| column_data[origin_size + i] = DecimalV2Value(ptr[i]); |
| } |
| return Status::OK(); |
| } |
| |
| template <typename COLUMN_TYPE> |
| static long _get_fixed_length_column_address(const IColumn& doris_column) { |
| return (long)assert_cast<const COLUMN_TYPE&>(doris_column).get_data().data(); |
| } |
| |
| template <PrimitiveType primitive_type> |
| void _parse_value_range(const ColumnValueRange<primitive_type>& col_val_range, |
| const std::string& column_name) { |
| using CppType = std::conditional_t<primitive_type == TYPE_HLL, StringRef, |
| typename PrimitiveTypeTraits<primitive_type>::CppType>; |
| |
| if (col_val_range.is_fixed_value_range()) { |
| ScanPredicate<CppType> in_predicate(column_name); |
| in_predicate.op = SQLFilterOp::FILTER_IN; |
| in_predicate.scale = col_val_range.scale(); |
| for (const auto& value : col_val_range.get_fixed_value_set()) { |
| in_predicate.values.emplace_back(&value); |
| } |
| if (!in_predicate.values.empty()) { |
| _predicates_length = in_predicate.write(_predicates, _predicates_length); |
| } |
| return; |
| } |
| |
| const CppType high_value = col_val_range.get_range_max_value(); |
| const CppType low_value = col_val_range.get_range_min_value(); |
| const SQLFilterOp high_op = col_val_range.get_range_high_op(); |
| const SQLFilterOp low_op = col_val_range.get_range_low_op(); |
| |
| // orc can only push down is_null. When col_value_range._contain_null = true, only indicating that |
| // value can be null, not equals null, so ignore _contain_null in col_value_range |
| if (col_val_range.is_high_value_maximum() && high_op == SQLFilterOp::FILTER_LESS_OR_EQUAL && |
| col_val_range.is_low_value_minimum() && low_op == SQLFilterOp::FILTER_LARGER_OR_EQUAL) { |
| return; |
| } |
| |
| if (low_value < high_value) { |
| if (!col_val_range.is_low_value_minimum() || |
| SQLFilterOp::FILTER_LARGER_OR_EQUAL != low_op) { |
| ScanPredicate<CppType> low_predicate(column_name); |
| low_predicate.scale = col_val_range.scale(); |
| low_predicate.op = low_op; |
| low_predicate.values.emplace_back(col_val_range.get_range_min_value_ptr()); |
| _predicates_length = low_predicate.write(_predicates, _predicates_length); |
| } |
| if (!col_val_range.is_high_value_maximum() || |
| SQLFilterOp::FILTER_LESS_OR_EQUAL != high_op) { |
| ScanPredicate<CppType> high_predicate(column_name); |
| high_predicate.scale = col_val_range.scale(); |
| high_predicate.op = high_op; |
| high_predicate.values.emplace_back(col_val_range.get_range_max_value_ptr()); |
| _predicates_length = high_predicate.write(_predicates, _predicates_length); |
| } |
| } |
| } |
| }; |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |