| /* |
| * pgbench.c |
| * |
| * A simple benchmark program for PostgreSQL |
| * Originally written by Tatsuo Ishii and enhanced by many contributors. |
| * |
| * $PostgreSQL: pgsql/contrib/pgbench/pgbench.c,v 1.93 2009/12/15 07:17:57 itagaki Exp $ |
| * Copyright (c) 2000-2009, PostgreSQL Global Development Group |
| * ALL RIGHTS RESERVED; |
| * |
| * Permission to use, copy, modify, and distribute this software and its |
| * documentation for any purpose, without fee, and without a written agreement |
| * is hereby granted, provided that the above copyright notice and this |
| * paragraph and the following two paragraphs appear in all copies. |
| * |
| * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR |
| * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING |
| * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS |
| * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE |
| * POSSIBILITY OF SUCH DAMAGE. |
| * |
| * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, |
| * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY |
| * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS |
| * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO |
| * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. |
| * |
| */ |
| |
| #ifdef WIN32 |
| #define FD_SETSIZE 1024 /* set before winsock2.h is included */ |
| #endif /* ! WIN32 */ |
| |
| #include "postgres_fe.h" |
| |
| #include "libpq-fe.h" |
| #include "pqsignal.h" |
| #include "portability/instr_time.h" |
| |
| #include <ctype.h> |
| |
| #ifdef WIN32 |
| #include <win32.h> |
| #else |
| #include <signal.h> |
| #include <sys/time.h> |
| #include <unistd.h> |
| #endif /* ! WIN32 */ |
| |
| #ifdef HAVE_GETOPT_H |
| #include <getopt.h> |
| #endif |
| |
| #ifdef HAVE_SYS_SELECT_H |
| #include <sys/select.h> |
| #endif |
| |
| #ifdef HAVE_SYS_RESOURCE_H |
| #include <sys/resource.h> /* for getrlimit */ |
| #endif |
| |
| #ifndef INT64_MAX |
| #define INT64_MAX INT64CONST(0x7FFFFFFFFFFFFFFF) |
| #endif |
| |
/*
 * Multi-platform pthread implementations
 *
 * Exactly one of three variants is selected at compile time:
 *	 WIN32				   - pthread_t is a pointer to a private
 *							 win32_pthread struct; create/join are declared
 *							 here as static functions.
 *	 ENABLE_THREAD_SAFETY  - the platform's <pthread.h> is used as-is.
 *	 otherwise			   - fork-based emulation; the pthread identifiers are
 *							 renamed to pg_pthread_* so they cannot collide
 *							 with any system declarations.
 */

#ifdef WIN32
/* Use native win32 threads on Windows */
typedef struct win32_pthread *pthread_t;
typedef int pthread_attr_t;

static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);

#elif defined(ENABLE_THREAD_SAFETY)
/* Use platform-dependent pthread capability */
#include <pthread.h>

#else
/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */

#include <sys/wait.h>

/* From here on, every use of the pthread names refers to the pg_ variants */
#define pthread_t pg_pthread_t
#define pthread_attr_t pg_pthread_attr_t
#define pthread_create pg_pthread_create
#define pthread_join pg_pthread_join

typedef struct fork_pthread *pthread_t;
typedef int pthread_attr_t;

static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void * (*start_routine)(void *), void *arg);
static int pthread_join(pthread_t th, void **thread_return);

#endif
| |
/* option-parsing globals; only declared here, defined elsewhere */
extern char *optarg;
extern int optind;


/********************************************************************
 * some configurable parameters */

/*
 * max number of clients allowed
 *
 * When FD_SETSIZE is known, ten descriptors are held back from it;
 * otherwise a fixed limit of 1024 is used.
 */
#ifdef FD_SETSIZE
#define MAXCLIENTS (FD_SETSIZE - 10)
#else
#define MAXCLIENTS 1024
#endif

#define DEFAULT_NXACTS 10 /* default nxacts */

int nxacts = 0; /* number of transactions per client */
int duration = 0; /* duration in seconds */

/*
 * scaling factor. for example, scale = 10 will make 1000000 tuples in
 * pgbench_accounts table.
 */
int scale = 1;

/*
 * fillfactor. for example, fillfactor = 90 will use only 90 percent
 * space during inserts and leave 10 percent free.
 */
int fillfactor = 100;

/*
 * end of configurable parameters
 *********************************************************************/

/*
 * Row counts per unit of scale.  They are lower-case macros because they are
 * stringified into the built-in scripts with CppAsString2() and multiplied
 * by the scale there and in init().
 */
#define nbranches 1 /* Makes little sense to change this. Change -s instead */
#define ntellers 10
#define naccounts 100000
| |
/* stream that doCustom() writes per-transaction latency lines to */
FILE *LOGFILE = NULL;

bool use_log; /* log transaction latencies to a file */

int is_connect; /* establish connection for each transaction */
int use_unique_key=1; /* indexes will be primary key if set, otherwise non-unique indexes */

/* connection settings, handed to PQsetdbLogin() by doConnect() */
char *pghost = "";
char *pgport = "";
/* interpolated into the WITH (...) clause of the tables created by init() */
char *storage_clause = "appendonly=false";
char *pgoptions = NULL;
char *pgtty = NULL;
char *login = NULL;
char *dbName;

/* checked by doCustom() to end a run early */
volatile bool timer_exceeded = false; /* flag from signal handler */
| |
/*
 * variable definitions
 *
 * One name/value pair of a client's script variables.  Both strings are
 * heap copies made by putVariable(), which also keeps each client's array
 * sorted by name so that getVariable() can use bsearch().
 */
typedef struct
{
	char *name; /* variable name */
	char *value; /* its value */
} Variable;
| |
#define MAX_FILES 128 /* max number of SQL script files allowed */
#define SHELL_COMMAND_SIZE 256 /* maximum size allowed for shell command */

/*
 * structures used in custom query mode
 */

/*
 * Per-client state, advanced by doCustom().
 */
typedef struct
{
	PGconn *con; /* connection handle to DB */
	int id; /* client No. */
	int state; /* state No. */
	int cnt; /* xacts count */
	int ecnt; /* error count */
	int listen; /* set to 1 by doCustom() once a command has
				 * been issued; while nonzero, doCustom()
				 * first completes the current command */
	int sleeping; /* 1 indicates that the client is napping */
	int64 until; /* napping until (usec) */
	Variable *variables; /* array of variable definitions */
	int nvariables; /* number of entries in variables[] */
	instr_time txn_begin; /* used for measuring latencies */
	int use_file; /* index in sql_files for this client */
	bool prepared[MAX_FILES]; /* per script file: statements already
							   * prepared on this connection? */
} CState;
| |
/*
 * Thread state and result
 */
typedef struct
{
	pthread_t thread; /* thread handle */
	CState *state; /* array of CState */
	int nstate; /* length of state[] */
	instr_time start_time; /* thread start time */
} TState;

/* sentinel thread handle: a zero value cast to pthread_t */
#define INVALID_THREAD ((pthread_t) 0)

/*
 * Per-thread result.  NOTE(review): presumably filled in by threadRun(),
 * which is outside this view -- confirm there.
 */
typedef struct
{
	instr_time conn_time;
	int xacts;
} TResult;
| |
/*
 * queries read from files
 */
#define SQL_COMMAND 1
#define META_COMMAND 2
#define MAX_ARGS 10

/* How SQL commands are submitted to the server; see doCustom(). */
typedef enum QueryMode
{
	QUERY_SIMPLE, /* simple query */
	QUERY_EXTENDED, /* extended query */
	QUERY_PREPARED, /* extended query with prepared statements */
	NUM_QUERYMODE
} QueryMode;

static QueryMode querymode = QUERY_SIMPLE;
/* names listed in the same order as the QueryMode values */
static const char *QUERYMODE[] = {"simple", "extended", "prepared"};

/*
 * One parsed script command.
 *
 * For SQL_COMMAND, doCustom() sends argv[0] as the statement text and
 * getQueryParams() looks up argv[1..argc-1] as variable names for its
 * parameters.  For META_COMMAND, argv[0] is the command name (without the
 * backslash) and the remaining entries are its arguments.
 */
typedef struct
{
	int type; /* command type (SQL_COMMAND or META_COMMAND) */
	int argc; /* number of entries used in argv[] */
	char *argv[MAX_ARGS]; /* command list */
} Command;

/* each entry is a NULL-terminated array of Command pointers */
static Command **sql_files[MAX_FILES]; /* SQL script files */
static int num_files; /* number of script files */
static int debug = 0; /* debug flag */
| |
/*
 * default scenario
 *
 * Updates one account, teller and branch, reads the account balance back and
 * inserts a history row, all inside one BEGIN/END transaction.
 */
static char *tpc_b = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;\n"
	"UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/*
 * -N case
 *
 * Same as the default scenario, but without the UPDATEs of pgbench_tellers
 * and pgbench_branches.
 */
static char *simple_update = {
	"\\set nbranches " CppAsString2(nbranches) " * :scale\n"
	"\\set ntellers " CppAsString2(ntellers) " * :scale\n"
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"\\setrandom bid 1 :nbranches\n"
	"\\setrandom tid 1 :ntellers\n"
	"\\setrandom delta -5000 5000\n"
	"BEGIN;\n"
	"UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
	"INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);\n"
	"END;\n"
};

/*
 * -S case
 *
 * A single SELECT of one randomly chosen account; no explicit transaction
 * block.
 */
static char *select_only = {
	"\\set naccounts " CppAsString2(naccounts) " * :scale\n"
	"\\setrandom aid 1 :naccounts\n"
	"SELECT abalance FROM pgbench_accounts WHERE aid = :aid;\n"
};
| |
/*
 * Function prototypes
 *
 * Forward declarations only; neither definition is part of this section of
 * the file.
 */
