blob: 38b862029263f6e52281e9c05d91f80228b9c318 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExExeUtilMisc.cpp
* Description:
*
*
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include "ComCextdecs.h"
#include "cli_stdh.h"
#include "ex_stdh.h"
#include "sql_id.h"
#include "ex_transaction.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ComSqlId.h"
#include "ExExeUtil.h"
#include "ex_exe_stmt_globals.h"
#include "exp_expr.h"
#include "exp_clause_derived.h"
#include "ExpLOB.h"
#include "ComRtUtils.h"
#include "ExStats.h"
#include "ComSmallDefs.h"
#include <unistd.h>
//////////////////////////////////////////////////////////
// classes defined in this file:
//
//////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilLongRunningTdb::build(ex_globals * glob)
{
ExExeUtilLongRunningTcb * exe_util_tcb;
exe_util_tcb =
new(glob->getSpace()) ExExeUtilLongRunningTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilLongRunningTcb
///////////////////////////////////////////////////////////////
ExExeUtilLongRunningTcb::ExExeUtilLongRunningTcb(
const ComTdbExeUtilLongRunning & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
step_ = INITIAL_;
rowsDeleted_ = 0;
transactions_ = 0;
initial_ = 1;
initialOutputVarPtrList_ = NULL;
continuingOutputVarPtrList_ = NULL;
lruStmtAndPartInfo_= NULL;
lruStmtWithCKAndPartInfo_= NULL;
currTransaction_ = NULL;
}
ExExeUtilLongRunningTcb::~ExExeUtilLongRunningTcb()
{
}
Int32 ExExeUtilLongRunningTcb::fixup()
{
return ex_tcb::fixup();
}
short ExExeUtilLongRunningTcb::doLongRunning()
{
Lng32 cliRC =0;
short retcode = 0;
NABoolean xnAlreadyStarted = FALSE;
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
CliGlobals *cliGlobals = 0;
cliGlobals = GetCliGlobals();
ex_assert(cliGlobals != NULL, "Cli globals is NULL - should have been allocated already");
if (cliGlobals->isESPProcess())
{
if (!currTransaction_)
currTransaction_ = new (getHeap()) ExTransaction (cliGlobals, getHeap());
}
else // in master executor
{
currTransaction_ = masterGlob->getStatement()->getContext()->getTransaction();
}
if (currTransaction_->xnInProgress())
{
xnAlreadyStarted = TRUE;
}
// cannot run LRU when a user transaction is in progress
if (xnAlreadyStarted)
{
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)(-8603),
NULL,
exeUtilTdb().getTableName()
);
return (-8603);
}
SQL_EXEC_ClearDiagnostics(NULL);
// no Xn in progress. Start one.
cliRC = currTransaction_->beginTransaction ();
if (cliRC < 0)
{
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)(cliRC),
NULL,
exeUtilTdb().getTableName()
);
return (short) cliRC;
}
retcode = executeLongRunningQuery();
// Rollback the transaction, if there is an error.
if (retcode < 0)
{
// rollback the transaction
cliRC = currTransaction_->rollbackTransaction ();
return retcode;
}
else
{
// commit the transaction
cliRC = currTransaction_->commitTransaction ();
if (cliRC < 0)
{
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)(cliRC),
NULL,
exeUtilTdb().getTableName()
);
return short(cliRC);
}
addTransactionCount();
}
return retcode;
}
//////////////////////////////////////////////////////
// work() for ExExeUtilLongRunningTcb
//////////////////////////////////////////////////////
short ExExeUtilLongRunningTcb::work()
{
short rc = 0;
Lng32 cliRC = 0;
Int64 rowsDeleted = 0;
Int64 transactions = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the ESP if this is an ESP
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExEspStmtGlobals *espGlob = exeGlob->castToExEspStmtGlobals();
Int32 espNum = 1;
// this is an ESP?
if (espGlob != NULL)
{
espNum = (Int32) espGlob->getMyInstanceNumber();
}
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = LONG_RUNNING_;
}
break;
case LONG_RUNNING_:
{
rc = doLongRunning();
if ((rc < 0) || (rc == 100))
{
finalizeDoLongRunning();
if (rc <0)
step_ = ERROR_;
else // rc == 100 - done with all the transactions.
step_ = DONE_;
}
// continue in LONG_RUNNING_ state if (rc >= 0) - success and warning.
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// before sending the Q_NO_DATA, send the rowcount as well thro'
// the diagsArea.
getDiagsArea()->setRowCount(getRowsDeleted());
ComDiagsArea *diagsArea = getDiagAreaFromUpQueueTail();
if (lrTdb().longRunningQueryPlan())
{
(*diagsArea) << DgSqlCode(8450)
<< DgString0((char*)exeUtilTdb().getTableName())
<< DgInt0(espNum)
<< DgInt1((Lng32)getTransactionCount());
}
// insert into parent
qparent_.up->insert();
//pstate.matches_ = 0;
// reset the parameters.
step_ = INITIAL_;
transactions_ = 0;
rowsDeleted_ = 0;
initial_ = 1;
// clear diags if any
if (getDiagsArea())
{
getDiagsArea()->clear();
}
qparent_.down->removeHead();
return WORK_OK;
}
break;
case ERROR_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
// get rows deleted so far.
getDiagsArea()->setRowCount(getRowsDeleted());
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea =
ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
else
diagsArea->incrRefCount (); // setDiagsArea call below will decr ref count
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea (diagsArea);
// insert into parent
qparent_.up->insert();
// clear diags if any, since we already sent the information
// up and don't want to send it again as part of DONE_
if (getDiagsArea())
{
rowsDeleted_ = 0;
getDiagsArea()->clear();
}
step_ = DONE_;
}
break;
} // switch
} // while
}
void ExExeUtilLongRunningTcb::registerSubtasks()
{
// register events for parent queue
ex_tcb::registerSubtasks();
}
ComDiagsArea *ExExeUtilLongRunningTcb::getDiagAreaFromUpQueueTail()
{
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
else
diagsArea->incrRefCount (); // setDiagsArea call below will decr ref count
// this is the side-effect of this function. Merge in this object's
// diagsarea.
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea (diagsArea);
return diagsArea;
}
short ExExeUtilLongRunningTcb::executeLongRunningQuery()
{
Lng32 rc = 0;
if (getInitial())
{
Lng32 cliRC =
cliInterface()->executeImmediate(
"control query default HIST_ON_DEMAND_STATS_SIZE '0'");
if (cliRC < 0)
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
return cliRC;
}
// Perform the initial processing
short rtc = processInitial(rc);
if ((rc != 0) && (rc != 100))
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
}
cliRC =
cliInterface()->executeImmediate(
"control query default HIST_ON_DEMAND_STATS_SIZE 'RESET'");
if (cliRC < 0)
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
return cliRC;
}
#ifdef _DEBUG
if (lrTdb().longRunningQueryPlan()) {
Int32 bufSize = 100 +
strlen(lruStmtAndPartInfo_)+ strlen(lruStmtWithCKAndPartInfo_);
char* lruQPInfo = new (getHeap()) char[bufSize];
// str_printf() does not support %ld. Use %d instead.
str_sprintf(lruQPInfo,
"Queries to be processed: \n\n%s\n\n%s\n\n\n Initial rows deleted: %ld",
lruStmtAndPartInfo_,
lruStmtWithCKAndPartInfo_,
getRowsDeleted());
ComDiagsArea * diagsArea = getDiagAreaFromUpQueueTail();
(*diagsArea) << DgSqlCode(8427) << DgString0(lruQPInfo);
NADELETEBASIC(lruQPInfo, getHeap());
}
#endif
setInitial(0);
}
else
{
// Perform the continuing processing
short rtc = processContinuing(rc);
if ((rc != 0) && (rc != 100))
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
}
#ifdef _DEBUG
if ((rc == 100 || rc > 0) && lrTdb().longRunningQueryPlan()) {
char lruQPInfo [100];
str_sprintf(lruQPInfo, "Total rows deleted: %ld\n\n", getRowsDeleted());
ComDiagsArea * diagsArea = getDiagAreaFromUpQueueTail();
(*diagsArea) << DgSqlCode(8427) << DgString0(lruQPInfo);
}
#endif
}
return (short )rc;
}
short ExExeUtilLongRunningTcb::finalizeDoLongRunning()
{
// Close the statements
Lng32 rc = cliInterface()->prepareAndExecRowsEpilogue();
NADELETE(initialOutputVarPtrList_, Queue, getHeap());
NADELETE(continuingOutputVarPtrList_, Queue, getHeap());
NADELETEBASIC(lruStmtAndPartInfo_, getHeap());
NADELETEBASIC(lruStmtWithCKAndPartInfo_, getHeap());
// if this is an ESP, deallocate transaction
CliGlobals *cliGlobals = GetCliGlobals();
if (cliGlobals->isESPProcess())
{
NADELETEBASIC(currTransaction_, getHeap());
currTransaction_ = NULL;
}
return 0;
}
short ExExeUtilLongRunningTcb::processInitial(Lng32 &rc)
{
Int64 rowsAffected = 0;
setInitialOutputVarPtrList(new(getHeap()) Queue(getHeap()));
setContinuingOutputVarPtrList(new(getHeap()) Queue(getHeap()));
lruStmtAndPartInfo_ = new(getHeap()) char[(UInt32)(lrTdb().getLruStmtLen() + lrTdb().getPredicateLen() + 10)];
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExEspStmtGlobals *espGlob = exeGlob->castToExEspStmtGlobals();
ContextCli *currContext = NULL;
CliGlobals *cliGlobals = NULL;
if (espGlob)
{
cliGlobals = GetCliGlobals();
currContext = cliGlobals->currContext();
}
else
{
currContext = exeGlob->castToExMasterStmtGlobals()->getStatement()->getContext();
}
Int32 espNum = 1;
// we are executing inside an ESP go ahead and set the partition number to the
// ESP instance number. The added one is because ESP instances are 0 based
if (espGlob)
{
espNum = (Int32) espGlob->getMyInstanceNumber() + 1;
}
if (lrTdb().getPredicate() != NULL)
str_sprintf(lruStmtAndPartInfo_,
lrTdb().getLruStmt(),
espNum,
lrTdb().getPredicate());
else
str_sprintf(lruStmtAndPartInfo_,
lrTdb().getLruStmt(),
espNum);
lruStmtWithCKAndPartInfo_ = new(getHeap()) char[(UInt32)(lrTdb().getLruStmtWithCKLen() + lrTdb().getPredicateLen() + 10)];
if (lrTdb().getPredicate() != NULL)
str_sprintf(lruStmtWithCKAndPartInfo_,
lrTdb().getLruStmtWithCK(),
espNum,
lrTdb().getPredicate());
else
str_sprintf(lruStmtWithCKAndPartInfo_,
lrTdb().getLruStmtWithCK(),
espNum);
// All internal queries issued from CliInterface assume that
// they are in ISO_MAPPING.
// For LongRunning we need to send the actual ISO_MAPPING.
// Save it and restore after the prepare
Int32 savedIsoMapping =
cliInterface()->getIsoMapping();
cliInterface()->setIsoMapping
(currContext->getSessionDefaults()->getIsoMappingEnum());
// If the table we are deleting from is an IUD log table,
// we need to set the parserflags to accept the special
// table type and the quoted column names
if (lrTdb().useParserflags())
currContext->setSqlParserFlags(0x3);
rc = cliInterface()->prepareAndExecRowsPrologue(lruStmtAndPartInfo_,
lruStmtWithCKAndPartInfo_,
getInitialOutputVarPtrList(),
getContinuingOutputVarPtrList(),
rowsAffected);
cliInterface()->setIsoMapping(savedIsoMapping);
if (rc < 0)
{
return -1;
}
if (rc >= 0 && rowsAffected > 0)
addRowsDeleted(rowsAffected);
return 0;
}
short ExExeUtilLongRunningTcb::processContinuing(Lng32 &rc)
{
Int64 rowsAffected = 0;
rc = cliInterface()->execContinuingRows(getContinuingOutputVarPtrList(),
rowsAffected);
if (rc < 0)
{
return -1;
}
if (rowsAffected > 0)
addRowsDeleted(rowsAffected);
#ifdef _DEBUG
if ((rowsAffected > 0) && lrTdb().longRunningQueryPlan()) {
char lruQPInfo[100];
str_sprintf(lruQPInfo, "Continuing rows deleted: %ld\n\n",
rowsAffected);
ComDiagsArea * diagsArea = getDiagAreaFromUpQueueTail();
(*diagsArea) << DgSqlCode(8427) << DgString0(lruQPInfo);
}
#endif
return 0;
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilLongRunningTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilLongRunningPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilLongRunningPrivateState::ExExeUtilLongRunningPrivateState()
{
}
ExExeUtilLongRunningPrivateState::~ExExeUtilLongRunningPrivateState()
{
};
///////////////////////////////////////////////////////////////////
// class ExExeUtilAQRTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilAQRTdb::build(ex_globals * glob)
{
ExExeUtilAQRTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilAQRTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
ExExeUtilAQRTcb::ExExeUtilAQRTcb(const ComTdbExeUtilAQR & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb(exe_util_tdb, NULL, glob),
step_(EMPTY_)
{
}
short ExExeUtilAQRTcb::work()
{
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
AQRInfo * aqr = currContext->aqrInfo();
while (1)
{
switch (step_)
{
case EMPTY_:
{
if (aqrTdb().getTask() == ComTdbExeUtilAQR::GET_)
{
// GET aqr entries and display them
aqr->position();
step_ = RETURN_HEADER_;
}
else
{
step_ = SET_ENTRY_;
}
}
break;
case SET_ENTRY_:
{
if (aqr->setAQREntry(aqrTdb().getTask(),
aqrTdb().sqlcode_, aqrTdb().nskcode_,
aqrTdb().retries_, aqrTdb().delay_,
aqrTdb().type_,
0, NULL, 0, 0))
{
// error
step_ = HANDLE_ERROR_;
break;
}
step_ = DONE_;
}
break;
case RETURN_HEADER_:
{
// if no room in up queue for 4 display rows,
// won't be able to return data/status.
// Come back later.
if ((qparent_.up->getSize() - qparent_.up->getLength()) < 5)
return -1;
moveRowToUpQueue(" ");
moveRowToUpQueue(" SQLCODE PLATFORMERROR RETRIES DELAY TYPE ");
moveRowToUpQueue("============================================================");
moveRowToUpQueue(" ");
step_ = RETURNING_ENTRY_;
}
break;
case RETURNING_ENTRY_:
{
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
Lng32 sqlcode, nskcode, retries, delay, type, intAQR;
Lng32 eof = 0;
eof =
aqr->getNextAQREntry(sqlcode, nskcode, retries, delay, type,
intAQR);
if (eof)
{
step_ = DONE_;
break;
}
// internal aqr entry, no need to display
if (intAQR)
break;
char formattedStr[400];
str_sprintf(formattedStr, "%9d %13d %9d %9d %9d",
sqlcode, nskcode, retries, delay, type);
moveRowToUpQueue(formattedStr);
}
break;
case HANDLE_ERROR_:
{
if (qparent_.up->isFull())
return WORK_OK;
ExHandleErrors(qparent_,
pentry_down,
0,
getGlobals(),
NULL,
(ExeErrorCode)8931,
NULL,
NULL
);
step_ = DONE_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// all ok. Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
step_ = EMPTY_;
qparent_.down->removeHead();
return WORK_OK;
}
break;
} // switch
}
return 0;
}
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilPopulateInMemStatsTdb::build(ex_globals * glob)
{
ex_tcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilPopulateInMemStatsTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilPopulateInMemStatsTcb
///////////////////////////////////////////////////////////////
ExExeUtilPopulateInMemStatsTcb::ExExeUtilPopulateInMemStatsTcb(
const ComTdbExeUtilPopulateInMemStats & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
step_ = INITIAL_;
}
ExExeUtilPopulateInMemStatsTcb::~ExExeUtilPopulateInMemStatsTcb()
{
}
//////////////////////////////////////////////////////
// work() for ExExeUtilPopulateInMemStatsTcb
//////////////////////////////////////////////////////
static const QueryString deleteStatsQuery[] =
{
{" delete from %s "},
{" where table_uid = %ld"}
};
static const QueryString populateHistogramsStatsQuery[] =
{
{" insert into %s "},
{" select "},
{" %ld, "},
{" histogram_id, "},
{" col_position, column_number, colcount, interval_count, rowcount, "},
{" total_uec, stats_time, low_value, high_value, read_time, read_count, "},
{" sample_secs, col_secs, sample_percent, "},
{" cv, reason, v1, v2, v3, v4, v5, v6 "},
{" from "},
{" %s "},
{" where table_uid = "},
{" ( "},
{" select object_uid from "},
{" HP_SYSTEM_CATALOG.system_schema.catsys C, "},
{" HP_SYSTEM_CATALOG.system_schema.schemata S, "},
{" %s.HP_DEFINITION_SCHEMA.objects O "},
{" where "},
{" C.cat_name = '%s' "},
{" and S.schema_name = '%s' "},
{" and C.cat_uid = S.cat_uid "},
{" and S.schema_uid = O.schema_uid "},
{" and O.object_name = '%s' "},
{" ) "}
};
static const QueryString populateHistintsStatsQuery[] =
{
{" insert into %s "},
{" select "},
{" %ld, "},
{" histogram_id, "},
{" interval_number, interval_rowcount, interval_uec, interval_boundary, "},
{" std_dev_of_freq, "},
{" v1, v2, v3, v4, v5, v6 "},
{" from "},
{" %s "},
{" where table_uid = "},
{" ( "},
{" select object_uid from "},
{" HP_SYSTEM_CATALOG.system_schema.catsys C, "},
{" HP_SYSTEM_CATALOG.system_schema.schemata S, "},
{" %s.HP_DEFINITION_SCHEMA.objects O "},
{" where "},
{" C.cat_name = '%s' "},
{" and S.schema_name = '%s' "},
{" and C.cat_uid = S.cat_uid "},
{" and S.schema_uid = O.schema_uid "},
{" and O.object_name = '%s' "},
{" ) "}
};
short ExExeUtilPopulateInMemStatsTcb::work()
{
// short rc = 0;
Lng32 cliRC = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
ContextCli * currContext = masterGlob->getStatement()->getContext();
while (1)
{
switch (step_)
{
case INITIAL_:
{
if (getDiagsArea())
{
getDiagsArea()->clear();
getDiagsArea()->deAllocate();
}
setDiagsArea(ComDiagsArea::allocate(getHeap()));
step_ = PROLOGUE_;
}
break;
case PROLOGUE_:
{
if (disableCQS())
{
step_ = ERROR_;
break;
}
if (setSchemaVersion(pimsTdb().sourceTableCatName_))
{
step_ = ERROR_;
break;
}
// set sqlparserflags to allow use of volatile schema in queries.
masterGlob->getStatement()->getContext()->
setSqlParserFlags(0x10000);//ALLOW_VOLATILE_SCHEMA_IN_TABLE_NAME
step_ = DELETE_STATS_;
}
break;
case DELETE_STATS_:
{
Int32 qry_array_size = sizeof(deleteStatsQuery)
/ sizeof(QueryString);
const QueryString * queryString = deleteStatsQuery;;
char * gluedQuery;
Lng32 gluedQuerySize;
glueQueryFragments(qry_array_size, queryString,
gluedQuery, gluedQuerySize);
Lng32 extraSpace =
ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /* fullyQualTableName */
+ 20 /* UID */
+ 200 /* overhead */;
char * query = new(getHeap()) char[gluedQuerySize + extraSpace];
str_sprintf(query, gluedQuery,
(char*)pimsTdb().inMemHistogramsTableName_,
pimsTdb().uid_);
cliRC =
cliInterface()->executeImmediate(query);
if (cliRC >= 0)
{
str_sprintf(query, gluedQuery,
(char*)pimsTdb().inMemHistintsTableName_,
pimsTdb().uid_);
cliRC =
cliInterface()->executeImmediate(query);
}
// Delete new'd string
NADELETEBASIC(gluedQuery, getHeap());
gluedQuery = NULL;
NADELETEBASIC(query, getHeap());
query = NULL;
if (cliRC < 0)
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
step_ = ERROR_;
}
else
step_ = POPULATE_HISTOGRAMS_STATS_;
}
break;
case POPULATE_HISTOGRAMS_STATS_:
{
Int32 qry_array_size = sizeof(populateHistogramsStatsQuery)
/ sizeof(QueryString);
const QueryString * queryString = populateHistogramsStatsQuery;;
char * gluedQuery;
Lng32 gluedQuerySize;
glueQueryFragments(qry_array_size, queryString,
gluedQuery, gluedQuerySize);
Lng32 extraSpace =
ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /* fullyQualInMemHistTableName */
+ 20 /* UID */
+ ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /* fullyQualSourceHistTableName */
+ 2 * 10 /*segment name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*cat name*/
+ 10 /*version*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*cat name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*sch name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*obj name*/
+ 200 /* overhead */;
char * query = new(getHeap()) char[gluedQuerySize + extraSpace];
str_sprintf(query, gluedQuery,
(char*)pimsTdb().inMemHistogramsTableName_,
pimsTdb().uid_,
(char*)pimsTdb().sourceHistogramsTableName_,
(char*)pimsTdb().sourceTableCatName_,
(char*)pimsTdb().sourceTableCatName_,
(char*)pimsTdb().sourceTableSchName_,
(char*)pimsTdb().sourceTableObjName_
);
cliRC =
cliInterface()->executeImmediate(query);
// Delete new'd string
NADELETEBASIC(gluedQuery, getHeap());
gluedQuery = NULL;
NADELETEBASIC(query, getHeap());
query = NULL;
if (cliRC < 0)
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
step_ = ERROR_;
}
else
step_ = POPULATE_HISTINTS_STATS_;
}
break;
case POPULATE_HISTINTS_STATS_:
{
Int32 qry_array_size = sizeof(populateHistintsStatsQuery)
/ sizeof(QueryString);
const QueryString * queryString = populateHistintsStatsQuery;;
char * gluedQuery;
Lng32 gluedQuerySize;
glueQueryFragments(qry_array_size, queryString,
gluedQuery, gluedQuerySize);
Lng32 extraSpace =
ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /* fullyQualInMemHistTableName */
+ 20 /* UID */
+ ComMAX_3_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /* fullyQualSourceHistTableName */
+ 2 * 10 /*segment name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*cat name*/
+ 10 /*version*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*cat name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*sch name*/
+ ComMAX_1_PART_EXTERNAL_UTF8_NAME_LEN_IN_BYTES /*obj name*/
+ 200 /* overhead */;
char * query = new(getHeap()) char[gluedQuerySize + extraSpace];
str_sprintf(query, gluedQuery,
(char*)pimsTdb().inMemHistintsTableName_,
pimsTdb().uid_,
(char*)pimsTdb().sourceHistintsTableName_,
(char*)pimsTdb().sourceTableCatName_,
(char*)pimsTdb().sourceTableCatName_,
(char*)pimsTdb().sourceTableSchName_,
(char*)pimsTdb().sourceTableObjName_
);
cliRC =
cliInterface()->executeImmediate(query);
// Delete new'd string
NADELETEBASIC(gluedQuery, getHeap());
gluedQuery = NULL;
NADELETEBASIC(query, getHeap());
query = NULL;
if (cliRC < 0)
{
cliInterface()->allocAndRetrieveSQLDiagnostics(diagsArea_);
step_ = ERROR_;
}
else
step_ = EPILOGUE_;
}
break;
case EPILOGUE_:
case EPILOGUE_AND_ERROR_RETURN_:
{
// reset sqlparserflags
masterGlob->getStatement()->getContext()->
resetSqlParserFlags(0x10000);//ALLOW_VOLATILE_SCHEMA_IN_TABLE_NAME
restoreCQS();
if (step_ == EPILOGUE_AND_ERROR_RETURN_)
step_ = ERROR_RETURN_;
else
step_ = DONE_;
}
break;
case ERROR_:
{
step_ = EPILOGUE_AND_ERROR_RETURN_;
}
break;
case ERROR_RETURN_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea =
ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea (diagsArea);
// insert into parent
qparent_.up->insert();
step_ = DONE_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
qparent_.down->removeHead();
step_ = INITIAL_;
return WORK_OK;
}
break;
default:
break;
}
}
return 0;
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilPopulateInMemStatsTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilPopulateInMemStatsPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilPopulateInMemStatsPrivateState::ExExeUtilPopulateInMemStatsPrivateState()
{
}
ExExeUtilPopulateInMemStatsPrivateState::~ExExeUtilPopulateInMemStatsPrivateState()
{
};
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilHiveTruncateTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilHiveTruncateTdb::build(ex_globals * glob)
{
ExExeUtilTcb * exe_util_tcb;
if (getIsLegacy())
exe_util_tcb = new(glob->getSpace()) ExExeUtilHiveTruncateLegacyTcb(*this, glob);
else
exe_util_tcb = new(glob->getSpace()) ExExeUtilHiveTruncateTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilHiveTruncateLegacyTcb
///////////////////////////////////////////////////////////////
ExExeUtilHiveTruncateLegacyTcb::ExExeUtilHiveTruncateLegacyTcb(
const ComTdbExeUtilHiveTruncate & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
numExistingFiles_ = 0;
step_ = INITIAL_;
}
ExExeUtilHiveTruncateLegacyTcb::~ExExeUtilHiveTruncateLegacyTcb()
{
freeResources();
}
void ExExeUtilHiveTruncateLegacyTcb::freeResources()
{
if (htTdb().getDropOnDealloc())
{
NAString hiveDropDDL("drop table ");
hiveDropDDL += htTdb().getHiveTableName();
// TODO: is it ok to ignore the error
HiveClient_JNI::executeHiveSQL(hiveDropDDL);
}
if (lobGlob_) {
ExpLOBinterfaceCleanup(lobGlob_);
lobGlob_ = NULL;
}
}
Int32 ExExeUtilHiveTruncateLegacyTcb::fixup()
{
lobGlob_ = NULL;
ExpLOBinterfaceInit
(lobGlob_, (NAHeap *)getGlobals()->getDefaultHeap(),
getGlobals()->castToExExeStmtGlobals()->getContext(),FALSE,
htTdb().getHdfsHost(),
htTdb().getHdfsPort());
return 0;
}
//////////////////////////////////////////////////////
// work() for ExExeUtilHiveTruncateLegacyTcb
//////////////////////////////////////////////////////
short ExExeUtilHiveTruncateLegacyTcb::work()
{
short rc = 0;
Lng32 cliRC = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate);
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = EMPTY_DIRECTORY_;
}
break;
case EMPTY_DIRECTORY_:
{
cliRC = ExpLOBinterfaceEmptyDirectory(
lobGlob_,
(char*)"", //name is empty
(htTdb().getPartnLocation() ?
htTdb().getPartnLocation() :
htTdb().getTableLocation()),
Lob_HDFS_File,
htTdb().getHdfsHost(),
htTdb().getHdfsPort(),
0 ,
1 ,
0);
if (cliRC != 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -cliRC;
ExRaiseSqlError(getHeap(), &diagsArea_,
(ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE),
NULL, &intParam1,
&cliError,
NULL,
"HDFS",
(char*)"ExpLOBInterfaceEmptyDirectory",
getLobErrStr(intParam1));
char reason[200];
strcpy(reason, " ");
if (intParam1 == LOB_DIR_NAME_ERROR)
{
if (htTdb().getPartnLocation())
strcpy(reason, "Reason: specified partition does not exist");
else
strcpy(reason, "Reason: specified table location does not exist");
}
else if (intParam1 == LOB_DATA_FILE_DELETE_ERROR)
{
strcpy(reason, "Reason: error occurred during deletion of one or more files at the specified location");
}
ExRaiseSqlError(getHeap(), &diagsArea_,
(ExeErrorCode)(EXE_HIVE_TRUNCATE_ERROR), NULL,
NULL, NULL, NULL,
reason,
NULL, NULL);
step_ = ERROR_;
}
else
{
step_= DONE_;
}
}
break;
case ERROR_:
{
if (handleError())
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
if (handleDone())
return WORK_OK;
step_ = INITIAL_;
return WORK_OK;
}
break;
} // switch
} // while
}
ex_tcb_private_state * ExExeUtilHiveTruncateLegacyTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilHiveTruncateLegacyPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilHiveTruncateLegacyPrivateState::ExExeUtilHiveTruncateLegacyPrivateState()
{
}
ExExeUtilHiveTruncateLegacyPrivateState::~ExExeUtilHiveTruncateLegacyPrivateState()
{
};
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilHiveTruncateTcb
///////////////////////////////////////////////////////////////
ExExeUtilHiveTruncateTcb::ExExeUtilHiveTruncateTcb(
const ComTdbExeUtilHiveTruncate & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
step_ = INITIAL_;
}
ExExeUtilHiveTruncateTcb::~ExExeUtilHiveTruncateTcb()
{
freeResources();
}
void ExExeUtilHiveTruncateTcb::freeResources()
{
if (htTdb().getDropOnDealloc())
{
NAString hiveDropDDL("drop table ");
hiveDropDDL += htTdb().getHiveTableName();
// TODO: is it ok to ignore the error
HiveClient_JNI::executeHiveSQL(hiveDropDDL);
}
}
//////////////////////////////////////////////////////
// work() for ExExeUtilHiveTruncateTcb
//////////////////////////////////////////////////////
short ExExeUtilHiveTruncateTcb::work()
{
short rc = 0;
Lng32 cliRC = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate);
while (1)
{
switch (step_)
{
case INITIAL_:
{
// if 'if exists' clause was specified and table does not exist
// during compile phase, return.
// If table was missing during compile and was created before
// execute, then QI/AQR/Timestamp check will recompile.
if (htTdb().getIfExists() && htTdb().getTableNotExists())
{
step_ = DONE_;
break;
}
if (htTdb().getIsExternal())
step_ = ALTER_TO_MANAGED_;
else
step_ = TRUNCATE_TABLE_;
}
break;
case ALTER_TO_MANAGED_:
{
// A Hive table can be an External or Managed table.
// Currently, an External Hive table cannot be truncated.
// Maybe some future Hive version will allow that.
// Temporarily change the table attribute to be Managed,
// truncate the table and then change it back to be External.
NAString alterStmt("alter table ");
alterStmt += htTdb().getHiveTableName();
alterStmt += " set tblproperties ('EXTERNAL'='False')";
if (HiveClient_JNI::executeHiveSQL(alterStmt.data()) != HVC_OK)
{
// alter failed
ExRaiseSqlError(getHeap(), &diagsArea_, -1214,
NULL, NULL, NULL,
getSqlJniErrorStr(), htTdb().getHiveTruncQuery());
step_ = ERROR_;
break;
}
step_ = TRUNCATE_TABLE_;
}
break;
case TRUNCATE_TABLE_:
{
if (HiveClient_JNI::executeHiveSQL(htTdb().getHiveTruncQuery()) != HVC_OK)
{
ExRaiseSqlError(getHeap(), &diagsArea_, -1214,
NULL, NULL, NULL,
getSqlJniErrorStr(), htTdb().getHiveTruncQuery());
if (htTdb().getIsExternal())
{
step_ = ALTER_TO_EXTERNAL_AND_ERROR_;
break;
}
step_ = ERROR_;
break;
}
if (htTdb().getIsExternal())
step_ = ALTER_TO_EXTERNAL_;
else
step_= DONE_;
}
break;
case ALTER_TO_EXTERNAL_:
case ALTER_TO_EXTERNAL_AND_ERROR_:
{
// table was altered to Managed. Alter it back to External.
NAString alterStmt("alter table ");
alterStmt += htTdb().getHiveTableName();
alterStmt += " set tblproperties ('EXTERNAL'='TRUE')";
if (HiveClient_JNI::executeHiveSQL(alterStmt.data()) != HVC_OK)
{
// alter failed
ExRaiseSqlError(getHeap(), &diagsArea_, -1214,
NULL, NULL, NULL,
getSqlJniErrorStr(), alterStmt.data());
step_ = ERROR_;
break;
}
if (step_ == ALTER_TO_EXTERNAL_AND_ERROR_)
step_ = ERROR_;
else
step_ = DONE_;
}
break;
case ERROR_:
{
if (handleError())
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
if (handleDone())
return WORK_OK;
step_ = INITIAL_;
return WORK_OK;
}
break;
} // switch
} // while
}
ex_tcb_private_state * ExExeUtilHiveTruncateTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilHiveTruncatePrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilHiveTruncatePrivateState::ExExeUtilHiveTruncatePrivateState()
{
}
ExExeUtilHiveTruncatePrivateState::~ExExeUtilHiveTruncatePrivateState()
{
};
///////////////////////////////////////////////////////////////////////
// ExExeUtilHiveQueryTdb
///////////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilHiveQueryTdb::build(ex_globals * glob)
{
ExExeUtilTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilHiveQueryTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
ExExeUtilHiveQueryTcb::ExExeUtilHiveQueryTcb(
const ComTdbExeUtilHiveQuery & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob)
{
qparent_.down->allocatePstate(this);
step_ = INITIAL_;
}
ExExeUtilHiveQueryTcb::~ExExeUtilHiveQueryTcb()
{
}
short ExExeUtilHiveQueryTcb::work()
{
short rc = 0;
Lng32 cliRC = 0;
if (qparent_.down->isEmpty())
return WORK_OK;
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate);
while (1)
{
switch (step_)
{
case INITIAL_:
{
step_ = PROCESS_QUERY_;
}
break;
case PROCESS_QUERY_:
{
if (HiveClient_JNI::executeHiveSQL(htTdb().getHiveQuery()) != HVC_OK)
{
ExRaiseSqlError(getHeap(), &diagsArea_, -1214,
NULL, NULL, NULL,
getSqlJniErrorStr(), htTdb().getHiveQuery());
step_ = ERROR_;
break;
}
step_ = DONE_;
}
break;
case ERROR_:
{
if (handleError())
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
if (handleDone())
return WORK_OK;
step_ = INITIAL_;
return WORK_OK;
}
break;
} // switch
} // while
}
ex_tcb_private_state * ExExeUtilHiveQueryTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilHiveQueryPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
ExExeUtilHiveQueryPrivateState::ExExeUtilHiveQueryPrivateState()
{
}
ExExeUtilHiveQueryPrivateState::~ExExeUtilHiveQueryPrivateState()
{
};