blob: 79efdfc464d242b3e1c2256314e799271522beb8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_ALLOCATOR_BYTE_STREAM_H
#define COMMON_ALLOCATOR_BYTE_STREAM_H
#include <common/constant/tsfile_constant.h>
#include <stdio.h>
#include <stdlib.h>
#include <iostream>
#include <string>
#include "common/allocator/alloc_base.h"
#include "common/allocator/my_string.h"
#include "utils/errno_define.h"
namespace common {
template <typename T>
class OptionalAtomic {
public:
OptionalAtomic(T t, bool enable_atomic = false)
: val_(t), enable_atomic_(enable_atomic) {}
FORCE_INLINE T load() const {
if (UNLIKELY(enable_atomic_)) {
return ATOMIC_LOAD(&val_);
} else {
return val_;
}
}
FORCE_INLINE void store(const T t) {
if (UNLIKELY(enable_atomic_)) {
ATOMIC_STORE(&val_, t);
} else {
val_ = t;
}
}
FORCE_INLINE T atomic_faa(const T increament) {
if (UNLIKELY(enable_atomic_)) {
return ATOMIC_FAA(&val_, increament);
} else {
T old_val = val_;
val_ = val_ + increament;
return old_val;
}
}
FORCE_INLINE T atomic_aaf(const T increament) {
if (UNLIKELY(enable_atomic_)) {
return ATOMIC_AAF(&val_, increament);
} else {
val_ = val_ + increament;
return val_;
}
}
FORCE_INLINE bool enable_atomic() const { return enable_atomic_; }
private:
T val_;
bool enable_atomic_;
};
FORCE_INLINE int32_t float_to_int(float f) {
// return *((int32_t*)(&f));
// return *(reinterpret_cast<int32_t*>(&f));
union fi {
int i_;
float f_;
};
fi my_fi;
my_fi.f_ = f;
return my_fi.i_; // By mimicking the java jdk's implementation, it's
// essentially the same as line 75.
}
FORCE_INLINE float int_to_float(int32_t i) {
// return *((float *)(&i));
union fi {
int i_;
float f_;
};
fi my_fi;
my_fi.i_ = i;
return my_fi.f_;
}
FORCE_INLINE void float_to_bytes(float f, uint8_t bytes[4]) {
/*
* See:
* floatToBytes in BytesUtils.java of IoTDB project and
* Java_java_lang_Float_intBitsToFloat in JDK project
*/
if (UNLIKELY(f != f)) { // this is NaN
// IEEE754: 0x7FC00000 for NanN
bytes[0] = 0x7F;
bytes[1] = 0xC0;
bytes[2] = 0x00;
bytes[3] = 0x00;
return;
}
// follow jdk implementation style.
union {
int i;
float f;
} u;
u.f = f;
bytes[3] = (uint8_t)(u.i);
bytes[2] = (uint8_t)(u.i >> 8);
bytes[1] = (uint8_t)(u.i >> 16);
bytes[0] = (uint8_t)(u.i >> 24);
}
FORCE_INLINE float bytes_to_float(uint8_t bytes[4]) {
int32_t i = bytes[3];
i &= 0xFF;
i |= bytes[2] << 8;
i &= 0xFFFF;
i |= bytes[1] << 16;
i &= 0xFFFFFF;
i |= bytes[0] << 24;
union {
int i;
float f;
} u;
u.i = i;
return u.f;
}
FORCE_INLINE int64_t double_to_long(double d) {
// return *((int64_t*)(&d));
union dl {
double d_;
int64_t l_;
};
dl my_dl;
my_dl.d_ = d;
return my_dl.l_;
}
FORCE_INLINE double long_to_double(int64_t l) {
// return *((double *)(&l));
union dl {
double d_;
int64_t l_;
};
dl my_dl;
my_dl.l_ = l;
return my_dl.d_;
}
FORCE_INLINE void double_to_bytes(double d, uint8_t bytes[8]) {
if (UNLIKELY(d != d)) {
// NaN, 0x7FF8000000000000L
memset(bytes, 0, 8);
bytes[0] = 0x7F;
bytes[1] = 0xF8;
return;
}
// follow jdk implementation style.
union {
long long l;
double d;
} u;
u.d = d;
bytes[7] = (uint8_t)u.l;
bytes[6] = (uint8_t)(u.l >> 8);
bytes[5] = (uint8_t)(u.l >> 16);
bytes[4] = (uint8_t)(u.l >> 24);
bytes[3] = (uint8_t)(u.l >> 32);
bytes[2] = (uint8_t)(u.l >> 40);
bytes[1] = (uint8_t)(u.l >> 48);
bytes[0] = (uint8_t)(u.l >> 56);
}
FORCE_INLINE double bytes_to_double(uint8_t bytes[8]) {
int64_t value = bytes[7];
value &= 0xFFl;
value |= ((int64_t)bytes[6]) << 8;
value &= 0xFFFFl;
value |= ((int64_t)bytes[5]) << 16;
value &= 0xFFFFFFl;
value |= ((int64_t)bytes[4]) << 24;
value &= 0xFFFFFFFFl;
value |= ((int64_t)bytes[3]) << 32;
value &= 0xFFFFFFFFFFl;
value |= ((int64_t)bytes[2]) << 40;
value &= 0xFFFFFFFFFFFFl;
value |= ((int64_t)bytes[1]) << 48;
value &= 0xFFFFFFFFFFFFFFl;
value |= ((int64_t)bytes[0]) << 56;
// follow jdk implementation style.
union {
long long l;
double d;
} u;
u.l = value;
return u.d;
}
// TODO define a WrappedByteStream class
// auto extend buffer for serialization
class ByteStream {
private:
struct Page {
OptionalAtomic<Page *> next_; // 9 bytes
uint8_t *buf_; // 8 bytes
explicit Page(bool enable_atomic) : next_(nullptr, enable_atomic) {
buf_ = (uint8_t
*)(this +
1); // I think it should add 17, because the Page
// class is 9+8=17 bytes. No, adding one to a
// pointer is not adding a byte, but adding the
// length of the byte corresponding to the type
// pointed to by the pointer, and adding one
// here is actually adding 17 bytes. No problem.
}
Page(bool enable_atomic, uint8_t *wrapped_buf)
: next_(nullptr, enable_atomic), buf_(wrapped_buf) {}
};
public:
ByteStream(uint32_t page_size, AllocModID mid, bool enable_atomic = false,
BaseAllocator &allocator = g_base_allocator)
: allocator_(allocator),
head_(nullptr, enable_atomic),
tail_(nullptr, enable_atomic),
read_page_(nullptr),
total_size_(0, enable_atomic),
read_pos_(0),
marked_read_pos_(0),
page_size_(page_size),
mid_(mid),
wrapped_page_(false, nullptr) {
// assert(page_size >= 16); // commented out by gxh on 2023.03.09
}
// TODO use a specific construct function to mark it as wrapped use.
// for wrap plain buffer to ByteStream
ByteStream()
: allocator_(g_base_allocator),
head_(nullptr, false),
tail_(nullptr, false),
read_page_(nullptr),
total_size_(0, false),
read_pos_(0),
marked_read_pos_(0),
page_size_(0),
mid_(MOD_DEFAULT),
wrapped_page_(false, nullptr) {}
~ByteStream() { destroy(); }
/* ================ Part 0: wrap from outside buffer ================ */
// if you wrap a buffer as a ByteStream, you should
// manage the outside buffer yourself.
void wrap_from(const char *buf, int32_t buf_len) {
wrapped_page_.next_.store(nullptr);
wrapped_page_.buf_ = (uint8_t *)buf;
page_size_ = buf_len;
head_.store(&wrapped_page_);
tail_.store(&wrapped_page_);
total_size_.store(buf_len);
marked_read_pos_ = 0;
read_pos_ = 0;
read_page_ = nullptr;
}
FORCE_INLINE bool is_wrapped() const {
return head_.load() == &wrapped_page_;
}
char *get_wrapped_buf() { return (char *)wrapped_page_.buf_; }
void clear_wrapped_buf() { wrapped_page_.buf_ = nullptr; }
/* ================ Part 1: basic ================ */
FORCE_INLINE int64_t remaining_size() const {
ASSERT(total_size_.load() >= read_pos_);
return total_size_.load() - read_pos_;
}
FORCE_INLINE bool has_remaining() const { return remaining_size() > 0; }
FORCE_INLINE void mark_read_pos() { marked_read_pos_ = read_pos_; }
FORCE_INLINE int64_t get_mark_len() const {
ASSERT(marked_read_pos_ <= read_pos_);
return read_pos_ - marked_read_pos_;
}
void destroy() { reset(); }
void reset() {
// if this ByteStream is wrapped from a plain buffer
// do not free the outside buffer.
if (!is_wrapped()) {
Page *p = head_.load();
while (p) {
p = head_.load()->next_.load();
allocator_.free(head_.load());
head_.store(p);
}
}
head_.store(nullptr);
tail_.store(nullptr);
read_page_ = nullptr;
total_size_.store(0);
read_pos_ = 0;
}
// never used TODO
void shallow_clone_from(ByteStream &other) {
this->page_size_ = other.page_size_;
this->mid_ = other.mid_;
this->head_.store(other.head_.load());
this->tail_.store(other.tail_.load());
this->total_size_.store(other.total_size_.load());
}
FORCE_INLINE int64_t total_size() const { return total_size_.load(); }
FORCE_INLINE int64_t read_pos() const { return read_pos_; };
FORCE_INLINE void wrapped_buf_advance_read_pos(uint32_t size) {
if (size + read_pos_ > total_size_.load()) {
read_pos_ = total_size_.load();
} else {
read_pos_ += size;
}
}
/* ================ Part 2: write_xxx and read_xxx ================ */
// writer @buf with length @len into this bytestream
int write_buf(const uint8_t *buf, const uint32_t len) {
int ret = common::E_OK;
uint32_t write_len = 0;
while (write_len < len) {
if (RET_FAIL(prepare_space())) {
std::cout << "write_buf error " << ret << std::endl;
return ret;
}
uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
uint32_t copy_len =
remainder < (len - write_len) ? remainder : (len - write_len);
memcpy(tail_.load()->buf_ + total_size_.load() % page_size_,
buf + write_len, copy_len);
total_size_.atomic_aaf(copy_len);
write_len += copy_len;
}
return ret;
}
// reader @want_len bytes to @buf, @read_len indicates real len we reader.
// if ByteStream do not have so many bytes, it will return E_PARTIAL_READ if
// no other error occure.
int read_buf(uint8_t *buf, const uint32_t want_len, uint32_t &read_len) {
int ret = common::E_OK;
bool partial_read = (read_pos_ + want_len > total_size_.load());
uint32_t want_len_limited =
partial_read ? (total_size_.load() - read_pos_) : want_len;
read_len = 0;
while (read_len < want_len_limited) {
if (RET_FAIL(check_space())) {
return ret;
}
uint32_t remainder = page_size_ - (read_pos_ % page_size_);
uint32_t copy_len = remainder < want_len_limited - read_len
? remainder
: want_len_limited - read_len;
memcpy(buf + read_len, read_page_->buf_ + (read_pos_ % page_size_),
copy_len);
read_len += copy_len;
read_pos_ += copy_len;
}
return partial_read ? common::E_PARTIAL_READ : common::E_OK;
}
FORCE_INLINE int write_buf(const char *buf, const uint32_t len) {
return write_buf((const uint8_t *)buf, len);
}
FORCE_INLINE int read_buf(char *buf, const uint32_t want_len,
uint32_t &read_len) {
return read_buf((uint8_t *)buf, want_len, read_len);
}
FORCE_INLINE int read_buf(char *buf, const int32_t want_len,
int32_t &read_len) {
return read_buf((uint8_t *)buf, (uint32_t &)want_len,
(uint32_t &)read_len);
}
void purge_prev_pages(int purge_page_count = INT32_MAX) {
Page *cur = head_.load();
while (cur != tail_.load() && purge_page_count-- > 0) {
Page *next = cur->next_.load();
allocator_.free(cur);
cur = next;
}
head_.store(cur);
}
/* ================ Part 3: writing internal buffers ================ */
/*
* use @acquire_buf function to get a buf, caller will fill the buf, and
* after filling, caller use @buffer_used to notice ByteStream to update
* internal variables.
*
* Note: it should be used in single thread.
*/
struct Buffer {
char *buf_;
uint32_t len_;
Buffer() : buf_(nullptr), len_(0) {}
};
Buffer acquire_buf() {
Buffer b;
if (common::E_OK != prepare_space()) {
return b;
}
b.buf_ =
(char *)(tail_.load()->buf_ + (total_size_.load() % page_size_));
b.len_ = page_size_ - (total_size_.load() % page_size_);
return b;
}
void buffer_used(uint32_t used_bytes) {
ASSERT(used_bytes >= 1);
// would not span page
ASSERT((total_size_.load() / page_size_) ==
((total_size_.load() + used_bytes - 1) / page_size_));
total_size_.atomic_aaf(used_bytes);
}
/* ================ Part 4: reading internal buffers ================ */
/*
* one-shot reader iterator
* Do not use it if the host_ is still writing
*/
struct BufferIterator {
const ByteStream &host_;
Page *cur_;
Page *end_;
int64_t total_size_;
BufferIterator(const ByteStream &bs) : host_(bs) {
cur_ = bs.head_.load();
end_ = bs.tail_.load();
total_size_ = bs.total_size_.load();
}
Buffer get_next_buf() {
Buffer b;
if (cur_ != nullptr) {
b.buf_ = (char *)cur_->buf_;
if (cur_ == end_ &&
host_.total_size_.load() % host_.page_size_ != 0) {
b.len_ = host_.total_size_.load() % host_.page_size_;
} else {
b.len_ = host_.page_size_;
}
ASSERT(b.len_ > 0);
cur_ = cur_->next_.load();
}
return b;
}
};
BufferIterator init_buffer_iterator() const {
return BufferIterator(*this);
}
/*
* producer-consumer mode reader iterator
* Note:
* the host ByteStream should be configured as atomically.
*/
struct Consumer {
const ByteStream &host_;
Page *cur_;
uint32_t read_offset_within_cur_page_;
int64_t total_end_offset_; // for DEBUG
Consumer(const ByteStream &bs) : host_(bs) {
ASSERT(bs.head_.enable_atomic());
cur_ = nullptr;
read_offset_within_cur_page_ = 0;
total_end_offset_ = 0;
}
Buffer get_next_buf(ByteStream &host) {
Buffer b;
if (UNLIKELY(host.head_.load() == nullptr)) {
// host is empty, return empty buffer.
return b;
}
if (UNLIKELY(cur_ == nullptr)) {
// this consumer did not initialiazed.
cur_ = host_.head_.load();
read_offset_within_cur_page_ = 0;
}
// get tail position <tail_, total_size_> atomically
Page *host_end = nullptr;
int64_t host_total_size = 0;
while (true) {
host_end = host_.tail_.load();
host_total_size = host_.total_size_.load();
if (host_end == host_.tail_.load()) {
break;
}
}
while (true) {
if (cur_ == host_end) {
if (host_total_size % host_.page_size_ == 0) {
if (read_offset_within_cur_page_ == host_.page_size_) {
return b;
} else {
b.buf_ = ((char *)(cur_->buf_)) +
read_offset_within_cur_page_;
b.len_ =
host_.page_size_ - read_offset_within_cur_page_;
read_offset_within_cur_page_ = host_.page_size_;
total_end_offset_ += b.len_;
return b;
}
} else {
if (read_offset_within_cur_page_ ==
(host_total_size % host_.page_size_)) {
return b;
} else {
b.buf_ = ((char *)(cur_->buf_)) +
read_offset_within_cur_page_;
b.len_ = (host_total_size % host_.page_size_) -
read_offset_within_cur_page_;
read_offset_within_cur_page_ =
(host_total_size % host_.page_size_);
total_end_offset_ += b.len_;
return b;
}
}
} else {
if (read_offset_within_cur_page_ == host_.page_size_) {
cur_ = cur_->next_.load();
read_offset_within_cur_page_ = 0;
} else {
b.buf_ = ((char *)(cur_->buf_)) +
read_offset_within_cur_page_;
b.len_ =
host_.page_size_ - read_offset_within_cur_page_;
cur_ = cur_->next_.load();
read_offset_within_cur_page_ = 0;
total_end_offset_ += b.len_;
return b;
}
}
}
ASSERT(b.len_ < (1 << 30));
return b;
}
};
private:
FORCE_INLINE int prepare_space() {
int ret = common::E_OK;
if (UNLIKELY(tail_.load() == nullptr ||
total_size_.load() % page_size_ == 0)) {
Page *p = nullptr;
if (RET_FAIL(alloc_page(p))) {
return ret;
}
} else {
// do nothing
}
return ret;
}
FORCE_INLINE int check_space() {
if (UNLIKELY(read_pos_ >= total_size_.load())) {
return common::E_OUT_OF_RANGE;
}
if (UNLIKELY(read_page_ == nullptr)) {
read_page_ = head_.load();
} else if (UNLIKELY(read_pos_ % page_size_ == 0)) {
read_page_ = read_page_->next_.load();
}
if (UNLIKELY(read_page_ == nullptr)) {
return common::E_OUT_OF_RANGE;
}
return common::E_OK;
}
FORCE_INLINE int alloc_page(Page *&p) {
int ret = common::E_OK;
char *buf = (char *)allocator_.alloc(page_size_ + sizeof(Page), mid_);
if (UNLIKELY(buf == nullptr)) {
ret = common::E_OOM;
} else {
p = new (buf) Page(head_.enable_atomic());
p->next_.store(nullptr);
if (head_.load()) {
tail_.load()->next_.store(p);
tail_.store(p);
} else {
head_.store(p);
tail_.store(p);
}
}
// printf("\nByteStream alloc_page, this=%p, new_page=%p\n", this, p);
return ret;
}
DISALLOW_COPY_AND_ASSIGN(ByteStream);
private:
BaseAllocator &allocator_;
OptionalAtomic<Page *> head_;
OptionalAtomic<Page *> tail_;
Page *read_page_; // only one thread is allow to reader this ByteStream
OptionalAtomic<int64_t> total_size_; // total size in byte
int64_t read_pos_; // current reader position
int64_t marked_read_pos_; // current reader position
int64_t page_size_;
AllocModID mid_;
Page wrapped_page_;
};
FORCE_INLINE int merge_byte_stream(ByteStream &sea, ByteStream &river,
bool purge_river = false) {
int ret = common::E_OK;
ByteStream::BufferIterator buf_iter = river.init_buffer_iterator();
while (true) {
ByteStream::Buffer buf = buf_iter.get_next_buf();
if (buf.buf_ == nullptr) {
break;
} else {
if (RET_FAIL(sea.write_buf(buf.buf_, buf.len_))) {
break;
}
}
if (purge_river) {
river.purge_prev_pages(1);
}
}
return ret;
}
FORCE_INLINE int copy_bs_to_buf(ByteStream &bs, char *src_buf,
uint32_t src_buf_len) {
ByteStream::BufferIterator buf_iter = bs.init_buffer_iterator();
uint32_t copyed_len = 0;
while (true) {
ByteStream::Buffer buf = buf_iter.get_next_buf();
if (buf.buf_ == nullptr) {
break;
} else {
if (src_buf_len - copyed_len < buf.len_) {
return E_BUF_NOT_ENOUGH;
}
memcpy(src_buf + copyed_len, buf.buf_, buf.len_);
copyed_len += buf.len_;
}
}
return E_OK;
}
FORCE_INLINE uint32_t get_var_uint_size(
uint32_t
ui32) // return: the length of usigned number after varint encoding.
{
uint32_t bytes = 0;
while ((ui32 & 0xFFFFFF80) != 0) {
bytes++;
ui32 = ui32 >> 7;
}
return ++bytes;
}
#if DEBUG_SE
FORCE_INLINE void DEBUG_print_byte_stream(const char *print_tag,
ByteStream &b) {
const int32_t WRAP_COUNT = 16;
int32_t print_char_count = 0;
ByteStream::BufferIterator buf_iter = b.init_buffer_iterator();
while (true) {
ByteStream::Buffer buf = buf_iter.get_next_buf();
if (buf.buf_ == nullptr) {
break;
} else {
// print_hex(buf.buf_, buf.len_);
printf("\n %s: buf.buf_=%p, buf.len_=%u\n", print_tag, buf.buf_,
buf.len_);
for (uint32_t i = 0; i < buf.len_; i++) {
if (print_char_count++ % WRAP_COUNT == 0) {
printf("\n %s offset=0x%05x : ", print_tag,
print_char_count - 1);
}
printf("%02x ", (uint8_t)buf.buf_[i]);
}
}
}
printf("\n\n");
}
FORCE_INLINE void DEBUG_hex_dump_buf(const char *print_tag, const char *buf,
int32_t len) {
const int32_t WRAP_COUNT = 16;
for (int i = 0; i < len; i++) {
if (i % WRAP_COUNT == 0) {
printf("\n%s", print_tag);
}
printf("%02x ", (uint8_t)(buf[i]));
}
printf("\n");
}
#endif // ifndef NDEBUG
class SerializationUtil {
public:
FORCE_INLINE static int write_ui8(uint8_t ui8, ByteStream &out) {
return out.write_buf(&ui8, 1);
}
FORCE_INLINE static int write_ui16(uint16_t ui16, ByteStream &out) {
uint8_t buf[2];
buf[0] = (uint8_t)((ui16 >> 8) & 0xFF);
buf[1] = (uint8_t)((ui16) & 0xFF);
return out.write_buf(buf, 2);
}
FORCE_INLINE static int write_ui32(uint32_t ui32, ByteStream &out) {
uint8_t buf[4];
buf[0] = (uint8_t)((ui32 >> 24) & 0xFF);
buf[1] = (uint8_t)((ui32 >> 16) & 0xFF);
buf[2] = (uint8_t)((ui32 >> 8) & 0xFF);
buf[3] = (uint8_t)((ui32) & 0xFF);
return out.write_buf(buf, 4);
}
FORCE_INLINE static int write_ui64(uint64_t ui64, ByteStream &out) {
// big-endian: most signification byte at smaller address
// refer to tsfile.utils.BytesUtil
uint8_t buf[8];
buf[0] = (uint8_t)((ui64 >> 56) & 0xFF);
buf[1] = (uint8_t)((ui64 >> 48) & 0xFF);
buf[2] = (uint8_t)((ui64 >> 40) & 0xFF);
buf[3] = (uint8_t)((ui64 >> 32) & 0xFF);
buf[4] = (uint8_t)((ui64 >> 24) & 0xFF);
buf[5] = (uint8_t)((ui64 >> 16) & 0xFF);
buf[6] = (uint8_t)((ui64 >> 8) & 0xFF);
buf[7] = (uint8_t)((ui64) & 0xFF);
return out.write_buf(buf, 8);
}
FORCE_INLINE static int read_ui8(uint8_t &ui8, ByteStream &in) {
int ret = common::E_OK;
char buf[1];
uint32_t read_len = 0;
ret = in.read_buf(buf, 1, read_len);
ui8 = (uint8_t)buf[0];
return ret;
}
FORCE_INLINE static int read_ui16(uint16_t &ui16, ByteStream &in) {
int ret = common::E_OK;
uint8_t buf[2];
uint32_t read_len = 0;
if (RET_FAIL(in.read_buf(buf, 2, read_len))) {
return ret;
}
ui16 = buf[0];
ui16 = (ui16 << 8) | buf[1];
return ret;
}
FORCE_INLINE static int read_ui32(uint32_t &ui32, ByteStream &in) {
int ret = common::E_OK;
uint8_t buf[4];
uint32_t read_len = 0;
if (RET_FAIL(in.read_buf(buf, 4, read_len))) {
return ret;
}
ui32 = buf[0];
ui32 = (ui32 << 8) | (buf[1] & 0xFF);
ui32 = (ui32 << 8) | (buf[2] & 0xFF);
ui32 = (ui32 << 8) | (buf[3] & 0xFF);
return ret;
}
FORCE_INLINE static int read_ui64(uint64_t &ui64, ByteStream &in) {
int ret = common::E_OK;
uint8_t buf[8];
uint32_t read_len = 0;
if (RET_FAIL(in.read_buf(buf, 8, read_len))) {
return ret;
}
ui64 = buf[0];
ui64 = (ui64 << 8) | (buf[1] & 0xFF);
ui64 = (ui64 << 8) | (buf[2] & 0xFF);
ui64 = (ui64 << 8) | (buf[3] & 0xFF);
ui64 = (ui64 << 8) | (buf[4] & 0xFF);
ui64 = (ui64 << 8) | (buf[5] & 0xFF);
ui64 = (ui64 << 8) | (buf[6] & 0xFF);
ui64 = (ui64 << 8) | (buf[7] & 0xFF);
return ret;
}
// caller guarantee buffer has at least 1 byte
FORCE_INLINE static uint8_t read_ui8(char *buffer) {
return *(uint8_t *)buffer;
}
// caller guarantee buffer has at least 2 bytes
FORCE_INLINE static uint16_t read_ui16(char *buffer) {
uint8_t *buf = (uint8_t *)buffer;
uint16_t ui16 = buf[0];
ui16 = (ui16 << 8) | (buf[1] & 0xFF);
return ui16;
}
// caller guarantee buffer has at least 4 bytes
FORCE_INLINE static uint32_t read_ui32(char *buffer) {
uint8_t *buf = (uint8_t *)buffer;
uint32_t ui32 = buf[0];
ui32 = (ui32 << 8) | (buf[1] & 0xFF);
ui32 = (ui32 << 8) | (buf[2] & 0xFF);
ui32 = (ui32 << 8) | (buf[3] & 0xFF);
return ui32;
}
// caller guarantee buffer has at least 8 bytes
FORCE_INLINE static uint64_t read_ui64(char *buffer) {
uint8_t *buf = (uint8_t *)buffer;
uint64_t ui64 = buf[0];
ui64 = (ui64 << 8) | (buf[1] & 0xFF);
ui64 = (ui64 << 8) | (buf[2] & 0xFF);
ui64 = (ui64 << 8) | (buf[3] & 0xFF);
ui64 = (ui64 << 8) | (buf[4] & 0xFF);
ui64 = (ui64 << 8) | (buf[5] & 0xFF);
ui64 = (ui64 << 8) | (buf[6] & 0xFF);
ui64 = (ui64 << 8) | (buf[7] & 0xFF);
return ui64;
}
FORCE_INLINE static int write_float(float f, ByteStream &out) {
uint8_t bytes[4];
float_to_bytes(f, bytes);
return out.write_buf(bytes, 4);
}
FORCE_INLINE static int read_float(float &f, ByteStream &in) {
int ret = common::E_OK;
uint8_t bytes[4];
uint32_t read_len = 0;
if (RET_FAIL(in.read_buf(bytes, 4, read_len))) {
} else if (read_len != 4) {
ret = common::E_BUF_NOT_ENOUGH;
} else {
f = bytes_to_float(bytes);
}
return ret;
}
FORCE_INLINE static float read_float(char *buffer) {
uint8_t *buf = (uint8_t *)buffer;
return bytes_to_float(buf);
}
FORCE_INLINE static int write_double(double d, ByteStream &out) {
uint8_t bytes[8];
double_to_bytes(d, bytes);
return out.write_buf(bytes, 8);
}
FORCE_INLINE static int read_double(double &d, ByteStream &in) {
int ret = common::E_OK;
uint32_t read_len = 0;
uint8_t bytes[8];
if (RET_FAIL(in.read_buf(bytes, 8, read_len))) {
} else if (read_len != 8) {
ret = common::E_BUF_NOT_ENOUGH;
} else {
d = bytes_to_double(bytes);
}
return ret;
}
FORCE_INLINE static double read_double(char *buffer) {
uint8_t *buf = (uint8_t *)buffer;
return bytes_to_double(buf);
}
FORCE_INLINE static int write_i8(int8_t i8, ByteStream &out) {
return write_ui8((uint8_t)i8, out);
}
FORCE_INLINE static int write_i16(int16_t i16, ByteStream &out) {
return write_ui16((uint16_t)i16, out);
}
FORCE_INLINE static int write_i32(int32_t i32, ByteStream &out) {
return write_ui32((uint32_t)i32, out);
}
FORCE_INLINE static int write_i64(int64_t i64, ByteStream &out) {
return write_ui64((uint64_t)i64, out);
}
FORCE_INLINE static int read_i8(int8_t &i8, ByteStream &in) {
return read_ui8((uint8_t &)i8, in);
}
FORCE_INLINE static int read_i16(int16_t &i16, ByteStream &in) {
return read_ui16((uint16_t &)i16, in);
}
FORCE_INLINE static int read_i32(int32_t &i32, ByteStream &in) {
return read_ui32((uint32_t &)i32, in);
}
FORCE_INLINE static int read_i64(int64_t &i64, ByteStream &in) {
return read_ui64((uint64_t &)i64, in);
}
// TODO more test on var_xxx
FORCE_INLINE static int do_write_var_uint(uint32_t ui32, ByteStream &out) {
int ret = common::E_OK;
while ((ui32 & 0xFFFFFF80) != 0) {
if (RET_FAIL(write_ui8((ui32 & 0x7F) | 0x80, out))) {
return ret;
}
ui32 = ui32 >> 7;
}
return write_ui8(ui32 & 0x7F, out);
}
FORCE_INLINE static int do_write_var_uint(uint32_t ui32, char *out_buf,
const uint32_t out_buf_len) {
uint32_t offset = 0;
while ((ui32 & 0xFFFFFF80) != 0) {
if (offset >= out_buf_len) {
return common::E_BUF_NOT_ENOUGH;
}
*(out_buf + offset) = (ui32 & 0x7F) | 0x80;
ui32 = ui32 >> 7;
offset++;
}
if (offset >= out_buf_len) {
return common::E_BUF_NOT_ENOUGH;
}
*(out_buf + offset) = (ui32 & 0x7F);
return common::E_OK;
}
FORCE_INLINE static int do_read_var_uint(uint32_t &ui32, ByteStream &in) {
// Follow readUnsignedVarInt in ReadWriteForEncodingUtils.java
int ret = common::E_OK;
ui32 = 0;
int i = 0;
uint8_t ui8 = 0;
if (RET_FAIL(read_ui8(ui8, in))) {
return ret;
}
while (ui8 != 0xF && (ui8 & 0x80) != 0) {
ui32 = ui32 | ((ui8 & 0x7F) << i);
i = i + 7;
if (RET_FAIL(read_ui8(ui8, in))) {
return ret;
}
}
ui32 = ui32 | (ui8 << i);
return ret;
}
FORCE_INLINE static int do_read_var_uint(uint32_t &ui32, char *in_buf,
int in_buf_len, int *ret_offset) {
ui32 = 0;
int i = 0;
uint8_t ui8 = 0;
int offset = 0;
if (offset < in_buf_len) {
ui8 = *(uint8_t *)(in_buf + offset);
offset++;
} else {
return common::E_BUF_NOT_ENOUGH;
}
while (ui8 != 0xF && (ui8 & 0x80) != 0) {
ui32 = ui32 | ((ui8 & 0x7F) << i);
i = i + 7;
if (offset < in_buf_len) {
ui8 = *(uint8_t *)(in_buf + offset);
offset++;
} else {
return common::E_BUF_NOT_ENOUGH;
}
}
ui32 = ui32 | (ui8 << i);
if (ret_offset != nullptr) {
*ret_offset = offset;
}
return common::E_OK;
}
FORCE_INLINE static int write_var_int(int32_t i32, ByteStream &out) {
// TODO 8byte to 4byte.
// but in IoTDB java, it has only write_var_uint(i32)
int ui32 = i32 << 1;
if (i32 < 0) {
ui32 = ~ui32;
}
return do_write_var_uint(static_cast<uint32_t>(ui32), out);
}
FORCE_INLINE static int read_var_int(int32_t &i32, ByteStream &in) {
int ret = common::E_OK;
uint32_t ui32;
if (RET_FAIL(do_read_var_uint(ui32, in))) {
} else {
i32 = (int32_t)(ui32 >> 1);
if ((ui32 & 1) != 0) {
i32 = ~i32;
}
}
return ret;
}
FORCE_INLINE static int write_var_uint(uint32_t ui32, ByteStream &out) {
return do_write_var_uint(ui32, out);
}
FORCE_INLINE static int write_var_uint(uint32_t ui32, char *out_buf,
const uint32_t out_buf_len) {
return do_write_var_uint(ui32, out_buf, out_buf_len);
}
FORCE_INLINE static int read_var_uint(uint32_t &ui32, ByteStream &in) {
return do_read_var_uint(ui32, in);
}
FORCE_INLINE static int read_var_uint(uint32_t &ui32, char *in_buf,
int in_buf_len,
int *ret_offset = nullptr) {
return do_read_var_uint(ui32, in_buf, in_buf_len, ret_offset);
}
FORCE_INLINE static int write_var_str(const std::string &str,
ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(write_var_int(((int32_t)str.size()), out))) {
} else if (RET_FAIL(out.write_buf(str.c_str(), str.size()))) {
}
return ret;
}
// If the str is nullptr, NO_STR_TO_READ will be added instead.
FORCE_INLINE static int write_var_char_ptr(const std::string *str,
ByteStream &out) {
int ret = common::E_OK;
if (str == nullptr) {
write_var_int(storage::NO_STR_TO_READ, out);
return ret;
}
size_t str_len = str->length();
if (RET_FAIL(write_var_int(str_len, out))) {
return ret;
} else if (RET_FAIL(out.write_buf(str->c_str(), str_len))) {
return ret;
}
return ret;
}
// If `str` is not a nullptr after calling `read_var_char_ptr`, it
// indicates that memory has been allocated and must be freed.
FORCE_INLINE static int read_var_char_ptr(std::string *&str,
ByteStream &in) {
int ret = common::E_OK;
int32_t len = 0;
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
return ret;
} else {
if (len == storage::NO_STR_TO_READ) {
str = nullptr;
return ret;
} else {
char *tmp_buf = static_cast<char *>(malloc(len));
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
free(tmp_buf);
return ret;
} else if (len != read_len) {
free(tmp_buf);
ret = E_BUF_NOT_ENOUGH;
} else {
str = new std::string(tmp_buf, len);
free(tmp_buf);
}
}
}
return ret;
}
FORCE_INLINE static int read_var_str(std::string &str, ByteStream &in) {
int ret = common::E_OK;
int32_t len = 0;
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
} else {
char *tmp_buf = (char *)malloc(len + 1);
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str = std::string(tmp_buf);
}
free(tmp_buf);
}
return ret;
}
FORCE_INLINE static int write_str(const std::string &str, ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(write_i32((static_cast<int32_t>(str.size())), out))) {
} else if (RET_FAIL(out.write_buf(str.c_str(), str.size()))) {
}
return ret;
}
FORCE_INLINE static int read_str(std::string &str, ByteStream &in) {
int ret = common::E_OK;
int32_t len = 0;
if (RET_FAIL(read_i32(len, in))) {
} else {
int32_t read_len = 0;
char *tmp_buf = static_cast<char *>(malloc(len + 1));
tmp_buf[len] = '\0';
if (RET_FAIL(in.read_buf(tmp_buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str = std::string(tmp_buf);
}
free(tmp_buf);
}
return ret;
}
FORCE_INLINE static int write_str(const String &str, ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(write_i32((static_cast<int32_t>(str.len_)), out))) {
} else if (RET_FAIL(out.write_buf(str.buf_, str.len_))) {
}
return ret;
}
FORCE_INLINE static int read_str(String &str, common::PageArena *pa,
ByteStream &in) {
int ret = common::E_OK;
int32_t len = 0;
int32_t read_len = 0;
if (RET_FAIL(read_i32(len, in))) {
} else {
char *buf = (char *)pa->alloc(len);
if (IS_NULL(buf)) {
ret = common::E_OOM;
} else {
if (RET_FAIL(in.read_buf(buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str.buf_ = buf;
str.len_ = len;
}
}
}
return ret;
}
FORCE_INLINE static int write_mystring(const String &str, ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(write_var_int(str.len_, out))) {
} else if (RET_FAIL(out.write_buf(str.buf_, str.len_))) {
}
return ret;
}
FORCE_INLINE static int read_mystring(String &str, common::PageArena *pa,
ByteStream &in) {
int ret = common::E_OK;
int32_t len = 0;
int32_t read_len = 0;
if (RET_FAIL(read_var_int(len, in))) {
} else {
char *buf = (char *)pa->alloc(len);
if (IS_NULL(buf)) {
ret = common::E_OOM;
} else {
if (RET_FAIL(in.read_buf(buf, len, read_len))) {
} else if (len != read_len) {
ret = E_BUF_NOT_ENOUGH;
} else {
str.buf_ = buf;
str.len_ = len;
}
}
}
return ret;
}
FORCE_INLINE static int write_char(char ch, ByteStream &out) {
return write_ui8(ch, out);
}
FORCE_INLINE static int read_char(char &ch, ByteStream &in) {
return read_ui8((uint8_t &)ch, in);
}
};
FORCE_INLINE bool deserialize_buf_not_enough(int ret) {
return ret == E_OUT_OF_RANGE || ret == E_PARTIAL_READ;
}
FORCE_INLINE char *get_bytes_from_bytestream(ByteStream &bs) {
if (bs.total_size() == 0) {
return nullptr;
}
uint32_t size = bs.total_size();
char *ret_buf = (char *)malloc(size);
if (ret_buf == nullptr) {
return nullptr;
}
ByteStream::BufferIterator buf_iter = bs.init_buffer_iterator();
uint32_t offset = 0;
while (true) {
ByteStream::Buffer buf = buf_iter.get_next_buf();
if (buf.buf_ == nullptr) {
break;
} else {
assert(offset + buf.len_ <= size);
memcpy(ret_buf + offset, buf.buf_, buf.len_);
offset += buf.len_;
}
}
assert(offset == size);
return ret_buf;
}
} // end namespace common
#endif // COMMON_ALLOCATOR_BYTE_STREAM_H