blob: 91e2c4d49eb4ea402a30e7aebf12aec4e7638593 [file] [log] [blame]
/*
This is free and unencumbered software released into the public domain.
Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.
In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.
For more information, please refer to <http://unlicense.org/>
*/
#include <assert.h>
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <uv.h>
#include "cassandra.h"
/*
* Use this example with caution. It's just used as a scratch example for debugging and
* roughly analyzing performance.
*/
#define NUM_THREADS 1
#define NUM_IO_WORKER_THREADS 4
#define NUM_CONCURRENT_REQUESTS 10000
#define NUM_ITERATIONS 1000
#define DO_SELECTS 1
#define USE_PREPARED 1
#if defined(_MSC_VER) && defined(_DEBUG)
#include <windows.h>
const DWORD MS_VC_EXCEPTION = 0x406D1388;
#pragma pack(push, 8)
typedef struct tagTHREADNAME_INFO {
DWORD dwType; /* Must be 0x1000. */
LPCSTR szName; /* Pointer to name (in user addr space). */
DWORD dwThreadID; /* Thread ID (-1=caller thread). */
DWORD dwFlags; /* Reserved for future use, must be zero. */
} THREADNAME_INFO;
#pragma pack(pop)
void set_thread_name(const char* thread_name) {
THREADNAME_INFO info;
info.dwType = 0x1000;
info.szName = thread_name;
info.dwThreadID = -1;
info.dwFlags = 0;
#pragma warning(push)
#pragma warning(disable : 6320 6322)
__try {
RaiseException(MS_VC_EXCEPTION, 0, sizeof(info) / sizeof(ULONG_PTR), (ULONG_PTR*)&info);
} __except (EXCEPTION_EXECUTE_HANDLER) {
}
#pragma warning(pop)
}
#endif
const char* big_string = "0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567"
"0123456701234567012345670123456701234567012345670123456701234567";
CassUuidGen* uuid_gen;
typedef struct Status_ {
uv_mutex_t mutex;
uv_cond_t cond;
int count;
} Status;
Status status;
int status_init(Status* status, int initial_count) {
int rc;
rc = uv_mutex_init(&status->mutex);
if (rc != 0) return rc;
rc = uv_cond_init(&status->cond);
if (rc != 0) return rc;
status->count = initial_count;
return rc;
}
void status_destroy(Status* status) {
uv_mutex_destroy(&status->mutex);
uv_cond_destroy(&status->cond);
}
void status_notify(Status* status) {
uv_mutex_lock(&status->mutex);
status->count--;
uv_cond_signal(&status->cond);
uv_mutex_unlock(&status->mutex);
}
int status_wait(Status* status, uint64_t timeout_secs) {
int count;
uv_mutex_lock(&status->mutex);
uv_cond_timedwait(&status->cond, &status->mutex, timeout_secs * 1000 * 1000 * 1000);
count = status->count;
uv_mutex_unlock(&status->mutex);
return count;
}
void print_error(CassFuture* future) {
const char* message;
size_t message_length;
cass_future_error_message(future, &message, &message_length);
fprintf(stderr, "Error: %.*s\n", (int)message_length, message);
}
CassCluster* create_cluster(const char* hosts) {
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hosts);
cass_cluster_set_credentials(cluster, "cassandra", "cassandra");
cass_cluster_set_num_threads_io(cluster, NUM_IO_WORKER_THREADS);
cass_cluster_set_queue_size_io(cluster, 10000);
cass_cluster_set_core_connections_per_host(cluster, 1);
return cluster;
}
CassError connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
return rc;
}
CassError execute_query(CassSession* session, const char* query) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
CassError prepare_query(CassSession* session, const char* query, const CassPrepared** prepared) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
} else {
*prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
return rc;
}
int compare_dbl(const void* d1, const void* d2) {
if (*((double*)d1) < *((double*)d2)) {
return -1;
} else if (*((double*)d1) > *((double*)d2)) {
return 1;
} else {
return 0;
}
}
void insert_into_perf(CassSession* session, const char* query, const CassPrepared* prepared) {
int i;
CassFuture* futures[NUM_CONCURRENT_REQUESTS];
CassCollection* collection = cass_collection_new(CASS_COLLECTION_TYPE_SET, 2);
cass_collection_append_string(collection, "jazz");
cass_collection_append_string(collection, "2013");
for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
CassUuid id;
CassStatement* statement;
if (prepared != NULL) {
statement = cass_prepared_bind(prepared);
} else {
statement = cass_statement_new(query, 5);
}
cass_statement_set_is_idempotent(statement, cass_true);
cass_uuid_gen_time(uuid_gen, &id);
cass_statement_bind_uuid(statement, 0, id);
cass_statement_bind_string(statement, 1, big_string);
cass_statement_bind_string(statement, 2, big_string);
cass_statement_bind_string(statement, 3, big_string);
cass_statement_bind_collection(statement, 4, collection);
futures[i] = cass_session_execute(session, statement);
cass_statement_free(statement);
}
for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
CassFuture* future = futures[i];
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
}
cass_collection_free(collection);
}
void run_insert_queries(void* data) {
int i;
CassSession* session = (CassSession*)data;
const CassPrepared* insert_prepared = NULL;
const char* insert_query =
"INSERT INTO stress.songs (id, title, album, artist, tags) VALUES (?, ?, ?, ?, ?);";
#if USE_PREPARED
if (prepare_query(session, insert_query, &insert_prepared) == CASS_OK) {
#endif
for (i = 0; i < NUM_ITERATIONS; ++i) {
insert_into_perf(session, insert_query, insert_prepared);
}
#if USE_PREPARED
cass_prepared_free(insert_prepared);
}
#endif
status_notify(&status);
}
void select_from_perf(CassSession* session, const char* query, const CassPrepared* prepared) {
int i;
CassFuture* futures[NUM_CONCURRENT_REQUESTS];
for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
CassStatement* statement;
if (prepared != NULL) {
statement = cass_prepared_bind(prepared);
} else {
statement = cass_statement_new(query, 0);
}
cass_statement_set_is_idempotent(statement, cass_true);
futures[i] = cass_session_execute(session, statement);
cass_statement_free(statement);
}
for (i = 0; i < NUM_CONCURRENT_REQUESTS; ++i) {
CassFuture* future = futures[i];
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
assert(cass_result_column_count(result) == 6);
cass_result_free(result);
}
cass_future_free(future);
}
}
void run_select_queries(void* data) {
int i;
CassSession* session = (CassSession*)data;
const CassPrepared* select_prepared = NULL;
const char* select_query =
"SELECT * FROM stress.songs WHERE id = a98d21b2-1900-11e4-b97b-e5e358e71e0d";
#if defined(_MSC_VER) && defined(_DEBUG)
char thread_name[32];
sprintf(thread_name, "Perf - %lu", (unsigned long)(GetThreadId(uv_thread_self())));
set_thread_name(thread_name);
#endif
#if USE_PREPARED
if (prepare_query(session, select_query, &select_prepared) == CASS_OK) {
#endif
for (i = 0; i < NUM_ITERATIONS; ++i) {
select_from_perf(session, select_query, select_prepared);
}
#if USE_PREPARED
cass_prepared_free(select_prepared);
}
#endif
status_notify(&status);
}
int main(int argc, char* argv[]) {
int i;
CassMetrics metrics;
uv_thread_t threads[NUM_THREADS];
CassCluster* cluster = NULL;
CassSession* session = NULL;
char* hosts = "127.0.0.1";
if (argc > 1) {
hosts = argv[1];
}
status_init(&status, NUM_THREADS);
cass_log_set_level(CASS_LOG_INFO);
cluster = create_cluster(hosts);
uuid_gen = cass_uuid_gen_new();
session = cass_session_new();
if (connect_session(session, cluster) != CASS_OK) {
cass_cluster_free(cluster);
cass_session_free(session);
cass_uuid_gen_free(uuid_gen);
return -1;
}
execute_query(session, "DROP KEYSPACE stress");
execute_query(session, "CREATE KEYSPACE IF NOT EXISTS stress WITH "
"replication = { 'class': 'SimpleStrategy', 'replication_factor': '3'}");
execute_query(session, "CREATE TABLE IF NOT EXISTS stress.songs (id uuid PRIMARY KEY, "
"title text, album text, artist text, "
"tags set<text>, data blob)");
execute_query(
session,
"INSERT INTO stress.songs (id, title, album, artist, tags) VALUES "
"(a98d21b2-1900-11e4-b97b-e5e358e71e0d, "
"'La Petite Tonkinoise', 'Bye Bye Blackbird', 'Joséphine Baker', { 'jazz', '2013' });");
for (i = 0; i < NUM_THREADS; ++i) {
#if DO_SELECTS
uv_thread_create(&threads[i], run_select_queries, (void*)session);
#else
uv_thread_create(&threads[i], run_insert_queries, (void*)session);
#endif
}
while (status_wait(&status, 5) > 0) {
cass_session_get_metrics(session, &metrics);
printf("rate stats (requests/second): mean %f 1m %f 5m %f 10m %f\n", metrics.requests.mean_rate,
metrics.requests.one_minute_rate, metrics.requests.five_minute_rate,
metrics.requests.fifteen_minute_rate);
}
cass_session_get_metrics(session, &metrics);
printf("final stats (microseconds): min %llu max %llu median %llu 75th %llu 95th %llu 98th %llu "
"99th %llu 99.9th %llu\n",
(unsigned long long int)metrics.requests.min, (unsigned long long int)metrics.requests.max,
(unsigned long long int)metrics.requests.median,
(unsigned long long int)metrics.requests.percentile_75th,
(unsigned long long int)metrics.requests.percentile_95th,
(unsigned long long int)metrics.requests.percentile_98th,
(unsigned long long int)metrics.requests.percentile_99th,
(unsigned long long int)metrics.requests.percentile_999th);
for (i = 0; i < NUM_THREADS; ++i) {
uv_thread_join(&threads[i]);
}
cass_cluster_free(cluster);
cass_session_free(session);
cass_uuid_gen_free(uuid_gen);
status_destroy(&status);
return 0;
}