// blob: d6dd9e56bea815431f4db39eca2871d925047ec1 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <brpc/couchbase.h>
#include <bthread/bthread.h>
#include <butil/logging.h>
#include <butil/string_printf.h>
#include <atomic>
#include <iostream>
#include <string>
#include <vector>
// ANSI color codes
// Each macro expands to a terminal escape sequence (a string literal) that is
// streamed to std::cout ahead of a message. RESET is streamed after the
// message to switch the terminal back to its default attributes.
#define GREEN "\033[32m"
#define RED "\033[31m"
#define BLUE "\033[34m"
#define YELLOW "\033[33m"
#define CYAN "\033[36m"
#define RESET "\033[0m"
// Number of worker bthreads started in each phase of the example.
constexpr int NUM_THREADS = 20;
// Number of consecutive worker ids that are mapped onto the same bucket.
constexpr int THREADS_PER_BUCKET = 5;

// Process-wide connection settings shared by every worker.
struct {
  // Credentials passed to CouchbaseOperations::authenticate().
  std::string username{"Administrator"};
  std::string password{"password"};
  // Buckets the workers are spread over; indexed by ThreadArgs::bucket_id.
  std::vector<std::string> bucket_names{"t0", "t1", "t2", "t3"};
} g_config;
// Operation counters for one worker. The counters are atomics, so they can be
// incremented by a worker and read by another thread without extra locking.
struct ThreadStats {
  std::atomic<int> operations_attempted{0};   // operations that were started
  std::atomic<int> operations_successful{0};  // operations whose result reported success
  std::atomic<int> operations_failed{0};      // operations whose result reported failure