static void setalarm(int seconds);
static void* threadRun(void *arg);
| |
/*
 * Print the command-line help text to stdout.
 *
 * progname is substituted into the banner line and the usage synopsis; the
 * option lists that follow are fixed text, emitted section by section.
 */
static void
usage(const char *progname)
{
	/* banner and synopsis: the only parts that mention the program name */
	printf("%s is a benchmarking tool for PostgreSQL.\n\n"
		   "Usage:\n"
		   " %s [OPTIONS]... [DBNAME]\n",
		   progname, progname);

	fputs("\nInitialization options:\n"
		  " -i invokes initialization mode\n"
		  " -x STRING append this string to the storage clause e.g. 'appendonly=true, orientation=column'\n"
		  " -q make the indexes that are created non-unique indexes (default: unique)\n"
		  " -F NUM fill factor\n"
		  " -s NUM scaling factor\n",
		  stdout);

	fputs("\nBenchmarking options:\n"
		  " -c NUM number of concurrent database clients (default: 1)\n"
		  " -C establish new connection for each transaction\n"
		  " -D VARNAME=VALUE\n"
		  " define variable for use by custom script\n"
		  " -f FILENAME read transaction script from FILENAME\n"
		  " -j NUM number of threads (default: 1)\n"
		  " -l write transaction times to log file\n"
		  " -M {simple|extended|prepared}\n"
		  " protocol for submitting queries to server (default: simple)\n"
		  " -n do not run VACUUM before tests\n"
		  " -N do not update tables \"pgbench_tellers\" and \"pgbench_branches\"\n"
		  " -s NUM report this scale factor in output\n"
		  " -S perform SELECT-only transactions\n"
		  " -t NUM number of transactions each client runs (default: 10)\n"
		  " -T NUM duration of benchmark test in seconds\n"
		  " -v vacuum all four standard tables before tests\n",
		  stdout);

	fputs("\nCommon options:\n"
		  " -d print debugging output\n"
		  " -h HOSTNAME database server host or socket directory\n"
		  " -p PORT database server port number\n"
		  " -U USERNAME connect as specified database user\n"
		  " --help show this help, then exit\n"
		  " --version output version information, then exit\n"
		  "\n"
		  "Report bugs to <pgsql-bugs@postgresql.org>.\n",
		  stdout);
}
| |
| /* random number generator: uniform distribution from min to max inclusive */ |
| static int |
| getrand(int min, int max) |
| { |
| /* |
| * Odd coding is so that min and max have approximately the same chance of |
| * being selected as do numbers between them. |
| */ |
| return min + (int) (((max - min + 1) * (double) random()) / (MAX_RANDOM_VALUE + 1.0)); |
| } |
| |
| /* call PQexec() and exit() on failure */ |
| static void |
| executeStatement(PGconn *con, const char *sql) |
| { |
| PGresult *res; |
| |
| res = PQexec(con, sql); |
| if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| { |
| fprintf(stderr, "%s", PQerrorMessage(con)); |
| exit(1); |
| } |
| PQclear(res); |
| } |
| |
| /* set up a connection to the backend */ |
| static PGconn * |
| doConnect(void) |
| { |
| PGconn *conn; |
| static char *password = NULL; |
| bool new_pass; |
| |
| /* |
| * Start the connection. Loop until we have a password if requested by |
| * backend. |
| */ |
| do |
| { |
| new_pass = false; |
| |
| conn = PQsetdbLogin(pghost, pgport, pgoptions, pgtty, dbName, |
| login, password); |
| if (!conn) |
| { |
| fprintf(stderr, "Connection to database \"%s\" failed\n", |
| dbName); |
| return NULL; |
| } |
| |
| if (PQstatus(conn) == CONNECTION_BAD && |
| PQconnectionNeedsPassword(conn) && |
| password == NULL) |
| { |
| PQfinish(conn); |
| password = simple_prompt("Password: ", 100, false); |
| new_pass = true; |
| } |
| } while (new_pass); |
| |
| /* check to see that the backend connection was successfully made */ |
| if (PQstatus(conn) == CONNECTION_BAD) |
| { |
| fprintf(stderr, "Connection to database \"%s\" failed:\n%s", |
| dbName, PQerrorMessage(conn)); |
| PQfinish(conn); |
| return NULL; |
| } |
| |
| return conn; |
| } |
| |
| /* throw away response from backend */ |
| static void |
| discard_response(CState *state) |
| { |
| PGresult *res; |
| |
| do |
| { |
| res = PQgetResult(state->con); |
| if (res) |
| PQclear(res); |
| } while (res); |
| } |
| |
| static int |
| compareVariables(const void *v1, const void *v2) |
| { |
| return strcmp(((const Variable *) v1)->name, |
| ((const Variable *) v2)->name); |
| } |
| |
| static char * |
| getVariable(CState *st, char *name) |
| { |
| Variable key, |
| *var; |
| |
| /* On some versions of Solaris, bsearch of zero items dumps core */ |
| if (st->nvariables <= 0) |
| return NULL; |
| |
| key.name = name; |
| var = (Variable *) bsearch((void *) &key, |
| (void *) st->variables, |
| st->nvariables, |
| sizeof(Variable), |
| compareVariables); |
| if (var != NULL) |
| return var->value; |
| else |
| return NULL; |
| } |
| |
| static int |
| putVariable(CState *st, char *name, char *value) |
| { |
| Variable key, |
| *var; |
| |
| key.name = name; |
| /* On some versions of Solaris, bsearch of zero items dumps core */ |
| if (st->nvariables > 0) |
| var = (Variable *) bsearch((void *) &key, |
| (void *) st->variables, |
| st->nvariables, |
| sizeof(Variable), |
| compareVariables); |
| else |
| var = NULL; |
| |
| if (var == NULL) |
| { |
| Variable *newvars; |
| |
| if (st->variables) |
| newvars = (Variable *) realloc(st->variables, |
| (st->nvariables + 1) * sizeof(Variable)); |
| else |
| newvars = (Variable *) malloc(sizeof(Variable)); |
| |
| if (newvars == NULL) |
| return false; |
| |
| st->variables = newvars; |
| |
| var = &newvars[st->nvariables]; |
| |
| var->name = NULL; |
| var->value = NULL; |
| |
| if ((var->name = strdup(name)) == NULL || |
| (var->value = strdup(value)) == NULL) |
| { |
| free(var->name); |
| free(var->value); |
| return false; |
| } |
| |
| st->nvariables++; |
| |
| qsort((void *) st->variables, st->nvariables, sizeof(Variable), |
| compareVariables); |
| } |
| else |
| { |
| char *val; |
| |
| if ((val = strdup(value)) == NULL) |
| return false; |
| |
| free(var->value); |
| var->value = val; |
| } |
| |
| return true; |
| } |
| |
/*
 * Extract a variable name from a ":name" reference.
 *
 * sql must point at the introducing character (the colon); the name is the
 * run of alphanumerics and underscores that follows it.
 *
 * On success returns a malloc'd copy of the name (caller frees) and stores
 * in *eaten the length of the whole reference, colon included.  Returns NULL
 * without touching *eaten if no name follows or if allocation fails.
 */
static char *
parseVariable(const char *sql, int *eaten)
{
	int			span = 1;		/* sql[0] is the introducer itself */
	char	   *ident;

	while (isalnum((unsigned char) sql[span]) || sql[span] == '_')
		span++;

	if (span == 1)
		return NULL;			/* nothing usable after the colon */

	/* span - 1 name characters plus the terminating NUL */
	ident = malloc(span);
	if (ident == NULL)
		return NULL;

	memcpy(ident, sql + 1, span - 1);
	ident[span - 1] = '\0';

	*eaten = span;
	return ident;
}
| |
/*
 * Replace the len bytes at param (which must point into *sql) with value.
 *
 * The buffer *sql is reallocated when the replacement is longer than the
 * text it replaces, so *sql may change.  Returns a pointer just past the
 * inserted value, or NULL if reallocation fails -- in which case *sql has
 * been freed.
 */
