blob: a1643439a2ffbd6c66b849164cad43bd1fc2180b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "common/tsfile_common.h"
#include <algorithm>
#include <map>
#include "common/logger/elog.h"
using namespace common;
namespace storage {
// Magic string identifying a TsFile. MAGIC_STRING_TSFILE_LEN must stay equal
// to the length of that literal without its terminating NUL ("TsFile" is 6
// characters) — update both together.
const char *MAGIC_STRING_TSFILE = "TsFile";
const int MAGIC_STRING_TSFILE_LEN = 6;
// NOTE(review): by its name, the file-format version byte; confirm against
// the reader/writer that consumes it.
const char VERSION_NUM_BYTE = 0x03;
// Single-byte marker values. Their names suggest they tag the kind of record
// that follows in the file body (chunk group header, chunk header, a chunk
// header for a chunk with only one page, separator, operation index range) —
// confirm against the reader/writer. Note the numeric values are not in
// declaration order.
const char CHUNK_GROUP_HEADER_MARKER = 0;
const char CHUNK_HEADER_MARKER = 1;
const char ONLY_ONE_PAGE_CHUNK_HEADER_MARKER = 5;
const char SEPARATOR_MARKER = 2;
const char OPERATION_INDEX_RANGE = 4;
/* ================ TimeseriesIndex ================ */
/**
 * Append one chunk meta to this timeseries index.
 *
 * The chunk meta is serialized into chunk_meta_list_serialized_buf_
 * (@p serialize_statistic is forwarded to ChunkMeta::serialize_to; by its name
 * it controls whether the chunk's statistic is serialized as well). Only if
 * serialization succeeds is the chunk's statistic merged into statistic_.
 *
 * @param chunk_meta          chunk meta to append; must not be null.
 * @param serialize_statistic forwarded to ChunkMeta::serialize_to.
 * @return E_OK on success, E_INVALID_ARG for a null @p chunk_meta, otherwise
 *         the first error reported by serialization or statistic merging.
 */
int TimeseriesIndex::add_chunk_meta(ChunkMeta *chunk_meta,
                                    bool serialize_statistic) {
    if (IS_NULL(chunk_meta)) {
        return E_INVALID_ARG;
    }
    int ret = E_OK;
    if (RET_FAIL(chunk_meta->serialize_to(chunk_meta_list_serialized_buf_,
                                          serialize_statistic))) {
        // Do not touch the aggregated statistic if the chunk meta was not
        // written out.
        return ret;
    }
    ret = statistic_->merge_with(chunk_meta->statistic_);
    return ret;
}
/* ================ TSMIterator ================ */
int TSMIterator::init() {
// sort chunk_group_meta_list_ : {[measurementA, offsetA1], [measurementB,
// offsetB1], [measurementA, offsetA2], [measurementB, offsetB2]} ->
// {[measurementA, offsetA1], [measurementA, offsetA2], [measurementB,
// offsetB1], [measurementB, offsetB2]}
for (auto chunk_group_meta_iter = chunk_group_meta_list_.begin();
chunk_group_meta_iter != chunk_group_meta_list_.end();
chunk_group_meta_iter++) {
auto chunk_meta_list = chunk_group_meta_iter.get()->chunk_meta_list_;
// Use a map to group chunks by measurement_name_
std::map<common::String, std::vector<ChunkMeta *>> groups;
std::vector<common::String> order;
for (auto it = chunk_meta_list.begin(); it != chunk_meta_list.end();
it++) {
auto *chunk_meta = it.get();
if (groups.find(chunk_meta->measurement_name_) == groups.end()) {
order.push_back(chunk_meta->measurement_name_);
}
groups[chunk_meta->measurement_name_].push_back(chunk_meta);
}
// Sort each group of chunk metas by offset
for (auto it = groups.begin(); it != groups.end(); ++it) {
std::vector<ChunkMeta *> &group = it->second;
std::sort(group.begin(), group.end(),
[](ChunkMeta *a, ChunkMeta *b) {
return a->offset_of_chunk_header_ <
b->offset_of_chunk_header_;
});
}
// Clear and refill chunk_group_meta_list
chunk_group_meta_iter.get()->chunk_meta_list_.clear();
for (const auto &measurement_name : order) {
for (auto chunk_meta : groups[measurement_name]) {
chunk_group_meta_iter.get()->chunk_meta_list_.push_back(
chunk_meta);
}
}
}
// FIXME empty list
chunk_group_meta_iter_ = chunk_group_meta_list_.begin();
if (chunk_group_meta_iter_ == chunk_group_meta_list_.end()) {
return E_NOT_EXIST;
}
while (chunk_group_meta_iter_ != chunk_group_meta_list_.end()) {
chunk_meta_iter_ =
chunk_group_meta_iter_.get()->chunk_meta_list_.begin();
std::map<common::String, std::vector<ChunkMeta *>> tmp;
while (chunk_meta_iter_ !=
chunk_group_meta_iter_.get()->chunk_meta_list_.end()) {
tmp[chunk_meta_iter_.get()->measurement_name_].emplace_back(
chunk_meta_iter_.get());
chunk_meta_iter_++;
}
if (!tmp.empty()) {
tsm_chunk_meta_info_[chunk_group_meta_iter_.get()->device_name_] =
tmp;
}
chunk_group_meta_iter_++;
}
tsm_measurement_iter_ = tsm_chunk_meta_info_.begin()->second.begin();
tsm_device_iter_ = tsm_chunk_meta_info_.begin();
return E_OK;
}
/**
 * @return true while the device cursor has not run past the last device in
 *         tsm_chunk_meta_info_.
 */
bool TSMIterator::has_next() const {
    const bool exhausted = (tsm_device_iter_ == tsm_chunk_meta_info_.end());
    return !exhausted;
}
/**
 * Produce the TimeseriesIndex of the next (device, measurement) pair and
 * advance the cursor.
 *
 * @param ret_device_name      receives the current device name via
 *                             shallow_copy_from (presumably shares the
 *                             iterator's storage — keep the iterator alive
 *                             while using it).
 * @param ret_measurement_name receives the measurement name the same way.
 * @param ret_ts_index         populated with meta type, measurement name, data
 *                             type, statistic, ts id and every chunk meta of
 *                             this timeseries; finish() is called on success.
 * @return E_OK on success; E_NO_MORE_DATA once every device is consumed;
 *         E_TSFILE_WRITER_META_ERR for an empty chunk meta list or a null
 *         device name; otherwise the error from
 *         TimeseriesIndex::add_chunk_meta.
 */
int TSMIterator::get_next(String &ret_device_name, String &ret_measurement_name,
                          TimeseriesIndex &ret_ts_index) {
    int ret = E_OK;
    // Calling after exhaustion must not dereference the end() device cursor.
    if (!has_next()) {
        return E_NO_MORE_DATA;
    }
    // Move on to the next device that still has measurements left. A loop
    // (rather than a single step) keeps this safe even if a device entry
    // ever holds no measurements.
    while (tsm_measurement_iter_ == tsm_device_iter_->second.end()) {
        ++tsm_device_iter_;
        if (!has_next()) {
            return E_NO_MORE_DATA;
        }
        tsm_measurement_iter_ = tsm_device_iter_->second.begin();
    }
    ret_device_name.shallow_copy_from(tsm_device_iter_->first);
    ret_measurement_name.shallow_copy_from(tsm_measurement_iter_->first);
    // Iterate the stored chunk metas directly instead of copying them into a
    // freshly allocated temporary list on every call.
    const auto &chunk_metas = tsm_measurement_iter_->second;
    if (chunk_metas.empty()) {
        return E_TSFILE_WRITER_META_ERR;
    }
    const bool multi_chunks = chunk_metas.size() > 1;
    ChunkMeta *first_chunk_meta = chunk_metas.front();
    // Low bit flags "more than one chunk"; the rest comes from the chunk mask.
    const char meta_type = (multi_chunks ? 1 : 0) | (first_chunk_meta->mask_);
    const TSDataType data_type = first_chunk_meta->data_type_;
    const TsID ts_id = first_chunk_meta->ts_id_;
    ret_ts_index.set_ts_meta_type(meta_type);
    ret_ts_index.set_measurement_name(ret_measurement_name);
    ret_ts_index.set_data_type(data_type);
    ret_ts_index.init_statistic(data_type);
    ret_ts_index.set_ts_id(ts_id);
    for (ChunkMeta *chunk_meta : chunk_metas) {
        // Per-chunk statistics are serialized only when there are several
        // chunks (same flag as the original call).
        if (RET_FAIL(ret_ts_index.add_chunk_meta(chunk_meta, multi_chunks))) {
            break;
        }
    }
    if (IS_SUCC(ret)) {
        ret_ts_index.finish();
    }
    if (UNLIKELY(ret_device_name.is_null())) {
        ret = E_TSFILE_WRITER_META_ERR;
        // log_err("null device name from chunk_group_meta_iter, ret=%d", ret);
        ASSERT(false);
    }
    ++tsm_measurement_iter_;
    return ret;
}
/* ================ MetaIndexNode ================ */
/* ================ MetaIndexNode ================ */
// Heterogeneous "entry < name" comparator over MetaIndexEntry pointers. Its
// (element, value) argument order matches the comparator shape expected by
// std::lower_bound. operator() is const so the comparator can also be
// invoked through const function objects.
struct MetaIndexEntryComp {
    bool operator()(MetaIndexEntry *entry, const String &target_name) const {
        return entry->name_.less_than(target_name);
    }
};
/**
 * Select the child entry of this node that covers @p name.
 *
 * children_ is binary-searched by name (it is assumed to be sorted
 * ascending) for the last child whose name is <= @p name; with
 * @p exact_search the child's name must equal @p name. A LEAF_MEASUREMENT
 * node whose only child has an empty name is treated as aligned and always
 * resolves to that child.
 *
 * @param name            key to look up.
 * @param exact_search    accept only an exact name match.
 * @param ret_index_entry receives a copy of the selected child entry.
 * @param ret_end_offset  receives where the selected child's range ends: the
 *                        next child's offset_, or this node's end_offset_ for
 *                        the last child.
 * @return E_OK on success, E_NOT_EXIST if no child qualifies.
 */
int MetaIndexNode::binary_search_children(const String &name, bool exact_search,
                                          MetaIndexEntry &ret_index_entry,
                                          int64_t &ret_end_offset) {
#if DEBUG_SE
    std::cout << "MetaIndexNode::binary_search_children start, name=" << name
              << ", exact_search=" << exact_search
              << ", children_.size=" << children_.size() << std::endl;
    for (int i = 0; i < (int)children_.size(); i++) {
        std::cout << "Iterating children: " << children_[i]->name_ << std::endl;
    }
#endif
    // Aligned leaf: a single, unnamed child is selected without comparing.
    const bool aligned_leaf = node_type_ == LEAF_MEASUREMENT &&
                              children_.size() == 1 &&
                              children_[0]->name_.len_ == 0;
    int chosen = 0;
    if (!aligned_leaf) {
        // Invariant: children_[lo] <= name < children_[hi]; lo == -1 and
        // hi == size act as virtual sentinels.
        int lo = -1;
        int hi = (int)children_.size();
        bool exact_hit = false;
        while (hi - lo > 1) {
            const int mid = (lo + hi) / 2;
            const int rel = children_[mid]->name_.compare(name);
#if DEBUG_SE
            std::cout
                << "MetaIndexNode::binary_search_children doing, cmp: cur="
                << children_[mid]->name_ << ", name=" << name
                << ", exact_search=" << exact_search
                << ", children_.size=" << children_.size() << std::endl;
#endif
            if (rel == 0) {
                lo = mid;
                exact_hit = true;
                break;
            }
            if (rel > 0) {
                hi = mid;  // children_[mid] > name
            } else {
                lo = mid;  // children_[mid] < name
            }
        }
        // lo still at the sentinel: name sorts before every child.
        const bool below_all = lo < 0;
        if (below_all || (exact_search && !exact_hit)) {
#if DEBUG_SE
            std::cout << "MetaIndexNode::binary_search_children end, "
                         "ret=E_NOT_EXIST, name="
                      << name << ", exact_search=" << exact_search << std::endl;
#endif
            return E_NOT_EXIST;
        }
        chosen = lo;
    }
    ret_index_entry = *children_[chosen];
    const bool is_last_child = (chosen + 1 == (int)children_.size());
    if (is_last_child) {
        ret_end_offset = this->end_offset_;
    } else {
        ret_end_offset = children_[chosen + 1]->offset_;
    }
#if DEBUG_SE
    std::cout << "MetaIndexNode::binary_search_children end, ret_index_entry="
              << ret_index_entry << ", ret_end_offset=" << ret_end_offset
              << ", name=" << name << ", exact_search=" << exact_search
              << std::endl;
#endif
    return E_OK;
}
// NOTE(review): legacy sequential-scan version of
// MetaIndexNode::binary_search_children, disabled via `#if 0` and kept only
// for reference; the active implementation is the binary search defined
// above. If this is ever re-enabled:
// - its end-of-list checks rely on a default-constructed Iterator comparing
//   equal to children_.end(), which is not visible from this file;
// - when `name` sorts before every child, `prev_it` is never assigned, so a
//   non-exact search would dereference an invalid iterator.
#if 0
int MetaIndexNode::binary_search_children(const String &name,
bool exact_search,
MetaIndexEntry &ret_index_entry,
int64_t &ret_end_offset)
{
// TODO currently, we do sequence search.
// We will change it to binary search after replacing SimpleList with SimpleVector
SimpleList<MetaIndexEntry*>::Iterator it;
SimpleList<MetaIndexEntry*>::Iterator prev_it;
SimpleList<MetaIndexEntry*>::Iterator target_it;
for (it = children_.begin(); it != children_.end(); it++) {
int cmp_res = it.get()->name_.compare(name);
if (cmp_res == 0) {
target_it = it;
break;
} else if (cmp_res < 0) {
prev_it = it;
} else {
break;
}
} // end for
if (exact_search && target_it == children_.end()) {
return E_NOT_EXIST;
} else {
if (target_it == children_.end()) {
target_it = prev_it;
}
ret_index_entry = *(target_it.get());
target_it++;
if (target_it == children_.end()) {
ret_end_offset = this->end_offset_;
} else {
ret_end_offset = target_it.get()->offset_;
}
}
return E_OK;
}
#endif
} // end namespace storage