  // Sets every counter back to zero. Each counter is cleared on its own; the
  // three stores together are not one atomic step.
  void reset() {
    operations_attempted.store(0);
    operations_successful.store(0);
    operations_failed.store(0);
  }
};
// Global statistics
struct GlobalStats {
ThreadStats total;
std::vector<ThreadStats> per_thread_stats;
GlobalStats() : per_thread_stats(NUM_THREADS) {}
void aggregate_stats() {
total.reset();
for (const auto& stats : per_thread_stats) {
total.operations_attempted += stats.operations_attempted.load();
total.operations_successful += stats.operations_successful.load();
total.operations_failed += stats.operations_failed.load();
}
}
} g_stats;
// Arguments handed to a worker bthread through its void* parameter.
// Every scalar and pointer member has a default so that a default-initialized
// ThreadArgs never carries indeterminate values.
struct ThreadArgs {
  // Index of the worker; used in log output and in generated document keys.
  int thread_id = 0;
  // Index into g_config.bucket_names of the bucket this worker uses.
  int bucket_id = 0;
  // Name of the bucket this worker uses.
  std::string bucket_name;
  // Client shared between workers; read by shared_object_thread_worker only
  // (thread_worker creates its own client). Not owned by this struct.
  brpc::CouchbaseOperations* couchbase_ops = nullptr;
  // Counters the worker updates while it runs. Not owned by this struct.
  ThreadStats* stats = nullptr;
};
// Runs one UPSERT -> GET -> DELETE sequence against the default collection.
//
// couchbase_ops: client used to issue the three operations.
// base_key:      key prefix; the document key is base_key + "_default".
// stats:         counters to update; must not be null.
//
// Each operation bumps operations_attempted before it is issued and then
// either operations_successful or operations_failed. A failed operation is
// reported on std::cout. A failed UPSERT or GET ends the sequence early, so
// the later operations are neither attempted nor counted.
void perform_crud_operations_default(brpc::CouchbaseOperations& couchbase_ops,
                                     const std::string& base_key,
                                     ThreadStats* stats) {
  std::string key = base_key + "_default";
  // bthread_t is a 64-bit id; print it in full instead of truncating to int.
  std::string value = butil::string_printf(
      R"({"thread_id": %llu, "collection": "default"})",
      static_cast<unsigned long long>(bthread_self()));

  // Books the outcome of one operation and reports a failure on std::cout.
  // Returns true when the operation succeeded.
  const auto tally = [stats](const char* op_name,
                             const brpc::CouchbaseOperations::Result& outcome) {
    if (outcome.success) {
      stats->operations_successful++;
      return true;
    }
    stats->operations_failed++;
    std::cout << op_name << " failed: " << outcome.error_message << std::endl;
    return false;
  };

  // UPSERT
  stats->operations_attempted++;
  if (!tally("UPSERT", couchbase_ops.upsert(key, value))) {
    return;
  }

  // GET
  stats->operations_attempted++;
  if (!tally("GET", couchbase_ops.get(key))) {
    return;
  }

  // DELETE
  stats->operations_attempted++;
  tally("DELETE", couchbase_ops.delete_(key));
}
// Runs one UPSERT -> GET -> DELETE sequence against the "col1" collection.
//
// couchbase_ops: client used to issue the three operations.
// base_key:      key prefix; the document key is base_key + "_col1".
// stats:         counters to update; must not be null.
//
// Each operation bumps operations_attempted before it is issued and then
// either operations_successful or operations_failed. A failed operation is
// reported on std::cout (DELETE included). A failed UPSERT or GET ends the
// sequence early, so the later operations are neither attempted nor counted.
void perform_crud_operations_col1(brpc::CouchbaseOperations& couchbase_ops,
                                  const std::string& base_key,
                                  ThreadStats* stats) {
  std::string key = base_key + "_col1";
  // bthread_t is a 64-bit id; print it in full instead of truncating to int.
  std::string value = butil::string_printf(
      R"({"thread_id": %llu, "collection": "col1"})",
      static_cast<unsigned long long>(bthread_self()));

  // Books the outcome of one operation and reports a failure on std::cout.
  // Returns true when the operation succeeded.
  const auto tally = [stats](const char* op_name,
                             const brpc::CouchbaseOperations::Result& outcome) {
    if (outcome.success) {
      stats->operations_successful++;
      return true;
    }
    stats->operations_failed++;
    std::cout << op_name << " failed: " << outcome.error_message << std::endl;
    return false;
  };

  // UPSERT
  stats->operations_attempted++;
  if (!tally("UPSERT", couchbase_ops.upsert(key, value, "col1"))) {
    return;
  }

  // GET
  stats->operations_attempted++;
  if (!tally("GET", couchbase_ops.get(key, "col1"))) {
    return;
  }

  // DELETE
  stats->operations_attempted++;
  tally("DELETE", couchbase_ops.delete_(key, "col1"));
}
// Simple thread worker function
void* thread_worker(void* arg) {
ThreadArgs* args = static_cast<ThreadArgs*>(arg);
std::cout << CYAN << "Thread " << args->thread_id << " starting on bucket "
<< args->bucket_name << RESET << std::endl;
// Create CouchbaseOperations instance for this thread
brpc::CouchbaseOperations couchbase_ops;
// Authentication
brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticate(
g_config.username, g_config.password, "127.0.0.1:11210", args->bucket_name);
// for SSL authentication use below line instead
// brpc::CouchbaseOperations::Result auth_result = couchbase_ops.authenticateSSL(username, password, "127.0.0.1:11210", args->bucket_name, "/path/to/cert.txt");
if (!auth_result.success) {
std::cout << RED << "Thread " << args->thread_id << ": Auth failed - "
<< auth_result.error_message << RESET << std::endl;
return NULL;
}
// // Select bucket
// brpc::CouchbaseOperations::Result bucket_result =
// couchbase_ops.selectBucket(args->bucket_name);
// if (!bucket_result.success) {
// std::cout << RED << "Thread " << args->thread_id
// << ": Bucket selection failed - " << bucket_result.error_message
// << RESET << std::endl;
// return NULL;
// }
std::cout << GREEN << "Thread " << args->thread_id << " connected to bucket "
<< args->bucket_name << RESET << std::endl;
// Perform operations - 10 times on default collection, 10 times on col1
// collection
for (int i = 0; i < 10; ++i) {
std::string base_key =
butil::string_printf("thread_%d_op_%d", args->thread_id, i);
// CRUD operations on default collection
perform_crud_operations_default(couchbase_ops, base_key, args->stats);
// CRUD operations on col1 collection
perform_crud_operations_col1(couchbase_ops, base_key, args->stats);
// Small delay between operations
bthread_usleep(10000); // 10ms
}
int successful = args->stats->operations_successful.load();
int attempted = args->stats->operations_attempted.load();
int failed = args->stats->operations_failed.load();
std::cout << GREEN << "Thread " << args->thread_id
<< " completed: " << successful << "/" << attempted
<< " operations successful, " << failed << " failed" << RESET
<< std::endl;
return NULL;
}
void* shared_object_thread_worker(void *arg){
ThreadArgs* shared_args = static_cast<ThreadArgs*>(arg);
brpc::CouchbaseOperations* shared_couchbase_ops = shared_args->couchbase_ops;
// Perform operations - 10 times on default collection, 10 times on col1
// collection
for (int i = 0; i < 10; ++i) {
std::string base_key =
butil::string_printf("shared_thread_op_%d_thread_id_%d", i, shared_args->thread_id);
// CRUD operations on default collection
perform_crud_operations_default(*shared_couchbase_ops, base_key, shared_args->stats);
// CRUD operations on col1 collection
perform_crud_operations_col1(*shared_couchbase_ops, base_key, shared_args->stats);
// Small delay between operations
bthread_usleep(10000); // 10ms
}
return NULL;
}
// Print simple statistics
void print_stats() {
g_stats.aggregate_stats();
std::cout << std::endl;
std::cout << CYAN << "=== TEST RESULTS ===" << RESET << std::endl;
int total_attempted = g_stats.total.operations_attempted.load();
int total_successful = g_stats.total.operations_successful.load();
int total_failed = g_stats.total.operations_failed.load();
double success_rate = total_attempted > 0
? (double)total_successful / total_attempted * 100.0
: 0.0;
std::cout << GREEN << "Overall Performance:" << RESET << std::endl;
std::cout << " Total Operations: " << total_attempted << std::endl;
std::cout << " Successful: " << total_successful << " (" << success_rate
<< "%)" << std::endl;
std::cout << " Failed: " << total_failed << std::endl;
std::cout << std::endl;
// Per-thread breakdown
std::cout << YELLOW << "Per-Thread Performance:" << RESET << std::endl;
for (int i = 0; i < NUM_THREADS; ++i) {
const auto& stats = g_stats.per_thread_stats[i];
int attempted = stats.operations_attempted.load();
int successful = stats.operations_successful.load();
int failed = stats.operations_failed.load();
std::cout << " Thread " << i << ": " << attempted << " ops, " << successful
<< " success, " << failed << " failed" << std::endl;
}
std::cout << std::endl;
}
// Entry point of the multithreaded Couchbase example.
//
// Phase 1 starts NUM_THREADS bthreads running thread_worker; each worker
// creates its own CouchbaseOperations instance, and THREADS_PER_BUCKET
// consecutive workers use the same bucket from g_config.bucket_names.
// Phase 2 authenticates a single CouchbaseOperations instance against bucket
// "t0" and shares it between NUM_THREADS bthreads running
// shared_object_thread_worker. Both phases update the same per-thread
// counters, so the statistics printed at the end cover both phases.
//
// Returns 0 on success, and -1 when too few buckets are configured, a bthread
// cannot be started, or the shared instance fails to authenticate. Before any
// early return, bthreads that were already started are joined, because they
// use `args`, `g_stats` and (in phase 2) the shared client, which must not be
// destroyed while workers are still running.
int main(int argc, char* argv[]) {
  std::cout << GREEN << "Starting Simple Multithreaded Couchbase Client"
            << RESET << std::endl;

  // Banner is built from the real configuration so it cannot drift from it.
  std::cout << YELLOW << NUM_THREADS << " threads: " << THREADS_PER_BUCKET
            << " per bucket (";
  for (std::size_t b = 0; b < g_config.bucket_names.size(); ++b) {
    if (b != 0) {
      std::cout << ", ";
    }
    std::cout << g_config.bucket_names[b];
  }
  std::cout << ")" << RESET << std::endl;
  std::cout << BLUE
            << "Each thread performs CRUD operations on default collection and "
               "col1 collection"
            << RESET << std::endl;

  // Worker i uses bucket i / THREADS_PER_BUCKET; make sure that index exists
  // for every worker before bucket_names is indexed below.
  const int buckets_needed =
      (NUM_THREADS + THREADS_PER_BUCKET - 1) / THREADS_PER_BUCKET;
  if (static_cast<std::size_t>(buckets_needed) > g_config.bucket_names.size()) {
    std::cout << RED << "Not enough buckets configured: need "
              << buckets_needed << ", have " << g_config.bucket_names.size()
              << RESET << std::endl;
    return -1;
  }

  // Create threads and arguments
  std::vector<bthread_t> threads(NUM_THREADS);
  std::vector<ThreadArgs> args(NUM_THREADS);

  // Joins the first `count` bthreads recorded in `threads`.
  const auto join_started = [&threads](int count) {
    for (int i = 0; i < count; ++i) {
      bthread_join(threads[i], nullptr);
    }
  };

  // Assign threads to buckets
  for (int i = 0; i < NUM_THREADS; ++i) {
    args[i].thread_id = i;
    args[i].bucket_id = i / THREADS_PER_BUCKET;
    args[i].bucket_name = g_config.bucket_names[args[i].bucket_id];
    args[i].stats = &g_stats.per_thread_stats[i];
  }

  // Print thread assignments
  std::cout << "Thread Assignments:" << RESET << std::endl;
  for (int i = 0; i < NUM_THREADS; ++i) {
    std::cout << "Thread " << i << " -> Bucket: " << args[i].bucket_name
              << std::endl;
  }
  std::cout << std::endl;

  // Phase 1: start all threads, each with a private client.
  for (int i = 0; i < NUM_THREADS; ++i) {
    if (bthread_start_background(&threads[i], nullptr, thread_worker,
                                 &args[i]) != 0) {
      std::cout << RED << "Failed to create thread " << i << RESET << std::endl;
      // Workers 0..i-1 are running and hold pointers into `args`.
      join_started(i);
      return -1;
    }
  }
  std::cout << GREEN << "All " << NUM_THREADS << " threads started!" << RESET
            << std::endl;

  // Wait for all threads to complete
  std::cout << YELLOW << "Waiting for all threads to complete..." << RESET
            << std::endl;
  join_started(NUM_THREADS);
  std::cout << GREEN << "All threads completed!" << RESET << std::endl;

  // Phase 2: create a shared CouchbaseOperations instance
  brpc::CouchbaseOperations shared_couchbase_ops;
  brpc::CouchbaseOperations::Result result = shared_couchbase_ops.authenticate(
      g_config.username, g_config.password, "127.0.0.1:11210", "t0");
  if (result.success) {
    std::cout << GREEN
              << "Shared CouchbaseOperations instance authenticated successfully!"
              << RESET << std::endl;
  } else {
    std::cout << RED
              << "Shared CouchbaseOperations instance authentication failed: "
              << result.error_message << RESET << std::endl;
    return -1;
  }

  // Re-target every worker at the shared client. args[i].stats is left
  // untouched, so phase 2 keeps counting into the phase 1 counters.
  for (int i = 0; i < NUM_THREADS; ++i) {
    args[i].thread_id = i;
    args[i].couchbase_ops = &shared_couchbase_ops;
    args[i].bucket_id = 0;
    args[i].bucket_name = "t0";
  }

  for (int i = 0; i < NUM_THREADS; ++i) {
    if (bthread_start_background(&threads[i], nullptr,
                                 shared_object_thread_worker, &args[i]) != 0) {
      std::cout << RED << "Failed to create shared object thread " << i << RESET
                << std::endl;
      // Workers 0..i-1 are running and use `shared_couchbase_ops` and `args`.
      join_started(i);
      return -1;
    }
  }
  join_started(NUM_THREADS);
  std::cout << GREEN << "All shared object threads completed!" << RESET
            << std::endl;

  // Print statistics
  print_stats();
  return 0;
}