blob: 7849746172ad53d20b4426d213a29d55c9ced995 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ExFastTransport.cpp
* Description: TDB/TCB for Fast transport
*
* Created: 08/28/2012
* Language: C++
*
*
*****************************************************************************
*/
#include "ex_stdh.h"
#include "ExFastTransport.h"
#include "ex_globals.h"
#include "ex_exe_stmt_globals.h"
#include "exp_attrs.h"
#include "exp_clause_derived.h"
#include "ex_error.h"
#include "ExStats.h"
#include "ExCextdecs.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <errno.h>
#include <pthread.h>
#include "ComSysUtils.h"
#include "SequenceFileReader.h"
#include "HdfsClient_JNI.h"
#include "cli_stdh.h"
#include "ComSmallDefs.h"
//----------------------------------------------------------------------
// ExFastExtractTcb methods
//----------------------------------------------------------------------
ex_tcb *ExFastExtractTdb::build(ex_globals *glob)
{
// first build the child TCBs
ex_tcb *childTcb ;
ExFastExtractTcb *feTcb;
childTcb = childTdb_->build(glob);
feTcb = new (glob->getSpace()) ExHdfsFastExtractTcb(*this, *childTcb,glob);
feTcb->registerSubtasks();
feTcb->registerResizeSubtasks();
return feTcb;
}
ExFastExtractTcb::ExFastExtractTcb(
const ExFastExtractTdb &fteTdb,
const ex_tcb & childTcb,
ex_globals *glob)
: ex_tcb(fteTdb, 1, glob),
workAtp_(NULL),
outputPool_(NULL),
inputPool_(NULL),
childTcb_(&childTcb)
, inSqlBuffer_(NULL)
, childOutputTD_(NULL)
, sourceFieldsConvIndex_(NULL)
, currBuffer_(NULL)
, bufferAllocFailuresCount_(0)
, modTS_(-1)
{
ex_globals *stmtGlobals = getGlobals();
Space *globSpace = getSpace();
CollHeap *globHeap = getHeap();
heap_ = globHeap;
//convert to non constant to access the members.
ExFastExtractTdb *mytdb = (ExFastExtractTdb*)&fteTdb;
numBuffers_ = mytdb->getNumIOBuffers();
// Allocate queues to communicate with parent
allocateParentQueues(qParent_);
// get the queue that child use to communicate with me
qChild_ = childTcb.getParentQueue();
// Allocate the work ATP
if (myTdb().getWorkCriDesc())
workAtp_ = allocateAtp(myTdb().getWorkCriDesc(), globSpace);
// Fixup expressions
// NOT USED in M9
/*
if (myTdb().getInputExpression())
myTdb().getInputExpression()->fixup(0, getExpressionMode(), this,
globSpace, globHeap);
if (myTdb().getOutputExpression())
myTdb().getOutputExpression()->fixup(0, getExpressionMode(), this,
globSpace, globHeap);
*/
if (myTdb().getChildDataExpr())
myTdb().getChildDataExpr()->fixup(0,getExpressionMode(),this,
globSpace, globHeap, FALSE, glob);
//maybe we can move the below few line to the init section od work methods??
UInt32 numAttrs = myTdb().getChildTuple()->numAttrs();
sourceFieldsConvIndex_ = (int *)((NAHeap *)heap_)->allocateAlignedHeapMemory((UInt32)(sizeof(int) * numAttrs), 512, FALSE);
maxExtractRowLength_ = ROUND8(myTdb().getChildDataRowLen()) ;
const ULng32 sqlBufferSize = maxExtractRowLength_ +
ROUND8(sizeof(SqlBufferNormal)) +
sizeof(tupp_descriptor) +
16 ;//just in case
inSqlBuffer_ = (SqlBuffer *) new (heap_) char[sqlBufferSize];
inSqlBuffer_->driveInit(sqlBufferSize, TRUE, SqlBuffer::NORMAL_);
childOutputTD_ = inSqlBuffer_->add_tuple_desc(maxExtractRowLength_);
endOfData_ = FALSE;
} // ExFastExtractTcb::ExFastExtractTcb
ExFastExtractTcb::~ExFastExtractTcb()
{
// Release resources acquired
//
freeResources();
delete qParent_.up;
delete qParent_.down;
if (workAtp_)
{
workAtp_->release();
deallocateAtp(workAtp_, getSpace());
}
if (inSqlBuffer_ && getHeap())
{
getHeap()->deallocateMemory(inSqlBuffer_);
inSqlBuffer_ = NULL;
childOutputTD_ = NULL;
}
if (sourceFieldsConvIndex_)
getHeap()->deallocateMemory(sourceFieldsConvIndex_);
} // ExFastExtractTcb::~ExFastExtractTcb()
//
// This function frees any resources acquired.
// It should only be called by the TCB destructor.
//
void ExFastExtractTcb::freeResources()
{
for(Int16 i=0; i < numBuffers_; i++)
{
if(bufferPool_[i])
{
NADELETE(bufferPool_[i], IOBuffer, getHeap());
bufferPool_[i] = NULL;
}
}
}
void ExFastExtractTcb::registerSubtasks()
{
ex_tcb :: registerSubtasks();
}
ex_tcb_private_state *ExFastExtractTcb::allocatePstates(
Lng32 &numElems, // [IN/OUT] desired/actual elements
Lng32 &pstateLength) // [OUT] length of one element
{
PstateAllocator<ExFastExtractPrivateState> pa;
return pa.allocatePstates(this, numElems, pstateLength);
}
// Insert a single entry into the up queue and optionally
// remove the head of the down queue
//
// Right now this function does not handle data rows, only error
// and end-of-data. It could possibly be extended to handle a data
// row. I have not looked at that closely enough yet.
//
void ExFastExtractTcb::insertUpQueueEntry(ex_queue::up_status status,
ComDiagsArea *diags,
NABoolean popDownQueue)
{
ex_queue_entry *upEntry = qParent_.up->getTailEntry();
ex_queue_entry *downEntry = qParent_.down->getHeadEntry();
ExFastExtractPrivateState &privateState =
*((ExFastExtractPrivateState *) downEntry->pstate);
// Initialize the up queue entry.
//
// copyAtp() will copy all tuple pointers and the diags area from
// the down queue entry to the up queue entry.
//
// When we return Q_NO_DATA if the match count is > 0:
// * assume down queue diags were returned with the Q_OK_MMORE entries
// * release down queue diags before copyAtp()
//
if (status == ex_queue::Q_NO_DATA && privateState.matchCount_ > 0)
{
downEntry->setDiagsArea(NULL);
upEntry->copyAtp(downEntry);
}
else
{
upEntry->copyAtp(downEntry);
downEntry->setDiagsArea(NULL);
}
upEntry->upState.status = status;
upEntry->upState.parentIndex = downEntry->downState.parentIndex;
upEntry->upState.downIndex = qParent_.down->getHeadIndex();
upEntry->upState.setMatchNo(privateState.matchCount_);
// Move any diags to the up queue entry
if (diags != NULL)
{
ComDiagsArea *atpDiags = upEntry->getDiagsArea();
if (atpDiags == NULL)
{
// setDiagsArea() does not increment the reference count
upEntry->setDiagsArea(diags);
diags->incrRefCount();
}
else
{
atpDiags->mergeAfter(*diags);
}
}
// Insert into up queue
qParent_.up->insert();
// Optionally remove the head of the down queue
if (popDownQueue)
{
privateState.init();
qParent_.down->removeHead();
}
}
const char *ExFastExtractTcb::getTcbStateString(FastExtractStates s)
{
switch (s)
{
case EXTRACT_NOT_STARTED: return "EXTRACT_NOT_STARTED";
case EXTRACT_INITIALIZE: return "EXTRACT_INITIALIZE";
case EXTRACT_PASS_REQUEST_TO_CHILD: return "EXTRACT_PASS_REQUEST_TO_CHILD";
case EXTRACT_RETURN_ROWS_FROM_CHILD:return "EXTRACT_RETURN_ROWS_FROM_CHILD";
case EXTRACT_DATA_READY_TO_SEND: return "EXTRACT_DATA_READY_TO_SEND";
case EXTRACT_SYNC_WITH_IO_THREAD: return "EXTRACT_SYNC_WITH_IO_THREAD";
case EXTRACT_ERROR: return "EXTRACT_ERROR";
case EXTRACT_DONE: return "EXTRACT_DONE";
case EXTRACT_CANCELED: return "EXTRACT_CANCELED";
default: return "UNKNOWN";
}
}
/* Work method, where all the action happens
The stick figure shows transitions between various states.
Transitions to EXTRACT_ERROR are not shown for clarity.
EXTRACT_NOT_STARTED
|
|
V
EXTRACT_INITIALIZE
|
|
V
EXTRACT_PASS_REQUEST_TO_CHILD
|
| |----|
V V |
EXTRACT_RETURN_ROWS_FROM_CHILD ->| EXTRACT_ERROR
| ^ | |
| | |------| |
V | V | | |
EXTRACT_SYNC_WITH_IO_THREAD|->| |
| ^ | |
| | | |
V | | V
EXTRACT_DATA_READY_TO_SEND<- EXTRACT_CANCELLED
\ /
\ /
V V
EXTRACT_DONE
*/
ExWorkProcRetcode ExFastExtractTcb::work()
{
assert(0);
return WORK_OK;
}//ExFastExtractTcb::work()
void ExFastExtractTcb::updateWorkATPDiagsArea(ComDiagsArea *da)
{
if (da)
{
if (workAtp_->getDiagsArea())
{
workAtp_->getDiagsArea()->mergeAfter(*da);
}
else
{
ComDiagsArea * da1 = da;
workAtp_->setDiagsArea(da1);
da1->incrRefCount();
}
}
}
void ExFastExtractTcb::updateWorkATPDiagsArea(ex_queue_entry * centry)
{
if (centry->getDiagsArea())
{
if (workAtp_->getDiagsArea())
{
workAtp_->getDiagsArea()->mergeAfter(*centry->getDiagsArea());
}
else
{
ComDiagsArea * da = centry->getDiagsArea();
workAtp_->setDiagsArea(da);
da->incrRefCount();
centry->setDiagsArea(0);
}
}
}
void ExFastExtractTcb::updateWorkATPDiagsArea(ExeErrorCode rc, const char *msg)
{
ComDiagsArea *da = workAtp_->getDiagsArea();
if(!da)
{
da = ComDiagsArea::allocate(getHeap());
workAtp_->setDiagsArea(da);
}
if(msg != NULL)
{
*da << DgSqlCode(-rc)
<< DgString0(msg);
}
else
{
*da << DgSqlCode(-rc);
}
}
void ExFastExtractTcb::updateWorkATPDiagsArea(const char *file,
int line, const char *msg)
{
ComDiagsArea *da = workAtp_->getDiagsArea();
if(!da)
{
da = ComDiagsArea::allocate(getHeap());
workAtp_->setDiagsArea(da);
}
*da << DgSqlCode(-1001)
<< DgString0(file)
<< DgInt0(line)
<< DgString1(msg);
}
NABoolean ExFastExtractTcb::needStatsEntry()
{
if ((getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::ALL_STATS) ||
(getGlobals()->getStatsArea()->getCollectStatsType() == ComTdb::OPERATOR_STATS))
return TRUE;
else
return FALSE;
}
ExOperStats * ExFastExtractTcb::doAllocateStatsEntry(
CollHeap *heap,
ComTdb *tdb)
{
ExOperStats *stat = NULL;
ComTdb::CollectStatsType statsType = getGlobals()->getStatsArea()->getCollectStatsType();
if (statsType == ComTdb::OPERATOR_STATS)
{
return ex_tcb::doAllocateStatsEntry(heap, tdb);;
}
else
{
return new(heap) ExFastExtractStats( heap,
this,
tdb);
}
}
Lng32 ExHdfsFastExtractTcb::lobInterfaceDataModCheck
(Int64 &failedModTS,
char * failedLocBuf,
Int32 &failedLocBufLen)
{
return ExpLOBinterfaceDataModCheck(lobGlob_,
targetLocation_,
hdfsHost_,
hdfsPort_,
myTdb().getModTSforDir(),
0,
failedModTS,
failedLocBuf, failedLocBufLen);
}
ExHdfsFastExtractTcb::ExHdfsFastExtractTcb(
const ExFastExtractTdb &fteTdb,
const ex_tcb & childTcb,
ex_globals *glob)
: ExFastExtractTcb(
fteTdb,
childTcb,
glob),
sequenceFileWriter_(NULL)
, hdfsClient_(NULL)
{
} // ExHdfsFastExtractTcb::ExFastExtractTcb
ExHdfsFastExtractTcb::~ExHdfsFastExtractTcb()
{
if (lobGlob_) {
ExpLOBinterfaceCleanup(lobGlob_);
lobGlob_ = NULL;
}
if (sequenceFileWriter_ != NULL) {
NADELETE(sequenceFileWriter_, SequenceFileWriter, getHeap());
sequenceFileWriter_ = NULL;
}
if (hdfsClient_ != NULL) {
NADELETE(hdfsClient_, HdfsClient, getHeap());
hdfsClient_ = NULL;
}
} // ExHdfsFastExtractTcb::~ExHdfsFastExtractTcb()
Int32 ExHdfsFastExtractTcb::fixup()
{
lobGlob_ = NULL;
ex_tcb::fixup();
strncpy(hdfsHost_, myTdb().getHdfsHostName(), sizeof(hdfsHost_));
hdfsPort_ = myTdb().getHdfsPortNum();
ExpLOBinterfaceInit
(lobGlob_, (NAHeap *)getGlobals()->getDefaultHeap(),getGlobals()->castToExExeStmtGlobals()->getContext(),TRUE,hdfsHost_,hdfsPort_);
modTS_ = myTdb().getModTSforDir();
return 0;
}
void ExHdfsFastExtractTcb::convertSQRowToString(ULng32 nullLen,
ULng32 recSepLen,
ULng32 delimLen,
tupp_descriptor* dataDesc,
char* targetData,
NABoolean & convError) {
char* childRow = dataDesc->getTupleAddress();
ULng32 childRowLen = dataDesc->getAllocatedSize();
UInt32 vcActualLen = 0;
for (UInt32 i = 0; i < myTdb().getChildTuple()->numAttrs(); i++) {
Attributes * attr = myTdb().getChildTableAttr(i);
Attributes * attr2 = myTdb().getChildTableAttr2(i);
char *childColData = NULL; //childRow + attr->getOffset();
UInt32 childColLen = 0;
UInt32 maxTargetColLen = attr2->getLength();
//format is aligned format--
//----------
// field is varchar
if (attr->getVCIndicatorLength() > 0) {
childColData = childRow + *((UInt32*) (childRow + attr->getVoaOffset()));
childColLen = attr->getLength(childColData);
childColData += attr->getVCIndicatorLength();
} else { //source is fixed length
childColData = childRow + attr->getOffset();
childColLen = attr->getLength();
if ((attr->getCharSet() == CharInfo::ISO88591
|| attr->getCharSet() == CharInfo::UTF8) && childColLen > 0) {
// trim trailing blanks
while (childColLen > 0 && childColData[childColLen - 1] == ' ') {
childColLen--;
}
} else if (attr->getCharSet() == CharInfo::UCS2 && childColLen > 1) {
ex_assert(childColLen % 2 == 0, "invalid ucs2");
NAWchar* wChildColData = (NAWchar*) childColData;
Int32 wChildColLen = childColLen / 2;
while (wChildColLen > 0 && wChildColData[wChildColLen - 1] == L' ') {
wChildColLen--;
}
childColLen = wChildColLen * 2;
}
}
if (attr->getNullFlag()
&& ExpAlignedFormat::isNullValue(childRow + attr->getNullIndOffset(),
attr->getNullBitIndex())) {
// source is a null value.
nullLen = 0;
if (myTdb().getNullString()) {
nullLen = strlen(myTdb().getNullString());
memcpy(targetData, myTdb().getNullString(), nullLen);
}
targetData += nullLen;
currBuffer_->bytesLeft_ -= nullLen;
} else {
switch ((ConvInstruction) sourceFieldsConvIndex_[i]) {
case CONV_ASCII_V_V:
case CONV_ASCII_F_V:
case CONV_UNICODE_V_V:
case CONV_UNICODE_F_V: {
if (childColLen > 0) {
memcpy(targetData, childColData, childColLen);
targetData += childColLen;
currBuffer_->bytesLeft_ -= childColLen;
}
}
break;
default:
ex_expr::exp_return_type err = convDoIt(childColData, childColLen,
attr->getDatatype(), attr->getPrecision(), attr->getScale(),
targetData, attr2->getLength(), attr2->getDatatype(),
attr2->getPrecision(), attr2->getScale(), (char*) &vcActualLen,
sizeof(vcActualLen), 0, 0, // diags may need to be added
(ConvInstruction) sourceFieldsConvIndex_[i]);
if (err == ex_expr::EXPR_ERROR) {
convError = TRUE;
// not exit loop -- we will log the errenous row later
// do not cancel processing for this type of error???
}
targetData += vcActualLen;
currBuffer_->bytesLeft_ -= vcActualLen;
break;
} //switch
}
if (i == myTdb().getChildTuple()->numAttrs() - 1) {
strncpy(targetData, myTdb().getRecordSeparator(), recSepLen);
targetData += recSepLen;
currBuffer_->bytesLeft_ -= recSepLen;
} else {
strncpy(targetData, myTdb().getDelimiter(), delimLen);
targetData += delimLen;
currBuffer_->bytesLeft_ -= delimLen;
}
}
}
ExWorkProcRetcode ExHdfsFastExtractTcb::work()
{
Lng32 retcode = 0;
SFW_RetCode sfwRetCode = SFW_OK;
HDFS_Client_RetCode hdfsClientRetCode = HDFS_CLIENT_OK;
ULng32 recSepLen = strlen(myTdb().getRecordSeparator());
ULng32 delimLen = strlen(myTdb().getDelimiter());
ULng32 nullLen =
(myTdb().getNullString() ? strlen(myTdb().getNullString()) : 0);
if (myTdb().getIsHiveInsert())
{
recSepLen = 1;
delimLen = 1;
}
if (getEmptyNullString()) //covers hive null case also
nullLen = 0;
ExOperStats *stats = NULL;
ExFastExtractStats *feStats = getFastExtractStats();
while (TRUE)
{
// if no parent request, return
if (qParent_.down->isEmpty())
return WORK_OK;
ex_queue_entry *pentry_down = qParent_.down->getHeadEntry();
const ex_queue::down_request request = pentry_down->downState.request;
const Lng32 value = pentry_down->downState.requestValue;
ExFastExtractPrivateState &pstate = *((ExFastExtractPrivateState *) pentry_down->pstate);
switch (pstate.step_)
{
case EXTRACT_NOT_STARTED:
{
pstate.step_= EXTRACT_CHECK_MOD_TS;
}
break;
case EXTRACT_CHECK_MOD_TS:
{
// if no tgt file or input timestamp is -1, skip data mod check.
// Also, if this insert is being done with overwrite, then data mod
// check has already been done during directory cleanup. Skip it here.
if ((! myTdb().getTargetFile()) ||
(myTdb().getModTSforDir() == -1) ||
(myTdb().getOverwriteHiveTable()))
{
pstate.step_ = EXTRACT_INITIALIZE;
break;
}
numBuffers_ = 0;
memset (hdfsHost_, '\0', sizeof(hdfsHost_));
strncpy(hdfsHost_, myTdb().getHdfsHostName(), sizeof(hdfsHost_));
hdfsPort_ = myTdb().getHdfsPortNum();
memset (fileName_, '\0', sizeof(fileName_));
memset (targetLocation_, '\0', sizeof(targetLocation_));
snprintf(targetLocation_,999, "%s", myTdb().getTargetName());
Int64 failedModTS = -1;
Lng32 failedLocBufLen = 1000;
char failedLocBuf[failedLocBufLen];
retcode =
lobInterfaceDataModCheck(failedModTS, failedLocBuf, failedLocBufLen);
if (retcode < 0)
{
Lng32 cliError = 0;
Lng32 intParam1 = -retcode;
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(EXE_ERROR_FROM_LOB_INTERFACE),
NULL, &intParam1,
&cliError,
NULL,
"HDFS",
(char*)"ExpLOBInterfaceDataModCheck",
getLobErrStr(intParam1));
pentry_down->setDiagsArea(diagsArea);
pstate.step_ = EXTRACT_ERROR;
break;
}
if (retcode == 1) // check failed
{
char errStr[200];
str_sprintf(errStr, "genModTS = %ld, failedModTS = %ld",
myTdb().getModTSforDir(), failedModTS);
ComDiagsArea * diagsArea = NULL;
ExRaiseSqlError(getHeap(), &diagsArea,
(ExeErrorCode)(EXE_HIVE_DATA_MOD_CHECK_ERROR), NULL,
NULL, NULL, NULL,
errStr);
pentry_down->setDiagsArea(diagsArea);
pstate.step_ = EXTRACT_ERROR;
break;
}
pstate.step_= EXTRACT_INITIALIZE;
}
break;
case EXTRACT_INITIALIZE:
{
pstate.processingStarted_ = FALSE;
errorOccurred_ = FALSE;
//Allocate writeBuffers.
numBuffers_ = 1;
for (Int16 i = 0; i < numBuffers_; i++)
{
bool done = false;
Int64 input_datalen = myTdb().getHdfsIoBufferSize();
char * buf_addr = 0;
while ((!done) && input_datalen >= 32 * 1024)
{
buf_addr = 0;
buf_addr = (char *)((NAHeap *)heap_)->allocateAlignedHeapMemory((UInt32)input_datalen, 512, FALSE);
if (buf_addr)
{
done = true;
bufferPool_[i] = new (heap_) IOBuffer((char*) buf_addr, (Int32)input_datalen);
}
else
{
bufferAllocFailuresCount_++;
input_datalen = input_datalen / 2;
}
}
if (!done)
{
numBuffers_ = i;
break ; // if too few buffers have been allocated we will raise
} // an error later
}
if (feStats)
{
feStats->setBufferAllocFailuresCount(bufferAllocFailuresCount_);
feStats->setBuffersCount(numBuffers_);
}
ComDiagsArea *da = NULL;
if (myTdb().getTargetFile() )
{
Lng32 fileNum = getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber();
memset (hdfsHost_, '\0', sizeof(hdfsHost_));
strncpy(hdfsHost_, myTdb().getHdfsHostName(), sizeof(hdfsHost_));
hdfsPort_ = myTdb().getHdfsPortNum();
memset (fileName_, '\0', sizeof(fileName_));
memset (targetLocation_, '\0', sizeof(targetLocation_));
snprintf(targetLocation_,999, "%s", myTdb().getTargetName());
char pt[80]="";
char usec_buf[6]="";
struct timeval tmnow;
struct tm *currgmtime;
gettimeofday(&tmnow, NULL);
currgmtime = gmtime(&tmnow.tv_sec);
strftime(pt, sizeof(pt), "%Y%m%d%H%M%S", currgmtime);
sprintf(usec_buf,"%d",(int)tmnow.tv_usec);
strcat(pt,usec_buf);
srand(getpid());
if (myTdb().getIsHiveInsert())
snprintf(fileName_,999, "%s%d-%s-%d", myTdb().getHiveTableName(), fileNum, pt,rand() % 1000);
else
snprintf(fileName_,999, "%s%d-%s-%d", "file", fileNum, pt,rand() % 1000);
if (isSequenceFile() && sequenceFileWriter_ == NULL)
{
sequenceFileWriter_ = new(getHeap())
SequenceFileWriter((NAHeap *)getHeap());
sfwRetCode = sequenceFileWriter_->init();
if (sfwRetCode != SFW_OK)
{
createSequenceFileError(sfwRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
else if (!isSequenceFile() && hdfsClient_ == NULL)
{
hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetCode);
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createHdfsClientFileError(hdfsClientRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
strcat(targetLocation_, "//");
strcat(targetLocation_, fileName_);
if (isSequenceFile())
{
sfwRetCode = sequenceFileWriter_->open(targetLocation_, SFW_COMP_NONE);
if (sfwRetCode != SFW_OK)
{
createSequenceFileError(sfwRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
else
{
hdfsClientRetCode = hdfsClient_->hdfsOpen(targetLocation_, isHdfsCompressed());
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createHdfsClientFileError(hdfsClientRetCode);
NADELETE(hdfsClient_,
HdfsClient,
heap_);
hdfsClient_ = NULL;
pstate.step_ = EXTRACT_ERROR;
break;
}
}
if (feStats)
{
feStats->setPartitionNumber(fileNum);
}
}
else
{
updateWorkATPDiagsArea(__FILE__,__LINE__,"sockets are not supported");
pstate.step_ = EXTRACT_ERROR;
break;
}
for (UInt32 i = 0; i < myTdb().getChildTuple()->numAttrs(); i++)
{
Attributes * attr = myTdb().getChildTableAttr(i);
Attributes * attr2 = myTdb().getChildTableAttr2(i);
ex_conv_clause tempClause;
int convIndex = 0;
sourceFieldsConvIndex_[i] =
tempClause.findInstruction(
attr->getDatatype(),
0,
attr2->getDatatype(),
0,
0);
}
pstate.step_= EXTRACT_PASS_REQUEST_TO_CHILD;
}
break;
case EXTRACT_PASS_REQUEST_TO_CHILD:
{
// pass the parent request to the child downqueue
if (!qChild_.down->isFull())
{
ex_queue_entry * centry = qChild_.down->getTailEntry();
if (request == ex_queue::GET_N)
centry->downState.request = ex_queue::GET_ALL;
else
centry->downState.request = request;
centry->downState.requestValue = pentry_down->downState.requestValue;
centry->downState.parentIndex = qParent_.down->getHeadIndex();
// set the child's input atp
centry->passAtp(pentry_down->getAtp());
qChild_.down->insert();
pstate.processingStarted_ = TRUE;
}
else
// couldn't pass request to child, return
return WORK_OK;
pstate.step_ = EXTRACT_RETURN_ROWS_FROM_CHILD;
}
break;
case EXTRACT_RETURN_ROWS_FROM_CHILD:
{
if ((qChild_.up->isEmpty()))
{
return WORK_OK;
}
if (currBuffer_ == NULL)
{
currBuffer_ = bufferPool_[0];
memset(currBuffer_->data_, '\0',currBuffer_->bufSize_);
currBuffer_->bytesLeft_ = currBuffer_->bufSize_;
}
ex_queue_entry * centry = qChild_.up->getHeadEntry();
ComDiagsArea *cda = NULL;
ex_queue::up_status child_status = centry->upState.status;
switch (child_status)
{
case ex_queue::Q_OK_MMORE:
{
// for the very first row retruned from child
// include the header row if necessary
if ((pstate.matchCount_ == 0) && myTdb().getIncludeHeader())
{
if (!myTdb().getIsAppend())
{
Int32 headerLength = strlen(myTdb().getHeader());
char * target = currBuffer_->data_;
if (headerLength + 1 < currBuffer_->bufSize_)
{
strncpy(target, myTdb().getHeader(),headerLength);
target[headerLength] = '\n' ;
currBuffer_->bytesLeft_ -= headerLength+1 ;
}
else
{
updateWorkATPDiagsArea(__FILE__,__LINE__,"header does not fit in buffer");
pstate.step_ = EXTRACT_ERROR;
break;
}
}
}
tupp_descriptor *dataDesc = childOutputTD_;
ex_expr::exp_return_type expStatus = ex_expr::EXPR_OK;
if (myTdb().getChildDataExpr())
{
UInt32 childTuppIndex = myTdb().childDataTuppIndex_;
workAtp_->getTupp(childTuppIndex) = dataDesc;
// Evaluate the child data expression. If diags are generated they
// will be left in the down entry ATP.
expStatus = myTdb().getChildDataExpr()->eval(centry->getAtp(), workAtp_);
workAtp_->getTupp(childTuppIndex).release();
if (expStatus == ex_expr::EXPR_ERROR)
{
if (myTdb().getContinueOnError())
{
// ignore this row and continue to the next row
if (workAtp_->getDiagsArea())
workAtp_->getDiagsArea()->clear();
break;
}
else
{
updateWorkATPDiagsArea(centry);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
} // if (myTdb().getChildDataExpr())
///////////////////////
char * targetData = currBuffer_->data_ + currBuffer_->bufSize_ - currBuffer_->bytesLeft_;
if (targetData == NULL)
{
updateWorkATPDiagsArea(__FILE__,__LINE__,"targetData is NULL");
pstate.step_ = EXTRACT_ERROR;
break;
}
NABoolean convError = FALSE;
convertSQRowToString(nullLen, recSepLen, delimLen, dataDesc,
targetData, convError);
///////////////////////////////
pstate.matchCount_++;
if (!convError)
{
if (feStats)
{
feStats->incProcessedRowsCount();
}
pstate.successRowCount_ ++;
}
else
{
if (feStats)
{
feStats->incErrorRowsCount();
}
pstate.errorRowCount_ ++;
}
if (currBuffer_->bytesLeft_ < (Int32) maxExtractRowLength_)
{
pstate.step_ = EXTRACT_DATA_READY_TO_SEND;
}
}
break;
case ex_queue::Q_NO_DATA:
{
pstate.step_ = EXTRACT_DATA_READY_TO_SEND;
endOfData_ = TRUE;
pstate.processingStarted_ = FALSE ; // so that cancel does not
//wait for this Q_NO_DATA
}
break;
case ex_queue::Q_SQLERROR:
{
if ((centry->getDiagsArea()) &&
(!pentry_down->getDiagsArea()))
{
ComDiagsArea *diagsArea = pentry_down->getDiagsArea();
diagsArea = ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
pentry_down->setDiagsArea(diagsArea);
pentry_down->getDiagsArea()->mergeAfter(*centry->getDiagsArea());
}
pstate.step_ = EXTRACT_ERROR;
}
break;
case ex_queue::Q_INVALID:
{
updateWorkATPDiagsArea(__FILE__,__LINE__,
"ExFastExtractTcb::work() Invalid state returned by child");
pstate.step_ = EXTRACT_ERROR;
}
break;
} // switch
qChild_.up->removeHead();
}
break;
case EXTRACT_DATA_READY_TO_SEND:
{
ssize_t bytesToWrite = currBuffer_->bufSize_ - currBuffer_->bytesLeft_;
if (isSequenceFile())
{
sfwRetCode = sequenceFileWriter_->writeBuffer(currBuffer_->data_, bytesToWrite, myTdb().getRecordSeparator());
if (sfwRetCode != SFW_OK)
{
createSequenceFileError(sfwRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
else
{
hdfsClient_->hdfsWrite(currBuffer_->data_, bytesToWrite, hdfsClientRetCode);
if (hdfsClientRetCode != HDFS_CLIENT_OK)
{
createSequenceFileError(hdfsClientRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
if (feStats)
{
feStats->incReadyToSendBuffersCount();
feStats->incReadyToSendBytes(currBuffer_->bufSize_ - currBuffer_->bytesLeft_);
}
currBuffer_ = NULL;
if (endOfData_)
{
pstate.step_ = EXTRACT_DONE;
}
else
{
pstate.step_ = EXTRACT_RETURN_ROWS_FROM_CHILD;
}
}
break;
case EXTRACT_ERROR:
{
// If there is no room in the parent queue for the reply,
// try again later.
//Later we may split this state into 2 one for cancel and one for query
if (qParent_.up->isFull())
return WORK_OK;
// Cancel the child request - there must be a child request in
// progress to get to the ERROR state.
if (pstate.processingStarted_)
{
qChild_.down->cancelRequestWithParentIndex(qParent_.down->getHeadIndex());
//pstate.processingStarted_ = FALSE;
}
while (pstate.processingStarted_ && pstate.step_ == EXTRACT_ERROR)
{
if (qChild_.up->isEmpty())
return WORK_OK;
ex_queue_entry * childEntry = qChild_.up->getHeadEntry();
ex_queue::up_status childStatus = childEntry->upState.status;
if (childStatus == ex_queue::Q_NO_DATA)
{
pstate.step_ = EXTRACT_DONE;
pstate.processingStarted_ = FALSE;
}
qChild_.up->removeHead();
}
ex_queue_entry *pentry_up = qParent_.up->getTailEntry();
pentry_up->copyAtp(pentry_down);
// Construct and return the error row.
//
if (workAtp_->getDiagsArea())
{
ComDiagsArea *diagsArea = pentry_up->getDiagsArea();
if (diagsArea == NULL)
{
diagsArea = ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
pentry_up->setDiagsArea(diagsArea);
}
pentry_up->getDiagsArea()->mergeAfter(*workAtp_->getDiagsArea());
workAtp_->setDiagsArea(NULL);
}
pentry_up->upState.status = ex_queue::Q_SQLERROR;
pentry_up->upState.parentIndex
= pentry_down->downState.parentIndex;
pentry_up->upState.downIndex = qParent_.down->getHeadIndex();
pentry_up->upState.setMatchNo(pstate.matchCount_);
qParent_.up->insert();
//
errorOccurred_ = TRUE;
pstate.step_ = EXTRACT_DONE;
}
break;
case EXTRACT_DONE:
{
// If there is no room in the parent queue for the reply,
// try again later.
//
if (qParent_.up->isFull())
return WORK_OK;
if (isSequenceFile())
{
if (sequenceFileWriter_)
{
sfwRetCode = sequenceFileWriter_->close();
if (!errorOccurred_ && sfwRetCode != SFW_OK )
{
createSequenceFileError(sfwRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
}
else
{
if (hdfsClient_)
{
hdfsClientRetCode = hdfsClient_->hdfsClose();
if (!errorOccurred_ && HDFS_CLIENT_OK != HDFS_CLIENT_OK )
{
createHdfsClientFileError(hdfsClientRetCode);
pstate.step_ = EXTRACT_ERROR;
break;
}
}
}
//insertUpQueueEntry will insert Q_NO_DATA into the up queue and
//remove the head of the down queue
insertUpQueueEntry(ex_queue::Q_NO_DATA, NULL, TRUE);
errorOccurred_ = FALSE;
endOfData_ = FALSE;
//we need to set the next state so that the query can get re-executed
//and we start from the beginning again. Not sure if pstate will be
//valid anymore because insertUpQueueEntry() might have cleared it
//already.
pstate.step_ = EXTRACT_NOT_STARTED;
//exit out now and not break.
return WORK_OK;
}
break;
default:
{
ex_assert(FALSE, "Invalid state in ExHdfsFastExtractTcb ");
}
break;
} // switch(pstate.step_)
} // while
return WORK_OK;
}//ExHdfsFastExtractTcb::work()
void ExHdfsFastExtractTcb::insertUpQueueEntry(ex_queue::up_status status, ComDiagsArea *diags, NABoolean popDownQueue)
{
ex_queue_entry *upEntry = qParent_.up->getTailEntry();
ex_queue_entry *downEntry = qParent_.down->getHeadEntry();
ExFastExtractPrivateState &privateState = *((ExFastExtractPrivateState *) downEntry->pstate);
// Initialize the up queue entry.
//
// copyAtp() will copy all tuple pointers and the diags area from
// the down queue entry to the up queue entry.
//
// When we return Q_NO_DATA if the match count is > 0:
// * assume down queue diags were returned with the Q_OK_MMORE entries
// * release down queue diags before copyAtp()
//
if (status == ex_queue::Q_NO_DATA && privateState.matchCount_ > 0)
{
downEntry->setDiagsArea(NULL);
upEntry->copyAtp(downEntry);
}
else
{
upEntry->copyAtp(downEntry);
downEntry->setDiagsArea(NULL);
}
upEntry->upState.status = status;
upEntry->upState.parentIndex = downEntry->downState.parentIndex;
upEntry->upState.downIndex = qParent_.down->getHeadIndex();
upEntry->upState.setMatchNo(privateState.matchCount_);
// rows affected code (below) medeled after ex_partn_access.cpp
ExMasterStmtGlobals *g = getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
if (!g)
{
ComDiagsArea *da = upEntry->getDiagsArea();
if (da == NULL)
{
da = ComDiagsArea::allocate(getGlobals()->getDefaultHeap());
upEntry->setDiagsArea(da);
}
da->addRowCount(privateState.matchCount_);
}
else
{
g->setRowsAffected(privateState.matchCount_);
}
//
// Insert into up queue
qParent_.up->insert();
// Optionally remove the head of the down queue
if (popDownQueue)
{
privateState.init();
qParent_.down->removeHead();
}
}
NABoolean ExHdfsFastExtractTcb::isSequenceFile()
{
return myTdb().getIsSequenceFile();
}
NABoolean ExHdfsFastExtractTcb::isHdfsCompressed()
{
return myTdb().getHdfsCompressed();
}
void ExHdfsFastExtractTcb::createSequenceFileError(Int32 sfwRetCode)
{
ContextCli *currContext = GetCliGlobals()->currContext();
ComDiagsArea * diagsArea = NULL;
char* errorMsg = sequenceFileWriter_->getErrorText((SFW_RetCode)sfwRetCode);
ExRaiseSqlError(getHeap(),
&diagsArea,
(ExeErrorCode)(8447),
NULL, NULL, NULL, NULL,
errorMsg,
(char *)GetCliGlobals()->getJniErrorStr());
//ex_queue_entry *pentry_down = qParent_.down->getHeadEntry();
//pentry_down->setDiagsArea(diagsArea);
updateWorkATPDiagsArea(diagsArea);
}
void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode)
{
ContextCli *currContext = GetCliGlobals()->currContext();
ComDiagsArea * diagsArea = NULL;
char* errorMsg = hdfsClient_->getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
ExRaiseSqlError(getHeap(),
&diagsArea,
(ExeErrorCode)(8447),
NULL, NULL, NULL, NULL,
errorMsg,
(char *)GetCliGlobals()->getJniErrorStr());
updateWorkATPDiagsArea(diagsArea);
}
ExFastExtractPrivateState::ExFastExtractPrivateState()
{
init();
}
ExFastExtractPrivateState::~ExFastExtractPrivateState()
{
}