static char *
replaceVariable(char **sql, char *param, int len, char *value)
{
	int			vlen = strlen(value);

	if (vlen > len)
	{
		/* remember the position as an offset: realloc may move the buffer */
		size_t		where = param - *sql;
		char	   *bigger = realloc(*sql, strlen(*sql) - len + vlen + 1);

		if (bigger == NULL)
		{
			free(*sql);
			return NULL;
		}
		*sql = bigger;
		param = bigger + where;
	}

	/* shift the tail (including its NUL) to sit right after the value */
	if (vlen != len)
		memmove(param + vlen, param + len, strlen(param + len) + 1);

	/* drop the value in place; no terminator is wanted here */
	memcpy(param, value, vlen);

	return param + vlen;
}
| |
| static char * |
| assignVariables(CState *st, char *sql) |
| { |
| char *p, |
| *name, |
| *val; |
| |
| p = sql; |
| while ((p = strchr(p, ':')) != NULL) |
| { |
| int eaten; |
| |
| name = parseVariable(p, &eaten); |
| if (name == NULL) |
| { |
| while (*p == ':') |
| { |
| p++; |
| } |
| continue; |
| } |
| |
| val = getVariable(st, name); |
| free(name); |
| if (val == NULL) |
| { |
| p++; |
| continue; |
| } |
| |
| if ((p = replaceVariable(&sql, p, eaten, val)) == NULL) |
| return NULL; |
| } |
| |
| return sql; |
| } |
| |
| static void |
| getQueryParams(CState *st, const Command *command, const char **params) |
| { |
| int i; |
| |
| for (i = 0; i < command->argc - 1; i++) |
| params[i] = getVariable(st, command->argv[i + 1]); |
| } |
| |
| /* |
| * Run a shell command. The result is assigned to the variable if not NULL. |
| * Return true if succeeded, or false on error. |
| */ |
| static bool |
| runShellCommand(CState *st, char *variable, char **argv, int argc) |
| { |
| char command[SHELL_COMMAND_SIZE]; |
| int i, |
| len = 0; |
| FILE *fp; |
| char res[64]; |
| char *endptr; |
| int retval; |
| |
| /* |
| * Join arguments with whilespace separaters. Arguments starting with |
| * exactly one colon are treated as variables: |
| * name - append a string "name" |
| * :var - append a variable named 'var'. |
| * ::name - append a string ":name" |
| */ |
| for (i = 0; i < argc; i++) |
| { |
| char *arg; |
| int arglen; |
| |
| if (argv[i][0] != ':') |
| { |
| arg = argv[i]; /* a string literal */ |
| } |
| else if (argv[i][1] == ':') |
| { |
| arg = argv[i] + 1; /* a string literal starting with colons */ |
| } |
| else if ((arg = getVariable(st, argv[i] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[i]); |
| return false; |
| } |
| |
| arglen = strlen(arg); |
| if (len + arglen + (i > 0 ? 1 : 0) >= SHELL_COMMAND_SIZE - 1) |
| { |
| fprintf(stderr, "%s: too long shell command\n", argv[0]); |
| return false; |
| } |
| |
| if (i > 0) |
| command[len++] = ' '; |
| memcpy(command + len, arg, arglen); |
| len += arglen; |
| } |
| |
| command[len] = '\0'; |
| |
| /* Fast path for non-assignment case */ |
| if (variable == NULL) |
| { |
| if (system(command)) |
| { |
| if (!timer_exceeded) |
| fprintf(stderr, "%s: cannot launch shell command\n", argv[0]); |
| return false; |
| } |
| return true; |
| } |
| |
| /* Execute the command with pipe and read the standard output. */ |
| if ((fp = popen(command, "r")) == NULL) |
| { |
| fprintf(stderr, "%s: cannot launch shell command\n", argv[0]); |
| return false; |
| } |
| if (fgets(res, sizeof(res), fp) == NULL) |
| { |
| if (!timer_exceeded) |
| fprintf(stderr, "%s: cannot read the result\n", argv[0]); |
| return false; |
| } |
| if (pclose(fp) < 0) |
| { |
| fprintf(stderr, "%s: cannot close shell command\n", argv[0]); |
| return false; |
| } |
| |
| /* Check whether the result is an integer and assign it to the variable */ |
| retval = (int) strtol(res, &endptr, 10); |
| while (*endptr != '\0' && isspace((unsigned char) *endptr)) |
| endptr++; |
| if (*res == '\0' || *endptr != '\0') |
| { |
| fprintf(stderr, "%s: must return an integer ('%s' returned)\n", argv[0], res); |
| return false; |
| } |
| snprintf(res, sizeof(res), "%d", retval); |
| if (!putVariable(st, variable, res)) |
| { |
| fprintf(stderr, "%s: out of memory\n", argv[0]); |
| return false; |
| } |
| |
| #ifdef DEBUG |
| printf("shell parameter name: %s, value: %s\n", argv[1], res); |
| #endif |
| return true; |
| } |
| |
/* size of the buffer callers must supply to preparedStatementName() */
#define MAX_PREPARE_NAME 32

/*
 * Build the name of the prepared statement for command number "state" of
 * script file number "file", in the form "P<file>_<state>".
 *
 * buffer must have room for MAX_PREPARE_NAME bytes.
 */
static void
preparedStatementName(char *buffer, int file, int state)
{
	snprintf(buffer, MAX_PREPARE_NAME, "P%d_%d", file, state);
}
| |
| static bool |
| clientDone(CState *st, bool ok) |
| { |
| (void) ok; /* unused */ |
| |
| if (st->con != NULL) |
| { |
| PQfinish(st->con); |
| st->con = NULL; |
| } |
| return false; /* always false */ |
| } |
| |
| /* return false iff client should be disconnected */ |
| static bool |
| doCustom(CState *st, instr_time *conn_time) |
| { |
| PGresult *res; |
| Command **commands; |
| |
| top: |
| commands = sql_files[st->use_file]; |
| |
| if (st->sleeping) |
| { /* are we sleeping? */ |
| instr_time now; |
| |
| INSTR_TIME_SET_CURRENT(now); |
| if (st->until <= INSTR_TIME_GET_MICROSEC(now)) |
| st->sleeping = 0; /* Done sleeping, go ahead with next command */ |
| else |
| return true; /* Still sleeping, nothing to do here */ |
| } |
| |
| if (st->listen) |
| { /* are we receiver? */ |
| if (commands[st->state]->type == SQL_COMMAND) |
| { |
| if (debug) |
| fprintf(stderr, "client %d receiving\n", st->id); |
| if (!PQconsumeInput(st->con)) |
| { /* there's something wrong */ |
| fprintf(stderr, "Client %d aborted in state %d. Probably the backend died while processing.\n", st->id, st->state); |
| return clientDone(st, false); |
| } |
| if (PQisBusy(st->con)) |
| return true; /* don't have the whole result yet */ |
| } |
| |
| /* |
| * transaction finished: record the time it took in the log |
| */ |
| if (use_log && commands[st->state + 1] == NULL) |
| { |
| instr_time now; |
| instr_time diff; |
| double usec; |
| |
| INSTR_TIME_SET_CURRENT(now); |
| diff = now; |
| INSTR_TIME_SUBTRACT(diff, st->txn_begin); |
| usec = (double) INSTR_TIME_GET_MICROSEC(diff); |
| |
| #ifndef WIN32 |
| /* This is more than we really ought to know about instr_time */ |
| fprintf(LOGFILE, "%d %d %.0f %d %ld %ld\n", |
| st->id, st->cnt, usec, st->use_file, |
| (long) now.tv_sec, (long) now.tv_usec); |
| #else |
| /* On Windows, instr_time doesn't provide a timestamp anyway */ |
| fprintf(LOGFILE, "%d %d %.0f %d 0 0\n", |
| st->id, st->cnt, usec, st->use_file); |
| #endif |
| } |
| |
| if (commands[st->state]->type == SQL_COMMAND) |
| { |
| res = PQgetResult(st->con); |
| switch (PQresultStatus(res)) |
| { |
| case PGRES_COMMAND_OK: |
| case PGRES_TUPLES_OK: |
| break; /* OK */ |
| default: |
| fprintf(stderr, "Client %d aborted in state %d: %s", |
| st->id, st->state, PQerrorMessage(st->con)); |
| PQclear(res); |
| return clientDone(st, false); |
| } |
| PQclear(res); |
| discard_response(st); |
| } |
| |
| if (commands[st->state + 1] == NULL) |
| { |
| if (is_connect) |
| { |
| PQfinish(st->con); |
| st->con = NULL; |
| } |
| |
| ++st->cnt; |
| if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded) |
| return clientDone(st, true); /* exit success */ |
| } |
| |
| /* increment state counter */ |
| st->state++; |
| if (commands[st->state] == NULL) |
| { |
| st->state = 0; |
| st->use_file = getrand(0, num_files - 1); |
| commands = sql_files[st->use_file]; |
| } |
| } |
| |
| if (st->con == NULL) |
| { |
| instr_time start, end; |
| |
| INSTR_TIME_SET_CURRENT(start); |
| if ((st->con = doConnect()) == NULL) |
| { |
| fprintf(stderr, "Client %d aborted in establishing connection.\n", st->id); |
| return clientDone(st, false); |
| } |
| INSTR_TIME_SET_CURRENT(end); |
| INSTR_TIME_ACCUM_DIFF(*conn_time, end, start); |
| } |
| |
| if (use_log && st->state == 0) |
| INSTR_TIME_SET_CURRENT(st->txn_begin); |
| |
| if (commands[st->state]->type == SQL_COMMAND) |
| { |
| const Command *command = commands[st->state]; |
| int r; |
| |
| if (querymode == QUERY_SIMPLE) |
| { |
| char *sql; |
| |
| if ((sql = strdup(command->argv[0])) == NULL |
| || (sql = assignVariables(st, sql)) == NULL) |
| { |
| fprintf(stderr, "out of memory\n"); |
| st->ecnt++; |
| return true; |
| } |
| |
| if (debug) |
| fprintf(stderr, "client %d sending %s\n", st->id, sql); |
| r = PQsendQuery(st->con, sql); |
| free(sql); |
| } |
| else if (querymode == QUERY_EXTENDED) |
| { |
| const char *sql = command->argv[0]; |
| const char *params[MAX_ARGS]; |
| |
| getQueryParams(st, command, params); |
| |
| if (debug) |
| fprintf(stderr, "client %d sending %s\n", st->id, sql); |
| r = PQsendQueryParams(st->con, sql, command->argc - 1, |
| NULL, params, NULL, NULL, 0); |
| } |
| else if (querymode == QUERY_PREPARED) |
| { |
| char name[MAX_PREPARE_NAME]; |
| const char *params[MAX_ARGS]; |
| |
| if (!st->prepared[st->use_file]) |
| { |
| int j; |
| |
| for (j = 0; commands[j] != NULL; j++) |
| { |
| PGresult *res; |
| char name[MAX_PREPARE_NAME]; |
| |
| if (commands[j]->type != SQL_COMMAND) |
| continue; |
| preparedStatementName(name, st->use_file, j); |
| res = PQprepare(st->con, name, |
| commands[j]->argv[0], commands[j]->argc - 1, NULL); |
| if (PQresultStatus(res) != PGRES_COMMAND_OK) |
| fprintf(stderr, "%s", PQerrorMessage(st->con)); |
| PQclear(res); |
| } |
| st->prepared[st->use_file] = true; |
| } |
| |
| getQueryParams(st, command, params); |
| preparedStatementName(name, st->use_file, st->state); |
| |
| if (debug) |
| fprintf(stderr, "client %d sending %s\n", st->id, name); |
| r = PQsendQueryPrepared(st->con, name, command->argc - 1, |
| params, NULL, NULL, 0); |
| } |
| else /* unknown sql mode */ |
| r = 0; |
| |
| if (r == 0) |
| { |
| if (debug) |
| fprintf(stderr, "client %d cannot send %s\n", st->id, command->argv[0]); |
| st->ecnt++; |
| } |
| else |
| st->listen = 1; /* flags that should be listened */ |
| } |
| else if (commands[st->state]->type == META_COMMAND) |
| { |
| int argc = commands[st->state]->argc, |
| i; |
| char **argv = commands[st->state]->argv; |
| |
| if (debug) |
| { |
| fprintf(stderr, "client %d executing \\%s", st->id, argv[0]); |
| for (i = 1; i < argc; i++) |
| fprintf(stderr, " %s", argv[i]); |
| fprintf(stderr, "\n"); |
| } |
| |
| if (pg_strcasecmp(argv[0], "setrandom") == 0) |
| { |
| char *var; |
| int min, |
| max; |
| char res[64]; |
| |
| if (*argv[2] == ':') |
| { |
| if ((var = getVariable(st, argv[2] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); |
| st->ecnt++; |
| return true; |
| } |
| min = atoi(var); |
| } |
| else |
| min = atoi(argv[2]); |
| |
| #ifdef NOT_USED |
| if (min < 0) |
| { |
| fprintf(stderr, "%s: invalid minimum number %d\n", argv[0], min); |
| st->ecnt++; |
| return; |
| } |
| #endif |
| |
| if (*argv[3] == ':') |
| { |
| if ((var = getVariable(st, argv[3] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[3]); |
| st->ecnt++; |
| return true; |
| } |
| max = atoi(var); |
| } |
| else |
| max = atoi(argv[3]); |
| |
| if (max < min || max > MAX_RANDOM_VALUE) |
| { |
| fprintf(stderr, "%s: invalid maximum number %d\n", argv[0], max); |
| st->ecnt++; |
| return true; |
| } |
| |
| #ifdef DEBUG |
| printf("min: %d max: %d random: %d\n", min, max, getrand(min, max)); |
| #endif |
| snprintf(res, sizeof(res), "%d", getrand(min, max)); |
| |
| if (putVariable(st, argv[1], res) == false) |
| { |
| fprintf(stderr, "%s: out of memory\n", argv[0]); |
| st->ecnt++; |
| return true; |
| } |
| |
| st->listen = 1; |
| } |
| else if (pg_strcasecmp(argv[0], "set") == 0) |
| { |
| char *var; |
| int ope1, |
| ope2; |
| char res[64]; |
| |
| if (*argv[2] == ':') |
| { |
| if ((var = getVariable(st, argv[2] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[2]); |
| st->ecnt++; |
| return true; |
| } |
| ope1 = atoi(var); |
| } |
| else |
| ope1 = atoi(argv[2]); |
| |
| if (argc < 5) |
| snprintf(res, sizeof(res), "%d", ope1); |
| else |
| { |
| if (*argv[4] == ':') |
| { |
| if ((var = getVariable(st, argv[4] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[4]); |
| st->ecnt++; |
| return true; |
| } |
| ope2 = atoi(var); |
| } |
| else |
| ope2 = atoi(argv[4]); |
| |
| if (strcmp(argv[3], "+") == 0) |
| snprintf(res, sizeof(res), "%d", ope1 + ope2); |
| else if (strcmp(argv[3], "-") == 0) |
| snprintf(res, sizeof(res), "%d", ope1 - ope2); |
| else if (strcmp(argv[3], "*") == 0) |
| snprintf(res, sizeof(res), "%d", ope1 * ope2); |
| else if (strcmp(argv[3], "/") == 0) |
| { |
| if (ope2 == 0) |
| { |
| fprintf(stderr, "%s: division by zero\n", argv[0]); |
| st->ecnt++; |
| return true; |
| } |
| snprintf(res, sizeof(res), "%d", ope1 / ope2); |
| } |
| else |
| { |
| fprintf(stderr, "%s: unsupported operator %s\n", argv[0], argv[3]); |
| st->ecnt++; |
| return true; |
| } |
| } |
| |
| if (putVariable(st, argv[1], res) == false) |
| { |
| fprintf(stderr, "%s: out of memory\n", argv[0]); |
| st->ecnt++; |
| return true; |
| } |
| |
| st->listen = 1; |
| } |
| else if (pg_strcasecmp(argv[0], "sleep") == 0) |
| { |
| char *var; |
| int usec; |
| instr_time now; |
| |
| if (*argv[1] == ':') |
| { |
| if ((var = getVariable(st, argv[1] + 1)) == NULL) |
| { |
| fprintf(stderr, "%s: undefined variable %s\n", argv[0], argv[1]); |
| st->ecnt++; |
| return true; |
| } |
| usec = atoi(var); |
| } |
| else |
| usec = atoi(argv[1]); |
| |
| if (argc > 2) |
| { |
| if (pg_strcasecmp(argv[2], "ms") == 0) |
| usec *= 1000; |
| else if (pg_strcasecmp(argv[2], "s") == 0) |
| usec *= 1000000; |
| } |
| else |
| usec *= 1000000; |
| |
| INSTR_TIME_SET_CURRENT(now); |
| st->until = INSTR_TIME_GET_MICROSEC(now) + usec; |
| st->sleeping = 1; |
| |
| st->listen = 1; |
| } |
| else if (pg_strcasecmp(argv[0], "setshell") == 0) |
| { |
| bool ret = runShellCommand(st, argv[1], argv + 2, argc - 2); |
| |
| if (timer_exceeded) /* timeout */ |
| return clientDone(st, true); |
| else if (!ret) /* on error */ |
| { |
| st->ecnt++; |
| return true; |
| } |
| else /* succeeded */ |
| st->listen = 1; |
| } |
| else if (pg_strcasecmp(argv[0], "shell") == 0) |
| { |
| bool ret = runShellCommand(st, NULL, argv + 1, argc - 1); |
| |
| if (timer_exceeded) /* timeout */ |
| return clientDone(st, true); |
| else if (!ret) /* on error */ |
| { |
| st->ecnt++; |
| return true; |
| } |
| else /* succeeded */ |
| st->listen = 1; |
| } |
| goto top; |
| } |
| |
| return true; |
| } |
| |
| /* discard connections */ |
| static void |
| disconnect_all(CState *state, int length) |
| { |
| int i; |
| |
| for (i = 0; i < length; i++) |
| { |
| if (state[i].con) |
| { |
| PQfinish(state[i].con); |
| state[i].con = NULL; |
| } |
| } |
| } |
| |
| /* create tables and setup data */ |
| static void |
| init(void) |
| { |
| /* |
| * Note: TPC-B requires at least 100 bytes per row, and the "filler" |
| * fields in these table declarations were intended to comply with that. |
| * But because they default to NULLs, they don't actually take any space. |
| * We could fix that by giving them non-null default values. However, that |
| * would completely break comparability of pgbench results with prior |
| * versions. Since pgbench has never pretended to be fully TPC-B |
| * compliant anyway, we stick with the historical behavior. |
| */ |
| static char *DDLs[] = { |
| "drop table if exists pgbench_branches", |
| "create table pgbench_branches(bid int not null,bbalance int,filler char(88)) with (fillfactor=%d, %s) DISTRIBUTED BY (bid)", |
| "drop table if exists pgbench_tellers", |
| "create table pgbench_tellers(tid int not null,bid int,tbalance int,filler char(84)) with (fillfactor=%d, %s) DISTRIBUTED BY (tid)", |
| "drop table if exists pgbench_accounts", |
| "create table pgbench_accounts(aid int not null,bid int,abalance int,filler char(84)) with (fillfactor=%d, %s) DISTRIBUTED BY (aid)", |
| "drop table if exists pgbench_history", |
| "create table pgbench_history(tid int,bid int,aid int,delta int,mtime timestamp,filler char(22)) with (%s) DISTRIBUTED BY (tid)" |
| }; |
| static char *DDLAFTERs[] = { |
| "alter table pgbench_branches add primary key (bid)", |
| "alter table pgbench_tellers add primary key (tid)", |
| "alter table pgbench_accounts add primary key (aid)" |
| }; |
| static char *NON_UNIQUE_INDEX_DDLAFTERs[] = { |
| "CREATE INDEX branch_idx ON pgbench_branches (bid) ", |
| "CREATE INDEX teller_idx ON pgbench_tellers (tid) ", |
| "CREATE INDEX account_idx ON pgbench_accounts (aid) " |
| }; |
| |
| PGconn *con; |
| PGresult *res; |
| char sql[256]; |
| int i; |
| |
| if ((con = doConnect()) == NULL) |
| exit(1); |
| |
| fprintf(stderr, "creating tables...\n"); |
| |
| for (i = 0; i < lengthof(DDLs); i++) |
| { |
| /* |
| * set fillfactor for branches, tellers and accounts tables |
| */ |
| if ((strstr(DDLs[i], "create table pgbench_branches") == DDLs[i]) || |
| (strstr(DDLs[i], "create table pgbench_tellers") == DDLs[i]) || |
| (strstr(DDLs[i], "create table pgbench_accounts") == DDLs[i])) |
| { |
| char ddl_stmt[256]; |
| snprintf(ddl_stmt, 256, DDLs[i], fillfactor, storage_clause); |
| fprintf(stderr, "%s\n", ddl_stmt); |
| executeStatement(con, ddl_stmt); |
| } |
| else if (strstr(DDLs[i], "create table pgbench_history") == DDLs[i]) |
| { |
| char ddl_stmt[256]; |
| snprintf(ddl_stmt, 256, DDLs[i], storage_clause); |
| fprintf(stderr, "%s\n", ddl_stmt); |
| executeStatement(con, ddl_stmt); |
| } |
| else |
| { |
| executeStatement(con, DDLs[i]); |
| } |
| } |
| |
| executeStatement(con, "begin"); |
| |
| for (i = 0; i < nbranches * scale; i++) |
| { |
| snprintf(sql, 256, "insert into pgbench_branches(bid,bbalance) values(%d,0)", i + 1); |
| executeStatement(con, sql); |
| } |
| |
| for (i = 0; i < ntellers * scale; i++) |
| { |
| snprintf(sql, 256, "insert into pgbench_tellers(tid,bid,tbalance) values (%d,%d,0)", |
| i + 1, i / ntellers + 1); |
| executeStatement(con, sql); |
| } |
| |
| executeStatement(con, "commit"); |
| |
| /* |
| * fill the pgbench_accounts table with some data |
| */ |
| |
| executeStatement(con, "begin"); |
| executeStatement(con, "truncate pgbench_accounts"); |
| |
| res = PQexec(con, "copy pgbench_accounts from stdin"); |
| if (PQresultStatus(res) != PGRES_COPY_IN) |
| { |
| fprintf(stderr, "%s", PQerrorMessage(con)); |
| exit(1); |
| } |
| PQclear(res); |
| |
| for (i = 0; i < naccounts * scale; i++) |
| { |
| int j = i + 1; |
| |
| snprintf(sql, 256, "%d\t%d\t%d\t\n", j, i / naccounts + 1, 0); |
| if (PQputline(con, sql)) |
| { |
| fprintf(stderr, "PQputline failed\n"); |
| exit(1); |
| } |
| |
| if (j % 10000 == 0) |
| fprintf(stderr, "%d tuples done.\n", j); |
| } |
| if (PQputline(con, "\\.\n")) |
| { |
| fprintf(stderr, "very last PQputline failed\n"); |
| exit(1); |
| } |
| if (PQendcopy(con)) |
| { |
| fprintf(stderr, "PQendcopy failed\n"); |
| exit(1); |
| } |
| executeStatement(con, "commit"); |
| |
| /* |
| * create indexes |
| */ |
| fprintf(stderr, "creating indexes...\n"); |
| if (use_unique_key) |
| { |
| for (i = 0; i < lengthof(DDLAFTERs); i++) |
| { |
| fprintf(stderr, "%s\n", DDLAFTERs[i]); |
| executeStatement(con, DDLAFTERs[i]); |
| } |
| } |
| else |
| { |
| for (i = 0; i < lengthof(NON_UNIQUE_INDEX_DDLAFTERs); i++) |
| { |
| fprintf(stderr, "%s\n", NON_UNIQUE_INDEX_DDLAFTERs[i]); |
| executeStatement(con, NON_UNIQUE_INDEX_DDLAFTERs[i]); |
| } |
| } |
| |
| /* vacuum */ |
| fprintf(stderr, "vacuum..."); |
| executeStatement(con, "vacuum analyze pgbench_branches"); |
| executeStatement(con, "vacuum analyze pgbench_tellers"); |
| executeStatement(con, "vacuum analyze pgbench_accounts"); |
| executeStatement(con, "vacuum analyze pgbench_history"); |
| |
| fprintf(stderr, "done.\n"); |
| PQfinish(con); |
| } |
| |
| /* |
| * Parse the raw sql and replace :param to $n. |
| */ |
| static bool |
| parseQuery(Command *cmd, const char *raw_sql) |
| { |
| char *sql, |
| *p; |
| |
| sql = strdup(raw_sql); |
| if (sql == NULL) |
| return false; |
| cmd->argc = 1; |
| |
| p = sql; |
| while ((p = strchr(p, ':')) != NULL) |
| { |
| char var[12]; |
| char *name; |
| int eaten; |
| |
| name = parseVariable(p, &eaten); |
| if (name == NULL) |
| { |
| while (*p == ':') |
| { |
| p++; |
| } |
| continue; |
| } |
| |
| if (cmd->argc >= MAX_ARGS) |
| { |
| fprintf(stderr, "statement has too many arguments (maximum is %d): %s\n", MAX_ARGS - 1, raw_sql); |
| return false; |
| } |
| |
| sprintf(var, "$%d", cmd->argc); |
| if ((p = replaceVariable(&sql, p, eaten, var)) == NULL) |
| return false; |
| |
| cmd->argv[cmd->argc] = name; |
| cmd->argc++; |
| } |
| |
| cmd->argv[0] = sql; |
| return true; |
| } |
| |
| static Command * |
| process_commands(char *buf) |
| { |
| const char delim[] = " \f\n\r\t\v"; |
| |
| Command *my_commands; |
| int j; |
| char *p, |
| *tok; |
| |
| if ((p = strchr(buf, '\n')) != NULL) |
| *p = '\0'; |
| |
| p = buf; |
| while (isspace((unsigned char) *p)) |
| p++; |
| |
| if (*p == '\0' || strncmp(p, "--", 2) == 0) |
| { |
| return NULL; |
| } |
| |
| my_commands = (Command *) malloc(sizeof(Command)); |
| if (my_commands == NULL) |
| { |
| return NULL; |
| } |
| |
| my_commands->argc = 0; |
| |
| if (*p == '\\') |
| { |
| my_commands->type = META_COMMAND; |
| |
| j = 0; |
| tok = strtok(++p, delim); |
| |
| while (tok != NULL) |
| { |
| if ((my_commands->argv[j] = strdup(tok)) == NULL) |
| return NULL; |
| |
| my_commands->argc++; |
| |
| j++; |
| tok = strtok(NULL, delim); |
| } |
| |
| if (pg_strcasecmp(my_commands->argv[0], "setrandom") == 0) |
| { |
| if (my_commands->argc < 4) |
| { |
| fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); |
| return NULL; |
| } |
| |
| for (j = 4; j < my_commands->argc; j++) |
| fprintf(stderr, "%s: extra argument \"%s\" ignored\n", |
| my_commands->argv[0], my_commands->argv[j]); |
| } |
| else if (pg_strcasecmp(my_commands->argv[0], "set") == 0) |
| { |
| if (my_commands->argc < 3) |
| { |
| fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); |
| return NULL; |
| } |
| |
| for (j = my_commands->argc < 5 ? 3 : 5; j < my_commands->argc; j++) |
| fprintf(stderr, "%s: extra argument \"%s\" ignored\n", |
| my_commands->argv[0], my_commands->argv[j]); |
| } |
| else if (pg_strcasecmp(my_commands->argv[0], "sleep") == 0) |
| { |
| if (my_commands->argc < 2) |
| { |
| fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); |
| return NULL; |
| } |
| |
| /* |
| * Split argument into number and unit to allow "sleep 1ms" etc. |
| * We don't have to terminate the number argument with null |
| * because it will be parsed with atoi, which ignores trailing |
| * non-digit characters. |
| */ |
| if (my_commands->argv[1][0] != ':') |
| { |
| char *c = my_commands->argv[1]; |
| |
| while (isdigit((unsigned char) *c)) |
| c++; |
| if (*c) |
| { |
| my_commands->argv[2] = c; |
| if (my_commands->argc < 3) |
| my_commands->argc = 3; |
| } |
| } |
| |
| if (my_commands->argc >= 3) |
| { |
| if (pg_strcasecmp(my_commands->argv[2], "us") != 0 && |
| pg_strcasecmp(my_commands->argv[2], "ms") != 0 && |
| pg_strcasecmp(my_commands->argv[2], "s")) |
| { |
| fprintf(stderr, "%s: unknown time unit '%s' - must be us, ms or s\n", |
| my_commands->argv[0], my_commands->argv[2]); |
| return NULL; |
| } |
| } |
| |
| for (j = 3; j < my_commands->argc; j++) |
| fprintf(stderr, "%s: extra argument \"%s\" ignored\n", |
| my_commands->argv[0], my_commands->argv[j]); |
| } |
| else if (pg_strcasecmp(my_commands->argv[0], "setshell") == 0) |
| { |
| if (my_commands->argc < 3) |
| { |
| fprintf(stderr, "%s: missing argument\n", my_commands->argv[0]); |
| return NULL; |
| } |
| } |
| else if (pg_strcasecmp(my_commands->argv[0], "shell") == 0) |
| { |
| if (my_commands->argc < 1) |
| { |
| fprintf(stderr, "%s: missing command\n", my_commands->argv[0]); |
| return NULL; |
| } |
| } |
| else |
| { |
| fprintf(stderr, "Invalid command %s\n", my_commands->argv[0]); |
| return NULL; |
| } |
| } |
| else |
| { |
| my_commands->type = SQL_COMMAND; |
| |
| switch (querymode) |
| { |
| case QUERY_SIMPLE: |
| if ((my_commands->argv[0] = strdup(p)) == NULL) |
| return NULL; |
| my_commands->argc++; |
| break; |
| case QUERY_EXTENDED: |
| case QUERY_PREPARED: |
| if (!parseQuery(my_commands, p)) |
| return NULL; |
| break; |
| default: |
| return NULL; |
| } |
| } |
| |
| return my_commands; |
| } |
| |
| static int |
| process_file(char *filename) |
| { |
| #define COMMANDS_ALLOC_NUM 128 |
| |
| Command **my_commands; |
| FILE *fd; |
| int lineno; |
| char buf[BUFSIZ]; |
| int alloc_num; |
| |
| if (num_files >= MAX_FILES) |
| { |
| fprintf(stderr, "Up to only %d SQL files are allowed\n", MAX_FILES); |
| exit(1); |
| } |
| |
| alloc_num = COMMANDS_ALLOC_NUM; |
| my_commands = (Command **) malloc(sizeof(Command *) * alloc_num); |
| if (my_commands == NULL) |
| return false; |
| |
| if (strcmp(filename, "-") == 0) |
| fd = stdin; |
| else if ((fd = fopen(filename, "r")) == NULL) |
| { |
| fprintf(stderr, "%s: %s\n", filename, strerror(errno)); |
| return false; |
| } |
| |
| lineno = 0; |
| |
| while (fgets(buf, sizeof(buf), fd) != NULL) |
| { |
| Command *commands; |
| int i; |
| |
| i = 0; |
| while (isspace((unsigned char) buf[i])) |
| i++; |
| |
| if (buf[i] != '\0' && strncmp(&buf[i], "--", 2) != 0) |
| { |
| commands = process_commands(&buf[i]); |
| if (commands == NULL) |
| { |
| fclose(fd); |
| return false; |
| } |
| } |
| else |
| continue; |
| |
| my_commands[lineno] = commands; |
| lineno++; |
| |
| if (lineno >= alloc_num) |
| { |
| alloc_num += COMMANDS_ALLOC_NUM; |
| my_commands = realloc(my_commands, sizeof(Command *) * alloc_num); |
| if (my_commands == NULL) |
| { |
| fclose(fd); |
| return false; |
| } |
| } |
| } |
| fclose(fd); |
| |
| my_commands[lineno] = NULL; |
| |
| sql_files[num_files++] = my_commands; |
| |
| return true; |
| } |
| |
| static Command ** |
| process_builtin(char *tb) |
| { |
| #define COMMANDS_ALLOC_NUM 128 |
| |
| Command **my_commands; |
| int lineno; |
| char buf[BUFSIZ]; |
| int alloc_num; |
| |
| if (*tb == '\0') |
| return NULL; |
| |
| alloc_num = COMMANDS_ALLOC_NUM; |
| my_commands = (Command **) malloc(sizeof(Command *) * alloc_num); |
| if (my_commands == NULL) |
| return NULL; |
| |
| lineno = 0; |
| |
| for (;;) |
| { |
| char *p; |
| Command *commands; |
| |
| p = buf; |
| while (*tb && *tb != '\n') |
| *p++ = *tb++; |
| |
| if (*tb == '\0') |
| break; |
| |
| if (*tb == '\n') |
| tb++; |
| |
| *p = '\0'; |
| |
| commands = process_commands(buf); |
| if (commands == NULL) |
| { |
| return NULL; |
| } |
| |
| my_commands[lineno] = commands; |
| lineno++; |
| |
| if (lineno >= alloc_num) |
| { |
| alloc_num += COMMANDS_ALLOC_NUM; |
| my_commands = realloc(my_commands, sizeof(Command *) * alloc_num); |
| if (my_commands == NULL) |
| { |
| return NULL; |
| } |
| } |
| } |
| |
| my_commands[lineno] = NULL; |
| |
| return my_commands; |
| } |
| |
| /* print out results */ |
| static void |
| printResults(int ttype, int normal_xacts, int nclients, int nthreads, |
| instr_time total_time, instr_time conn_total_time) |
| { |
| double time_include, |
| tps_include, |
| tps_exclude; |
| char *s; |
| |
| time_include = INSTR_TIME_GET_DOUBLE(total_time); |
| tps_include = normal_xacts / time_include; |
| tps_exclude = normal_xacts / (time_include - |
| (INSTR_TIME_GET_DOUBLE(conn_total_time) / nthreads)); |
| |
| if (ttype == 0) |
| s = "TPC-B (sort of)"; |
| else if (ttype == 2) |
| s = "Update only pgbench_accounts"; |
| else if (ttype == 1) |
| s = "SELECT only"; |
| else |
| s = "Custom query"; |
| |
| printf("transaction type: %s\n", s); |
| printf("scaling factor: %d\n", scale); |
| printf("query mode: %s\n", QUERYMODE[querymode]); |
| printf("number of clients: %d\n", nclients); |
| printf("number of threads: %d\n", nthreads); |
| if (duration <= 0) |
| { |
| printf("number of transactions per client: %d\n", nxacts); |
| printf("number of transactions actually processed: %d/%d\n", |
| normal_xacts, nxacts * nclients); |
| } |
| else |
| { |
| printf("duration: %d s\n", duration); |
| printf("number of transactions actually processed: %d\n", |
| normal_xacts); |
| } |
| printf("tps = %f (including connections establishing)\n", tps_include); |
| printf("tps = %f (excluding connections establishing)\n", tps_exclude); |
| } |
| |
| |
| int |
| main(int argc, char **argv) |
| { |
| int c; |
| int nclients = 1; /* default number of simulated clients */ |
| int nthreads = 1; /* default number of threads */ |
| int is_init_mode = 0; /* initialize mode? */ |
| int is_no_vacuum = 0; /* no vacuum at all before testing? */ |
| int do_vacuum_accounts = 0; /* do vacuum accounts before testing? */ |
| int ttype = 0; /* transaction type. 0: TPC-B, 1: SELECT only, |
| * 2: skip update of branches and tellers */ |
| char *filename = NULL; |
| bool scale_given = false; |
| |
| CState *state; /* status of clients */ |
| TState *threads; /* array of thread */ |
| |
| instr_time start_time; /* start up time */ |
| instr_time total_time; |
| instr_time conn_total_time; |
| int total_xacts; |
| |
| int i; |
| |
| #ifdef HAVE_GETRLIMIT |
| struct rlimit rlim; |
| #endif |
| |
| PGconn *con; |
| PGresult *res; |
| char *env; |
| |
| char val[64]; |
| |
| const char *progname; |
| |
| progname = get_progname(argv[0]); |
| |
| if (argc > 1) |
| { |
| if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0) |
| { |
| usage(progname); |
| exit(0); |
| } |
| if (strcmp(argv[1], "--version") == 0 || strcmp(argv[1], "-V") == 0) |
| { |
| puts("pgbench (PostgreSQL) " PG_VERSION); |
| exit(0); |
| } |
| } |
| |
| #ifdef WIN32 |
| /* stderr is buffered on Win32. */ |
| setvbuf(stderr, NULL, _IONBF, 0); |
| #endif |
| |
| if ((env = getenv("PGHOST")) != NULL && *env != '\0') |
| pghost = env; |
| if ((env = getenv("PGPORT")) != NULL && *env != '\0') |
| pgport = env; |
| else if ((env = getenv("PGUSER")) != NULL && *env != '\0') |
| login = env; |
| |
| state = (CState *) malloc(sizeof(CState)); |
| if (state == NULL) |
| { |
| fprintf(stderr, "Couldn't allocate memory for state\n"); |
| exit(1); |
| } |
| |
| memset(state, 0, sizeof(*state)); |
| |
| while ((c = getopt(argc, argv, "ih:nvp:dSNc:Cs:t:T:U:lf:D:F:M:j:x:q")) != -1) |
| { |
| switch (c) |
| { |
| case 'i': |
| is_init_mode++; |
| break; |
| case 'q': |
| use_unique_key = 0; |
| break; |
| case 'x': |
| storage_clause = optarg; |
| break; |
| case 'h': |
| pghost = optarg; |
| break; |
| case 'n': |
| is_no_vacuum++; |
| break; |
| case 'v': |
| do_vacuum_accounts++; |
| break; |
| case 'p': |
| pgport = optarg; |
| break; |
| case 'd': |
| debug++; |
| break; |
| case 'S': |
| ttype = 1; |
| break; |
| case 'N': |
| ttype = 2; |
| break; |
| case 'c': |
| nclients = atoi(optarg); |
| if (nclients <= 0 || nclients > MAXCLIENTS) |
| { |
| fprintf(stderr, "invalid number of clients: %d\n", nclients); |
| exit(1); |
| } |
| #ifdef HAVE_GETRLIMIT |
| #ifdef RLIMIT_NOFILE /* most platforms use RLIMIT_NOFILE */ |
| if (getrlimit(RLIMIT_NOFILE, &rlim) == -1) |
| #else /* but BSD doesn't ... */ |
| if (getrlimit(RLIMIT_OFILE, &rlim) == -1) |
| #endif /* RLIMIT_NOFILE */ |
| { |
| fprintf(stderr, "getrlimit failed: %s\n", strerror(errno)); |
| exit(1); |
| } |
| if (rlim.rlim_cur <= (nclients + 2)) |
| { |
| fprintf(stderr, "You need at least %d open files but you are only allowed to use %ld.\n", nclients + 2, (long) rlim.rlim_cur); |
| fprintf(stderr, "Use limit/ulimit to increase the limit before using pgbench.\n"); |
| exit(1); |
| } |
| #endif /* HAVE_GETRLIMIT */ |
| break; |
| case 'j': /* jobs */ |
| nthreads = atoi(optarg); |
| if (nthreads <= 0) |
| { |
| fprintf(stderr, "invalid number of threads: %d\n", nthreads); |
| exit(1); |
| } |
| break; |
| case 'C': |
| is_connect = 1; |
| break; |
| case 's': |
| scale_given = true; |
| scale = atoi(optarg); |
| if (scale <= 0) |
| { |
| fprintf(stderr, "invalid scaling factor: %d\n", scale); |
| exit(1); |
| } |
| break; |
| case 't': |
| if (duration > 0) |
| { |
| fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n"); |
| exit(1); |
| } |
| nxacts = atoi(optarg); |
| if (nxacts <= 0) |
| { |
| fprintf(stderr, "invalid number of transactions: %d\n", nxacts); |
| exit(1); |
| } |
| break; |
| case 'T': |
| if (nxacts > 0) |
| { |
| fprintf(stderr, "specify either a number of transactions (-t) or a duration (-T), not both.\n"); |
| exit(1); |
| } |
| duration = atoi(optarg); |
| if (duration <= 0) |
| { |
| fprintf(stderr, "invalid duration: %d\n", duration); |
| exit(1); |
| } |
| break; |
| case 'U': |
| login = optarg; |
| break; |
| case 'l': |
| use_log = true; |
| break; |
| case 'f': |
| ttype = 3; |
| filename = optarg; |
| if (process_file(filename) == false || *sql_files[num_files - 1] == NULL) |
| exit(1); |
| break; |
| case 'D': |
| { |
| char *p; |
| |
| if ((p = strchr(optarg, '=')) == NULL || p == optarg || *(p + 1) == '\0') |
| { |
| fprintf(stderr, "invalid variable definition: %s\n", optarg); |
| exit(1); |
| } |
| |
| *p++ = '\0'; |
| if (putVariable(&state[0], optarg, p) == false) |
| { |
| fprintf(stderr, "Couldn't allocate memory for variable\n"); |
| exit(1); |
| } |
| } |
| break; |
| case 'F': |
| fillfactor = atoi(optarg); |
| if ((fillfactor < 10) || (fillfactor > 100)) |
| { |
| fprintf(stderr, "invalid fillfactor: %d\n", fillfactor); |
| exit(1); |
| } |
| break; |
| case 'M': |
| if (num_files > 0) |
| { |
| fprintf(stderr, "query mode (-M) should be specifiled before transaction scripts (-f)\n"); |
| exit(1); |
| } |
| for (querymode = 0; querymode < NUM_QUERYMODE; querymode++) |
| if (strcmp(optarg, QUERYMODE[querymode]) == 0) |
| break; |
| if (querymode >= NUM_QUERYMODE) |
| { |
| fprintf(stderr, "invalid query mode (-M): %s\n", optarg); |
| exit(1); |
| } |
| break; |
| default: |
| fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname); |
| exit(1); |
| break; |
| } |
| } |
| |
| if (argc > optind) |
| dbName = argv[optind]; |
| else |
| { |
| if ((env = getenv("PGDATABASE")) != NULL && *env != '\0') |
| dbName = env; |
| else if (login != NULL && *login != '\0') |
| dbName = login; |
| else |
| dbName = ""; |
| } |
| |
| if (is_init_mode) |
| { |
| init(); |
| exit(0); |
| } |
| |
| /* Use DEFAULT_NXACTS if neither nxacts nor duration is specified. */ |
| if (nxacts <= 0 && duration <= 0) |
| nxacts = DEFAULT_NXACTS; |
| |
| if (nclients % nthreads != 0) |
| { |
| fprintf(stderr, "number of clients (%d) must be a multiple of number of threads (%d)\n", nclients, nthreads); |
| exit(1); |
| } |
| |
| if (nclients > 1) |
| { |
| state = (CState *) realloc(state, sizeof(CState) * nclients); |
| if (state == NULL) |
| { |
| fprintf(stderr, "Couldn't allocate memory for state\n"); |
| exit(1); |
| } |
| |
| memset(state + 1, 0, sizeof(*state) * (nclients - 1)); |
| |
| /* copy any -D switch values to all clients */ |
| for (i = 1; i < nclients; i++) |
| { |
| int j; |
| |
| state[i].id = i; |
| for (j = 0; j < state[0].nvariables; j++) |
| { |
| if (putVariable(&state[i], state[0].variables[j].name, state[0].variables[j].value) == false) |
| { |
| fprintf(stderr, "Couldn't allocate memory for variable\n"); |
| exit(1); |
| } |
| } |
| } |
| } |
| |
| if (use_log) |
| { |
| char logpath[64]; |
| |
| snprintf(logpath, 64, "pgbench_log.%d", (int) getpid()); |
| LOGFILE = fopen(logpath, "w"); |
| |
| if (LOGFILE == NULL) |
| { |
| fprintf(stderr, "Couldn't open logfile \"%s\": %s", logpath, strerror(errno)); |
| exit(1); |
| } |
| } |
| |
| if (debug) |
| { |
| if (duration <= 0) |
| printf("pghost: %s pgport: %s nclients: %d nxacts: %d dbName: %s\n", |
| pghost, pgport, nclients, nxacts, dbName); |
| else |
| printf("pghost: %s pgport: %s nclients: %d duration: %d dbName: %s\n", |
| pghost, pgport, nclients, duration, dbName); |
| } |
| |
| /* opening connection... */ |
| con = doConnect(); |
| if (con == NULL) |
| exit(1); |
| |
| if (PQstatus(con) == CONNECTION_BAD) |
| { |
| fprintf(stderr, "Connection to database '%s' failed.\n", dbName); |
| fprintf(stderr, "%s", PQerrorMessage(con)); |
| exit(1); |
| } |
| |
| if (ttype != 3) |
| { |
| /* |
| * get the scaling factor that should be same as count(*) from |
| * pgbench_branches if this is not a custom query |
| */ |
| res = PQexec(con, "select count(*) from pgbench_branches"); |
| if (PQresultStatus(res) != PGRES_TUPLES_OK) |
| { |
| fprintf(stderr, "%s", PQerrorMessage(con)); |
| exit(1); |
| } |
| scale = atoi(PQgetvalue(res, 0, 0)); |
| if (scale < 0) |
| { |
| fprintf(stderr, "count(*) from pgbench_branches invalid (%d)\n", scale); |
| exit(1); |
| } |
| PQclear(res); |
| |
| /* warn if we override user-given -s switch */ |
| if (scale_given) |
| fprintf(stderr, |
| "Scale option ignored, using pgbench_branches table count = %d\n", |
| scale); |
| } |
| |
| /* |
| * :scale variables normally get -s or database scale, but don't override |
| * an explicit -D switch |
| */ |
| if (getVariable(&state[0], "scale") == NULL) |
| { |
| snprintf(val, sizeof(val), "%d", scale); |
| for (i = 0; i < nclients; i++) |
| { |
| if (putVariable(&state[i], "scale", val) == false) |
| { |
| fprintf(stderr, "Couldn't allocate memory for variable\n"); |
| exit(1); |
| } |
| } |
| } |
| |
| if (!is_no_vacuum) |
| { |
| fprintf(stderr, "starting vacuum..."); |
| executeStatement(con, "vacuum pgbench_branches"); |
| executeStatement(con, "vacuum pgbench_tellers"); |
| executeStatement(con, "truncate pgbench_history"); |
| fprintf(stderr, "end.\n"); |
| |
| if (do_vacuum_accounts) |
| { |
| fprintf(stderr, "starting vacuum pgbench_accounts..."); |
| executeStatement(con, "vacuum analyze pgbench_accounts"); |
| fprintf(stderr, "end.\n"); |
| } |
| } |
| PQfinish(con); |
| |
| /* set random seed */ |
| INSTR_TIME_SET_CURRENT(start_time); |
| srandom((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)); |
| |
| /* process builtin SQL scripts */ |
| switch (ttype) |
| { |
| case 0: |
| sql_files[0] = process_builtin(tpc_b); |
| num_files = 1; |
| break; |
| |
| case 1: |
| sql_files[0] = process_builtin(select_only); |
| num_files = 1; |
| break; |
| |
| case 2: |
| sql_files[0] = process_builtin(simple_update); |
| num_files = 1; |
| break; |
| |
| default: |
| break; |
| } |
| |
| /* get start up time */ |
| INSTR_TIME_SET_CURRENT(start_time); |
| |
| /* set alarm if duration is specified. */ |
| if (duration > 0) |
| setalarm(duration); |
| |
| /* start threads */ |
| threads = (TState *) malloc(sizeof(TState) * nthreads); |
| for (i = 0; i < nthreads; i++) |
| { |
| threads[i].state = &state[nclients / nthreads * i]; |
| threads[i].nstate = nclients / nthreads; |
| INSTR_TIME_SET_CURRENT(threads[i].start_time); |
| |
| /* the first thread (i = 0) is executed by main thread */ |
| if (i > 0) |
| { |
| int err = pthread_create(&threads[i].thread, NULL, threadRun, &threads[i]); |
| if (err != 0 || threads[i].thread == INVALID_THREAD) |
| { |
| fprintf(stderr, "cannot create thread: %s\n", strerror(err)); |
| exit(1); |
| } |
| } |
| else |
| { |
| threads[i].thread = INVALID_THREAD; |
| } |
| } |
| |
| /* wait for threads and accumulate results */ |
| total_xacts = 0; |
| INSTR_TIME_SET_ZERO(conn_total_time); |
| for (i = 0; i < nthreads; i++) |
| { |
| void *ret = NULL; |
| |
| if (threads[i].thread == INVALID_THREAD) |
| ret = threadRun(&threads[i]); |
| else |
| pthread_join(threads[i].thread, &ret); |
| |
| if (ret != NULL) |
| { |
| TResult *r = (TResult *) ret; |
| total_xacts += r->xacts; |
| INSTR_TIME_ADD(conn_total_time, r->conn_time); |
| free(ret); |
| } |
| } |
| disconnect_all(state, nclients); |
| |
| /* get end time */ |
| INSTR_TIME_SET_CURRENT(total_time); |
| INSTR_TIME_SUBTRACT(total_time, start_time); |
| printResults(ttype, total_xacts, nclients, nthreads, total_time, conn_total_time); |
| if (LOGFILE) |
| fclose(LOGFILE); |
| |
| return 0; |
| } |
| |
| static void * |
| threadRun(void *arg) |
| { |
| TState *thread = (TState *) arg; |
| CState *state = thread->state; |
| TResult *result; |
| instr_time start, end; |
| int nstate = thread->nstate; |
| int remains = nstate; /* number of remaining clients */ |
| int i; |
| |
| result = malloc(sizeof(TResult)); |
| INSTR_TIME_SET_ZERO(result->conn_time); |
| |
| if (is_connect == 0) |
| { |
| /* make connections to the database */ |
| for (i = 0; i < nstate; i++) |
| { |
| if ((state[i].con = doConnect()) == NULL) |
| goto done; |
| } |
| } |
| |
| /* time after thread and connections set up */ |
| INSTR_TIME_SET_CURRENT(result->conn_time); |
| INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time); |
| |
| /* send start up queries in async manner */ |
| for (i = 0; i < nstate; i++) |
| { |
| CState *st = &state[i]; |
| Command **commands = sql_files[st->use_file]; |
| int prev_ecnt = st->ecnt; |
| |
| st->use_file = getrand(0, num_files - 1); |
| if (!doCustom(st, &result->conn_time)) |
| remains--; /* I've aborted */ |
| |
| if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) |
| { |
| fprintf(stderr, "Client %d aborted in state %d. Execution meta-command failed.\n", i, st->state); |
| remains--; /* I've aborted */ |
| PQfinish(st->con); |
| st->con = NULL; |
| } |
| } |
| |
| while (remains > 0) |
| { |
| fd_set input_mask; |
| int maxsock; /* max socket number to be waited */ |
| int64 now_usec = 0; |
| int64 min_usec; |
| |
| FD_ZERO(&input_mask); |
| |
| maxsock = -1; |
| min_usec = INT64_MAX; |
| for (i = 0; i < nstate; i++) |
| { |
| CState *st = &state[i]; |
| Command **commands = sql_files[st->use_file]; |
| int sock; |
| |
| if (st->sleeping) |
| { |
| int this_usec; |
| |
| if (min_usec == INT64_MAX) |
| { |
| instr_time now; |
| INSTR_TIME_SET_CURRENT(now); |
| now_usec = INSTR_TIME_GET_MICROSEC(now); |
| } |
| |
| this_usec = st->until - now_usec; |
| if (min_usec > this_usec) |
| min_usec = this_usec; |
| } |
| else if (st->con == NULL) |
| { |
| continue; |
| } |
| else if (commands[st->state]->type == META_COMMAND) |
| { |
| min_usec = 0; /* the connection is ready to run */ |
| break; |
| } |
| |
| sock = PQsocket(st->con); |
| if (sock < 0) |
| { |
| fprintf(stderr, "bad socket: %s\n", strerror(errno)); |
| goto done; |
| } |
| |
| FD_SET(sock, &input_mask); |
| if (maxsock < sock) |
| maxsock = sock; |
| } |
| |
| if (min_usec > 0 && maxsock != -1) |
| { |
| int nsocks; /* return from select(2) */ |
| |
| if (min_usec != INT64_MAX) |
| { |
| struct timeval timeout; |
| timeout.tv_sec = min_usec / 1000000; |
| timeout.tv_usec = min_usec % 1000000; |
| nsocks = select(maxsock + 1, &input_mask, NULL, NULL, &timeout); |
| } |
| else |
| nsocks = select(maxsock + 1, &input_mask, NULL, NULL, NULL); |
| if (nsocks < 0) |
| { |
| if (errno == EINTR) |
| continue; |
| /* must be something wrong */ |
| fprintf(stderr, "select failed: %s\n", strerror(errno)); |
| goto done; |
| } |
| } |
| |
| /* ok, backend returns reply */ |
| for (i = 0; i < nstate; i++) |
| { |
| CState *st = &state[i]; |
| Command **commands = sql_files[st->use_file]; |
| int prev_ecnt = st->ecnt; |
| |
| if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask) |
| || commands[st->state]->type == META_COMMAND)) |
| { |
| if (!doCustom(st, &result->conn_time)) |
| remains--; /* I've aborted */ |
| } |
| |
| if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND) |
| { |
| fprintf(stderr, "Client %d aborted in state %d. Execution of meta-command failed.\n", i, st->state); |
| remains--; /* I've aborted */ |
| PQfinish(st->con); |
| st->con = NULL; |
| } |
| } |
| } |
| |
| done: |
| INSTR_TIME_SET_CURRENT(start); |
| disconnect_all(state, nstate); |
| result->xacts = 0; |
| for (i = 0; i < nstate; i++) |
| result->xacts += state[i].cnt; |
| INSTR_TIME_SET_CURRENT(end); |
| INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start); |
| return result; |
| } |
| |
| |
| /* |
| * Support for duration option: set timer_exceeded after so many seconds. |
| */ |
| |
| #ifndef WIN32 |
| |
| static void |
| handle_sig_alarm(SIGNAL_ARGS) |
| { |
| timer_exceeded = true; |
| } |
| |
| static void |
| setalarm(int seconds) |
| { |
| pqsignal(SIGALRM, handle_sig_alarm); |
| alarm(seconds); |
| } |
| |
| #ifndef ENABLE_THREAD_SAFETY |
| |
| /* |
| * implements pthread using fork. |
| */ |
| |
/*
 * Bookkeeping for one "thread" that is really a forked child process.
 */
