blob: f3db9d3460c3cae774c275bc122f85d90fcb1c2e [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExExeUtilVolTab.cpp
* Description:
*
*
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "ComCextdecs.h"
#include "ComSizeDefs.h"
#include "cli_stdh.h"
#include "ex_stdh.h"
#include "sql_id.h"
#include "ex_transaction.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ComSqlId.h"
#include "ExExeUtil.h"
#include "ex_exe_stmt_globals.h"
#include "exp_expr.h"
#include "exp_clause_derived.h"
#include "ComRtUtils.h"
#include "ExStats.h"
#include "seabed/ms.h"
#include "CmpContext.h"
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilLoadVolatileTableTdb::build(ex_globals * glob)
{
ExExeUtilLoadVolatileTableTcb * exe_util_tcb;
exe_util_tcb =
new(glob->getSpace()) ExExeUtilLoadVolatileTableTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilLoadVolatileTableTcb
///////////////////////////////////////////////////////////////
ExExeUtilLoadVolatileTableTcb::ExExeUtilLoadVolatileTableTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob),
step_(INITIAL_)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
}
//////////////////////////////////////////////////////
// work() for ExExeUtilLoadVolatileTableTcb
//////////////////////////////////////////////////////
short ExExeUtilLoadVolatileTableTcb::work()
{
Lng32 cliRC = 0;
short retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
ContextCli *currContext = masterGlob->getStatement()->getContext();
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = INSERT_;
}
break;
case INSERT_:
{
// let compiler know that this insert statement should be
// treated as a regular insert stmt.
// 0x20000 is the bit to allow this.
// Bit defined in parser/SqlParserGlobalsCmn.h.
// masterGlob->getStatement()->getContext()->setSqlParserFlags(0x20000); // NO_IMPLICIT_VOLATILE_TABLE_UPD_STATS
// issue the insert command
Int64 rowsAffected;
// All internal queries issued from CliInterface assume that
// they are in ISO_MAPPING.
// That causes mxcmp to use the default charset as iso88591
// for unprefixed literals.
// The insert...select being issued out here contains the user
// specified query and any literals in that should be using
// the default_charset.
// So we send the isoMapping charset instead of the
// enum ISO_MAPPING.
Int32 savedIsoMapping =
currContext->getSessionDefaults()->getIsoMappingEnum();
cliInterface()->setIsoMapping
(currContext->getSessionDefaults()->getIsoMappingEnum());
cliRC = cliInterface()->executeImmediate(
lvtTdb().insertQuery_,
NULL, NULL, TRUE,
&rowsAffected,TRUE);
cliInterface()->setIsoMapping(savedIsoMapping);
if (cliRC < 0)
{
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)cliRC,
NULL,
NULL
);
step_ = ERROR_;
break;
}
masterGlob->setRowsAffected(rowsAffected);
if (rowsAffected > lvtTdb().threshold_)
step_ = UPD_STATS_;
else
step_ = DONE_;
}
break;
case UPD_STATS_:
{
// issue the upd stats command
char * usQuery =
new(getHeap()) char[strlen(lvtTdb().updStatsQuery_) + 10 + 1];
str_sprintf(usQuery, lvtTdb().updStatsQuery_,
masterGlob->getRowsAffected());
cliRC = cliInterface()->executeImmediate(usQuery,NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(usQuery, getHeap());
if (cliRC < 0)
{
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)cliRC,
NULL,
NULL
);
step_ = ERROR_;
break;
}
step_ = DONE_;
}
break;
case DONE_:
{
// reset special insert processing bit
// masterGlob->getStatement()->getContext()->resetSqlParserFlags(0x20000); // NO_IMPLICIT_VOLATILE_TABLE_UPD_STATS
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
step_ = INITIAL_;
qparent_.down->removeHead();
return WORK_OK;
}
break;
case ERROR_:
{
step_ = DONE_;
}
break;
} // switch
} // while
return WORK_OK;
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilLoadVolatileTableTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilLoadVolatileTablePrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilLoadVolatileTablePrivateState::ExExeUtilLoadVolatileTablePrivateState()
{
}
ExExeUtilLoadVolatileTablePrivateState::~ExExeUtilLoadVolatileTablePrivateState()
{
};
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilCleanupVolatileTablesTdb::build(ex_globals * glob)
{
ExExeUtilCleanupVolatileTablesTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilCleanupVolatileTablesTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// class ExExeUtilVolatileTablesTcb
///////////////////////////////////////////////////////////////
ExExeUtilVolatileTablesTcb::ExExeUtilVolatileTablesTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
}
short ExExeUtilVolatileTablesTcb::isCreatorProcessObsolete
(const char * name, NABoolean includesCat, NABoolean isCSETableName)
{
Lng32 retcode = 0;
// find process start time, node name, cpu and pin of creator process.
short segmentNum;
short cpu;
pid_t pin;
Int64 nameCreateTime = 0;
Lng32 currPos = 0;
if (includesCat)
{
// name is of the form: <CAT>.<SCHEMA>
// Skip the <CAT> part.
while (name[currPos] != '.')
currPos++;
currPos++;
}
if (isCSETableName)
{
// CSE table names look like this: CSE_TEMP_<name>_MXID..._Snnn_mmm
const char *startPrefix = "_" COM_SESSION_ID_PREFIX;
const char *match = &name[currPos];
const char *prevMatch = NULL;
// find the last occurrence of the start prefix in the name
while ((match = strstr(match, startPrefix)) != NULL)
prevMatch = ++match; // position prevMatch on the "MXID"
if (prevMatch)
currPos = prevMatch-name;
else
return 0; // name does not fit our pattern, don't delete it
}
else
// volatile table schema is a fixed prefix, followed by the session id
currPos +=
strlen(COM_VOLATILE_SCHEMA_PREFIX);
Int64 segmentNum_l;
Int64 cpu_l;
Int64 pin_l;
Int64 sessionUniqNum;
Lng32 userNameLen = 0;
Lng32 userSessionNameLen = 0;
ComSqlId::extractSqlSessionIdAttrs
(&name[currPos],
-1, //(strlen(name) - currPos),
segmentNum_l,
cpu_l,
pin_l,
nameCreateTime,
sessionUniqNum,
userNameLen, NULL,
userSessionNameLen, NULL);
segmentNum = (short)segmentNum_l;
cpu = (short)cpu_l;
pin = (pid_t)pin_l;
// see if process exists. If it exists, check if it is the same
// process that is specified in the name.
short errorDetail = 0;
Int64 procCreateTime = 0;
retcode = ComRtGetProcessCreateTime(&cpu, &pin, &segmentNum,
procCreateTime,
errorDetail);
if (retcode == XZFIL_ERR_OK)
{
// process specified in name exists.
if (nameCreateTime != procCreateTime)
// but is a different process. Schema or name's process is obsolete.
return -1;
else
// schema or name's process is still alive.
return 0;
}
else
{
if (retcode == XZFIL_ERR_NOSUCHDEV)
return -1;
else
return 0;
}
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilVolatileTablesTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilVolatileTablesPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilVolatileTablesPrivateState::ExExeUtilVolatileTablesPrivateState()
{
}
ExExeUtilVolatileTablesPrivateState::~ExExeUtilVolatileTablesPrivateState()
{
};
////////////////////////////////////////////////////////////////
// class ExExeUtilCleanupVolatileTablesTcb
///////////////////////////////////////////////////////////////
ExExeUtilCleanupVolatileTablesTcb::ExExeUtilCleanupVolatileTablesTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilVolatileTablesTcb( exe_util_tdb, glob),
step_(INITIAL_),
schemaNamesList_(NULL),
schemaQuery_(NULL),
someSchemasCouldNotBeDropped_(FALSE)
{
}
//////////////////////////////////////////////////////
// work() for ExExeUtilCleanupVolatileTablesTcb
//////////////////////////////////////////////////////
static const QueryString getAllVolatileSchemasQuery[] =
{
{" select O.schema_name from "},
{" TRAFODION.\"_MD_\".OBJECTS O "},
{" where O.schema_name like 'VOLATILE_SCHEMA_%%' "},
{" and O.object_type = 'SS' "},
{" order by 1 "},
{" for read uncommitted access "}
};
short ExExeUtilCleanupVolatileTablesTcb::work()
{
Lng32 cliRC = 0;
short retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = FETCH_SCHEMA_NAMES_;
errorSchemas_[0] = 0;
}
break;
case FETCH_SCHEMA_NAMES_:
{
Int32 schema_qry_array_size =
sizeof(getAllVolatileSchemasQuery)
/ sizeof(QueryString);
const QueryString * schemaCleanupQueryString =
getAllVolatileSchemasQuery;
char * gluedQuery;
Lng32 gluedQuerySize;
glueQueryFragments(schema_qry_array_size,
schemaCleanupQueryString,
gluedQuery, gluedQuerySize);
schemaQuery_ =
new(getHeap()) char[gluedQuerySize + 10 + 1];
str_sprintf(schemaQuery_, gluedQuery);
NADELETEBASIC(gluedQuery, getMyHeap());
if (initializeInfoList(schemaNamesList_))
{
step_ = ERROR_;
break;
}
if (fetchAllRows(schemaNamesList_, schemaQuery_, 1, FALSE, retcode) < 0)
{
// Delete new'd characters
NADELETEBASIC(schemaQuery_, getHeap());
schemaQuery_ = NULL;
step_ = ERROR_;
break;
}
// Delete new'd characters
NADELETEBASIC(schemaQuery_, getHeap());
schemaQuery_ = NULL;
step_ = START_CLEANUP_;
}
break;
case START_CLEANUP_:
{
schemaNamesList_->position();
someSchemasCouldNotBeDropped_ = FALSE;
if (masterGlob->getStatement()->getContext()->
getTransaction()->xnInProgress())
{
// cannot have a transaction running.
// Return error.
ExRaiseSqlError(getHeap(), &diagsArea_, -EXE_BEGIN_TRANSACTION_ERROR);
step_ = ERROR_;
break;
}
step_ = CHECK_FOR_OBSOLETE_CREATOR_PROCESS_;
}
break;
case CHECK_FOR_OBSOLETE_CREATOR_PROCESS_:
{
if (schemaNamesList_->atEnd())
{
step_ = END_CLEANUP_;
break;
}
OutputInfo * vi = (OutputInfo*)schemaNamesList_->getCurr();
char * schemaName = vi->get(0);
if ((cvtTdb().cleanupAllTables()) ||
(isCreatorProcessObsolete(schemaName, FALSE, FALSE)))
{
// schema is obsolete, drop it.
// Or we need to cleanup all schemas, active or obsolete.
step_ = DO_CLEANUP_;
}
else
{
schemaNamesList_->advance();
}
}
break;
case DO_CLEANUP_:
{
OutputInfo * vi = (OutputInfo*)schemaNamesList_->getCurr();
char * schemaName = vi->get(0);
retcode =
dropVolatileSchema(masterGlob->getStatement()->getContext(),
schemaName, getHeap(), getDiagsArea(), getGlobals());
if (retcode < 0)
{
// changes errors to warnings and move on to next schema.
getDiagsArea()->negateAllErrors();
// clear diags and move on to next schema.
// Remember that an error was returned, we will
// return a warning at the end.
SQL_EXEC_ClearDiagnostics(NULL);
retcode = 0;
if ((strlen(errorSchemas_) + strlen(schemaName)) < 1000)
{
strcat(errorSchemas_, schemaName);
strcat(errorSchemas_, " ");
}
else if (strlen(errorSchemas_) < 1005) // maxlen = 1010
strcat(errorSchemas_, "..."); // could not fit
someSchemasCouldNotBeDropped_ = TRUE;
}
schemaNamesList_->advance();
step_ = CHECK_FOR_OBSOLETE_CREATOR_PROCESS_;
}
break;
case END_CLEANUP_:
{
if (someSchemasCouldNotBeDropped_)
{
// add a warning to indicate that some schemas were not
// dropped.
ExRaiseSqlError(getHeap(), &diagsArea_, 1069,
NULL, NULL, NULL,
errorSchemas_);
}
step_ = CLEANUP_HIVE_TABLES_;
}
break;
case CLEANUP_HIVE_TABLES_:
{
if (cvtTdb().cleanupHiveCSETables())
dropHiveTempTablesForCSEs();
step_ = DONE_;
}
break;
case ERROR_:
{
if (handleError())
return WORK_OK;
getDiagsArea()->clear();
step_ = DONE_;
}
break;
case DONE_:
{
if (handleDone())
return WORK_OK;
step_ = INITIAL_;
return WORK_OK;
}
break;
} // switch
} // while
return WORK_OK;
}
short ExExeUtilCleanupVolatileTablesTcb::dropVolatileSchema
(ContextCli * currContext,
char * schemaName,
CollHeap * heap,
ComDiagsArea *&diagsArea,
ex_globals *glob)
{
const char *parentQid = NULL;
if (glob)
{
ExExeStmtGlobals *stmtGlobals = glob->castToExExeStmtGlobals();
if (stmtGlobals->castToExMasterStmtGlobals())
parentQid = stmtGlobals->castToExMasterStmtGlobals()->
getStatement()->getUniqueStmtId();
else
{
ExEspStmtGlobals *espGlobals = stmtGlobals->castToExEspStmtGlobals();
if (espGlobals && espGlobals->getStmtStats())
parentQid = espGlobals->getStmtStats()->getQueryId();
}
}
ExeCliInterface cliInterface(heap, 0, currContext, parentQid);
char * dropSchema = NULL;
if (schemaName)
{
dropSchema =
new(heap) char[strlen("DROP VOLATILE SCHEMA CLEANUP CASCADE; ")
+ strlen(schemaName) + 1];
strcpy(dropSchema, "DROP VOLATILE SCHEMA ");
strcat(dropSchema, schemaName);
strcat(dropSchema, " CLEANUP CASCADE;");
}
else
{
dropSchema
= new(heap) char[strlen("DROP IMPLICIT VOLATILE SCHEMA CLEANUP CASCADE; ") + 1];
strcpy(dropSchema, "DROP IMPLICIT VOLATILE SCHEMA CLEANUP CASCADE;");
}
// let compiler know that volatile schema could be dropped.
// 0x8000 is the bit to allow this.
// Bit defined in parser/SqlParserGlobalsCmn.h.
// currContext->setSqlParserFlags(0x8000); // ALLOW_VOLATILE_SCHEMA_CREATION
// issue the drop schema command
Lng32 cliRC = cliInterface.executeImmediate(dropSchema);
cliInterface.allocAndRetrieveSQLDiagnostics(diagsArea);
// reset volatile schema bit
// currContext->resetSqlParserFlags(0x8000); // ALLOW_VOLATILE_SCHEMA_CREATION
NADELETEBASIC(dropSchema, heap);
return cliRC;
}
short ExExeUtilCleanupVolatileTablesTcb::dropVolatileTables
(ContextCli * currContext,
CollHeap * heap)
{
HashQueue * volTabList = currContext->getVolTabList();
if ((! volTabList) ||
(volTabList->numEntries() == 0))
return 0;
ExeCliInterface cliInterface(heap, 0, currContext);
char * dropSchema
= new(heap) char[strlen("DROP IMPLICIT VOLATILE SCHEMA TABLES CLEANUP CASCADE; ") + 1];
strcpy(dropSchema, "DROP IMPLICIT VOLATILE SCHEMA TABLES CLEANUP CASCADE;");
// let compiler know that volatile schema could be dropped.
// 0x8000 is the bit to allow this.
// Bit defined in parser/SqlParserGlobalsCmn.h.
//currContext->setSqlParserFlags(0x8000); // ALLOW_VOLATILE_SCHEMA_CREATION
// issue the drop schema command
Lng32 cliRC = cliInterface.executeImmediate(dropSchema);
// reset volatile schema bit
//currContext->resetSqlParserFlags(0x8000); // ALLOW_VOLATILE_SCHEMA_CREATION
NADELETEBASIC(dropSchema, heap);
char * sendCQD
= new(heap) char[strlen("CONTROL QUERY DEFAULT VOLATILE_SCHEMA_IN_USE 'FALSE';") + 1];
strcpy(sendCQD, "CONTROL QUERY DEFAULT VOLATILE_SCHEMA_IN_USE 'FALSE';");
cliInterface.executeImmediate(sendCQD);
NADELETEBASIC(sendCQD, heap);
return cliRC;
}
short ExExeUtilCleanupVolatileTablesTcb::dropHiveTempTablesForCSEs()
{
Queue * hiveTableNames = NULL;
// Todo: CSE: support schemas other than default for temp tables
NAString hiveTablesGetQuery("get tables in schema hive.hive, no header");
short retcode = 0;
if (initializeInfoList(hiveTableNames))
{
return -1;
}
if (fetchAllRows(hiveTableNames,
(char *) hiveTablesGetQuery.data(),
1,
FALSE,
retcode) < 0)
{
return -1;
}
hiveTableNames->position();
while (!hiveTableNames->atEnd())
{
OutputInfo * ht = (OutputInfo*) (hiveTableNames->getCurr());
const char *origTableName = ht->get(0);
NAString tableName(origTableName);
tableName.toUpper();
if (strstr(tableName.data(), COM_CSE_TABLE_PREFIX) == tableName.data() &&
isCreatorProcessObsolete(tableName.data(), FALSE, TRUE))
{
NAString dropHiveTable("drop table ");
dropHiveTable += origTableName;
if (HiveClient_JNI::executeHiveSQL(dropHiveTable.data()) != HVC_OK)
; // ignore errors for now
}
hiveTableNames->advance();
}
return 0;
}
///////////////////////////////////////////////////////////////////
// class ExExeUtilGetVolatileInfoTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilGetVolatileInfoTdb::build(ex_globals * glob)
{
ExExeUtilGetVolatileInfoTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilGetVolatileInfoTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// class ExExeUtilGetVolatileInfoTcb
///////////////////////////////////////////////////////////////
ExExeUtilGetVolatileInfoTcb::ExExeUtilGetVolatileInfoTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilVolatileTablesTcb( exe_util_tdb, glob),
step_(INITIAL_),
prevInfo_(NULL),
infoQuery_(NULL)
{
infoQuery_ = new(glob->getDefaultHeap()) char[10000];
}
//////////////////////////////////////////////////////
// class ExExeUtilGetVolatileInfoTcb::work
//////////////////////////////////////////////////////
static const QueryString getAllVolatileTablesQuery[] =
{
{" select O.schema_name, O.object_type, O.object_name from "},
{" TRAFODION.\"_MD_\".OBJECTS O "},
{" where O.schema_name like 'VOLATILE_SCHEMA_%%' "},
{" and (O.object_type = 'BT' or O.object_type = 'IX') "},
{" order by 1,2 "},
{" for read uncommitted access "}
};
static const QueryString getAllVolatileTablesInASessionQuery[] =
{
{" select O.schema_name, O.object_type, O.object_name from "},
{" TRAFODION.\"_MD_\".OBJECTS O "},
{" where O.schema_name like 'VOLATILE_SCHEMA_' || trim(substr('%s', 1, 42)) || '%%' "},
{" and (O.object_type = 'BT' or O.object_type = 'IX') "},
{" order by 1,2 "},
{" for read uncommitted access "}
};
short ExExeUtilGetVolatileInfoTcb::work()
{
Lng32 cliRC = 0;
short retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = APPEND_NEXT_QUERY_FRAGMENT_;
}
break;
case APPEND_NEXT_QUERY_FRAGMENT_:
{
Int32 info_qry_array_size = -1;
const QueryString * infoQueryString = NULL;
// extra space to be allocated to fill with "%s" fillers
// in the query text.
Lng32 extraSpace = 0;
if (gviTdb().allSchemas())
{
info_qry_array_size = sizeof(getAllVolatileSchemasQuery)
/ sizeof(QueryString);
infoQueryString = getAllVolatileSchemasQuery;
}
else if (gviTdb().allTables())
{
info_qry_array_size = sizeof(getAllVolatileTablesQuery)
/ sizeof(QueryString);
infoQueryString = getAllVolatileTablesQuery;
}
else if (gviTdb().allTablesInASession())
{
info_qry_array_size = sizeof(getAllVolatileTablesInASessionQuery)
/ sizeof(QueryString);
infoQueryString = getAllVolatileTablesInASessionQuery;
}
char * param1 = gviTdb().param1_;
char * param2 = gviTdb().param2_;
char * gluedQuery;
Lng32 gluedQuerySize;
glueQueryFragments(info_qry_array_size,
infoQueryString,
gluedQuery, gluedQuerySize);
str_sprintf(infoQuery_, gluedQuery, param1, param2);
// Delete new'd characters
NADELETEBASIC(gluedQuery, getHeap());
gluedQuery = NULL;
step_ = FETCH_ALL_ROWS_;
}
break;
case FETCH_ALL_ROWS_:
{
Lng32 numOutputEntries = 0;
if (gviTdb().allSchemas())
{
numOutputEntries = 1;
}
else if (gviTdb().allTables())
{
numOutputEntries = 3;
}
else if (gviTdb().allTablesInASession())
{
numOutputEntries = 3;
}
if (initializeInfoList(infoList_))
{
step_ = ERROR_;
break;
}
if (fetchAllRows(infoList_, infoQuery_, numOutputEntries, FALSE, retcode) < 0)
{
step_ = ERROR_;
NADELETEBASIC(infoQuery_, getHeap());
infoQuery_ = NULL;
break;
}
NADELETEBASIC(infoQuery_, getHeap());
infoQuery_ = NULL;
infoList_->position();
if (gviTdb().allSchemas())
{
step_ = RETURN_ALL_SCHEMAS_;
}
else if (gviTdb().allTables())
{
step_ = RETURN_ALL_TABLES_;
}
else if (gviTdb().allTablesInASession())
{
step_ = RETURN_TABLES_IN_A_SESSION_;
}
else
step_ = ERROR_;
}
break;
case RETURN_ALL_SCHEMAS_:
{
if (infoList_->atEnd())
{
step_ = DONE_;
break;
}
if (qparent_.up->isFull())
return WORK_OK;
char outBuf[400+ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES];
OutputInfo * vi = (OutputInfo*)infoList_->getCurr();
char * schemaName = vi->get(0);
char state[10];
if (isCreatorProcessObsolete(schemaName, FALSE, FALSE))
strcpy(state, "Obsolete");
else
strcpy(state, "Active ");
str_sprintf(outBuf, "Schema(%8s): %s", state, schemaName);
moveRowToUpQueue(outBuf);
infoList_->advance();
}
break;
case RETURN_ALL_TABLES_:
case RETURN_TABLES_IN_A_SESSION_:
{
if (infoList_->atEnd())
{
step_ = DONE_;
break;
}
if ((qparent_.up->getSize() - qparent_.up->getLength()) < 4)
return WORK_CALL_AGAIN;
char outBuf[400+ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES];
OutputInfo * vi = (OutputInfo*)infoList_->getCurr();
char * schemaName = vi->get(0);
if ((! prevInfo_) ||
(strcmp(prevInfo_->get(0), schemaName) != 0))
{
char state[10];
if (isCreatorProcessObsolete(schemaName, FALSE, FALSE))
strcpy(state, "Obsolete");
else
strcpy(state, "Active ");
str_sprintf(outBuf, "Schema(%8s): %s", state, schemaName);
moveRowToUpQueue(outBuf);
prevInfo_ = (OutputInfo*)infoList_->getCurr();
}
char objectType[6];
if (strcmp(vi->get(1), "BT") == 0)
strcpy(objectType, "Table");
else if (strcmp(vi->get(1), "IX") == 0)
strcpy(objectType, "Index");
str_sprintf(outBuf, " %5s: %s",
objectType, vi->get(2));
moveRowToUpQueue(outBuf);
infoList_->advance();
}
break;
case ERROR_:
{
step_ = DONE_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
step_ = INITIAL_;
qparent_.down->removeHead();
return WORK_OK;
}
break;
} // switch
} // while
return WORK_OK;
}