blob: 2df5c17bbbd210d6b5dcaa3ed6ebb922004de7d9 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "hudi_jni_reader.h"
#include <map>
#include "core/types.h"
#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
namespace doris {
class RuntimeProfile;
class RuntimeState;
class Block;
} // namespace doris
namespace doris {
#include "common/compile_check_begin.h"
const std::string HudiJniReader::HOODIE_CONF_PREFIX = "hoodie.";
const std::string HudiJniReader::HADOOP_CONF_PREFIX = "hadoop_conf.";
HudiJniReader::HudiJniReader(const TFileScanRangeParams& scan_params,
const THudiFileDesc& hudi_params,
const std::vector<SlotDescriptor*>& file_slot_descs,
RuntimeState* state, RuntimeProfile* profile)
: JniReader(
file_slot_descs, state, profile, "org/apache/doris/hudi/HadoopHudiJniScanner",
[&]() {
std::vector<std::string> required_fields;
for (const auto& desc : file_slot_descs) {
required_fields.emplace_back(desc->col_name());
}
std::map<String, String> params = {
{"query_id", print_id(state->query_id())},
{"base_path", hudi_params.base_path},
{"data_file_path", hudi_params.data_file_path},
{"data_file_length", std::to_string(hudi_params.data_file_length)},
{"delta_file_paths", join(hudi_params.delta_logs, ",")},
{"hudi_column_names", join(hudi_params.column_names, ",")},
{"hudi_column_types", join(hudi_params.column_types, "#")},
{"required_fields", join(required_fields, ",")},
{"instant_time", hudi_params.instant_time},
{"serde", hudi_params.serde},
{"input_format", hudi_params.input_format},
{"time_zone", state->timezone_obj().name()}};
for (const auto& kv : scan_params.properties) {
if (kv.first.starts_with(HOODIE_CONF_PREFIX)) {
params[kv.first] = kv.second;
} else {
params[HADOOP_CONF_PREFIX + kv.first] = kv.second;
}
}
return params;
}(),
[&]() {
std::vector<std::string> names;
for (const auto& desc : file_slot_descs) {
names.emplace_back(desc->col_name());
}
return names;
}()) {}
Status HudiJniReader::init_reader() {
return open(_state, _profile);
}
#include "common/compile_check_end.h"
} // namespace doris