blob: ca8a5f1824b93f916a9101a833ced7834f86c6e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <cstdlib>
#include <string>
#include <vector>
#include <map>
#include <dsn/dist/replication/replication_ddl_client.h>
#include <dsn/service_api_c.h>
#include <unistd.h>
#include <pegasus/client.h>
#include <gtest/gtest.h>
#include "base/pegasus_const.h"
using namespace ::pegasus;
extern pegasus_client *client;
extern std::shared_ptr<dsn::replication::replication_ddl_client> ddl_client;
static const char CCH[] = "_0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
static char buffer[256];
static std::map<std::string, std::map<std::string, std::string>> base;
static std::string expected_hash_key;
// REQUIRED: 'buffer' has been filled with random chars.
static const std::string random_string()
{
int pos = random() % sizeof(buffer);
buffer[pos] = CCH[random() % sizeof(CCH)];
unsigned int length = random() % sizeof(buffer) + 1;
if (pos + length < sizeof(buffer)) {
return std::string(buffer + pos, length);
} else {
return std::string(buffer + pos, sizeof(buffer) - pos) +
std::string(buffer, length + pos - sizeof(buffer));
}
}
static void check_and_put(std::map<std::string, std::map<std::string, std::string>> &data,
const std::string &hash_key,
const std::string &sort_key,
const std::string &value)
{
auto it1 = data.find(hash_key);
if (it1 != data.end()) {
auto it2 = it1->second.find(sort_key);
ASSERT_EQ(it1->second.end(), it2)
<< "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
<< ", old_value=" << it2->second << ", new_value=" << value;
}
data[hash_key][sort_key] = value;
}
static void check_and_put(std::map<std::string, std::string> &data,
const std::string &hash_key,
const std::string &sort_key,
const std::string &value)
{
auto it1 = data.find(sort_key);
ASSERT_EQ(data.end(), it1) << "Duplicate: hash_key=" << hash_key << ", sort_key=" << sort_key
<< ", old_value=" << it1->second << ", new_value=" << value;
data[sort_key] = value;
}
static void compare(const std::map<std::string, std::string> &data,
const std::map<std::string, std::string> &base,
const std::string &hash_key)
{
for (auto it1 = data.begin(), it2 = base.begin();; ++it1, ++it2) {
if (it1 == data.end()) {
ASSERT_EQ(base.end(), it2) << "Only in base: hash_key=" << hash_key
<< ", sort_key=" << it2->first << ", value=" << it2->second;
break;
}
ASSERT_NE(base.end(), it2) << "Only in data: hash_key=" << hash_key
<< ", sort_key=" << it1->first << ", value=" << it1->second;
ASSERT_EQ(*it2, *it1) << "Diff: hash_key=" << hash_key << ", data_sort_key=" << it1->first
<< ", data_value=" << it1->second << ", base_sort_key=" << it2->first
<< ", base_value=" << it2->second;
}
dinfo("Data and base are the same.");
}
static void compare(std::map<std::string, std::map<std::string, std::string>> &data,
std::map<std::string, std::map<std::string, std::string>> &base)
{
for (auto it1 = data.begin(), it2 = base.begin();; ++it1, ++it2) {
if (it1 == data.end()) {
ASSERT_EQ(base.end(), it2) << "Only in base: hash_key=" << it2->first;
break;
}
ASSERT_NE(base.end(), it2) << "Only in data: hash_key=" << it1->first;
ASSERT_EQ(it1->first, it2->first) << "Diff: data_hash_key=" << it1->first
<< ", base_hash_key=" << it2->first;
compare(it1->second, it2->second, it1->first);
}
dinfo("Data and base are the same.");
}
static void clear_database()
{
ddebug("CLEARING_DATABASE...");
pegasus_client::scan_options option;
std::vector<pegasus_client::pegasus_scanner *> scanners;
int ret = client->get_unordered_scanners(1, option, scanners);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when get scanners, error="
<< client->get_error_string(ret);
ASSERT_EQ(1, scanners.size());
ASSERT_NE(nullptr, scanners[0]);
std::string hash_key;
std::string sort_key;
std::string value;
while (PERR_OK == (ret = (scanners[0]->next(hash_key, sort_key, value)))) {
int r = client->del(hash_key, sort_key);
ASSERT_EQ(PERR_OK, r) << "Error occurred when del, hash_key=" << hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(r);
}
delete scanners[0];
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when next() in clearing database. error="
<< client->get_error_string(ret);
ret = client->get_unordered_scanners(1, option, scanners);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when get scanners, error="
<< client->get_error_string(ret);
ASSERT_EQ(1, scanners.size());
ASSERT_NE(nullptr, scanners[0]);
ret = scanners[0]->next(hash_key, sort_key, value);
delete scanners[0];
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when clearing database. error="
<< client->get_error_string(ret);
base.clear();
ddebug("Database cleared.");
}
// REQUIRED: 'base' is empty
static void fill_database()
{
ddebug("FILLING_DATABASE...");
srandom((unsigned int)time(nullptr));
for (auto &c : buffer) {
c = CCH[random() % sizeof(CCH)];
}
expected_hash_key = random_string();
std::string hash_key;
std::string sort_key;
std::string value;
while (base[expected_hash_key].size() < 1000) {
sort_key = random_string();
value = random_string();
int ret = client->set(expected_hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(ret);
base[expected_hash_key][sort_key] = value;
}
while (base.size() < 1000) {
hash_key = random_string();
while (base[hash_key].size() < 10) {
sort_key = random_string();
value = random_string();
int ret = client->set(hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(ret);
base[hash_key][sort_key] = value;
}
}
ddebug("Database filled.");
}
class scan : public testing::Test
{
public:
static void SetUpTestCase()
{
ddebug("SetUp...");
clear_database();
fill_database();
}
static void TearDownTestCase()
{
ddebug("TearDown...");
clear_database();
}
};
TEST_F(scan, ALL_SORT_KEY)
{
ddebug("TESTING_HASH_SCAN, ALL SORT_KEYS ....");
pegasus_client::scan_options options;
std::map<std::string, std::string> data;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, "", "", options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ(expected_hash_key, hash_key);
check_and_put(data, expected_hash_key, sort_key, value);
}
delete scanner;
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
compare(data, base[expected_hash_key], expected_hash_key);
}
TEST_F(scan, BOUND_INCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, [start, stop]...");
auto it1 = base[expected_hash_key].begin();
std::advance(it1, random() % 500); // [0,499]
std::string start = it1->first;
auto it2 = it1;
std::advance(it2, random() % 400 + 50); // [0,499] + [50, 449] = [50, 948]
std::string stop = it2->first;
pegasus_client::scan_options options;
options.start_inclusive = true;
options.stop_inclusive = true;
std::map<std::string, std::string> data;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, start, stop, options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ(expected_hash_key, hash_key);
check_and_put(data, expected_hash_key, sort_key, value);
}
delete scanner;
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
++it2; // to be the 'end' iterator
compare(data, std::map<std::string, std::string>(it1, it2), expected_hash_key);
}
TEST_F(scan, BOUND_EXCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, (start, stop)...");
auto it1 = base[expected_hash_key].begin();
std::advance(it1, random() % 500); // [0,499]
std::string start = it1->first;
auto it2 = it1;
std::advance(it2, random() % 400 + 50); // [0,499] + [50, 449] = [50, 948]
std::string stop = it2->first;
pegasus_client::scan_options options;
options.start_inclusive = false;
options.stop_inclusive = false;
std::map<std::string, std::string> data;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, start, stop, options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
ASSERT_EQ(expected_hash_key, hash_key);
check_and_put(data, expected_hash_key, sort_key, value);
}
delete scanner;
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
++it1;
compare(data, std::map<std::string, std::string>(it1, it2), expected_hash_key);
}
TEST_F(scan, ONE_POINT)
{
ddebug("TESTING_HASH_SCAN, [start, start]...");
auto it1 = base[expected_hash_key].begin();
std::advance(it1, random() % 800); // [0,799]
std::string start = it1->first;
pegasus_client::scan_options options;
options.start_inclusive = true;
options.stop_inclusive = true;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, start, start, options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
ret = scanner->next(hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when scan. error=" << client->get_error_string(ret);
ASSERT_EQ(expected_hash_key, hash_key);
ASSERT_EQ(start, sort_key);
ASSERT_EQ(it1->second, value);
ret = scanner->next(hash_key, sort_key, value);
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
delete scanner;
}
TEST_F(scan, HALF_INCLUSIVE)
{
ddebug("TESTING_HASH_SCAN, [start, start)...");
auto it1 = base[expected_hash_key].begin();
std::advance(it1, random() % 800); // [0,799]
std::string start = it1->first;
pegasus_client::scan_options options;
options.start_inclusive = true;
options.stop_inclusive = false;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, start, start, options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
ret = scanner->next(hash_key, sort_key, value);
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
delete scanner;
}
TEST_F(scan, VOID_SPAN)
{
ddebug("TESTING_HASH_SCAN, [stop, start]...");
auto it1 = base[expected_hash_key].begin();
std::advance(it1, random() % 500); // [0,499]
std::string start = it1->first;
auto it2 = it1;
std::advance(it2, random() % 400 + 50); // [0,499] + [50, 449] = [50, 948]
std::string stop = it2->first;
pegasus_client::scan_options options;
options.start_inclusive = true;
options.stop_inclusive = true;
pegasus_client::pegasus_scanner *scanner = nullptr;
int ret = client->get_scanner(expected_hash_key, stop, start, options, scanner);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_NE(nullptr, scanner);
std::string hash_key;
std::string sort_key;
std::string value;
ret = scanner->next(hash_key, sort_key, value);
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
delete scanner;
}
TEST_F(scan, OVERALL)
{
ddebug("TEST OVERALL_SCAN...");
pegasus_client::scan_options options;
std::vector<pegasus_client::pegasus_scanner *> scanners;
int ret = client->get_unordered_scanners(3, options, scanners);
ASSERT_EQ(0, ret) << "Error occurred when getting scanner. error="
<< client->get_error_string(ret);
ASSERT_LE(scanners.size(), 3);
std::string hash_key;
std::string sort_key;
std::string value;
std::map<std::string, std::map<std::string, std::string>> data;
for (auto scanner : scanners) {
ASSERT_NE(nullptr, scanner);
while (PERR_OK == (ret = (scanner->next(hash_key, sort_key, value)))) {
check_and_put(data, hash_key, sort_key, value);
}
ASSERT_EQ(PERR_SCAN_COMPLETE, ret) << "Error occurred when scan. error="
<< client->get_error_string(ret);
delete scanner;
}
compare(data, base);
}
TEST_F(scan, ITERATION_TIME_LIMIT)
{
// update iteration threshold to 1ms
auto response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(1)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));
// write data into table
int32_t i = 0;
std::string sort_key;
std::string value;
while (i < 9000) {
sort_key = random_string();
value = random_string();
int ret = client->set(expected_hash_key, sort_key, value);
ASSERT_EQ(PERR_OK, ret) << "Error occurred when set, hash_key=" << expected_hash_key
<< ", sort_key=" << sort_key
<< ", error=" << client->get_error_string(ret);
i++;
}
// get sortkey count timeout
int64_t count = 0;
int ret = client->sortkey_count(expected_hash_key, count);
ASSERT_EQ(0, ret);
ASSERT_EQ(count, -1);
// set iteration threshold to 100ms
response = ddl_client->set_app_envs(
client->get_app_name(), {ROCKSDB_ITERATION_THRESHOLD_TIME_MS}, {std::to_string(100)});
ASSERT_EQ(true, response.is_ok());
ASSERT_EQ(dsn::ERR_OK, response.get_value().err);
// wait envs to be synced.
std::this_thread::sleep_for(std::chrono::seconds(30));
}