| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| #ifndef ROCKSDB_LITE |
| |
| #include "utilities/column_aware_encoding_util.h" |
| |
| #ifndef __STDC_FORMAT_MACROS |
| #define __STDC_FORMAT_MACROS |
| #endif |
| |
| #include <stddef.h> |
| #include <stdint.h> |
| #include <stdio.h> |
| #include <algorithm> |
| #include <utility> |
| #include <vector> |
| #include "include/rocksdb/comparator.h" |
| #include "include/rocksdb/slice.h" |
| #include "rocksdb/env.h" |
| #include "rocksdb/status.h" |
| #include "table/block_based_table_builder.h" |
| #include "table/block_based_table_factory.h" |
| #include "table/format.h" |
| #include "table/table_reader.h" |
| #include "util/coding.h" |
| #include "utilities/col_buf_decoder.h" |
| #include "utilities/col_buf_encoder.h" |
| |
| #include "port/port.h" |
| |
| namespace rocksdb { |
| |
| ColumnAwareEncodingReader::ColumnAwareEncodingReader( |
| const std::string& file_path) |
| : file_name_(file_path), |
| ioptions_(options_), |
| internal_comparator_(BytewiseComparator()) { |
| InitTableReader(file_name_); |
| } |
| |
| void ColumnAwareEncodingReader::InitTableReader(const std::string& file_path) { |
| std::unique_ptr<RandomAccessFile> file; |
| uint64_t file_size; |
| options_.env->NewRandomAccessFile(file_path, &file, soptions_); |
| options_.env->GetFileSize(file_path, &file_size); |
| |
| file_.reset(new RandomAccessFileReader(std::move(file), file_path)); |
| |
| options_.comparator = &internal_comparator_; |
| options_.table_factory = std::make_shared<BlockBasedTableFactory>(); |
| |
| std::unique_ptr<TableReader> table_reader; |
| options_.table_factory->NewTableReader( |
| TableReaderOptions(ioptions_, soptions_, internal_comparator_, |
| /*skip_filters=*/false), |
| std::move(file_), file_size, &table_reader, /*enable_prefetch=*/false); |
| |
| table_reader_.reset(dynamic_cast<BlockBasedTable*>(table_reader.release())); |
| } |
| |
| void ColumnAwareEncodingReader::GetKVPairsFromDataBlocks( |
| std::vector<KVPairBlock>* kv_pair_blocks) { |
| table_reader_->GetKVPairsFromDataBlocks(kv_pair_blocks); |
| } |
| |
| void ColumnAwareEncodingReader::DecodeBlocks( |
| const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file, |
| const std::vector<std::string>* blocks) { |
| char* decoded_content_base = new char[16384]; |
| Options options; |
| ImmutableCFOptions ioptions(options); |
| for (auto& block : *blocks) { |
| KVPairColBufDecoders kvp_col_bufs(kvp_col_declarations); |
| auto& key_col_bufs = kvp_col_bufs.key_col_bufs; |
| auto& value_col_bufs = kvp_col_bufs.value_col_bufs; |
| auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf; |
| |
| auto& slice_final_with_bit = block; |
| uint32_t format_version = 2; |
| Slice compression_dict; |
| BlockContents contents; |
| const char* content_ptr; |
| |
| CompressionType type = |
| (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; |
| if (type != kNoCompression) { |
| UncompressBlockContents(slice_final_with_bit.c_str(), |
| slice_final_with_bit.size() - 1, &contents, |
| format_version, compression_dict, ioptions); |
| content_ptr = contents.data.data(); |
| } else { |
| content_ptr = slice_final_with_bit.data(); |
| } |
| |
| size_t num_kv_pairs; |
| const char* header_content_ptr = content_ptr; |
| num_kv_pairs = DecodeFixed64(header_content_ptr); |
| |
| header_content_ptr += sizeof(size_t); |
| size_t num_key_columns = key_col_bufs.size(); |
| size_t num_value_columns = value_col_bufs.size(); |
| std::vector<const char*> key_content_ptr(num_key_columns); |
| std::vector<const char*> value_content_ptr(num_value_columns); |
| const char* checksum_content_ptr; |
| |
| size_t num_columns = num_key_columns + num_value_columns; |
| const char* col_content_ptr = |
| header_content_ptr + sizeof(size_t) * num_columns; |
| |
| // Read headers |
| for (size_t i = 0; i < num_key_columns; ++i) { |
| key_content_ptr[i] = col_content_ptr; |
| key_content_ptr[i] += key_col_bufs[i]->Init(key_content_ptr[i]); |
| size_t offset; |
| offset = DecodeFixed64(header_content_ptr); |
| header_content_ptr += sizeof(size_t); |
| col_content_ptr += offset; |
| } |
| for (size_t i = 0; i < num_value_columns; ++i) { |
| value_content_ptr[i] = col_content_ptr; |
| value_content_ptr[i] += value_col_bufs[i]->Init(value_content_ptr[i]); |
| size_t offset; |
| offset = DecodeFixed64(header_content_ptr); |
| header_content_ptr += sizeof(size_t); |
| col_content_ptr += offset; |
| } |
| checksum_content_ptr = col_content_ptr; |
| checksum_content_ptr += value_checksum_buf->Init(checksum_content_ptr); |
| |
| // Decode block |
| char* decoded_content = decoded_content_base; |
| for (size_t j = 0; j < num_kv_pairs; ++j) { |
| for (size_t i = 0; i < num_key_columns; ++i) { |
| key_content_ptr[i] += |
| key_col_bufs[i]->Decode(key_content_ptr[i], &decoded_content); |
| } |
| for (size_t i = 0; i < num_value_columns; ++i) { |
| value_content_ptr[i] += |
| value_col_bufs[i]->Decode(value_content_ptr[i], &decoded_content); |
| } |
| checksum_content_ptr += |
| value_checksum_buf->Decode(checksum_content_ptr, &decoded_content); |
| } |
| |
| size_t offset = decoded_content - decoded_content_base; |
| Slice output_content(decoded_content, offset); |
| |
| if (out_file != nullptr) { |
| out_file->Append(output_content); |
| } |
| } |
| delete[] decoded_content_base; |
| } |
| |
| void ColumnAwareEncodingReader::DecodeBlocksFromRowFormat( |
| WritableFile* out_file, const std::vector<std::string>* blocks) { |
| Options options; |
| ImmutableCFOptions ioptions(options); |
| for (auto& block : *blocks) { |
| auto& slice_final_with_bit = block; |
| uint32_t format_version = 2; |
| Slice compression_dict; |
| BlockContents contents; |
| std::string decoded_content; |
| |
| CompressionType type = |
| (CompressionType)slice_final_with_bit[slice_final_with_bit.size() - 1]; |
| if (type != kNoCompression) { |
| UncompressBlockContents(slice_final_with_bit.c_str(), |
| slice_final_with_bit.size() - 1, &contents, |
| format_version, compression_dict, ioptions); |
| decoded_content = std::string(contents.data.data(), contents.data.size()); |
| } else { |
| decoded_content = std::move(slice_final_with_bit); |
| } |
| |
| if (out_file != nullptr) { |
| out_file->Append(decoded_content); |
| } |
| } |
| } |
| |
| void ColumnAwareEncodingReader::DumpDataColumns( |
| const std::string& filename, |
| const KVPairColDeclarations& kvp_col_declarations, |
| const std::vector<KVPairBlock>& kv_pair_blocks) { |
| KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations); |
| auto& key_col_bufs = kvp_col_bufs.key_col_bufs; |
| auto& value_col_bufs = kvp_col_bufs.value_col_bufs; |
| auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf; |
| |
| FILE* fp = fopen(filename.c_str(), "w"); |
| size_t block_id = 1; |
| for (auto& kv_pairs : kv_pair_blocks) { |
| fprintf(fp, "---------------- Block: %-4" ROCKSDB_PRIszt " ----------------\n", block_id); |
| for (auto& kv_pair : kv_pairs) { |
| const auto& key = kv_pair.first; |
| const auto& value = kv_pair.second; |
| size_t value_offset = 0; |
| |
| const char* key_ptr = key.data(); |
| for (auto& buf : key_col_bufs) { |
| size_t col_size = buf->Append(key_ptr); |
| std::string tmp_buf(key_ptr, col_size); |
| Slice col(tmp_buf); |
| fprintf(fp, "%s ", col.ToString(true).c_str()); |
| key_ptr += col_size; |
| } |
| fprintf(fp, "|"); |
| |
| const char* value_ptr = value.data(); |
| for (auto& buf : value_col_bufs) { |
| size_t col_size = buf->Append(value_ptr); |
| std::string tmp_buf(value_ptr, col_size); |
| Slice col(tmp_buf); |
| fprintf(fp, " %s", col.ToString(true).c_str()); |
| value_ptr += col_size; |
| value_offset += col_size; |
| } |
| |
| if (value_offset < value.size()) { |
| size_t col_size = value_checksum_buf->Append(value_ptr); |
| std::string tmp_buf(value_ptr, col_size); |
| Slice col(tmp_buf); |
| fprintf(fp, "|%s", col.ToString(true).c_str()); |
| } else { |
| value_checksum_buf->Append(nullptr); |
| } |
| fprintf(fp, "\n"); |
| } |
| block_id++; |
| } |
| fclose(fp); |
| } |
| |
| namespace { |
| |
| void CompressDataBlock(const std::string& output_content, Slice* slice_final, |
| CompressionType* type, std::string* compressed_output) { |
| CompressionOptions compression_opts; |
| uint32_t format_version = 2; // hard-coded version |
| Slice compression_dict; |
| *slice_final = |
| CompressBlock(output_content, compression_opts, type, format_version, |
| compression_dict, compressed_output); |
| } |
| |
| } // namespace |
| |
| void ColumnAwareEncodingReader::EncodeBlocksToRowFormat( |
| WritableFile* out_file, CompressionType compression_type, |
| const std::vector<KVPairBlock>& kv_pair_blocks, |
| std::vector<std::string>* blocks) { |
| std::string output_content; |
| for (auto& kv_pairs : kv_pair_blocks) { |
| output_content.clear(); |
| std::string last_key; |
| size_t counter = 0; |
| const size_t block_restart_interval = 16; |
| for (auto& kv_pair : kv_pairs) { |
| const auto& key = kv_pair.first; |
| const auto& value = kv_pair.second; |
| |
| Slice last_key_piece(last_key); |
| size_t shared = 0; |
| if (counter >= block_restart_interval) { |
| counter = 0; |
| } else { |
| const size_t min_length = std::min(last_key_piece.size(), key.size()); |
| while ((shared < min_length) && last_key_piece[shared] == key[shared]) { |
| shared++; |
| } |
| } |
| const size_t non_shared = key.size() - shared; |
| output_content.append(key.c_str() + shared, non_shared); |
| output_content.append(value); |
| |
| last_key.resize(shared); |
| last_key.append(key.data() + shared, non_shared); |
| counter++; |
| } |
| Slice slice_final; |
| auto type = compression_type; |
| std::string compressed_output; |
| CompressDataBlock(output_content, &slice_final, &type, &compressed_output); |
| |
| if (out_file != nullptr) { |
| out_file->Append(slice_final); |
| } |
| |
| // Add a bit in the end for decoding |
| std::string slice_final_with_bit(slice_final.data(), slice_final.size()); |
| slice_final_with_bit.append(reinterpret_cast<char*>(&type), 1); |
| blocks->push_back( |
| std::string(slice_final_with_bit.data(), slice_final_with_bit.size())); |
| } |
| } |
| |
| Status ColumnAwareEncodingReader::EncodeBlocks( |
| const KVPairColDeclarations& kvp_col_declarations, WritableFile* out_file, |
| CompressionType compression_type, |
| const std::vector<KVPairBlock>& kv_pair_blocks, |
| std::vector<std::string>* blocks, bool print_column_stat) { |
| std::vector<size_t> key_col_sizes( |
| kvp_col_declarations.key_col_declarations->size(), 0); |
| std::vector<size_t> value_col_sizes( |
| kvp_col_declarations.value_col_declarations->size(), 0); |
| size_t value_checksum_size = 0; |
| |
| for (auto& kv_pairs : kv_pair_blocks) { |
| KVPairColBufEncoders kvp_col_bufs(kvp_col_declarations); |
| auto& key_col_bufs = kvp_col_bufs.key_col_bufs; |
| auto& value_col_bufs = kvp_col_bufs.value_col_bufs; |
| auto& value_checksum_buf = kvp_col_bufs.value_checksum_buf; |
| |
| size_t num_kv_pairs = 0; |
| for (auto& kv_pair : kv_pairs) { |
| const auto& key = kv_pair.first; |
| const auto& value = kv_pair.second; |
| size_t value_offset = 0; |
| num_kv_pairs++; |
| |
| const char* key_ptr = key.data(); |
| for (auto& buf : key_col_bufs) { |
| size_t col_size = buf->Append(key_ptr); |
| key_ptr += col_size; |
| } |
| |
| const char* value_ptr = value.data(); |
| for (auto& buf : value_col_bufs) { |
| size_t col_size = buf->Append(value_ptr); |
| value_ptr += col_size; |
| value_offset += col_size; |
| } |
| |
| if (value_offset < value.size()) { |
| value_checksum_buf->Append(value_ptr); |
| } else { |
| value_checksum_buf->Append(nullptr); |
| } |
| } |
| |
| kvp_col_bufs.Finish(); |
| // Get stats |
| // Compress and write a block |
| if (print_column_stat) { |
| for (size_t i = 0; i < key_col_bufs.size(); ++i) { |
| Slice slice_final; |
| auto type = compression_type; |
| std::string compressed_output; |
| CompressDataBlock(key_col_bufs[i]->GetData(), &slice_final, &type, |
| &compressed_output); |
| out_file->Append(slice_final); |
| key_col_sizes[i] += slice_final.size(); |
| } |
| for (size_t i = 0; i < value_col_bufs.size(); ++i) { |
| Slice slice_final; |
| auto type = compression_type; |
| std::string compressed_output; |
| CompressDataBlock(value_col_bufs[i]->GetData(), &slice_final, &type, |
| &compressed_output); |
| out_file->Append(slice_final); |
| value_col_sizes[i] += slice_final.size(); |
| } |
| Slice slice_final; |
| auto type = compression_type; |
| std::string compressed_output; |
| CompressDataBlock(value_checksum_buf->GetData(), &slice_final, &type, |
| &compressed_output); |
| out_file->Append(slice_final); |
| value_checksum_size += slice_final.size(); |
| } else { |
| std::string output_content; |
| // Write column sizes |
| PutFixed64(&output_content, num_kv_pairs); |
| for (auto& buf : key_col_bufs) { |
| size_t size = buf->GetData().size(); |
| PutFixed64(&output_content, size); |
| } |
| for (auto& buf : value_col_bufs) { |
| size_t size = buf->GetData().size(); |
| PutFixed64(&output_content, size); |
| } |
| // Write data |
| for (auto& buf : key_col_bufs) { |
| output_content.append(buf->GetData()); |
| } |
| for (auto& buf : value_col_bufs) { |
| output_content.append(buf->GetData()); |
| } |
| output_content.append(value_checksum_buf->GetData()); |
| |
| Slice slice_final; |
| auto type = compression_type; |
| std::string compressed_output; |
| CompressDataBlock(output_content, &slice_final, &type, |
| &compressed_output); |
| |
| if (out_file != nullptr) { |
| out_file->Append(slice_final); |
| } |
| |
| // Add a bit in the end for decoding |
| std::string slice_final_with_bit(slice_final.data(), |
| slice_final.size() + 1); |
| slice_final_with_bit[slice_final.size()] = static_cast<char>(type); |
| blocks->push_back(std::string(slice_final_with_bit.data(), |
| slice_final_with_bit.size())); |
| } |
| } |
| |
| if (print_column_stat) { |
| size_t total_size = 0; |
| for (size_t i = 0; i < key_col_sizes.size(); ++i) |
| total_size += key_col_sizes[i]; |
| for (size_t i = 0; i < value_col_sizes.size(); ++i) |
| total_size += value_col_sizes[i]; |
| total_size += value_checksum_size; |
| |
| for (size_t i = 0; i < key_col_sizes.size(); ++i) |
| printf("Key col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i, key_col_sizes[i], |
| 100.0 * key_col_sizes[i] / total_size); |
| for (size_t i = 0; i < value_col_sizes.size(); ++i) |
| printf("Value col %" ROCKSDB_PRIszt " size: %" ROCKSDB_PRIszt " percentage %lf%%\n", i, |
| value_col_sizes[i], 100.0 * value_col_sizes[i] / total_size); |
| printf("Value checksum size: %" ROCKSDB_PRIszt " percentage %lf%%\n", value_checksum_size, |
| 100.0 * value_checksum_size / total_size); |
| } |
| return Status::OK(); |
| } |
| |
| void ColumnAwareEncodingReader::GetColDeclarationsPrimary( |
| std::vector<ColDeclaration>** key_col_declarations, |
| std::vector<ColDeclaration>** value_col_declarations, |
| ColDeclaration** value_checksum_declaration) { |
| *key_col_declarations = new std::vector<ColDeclaration>{ |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false, |
| true), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)}; |
| |
| *value_col_declarations = new std::vector<ColDeclaration>{ |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4), |
| ColDeclaration("FixedLength", ColCompressionType::kColRle, 1), |
| ColDeclaration("VariableLength"), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)}; |
| *value_checksum_declaration = new ColDeclaration( |
| "LongFixedLength", ColCompressionType::kColNoCompression, 9, |
| true /* nullable */); |
| } |
| |
| void ColumnAwareEncodingReader::GetColDeclarationsSecondary( |
| std::vector<ColDeclaration>** key_col_declarations, |
| std::vector<ColDeclaration>** value_col_declarations, |
| ColDeclaration** value_checksum_declaration) { |
| *key_col_declarations = new std::vector<ColDeclaration>{ |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 4, false, |
| true), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColRle, 1), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 4, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColDeltaVarint, 8, |
| false, true), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8, false, |
| true), |
| ColDeclaration("VariableChunk", ColCompressionType::kColNoCompression), |
| ColDeclaration("FixedLength", ColCompressionType::kColRleVarint, 8)}; |
| *value_col_declarations = new std::vector<ColDeclaration>(); |
| *value_checksum_declaration = new ColDeclaration( |
| "LongFixedLength", ColCompressionType::kColNoCompression, 9, |
| true /* nullable */); |
| } |
| |
| } // namespace rocksdb |
| |
| #endif // ROCKSDB_LITE |