blob: 702dd12678896680c6a5aae746c3d42af838ff60 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExExeUtilLoad.cpp
* Description:
*
*
* Language: C++
*
*
*
*
*****************************************************************************
*/
#include <iostream>
using std::cerr;
using std::endl;
#include <fstream>
using std::ofstream;
#include <stdio.h>
#include "ComCextdecs.h"
#include "cli_stdh.h"
#include "ex_stdh.h"
#include "sql_id.h"
#include "ex_transaction.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ComSqlId.h"
#include "ExExeUtil.h"
#include "ex_exe_stmt_globals.h"
#include "exp_expr.h"
#include "exp_clause_derived.h"
#include "ComRtUtils.h"
#include "ExStats.h"
#include "ExpLOB.h"
#include "ExpLOBenums.h"
#include "ExpLOBinterface.h"
#include "ExpLOBexternal.h"
#include "str.h"
#include "ExpHbaseInterface.h"
#include "ExHbaseAccess.h"
#include "ExpErrorEnums.h"
#include "HdfsClient_JNI.h"
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilCreateTableAsTdb::build(ex_globals * glob)
{
ExExeUtilCreateTableAsTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilCreateTableAsTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilCreateTableAsTcb
///////////////////////////////////////////////////////////////
ExExeUtilCreateTableAsTcb::ExExeUtilCreateTableAsTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob),
step_(INITIAL_),
tableExists_(FALSE)
{
// Allocate the private state in each entry of the down queue
qparent_.down->allocatePstate(this);
}
//////////////////////////////////////////////////////
// work() for ExExeUtilCreateTableAsTcb
//////////////////////////////////////////////////////
short ExExeUtilCreateTableAsTcb::work()
{
Lng32 cliRC = 0;
short retcode = 0;
Int64 rowsAffected = 0;
NABoolean redriveCTAS = FALSE;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
ContextCli *currContext = masterGlob->getStatement()->getContext();
ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction();
while (1)
{
switch (step_)
{
case INITIAL_:
{
NABoolean xnAlreadyStarted = ta->xnInProgress();
// allow a user transaction if NO LOAD was specified
if (xnAlreadyStarted && !ctaTdb().noLoad())
{
*getDiagsArea() << DgSqlCode(-20123)
<< DgString0("This DDL operation");
step_ = ERROR_;
break;
}
doSidetreeInsert_ = TRUE;
if (xnAlreadyStarted)
doSidetreeInsert_ = FALSE;
else if ( ctaTdb().siQuery_ == (NABasicPtr) NULL )
doSidetreeInsert_ = FALSE;
tableExists_ = FALSE;
if (ctaTdb().ctQuery_)
step_ = CREATE_;
else
step_ = ALTER_TO_NOAUDIT_;
}
break;
case CREATE_:
{
tableExists_ = FALSE;
// issue the create table command
cliRC = cliInterface()->executeImmediate(
ctaTdb().ctQuery_);
if (cliRC < 0)
{
if (((cliRC == -1055) || // SQ table err msg
(cliRC == -1390)) && // Traf err msg
(ctaTdb().loadIfExists()))
{
SQL_EXEC_ClearDiagnostics(NULL);
tableExists_ = TRUE;
if (ctaTdb().deleteData())
step_ = DELETE_DATA_;
else
step_ = ALTER_TO_NOAUDIT_;
break;
}
else
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = ERROR_;
break;
}
}
// a transaction may not have existed prior to the create stmt,
// but if autocommit was off, a transaction would now exist.
// turn off sidetree insert.
if (ta->xnInProgress())
doSidetreeInsert_ = FALSE;
if (ctaTdb().noLoad())
step_ = DONE_;
else
step_ = ALTER_TO_NOAUDIT_;
}
break;
case DELETE_DATA_:
case DELETE_DATA_AND_ERROR_:
{
char * ddQuery =
new(getMyHeap()) char[strlen("DELETE DATA FROM; ") +
strlen(ctaTdb().getTableName()) +
100];
strcpy(ddQuery, "DELETE DATA FROM ");
strcat(ddQuery, ctaTdb().getTableName());
strcat(ddQuery, ";");
cliRC = cliInterface()->executeImmediate(ddQuery, NULL,NULL,TRUE,NULL,TRUE);
// Delete new'd characters
NADELETEBASIC(ddQuery, getHeap());
ddQuery = NULL;
if (cliRC < 0)
{
if (step_ == DELETE_DATA_)
{
step_ = ERROR_;
break;
}
// delete data returned an error.
// As a last resort, drop this table.
SQL_EXEC_ClearDiagnostics(NULL);
step_ = DROP_AND_ERROR_;
break;
}
if (step_ == DELETE_DATA_AND_ERROR_)
{
if (doSidetreeInsert_)
{
cliRC = changeAuditAttribute(ctaTdb().getTableName(),TRUE);
}
step_ = ERROR_;
}
else
step_ = ALTER_TO_NOAUDIT_;
}
break;
case ALTER_TO_NOAUDIT_:
{
if (NOT doSidetreeInsert_)
{
step_ = INSERT_VSBB_;
break;
}
step_ = INSERT_SIDETREE_;
}
break;
case INSERT_SIDETREE_:
{
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
// issue the insert command
cliInterface()->clearGlobalDiags();
// All internal queries issued from CliInterface assume that
// they are in ISO_MAPPING.
// That causes mxcmp to use the default charset as iso88591
// for unprefixed literals.
// The insert...select being issued out here contains the user
// specified query and any literals in that should be using
// the default_charset.
// So we send the isoMapping charset instead of the
// enum ISO_MAPPING.
Int32 savedIsoMapping =
currContext->getSessionDefaults()->getIsoMappingEnum();
cliInterface()->setIsoMapping
(currContext->getSessionDefaults()->getIsoMappingEnum());
redriveCTAS = currContext->getSessionDefaults()->getRedriveCTAS();
if (redriveCTAS)
{
if (childQueryId_ == NULL)
childQueryId_ = new (getHeap()) char[ComSqlId::MAX_QUERY_ID_LEN+1];
childQueryIdLen_ = ComSqlId::MAX_QUERY_ID_LEN;
cliRC = cliInterface()->executeImmediatePrepare2(
ctaTdb().siQuery_,
childQueryId_,
&childQueryIdLen_,
&childQueryCostInfo_,
&childQueryCompStatsInfo_,
NULL, NULL,
&rowsAffected,TRUE);
if (cliRC >= 0)
{
childQueryId_[childQueryIdLen_] = '\0';
Statement *ctasStmt = masterGlob->getStatement();
cliRC = ctasStmt->setChildQueryInfo(diagsArea,
childQueryId_, childQueryIdLen_,
&childQueryCostInfo_, &childQueryCompStatsInfo_);
if (cliRC < 0)
{
step_ = HANDLE_ERROR_;
break;
}
if (outputBuf_ == NULL)
outputBuf_ = new (getHeap()) char[ComSqlId::MAX_QUERY_ID_LEN+100]; //
str_sprintf(outputBuf_, "childQidBegin: %s ", childQueryId_);
moveRowToUpQueue(outputBuf_);
step_ = INSERT_SIDETREE_EXECUTE_;
return WORK_RESCHEDULE_AND_RETURN;
}
}
else
{
cliRC = cliInterface()->executeImmediatePrepare(
ctaTdb().siQuery_,
NULL, NULL,
&rowsAffected,TRUE);
}
cliInterface()->setIsoMapping(savedIsoMapping);
if (cliRC < 0)
{
// sidetree insert prepare failed.
// Try vsbb insert
step_ = ALTER_TO_AUDIT_AND_INSERT_VSBB_;
break;
}
step_ = INSERT_SIDETREE_EXECUTE_;
}
break;
case INSERT_SIDETREE_EXECUTE_:
{
cliRC = cliInterface()->executeImmediateExec(
ctaTdb().siQuery_,
NULL, NULL, TRUE,
&rowsAffected);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = HANDLE_ERROR_;
}
else
{
masterGlob->setRowsAffected(rowsAffected);
step_ = ALTER_TO_AUDIT_;
}
redriveCTAS = currContext->getSessionDefaults()->getRedriveCTAS();
if (redriveCTAS)
{
str_sprintf(outputBuf_, "childQidEnd: %s ", childQueryId_);
moveRowToUpQueue(outputBuf_);
return WORK_RESCHEDULE_AND_RETURN;
}
}
break;
case ALTER_TO_AUDIT_:
case ALTER_TO_AUDIT_AND_INSERT_VSBB_:
{
if (step_ == ALTER_TO_AUDIT_AND_INSERT_VSBB_)
step_ = INSERT_VSBB_;
else
step_ = UPD_STATS_;
}
break;
case INSERT_VSBB_:
{
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
// issue the insert command
Int64 rowsAffected = 0;
Int32 savedIsoMapping =
currContext->getSessionDefaults()->getIsoMappingEnum();
cliInterface()->setIsoMapping
(currContext->getSessionDefaults()->getIsoMappingEnum());
NABoolean redriveCTAS = currContext->getSessionDefaults()->getRedriveCTAS();
if (redriveCTAS)
{
if (childQueryId_ == NULL)
childQueryId_ = new (getHeap()) char[ComSqlId::MAX_QUERY_ID_LEN+1];
cliRC = cliInterface()->executeImmediatePrepare2(
ctaTdb().viQuery_,
childQueryId_,
&childQueryIdLen_,
&childQueryCostInfo_,
&childQueryCompStatsInfo_,
NULL, NULL,
&rowsAffected,TRUE);
cliInterface()->setIsoMapping(savedIsoMapping);
if (cliRC >= 0)
{
childQueryId_[childQueryIdLen_] = '\0';
Statement *ctasStmt = masterGlob->getStatement();
cliRC = ctasStmt->setChildQueryInfo(diagsArea,
childQueryId_, childQueryIdLen_,
&childQueryCostInfo_, &childQueryCompStatsInfo_);
if (cliRC < 0)
{
step_ = HANDLE_ERROR_;
break;
}
if (outputBuf_ == NULL)
outputBuf_ = new (getHeap()) char[ComSqlId::MAX_QUERY_ID_LEN+100]; //
str_sprintf(outputBuf_, "childQidBegin: %s ", childQueryId_);
moveRowToUpQueue(outputBuf_);
step_ = INSERT_VSBB_EXECUTE_;
return WORK_RESCHEDULE_AND_RETURN;
}
else
{
step_ = HANDLE_ERROR_;
break;
}
}
else
{
cliRC = cliInterface()->executeImmediate(
ctaTdb().viQuery_,
NULL, NULL, TRUE,
&rowsAffected,TRUE);
cliInterface()->setIsoMapping(savedIsoMapping);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = HANDLE_ERROR_;
}
else
{
masterGlob->setRowsAffected(rowsAffected);
step_ = UPD_STATS_;
}
}
}
break;
case INSERT_VSBB_EXECUTE_:
{
cliRC = cliInterface()->executeImmediateExec(
ctaTdb().viQuery_,
NULL, NULL, TRUE,
&rowsAffected);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = HANDLE_ERROR_;
}
else
{
str_sprintf(outputBuf_, "childQidEnd: %s ", childQueryId_);
moveRowToUpQueue(outputBuf_);
masterGlob->setRowsAffected(rowsAffected);
step_ = UPD_STATS_;
return WORK_RESCHEDULE_AND_RETURN;
}
}
break;
case UPD_STATS_:
{
if ((ctaTdb().threshold_ == -1) ||
((ctaTdb().threshold_ > 0) &&
(masterGlob->getRowsAffected() < ctaTdb().threshold_)))
{
step_ = DONE_;
break;
}
// issue the upd stats command
char * usQuery =
new(getHeap()) char[strlen(ctaTdb().usQuery_) + 10 + 1];
str_sprintf(usQuery, ctaTdb().usQuery_,
masterGlob->getRowsAffected());
cliRC = cliInterface()->executeImmediate(usQuery,NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(usQuery, getHeap());
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = HANDLE_ERROR_;
break;
}
step_ = DONE_;
}
break;
case HANDLE_ERROR_:
{
if ((ctaTdb().ctQuery_) &&
(ctaTdb().loadIfExists()))
{
// error case and 'load if exists' specified.
// Do not drop the table, only delete data from it.
step_ = DELETE_DATA_AND_ERROR_;
}
else
step_ = DROP_AND_ERROR_;
}
break;
case DROP_AND_ERROR_:
{
if ((ctaTdb().ctQuery_) &&
(NOT tableExists_))
{
// this is an error case, drop the table
char * dtQuery =
new(getMyHeap()) char[strlen("DROP TABLE CASCADE; ") +
strlen(ctaTdb().getTableName()) +
100];
strcpy(dtQuery, "DROP TABLE ");
strcat(dtQuery, ctaTdb().getTableName());
strcat(dtQuery, " CASCADE;");
cliRC = cliInterface()->executeImmediate(dtQuery);
// Delete new'd characters
NADELETEBASIC(dtQuery, getHeap());
dtQuery = NULL;
}
else if (doSidetreeInsert_)
{
cliRC = changeAuditAttribute(ctaTdb().getTableName(),
TRUE, ctaTdb().isVolatile());
}
if (step_ == DROP_AND_ERROR_)
step_ = ERROR_;
else
step_ = DONE_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
step_ = INITIAL_;
qparent_.down->removeHead();
return WORK_OK;
}
break;
case ERROR_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea =
ComDiagsArea::allocate(this->getGlobals()->getDefaultHeap());
else
diagsArea->incrRefCount (); // setDiagsArea call below will decr ref count
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea (diagsArea);
// insert into parent
qparent_.up->insert();
pstate.matches_ = 0;
step_ = DONE_;
}
break;
} // switch
} // while
return WORK_OK;
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilCreateTableAsTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilCreateTableAsPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilCreateTableAsPrivateState::ExExeUtilCreateTableAsPrivateState()
{
}
ExExeUtilCreateTableAsPrivateState::~ExExeUtilCreateTableAsPrivateState()
{
};
#define SETSTEP(s) setStep(s, __LINE__)
static THREAD_P bool sv_checked_yet = false;
static THREAD_P bool sv_logStep = false;
////////////////////////////////////////////////////////////////
// Methods for classes ExExeUtilAqrWnrInsertTdb and
// ExExeUtilAqrWnrInsertTcb
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilAqrWnrInsertTdb::build(ex_globals * glob)
{
// build the child first
ex_tcb * childTcb = child_->build(glob);
ExExeUtilAqrWnrInsertTcb *exe_util_tcb;
exe_util_tcb =
new(glob->getSpace()) ExExeUtilAqrWnrInsertTcb(*this, childTcb, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
ExExeUtilAqrWnrInsertTcb::ExExeUtilAqrWnrInsertTcb(
const ComTdbExeUtilAqrWnrInsert & exe_util_tdb,
const ex_tcb * child_tcb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, child_tcb, glob)
, step_(INITIAL_)
, targetWasEmpty_(false)
{
}
ExExeUtilAqrWnrInsertTcb::~ExExeUtilAqrWnrInsertTcb()
{
// mjh - tbd :
// is base class dtor called?
}
Int32 ExExeUtilAqrWnrInsertTcb::fixup()
{
return ex_tcb::fixup();
}
void ExExeUtilAqrWnrInsertTcb::setStep(Step newStep, int lineNum)
{
static bool sv_checked_yet = false;
static bool sv_logStep = false;
step_ = newStep;
if (!sv_checked_yet)
{
sv_checked_yet = true;
char *logST = getenv("LOG_AQR_WNR_INSERT");
if (logST && *logST == '1')
sv_logStep = true;
}
if (!sv_logStep)
return;
if (NULL ==
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals())
return;
char *stepStr = (char *) "UNKNOWN";
switch (step_)
{
case INITIAL_:
stepStr = (char *) "INITIAL_";
break;
case LOCK_TARGET_:
stepStr = (char *) "LOCK_TARGET_";
break;
case IS_TARGET_EMPTY_:
stepStr = (char *) "IS_TARGET_EMPTY_";
break;
case SEND_REQ_TO_CHILD_:
stepStr = (char *) "SEND_REQ_TO_CHILD_";
break;
case GET_REPLY_FROM_CHILD_:
stepStr = (char *) "GET_REPLY_FROM_CHILD_";
break;
case CLEANUP_CHILD_:
stepStr = (char *) "CLEANUP_CHILD_";
break;
case CLEANUP_TARGET_:
stepStr = (char *) "CLEANUP_TARGET_";
break;
case ERROR_:
stepStr = (char *) "ERROR_";
break;
case DONE_:
stepStr = (char *) "DONE_";
break;
}
cout << stepStr << ", line " << lineNum << endl;
}
// Temporary.
#define VERIFY_CLI_UTIL 1
ExWorkProcRetcode ExExeUtilAqrWnrInsertTcb::work()
{
ExWorkProcRetcode rc = WORK_OK;
Lng32 cliRC = 0;
// Get the globals stucture of the master executor.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
ContextCli *currContext = masterGlob->getCliGlobals()->currContext();
while (! (qparent_.down->isEmpty() || qparent_.up->isFull()) )
{
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry * pentry_up = qparent_.up->getTailEntry();
switch (step_)
{
case INITIAL_:
{
targetWasEmpty_ = false;
masterGlob->resetAqrWnrInsertCleanedup();
if (getDiagsArea())
getDiagsArea()->clear();
if (ulTdb().doLockTarget())
SETSTEP(LOCK_TARGET_);
else
SETSTEP(IS_TARGET_EMPTY_);
break;
}
case LOCK_TARGET_:
{
SQL_EXEC_ClearDiagnostics(NULL);
query_ = new(getGlobals()->getDefaultHeap()) char[1000];
str_sprintf(query_, "lock table %s in share mode;",
ulTdb().getTableName());
Lng32 len = 0;
char dummyArg[128];
#ifdef VERIFY_CLI_UTIL
for (size_t i = 0; i < sizeof(dummyArg); i++)
dummyArg[i] = i;
#endif
cliRC = cliInterface()->executeImmediate(
query_, dummyArg, &len, FALSE);
#ifdef VERIFY_CLI_UTIL
ex_assert (len == 0, "lock table returned data");
for (size_t i = 0; i < sizeof(dummyArg); i++)
ex_assert( dummyArg[i] == i, "lock table returned data");
#endif
NADELETEBASIC(query_, getMyHeap());
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
SETSTEP(ERROR_);
}
else
SETSTEP(IS_TARGET_EMPTY_);
// Allow the query to be canceled.
return WORK_CALL_AGAIN;
}
case IS_TARGET_EMPTY_:
{
SQL_EXEC_ClearDiagnostics(NULL);
query_ = new(getGlobals()->getDefaultHeap()) char[1000];
str_sprintf(query_, "select row count from %s;",
ulTdb().getTableName());
Lng32 len = 0;
Int64 rowCount = 0;
cliRC = cliInterface()->executeImmediate(query_,
(char*)&rowCount,
&len, FALSE);
NADELETEBASIC(query_, getMyHeap());
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
SETSTEP(ERROR_);
}
else
{
targetWasEmpty_ = (rowCount == 0);
SETSTEP(SEND_REQ_TO_CHILD_);
}
// Allow the query to be canceled.
return WORK_CALL_AGAIN;
}
case SEND_REQ_TO_CHILD_:
{
if (qchild_.down->isFull())
return WORK_OK;
ex_queue_entry * centry = qchild_.down->getTailEntry();
centry->downState.request = ex_queue::GET_ALL;
centry->downState.requestValue =
pentry_down->downState.requestValue;
centry->downState.parentIndex = qparent_.down->getHeadIndex();
// set the child's input atp
centry->passAtp(pentry_down->getAtp());
qchild_.down->insert();
SETSTEP(GET_REPLY_FROM_CHILD_);
}
break;
case GET_REPLY_FROM_CHILD_:
{
// if nothing returned from child. Get outta here.
if (qchild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * centry = qchild_.up->getHeadEntry();
// DA from child, if any, is copied to parent up entry.
pentry_up->copyAtp(centry);
switch(centry->upState.status)
{
case ex_queue::Q_NO_DATA:
{
SETSTEP(DONE_);
break;
}
case ex_queue::Q_SQLERROR:
{
SETSTEP(CLEANUP_CHILD_);
break;
}
default:
{
ex_assert(0, "Invalid child_status");
break;
}
}
qchild_.up->removeHead();
break;
}
case CLEANUP_CHILD_:
{
bool lookingForQnoData = true;
do
{
if (qchild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * centry = qchild_.up->getHeadEntry();
if (centry->upState.status == ex_queue::Q_NO_DATA)
{
lookingForQnoData = false;
bool cleanupTarget = false;
if (targetWasEmpty_ &&
(pentry_down->downState.request != ex_queue::GET_NOMORE))
{
// Find out if any messages were sent to insert TSE sessions,
// because we'd like to skip CLEANUP_TARGET_ if not.
const ExStatisticsArea *constStatsArea = NULL;
Lng32 cliRc = SQL_EXEC_GetStatisticsArea_Internal(
SQLCLI_STATS_REQ_QID,
masterGlob->getStatement()->getUniqueStmtId(),
masterGlob->getStatement()->getUniqueStmtIdLen(),
-1, SQLCLI_SAME_STATS,
constStatsArea);
ExStatisticsArea * statsArea =
(ExStatisticsArea *) constStatsArea;
if (cliRc < 0 || !statsArea)
{
// Error or some problem getting stats.
cleanupTarget = true;
}
else if (!statsArea->getMasterStats() ||
statsArea->getMasterStats()->getStatsErrorCode() != 0)
{
// Partial success getting stats. Can't trust results.
cleanupTarget = true;
}
else if (statsArea->anyHaveSentMsgIUD())
{
// Stats shows that IUD started. Must cleanup.
cleanupTarget = true;
}
}
if (cleanupTarget)
SETSTEP(CLEANUP_TARGET_);
else
SETSTEP(ERROR_);
}
qchild_.up->removeHead();
} while (lookingForQnoData);
break;
}
case CLEANUP_TARGET_:
{
SQL_EXEC_ClearDiagnostics(NULL);
query_ = new(getGlobals()->getDefaultHeap()) char[1000];
str_sprintf(query_, "delete with no rollback from %s;", ulTdb().getTableName());
Lng32 len = 0;
char dummyArg[128];
cliRC = cliInterface()->executeImmediate(
query_, dummyArg, &len, FALSE);
NADELETEBASIC(query_, getMyHeap());
if (cliRC < 0)
{
// mjh - tbd - warning or EMS message to give context to error on
// delete after error on the insert?
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
}
else
masterGlob->setAqrWnrInsertCleanedup();
SETSTEP(ERROR_);
break;
}
case ERROR_:
{
if (pentry_down->downState.request != ex_queue::GET_NOMORE)
{
if (getDiagsArea())
{
// Any error from child already put a DA into pentry_up.
if (NULL == pentry_up->getDiagsArea())
{
ComDiagsArea * da = ComDiagsArea::allocate(
getGlobals()->getDefaultHeap());
pentry_up->setDiagsArea(da);
}
pentry_up->getDiagsArea()->mergeAfter(*getDiagsArea());
getDiagsArea()->clear();
SQL_EXEC_ClearDiagnostics(NULL);
}
pentry_up->upState.status = ex_queue::Q_SQLERROR;
pentry_up->upState.parentIndex =
pentry_down->downState.parentIndex;
pentry_up->upState.downIndex = qparent_.down->getHeadIndex();
qparent_.up->insert();
}
SETSTEP(DONE_);
break;
}
case DONE_:
{
// Return EOF.
pentry_up->upState.parentIndex = pentry_down->downState.parentIndex;
pentry_up->upState.setMatchNo(0);
pentry_up->upState.status = ex_queue::Q_NO_DATA;
// insert into parent
qparent_.up->insert();
SETSTEP(INITIAL_);
qparent_.down->removeHead();
break;
}
} // switch
} // while
return WORK_OK;
}
ExWorkProcRetcode ExExeUtilAqrWnrInsertTcb::workCancel()
{
if (!(qparent_.down->isEmpty()) &&
(ex_queue::GET_NOMORE ==
qparent_.down->getHeadEntry()->downState.request))
{
switch (step_)
{
case INITIAL_:
SETSTEP(DONE_);
break;
case LOCK_TARGET_:
ex_assert (0,
"work method doesn't return with step_ set to LOCK_TARGET_.");
break;
case IS_TARGET_EMPTY_:
ex_assert (0,
"work method doesn't return with step_ set "
"to IS_TARGET_EMPTY_.");
break;
case SEND_REQ_TO_CHILD_:
SETSTEP(DONE_);
break;
case GET_REPLY_FROM_CHILD_:
// Assuming that the entire query is canceled. Canceled queries
// are not AQR'd. Will cleanup without purging target.
qchild_.down->cancelRequest();
SETSTEP(CLEANUP_CHILD_);
break;
case CLEANUP_CHILD_:
// CLEANUP_CHILD_ does the right thing when canceling.
break;
case ERROR_:
// ERROR_ does the right thing when canceling.
break;
case CLEANUP_TARGET_:
ex_assert (0,
"work method doesn't return with step_ set to CLEANUP_TARGET_,.");
break;
case DONE_:
ex_assert (0,
"work method doesn't return with step_ set to DONE_.");
break;
}
}
return WORK_OK;
}
#define changeStep(x) changeAndTraceStep(x, __LINE__)
////////////////////////////////////////////////////////////////
// build for class ExExeUtilHbaseLoadTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilHBaseBulkLoadTdb::build(ex_globals * glob)
{
ExExeUtilHBaseBulkLoadTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilHBaseBulkLoadTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilHbaseLoadTcb
///////////////////////////////////////////////////////////////
ExExeUtilHBaseBulkLoadTcb::ExExeUtilHBaseBulkLoadTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob),
step_(INITIAL_),
nextStep_(INITIAL_),
rowsAffected_(0),
loggingLocation_(NULL)
{
ehi_ = NULL;
qparent_.down->allocatePstate(this);
}
ExExeUtilHBaseBulkLoadTcb::~ExExeUtilHBaseBulkLoadTcb()
{
if (loggingLocation_ != NULL) {
NADELETEBASIC(loggingLocation_, getHeap());
loggingLocation_ = NULL;
}
}
//////////////////////////////////////////////////////
// work() for ExExeUtilHbaseLoadTcb
//////////////////////////////////////////////////////
short ExExeUtilHBaseBulkLoadTcb::work()
{
Lng32 cliRC = 0;
short retcode = 0;
short rc;
Lng32 errorRowCount = 0;
int len;
ComDiagsArea *diagsArea;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ExTransaction *ta = currContext->getTransaction();
NABoolean ustatNonEmptyTable = FALSE;
while (1)
{
switch (step_)
{
case INITIAL_:
{
NABoolean xnAlreadyStarted = ta->xnInProgress();
if (xnAlreadyStarted &&
//a transaction is active when we load/populate an indexe table
!hblTdb().getIndexTableOnly())
{
//8111 - Transactions are not allowed with Bulk load.
ComDiagsArea * da = getDiagsArea();
*da << DgSqlCode(-8111);
step_ = LOAD_ERROR_;
break;
}
if (setStartStatusMsgAndMoveToUpQueue(" LOAD", &rc))
return rc;
if (hblTdb().getUpsertUsingLoad())
hblTdb().setPreloadCleanup(FALSE);
if (hblTdb().getTruncateTable())
{
step_ = TRUNCATE_TABLE_;
break;
}
// Table will not be truncated, so make sure it is empty if Update
// Stats has been requested. We obviously have to do this check before
// the load, but if the table is determined to be non-empty, the
// message is deferred until the UPDATE_STATS_ step.
if (hblTdb().getUpdateStats())
{
NAString selectFirstQuery = "select [first 1] 0 from ";
selectFirstQuery.append(hblTdb().getTableName()).append(";");
cliRC = cliInterface()->executeImmediate(selectFirstQuery.data());
if (cliRC < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
else if (cliRC != 100)
ustatNonEmptyTable = TRUE; // So we can display msg later
}
step_ = LOAD_START_;
}
break;
case TRUNCATE_TABLE_:
{
if (setStartStatusMsgAndMoveToUpQueue(" PURGE DATA",&rc, 0, TRUE))
return rc;
// Set the parserflag to prevent privilege checks in purgedata
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
NABoolean parserFlagSet = FALSE;
if ((masterGlob->getStatement()->getContext()->getSqlParserFlags() & 0x20000) == 0)
{
parserFlagSet = TRUE;
masterGlob->getStatement()->getContext()->setSqlParserFlags(0x20000);
}
//for now the purgedata statement does not keep the partitions
char * ttQuery =
new(getMyHeap()) char[strlen("PURGEDATA ; ") +
strlen(hblTdb().getTableName()) +
100];
strcpy(ttQuery, "PURGEDATA ");
strcat(ttQuery, hblTdb().getTableName());
strcat(ttQuery, ";");
Lng32 len = 0;
Int64 rowCount = 0;
cliRC = cliInterface()->executeImmediate(ttQuery, NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(ttQuery, getHeap());
ttQuery = NULL;
if (parserFlagSet)
masterGlob->getStatement()->getContext()->resetSqlParserFlags(0x20000);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_ERROR_;
break;
}
step_ = LOAD_START_;
setEndStatusMsg(" PURGE DATA", 0, TRUE);
}
break;
case LOAD_START_:
{
if (setCQDs() <0)
{
step_ = LOAD_END_ERROR_;
break;
}
int jniDebugPort = 0;
int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(),
(char*)"", //Later may need to change to hblTdb.server_,
(char*)""); //Later may need to change to hblTdb.zkPort_);
retcode = ehi_->initHBLC();
if (retcode == 0)
retcode = ehi_->createCounterTable(hblTdb().getErrCountTable(), (char *)"ERRORS");
if (retcode != 0 )
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
diagsArea = NULL;
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8448), NULL, &intParam1,
&cliError, NULL,
" ",
getHbaseErrStr(retcode),
(char *)GetCliGlobals()->getJniErrorStr());
step_ = LOAD_END_ERROR_;
break;
}
if (hblTdb().getPreloadCleanup())
step_ = PRE_LOAD_CLEANUP_;
else
{
step_ = PREPARATION_;
if (hblTdb().getRebuildIndexes() || hblTdb().getHasUniqueIndexes())
step_ = DISABLE_INDEXES_;
}
}
break;
case PRE_LOAD_CLEANUP_:
{
if (setStartStatusMsgAndMoveToUpQueue(" CLEANUP", &rc, 0, TRUE))
return rc;
//Cleanup files
char * clnpQuery =
new(getMyHeap()) char[strlen("LOAD CLEANUP FOR TABLE ; ") +
strlen(hblTdb().getTableName()) +
100];
strcpy(clnpQuery, "LOAD CLEANUP FOR TABLE ");
if (hblTdb().getIndexTableOnly())
strcat(clnpQuery, "TABLE(INDEX_TABLE ");
strcat(clnpQuery, hblTdb().getTableName());
if (hblTdb().getIndexTableOnly())
strcat(clnpQuery, ")");
strcat(clnpQuery, ";");
cliRC = cliInterface()->executeImmediate(clnpQuery, NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(clnpQuery, getHeap());
clnpQuery = NULL;
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_END_ERROR_;
break;
}
step_ = PREPARATION_;
if (hblTdb().getRebuildIndexes() || hblTdb().getHasUniqueIndexes())
step_ = DISABLE_INDEXES_;
setEndStatusMsg(" CLEANUP", 0, TRUE);
}
break;
case DISABLE_INDEXES_:
{
if (setStartStatusMsgAndMoveToUpQueue(" DISABLE INDEXES", &rc, 0, TRUE))
return rc;
// disable indexes before starting the load preparation. load preparation phase will
// give an error if indexes are not disabled
// For constarints --disabling/enabling constarints is not supported yet. in this case the user
// needs to disable or drop the constraints manually before starting load. If constarints
// exist load preparation will give an error
char * diQuery =
new(getMyHeap()) char[strlen("ALTER TABLE DISABLE ALL INDEXES ; ") +
strlen(hblTdb().getTableName()) +
100];
strcpy(diQuery, "ALTER TABLE ");
strcat(diQuery, hblTdb().getTableName());
if (hblTdb().getRebuildIndexes())
strcat(diQuery, " DISABLE ALL INDEXES ;");
else
strcat(diQuery, " DISABLE ALL UNIQUE INDEXES ;"); // has unique indexes and rebuild not specified
cliRC = cliInterface()->executeImmediate(diQuery, NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(diQuery, getMyHeap());
diQuery = NULL;
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_END_ERROR_;
break;
}
step_ = PREPARATION_;
setEndStatusMsg(" DISABLE INDEXES", 0, TRUE);
}
break;
case PREPARATION_:
{
short bufPos = 0;
if (!hblTdb().getUpsertUsingLoad())
{
setLoggingLocation();
bufPos = printLoggingLocation(0);
if (setStartStatusMsgAndMoveToUpQueue(" LOADING DATA", &rc, bufPos, TRUE))
return rc;
else {
step_ = LOADING_DATA_;
return WORK_CALL_AGAIN;
}
}
else
step_ = LOADING_DATA_;
}
break;
case LOADING_DATA_:
{
if (!hblTdb().getUpsertUsingLoad())
{
if (hblTdb().getNoDuplicates())
cliRC = holdAndSetCQD("TRAF_LOAD_PREP_SKIP_DUPLICATES", "OFF");
else
cliRC = holdAndSetCQD("TRAF_LOAD_PREP_SKIP_DUPLICATES", "ON");
if (cliRC < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
if (loggingLocation_ != NULL)
cliRC = holdAndSetCQD("TRAF_LOAD_ERROR_LOGGING_LOCATION", loggingLocation_);
if (cliRC < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
rowsAffected_ = 0;
char *loadQuery = hblTdb().ldQuery_;
if (ustatNonEmptyTable)
{
// If the ustat option was specified, but the table to be loaded
// is not empty, we have to retract the WITH SAMPLE option that
// was added to the LOAD TRANSFORM statement when the original
// bulk load statement was parsed.
const char* sampleOpt = " WITH SAMPLE ";
char* sampleOptPtr = strstr(loadQuery, sampleOpt);
if (sampleOptPtr)
memset(sampleOptPtr, ' ', strlen(sampleOpt));
}
//printf("*** Load stmt is %s\n",loadQuery);
// If the WITH SAMPLE clause is included, set the internal exe util
// parser flag to allow it.
ExExeStmtGlobals *exeGlob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals *masterGlob = exeGlob->castToExMasterStmtGlobals();
NABoolean parserFlagSet = FALSE;
if (hblTdb().getUpdateStats() && !ustatNonEmptyTable)
{
if ((masterGlob->getStatement()->getContext()->getSqlParserFlags() & 0x20000) == 0)
{
parserFlagSet = TRUE;
masterGlob->getStatement()->getContext()->setSqlParserFlags(0x20000);
}
}
diagsArea = getDiagsArea();
cliRC = cliInterface()->executeImmediate(loadQuery,
NULL,
NULL,
TRUE,
&rowsAffected_,
FALSE,
diagsArea);
if (parserFlagSet)
masterGlob->getStatement()->getContext()->resetSqlParserFlags(0x20000);
if (cliRC < 0)
{
rowsAffected_ = 0;
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_END_ERROR_;
break;
}
else {
step_ = COMPLETE_BULK_LOAD_;
ComCondition *cond;
Lng32 entryNumber;
while ((cond = diagsArea->findCondition(EXE_ERROR_ROWS_FOUND, &entryNumber)) != NULL) {
if (errorRowCount < cond->getOptionalInteger(0))
errorRowCount = cond->getOptionalInteger(0);
diagsArea->deleteWarning(entryNumber);
}
diagsArea->setRowCount(0);
}
if (rowsAffected_ == 0)
step_ = LOAD_END_;
sprintf(statusMsgBuf_, " Rows Processed: %ld %c",rowsAffected_+errorRowCount, '\n' );
int len = strlen(statusMsgBuf_);
sprintf(&statusMsgBuf_[len]," Error Rows: %d %c",errorRowCount, '\n' );
len = strlen(statusMsgBuf_);
setEndStatusMsg(" LOADING DATA", len, TRUE);
}
else
{
if (setStartStatusMsgAndMoveToUpQueue(" UPSERT USING LOAD ", &rc, 0, TRUE))
return rc;
rowsAffected_ = 0;
char * upsQuery = hblTdb().ldQuery_;
cliRC = cliInterface()->executeImmediate(upsQuery, NULL, NULL, TRUE, &rowsAffected_);
upsQuery = NULL;
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_ERROR_;
break;
}
step_ = LOAD_END_;
if (hblTdb().getRebuildIndexes() || hblTdb().getHasUniqueIndexes())
step_ = POPULATE_INDEXES_;
sprintf(statusMsgBuf_," Rows Processed: %ld %c",rowsAffected_, '\n' );
int len = strlen(statusMsgBuf_);
setEndStatusMsg(" UPSERT USING LOAD ", len, TRUE);
}
}
break;
case COMPLETE_BULK_LOAD_:
{
if (setStartStatusMsgAndMoveToUpQueue(" COMPLETION", &rc,0, TRUE))
return rc;
//TRAF_LOAD_TAKE_SNAPSHOT
if (hblTdb().getNoRollback() )
cliRC = holdAndSetCQD("TRAF_LOAD_TAKE_SNAPSHOT", "OFF");
else
cliRC = holdAndSetCQD("TRAF_LOAD_TAKE_SNAPSHOT", "ON");
if (cliRC < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
//this case is mainly for debugging
if (hblTdb().getKeepHFiles() &&
!hblTdb().getSecure() )
{
if (holdAndSetCQD("COMPLETE_BULK_LOAD_N_KEEP_HFILES", "ON") < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
}
//complete load query
char * clQuery =
new(getMyHeap()) char[strlen("LOAD COMPLETE FOR TABLE ; ") +
strlen(hblTdb().getTableName()) +
100];
strcpy(clQuery, "LOAD COMPLETE FOR TABLE ");
if (hblTdb().getIndexTableOnly())
strcat(clQuery, "TABLE(INDEX_TABLE ");
strcat(clQuery, hblTdb().getTableName());
if (hblTdb().getIndexTableOnly())
strcat(clQuery, ")");
strcat(clQuery, ";");
cliRC = cliInterface()->executeImmediate(clQuery, NULL,NULL,TRUE,NULL,TRUE, getDiagsArea());
NADELETEBASIC(clQuery, getMyHeap());
clQuery = NULL;
if (cliRC < 0)
rowsAffected_ = 0;
sprintf(statusMsgBuf_, " Rows Loaded: %ld %c",rowsAffected_, '\n' );
len = strlen(statusMsgBuf_);
if (cliRC < 0)
{
rowsAffected_ = 0;
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
setEndStatusMsg(" COMPLETION", len, TRUE);
step_ = LOAD_END_ERROR_;
break;
}
cliRC = restoreCQD("TRAF_LOAD_TAKE_SNAPSHOT");
if (hblTdb().getRebuildIndexes() || hblTdb().getHasUniqueIndexes())
step_ = POPULATE_INDEXES_;
else if (hblTdb().getUpdateStats())
step_ = UPDATE_STATS_;
else
step_ = LOAD_END_;
setEndStatusMsg(" COMPLETION", len, TRUE);
}
break;
case POPULATE_INDEXES_:
{
if (setStartStatusMsgAndMoveToUpQueue(" POPULATE INDEXES", &rc, 0, TRUE))
return rc;
else {
step_ = POPULATE_INDEXES_EXECUTE_;
return WORK_CALL_AGAIN;
}
}
break;
case POPULATE_INDEXES_EXECUTE_:
{
char * piQuery =
new(getMyHeap()) char[strlen("POPULATE ALL INDEXES ON ; ") +
strlen(hblTdb().getTableName()) +
100];
if (hblTdb().getRebuildIndexes())
strcpy(piQuery, "POPULATE ALL INDEXES ON ");
else
strcpy(piQuery, "POPULATE ALL UNIQUE INDEXES ON "); // has unique indexes and rebuild not used
strcat(piQuery, hblTdb().getTableName());
strcat(piQuery, ";");
cliRC = cliInterface()->executeImmediate(piQuery, NULL,NULL,TRUE,NULL,TRUE);
NADELETEBASIC(piQuery, getHeap());
piQuery = NULL;
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_END_ERROR_;
break;
}
if (hblTdb().getUpdateStats())
step_ = UPDATE_STATS_;
else
step_ = LOAD_END_;
setEndStatusMsg(" POPULATE INDEXES", 0, TRUE);
}
break;
case UPDATE_STATS_:
{
if (setStartStatusMsgAndMoveToUpQueue(" UPDATE STATISTICS", &rc, 0, TRUE))
return rc;
else {
step_ = UPDATE_STATS_EXECUTE_;
return WORK_CALL_AGAIN;
}
}
break;
case UPDATE_STATS_EXECUTE_:
{
if (ustatNonEmptyTable)
{
// Table was not empty prior to the load.
step_ = LOAD_END_;
sprintf(statusMsgBuf_,
" UPDATE STATISTICS not executed: table %s not empty before load. %c",
hblTdb().getTableName(), '\n' );
int len = strlen(statusMsgBuf_);
setEndStatusMsg(" UPDATE STATS", len, TRUE);
break;
}
char * ustatStmt =
new(getMyHeap()) char[strlen("UPDATE STATS FOR TABLE ON EVERY COLUMN; ") +
strlen(hblTdb().getTableName()) +
100];
strcpy(ustatStmt, "UPDATE STATISTICS FOR TABLE ");
strcat(ustatStmt, hblTdb().getTableName());
strcat(ustatStmt, " ON EVERY COLUMN;");
cliRC = holdAndSetCQD("USTAT_USE_BACKING_SAMPLE", "ON");
if (cliRC < 0)
{
step_ = LOAD_END_ERROR_;
break;
}
cliRC = cliInterface()->executeImmediate(ustatStmt, NULL, NULL, TRUE, NULL, TRUE);
NADELETEBASIC(ustatStmt, getMyHeap());
ustatStmt = NULL;
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = LOAD_END_ERROR_;
}
else
step_ = LOAD_END_;
cliRC = restoreCQD("USTAT_USE_BACKING_SAMPLE");
if (cliRC < 0)
step_ = LOAD_END_ERROR_;
setEndStatusMsg(" UPDATE STATS", 0, TRUE);
}
break;
case RETURN_STATUS_MSG_:
{
if (moveRowToUpQueue(statusMsgBuf_,0,&rc))
return rc;
step_ = nextStep_;
}
break;
case LOAD_END_:
case LOAD_END_ERROR_:
{
if (restoreCQDs() < 0)
{
step_ = LOAD_ERROR_;
break;
}
if (hblTdb().getContinueOnError() && ehi_)
{
ehi_->close();
ehi_ = NULL;
}
if (step_ == LOAD_END_)
step_ = DONE_;
else
step_ = LOAD_ERROR_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getMyHeap());
else
diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count
diagsArea->setRowCount(rowsAffected_);
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea(diagsArea);
// insert into parent
qparent_.up->insert();
step_ = INITIAL_;
qparent_.down->removeHead();
return WORK_OK;
}
break;
case LOAD_ERROR_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getMyHeap());
else
diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count
if (getDiagsArea())
{
diagsArea->mergeAfter(*getDiagsArea());
diagsArea->setRowCount(rowsAffected_);
}
up_entry->setDiagsArea(diagsArea);
// insert into parent
qparent_.up->insert();
pstate.matches_ = 0;
step_ = DONE_;
}
break;
} // switch
} // while
return WORK_OK;
}
short ExExeUtilHBaseBulkLoadTcb::setCQDs()
{
if (holdAndSetCQD("COMP_BOOL_226", "ON") < 0) { return -1;}
// next cqd required to allow load into date/timestamp Traf columns from string Hive columns.
// This is a common use case. Cqd can be removed when Traf hive access supports more Hive types.
if (holdAndSetCQD("ALLOW_INCOMPATIBLE_OPERATIONS", "ON") < 0) { return -1;}
if (hblTdb().getForceCIF())
{
if (holdAndSetCQD("COMPRESSED_INTERNAL_FORMAT", "ON") < 0) {return -1; }
if (holdAndSetCQD("COMPRESSED_INTERNAL_FORMAT_BMO", "ON") < 0){ return -1; }
if (holdAndSetCQD("COMPRESSED_INTERNAL_FORMAT_DEFRAG_RATIO", "100") < 0) { return -1;}
}
if (holdAndSetCQD("TRAF_LOAD_LOG_ERROR_ROWS", (hblTdb().getLogErrorRows()) ? "ON" : "OFF") < 0)
{ return -1;}
if (holdAndSetCQD("TRAF_LOAD_CONTINUE_ON_ERROR", (hblTdb().getContinueOnError()) ? "ON" : "OFF") < 0)
{ return -1; }
char strMaxRR[10];
sprintf(strMaxRR,"%d", hblTdb().getMaxErrorRows());
if (holdAndSetCQD("TRAF_LOAD_MAX_ERROR_ROWS", strMaxRR) < 0) { return -1;}
if (hblTdb().getContinueOnError())
{
if (holdAndSetCQD("TRAF_LOAD_ERROR_COUNT_TABLE", hblTdb().getErrCountTable()) < 0)
{ return -1;}
time_t t;
time(&t);
char pt[30];
struct tm * curgmtime = gmtime(&t);
strftime(pt, 30, "%Y%m%d_%H%M%S", curgmtime);
if (holdAndSetCQD("TRAF_LOAD_ERROR_COUNT_ID", pt) < 0) { return -1;}
}
return 0;
}
short ExExeUtilHBaseBulkLoadTcb::restoreCQDs()
{
if (restoreCQD("COMP_BOOL_226") < 0) { return -1;}
if (restoreCQD("TRAF_LOAD_PREP_SKIP_DUPLICATES") < 0) { return -1;}
if (restoreCQD("ALLOW_INCOMPATIBLE_OPERATIONS") < 0) { return -1;}
if (hblTdb().getForceCIF())
{
if (restoreCQD("COMPRESSED_INTERNAL_FORMAT") < 0) { return -1;}
if (restoreCQD("COMPRESSED_INTERNAL_FORMAT_BMO") < 0) { return -1;}
if (restoreCQD("COMPRESSED_INTERNAL_FORMAT_DEFRAG_RATIO") < 0) { return -1;}
}
if (restoreCQD("TRAF_LOAD_LOG_ERROR_ROWS") < 0) { return -1;}
if (restoreCQD("TRAF_LOAD_CONTINUE_ON_ERROR") < 0) { return -1;}
if (restoreCQD("TRAF_LOAD_MAX_ERROR_ROWS") < 0) { return -1;}
if (restoreCQD("TRAF_LOAD_ERROR_COUNT_TABLE") < 0) { return -1;}
if (restoreCQD("TRAF_LOAD_ERROR_COUNT_ID") < 0) { return -1; }
if (restoreCQD("TRAF_LOAD_ERROR_LOGGING_LOCATION") < 0) { return -1; }
return 0;
}
short ExExeUtilHBaseBulkLoadTcb::moveRowToUpQueue(const char * row, Lng32 len,
short * rc, NABoolean isVarchar)
{
if (hblTdb().getNoOutput())
return 0;
return ExExeUtilTcb::moveRowToUpQueue(row, len, rc, isVarchar);
}
short ExExeUtilHBaseBulkLoadTcb::printLoggingLocation(int bufPos)
{
short retBufPos = bufPos;
if (hblTdb().getNoOutput())
return 0;
if (loggingLocation_ != NULL) {
str_sprintf(&statusMsgBuf_[bufPos], " Logging Location: %s", loggingLocation_);
retBufPos = strlen(statusMsgBuf_);
statusMsgBuf_[retBufPos] = '\n';
retBufPos++;
}
return retBufPos;
}
short ExExeUtilHBaseBulkLoadTcb::setStartStatusMsgAndMoveToUpQueue(const char * operation,
short * rc,
int bufPos,
NABoolean withtime)
{
if (hblTdb().getNoOutput())
return 0;
char timeBuf[200];
if (withtime)
{
startTime_ = NA_JulianTimestamp();
getTimestampAsString(startTime_, timeBuf);
}
getStatusString(operation, "Started",hblTdb().getTableName(), &statusMsgBuf_[bufPos], FALSE, (withtime ? timeBuf : NULL));
return moveRowToUpQueue(statusMsgBuf_,0,rc);
}
void ExExeUtilHBaseBulkLoadTcb::setEndStatusMsg(const char * operation,
int bufPos,
NABoolean withtime)
{
if (hblTdb().getNoOutput())
return ;
char timeBuf[200];
nextStep_ = step_;
step_ = RETURN_STATUS_MSG_;
Int64 elapsedTime;
if (withtime)
{
endTime_ = NA_JulianTimestamp();
elapsedTime = endTime_ - startTime_;
getTimestampAsString(endTime_, timeBuf);
getStatusString(operation, "Ended", hblTdb().getTableName(),&statusMsgBuf_[bufPos], FALSE, withtime ? timeBuf : NULL);
bufPos = strlen(statusMsgBuf_);
statusMsgBuf_[bufPos] = '\n';
bufPos++;
getTimeAsString(elapsedTime, timeBuf);
}
getStatusString(operation, "Ended", hblTdb().getTableName(),&statusMsgBuf_[bufPos], TRUE, withtime ? timeBuf : NULL);
}
void ExExeUtilHBaseBulkLoadTcb::setLoggingLocation()
{
char * loggingLocation = hblTdb().getLoggingLocation();
if (loggingLocation_ != NULL) {
NADELETEBASIC(loggingLocation_, getHeap());
loggingLocation_ = NULL;
}
if (loggingLocation != NULL) {
short logLen = strlen(loggingLocation);
char *tableName = hblTdb().getTableName();
short tableNameLen = strlen(tableName);
loggingLocation_ = new (getHeap()) char[logLen+tableNameLen+100];
ExHbaseAccessTcb::buildLoggingPath(loggingLocation, NULL, tableName, loggingLocation_);
}
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilHBaseBulkLoadTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilHbaseLoadPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilHbaseLoadPrivateState::ExExeUtilHbaseLoadPrivateState()
{
}
ExExeUtilHbaseLoadPrivateState::~ExExeUtilHbaseLoadPrivateState()
{
};
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////
// build for class ExExeUtilHbaseUnLoadTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilHBaseBulkUnLoadTdb::build(ex_globals * glob)
{
ExExeUtilHBaseBulkUnLoadTcb * exe_util_tcb;
exe_util_tcb = new(glob->getSpace()) ExExeUtilHBaseBulkUnLoadTcb(*this, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
void ExExeUtilHBaseBulkUnLoadTcb::createHdfsFileError(Int32 hdfsClientRetCode)
{
ComDiagsArea * diagsArea = NULL;
char* errorMsg = HdfsClient::getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL,
NULL, NULL, NULL, errorMsg, (char *)GetCliGlobals()->getJniErrorStr());
ex_queue_entry *pentry_up = qparent_.up->getTailEntry();
pentry_up->setDiagsArea(diagsArea);
}
////////////////////////////////////////////////////////////////
// Constructor for class ExExeUtilHbaseLoadTcb
///////////////////////////////////////////////////////////////
ExExeUtilHBaseBulkUnLoadTcb::ExExeUtilHBaseBulkUnLoadTcb(
const ComTdbExeUtil & exe_util_tdb,
ex_globals * glob)
: ExExeUtilTcb( exe_util_tdb, NULL, glob),
step_(INITIAL_),
nextStep_(INITIAL_),
rowsAffected_(0),
snapshotsList_(NULL),
emptyTarget_(FALSE),
oneFile_(FALSE)
{
ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(),
(char*)"", //Later may need to change to hblTdb.server_,
(char*)""); //Later may need to change to hblTdb.zkPort_);
qparent_.down->allocatePstate(this);
}
void ExExeUtilHBaseBulkUnLoadTcb::freeResources()
{
if (snapshotsList_)
{
for ( ; snapshotsList_->entries(); )
{
snapshotStruct *snp = snapshotsList_->at(0);
snapshotsList_->removeAt(0);
NADELETEBASIC(snp->fullTableName, getMyHeap());
NADELETEBASIC(snp->snapshotName, getMyHeap());
NADELETEBASIC( snp, getMyHeap());
snp->fullTableName = NULL;
snp->snapshotName = NULL;
snp = NULL;
}
NADELETEBASIC (snapshotsList_, getMyHeap());
snapshotsList_ = NULL;
}
NADELETE(ehi_, ExpHbaseInterface, getGlobals()->getDefaultHeap());
ehi_ = NULL;
}
ExExeUtilHBaseBulkUnLoadTcb::~ExExeUtilHBaseBulkUnLoadTcb()
{
freeResources();
}
short ExExeUtilHBaseBulkUnLoadTcb::resetExplainSettings()
{
if (cliInterface()->executeImmediate("control session reset 'EXPLAIN';") < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
return -1;
}
if (restoreCQD("generate_explain") < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
return -1;
}
return 0;
}
short ExExeUtilHBaseBulkUnLoadTcb::getTrafodionScanTables()
{
// Variables
SQLMODULE_ID * module = NULL;
SQLSTMT_ID * stmt = NULL;
SQLDESC_ID * sql_src = NULL;
SQLDESC_ID * input_desc = NULL;
SQLDESC_ID * output_desc = NULL;
char * outputBuf = NULL;
assert (snapshotsList_ != NULL);
Lng32 cliRC = 0;
if (holdAndSetCQD("generate_explain", "ON") < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
resetExplainSettings();
return -1;
}
// tell mxcmp that this prepare is for explain.
cliRC = cliInterface()->executeImmediate("control session 'EXPLAIN' 'ON';");
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
resetExplainSettings();
return cliRC;
}
cliInterface()->clearGlobalDiags();
cliRC = cliInterface()->allocStuff(module, stmt, sql_src, input_desc, output_desc, "__EXPL_STMT_NAME__");
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
resetExplainSettings();
return cliRC;
}
char * stmtStr = hblTdb().uldQuery_;
cliRC = cliInterface()->prepare(stmtStr, module, stmt, sql_src, input_desc, output_desc, NULL);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
cliInterface()->deallocStuff(module, stmt, sql_src, input_desc, output_desc);
resetExplainSettings();
return cliRC;
}
resetExplainSettings();
NAString qry_str = "";
qry_str = qry_str + "SELECT DISTINCT ";
qry_str = qry_str + "CASE ";
qry_str = qry_str + "WHEN (POSITION('full_table_name:' IN description) = 0 ) OR (POSITION('snapshot_name:' IN description) = 0) ";
qry_str = qry_str + "THEN NULL ";
qry_str = qry_str + "ELSE ";
qry_str = qry_str + "TRIM(SUBSTRING (description from POSITION('full_table_name:' IN description) + CHAR_LENGTH('full_table_name:') ";
qry_str = qry_str + "FOR POSITION('snapshot_name:' IN description) - ";
qry_str = qry_str + " POSITION('full_table_name:' IN description) - CHAR_LENGTH('full_table_name:'))) ";
qry_str = qry_str + "END AS full_table_name, ";
qry_str = qry_str + "CASE ";
qry_str = qry_str + "WHEN (POSITION('snapshot_temp_location:' IN description) = 0 ) OR ( POSITION('snapshot_name:' IN description) = 0) ";
qry_str = qry_str + "THEN NULL ";
qry_str = qry_str + "ELSE ";
qry_str = qry_str + "TRIM(SUBSTRING (description from POSITION('snapshot_name:' IN description) + CHAR_LENGTH('snapshot_name:') ";
qry_str = qry_str + " FOR POSITION('snapshot_temp_location:' IN description) - ";
qry_str = qry_str + " POSITION('snapshot_name:' IN description) - CHAR_LENGTH('snapshot_name:'))) ";
qry_str = qry_str + "END AS snapshot_name ";
qry_str = qry_str + "FROM TABLE (EXPLAIN (NULL,'__EXPL_STMT_NAME__')) WHERE TRIM(OPERATOR) LIKE 'TRAFODION%SCAN%' ; ";
Queue * tbls = NULL;
cliRC = cliInterface()->fetchAllRows(tbls, (char*)qry_str.data(), 0, FALSE, FALSE, TRUE);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
cliInterface()->deallocStuff(module, stmt, sql_src, input_desc, output_desc);
return cliRC;
}
if (tbls)
{
if (tbls->numEntries() == 0)
{
cliRC = cliInterface()->deallocStuff(module, stmt, sql_src, input_desc, output_desc);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
return cliRC;
}
}
tbls->position();
for (int ii = 0; ii < tbls->numEntries(); ii++)
{
OutputInfo * idx = (OutputInfo*) tbls->getNext();
snapshotStruct * snap = new (getMyHeap()) snapshotStruct();
snap->fullTableName = new (getMyHeap()) NAString(idx->get(0),getMyHeap());
snap->snapshotName = new (getMyHeap()) NAString(idx->get(1),getMyHeap());
//remove trailing spaces
snap->fullTableName->strip(NAString::trailing, ' ');
snap->snapshotName->strip(NAString::trailing, ' ');
ex_assert(snap->fullTableName->length()>0 &&
snap->snapshotName->length()>0 ,
"full table name and snapshot name cannot be empty");
snapshotsList_->insert(snap);
}
}
cliRC = cliInterface()->deallocStuff(module, stmt, sql_src, input_desc, output_desc);
if (cliRC < 0)
{
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
return cliRC;
}
return snapshotsList_->entries();
}
//////////////////////////////////////////////////////
// work() for ExExeUtilHbaseLoadTcb
//////////////////////////////////////////////////////
short ExExeUtilHBaseBulkUnLoadTcb::work()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
short rc;
HDFS_Client_RetCode hdfsClientRetCode = HDFS_CLIENT_OK;
Lng32 hbcRetCode = HBC_OK;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
// if no room in up queue, won't be able to return data/status.
// Come back later.
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate = *((ExExeUtilPrivateState*) pentry_down->pstate);
ExTransaction *ta = getGlobals()->castToExExeStmtGlobals()->
castToExMasterStmtGlobals()->getStatement()->getContext()->getTransaction();
while (1)
{
switch (step_)
{
case INITIAL_:
{
NABoolean xnAlreadyStarted = ta->xnInProgress();
if (xnAlreadyStarted )
{
//8111 - Transactions are not allowed with Bulk unload.
ComDiagsArea * da = getDiagsArea();
*da << DgSqlCode(-8111);
step_ = UNLOAD_ERROR_;
break;
}
setEmptyTarget(hblTdb().getEmptyTarget());
setOneFile(hblTdb().getOneFile());
if ((retcode = ehi_->init(NULL)) != HBASE_ACCESS_SUCCESS)
{
ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, retcode,
"ExpHbaseInterface_JNI::init");
handleError();
step_ = UNLOAD_END_ERROR_;
break;
}
if (!hblTdb().getOverwriteMergeFile() && hblTdb().getMergePath() != NULL)
{
NABoolean exists = FALSE;
hdfsClientRetCode = HdfsClient::hdfsExists( hblTdb().getMergePath(), exists);
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createHdfsFileError(hdfsClientRetCode);
step_ = UNLOAD_END_ERROR_;
break;
}
if (exists)
{
//EXE_UNLOAD_FILE_EXISTS
ComDiagsArea * da = getDiagsArea();
*da << DgSqlCode(- EXE_UNLOAD_FILE_EXISTS)
<< DgString0(hblTdb().getMergePath());
step_ = UNLOAD_END_ERROR_;
break;
}
}
if (holdAndSetCQD("COMP_BOOL_226", "ON") < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
if (hblTdb().getSkipWriteToFiles())
{
setEmptyTarget(FALSE);
setOneFile(FALSE);
}
if (setStartStatusMsgAndMoveToUpQueue("UNLOAD", &rc))
return rc;
if (hblTdb().getCompressType() == 0)
cliRC = holdAndSetCQD("TRAF_UNLOAD_HDFS_COMPRESS", "0");
else
cliRC = holdAndSetCQD("TRAF_UNLOAD_HDFS_COMPRESS", "1");
if (cliRC < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
if (hblTdb().getScanType()== ComTdbExeUtilHBaseBulkUnLoad::REGULAR_SCAN)
cliRC = holdAndSetCQD("TRAF_TABLE_SNAPSHOT_SCAN", "NONE");
else
{
cliRC = holdAndSetCQD("TRAF_TABLE_SNAPSHOT_SCAN", "SUFFIX");
if (cliRC < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
}
cliRC = holdAndSetCQD("TRAF_TABLE_SNAPSHOT_SCAN_TABLE_SIZE_THRESHOLD", "0");
if (cliRC < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
if (hblTdb().getSnapshotSuffix() != NULL)
{
cliRC = holdAndSetCQD("TRAF_TABLE_SNAPSHOT_SCAN_SNAP_SUFFIX", hblTdb().getSnapshotSuffix());
if (cliRC < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
}
step_ = UNLOAD_;
if (hblTdb().getScanType() == ComTdbExeUtilHBaseBulkUnLoad::SNAPSHOT_SCAN_CREATE )
step_ = CREATE_SNAPSHOTS_;
else if (hblTdb().getScanType() == ComTdbExeUtilHBaseBulkUnLoad::SNAPSHOT_SCAN_EXISTING )
step_ = VERIFY_SNAPSHOTS_;
if (getEmptyTarget())
step_ = EMPTY_TARGET_;
}
break;
case EMPTY_TARGET_:
{
if (setStartStatusMsgAndMoveToUpQueue(" EMPTY TARGET ", &rc, 0, TRUE))
return rc;
NAString uldPath ( hblTdb().getExtractLocation());
hdfsClientRetCode = HdfsClient::hdfsCleanUnloadPath( uldPath);
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createHdfsFileError(hdfsClientRetCode);
step_ = UNLOAD_END_ERROR_;
break;
}
step_ = UNLOAD_;
if (hblTdb().getScanType() == ComTdbExeUtilHBaseBulkUnLoad::SNAPSHOT_SCAN_CREATE) //SNAPSHOT_SCAN_CREATE_ = 1
step_ = CREATE_SNAPSHOTS_;
else if (hblTdb().getScanType() == ComTdbExeUtilHBaseBulkUnLoad::SNAPSHOT_SCAN_EXISTING )
step_ = VERIFY_SNAPSHOTS_;
setEndStatusMsg(" EMPTY TARGET ", 0, TRUE);
}
break;
case CREATE_SNAPSHOTS_:
case VERIFY_SNAPSHOTS_:
{
NABoolean createSnp = (step_ == CREATE_SNAPSHOTS_)? TRUE : FALSE;
NAString msg(createSnp ? " CREATE SNAPSHOTS " : " VERIFY SNAPSHOTS ");
NAString msg2 (createSnp ? "created" : "verified");
if (setStartStatusMsgAndMoveToUpQueue( msg.data() , &rc, 0, TRUE))
return rc;
assert (snapshotsList_ == NULL);
snapshotsList_ = new (getMyHeap()) NAList<struct snapshotStruct *> (getMyHeap());
Lng32 rc = getTrafodionScanTables();
if (rc < 0)
{
step_ = UNLOAD_END_ERROR_;
break;
}
for ( int i = 0 ; i < snapshotsList_->entries(); i++)
{
if (createSnp)
hbcRetCode = ehi_->createSnapshot( *snapshotsList_->at(i)->fullTableName, *snapshotsList_->at(i)->snapshotName);
else
{
NABoolean exist = FALSE;
hbcRetCode = ehi_->verifySnapshot(*snapshotsList_->at(i)->fullTableName, *snapshotsList_->at(i)->snapshotName, exist);
if ( hbcRetCode == HBC_OK && !exist)
{
ComDiagsArea * da = getDiagsArea();
*da << DgSqlCode(-8112)
<< DgString0(snapshotsList_->at(i)->snapshotName->data())
<< DgString1(snapshotsList_->at(i)->fullTableName->data());
step_ = UNLOAD_END_ERROR_;
break;
}
}
if (hbcRetCode != HBC_OK)
{
ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode,
"HBaseClient_JNI::createSnapshot/verifySnapshot",
snapshotsList_->at(i)->snapshotName->data() );
handleError();
step_ = UNLOAD_END_ERROR_;
break;
}
}
if (step_ == UNLOAD_END_ERROR_)
break;
step_ = UNLOAD_;
sprintf(statusMsgBuf_," Snapshots %s: %d %c",msg2.data(), (int)snapshotsList_->entries(), '\n' );
int len = strlen(statusMsgBuf_);
setEndStatusMsg(msg.data(), len, TRUE);
}
break;
case UNLOAD_:
{
if (setStartStatusMsgAndMoveToUpQueue(" EXTRACT ", &rc, 0, TRUE))
return rc;
rowsAffected_ = 0;
cliRC = cliInterface()->executeImmediate(hblTdb().uldQuery_,
NULL,
NULL,
TRUE,
&rowsAffected_);
if (cliRC < 0)
{
rowsAffected_ = 0;
cliInterface()->retrieveSQLDiagnostics(getDiagsArea());
step_ = UNLOAD_END_ERROR_;
break;
}
step_ = UNLOAD_END_;
if (getOneFile())
step_ = MERGE_FILES_;
if (hblTdb().getScanType() == ComTdbExeUtilHBaseBulkUnLoad::SNAPSHOT_SCAN_CREATE )
step_ = DELETE_SNAPSHOTS_;
if (hblTdb().getSkipWriteToFiles())
sprintf(statusMsgBuf_," Rows Processed but NOT Written to Disk: %ld %c",rowsAffected_, '\n' );
else
sprintf(statusMsgBuf_," Rows Processed: %ld %c",rowsAffected_, '\n' );
int len = strlen(statusMsgBuf_);
setEndStatusMsg(" EXTRACT ", len, TRUE);
}
break;
case DELETE_SNAPSHOTS_:
{
if (setStartStatusMsgAndMoveToUpQueue(" DELETE SNAPSHOTS ", &rc, 0, TRUE))
return rc;
for ( int i = 0 ; i < snapshotsList_->entries(); i++)
{
hbcRetCode = ehi_->deleteSnapshot( *snapshotsList_->at(i)->snapshotName);
if (hbcRetCode != HBC_OK)
{
ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, hbcRetCode,
"HBaseClient_JNI::createSnapshot/verifySnapshot",
snapshotsList_->at(i)->snapshotName->data() );
handleError();
step_ = UNLOAD_END_ERROR_;
break;
}
}
if (step_ == UNLOAD_END_ERROR_)
break;
step_ = UNLOAD_END_;
if (getOneFile())
step_ = MERGE_FILES_;
sprintf(statusMsgBuf_," Snapshots deleted: %d %c",(int)snapshotsList_->entries(), '\n' );
int len = strlen(statusMsgBuf_);
setEndStatusMsg(" DELETE SNAPSHOTS ", len, TRUE);
}
break;
case MERGE_FILES_:
{
if (setStartStatusMsgAndMoveToUpQueue(" MERGE FILES ", &rc, 0, TRUE))
return rc;
NAString srcPath ( hblTdb().getExtractLocation());
NAString dstPath ( hblTdb().getMergePath());
hdfsClientRetCode = HdfsClient::hdfsMergeFiles( srcPath, dstPath);
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createHdfsFileError(hdfsClientRetCode);
step_ = UNLOAD_END_;
break;
}
step_ = UNLOAD_END_;
setEndStatusMsg(" MERGE FILES ", 0, TRUE);
}
break;
case UNLOAD_END_:
case UNLOAD_END_ERROR_:
{
ehi_->close();
if (restoreCQD("TRAF_TABLE_SNAPSHOT_SCAN") < 0)
{
step_ = UNLOAD_ERROR_;
break;
}
if (restoreCQD("TRAF_TABLE_SNAPSHOT_SCAN_TABLE_SIZE_THRESHOLD") < 0)
{
step_ = UNLOAD_ERROR_;
break;
}
if (hblTdb().getSnapshotSuffix() != NULL)
{
if (restoreCQD("TRAF_TABLE_SNAPSHOT_SCAN_SNAP_SUFFIX") < 0)
{
step_ = UNLOAD_ERROR_;
break;
}
}
if (restoreCQD("COMP_BOOL_226") < 0)
{
step_ = UNLOAD_ERROR_;
break;
}
if ( restoreCQD("TRAF_UNLOAD_HDFS_COMPRESS") < 0)
{
step_ = UNLOAD_ERROR_;
break;
}
if (step_ == UNLOAD_END_)
step_ = DONE_;
else
step_ = UNLOAD_ERROR_;
}
break;
case RETURN_STATUS_MSG_:
{
if (moveRowToUpQueue(statusMsgBuf_,0,&rc))
return rc;
step_ = nextStep_;
}
break;
case DONE_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_NO_DATA;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getMyHeap());
else
diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count
diagsArea->setRowCount(rowsAffected_);
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea(diagsArea);
// insert into parent
qparent_.up->insert();
step_ = INITIAL_;
qparent_.down->removeHead();
freeResources();
return WORK_OK;
}
break;
case UNLOAD_ERROR_:
{
if (qparent_.up->isFull())
return WORK_OK;
// Return EOF.
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex = pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(0);
up_entry->upState.status = ex_queue::Q_SQLERROR;
ComDiagsArea *diagsArea = up_entry->getDiagsArea();
if (diagsArea == NULL)
diagsArea = ComDiagsArea::allocate(getMyHeap());
else
diagsArea->incrRefCount(); // setDiagsArea call below will decr ref count
if (getDiagsArea())
diagsArea->mergeAfter(*getDiagsArea());
up_entry->setDiagsArea(diagsArea);
// insert into parent
qparent_.up->insert();
pstate.matches_ = 0;
step_ = DONE_;
}
break;
} // switch
} // while
return WORK_OK;
}
short ExExeUtilHBaseBulkUnLoadTcb::moveRowToUpQueue(const char * row, Lng32 len,
short * rc, NABoolean isVarchar)
{
if (hblTdb().getNoOutput())
return 0;
return ExExeUtilTcb::moveRowToUpQueue(row, len, rc, isVarchar);
}
short ExExeUtilHBaseBulkUnLoadTcb::setStartStatusMsgAndMoveToUpQueue(const char * operation,
short * rc,
int bufPos,
NABoolean withtime)
{
if (hblTdb().getNoOutput())
return 0;
char timeBuf[200];
if (withtime) {
startTime_ = NA_JulianTimestamp();
getTimestampAsString(startTime_, timeBuf);
}
getStatusString(operation, "Started",NULL, &statusMsgBuf_[bufPos], FALSE, (withtime ? timeBuf : NULL));
return moveRowToUpQueue(statusMsgBuf_,0,rc);
}
void ExExeUtilHBaseBulkUnLoadTcb::setEndStatusMsg(const char * operation,
int bufPos,
NABoolean withtime)
{
if (hblTdb().getNoOutput())
return ;
char timeBuf[200];
nextStep_ = step_;
step_ = RETURN_STATUS_MSG_;
if (withtime)
{
endTime_ = NA_JulianTimestamp();
Int64 elapsedTime = endTime_ - startTime_;
getTimestampAsString(endTime_, timeBuf);
getStatusString(operation, "Ended", hblTdb().getTableName(),&statusMsgBuf_[bufPos], FALSE, withtime ? timeBuf : NULL);
bufPos = strlen(statusMsgBuf_);
statusMsgBuf_[bufPos] = '\n';
bufPos++;
getTimeAsString(elapsedTime, timeBuf);
}
getStatusString(operation, "Ended", NULL,&statusMsgBuf_[bufPos], TRUE, withtime ? timeBuf : NULL);
}
////////////////////////////////////////////////////////////////////////
// Redefine virtual method allocatePstates, to be used by dynamic queue
// resizing, as well as the initial queue construction.
////////////////////////////////////////////////////////////////////////
ex_tcb_private_state * ExExeUtilHBaseBulkUnLoadTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ExExeUtilHbaseUnLoadPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
/////////////////////////////////////////////////////////////////////////////
// Constructor and destructor for ExeUtil_private_state
/////////////////////////////////////////////////////////////////////////////
ExExeUtilHbaseUnLoadPrivateState::ExExeUtilHbaseUnLoadPrivateState()
{
}
ExExeUtilHbaseUnLoadPrivateState::~ExExeUtilHbaseUnLoadPrivateState()
{
};
///////////////////////////////////////////////////////////////////
// class ExExeUtilLobExtractTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilLobExtractTdb::build(ex_globals * glob)
{
ExExeUtilLobExtractTcb * exe_util_tcb;
ex_tcb * childTcb = NULL;
if (child_)
{
// build the child first
childTcb = child_->build(glob);
}
if ((getToType() == ComTdbExeUtilLobExtract::TO_EXTERNAL_FROM_STRING_) ||
(getToType() == ComTdbExeUtilLobExtract::TO_EXTERNAL_FROM_FILE_))
exe_util_tcb = new(glob->getSpace())
ExExeUtilFileLoadTcb(*this, childTcb, glob);
else if (srcIsFile())
exe_util_tcb = new(glob->getSpace())
ExExeUtilFileExtractTcb(*this, childTcb, glob);
else
exe_util_tcb = new(glob->getSpace())
ExExeUtilLobExtractTcb(*this, childTcb, glob);
exe_util_tcb->registerSubtasks();
return (exe_util_tcb);
}
ExExeUtilLobExtractTcb::ExExeUtilLobExtractTcb
(
const ComTdbExeUtilLobExtract & exe_util_tdb,
const ex_tcb * child_tcb,
ex_globals * glob)
: ExExeUtilTcb(exe_util_tdb, child_tcb, glob),
step_(EMPTY_)
{
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
lobHandleLen_ = 2050;
lobHandle_[0] = '\0';
lobInputHandleBuf_[0] = '\0';
lobNameBuf_[0] = '\0';
lobNameLen_ =1024;
lobName_ = NULL;
statusString_[0] = '\0';
lobType_ = 0;
lobData_= NULL;
lobData2_= NULL;
lobDataSpecifiedExtractLen_ = 0; // default. Actual value set from tdb below
lobDataLen_= 0;
remainingBytes_= 0;
currPos_ = 0;
numChildRows_ = 0;
requestTag_ = -1;
lobLoc_[0] = '\0';
exLobGlobals_ = NULL;
ExpLOBinterfaceInit(exLobGlobals_,currContext->exHeap(),currContext,TRUE,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort());
}
void ExExeUtilLobExtractTcb::freeResources()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
ExLobGlobals * lobGlobs = getLobGlobals();
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
//close any open cursors.
if (lobHandle_ and lobName_)
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
0, //cursor bytes
NULL, //cursor id
requestTag_,
Lob_Buffer,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataLen_, lobData_,
3, // close
0); // open type not applicable
ExpLOBinterfaceCleanup(exLobGlobals_);
exLobGlobals_ = NULL;
}
ExExeUtilLobExtractTcb::~ExExeUtilLobExtractTcb()
{
freeResources();
}
short ExExeUtilLobExtractTcb::work()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
Int64 lobDataOutputLen = 0;
Int64 requestTag = -1;
LobsSubOper so = Lob_None;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ComDiagsArea & diags = currContext->diags();
ExLobGlobals * lobGlobs = getLobGlobals();
ex_queue_entry * centry = NULL;
while (1)
{
switch (step_)
{
case EMPTY_:
{
workAtp_->getTupp(lobTdb().workAtpIndex())
.setDataPointer(lobInputHandleBuf_);
if (getChild(0))
step_ = SEND_REQ_TO_CHILD_;
else
step_ = GET_NO_CHILD_HANDLE_;
}
break;
case GET_NO_CHILD_HANDLE_:
{
ex_expr::exp_return_type exprRetCode =
lobTdb().inputExpr_->eval(NULL,
workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
step_ = CANCEL_;
break;
}
step_ = GET_LOB_HANDLE_;
}
break;
case SEND_REQ_TO_CHILD_:
{
if (qchild_.down->isFull())
return WORK_OK;
ex_queue_entry * centry = qchild_.down->getTailEntry();
centry->downState.request = ex_queue::GET_ALL;
centry->downState.requestValue =
pentry_down->downState.requestValue;
centry->downState.parentIndex = qparent_.down->getHeadIndex();
// set the child's input atp
centry->passAtp(pentry_down->getAtp());
qchild_.down->insert();
numChildRows_ = 0;
step_ = GET_REPLY_FROM_CHILD_;
}
break;
case GET_REPLY_FROM_CHILD_:
{
// if nothing returned from child. Get outta here.
if (qchild_.up->isEmpty())
return WORK_OK;
// check if we've got room in the up queue
if (qparent_.up->isFull())
return WORK_OK; // parent queue is full. Just return
centry = qchild_.up->getHeadEntry();
ex_queue::up_status child_status = centry->upState.status;
switch(child_status)
{
case ex_queue::Q_NO_DATA:
{
qchild_.up->removeHead();
if (numChildRows_ == 0)
step_ = DONE_;
else if (numChildRows_ == 1)
step_ = GET_LOB_HANDLE_;
else
{
Lng32 cliError = 0;
Lng32 intParam1 = 0;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8444), NULL, &intParam1,
&cliError, NULL, NULL);
step_ = HANDLE_ERROR_;
}
}
break;
case ex_queue::Q_OK_MMORE:
{
if (numChildRows_ == 0) // first child row
{
ex_expr::exp_return_type exprRetCode =
lobTdb().inputExpr_->eval(centry->getAtp(),
workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
step_ = CANCEL_;
break;
}
}
qchild_.up->removeHead();
numChildRows_++;
// step_ = GET_LOB_HANDLE_;
}
break;
case ex_queue::Q_INVALID:
{
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
// invalid state, should not be reached.
ComDiagsArea * da = up_entry->getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INTERNAL_ERROR));
step_ = CANCEL_;
}
break;
case ex_queue::Q_SQLERROR:
{
step_ = ERROR_FROM_CHILD_;
}
break;
}
}
break;
case ERROR_FROM_CHILD_:
{
// check if we've got room in the up queue
if (qparent_.up->isFull())
return WORK_OK; // parent queue is full. Just return
ex_queue_entry *pentry_up = qparent_.up->getTailEntry();
ex_queue_entry * centry = qchild_.up->getHeadEntry();
qchild_.down->cancelRequestWithParentIndex(qparent_.down->getHeadIndex());
pentry_up->copyAtp(centry);
pentry_up->upState.status = ex_queue::Q_SQLERROR;
pentry_up->upState.parentIndex = pentry_down->downState.parentIndex;
pentry_up->upState.downIndex = qparent_.down->getHeadIndex();
qparent_.up->insert();
qchild_.up->removeHead();
step_ = CANCEL_;
}
break;
case CANCEL_:
{
// ignore all up rows from child. wait for Q_NO_DATA.
if (qchild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * centry = qchild_.up->getHeadEntry();
switch(centry->upState.status)
{
case ex_queue::Q_OK_MMORE:
case ex_queue::Q_SQLERROR:
case ex_queue::Q_INVALID:
{
qchild_.up->removeHead();
}
break;
case ex_queue::Q_NO_DATA:
{
qchild_.up->removeHead();
step_ = HANDLE_ERROR_;
}
break;
}
}
break;
case GET_LOB_HANDLE_:
{
if (lobTdb().handleInStringFormat())
{
if (ExpLOBoper::genLOBhandleFromHandleString
(lobTdb().getHandle(),
lobTdb().getHandleLen(),
lobHandle_,
lobHandleLen_))
{
ComDiagsArea * da = getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INVALID_LOB_HANDLE));
step_ = HANDLE_ERROR_;
break;
}
}
else
{
lobHandleLen_ = *(short*)&lobInputHandleBuf_[sizeof(short)]; //lobTdb().getHandleLen();
str_cpy_all(lobHandle_, &lobInputHandleBuf_[sizeof(short)], lobHandleLen_); //lobTdb().getHandle();
if (*(short*)lobInputHandleBuf_ != 0) //null value
{
step_ = DONE_;
break;
}
}
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
//Retrieve the lobLocation for this lobNum which will be used
//in the other steps_ which open and read lob data file.
short *lobNumList = new (getHeap()) short[1];
short *lobTypList = new (getHeap()) short[1];
char **lobLocList = new (getHeap()) char*[1];
char **lobColNameList = new (getHeap()) char*[1];
lobLocList[0] = new (getHeap()) char[1024];
lobColNameList[0] = new (getHeap()) char[256];
Lng32 numLobs = lobNum;
Lng32 cliRC = SQL_EXEC_LOBddlInterface
(
schName,
schNameLen,
uid,
numLobs,
LOB_CLI_SELECT_UNIQUE,
lobNumList,
lobTypList,
lobLocList,lobColNameList,lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),0,FALSE);
if (cliRC < 0)
{
getDiagsArea()->mergeAfter(diags);
step_ = HANDLE_ERROR_;
break;
}
strcpy(lobLoc_, lobLocList[0]);
NADELETEBASIC(lobColNameList[0],getHeap());
NADELETEBASIC(lobNumList,getHeap());
NADELETEBASIC(lobTypList,getHeap());
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::RETRIEVE_HDFSFILENAME_)
step_ = EXTRACT_HDFSFILENAME_;
else if (lobTdb().getToType() == ComTdbExeUtilLobExtract::RETRIEVE_OFFSET_)
step_ = RETRIEVE_OFFSET_;
else if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_BUFFER_)
step_ = EXTRACT_LOB_DATA_;
else if ((lobTdb().getToType() == ComTdbExeUtilLobExtract::RETRIEVE_LENGTH_) || (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_FILE_))
step_ = RETRIEVE_LOB_LENGTH_;
else
{
// invalid "toType"
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea * da = up_entry->getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INTERNAL_ERROR));
step_ = CANCEL_;
break;
}
break;
}
case EXTRACT_HDFSFILENAME_:
{
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024]={'\0'};
char hdfsFileName[MAX_LOB_FILE_NAME_LEN]={'\0'};
Int32 fileNameLen = 0;
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
//Retrieve the filename of this lob using the handle info and return to the caller
retcode = ExpLOBInterfaceGetFileName( lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
hdfsFileName,
fileNameLen);
if ((lobTdb().getBufAddr() != -1) && (lobTdb().getBufAddr() != 0))
str_cpy_all((char *)lobTdb().getBufAddr(), (char *)&lobDataLen_,sizeof(Int64));
str_sprintf(statusString_," LOB filename : %s", hdfsFileName);
step_ = RETURN_STATUS_;
break;
}
break;
case RETRIEVE_LOB_LENGTH_ :
{
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
//Retrieve the total length of this lob using the handle info and return to the caller
retcode = ExpLOBInterfaceGetLobLength( lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
lobDataLen_);
if (lobTdb().retrieveLength())
{
if ((lobTdb().getBufAddr() != -1) && (lobTdb().getBufAddr() != 0))
str_cpy_all((char *)lobTdb().getBufAddr(), (char *)&lobDataLen_,sizeof(Int64));
str_sprintf(statusString_," LOB Length : %ld", lobDataLen_);
step_ = RETURN_STATUS_;
break;
}
else
step_ = EXTRACT_LOB_DATA_;
break;
}
case RETRIEVE_OFFSET_ :
{
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
Int64 lobOffset = 0;
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
//Retrieve the total length of this lob using the handle info and return to the caller
retcode = ExpLOBInterfaceGetOffset( lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
lobOffset);
if ((lobTdb().getBufAddr() != -1) && (lobTdb().getBufAddr() != 0))
str_cpy_all((char *)lobTdb().getBufAddr(), (char *)&lobOffset,sizeof(Int64));
str_sprintf(statusString_," LOB Offset : %ld", lobOffset);
step_ = RETURN_STATUS_;
break;
}
case EXTRACT_LOB_DATA_ :
{
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
lobDataSpecifiedExtractLen_ = lobTdb().totalBufSize_;
if (lobDataSpecifiedExtractLen_ == 0)
{
// Passed in length is 0 indicates the caller is done with
// this lobhandle and wants to close this cursor
step_ = CLOSE_CURSOR_;
break;
}
// Read the lob contents into target file
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_FILE_)
{
so = Lob_File;
LobTgtFileFlags tgtFlags = Lob_Error_Or_Create;
if (lobTdb().errorIfNotExists() && !lobTdb().truncateExisting())
tgtFlags = Lob_Append_Or_Error;
if (lobTdb().truncateExisting() &&lobTdb().errorIfNotExists() )
tgtFlags = Lob_Truncate_Or_Error;
if (lobTdb().truncateExisting() && !lobTdb().errorIfNotExists())
tgtFlags = Lob_Truncate_Or_Create;
if(lobTdb().appendOrCreate())
tgtFlags = Lob_Append_Or_Create;
retcode = ExpLOBInterfaceSelect(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_,
lobHandle_,
requestTag,
so,
-1,
0,0,
0, lobDataLen_, lobDataOutputLen,
lobTdb().getFileName(),
lobDataSpecifiedExtractLen_,
(Int32)tgtFlags
);
if (retcode <0)
{
Lng32 intParam1 = -retcode;
Lng32 cliError;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelect",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
str_sprintf(statusString_, "Success. Targetfile :%s Length : %ld", lobTdb().getFileName(), lobDataOutputLen);
step_ = RETURN_STATUS_;
}
else if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_BUFFER_)
{
so = Lob_Buffer;
lobData_ = (char *)lobTdb().getBufAddr();
lobDataSpecifiedExtractLen_ = *((Int64 *)(lobTdb().dataExtractSizeIOAddr()));
step_ = OPEN_CURSOR_;
}
}
break;
case OPEN_CURSOR_:
{
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
0, // cursor bytes
NULL, //cursor id
requestTag_,
Lob_Buffer,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataOutputLen, lobData_,
1, // open
2); // must open
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
step_ = READ_CURSOR_;
}
break;
case READ_CURSOR_:
{
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_BUFFER_)
so = Lob_Buffer;
lobDataSpecifiedExtractLen_ = *((Int64 *)(lobTdb().dataExtractSizeIOAddr()));
if (lobDataSpecifiedExtractLen_ == 0)
{
// Passed in length is 0 indicates the caller is done with
// this lobhandle and wants to close this cursor
step_ = CLOSE_CURSOR_;
break;
}
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
0 , //cursor bytes,
NULL, //cursor id
requestTag_,
so,
0, // not check status
1, // waited op
0,
lobDataSpecifiedExtractLen_,
//lobDataLen_, lobData_,
lobDataOutputLen,
lobData_,
2, // read
0); // open type not applicable
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (lobDataOutputLen == 0)
{
step_ = CLOSE_CURSOR_;
break;
}
remainingBytes_ = lobDataOutputLen;
currPos_ = 0;
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_BUFFER_)
{
str_sprintf(statusString_," Success: LOB data length returned : %ld", lobDataOutputLen);
//lobTdb().setExtractSizeIOAddr((Int64)(&lobDataOutputLen));
memcpy((char *)lobTdb().dataExtractSizeIOAddr(), (char *)&lobDataOutputLen,sizeof(Int64));
step_ = RETURN_STATUS_;
}
else
{
// No other "toType" shoudl reach here - i.e TO_FILE_ or TO_STRING
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea * da = up_entry->getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INTERNAL_ERROR));
step_ = CANCEL_;
break;
}
}
break;
case CLOSE_CURSOR_:
{
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobHandleLen_, lobHandle_,
0, //cursor bytes
NULL, //cursor id
requestTag_,
so,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataLen_, lobData_,
3, // close
0); // open type not applicable
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
step_ = DONE_;
}
break;
case RETURN_STATUS_:
{
if (qparent_.up->isFull())
return WORK_OK;
//Return to upqueue whatever is in the lobStatusMsg_ data member
short rc;
moveRowToUpQueue(statusString_, 200, &rc);
if ((so == Lob_Buffer) && (remainingBytes_ >= 0))
{
step_ = READ_CURSOR_;
qparent_.down->removeHead();
return WORK_RESCHEDULE_AND_RETURN;
}
else
step_ = DONE_ ;
}
break;
case HANDLE_ERROR_:
{
retcode = handleError();
if (retcode == 1)
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
retcode = handleDone();
if (retcode == 1)
return WORK_OK;
step_ = EMPTY_;
return WORK_OK;
}
break;
} // switch
}
return 0;
}
ExExeUtilFileExtractTcb::ExExeUtilFileExtractTcb
(
const ComTdbExeUtilLobExtract & exe_util_tdb,
const ex_tcb * child_tcb,
ex_globals * glob)
: ExExeUtilLobExtractTcb(exe_util_tdb, child_tcb, glob)
{
}
///////////////////////////////////////////////////////////////////
// class ExExeUtilLobUpdateTdb
///////////////////////////////////////////////////////////////
ex_tcb * ExExeUtilLobUpdateTdb::build(ex_globals * glob)
{
ExExeUtilLobUpdateTcb * exe_util_lobupdate_tcb = NULL;
ex_tcb * childTcb = NULL;
if (child_)
{
// build the child first
childTcb = child_->build(glob);
}
if (getFromType() == ComTdbExeUtilLobUpdate::FROM_BUFFER_)
exe_util_lobupdate_tcb = new(glob->getSpace())
ExExeUtilLobUpdateTcb(*this, childTcb, glob);
else
{
ex_assert(TRUE,"Only buffer input supported");
}
exe_util_lobupdate_tcb->registerSubtasks();
return (exe_util_lobupdate_tcb);
}
ExExeUtilLobUpdateTcb::ExExeUtilLobUpdateTcb
(
const ComTdbExeUtilLobUpdate & exe_util_lobupdate_tdb,
const ex_tcb * child_tcb,
ex_globals * glob)
: ExExeUtilTcb(exe_util_lobupdate_tdb, child_tcb, glob),
step_(EMPTY_)
{
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
lobHandleLen_ = 2050;
lobHandle_[0] = '\0';
exLobGlobals_=NULL;
ExpLOBinterfaceInit(exLobGlobals_,currContext->exHeap(),currContext,TRUE,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort());
}
ExExeUtilLobUpdateTcb::~ExExeUtilLobUpdateTcb()
{
freeResources();
}
void ExExeUtilLobUpdateTcb::freeResources()
{
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ExpLOBinterfaceCleanup(exLobGlobals_);
exLobGlobals_ = NULL;
}
short ExExeUtilLobUpdateTcb::work()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ComDiagsArea & diags = currContext->diags();
LobsSubOper so = Lob_None;
if (lobTdb().getFromType() == ComTdbExeUtilLobUpdate::FROM_STRING_)
so = Lob_Memory;
else if (lobTdb().getFromType() == ComTdbExeUtilLobUpdate::FROM_EXTERNAL_)
so = Lob_External_File;
else if (lobTdb().getFromType() == ComTdbExeUtilLobUpdate::FROM_BUFFER_) //Only this is supported
so= Lob_Buffer;
Int64 lobLen = lobTdb().updateSize();
char * data = (char *)(lobTdb().getBufAddr());
ExLobGlobals * lobGlobs = getLobGlobals();
while (1)
{
switch (step_)
{
case EMPTY_:
{
workAtp_->getTupp(lobTdb().workAtpIndex())
.setDataPointer(lobInputHandleBuf_);
step_ = GET_HANDLE_;
break;
}
break;
case GET_HANDLE_:
{
ex_expr::exp_return_type exprRetCode =
lobTdb().inputExpr_->eval(NULL,
workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
step_ = CANCEL_;
break;
}
if (ExpLOBoper::genLOBhandleFromHandleString
(lobTdb().getHandle(),
lobTdb().getHandleLen(),
lobHandle_,
lobHandleLen_))
{
ComDiagsArea * da = getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INVALID_LOB_HANDLE));
step_ = HANDLE_ERROR_;
break;
}
if (lobTdb().getFromType() == ComTdbExeUtilLobUpdate::FROM_BUFFER_)
{
if (lobTdb().isTruncate())
step_ = EMPTY_LOB_DATA_;
else if (lobTdb().isReplace())
step_ = UPDATE_LOB_DATA_;
else if(lobTdb().isAppend())
step_ = APPEND_LOB_DATA_;
}
else
{
// invalid "fromType"
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
ComDiagsArea * da = up_entry->getDiagsArea();
ExRaiseSqlError(getMyHeap(),
&da,
(ExeErrorCode)(EXE_INTERNAL_ERROR));
step_ = CANCEL_;
break;
}
}
break;
case UPDATE_LOB_DATA_:
{
Int32 retcode = 0;
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
Int64 dummy = 0;
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
lobDataLen_ = lobTdb().totalBufSize_;
short *lobNumList = new (getHeap()) short[1];
short *lobTypList = new (getHeap()) short[1];
char **lobLocList = new (getHeap()) char*[1];
char **lobColNameList = new (getHeap()) char*[1];
lobLocList[0] = new (getHeap()) char[1024];
lobColNameList[0] = new (getHeap()) char[256];
Lng32 numLobs = lobNum;
Lng32 cliRC = SQL_EXEC_LOBddlInterface
(
schName,
schNameLen,
uid,
numLobs,
LOB_CLI_SELECT_UNIQUE,
lobNumList,
lobTypList,
lobLocList,lobColNameList,lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),0,FALSE);
if (cliRC < 0)
{
getDiagsArea()->mergeAfter(diags);
step_ = HANDLE_ERROR_;
break;
}
strcpy(lobLoc_, lobLocList[0]);
NADELETEBASIC(lobLocList[0],getHeap());
NADELETEBASIC(lobColNameList[0],getHeap());
NADELETEBASIC(lobNumList,getHeap());
NADELETEBASIC(lobTypList,getHeap());
char outLobHandle[LOB_HANDLE_LEN];
Int32 outHandleLen;
Int64 requestTag = 0;
retcode = ExpLOBInterfaceUpdate(lobGlobs,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobName_,
lobLoc_,
lobHandleLen_,
lobHandle_,
&outHandleLen, outLobHandle,
requestTag,
-1,
0,
1,
so,
inDescSyskey,
lobLen,
data,
lobName_, schNameLen, schName,
dummy, dummy,
lobTdb().getLobMaxSize(),
lobTdb().getLobMaxChunkSize(),
lobTdb().getLobGCLimit());
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (so == Lob_Buffer)
{
str_sprintf(statusString_," Updated/Replaced %ld bytes of LOB data ", lobLen);
step_ = RETURN_STATUS_;
}
break;
}
break;
case APPEND_LOB_DATA_:
{
Int32 retcode = 0;
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
Int64 dummy = 0;
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
lobDataLen_ = lobTdb().totalBufSize_;
short *lobNumList = new (getHeap()) short[1];
short *lobTypList = new (getHeap()) short[1];
char **lobLocList = new (getHeap()) char*[1];
char **lobColNameList = new (getHeap()) char*[1];
lobLocList[0] = new (getHeap()) char[1024];
lobColNameList[0] = new (getHeap()) char[256];
Lng32 numLobs = lobNum;
Lng32 cliRC = SQL_EXEC_LOBddlInterface
(
schName,
schNameLen,
uid,
numLobs,
LOB_CLI_SELECT_UNIQUE,
lobNumList,
lobTypList,
lobLocList,lobColNameList,lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),0,FALSE);
if (cliRC < 0)
{
getDiagsArea()->mergeAfter(diags);
step_ = HANDLE_ERROR_;
break;
}
strcpy(lobLoc_, lobLocList[0]);
NADELETEBASIC(lobLocList[0],getHeap());
NADELETEBASIC(lobColNameList[0],getHeap());
NADELETEBASIC(lobNumList,getHeap());
NADELETEBASIC(lobTypList,getHeap());
char outLobHandle[LOB_HANDLE_LEN];
Int32 outHandleLen;
Int64 requestTag = 0;
retcode = ExpLOBInterfaceUpdateAppend(lobGlobs,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobName_,
lobLoc_,
lobHandleLen_,
lobHandle_,
&outHandleLen, outLobHandle,
requestTag,
-1,
0,
1,
so,
inDescSyskey,
lobLen,
data,
lobName_, schNameLen, schName,
dummy, dummy,
lobTdb().getLobMaxSize(),
lobTdb().getLobMaxChunkSize(),
lobTdb().getLobGCLimit());
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (so == Lob_Buffer)
{
str_sprintf(statusString_," Updated/Appended %ld bytes of LOB data ", lobLen);
step_ = RETURN_STATUS_;
}
break;
}
break;
case EMPTY_LOB_DATA_:
{
Int32 retcode = 0;
Int16 flags;
Lng32 lobNum;
Int64 uid, inDescSyskey, descPartnKey;
short schNameLen;
char schName[1024];
Int64 dummy = 0;
ExpLOBoper::extractFromLOBhandle(&flags, &lobType_, &lobNum, &uid,
&inDescSyskey, &descPartnKey,
&schNameLen, (char *)schName,
(char *)lobHandle_, (Lng32)lobHandleLen_);
lobName_ = ExpLOBoper::ExpGetLOBname(uid, lobNum, lobNameBuf_, 1000);
lobDataLen_ = lobTdb().totalBufSize_;
short *lobNumList = new (getHeap()) short[1];
short *lobTypList = new (getHeap()) short[1];
char **lobLocList = new (getHeap()) char*[1];
char **lobColNameList = new (getHeap()) char*[1];
lobLocList[0] = new (getHeap()) char[1024];
lobColNameList[0] = new (getHeap()) char[256];
Lng32 numLobs = lobNum;
Lng32 cliRC = SQL_EXEC_LOBddlInterface
(
schName,
schNameLen,
uid,
numLobs,
LOB_CLI_SELECT_UNIQUE,
lobNumList,
lobTypList,
lobLocList,lobColNameList,lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),0,FALSE);
if (cliRC < 0)
{
getDiagsArea()->mergeAfter(diags);
step_ = HANDLE_ERROR_;
break;
}
strcpy(lobLoc_, lobLocList[0]);
NADELETEBASIC(lobLocList[0],getHeap());
NADELETEBASIC(lobColNameList[0],getHeap());
NADELETEBASIC(lobNumList,getHeap());
NADELETEBASIC(lobTypList,getHeap());
char outLobHandle[LOB_HANDLE_LEN];
Int32 outHandleLen;
Int64 requestTag = 0;
retcode = ExpLOBInterfaceUpdate(lobGlobs,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
lobName_,
lobLoc_,
lobHandleLen_,
lobHandle_,
&outHandleLen, outLobHandle,
requestTag,
-1,
0,
1,
so,
inDescSyskey,
0,
NULL,
lobName_, schNameLen, schName,
dummy, dummy,
lobTdb().getLobMaxSize(),
lobTdb().getLobMaxChunkSize(),
lobTdb().getLobGCLimit());
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceUpdate",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (so == Lob_Buffer)
{
str_sprintf(statusString_," Updated with empty_blob/clob ");
step_ = RETURN_STATUS_;
}
break;
}
break;
case CANCEL_:
{
break;
}
break;
case RETURN_STATUS_:
{
if (qparent_.up->isFull())
return WORK_OK;
//Return to upqueue whatever is in the lobStatusMsg_ data member
short rc;
moveRowToUpQueue(statusString_, 200, &rc);
step_ = DONE_ ;
}
break;
case HANDLE_ERROR_:
{
retcode = handleError();
if (retcode == 1)
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
retcode = handleDone();
if (retcode == 1)
return WORK_OK;
step_ = EMPTY_;
return WORK_OK;
}
break;
}
}
return 0;
}
NABoolean ExExeUtilFileExtractTcb::needStatsEntry()
{
// stats are collected for ALL and OPERATOR options.
if ((getGlobals()->getStatsArea()->getCollectStatsType() ==
ComTdb::ALL_STATS) ||
(getGlobals()->getStatsArea()->getCollectStatsType() ==
ComTdb::OPERATOR_STATS))
return TRUE;
else
return FALSE;
}
ExOperStats * ExExeUtilFileExtractTcb::doAllocateStatsEntry(
CollHeap *heap,
ComTdb *tdb)
{
ExEspStmtGlobals *espGlobals = getGlobals()->castToExExeStmtGlobals()->castToExEspStmtGlobals();
StmtStats *ss;
if (espGlobals != NULL)
ss = espGlobals->getStmtStats();
else
ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats();
ExHdfsScanStats *hdfsScanStats = new(heap) ExHdfsScanStats(heap,
this,
tdb);
if (ss != NULL)
hdfsScanStats->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
return hdfsScanStats;
}
short ExExeUtilFileExtractTcb::work()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ComDiagsArea & diags = currContext->diags();
ExLobGlobals * lobGlobs = getLobGlobals();
while (1)
{
switch (step_)
{
case EMPTY_:
{
lobName_ = lobNameBuf_;
strcpy(lobName_, lobTdb().getFileName());
strcpy(lobLoc_, lobTdb().getStringParam2());
lobType_ = lobTdb().lobStorageType_; //(Lng32)Lob_External_HDFS_File;
lobDataSpecifiedExtractLen_ = lobTdb().totalBufSize_;
// allocate 2 buffers for double buffering.
lobData_ = new(getHeap()) char[(UInt32)lobDataSpecifiedExtractLen_];
lobData2_ = new(getHeap()) char[(UInt32)lobDataSpecifiedExtractLen_];
eodReturned_ = FALSE;
step_ = OPEN_CURSOR_;
}
break;
case OPEN_CURSOR_:
{
eodReturned_ = FALSE;
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
0, NULL, // handleLen, handle
0, NULL, //cursor bytes, cursor id
requestTag_,
Lob_File,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataLen_, lobData_,
1, // open
2); // must open
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/open",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
step_ = READ_CURSOR_;
}
break;
case READ_CURSOR_:
{
if (eodReturned_)
{
// eod was previously returned. close the cursor.
step_ = CLOSE_CURSOR_;
break;
}
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
0, NULL,
0, NULL ,//cursor bytes, cursor id
requestTag_,
Lob_File,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataLen_, lobData_,
2, // read
0); // open type not applicable
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/read",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (lobDataLen_ == 0)
{
// EOD with no data: close cursor
eodReturned_ = TRUE;
step_ = CLOSE_CURSOR_;
break;
}
if (lobDataLen_ < lobDataSpecifiedExtractLen_)
{
// EOD with data: return data and then close cursor
eodReturned_ = TRUE;
}
remainingBytes_ = (Lng32)lobDataLen_;
currPos_ = 0;
step_ = RETURN_STRING_;
}
break;
case CLOSE_CURSOR_:
{
retcode = ExpLOBInterfaceSelectCursor
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
0, NULL,
0, NULL, //cursor bytes, cursor id
requestTag_,
Lob_File,
0, // not check status
1, // waited op
0, lobDataSpecifiedExtractLen_,
lobDataLen_, lobData_,
3, // close
0); // open type not applicable
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceSelectCursor/close",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
step_ = COLLECT_STATS_;
}
break;
case HANDLE_ERROR_:
{
retcode = handleError();
if (retcode == 1)
return WORK_OK;
step_ = DONE_;
}
break;
case COLLECT_STATS_:
{
if (! getStatsEntry())
{
step_ = DONE_;
break;
}
ExHdfsScanStats * stats =
getStatsEntry()->castToExHdfsScanStats();
retcode = ExpLOBinterfaceStats
(lobGlobs,
stats->lobStats(),
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort());
step_ = DONE_;
}
break;
case DONE_:
{
retcode = handleDone();
if (retcode == 1)
return WORK_OK;
step_ = EMPTY_;
return WORK_OK;
}
break;
} // switch
}
return 0;
}
ExExeUtilFileLoadTcb::ExExeUtilFileLoadTcb
(
const ComTdbExeUtilLobExtract & exe_util_tdb,
const ex_tcb * child_tcb,
ex_globals * glob)
: ExExeUtilLobExtractTcb(exe_util_tdb, child_tcb, glob)
{
}
short ExExeUtilFileLoadTcb::work()
{
Lng32 cliRC = 0;
Lng32 retcode = 0;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ExExeUtilPrivateState & pstate =
*((ExExeUtilPrivateState*) pentry_down->pstate);
ContextCli *currContext =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->
getStatement()->getContext();
ComDiagsArea & diags = currContext->diags();
ExLobGlobals * lobGlobs = getLobGlobals();
while (1)
{
switch (step_)
{
case EMPTY_:
{
lobName_ = lobNameBuf_;
strcpy(lobName_, lobTdb().getStringParam2());
strcpy(lobLoc_, lobTdb().getStringParam3());
lobType_ = lobTdb().lobStorageType_; //(Lng32)Lob_HDFS_File;
lobDataSpecifiedExtractLen_ = lobTdb().totalBufSize_;
lobData_ = new(getHeap()) char[(UInt32)lobDataSpecifiedExtractLen_];
srcFileRemainingBytes_ = 0;
step_ = CREATE_TARGET_FILE_;
}
break;
case CREATE_TARGET_FILE_:
{
if (lobTdb().withCreate())
{
retcode = ExpLOBinterfaceCreate
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort());
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceCreate",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
}
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_EXTERNAL_FROM_STRING_)
{
strcpy(lobData_, lobTdb().getStringParam1());
lobDataLen_ = strlen(lobData_);
step_ = INSERT_FROM_STRING_;
}
else
step_ = INSERT_FROM_SOURCE_FILE_;
}
break;
case INSERT_FROM_SOURCE_FILE_:
{
char fname[400];
str_sprintf(fname, "%s", lobTdb().getStringParam1());
indata_.open(fname, fstream::in | fstream::binary);
if (! indata_)
{
Lng32 cliError = 0;
Lng32 intParam1 = -1;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"SourceFile open");
step_ = HANDLE_ERROR_;
break;
}
indata_.seekg (0, indata_.end);
srcFileRemainingBytes_ = indata_.tellg();
indata_.seekg (0, indata_.beg);
step_ = READ_STRING_FROM_SOURCE_FILE_;
}
break;
case READ_STRING_FROM_SOURCE_FILE_:
{
if (! indata_.good())
{
indata_.close();
step_ = CLOSE_TARGET_FILE_;
break;
}
if (srcFileRemainingBytes_ == 0)
{
indata_.close();
step_ = CLOSE_TARGET_FILE_;
break;
}
Int64 length = MINOF(srcFileRemainingBytes_, lobDataSpecifiedExtractLen_);
indata_.read (lobData_, (std::streamsize)length);
if (indata_.fail())
{
indata_.close();
Lng32 cliError = 0;
Lng32 intParam1 = -1;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"SourceFile read");
step_ = HANDLE_ERROR_;
break;
}
lobDataLen_ = length;
srcFileRemainingBytes_ -= length;
step_ = INSERT_FROM_STRING_;
}
break;
case INSERT_FROM_STRING_:
{
Int64 requestTag;
Int64 dummy;
retcode = ExpLOBInterfaceInsert
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort(),
0, NULL, NULL, NULL, 0, NULL,
requestTag,
0, // no xn id
dummy,Lob_InsertDataSimple,
NULL, Lob_Memory,
1, // waited
lobData_, lobDataLen_
);
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceInsert",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
if (lobTdb().getToType() == ComTdbExeUtilLobExtract::TO_EXTERNAL_FROM_FILE_)
step_ = READ_STRING_FROM_SOURCE_FILE_;
else
step_ = CLOSE_TARGET_FILE_;
}
break;
case CLOSE_TARGET_FILE_:
{
retcode = ExpLOBinterfaceCloseFile
(lobGlobs,
lobName_,
lobLoc_,
lobType_,
lobTdb().getLobHdfsServer(),
lobTdb().getLobHdfsPort());
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = getDiagsArea();
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8442), NULL, &intParam1,
&cliError, NULL, (char*)"ExpLOBInterfaceCloseFile",
getLobErrStr(intParam1));
step_ = HANDLE_ERROR_;
break;
}
step_ = DONE_;
}
break;
case HANDLE_ERROR_:
{
retcode = handleError();
if (retcode == 1)
return WORK_OK;
step_ = DONE_;
}
break;
case DONE_:
{
retcode = handleDone();
if (retcode == 1)
return WORK_OK;
step_ = EMPTY_;
return WORK_OK;
}
break;
} // switch
}
return 0;
}