// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************

#include "Platform.h"

#include "ex_stdh.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ExHbaseAccess.h"
#include "ex_exe_stmt_globals.h"

#include "ExpLOBinterface.h"

#include "SQLTypeDefs.h"

#include "ExpHbaseInterface.h"

ExHbaseScanTaskTcb::ExHbaseScanTaskTcb(
				       ExHbaseAccessSelectTcb * tcb)
  :  ExHbaseTaskTcb(tcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseScanTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseScanTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;
  Lng32 remainingInBatch = batchSize_;

  while (1)
    {
      ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = SCAN_OPEN;
	  }
	  break;

	case SCAN_OPEN:
	  {

	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());

	    if (tcb_->setupHbaseFilterPreds())
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
	    retcode = tcb_->ehi_->scanOpen(tcb_->table_, 
					   tcb_->beginRowId_, tcb_->endRowId_,
					   tcb_->columns_, -1,
					   tcb_->hbaseAccessTdb().readUncommittedScan(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
					   FALSE, 
					   (tcb_->hbaseFilterColumns_.entries() > 0 ?
					    &tcb_->hbaseFilterColumns_ : NULL),
					   (tcb_->hbaseFilterOps_.entries() > 0 ?
					    &tcb_->hbaseFilterOps_ : NULL),
					   (tcb_->hbaseFilterValues_.entries() > 0 ?
					    &tcb_->hbaseFilterValues_ : NULL),
                       tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
					   tcb_->getSamplePercentage(),
                                           FALSE, 0, NULL, NULL, 0,
                                           (tcb_->hbaseAccessTdb().getHbaseAccessOptions() 
                                            ? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0));
                                           
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = NEXT_ROW;
	  }
	  break;

	case NEXT_ROW:
 	  {
            if (--remainingInBatch <= 0)
            {
              rc = WORK_CALL_AGAIN;
              return 1;
            }

	    retcode = tcb_->ehi_->nextRow();
	    if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
	    {
	       step_ = SCAN_CLOSE;
	       break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
	       step_ = HANDLE_ERROR;
	    else
	       step_ = NEXT_CELL;
	  }
	  break;
	case NEXT_CELL:
          {
 	    if (tcb_->colVal_.val == NULL)
	       tcb_->colVal_.val = new (tcb_->getHeap()) 
	          char[tcb_->hbaseAccessTdb().convertRowLen()];
	    tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();        
	    retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_, 
	       tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
	    if (retcode == HBASE_ACCESS_EOD)
	    {
               step_ = NEXT_ROW;
               break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
               step_ = HANDLE_ERROR;
	    else
               step_ = CREATE_ROW;
	  }
	  break;

	case CREATE_ROW:
	  {
	    rc = tcb_->createColumnwiseRow();
	    if (rc == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
            else if (tcb_->setupError(rc, "ExHbaseAccessTcb::createColumnwiseRow", "Not enough space in target buffer to move data"))
              {
                step_ = HANDLE_ERROR;
                break;
              }
            
	    step_ = APPLY_PRED;
	  }
	  break;

	  case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());
	    if (rc == 1)
	      step_ = RETURN_ROW;
	    else if (rc == -1)
	      step_ = HANDLE_ERROR;
	    else
	      step_ = NEXT_CELL;
	  }
	  break;

	case RETURN_ROW:
	  {
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, 
                      tcb_->hbaseAccessTdb().convertRowLen(), &rc, FALSE))
	      return 1;
	    
	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();

	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == tcb_->matches_))
	      {
		step_ = SCAN_CLOSE;
		break;
	      }

	    step_ = NEXT_CELL;
	  }
	  break;

	case SCAN_CLOSE:
	  {
	    retcode = tcb_->ehi_->scanClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;

	case DONE:
	  {
	    step_ = NOT_STARTED;
	    return 0;
	  }
	  break;

	}// switch

    } // while
}

