| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/runtime/vjni_format_transformer.h" |
| |
| #include "runtime/runtime_state.h" |
| #include "vec/exec/jni_connector.h" |
| |
| namespace doris::vectorized { |
| |
| VJniFormatTransformer::VJniFormatTransformer(RuntimeState* state, |
| const VExprContextSPtrs& output_vexpr_ctxs, |
| std::string writer_class, |
| std::map<std::string, std::string> writer_params) |
| : VFileFormatTransformer(state, output_vexpr_ctxs, false), |
| _writer_class(std::move(writer_class)), |
| _writer_params(std::move(writer_params)) {} |
| |
| Status VJniFormatTransformer::_init_jni_writer(JNIEnv* env, int batch_size) { |
| // Load writer class via the same class loader as JniScanner |
| Jni::GlobalClass jni_writer_cls; |
| RETURN_IF_ERROR(Jni::Util::get_jni_scanner_class(env, _writer_class.c_str(), &jni_writer_cls)); |
| |
| // Get constructor: (int batchSize, Map<String,String> params) |
| Jni::MethodId writer_constructor; |
| RETURN_IF_ERROR( |
| jni_writer_cls.get_method(env, "<init>", "(ILjava/util/Map;)V", &writer_constructor)); |
| |
| // Convert C++ params map to Java HashMap |
| Jni::LocalObject hashmap_object; |
| RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, _writer_params, &hashmap_object)); |
| |
| // Create writer instance |
| RETURN_IF_ERROR(jni_writer_cls.new_object(env, writer_constructor) |
| .with_arg((jint)batch_size) |
| .with_arg(hashmap_object) |
| .call(&_jni_writer_obj)); |
| |
| // Resolve method IDs |
| RETURN_IF_ERROR(jni_writer_cls.get_method(env, "open", "()V", &_jni_writer_open)); |
| RETURN_IF_ERROR( |
| jni_writer_cls.get_method(env, "write", "(Ljava/util/Map;)V", &_jni_writer_write)); |
| RETURN_IF_ERROR(jni_writer_cls.get_method(env, "close", "()V", &_jni_writer_close)); |
| RETURN_IF_ERROR(jni_writer_cls.get_method(env, "getStatistics", "()Ljava/util/Map;", |
| &_jni_writer_get_statistics)); |
| return Status::OK(); |
| } |
| |
| Status VJniFormatTransformer::open() { |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(Jni::Env::Get(&env)); |
| |
| int batch_size = _state->batch_size(); |
| RETURN_IF_ERROR(_init_jni_writer(env, batch_size)); |
| |
| RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_open).call()); |
| RETURN_ERROR_IF_EXC(env); |
| |
| _opened = true; |
| return Status::OK(); |
| } |
| |
| Status VJniFormatTransformer::write(const Block& block) { |
| if (block.rows() == 0) { |
| return Status::OK(); |
| } |
| |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(Jni::Env::Get(&env)); |
| |
| // 1. Convert Block to Java table metadata (column addresses) |
| Block* mutable_block = const_cast<Block*>(&block); |
| std::unique_ptr<long[]> input_table; |
| RETURN_IF_ERROR(JniConnector::to_java_table(mutable_block, input_table)); |
| |
| // 2. Cache schema on first call |
| if (!_schema_cached) { |
| auto schema = JniConnector::parse_table_schema(mutable_block); |
| _cached_required_fields = schema.first; |
| _cached_columns_types = schema.second; |
| _schema_cached = true; |
| } |
| |
| // 3. Build input params map for Java writer |
| std::map<std::string, std::string> input_params = { |
| {"meta_address", std::to_string((long)input_table.get())}, |
| {"required_fields", _cached_required_fields}, |
| {"columns_types", _cached_columns_types}}; |
| |
| // 4. Convert to Java Map and call writer.write(inputParams) |
| Jni::LocalObject input_map; |
| RETURN_IF_ERROR(Jni::Util::convert_to_java_map(env, input_params, &input_map)); |
| |
| RETURN_IF_ERROR( |
| _jni_writer_obj.call_void_method(env, _jni_writer_write).with_arg(input_map).call()); |
| RETURN_ERROR_IF_EXC(env); |
| |
| _cur_written_rows += block.rows(); |
| return Status::OK(); |
| } |
| |
| Status VJniFormatTransformer::close() { |
| if (_closed || !_opened) { |
| return Status::OK(); |
| } |
| _closed = true; |
| |
| JNIEnv* env = nullptr; |
| RETURN_IF_ERROR(Jni::Env::Get(&env)); |
| |
| RETURN_IF_ERROR(_jni_writer_obj.call_void_method(env, _jni_writer_close).call()); |
| RETURN_ERROR_IF_EXC(env); |
| |
| return Status::OK(); |
| } |
| |
| int64_t VJniFormatTransformer::written_len() { |
| // JNI writer manages file size on Java side; return 0 to disable C++ auto-split. |
| return 0; |
| } |
| |
| } // namespace doris::vectorized |