blob: a32184594f1f2f6345f93acb4a7b3910b0214f43 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <folly/Logging.h>
#include <folly/Random.h>
#include <gflags/gflags.h>
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include "hbase/client/client.h"
#include "hbase/client/get.h"
#include "hbase/client/put.h"
#include "hbase/client/table.h"
#include "hbase/serde/table-name.h"
#include "hbase/utils/time-util.h"
using hbase::Client;
using hbase::Configuration;
using hbase::Get;
using hbase::Put;
using hbase::Table;
using hbase::pb::TableName;
using hbase::TimeUtil;
using folly::Random;
DEFINE_string(table, "load_test_table", "What table to do the reads and writes with");
DEFINE_string(families, "f", "comma separated list of column family names");
DEFINE_string(conf, "", "Conf directory to read the config from (optional)");
DEFINE_string(zookeeper, "localhost:2181", "What zk quorum to talk to");
DEFINE_string(znode, "/hbase", "parent znode");
DEFINE_uint64(num_rows, 1'000'000, "How many rows to write and read");
DEFINE_uint64(num_cols, 1000, "How many columns there are in a row");
DEFINE_int32(threads, 10, "How many client threads");
DEFINE_int32(batch_num_rows, 100, "number of rows in one multi-get / multi-put");
DEFINE_uint64(report_num_rows, 5000, "How frequent we should report the progress");
DEFINE_bool(gets, true, "perform gets");
DEFINE_bool(scans, true, "perform scans");
DEFINE_bool(puts, true, "perform put's");
DEFINE_bool(appends, true, "perform append's");
static constexpr const char *kNumColumn = "num";
static constexpr const char *incrPrefix = "i";
static constexpr const char *appendPrefix = "a";
std::string PrefixZero(int total_width, int num) {
std::string str = std::to_string(num);
int prefix_len = total_width - str.length();
if (prefix_len > 0) {
return std::string(prefix_len, '0') + str;
}
return str;
}
bool Verify(std::shared_ptr<hbase::Result> result, std::string family, int m) {
auto col = std::to_string(m);
if (!result->Value(family, col)) {
LOG(ERROR) << "Column:" << col << " is not found for " << result->Row();
return false;
}
auto l = *(result->Value(family, col));
if (l != col) {
LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
return false;
}
if (FLAGS_appends) {
if (!result->Value(family, incrPrefix + col)) {
LOG(ERROR) << "Column:" << (incrPrefix + col) << " is not found for " << result->Row();
return false;
}
auto int_val = hbase::BytesUtil::ToInt64(*(result->Value(family, incrPrefix + col)));
if (int_val != m) {
LOG(ERROR) << "value is not " << col << " for " << result->Row();
return false;
}
if (!result->Value(family, appendPrefix + col)) {
LOG(ERROR) << "Column:" << (appendPrefix + col) << " is not found for " << result->Row();
return false;
}
l = *(result->Value(family, appendPrefix + col));
if (l != col) {
LOG(ERROR) << "value " << *(result->Value(family, "1")) << " is not " << col;
return false;
}
}
return true;
}
bool Verify(std::shared_ptr<hbase::Result> result, const std::string &row,
const std::vector<std::string> &families) {
if (result == nullptr || result->IsEmpty()) {
LOG(ERROR) << "didn't get result";
return false;
}
if (result->Row().compare(row) != 0) {
LOG(ERROR) << "row " << result->Row() << " is not the expected: " << row;
return false;
}
// Test the values
for (auto family : families) {
if (!result->Value(family, kNumColumn)) {
LOG(ERROR) << "Column:" << kNumColumn << " is not found for " << result->Row();
return false;
}
auto cols = std::stoi(*(result->Value(family, kNumColumn)));
VLOG(3) << "Result for row:" << row << " contains " << std::to_string(cols) << " columns";
for (int m = 1; m <= cols; m++) {
if (!Verify(result, family, m)) return false;
}
}
return true;
}
bool DoScan(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table,
const std::vector<std::string> &families) {
hbase::Scan scan{};
auto start = iteration * rows;
auto end = start + rows;
auto width = std::to_string(max_row).length();
scan.SetStartRow(PrefixZero(width, start));
if (end != max_row && end != max_row + 1) {
scan.SetStopRow(PrefixZero(width, end));
}
auto start_ns = TimeUtil::GetNowNanos();
auto scanner = table->Scan(scan);
auto cnt = 0;
auto r = scanner->Next();
while (r != nullptr) {
auto row = PrefixZero(width, start + cnt);
if (!Verify(r, row, families)) {
return false;
}
cnt++;
r = scanner->Next();
if (cnt != 0 && cnt % FLAGS_report_num_rows == 0) {
LOG(INFO) << "(Thread " << iteration << ") "
<< "Scan iterated over " << cnt << " results in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
}
if (cnt != rows) {
LOG(ERROR) << "(Thread " << iteration << ") "
<< "Expected number of results does not match. expected:" << rows
<< ", actual:" << cnt;
return false;
}
LOG(INFO) << "(Thread " << iteration << ") "
<< "scanned " << std::to_string(cnt) << " rows in " << TimeUtil::ElapsedMillis(start_ns)
<< " ms.";
return true;
}
bool DoGet(int iteration, uint64_t max_row, uint64_t rows, std::unique_ptr<Table> table,
const std::vector<std::string> &families, uint64_t batch_num_rows) {
auto width = std::to_string(max_row).length();
auto start_ns = TimeUtil::GetNowNanos();
for (uint64_t k = iteration; k <= max_row;) {
uint64_t total_read = 0;
std::vector<hbase::Get> gets;
for (uint64_t i = 0; i < batch_num_rows && k <= max_row; ++i, k += FLAGS_threads) {
std::string row = PrefixZero(width, k);
hbase::Get get(row);
gets.push_back(get);
}
VLOG(3) << "getting for " << batch_num_rows << " rows";
auto results = table->Get(gets);
if (results.size() != gets.size()) {
LOG(ERROR) << "(Thread " << iteration << ") "
<< "Expected number of results does not match. expected:" << gets.size()
<< ", actual:" << results.size();
return false;
}
for (uint64_t i = 0; i < batch_num_rows && i < results.size(); ++i) {
if (!Verify(results[i], gets[i].row(), families)) {
return false;
}
}
total_read += gets.size();
if (total_read != 0 && total_read % FLAGS_report_num_rows == 0) {
LOG(INFO) << "(Thread " << iteration << ") "
<< "Sent " << total_read << " Multi-Get requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
k += batch_num_rows;
}
LOG(INFO) << "(Thread " << iteration << ") "
<< "Sent " << rows << " gets"
<< " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
return true;
}
void DoPut(int iteration, uint64_t max_row, uint64_t rows, int cols, std::unique_ptr<Table> table,
const std::vector<std::string> &families) {
auto start_ns = TimeUtil::GetNowNanos();
auto width = std::to_string(max_row).length();
for (uint64_t j = 0; j < rows; j++) {
std::string row = PrefixZero(width, iteration * rows + j);
auto put = Put{row};
for (auto family : families) {
auto n_cols = Random::rand32(1, cols);
put.AddColumn(family, kNumColumn, std::to_string(n_cols));
for (unsigned int k = 1; k <= n_cols; k++) {
put.AddColumn(family, std::to_string(k), std::to_string(k));
}
}
table->Put(put);
if ((j + 1) % FLAGS_report_num_rows == 0) {
LOG(INFO) << "(Thread " << iteration << ") "
<< "Written " << std::to_string(j + 1) << " rows in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
}
LOG(INFO) << "(Thread " << iteration << ") "
<< "written " << std::to_string(rows) << " rows"
<< " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
bool DoAppendIncrement(int iteration, uint64_t max_row, uint64_t rows, int cols,
std::unique_ptr<Table> table, const std::vector<std::string> &families) {
auto start_ns = TimeUtil::GetNowNanos();
auto width = std::to_string(max_row).length();
for (uint64_t j = 0; j < rows; j++) {
std::string row = PrefixZero(width, iteration * rows + j);
hbase::Get get(row);
auto result = table->Get(get);
for (auto family : families) {
auto n_cols = std::stoi(*(result->Value(family, kNumColumn)));
for (unsigned int k = 1; k <= n_cols; k++) {
table->Increment(
hbase::Increment{row}.AddColumn(family, incrPrefix + std::to_string(k), k));
if (!table->Append(hbase::Append{row}.Add(family, appendPrefix + std::to_string(k),
std::to_string(k)))) {
LOG(ERROR) << "(Thread " << iteration << ") "
<< "append for " << row << " family: " << family << " failed";
return false;
}
}
}
if ((j + 1) % FLAGS_report_num_rows == 0)
LOG(INFO) << "(Thread " << iteration << ") "
<< "Written " << std::to_string(j + 1) << " increments"
<< " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
LOG(INFO) << "(Thread " << iteration << ") "
<< "written " << std::to_string(rows) << " increments"
<< " in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
return true;
}
int main(int argc, char *argv[]) {
gflags::SetUsageMessage("Load client to manipulate multiple rows from HBase on the comamnd line");
gflags::ParseCommandLineFlags(&argc, &argv, true);
google::InitGoogleLogging(argv[0]);
google::InstallFailureSignalHandler();
FLAGS_logtostderr = 1;
FLAGS_stderrthreshold = 1;
if (FLAGS_batch_num_rows < 1) {
LOG(ERROR) << "size of multi get should be positive";
return -1;
}
if (!FLAGS_gets && !FLAGS_scans && !FLAGS_puts) {
LOG(ERROR) << "Must perform at least Get or Put operations";
return -1;
}
std::shared_ptr<Configuration> conf = nullptr;
if (FLAGS_conf == "") {
// Configuration
conf = std::make_shared<Configuration>();
conf->Set("hbase.zookeeper.quorum", FLAGS_zookeeper);
conf->Set("zookeeper.znode.parent", FLAGS_znode);
} else {
setenv("HBASE_CONF", FLAGS_conf.c_str(), 1);
hbase::HBaseConfigurationLoader loader;
conf = std::make_shared<Configuration>(loader.LoadDefaultResources().value());
}
auto tn = std::make_shared<TableName>(folly::to<TableName>(FLAGS_table));
auto num_puts = FLAGS_num_rows;
auto client = std::make_unique<Client>(*conf);
// Do the Put requests
std::vector<std::string> families;
std::size_t pos = 0, found;
while ((found = FLAGS_families.find_first_of(',', pos)) != std::string::npos) {
families.push_back(FLAGS_families.substr(pos, found - pos));
pos = found + 1;
}
families.push_back(FLAGS_families.substr(pos));
int rows = FLAGS_num_rows / FLAGS_threads;
if (FLAGS_num_rows % FLAGS_threads != 0) rows++;
int cols = FLAGS_num_cols;
std::atomic<int8_t> succeeded{1}; // not using bool since we want atomic &=
if (FLAGS_puts) {
LOG(INFO) << "Sending put requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> writer_threads;
for (int i = 0; i < FLAGS_threads; i++) {
writer_threads.push_back(std::thread([&, i] {
auto table = client->Table(*tn);
DoPut(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
}));
}
for (auto &t : writer_threads) {
t.join();
}
LOG(INFO) << "Successfully sent " << num_puts << " Put requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_appends) {
LOG(INFO) << "Sending append/increment requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> writer_threads;
for (int i = 0; i < FLAGS_threads; i++) {
writer_threads.push_back(std::thread([&, i] {
auto table = client->Table(*tn);
succeeded &=
DoAppendIncrement(i, FLAGS_num_rows - 1, rows, cols, std::move(table), families);
}));
}
for (auto &t : writer_threads) {
t.join();
}
LOG(INFO) << "Successfully sent " << num_puts << " append requests in "
<< TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_scans) {
LOG(INFO) << "Sending scan requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> reader_threads;
for (int i = 0; i < FLAGS_threads; i++) {
reader_threads.push_back(std::thread([&, i] {
auto table1 = client->Table(*tn);
succeeded &= DoScan(i, FLAGS_num_rows - 1, rows, std::move(table1), families);
}));
}
for (auto &t : reader_threads) {
t.join();
}
LOG(INFO) << (succeeded.load() ? "Successfully " : "Failed. ") << " scannned " << num_puts
<< " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
if (FLAGS_gets) {
LOG(INFO) << "Sending get requests";
auto start_ns = TimeUtil::GetNowNanos();
std::vector<std::thread> reader_threads;
for (int i = 0; i < FLAGS_threads; i++) {
reader_threads.push_back(std::thread([&, i] {
auto table1 = client->Table(*tn);
succeeded &=
DoGet(i, FLAGS_num_rows - 1, rows, std::move(table1), families, FLAGS_batch_num_rows);
}));
}
for (auto &t : reader_threads) {
t.join();
}
LOG(INFO) << (succeeded.load() ? "Successful. " : "Failed. ") << " sent multi-get requests for "
<< num_puts << " rows in " << TimeUtil::ElapsedMillis(start_ns) << " ms.";
}
client->Close();
return succeeded.load() ? 0 : -1;
}