blob: ed3c1ed554633c10f8ad1a2e8c028d85474d8b5e [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
#include "Platform.h"
#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExHbaseAccess.h"
#include "ex_exe_stmt_globals.h"
#include "ExpLOBinterface.h"
#include "SQLTypeDefs.h"
#include "ExpHbaseInterface.h"
ExHbaseScanTaskTcb::ExHbaseScanTaskTcb(
ExHbaseAccessSelectTcb * tcb)
: ExHbaseTaskTcb(tcb)
, step_(NOT_STARTED)
{
}
void ExHbaseScanTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseScanTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
Lng32 remainingInBatch = batchSize_;
while (1)
{
ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = SCAN_OPEN;
}
break;
case SCAN_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
if (tcb_->setupHbaseFilterPreds())
{
step_ = HANDLE_ERROR;
break;
}
retcode = tcb_->ehi_->scanOpen(tcb_->table_,
tcb_->beginRowId_, tcb_->endRowId_,
tcb_->columns_, -1,
tcb_->hbaseAccessTdb().readUncommittedScan(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
FALSE,
(tcb_->hbaseFilterColumns_.entries() > 0 ?
&tcb_->hbaseFilterColumns_ : NULL),
(tcb_->hbaseFilterOps_.entries() > 0 ?
&tcb_->hbaseFilterOps_ : NULL),
(tcb_->hbaseFilterValues_.entries() > 0 ?
&tcb_->hbaseFilterValues_ : NULL),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
tcb_->getSamplePercentage(),
FALSE, 0, NULL, NULL, 0,
(tcb_->hbaseAccessTdb().getHbaseAccessOptions()
? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0));
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
break;
case NEXT_ROW:
{
if (--remainingInBatch <= 0)
{
rc = WORK_CALL_AGAIN;
return 1;
}
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
step_ = SCAN_CLOSE;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case NEXT_CELL:
{
if (tcb_->colVal_.val == NULL)
tcb_->colVal_.val = new (tcb_->getHeap())
char[tcb_->hbaseAccessTdb().convertRowLen()];
tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();
retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_,
tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
if (retcode == HBASE_ACCESS_EOD)
{
step_ = NEXT_ROW;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
step_ = HANDLE_ERROR;
else
step_ = CREATE_ROW;
}
break;
case CREATE_ROW:
{
rc = tcb_->createColumnwiseRow();
if (rc == -1)
{
step_ = HANDLE_ERROR;
break;
}
else if (tcb_->setupError(rc, "ExHbaseAccessTcb::createColumnwiseRow", "Not enough space in target buffer to move data"))
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
step_ = RETURN_ROW;
else if (rc == -1)
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case RETURN_ROW:
{
if (tcb_->moveRowToUpQueue(tcb_->convertRow_,
tcb_->hbaseAccessTdb().convertRowLen(), &rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == tcb_->matches_))
{
step_ = SCAN_CLOSE;
break;
}
step_ = NEXT_CELL;
}
break;
case SCAN_CLOSE:
{
retcode = tcb_->ehi_->scanClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
step_ = HANDLE_ERROR;
else
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
{
step_ = NOT_STARTED;
return 0;
}
break;
}// switch
} // while
}
ExHbaseScanRowwiseTaskTcb::ExHbaseScanRowwiseTaskTcb(ExHbaseAccessSelectTcb * tcb)
: ExHbaseTaskTcb(tcb)
, step_(NOT_STARTED)
{
}
void ExHbaseScanRowwiseTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseScanRowwiseTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
Lng32 remainingInBatch = batchSize_;
while (1)
{
ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = SCAN_OPEN;
}
break;
case SCAN_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
if (tcb_->setupHbaseFilterPreds())
{
step_ = HANDLE_ERROR;
break;
}
retcode = tcb_->ehi_->scanOpen(tcb_->table_,
tcb_->beginRowId_, tcb_->endRowId_,
tcb_->columns_, -1,
tcb_->hbaseAccessTdb().readUncommittedScan(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
FALSE,
(tcb_->hbaseFilterColumns_.entries() > 0 ?
&tcb_->hbaseFilterColumns_ : NULL),
(tcb_->hbaseFilterOps_.entries() > 0 ?
&tcb_->hbaseFilterOps_ : NULL),
(tcb_->hbaseFilterValues_.entries() > 0 ?
&tcb_->hbaseFilterValues_ : NULL),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
tcb_->getSamplePercentage(),
FALSE, 0, NULL, NULL, 0,
(tcb_->hbaseAccessTdb().getHbaseAccessOptions()
? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0));
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
tcb_->isEOD_ = FALSE;
}
break;
case NEXT_ROW:
{
if (--remainingInBatch <= 0)
{
rc = WORK_CALL_AGAIN;
return 1;
}
tcb_->rowwiseRowLen_ = 0;
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
step_ = SCAN_CLOSE;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case NEXT_CELL:
{
if (tcb_->colVal_.val == NULL)
tcb_->colVal_.val = new (tcb_->getHeap())
char[tcb_->hbaseAccessTdb().convertRowLen()];
tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();
retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_,
tcb_->colName_, tcb_->colVal_,
tcb_->colTS_);
if (retcode == HBASE_ACCESS_EOD)
{
step_ = CREATE_ROW;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
step_ = HANDLE_ERROR;
else
step_ = APPEND_ROW;
}
break;
case APPEND_ROW:
{
retcode = tcb_->copyCell();
if (tcb_->setupError(retcode, "ExHbaseAccessTcb::copyCell", "Not enough space in target buffer to move data"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case CREATE_ROW:
{
rc = tcb_->createRowwiseRow();
if (rc == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
{
step_ = RETURN_ROW;
break;
}
else if (rc == -1)
{
step_ = HANDLE_ERROR;
break;
}
else if (tcb_->isEOD_)
step_ = SCAN_CLOSE;
else
step_ = NEXT_ROW;
}
break;
case RETURN_ROW:
{
rc = 0;
if (tcb_->moveRowToUpQueue(tcb_->convertRow_,
tcb_->hbaseAccessTdb().convertRowLen(), &rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == tcb_->matches_))
{
step_ = SCAN_CLOSE;
break;
}
step_ = NEXT_ROW;
}
break;
case SCAN_CLOSE:
{
retcode = tcb_->ehi_->scanClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
step_ = HANDLE_ERROR;
else
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
{
step_ = NOT_STARTED;
return 0;
}
break;
}// switch
} // while
return 0; // WORK_OK
}
ExHbaseScanSQTaskTcb::ExHbaseScanSQTaskTcb(
ExHbaseAccessSelectTcb * tcb)
: ExHbaseTaskTcb(tcb)
, step_(NOT_STARTED)
{
}
void ExHbaseScanSQTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseScanSQTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
Lng32 remainingInBatch = batchSize_;
NABoolean isFirstBatch = false;
// isFirstInBatch is a stack variable for optimization reason. It is used for the mdam small scanner optimization heuristic that
// is performed at runtime. Since this function is invoke intensively for all scan (mdam or regular scan), minimizing CPU/memory access
// impact on runtime code to a strict minimum is attempted. Given that we are trying to detect if the actual scan is bellow the size
// of an HBase block, having the runtime logic performing the detection only affect the first work invoke looks like the right idea.
// and leveraging an existing counter (remainingInBatch) instead of creating a new one. The reasonable asumption to allow this is that
// 1- batchSize_ being 8K, most likely times the row size, we are good in assuming that first hbase block will fit in batchSize
// 2- parent buffer size will be large enough to deal with one HBAse_Block_size without having to rely on re-invoking work in the middle.
// and anyway, if none of the reasonable assumption is true, then that's fine, the heuristic won't work, and we will use regular scanner,
// meaning optimization is off for the scan part of MDAM (still on for the probe side of it).
while (1)
{
ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = SCAN_OPEN;
}
break;
case SCAN_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
if (tcb_->setupHbaseFilterPreds())
{
step_ = HANDLE_ERROR;
break;
}
isFirstBatch = true;
retcode = tcb_->ehi_->scanOpen(tcb_->table_,
tcb_->beginRowId_, tcb_->endRowId_,
tcb_->columns_, -1,
tcb_->hbaseAccessTdb().readUncommittedScan(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
TRUE,
(tcb_->hbaseFilterColumns_.entries() > 0 ?
&tcb_->hbaseFilterColumns_ : NULL),
(tcb_->hbaseFilterOps_.entries() > 0 ?
&tcb_->hbaseFilterOps_ : NULL),
(tcb_->hbaseFilterValues_.entries() > 0 ?
&tcb_->hbaseFilterValues_ : NULL),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
tcb_->getSamplePercentage(),
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getUseSnapshotScan(),
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotScanTimeout(),
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotName(),
tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapScanTmpLocation(),
tcb_->getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(),
(tcb_->hbaseAccessTdb().multiVersions()
? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0)
);
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
break;
case NEXT_ROW:
{
if (--remainingInBatch <= 0)
{
rc = WORK_CALL_AGAIN;
return 1;
}
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
step_ = SCAN_CLOSE;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else if (tcb_->hbaseAccessTdb().multiVersions())
step_ = SETUP_MULTI_VERSION_ROW;
else
step_ = CREATE_ROW;
}
break;
case SETUP_MULTI_VERSION_ROW:
{
retcode = tcb_->setupSQMultiVersionRow();
if (retcode == HBASE_ACCESS_NO_ROW)
{
step_ = NEXT_ROW;
break;
}
if (retcode < 0)
{
rc = (short)retcode;
tcb_->setupError(rc, "setupSQMultiVersionRow");
step_ = HANDLE_ERROR;
break;
}
step_ = CREATE_ROW;
}
break;
case CREATE_ROW:
{
retcode = tcb_->createSQRowDirect();
if (retcode == HBASE_ACCESS_NO_ROW)
{
step_ = NEXT_ROW;
break;
}
if (retcode < 0)
{
rc = (short)retcode;
tcb_->setupError(rc, "createSQRowDirect");
step_ = HANDLE_ERROR;
break;
}
if (retcode != HBASE_ACCESS_SUCCESS)
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
step_ = RETURN_ROW;
else if (rc == -1)
step_ = HANDLE_ERROR;
else if (tcb_->hbaseAccessTdb().multiVersions())
step_ = CREATE_ROW;
else
step_ = NEXT_ROW;
}
break;
case RETURN_ROW:
{
if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->convertRowLen_,
&rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == tcb_->matches_))
{
step_ = SCAN_CLOSE;
break;
}
if (tcb_->hbaseAccessTdb().multiVersions())
step_ = CREATE_ROW;
else
step_ = NEXT_ROW;
}
break;
case SCAN_CLOSE:
{
if (isFirstBatch) //only if closed happen in a single batch, batchSize - remainingInBatch = nb rows retrieved
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->setUseSmallScannerForMDAMifNeeded(batchSize_ - remainingInBatch); //calculate MDAM small scanner flag for next scan if it was MDAM
retcode = tcb_->ehi_->scanClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
step_ = HANDLE_ERROR;
else
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
{
step_ = NOT_STARTED;
return 0;
}
break;
}// switch
} // while
}
Lng32 ExHbaseScanSQTaskTcb::getProbeResult(char* &keyData)
{
Lng32 retcode = 0;
Lng32 rc = 0;
Lng32 probeSize = 100; // using fewer rows results in intermittent wrong
//results. Using the hbase default scan size of 100 as a workarorund.
if (tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useMinMdamProbeSize())
probeSize = 1; // if performance is vital, comp_bool_184 can be set to ON
// to choose this path.
retcode = tcb_->ehi_->scanOpen(tcb_->table_,
tcb_->beginRowId_, tcb_->endRowId_,
tcb_->columns_, -1,
tcb_->hbaseAccessTdb().readUncommittedScan(),
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks() ||
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScannerForProbes(), // when small scanner feature is ON or SYSTEM force cache ON
tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScannerForProbes(),
probeSize,
TRUE, NULL, NULL, NULL);
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
{
rc = -1;
goto label_return;
}
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
rc = 1; // no row found
goto label_return;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
{
rc = -1;
goto label_return;
}
retcode = tcb_->createSQRowDirect();
if (retcode == HBASE_ACCESS_NO_ROW)
{
rc = 1;
goto label_return;
}
if (retcode < 0)
{
rc = retcode;
tcb_->setupError(rc, "createSQRowDirect");
rc = -1;
goto label_return;
}
if (retcode != HBASE_ACCESS_SUCCESS)
{
rc = -1;
goto label_return;
}
// extract the key from the fetched row, encode it and pass it back to mdam
if (tcb_->evalEncodedKeyExpr() == -1)
{
rc = -1;
goto label_return;
}
label_return:
retcode = tcb_->ehi_->scanClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
{
rc = -1;
}
keyData = tcb_->encodedKeyRow_;
return rc;
}
ExHbaseGetTaskTcb::ExHbaseGetTaskTcb(
ExHbaseAccessSelectTcb * tcb)
: ExHbaseTaskTcb(tcb)
, step_(NOT_STARTED)
{
}
void ExHbaseGetTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseGetTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
Lng32 remainingInBatch = batchSize_;
while (1)
{
ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = GET_OPEN;
}
break;
case GET_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
if (tcb_->evalRowIdExpr(TRUE) == -1)
{
step_ = HANDLE_ERROR;
break;
}
if (tcb_->rowIds_.entries() == 1)
{
retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
else
{
retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
}
break;
case NEXT_ROW:
{
if (--remainingInBatch <= 0)
{
rc = WORK_CALL_AGAIN;
return 1;
}
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
step_ = GET_CLOSE;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case NEXT_CELL:
{
if (tcb_->colVal_.val == NULL)
tcb_->colVal_.val = new (tcb_->getHeap())
char[tcb_->hbaseAccessTdb().convertRowLen()];
tcb_->hbaseAccessTdb().convertRowLen();
retcode = tcb_->ehi_->nextCell( tcb_->rowId_, tcb_->colFamName_,
tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
if (retcode == HBASE_ACCESS_EOD)
{
step_ = NEXT_ROW;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
step_ = HANDLE_ERROR;
else
step_ = CREATE_ROW;
}
break;
case CREATE_ROW:
{
rc = tcb_->createColumnwiseRow();
if (rc == -1)
{
step_ = HANDLE_ERROR;
break;
}
else if (tcb_->setupError(rc, "ExHbaseAccessTcb::createColumnwiseRow", "Not enough space in target buffer to move data"))
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
step_ = RETURN_ROW;
else if (rc == -1)
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case RETURN_ROW:
{
rc = 0;
if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(),
&rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == tcb_->matches_))
{
step_ = GET_CLOSE;
break;
}
step_ = NEXT_CELL;
}
break;
case GET_CLOSE:
{
retcode = tcb_->ehi_->getClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
step_ = HANDLE_ERROR;
else
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
{
step_ = NOT_STARTED;
return 0;
}
break;
}// switch
} // while
}
ExHbaseGetRowwiseTaskTcb::ExHbaseGetRowwiseTaskTcb(
ExHbaseAccessSelectTcb * tcb)
: ExHbaseTaskTcb(tcb)
, step_(NOT_STARTED)
{
}
void ExHbaseGetRowwiseTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseGetRowwiseTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
Lng32 remainingInBatch = batchSize_;
while (1)
{
ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = GET_OPEN;
}
break;
case GET_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
if (tcb_->evalRowIdExpr(TRUE) == -1)
{
step_ = HANDLE_ERROR;
break;
}
if (tcb_->rowIds_.entries() == 1)
{
retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
else
{
retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
}
break;
case NEXT_ROW:
{
if (--remainingInBatch <= 0)
{
rc = WORK_CALL_AGAIN;
return 1;
}
retcode = tcb_->ehi_->nextRow();
if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
{
step_ = GET_CLOSE;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case NEXT_CELL:
{
if (tcb_->colVal_.val == NULL)
tcb_->colVal_.val = new (tcb_->getHeap())
char[tcb_->hbaseAccessTdb().convertRowLen()];
tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();
retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_,
tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
if (retcode == HBASE_ACCESS_EOD)
step_ = CREATE_ROW;
else
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
step_ = HANDLE_ERROR;
else
step_ = APPEND_ROW;
}
break;
case APPEND_ROW:
{
retcode = tcb_->copyCell();
if (tcb_->setupError(retcode, "ExHbaseAccessTcb::copyCell", "Not enough space in target buffer to move data"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_CELL;
}
break;
case CREATE_ROW:
{
rc = tcb_->createRowwiseRow();
if (rc == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
step_ = RETURN_ROW;
else if (rc == -1)
step_ = HANDLE_ERROR;
else
step_ = GET_CLOSE;
}
break;
case RETURN_ROW:
{
rc = 0;
if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(),
&rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
step_ = GET_CLOSE;
}
break;
case GET_CLOSE:
{
retcode = tcb_->ehi_->getClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
step_ = HANDLE_ERROR;
else
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
{
step_ = NOT_STARTED;
return 0;
}
break;
}// switch
} // while
}
ExHbaseGetSQTaskTcb::ExHbaseGetSQTaskTcb( ExHbaseAccessTcb * tcb, NABoolean rowsetTcb)
: ExHbaseTaskTcb(tcb, rowsetTcb)
, step_(NOT_STARTED)
{
}
void ExHbaseGetSQTaskTcb::init()
{
step_ = NOT_STARTED;
}
ExWorkProcRetcode ExHbaseGetSQTaskTcb::work(short &rc)
{
Lng32 retcode = 0;
rc = 0;
while (1)
{
ex_queue_entry *pentry_down = NULL;
if (! tcb_->qparent_.down->isEmpty())
pentry_down = tcb_->qparent_.down->getHeadEntry();
switch (step_)
{
case NOT_STARTED:
{
step_ = GET_OPEN;
}
break;
case GET_OPEN:
{
tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
remainingInBatch_ = tcb_->rowIds_.entries();
if (tcb_->rowIds_.entries() == 1)
{
retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
else
{
retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
tcb_->columns_, -1);
if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
step_ = HANDLE_ERROR;
else
step_ = NEXT_ROW;
}
}
break;
case NEXT_ROW:
{
if (remainingInBatch_ <= 0) {
step_ = GET_CLOSE;
break;
}
retcode = tcb_->ehi_->nextRow();
remainingInBatch_--;
// EOD is end of data, EOR is end of result set.
// for single get, EOD or EOR indicates DONE
// for multi get, only EOR indicates DONE
if ( (retcode == HBASE_ACCESS_EOR) ||
( (retcode == HBASE_ACCESS_EOD) &&
(tcb_->rowIds_.entries() == 1) ) )
{
if (rowsetTcb_)
step_ = DONE;
else
step_ = GET_CLOSE;
break;
}
// for multi get, do FETCH if retcode is EOD
if ( (retcode == HBASE_ACCESS_EOD) &&
(tcb_->rowIds_.entries() > 1) )
{
if (rowsetTcb_)
step_ = DONE;
else
step_ = NEXT_ROW;
break;
}
if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
step_ = HANDLE_ERROR;
else
step_ = CREATE_ROW;
}
break;
case CREATE_ROW:
{
retcode = tcb_->createSQRowDirect();
if (retcode == HBASE_ACCESS_NO_ROW)
{
if (rowsetTcb_)
step_ = DONE;
else
step_ = NEXT_ROW;
break;
}
if (retcode < 0)
{
rc = (short)retcode;
tcb_->setupError(rc, "createSQRowDirect");
step_ = HANDLE_ERROR;
break;
}
if (retcode != HBASE_ACCESS_SUCCESS)
{
step_ = HANDLE_ERROR;
break;
}
step_ = APPLY_PRED;
}
break;
case APPLY_PRED:
{
rc = tcb_->applyPred(tcb_->scanExpr());
if (rc == 1)
step_ = RETURN_ROW;
else if (rc == -1)
step_ = HANDLE_ERROR;
else
{
if (rowsetTcb_)
step_ = DONE;
else
step_ = NEXT_ROW;
}
}
break;
case RETURN_ROW:
{
rc = 0;
if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(),
&rc, FALSE))
return 1;
if (tcb_->getHbaseAccessStats())
tcb_->getHbaseAccessStats()->incUsedRows();
if (rowsetTcb_)
step_ = DONE;
else
{
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == tcb_->matches_))
{
step_ = GET_CLOSE;
break;
}
step_ = NEXT_ROW;
}
}
break;
case GET_CLOSE:
{
retcode = tcb_->ehi_->getClose();
if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
step_ = HANDLE_ERROR;
else
step_ = ALL_DONE;
}
break;
case HANDLE_ERROR:
{
step_ = NOT_STARTED;
return -1;
}
break;
case DONE:
if (tcb_->handleDone(rc, 0))
return 1;
else
step_ = NEXT_ROW;
break;
case ALL_DONE:
step_ = NOT_STARTED;
return 0;
break;
}// switch
} // while
return 0;
}
ExHbaseAccessSelectTcb::ExHbaseAccessSelectTcb(
const ExHbaseAccessTdb &hbaseAccessTdb,
ex_globals * glob ) :
ExHbaseAccessTcb(hbaseAccessTdb, glob),
step_(NOT_STARTED)
{
scanRowwiseTaskTcb_ = NULL;
scanTaskTcb_ = NULL;
getRowwiseTaskTcb_ = NULL;
getTaskTcb_ = NULL;
scanSQTaskTcb_ = NULL;
getSQTaskTcb_ = NULL;
ExHbaseAccessTdb &hbaseTdb = (ExHbaseAccessTdb&)hbaseAccessTdb;
samplePercentage_ = hbaseTdb.samplingRate_;
if ((hbaseTdb.listOfScanRows()) ||
((hbaseTdb.keySubsetGen()) &&
(NOT hbaseTdb.uniqueKeyInfo())) ||
(hbaseTdb.keyMDAMGen()))
{
if (hbaseTdb.sqHbaseTable())
scanSQTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseScanSQTaskTcb(this);
else if (hbaseTdb.rowwiseFormat())
scanRowwiseTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseScanRowwiseTaskTcb(this);
else
scanTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseScanTaskTcb(this);
}
if ((hbaseTdb.listOfGetRows()) ||
((hbaseTdb.keySubsetGen()) &&
(hbaseTdb.uniqueKeyInfo())) ||
(hbaseTdb.keyMDAMGen()))
{
if (hbaseTdb.sqHbaseTable())
getSQTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseGetSQTaskTcb(this, FALSE);
else if (hbaseTdb.rowwiseFormat())
getRowwiseTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseGetRowwiseTaskTcb(this);
else
getTaskTcb_ =
new(getGlobals()->getDefaultHeap()) ExHbaseGetTaskTcb(this);
}
if (hbaseTdb.sqHbaseTable())
{
scanTask_ = scanSQTaskTcb_;
getTask_ = getSQTaskTcb_;
}
else if (hbaseTdb.rowwiseFormat())
{
scanTask_ = scanRowwiseTaskTcb_;
getTask_ = getRowwiseTaskTcb_;
}
else
{
scanTask_ = scanTaskTcb_;
getTask_ = getTaskTcb_;
}
}
ExWorkProcRetcode ExHbaseAccessSelectTcb::work()
{
Lng32 retcode = 0;
short rc = 0;
while (!qparent_.down->isEmpty())
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
(step_ != DONE))
{
step_ = SELECT_CLOSE_NO_ERROR; //DONE;
}
switch (step_)
{
case NOT_STARTED:
{
matches_ = 0;
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == matches_))
{
step_ = DONE;
break;
}
step_ = SELECT_INIT;
}
break;
case SELECT_INIT:
{
retcode = ehi_->init(getHbaseAccessStats());
if (setupError(retcode, "ExpHbaseInterface::init"))
{
step_ = HANDLE_ERROR;
break;
}
if (hbaseAccessTdb().listOfScanRows())
hbaseAccessTdb().listOfScanRows()->position();
if (hbaseAccessTdb().listOfGetRows())
hbaseAccessTdb().listOfGetRows()->position();
if (scanTask_)
scanTask_->init();
if (getTask_)
getTask_->init();
step_ = SETUP_SCAN;
}
break;
case SETUP_SCAN:
{
if ((! scanTask_) || (! hbaseAccessTdb().listOfScanRows()))
{
step_ = SETUP_GET;
break;
}
hsr_ =
(ComTdbHbaseAccess::HbaseScanRows*)hbaseAccessTdb().listOfScanRows()
->getCurr();
retcode = setupSubsetRowIdsAndCols(hsr_);
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = PROCESS_SCAN;
}
break;
case PROCESS_SCAN:
case PROCESS_SCAN_KEY:
{
rc = 0;
retcode = scanTask_->work(rc);
if (retcode == 1)
return rc;
else if (retcode < 0)
step_ = HANDLE_ERROR;
else if (step_ == PROCESS_SCAN_KEY)
step_ = SETUP_GET_KEY;
else
step_ = NEXT_SCAN;
}
break;
case NEXT_SCAN:
{
hbaseAccessTdb().listOfScanRows()->advance();
if (! hbaseAccessTdb().listOfScanRows()->atEnd())
{
step_ = SETUP_SCAN;
break;
}
step_ = SETUP_GET;
}
break;
case SETUP_GET:
{
if ((! getTask_) || (!hbaseAccessTdb().listOfGetRows()))
{
step_ = SETUP_SCAN_KEY;
break;
}
hgr_ =
(ComTdbHbaseAccess::HbaseGetRows*)hbaseAccessTdb().listOfGetRows()
->getCurr();
retcode = setupUniqueRowIdsAndCols(hgr_);
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = PROCESS_GET;
}
break;
case PROCESS_GET:
case PROCESS_GET_KEY:
{
rc = 0;
retcode = getTask_->work(rc);
if (retcode == 1)
return rc;
else if (retcode < 0)
step_ = HANDLE_ERROR;
else if (step_ == PROCESS_GET_KEY)
step_ = SELECT_CLOSE;
else
step_ = NEXT_GET;
}
break;
case NEXT_GET:
{
hbaseAccessTdb().listOfGetRows()->advance();
if (! hbaseAccessTdb().listOfGetRows()->atEnd())
{
step_ = SETUP_GET;
break;
}
step_ = SETUP_SCAN_KEY;
}
break;
case SETUP_SCAN_KEY:
{
if (! hbaseAccessTdb().keySubsetGen())
{
step_ = SELECT_CLOSE;
break;
}
if (hbaseAccessTdb().uniqueKeyInfo())
{
step_ = SETUP_GET_KEY;
break;
}
retcode = setupSubsetKeysAndCols();
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = PROCESS_SCAN_KEY;
}
break;
case SETUP_GET_KEY:
{
if ((! getTask_) ||
((hbaseAccessTdb().keySubsetGen()) &&
(NOT hbaseAccessTdb().uniqueKeyInfo())))
{
step_ = SELECT_CLOSE;
break;
}
retcode = setupUniqueKeyAndCols(TRUE);
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
step_ = PROCESS_GET_KEY;
}
break;
case SELECT_CLOSE:
case SELECT_CLOSE_NO_ERROR:
{
retcode = ehi_->close();
if (step_ == SELECT_CLOSE)
{
if (setupError(retcode, "ExpHbaseInterface::close"))
{
step_ = HANDLE_ERROR_NO_CLOSE;
break;
}
}
step_ = DONE;
}
break;
case HANDLE_ERROR:
case HANDLE_ERROR_NO_CLOSE:
{
if (handleError(rc))
return rc;
if (step_ == HANDLE_ERROR)
retcode = ehi_->close();
step_ = DONE;
}
break;
case DONE:
{
if (handleDone(rc))
return rc;
if (scanTask_)
scanTask_->init();
if (getTask_)
getTask_->init();
step_ = NOT_STARTED;
}
break;
} // switch
} // while
return WORK_OK;
}
ExHbaseAccessMdamSelectTcb::ExHbaseAccessMdamSelectTcb(
const ExHbaseAccessTdb &hbaseAccessTdb,
ex_globals * glob ) :
ExHbaseAccessSelectTcb(hbaseAccessTdb, glob),
step_(NOT_STARTED)
{
}
ExWorkProcRetcode ExHbaseAccessMdamSelectTcb::work()
{
Lng32 retcode = 0;
short rc = 0;
while (!qparent_.down->isEmpty())
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
(step_ != DONE))
{
step_ = SELECT_CLOSE_NO_ERROR;
}
switch (step_)
{
case NOT_STARTED:
{
matches_ = 0;
matchesBeforeFetch_ = 0;
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == matches_))
{
step_ = DONE;
break;
}
step_ = SELECT_INIT;
}
break;
case SELECT_INIT:
{
retcode = ehi_->init(getHbaseAccessStats());
if (setupError(retcode, "ExpHbaseInterface::init"))
{
step_ = HANDLE_ERROR;
break;
}
table_.val = hbaseAccessTdb().getTableName();
table_.len = strlen(hbaseAccessTdb().getTableName());
step_ = INIT_NEXT_KEY_RANGE;
}
break;
case INIT_NEXT_KEY_RANGE:
{
retcode = initNextKeyRange(pool_, pentry_down->getAtp());
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(),
columns_);
fetchRangeHadRows_ = TRUE;
step_ = GET_NEXT_KEY_RANGE;
}
break;
case GET_NEXT_KEY_RANGE:
{
keyRangeEx::getNextKeyRangeReturnType
keyRangeStatus = setupSubsetKeys(fetchRangeHadRows_);
fetchRangeHadRows_ = FALSE;
if (keyRangeStatus == keyRangeEx::EXPRESSION_ERROR)
{
step_ = HANDLE_ERROR;
break;
}
if (keyRangeStatus == keyRangeEx::NO_MORE_RANGES)
{
step_ = SELECT_CLOSE;
break;
}
if (keyRangeStatus == keyRangeEx::PROBE_RANGE)
step_ = PROCESS_PROBE_RANGE;
else if (keyRangeStatus == keyRangeEx::FETCH_RANGE)
{
matchesBeforeFetch_ = matches_;
step_ = PROCESS_FETCH_RANGE;
}
else
step_ = HANDLE_ERROR;
}
break;
case PROCESS_PROBE_RANGE:
{
char * keyData = NULL;
retcode = scanSQTaskTcb_->getProbeResult(keyData);
if (retcode == -1)
{
step_ = HANDLE_ERROR;
break;
}
if (retcode == 1) // no rows found
{
keyExeExpr()->reportProbeResult(0);
step_ = GET_NEXT_KEY_RANGE;
break;
}
// pass the key value to the mdam generator
keyExeExpr()->reportProbeResult(keyData);
step_ = GET_NEXT_KEY_RANGE;
}
break;
case PROCESS_FETCH_RANGE:
{
rc = 0;
retcode = scanSQTaskTcb_->work(rc);
if (retcode == 1)
return rc;
else if (retcode < 0)
step_ = HANDLE_ERROR;
else
{
if ((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == matches_))
{
step_ = SELECT_CLOSE;
break;
}
if (matches_ > matchesBeforeFetch_)
fetchRangeHadRows_ = TRUE;
step_ = GET_NEXT_KEY_RANGE;
}
}
break;
case SELECT_CLOSE:
case SELECT_CLOSE_NO_ERROR:
{
retcode = ehi_->close();
if (step_ == SELECT_CLOSE)
{
if (setupError(retcode, "ExpHbaseInterface::close"))
{
step_ = HANDLE_ERROR_NO_CLOSE;
break;
}
}
step_ = DONE;
}
break;
case HANDLE_ERROR:
case HANDLE_ERROR_NO_CLOSE:
{
if (handleError(rc))
return rc;
if (step_ == HANDLE_ERROR)
retcode = ehi_->close();
step_ = DONE;
}
break;
case DONE:
{
if (handleDone(rc))
return rc;
step_ = NOT_STARTED;
}
break;
} // switch
} // while
return WORK_OK;
}
ExHbaseCoProcAggrTcb::ExHbaseCoProcAggrTcb(
const ComTdbHbaseCoProcAggr &hbaseAccessTdb,
ex_globals * glob ) :
ExHbaseAccessTcb(hbaseAccessTdb, glob),
step_(NOT_STARTED)
{
}
ExWorkProcRetcode ExHbaseCoProcAggrTcb::work()
{
Lng32 retcode = 0;
short rc = 0;
while (!qparent_.down->isEmpty())
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
(step_ != DONE))
{
// step_ = SELECT_CLOSE_NO_ERROR;
}
switch (step_)
{
case NOT_STARTED:
{
matches_ = 0;
step_ = COPROC_INIT;
}
break;
case COPROC_INIT:
{
retcode = ehi_->init(getHbaseAccessStats());
if (setupError(retcode, "ExpHbaseInterface::init"))
{
step_ = HANDLE_ERROR;
break;
}
table_.val = hbaseAccessTdb().getTableName();
table_.len = strlen(hbaseAccessTdb().getTableName());
hbaseAccessTdb().listOfFetchedColNames()->position();
hbaseAccessTdb().listOfAggrTypes()->position();
aggrIdx_ = 0;
step_ = COPROC_EVAL;
}
break;
case COPROC_EVAL:
{
Lng32 aggrType = *(short*)hbaseAccessTdb().listOfAggrTypes()->getCurr();
char * col = (char*)hbaseAccessTdb().listOfFetchedColNames()->getCurr();
Text aggrVal;
Text colFam;
Text colName;
retcode = extractColFamilyAndName(col, colFam, colName);
if (retcode)
{
step_ = HANDLE_ERROR;
break;
}
retcode = ehi_->coProcAggr(table_,
aggrType,
"", // startRow
"", // stopRow
colFam,
colName,
FALSE, // cacheBlocks
100, //numCacheRows
aggrVal);
if (setupError(retcode, "ExpHbaseInterface::coProcAggr"))
{
step_ = HANDLE_ERROR;
break;
}
ExpTupleDesc * convertTuppTD =
hbaseAccessTdb().workCriDesc_->getTupleDescriptor
(hbaseAccessTdb().convertTuppIndex_);
Attributes * attr = convertTuppTD->getAttr(aggrIdx_);
if (! attr)
{
step_ = HANDLE_ERROR;
break;
}
if (attr->getNullFlag())
{
*(short*)&convertRow_[attr->getNullIndOffset()] = 0;
}
str_cpy_all(&convertRow_[attr->getOffset()], aggrVal.data(), aggrVal.length());
hbaseAccessTdb().listOfAggrTypes()->advance();
hbaseAccessTdb().listOfFetchedColNames()->advance();
aggrIdx_++;
if (hbaseAccessTdb().listOfAggrTypes()->atEnd())
{
step_ = RETURN_ROW;
break;
}
}
break;
case RETURN_ROW:
{
short rc = 0;
if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(),
&rc, FALSE))
return 1;
step_ = DONE;
}
break;
case HANDLE_ERROR:
{
if (handleError(rc))
return rc;
retcode = ehi_->close();
step_ = DONE;
}
break;
case DONE:
{
if (handleDone(rc))
return rc;
step_ = NOT_STARTED;
}
break;
} // switch
} // while
return WORK_OK;
}