ExHbaseScanRowwiseTaskTcb::ExHbaseScanRowwiseTaskTcb(ExHbaseAccessSelectTcb * tcb)
  :  ExHbaseTaskTcb(tcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseScanRowwiseTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseScanRowwiseTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;
  Lng32 remainingInBatch = batchSize_;

  while (1)
    {
      ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = SCAN_OPEN;
	  }
	  break;

	case SCAN_OPEN:
	  {
	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());

	    if (tcb_->setupHbaseFilterPreds())
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    retcode = tcb_->ehi_->scanOpen(tcb_->table_, 
					   tcb_->beginRowId_, tcb_->endRowId_,
					   tcb_->columns_, -1,
					   tcb_->hbaseAccessTdb().readUncommittedScan(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
					   FALSE, 
					   (tcb_->hbaseFilterColumns_.entries() > 0 ?
					    &tcb_->hbaseFilterColumns_ : NULL),
					   (tcb_->hbaseFilterOps_.entries() > 0 ?
					    &tcb_->hbaseFilterOps_ : NULL),
					   (tcb_->hbaseFilterValues_.entries() > 0 ?
					    &tcb_->hbaseFilterValues_ : NULL),
                       tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
					   tcb_->getSamplePercentage(),
                                           FALSE, 0, NULL, NULL, 0,
                                           (tcb_->hbaseAccessTdb().getHbaseAccessOptions() 
                                            ? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0));
                                           
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = NEXT_ROW;

	    tcb_->isEOD_ = FALSE;
	  }
	  break;

	case NEXT_ROW:
	  {
            if (--remainingInBatch <= 0)
            {
              rc = WORK_CALL_AGAIN;
              return 1;
            }

	    tcb_->rowwiseRowLen_ = 0;
	    retcode = tcb_->ehi_->nextRow();
	    if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
	    {
	       step_ = SCAN_CLOSE;
	       break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
	       step_ = HANDLE_ERROR;
	    else
	       step_ = NEXT_CELL;
	  }
	  break;

	case NEXT_CELL:
	  {
	    if (tcb_->colVal_.val == NULL)
	       tcb_->colVal_.val = new (tcb_->getHeap()) 
 	       char[tcb_->hbaseAccessTdb().convertRowLen()];
	    tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();        
	    retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_, 
					    tcb_->colName_, tcb_->colVal_,
					    tcb_->colTS_);
	    if (retcode == HBASE_ACCESS_EOD)
	    { 
	       step_ = CREATE_ROW;
	       break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
               step_ = HANDLE_ERROR;
	    else
	       step_ = APPEND_ROW;
	  }
	  break;

	case APPEND_ROW:
	  {
	    retcode = tcb_->copyCell();
            if (tcb_->setupError(retcode, "ExHbaseAccessTcb::copyCell", "Not enough space in target buffer to move data"))
              step_ = HANDLE_ERROR;
            else
              step_ = NEXT_CELL;
	  }
	  break;

	case CREATE_ROW:
	  {
	    rc = tcb_->createRowwiseRow();
	    if (rc == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
	    
	    step_ = APPLY_PRED;
	  }
	  break;

	case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());

	    if (rc == 1)
	      {
		step_ = RETURN_ROW;
		break;
	      }
	    else if (rc == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
	    else if (tcb_->isEOD_)
	      step_ = SCAN_CLOSE;
	    else
	      step_ = NEXT_ROW;
	  }
	  break;

	case RETURN_ROW:
	  {
	    rc = 0;
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, 
                   tcb_->hbaseAccessTdb().convertRowLen(), &rc, FALSE))
	      return 1;

	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();

	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == tcb_->matches_))
	      {
		step_ = SCAN_CLOSE;
		break;
	      }
             step_ = NEXT_ROW;
	  }
	  break;

	case SCAN_CLOSE:
	  {
	    retcode = tcb_->ehi_->scanClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;

	case DONE:
	  {
	    step_ = NOT_STARTED;
	    return 0;
	  }
	  break;

	}// switch

    } // while

  return 0; // WORK_OK
}

ExHbaseScanSQTaskTcb::ExHbaseScanSQTaskTcb(
				       ExHbaseAccessSelectTcb * tcb)
  :  ExHbaseTaskTcb(tcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseScanSQTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseScanSQTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;
  Lng32 remainingInBatch = batchSize_;
  NABoolean isFirstBatch = false;
  // isFirstInBatch is a stack variable for optimization reason. It is used for the mdam small scanner optimization heuristic that
  // is performed at runtime. Since this function is invoke intensively for all scan (mdam or regular scan), minimizing CPU/memory access
  // impact on runtime code to a strict minimum is attempted. Given that we are trying to detect if the actual scan is bellow the size
  // of an HBase block, having the runtime logic performing the detection only affect the first work invoke looks like the right idea.
  // and leveraging an existing counter (remainingInBatch) instead of creating a new one. The reasonable asumption to allow this is that
  // 1- batchSize_ being 8K, most likely times the row size, we are good in assuming that first hbase block will fit in batchSize
  // 2- parent buffer size will be large enough to deal with one HBAse_Block_size without having to rely on re-invoking work in the middle.
  // and anyway, if none of the reasonable assumption is true, then that's fine, the heuristic won't work, and we will use regular scanner,
  // meaning optimization is off for the scan part of MDAM (still on for the probe side of it).

  while (1)
    {
      ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = SCAN_OPEN;
	  }
	  break;

	case SCAN_OPEN:
	  {
	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());

	    if (tcb_->setupHbaseFilterPreds())
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
	    isFirstBatch = true;
	    retcode = tcb_->ehi_->scanOpen(tcb_->table_, 
					   tcb_->beginRowId_, tcb_->endRowId_,
					   tcb_->columns_, -1,
					   tcb_->hbaseAccessTdb().readUncommittedScan(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScanner(),
					   tcb_->hbaseAccessTdb().getHbasePerfAttributes()->numCacheRows(),
                                           TRUE,
					   (tcb_->hbaseFilterColumns_.entries() > 0 ?
					    &tcb_->hbaseFilterColumns_ : NULL),
					   (tcb_->hbaseFilterOps_.entries() > 0 ?
					    &tcb_->hbaseFilterOps_ : NULL),
					   (tcb_->hbaseFilterValues_.entries() > 0 ?
					    &tcb_->hbaseFilterValues_ : NULL),
                       tcb_->hbaseAccessTdb().getHbasePerfAttributes()->dopParallelScanner(),
                                           tcb_->getSamplePercentage(),
                                           tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getUseSnapshotScan(),
                                           tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotScanTimeout(),
                                           tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapshotName(),
                                           tcb_->hbaseAccessTdb().getHbaseSnapshotScanAttributes()->getSnapScanTmpLocation(),
                                           tcb_->getGlobals()->castToExExeStmtGlobals()->getMyInstanceNumber(),
                                           (tcb_->hbaseAccessTdb().multiVersions()
                                            ? tcb_->hbaseAccessTdb().getHbaseAccessOptions()->getNumVersions() : 0)
                                           );

	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = NEXT_ROW;
	  }
	  break;

	case NEXT_ROW:
	  {
            if (--remainingInBatch <= 0)
            {
              rc = WORK_CALL_AGAIN;
              return 1;
            }
	    retcode = tcb_->ehi_->nextRow();
	    if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
	      {
		step_ = SCAN_CLOSE;
		break;
	      }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
               step_ = HANDLE_ERROR;
            else if (tcb_->hbaseAccessTdb().multiVersions())
              step_ = SETUP_MULTI_VERSION_ROW;
            else
	       step_ = CREATE_ROW;
	  }
	  break;

        case SETUP_MULTI_VERSION_ROW:
          {
	    retcode = tcb_->setupSQMultiVersionRow();
	    if (retcode == HBASE_ACCESS_NO_ROW)
              {
	        step_ = NEXT_ROW;
	        break;
              }

	    if (retcode < 0)
              {
	        rc = (short)retcode;
	        tcb_->setupError(rc, "setupSQMultiVersionRow");
	        step_ = HANDLE_ERROR;
	        break;
              }

            step_ = CREATE_ROW;
          }
          break;

	case CREATE_ROW:
	  {
	    retcode = tcb_->createSQRowDirect();
	    if (retcode == HBASE_ACCESS_NO_ROW)
	    {
	        step_ = NEXT_ROW;
	        break;
	    }
	    if (retcode < 0)
	    {
	        rc = (short)retcode;
	        tcb_->setupError(rc, "createSQRowDirect");
	        step_ = HANDLE_ERROR;
	        break;
	    }
	    if (retcode != HBASE_ACCESS_SUCCESS)
	    {
	        step_ = HANDLE_ERROR;
	        break;
	    }

	    step_ = APPLY_PRED;
	  }
	  break;

	  case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());
	    if (rc == 1)
	      step_ = RETURN_ROW;
	    else if (rc == -1)
	      step_ = HANDLE_ERROR;
	    else if (tcb_->hbaseAccessTdb().multiVersions())
              step_ = CREATE_ROW;
            else
              step_ = NEXT_ROW;
	  }
	  break;

	case RETURN_ROW:
	  {
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->convertRowLen_, 
				       &rc, FALSE))
	      return 1;
	    
	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();
	    
	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == tcb_->matches_))
	      {
		step_ = SCAN_CLOSE;
		break;
	      }

            if (tcb_->hbaseAccessTdb().multiVersions())
              step_ = CREATE_ROW;
            else
              step_ = NEXT_ROW;
	  }
	  break;

	case SCAN_CLOSE:
	  {
	      if (isFirstBatch) //only if closed happen in a single batch, batchSize - remainingInBatch = nb rows retrieved
	          tcb_->hbaseAccessTdb().getHbasePerfAttributes()->setUseSmallScannerForMDAMifNeeded(batchSize_ - remainingInBatch); //calculate MDAM small scanner flag for next scan if it was MDAM
	    retcode = tcb_->ehi_->scanClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;

	case DONE:
	  {
	    step_ = NOT_STARTED;
	    return 0;
	  }
	  break;

	}// switch

    } // while
}

