blob: c596e3946ae60a5b00a62e4e122ed6d265d6d79c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_TSBLOCK_TSBLOCK_H
#define COMMON_TSBLOCK_TSBLOCK_H
#include <cstdint>
#include "common/allocator/byte_stream.h"
#include "common/container/byte_buffer.h"
#include "common/global.h"
#include "common/logger/elog.h"
#include "tuple_desc.h"
#include "vector/vector.h"
namespace common {
/**
 * Column-oriented block of rows.
 *
 * vectors_ holds one Vector pointer per column slot (the appenders and
 * iterators index it by column index) and tuple_desc_ describes the columns.
 * The block deletes every Vector in its destructor; tuple_desc_ is only
 * stored and is never deleted by this class.
 */
class TsBlock {
   public:
    friend class RowIterator;
    friend class ColIterator;
    friend class RowAppender;
    friend class ColAppender;

    /**
     * @param tupledesc      description of the columns; kept as a raw pointer.
     * @param max_row_count  row limit of the block. When the number of rows
     *                       can be estimated up front (limit queries,
     *                       statistics, insert paths, ...) pass it here.
     *                       NOTE(review): the default 0 is presumably
     *                       replaced by init(), which is defined elsewhere -
     *                       confirm.
     */
    explicit TsBlock(TupleDesc *tupledesc, uint32_t max_row_count = 0)
        : capacity_(g_config_value_.tsblock_max_memory_),
          row_count_(0),
          max_row_count_(max_row_count),
          tuple_desc_(tupledesc) {}

    // The destructor deletes the Vector objects stored in vectors_, so a
    // member-wise copy would delete the same vectors twice. Forbid copying.
    TsBlock(const TsBlock &) = delete;
    TsBlock &operator=(const TsBlock &) = delete;

    /** Deletes every Vector held in vectors_. */
    ~TsBlock() {
        for (auto &vector : vectors_) {
            delete vector;
            vector = nullptr;
        }
    }

    FORCE_INLINE uint32_t get_row_count() const { return row_count_; }
    FORCE_INLINE TupleDesc *get_tuple_desc() const { return tuple_desc_; }

    /** Returns the Vector stored at position index of vectors_. */
    FORCE_INLINE Vector *get_vector(uint32_t index) {
        ASSERT(index < vectors_.size());
        return vectors_[index];
    }

    /** Column count as reported by the tuple descriptor. */
    FORCE_INLINE uint32_t get_column_count() const {
        return static_cast<uint32_t>(tuple_desc_->get_column_count());
    }
    FORCE_INLINE uint32_t get_max_row_count() const { return max_row_count_; }
    FORCE_INLINE uint32_t get_capacity() const { return capacity_; }

    /** Increases the recorded capacity by extend_size. */
    FORCE_INLINE void update_capacity(uint32_t extend_size) {
        capacity_ += extend_size;
    }

    /**
     * For every column, marks the rows between the vector's current row
     * number and row_count_ as null.
     */
    FORCE_INLINE void fill_trailling_nulls() {
        for (uint32_t i = 0; i < get_column_count(); ++i) {
            for (uint32_t j = vectors_[i]->get_row_num(); j < row_count_;
                 ++j) {
                vectors_[i]->set_null(j);
            }
        }
    }

    /** Resets every vector and sets the row count back to zero. */
    FORCE_INLINE void reset() {
        for (const auto &vector : vectors_) {
            vector->reset();
        }
        row_count_ = 0;
    }

    /**
     * Allocates a TsBlock when ret_tsblock is null, then calls init() on it.
     * If init() fails the block is deleted (also when it was supplied by the
     * caller) and ret_tsblock is set to nullptr.
     *
     * @return ret; RET_FAIL is a project macro that presumably stores the
     *         result of init() in ret - confirm against its definition.
     */
    FORCE_INLINE static int create_tsblock(TupleDesc *tupledesc,
                                           TsBlock *&ret_tsblock,
                                           uint32_t max_row_count = 0) {
        int ret = error_info::E_OK;
        if (ret_tsblock == nullptr) {
            ret_tsblock = new TsBlock(tupledesc, max_row_count);
        }
        if (RET_FAIL(ret_tsblock->init())) {
            delete ret_tsblock;
            ret_tsblock = nullptr;
        }
        return ret;
    }

    // Defined outside this header.
    int init();
    void tsblock_to_json(ByteStream *byte_stream);
    std::string debug_string();

