blob: dce94f8ad3204349471d11e80334922bd0c0b5f5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_TSBLOCK_TSBLOCK_H
#define COMMON_TSBLOCK_TSBLOCK_H
#include <stdint.h>

#include <cstring>
#include <string>
#include <vector>

#include "common/allocator/byte_stream.h"
#include "common/container/byte_buffer.h"
#include "common/global.h"
#include "common/logger/elog.h"
#include "tuple_desc.h"
#include "vector/fixed_length_vector.h"
#include "vector/variable_length_vector.h"
#include "vector/vector.h"
namespace common {
class TsBlock {
public:
friend class RowIterator;
friend class ColIterator;
friend class RowAppender;
friend class ColAppender;
/*
* row_count: If we can clearly estimate the number of tsblock rows,
* such as limit scenarios, such as based on statistical
* information, such as insert scenarios, etc. Then we will use the given
* number of rows
*/
explicit TsBlock(TupleDesc* tupledesc, uint32_t max_row_count = 0)
: capacity_(g_config_value_.tsblock_max_memory_),
row_count_(0),
max_row_count_(max_row_count),
tuple_desc_(tupledesc) {}
~TsBlock() {
int size = vectors_.size();
for (int i = 0; i < size; ++i) {
delete vectors_[i];
vectors_[i] = nullptr;
}
}
FORCE_INLINE uint32_t get_row_count() const { return row_count_; }
FORCE_INLINE TupleDesc* get_tuple_desc() const { return tuple_desc_; }
FORCE_INLINE Vector* get_vector(uint32_t index) { return vectors_[index]; }
FORCE_INLINE uint32_t get_column_count() const {
return tuple_desc_->get_column_count();
}
FORCE_INLINE uint32_t get_max_row_count() const { return max_row_count_; }
FORCE_INLINE uint32_t get_capacity() const { return capacity_; }
FORCE_INLINE void update_capacity(uint32_t extend_size) {
capacity_ += extend_size;
}
// need to call flush_row_count after using colappender
FORCE_INLINE int flush_row_count(uint32_t row_count) {
int errnum = E_OK;
if (row_count_ == 0) {
row_count_ = row_count;
} else if (row_count_ != row_count) {
LOGE("Inconsistent number of rows in two columns");
errnum = E_TSBLOCK_DATA_INCONSISTENCY;
}
return errnum;
}
FORCE_INLINE void fill_trailling_nulls() {
for (uint32_t i = 0; i < get_column_count(); ++i) {
for (uint32_t j = vectors_[i]->get_row_num(); j < row_count_; ++j) {
vectors_[i]->set_null(j);
}
}
}
FORCE_INLINE void reset() {
int size = vectors_.size();
for (int i = 0; i < size; ++i) {
vectors_[i]->reset();
}
row_count_ = 0;
}
FORCE_INLINE static int create_tsblock(TupleDesc* tupledesc,
TsBlock*& ret_tsblock,
uint32_t max_row_count = 0) {
int ret = common::E_OK;
if (ret_tsblock == nullptr) {
ret_tsblock = new TsBlock(tupledesc, max_row_count);
}
if (RET_FAIL(ret_tsblock->init())) {
delete ret_tsblock;
ret_tsblock = nullptr;
}
return ret;
}
int init();
void tsblock_to_json(ByteStream* byte_stream);
std::string debug_string();
private:
int build_vector(common::TSDataType type, uint32_t row_count);
void write_data(ByteStream* __restrict byte_stream, char* __restrict val,
uint32_t len, bool has_null, TSDataType type);
private:
uint32_t capacity_; // maximum memory capacity
uint32_t row_count_; // real row count
uint32_t max_row_count_;
common::BitMap select_list_;
TupleDesc* tuple_desc_;
std::vector<Vector*> vectors_;
};
/*
 * Appends data to a TsBlock row by row: call add_row() to open a new row,
 * then append()/append_null() for each column of that row.
 */
class RowAppender {
   public:
    explicit RowAppender(TsBlock* tsblock) : tsblock_(tsblock) {}
    ~RowAppender() {}

    // Opens a new row. Returns false and leaves the block unchanged when the
    // block already holds max_row_count_ rows.
    // todo:(yanghao) maybe need to consider select-list
    FORCE_INLINE bool add_row() {
        if (LIKELY(tsblock_->row_count_ < tsblock_->max_row_count_)) {
            ++tsblock_->row_count_;
            return true;
        } else {
            return false;
        }
    }

