blob: ad4b2975c0302895d54cd4e02735a468876ab6ab [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "olap/memory/mem_tablet.h"
#include <gtest/gtest.h>
#include "olap/memory/mem_tablet_scan.h"
#include "olap/memory/write_txn.h"
#include "olap/tablet_meta.h"
#include "test_util/test_util.h"
namespace doris {
namespace memory {
struct TData {
int32_t id;
int32_t uv;
int32_t pv;
int8_t city;
};
TEST(MemTablet, writescan) {
const int num_insert = LOOP_LESS_OR_MORE(2000, 2000000);
const int insert_per_write = LOOP_LESS_OR_MORE(500, 500000);
const int num_update = LOOP_LESS_OR_MORE(10, 10000);
const int update_time = 3;
scoped_refptr<Schema> sc;
ASSERT_TRUE(Schema::create("id int,uv int,pv int,city tinyint null", &sc).ok());
std::unordered_map<uint32_t, uint32_t> col_idx_to_unique_id;
std::vector<TColumn> columns(sc->num_columns());
for (size_t i = 0; i < sc->num_columns(); i++) {
const ColumnSchema* cs = sc->get(i);
col_idx_to_unique_id[i] = cs->cid();
TColumn& c = columns[i];
c.__set_column_name(cs->name());
TColumnType tct;
if (cs->type() == ColumnType::OLAP_FIELD_TYPE_INT) {
tct.__set_type(TPrimitiveType::INT);
} else if (cs->type() == ColumnType::OLAP_FIELD_TYPE_TINYINT) {
tct.__set_type(TPrimitiveType::TINYINT);
} else {
ASSERT_TRUE(false);
}
c.__set_column_type(tct);
c.__set_is_allow_null(cs->is_nullable());
c.__set_is_key(cs->is_key());
c.__set_aggregation_type(TAggregationType::REPLACE);
}
TTabletSchema tschema;
tschema.__set_short_key_column_count(1);
tschema.__set_keys_type(TKeysType::UNIQUE_KEYS);
tschema.__set_columns(columns);
tschema.__set_is_in_memory(false);
tschema.__set_schema_hash(1);
TabletMetaSharedPtr tablet_meta(
new TabletMeta(1, 1, 1, 1, 1, tschema, static_cast<uint32_t>(sc->cid_size()),
col_idx_to_unique_id, TabletUid(1, 1), TTabletType::TABLET_TYPE_MEMORY));
std::shared_ptr<MemTablet> tablet = MemTablet::create_tablet_from_meta(tablet_meta, nullptr);
ASSERT_TRUE(tablet->init().ok());
uint64_t cur_version = 0;
std::vector<TData> alldata(num_insert);
// insert
srand(1);
size_t nrow = 0;
for (int insert_start = 0; insert_start < num_insert; insert_start += insert_per_write) {
std::unique_ptr<WriteTxn> wtx;
EXPECT_TRUE(tablet->create_write_txn(&wtx).ok());
PartialRowWriter writer(wtx->get_schema_ptr());
int insert_end = std::min(insert_start + insert_per_write, num_insert);
EXPECT_TRUE(writer.start_batch(insert_per_write + 1, insert_per_write * 32).ok());
for (int i = insert_start; i < insert_end; i++) {
nrow++;
EXPECT_TRUE(writer.start_row().ok());
int id = i;
int uv = rand() % 10000;
int pv = rand() % 10000;
int8_t city = rand() % 100;
alldata[i].id = id;
alldata[i].uv = uv;
alldata[i].pv = pv;
alldata[i].city = city;
EXPECT_TRUE(writer.set("id", &id).ok());
EXPECT_TRUE(writer.set("uv", &uv).ok());
EXPECT_TRUE(writer.set("pv", &pv).ok());
EXPECT_TRUE(writer.set("city", city % 2 == 0 ? nullptr : &city).ok());
EXPECT_TRUE(writer.end_row().ok());
}
std::vector<uint8_t> wtxn_buff;
EXPECT_TRUE(writer.finish_batch(&wtxn_buff).ok());
PartialRowBatch* batch = wtx->new_batch();
EXPECT_TRUE(batch->load(std::move(wtxn_buff)).ok());
EXPECT_TRUE(tablet->commit_write_txn(wtx.get(), ++cur_version).ok());
wtx.reset();
}
// update
for (int i = 0; i < update_time; i++) {
std::unique_ptr<WriteTxn> wtx;
EXPECT_TRUE(tablet->create_write_txn(&wtx).ok());
PartialRowWriter writer(wtx->get_schema_ptr());
EXPECT_TRUE(writer.start_batch(num_update + 1, num_update * 32).ok());
size_t nrow = 0;
for (int j = 0; j < num_update; j++) {
nrow++;
EXPECT_TRUE(writer.start_row().ok());
int id = rand() % num_insert;
int uv = rand() % 10000;
int pv = rand() % 10000;
int8_t city = rand() % 100;
alldata[id].uv = uv;
alldata[id].pv = pv;
alldata[id].city = city;
EXPECT_TRUE(writer.set("id", &id).ok());
EXPECT_TRUE(writer.set("pv", &pv).ok());
EXPECT_TRUE(writer.set("city", city % 2 == 0 ? nullptr : &city).ok());
EXPECT_TRUE(writer.end_row().ok());
}
std::vector<uint8_t> wtxn_buff;
EXPECT_TRUE(writer.finish_batch(&wtxn_buff).ok());
PartialRowBatch* batch = wtx->new_batch();
EXPECT_TRUE(batch->load(std::move(wtxn_buff)).ok());
EXPECT_TRUE(tablet->commit_write_txn(wtx.get(), ++cur_version).ok());
wtx.reset();
}
// scan perf test
{
double t0 = GetMonoTimeSecondsAsDouble();
std::unique_ptr<ScanSpec> scanspec(new ScanSpec({"pv"}, cur_version));
std::unique_ptr<MemTabletScan> scan;
ASSERT_TRUE(tablet->scan(&scanspec, &scan).ok());
const RowBlock* rblock = nullptr;
while (true) {
EXPECT_TRUE(scan->next_block(&rblock).ok());
if (!rblock) {
break;
}
}
double t = GetMonoTimeSecondsAsDouble() - t0;
LOG(INFO) << StringPrintf("scan %d record, time: %.3lfs %.0lf row/s", num_insert, t,
num_insert / t);
scan.reset();
}
// scan result validation
{
std::unique_ptr<ScanSpec> scanspec(new ScanSpec({"pv"}, cur_version));
std::unique_ptr<MemTabletScan> scan;
ASSERT_TRUE(tablet->scan(&scanspec, &scan).ok());
size_t curidx = 0;
while (true) {
const RowBlock* rblock = nullptr;
EXPECT_TRUE(scan->next_block(&rblock).ok());
if (!rblock) {
break;
}
size_t nrows = rblock->num_rows();
const ColumnBlock& cb = rblock->get_column(0);
for (size_t i = 0; i < nrows; i++) {
int32_t value = cb.data().as<int32_t>()[i];
EXPECT_EQ(value, alldata[curidx].pv);
curidx++;
}
}
EXPECT_EQ(curidx, (size_t)num_insert);
scan.reset();
}
}
} // namespace memory
} // namespace doris
int main(int argc, char** argv) {
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}