blob: fec1aaef1829e32d4003f5c1dbec4076995b416b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <ostream>
#include <string>
#include <vector>
#include <gflags/gflags.h>
#include <glog/logging.h>
#include <gtest/gtest.h>
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/common.pb.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/schema.h"
#include "kudu/common/timestamp.h"
#include "kudu/common/types.h"
#include "kudu/fs/block_id.h"
#include "kudu/fs/block_manager.h"
#include "kudu/fs/fs_manager.h"
#include "kudu/gutil/port.h"
#include "kudu/tablet/delta_iterator_merger.h"
#include "kudu/tablet/delta_key.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/delta_store.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/rowset.h"
#include "kudu/util/faststring.h"
#include "kudu/util/slice.h"
#include "kudu/util/status.h"
#include "kudu/util/test_macros.h"
#include "kudu/util/test_util.h"
// Command-line knobs for this test binary.
// NOTE(review): neither flag is read by the test code in this file; they are
// kept so existing command lines that pass them keep working.
DEFINE_int32(num_rows, 2100, "number of rows");
DEFINE_int32(num_delta_files, 3, "number of delta files");
using std::is_sorted;
using std::shared_ptr;
using std::string;
using std::unique_ptr;
using std::vector;
namespace kudu {
namespace tablet {
using cfile::ReaderOptions;
using fs::ReadableBlock;
using fs::WritableBlock;
// Test fixture for delta compaction tests.
//
// SetUp() creates and opens a fresh FsManager rooted at the per-test
// "fs_root" path. The helpers below create a started DeltaFileWriter on a new
// block, and open a REDO DeltaFileReader on an existing block.
class TestDeltaCompaction : public KuduTest {
 public:
  TestDeltaCompaction()
      : deltafile_idx_(0),
        schema_(CreateSchema()) {
  }

  // Returns the fixture's base schema: a single UINT32 column named "val".
  // CHECK-fails if the column cannot be added.
  static Schema CreateSchema() {
    SchemaBuilder builder;
    CHECK_OK(builder.AddColumn("val", UINT32));
    return builder.Build();
  }

  // Creates a new block through 'fs_manager_' and wraps it in a
  // DeltaFileWriter, which is stored in '*dfw'. Start() is called on the
  // writer before returning. The new block's id is written to '*block_id' so
  // a reader can later be opened on it with GetDeltaFileReader().
  Status GetDeltaFileWriter(unique_ptr<DeltaFileWriter>* dfw,
                            BlockId* block_id) const {
    unique_ptr<WritableBlock> block;
    RETURN_NOT_OK(fs_manager_->CreateNewBlock({}, &block));
    *block_id = block->id();
    dfw->reset(new DeltaFileWriter(std::move(block)));
    return (*dfw)->Start();
  }

  // Opens the block identified by 'block_id' and stores in '*dfr' a
  // DeltaFileReader over it, opened with delta type REDO and default
  // ReaderOptions.
  Status GetDeltaFileReader(const BlockId& block_id,
                            shared_ptr<DeltaFileReader>* dfr) const {
    unique_ptr<ReadableBlock> block;
    RETURN_NOT_OK(fs_manager_->OpenBlock(block_id, &block));
    return DeltaFileReader::Open(std::move(block), REDO, ReaderOptions(), dfr);
  }

  // Runs the base-class setup and calls SeedRandom(). It then creates the
  // initial file system layout under "fs_root" and opens it.
  virtual void SetUp() OVERRIDE {
    KuduTest::SetUp();
    SeedRandom();
    fs_manager_.reset(new FsManager(env_, FsManagerOpts(GetTestPath("fs_root"))));
    ASSERT_OK(fs_manager_->CreateInitialFileSystemLayout());
    ASSERT_OK(fs_manager_->Open());
  }

 protected:
  // NOTE(review): not referenced by the test in this file, which keeps its
  // own local delta-file counter; retained so other fixture users are
  // unaffected.
  int64_t deltafile_idx_;

  // Base schema produced by CreateSchema().
  Schema schema_;

  // File system used to create and open delta file blocks; built in SetUp().
  unique_ptr<FsManager> fs_manager_;
};
// Writes one REDO delta file for each of three schema versions:
//   - the base schema;
//   - the base schema plus UINT32 column "c2";
//   - the previous schema plus STRING column "c3".
// The files are merged with DeltaIteratorMerger, projected onto the newest
// schema, and the merged stream is written to a new delta file. The test then
// checks that the debug dump of that file's deltas comes out sorted.
TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
  vector<Schema> schemas;
  SchemaBuilder builder(schema_);
  schemas.push_back(builder.Build());

  // Add an int column with default
  uint32_t default_c2 = 10;
  ASSERT_OK(builder.AddColumn("c2", UINT32, false, &default_c2, &default_c2));
  schemas.push_back(builder.Build());

