blob: 13146a6eb0aad4a636512bca003a90b32f9ecc91 [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
#include "Platform.h"
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExHbaseAccess.h"
#include "ex_exe_stmt_globals.h"
#include "ExpLOBinterface.h"
#include "SQLTypeDefs.h"
#include "ExpHbaseInterface.h"
#include "QRLogger.h"
#include "cli_stdh.h"
#include "exp_function.h"
#include "jni.h"
#include <random>
#include "HdfsClient_JNI.h"
// forward declare
Int64 generateUniqueValueFast ();
Int64 getTransactionIDFromContext()
{
ExTransaction *ta = GetCliGlobals()->currContext()->getTransaction();
Int64 currentXnId = (Int64) 0;
Int64 transid = (Int64) 0;
TmfPhandle_Struct currentTxHandle;
short retcode = ta->getCurrentXnId((Int64 *)&currentXnId, (Int64 *)&transid, (short *)&currentTxHandle);
return transid;
}
ex_tcb * ExHbaseAccessTdb::build(ex_globals * glob)
{
ExExeStmtGlobals * exe_glob = glob->castToExExeStmtGlobals();
ex_assert(exe_glob,"This operator cannot be in DP2");
ExHbaseAccessTcb *tcb = NULL;
if ((1) && //(sqHbaseTable()) &&
((accessType_ == UPDATE_) ||
(accessType_ == MERGE_) ||
(accessType_ == DELETE_)))
{
if (rowsetOper())
tcb = new(exe_glob->getSpace())
ExHbaseAccessSQRowsetTcb(
*this,
exe_glob);
else
tcb = new(exe_glob->getSpace())
ExHbaseAccessUMDTcb(
*this,
exe_glob);
}
else if ((sqHbaseTable()) &&
(accessType_ == SELECT_) &&
(rowsetOper()))
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessSQRowsetTcb(
*this,
exe_glob);
}
else if (accessType_ == DELETE_)
{
/*
tcb = new(exe_glob->getSpace())
ExHbaseAccessUMDHbaseTcb(
*this,
exe_glob);
*/
}
else if (accessType_ == SELECT_)
{
if (keyMDAMGen())
{
// must be SQ Seabase table and no listOfScan/Get keys
if ((sqHbaseTable()) &&
(! listOfGetRows()) &&
(! listOfScanRows()))
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessMdamSelectTcb(
*this,
exe_glob);
}
}
else
tcb = new(exe_glob->getSpace())
ExHbaseAccessSelectTcb(
*this,
exe_glob);
}
else if (accessType_ == COPROC_)
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessSelectTcb(
*this,
exe_glob);
}
else if ((accessType_ == INSERT_) ||
(accessType_ == UPSERT_) ||
(accessType_ == UPSERT_LOAD_))
{
if (sqHbaseTable())
{
if ((vsbbInsert()) &&
(NOT hbaseSqlIUD()))
if (this->getIsTrafodionLoadPrep())
tcb = new(exe_glob->getSpace())
ExHbaseAccessBulkLoadPrepSQTcb(
*this,
exe_glob);
else
tcb = new(exe_glob->getSpace())
ExHbaseAccessUpsertVsbbSQTcb(
*this,
exe_glob);
else
tcb = new(exe_glob->getSpace())
ExHbaseAccessInsertSQTcb(
*this,
exe_glob);
}
else
{
if (rowwiseFormat())
tcb = new(exe_glob->getSpace())
ExHbaseAccessInsertRowwiseTcb(
*this,
exe_glob);
else
tcb = new(exe_glob->getSpace())
ExHbaseAccessInsertTcb(
*this,
exe_glob);
}
}
else if ((accessType_ == CREATE_) ||
(accessType_ == DROP_))
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessDDLTcb(
*this,
exe_glob);
}
else if (accessType_ == BULK_LOAD_TASK_)
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessBulkLoadTaskTcb(
*this,
exe_glob);
}
else if ((accessType_ == INIT_MD_) ||
(accessType_ == DROP_MD_))
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessInitMDTcb(
*this,
exe_glob);
}
else if (accessType_ == GET_TABLES_)
{
tcb = new(exe_glob->getSpace())
ExHbaseAccessGetTablesTcb(
*this,
exe_glob);
}
ex_assert(tcb, "Error building ExHbaseAccessTcb.");
return (tcb);
}
ex_tcb * ExHbaseCoProcAggrTdb::build(ex_globals * glob)
{
ExExeStmtGlobals * exe_glob = glob->castToExExeStmtGlobals();
ex_assert(exe_glob,"This operator cannot be in DP2");
ExHbaseCoProcAggrTcb *tcb = NULL;
tcb = new(exe_glob->getSpace())
ExHbaseCoProcAggrTcb(
*this,
exe_glob);
ex_assert(tcb, "Error building ExHbaseAggrTcb.");
return (tcb);
}
////////////////////////////////////////////////////////////////
// Constructor and initialization.
////////////////////////////////////////////////////////////////
ExHbaseAccessTcb::ExHbaseAccessTcb(
const ComTdbHbaseAccess &hbaseAccessTdb,
ex_globals * glob ) :
ex_tcb( hbaseAccessTdb, 1, glob)
, workAtp_(NULL)
, pool_(NULL)
, matches_(0)
, rowIds_(glob ? glob->getDefaultHeap() : NULL)
, columns_(glob ? glob->getDefaultHeap() : NULL)
, deletedColumns_(glob ? glob->getDefaultHeap() : NULL)
, hnl_(glob ? glob->getDefaultHeap() : NULL)
, hbaseFilterColumns_(glob ? glob->getDefaultHeap() : NULL)
, hbaseFilterOps_(glob ? glob->getDefaultHeap() : NULL)
, hbaseFilterValues_(glob ? glob->getDefaultHeap() : NULL)
, colValVec_(NULL)
, colValVecSize_(0)
, colValEntry_(0)
, loggingErrorDiags_(NULL)
, logFileHdfsClient_(NULL)
, loggingFileCreated_(FALSE)
, loggingFileName_(NULL)
{
Space * space = (glob ? glob->getSpace() : NULL);
CollHeap * heap = (glob ? glob->getDefaultHeap() : NULL);
pool_ = new(space)
sql_buffer_pool(hbaseAccessTdb.numBuffers_,
hbaseAccessTdb.bufferSize_,
space,
SqlBufferBase::NORMAL_);
pool_->setStaticMode(TRUE);
// Allocate the queue to communicate with parent
allocateParentQueues(qparent_);
if (hbaseAccessTdb.workCriDesc_)
{
workAtp_ = allocateAtp(hbaseAccessTdb.workCriDesc_, space);
if (hbaseAccessTdb.asciiTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.asciiTuppIndex_), 0);
if (hbaseAccessTdb.convertTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.convertTuppIndex_), 0);
if (hbaseAccessTdb.updateTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.updateTuppIndex_), 0);
if (hbaseAccessTdb.mergeInsertTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.mergeInsertTuppIndex_), 0);
if (hbaseAccessTdb.mergeInsertRowIdTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.mergeInsertRowIdTuppIndex_), 0);
if (hbaseAccessTdb.mergeIUDIndicatorTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.mergeIUDIndicatorTuppIndex_), 0);
if (hbaseAccessTdb.rowIdTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.rowIdTuppIndex_), 0);
if (hbaseAccessTdb.rowIdAsciiTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.rowIdAsciiTuppIndex_), 0);
if (hbaseAccessTdb.keyTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.keyTuppIndex_), 0);
if (hbaseAccessTdb.keyColValTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.keyColValTuppIndex_), 0);
if (hbaseAccessTdb.hbaseFilterValTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.hbaseFilterValTuppIndex_), 0);
if (hbaseAccessTdb.hbaseTimestampTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.hbaseTimestampTuppIndex_), 0);
if (hbaseAccessTdb.hbaseVersionTuppIndex_ > 0)
pool_->get_free_tuple(workAtp_->getTupp(hbaseAccessTdb.hbaseVersionTuppIndex_), 0);
}
keySubsetExeExpr_ = NULL;
keyMdamExeExpr_ = NULL;
if (hbaseAccessTdb.keySubsetGen())
{
// this constructor will also do the fixups of underlying expressions
keySubsetExeExpr_ = new(glob->getSpace())
keySingleSubsetEx(*hbaseAccessTdb.keySubsetGen(),
1, // version 1
pool_,
getGlobals(),
getExpressionMode(), this);
}
else if (hbaseAccessTdb.keyMDAMGen())
{
// this constructor will also do the fixups of underlying expressions
keyMdamExeExpr_ = new(glob->getSpace())
keyMdamEx(*hbaseAccessTdb.keyMDAMGen(),
1, // version 1
pool_,
getGlobals(),
this);
}
// fixup expressions
if (convertExpr())
convertExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (updateExpr())
updateExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (mergeInsertExpr())
mergeInsertExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (mergeInsertRowIdExpr())
mergeInsertRowIdExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (mergeUpdScanExpr())
mergeUpdScanExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (returnFetchExpr())
returnFetchExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (returnUpdateExpr())
returnUpdateExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (returnMergeInsertExpr())
returnMergeInsertExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (scanExpr())
scanExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (rowIdExpr())
rowIdExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (encodedKeyExpr())
encodedKeyExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (keyColValExpr())
keyColValExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (insDelPreCondExpr())
insDelPreCondExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (insConstraintExpr())
insConstraintExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (updConstraintExpr())
updConstraintExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
if (hbaseFilterValExpr())
hbaseFilterValExpr()->fixup(0, getExpressionMode(), this, space, heap, FALSE, glob);
// Register subtasks with the scheduler
registerSubtasks();
registerResizeSubtasks();
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)hbaseAccessTdb.server_,
(char*)hbaseAccessTdb.zkPort_);
asciiRow_ = NULL;
asciiRowMissingCols_ = NULL;
latestTimestampForCols_ = NULL;
latestVersionNumForCols_ = NULL;
convertRow_ = NULL;
updateRow_ = NULL;
mergeInsertRow_ = NULL;
rowIdRow_ = NULL;
rowwiseRow_ = NULL;
rowwiseRowLen_ = 0;
beginRowIdRow_ = NULL;
endRowIdRow_ = NULL;
beginKeyRow_ = NULL;
endKeyRow_ = NULL;
encodedKeyRow_ = NULL;
keyColValRow_ = NULL;
hbaseFilterValRow_ = NULL;
if (hbaseAccessTdb.asciiRowLen_ > 0)
{
asciiRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.asciiRowLen_];
asciiRowMissingCols_ =
new(glob->getDefaultHeap())
char[hbaseAccessTdb.workCriDesc_->getTupleDescriptor(hbaseAccessTdb.asciiTuppIndex_)->numAttrs()];
latestTimestampForCols_ = new(glob->getDefaultHeap())
long[hbaseAccessTdb.workCriDesc_->getTupleDescriptor(hbaseAccessTdb.asciiTuppIndex_)->numAttrs()] ;
latestVersionNumForCols_ = new(glob->getDefaultHeap())
long[hbaseAccessTdb.workCriDesc_->getTupleDescriptor(hbaseAccessTdb.asciiTuppIndex_)->numAttrs()] ;
}
convertRowLen_ = hbaseAccessTdb.convertRowLen();
if (hbaseAccessTdb.convertRowLen() > 0)
convertRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.convertRowLen()];
if (hbaseAccessTdb.updateRowLen_ > 0)
updateRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.updateRowLen_];
if (hbaseAccessTdb.mergeInsertRowLen_ > 0)
mergeInsertRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.mergeInsertRowLen_];
if (hbaseAccessTdb.rowIdLen_ > 0)
{
// +2 for adding a char to exclude the begin/end row, and for null terminator
rowIdRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.rowIdLen_ + 2];
beginRowIdRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.rowIdLen_ + 2];
endRowIdRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.rowIdLen_ + 2];
}
if (hbaseAccessTdb.convertRowLen() > 0)
rowwiseRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.convertRowLen()];
if (hbaseAccessTdb.rowIdAsciiRowLen_ > 0)
rowIdAsciiRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.rowIdAsciiRowLen_];
if (hbaseAccessTdb.keyLen_ > 0)
{
beginKeyRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.keyLen_];
endKeyRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.keyLen_];
encodedKeyRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.keyLen_];
}
if (hbaseAccessTdb.keyColValLen_ > 0)
{
keyColValRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.keyColValLen_];
}
if (hbaseAccessTdb.hbaseFilterValRowLen_ > 0)
{
hbaseFilterValRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.hbaseFilterValRowLen_];
}
prevRowIdMaxLen_ = 0;
prevRowId_.val = NULL;
prevRowId_.len = 0;
isEOD_ = FALSE;
rowIDAllocatedLen_ = 0;
rowIDAllocatedVal_ = NULL;
rowID_.val = NULL;
rowID_.len = 0;
directBufferRowNum_ = 0;
directRowIDBuffer_ = NULL;
directRowIDBufferLen_ = 0;
dbRowID_.val = NULL;
dbRowID_.len = 0;
directRowBuffer_ = NULL;
directRowBufferLen_ = 0;
directBufferMaxRows_ = 0;
row_.val = NULL;
row_.len = 0;
rows_.val = NULL;
rows_.len = 0;
colVal_.val = 0;
colVal_.len = 0;
asyncCompleteRetryCount_ = 0;
asyncOperation_ = FALSE;
asyncOperationTimeout_ = 2;
resultArray_ = NULL;
}
ExHbaseAccessTcb::~ExHbaseAccessTcb()
{
freeResources();
}
void ExHbaseAccessTcb::freeResources()
{
if (workAtp_)
{
workAtp_->release();
deallocateAtp(workAtp_, getSpace());
workAtp_ = NULL;
}
if (pool_)
{
delete pool_;
pool_ = NULL;
}
if (qparent_.up)
{
delete qparent_.up;
qparent_.up = NULL;
}
if (qparent_.down)
{
delete qparent_.down;
qparent_.down = NULL;
}
delete ehi_;
if (rowIDAllocatedVal_)
NADELETEBASIC(rowIDAllocatedVal_, getHeap());
if (directRowIDBuffer_)
NADELETEBASIC(directRowIDBuffer_, getHeap());
if (directRowBuffer_)
NADELETEBASIC(directRowBuffer_, getHeap());
if (colVal_.val != NULL)
NADELETEBASIC(colVal_.val, getHeap());
if (logFileHdfsClient_ != NULL)
NADELETE(logFileHdfsClient_, HdfsClient, getHeap());
if (loggingFileName_ != NULL)
NADELETEBASIC(loggingFileName_, getHeap());
}
NABoolean ExHbaseAccessTcb::needStatsEntry()
{
return TRUE;
// stats are collected for ALL and OPERATOR options.
if ((getGlobals()->getStatsArea()->getCollectStatsType() ==
ComTdb::ALL_STATS) ||
(getGlobals()->getStatsArea()->getCollectStatsType() ==
ComTdb::OPERATOR_STATS))
return TRUE;
else if ( getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::PERTABLE_STATS)
return TRUE;
else
return FALSE;
}
ExOperStats * ExHbaseAccessTcb::doAllocateStatsEntry(
CollHeap *heap,
ComTdb *tdb)
{
ExEspStmtGlobals *espGlobals = getGlobals()->castToExExeStmtGlobals()->castToExEspStmtGlobals();
StmtStats *ss;
if (espGlobals != NULL)
ss = espGlobals->getStmtStats();
else
ss = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals()->getStatement()->getStmtStats();
ExHbaseAccessStats *hbaseAccessStats = new(heap) ExHbaseAccessStats(heap,
this,
tdb);
if (ss != NULL)
hbaseAccessStats->setQueryId(ss->getQueryId(), ss->getQueryIdLen());
return hbaseAccessStats;
}
void ExHbaseAccessTcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
sched->registerInsertSubtask(sWork, this, qparent_.down,"PD");
sched->registerUnblockSubtask(sWork, this, qparent_.up, "PU");
sched->registerCancelSubtask(sWork, this, qparent_.down,"CN");
}
ex_tcb_private_state *ExHbaseAccessTcb::allocatePstates(
Lng32 &numElems, // inout, desired/actual elements
Lng32 &pstateLength) // out, length of one element
{
PstateAllocator<ex_tcb_private_state> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
ExWorkProcRetcode ExHbaseAccessTcb::work()
{
ex_assert(0, "Should not reach ExHbaseAccessTcb::work()");
return WORK_OK;
}
short ExHbaseAccessTcb::allocateUpEntryTupps
(Lng32 tupp1index, Lng32 tupp1length, Lng32 tupp2index, Lng32 tupp2length,
NABoolean isVarchar,
short * rc)
{
tupp p1;
tupp p2;
if (tupp1index > 0)
{
if (pool_->get_free_tuple(p1, (Lng32)
((isVarchar ? SQL_VARCHAR_HDR_SIZE : 0)
+ tupp1length)))
{
if (rc)
*rc = WORK_POOL_BLOCKED;
return -1;
}
}
if (tupp2index > 0)
{
if (pool_->get_free_tuple(p2, (Lng32)
((isVarchar ? SQL_VARCHAR_HDR_SIZE : 0)
+ tupp2length)))
{
if (rc)
*rc = WORK_POOL_BLOCKED;
return -1;
}
}
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->copyAtp(pentry_down);
if (tupp1index > 0)
{
up_entry->getAtp()->getTupp(tupp1index) = p1;
}
if (tupp2index > 0)
{
up_entry->getAtp()->getTupp(tupp2index) = p2;
}
return 0;
}
short ExHbaseAccessTcb::moveRowToUpQueue(short * rc)
{
if (qparent_.up->isFull())
{
if (rc)
*rc = WORK_OK;
return -1;
}
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(++matches_);
up_entry->upState.status = ex_queue::Q_OK_MMORE;
if (getHbaseAccessStats())
getHbaseAccessStats()->incActualRowsReturned();
// insert into parent
qparent_.up->insert();
return 0;
}
short ExHbaseAccessTcb::moveRowToUpQueue(const char * row, Lng32 len,
short * rc, NABoolean isVarchar)
{
if (qparent_.up->isFull())
{
if (rc)
*rc = WORK_OK;
return -1;
}
Lng32 length;
if (len <= 0)
length = strlen(row);
else
length = len;
tupp p;
if (pool_->get_free_tuple(p, (Lng32)
((isVarchar ? SQL_VARCHAR_HDR_SIZE : 0)
+ length)))
{
if (rc)
*rc = WORK_POOL_BLOCKED;
return -1;
}
char * dp = p.getDataPointer();
if (isVarchar)
{
*(short*)dp = (short)length;
str_cpy_all(&dp[SQL_VARCHAR_HDR_SIZE], row, length);
}
else
{
str_cpy_all(dp, row, length);
}
ex_queue_entry * pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry * up_entry = qparent_.up->getTailEntry();
up_entry->copyAtp(pentry_down);
up_entry->getAtp()->getTupp((Lng32)hbaseAccessTdb().returnedTuppIndex_) = p;
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.setMatchNo(++matches_);
up_entry->upState.status = ex_queue::Q_OK_MMORE;
if (getHbaseAccessStats())
getHbaseAccessStats()->incActualRowsReturned();
// insert into parent
qparent_.up->insert();
return 0;
}
short ExHbaseAccessTcb::setupError(NAHeap *heap, ex_queue_pair &qparent, Lng32 retcode, const char * str, const char * str2)
{
ContextCli *currContext = GetCliGlobals()->currContext();
// Make sure retcode is positive.
if (retcode < 0)
retcode = -retcode;
if ((ABS(retcode) >= HBASE_MIN_ERROR_NUM) &&
(ABS(retcode) <= HBASE_MAX_ERROR_NUM))
{
ex_queue_entry *pentry_down = qparent.down->getHeadEntry();
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(heap, &diagsArea,
(ExeErrorCode)(8448), NULL, &intParam1,
&cliError, NULL,
(str ? (char*)str : (char*)" "),
getHbaseErrStr(retcode),
(str2 ? (char*)str2 :
(char *)GetCliGlobals()->getJniErrorStr()));
pentry_down->setDiagsArea(diagsArea);
return -1;
}
return 0;
}
short ExHbaseAccessTcb::setupError(Lng32 retcode, const char * str, const char * str2)
{
ContextCli *currContext = GetCliGlobals()->currContext();
// Make sure retcode is positive.
if (retcode < 0)
retcode = -retcode;
if ((ABS(retcode) >= HBASE_MIN_ERROR_NUM) &&
(ABS(retcode) <= HBASE_MAX_ERROR_NUM))
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(8448), NULL, &intParam1,
&cliError, NULL,
(str ? (char*)str : (char*)" "),
getHbaseErrStr(retcode),
(str2 ? (char*)str2 :
(char *)GetCliGlobals()->getJniErrorStr()));
pentry_down->setDiagsArea(diagsArea);
return -1;
}
return 0;
}
short ExHbaseAccessTcb::raiseError(Lng32 errcode,
Lng32 * intParam1,
const char * str1,
const char * str2)
{
// Make sure retcode is positive.
if (errcode < 0)
errcode = -errcode;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(errcode), NULL,
intParam1, NULL, NULL,
str1, str2, NULL);
pentry_down->setDiagsArea(diagsArea);
return 0;
}
short ExHbaseAccessTcb::handleError(short &rc)
{
if (qparent_.up->isFull())
{
rc = WORK_OK;
return -1;
}
if (qparent_.up->isFull())
return WORK_OK;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry *up_entry = qparent_.up->getTailEntry();
up_entry->copyAtp(pentry_down);
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.downIndex = qparent_.down->getHeadIndex();
up_entry->upState.status = ex_queue::Q_SQLERROR;
qparent_.up->insert();
return 0;
}
short ExHbaseAccessTcb::handleDone(ExWorkProcRetcode &rc, Int64 rowsAffected)
{
if (qparent_.up->isFull())
{
rc = WORK_OK;
return -1;
}
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_queue_entry *up_entry = qparent_.up->getTailEntry();
up_entry->copyAtp(pentry_down);
up_entry->upState.parentIndex =
pentry_down->downState.parentIndex;
up_entry->upState.downIndex = qparent_.down->getHeadIndex();
up_entry->upState.status = ex_queue::Q_NO_DATA;
up_entry->upState.setMatchNo(matches_);
if (rowsAffected > 0 && hbaseAccessTdb().computeRowsAffected())
{
ExMasterStmtGlobals *g = getGlobals()->
castToExExeStmtGlobals()->castToExMasterStmtGlobals();
if (g)
{
g->setRowsAffected(g->getRowsAffected() + rowsAffected);
}
else
{
ComDiagsArea * diagsArea = up_entry->getDiagsArea();
if (!diagsArea)
{
diagsArea =
ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
up_entry->setDiagsArea(diagsArea);
}
diagsArea->addRowCount(rowsAffected);
}
}
qparent_.up->insert();
qparent_.down->removeHead();
return 0;
}
// return:
// 0, if all ok.
// -1, if error.
short ExHbaseAccessTcb::createColumnwiseRow()
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
for (Lng32 i = 0; i < asciiSourceTD->numAttrs(); i++)
{
Attributes * attr = asciiSourceTD->getAttr(i);
if (attr)
{
switch (i)
{
case HBASE_ROW_ID_INDEX:
{
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = rowId_.len;
str_cpy_all(&asciiRow_[attr->getOffset()],
rowId_.val, rowId_.len);
}
break;
case HBASE_COL_FAMILY_INDEX:
{
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = colFamName_.len;
str_cpy_all(&asciiRow_[attr->getOffset()],
colFamName_.val, colFamName_.len);
}
break;
case HBASE_COL_NAME_INDEX:
{
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = colName_.len;
str_cpy_all(&asciiRow_[attr->getOffset()],
colName_.val, colName_.len);
}
break;
case HBASE_COL_VALUE_INDEX:
{
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = colVal_.len;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = colVal_.len;
if (colVal_.len > attr->getLength())
{
// not enough space. Return error.
return -HBASE_COPY_ERROR;
}
str_cpy_all(&asciiRow_[attr->getOffset()],
colVal_.val, colVal_.len);
}
break;
case HBASE_COL_TS_INDEX:
{
*(Int64*)&asciiRow_[attr->getOffset()] = colTS_;
}
break;
} // switch
} // if attr
} // for
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
workAtp_->getTupp(hbaseAccessTdb().asciiTuppIndex_)
.setDataPointer(asciiRow_);
if (convertExpr())
{
ex_expr::exp_return_type evalRetCode =
convertExpr()->eval(pentry_down->getAtp(), workAtp_);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
}
return 0;
}
short ExHbaseAccessTcb::copyCell()
{
// values are stored in the following format:
// colNameLen(2 bytes)
// colName for colNameLen bytes
// colValueLen(4 bytes)
// colValue for colValueLen bytes.
//
// Attribute values are not necessarily null-terminated.
// Cannot use string functions.
//
//
short colNameLen = colName_.len + colFamName_.len + 1;
Lng32 colValueLen = colVal_.len;
Lng32 neededLen =
sizeof(colNameLen) + colNameLen + sizeof(colValueLen) + colValueLen;
if (rowwiseRowLen_ + neededLen > hbaseAccessTdb().convertRowLen())
{
// not enough space. Return error.
return -HBASE_COPY_ERROR;
}
char* pos = rowwiseRow_ + rowwiseRowLen_;
memcpy(pos, (char*)&colNameLen, sizeof(short));
pos += sizeof(short);
memcpy(pos, colFamName_.val, colFamName_.len);
pos += colFamName_.len;
*pos++ = ':';
memcpy(pos, colName_.val, colName_.len);
pos += colName_.len;
memcpy(pos, (char*)&colValueLen, sizeof(Lng32));
pos += sizeof(Lng32);
memcpy(pos, colVal_.val, colValueLen);
pos += colValueLen;
rowwiseRowLen_ += sizeof(short) + colNameLen + sizeof(Lng32) + colValueLen;
return 0;
}
// return:
// 0, if all ok.
// -1, if error.
short ExHbaseAccessTcb::createRowwiseRow()
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
for (Lng32 i = 0; i < asciiSourceTD->numAttrs(); i++)
{
Attributes * attr = asciiSourceTD->getAttr(i);
if (attr)
{
switch (i)
{
case HBASE_ROW_ROWID_INDEX:
{
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = rowId_.len;
str_cpy_all(&asciiRow_[attr->getOffset()], rowId_.val, rowId_.len);
}
break;
case HBASE_COL_DETAILS_INDEX:
{
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = rowwiseRowLen_;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = rowwiseRowLen_;
str_cpy_all(&asciiRow_[attr->getOffset()], rowwiseRow_, rowwiseRowLen_);
}
break;
} // switch
} // if
} // for
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
workAtp_->getTupp(hbaseAccessTdb().asciiTuppIndex_)
.setDataPointer(asciiRow_);
if (convertExpr())
{
ex_expr::exp_return_type evalRetCode =
convertExpr()->eval(pentry_down->getAtp(), workAtp_);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
}
return 0;
}
// return 1, if found. 0, if not found. -1, if error.
// Columns retrieved from hbase are in sorted order.
// Columns in listOfColNames may or may not be in sorted order.
// This method advances LOCN until a match is found or a full
// loop of LOCN is done.
// If cols in LOCN are in sorted order, then the search will be faster.
//
short ExHbaseAccessTcb::getColPos(char * colName, Lng32 colNameLen, Lng32 &idx)
{
NABoolean found = FALSE;
Lng32 numCompares = hbaseAccessTdb().listOfFetchedColNames()->numEntries();
while ((numCompares > 0) &&
(NOT found))
{
if (hbaseAccessTdb().listOfFetchedColNames()->atEnd())
{
hbaseAccessTdb().listOfFetchedColNames()->position();
idx = -1;
}
// currNamePtr points to:
// 2 bytes len, len bytes colname.
char * currNamePtr = (char*)hbaseAccessTdb().listOfFetchedColNames()->getCurr();
short len = *(short*)currNamePtr;
currNamePtr += sizeof(short);
char * currName = currNamePtr;
if ((colNameLen == len) &&
(memcmp(colName, currName, len) == 0))
{
found = TRUE;
}
else
numCompares--;
hbaseAccessTdb().listOfFetchedColNames()->advance();
idx++;
}
if (found)
return 1;
else
return 0;
}
Lng32 ExHbaseAccessTcb::createSQRowDirect(Int64 *latestRowTimestamp)
{
short retcode = 0;
if (hbaseAccessTdb().alignedFormat())
retcode = createSQRowFromAlignedFormat(latestRowTimestamp);
else
{
if (hbaseAccessTdb().multiVersions()) {
if (latestRowTimestamp != NULL)
*latestRowTimestamp = -1;
retcode = createSQRowFromHbaseFormatMulti();
}
else
retcode = createSQRowFromHbaseFormat(latestRowTimestamp);
}
return retcode;
}
Lng32 ExHbaseAccessTcb::createSQRowFromHbaseFormat(Int64 *latestRowTimestamp)
{
// no columns are being fetched from hbase, do not create a row.
if (hbaseAccessTdb().listOfFetchedColNames()->numEntries() == 0)
return 0;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
ExpTupleDesc * convertTuppTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().convertTuppIndex_);
Attributes * attr = NULL;
// initialize as missing cols.
// TBD: can optimize to skip this step if there are no nullable and no added cols
memset(asciiRowMissingCols_, 1, asciiSourceTD->numAttrs());
// initialize latest timestamp to 0 for every column
memset(latestTimestampForCols_, 0, (asciiSourceTD->numAttrs()*sizeof(long)));
if (latestRowTimestamp != NULL)
*latestRowTimestamp = -1;
// initialize latest version to 0 for every column
memset(latestVersionNumForCols_, 0, (asciiSourceTD->numAttrs()*sizeof(long)));
hbaseAccessTdb().listOfFetchedColNames()->position();
Lng32 idx = -1;
NABoolean hbmTab = hbaseAccessTdb().hbaseMapTable();
BYTE *colVal;
Lng32 colValLen;
long timestamp;
char *colName;
short colNameLen;
BYTE nullVal;
Lng32 retcode;
int numCells;
retcode = ehi_->getNumCellsPerRow(numCells);
if (retcode == HBASE_ACCESS_NO_ROW)
return retcode ;
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getNumCellsPerRow()");
return retcode;
}
// For an hbase mapped table, the primary key column may not exist as
// a separate hbase cell/column. In that case, the rowID of the retrieved row
// is the primary key column value.
// Also, if a primary key col value exists as a separate cell/column, then
// the contents of rowID and pkey col value must be the same.
// Retrieve rowID for hbase mapped table. It will be used later to validate
// the abovementioned 2 cases.
char * pkeyColInfo = hbaseAccessTdb().getPkeyColName();
Lng32 pkeyColInfoLen = 0;
HbaseStr rowID;
NABoolean hbmPkeyColCheck = FALSE;
if ((hbaseAccessTdb().hbaseMapTable()) &&
(pkeyColInfo))
{
hbmPkeyColCheck = TRUE;
pkeyColInfo = hbaseAccessTdb().getPkeyColName();
pkeyColInfoLen = *(short*)pkeyColInfo;
pkeyColInfo += sizeof(short);
retcode = ehi_->getRowID(rowID);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getRowID()");
return retcode;
}
}
for (int colNo= 0; colNo < numCells; colNo++)
{
retcode = ehi_->getColName(colNo, &colName, colNameLen, timestamp);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColName()");
return retcode;
}
if (! getColPos(colName, colNameLen, idx)) // not found
{
ex_assert(FALSE, "Error in getColPos()");
}
// not missing any more
asciiRowMissingCols_[idx] = 0;
if (timestamp > latestTimestampForCols_[idx])
latestTimestampForCols_[idx] = timestamp;
if (latestRowTimestamp != NULL && timestamp > *latestRowTimestamp)
*latestRowTimestamp = timestamp;
latestVersionNumForCols_[idx] = 1;
Attributes * attr = asciiSourceTD->getAttr(idx);
if (! attr)
ex_assert(FALSE, "Attr not found -1");
// copy to asciiRow only if this is the latest version seen so far
// for this column. On 6/10/2014 we get two versions for a newly
// updated column that has not been committed yet.
if (timestamp == latestTimestampForCols_[idx])
{
colVal = (BYTE *)&asciiRow_[attr->getOffset()];
colValLen = attr->getLength();
BYTE nullVal;
if (hbmTab)
{
Lng32 ij = 0;
}
retcode = ehi_->getColVal(colNo, colVal, colValLen,
(NOT hbaseAccessTdb().hbaseMapTable()
? attr->getNullFlag() : FALSE), nullVal);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColVal()");
return retcode;
}
if (colValLen > attr->getLength())
{
// col val wont fit. Return error.
char buf[1000];
str_sprintf(buf, " Details: actual column value length of %d is greater than the expected max buffer size of %d.",
colValLen, attr->getLength());
raiseError(EXE_HBASE_ACCESS_ERROR, NULL,
hbaseAccessTdb().getTableName(), buf);
return -1;
}
if ((hbmPkeyColCheck) &&
(pkeyColInfoLen == colNameLen) &&
(str_cmp(pkeyColInfo, colName, colNameLen) == 0))
{
// validate that this col value is same as the pkey value for this
// hbase mapped table.
if (NOT ((rowID.len == colValLen) &&
(str_cmp(rowID.val, (char*)colVal, colValLen) == 0)))
{
char buf[1000];
str_sprintf(buf, " Details: HBase rowID content must match the primary key column content.");
raiseError(EXE_HBASE_ACCESS_ERROR, NULL,
hbaseAccessTdb().getTableName(), buf);
return -1;
}
}
if (attr->getNullFlag())
{
if (nullVal)
*(short*)&asciiRow_[attr->getNullIndOffset()] = -1;
else
*(short*)&asciiRow_[attr->getNullIndOffset()] = 0;
}
if (attr->getVCIndicatorLength() > 0)
{
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = colValLen;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = colValLen;
}
}
} // for
// fill in null or default values for missing cols.
for (idx = 0; idx < asciiSourceTD->numAttrs(); idx++)
{
if (asciiRowMissingCols_[idx] == 1) // missing
{
attr = asciiSourceTD->getAttr(idx);
if (! attr)
ex_assert(FALSE, "Attr not found -2");
char * defVal = attr->getDefaultValue();
char * colInfo = NULL;
Lng32 colInfoLen = 0;
if (hbmPkeyColCheck || (! defVal))
{
colInfo =
(char*)hbaseAccessTdb().listOfFetchedColNames()->get(idx);
colInfoLen = *(short*)colInfo;
colInfo += sizeof(short);
}
// if this missing col is the pkey col, fill in values
// from the retrieved rowID.
if (hbmPkeyColCheck &&
(pkeyColInfoLen == colInfoLen) &&
(str_cmp(pkeyColInfo, colInfo, colInfoLen) == 0))
{
char * colValLoc = (char *)&asciiRow_[attr->getOffset()];
Lng32 colValMaxLen = attr->getLength();
if (rowID.len > colValMaxLen)
{
// rowID val wont fit. Return error.
char buf[1000];
str_sprintf(buf, " Details: retrieved rowID of length %d is larger than the specified key size of %d.",
rowID.len, colValMaxLen);
raiseError(EXE_HBASE_ACCESS_ERROR, NULL,
hbaseAccessTdb().getTableName(), buf);
return -1;
}
str_cpy_all(colValLoc, rowID.val, rowID.len);
*(short*)&asciiRow_[attr->getNullIndOffset()] = 0;
if (attr->getVCIndicatorLength() > 0)
{
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = rowID.len;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = rowID.len;
}
// dont check for other columns once pkey col has been found.
hbmPkeyColCheck = FALSE;
} // missing pkey val
else
{
if (! defVal)
{
Text colFam, colName;
extractColFamilyAndName(colInfo, colFam, colName);
char buf[20];
if (NOT hbaseAccessTdb().hbaseMapTable())
{
Int64 v = 0;
if (colName.length() == sizeof(char))
v = *(char*)colName.data();
else if (colName.length() == sizeof(UInt16))
v = *(UInt16*)colName.data();
else if (colName.length() == sizeof(ULng32))
v = *(ULng32*)colName.data();
str_sprintf(buf, "%ld", v);
}
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(
getHeap(), &diagsArea,
(ExeErrorCode) EXE_DEFAULT_VALUE_INCONSISTENT_ERROR,
NULL, NULL, NULL, NULL,
(hbaseAccessTdb().hbaseMapTable() ? colName.data() : buf),
hbaseAccessTdb().getTableName());
pentry_down->setDiagsArea(diagsArea);
return -1;
}
char * defValPtr = defVal;
short nullVal = 0;
if (attr->getNullFlag())
{
nullVal = *(short*)defVal;
*(short*)&asciiRow_[attr->getNullIndOffset()] = nullVal;
defValPtr += 2;
}
Lng32 copyLen;
if (! nullVal)
{
if (attr->getVCIndicatorLength() > 0)
{
copyLen = *(short*)defValPtr;
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = copyLen;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = copyLen;
defValPtr += attr->getVCIndicatorLength();
}
else
{
copyLen = attr->getLength();
}
char *destPtr = &asciiRow_[attr->getOffset()];
str_cpy_all(destPtr, defValPtr, copyLen);
} // not nullVal
}
} // missing col
} // for
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
workAtp_->getTupp(hbaseAccessTdb().asciiTuppIndex_)
.setDataPointer(asciiRow_);
if (hbaseAccessTdb().hbaseTimestampTuppIndex_ > 0)
{
workAtp_->getTupp(hbaseAccessTdb().hbaseTimestampTuppIndex_)
.setDataPointer((char*)latestTimestampForCols_);
}
if (hbaseAccessTdb().hbaseVersionTuppIndex_ > 0)
{
workAtp_->getTupp(hbaseAccessTdb().hbaseVersionTuppIndex_)
.setDataPointer((char*)latestVersionNumForCols_);
}
if (convertExpr())
{
convertRowLen_ = hbaseAccessTdb().convertRowLen();
UInt32 rowLen = convertRowLen_;
ex_expr::exp_return_type evalRetCode =
convertExpr()->eval(pentry_down->getAtp(), workAtp_, 0, -1, &rowLen);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
if (hbaseAccessTdb().getUseCif() && rowLen < convertRowLen_)
convertRowLen_= rowLen;
}
return 0;
}
////////////////////////////////////////////////////////////////////////
// hbase JNI returns all versions of cell values for each row id as a single list.
//
// Example:
// a row has 3 columns, C1, C2, C3.
// C1 has 3 cell versions, C2 has 1 version, C3 has 2 versions.
// Returned cell values from hbase will be:
// C1V1, C1V2, C1V3, C2V1, C3V1, C3V2
//
// These cell values will be transformed and returned as 3 rows:
// RowVersion1: C1V1 C2V1 C3V1
// RowVersion2: C1V2 C2V1 C3V2
// RowVersion3: C1V3 C2V1 C3V2
//
// Method setupSQMultiVersionRow does this transformation and creates
// an array with multiple rows.
// Method createSQRowFromHbaseFormatMulti() loops through this array
// and returns these 3 rows.
//
// colValVec: one entry for each fetched column. Each column can have
// max numVers versions. numVers is specified by user at compile time.
//
// For each column entry:
// colValVec_[i].numVersions_: number of versions
// colValVec_[i].versionArray_: array of numVers pointer entries.
// Each entry points to a column version, if one exists
// colValVec_[i].timestampArray_: array of timestamp values, if timestamp is
// to be returned.
// Each entry contains timestamp for that version.
// colValVec_[i].versionNumArray_: array of version numbers, if version is
// to be returned.
// Each entry contains version num for that version.
//
//
/////////////////////////////////////////////////////////////////////////
Lng32 ExHbaseAccessTcb::setupSQMultiVersionRow()
{
// no columns are being fetched from hbase, do not create a row. TBD for RAW.
Lng32 numFetchedCols = hbaseAccessTdb().listOfFetchedColNames()->numEntries();
if (hbaseAccessTdb().listOfFetchedColNames()->numEntries() == 0)
return 0;
hbaseAccessTdb().listOfFetchedColNames()->position();
Lng32 idx = -1;
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
Attributes * attr = NULL;
char *colVal;
Lng32 colValLen;
long timestamp;
char *colName;
short colNameLen;
BYTE nullVal;
Lng32 retcode;
int numCells;
retcode = ehi_->getNumCellsPerRow(numCells);
if (retcode == HBASE_ACCESS_NO_ROW)
return retcode ;
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getNumCellsPerRow()");
return retcode;
}
if (colValVec_ == NULL)
{
colValVec_ = new(getHeap()) ColValVec[numFetchedCols];
for (int i = 0; i < numFetchedCols; i++)
{
Lng32 numVers = hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions();
if (numVers == -2)
numVers = 100;
char ** v = new(getHeap()) char*[numVers];
colValVec_[i].numVersions_ = 0;
colValVec_[i].versionArray_ = v;
if (hbaseAccessTdb().isHbaseTimestampNeeded())
{
colValVec_[i].timestampArray_ = new(getHeap()) Int64[numVers];
}
if (hbaseAccessTdb().isHbaseVersionNeeded())
{
colValVec_[i].versionNumArray_ = new(getHeap()) Int64[numVers];
}
for (int j = 0; j < numVers; j++)
{
colValVec_[i].versionArray_[j] = NULL;
if (hbaseAccessTdb().isHbaseTimestampNeeded())
colValVec_[i].timestampArray_[j] = 0;
if (hbaseAccessTdb().isHbaseVersionNeeded())
colValVec_[i].versionNumArray_[j] = 0;
}
}
}
else
{
for (int i = 0; i < numFetchedCols; i++)
{
colValVec_[i].numVersions_ = 0;
}
}
colValVecSize_ = 0;
std::string prevColName;
int colValPos = 0;
for (int colNo= 0; colNo < numCells; colNo++)
{
retcode = ehi_->getColName(colNo, &colName, colNameLen, timestamp);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColName()");
return retcode;
}
if (prevColName.empty())
{
prevColName.assign(colName, colNameLen);
colValPos = 0;
}
else if (std::string(colName, colNameLen) == prevColName)
{
colValPos++;
}
else // colName != prevColName
{
colValPos = 0;
prevColName = std::string(colName, colNameLen);
}
// if col name changed, find idx into attributes array.
// otherwise, use previous idx. This avoids looping over listOfFetchedColNames
// in getColPos.
if (colValPos == 0)
{
if (! getColPos(colName, colNameLen, idx)) // not found
{
ex_assert(FALSE, "Error in getColPos()");
}
}
Attributes * attr = asciiSourceTD->getAttr(idx);
if (! attr)
ex_assert(FALSE, "Attr not found -1");
char ** colValEntries = colValVec_[idx].versionArray_;
if (colValEntries[colValPos] == NULL)
{
Lng32 colValAllocLen = ((attr->getNullFlag() ? sizeof(short) : 0) +
sizeof(Lng32) + attr->getLength());
colVal = new(getHeap()) char[colValAllocLen];
colValEntries[colValPos] = colVal;
colValVec_[idx].numVersions_++;
}
else
{
colVal = colValEntries[colValPos];
colValVec_[idx].numVersions_++;
}
if (colValVec_[idx].numVersions_ > colValVecSize_)
colValVecSize_ = colValVec_[idx].numVersions_;
char * nullData = (attr->getNullFlag() ? (char*)colVal : NULL);
char * vcLen = (char*)colVal + (nullData ? sizeof(short) : 0);
char * colData = vcLen + sizeof(Lng32);
colValLen = attr->getLength();
retcode = ehi_->getColVal(colNo, (BYTE*)colData, colValLen,
attr->getNullFlag(), nullVal);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColVal()");
return retcode;
}
if (nullData)
{
if (nullVal)
*(short*)nullData = -1;
else
*(short*)nullData = 0;
}
*(Lng32*)vcLen = colValLen;
if (hbaseAccessTdb().isHbaseTimestampNeeded())
colValVec_[idx].timestampArray_[colValPos] = timestamp;
if (hbaseAccessTdb().isHbaseVersionNeeded())
colValVec_[idx].versionNumArray_[colValPos] = colValPos+1;
}
colValEntry_ = 0;
return 0;
}
Lng32 ExHbaseAccessTcb::createSQRowFromHbaseFormatMulti()
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
Lng32 numFetchedCols = hbaseAccessTdb().listOfFetchedColNames()->numEntries();
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
ExpTupleDesc * convertTuppTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().convertTuppIndex_);
if (colValEntry_ >= colValVecSize_)
return HBASE_ACCESS_NO_ROW;
// initialize as missing cols.
memset(asciiRowMissingCols_, 1, asciiSourceTD->numAttrs());
Lng32 retcode;
Attributes * attr = NULL;
for (int idx = 0; idx < numFetchedCols; idx++)
{
attr = asciiSourceTD->getAttr(idx);
char ** colValEntries = colValVec_[idx].versionArray_;
Lng32 numColValEntries = colValVec_[idx].numVersions_;
if (numColValEntries > 0)
{
// not missing any more
asciiRowMissingCols_[idx] = 0;
Lng32 entryPos =
(colValEntry_ >= numColValEntries ? colValEntry_ - 1 : colValEntry_);
char *s = colValEntries[entryPos];
char * colVal = (char*)s;
char * nullData = NULL;
if (attr->getNullFlag())
{
nullData = colVal;
if (*(short*)nullData)
*(short*)&asciiRow_[attr->getNullIndOffset()] = -1;
else
*(short*)&asciiRow_[attr->getNullIndOffset()] = 0;
}
char * vcLen = (char*)colVal + (nullData ? sizeof(short) : 0);
Lng32 colDataLen = *(Lng32*)vcLen;
if (attr->getVCIndicatorLength() > 0)
{
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = colDataLen;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = colDataLen;
}
char * colData = vcLen + sizeof(Lng32);
colVal = &asciiRow_[attr->getOffset()];
memcpy(colVal, colData, colDataLen);
if (hbaseAccessTdb().isHbaseTimestampNeeded())
latestTimestampForCols_[idx] = colValVec_[idx].timestampArray_[entryPos];
if (hbaseAccessTdb().isHbaseVersionNeeded())
latestVersionNumForCols_[idx] = colValVec_[idx].versionNumArray_[entryPos];
}
}
// fill in null or default values for missing cols.
for (int idx = 0; idx < asciiSourceTD->numAttrs(); idx++)
{
if (asciiRowMissingCols_[idx] == 1) // missing
{
attr = asciiSourceTD->getAttr(idx);
if (! attr)
ex_assert(FALSE, "Attr not found -2");
char * defVal = attr->getDefaultValue();
char * defValPtr = defVal;
short nullVal = 0;
if (attr->getNullFlag())
{
nullVal = *(short*)defVal;
*(short*)&asciiRow_[attr->getNullIndOffset()] = nullVal;
defValPtr += 2;
}
Lng32 copyLen;
if (! nullVal)
{
if (attr->getVCIndicatorLength() > 0)
{
copyLen = *(short*)defValPtr;
if (attr->getVCIndicatorLength() == sizeof(short))
*(short*)&asciiRow_[attr->getVCLenIndOffset()] = copyLen;
else
*(Lng32*)&asciiRow_[attr->getVCLenIndOffset()] = copyLen;
defValPtr += attr->getVCIndicatorLength();
}
else
{
copyLen = attr->getLength();
}
char *destPtr = &asciiRow_[attr->getOffset()];
str_cpy_all(destPtr, defValPtr, copyLen);
} // not nullVal
} // missing col
}
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
workAtp_->getTupp(hbaseAccessTdb().asciiTuppIndex_)
.setDataPointer(asciiRow_);
if (hbaseAccessTdb().hbaseTimestampTuppIndex_ > 0)
{
workAtp_->getTupp(hbaseAccessTdb().hbaseTimestampTuppIndex_)
.setDataPointer((char*)latestTimestampForCols_);
}
if (hbaseAccessTdb().hbaseVersionTuppIndex_ > 0)
{
workAtp_->getTupp(hbaseAccessTdb().hbaseVersionTuppIndex_)
.setDataPointer((char*)latestVersionNumForCols_);
}
if (convertExpr())
{
ex_expr::exp_return_type evalRetCode =
convertExpr()->eval(pentry_down->getAtp(), workAtp_);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
}
colValEntry_++;
return 0;
}
// return TRUE is values are missing in aligned format row.
// Added columns are always added after all the existing columns
// in a table.
// This methods checks for added columns from in the list of attributes.
// Check is done from last to first attr.
// If an attr is a specialField (added col), then check is done to see if
// it is missing. TRUE is returned in that case.
// If a non-added field is reached, then no cols are missing.
// FALSE is returned in that case.
NABoolean ExHbaseAccessTcb::missingValuesInAlignedFormatRow(
ExpTupleDesc * tdesc, char * alignedRow, Lng32 alignedRowLen)
{
if (NOT tdesc->addedFieldPresent())
return FALSE;
UInt32 ff = ExpTupleDesc::getFirstFixedOffset
(alignedRow, ExpTupleDesc::SQLMX_ALIGNED_FORMAT);
Attributes * attr = NULL;
for (int idx = tdesc->numAttrs()-1; idx >= 0; idx--)
{
attr = tdesc->getAttr(idx);
// if a regular (non-added) attr is reached, we are done.
// There cannot be any added columns to the left of this attr.
if (NOT attr->isAddedCol())
return FALSE;
if (Attributes::isDefaultValueNeeded(
attr, tdesc, ff,
attr->getVCIndicatorLength(), attr->getVoaOffset(),
alignedRow, alignedRowLen))
return TRUE;
} // for
return FALSE;
}
Lng32 ExHbaseAccessTcb::createSQRowFromAlignedFormat(Int64 *latestRowTimestamp)
{
// no columns are being fetched from hbase, do not create a row.
if (hbaseAccessTdb().listOfFetchedColNames()->numEntries() == 0)
return 0;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().asciiTuppIndex_);
ExpTupleDesc * convertTuppTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().convertTuppIndex_);
Attributes * attr = NULL;
// initialize latest timestamp to 0 for every column
memset(latestTimestampForCols_, 0, (asciiSourceTD->numAttrs()*sizeof(long)));
if (latestRowTimestamp != NULL)
*latestRowTimestamp = -1;
hbaseAccessTdb().listOfFetchedColNames()->position();
Lng32 idx = -1;
BYTE *colVal;
Lng32 colValLen;
long timestamp;
char *colName;
short colNameLen;
BYTE nullVal;
Lng32 retcode;
int numCells;
retcode = ehi_->getNumCellsPerRow(numCells);
if (retcode == HBASE_ACCESS_NO_ROW)
return retcode ;
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getNumCellsPerRow()");
return retcode;
}
// aligned format should only return one column.
if (numCells > 2)
{
// error
return -HBASE_CREATE_ROW_ERROR;
}
Lng32 asciiRowLen = hbaseAccessTdb().asciiRowLen_;
for (int colNo= 0; colNo < numCells; colNo++)
{
retcode = ehi_->getColName(colNo, &colName, colNameLen, timestamp);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColName()");
return retcode;
}
if (! getColPos(colName, colNameLen, idx)) // not found
{
ex_assert(FALSE, "Error in getColPos()");
}
if (timestamp > latestTimestampForCols_[idx])
latestTimestampForCols_[idx] = timestamp;
if (latestRowTimestamp != NULL && timestamp > *latestRowTimestamp)
*latestRowTimestamp = timestamp;
// copy to asciiRow only if this is the latest version seen so far
// for this column. On 6/10/2014 we get two versions for a newly
// updated column that has not been committed yet.
if (timestamp == latestTimestampForCols_[idx])
{
colVal = (BYTE*)asciiRow_;
retcode = ehi_->getColVal(colNo, colVal, asciiRowLen,
FALSE, nullVal);
if (retcode != HBASE_ACCESS_SUCCESS)
{
setupError(retcode, "", "getColVal()");
return retcode;
}
}
}
// check to see if added columns are missing from this row.
// If they are, pcode evaluation cannot be used.
NABoolean doEvalClauses = FALSE;
if ((convertExpr()) &&
(convertExpr()->getPCodeBinary()) &&
(asciiSourceTD->addedFieldPresent()) &&
(missingValuesInAlignedFormatRow(asciiSourceTD, asciiRow_, asciiRowLen)))
doEvalClauses = TRUE;
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
workAtp_->getTupp(hbaseAccessTdb().asciiTuppIndex_)
.setDataPointer(asciiRow_);
if (convertExpr())
{
convertRowLen_ = hbaseAccessTdb().convertRowLen();
UInt32 rowLen = convertRowLen_;
ex_expr::exp_return_type evalRetCode = ex_expr::EXPR_OK;
if (doEvalClauses)
{
evalRetCode =
convertExpr()->evalClauses(convertExpr()->getClauses(),
pentry_down->getAtp(), workAtp_,
asciiRowLen, &rowLen,
NULL, NULL);
}
else
{
evalRetCode =
convertExpr()->eval(pentry_down->getAtp(), workAtp_, NULL,
asciiRowLen, &rowLen);
}
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
if (hbaseAccessTdb().getUseCif() &&
rowLen < convertRowLen_)
convertRowLen_=rowLen;
}
return 0;
}
// returns:
// 0, if expr is false
// 1, if no pred or expr is true
// -1, if expr error
short ExHbaseAccessTcb::applyPred(ex_expr * expr, UInt16 tuppIndex,
char * tuppRow)
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
if (! expr)
{
return 1;
}
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
if ((tuppRow) && (tuppIndex > 0))
workAtp_->getTupp(tuppIndex)
.setDataPointer(tuppRow);
else
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_)
.setDataPointer(convertRow_);
exprRetCode =
expr->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
if (exprRetCode == ex_expr::EXPR_TRUE)
{
return 1;
}
return 0;
}
short ExHbaseAccessTcb::extractColFamilyAndName(char * input,
Text &colFam, Text &colName)
{
return ExFunctionHbaseColumnLookup::extractColFamilyAndName(input,
-1,
TRUE,
colFam, colName);
}
short ExHbaseAccessTcb::evalKeyColValExpr(HbaseStr &columnToCheck, HbaseStr &colValToCheck)
{
if (! keyColValExpr())
return -1;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
workAtp_->getTupp(hbaseAccessTdb().keyColValTuppIndex_)
.setDataPointer(keyColValRow_);
exprRetCode =
keyColValExpr()->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
memcpy(colValToCheck.val, keyColValRow_, hbaseAccessTdb().keyColValLen_);
colValToCheck.len = hbaseAccessTdb().keyColValLen_;
char * keyColNamePtr = hbaseAccessTdb().keyColName();
short nameLen = *(short*)keyColNamePtr;
char * keyColName = &keyColNamePtr[sizeof(short)];
memcpy(columnToCheck.val, keyColName, nameLen);
columnToCheck.len = nameLen;
return 0;
}
short ExHbaseAccessTcb::evalEncodedKeyExpr()
{
if (! encodedKeyExpr())
return 0;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
workAtp_->getTupp(hbaseAccessTdb().keyTuppIndex_)
.setDataPointer(encodedKeyRow_);
exprRetCode =
encodedKeyExpr()->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
return 0;
}
short ExHbaseAccessTcb::evalRowIdExpr(NABoolean isVarchar)
{
if (! rowIdExpr())
return 0;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
workAtp_->getTupp(hbaseAccessTdb().rowIdTuppIndex_)
.setDataPointer(rowIdRow_);
exprRetCode =
rowIdExpr()->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
if (isVarchar)
{
rowId_.val = &rowIdRow_[SQL_VARCHAR_HDR_SIZE];
rowId_.len = *(short*)rowIdRow_;
}
else
{
rowId_.val = rowIdRow_;
rowId_.len = hbaseAccessTdb().rowIdLen_;
}
return 0;
}
short ExHbaseAccessTcb::evalInsDelPreCondExpr()
{
if (! insDelPreCondExpr()) {
return 1;
}
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
exprRetCode =
insDelPreCondExpr()->eval(pentry_down->getAtp(), workAtp_);
switch (exprRetCode) {
case ex_expr::EXPR_FALSE:
return 0;
case ex_expr::EXPR_TRUE:
return 1;
default:
return -1;
}
return 0;
}
short ExHbaseAccessTcb::evalConstraintExpr(ex_expr *expr, UInt16 tuppIndex,
char * tuppRow)
{
if (expr == NULL) {
return 1;
}
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
if ((tuppRow) && (tuppIndex > 0))
workAtp_->getTupp(tuppIndex).setDataPointer(tuppRow);
else
workAtp_->getTupp(hbaseAccessTdb().convertTuppIndex_).setDataPointer(convertRow_);
exprRetCode = expr->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
return -1;
else if (exprRetCode == ex_expr::EXPR_TRUE)
return 1;
else
return 0;
}
short ExHbaseAccessTcb::evalRowIdAsciiExpr(const char * inputRowIdVals,
char * rowIdBuf, // input: buffer where rowid is created
char* &outputRowIdPtr, // output: ptr to rowid.
Lng32 excludeKey,
Lng32 &outputRowIdLen)
{
if (! rowIdExpr())
return -1;
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
ExpTupleDesc * asciiSourceTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().rowIdAsciiTuppIndex_);
Lng32 currPos = 0;
for (CollIndex i = 0; i < asciiSourceTD->numAttrs(); i++)
{
Attributes * attr = asciiSourceTD->getAttr(i);
if (! attr)
{
// error
return -1;
}
short inputRowIdValLen = *(short*)&inputRowIdVals[currPos];
currPos += sizeof(short);
short nullVal = 0;
if (attr->getNullFlag())
{
inputRowIdValLen -= sizeof(short);
nullVal = *(short*)&inputRowIdVals[currPos];
if (nullVal)
{
*(short*)&rowIdAsciiRow_[attr->getNullIndOffset()] = -1;
}
else
{
*(short*)&rowIdAsciiRow_[attr->getNullIndOffset()] = 0;
}
currPos += sizeof(short);
}
const char * inputRowIdVal = &inputRowIdVals[currPos];
if (attr->getVCIndicatorLength() > 0)
{
*(short*)&rowIdAsciiRow_[attr->getVCLenIndOffset()] = inputRowIdValLen;
*(Int64*)&rowIdAsciiRow_[attr->getOffset()] = (Int64)inputRowIdVal;
}
else
{
char * srcPtr = &rowIdAsciiRow_[attr->getOffset()];
Lng32 copyLen = MINOF(attr->getLength(), inputRowIdValLen);
str_cpy_all(srcPtr, (char*)inputRowIdVal, copyLen);
}
currPos += inputRowIdValLen;
}
workAtp_->getTupp(hbaseAccessTdb().rowIdTuppIndex_)
.setDataPointer(rowIdBuf);
workAtp_->getTupp(hbaseAccessTdb().rowIdAsciiTuppIndex_)
.setDataPointer(rowIdAsciiRow_);
exprRetCode =
rowIdExpr()->eval(pentry_down->getAtp(), workAtp_);
if (exprRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
// For non-SQ tables,
// rowid is created as a null terminated string in rowIdBuf. (ANSI char type)
// At this point, there are 2 extra bytes allocated in the beginning. Need to
// figure out why that is and if it is needed.
// Skip the first 2 bytes and point to the actual row.
// If exclude flag is set, then add a null byte and increment the length by 1.
if (hbaseAccessTdb().sqHbaseTable())
{
if (hbaseAccessTdb().keyInVCformat())
{
// Key is in varchar format.
outputRowIdLen = *(short*)rowIdBuf;
outputRowIdPtr = rowIdBuf + sizeof(short);
}
else
{
outputRowIdPtr = rowIdBuf;
outputRowIdLen = hbaseAccessTdb().getRowIDLen();
}
}
else
{
outputRowIdPtr = &rowIdBuf[2];
outputRowIdLen = strlen(outputRowIdPtr);
}
if (excludeKey)
{
outputRowIdPtr[outputRowIdLen] = '\0';
outputRowIdLen++;
}
return 0;
}
void ExHbaseAccessTcb::setupPrevRowId()
{
if (prevRowIdMaxLen_ < rowId_.len)
{
if (prevRowId_.val)
NADELETEBASIC(prevRowId_.val, getHeap());
prevRowIdMaxLen_ = rowId_.len;
prevRowId_.val = new(getHeap()) char[prevRowIdMaxLen_];
}
str_cpy_all(prevRowId_.val, rowId_.val, rowId_.len);
prevRowId_.len = rowId_.len;
if (rowwiseRow_)
{
rowwiseRowLen_ = 0;
}
}
void ExHbaseAccessTcb::extractColNameFields
(char * inValPtr, short &colNameLen, char* &colName)
{
// inValPtr points to:
// 2 bytes len, len bytes colname.
char * currPtr = inValPtr;
colNameLen = *(short*)currPtr;
currPtr += sizeof(short);
colName = currPtr;
}
Lng32 ExHbaseAccessTcb::setupListOfColNames(Queue * listOfColNames,
LIST(HbaseStr) &columns)
{
if (columns.isEmpty() && listOfColNames)
{
listOfColNames->position();
HbaseStr colNameText;
for (Lng32 i = 0; i < listOfColNames->numEntries(); i++)
{
short colNameLen;
char * colName;
extractColNameFields((char*)listOfColNames->getCurr(),
colNameLen, colName);
listOfColNames->advance();
colNameText.val = colName;
colNameText.len = colNameLen;
columns.insert(colNameText);
}
}
return 0;
}
Lng32 ExHbaseAccessTcb::setupListOfColNames(Queue * listOfColNames,
LIST(NAString) &columns)
{
if (columns.isEmpty() && listOfColNames)
{
listOfColNames->position();
for (Lng32 i = 0; i < listOfColNames->numEntries(); i++)
{
short colNameLen;
char * colName;
extractColNameFields((char*)listOfColNames->getCurr(),
colNameLen, colName);
listOfColNames->advance();
NAString colNameText(colName, colNameLen, getHeap());
columns.insert(colNameText);
}
}
return 0;
}
Lng32 ExHbaseAccessTcb::setupUniqueRowIdsAndCols
(ComTdbHbaseAccess::HbaseGetRows*hgr)
{
if ((! hgr->rowIds()) ||
(hgr->rowIds()->numEntries() == 0))
{
setupError(-HBASE_OPEN_ERROR, "", "RowId list is empty");
return -1;
}
HbaseStr rowIdRowText;
hgr->rowIds()->position();
rowIds_.clear();
for (Lng32 i = 0; i < hgr->rowIds()->numEntries(); i++)
{
if (! rowIdExpr())
{
setupError(-HBASE_OPEN_ERROR, "", "RowId Expr is empty");
return -1;
}
const char * inputRowIdVals = (char*)hgr->rowIds()->getNext();
char * rowIdRow = NULL;
if (hgr->rowIds()->numEntries() == 1)
{
rowIdRow = rowIdRow_;
}
else
{
// need to be deleted. TBD.
rowIdRow = new(getGlobals()->getDefaultHeap()) char[hbaseAccessTdb().rowIdLen_ + 2];
}
Lng32 rowIdLen = 0;
char * rowIdPtr = NULL;
if (evalRowIdAsciiExpr(inputRowIdVals,
rowIdRow,
rowIdPtr,
0,
rowIdLen) == -1)
{
return -1;
}
rowIdRowText.val = rowIdPtr;
rowIdRowText.len = rowIdLen;
rowIds_.insert(rowIdRowText);
}
setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(), columns_);
return 0;
}
// sets up beginRowId_, endRowId_ and columns_ fields.
Lng32 ExHbaseAccessTcb::setupSubsetRowIdsAndCols
(ComTdbHbaseAccess::HbaseScanRows* hsr)
{
Lng32 rowIdLen;
char * rowIdPtr = NULL;
const char * inputBeginRowIdVals = (char*)hsr->beginRowId_;
rowIdLen = *(short*)inputBeginRowIdVals;
if (rowIdLen > 0)
{
if (evalRowIdAsciiExpr(inputBeginRowIdVals,
beginRowIdRow_,
rowIdPtr,
hsr->beginKeyExclusive_,
rowIdLen) == -1)
{
return -1;
}
beginRowId_.assign(rowIdPtr, rowIdLen);
}
else
{
beginRowId_.assign(hsr->beginRowId_, rowIdLen);
}
const char * inputEndRowIdVals = (char*)hsr->endRowId_;
rowIdLen = *(short*)inputEndRowIdVals;
if (rowIdLen > 0)
{
if (evalRowIdAsciiExpr(inputEndRowIdVals,
endRowIdRow_,
rowIdPtr,
(!hsr->endKeyExclusive_),
rowIdLen) == -1)
{
return -1;
}
endRowId_.assign(rowIdPtr, rowIdLen);
}
else
{
endRowId_.assign(hsr->endRowId_, rowIdLen);
}
setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(), columns_);
return 0;
}
Lng32 ExHbaseAccessTcb::initNextKeyRange(sql_buffer_pool *pool,
atp_struct * atp)
{
if (keyExeExpr())
keyExeExpr()->initNextKeyRange(pool, atp);
else
return -1;
return 0;
}
Lng32 ExHbaseAccessTcb::setupUniqueKeyAndCols(NABoolean doInit)
{
if (doInit)
rowIds_.clear();
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
keyRangeEx::getNextKeyRangeReturnType keyRangeStatus;
initNextKeyRange();
keyRangeStatus =
keySubsetExeExpr_->getNextKeyRange(pentry_down->getAtp(), FALSE, TRUE);
if (keyRangeStatus == keyRangeEx::EXPRESSION_ERROR)
{
return -1;
}
tupp &keyData = keySubsetExeExpr_->getBkData();
char * beginKeyRow = keyData.getDataPointer();
HbaseStr rowIdRowText;
if ((NOT hbaseAccessTdb().sqHbaseTable()) ||
(hbaseAccessTdb().keyInVCformat()))
{
// Key is in varchar format.
short keyLen = *(short*)beginKeyRow;
rowIdRowText.val = beginKeyRow + sizeof(short);
rowIdRowText.len = keyLen;
}
else
{
rowIdRowText.val = beginKeyRow;
rowIdRowText.len = hbaseAccessTdb().keyLen_;
}
if (keyRangeStatus == keyRangeEx::NO_MORE_RANGES)
{
// At the end of range, added an extra byte with "0" value to
// make the key exclusive which is required by hbase scan.
// Note that the key buffer should have 2 extra bytes allocated
rowIdRowText.val[rowIdRowText.len] = '\0';
rowIdRowText.len += 1;
}
rowIds_.insert(rowIdRowText);
if (doInit)
setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(), columns_);
return 0;
}
keyRangeEx::getNextKeyRangeReturnType ExHbaseAccessTcb::setupSubsetKeys
(NABoolean fetchRangeHadRows)
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
ex_expr::exp_return_type exprRetCode = ex_expr::EXPR_OK;
keyRangeEx::getNextKeyRangeReturnType keyRangeStatus;
keyRangeStatus =
keyExeExpr()->getNextKeyRange(pentry_down->getAtp(), fetchRangeHadRows,
(hbaseAccessTdb().sqHbaseTable() ? TRUE : FALSE));
if (keyRangeStatus == keyRangeEx::EXPRESSION_ERROR)
{
return keyRangeEx::EXPRESSION_ERROR;
}
tupp &beginKeyData = keyExeExpr()->getBkData();
char * beginKeyRow = beginKeyData.getDataPointer();
tupp &endKeyData = keyExeExpr()->getEkData();
char * endKeyRow = endKeyData.getDataPointer();
short beginKeyLen = hbaseAccessTdb().keyLen_;
short endKeyLen = hbaseAccessTdb().keyLen_;
if ((NOT hbaseAccessTdb().sqHbaseTable()) ||
(hbaseAccessTdb().keyInVCformat()))
{
// key in varchar format
beginKeyLen = *(short*)beginKeyRow;
beginKeyRow += sizeof(short);
if (beginKeyRow[0] == '\0')
beginKeyLen = 1;
endKeyLen = *(short*)endKeyRow;
endKeyRow += sizeof(short);
if (endKeyRow[0] == '\0')
endKeyLen = 1;
}
if (keyRangeStatus == keyRangeEx::NO_MORE_RANGES)
{
memset(beginKeyRow, '\377', beginKeyLen);
beginRowId_.assign(beginKeyRow, beginKeyLen);
beginRowId_.append(1, '\0');
memset(endKeyRow, '\0', endKeyLen);
endRowId_.assign(endKeyRow, endKeyLen);
return keyRangeEx::NO_MORE_RANGES;
}
else
{
beginRowId_.assign(beginKeyRow, beginKeyLen);
endRowId_.assign(endKeyRow, endKeyLen);
if (keyExeExpr()->getBkExcludeFlag())
beginRowId_.append(1, '\0');
if (! keyExeExpr()->getEkExcludeFlag())
endRowId_.append(1, '\0');
}
return keyRangeStatus;
}
Lng32 ExHbaseAccessTcb::setupSubsetKeysAndCols()
{
initNextKeyRange();
if (setupSubsetKeys() == keyRangeEx::EXPRESSION_ERROR)
return -1;
setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(), columns_);
return 0;
}
Lng32 ExHbaseAccessTcb::genAndAssignSyskey(UInt16 tuppIndex, char * tuppRow)
{
if (hbaseAccessTdb().addSyskeyTS())
{
char * syskeyPtr;
if (hbaseAccessTdb().alignedFormat())
{
// syskey is the first attribute
ExpTupleDesc * rowTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(tuppIndex);
Attributes * attr = rowTD->getAttr(0);
syskeyPtr = &tuppRow[attr->getOffset()];
}
else
{
syskeyPtr = tuppRow;
}
*(Int64*)syskeyPtr = generateUniqueValueFast();
}
return 0;
}
void ExHbaseAccessTcb::allocateDirectBufferForJNI(UInt32 rowLen)
{
if (rowLen > directRowBufferLen_)
{
if (directRowBuffer_ != NULL)
{
NADELETEBASIC(directRowBuffer_, getHeap());
directRowBuffer_ = NULL;
}
}
if (directRowBuffer_ == NULL)
{
directRowBuffer_ = new (getHeap()) BYTE[rowLen];
directRowBufferLen_ = rowLen;
}
row_.val = (char *)directRowBuffer_;
row_.len = 0;
}
void ExHbaseAccessTcb::allocateDirectRowBufferForJNI(
short numCols, short maxRows)
{
UInt32 directBufferOverhead;
UInt32 maxRowLen;
UInt32 rowLen = hbaseAccessTdb().getRowLen();
if (directRowBuffer_ == NULL)
{
directBufferOverhead = sizeof(numCols) + // Number of columns in the row
(numCols * (2 + // for name len
5 + // for colname len - To accomadate upto 32767 column nos
4 )) // for col value len
+ numCols; // 1 byte null value
maxRowLen = rowLen + directBufferOverhead;
// limit direct buffer len to 1G
Int64 tempDirectBuflen = (Int64)maxRowLen * (Int64)maxRows;
Int64 maxDirectBuflen = 1024*1024*1024;
if (tempDirectBuflen > maxDirectBuflen)
{
directRowBufferLen_ = MINOF(tempDirectBuflen, maxDirectBuflen);
directBufferMaxRows_ = (Int64)directRowBufferLen_ / (Int64)maxRowLen;
}
else
{
directRowBufferLen_ = (maxRowLen * maxRows);
directBufferMaxRows_ = maxRows;
}
directRowBuffer_ = new (getHeap()) BYTE[directRowBufferLen_];
rows_.val = (char *)directRowBuffer_;
rows_.len = 0;
}
if (directBufferRowNum_ == 0)
{
row_.val = rows_.val;
row_.len = 0;
rows_.len = 0;
}
else
{
rows_.len += row_.len;
ex_assert(rows_.len < directRowBufferLen_, "Direct row buffer overflow");
row_.val = rows_.val + rows_.len;
row_.len = 0;
}
return;
}
short ExHbaseAccessTcb::patchDirectRowBuffers()
{
ex_assert(rows_.val != NULL, "Direct row buffer not allocaed");
rows_.len += row_.len;
ex_assert(rows_.len < directRowBufferLen_, "Direct row buffer overflow");
row_.val = NULL;
row_.len = 0;
return patchDirectRowIDBuffers();
}
short ExHbaseAccessTcb::patchDirectRowIDBuffers()
{
ex_assert(rowIDs_.val != NULL, "Direct rowIDs buffer not allocaed");
short *numRows = (short *)rowIDs_.val;
*numRows = bswap_16(directBufferRowNum_);
short numRowsInBuffer = directBufferRowNum_;
directBufferRowNum_ = 0;
return numRowsInBuffer;
}
void ExHbaseAccessTcb::allocateDirectRowIDBufferForJNI(short maxRows)
{
UInt32 rowIDLen;
UInt32 maxRowIDLen;
rowIDLen = hbaseAccessTdb().getRowIDLen();
if (directRowIDBuffer_ == NULL)
{
directRowIDBufferLen_ = ((rowIDLen+1)* maxRows) + sizeof(short); // For no. of Rows
directRowIDBuffer_ = new (getHeap()) BYTE[directRowIDBufferLen_];
rowIDs_.val = (char *)directRowIDBuffer_;
rowIDs_.len = sizeof(short); // To store num of Rows
}
if (directBufferRowNum_ == 0)
{
rowIDs_.len = sizeof(short);
dbRowID_.val = rowIDs_.val + sizeof(short);
dbRowID_.len = 0;
}
}
short ExHbaseAccessTcb::createDirectRowBuffer(Text &colFamily,
Text &colName,
Text &colVal)
{
short colNameLen = colFamily.size() + colName.size() + 1;
Int32 colValLen = colVal.size();
UInt32 rowLen = sizeof(short) + sizeof(short) + // numCols, column length
colNameLen +
sizeof(Int32) + colValLen;
if (rowLen > directRowBufferLen_)
{
if (directRowBuffer_ != NULL)
{
NADELETEBASIC(directRowBuffer_, getHeap());
directRowBuffer_ = NULL;
}
}
if (directRowBuffer_ == NULL)
{
directRowBuffer_ = new (getHeap()) BYTE[rowLen];
directRowBufferLen_ = rowLen;
}
row_.val = (char *)directRowBuffer_;
BYTE *temp = (BYTE *)row_.val;
short numCols = 1;
*(short *)temp = bswap_16(numCols);
temp += sizeof(numCols);
*(short *)temp = bswap_16(colNameLen);
temp += sizeof(colNameLen);
memcpy(temp, colFamily.data(), colFamily.size());
temp += colFamily.size();
*temp = ':';
temp ++;
memcpy(temp, colName.data(), colName.size());
temp += colName.size();
*(Int32 *)temp = bswap_32(colValLen);
temp += sizeof(colValLen);
memcpy(temp, colVal.data(), colValLen);
temp += colValLen;
row_.len = (temp-(BYTE *)row_.val);
return 0;
}
Lng32 ExHbaseAccessTcb::copyColToDirectBuffer( BYTE *rowCurPtr,
char *colName, short colNameLen,
NABoolean prependNullVal, char nullVal,
char *colVal, Int32 colValLen)
{
Int32 bytesCopied;
assert(directRowBufferLen_ >= (row_.len+colNameLen+colValLen+4));
BYTE *temp = rowCurPtr;
*(short *)temp = bswap_16(colNameLen);
temp += sizeof(colNameLen);
memcpy(temp, colName, colNameLen);
temp += colNameLen;
*(Int32 *)temp = bswap_32(colValLen+prependNullVal);
temp += sizeof(colValLen);
if (prependNullVal)
*temp++ = nullVal;
memcpy(temp, colVal, colValLen);
temp += colValLen;
bytesCopied = (temp-rowCurPtr);
ex_assert((bytesCopied > 0), "Corrupted buffer while copying column");
return bytesCopied;
}
short ExHbaseAccessTcb::copyRowIDToDirectBuffer(HbaseStr &rowID)
{
if (directBufferRowNum_ == 0)
{
rowIDs_.len = sizeof(short);
dbRowID_.val = rowIDs_.val + sizeof(short);
dbRowID_.len = 0;
}
UInt32 keyLen = hbaseAccessTdb().getRowIDLen();
if (keyLen == rowID.len)
*dbRowID_.val = '0';
else if ((keyLen+1) == rowID.len)
*dbRowID_.val = '1';
else
ex_assert(FALSE, "Invalid RowID length");
dbRowID_.val++;
rowIDs_.len++;
memcpy(dbRowID_.val, rowID.val, rowID.len);
dbRowID_.val += rowID.len;
dbRowID_.len = 0;
rowIDs_.len += rowID.len;
directBufferRowNum_++;
return 0;
}
short ExHbaseAccessTcb::createDirectRowBuffer( UInt16 tuppIndex,
char * tuppRow,
Queue * listOfColNames,
Queue * listOfOmittedColNames,
NABoolean isUpdate,
std::vector<UInt32> * posVec,
double samplingRate )
{
char hiveBuff[500]; //@ZXtemp
size_t hiveBuffInx = 0;
NABoolean includeInSample = (samplingRate > 0 && (rand() / (double)RAND_MAX) < samplingRate);
Int16 datatype;
Int16 scale = 0;
union
{
Int32 i;
UInt32 ui;
Int64 i64;
Int16 i16;
float f;
double d;
} numUnion;
if (hbaseAccessTdb().alignedFormat())
return createDirectAlignedRowBuffer(tuppIndex, tuppRow, listOfColNames,
isUpdate, posVec);
ExpTupleDesc * rowTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(tuppIndex);
short colNameLen;
char * colName;
short nullVal = 0;
short nullValLen = 0;
Int32 colValLen;
char *colVal;
char *str;
NABoolean prependNullVal;
char nullValChar = 0;
Attributes * attr;
int numCols = 0;
short *numColsPtr;
char *str_1;
NABoolean omittedColFound;
allocateDirectRowBufferForJNI(rowTD->numAttrs());
BYTE *rowCurPtr = (BYTE *)row_.val;
numColsPtr = (short *)rowCurPtr;
row_.len += sizeof(short);
rowCurPtr += sizeof(short);
listOfColNames->position();
for (Lng32 i = 0; i < rowTD->numAttrs(); i++)
{
Attributes * attr;
if (!posVec)
attr = rowTD->getAttr(i);
else
{
attr = rowTD->getAttr((*posVec)[i] -1);
}
if (attr)
{
if (!posVec)
{
extractColNameFields((char*)listOfColNames->getCurr(),
colNameLen, colName);
if (listOfOmittedColNames != NULL) {
omittedColFound = FALSE;
listOfOmittedColNames->position();
while ((str_1 = (char *)listOfOmittedColNames->getNext()) != NULL) {
str = (char*)listOfColNames->getCurr();
if (memcmp(str, str_1, colNameLen+2) == 0) {
omittedColFound = TRUE;
break;
}
}
if (omittedColFound) {
listOfColNames->advance();
continue ;
}
}
}
else
{
str = (char*)listOfColNames->getCurr();
colNameLen = *(short*)(str + sizeof(UInt32));
colName = str + sizeof(short) + sizeof(UInt32);
}
colVal = &tuppRow[attr->getOffset()];
if (includeInSample)
{
datatype = attr->getDatatype();
if (DFS2REC::isNumeric(datatype))
{
scale = attr->getScale();
strncpy((char*)&numUnion, colVal, 8);
}
switch (datatype)
{
case REC_BIN32_SIGNED:
hiveBuffInx += snprintf(hiveBuff+hiveBuffInx, 500 - hiveBuffInx, "%d|", numUnion.i);
break;
case REC_BIN32_UNSIGNED:
hiveBuffInx += snprintf(hiveBuff+hiveBuffInx, 500 - hiveBuffInx, "%d|", numUnion.ui);
break;
case REC_BIN64_SIGNED:
//printf("Scale of column %d is %d\n", i, attr->getScale());
hiveBuffInx += snprintf(hiveBuff+hiveBuffInx, 500 - hiveBuffInx, PFP64 "|", attr->getScale(), numUnion.i64);
if (scale)
{
hiveBuffInx -= (scale + 1);
memmove(hiveBuff+hiveBuffInx+1, hiveBuff+hiveBuffInx, scale + 1);
hiveBuff[hiveBuffInx] = '.';
hiveBuffInx += (scale + 2);
}
break;
case REC_BYTE_F_ASCII:
strncpy(hiveBuff+hiveBuffInx, colVal, attr->getLength());
hiveBuffInx += attr->getLength();
hiveBuff[hiveBuffInx++] = '|';
break;
case REC_BYTE_V_ASCII:
hiveBuffInx += snprintf(hiveBuff+hiveBuffInx, 500 - hiveBuffInx, "%s|", colVal);
break;
case REC_DATETIME:
//@ZXbl -- need to account for datetime code
strncpy((char*)&numUnion, colVal, 2);
hiveBuffInx += snprintf(hiveBuff+hiveBuffInx, 500 - hiveBuffInx,
"%.4d-%.2d-%.2d|", numUnion.i16,
*(colVal+1), *(colVal+2));
break;
default:
hiveBuffInx += sprintf(hiveBuff+hiveBuffInx, "???|");
break;
}
}
prependNullVal = FALSE;
nullVal = 0;
if (attr->getNullFlag())
{
nullVal = *(short*)&tuppRow[attr->getNullIndOffset()];
if (hbaseAccessTdb().hbaseMapTable())
prependNullVal = FALSE;
else if ((attr->getDefaultClass() != Attributes::DEFAULT_NULL) &&
(nullVal))
prependNullVal = TRUE;
else if (isUpdate && nullVal)
prependNullVal = TRUE;
else if (! nullVal)
prependNullVal = TRUE;
if ((NOT prependNullVal) && (nullVal))
goto label_end1;
}
if (prependNullVal)
{
nullValChar = 0;
if (nullVal)
nullValChar = -1;
}
colValLen = attr->getLength(&tuppRow[attr->getVCLenIndOffset()]);
Int32 bytesCopied = copyColToDirectBuffer(rowCurPtr, colName,
colNameLen, prependNullVal, nullValChar, colVal, colValLen);
rowCurPtr += bytesCopied;
row_.len += bytesCopied;
numCols++;
}
else
{
ex_assert(false, "Unable to obtain column descriptor");
}
label_end1:
listOfColNames->advance();
} // for
*numColsPtr = bswap_16(numCols);
if (includeInSample)
{
// Overwrite trailing delimiter with newline.
hiveBuff[hiveBuffInx-1] = '\n';
HDFS_Client_RetCode hdfsClientRetcode;
sampleFileHdfsClient()->hdfsWrite(hiveBuff, hiveBuffInx, hdfsClientRetcode);
}
return 0;
}
short ExHbaseAccessTcb::createDirectAlignedRowBuffer( UInt16 tuppIndex,
char * tuppRow,
Queue * listOfColNames,
NABoolean isUpdate,
std::vector<UInt32> * posVec )
{
ExpTupleDesc * rowTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(tuppIndex);
short colNameLen;
char * colName;
short nullVal = 0;
short nullValLen = 0;
Int32 colValLen;
char *colVal;
char *str;
short * numColsPtr;
// allocateDirectRowBufferForJNI(rowTD->numAttrs());
allocateDirectRowBufferForJNI(1);
BYTE *rowCurPtr = (BYTE *)row_.val;
numColsPtr = (short *)rowCurPtr;
row_.len += sizeof(short);
rowCurPtr += sizeof(short);
listOfColNames->position();
extractColNameFields((char*)listOfColNames->getCurr(),
colNameLen, colName);
colVal = tuppRow;
colValLen = insertRowlen_;
Int32 bytesCopied =
copyColToDirectBuffer(rowCurPtr, colName, colNameLen,
FALSE, 0,
colVal, colValLen);
rowCurPtr += bytesCopied;
row_.len += bytesCopied;
*numColsPtr = bswap_16(1);
return 0;
}
short ExHbaseAccessTcb::createDirectRowwiseBuffer(char * inputRow)
{
short numEntries;
short colNameLen;
Int32 colValLen;
short nullVal;
char *colName;
char *colVal;
Int32 bytesCopied;
char nullValChar = 0;
Int32 rowLen;
char *curPtr;
short maxColNameLen;
short colValVCIndLen;
Int32 maxColValLen;
curPtr = inputRow;
numEntries = *((short *)curPtr);
curPtr += sizeof(short);
maxColNameLen = *((short *)curPtr);
curPtr += sizeof(short);
colValVCIndLen = *((short *)curPtr);
curPtr += sizeof(short);
maxColValLen = *((Int32 *)curPtr);
rowLen = sizeof(short) + // For row number
sizeof(short) // For number of columns
+ (numEntries * (ROUND2(maxColNameLen)+ ROUND2(maxColValLen)
+ (sizeof(short) + sizeof(Int32)))); // Store colNameLen and colValLen
short numCols = 0;
short *numColsPtr ;
allocateDirectBufferForJNI(rowLen);
BYTE *rowCurPtr = (BYTE *)row_.val;
numColsPtr = (short *)rowCurPtr;
rowCurPtr += sizeof(short); // To store numCols later
row_.len += sizeof(short);
curPtr = inputRow;
curPtr += (3*sizeof(short) + sizeof(Int32)); // skip numEntries, maxColNameLen,
// colValVCIndLen, maxColValLen
for (Lng32 ij = 0; ij < numEntries; ij++)
{
colNameLen = *(short*)curPtr;
curPtr += sizeof(short);
colName = curPtr;
curPtr += ROUND2(maxColNameLen);
nullVal = *(short*)curPtr;
curPtr += sizeof(short);
if (colValVCIndLen == sizeof(short))
{
colValLen = *(short*)curPtr;
curPtr += sizeof(short);
}
else
{
curPtr = (char*)ROUND4((Int64)curPtr);
colValLen = *(Int32*)curPtr;
curPtr += sizeof(Int32);
}
colVal = curPtr;
curPtr += ROUND2(maxColValLen);
if (! nullVal)
{
bytesCopied = copyColToDirectBuffer(rowCurPtr, colName,
colNameLen, FALSE, nullValChar, colVal, colValLen);
rowCurPtr += bytesCopied;
row_.len += bytesCopied;
numCols++;
}
}
*numColsPtr = bswap_16(numCols);
return 0;
}
short ExHbaseAccessTcb::setupHbaseFilterPreds()
{
if ((!hbaseAccessTdb().listOfHbaseFilterColNames()) ||
(hbaseAccessTdb().listOfHbaseFilterColNames()->numEntries() == 0))
return 0;
if (hbaseFilterValExpr()){// with pushdown V2 it can be null if we have only unary operation
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
workAtp_->getTupp(hbaseAccessTdb().hbaseFilterValTuppIndex_)
.setDataPointer(hbaseFilterValRow_);
ex_expr::exp_return_type evalRetCode =
hbaseFilterValExpr()->eval(pentry_down->getAtp(), workAtp_);
if (evalRetCode == ex_expr::EXPR_ERROR)
{
return -1;
}
ExpTupleDesc * hfrTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().hbaseFilterValTuppIndex_);
hbaseFilterValues_.clear();
//for each evaluated value, populate the corresponding hBaseFilterValue
for (Lng32 i = 0; i < hfrTD->numAttrs(); i++)
{
Attributes * attr = hfrTD->getAttr(i);
if (attr)
{
NAString value(getHeap());
if (attr->getNullFlag())
{
char nullValChar = 0;
short nullVal = *(short*)&hbaseFilterValRow_[attr->getNullIndOffset()];
if (nullVal)
nullValChar = -1;
value.append((char*)&nullValChar, sizeof(char));
}
char * colVal = &hbaseFilterValRow_[attr->getOffset()];
value.append(colVal,
attr->getLength(&hbaseFilterValRow_[attr->getVCLenIndOffset()]));
hbaseFilterValues_.insert(value);
}
}
}
setupListOfColNames(hbaseAccessTdb().listOfHbaseFilterColNames(),
hbaseFilterColumns_);
hbaseFilterOps_.clear();
hbaseAccessTdb().listOfHbaseCompareOps()->position();
while (NOT hbaseAccessTdb().listOfHbaseCompareOps()->atEnd())
{
char * op = (char*)hbaseAccessTdb().listOfHbaseCompareOps()->getCurr();
hbaseFilterOps_.insert(op);
hbaseAccessTdb().listOfHbaseCompareOps()->advance();
}
return 0;
}
void ExHbaseAccessTcb::setRowID(char *rowId, Lng32 rowIdLen)
{
if (rowId == NULL)
{
rowID_.val = NULL;
rowID_.len = 0;
return;
}
if (rowIdLen > rowIDAllocatedLen_)
{
if (rowIDAllocatedVal_)
{
NADELETEBASIC(rowIDAllocatedVal_, getHeap());
}
rowIDAllocatedVal_ = new (getHeap()) char[rowIdLen];
rowIDAllocatedLen_ = rowIdLen;
}
rowID_.val = rowIDAllocatedVal_;
rowID_.len = rowIdLen;
memcpy(rowID_.val, rowId, rowIdLen);
}
void ExHbaseAccessTcb::buildLoggingFileName(NAHeap *heap,
const char * currCmdLoggingLocation,
const char *tableName,
const char * loggingFileNamePrefix,
Lng32 instId,
char *&loggingFileName)
{
if (loggingFileName != NULL)
NADELETEBASIC(loggingFileName, heap);
loggingFileName = NULL;
if (currCmdLoggingLocation == NULL)
return;
short logLen = strlen(currCmdLoggingLocation)+strlen(loggingFileNamePrefix)+strlen(tableName)+100;
loggingFileName = new (heap) char[logLen];
sprintf(loggingFileName, "%s/%s_%s_%d",
currCmdLoggingLocation, loggingFileNamePrefix, tableName, instId);
}
void ExHbaseAccessTcb::buildLoggingPath(
const char *loggingLocation,
char * logId,
const char * tableName,
char *currCmdLoggingLocation)
{
time_t t;
char logId_tmp[30];
if (logId == NULL || strlen(logId) == 0)
{
time(&t);
struct tm * curgmtime = gmtime(&t);
strftime(logId_tmp, sizeof(logId_tmp)-1, "%Y%m%d_%H%M%S", curgmtime);
logId = logId_tmp;
}
sprintf(currCmdLoggingLocation, "%s/ERR_%s_%s",
loggingLocation, tableName, logId);
}
void ExHbaseAccessTcb::handleException(NAHeap *heap,
char *logErrorRow,
Lng32 logErrorRowLen,
ComCondition *errorCond)
{
Lng32 errorMsgLen = 0;
charBuf *cBuf = NULL;
char *errorMsg;
HDFS_Client_RetCode hdfsClientRetcode;
if (loggingErrorDiags_ != NULL)
return;
if (!loggingFileCreated_) {
logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode);
if (hdfsClientRetcode == HDFS_CLIENT_OK)
hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, TRUE, FALSE);
if (hdfsClientRetcode == HDFS_CLIENT_OK)
loggingFileCreated_ = TRUE;
else
goto logErrorReturn;
}
logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen, hdfsClientRetcode);
if (hdfsClientRetcode != HDFS_CLIENT_OK)
goto logErrorReturn;
if (errorCond != NULL) {
errorMsgLen = errorCond->getMessageLength();
const NAWcharBuf wBuf((NAWchar*)errorCond->getMessageText(), errorMsgLen, heap);
cBuf = unicodeToISO88591(wBuf, heap, cBuf);
errorMsg = (char *)cBuf->data();
errorMsgLen = cBuf -> getStrLen();
errorMsg[errorMsgLen]='\n';
errorMsgLen++;
}
else {
errorMsg = (char *)"[UNKNOWN EXCEPTION]\n";
errorMsgLen = strlen(errorMsg);
}
logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen, hdfsClientRetcode);
logErrorReturn:
if (hdfsClientRetcode != HDFS_CLIENT_OK) {
loggingErrorDiags_ = ComDiagsArea::allocate(heap);
*loggingErrorDiags_ << DgSqlCode(EXE_ERROR_WHILE_LOGGING)
<< DgString0(loggingFileName_)
<< DgString1((char *)GetCliGlobals()->getJniErrorStr());
}
return;
}
void ExHbaseAccessTcb::incrErrorCount( ExpHbaseInterface * ehi,Int64 & totalExceptionCount,
const char * tabName, const char * rowId )
{
Lng32 retcode;
retcode = ehi->incrCounter(tabName, rowId, (const char*)"ERRORS",(const char*)"ERROR_COUNT",1, totalExceptionCount);
}
void ExHbaseAccessTcb::getErrorCount( ExpHbaseInterface * ehi,Int64 & totalExceptionCount,
const char * tabName, const char * rowId )
{
Lng32 retcode;
retcode = ehi->incrCounter(tabName, rowId, (const char*)"ERRORS",(const char*)"ERROR_COUNT",0, totalExceptionCount);
}
static const char * const BatchSizeEnvvar =
getenv("SQL_CANCEL_BATCH_SIZE");
static const Lng32 BatchSize = (BatchSizeEnvvar &&
atoi(BatchSizeEnvvar) > 0 )?
atoi(BatchSizeEnvvar) : 8192;
ExHbaseTaskTcb::ExHbaseTaskTcb(ExHbaseAccessTcb * tcb, NABoolean rowsetTcb)
: tcb_(tcb)
, batchSize_(BatchSize)
, rowsetTcb_(rowsetTcb)
{}
ExWorkProcRetcode ExHbaseTaskTcb::work(short &rc)
{
ex_assert(0, "Should not reach ExHbaseTaskTcb::work()");
return WORK_OK;
}
ExHbaseAccessBulkLoadTaskTcb::ExHbaseAccessBulkLoadTaskTcb(const ExHbaseAccessTdb &hbaseAccessTdb, ex_globals * glob) :
ExHbaseAccessTcb(hbaseAccessTdb, glob), step_(NOT_STARTED)
{
}
ExWorkProcRetcode ExHbaseAccessBulkLoadTaskTcb::work()
{
short retcode = 0;
short rc = 0;
Queue * indexList = NULL; // this list includes the base table too.
char * indexName ;
NABoolean cleanupAfterComplete = FALSE;
Text tabName;
// if no parent request, return
if (qparent_.down->isEmpty())
return WORK_OK;
while (1)
{
switch (step_)
{
case NOT_STARTED:
{
matches_ = 0;
retcode = ehi_->initHBLC();
if (setupError(retcode, "ExpHbaseInterface::initHBLC"))
{
step_ = HANDLE_ERROR_AND_CLOSE;
break;
}
indexList = hbaseAccessTdb().listOfIndexesAndTable();
if (!indexList) {
if (setupError(-1, "listOfIndexesAndTable is empty"))
{
step_ = HANDLE_ERROR_AND_CLOSE;
break;
}
}
indexList->position();
indexName = NULL;
cleanupAfterComplete = FALSE;
step_ = GET_NAME;
}
break;
case GET_NAME:
{
char * indexName = (char*)indexList->getNext();
table_.val = indexName;
table_.len = strlen(indexName);
hBulkLoadPrepPath_ = std::string(((ExHbaseAccessTdb&) hbaseAccessTdb()).getLoadPrepLocation())
+ indexName ;
tabName = indexName;
if (((ExHbaseAccessTdb&) hbaseAccessTdb()).getIsTrafLoadCleanup() || cleanupAfterComplete )
step_ = LOAD_CLEANUP;
else
step_ = COMPLETE_LOAD;
}
break;
case LOAD_CLEANUP:
{
//cleanup
retcode = ehi_->bulkLoadCleanup(table_, hBulkLoadPrepPath_);
if (setupError(retcode, "ExpHbaseInterface::bulkLoadCleanup"))
{
step_ = HANDLE_ERROR_AND_CLOSE;
break;
}
if (!indexList->atEnd())
step_ = GET_NAME;
else
step_ = LOAD_CLOSE;
}
break;
case COMPLETE_LOAD:
{
retcode = ehi_->doBulkLoad(table_,
hBulkLoadPrepPath_,
tabName,
hbaseAccessTdb().getUseQuasiSecure(),
hbaseAccessTdb().getTakeSnapshot());
if (setupError(retcode, "ExpHbaseInterface::doBulkLoad"))
{
step_ = HANDLE_ERROR_AND_CLOSE;
break;
}
if (((ExHbaseAccessTdb&) hbaseAccessTdb()).getIsTrafLoadKeepHFiles())
{
if (!indexList->atEnd())
step_ = GET_NAME;
else
step_ = LOAD_CLOSE;
break;
}
if (!indexList->atEnd())
step_ = GET_NAME;
else {
indexList->position();
cleanupAfterComplete = TRUE;
step_ = GET_NAME;
}
}
break;
case LOAD_CLOSE:
{
retcode = ehi_->close();
if (setupError(retcode, "ExpHbaseInterface::close"))
{
step_ = HANDLE_ERROR;
break;
}
step_ = DONE;
}
break;
case HANDLE_ERROR:
case HANDLE_ERROR_AND_CLOSE:
{
if (handleError(rc))
return rc;
if (step_==HANDLE_ERROR_AND_CLOSE)
{
step_ = LOAD_CLOSE;
break;
}
step_ = DONE;
}
break;
case DONE:
{
if (handleDone(rc))
return rc;
step_ = NOT_STARTED;
retcode = ehi_->close();
return WORK_OK;
}
break;
} // switch
} // while
}