blob: 794278fa00e81c8ea0619f12a4c80057f99b2b2d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <Builder/SerializedPlanBuilder.h>
#include <Compression/CompressedReadBuffer.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/HashJoin/HashJoin.h>
#include <Interpreters/TableJoin.h>
#include <Parser/CHColumnToSparkRow.h>
#include <Parser/LocalExecutor.h>
#include <Parser/ParserContext.h>
#include <Parser/SerializedPlanParser.h>
#include <Parser/SparkRowToCHColumn.h>
#include <Parser/SubstraitParserUtils.h>
#include <Parsers/ASTIdentifier.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Shuffle/ShuffleReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/SparkMergeTreeMeta.h>
#include <Storages/MergeTree/SparkStorageMergeTree.h>
#include <Storages/SubstraitSource/SubstraitFileSource.h>
#include <benchmark/benchmark.h>
#include <substrait/plan.pb.h>
#include <Common/CHUtil.h>
#include <Common/DebugUtils.h>
#include <Common/PODArray_fwd.h>
#include <Common/QueryContext.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include "testConfig.h"
#if defined(__SSE2__)
#include <emmintrin.h>
#endif
using namespace local_engine;
using namespace dbms;
using namespace DB;
DB::ContextMutablePtr global_context;
[[maybe_unused]] static void BM_ParquetRead(benchmark::State & state)
{
const auto * type_string = "columns format version: 1\n"
"2 columns:\n"
"`l_returnflag` String\n"
"`l_linestatus` String\n";
auto names_and_types_list = NamesAndTypesList::parse(type_string);
ColumnsWithTypeAndName columns;
for (const auto & item : names_and_types_list)
{
ColumnWithTypeAndName col;
col.column = item.type->createColumn();
col.type = item.type;
col.name = item.name;
columns.emplace_back(std::move(col));
}
auto header = Block(std::move(columns));
for (auto _ : state)
{
substrait::ReadRel::LocalFiles files;
substrait::ReadRel::LocalFiles::FileOrFiles * file = files.add_items();
std::string file_path{GLUTEN_SOURCE_TPCH_URI("lineitem/part-00000-d08071cb-0dfa-42dc-9198-83cb334ccda3-c000.snappy.parquet")};
file->set_uri_file(file_path);
substrait::ReadRel::LocalFiles::FileOrFiles::ParquetReadOptions parquet_format;
file->mutable_parquet()->CopyFrom(parquet_format);
auto builder = std::make_unique<QueryPipelineBuilder>();
builder->init(Pipe(std::make_shared<SubstraitFileSource>(QueryContext::globalContext(), header, files)));
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(*builder));
auto executor = PullingPipelineExecutor(pipeline);
auto result = header.cloneEmpty();
size_t total_rows = 0;
while (executor.pull(result))
{
debug::headBlock(result);
total_rows += result.rows();
}
std::cerr << "rows:" << total_rows << std::endl;
}
}
[[maybe_unused]] static void BM_ShuffleReader(benchmark::State & state)
{
for (auto _ : state)
{
auto read_buffer = std::make_unique<ReadBufferFromFile>("/tmp/test_shuffle/ZSTD/data.dat");
// read_buffer->seek(357841655, SEEK_SET);
auto shuffle_reader = local_engine::ShuffleReader(std::move(read_buffer), true, -1, -1);
Block * block;
int sum = 0;
do
{
block = shuffle_reader.read();
sum += block->rows();
} while (block->columns() != 0);
std::cout << "total rows:" << sum << std::endl;
}
}
[[maybe_unused]] static void BM_SimpleAggregate(benchmark::State & state)
{
for (auto _ : state)
{
state.PauseTiming();
dbms::SerializedSchemaBuilder schema_builder;
auto * schema = schema_builder.column("l_orderkey", "I64")
.column("l_partkey", "I64")
.column("l_suppkey", "I64")
.column("l_linenumber", "I32")
.column("l_quantity", "FP64")
.column("l_extendedprice", "FP64")
.column("l_discount", "FP64")
.column("l_tax", "FP64")
.column("l_shipdate_new", "FP64")
.column("l_commitdate_new", "FP64")
.column("l_receiptdate_new", "FP64")
.build();
dbms::SerializedPlanBuilder plan_builder;
auto * measure = dbms::measureFunction(dbms::SUM, {dbms::selection(6)});
auto plan
= plan_builder.registerSupportedFunctions()
.aggregate({}, {measure})
.read(
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
auto parser_context = ParserContext::build(global_context, *plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(*plan);
state.ResumeTiming();
while (local_executor->hasNext())
local_engine::SparkRowInfoPtr spark_row_info = local_executor->next();
}
}
[[maybe_unused]] static void BM_TPCH_Q6(benchmark::State & state)
{
for (auto _ : state)
{
state.PauseTiming();
dbms::SerializedSchemaBuilder schema_builder;
auto * schema = schema_builder.column("l_discount", "FP64")
.column("l_extendedprice", "FP64")
.column("l_quantity", "FP64")
.column("l_shipdate_new", "Date")
.build();
dbms::SerializedPlanBuilder plan_builder;
auto * agg_mul = dbms::scalarFunction(dbms::MULTIPLY, {dbms::selection(1), dbms::selection(0)});
auto * measure1 = dbms::measureFunction(dbms::SUM, {agg_mul});
auto * measure2 = dbms::measureFunction(dbms::SUM, {dbms::selection(1)});
auto * measure3 = dbms::measureFunction(dbms::SUM, {dbms::selection(2)});
auto plan
= plan_builder.registerSupportedFunctions()
.aggregate({}, {measure1, measure2, measure3})
.project({dbms::selection(2), dbms::selection(1), dbms::selection(0)})
.filter(dbms::scalarFunction(
dbms::AND,
{dbms::scalarFunction(
AND,
{dbms::scalarFunction(
AND,
{dbms::scalarFunction(
AND,
{dbms::scalarFunction(
AND,
{dbms::scalarFunction(
AND,
{dbms::scalarFunction(
AND,
{scalarFunction(IS_NOT_NULL, {selection(3)}),
scalarFunction(IS_NOT_NULL, {selection(0)})}),
scalarFunction(IS_NOT_NULL, {selection(2)})}),
dbms::scalarFunction(GREATER_THAN_OR_EQUAL, {selection(3), literalDate(8766)})}),
scalarFunction(LESS_THAN, {selection(3), literalDate(9131)})}),
scalarFunction(GREATER_THAN_OR_EQUAL, {selection(0), literal(0.05)})}),
scalarFunction(LESS_THAN_OR_EQUAL, {selection(0), literal(0.07)})}),
scalarFunction(LESS_THAN, {selection(2), literal(24.0)})}))
.read(
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
auto parser_context = ParserContext::build(QueryContext::globalContext(), *plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(*plan);
state.ResumeTiming();
while (local_executor->hasNext())
{
Block * block = local_executor->nextColumnar();
delete block;
}
}
}
[[maybe_unused]] static void BM_MERGE_TREE_TPCH_Q6_FROM_TEXT(benchmark::State & state)
{
QueryContext::globalContext() = global_context;
for (auto _ : state)
{
state.PauseTiming();
//const char * path = "/data1/tpc_data/tpch1000_zhichao/serialized_q6_substrait_plan1.txt";
const char * path = "/data1/tpc_data/tpch100_zhichao/serialized_q4_substrait_plan_parquet.bin";
//const char * path = "/data1/tpc_data/tpch100_zhichao/serialized_q4_substrait_plan_mergetree.bin";
std::ifstream t(path);
std::string str((std::istreambuf_iterator<char>(t)), std::istreambuf_iterator<char>());
std::cout << "the plan from: " << path << std::endl;
auto plan = BinaryToMessage<substrait::Plan>(str);
auto parser_context = ParserContext::build(global_context, plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(plan);
state.ResumeTiming();
while (local_executor->hasNext()) [[maybe_unused]]
auto * x = local_executor->nextColumnar();
}
}
[[maybe_unused]] static void BM_CHColumnToSparkRowWithString(benchmark::State & state)
{
for (auto _ : state)
{
state.PauseTiming();
dbms::SerializedSchemaBuilder schema_builder;
auto * schema = schema_builder.column("l_orderkey", "I64")
.column("l_partkey", "I64")
.column("l_suppkey", "I64")
.column("l_linenumber", "I32")
.column("l_quantity", "FP64")
.column("l_extendedprice", "FP64")
.column("l_discount", "FP64")
.column("l_tax", "FP64")
.column("l_returnflag", "String")
.column("l_linestatus", "String")
.column("l_shipdate_new", "FP64")
.column("l_commitdate_new", "FP64")
.column("l_receiptdate_new", "FP64")
.column("l_shipinstruct", "String")
.column("l_shipmode", "String")
.column("l_comment", "String")
.build();
dbms::SerializedPlanBuilder plan_builder;
auto plan
= plan_builder
.read(
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
auto parser_context = ParserContext::build(QueryContext::globalContext(), *plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(*plan);
state.ResumeTiming();
while (local_executor->hasNext())
local_engine::SparkRowInfoPtr spark_row_info = local_executor->next();
}
}
[[maybe_unused]] static void BM_SparkRowToCHColumn(benchmark::State & state)
{
for (auto _ : state)
{
state.PauseTiming();
dbms::SerializedSchemaBuilder schema_builder;
auto * schema = schema_builder.column("l_orderkey", "I64")
.column("l_partkey", "I64")
.column("l_suppkey", "I64")
.column("l_linenumber", "I32")
.column("l_quantity", "FP64")
.column("l_extendedprice", "FP64")
.column("l_discount", "FP64")
.column("l_tax", "FP64")
.column("l_shipdate_new", "FP64")
.column("l_commitdate_new", "FP64")
.column("l_receiptdate_new", "FP64")
.build();
dbms::SerializedPlanBuilder plan_builder;
auto plan
= plan_builder
.read(
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
auto parser_context = ParserContext::build(QueryContext::globalContext(), *plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(*plan);
local_engine::SparkRowToCHColumn converter;
while (local_executor->hasNext())
{
local_engine::SparkRowInfoPtr spark_row_info = local_executor->next();
state.ResumeTiming();
auto block = converter.convertSparkRowInfoToCHColumn(*spark_row_info, local_executor->getHeader());
state.PauseTiming();
}
state.ResumeTiming();
}
}
[[maybe_unused]] static void BM_SparkRowToCHColumnWithString(benchmark::State & state)
{
for (auto _ : state)
{
state.PauseTiming();
dbms::SerializedSchemaBuilder schema_builder;
auto * schema = schema_builder.column("l_orderkey", "I64")
.column("l_partkey", "I64")
.column("l_suppkey", "I64")
.column("l_linenumber", "I32")
.column("l_quantity", "FP64")
.column("l_extendedprice", "FP64")
.column("l_discount", "FP64")
.column("l_tax", "FP64")
.column("l_returnflag", "String")
.column("l_linestatus", "String")
.column("l_shipdate_new", "FP64")
.column("l_commitdate_new", "FP64")
.column("l_receiptdate_new", "FP64")
.column("l_shipinstruct", "String")
.column("l_shipmode", "String")
.column("l_comment", "String")
.build();
dbms::SerializedPlanBuilder plan_builder;
auto plan
= plan_builder
.read(
"/home/kyligence/Documents/test-dataset/intel-gazelle-test-" + std::to_string(state.range(0)) + ".snappy.parquet",
std::move(schema))
.build();
auto parser_context = ParserContext::build(QueryContext::globalContext(), *plan);
local_engine::SerializedPlanParser parser(parser_context);
auto local_executor = parser.createExecutor(*plan);
local_engine::SparkRowToCHColumn converter;
while (local_executor->hasNext())
{
local_engine::SparkRowInfoPtr spark_row_info = local_executor->next();
state.ResumeTiming();
auto block = converter.convertSparkRowInfoToCHColumn(*spark_row_info, local_executor->getHeader());
state.PauseTiming();
}
state.ResumeTiming();
}
}
[[maybe_unused]] static void BM_SIMDFilter(benchmark::State & state)
{
const int n = 10000000;
for (auto _ : state)
{
state.PauseTiming();
PaddedPODArray<Int32> arr;
PaddedPODArray<UInt8> condition;
PaddedPODArray<Int32> res_data;
arr.reserve_exact(n);
condition.reserve_exact(n);
res_data.reserve_exact(n);
for (int i = 0; i < n; i++)
{
arr.push_back(i);
condition.push_back(state.range(0));
}
const Int32 * data_pos = arr.data();
const UInt8 * filt_pos = condition.data();
state.ResumeTiming();
#ifdef __SSE2__
int size = n;
static constexpr size_t SIMD_BYTES = 16;
const __m128i zero16 = _mm_setzero_si128();
const UInt8 * filt_end_sse = filt_pos + size / SIMD_BYTES * SIMD_BYTES;
while (filt_pos < filt_end_sse)
{
UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
if (0 == mask)
{
/// Nothing is inserted.
}
else if (0xFFFF == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
for (size_t i = 0; i < SIMD_BYTES; ++i)
if (filt_pos[i]) [[maybe_unused]]
auto x = data_pos[i];
}
filt_pos += SIMD_BYTES;
data_pos += SIMD_BYTES;
}
#endif
}
}
[[maybe_unused]] static void BM_NormalFilter(benchmark::State & state)
{
const int n = 10000000;
for (auto _ : state)
{
state.PauseTiming();
PaddedPODArray<Int32> arr;
PaddedPODArray<UInt8> condition;
PaddedPODArray<Int32> res_data;
arr.reserve_exact(n);
condition.reserve_exact(n);
res_data.reserve_exact(n);
for (int i = 0; i < n; i++)
{
arr.push_back(i);
condition.push_back(state.range(0));
}
const Int32 * data_pos = arr.data();
const UInt8 * filt_pos = condition.data();
const UInt8 * filt_end = filt_pos + n;
state.ResumeTiming();
while (filt_pos < filt_end)
{
if (*filt_pos)
res_data.push_back(*data_pos);
++filt_pos;
++data_pos;
}
}
}
[[maybe_unused]] static int add(int a, int b)
{
return a + b;
}
[[maybe_unused]] static void BM_TestSum(benchmark::State & state)
{
int cnt = state.range(0);
int i = 0;
std::vector<int> x;
std::vector<int> y;
x.reserve(cnt);
x.assign(cnt, 2);
y.reserve(cnt);
for (auto _ : state)
for (i = 0; i < cnt; i++)
y[i] = add(x[i], i);
}
[[maybe_unused]] static void BM_TestSumInline(benchmark::State & state)
{
int cnt = state.range(0);
int i = 0;
std::vector<int> x;
std::vector<int> y;
x.reserve(cnt);
x.assign(cnt, 2);
y.reserve(cnt);
for (auto _ : state)
for (i = 0; i < cnt; i++)
y[i] = x[i] + i;
}
[[maybe_unused]] static void BM_TestPlus(benchmark::State & state)
{
UInt64 rows = state.range(0);
auto & factory = FunctionFactory::instance();
auto & type_factory = DataTypeFactory::instance();
auto plus = factory.get("plus", global_context);
auto type = type_factory.get("UInt64");
ColumnsWithTypeAndName arguments;
arguments.push_back(ColumnWithTypeAndName(type, "x"));
arguments.push_back(ColumnWithTypeAndName(type, "y"));
auto function = plus->build(arguments);
ColumnsWithTypeAndName arguments_with_data;
Block block;
auto x = ColumnWithTypeAndName(type, "x");
auto y = ColumnWithTypeAndName(type, "y");
MutableColumnPtr mutable_x = x.type->createColumn();
MutableColumnPtr mutable_y = y.type->createColumn();
mutable_x->reserve(rows);
mutable_y->reserve(rows);
ColumnVector<UInt64> & column_x = assert_cast<ColumnVector<UInt64> &>(*mutable_x);
ColumnVector<UInt64> & column_y = assert_cast<ColumnVector<UInt64> &>(*mutable_y);
for (UInt64 i = 0; i < rows; i++)
{
column_x.insertValue(i);
column_y.insertValue(i + 1);
}
x.column = std::move(mutable_x);
y.column = std::move(mutable_y);
block.insert(x);
block.insert(y);
auto executable_function = function->prepare(arguments);
for (auto _ : state)
auto result = executable_function->execute(block.getColumnsWithTypeAndName(), type, rows, false);
}
[[maybe_unused]] static void BM_TestPlusEmbedded(benchmark::State & state)
{
UInt64 rows = state.range(0);
auto & factory = FunctionFactory::instance();
auto & type_factory = DataTypeFactory::instance();
auto plus = factory.get("plus", global_context);
auto type = type_factory.get("UInt64");
ColumnsWithTypeAndName arguments;
arguments.push_back(ColumnWithTypeAndName(type, "x"));
arguments.push_back(ColumnWithTypeAndName(type, "y"));
auto function = plus->build(arguments);
ColumnsWithTypeAndName arguments_with_data;
Block block;
auto x = ColumnWithTypeAndName(type, "x");
auto y = ColumnWithTypeAndName(type, "y");
MutableColumnPtr mutable_x = x.type->createColumn();
MutableColumnPtr mutable_y = y.type->createColumn();
mutable_x->reserve(rows);
mutable_y->reserve(rows);
ColumnVector<UInt64> & column_x = assert_cast<ColumnVector<UInt64> &>(*mutable_x);
ColumnVector<UInt64> & column_y = assert_cast<ColumnVector<UInt64> &>(*mutable_y);
for (UInt64 i = 0; i < rows; i++)
{
column_x.insertValue(i);
column_y.insertValue(i + 1);
}
x.column = std::move(mutable_x);
y.column = std::move(mutable_y);
block.insert(x);
block.insert(y);
CHJIT chjit;
auto compiled_function = compileFunction(chjit, *function);
std::vector<ColumnData> columns(arguments.size() + 1);
for (size_t i = 0; i < arguments.size(); ++i)
{
auto column = block.getByPosition(i).column->convertToFullIfNeeded();
columns[i] = getColumnData(column.get());
}
for (auto _ : state)
{
auto result_column = type->createColumn();
result_column->reserve(rows);
columns[arguments.size()] = getColumnData(result_column.get());
compiled_function.compiled_function(rows, columns.data());
}
}
[[maybe_unused]] static void BM_TestReadColumn(benchmark::State & state)
{
for (auto _ : state)
{
ReadBufferFromFile data_buf("/home/saber/Documents/test/c151.bin", 100000);
CompressedReadBuffer compressed(data_buf);
ReadBufferFromFile buf("/home/saber/Documents/test/c151.mrk2");
while (!buf.eof() && !data_buf.eof())
{
size_t x;
size_t y;
size_t z;
readIntBinary(x, buf);
readIntBinary(y, buf);
readIntBinary(z, buf);
std::cout << std::to_string(x) + " " << std::to_string(y) + " " << std::to_string(z) + " " << "\n";
data_buf.seek(x, SEEK_SET);
assert(!data_buf.eof());
std::string data;
data.reserve(y);
compressed.readBig(reinterpret_cast<char *>(data.data()), y);
std::cout << data << "\n";
}
}
}
[[maybe_unused]] static double quantile(const std::vector<double> & x)
{
double q = 0.8;
assert(q >= 0.0 && q <= 1.0);
const int n = x.size();
double id = (n - 1) * q;
int lo = static_cast<int>(floor(id));
int hi = static_cast<int>(ceil(id));
double qs = x[lo];
double h = (id - lo);
return (1.0 - h) * qs + h * x[hi];
}
// compress benchmark
#include <cstring>
#include <optional>
#include <base/types.h>
#include <Compression/CompressionInfo.h>
#include <Compression/LZ4_decompress_faster.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/MMapReadBufferFromFileDescriptor.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <base/unaligned.h>
#include <Common/PODArray.h>
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/memcpySmall.h>
namespace DB
{
class FasterCompressedReadBufferBase
{
protected:
ReadBuffer * compressed_in;
/// If 'compressed_in' buffer has whole compressed block - then use it. Otherwise copy parts of data to 'own_compressed_buffer'.
PODArray<char> own_compressed_buffer;
/// Points to memory, holding compressed block.
char * compressed_buffer = nullptr;
ssize_t variant;
/// Variant for reference implementation of LZ4.
static constexpr ssize_t LZ4_REFERENCE = -3;
LZ4::StreamStatistics stream_stat;
LZ4::PerformanceStatistics perf_stat;
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum)
{
if (compressed_in->eof())
return 0;
CityHash_v1_0_2::uint128 checksum;
compressed_in->readStrict(reinterpret_cast<char *>(&checksum), sizeof(checksum));
own_compressed_buffer.resize(COMPRESSED_BLOCK_HEADER_SIZE);
compressed_in->readStrict(own_compressed_buffer.data(), COMPRESSED_BLOCK_HEADER_SIZE);
UInt8 method = own_compressed_buffer[0]; /// See CompressedWriteBuffer.h
size_t & size_compressed = size_compressed_without_checksum;
if (method == static_cast<UInt8>(CompressionMethodByte::LZ4) || method == static_cast<UInt8>(CompressionMethodByte::ZSTD)
|| method == static_cast<UInt8>(CompressionMethodByte::NONE))
{
size_compressed = unalignedLoad<UInt32>(&own_compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&own_compressed_buffer[5]);
}
else
throw std::runtime_error("Unknown compression method: " + toString(method));
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw std::runtime_error("Too large size_compressed. Most likely corrupted data.");
/// Is whole compressed block located in 'compressed_in' buffer?
if (compressed_in->offset() >= COMPRESSED_BLOCK_HEADER_SIZE
&& compressed_in->position() + size_compressed - COMPRESSED_BLOCK_HEADER_SIZE <= compressed_in->buffer().end())
{
compressed_in->position() -= COMPRESSED_BLOCK_HEADER_SIZE;
compressed_buffer = compressed_in->position();
compressed_in->position() += size_compressed;
}
else
{
own_compressed_buffer.resize(size_compressed + (variant == LZ4_REFERENCE ? 0 : LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER));
compressed_buffer = own_compressed_buffer.data();
compressed_in->readStrict(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
return size_compressed + sizeof(checksum);
}
void decompress(char * to, size_t size_decompressed, size_t size_compressed_without_checksum)
{
UInt8 method = compressed_buffer[0]; /// See CompressedWriteBuffer.h
if (method == static_cast<UInt8>(CompressionMethodByte::LZ4))
{
//LZ4::statistics(compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_decompressed, stat);
LZ4::decompress(
compressed_buffer + COMPRESSED_BLOCK_HEADER_SIZE, to, size_compressed_without_checksum, size_decompressed, perf_stat);
}
else
throw std::runtime_error("Unknown compression method: " + toString(method));
}
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
FasterCompressedReadBufferBase(ReadBuffer * in, ssize_t variant_)
: compressed_in(in), own_compressed_buffer(COMPRESSED_BLOCK_HEADER_SIZE), variant(variant_), perf_stat(variant)
{
}
LZ4::StreamStatistics getStreamStatistics() const { return stream_stat; }
LZ4::PerformanceStatistics getPerformanceStatistics() const { return perf_stat; }
};
class FasterCompressedReadBuffer : public FasterCompressedReadBufferBase, public BufferWithOwnMemory<ReadBuffer>
{
private:
size_t size_compressed = 0;
bool nextImpl() override
{
size_t size_decompressed;
size_t size_compressed_without_checksum;
size_compressed = readCompressedData(size_decompressed, size_compressed_without_checksum);
if (!size_compressed)
return false;
memory.resize(size_decompressed + LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER);
working_buffer = Buffer(memory.data(), &memory[size_decompressed]);
decompress(working_buffer.begin(), size_decompressed, size_compressed_without_checksum);
return true;
}
public:
FasterCompressedReadBuffer(ReadBuffer & in_, ssize_t method)
: FasterCompressedReadBufferBase(&in_, method), BufferWithOwnMemory<ReadBuffer>(0)
{
}
};
}
[[maybe_unused]] static void BM_TestDecompress(benchmark::State & state)
{
std::vector<String> files
= {"/home/saber/Documents/data/mergetree/all_1_1_0/l_discount.bin",
"/home/saber/Documents/data/mergetree/all_1_1_0/l_extendedprice.bin",
"/home/saber/Documents/data/mergetree/all_1_1_0/l_quantity.bin",
"/home/saber/Documents/data/mergetree/all_1_1_0/l_shipdate.bin",
"/home/saber/Documents/data/mergetree/all_2_2_0/l_discount.bin",
"/home/saber/Documents/data/mergetree/all_2_2_0/l_extendedprice.bin",
"/home/saber/Documents/data/mergetree/all_2_2_0/l_quantity.bin",
"/home/saber/Documents/data/mergetree/all_2_2_0/l_shipdate.bin",
"/home/saber/Documents/data/mergetree/all_3_3_0/l_discount.bin",
"/home/saber/Documents/data/mergetree/all_3_3_0/l_extendedprice.bin",
"/home/saber/Documents/data/mergetree/all_3_3_0/l_quantity.bin",
"/home/saber/Documents/data/mergetree/all_3_3_0/l_shipdate.bin"};
for (auto _ : state)
{
for (const auto & file : files)
{
ReadBufferFromFile in(file);
FasterCompressedReadBuffer decompressing_in(in, state.range(0));
while (!decompressing_in.eof())
{
decompressing_in.position() = decompressing_in.buffer().end();
decompressing_in.next();
}
// std::cout << "call count:" << std::to_string(decompressing_in.getPerformanceStatistics().data[state.range(0)].count) << "\n";
// std::cout << "false count:" << std::to_string(decompressing_in.false_count) << "\n";
// decompressing_in.getStreamStatistics().print();
}
}
}
#include <Parser/CHColumnToSparkRow.h>
struct MergeTreeWithSnapshot
{
std::shared_ptr<local_engine::SparkStorageMergeTree> merge_tree;
std::shared_ptr<StorageSnapshot> snapshot;
NamesAndTypesList columns;
};
QueryPlanPtr readFromMergeTree(MergeTreeWithSnapshot storage)
{
auto query_info = local_engine::buildQueryInfo(storage.columns);
auto data_parts = storage.merge_tree->getDataPartsVectorForInternalUsage();
auto query_plan = std::make_unique<QueryPlan>();
auto step = storage.merge_tree->reader.readFromParts(
RangesInDataParts{data_parts}, {}, storage.columns.getNames(), storage.snapshot, *query_info, global_context, 10000, 1);
query_plan->addStep(std::move(step));
return query_plan;
}
QueryPlanPtr joinPlan(QueryPlanPtr left, QueryPlanPtr right, String left_key, String right_key)
{
auto join = std::make_shared<TableJoin>(
global_context->getSettingsRef(), global_context->getGlobalTemporaryVolume(), global_context->getTempDataOnDisk());
auto left_columns = left->getCurrentHeader()->getColumnsWithTypeAndName();
auto right_columns = right->getCurrentHeader()->getColumnsWithTypeAndName();
join->setKind(JoinKind::Left);
join->setStrictness(JoinStrictness::All);
join->setColumnsFromJoinedTable(right->getCurrentHeader()->getNamesAndTypesList());
join->addDisjunct();
ASTPtr lkey = std::make_shared<ASTIdentifier>(left_key);
ASTPtr rkey = std::make_shared<ASTIdentifier>(right_key);
join->addOnKeys(lkey, rkey, true);
for (const auto & column : join->columnsFromJoinedTable())
join->addJoinedColumn(column);
auto left_keys = left->getCurrentHeader()->getNamesAndTypesList();
join->addJoinedColumnsAndCorrectTypes(left_keys, true);
std::optional<ActionsDAG> left_convert_actions;
std::optional<ActionsDAG> right_convert_actions;
std::tie(left_convert_actions, right_convert_actions) = join->createConvertingActions(left_columns, right_columns);
if (right_convert_actions)
{
auto converting_step = std::make_unique<ExpressionStep>(right->getCurrentHeader(), std::move(*right_convert_actions));
converting_step->setStepDescription("Convert joined columns");
right->addStep(std::move(converting_step));
}
if (left_convert_actions)
{
auto converting_step = std::make_unique<ExpressionStep>(right->getCurrentHeader(), std::move(*right_convert_actions));
converting_step->setStepDescription("Convert joined columns");
left->addStep(std::move(converting_step));
}
auto hash_join = std::make_shared<HashJoin>(join, right->getCurrentHeader());
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
left->getCurrentHeader(),
right->getCurrentHeader(),
hash_join,
DEFAULT_BLOCK_SIZE,
DEFAULT_BLOCK_SIZE,
524288,
1,
NameSet{},
false,
false);
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::move(left));
plans.emplace_back(std::move(right));
auto query_plan = std::make_unique<QueryPlan>();
query_plan->unitePlans(std::move(join_step), std::move(plans));
return query_plan;
}
BENCHMARK(BM_ParquetRead)->Unit(benchmark::kMillisecond)->Iterations(10);
// BENCHMARK(BM_TestDecompress)->Arg(0)->Arg(1)->Arg(2)->Arg(3)->Unit(benchmark::kMillisecond)->Iterations(50)->Repetitions(6)->ComputeStatistics("80%", quantile);
// BENCHMARK(BM_JoinTest)->Unit(benchmark::k
// Millisecond)->Iterations(10)->Repetitions(250)->ComputeStatistics("80%", quantile);
//BENCHMARK(BM_CHColumnToSparkRow)->Unit(benchmark::kMillisecond)->Iterations(40);
//BENCHMARK(BM_MergeTreeRead)->Arg(1)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_ShuffleSplitter)->Args({2, 0})->Args({2, 1})->Args({2, 2})->Unit(benchmark::kMillisecond)->Iterations(1);
//BENCHMARK(BM_HashShuffleSplitter)->Args({2, 0})->Args({2, 1})->Args({2, 2})->Unit(benchmark::kMillisecond)->Iterations(1);
//BENCHMARK(BM_ShuffleReader)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_SimpleAggregate)->Arg(150)->Unit(benchmark::kMillisecond)->Iterations(40);
//BENCHMARK(BM_SIMDFilter)->Arg(1)->Arg(0)->Unit(benchmark::kMillisecond)->Iterations(40);
//BENCHMARK(BM_NormalFilter)->Arg(1)->Arg(0)->Unit(benchmark::kMillisecond)->Iterations(40);
//BENCHMARK(BM_TPCH_Q6)->Arg(150)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_MERGE_TREE_TPCH_Q6)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_MERGE_TREE_TPCH_Q6_NEW)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_MERGE_TREE_TPCH_Q6_FROM_TEXT)->Unit(benchmark::kMillisecond)->Iterations(5);
//BENCHMARK(BM_CHColumnToSparkRowWithString)->Arg(1)->Arg(3)->Arg(30)->Arg(90)->Arg(150)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_SparkRowToCHColumn)->Arg(1)->Arg(3)->Arg(30)->Arg(90)->Arg(150)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_SparkRowToCHColumnWithString)->Arg(1)->Arg(3)->Arg(30)->Arg(90)->Arg(150)->Unit(benchmark::kMillisecond)->Iterations(10);
//BENCHMARK(BM_TestCreateExecute)->Unit(benchmark::kMillisecond)->Iterations(1000);
//BENCHMARK(BM_TestReadColumn)->Unit(benchmark::kMillisecond)->Iterations(1);
//BENCHMARK(BM_TestSum)->Arg(1000000)->Unit(benchmark::kMicrosecond)->Iterations(100)->Repetitions(100)->ComputeStatistics("80%", quantile)->DisplayAggregatesOnly();
//BENCHMARK(BM_TestSumInline)->Arg(1000000)->Unit(benchmark::kMicrosecond)->Iterations(100)->Repetitions(100)->ComputeStatistics("80%", quantile)->DisplayAggregatesOnly();
//
//BENCHMARK(BM_TestPlus)->Arg(65505)->Unit(benchmark::kMicrosecond)->Iterations(100)->Repetitions(1000)->ComputeStatistics("80%", quantile)->DisplayAggregatesOnly();
//BENCHMARK(BM_TestPlusEmbedded)->Arg(65505)->Unit(benchmark::kMicrosecond)->Iterations(100)->Repetitions(1000)->ComputeStatistics("80%", quantile)->DisplayAggregatesOnly();
int main(int argc, char ** argv)
{
SparkConfigs::ConfigMap empty;
BackendInitializerUtil::initBackend(empty);
SCOPE_EXIT({ BackendFinalizerUtil::finalizeGlobally(); });
::benchmark::Initialize(&argc, argv);
if (::benchmark::ReportUnrecognizedArguments(argc, argv))
return 1;
::benchmark::RunSpecifiedBenchmarks();
::benchmark::Shutdown();
return 0;
}