   private:
    int build_vector(common::TSDataType type, uint32_t row_count);
    void write_data(ByteStream *__restrict byte_stream, char *__restrict val,
                    uint32_t len, bool has_null, TSDataType type);

   private:
    uint32_t capacity_;   // maximum memory capacity
    uint32_t row_count_;  // real row count
    uint32_t max_row_count_;
    // common::BitMap select_list_;
    TupleDesc *tuple_desc_;
    std::vector<Vector *> vectors_;
};
/**
 * Writes values into a TsBlock one row at a time. add_row() opens a new row
 * by bumping the block's row count; append()/append_null() then fill the
 * individual column slots.
 */
class RowAppender {
   public:
    explicit RowAppender(TsBlock *tsblock) : tsblock_(tsblock) {}
    ~RowAppender() = default;

    // todo:(yanghao) maybe need to consider select-list
    /**
     * Increments the block's row count.
     * @return false (row count unchanged) once max_row_count_ is reached.
     */
    FORCE_INLINE bool add_row() {
        if (LIKELY(tsblock_->row_count_ < tsblock_->max_row_count_)) {
            ++tsblock_->row_count_;
            return true;
        }
        return false;
    }

    /** Undoes the most recent add_row() by decrementing the row count. */
    FORCE_INLINE void backoff_add_row() {
        ASSERT(tsblock_->row_count_ > 0);
        tsblock_->row_count_--;
    }

    /** Appends len bytes starting at value to the vector of slot_index. */
    FORCE_INLINE void append(uint32_t slot_index, const char *value,
                             uint32_t len) {
        ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count());
        Vector *vec = tsblock_->vectors_[slot_index];
        vec->append(value, len);
    }

    /**
     * Marks the last added row (index row_count_ - 1) as null in the vector
     * of slot_index.
     */
    FORCE_INLINE void append_null(uint32_t slot_index) {
        // Same slot check as append(); the second assert guards against the
        // unsigned wrap-around of row_count_ - 1 when no row was added yet.
        ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count());
        ASSERT(tsblock_->row_count_ > 0);
        Vector *vec = tsblock_->vectors_[slot_index];
        vec->set_null(tsblock_->row_count_ - 1);
    }

   private:
    TsBlock *tsblock_;
};
/**
 * Writes values into a single column of a TsBlock. The appender keeps its
 * own row counter (column_row_count_); it does not modify the block's
 * row_count_.
 */
class ColAppender {
   public:
    ColAppender(uint32_t column_index, TsBlock *tsblock)
        : column_index_(column_index),
          column_row_count_(0),
          tsblock_(tsblock) {
        // Check the index before touching vectors_.
        ASSERT(column_index < tsblock_->tuple_desc_->get_column_count());
        vec_ = tsblock_->vectors_[column_index];
    }
    ~ColAppender() = default;

    // todo:(yanghao) maybe need to consider select-list
    /**
     * Opens a new row in this column: increments the local counter and the
     * vector's row number.
     * @return false (nothing changed) once the block's max_row_count_ is
     *         reached.
     */
    FORCE_INLINE bool add_row() {
        if (LIKELY(column_row_count_ < tsblock_->max_row_count_)) {
            ++column_row_count_;
            vec_->add_row_num();
            return true;
        }
        return false;
    }

    /** Appends len bytes starting at value to the column vector. */
    FORCE_INLINE void append(const char *value, uint32_t len) {
        vec_->append(value, len);
    }

    /** Marks the last added row (index column_row_count_ - 1) as null. */
    FORCE_INLINE void append_null() {
        // Without a preceding add_row() the index would wrap to UINT32_MAX.
        ASSERT(column_row_count_ > 0);
        vec_->set_null(column_row_count_ - 1);
    }

    FORCE_INLINE uint32_t get_col_row_count() const {
        return column_row_count_;
    }
    FORCE_INLINE uint32_t get_column_index() const { return column_index_; }

    /**
     * Adds null rows until the column holds end_index rows.
     * @return E_OK, or E_INVALID_ARG if add_row() refuses a row.
     */
    FORCE_INLINE int fill_null(uint32_t end_index) {
        while (column_row_count_ < end_index) {
            if (!add_row()) {
                return E_INVALID_ARG;
            }
            append_null();
        }
        return E_OK;
    }

