| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #include "utilities/col_buf_encoder.h" |
| #include <cstring> |
| #include <string> |
| #include "port/port.h" |
| |
| namespace rocksdb { |
| |
| ColBufEncoder::~ColBufEncoder() {} |
| |
| namespace { |
| |
| inline uint64_t DecodeFixed64WithEndian(uint64_t val, bool big_endian, |
| size_t size) { |
| if (big_endian && port::kLittleEndian) { |
| val = EndianTransform(val, size); |
| } else if (!big_endian && !port::kLittleEndian) { |
| val = EndianTransform(val, size); |
| } |
| return val; |
| } |
| |
| } // namespace |
| |
| const std::string &ColBufEncoder::GetData() { return buffer_; } |
| |
| ColBufEncoder *ColBufEncoder::NewColBufEncoder( |
| const ColDeclaration &col_declaration) { |
| if (col_declaration.col_type == "FixedLength") { |
| return new FixedLengthColBufEncoder( |
| col_declaration.size, col_declaration.col_compression_type, |
| col_declaration.nullable, col_declaration.big_endian); |
| } else if (col_declaration.col_type == "VariableLength") { |
| return new VariableLengthColBufEncoder(); |
| } else if (col_declaration.col_type == "VariableChunk") { |
| return new VariableChunkColBufEncoder(col_declaration.col_compression_type); |
| } else if (col_declaration.col_type == "LongFixedLength") { |
| return new LongFixedLengthColBufEncoder(col_declaration.size, |
| col_declaration.nullable); |
| } |
| // Unrecognized column type |
| return nullptr; |
| } |
| |
| #ifdef ROCKSDB_UBSAN_RUN |
| #if defined(__clang__) |
| __attribute__((__no_sanitize__("shift"))) |
| #elif defined(__GNUC__) |
| __attribute__((__no_sanitize_undefined__)) |
| #endif |
| #endif |
| size_t FixedLengthColBufEncoder::Append(const char *buf) { |
| if (nullable_) { |
| if (buf == nullptr) { |
| buffer_.append(1, 0); |
| return 0; |
| } else { |
| buffer_.append(1, 1); |
| } |
| } |
| uint64_t read_val = 0; |
| memcpy(&read_val, buf, size_); |
| read_val = DecodeFixed64WithEndian(read_val, big_endian_, size_); |
| |
| // Determine write value |
| uint64_t write_val = read_val; |
| if (col_compression_type_ == kColDeltaVarint || |
| col_compression_type_ == kColRleDeltaVarint) { |
| int64_t delta = read_val - last_val_; |
| // Encode signed delta value |
| delta = (delta << 1) ^ (delta >> 63); |
| write_val = delta; |
| last_val_ = read_val; |
| } else if (col_compression_type_ == kColDict || |
| col_compression_type_ == kColRleDict) { |
| auto iter = dictionary_.find(read_val); |
| uint64_t dict_val; |
| if (iter == dictionary_.end()) { |
| // Add new entry to dictionary |
| dict_val = dictionary_.size(); |
| dictionary_.insert(std::make_pair(read_val, dict_val)); |
| dict_vec_.push_back(read_val); |
| } else { |
| dict_val = iter->second; |
| } |
| write_val = dict_val; |
| } |
| |
| // Write into buffer |
| if (IsRunLength(col_compression_type_)) { |
| if (run_length_ == -1) { |
| // First element |
| run_val_ = write_val; |
| run_length_ = 1; |
| } else if (write_val != run_val_) { |
| // End of run |
| // Write run value |
| if (col_compression_type_ == kColRle) { |
| buffer_.append(reinterpret_cast<char *>(&run_val_), size_); |
| } else { |
| PutVarint64(&buffer_, run_val_); |
| } |
| // Write run length |
| PutVarint64(&buffer_, run_length_); |
| run_val_ = write_val; |
| run_length_ = 1; |
| } else { |
| run_length_++; |
| } |
| } else { // non run-length encodings |
| if (col_compression_type_ == kColNoCompression) { |
| buffer_.append(reinterpret_cast<char *>(&write_val), size_); |
| } else { |
| PutVarint64(&buffer_, write_val); |
| } |
| } |
| return size_; |
| } |
| |
| void FixedLengthColBufEncoder::Finish() { |
| if (col_compression_type_ == kColDict || |
| col_compression_type_ == kColRleDict) { |
| std::string header; |
| PutVarint64(&header, dict_vec_.size()); |
| // Put dictionary in the header |
| for (auto item : dict_vec_) { |
| PutVarint64(&header, item); |
| } |
| buffer_ = header + buffer_; |
| } |
| if (IsRunLength(col_compression_type_)) { |
| // Finish last run value |
| if (col_compression_type_ == kColRle) { |
| buffer_.append(reinterpret_cast<char *>(&run_val_), size_); |
| } else { |
| PutVarint64(&buffer_, run_val_); |
| } |
| PutVarint64(&buffer_, run_length_); |
| } |
| } |
| |
| size_t LongFixedLengthColBufEncoder::Append(const char *buf) { |
| if (nullable_) { |
| if (buf == nullptr) { |
| buffer_.append(1, 0); |
| return 0; |
| } else { |
| buffer_.append(1, 1); |
| } |
| } |
| buffer_.append(buf, size_); |
| return size_; |
| } |
| |
| void LongFixedLengthColBufEncoder::Finish() {} |
| |
| size_t VariableLengthColBufEncoder::Append(const char *buf) { |
| uint8_t length = 0; |
| length = *buf; |
| buffer_.append(buf, 1); |
| buf += 1; |
| buffer_.append(buf, length); |
| return length + 1; |
| } |
| |
| void VariableLengthColBufEncoder::Finish() {} |
| |
| size_t VariableChunkColBufEncoder::Append(const char *buf) { |
| const char *orig_buf = buf; |
| uint8_t mark = 0xFF; |
| size_t length = 0; |
| std::string tmp_buffer; |
| while (mark == 0xFF) { |
| uint64_t val; |
| memcpy(&val, buf, 8); |
| buf += 8; |
| mark = *buf; |
| buf += 1; |
| int8_t chunk_size = 8 - (0xFF - mark); |
| if (col_compression_type_ == kColDict) { |
| auto iter = dictionary_.find(val); |
| uint64_t dict_val; |
| if (iter == dictionary_.end()) { |
| dict_val = dictionary_.size(); |
| dictionary_.insert(std::make_pair(val, dict_val)); |
| dict_vec_.push_back(val); |
| } else { |
| dict_val = iter->second; |
| } |
| PutVarint64(&tmp_buffer, dict_val); |
| } else { |
| tmp_buffer.append(reinterpret_cast<char *>(&val), chunk_size); |
| } |
| length += chunk_size; |
| } |
| |
| PutVarint64(&buffer_, length); |
| buffer_.append(tmp_buffer); |
| return buf - orig_buf; |
| } |
| |
| void VariableChunkColBufEncoder::Finish() { |
| if (col_compression_type_ == kColDict) { |
| std::string header; |
| PutVarint64(&header, dict_vec_.size()); |
| for (auto item : dict_vec_) { |
| PutVarint64(&header, item); |
| } |
| buffer_ = header + buffer_; |
| } |
| } |
| |
| } // namespace rocksdb |