  // Add a string column with default
  Slice default_c3("Hello World");
  ASSERT_OK(builder.AddColumn("c3", STRING, false, &default_c3, &default_c3));
  schemas.push_back(builder.Build());

  // Each delta file receives kNumUpdates updates. The first
  // kNumMultipleUpdates of them target the shared rows
  // [0, kNumMultipleUpdates), so the same rows are updated by every file.
  // The rest target rows that no earlier update has touched.
  const size_t kNumUpdates = 10;
  const size_t kNumMultipleUpdates = kNumUpdates / 2;

  vector<shared_ptr<DeltaStore>> inputs;
  faststring buf;
  // Incremented once per update across all files.
  size_t row_id = 0;
  uint64_t curr_timestamp = 0;
  int deltafile_idx = 0;
  for (const Schema& schema : schemas) {
    // Write the Deltas
    BlockId block_id;
    unique_ptr<DeltaFileWriter> dfw;
    ASSERT_OK(GetDeltaFileWriter(&dfw, &block_id));

    // Each non-key column is set to a value composed of the delta file index
    // and the update number (see update_value).
    unique_ptr<DeltaStats> stats(new DeltaStats);
    for (size_t i = 0; i < kNumUpdates; ++i) {
      buf.clear();
      RowChangeListEncoder update(&buf);
      for (size_t col_idx = schema.num_key_columns(); col_idx < schema.num_columns(); ++col_idx) {
        ColumnId col_id = schema.column_id(col_idx);
        DCHECK_GE(col_id, 0);

        // NOTE(review): this change list is also passed to
        // stats->UpdateStats() below. If UpdateStats() counts per-column
        // updates itself, these counts end up doubled. That is harmless for
        // this test, which only checks ordering, but confirm the intent.
        stats->IncrUpdateCount(col_id, 1);
        const ColumnSchema& col_schema = schema.column(col_idx);
        const int update_value = static_cast<int>(deltafile_idx * 100 + i);

        switch (col_schema.type_info()->physical_type()) {
          case UINT32: {
            uint32_t u32_val = update_value;
            update.AddColumnUpdate(col_schema, col_id, &u32_val);
            break;
          }
          case BINARY: {
            string s = std::to_string(update_value);
            Slice str_val(s);
            update.AddColumnUpdate(col_schema, col_id, &str_val);
            break;
          }
          default:
            FAIL() << "Type " << DataType_Name(col_schema.type_info()->type()) << " Not Supported";
            break;
        }
      }

      // To simulate multiple updates on the same row, the first
      // kNumMultipleUpdates updates of this schema always go to rows
      // [0, 1, 2, ...]. The others go to the next value of row_id.
      const size_t target_row = (i < kNumMultipleUpdates) ? i : row_id;
      DeltaKey key(target_row, Timestamp(curr_timestamp));
      RowChangeList row_changes = update.as_changelist();
      ASSERT_OK(dfw->AppendDelta<REDO>(key, row_changes));
      ASSERT_OK(stats->UpdateStats(key.timestamp(), row_changes));
      curr_timestamp++;
      row_id++;
    }

    dfw->WriteDeltaStats(std::move(stats));
    ASSERT_OK(dfw->Finish());
    shared_ptr<DeltaFileReader> dfr;
    ASSERT_OK(GetDeltaFileReader(block_id, &dfr));
    inputs.push_back(dfr);
    deltafile_idx++;
  }

  // Merge
  const Schema& merge_schema = schemas.back();
  RowIteratorOptions opts;
  opts.projection = &merge_schema;
  unique_ptr<DeltaIterator> merge_iter;
  ASSERT_OK(DeltaIteratorMerger::Create(inputs, opts, &merge_iter));

  unique_ptr<DeltaFileWriter> dfw;
  BlockId block_id;
  ASSERT_OK(GetDeltaFileWriter(&dfw, &block_id));
  ASSERT_OK(WriteDeltaIteratorToFile<REDO>(merge_iter.get(),
                                           ITERATE_OVER_ALL_ROWS,
                                           dfw.get()));
  ASSERT_OK(dfw->Finish());

  // Read the merged file back and verify the deltas are dumped in order.
  shared_ptr<DeltaFileReader> dfr;
  ASSERT_OK(GetDeltaFileReader(block_id, &dfr));
  unique_ptr<DeltaIterator> iter;
  ASSERT_OK(dfr->NewDeltaIterator(opts, &iter));
  vector<string> results;
  ASSERT_OK(DebugDumpDeltaIterator(REDO, iter.get(), merge_schema,
                                   ITERATE_OVER_ALL_ROWS, &results));
  for (const string& str : results) {
    VLOG(1) << str;
  }
  ASSERT_TRUE(is_sorted(results.begin(), results.end()));
}
} // namespace tablet
} // namespace kudu