    /**
     * Adds rows holding the given value until the column holds end_index
     * rows.
     * @return E_OK, or E_INVALID_ARG if add_row() refuses a row.
     */
    FORCE_INLINE int fill(const char *value, uint32_t len,
                          uint32_t end_index) {
        while (column_row_count_ < end_index) {
            if (!add_row()) {
                return E_INVALID_ARG;
            }
            append(value, len);
        }
        return E_OK;
    }

    /** Resets only the appender's own row counter. */
    FORCE_INLINE void reset() { column_row_count_ = 0; }

   private:
    uint32_t column_index_;
    uint32_t column_row_count_;
    TsBlock *tsblock_;
    Vector *vec_;
};
// todo:(yanghao) need to deal with select-list
/**
 * Walks a TsBlock row by row. read() returns the value of one column for the
 * current row; next() moves to the following row and advances the offset of
 * every column vector.
 */
class RowIterator {
   public:
    explicit RowIterator(TsBlock *tsblock)
        : tsblock_(tsblock),
          row_id_(0),
          column_count_(static_cast<uint32_t>(
              tsblock->tuple_desc_->get_column_count())) {}
    ~RowIterator() = default;

    /** True once the current row index has reached the block's row count. */
    FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
    /** True while the current row index is below the block's row count. */
    FORCE_INLINE bool has_next() const {
        return row_id_ < tsblock_->row_count_;
    }
    FORCE_INLINE uint32_t get_column_count() const { return column_count_; }

    /** Data type of the vector stored at column_index. */
    FORCE_INLINE TSDataType get_data_type(uint32_t column_index) const {
        ASSERT(column_index < column_count_);
        return tsblock_->vectors_[column_index]->get_vector_type();
    }

    /** Moves to the next row and updates the offset of every column vector. */
    FORCE_INLINE void next() {
        ASSERT(row_id_ < tsblock_->row_count_);
        ++row_id_;
        for (uint32_t i = 0; i < column_count_; ++i) {
            tsblock_->vectors_[i]->update_offset();
        }
    }

    /**
     * Reads the value of column_index for the current row by forwarding to
     * Vector::read(len, null, row_id_).
     */
    FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len,
                            bool *__restrict null) {
        ASSERT(column_index < column_count_);
        Vector *vec = tsblock_->vectors_[column_index];
        return vec->read(len, null, row_id_);
    }

    std::string debug_string();  // for debug

   private:
    TsBlock *tsblock_;
    uint32_t row_id_;  // index of the row currently being read
    uint32_t column_count_;
};
// todo:(yanghao) need to deal with select-list
/**
 * Walks a single column of a TsBlock. The destructor calls reset_offset() on
 * the column vector.
 */
class ColIterator {
   public:
    ColIterator(uint32_t column_index, const TsBlock *tsblock)
        : column_index_(column_index), row_id_(0), tsblock_(tsblock) {
        // Check the index before touching vectors_.
        ASSERT(column_index < tsblock_->tuple_desc_->get_column_count());
        vec_ = tsblock_->vectors_[column_index];
    }
    ~ColIterator() { vec_->reset_offset(); }

    /** True once the current row index has reached the block's row count. */
    FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }

    /** Moves to the next row and updates the vector's offset. */
    FORCE_INLINE void next() {
        ++row_id_;
        vec_->update_offset();
    }

    FORCE_INLINE bool has_null() const { return vec_->has_null(); }
    FORCE_INLINE TSDataType get_data_type() const {
        return vec_->get_vector_type();
    }

    /** Forwards to Vector::read(len, null, row_id_) for the current row. */
    FORCE_INLINE char *read(uint32_t *__restrict len, bool *__restrict null) {
        return vec_->read(len, null, row_id_);
    }
    /** Forwards to the single-argument Vector::read(len). */
    FORCE_INLINE char *read(uint32_t *len) { return vec_->read(len); }

    FORCE_INLINE uint32_t get_column_index() const { return column_index_; }

   private:
    uint32_t column_index_;
    uint32_t row_id_;
    const TsBlock *tsblock_;
    Vector *vec_;
};
// Declaration only; the definition is not part of this header.
// NOTE(review): judging by the function and parameter names, this presumably
// merges the rows of `river` into `sea` and returns an error code - confirm
// against the implementation.
int merge_tsblock_by_row(TsBlock *sea, TsBlock *river);
} // end namespace common
#endif // COMMON_TSBLOCK_TSBLOCK_H