Lng32 ExHbaseScanSQTaskTcb::getProbeResult(char* &keyData)
{
  Lng32 retcode = 0;
  Lng32 rc = 0;
  Lng32 probeSize = 100; // using fewer rows results in intermittent wrong 
  //results. Using the hbase default scan size of 100 as a workarorund.
  if (tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useMinMdamProbeSize())
    probeSize = 1; // if performance is vital, comp_bool_184 can be set to ON 
  // to choose this path.

  retcode = tcb_->ehi_->scanOpen(tcb_->table_, 
				 tcb_->beginRowId_, tcb_->endRowId_,
				 tcb_->columns_, -1,
				 tcb_->hbaseAccessTdb().readUncommittedScan(),
				 tcb_->hbaseAccessTdb().getHbasePerfAttributes()->cacheBlocks() ||
				 tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScannerForProbes(), // when small scanner feature is ON or SYSTEM force cache ON
				 tcb_->hbaseAccessTdb().getHbasePerfAttributes()->useSmallScannerForProbes(),
				 probeSize,
				 TRUE, NULL, NULL, NULL);
  if (tcb_->setupError(retcode, "ExpHbaseInterface::scanOpen"))
    {
      rc = -1;
      goto label_return;
    }
  
  retcode = tcb_->ehi_->nextRow();
  if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
    {
      rc = 1; // no row found
      goto label_return;
    }
  if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
    {
      rc = -1;
      goto label_return;
    }
  retcode = tcb_->createSQRowDirect();
  if (retcode == HBASE_ACCESS_NO_ROW)
  {
     rc = 1;
     goto label_return;
  }
  if (retcode < 0)
  {
      rc = retcode;
      tcb_->setupError(rc, "createSQRowDirect");
      rc = -1;
      goto label_return;
  }
  if (retcode != HBASE_ACCESS_SUCCESS)
  {
     rc = -1;
     goto label_return;
  }

  // extract the key from the fetched row, encode it and pass it back to mdam
  if (tcb_->evalEncodedKeyExpr() == -1)
    {
      rc = -1;
      goto label_return;
    }
  
 label_return:
  retcode = tcb_->ehi_->scanClose();
  if (tcb_->setupError(retcode, "ExpHbaseInterface::scanClose"))
    {
      rc = -1;
    }

  keyData = tcb_->encodedKeyRow_;

  return rc;
}

