blob: 4cf2ffa92dd737a3558dab6ba7491ce3f32bd487 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef WRITER_PAGE_VALUE_WRITER_H
#define WRITER_PAGE_VALUE_WRITER_H
#include <vector>
#include "common/allocator/byte_stream.h"
#include "common/container/bit_map.h"
#include "common/statistic.h"
#include "compress/compressor.h"
#include "encoding/encoder.h"
#include "utils/db_utils.h"
namespace storage {
struct ValuePageData {
uint32_t col_notnull_bitmap_buf_size_;
uint32_t value_buf_size_;
uint32_t uncompressed_size_;
uint32_t compressed_size_;
char *uncompressed_buf_;
char *compressed_buf_;
Compressor *compressor_;
ValuePageData()
: col_notnull_bitmap_buf_size_(0),
value_buf_size_(0),
uncompressed_size_(0),
compressed_size_(0),
uncompressed_buf_(nullptr),
compressed_buf_(nullptr),
compressor_(nullptr) {}
int init(common::ByteStream &col_notnull_bitmap_bs,
common::ByteStream &value_bs, Compressor *compressor,
uint32_t size);
void destroy() {
// Be careful about the memory
if (uncompressed_buf_ != nullptr) {
common::mem_free(uncompressed_buf_);
uncompressed_buf_ = nullptr;
}
if (compressed_buf_ != nullptr && compressor_ != nullptr) {
compressor_->after_compress(compressed_buf_);
compressed_buf_ = nullptr;
}
}
};
#define VPW_DO_WRITE_FOR_TYPE(ISNULL) \
{ \
int ret = common::E_OK; \
if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) { \
col_notnull_bitmap_.push_back(0); \
} \
if (!ISNULL) { \
col_notnull_bitmap_[size_ / 8] |= (MASK >> (size_ % 8)); \
} \
size_++; \
if (ISNULL) { \
return ret; \
} \
if (RET_FAIL(value_encoder_->encode(value, value_out_stream_))) { \
} else { \
statistic_->update(timestamp, value); \
} \
return ret; \
}
class ValuePageWriter {
public:
ValuePageWriter()
: data_type_(common::VECTOR),
value_encoder_(nullptr),
statistic_(nullptr),
col_notnull_bitmap_out_stream_(OUT_STREAM_PAGE_SIZE,
common::MOD_PAGE_WRITER_OUTPUT_STREAM),
value_out_stream_(OUT_STREAM_PAGE_SIZE,
common::MOD_PAGE_WRITER_OUTPUT_STREAM),
cur_page_data_(),
compressor_(nullptr),
is_inited_(false),
col_notnull_bitmap_(),
size_(0) {}
~ValuePageWriter() { destroy(); }
int init(common::TSDataType data_type, common::TSEncoding encoding,
common::CompressionType compression);
void reset();
void destroy();
FORCE_INLINE int write(int64_t timestamp, bool value, bool isnull) {
if (UNLIKELY(data_type_ != common::BOOLEAN)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE int write(int64_t timestamp, int32_t value, bool isnull) {
if (UNLIKELY(data_type_ != common::INT32 &&
data_type_ != common::DATE)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE int write(int64_t timestamp, int64_t value, bool isnull) {
if (UNLIKELY(data_type_ != common::INT64 &&
data_type_ != common::TIMESTAMP)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE int write(int64_t timestamp, float value, bool isnull) {
if (UNLIKELY(data_type_ != common::FLOAT)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE int write(int64_t timestamp, double value, bool isnull) {
if (UNLIKELY(data_type_ != common::DOUBLE)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE int write(int64_t timestamp, common::String value,
bool isnull) {
if (UNLIKELY(data_type_ != common::STRING &&
data_type_ != common::TEXT &&
data_type_ != common::BLOB)) {
return common::E_TYPE_NOT_MATCH;
}
VPW_DO_WRITE_FOR_TYPE(isnull);
}
FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; }
FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const {
return col_notnull_bitmap_out_stream_.total_size();
}
FORCE_INLINE uint32_t get_page_memory_size() const {
return col_notnull_bitmap_out_stream_.total_size() +
value_out_stream_.total_size();
}
/**
* calculate max possible memory size it occupies, including time
* outputStream and value outputStream, because size outputStream is never
* used until flushing.
*
* @return allocated size in time, value and outputStream
*/
FORCE_INLINE uint32_t estimate_max_mem_size() const {
return sizeof(int32_t) + 1 +
col_notnull_bitmap_out_stream_.total_size() +
value_out_stream_.total_size() +
value_encoder_->get_max_byte_size();
}
int write_to_chunk(common::ByteStream &pages_data, bool write_header,
bool write_statistic, bool write_data_to_chunk_data);
FORCE_INLINE common::ByteStream &get_col_notnull_bitmap_data() {
return col_notnull_bitmap_out_stream_;
}
FORCE_INLINE common::ByteStream &get_value_data() {
return value_out_stream_;
}
FORCE_INLINE Statistic *get_statistic() { return statistic_; }
ValuePageData get_cur_page_data() { return cur_page_data_; }
void destroy_page_data() { cur_page_data_.destroy(); }
private:
FORCE_INLINE int prepare_end_page() {
int ret = common::E_OK;
if (RET_FAIL(value_encoder_->flush(value_out_stream_))) {
}
for (auto col_notnull_bitmap_byte : col_notnull_bitmap_) {
col_notnull_bitmap_out_stream_.write_buf(&col_notnull_bitmap_byte,
1);
}
return ret;
}
int copy_page_data_to(common::ByteStream &my_page_data,
common::ByteStream &pages_data);
private:
static const uint32_t OUT_STREAM_PAGE_SIZE = 1024;
private:
common::TSDataType data_type_;
Encoder *value_encoder_;
Statistic *statistic_;
common::ByteStream col_notnull_bitmap_out_stream_;
common::ByteStream value_out_stream_;
ValuePageData cur_page_data_;
Compressor *compressor_;
bool is_inited_;
std::vector<uint8_t> col_notnull_bitmap_;
uint32_t size_;
static uint32_t MASK;
};
} // end namespace storage
#endif // WRITER_PAGE_VALUE_WRITER_H