    // Undoes the most recent add_row().
    FORCE_INLINE void backoff_add_row() {
        ASSERT(tsblock_->row_count_ > 0);
        tsblock_->row_count_--;
    }

    /*
     * Appends `len` bytes from `value` to column `slot_index`.
     * A 4-byte value written into an 8-byte column is widened first:
     * int32 -> int64 for INT64 columns and float -> double for DOUBLE
     * columns. All other values are forwarded unchanged.
     * The widened source is read with memcpy because `value` is an arbitrary
     * byte pointer that may be unaligned. Dereferencing it through
     * reinterpret_cast<const int32_t*> would be undefined behaviour.
     */
    FORCE_INLINE void append(uint32_t slot_index, const char* value,
                             uint32_t len) {
        ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count());
        Vector* vec = tsblock_->vectors_[slot_index];
        // TODO(Colin): Refine this.
        const TSDataType datatype = vec->get_vector_type();
        if (len == 4 && datatype == INT64) {
            int32_t int32_val = 0;
            std::memcpy(&int32_val, value, sizeof(int32_val));
            const int64_t int64_val = static_cast<int64_t>(int32_val);
            vec->append(reinterpret_cast<const char*>(&int64_val), 8);
        } else if (len == 4 && datatype == DOUBLE) {
            float float_val = 0.0f;
            std::memcpy(&float_val, value, sizeof(float_val));
            const double double_val = static_cast<double>(float_val);
            vec->append(reinterpret_cast<const char*>(&double_val), 8);
        } else {
            vec->append(value, len);
        }
    }

    // Marks column `slot_index` as null in the most recently added row
    // (row_count_ - 1). add_row() must have been called first; otherwise the
    // index would wrap around to UINT32_MAX.
    FORCE_INLINE void append_null(uint32_t slot_index) {
        ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count());
        ASSERT(tsblock_->row_count_ > 0);
        Vector* vec = tsblock_->vectors_[slot_index];
        vec->set_null(tsblock_->row_count_ - 1);
    }

   private:
    TsBlock* tsblock_;  // not owned
};
/*
 * Appends data to a single column of a TsBlock. Each ColAppender tracks its
 * own row count. After filling, call
 * TsBlock::flush_row_count(get_col_row_count()) so the block's row count
 * is updated and checked against the other columns.
 */
class ColAppender {
   public:
    ColAppender(uint32_t column_index, TsBlock* tsblock)
        : column_index_(column_index), column_row_count_(0), tsblock_(tsblock) {
        ASSERT(column_index < tsblock_->tuple_desc_->get_column_count());
        vec_ = tsblock_->vectors_[column_index];
    }
    ~ColAppender() {}

    // Opens a new row in this column and bumps the vector's row number.
    // Returns false when the column already holds max_row_count_ rows.
    // todo:(yanghao) maybe need to consider select-list
    FORCE_INLINE bool add_row() {
        if (LIKELY(column_row_count_ < tsblock_->max_row_count_)) {
            ++column_row_count_;
            vec_->add_row_num();
            return true;
        } else {
            return false;
        }
    }

    // Appends `len` bytes of `value` to the column vector.
    FORCE_INLINE void append(const char* value, uint32_t len) {
        vec_->append(value, len);
    }

    // Marks the most recently added row (column_row_count_ - 1) as null.
    // add_row() must have been called first; otherwise the index would wrap
    // around to UINT32_MAX.
    FORCE_INLINE void append_null() {
        ASSERT(column_row_count_ > 0);
        vec_->set_null(column_row_count_ - 1);
    }

    FORCE_INLINE uint32_t get_col_row_count() const {
        return column_row_count_;
    }
    FORCE_INLINE uint32_t get_column_index() const { return column_index_; }

    // Appends null rows until the column holds `end_index` rows.
    // Returns E_INVALID_ARG if max_row_count_ is reached first; rows added
    // before that point are kept.
    FORCE_INLINE int fill_null(uint32_t end_index) {
        while (column_row_count_ < end_index) {
            if (!add_row()) {
                return E_INVALID_ARG;
            }
            append_null();
        }
        return E_OK;
    }

    // Appends copies of `value` until the column holds `end_index` rows.
    // Returns E_INVALID_ARG if max_row_count_ is reached first; rows added
    // before that point are kept.
    FORCE_INLINE int fill(const char* value, uint32_t len, uint32_t end_index) {
        while (column_row_count_ < end_index) {
            if (!add_row()) {
                return E_INVALID_ARG;
            }
            append(value, len);
        }
        return E_OK;
    }