ExHbaseGetTaskTcb::ExHbaseGetTaskTcb(
				     ExHbaseAccessSelectTcb * tcb)
  :  ExHbaseTaskTcb(tcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseGetTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseGetTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;
  Lng32 remainingInBatch = batchSize_;

  while (1)
    {
      ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = GET_OPEN;
	  }
	  break;

	case GET_OPEN:
	  {
	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());

	    if (tcb_->evalRowIdExpr(TRUE) == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (tcb_->rowIds_.entries() == 1)
	      {
		retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }
	    else
	      {
		retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }
	      
	  }
	  break;

	case NEXT_ROW:
	  {       
            if (--remainingInBatch <= 0)
            {
              rc = WORK_CALL_AGAIN;
              return 1;
            }
	    retcode = tcb_->ehi_->nextRow();
	    if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
	    {
 	       step_ = GET_CLOSE;
	       break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
	       step_ = HANDLE_ERROR;
	    else
	       step_ = NEXT_CELL;
	  }
	  break;

	case NEXT_CELL:
	  {
	    if (tcb_->colVal_.val == NULL)
	        tcb_->colVal_.val = new (tcb_->getHeap()) 
	        char[tcb_->hbaseAccessTdb().convertRowLen()];
	    tcb_->hbaseAccessTdb().convertRowLen();
	    retcode = tcb_->ehi_->nextCell( tcb_->rowId_, tcb_->colFamName_, 
	       tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
	    if (retcode == HBASE_ACCESS_EOD)
	    {
		step_ = NEXT_ROW;
		break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = CREATE_ROW;
	  }
	  break;

	case CREATE_ROW:
	  {
	    rc = tcb_->createColumnwiseRow();
	    if (rc == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
            else if (tcb_->setupError(rc, "ExHbaseAccessTcb::createColumnwiseRow", "Not enough space in target buffer to move data"))
              {
                step_ = HANDLE_ERROR;
                break;
              }
	    
	    step_ = APPLY_PRED;
	  }
	  break;

	case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());

	    if (rc == 1)
	      step_ = RETURN_ROW;
	    else if (rc == -1)
	      step_ = HANDLE_ERROR;
	    else
	      step_ = NEXT_CELL;
	  }
	  break;

	case RETURN_ROW:
	  {
	    rc = 0;
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(), 
				       &rc, FALSE))
	      return 1;

	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();

	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == tcb_->matches_))
	      {
		step_ = GET_CLOSE;
		break;
	      }

	    step_ = NEXT_CELL;
	  }
	  break;

	case GET_CLOSE:
	  {
	    retcode = tcb_->ehi_->getClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;
	    
	case DONE:
	  {
	    step_ = NOT_STARTED;
	    return 0;
	  }
	  break;

	}// switch

    } // while
}

ExHbaseGetRowwiseTaskTcb::ExHbaseGetRowwiseTaskTcb(
				       ExHbaseAccessSelectTcb * tcb)
  :  ExHbaseTaskTcb(tcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseGetRowwiseTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseGetRowwiseTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;
  Lng32 remainingInBatch = batchSize_;

  while (1)
    {
      ex_queue_entry *pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = GET_OPEN;
	  }
	  break;

	case GET_OPEN:
	  {
	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());

	    if (tcb_->evalRowIdExpr(TRUE) == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (tcb_->rowIds_.entries() == 1)
	      {
		retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }
	    else
	      {
		retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }

	  }
	  break;

	case NEXT_ROW:
	  {
            if (--remainingInBatch <= 0)
            {
              rc = WORK_CALL_AGAIN;
              return 1;
            }

	    retcode = tcb_->ehi_->nextRow();
	    if (retcode == HBASE_ACCESS_EOD || retcode == HBASE_ACCESS_EOR)
	    {
	       step_ = GET_CLOSE;
	       break;
	    }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
	       step_ = HANDLE_ERROR;
	    else
	       step_ = NEXT_CELL;
	  }
	  break;

	case NEXT_CELL:
	  {
	     if (tcb_->colVal_.val == NULL)
	        tcb_->colVal_.val = new (tcb_->getHeap()) 
		   char[tcb_->hbaseAccessTdb().convertRowLen()];
	    tcb_->colVal_.len = tcb_->hbaseAccessTdb().convertRowLen();
	    retcode = tcb_->ehi_->nextCell(tcb_->rowId_, tcb_->colFamName_, 
	        tcb_->colName_, tcb_->colVal_, tcb_->colTS_);
	    if (retcode == HBASE_ACCESS_EOD)
	       step_ = CREATE_ROW;
	    else
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextCell"))
	       step_ = HANDLE_ERROR;
	    else
	      step_ = APPEND_ROW;
	  }
	  break;

	case APPEND_ROW:
	  {
	    retcode = tcb_->copyCell();
            if (tcb_->setupError(retcode, "ExHbaseAccessTcb::copyCell", "Not enough space in target buffer to move data"))
              step_ = HANDLE_ERROR;
            else
              step_ = NEXT_CELL;
	  }
	  break;

	case CREATE_ROW:
	  {
	    rc = tcb_->createRowwiseRow();
	    if (rc == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    step_ = APPLY_PRED;
	  }
	  break;

	case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());

	    if (rc == 1)
	      step_ = RETURN_ROW;
	    else if (rc == -1)
	      step_ = HANDLE_ERROR;
	    else
	      step_ = GET_CLOSE;
	  }
	  break;

	case RETURN_ROW:
	  {
	    rc = 0;
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(), 
				 &rc, FALSE))
	      return 1;
	    
	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();

	    step_ = GET_CLOSE;
	  }
	  break;

	case GET_CLOSE:
	  {
	    retcode = tcb_->ehi_->getClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;
	    
	case DONE:
	  {
	    step_ = NOT_STARTED;
	    return 0;
	  }
	  break;

	}// switch

    } // while
}

