blob: 1a5665be257e1a6b0b2c4647e74d5dc9c6027e42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <thread>
#include <fstream>
#include <stdio.h>
#include "sql_util_parallel.h"
#include "string_util.h"
using std::string;
namespace hawq {
namespace test {
ThreadMessage::ThreadMessage(uint16_t tid)
{
command = NULLCOMMAND;
messageinfo = "";
isrunning = false;
thread_id = tid;
waittids.clear();
waitfortids.clear();
submit = 1 ; //commit
}
ThreadMessage::~ThreadMessage()
{
waittids.clear();
waitfortids.clear();
}
SQLUtilityParallel::SQLUtilityParallel() {
outputstream = NULL;
exclusive = false;
srand((unsigned)time(NULL));
}
SQLUtilityParallel::~SQLUtilityParallel() {
}
int32_t SQLUtilityParallel::__getTotalSession(std::fstream &in) {
string line;
int sessionnum = 0;
while (getline(in, line))
{
if (hawq::test::startsWith(line, "#"))
continue;
if (!hawq::test::startsWith(hawq::test::lower(line), "total"))
{
EXPECT_TRUE(false) << "The file format is error, "
"it should begin with total:number of sessions ";
return 0;
}
auto totalstring = hawq::test::split(line, ':');
if (totalstring.size() != 2 )
return false;
try
{
sessionnum = std::stoi(hawq::test::trim(totalstring[1]));
}
catch(...)
{
return 0;
}
break;
}
return sessionnum;
}
bool SQLUtilityParallel::__processLine(const string &line, const int32_t totalsession,
int32_t &sessionid, int32_t &waittid, string &message)
{
auto querystring = hawq::test::split(line, ':');
if (querystring.size() < 2 )
return false;
sessionid = std::stoi(hawq::test::trim(querystring[0])) ;
if (sessionid < 0 || sessionid > totalsession)
return false;
message = hawq::test::trim(querystring[querystring.size() - 1]);
thread_mutex.lock();
if(querystring.size() >= 3) {
waittid = 1;
if(!this->exclusive){
for (int i = 1; i < querystring.size() - 1; i++) {
int wait_tid = std::stoi(hawq::test::trim(querystring[i]));
threadmessage[wait_tid]->waittids.push_back(sessionid);
threadmessage[sessionid]->waitfortids.push_back(wait_tid);
}
}
}
thread_mutex.unlock();
return true;
}
bool SQLUtilityParallel::runParallelControlFile_exclusive(const string &sqlFile, const string &outputFile)
{
string line;
bool result = true;
int32_t totalsession = 0 , i = 0 , commitnum = 0 ,blocknum = 0,commit=0;
std::vector<std::unique_ptr<std::thread>> querythread;
// Step 1: Open input and output file
std::fstream in;
in.open(sqlFile, std::ios::in);
outputstream = fopen(outputFile.c_str(), "w");
if (!in.is_open() || !outputstream) {
EXPECT_TRUE(false) << "Error opening input/output file ";
result = false;
goto clear;
}
// Step 2: clear env and get total session number
threadmessage.clear();
totalsession = __getTotalSession(in);
if (totalsession <= 0 )
{
result = false;
goto clear;
}
try
{
// Step 3: Initialize thread message, query thread, and connection pool
std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
thrm->command = NEXTLINE;
threadmessage.push_back(std::move(thrm));
string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
string connstr = this->getConnectionString();
//begin
for(i = 1 ; i <= totalsession; i++ ){
std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
threadmessage.push_back(std::move(thrm));
}
for(i = 1 ; i <= totalsession; i++ ) {
std::unique_ptr<std::thread> thr(new std::thread
(&SQLUtilityParallel::thread_execute_exclusive, this, i, schemamessage, connstr , totalsession));
querythread.push_back(std::move(thr));
}
// Step 4: Read every line of file and pass to sub thread
while (getline(in, line))
{
if (hawq::test::startsWith(line, "#"))
continue;
if (hawq::test::startsWith(line, "$"))
{
commit=1;
continue;
}
int32_t sessionid = 0 , waittid = 0;
string message = "";
result = __processLine(line, totalsession, sessionid, waittid, message);
if (!result)
goto clear;
if (sessionid == 0)
{
this->execute(message, true);
}
else
{
if ( threadmessage[sessionid]->command != NULLCOMMAND )
{
EXPECT_TRUE(false) << "The file format is error or query handling error\n";
result = false;
goto clear;
}
MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
if(cmd == SQLSTRING){
blocknum++;
}
//lock
thread_mutex.lock();
threadmessage[sessionid]->command = cmd;
threadmessage[sessionid]->messageinfo = message;
threadmessage[0]->command = NULLCOMMAND;
if(threadmessage[sessionid]->messageinfo.find("vacuum") != string::npos)
{
threadmessage[sessionid]->submit = -2; //vacuum
}
else if(threadmessage[sessionid]->messageinfo.find("drop table") != string::npos){
threadmessage[sessionid]->submit = -1; //rollback
}
else{
int32_t n = random() % 2;
if(n == 0)
threadmessage[sessionid]->submit = 1; //commit
else
threadmessage[sessionid]->submit = -1; //rollback
}
thread_mutex.unlock();
}
while (threadmessage[0]->command == NULLCOMMAND ) {}
if (threadmessage[0]->command == BADSTATUS)
break;
}
// Step 5: Set commit/rollback command
int32_t j = 0;
while(commitnum < totalsession || blocknum < totalsession){
j = (j+1) % totalsession ;
if( j == 0 )
{
j = totalsession;
}
if(threadmessage[j]->command == BLOCKOVER){
thread_mutex.lock();
threadmessage[j]->command = PRINTRESULT;
thread_mutex.unlock();
while(threadmessage[j]->command == PRINTRESULT){}
blocknum++;
}
if(threadmessage[j]->submit == -2 && threadmessage[j]->command == NULLCOMMAND)
{
commitnum ++;
thread_mutex.lock();
threadmessage[j]->submit = 0;
thread_mutex.unlock();
}
else if(threadmessage[j]->submit != 0 && threadmessage[j]->command == NULLCOMMAND){
commitnum ++;
thread_mutex.lock();
threadmessage[j]->command = SQLSTRING;
if(commit&&j%2==1)
threadmessage[j]->messageinfo = "commit;";
else
threadmessage[j]->messageinfo = "rollback;";
threadmessage[0]->command = NULLCOMMAND;
thread_mutex.unlock();
while (threadmessage[0]->command == NULLCOMMAND ) {}
thread_mutex.lock();
threadmessage[j]->submit = 0;
thread_mutex.unlock();
}
}
// Step 6: Setup exit signal to every sub-thread
thread_mutex.lock();
for(i = 1 ; i <= totalsession; i++ )
{
threadmessage[i]->command = EXITTHREAD;
threadmessage[i]->messageinfo = "";
}
thread_mutex.unlock();
for(i = 0 ; i < totalsession; i++ ){
querythread[i]->join();
}
// Step 7: Check status of every thread
for(i = 0 ; i <= totalsession; i++ )
{
if (threadmessage[i]->isrunning ||
threadmessage[i]->messageinfo.size() > 0 ||
threadmessage[i]->command == BADSTATUS)
{
EXPECT_TRUE(false) << "There is not finished or error job";
result = false;
goto clear;
}
}
}
catch(...)
{
EXPECT_TRUE(false) << "There is exception \n";
result = false;
goto clear;
}
clear:
threadmessage.clear();
in.close();
fclose(outputstream);
outputstream = NULL;
return result;
}
void SQLUtilityParallel::thread_execute_exclusive(uint16_t tid, const std::string &schemapath, const std::string &connstr , int32_t totalsession)
{
std::unique_ptr<LibPQSQL> libpqconn;
libpqconn.reset(new LibPQSQL(connstr));
if (!libpqconn->connectDb())
goto errorclear;
if (!libpqconn->execQuery(schemapath,outputstream))
goto errorclear;
do
{
thread_mutex.lock();
MessageCommandType cmd = threadmessage[tid]->command;
string message = threadmessage[tid]->messageinfo;
thread_mutex.unlock();
switch (cmd)
{
case EXITTHREAD:
{
libpqconn->disconnectDb();
libpqconn.release();
return;
}
case BLOCKEDSQL:
{
thread_mutex.lock();
std::cout<<"Tid: "<<tid<<" Blocked "<<threadmessage[tid]->messageinfo<<std::endl;
threadmessage[tid]->isrunning = true;
fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
threadmessage[0]->command = NEXTLINE;
thread_mutex.unlock();
//wait for result from database
bool result = libpqconn->execQuery(message,outputstream);
if (!result) {
goto errorclear;
}
thread_mutex.lock();
threadmessage[tid]->isrunning = false;
threadmessage[tid]->command = BLOCKOVER;
std::cout<<"Tid: "<<tid<<" Block over "<<std::endl;
thread_mutex.unlock();
break;
}
case BLOCKOVER :
break;
case PRINTRESULT:
{
thread_mutex.lock();
std::cout<<"Tid: "<<tid<<" Print "<<std::endl;
fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
libpqconn->printQueryResult(outputstream);
threadmessage[tid]->command = NULLCOMMAND;
threadmessage[tid]->messageinfo = "";
thread_mutex.unlock();
break;
}
case SQLSTRING:
{
thread_mutex.lock();
std::cout<<"Tid: "<<tid<<" SQLstring "<<threadmessage[tid]->messageinfo<<std::endl;
if (threadmessage[tid]->isrunning)
goto errorclearlock;
fprintf(outputstream, "%d:%s\n", tid, message.c_str());
//submit sql
if (message.compare("commit;") == 0 ||
message.compare("rollback;") == 0 ||
threadmessage[tid]->messageinfo.find("vacuum") != string::npos ||
threadmessage[tid]->messageinfo.find("VACUUM") != string::npos)
{
for (uint16_t i = 0; i < threadmessage[tid]->waittids.size(); i++)
{
if (!threadmessage[threadmessage[tid]->waittids[i]]->isrunning)
{
goto errorclearlock;
}
}
bool result = libpqconn->execQuery(message,outputstream);
if (!result)
goto errorclearlock;
}
else
{
bool result = libpqconn->execQuery(message,outputstream);
if (!result)
goto errorclearlock;
}
libpqconn->printQueryResult(outputstream);
threadmessage[tid]->command = NULLCOMMAND;
threadmessage[tid]->messageinfo = "";
threadmessage[0]->command = NEXTLINE;
thread_mutex.unlock();
break;
}
case NULLCOMMAND:
break;
case NEXTLINE:
case BADSTATUS:
{
goto errorclear;
}
}
} while (true);
errorclearlock:
thread_mutex.unlock();
errorclear:
thread_mutex.lock();
threadmessage[0]->command = BADSTATUS;
threadmessage[tid]->command = BADSTATUS;
thread_mutex.unlock();
libpqconn->disconnectDb();
libpqconn.release();
}
bool SQLUtilityParallel::runParallelControlFile(const string &sqlFile, const string &outputFile)
{
string line;
bool result = true;
int32_t totalsession = 0, i = 0;
std::vector<std::unique_ptr<std::thread>> querythread;
// Step 1: Open input and output file
std::fstream in;
in.open(sqlFile, std::ios::in);
outputstream = fopen(outputFile.c_str(), "w");
if (!in.is_open() || !outputstream) {
EXPECT_TRUE(false) << "Error opening input/output file ";
result = false;
goto clear;
}
// Step 2: clear env and get total session number
threadmessage.clear();
totalsession = __getTotalSession(in);
if (totalsession <= 0 )
{
result = false;
goto clear;
}
try
{
// Step 3: Initialize thread message, query thread, and connection pool
std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(0));
thrm->command = NEXTLINE;
threadmessage.push_back(std::move(thrm));
string schemamessage = "SET SEARCH_PATH = " + this->getSchemaName() + ";" ;
string connstr = this->getConnectionString();
//begin
for(i = 1 ; i <= totalsession; i++ ){
std::unique_ptr<ThreadMessage> thrm(new ThreadMessage(i));
threadmessage.push_back(std::move(thrm));
}
for(i = 1 ; i <= totalsession; i++ ) {
std::unique_ptr<std::thread> thr(new std::thread
(&SQLUtilityParallel::thread_execute, this, i, schemamessage, connstr));
querythread.push_back(std::move(thr));
}
// Step 4: Read every line of file and pass to sub thread
while (getline(in, line))
{
if (hawq::test::startsWith(line, "#"))
continue;
int32_t sessionid = 0 , waittid = 0;
string message = "";
result = __processLine(line, totalsession, sessionid, waittid, message);
if (!result)
goto clear;
if (sessionid == 0)
{
this->execute(message, true);
}
else
{
if ( threadmessage[sessionid]->command != NULLCOMMAND )
{
EXPECT_TRUE(false) << "The file format is error or query handling error\n";
result = false;
goto clear;
}
MessageCommandType cmd = (waittid == 0) ? SQLSTRING : BLOCKEDSQL;
//lock
thread_mutex.lock();
threadmessage[sessionid]->command = cmd;
threadmessage[sessionid]->messageinfo = message;
threadmessage[0]->command = NULLCOMMAND;
thread_mutex.unlock();
}
while (threadmessage[0]->command == NULLCOMMAND ) {}
if (threadmessage[0]->command == BADSTATUS)
break;
}
// Step 5: Setup exit signal to every sub-thread
thread_mutex.lock();
for(i = 1 ; i <= totalsession; i++ )
{
threadmessage[i]->command = EXITTHREAD;
threadmessage[i]->messageinfo = "";
}
thread_mutex.unlock();
for(i = 0 ; i < totalsession; i++ ){
querythread[i]->join();
}
// Step 6: Check status of every thread
for(i = 0 ; i <= totalsession; i++ )
{
if (threadmessage[i]->isrunning ||
threadmessage[i]->messageinfo.size() > 0 ||
threadmessage[i]->command == BADSTATUS)
{
EXPECT_TRUE(false) << "There is not finished or error job";
result = false;
goto clear;
}
}
}
catch(...)
{
EXPECT_TRUE(false) << "There is exception \n";
result = false;
goto clear;
}
clear:
threadmessage.clear();
in.close();
fclose(outputstream);
outputstream = NULL;
return result;
}
void SQLUtilityParallel::thread_execute(uint16_t tid, const std::string &schemapath, const std::string &connstr)
{
std::unique_ptr<LibPQSQL> libpqconn;
libpqconn.reset(new LibPQSQL(connstr));
if (!libpqconn->connectDb())
goto errorclear;
if (!libpqconn->execQuery(schemapath,outputstream))
goto errorclear;
do
{
thread_mutex.lock();
MessageCommandType cmd = threadmessage[tid]->command;
string message = threadmessage[tid]->messageinfo;
thread_mutex.unlock();
switch (cmd)
{
case EXITTHREAD:
{
libpqconn->disconnectDb();
libpqconn.release();
return;
}
case BLOCKEDSQL:
{
thread_mutex.lock();
threadmessage[tid]->isrunning = true;
std::cout<<"BLOCK Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
fprintf(outputstream, "%d:blocked:%s\n", tid, message.c_str());
threadmessage[0]->command = NEXTLINE;
thread_mutex.unlock();
bool result = libpqconn->execQuery(message,outputstream);
if (!result) {
std::cout<<"Error"<<std::endl;
goto errorclear;
}
thread_mutex.lock();
threadmessage[tid]->isrunning = false;
threadmessage[tid]->command = BLOCKOVER;
std::cout<<"BLOCKOVER Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
thread_mutex.unlock();
break;
}
case BLOCKOVER :
break;
case PRINTRESULT:
{
thread_mutex.lock();
std::cout<<"PRINT Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
fprintf(outputstream, "%d:back:%s\n", tid, message.c_str());
libpqconn->printQueryResult(outputstream);
threadmessage[tid]->command = NULLCOMMAND;
threadmessage[tid]->messageinfo = "";
thread_mutex.unlock();
break;
}
case SQLSTRING:
{
thread_mutex.lock();
std::cout<<"SQL Tid: "<<tid<<" "<<threadmessage[tid]->messageinfo<<std::endl;
if (threadmessage[tid]->isrunning)
goto errorclearlock;
fprintf(outputstream, "%d:%s\n", tid, message.c_str());
if (message.compare("commit;") == 0 ||
message.compare("rollback;") == 0 || threadmessage[tid]->messageinfo.find("vacuum") != string::npos) {
for (uint16_t i = 0; i < threadmessage[tid]->waittids.size(); i++) {
if (!threadmessage[threadmessage[tid]->waittids[i]]->isrunning) {
goto errorclearlock;
}
}
bool result = libpqconn->execQuery(message,outputstream);
if (!result)
goto errorclearlock;
thread_mutex.unlock();
while (threadmessage[tid]->waittids.size() > 0) {
thread_mutex.lock();
int waitpid = threadmessage[tid]->waittids[0];
for (std::vector<int>::iterator iter = threadmessage[waitpid]->waitfortids.begin();
iter != threadmessage[waitpid]->waitfortids.end(); iter++) {
if (*iter == tid) {
threadmessage[waitpid]->waitfortids.erase(iter);
break;
}
}
thread_mutex.unlock();
if (threadmessage[waitpid]->waitfortids.size() == 0) {
thread_mutex.unlock();
while (threadmessage[waitpid]->command != BLOCKOVER) {}
thread_mutex.lock();
threadmessage[waitpid]->command = PRINTRESULT;
thread_mutex.unlock();
}
while (threadmessage[waitpid]->command == PRINTRESULT) {}
thread_mutex.lock();
threadmessage[tid]->waittids.erase(threadmessage[tid]->waittids.begin());
thread_mutex.unlock();
}
thread_mutex.lock();
}
else
{
bool result = libpqconn->execQuery(message,outputstream);
if (!result)
goto errorclearlock;
}
libpqconn->printQueryResult(outputstream);
threadmessage[tid]->command = NULLCOMMAND;
threadmessage[tid]->messageinfo = "";
threadmessage[0]->command = NEXTLINE;
thread_mutex.unlock();
break;
}
case NULLCOMMAND:
break;
case NEXTLINE:
case BADSTATUS:
{
goto errorclear;
}
}
} while (true);
errorclearlock:
thread_mutex.unlock();
errorclear:
thread_mutex.lock();
threadmessage[0]->command = BADSTATUS;
threadmessage[tid]->command = BADSTATUS;
thread_mutex.unlock();
libpqconn->disconnectDb();
libpqconn.release();
}
void SQLUtilityParallel::execParallelControlSQLFile(const string &sqlFile,
const string &ansFile,
bool exclusive,
const string &initFile)
{
FilePath fp;
const string ansFileAbsPath = testRootPath + "/" + ansFile;
const string sqlFileAbsPath = testRootPath + "/" + sqlFile;
if (!std::ifstream(ansFileAbsPath))
ASSERT_TRUE(false) << ansFileAbsPath << " doesn't exist";
if (!std::ifstream(sqlFileAbsPath))
ASSERT_TRUE(false) << sqlFileAbsPath << " doesn't exist";
//get file from source
fp = splitFilePath(ansFileAbsPath);
// double check to avoid empty fileBaseName
if (fp.fileBaseName.empty())
ASSERT_TRUE(false) << ansFileAbsPath << " is invalid";
// outFile is located in the same folder with ansFile
const string outFileAbsPath = fp.path + "/" + fp.fileBaseName + ".out";
bool result;
this->exclusive = exclusive ;
if( this->exclusive )
result = this->runParallelControlFile_exclusive(sqlFileAbsPath, outFileAbsPath);
else
result = this->runParallelControlFile(sqlFileAbsPath, outFileAbsPath);
if (!result)
ASSERT_TRUE(false) << " Execute query is failed";
//Check result
if( !this->exclusive )
checkAnsFile(ansFileAbsPath, outFileAbsPath, initFile, "", fp);
}
} // namespace test
} // namespace hawq