blob: 1d9b6b5768f360396979a90a39917917862e17c7 [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <CLucene.h>
#include <CLucene/config/repl_wchar.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/olap_file.pb.h>
#include <gflags/gflags.h>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <memory>
#include <nlohmann/json.hpp>
#include <roaring/roaring.hh>
#include <sstream>
#include <string>
#include <vector>
#include "io/fs/file_reader.h"
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wshadow-field"
#endif
#include "CLucene/analysis/standard95/StandardAnalyzer.h"
#ifdef __clang__
#pragma clang diagnostic pop
#endif
#include "gutil/strings/strip.h"
#include "io/fs/local_file_system.h"
#include "olap/rowset/segment_v2/inverted_index/query/conjunction_query.h"
#include "olap/rowset/segment_v2/inverted_index_compound_reader.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
#include "olap/rowset/segment_v2/x_index_file_writer.h"
#include "olap/rowset/segment_v2/inverted_index_fs_directory.h"
#include "olap/tablet_schema.h"
#include "io/fs/file_system.h"
#include "vector/diskann_vector_index.h"
#include "vector/vector_index.h"
#include "olap/rowset/segment_v2/ann_index_writer.h"
#include "olap/options.h"
#include "common/signal_handler.h"
#include "util/disk_info.h"
#include "util/mem_info.h"
#include "io/fs/file_writer.h"
using doris::segment_v2::DorisCompoundReader;
using doris::segment_v2::DorisFSDirectoryFactory;
using doris::segment_v2::XIndexFileWriter;
using doris::segment_v2::InvertedIndexDescriptor;
using doris::segment_v2::InvertedIndexFileReader;
using doris::io::FileInfo;
using doris::TabletIndex;
using namespace doris::segment_v2;
using namespace lucene::analysis;
using namespace lucene::index;
using namespace lucene::util;
using namespace lucene::search;
using doris::io::FileSystem;
#include "io/fs/path.h"
const std::string& file_dir = "/home/users/clz/run/test_diskann/123456_0.idx";
std::filesystem::path index_data_path(file_dir);
int index_id = 100;
std::string rowset_id="rowset_id";
int seg_id = 0;
std::shared_ptr<FileSystem> get_local_file_filesystem(){
return doris::io::global_local_filesystem();
}
void test_add(){
auto fs = get_local_file_filesystem();
int index_id = 100;
doris::io::FileWriterPtr file_writer;
auto st = doris::io::global_local_filesystem()->create_file(file_dir, &file_writer);
if(!st.ok()){
std::cerr<<"failed create_file" << file_dir << std::endl;
}
std::unique_ptr<XIndexFileWriter> index_file_writer = std::make_unique<XIndexFileWriter>(
fs,
index_data_path.parent_path(),
rowset_id, seg_id, doris::InvertedIndexStorageFormatPB::V2, std::move(file_writer));
doris::TabletIndexPB index_pb;
index_pb.set_index_id(index_id);
TabletIndex index_meta;
index_meta.init_from_pb(index_pb);
index_meta._index_type = doris::IndexType::ANN;
index_meta._properties["index_type"]="diskann";
index_meta._properties["metric_type"]="l2";
index_meta._properties["dim"]="7";
index_meta._properties["max_degree"]="32";
index_meta._properties["search_list"]="100";
std::string field_name="word_embeding";
std::unique_ptr<AnnIndexColumnWriter> ann_writer = std::make_unique<AnnIndexColumnWriter>(
field_name, index_file_writer.get(), &index_meta, true);
st = ann_writer->init();
if (!st.ok()) {
std::cout << "failed to ann_writer->init()" << std::endl;
return ;
}
//writer
int field_size = 4;
float value[14]={1,2,3,4,5,6,7,7,6,5,4,3,2,1};
int64_t offsets[3]={0,7,14};
st = ann_writer->add_array_values(field_size, (const void*)value, nullptr, (const uint8_t*)offsets, 2);
if (!st.ok()) {
std::cout << "failed to ann_writer->add_array_values" << std::endl;
return ;
}
//finish
st = ann_writer->finish();
if (!st.ok()) {
std::cout << "failed to ann_writer->finish" << std::endl;
return ;
}
//save to disk
st = index_file_writer->close();
if (!st.ok()) {
std::cout << "failed to indexwriter->close" << std::endl;
return ;
}
}
void init_env(){
doris::CpuInfo::init();
doris::DiskInfo::init();
doris::MemInfo::init();
string custom_conffile = "/home/users/clz/run/be_1.conf";
if (!doris::config::init(custom_conffile.c_str(), true, false, true)) {
fprintf(stderr, "error read custom config file. \n");
return ;
}
std::vector<doris::StorePath> paths;
std::string storage="/home/users/clz/run/storage";
std::string spill="/home/users/clz/run/splill";
std::string broken_storage_path="/home/users/clz/run/broken";
auto olap_res = doris::parse_conf_store_paths(storage, &paths);
if (!olap_res) {
LOG(ERROR) << "parse config storage path failed, path=" << storage;
exit(-1);
}
std::vector<doris::StorePath> spill_paths;
olap_res = doris::parse_conf_store_paths(spill, &spill_paths);
if (!olap_res) {
LOG(ERROR) << "parse config spill storage path failed, path="
<< spill;
exit(-1);
}
std::set<std::string> broken_paths;
doris::parse_conf_broken_store_paths(broken_storage_path, &broken_paths);
// auto it = paths.begin();
// for (; it != paths.end();) {
// if (broken_paths.count(it->path) > 0) {
// if (doris::config::ignore_broken_disk) {
// LOG(WARNING) << "ignore broken disk, path = " << it->path;
// it = paths.erase(it);
// } else {
// LOG(ERROR) << "a broken disk is found " << it->path;
// exit(-1);
// }
// } else if (!doris::check_datapath_rw(it->path)) {
// if (doris::config::ignore_broken_disk) {
// LOG(WARNING) << "read write test file failed, path=" << it->path;
// it = paths.erase(it);
// } else {
// LOG(ERROR) << "read write test file failed, path=" << it->path;
// // if only one disk and the disk is full, also need exit because rocksdb will open failed
// exit(-1);
// }
// } else {
// ++it;
// }
// }
// if (paths.empty()) {
// LOG(ERROR) << "All disks are broken, exit.";
// exit(-1);
// }
// it = spill_paths.begin();
// for (; it != spill_paths.end();) {
// if (!doris::check_datapath_rw(it->path)) {
// if (doris::config::ignore_broken_disk) {
// LOG(WARNING) << "read write test file failed, path=" << it->path;
// it = spill_paths.erase(it);
// } else {
// LOG(ERROR) << "read write test file failed, path=" << it->path;
// exit(-1);
// }
// } else {
// ++it;
// }
// }
// if (spill_paths.empty()) {
// LOG(ERROR) << "All spill disks are broken, exit.";
// exit(-1);
// }
// // initialize libcurl here to avoid concurrent initialization
// auto curl_ret = curl_global_init(CURL_GLOBAL_ALL);
// if (curl_ret != 0) {
// LOG(ERROR) << "fail to initialize libcurl, curl_ret=" << curl_ret;
// exit(-1);
// }
// // add logger for thrift internal
// apache::thrift::GlobalOutput.setOutputFunction(doris::thrift_output);
// Status status = Status::OK();
// if (doris::config::enable_java_support) {
// // Init jni
// status = doris::JniUtil::Init();
// if (!status.ok()) {
// LOG(WARNING) << "Failed to initialize JNI: " << status;
// exit(1);
// } else {
// LOG(INFO) << "Doris backend JNI is initialized.";
// }
// }
// // Doris own signal handler must be register after jvm is init.
// // Or our own sig-handler for SIGINT & SIGTERM will not be chained ...
// // https://www.oracle.com/java/technologies/javase/signals.html
// doris::init_signals();
// // ATTN: MUST init before `ExecEnv`, `StorageEngine` and other daemon services
// //
// // Daemon ───┬──► StorageEngine ──► ExecEnv ──► Disk/Mem/CpuInfo
// // │
// // │
// // BackendService ─┘
// doris::CpuInfo::init();
// doris::DiskInfo::init();
// doris::MemInfo::init();
// LOG(INFO) << doris::CpuInfo::debug_string();
// LOG(INFO) << doris::DiskInfo::debug_string();
// LOG(INFO) << doris::MemInfo::debug_string();
// // PHDR speed up exception handling, but exceptions from dynamically loaded libraries (dlopen)
// // will work only after additional call of this function.
// // rewrites dl_iterate_phdr will cause Jemalloc to fail to run after enable profile. see #
// // updatePHDRCache();
// if (!doris::BackendOptions::init()) {
// exit(-1);
// }
// doris::ThreadLocalHandle::create_thread_local_if_not_exits();
// init exec env
//auto* exec_env(doris::ExecEnv::GetInstance());
doris::Status status = doris::ExecEnv::init(doris::ExecEnv::GetInstance(), paths, spill_paths, broken_paths);
if(!status.ok()){
std::cout << "init fail" << std::endl;
}
}
void test_search(){
auto fs = get_local_file_filesystem();
auto index_file_reader = std::make_unique<InvertedIndexFileReader>(
fs, "/home/users/clz/run/test_diskann/123456_0", doris::InvertedIndexStorageFormatPB::V2);
auto st = index_file_reader->init(4096);
if (!st.ok()) {
std::cout << "failed to index_file_reader->init" << st << std::endl;
return ;
}
doris::TabletIndexPB index_pb;
index_pb.set_index_id(index_id);
TabletIndex index_meta;
index_meta.init_from_pb(index_pb);
auto ret = index_file_reader->open(&index_meta);
if (!ret.has_value()) {
std::cerr << "InvertedIndexFileReader open error:" << ret.error() << std::endl;
return ;
}
using T = std::decay_t<decltype(ret)>;
std::shared_ptr<DorisCompoundReader> dir = std::forward<T>(ret).value();
std::shared_ptr<DiskannVectorIndex> vindex = std::make_shared<DiskannVectorIndex>(dir);
st = vindex->load(VectorIndex::Metric::L2);
if (!st.ok()) {
std::cout << "failed to vindex->load" << std::endl;
return ;
}
float query_vec[7]={1,2,3,4,5,6,7};
SearchResult result;
std::shared_ptr<DiskannSearchParameter> searchParams = std::make_shared<DiskannSearchParameter>();
searchParams->with_search_list(100);
searchParams->with_beam_width(2);
//设置过滤条件
std::shared_ptr<IDFilter> filter = nullptr;
std::shared_ptr<roaring::Roaring> bitmap = std::make_shared<roaring::Roaring>();
// bitmap->add(1);
// filter.reset(new IDFilter(bitmap));
// searchParams->set_filter(filter);
st = vindex->search(query_vec, 5, &result, searchParams.get());
if (!st.ok()) {
std::cout << "failed to vindex->search" << std::endl;
return ;
}
if(result.has_rows()){
for(int i=0;i<result.row_count();i++){
std::cout << "idx:" << result.get_id(i) << ", distance:" << result.get_distance(i) << std::endl;
}
}
}
int main(int argc, char** argv) {
if (argc < 2) {
std::cerr << "Usage: " << argv[0] << " <test_add|test_search>" << std::endl;
return 1;
}
doris::signal::InstallFailureSignalHandler();
init_env();
std::string command = argv[1];
if(command=="add"){
test_add();
}else if(command=="search"){
test_search();
}else{
std::cout << "unkonw command" << std::endl;
}
return 0;
}