ExHbaseGetSQTaskTcb::ExHbaseGetSQTaskTcb( ExHbaseAccessTcb * tcb, NABoolean rowsetTcb)
  :  ExHbaseTaskTcb(tcb, rowsetTcb)
  , step_(NOT_STARTED)
{
}

void ExHbaseGetSQTaskTcb::init() 
{
  step_ = NOT_STARTED;
}

ExWorkProcRetcode ExHbaseGetSQTaskTcb::work(short &rc)
{
  Lng32 retcode = 0;
  rc = 0;

  while (1)
    {
      
      ex_queue_entry *pentry_down = NULL;
      if (! tcb_->qparent_.down->isEmpty())
          pentry_down = tcb_->qparent_.down->getHeadEntry();
      
      switch (step_)
	{
	case NOT_STARTED:
	  {
	    step_ = GET_OPEN;
	  }
	  break;

	case GET_OPEN:
	  {
	    tcb_->table_.val = tcb_->hbaseAccessTdb().getTableName();
	    tcb_->table_.len = strlen(tcb_->hbaseAccessTdb().getTableName());
            remainingInBatch_ = tcb_->rowIds_.entries();
	    if (tcb_->rowIds_.entries() == 1)
	      {
		retcode = tcb_->ehi_->getRowOpen(tcb_->table_, tcb_->rowIds_[0],
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }
	    else
	      {
		retcode = tcb_->ehi_->getRowsOpen(tcb_->table_, &tcb_->rowIds_,
					     tcb_->columns_, -1);
		if (tcb_->setupError(retcode, "ExpHbaseInterface::getRowsOpen"))
		  step_ = HANDLE_ERROR;
		else
		  step_ = NEXT_ROW;
	      }
	  }
	  break;

	case NEXT_ROW:
	  {
            if (remainingInBatch_ <= 0) {
               step_ = GET_CLOSE;
               break;
            }
	    retcode = tcb_->ehi_->nextRow();
            remainingInBatch_--;
            // EOD is end of data, EOR is end of result set. 
            // for single get, EOD or EOR indicates DONE
            // for multi get, only EOR indicates DONE
	    if ( (retcode == HBASE_ACCESS_EOR) ||
                 ( (retcode == HBASE_ACCESS_EOD) &&
                   (tcb_->rowIds_.entries() == 1) ) )
	      {
                if (rowsetTcb_)
                   step_ = DONE;
                else
		   step_ = GET_CLOSE;
		break;
	      }
            
            // for multi get, do FETCH if retcode is EOD
            if ( (retcode == HBASE_ACCESS_EOD) && 
                 (tcb_->rowIds_.entries() > 1) )
            {
                if (rowsetTcb_)
                   step_ = DONE;
                else
                   step_ = NEXT_ROW;
                break;
            }
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::nextRow"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = CREATE_ROW;
	  }
	  break;

	case CREATE_ROW:
	  {
	    retcode = tcb_->createSQRowDirect();
	    if (retcode == HBASE_ACCESS_NO_ROW)
	    {
               if (rowsetTcb_)
                  step_ = DONE;
               else
	          step_ = NEXT_ROW;
	       break;
	    }
	    if (retcode < 0)
	    {
	        rc = (short)retcode;
	        tcb_->setupError(rc, "createSQRowDirect");
	        step_ = HANDLE_ERROR;
	        break;
	    }  
	    if (retcode != HBASE_ACCESS_SUCCESS)
	    {
	        step_ = HANDLE_ERROR;
	        break;
	    }
	    step_ = APPLY_PRED;
	  }
	  break;

	case APPLY_PRED:
	  {
	    rc = tcb_->applyPred(tcb_->scanExpr());

	    if (rc == 1)
	      step_ = RETURN_ROW;
	    else if (rc == -1)
	      step_ = HANDLE_ERROR;
	    else
            {
              if (rowsetTcb_)
                 step_ = DONE;
              else
	         step_ = NEXT_ROW;
            }
	  }
	  break;
	case RETURN_ROW:
	  {
	    rc = 0;
	    if (tcb_->moveRowToUpQueue(tcb_->convertRow_, tcb_->hbaseAccessTdb().convertRowLen(), 
				       &rc, FALSE))
	      return 1;
	    if (tcb_->getHbaseAccessStats())
	      tcb_->getHbaseAccessStats()->incUsedRows();
            if (rowsetTcb_)
                step_ = DONE;
            else
            {
	      if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == tcb_->matches_))
	      {
		step_ = GET_CLOSE;
		break;
	      }
	      step_ = NEXT_ROW;
            }
	  }
	  break;

	case GET_CLOSE:
	  {
	    retcode = tcb_->ehi_->getClose();
	    if (tcb_->setupError(retcode, "ExpHbaseInterface::getClose"))
	      step_ = HANDLE_ERROR;
	    else
	      step_ = ALL_DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    step_ = NOT_STARTED;
	    return -1;
	  }
	  break;
	    
	case DONE:
           if (tcb_->handleDone(rc, 0))
               return 1;
           else
               step_ = NEXT_ROW;
           break;
        case ALL_DONE:
	   step_ = NOT_STARTED;
	   return 0;
	   break;
	}// switch

    } // while
    return 0;
}