    // Resets only this appender's row counter; the vector is left untouched.
    FORCE_INLINE void reset() { column_row_count_ = 0; }

   private:
    uint32_t column_index_;
    uint32_t column_row_count_;
    TsBlock* tsblock_;  // not owned
    Vector* vec_;       // column vector owned by tsblock_
};
// todo:(yanghao) need to deal with select-list
/*
 * Reads a TsBlock row by row, starting at row 0.
 * NOTE(review): next() advances the read offset of every column
 * unconditionally, whereas ColIterator::next() skips null cells. Callers that
 * must handle nulls presumably use next(ind) per non-null column plus
 * update_row_id() instead; confirm against Vector's null storage.
 */
class RowIterator {
   public:
    explicit RowIterator(TsBlock* tsblock) : tsblock_(tsblock), row_id_(0) {
        column_count_ = tsblock_->tuple_desc_->get_column_count();
    }
    ~RowIterator() {}

    // True once every row has been visited.
    FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
    // True while the current row is still within the block.
    FORCE_INLINE bool has_next() const {
        return row_id_ < tsblock_->row_count_;
    }
    FORCE_INLINE uint32_t get_column_count() const { return column_count_; }
    FORCE_INLINE TSDataType get_data_type(uint32_t column_index) const {
        ASSERT(column_index < column_count_);
        return tsblock_->vectors_[column_index]->get_vector_type();
    }

    // Moves to the next row and advances the read offset of every column.
    FORCE_INLINE void next() {
        ASSERT(row_id_ < tsblock_->row_count_);
        ++row_id_;
        for (uint32_t i = 0; i < column_count_; ++i) {
            tsblock_->vectors_[i]->update_offset();
        }
    }

    // Advances the read offset of column `ind` only; row_id_ is unchanged
    // (pair with update_row_id()).
    FORCE_INLINE void next(size_t ind) const {
        ASSERT(row_id_ < tsblock_->row_count_);
        ASSERT(ind < column_count_);
        tsblock_->vectors_[ind]->update_offset();
    }

    // Moves to the next row without touching any column offset.
    FORCE_INLINE void update_row_id() { row_id_++; }

    // Reads column `column_index` at the current row. The length is stored in
    // *len and the null flag in *null.
    FORCE_INLINE char* read(uint32_t column_index, uint32_t* __restrict len,
                            bool* __restrict null) {
        ASSERT(column_index < column_count_);
        Vector* vec = tsblock_->vectors_[column_index];
        return vec->read(len, null, row_id_);
    }
    std::string debug_string();  // for debug

   private:
    TsBlock* tsblock_;       // not owned
    uint32_t row_id_;        // The line number currently being reader
    uint32_t column_count_;  // cached from the tuple descriptor
};
// todo:(yanghao) need to deal with select-list
class ColIterator {
public:
ColIterator(uint32_t column_index, const TsBlock* tsblock)
: column_index_(column_index), row_id_(0), tsblock_(tsblock) {
ASSERT(column_index < tsblock_->tuple_desc_->get_column_count());
vec_ = tsblock_->vectors_[column_index];
}
~ColIterator() { vec_->reset_offset(); }
FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
FORCE_INLINE void next() {
if (!vec_->is_null(row_id_)) {
vec_->update_offset();
}
++row_id_;
}
FORCE_INLINE bool has_null() { return vec_->has_null(); }
FORCE_INLINE TSDataType get_data_type() { return vec_->get_vector_type(); }
FORCE_INLINE char* read(uint32_t* __restrict len, bool* __restrict null) {
return vec_->read(len, null, row_id_);
}
FORCE_INLINE char* read(uint32_t* len) { return vec_->read(len); }
FORCE_INLINE uint32_t get_column_index() { return column_index_; }
private:
uint32_t column_index_;
uint32_t row_id_;
const TsBlock* tsblock_;
Vector* vec_;
};
// Merge routine for two TsBlocks; the definition is not in this header.
// NOTE(review): judging by the parameter names, this appends the rows of
// `river` into `sea`. Confirm the direction, the required schema
// compatibility, and the meaning of the int result (presumably an E_* error
// code) in the implementation.
int merge_tsblock_by_row(TsBlock* sea, TsBlock* river);
} // end namespace common
#endif // COMMON_TSBLOCK_TSBLOCK_H