typedef struct fork_pthread
{
	pid_t		pid;			/* value returned by fork() */
	int			pipes[2];		/* [0]: parent reads the result here;
								 * [1]: child writes the result here */
} fork_pthread;
| |
| static int |
| pthread_create(pthread_t *thread, |
| pthread_attr_t *attr, |
| void * (*start_routine)(void *), |
| void *arg) |
| { |
| fork_pthread *th; |
| void *ret; |
| instr_time start_time; |
| |
| th = (fork_pthread *) malloc(sizeof(fork_pthread)); |
| pipe(th->pipes); |
| |
| th->pid = fork(); |
| if (th->pid == -1) /* error */ |
| { |
| free(th); |
| return errno; |
| } |
| if (th->pid != 0) /* in parent process */ |
| { |
| close(th->pipes[1]); |
| *thread = th; |
| return 0; |
| } |
| |
| /* in child process */ |
| close(th->pipes[0]); |
| |
| /* set alarm again because the child does not inherit timers */ |
| if (duration > 0) |
| setalarm(duration); |
| |
| /* |
| * Set a different random seed in each child process. Otherwise they |
| * all inherit the parent's state and generate the same "random" |
| * sequence. (In the threaded case, the different threads will obtain |
| * subsets of the output of a single random() sequence, which should be |
| * okay for our purposes.) |
| */ |
| INSTR_TIME_SET_CURRENT(start_time); |
| srandom(((unsigned int) INSTR_TIME_GET_MICROSEC(start_time)) + |
| ((unsigned int) getpid())); |
| |
| ret = start_routine(arg); |
| write(th->pipes[1], ret, sizeof(TResult)); |
| close(th->pipes[1]); |
| free(th); |
| exit(0); |
| } |
| |
| static int |
| pthread_join(pthread_t th, void **thread_return) |
| { |
| int status; |
| |
| while (waitpid(th->pid, &status, 0) != th->pid) |
| { |
| if (errno != EINTR) |
| return errno; |
| } |
| |
| if (thread_return != NULL) |
| { |
| /* assume result is TResult */ |
| *thread_return = malloc(sizeof(TResult)); |
| if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult)) |
| { |
| free(*thread_return); |
| *thread_return = NULL; |
| } |
| } |
| close(th->pipes[0]); |
| |
| free(th); |
| return 0; |
| } |
| |
| #endif |
| |
| #else /* WIN32 */ |
| |
| static VOID CALLBACK |
| win32_timer_callback(PVOID lpParameter, BOOLEAN TimerOrWaitFired) |
| { |
| timer_exceeded = true; |
| } |
| |
| static void |
| setalarm(int seconds) |
| { |
| HANDLE queue; |
| HANDLE timer; |
| |
| /* This function will be called at most once, so we can cheat a bit. */ |
| queue = CreateTimerQueue(); |
| if (seconds > ((DWORD) -1) / 1000 || |
| !CreateTimerQueueTimer(&timer, queue, |
| win32_timer_callback, NULL, seconds * 1000, 0, |
| WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE)) |
| { |
| fprintf(stderr, "Failed to set timer\n"); |
| exit(1); |
| } |
| } |
| |
| /* partial pthread implementation for Windows */ |
| |
| typedef struct win32_pthread |
| { |
| HANDLE handle; |
| void *(*routine)(void *); |
| void *arg; |
| void *result; |
| } win32_pthread; |
| |
| static unsigned __stdcall |
| win32_pthread_run(void *arg) |
| { |
| win32_pthread *th = (win32_pthread *) arg; |
| |
| th->result = th->routine(th->arg); |
| |
| return 0; |
| } |
| |
| static int |
| pthread_create(pthread_t *thread, |
| pthread_attr_t *attr, |
| void * (*start_routine)(void *), |
| void *arg) |
| { |
| int save_errno; |
| win32_pthread *th; |
| |
| th = (win32_pthread *) malloc(sizeof(win32_pthread)); |
| th->routine = start_routine; |
| th->arg = arg; |
| th->result = NULL; |
| |
| th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL); |
| if (th->handle == NULL) |
| { |
| save_errno = errno; |
| free(th); |
| return save_errno; |
| } |
| |
| *thread = th; |
| return 0; |
| } |
| |
| static int |
| pthread_join(pthread_t th, void **thread_return) |
| { |
| if (th == NULL || th->handle == NULL) |
| return errno = EINVAL; |
| |
| if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0) |
| { |
| _dosmaperr(GetLastError()); |
| return errno; |
| } |
| |
| if (thread_return) |
| *thread_return = th->result; |
| |
| CloseHandle(th->handle); |
| free(th); |
| return 0; |
| } |
| |
| #endif /* WIN32 */ |