ExHbaseAccessSelectTcb::ExHbaseAccessSelectTcb(
          const ExHbaseAccessTdb &hbaseAccessTdb, 
          ex_globals * glob ) :
  ExHbaseAccessTcb(hbaseAccessTdb, glob),
  step_(NOT_STARTED)
{
  scanRowwiseTaskTcb_ = NULL;
  scanTaskTcb_ = NULL;
  getRowwiseTaskTcb_ = NULL;
  getTaskTcb_ = NULL;
  scanSQTaskTcb_ = NULL;
  getSQTaskTcb_ = NULL;

  ExHbaseAccessTdb &hbaseTdb = (ExHbaseAccessTdb&)hbaseAccessTdb;
  samplePercentage_ = hbaseTdb.samplingRate_;

  if ((hbaseTdb.listOfScanRows()) ||
      ((hbaseTdb.keySubsetGen()) &&
       (NOT hbaseTdb.uniqueKeyInfo())) ||
      (hbaseTdb.keyMDAMGen()))
    {
      if (hbaseTdb.sqHbaseTable())
	scanSQTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseScanSQTaskTcb(this);
      else if (hbaseTdb.rowwiseFormat())
	scanRowwiseTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseScanRowwiseTaskTcb(this);
      else
	scanTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseScanTaskTcb(this);
    }

  if ((hbaseTdb.listOfGetRows()) ||
      ((hbaseTdb.keySubsetGen()) &&
       (hbaseTdb.uniqueKeyInfo())) ||
      (hbaseTdb.keyMDAMGen()))
    {
      if (hbaseTdb.sqHbaseTable())
	getSQTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseGetSQTaskTcb(this, FALSE);
      else if (hbaseTdb.rowwiseFormat())
	getRowwiseTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseGetRowwiseTaskTcb(this);
      else
	getTaskTcb_ = 
	  new(getGlobals()->getDefaultHeap()) ExHbaseGetTaskTcb(this);
     }

  if (hbaseTdb.sqHbaseTable())
    {
      scanTask_ = scanSQTaskTcb_;
      getTask_   = getSQTaskTcb_;
    }
  else if (hbaseTdb.rowwiseFormat())
    {
      scanTask_ = scanRowwiseTaskTcb_;
      getTask_   = getRowwiseTaskTcb_;
    }
  else
    {
      scanTask_ = scanTaskTcb_;
      getTask_   = getTaskTcb_;
    }
}

ExWorkProcRetcode ExHbaseAccessSelectTcb::work()
{
  Lng32 retcode = 0;
  short rc = 0;

  while (!qparent_.down->isEmpty())
    {
      ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
      if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
	  (step_ != DONE))
	{
	  step_ = SELECT_CLOSE_NO_ERROR; //DONE;
	}

      switch (step_)
	{
	case NOT_STARTED:
	  {
	    matches_ = 0;

	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == matches_))
	      {
		step_ = DONE;
		break;
	      }

	    step_ = SELECT_INIT;
	  }
	  break;

	case SELECT_INIT:
	  {
	    retcode = ehi_->init(getHbaseAccessStats());
	    if (setupError(retcode, "ExpHbaseInterface::init"))
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (hbaseAccessTdb().listOfScanRows())
	      hbaseAccessTdb().listOfScanRows()->position();

	    if (hbaseAccessTdb().listOfGetRows())
	      hbaseAccessTdb().listOfGetRows()->position();

	    if (scanTask_)
	      scanTask_->init();

	    if (getTask_)
	      getTask_->init();
	    
	    step_ = SETUP_SCAN;
	  }
	  break;

	case SETUP_SCAN:
	  {
	    if ((! scanTask_) || (! hbaseAccessTdb().listOfScanRows()))
	      {
		step_ = SETUP_GET;
		break;
	      }

	    hsr_ = 
	      (ComTdbHbaseAccess::HbaseScanRows*)hbaseAccessTdb().listOfScanRows()
	      ->getCurr();

	    retcode = setupSubsetRowIdsAndCols(hsr_);
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    step_ = PROCESS_SCAN;
	  }
	  break;

	case PROCESS_SCAN:
	case PROCESS_SCAN_KEY:
	  {
	    rc = 0;
	    retcode = scanTask_->work(rc);

	    if (retcode == 1)
	      return rc;
	    else if (retcode < 0)
	      step_ = HANDLE_ERROR;
	    else if (step_ == PROCESS_SCAN_KEY)
	      step_ = SETUP_GET_KEY;
	    else
	      step_ = NEXT_SCAN;
	  }
	  break;

	case NEXT_SCAN:
	  {
	    hbaseAccessTdb().listOfScanRows()->advance();

	    if (! hbaseAccessTdb().listOfScanRows()->atEnd())
	      {
		step_ = SETUP_SCAN;
		break;
	      }

	    step_ = SETUP_GET;
	  }
	  break;

	case SETUP_GET:
	  {
	    if ((! getTask_) || (!hbaseAccessTdb().listOfGetRows()))
	      {
		step_ = SETUP_SCAN_KEY;
		break;
	      }

	    hgr_ = 
	      (ComTdbHbaseAccess::HbaseGetRows*)hbaseAccessTdb().listOfGetRows()
	      ->getCurr();

	    retcode = setupUniqueRowIdsAndCols(hgr_);
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    step_ = PROCESS_GET;
	  }
	  break;

	case PROCESS_GET:
	case PROCESS_GET_KEY:
	  {
	    rc = 0;
	    retcode = getTask_->work(rc);
	    
	    if (retcode == 1)
	      return rc;
	    else if (retcode < 0)
	      step_ = HANDLE_ERROR;
	    else if (step_ == PROCESS_GET_KEY)
	      step_ = SELECT_CLOSE;
	    else
	      step_ = NEXT_GET;
	  }
	  break;

	case NEXT_GET:
	  {
	    hbaseAccessTdb().listOfGetRows()->advance();

	    if (! hbaseAccessTdb().listOfGetRows()->atEnd())
	      {
		step_ = SETUP_GET;
		break;
	      }

	    step_ = SETUP_SCAN_KEY;
	  }
	  break;

	case SETUP_SCAN_KEY:
	  {
	    if (! hbaseAccessTdb().keySubsetGen())
	      {
		step_ = SELECT_CLOSE;
		break;
	      }

	    if (hbaseAccessTdb().uniqueKeyInfo())
	      {
		step_ = SETUP_GET_KEY;
		break;
	      }
	    
	    retcode = setupSubsetKeysAndCols();
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    step_ = PROCESS_SCAN_KEY;
	  }
	  break;

	case SETUP_GET_KEY:
	  {
	    if ((! getTask_) ||
		((hbaseAccessTdb().keySubsetGen()) &&
		 (NOT hbaseAccessTdb().uniqueKeyInfo())))
	      {
		step_ = SELECT_CLOSE;
		break;
	      }

	    retcode = setupUniqueKeyAndCols(TRUE);
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    step_ = PROCESS_GET_KEY;
	  }
	  break;

	case SELECT_CLOSE:
	case SELECT_CLOSE_NO_ERROR:
	  {
	    retcode = ehi_->close();

	    if (step_ == SELECT_CLOSE)
	      {
		if (setupError(retcode, "ExpHbaseInterface::close"))
		  {
		    step_ = HANDLE_ERROR_NO_CLOSE;
		    break;
		  }
	      }

	    step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	case HANDLE_ERROR_NO_CLOSE:
	  {
	    if (handleError(rc))
	      return rc;

	    if (step_ == HANDLE_ERROR)
	      retcode = ehi_->close();

	    step_ = DONE;
	  }
	  break;

	case DONE:
	  {
	    if (handleDone(rc))
	      return rc;

	    if (scanTask_)
	      scanTask_->init();

	    if (getTask_)
	      getTask_->init();
	    
	    step_ = NOT_STARTED;
	  }
	  break;

	} // switch
    } // while
        
  return WORK_OK;
}

ExHbaseAccessMdamSelectTcb::ExHbaseAccessMdamSelectTcb(
          const ExHbaseAccessTdb &hbaseAccessTdb, 
          ex_globals * glob ) :
  ExHbaseAccessSelectTcb(hbaseAccessTdb, glob),
  step_(NOT_STARTED)
{
}

ExWorkProcRetcode ExHbaseAccessMdamSelectTcb::work()
{
  Lng32 retcode = 0;
  short rc = 0;

  while (!qparent_.down->isEmpty())
    {
      ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
      if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
	  (step_ != DONE))
	{
	  step_ = SELECT_CLOSE_NO_ERROR;
	}

      switch (step_)
	{
	case NOT_STARTED:
	  {
	    matches_ = 0;
	    matchesBeforeFetch_ = 0;

	    if ((pentry_down->downState.request == ex_queue::GET_N) &&
		(pentry_down->downState.requestValue == matches_))
	      {
		step_ = DONE;
		break;
	      }
	    
	    step_ = SELECT_INIT;
	  }
	  break;

	case SELECT_INIT:
	  {
	    retcode = ehi_->init(getHbaseAccessStats());
	    if (setupError(retcode, "ExpHbaseInterface::init"))
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    table_.val = hbaseAccessTdb().getTableName();
	    table_.len = strlen(hbaseAccessTdb().getTableName());

	    step_ = INIT_NEXT_KEY_RANGE;
	  }
	  break;

	case INIT_NEXT_KEY_RANGE:
	  {
	    retcode = initNextKeyRange(pool_, pentry_down->getAtp());
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }
	    
	    setupListOfColNames(hbaseAccessTdb().listOfFetchedColNames(),
				columns_);

	    fetchRangeHadRows_ = TRUE;

	    step_ = GET_NEXT_KEY_RANGE;
	  }
	  break;

	case GET_NEXT_KEY_RANGE:
	  {
	    keyRangeEx::getNextKeyRangeReturnType 
	      keyRangeStatus = setupSubsetKeys(fetchRangeHadRows_);

	    fetchRangeHadRows_ = FALSE;

	    if (keyRangeStatus == keyRangeEx::EXPRESSION_ERROR)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (keyRangeStatus == keyRangeEx::NO_MORE_RANGES)
	      {
		step_ = SELECT_CLOSE;
		break;
	      }

	    if (keyRangeStatus == keyRangeEx::PROBE_RANGE)
	      step_ = PROCESS_PROBE_RANGE;
	    else if (keyRangeStatus == keyRangeEx::FETCH_RANGE)
	      {
		matchesBeforeFetch_ = matches_;
		step_ = PROCESS_FETCH_RANGE;
	      }
	    else
	      step_ = HANDLE_ERROR;
	  }
	  break;

	case PROCESS_PROBE_RANGE:
	  {
	    char * keyData = NULL;
	    retcode = scanSQTaskTcb_->getProbeResult(keyData);
	    if (retcode == -1)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (retcode == 1) // no rows found
	      {
		keyExeExpr()->reportProbeResult(0);
		step_ = GET_NEXT_KEY_RANGE;
		break;
	      }

	    // pass the key value to the mdam generator
	    keyExeExpr()->reportProbeResult(keyData);
	    step_ = GET_NEXT_KEY_RANGE;
	  }
	  break;

	case PROCESS_FETCH_RANGE:
	  {
	    rc = 0;
	    retcode = scanSQTaskTcb_->work(rc);
	    
	    if (retcode == 1)
	      return rc;
	    else if (retcode < 0)
	      step_ = HANDLE_ERROR;
	    else
	      {
		if ((pentry_down->downState.request == ex_queue::GET_N) &&
		    (pentry_down->downState.requestValue == matches_))
		  {
		    step_ = SELECT_CLOSE;
		    break;
		  }

		if (matches_ > matchesBeforeFetch_)
		  fetchRangeHadRows_ = TRUE;

		step_ = GET_NEXT_KEY_RANGE;
	      }
	  }
	  break;

	case SELECT_CLOSE:
	case SELECT_CLOSE_NO_ERROR:
	  {
	    retcode = ehi_->close();
	    if (step_ == SELECT_CLOSE)
	      {
		if (setupError(retcode, "ExpHbaseInterface::close"))
		  {
		    step_ = HANDLE_ERROR_NO_CLOSE;
		    break;
		  }
	      }

	    step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	case HANDLE_ERROR_NO_CLOSE:
	  {
	    if (handleError(rc))
	      return rc;

	    if (step_ == HANDLE_ERROR)
	      retcode = ehi_->close();

	    step_ = DONE;
	  }
	  break;

	case DONE:
	  {
	    if (handleDone(rc))
	      return rc;

	    step_ = NOT_STARTED;
	  }
	  break;

	} // switch
    } // while

  return WORK_OK;
}

ExHbaseCoProcAggrTcb::ExHbaseCoProcAggrTcb(
          const ComTdbHbaseCoProcAggr &hbaseAccessTdb, 
          ex_globals * glob ) :
  ExHbaseAccessTcb(hbaseAccessTdb, glob),
  step_(NOT_STARTED)
{
}

ExWorkProcRetcode ExHbaseCoProcAggrTcb::work()
{
  Lng32 retcode = 0;
  short rc = 0;

  while (!qparent_.down->isEmpty())
    {
      ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
      if ((pentry_down->downState.request == ex_queue::GET_NOMORE) &&
	  (step_ != DONE))
	{
	  //	  step_ = SELECT_CLOSE_NO_ERROR;
	}

      switch (step_)
	{
	case NOT_STARTED:
	  {
	    matches_ = 0;

	    step_ = COPROC_INIT;
	  }
	  break;

	case COPROC_INIT:
	  {
	    retcode = ehi_->init(getHbaseAccessStats());
	    if (setupError(retcode, "ExpHbaseInterface::init"))
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    table_.val = hbaseAccessTdb().getTableName();
	    table_.len = strlen(hbaseAccessTdb().getTableName());

	    hbaseAccessTdb().listOfFetchedColNames()->position();
	    hbaseAccessTdb().listOfAggrTypes()->position();

	    aggrIdx_ = 0;

	    step_ = COPROC_EVAL;
	  }
	  break;

	case COPROC_EVAL:
	  {
	    Lng32 aggrType = *(short*)hbaseAccessTdb().listOfAggrTypes()->getCurr();
	    char * col = (char*)hbaseAccessTdb().listOfFetchedColNames()->getCurr();

	    Text aggrVal;
	    Text colFam;
	    Text colName;
	    retcode = extractColFamilyAndName(col, colFam, colName);
	    if (retcode)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    retcode = ehi_->coProcAggr(table_, 
				       aggrType, 
				       "", // startRow
				       "", // stopRow
				       colFam, 
				       colName, 
				       FALSE, // cacheBlocks
				       100, //numCacheRows
				       aggrVal);
	    if (setupError(retcode, "ExpHbaseInterface::coProcAggr"))
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    ExpTupleDesc * convertTuppTD =
	      hbaseAccessTdb().workCriDesc_->getTupleDescriptor
	      (hbaseAccessTdb().convertTuppIndex_);

	    Attributes * attr = convertTuppTD->getAttr(aggrIdx_);
	    if (! attr)
	      {
		step_ = HANDLE_ERROR;
		break;
	      }

	    if (attr->getNullFlag())
	      {
		*(short*)&convertRow_[attr->getNullIndOffset()] = 0;
	      }

	    str_cpy_all(&convertRow_[attr->getOffset()], aggrVal.data(), aggrVal.length());

	    hbaseAccessTdb().listOfAggrTypes()->advance();
	    hbaseAccessTdb().listOfFetchedColNames()->advance();
	    aggrIdx_++;

	    if (hbaseAccessTdb().listOfAggrTypes()->atEnd())
	      {
		step_ = RETURN_ROW;
		break;
	      }
	  }
	  break;

	case RETURN_ROW:
	  {
	    short rc = 0;
	    if (moveRowToUpQueue(convertRow_, hbaseAccessTdb().convertRowLen(), 
				 &rc, FALSE))
	      return 1;
	    
	    step_ = DONE;
	  }
	  break;

	case HANDLE_ERROR:
	  {
	    if (handleError(rc))
	      return rc;

	    retcode = ehi_->close();

	    step_ = DONE;
	  }
	  break;

	case DONE:
	  {
	    if (handleDone(rc))
	      return rc;

	    step_ = NOT_STARTED;
	  }
	  break;

	} // switch
    } // while

  return WORK_OK;
}
