| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ****************************************************************************** |
| * |
| * File: GenRelUpdate.C |
| * Description: update/delete/insert operators |
| * |
| * Created: 5/17/94 |
| * Language: C++ |
| * |
| * |
| ****************************************************************************** |
| */ |
| |
| #define SQLPARSERGLOBALS_FLAGS // must precede all #include's |
| #define SQLPARSERGLOBALS_NADEFAULTS |
| |
| #include "Platform.h" |
| |
| #include "Sqlcomp.h" |
| #include "GroupAttr.h" |
| #include "RelMisc.h" |
| #include "RelUpdate.h" |
| #include "RelJoin.h" |
| #include "ControlDB.h" |
| #include "GenExpGenerator.h" |
| #include "ComTdbDp2Oper.h" |
| #include "ComTdbUnion.h" |
| #include "ComTdbOnlj.h" |
| #include "ComTdbHbaseAccess.h" |
| #include "PartFunc.h" |
| #include "HashRow.h" |
| #include "CmpStatement.h" |
| #include "OptimizerSimulator.h" |
| #include "ComTdbFastTransport.h" |
| #include "CmpSeabaseDDL.h" |
| #include "NAExecTrans.h" |
| #include <algorithm> |
| #include "SqlParserGlobals.h" // must be last #include |
| |
| ///////////////////////////////////////////////////////////////////// |
| // |
| // Contents: |
| // |
| // DeleteCursor::codeGen() |
| // Delete::codeGen() |
| // |
| // Insert::codeGen() |
| // |
| // UpdateCursor::codeGen() |
| // Update::codeGen() |
| // |
| // ##IM: to be REMOVED: |
| // ## the imCodeGen methods, Generator::im*, the executor imd class. |
| // |
| ////////////////////////////////////////////////////////////////////// |
| |
| // Forward declaration of a helper whose definition is outside the visible |
| // part of this file. |
| // NOTE(review): the meaning of the int result and of the newExpr output |
| // cannot be determined from this declaration -- confirm at the definition. |
| extern int CreateAllCharsExpr(const NAType &formalType, |
| ItemExpr &actualValue, |
| CmpContext *cmpContext, |
| ItemExpr *&newExpr); |
| |
| // Reports whether the given update operator produces outputs. |
| // The answer comes solely from GenericUpdate::producesOutputs(); the index |
| // descriptor argument is accepted but not consulted. |
| inline static NABoolean getReturnRow(const GenericUpdate *gu, |
|                                      const IndexDesc *index) |
| { |
|   const NABoolean rowIsProduced = gu->producesOutputs(); |
|   return rowIsProduced; |
| } |
| |
| // Computes the DP2 lock flags used by an insert/update/delete operator. |
| // |
| // The starting point is the statement-level access options of gu when the |
| // user specified them, otherwise the generator's transaction mode. A STABLE |
| // consistency level is replaced by READ_COMMITTED. If the transaction-mode |
| // verification reports an isolation level for updates and the user did not |
| // specify statement-level options, the consistency level and lock state are |
| // taken from that isolation level. |
| static DP2LockFlags initLockFlags(GenericUpdate *gu, Generator * generator) |
| { |
|   // fix case 10-040429-7402: gu's statement level access options are |
|   // checked first, before any error 3140/3141 is declared. |
|   TransMode::IsolationLevel updIsolation; |
|   generator->verifyUpdatableTransMode(&gu->accessOptions(), |
|                                       generator->getTransMode(), |
|                                       &updIsolation); |
| |
|   DP2LockFlags flags; |
|   if (NOT gu->accessOptions().userSpecified()) |
|     flags = generator->getTransMode()->getDP2LockFlags(); |
|   else |
|     flags = gu->accessOptions().getDP2LockFlags(); |
| |
|   // stable access with update/delete/insert is treated as read committed. |
|   if (flags.getConsistencyLevel() == DP2LockFlags::STABLE) |
|     { |
|       flags.setConsistencyLevel(DP2LockFlags::READ_COMMITTED); |
|     } |
| |
|   if (updIsolation == TransMode::IL_NOT_SPECIFIED_) |
|     return flags; |
| |
|   if (gu->accessOptions().userSpecified()) |
|     return flags; |
| |
|   // No statement-level options: adopt the settings implied by the |
|   // isolation level reported for updates. |
|   TransMode updTransMode(updIsolation); |
|   flags.setConsistencyLevel( |
|     (DP2LockFlags::ConsistencyLevel)updTransMode.getDP2LockFlags().getConsistencyLevel()); |
|   flags.setLockState( |
|     (DP2LockFlags::LockState)updTransMode.getDP2LockFlags().getLockState()); |
| |
|   return flags; |
| } |
| |
| // Sets the generator's transaction flag for this update operator. |
| // |
| // The flag is set in every case except one: an internal refresh statement |
| // on a non-audited table that is an MV, an IUD log table or a ghost IUD |
| // log table. |
| // |
| // generator               - generator whose transaction flag is set |
| // isNeededForAllFragments - forwarded unchanged to setTransactionFlag() |
| void GenericUpdate::setTransactionRequired(Generator *generator, |
|                                            NABoolean isNeededForAllFragments) |
| { |
|   NABoolean needsTransaction = TRUE; |
| |
|   if (generator->isInternalRefreshStatement() && |
|       !getIndexDesc()->getNAFileSet()->isAudited()) |
|     { |
|       // Internal refresh statement and table is non-audited. |
|       if (getTableDesc()->getNATable()->isAnMV()) |
|         needsTransaction = FALSE; |
|       else if (getTableName().getSpecialType() == ExtendedQualName::IUD_LOG_TABLE) |
|         needsTransaction = FALSE; |
|       else if (getTableName().getSpecialType() == ExtendedQualName::GHOST_IUD_LOG_TABLE) |
|         needsTransaction = FALSE; |
|     } |
| |
|   if (needsTransaction) |
|     { |
|       generator->setTransactionFlag(TRUE, isNeededForAllFragments); |
|     } |
| } |
| |
| /////////////////////////////////////////////////////////// |
| // |
| // DeleteCursor::codeGen() |
| // |
| /////////////////////////////////////////////////////////// |
| // Code generation entry point for DeleteCursor. The body raises an |
| // unconditional GenAssert, i.e. this method is not expected to be reached. |
| // The trailing return of 0 only matters if GenAssert returns control. |
| short DeleteCursor::codeGen(Generator * generator) |
| { |
| GenAssert(0, "DeleteCursor::codeGen:should not reach here."); |
| |
| return 0; |
| } |
| |
| // Builds the expression that produces the updated row of an update operator. |
| // |
| // The index columns of indexDesc are walked in order. For each column the new |
| // value is either the right-hand side (child(1)) of the next not-yet-consumed |
| // entry of recExprArray -- when that entry's target column number equals the |
| // index column's number -- or the fetched column itself. Entries of |
| // recExprArray are consumed strictly in order (j only advances on a match). |
| // Every chosen value is wrapped in a Cast to the index column's type. |
| // |
| // updOpt == FALSE: every column is added to the value list and a contiguous |
| //   move expression is generated; updateRowLen and ufRowTupleDesc are passed |
| //   as outputs to generateContiguousMoveExpr. |
| // updOpt == TRUE: only the updated columns are added to the value list, each |
| //   sharing the attributes of its fetched column; the list is then assigned |
| //   atp 1 / updatedRowAtpIndex and a list expression is generated. |
| //   updateRowLen and ufRowTupleDesc are not assigned on this path. |
| // |
| // Always returns 0. |
| static short genUpdExpr( |
| Generator * generator, |
| TableDesc * tableDesc, // IN |
| const IndexDesc * indexDesc, // IN |
| ValueIdArray &recExprArray, // IN |
| const Int32 updatedRowAtpIndex, // IN |
| ex_expr** updateExpr, // OUT |
| ULng32 &updateRowLen, // OUT |
| ExpTupleDesc** ufRowTupleDesc, // OUT fetched/updated RowTupleDesc, |
| // depending on updOpt (TRUE ->fetched) |
| NABoolean updOpt) // IN |
| { |
| ExpGenerator * expGen = generator->getExpGenerator(); |
| |
| ExpTupleDesc::TupleDataFormat tupleFormat = |
| generator->getTableDataFormat( tableDesc->getNATable(), indexDesc); |
| NABoolean alignedFormat = tupleFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT; |
| |
| // Generate the update expression that will create the updated row |
| // given to DP2 at runtime. |
| ValueIdList updtRowVidList; |
| BaseColumn *updtCol = NULL, |
| *fetchedCol = NULL; |
| Lng32 updtColNum = -1, |
| fetchedColNum = 0; |
| ItemExpr *updtColVal = NULL, |
| *castNode = NULL; |
| CollIndex recEntries = recExprArray.entries(), |
| colEntries = indexDesc->getIndexColumns().entries(), |
| j = 0; |
| NAColumn *col; |
| NAColumnArray colArray; |
| |
| for (CollIndex i = 0; i < colEntries; i++) |
| { |
| fetchedCol = |
| (BaseColumn *)(((indexDesc->getIndexColumns())[i]).getItemExpr()); |
| fetchedColNum = fetchedCol->getColNumber(); |
| |
| // Keep the pending update target from a previous iteration if it has not |
| // been matched yet; otherwise pick up the target (child(0)) of the next |
| // assignment, or NULL when all assignments have been consumed. |
| updtCol = |
| (updtCol != NULL |
| ? updtCol |
| : (j < recEntries |
| ? (BaseColumn *)(recExprArray[j].getItemExpr()->child(0)->castToItemExpr()) |
| : NULL)); |
| |
| updtColNum = (updtCol ? updtCol->getColNumber() : -1); |
| |
| if (fetchedColNum == updtColNum) |
| { |
| // This column is being updated: use the assigned value and move on to |
| // the next assignment. |
| updtColVal = recExprArray[j].getItemExpr()->child(1)->castToItemExpr(); |
| j++; |
| updtCol = NULL; |
| } |
| else |
| { |
| // Column is not updated: carry the fetched value over unchanged. |
| updtColVal = fetchedCol; |
| } |
| |
| ValueId updtValId = fetchedCol->getValueId(); |
| |
| castNode = new(generator->wHeap()) Cast(updtColVal, &(updtValId.getType())); |
| |
| castNode->bindNode(generator->getBindWA()); |
| |
| // Without updOpt every column is part of the row; with updOpt only the |
| // columns that are actually updated are. |
| if (((updOpt) && (fetchedColNum == updtColNum)) || |
| (NOT updOpt)) |
| { |
| if (updOpt) |
| { |
| // assign the attributes of the fetched col to the |
| // updated col. |
| generator->addMapInfo( |
| castNode->getValueId(), |
| generator->getMapInfo(fetchedCol->getValueId())->getAttr()); |
| } |
| |
| // For aligned format, collect the NAColumn of each row column; the |
| // array is handed to generateContiguousMoveExpr below. |
| if ( alignedFormat && |
| (col = updtValId.getNAColumn( TRUE )) && |
| (col != NULL) ) |
| colArray.insert( col ); |
| |
| updtRowVidList.insert(castNode->getValueId()); |
| } |
| } // for each column |
| |
| // Generate the update expression |
| // |
| if (NOT updOpt) |
| { |
| // Tell the expression generator that we're coming in for an insert |
| // or an update. This flag will be cleared in generateContiguousMoveExpr. |
| if ( tupleFormat == ExpTupleDesc::SQLMX_FORMAT || |
| tupleFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT ) |
| expGen->setForInsertUpdate( TRUE ); |
| |
| expGen->generateContiguousMoveExpr |
| (updtRowVidList, |
| // (IN) Don't add convert nodes, Cast's have already been done. |
| 0, |
| // (IN) Destination Atp |
| 1, |
| // (IN) Destination Atp index |
| updatedRowAtpIndex, |
| // (IN) Destination data format |
| tupleFormat, |
| // (OUT) Destination tuple length |
| updateRowLen, |
| // (OUT) Generated expression |
| updateExpr, |
| // (OUT) Tuple descriptor for destination tuple |
| ufRowTupleDesc, |
| // (IN) Tuple descriptor format |
| ExpTupleDesc::LONG_FORMAT, |
| NULL, NULL, 0, NULL, NULL, |
| &colArray); |
| } |
| else |
| { |
| // update opt being done. Fetched and updated row are exactly |
| // the same. Updated values will overwrite the copy of fetched row |
| // at runtime. Change the atp & atpindex for target. |
| expGen->assignAtpAndAtpIndex(updtRowVidList, |
| 1, updatedRowAtpIndex); |
| |
| // No need to generate a header clause since the entire fetched row |
| // is copied to the updated row - header is in place. |
| expGen->setNoHeaderNeeded( TRUE ); |
| |
| // generate the update expression |
| expGen->generateListExpr(updtRowVidList,ex_expr::exp_ARITH_EXPR, |
| updateExpr); |
| |
| // restore the header flag |
| expGen->setNoHeaderNeeded( FALSE ); |
| } |
| |
| return 0; |
| } |
| |
| // Used to generate update or insert constraint expressions for update operators. |
| // |
| // constrTree         - constraint expression, written in terms of the source |
| //                      values of the columns |
| // constraintColumns  - columns referenced by the constraint; each one must be |
| //                      an ITM_INDEXCOLUMN (asserted) |
| // targetRecExprArray - assignments; child(0) is the target column (base or |
| //                      index column), child(1) the value assigned to it |
| // targetExpr         - (OUT) the generated constraint expression, or NULL when |
| //                      none of the constraint columns changes value |
| // |
| // Always returns 0. |
| static short genUpdConstraintExpr(Generator * generator, |
|                                   ItemExpr * constrTree, |
|                                   const ValueIdSet & constraintColumns, |
|                                   ValueIdArray & targetRecExprArray, |
|                                   ex_expr ** targetExpr /* out */) |
| { |
|   ExpGenerator * expGen = generator->getExpGenerator(); |
| |
|   // The ValueIds in the constrTree refer to the source values of the columns. |
|   // Construct a ValueIdMap so we can rewrite the constrTree to refer to the |
|   // target value of the columns. |
| |
|   ValueIdMap sourceToTarget; // top values will be source, bottom will be target |
| |
|   for (ValueId sourceValId = constraintColumns.init(); |
|        constraintColumns.next(sourceValId); |
|        constraintColumns.advance(sourceValId)) |
|     { |
|       GenAssert(sourceValId.getItemExpr()->getOperatorType() == ITM_INDEXCOLUMN, |
|                 "unexpected type of constraint expression column"); |
|       NAColumn * sourceCol = ((IndexColumn*)sourceValId.getItemExpr())->getNAColumn(); |
|       ValueId targetValId; |
|       for (CollIndex ni = 0; (ni < targetRecExprArray.entries()); ni++) |
|         { |
|           const ItemExpr *assignExpr = targetRecExprArray[ni].getItemExpr(); |
|           targetValId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
|           NAColumn *targetCol = NULL; |
|           if (targetValId.getItemExpr()->getOperatorType() == ITM_BASECOLUMN) |
|             targetCol = ((BaseColumn*)targetValId.getItemExpr())->getNAColumn(); |
|           else if (targetValId.getItemExpr()->getOperatorType() == ITM_INDEXCOLUMN) |
|             targetCol = ((IndexColumn*)targetValId.getItemExpr())->getNAColumn(); |
| |
|           // Columns are matched by their position. |
|           if (targetCol && sourceCol->getPosition() == targetCol->getPosition()) |
|             { |
|               GenAssert(sourceCol->getNATable() == targetCol->getNATable(), |
|                         "expecting same NATable for constraint source and target"); |
| |
|               // We found the target column matching the source column in the |
|               // targetRecExprArray. Now, an optimization: If the assignment |
|               // merely moves the old column value to the new, there is no need |
|               // to map it. |
| |
|               ValueId rhsValId = assignExpr->child(1)->castToItemExpr()->getValueId(); |
|               NAColumn *rhsCol = NULL; |
|               if (rhsValId.getItemExpr()->getOperatorType() == ITM_BASECOLUMN) |
|                 rhsCol = ((BaseColumn*)rhsValId.getItemExpr())->getNAColumn(); |
|               else if (rhsValId.getItemExpr()->getOperatorType() == ITM_INDEXCOLUMN) |
|                 rhsCol = ((IndexColumn*)rhsValId.getItemExpr())->getNAColumn(); |
| |
|               if (rhsCol && rhsCol->getPosition() == targetCol->getPosition()) |
|                 { |
|                   // assignment copies old column value to target without change; |
|                   // no need to map |
|                   GenAssert(rhsCol->getNATable() == targetCol->getNATable(), |
|                             "expecting same NATable for assignment source and target"); |
|                 } |
|               else |
|                 { |
|                   // the column value is changing (or maybe this is an insert), |
|                   // so map it |
|                   sourceToTarget.addMapEntry(sourceValId, targetValId); |
|                 } |
|               break; // found it, no need to search further |
|             } |
|         } |
|     } |
| |
|   // If there is anything to map, rewrite the constraint expression |
|   // and generate it. If there is nothing to map, that means none of |
|   // the constraint expression columns is changed (which implies this |
|   // is an update expr and not an insert, by the way). In that case, we |
|   // don't need to generate the constraint expression as the constraint |
|   // should already be satisfied by the old values. |
| |
|   if (sourceToTarget.entries() > 0) |
|     { |
|       // map the ValueIds in the constraint tree to target values |
|       ValueId mappedConstrTree; |
|       sourceToTarget.rewriteValueIdDown(constrTree->getValueId(),mappedConstrTree /* out */); |
| |
|       // generate the expression |
|       expGen->generateExpr(mappedConstrTree, ex_expr::exp_SCAN_PRED, |
|                            targetExpr); |
|     } |
|   else |
|     { |
|       // Store NULL through the out-parameter so the caller observes "no |
|       // expression"; assigning to targetExpr itself would only change the |
|       // local copy of the pointer. |
|       *targetExpr = NULL; |
|     } |
| |
|   return 0; |
| } |
| |
| // Builds the expression that creates the row image written by an HBase |
| // update or insert, plus related outputs. |
| // |
| // For every assignment in updRecExprArray the assigned value (child(1)) is |
| // cast to the type of the target column (child(0)); for non-aligned format |
| // it is additionally wrapped in a CompEncode when |
| // HbaseAccess::isEncodingNeededForSerialization() says so. The resulting |
| // values are laid out with generateContiguousMoveExpr at atp 1 / |
| // updateTuppIndex, in aligned or exploded format depending on the file set. |
| // |
| // Outputs: |
| //   updateExpr, updateRowLen, updateTupleDesc - the move expression, the row |
| //     length and the tuple descriptor of the row. |
| //   listOfUpdatedColNames - NULL when updRecExprArray is empty; otherwise one |
| //     HBase column id per assignment (non-aligned format) or one single entry |
| //     (aligned format). |
| //   mergeInsertRowIdExpr, mergeInsertRowIdLen - only assigned when isInsert is |
| //     set and at least one value was generated; otherwise left untouched. |
| // |
| // The location attributes of the generated values are also copied to the |
| // Assign nodes and to the target columns in the map table. |
| // |
| // Always returns 0. |
| static short genHbaseUpdOrInsertExpr( |
| Generator * generator, |
| NABoolean isInsert, |
| ValueIdArray &updRecExprArray, // IN |
| const Int32 updateTuppIndex, // IN |
| ex_expr** updateExpr, // OUT |
| ULng32 &updateRowLen, // OUT |
| ExpTupleDesc** updateTupleDesc, // OUT updated RowTupleDesc, |
| Queue* &listOfUpdatedColNames, // OUT |
| ex_expr** mergeInsertRowIdExpr, // out |
| ULng32 &mergeInsertRowIdLen, // OUT |
| const Int32 mergeInsertRowIdTuppIndex, // IN |
| const IndexDesc * indexDesc) // IN |
| { |
| ExpGenerator * expGen = generator->getExpGenerator(); |
| Space * space = generator->getSpace(); |
| |
| *updateExpr = NULL; |
| updateRowLen = 0; |
| |
| // Generate the update expression that will create the updated row |
| ValueIdList updRowVidList; |
| |
| NABoolean isAligned = FALSE; |
| |
| if (indexDesc->getNAFileSet()->isSqlmxAlignedRowFormat()) |
| isAligned = TRUE; |
| |
| ExpTupleDesc::TupleDataFormat tupleFormat; |
| if (isAligned) |
| tupleFormat = ExpTupleDesc::SQLMX_ALIGNED_FORMAT; |
| else |
| tupleFormat = ExpTupleDesc::SQLARK_EXPLODED_FORMAT; |
| |
| // The column name list is only allocated when there is something to update. |
| listOfUpdatedColNames = NULL; |
| if (updRecExprArray.entries() > 0) |
| listOfUpdatedColNames = new(space) Queue(space); |
| |
| NAColumnArray colArray; |
| NAColumn *col; |
| |
| for (CollIndex ii = 0; ii < updRecExprArray.entries(); ii++) |
| { |
| const ItemExpr *assignExpr = updRecExprArray[ii].getItemExpr(); |
| ValueId assignExprValueId = assignExpr->getValueId(); |
| |
| ValueId tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| ValueId srcValueId = assignExpr->child(1)->castToItemExpr()->getValueId(); |
| |
| // populate the colArray because this info is needed later to identify |
| // the added columns. |
| if ( isAligned ) |
| { |
| col = tgtValueId.getNAColumn( TRUE ); |
| if ( col != NULL ) |
| colArray.insert( col ); |
| } |
| |
| ItemExpr * ie = NULL; |
| |
| // Cast the assigned value to the type of the target column. |
| ie = new(generator->wHeap()) |
| Cast(assignExpr->child(1), &tgtValueId.getType()); |
| |
| BaseColumn * bc = |
| (BaseColumn*)(updRecExprArray[ii].getItemExpr()->child(0)->castToItemExpr()); |
| |
| GenAssert(bc->getOperatorType() == ITM_BASECOLUMN, |
| "unexpected type of base table column"); |
| |
| const NAColumn *nac = bc->getNAColumn(); |
| if ((NOT isAligned) && HbaseAccess::isEncodingNeededForSerialization(bc)) |
| { |
| ie = new(generator->wHeap()) CompEncode |
| (ie, FALSE, -1, CollationInfo::Sort, TRUE); |
| } |
| |
| ie->bindNode(generator->getBindWA()); |
| updRowVidList.insert(ie->getValueId()); |
| |
| // Non-aligned format: record one HBase column id per updated column. |
| if (NOT isAligned) |
| { |
| NAString cnInList; |
| HbaseAccess::createHbaseColId(nac, cnInList); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfUpdatedColNames->insert(colNameInList); |
| } |
| } |
| |
| // Aligned format: a single column name entry is recorded. It is built as |
| // SEABASE_DEFAULT_COL_FAMILY + ":" + the byte value 1, prefixed with the |
| // length of that string stored as a short. |
| if ((isAligned) && (listOfUpdatedColNames) && |
| (updRecExprArray.entries() > 0)) |
| { |
| NAString cnInList(SEABASE_DEFAULT_COL_FAMILY); |
| cnInList += ":"; |
| unsigned char c = 1; |
| cnInList.append((char*)&c, 1); |
| short len = cnInList.length(); |
| cnInList.prepend((char*)&len, sizeof(short)); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfUpdatedColNames->insert(colNameInList); |
| } |
| |
| // Generate the update expression |
| // |
| expGen->generateContiguousMoveExpr |
| (updRowVidList, |
| 0, // (IN) Don't add convert nodes, Cast's have already been done. |
| 1, // (IN) Destination Atp |
| updateTuppIndex, // (IN) Destination Atp index |
| tupleFormat, |
| updateRowLen, // (OUT) Destination tuple length |
| updateExpr, // (OUT) Generated expression |
| updateTupleDesc, // (OUT) Tuple descriptor for destination tuple |
| ExpTupleDesc::LONG_FORMAT, |
| NULL, NULL, 0, NULL, NULL, |
| &colArray); // colArray is needed to identify any added cols. |
| |
| // Assign attributes to the ASSIGN nodes of the updRecExprArray. |
| // This is not the same as the generateContiguousMoveExpr() call |
| // above since different valueId's are added to the mapTable. |
| // |
| for (CollIndex ii = 0; ii < updRecExprArray.entries(); ii++) |
| { |
| const ItemExpr *assignExpr = updRecExprArray[ii].getItemExpr(); |
| ValueId assignExprValueId = assignExpr->getValueId(); |
| Attributes * assignAttr = (generator->addMapInfo(assignExprValueId, 0))->getAttr(); |
| |
| // updRowVidList holds exactly one value per assignment, in the same order. |
| ValueId updValId = updRowVidList[ii]; |
| Attributes * updValAttr = (generator->getMapInfo(updValId, 0))->getAttr(); |
| assignAttr->copyLocationAttrs(updValAttr); |
| } |
| // Second pass: give each target column the location of its Assign node and |
| // flag added columns, both in the map table and in the tuple descriptor. |
| for (CollIndex ii = 0; ii < updRecExprArray.entries(); ii++) |
| { |
| const ItemExpr *assignExpr = updRecExprArray[ii].getItemExpr(); |
| ValueId assignExprValueId = assignExpr->getValueId(); |
| |
| ValueId tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| |
| Attributes * colAttr = (generator->addMapInfo(tgtValueId, 0))->getAttr(); |
| Attributes * assignAttr = (generator->getMapInfo(assignExprValueId, 0))->getAttr(); |
| |
| colAttr->copyLocationAttrs(assignAttr); |
| |
| BaseColumn * bc = (BaseColumn *)assignExpr->child(0)->castToItemExpr(); |
| const NAColumn *nac = bc->getNAColumn(); |
| if (nac->isAddedColumn()) |
| { |
| colAttr->setAddedCol(); |
| |
| Attributes::DefaultClass dc = expGen->getDefaultClass(nac); |
| colAttr->setDefaultClass(dc); |
| |
| Attributes * attr = (*updateTupleDesc)->getAttr(ii); |
| attr->setAddedCol(); |
| attr->setDefaultClass(dc); |
| } |
| |
| } |
| |
| // For inserts, also generate the key encode expression for the row id from |
| // the values of the key columns. |
| // NOTE(review): updRowVidList is indexed by the key column's position, which |
| // presumably relies on updRecExprArray holding one assignment per table |
| // column in position order -- confirm against callers. |
| if ((isInsert) && |
| (updRowVidList.entries() > 0)) |
| { |
| ValueIdList updRowKeyVidList; |
| const NAColumnArray &keyColArray = indexDesc->getNAFileSet()->getIndexKeyColumns(); |
| ULng32 firstKeyColumnOffset = 0; |
| |
| for (CollIndex kc=0; kc<keyColArray.entries(); kc++) |
| updRowKeyVidList.insert(updRowVidList[keyColArray[kc]->getPosition()]); |
| |
| expGen->generateKeyEncodeExpr(indexDesc, |
| 1, // (IN) Destination Atp |
| mergeInsertRowIdTuppIndex, |
| ExpTupleDesc::SQLMX_KEY_FORMAT, |
| mergeInsertRowIdLen, |
| mergeInsertRowIdExpr, |
| FALSE, |
| firstKeyColumnOffset, |
| &updRowKeyVidList, |
| TRUE); |
| } |
| |
| return 0; |
| } |
| |
| // |
| // For each vertical-partition (VP) column in vpCols, creates and binds an |
| // Assign node that assigns the base table column the VP column is defined |
| // on to the VP column itself. The ValueId of every bound Assign is inserted |
| // into resList at the position the VP column occupies in vpCols. |
| // |
| static void bindVPCols(Generator *generator, |
|                        const ValueIdList & vpCols, |
|                        ValueIdList & resList) |
| { |
|   BindWA *binder = generator->getBindWA(); |
| |
|   CollIndex pos = 0; |
|   while (pos < vpCols.entries()) |
|     { |
|       // The VP column itself -- has to be an ITM_INDEXCOLUMN |
|       IndexColumn *vpColumn = (IndexColumn*)vpCols[pos].getItemExpr(); |
|       GenAssert(vpColumn->getOperatorType() == ITM_INDEXCOLUMN, |
|                 "unexpected type of vp column"); |
| |
|       // Its definition is the base table column -- has to be an ITM_BASECOLUMN |
|       ItemExpr *baseColumn = vpColumn->getDefinition().getItemExpr(); |
|       GenAssert(baseColumn->getOperatorType() == ITM_BASECOLUMN, |
|                 "unexpected type of base table column"); |
| |
|       Assign *vpAssign = |
|         new (binder->wHeap()) Assign(vpColumn, baseColumn, FALSE); |
|       vpAssign->bindNode(binder); |
| |
|       if (binder->errStatus()) |
|         { |
|           GenAssert(0,"bindNode of vpCol failed"); |
|         } |
| |
|       resList.insertAt(pos, vpAssign->getValueId()); |
|       pos++; |
|     } |
| } |
| |
| // Code generation for HiveInsert. Nothing is generated unless explain is |
| // enabled; in that case a placeholder ComTdbFastExtract is allocated solely |
| // so that explain information can be attached to the generator. |
| // Always returns 0. |
| short HiveInsert::codeGen(Generator *generator) |
| { |
|   if (generator->explainDisabled()) |
|     return 0; |
| |
|   Space * tdbSpace = generator->getSpace(); |
| |
|   // a dummy tdb |
|   ComTdbFastExtract* dummyTdb = new (tdbSpace) ComTdbFastExtract(); |
| |
|   generator->setExplainTuple( addExplainInfo(dummyTdb, 0, 0, generator)); |
| |
|   return 0; |
| } |
| |
| |
| /////////////////////////////////////////////////////////// |
| // |
| // UpdateCursor::codeGen() |
| // |
| /////////////////////////////////////////////////////////// |
| // Code generation entry point for UpdateCursor. The body raises an |
| // unconditional GenAssert, i.e. this method is not expected to be reached. |
| // The trailing return of 0 only matters if GenAssert returns control. |
| short UpdateCursor::codeGen(Generator * generator) |
| { |
| GenAssert(0, "UpdateCursor::codeGen:should not reach here."); |
| |
| return 0; |
| } |
| |
| // |
| // Appends every element of src, in order, to dest starting at index pos. |
| // pos is advanced past the last element written. |
| static void appendColumnsInOrder(NAList<BaseColumn *> & src, |
|                                  NAArray<BaseColumn *> * dest, |
|                                  CollIndex & pos) |
| { |
|   for (CollIndex i = 0; i < src.entries(); i++) |
|     dest->insertAt( pos++, src[ i ] ); |
| } |
| |
| // |
| // This function is for aligned row format only. |
| // It fills orderedCols, starting at index 0, with the used entries of |
| // columns in this order: |
| //   1. fixed fields (not added) by decreasing alignment: 8, 4, 2, then |
| //      everything else, |
| //   2. added fixed fields, |
| //   3. all variable length fields (original or added). |
| // Within each group the relative order of the input is kept. |
| // |
| // columns     - candidate columns; entries for which used(i) is false are |
| //               skipped |
| // numColumns  - number of leading positions of columns to examine |
| // orderedCols - (OUT) receives the reordered columns |
| static void orderColumnsByAlignment(NAArray<BaseColumn *> columns, |
|                                     UInt32 numColumns, |
|                                     NAArray<BaseColumn *> * orderedCols ) |
| { |
|   NAList<BaseColumn *> varCols(STMTHEAP, 5); |
|   NAList<BaseColumn *> addedCols(STMTHEAP, 5); |
|   NAList<BaseColumn *> align4(STMTHEAP, 5); |
|   NAList<BaseColumn *> align2(STMTHEAP, 5); |
|   NAList<BaseColumn *> align1(STMTHEAP, 5); |
|   CollIndex k = 0; // next free position in orderedCols |
| |
|   for (CollIndex i = 0; i < numColumns; i++) |
|     { |
|       if ( NOT columns.used(i) ) |
|         continue; |
| |
|       BaseColumn *currColumn = columns[ i ]; |
| |
|       if ( currColumn->getType().isVaryingLen() ) |
|         { |
|           varCols.insert( currColumn ); |
|           continue; |
|         } |
| |
|       if ( currColumn->getNAColumn()->isAddedColumn() ) |
|         { |
|           addedCols.insert( currColumn ); |
|           continue; |
|         } |
| |
|       // 8-byte aligned fields go straight to the output; the smaller |
|       // alignments are buffered and appended afterwards. |
|       const Int32 alignmentSize = currColumn->getType().getDataAlignment(); |
|       switch (alignmentSize) |
|         { |
|         case 8: |
|           orderedCols->insertAt( k++, currColumn ); |
|           break; |
|         case 4: |
|           align4.insert( currColumn ); |
|           break; |
|         case 2: |
|           align2.insert( currColumn ); |
|           break; |
|         default: |
|           align1.insert( currColumn ); |
|           break; |
|         } |
|     } |
| |
|   appendColumnsInOrder( align4, orderedCols, k ); |
|   appendColumnsInOrder( align2, orderedCols, k ); |
|   appendColumnsInOrder( align1, orderedCols, k ); |
|   appendColumnsInOrder( addedCols, orderedCols, k ); |
|   appendColumnsInOrder( varCols, orderedCols, k ); |
| } |
| |
| // Always returns -1; the generator argument is not used. |
| // NOTE(review): presumably Delete is not meant to be code-generated through |
| // this method -- confirm how callers treat the -1 result. |
| short Delete::codeGen(Generator * /*generator*/) |
| { |
| return -1; |
| } |
| |
| // Always returns -1; the generator argument is not used. |
| // NOTE(review): presumably Insert is not meant to be code-generated through |
| // this method -- confirm how callers treat the -1 result. |
| short Insert::codeGen(Generator * /*generator*/) |
| { |
| return -1; |
| } |
| |
| // Always returns -1; the generator argument is not used. |
| // NOTE(review): presumably Update is not meant to be code-generated through |
| // this method -- confirm how callers treat the -1 result. |
| short Update::codeGen(Generator * /*generator*/) |
| { |
| return -1; |
| } |
| |
| // Always returns -1; the generator argument is not used. |
| // NOTE(review): presumably MergeUpdate is not meant to be code-generated |
| // through this method -- confirm how callers treat the -1 result. |
| short MergeUpdate::codeGen(Generator * /*generator*/) |
| { |
| return -1; |
| } |
| |
| // Always returns -1; the generator argument is not used. |
| // NOTE(review): presumably MergeDelete is not meant to be code-generated |
| // through this method -- confirm how callers treat the -1 result. |
| short MergeDelete::codeGen(Generator * /*generator*/) |
| { |
| return -1; |
| } |
| |
| short HbaseDelete::codeGen(Generator * generator) |
| { |
| Space * space = generator->getSpace(); |
| ExpGenerator * expGen = generator->getExpGenerator(); |
| |
| // allocate a map table for the retrieved columns |
| // generator->appendAtEnd(); |
| MapTable * last_map_table = generator->getLastMapTable(); |
| |
| ex_expr *scanExpr = 0; |
| ex_expr *proj_expr = 0; |
| ex_expr *convert_expr = NULL; |
| ex_expr * keyColValExpr = NULL; |
| ex_expr *preCondExpr = NULL; |
| ex_expr *lobExpr = NULL; |
| |
| ex_cri_desc * givenDesc |
| = generator->getCriDesc(Generator::DOWN); |
| |
| ex_cri_desc * returnedDesc = NULL; |
| |
| const Int32 work_atp = 1; |
| const Int32 convertTuppIndex = 2; |
| const Int32 rowIdTuppIndex = 3; |
| const Int32 asciiTuppIndex = 4; |
| const Int32 rowIdAsciiTuppIndex = 5; |
| const Int32 keyColValTuppIndex = 6; |
| |
| ULng32 asciiRowLen = 0; |
| ExpTupleDesc * asciiTupleDesc = 0; |
| |
| ex_cri_desc * work_cri_desc = NULL; |
| work_cri_desc = new(space) ex_cri_desc(7, space); |
| |
| returnedDesc = new(space) ex_cri_desc(givenDesc->noTuples() + 1, space); |
| |
| NABoolean returnRow = getReturnRow(this, getIndexDesc()); |
| |
| NABoolean isAlignedFormat = getTableDesc()->getNATable()->isAlignedFormat(getIndexDesc()); |
| NABoolean isHbaseMapFormat = getTableDesc()->getNATable()->isHbaseMapTable(); |
| |
| ExpTupleDesc::TupleDataFormat asciiRowFormat = |
| (isAlignedFormat ? |
| ExpTupleDesc::SQLMX_ALIGNED_FORMAT : |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT); |
| ExpTupleDesc::TupleDataFormat hbaseRowFormat = |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT; |
| |
| ValueIdList asciiVids; |
| ValueIdList executorPredCastVids; |
| ValueIdList convertExprCastVids; |
| |
| NABoolean addDefaultValues = TRUE; |
| NABoolean hasAddedColumns = FALSE; |
| if (getTableDesc()->getNATable()->hasAddedColumn()) |
| hasAddedColumns = TRUE; |
| |
| ValueIdList columnList; |
| ValueIdList srcVIDlist; |
| ValueIdList dupVIDlist; |
| HbaseAccess::sortValues(retColRefSet_, |
| columnList, |
| srcVIDlist, dupVIDlist, |
| (getTableDesc()->getNATable()->getExtendedQualName().getSpecialType() == ExtendedQualName::INDEX_TABLE)); |
| |
| const CollIndex numColumns = columnList.entries(); |
| |
| if (! getPrecondition().isEmpty()) |
| { |
| ItemExpr * preCondTree = getPrecondition().rebuildExprTree(ITM_AND,TRUE,TRUE); |
| expGen->generateExpr(preCondTree->getValueId(), ex_expr::exp_SCAN_PRED, |
| &preCondExpr); |
| } |
| |
| // build key information |
| keyRangeGen * keyInfo = 0; |
| expGen->buildKeyInfo(&keyInfo, // out |
| generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getIndexDesc()->getIndexKey(), |
| getBeginKeyPred(), |
| (getSearchKey() && getSearchKey()->isUnique() ? NULL : getEndKeyPred()), |
| getSearchKey(), |
| NULL, //getMdamKeyPtr(), |
| FALSE, |
| ExpTupleDesc::SQLMX_KEY_FORMAT); |
| |
| UInt32 keyColValLen = 0; |
| char * keyColName = NULL; |
| if ((canDoCheckAndUpdel()) && |
| (getSearchKey() && getSearchKey()->isUnique()) && |
| (getBeginKeyPred().entries() > 0)) |
| { |
| expGen->generateKeyColValueExpr( |
| getBeginKeyPred()[0], |
| work_atp, keyColValTuppIndex, |
| keyColValLen, |
| &keyColValExpr); |
| |
| if (! keyColValExpr) |
| canDoCheckAndUpdel() = FALSE; |
| else |
| { |
| ItemExpr * col_node = getBeginKeyPred()[0].getItemExpr()->child(0); |
| HbaseAccess::genColName(generator, col_node, keyColName); |
| } |
| } |
| |
| Queue * tdbListOfUniqueRows = NULL; |
| Queue * tdbListOfRangeRows = NULL; |
| |
| HbaseAccess::genListsOfRows(generator, |
| listOfDelSubsetRows_, |
| listOfDelUniqueRows_, |
| tdbListOfRangeRows, |
| tdbListOfUniqueRows); |
| |
| ULng32 convertRowLen = 0; |
| |
| ValueIdList lobDelVIDlist; |
| for (CollIndex ii = 0; ii < numColumns; ii++) |
| { |
| ItemExpr * col_node = ((columnList[ii]).getValueDesc())->getItemExpr(); |
| |
| const NAType &givenType = col_node->getValueId().getType(); |
| int res; |
| ItemExpr *asciiValue = NULL; |
| ItemExpr *castValue = NULL; |
| |
| res = HbaseAccess::createAsciiColAndCastExpr2( |
| generator, // for heap |
| col_node, |
| givenType, // [IN] Actual type of HDFS column |
| asciiValue, // [OUT] Returned expression for ascii rep. |
| castValue, // [OUT] Returned expression for binary rep. |
| isAlignedFormat |
| ); |
| |
| GenAssert(res == 1 && asciiValue != NULL && castValue != NULL, |
| "Error building expression tree for cast output value"); |
| asciiValue->synthTypeAndValueId(); |
| asciiValue->bindNode(generator->getBindWA()); |
| asciiVids.insert(asciiValue->getValueId()); |
| |
| castValue->bindNode(generator->getBindWA()); |
| convertExprCastVids.insert(castValue->getValueId()); |
| |
| if (col_node->getValueId().getType().isLob()) |
| { |
| ItemExpr * ld = new(generator->wHeap()) |
| LOBdelete(castValue); |
| ld->bindNode(generator->getBindWA()); |
| lobDelVIDlist.insert(ld->getValueId()); |
| } |
| |
| } // for (ii = 0; ii < numCols; ii++) |
| |
| // Add ascii columns to the MapTable. After this call the MapTable |
| // has ascii values in the work ATP at index asciiTuppIndex. |
| const NAColumnArray * colArray = NULL; |
| unsigned short pcm = expGen->getPCodeMode(); |
| if ((asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) && |
| (hasAddedColumns)) |
| { |
| colArray = &getIndexDesc()->getAllColumns(); |
| |
| expGen->setPCodeMode(ex_expr::PCODE_NONE); |
| } |
| |
| expGen->processValIdList( |
| asciiVids, // [IN] ValueIdList |
| asciiRowFormat, // [IN] tuple data format |
| asciiRowLen, // [OUT] tuple length |
| work_atp, // [IN] atp number |
| asciiTuppIndex, // [IN] index into atp |
| &asciiTupleDesc, // [optional OUT] tuple desc |
| ExpTupleDesc::LONG_FORMAT, // [optional IN] desc format |
| 0, |
| NULL, |
| (NAColumnArray*)colArray); |
| |
| work_cri_desc->setTupleDescriptor(asciiTuppIndex, asciiTupleDesc); |
| |
| ExpTupleDesc * tuple_desc = 0; |
| |
| expGen->generateContiguousMoveExpr( |
| convertExprCastVids, // [IN] source ValueIds |
| FALSE, // [IN] add convert nodes? |
| work_atp, // [IN] target atp number |
| convertTuppIndex, // [IN] target tupp index |
| hbaseRowFormat, // [IN] target tuple format |
| convertRowLen, // [OUT] target tuple length |
| &convert_expr, // [OUT] move expression |
| &tuple_desc, // [optional OUT] target tuple desc |
| ExpTupleDesc::LONG_FORMAT, // [optional IN] target desc format |
| NULL, |
| NULL, |
| 0, |
| NULL, |
| FALSE, |
| NULL, |
| FALSE /* doBulkMove */); |
| |
| if ((asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) && |
| (hasAddedColumns)) |
| { |
| expGen->setPCodeMode(pcm); |
| } |
| |
| for (CollIndex i = 0; i < columnList.entries(); i++) |
| { |
| ValueId colValId = columnList[i]; |
| ValueId castValId = convertExprCastVids[i]; |
| |
| Attributes * colAttr = (generator->addMapInfo(colValId, 0))->getAttr(); |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // for |
| |
| if (getScanIndexDesc() != NULL) |
| { |
| for (CollIndex i = 0; i < getScanIndexDesc()->getIndexColumns().entries(); i++) |
| { |
| ValueId scanIndexDescVID = getScanIndexDesc()->getIndexColumns()[i]; |
| const ValueId indexDescVID = getIndexDesc()->getIndexColumns()[i]; |
| |
| CollIndex pos = 0; |
| |
| pos = columnList.index(indexDescVID); |
| if (pos != NULL_COLL_INDEX) |
| { |
| Attributes * colAttr = (generator->addMapInfo(scanIndexDescVID, 0))->getAttr(); |
| |
| ValueId castValId = convertExprCastVids[pos]; |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // if |
| else |
| { |
| pos = columnList.index(scanIndexDescVID); |
| if (pos != NULL_COLL_INDEX) |
| { |
| Attributes * colAttr = (generator->addMapInfo(indexDescVID, 0))->getAttr(); |
| |
| ValueId castValId = convertExprCastVids[pos]; |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // if |
| } // else |
| |
| } // for |
| } // getScanIndexDesc != NULL |
| |
| // assign location attributes to dup vids that were returned earlier. |
| for (CollIndex i = 0; i < srcVIDlist.entries(); i++) |
| { |
| ValueId srcValId = srcVIDlist[i]; |
| ValueId dupValId = dupVIDlist[i]; |
| |
| Attributes * srcAttr = (generator->getMapInfo(srcValId))->getAttr(); |
| Attributes * dupAttr = (generator->addMapInfo(dupValId, 0))->getAttr(); |
| |
| dupAttr->copyLocationAttrs(srcAttr); |
| } // for |
| |
| if (addDefaultValues) //hasAddedColumns) |
| { |
| expGen->addDefaultValues(columnList, |
| getIndexDesc()->getAllColumns(), |
| tuple_desc, |
| TRUE); |
| |
| if (asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) |
| { |
| expGen->addDefaultValues(columnList, |
| getIndexDesc()->getAllColumns(), |
| asciiTupleDesc, |
| TRUE); |
| } |
| else |
| { |
| // copy default values from convertTupleDesc to asciiTupleDesc |
| expGen->copyDefaultValues(asciiTupleDesc, tuple_desc); |
| } |
| } |
| |
| // generate explain selection expression, if present |
| // if ((NOT (getTableDesc()->getNATable()->getExtendedQualName().getSpecialType() == ExtendedQualName::INDEX_TABLE)) && |
| // (! executorPred().isEmpty())) |
| if (! executorPred().isEmpty()) |
| { |
| ItemExpr * newPredTree = executorPred().rebuildExprTree(ITM_AND,TRUE,TRUE); |
| expGen->generateExpr(newPredTree->getValueId(), ex_expr::exp_SCAN_PRED, |
| &scanExpr); |
| } |
| |
| ex_expr * lobDelExpr = NULL; |
| if (getTableDesc()->getNATable()->hasLobColumn()) |
| { |
| // generate code to delete rows from LOB desc table |
| expGen->generateListExpr(lobDelVIDlist, |
| ex_expr::exp_ARITH_EXPR, &lobDelExpr); |
| } |
| |
| ULng32 rowIdAsciiRowLen = 0; |
| ExpTupleDesc * rowIdAsciiTupleDesc = 0; |
| ex_expr * rowIdExpr = NULL; |
| ULng32 rowIdLength = 0; |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| // don't encode keys for hbase mapped tables since these tables |
| // could be populated from outside of traf. |
| NABoolean encodeKeys = TRUE; |
| if (getTableDesc()->getNATable()->isHbaseMapTable()) |
| encodeKeys = FALSE; |
| |
| HbaseAccess::genRowIdExpr(generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getHbaseSearchKeys(), |
| work_cri_desc, work_atp, |
| rowIdAsciiTuppIndex, rowIdTuppIndex, |
| rowIdAsciiRowLen, rowIdAsciiTupleDesc, |
| rowIdLength, |
| rowIdExpr, |
| encodeKeys); |
| } |
| else |
| { |
| HbaseAccess::genRowIdExprForNonSQ(generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getHbaseSearchKeys(), |
| work_cri_desc, work_atp, |
| rowIdAsciiTuppIndex, rowIdTuppIndex, |
| rowIdAsciiRowLen, rowIdAsciiTupleDesc, |
| rowIdLength, |
| rowIdExpr); |
| } |
| |
| Queue * listOfFetchedColNames = NULL; |
| if (isAlignedFormat) |
| { |
| listOfFetchedColNames = new(space) Queue(space); |
| |
| NAString cnInList(SEABASE_DEFAULT_COL_FAMILY); |
| cnInList += ":"; |
| unsigned char c = 1; |
| cnInList.append((char*)&c, 1); |
| short len = cnInList.length(); |
| cnInList.prepend((char*)&len, sizeof(short)); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfFetchedColNames->insert(colNameInList); |
| } |
| else |
| { |
| HbaseAccess::genListOfColNames(generator, |
| getIndexDesc(), |
| columnList, |
| listOfFetchedColNames); |
| } |
| |
| Queue * listOfDeletedColNames = NULL; |
| if (csl()) |
| { |
| listOfDeletedColNames = new(space) Queue(space); |
| for (Lng32 i = 0; i < csl()->entries(); i++) |
| { |
| NAString * nas = (NAString*)(*csl())[i]; |
| char * colNameInList = NULL; |
| |
| short len = nas->length(); |
| nas->prepend((char*)&len, sizeof(short)); |
| |
| colNameInList = |
| space->AllocateAndCopyToAlignedSpace(*nas, 0); |
| |
| listOfDeletedColNames->insert(colNameInList); |
| } |
| } |
| |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| if ((keyInfo && getSearchKey() && getSearchKey()->isUnique()) || |
| (tdbListOfUniqueRows)) |
| { |
| // Save node for later use by RelRoot in the case of UPDATE CURRENT OF. |
| generator->updateCurrentOfRel() = (void*)this; |
| } |
| } |
| |
| if (getOptStoi() && getOptStoi()->getStoi()) |
| generator->addSqlTableOpenInfo(getOptStoi()->getStoi()); |
| |
| LateNameInfo* lateNameInfo = new(generator->wHeap()) LateNameInfo(); |
| char * compileTimeAnsiName = (char*)getOptStoi()->getStoi()->ansiName(); |
| |
| lateNameInfo->setCompileTimeName(compileTimeAnsiName, space); |
| lateNameInfo->setLastUsedName(compileTimeAnsiName, space); |
| lateNameInfo->setNameSpace(COM_TABLE_NAME); |
| if (getIndexDesc()->getNAFileSet()->getKeytag() != 0) |
| // is an index. |
| { |
| lateNameInfo->setIndex(TRUE); |
| lateNameInfo->setNameSpace(COM_INDEX_NAME); |
| } |
| generator->addLateNameInfo(lateNameInfo); |
| |
| if (returnRow) |
| { |
| // The hbase row will be returned as the last entry of the returned atp. |
| // Change the atp and atpindex of the returned values to indicate that. |
| expGen->assignAtpAndAtpIndex(getIndexDesc()->getIndexColumns(), |
| 0, returnedDesc->noTuples()-1); |
| |
| expGen->assignAtpAndAtpIndex(getScanIndexDesc()->getIndexColumns(), |
| 0, returnedDesc->noTuples()-1); |
| } |
| |
| Cardinality expectedRows = (Cardinality) getEstRowsUsed().getValue(); |
| ULng32 buffersize = getDefault(GEN_DPSO_BUFFER_SIZE); |
| buffersize = MAXOF(3*convertRowLen, buffersize); |
| queue_index upqueuelength = (queue_index)getDefault(GEN_DPSO_SIZE_UP); |
| queue_index downqueuelength = (queue_index)getDefault(GEN_DPSO_SIZE_DOWN); |
| Int32 numBuffers = getDefault(GEN_DPUO_NUM_BUFFERS); |
| |
| char * tablename = NULL; |
| if ((getTableDesc()->getNATable()->isHbaseRowTable()) || |
| (getTableDesc()->getNATable()->isHbaseCellTable()) || |
| (getTableName().getQualifiedNameObj().isHbaseMappedName())) |
| { |
| if (getIndexDesc() && getIndexDesc()->getNAFileSet()) |
| tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getNAFileSet()->getFileSetName().getObjectName()), 0); |
| } |
| else |
| { |
| if (getIndexDesc() && getIndexDesc()->getNAFileSet()) |
| tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getNAFileSet()->getFileSetName()), 0); |
| } |
| |
| if (! tablename) |
| tablename = |
| space->AllocateAndCopyToAlignedSpace( |
| GenGetQualifiedName(getTableName()), 0); |
| |
| NAString serverNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_SERVER); |
| NAString zkPortNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_ZOOKEEPER_PORT); |
| char * server = space->allocateAlignedSpace(serverNAS.length() + 1); |
| strcpy(server, serverNAS.data()); |
| char * zkPort = space->allocateAlignedSpace(zkPortNAS.length() + 1); |
| strcpy(zkPort, zkPortNAS.data()); |
| |
| ComTdbHbaseAccess::HbasePerfAttributes * hbpa = |
| new(space) ComTdbHbaseAccess::HbasePerfAttributes(); |
| if (CmpCommon::getDefault(HBASE_CACHE_BLOCKS) != DF_OFF) |
| hbpa->setCacheBlocks(TRUE); |
| // estrowsaccessed is 0 for now, so cache size will be set to minimum |
| generator->setHBaseNumCacheRows(getEstRowsAccessed().getValue(), hbpa,rowIdAsciiRowLen) ; |
| |
| // create hbasescan_tdb |
| ComTdbHbaseAccess *hbasescan_tdb = new(space) |
| ComTdbHbaseAccess( |
| ComTdbHbaseAccess::DELETE_, |
| tablename, |
| |
| convert_expr, |
| scanExpr, |
| rowIdExpr, |
| NULL, // updateExpr |
| lobDelExpr, // NULL, // mergeInsertExpr |
| NULL, // mergeInsertRowIdExpr |
| NULL, // mergeUpdScanExpr |
| NULL, // projExpr |
| NULL, // returnedUpdatedExpr |
| NULL, // returnMergeUpdateExpr |
| NULL, // encodedKeyExpr |
| keyColValExpr, |
| NULL, // hbaseFilterValExpr |
| |
| asciiRowLen, |
| convertRowLen, |
| 0, // updateRowLen |
| 0, // mergeInsertRowLen |
| 0, // fetchedRowLen |
| 0, // returnedRowLen |
| |
| rowIdLength, |
| convertRowLen, |
| rowIdAsciiRowLen, |
| (keyInfo ? keyInfo->getKeyLength() : 0), |
| keyColValLen, |
| 0, // hbaseFilterValRowLen |
| |
| asciiTuppIndex, |
| convertTuppIndex, |
| 0, // updateTuppIndex |
| 0, // mergeInsertTuppIndex |
| 0, // mergeInsertRowIdTuppIndex |
| 0, // mergeIUDIndicatorTuppIndex |
| 0, // returnedFetchedTuppIndex |
| 0, // returnedUpdatedTuppIndex |
| |
| rowIdTuppIndex, |
| returnedDesc->noTuples()-1, |
| rowIdAsciiTuppIndex, |
| 0, // keyTuppIndex, |
| keyColValTuppIndex, |
| 0, // hbaseFilterValTuppIndex |
| |
| 0, // hbaseTimestamp |
| 0, // hbaseVersion |
| |
| tdbListOfRangeRows, |
| tdbListOfUniqueRows, |
| listOfFetchedColNames, |
| listOfDeletedColNames, |
| NULL, |
| |
| keyInfo, |
| keyColName, |
| |
| work_cri_desc, |
| givenDesc, |
| returnedDesc, |
| downqueuelength, |
| upqueuelength, |
| expectedRows, |
| numBuffers, |
| buffersize, |
| |
| server, |
| zkPort, |
| hbpa |
| ); |
| |
| generator->initTdbFields(hbasescan_tdb); |
| |
| if ((CmpCommon::getDefault(HBASE_ASYNC_OPERATIONS) == DF_ON) |
| && getInliningInfo().isIMGU()) |
| hbasescan_tdb->setAsyncOperations(TRUE); |
| |
| if (getTableDesc()->getNATable()->isHbaseRowTable()) //rowwiseHbaseFormat()) |
| hbasescan_tdb->setRowwiseFormat(TRUE); |
| |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| hbasescan_tdb->setSQHbaseTable(TRUE); |
| |
| if (isAlignedFormat) |
| hbasescan_tdb->setAlignedFormat(TRUE); |
| |
| if (isHbaseMapFormat) |
| { |
| hbasescan_tdb->setHbaseMapTable(TRUE); |
| |
| if (getTableDesc()->getNATable()->getClusteringIndex()->hasSingleColVarcharKey()) |
| hbasescan_tdb->setKeyInVCformat(TRUE); |
| } |
| |
| if ((CmpCommon::getDefault(HBASE_SQL_IUD_SEMANTICS) == DF_ON) && |
| (NOT noCheck())) |
| hbasescan_tdb->setHbaseSqlIUD(TRUE); |
| |
| if (getTableDesc()->getNATable()->isEnabledForDDLQI()) |
| generator->objectUids().insert( |
| getTableDesc()->getNATable()->objectUid().get_value()); |
| } |
| |
| if (keyInfo && getSearchKey() && getSearchKey()->isUnique()) |
| hbasescan_tdb->setUniqueKeyInfo(TRUE); |
| |
| if (returnRow) |
| hbasescan_tdb->setReturnRow(TRUE); |
| |
| if (rowsAffected() != GenericUpdate::DO_NOT_COMPUTE_ROWSAFFECTED) |
| hbasescan_tdb->setComputeRowsAffected(TRUE); |
| |
| if (! tdbListOfUniqueRows) |
| { |
| hbasescan_tdb->setSubsetOper(TRUE); |
| } |
| |
| if (canDoCheckAndUpdel()) |
| hbasescan_tdb->setCanDoCheckAndUpdel(TRUE); |
| |
| if (uniqueRowsetHbaseOper()) { |
| hbasescan_tdb->setRowsetOper(TRUE); |
| hbasescan_tdb->setHbaseRowsetVsbbSize(getDefault(HBASE_ROWSET_VSBB_SIZE)); |
| } |
| |
| if (csl()) |
| hbasescan_tdb->setUpdelColnameIsStr(TRUE); |
| |
| if (preCondExpr) |
| hbasescan_tdb->setInsDelPreCondExpr(preCondExpr); |
| |
| if (generator->isTransactionNeeded()) |
| setTransactionRequired(generator); |
| else if (noDTMxn()) |
| hbasescan_tdb->setUseHbaseXn(TRUE); |
| else if (useRegionXn()) |
| hbasescan_tdb->setUseRegionXn(TRUE); |
| |
| if(!generator->explainDisabled()) { |
| generator->setExplainTuple( |
| addExplainInfo(hbasescan_tdb, 0, 0, generator)); |
| } |
| |
| if ((generator->computeStats()) && |
| (generator->collectStatsType() == ComTdb::PERTABLE_STATS |
| || generator->collectStatsType() == ComTdb::OPERATOR_STATS)) |
| { |
| hbasescan_tdb->setPertableStatsTdbId((UInt16)generator-> |
| getPertableStatsTdbId()); |
| } |
| |
| generator->setFoundAnUpdate(TRUE); |
| |
| generator->setCriDesc(givenDesc, Generator::DOWN); |
| generator->setCriDesc(returnedDesc, Generator::UP); |
| generator->setGenObj(this, hbasescan_tdb); |
| |
| return 0; |
| } |
| |
| short HbaseUpdate::codeGen(Generator * generator) |
| { |
| Space * space = generator->getSpace(); |
| ExpGenerator * expGen = generator->getExpGenerator(); |
| |
| // allocate a map table for the retrieved columns |
| // generator->appendAtEnd(); |
| MapTable * last_map_table = generator->getLastMapTable(); |
| |
| // Append a new map table for holding attributes that are only used |
| // in local expressions. This map table will be removed after all |
| // the local expressions are generated. |
| // |
| MapTable *localMapTable = generator->appendAtEnd(); |
| |
| ex_expr *scanExpr = 0; |
| ex_expr *projExpr = 0; |
| ex_expr *convert_expr = NULL; |
| ex_expr *updateExpr = NULL; |
| ex_expr *mergeInsertExpr = NULL; |
| ex_expr *returnUpdateExpr = NULL; |
| ex_expr * keyColValExpr = NULL; |
| ex_expr * insConstraintExpr = NULL; |
| ex_expr * updConstraintExpr = NULL; |
| |
| ex_cri_desc * givenDesc |
| = generator->getCriDesc(Generator::DOWN); |
| |
| ex_cri_desc * returnedDesc = NULL; |
| |
| const Int32 work_atp = 1; |
| const Int32 convertTuppIndex = 2; |
| const Int32 rowIdTuppIndex = 3; |
| const Int32 asciiTuppIndex = 4; |
| const Int32 rowIdAsciiTuppIndex = 5; |
| // const Int32 keyTuppIndex = 6; |
| const Int32 updateTuppIndex = 6; |
| const Int32 mergeInsertTuppIndex = 7; |
| const Int32 mergeInsertRowIdTuppIndex = 8; |
| const Int32 keyColValTuppIndex = 9; |
| Int32 mergeIUDIndicatorTuppIndex = 0; |
| // Do not use 10 as the next available tupp index. Please use 11 next |
| // The 10th tuple index is used by merge statement below. |
| |
| Attributes * iudIndicatorAttr = NULL; |
| |
| ULng32 asciiRowLen = 0; |
| ExpTupleDesc * asciiTupleDesc = 0; |
| |
| ex_cri_desc * work_cri_desc = NULL; |
| work_cri_desc = new(space) ex_cri_desc(11, space); |
| |
| if (getProducedMergeIUDIndicator() != NULL_VALUE_ID) |
| { |
| mergeIUDIndicatorTuppIndex = 10; |
| iudIndicatorAttr = |
| (generator->addMapInfo(getProducedMergeIUDIndicator(), 0))->getAttr(); |
| iudIndicatorAttr->setAtpIndex(mergeIUDIndicatorTuppIndex); |
| iudIndicatorAttr->setAtp(work_atp); |
| ULng32 iudIndicatorLen; |
| ExpTupleDesc::computeOffsets(iudIndicatorAttr, |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| iudIndicatorLen); |
| ExpTupleDesc *iudIndicatorTupleDesc = NULL; |
| iudIndicatorTupleDesc = new(generator->getSpace()) |
| ExpTupleDesc(1, // numAttrs |
| &iudIndicatorAttr, // **attrs |
| iudIndicatorLen, // data length |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, |
| ExpTupleDesc::LONG_FORMAT, |
| generator->getSpace()); |
| work_cri_desc->setTupleDescriptor(mergeIUDIndicatorTuppIndex, |
| iudIndicatorTupleDesc); |
| |
| } |
| |
| NABoolean returnRow = getReturnRow(this, getIndexDesc()); |
| |
| if (returnRow) |
| // one for fetchedRow, one for updatedRow. |
| returnedDesc = new(space) ex_cri_desc(givenDesc->noTuples() + 2, space); |
| else |
| returnedDesc = new(space) ex_cri_desc(givenDesc->noTuples() + 1, space); |
| |
| const Int16 returnedFetchedTuppIndex = (Int16)(returnedDesc->noTuples()-2); |
| const Int16 returnedUpdatedTuppIndex = (Int16)(returnedFetchedTuppIndex + 1); |
| |
| NABoolean isAlignedFormat = getTableDesc()->getNATable()->isAlignedFormat(getIndexDesc()); |
| |
| ExpTupleDesc::TupleDataFormat asciiRowFormat = |
| (isAlignedFormat ? |
| ExpTupleDesc::SQLMX_ALIGNED_FORMAT : |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT); |
| |
| ExpTupleDesc::TupleDataFormat hbaseRowFormat = |
| // ExpTupleDesc::SQLMX_ALIGNED_FORMAT; |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT; |
| |
| ValueIdList asciiVids; |
| ValueIdList executorPredCastVids; |
| ValueIdList convertExprCastVids; |
| |
| NABoolean addDefaultValues = TRUE; |
| NABoolean hasAddedColumns = FALSE; |
| if (getTableDesc()->getNATable()->hasAddedColumn()) |
| hasAddedColumns = TRUE; |
| |
| ValueIdList columnList; |
| ValueIdList srcVIDlist; |
| ValueIdList dupVIDlist; |
| HbaseAccess::sortValues(retColRefSet_, columnList, |
| srcVIDlist, dupVIDlist, |
| (getTableDesc()->getNATable()->getExtendedQualName().getSpecialType() == ExtendedQualName::INDEX_TABLE)); |
| |
| const CollIndex numColumns = columnList.entries(); |
| |
| // build key information |
| keyRangeGen * keyInfo = 0; |
| expGen->buildKeyInfo(&keyInfo, // out |
| generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getIndexDesc()->getIndexKey(), |
| getBeginKeyPred(), |
| (getSearchKey() && getSearchKey()->isUnique() ? NULL : getEndKeyPred()), |
| getSearchKey(), |
| NULL, //getMdamKeyPtr(), |
| FALSE, |
| ExpTupleDesc::SQLMX_KEY_FORMAT); |
| |
| UInt32 keyColValLen = 0; |
| char * keyColName = NULL; |
| if ((canDoCheckAndUpdel()) && |
| (getSearchKey() && getSearchKey()->isUnique()) && |
| (getBeginKeyPred().entries() > 0)) |
| { |
| expGen->generateKeyColValueExpr( |
| getBeginKeyPred()[0], |
| work_atp, keyColValTuppIndex, |
| keyColValLen, |
| &keyColValExpr); |
| if (! keyColValExpr) |
| canDoCheckAndUpdel() = FALSE; |
| else |
| { |
| ItemExpr * col_node = getBeginKeyPred()[0].getItemExpr()->child(0); |
| HbaseAccess::genColName(generator, col_node, keyColName); |
| } |
| } |
| |
| Queue * tdbListOfUniqueRows = NULL; |
| Queue * tdbListOfRangeRows = NULL; |
| |
| HbaseAccess::genListsOfRows(generator, |
| listOfUpdSubsetRows_, |
| listOfUpdUniqueRows_, |
| tdbListOfRangeRows, |
| tdbListOfUniqueRows); |
| |
| ULng32 convertRowLen = 0; |
| |
| for (CollIndex ii = 0; ii < numColumns; ii++) |
| { |
| ItemExpr * col_node = ((columnList[ii]).getValueDesc())->getItemExpr(); |
| |
| const NAType &givenType = col_node->getValueId().getType(); |
| int res; |
| ItemExpr *asciiValue = NULL; |
| ItemExpr *castValue = NULL; |
| |
| res = HbaseAccess::createAsciiColAndCastExpr2( |
| generator, // for heap |
| col_node, |
| givenType, // [IN] Actual type of HDFS column |
| asciiValue, // [OUT] Returned expression for ascii rep. |
| castValue, // [OUT] Returned expression for binary rep. |
| isAlignedFormat |
| ); |
| |
| GenAssert(res == 1 && asciiValue != NULL && castValue != NULL, |
| "Error building expression tree for cast output value"); |
| asciiValue->synthTypeAndValueId(); |
| asciiValue->bindNode(generator->getBindWA()); |
| asciiVids.insert(asciiValue->getValueId()); |
| |
| castValue->bindNode(generator->getBindWA()); |
| convertExprCastVids.insert(castValue->getValueId()); |
| } // for (ii = 0; ii < numCols; ii++) |
| |
| // Add ascii columns to the MapTable. After this call the MapTable |
| // has ascii values in the work ATP at index asciiTuppIndex. |
| const NAColumnArray * colArray = NULL; |
| unsigned short pcm = expGen->getPCodeMode(); |
| if ((asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) && |
| (hasAddedColumns)) |
| { |
| colArray = &getIndexDesc()->getAllColumns(); |
| |
| expGen->setPCodeMode(ex_expr::PCODE_NONE); |
| } |
| |
| expGen->processValIdList( |
| asciiVids, // [IN] ValueIdList |
| asciiRowFormat, // [IN] tuple data format |
| asciiRowLen, // [OUT] tuple length |
| work_atp, // [IN] atp number |
| asciiTuppIndex, // [IN] index into atp |
| &asciiTupleDesc, // [optional OUT] tuple desc |
| ExpTupleDesc::LONG_FORMAT, // [optional IN] desc format |
| 0, |
| NULL, |
| (NAColumnArray*)colArray); |
| |
| work_cri_desc->setTupleDescriptor(asciiTuppIndex, asciiTupleDesc); |
| |
| ExpTupleDesc * tuple_desc = 0; |
| |
| expGen->generateContiguousMoveExpr( |
| convertExprCastVids, // [IN] source ValueIds |
| FALSE, // [IN] add convert nodes? |
| work_atp, // [IN] target atp number |
| convertTuppIndex, // [IN] target tupp index |
| hbaseRowFormat, // [IN] target tuple format |
| convertRowLen, // [OUT] target tuple length |
| &convert_expr, // [OUT] move expression |
| &tuple_desc, // [optional OUT] target tuple desc |
| ExpTupleDesc::LONG_FORMAT, // [optional IN] target desc format |
| NULL, |
| NULL, |
| 0, |
| NULL, |
| FALSE, |
| NULL, |
| FALSE /* doBulkMove */); |
| |
| if ((asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) && |
| (hasAddedColumns)) |
| { |
| expGen->setPCodeMode(pcm); |
| } |
| |
| for (CollIndex i = 0; i < columnList.entries(); i++) |
| { |
| ValueId colValId = columnList[i]; |
| ValueId castValId = convertExprCastVids[i]; |
| |
| Attributes * colAttr = (generator->addMapInfo(colValId, 0))->getAttr(); |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // for |
| |
| if (getScanIndexDesc() != NULL) |
| { |
| for (CollIndex i = 0; i < getScanIndexDesc()->getIndexColumns().entries(); i++) |
| { |
| ValueId scanIndexDescVID = getScanIndexDesc()->getIndexColumns()[i]; |
| const ValueId indexDescVID = getIndexDesc()->getIndexColumns()[i]; |
| |
| CollIndex pos = 0; |
| |
| pos = columnList.index(indexDescVID); |
| if (pos != NULL_COLL_INDEX) |
| { |
| Attributes * colAttr = (generator->addMapInfo(scanIndexDescVID, 0))->getAttr(); |
| |
| ValueId castValId = convertExprCastVids[pos]; |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // if |
| else |
| { |
| pos = columnList.index(scanIndexDescVID); |
| if (pos != NULL_COLL_INDEX) |
| { |
| Attributes * colAttr = (generator->addMapInfo(indexDescVID, 0))->getAttr(); |
| |
| ValueId castValId = convertExprCastVids[pos]; |
| Attributes * castAttr = (generator->getMapInfo(castValId))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| } // if |
| } // else |
| |
| } // for |
| } // getScanIndexDesc != NULL |
| |
| // assign location attributes to dup vids that were returned earlier. |
| for (CollIndex i = 0; i < srcVIDlist.entries(); i++) |
| { |
| ValueId srcValId = srcVIDlist[i]; |
| ValueId dupValId = dupVIDlist[i]; |
| |
| Attributes * srcAttr = (generator->getMapInfo(srcValId))->getAttr(); |
| Attributes * dupAttr = (generator->addMapInfo(dupValId, 0))->getAttr(); |
| |
| dupAttr->copyLocationAttrs(srcAttr); |
| } // for |
| |
| if (addDefaultValues) |
| { |
| expGen->addDefaultValues(columnList, |
| getIndexDesc()->getAllColumns(), |
| tuple_desc, |
| TRUE); |
| |
| if (asciiRowFormat == ExpTupleDesc::SQLMX_ALIGNED_FORMAT) |
| { |
| expGen->addDefaultValues(columnList, |
| getIndexDesc()->getAllColumns(), |
| asciiTupleDesc, |
| TRUE); |
| } |
| else |
| { |
| // copy default values from convertTupleDesc to asciiTupleDesc |
| expGen->copyDefaultValues(asciiTupleDesc, tuple_desc); |
| } |
| } |
| |
| // generate explain selection expression, if present |
| // if ((NOT (getTableDesc()->getNATable()->getExtendedQualName().getSpecialType() == ExtendedQualName::INDEX_TABLE)) && |
| // (! executorPred().isEmpty())) |
| if (! executorPred().isEmpty()) |
| { |
| ItemExpr * newPredTree = executorPred().rebuildExprTree(ITM_AND,TRUE,TRUE); |
| expGen->generateExpr(newPredTree->getValueId(), ex_expr::exp_SCAN_PRED, |
| &scanExpr); |
| } |
| |
| ex_expr * mergeInsertRowIdExpr = NULL; |
| ULng32 mergeInsertRowIdLen = 0; |
| ExpTupleDesc *updatedRowTupleDesc = 0; |
| ULng32 updateRowLen = 0; |
| Queue * listOfUpdatedColNames = NULL; |
| genHbaseUpdOrInsertExpr(generator, |
| FALSE, |
| newRecExprArray(), updateTuppIndex, |
| &updateExpr, updateRowLen, |
| &updatedRowTupleDesc, |
| listOfUpdatedColNames, |
| NULL, mergeInsertRowIdLen, 0, |
| getIndexDesc()); |
| |
| work_cri_desc->setTupleDescriptor(updateTuppIndex, updatedRowTupleDesc); |
| |
| ExpTupleDesc *mergedRowTupleDesc = 0; |
| ULng32 mergeInsertRowLen = 0; |
| Queue * listOfMergedColNames = NULL; |
| if ((isMerge()) && |
| (mergeInsertRecExprArray().entries() > 0)) |
| { |
| genHbaseUpdOrInsertExpr(generator, |
| TRUE, |
| mergeInsertRecExprArray(), mergeInsertTuppIndex, |
| &mergeInsertExpr, mergeInsertRowLen, |
| &mergedRowTupleDesc, |
| listOfMergedColNames, |
| &mergeInsertRowIdExpr, mergeInsertRowIdLen, |
| mergeInsertRowIdTuppIndex, |
| getIndexDesc()); |
| |
| work_cri_desc->setTupleDescriptor(mergeInsertTuppIndex, mergedRowTupleDesc); |
| } |
| |
| ULng32 rowIdAsciiRowLen = 0; |
| ExpTupleDesc * rowIdAsciiTupleDesc = 0; |
| ex_expr * rowIdExpr = NULL; |
| ULng32 rowIdLength = 0; |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| // don't encode keys for hbase mapped tables since these tables |
| // could be populated from outside of traf. |
| NABoolean encodeKeys = TRUE; |
| if (getTableDesc()->getNATable()->isHbaseMapTable()) |
| encodeKeys = FALSE; |
| |
| HbaseAccess::genRowIdExpr(generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getHbaseSearchKeys(), |
| work_cri_desc, work_atp, |
| rowIdAsciiTuppIndex, rowIdTuppIndex, |
| rowIdAsciiRowLen, rowIdAsciiTupleDesc, |
| rowIdLength, |
| rowIdExpr, |
| encodeKeys); |
| } |
| else |
| { |
| HbaseAccess::genRowIdExprForNonSQ(generator, |
| getIndexDesc()->getNAFileSet()->getIndexKeyColumns(), |
| getHbaseSearchKeys(), |
| work_cri_desc, work_atp, |
| rowIdAsciiTuppIndex, rowIdTuppIndex, |
| rowIdAsciiRowLen, rowIdAsciiTupleDesc, |
| rowIdLength, |
| rowIdExpr); |
| } |
| |
| Queue * listOfFetchedColNames = NULL; |
| |
| if (isAlignedFormat) |
| { |
| listOfFetchedColNames = new(space) Queue(space); |
| |
| NAString cnInList(SEABASE_DEFAULT_COL_FAMILY); |
| cnInList += ":"; |
| unsigned char c = 1; |
| cnInList.append((char*)&c, 1); |
| short len = cnInList.length(); |
| cnInList.prepend((char*)&len, sizeof(short)); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfFetchedColNames->insert(colNameInList); |
| } |
| else |
| { |
| HbaseAccess::genListOfColNames(generator, |
| getIndexDesc(), |
| columnList, |
| listOfFetchedColNames); |
| } |
| |
| ex_expr * mergeUpdScanExpr = NULL; |
| if (isMerge() && !mergeUpdatePred().isEmpty()) |
| { |
| // Generate expression to evaluate any merge update predicate on the |
| // fetched row |
| ItemExpr* updPredTree = |
| mergeUpdatePred().rebuildExprTree(ITM_AND,TRUE,TRUE); |
| expGen->generateExpr(updPredTree->getValueId(), |
| ex_expr::exp_SCAN_PRED, |
| &mergeUpdScanExpr); |
| } |
| else if (getIndexDesc()->isClusteringIndex() && getCheckConstraints().entries()) |
| { |
| // Generate the update and insert constraint check expressions |
| |
| // The constraint expressions at this time refer to the source values |
| // of the columns. We want to evaluate the constraints against the target |
| // values, though. So, we collect the source column ValueIds here so |
| // we can map them to the appropriate target, which is dependent on |
| // which constraint expression we are generating. |
| |
| ValueId constraintId; |
| ValueIdSet constraintColumns; |
| for (CollIndex ci = 0; ci < getCheckConstraints().entries(); ci++) |
| { |
| constraintId = getCheckConstraints()[ci]; |
| constraintId.getItemExpr()->findAll(ITM_INDEXCOLUMN, |
| constraintColumns, // out, has append semantics |
| TRUE, // visitVEGmembers |
| FALSE); // don't visit index descriptors |
| } |
| |
| // Prepare the constraint tree for generation |
| |
| ItemExpr *constrTree = |
| getCheckConstraints().rebuildExprTree(ITM_AND, TRUE, TRUE); |
| |
| if (getTableDesc()->getNATable()->hasSerializedEncodedColumn()) |
| constrTree = generator->addCompDecodeForDerialization(constrTree, isAlignedFormat); |
| |
| // Generate the update constraint expression |
| |
| genUpdConstraintExpr(generator, |
| constrTree, |
| constraintColumns, |
| newRecExprArray(), |
| &updConstraintExpr /* out */); |
| |
| if ((isMerge()) && (mergeInsertRecExprArray().entries() > 0)) |
| { |
| // Generate the insert constraint expression |
| |
| genUpdConstraintExpr(generator, |
| constrTree, |
| constraintColumns, |
| mergeInsertRecExprArray(), |
| &insConstraintExpr /* out */); |
| } |
| |
| } |
| |
| if ((getTableDesc()->getNATable()->isSeabaseTable()) && |
| (NOT isMerge())) |
| { |
| if ((keyInfo && getSearchKey() && getSearchKey()->isUnique()) || |
| ( tdbListOfUniqueRows)) |
| { |
| // Save node for later use by RelRoot in the case of UPDATE CURRENT OF. |
| generator->updateCurrentOfRel() = (void*)this; |
| } |
| } |
| |
| if (getOptStoi() && getOptStoi()->getStoi()) |
| generator->addSqlTableOpenInfo(getOptStoi()->getStoi()); |
| |
| LateNameInfo* lateNameInfo = new(generator->wHeap()) LateNameInfo(); |
| char * compileTimeAnsiName = (char*)getOptStoi()->getStoi()->ansiName(); |
| |
| lateNameInfo->setCompileTimeName(compileTimeAnsiName, space); |
| lateNameInfo->setLastUsedName(compileTimeAnsiName, space); |
| lateNameInfo->setNameSpace(COM_TABLE_NAME); |
| if (getIndexDesc()->getNAFileSet()->getKeytag() != 0) |
| // is an index. |
| { |
| lateNameInfo->setIndex(TRUE); |
| lateNameInfo->setNameSpace(COM_INDEX_NAME); |
| } |
| generator->addLateNameInfo(lateNameInfo); |
| |
| ex_expr * returnMergeInsertExpr = NULL; |
| ULng32 returnedFetchedRowLen = 0; |
| ULng32 returnedUpdatedRowLen = 0; |
| ULng32 returnedMergeInsertedRowLen = 0; |
| Queue *listOfOmittedColNames = NULL; |
| |
| if (returnRow) |
| { |
| const ValueIdList &fetchedOutputs = |
| ((getScanIndexDesc() != NULL) ? |
| getScanIndexDesc()->getIndexColumns() : |
| getIndexDesc()->getIndexColumns()); |
| |
| // Generate a project expression to move the fetched row from |
| // the fetchedRowAtpIndex in the work Atp to the returnedFetchedAtpIndex |
| // in the return Atp. |
| MapTable * returnedFetchedMapTable = 0; |
| ExpTupleDesc * returnedFetchedTupleDesc = NULL; |
| expGen->generateContiguousMoveExpr |
| (fetchedOutputs, |
| 1, // add conv nodes |
| 0, |
| returnedFetchedTuppIndex, |
| generator->getInternalFormat(), |
| returnedFetchedRowLen, |
| &projExpr, |
| &returnedFetchedTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| &returnedFetchedMapTable); |
| |
| // assign location attributes to columns referenced in scanIndex and index. |
| // If any of these columns/value_ids are being updated, the updated location |
| // will be assigned later in this code. |
| expGen->assignAtpAndAtpIndex(getScanIndexDesc()->getIndexColumns(), |
| 0, returnedFetchedTuppIndex); |
| expGen->assignAtpAndAtpIndex(getIndexDesc()->getIndexColumns(), |
| 0, returnedFetchedTuppIndex); |
| |
| ValueIdList updatedOutputs; |
| ValueIdSet alreadyDeserialized; |
| |
| if (isMerge()) |
| { |
| BaseColumn *updtCol = NULL, |
| *fetchedCol = NULL; |
| Lng32 updtColNum = -1, |
| fetchedColNum = 0; |
| CollIndex recEntries = newRecExprArray().entries(), |
| colEntries = getIndexDesc()->getIndexColumns().entries(), |
| j = 0; |
| ValueId tgtValueId; |
| for (CollIndex ii = 0; ii < colEntries; ii++) |
| { |
| fetchedCol = |
| (BaseColumn *)(((getIndexDesc()->getIndexColumns())[ii]).getItemExpr()); |
| fetchedColNum = fetchedCol->getColNumber(); |
| |
| updtCol = |
| (updtCol != NULL ? updtCol : |
| (j < recEntries ? |
| (BaseColumn *)(newRecExprArray()[j].getItemExpr()->child(0)->castToItemExpr()) : |
| NULL)); |
| |
| updtColNum = (updtCol ? updtCol->getColNumber() : -1); |
| |
| if (fetchedColNum == updtColNum) |
| { |
| const ItemExpr *assignExpr = newRecExprArray()[j].getItemExpr(); |
| tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| |
| j++; |
| updtCol = NULL; |
| } |
| else |
| { |
| tgtValueId = fetchedCol->getValueId(); |
| alreadyDeserialized += tgtValueId; // if it is necessary to deserialize, that is |
| } |
| |
| updatedOutputs.insert(tgtValueId); |
| } |
| if (getProducedMergeIUDIndicator() != NULL_VALUE_ID) |
| updatedOutputs.insert(getProducedMergeIUDIndicator()); |
| } |
| else |
| { |
| for (CollIndex ii = 0; ii < newRecExprArray().entries(); ii++) |
| { |
| const ItemExpr *assignExpr = newRecExprArray()[ii].getItemExpr(); |
| ValueId tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| |
| updatedOutputs.insert(tgtValueId); |
| } |
| } |
| |
| ValueIdSet outputs = fetchedOutputs; |
| ValueIdSet updatedOutputsSet = updatedOutputs; |
| outputs += updatedOutputsSet; |
| getGroupAttr()->setCharacteristicOutputs(outputs); |
| |
| MapTable * returnedUpdatedMapTable = 0; |
| ExpTupleDesc * returnedUpdatedTupleDesc = NULL; |
| ValueIdList tgtConvValueIdList; |
| |
| if (getTableDesc()->getNATable()->hasSerializedEncodedColumn()) |
| { |
| // if serialized columns are present, then create a new row with |
| // deserialized columns before returning it. |
| expGen->generateDeserializedMoveExpr |
| (updatedOutputs, |
| 0, |
| returnedUpdatedTuppIndex, //projRowTuppIndex, |
| generator->getInternalFormat(), |
| returnedUpdatedRowLen, |
| &returnUpdateExpr, |
| &returnedUpdatedTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| tgtConvValueIdList, |
| alreadyDeserialized); |
| } |
| else |
| { |
| expGen->generateContiguousMoveExpr |
| (updatedOutputs, |
| 1, // add conv nodes |
| 0, |
| returnedUpdatedTuppIndex, |
| generator->getInternalFormat(), |
| returnedUpdatedRowLen, |
| &returnUpdateExpr, |
| &returnedUpdatedTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| &returnedUpdatedMapTable, |
| &tgtConvValueIdList); |
| } |
| |
| Attributes * tgtIUDMergeColConvAttr = NULL; |
| const ValueIdList &indexColList = getIndexDesc()->getIndexColumns(); |
| for (CollIndex ii = 0; ii < tgtConvValueIdList.entries(); ii++) |
| { |
| const ValueId &tgtColValueId = updatedOutputs[ii]; |
| const ValueId &tgtColConvValueId = tgtConvValueIdList[ii]; |
| if (updatedOutputs[ii] == getProducedMergeIUDIndicator()) |
| { |
| tgtIUDMergeColConvAttr = |
| (generator->getMapInfo(tgtColConvValueId, 0))->getAttr(); |
| continue; |
| } |
| |
| BaseColumn * bc = (BaseColumn*)tgtColValueId.getItemExpr(); |
| const ValueId &indexColValueId = indexColList[bc->getColNumber()]; |
| |
| Attributes * tgtColConvAttr = (generator->getMapInfo(tgtColConvValueId, 0))->getAttr(); |
| Attributes * indexColAttr = (generator->addMapInfo(indexColValueId, 0))->getAttr(); |
| |
| indexColAttr->copyLocationAttrs(tgtColConvAttr); |
| } |
| |
| // Set up the returned tuple descriptor for the updated tuple. |
| // |
| returnedDesc->setTupleDescriptor(returnedFetchedTuppIndex, |
| returnedFetchedTupleDesc); |
| returnedDesc->setTupleDescriptor(returnedUpdatedTuppIndex, |
| returnedUpdatedTupleDesc); |
| |
| /* |
| expGen->assignAtpAndAtpIndex(getScanIndexDesc()->getIndexColumns(), |
| 0, returnedFetchedTuppIndex); |
| */ |
| NAColumn *col; |
| if (isMerge()) |
| { |
| ValueIdList mergeInsertOutputs; |
| |
| for (CollIndex ii = 0; ii < mergeInsertRecExprArray().entries(); ii++) |
| { |
| const ItemExpr *assignExpr = mergeInsertRecExprArray()[ii].getItemExpr(); |
| ValueId tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| col = tgtValueId.getNAColumn( TRUE ); |
| |
| if ((NOT isAlignedFormat) && |
| ((CmpCommon::getDefault(TRAF_UPSERT_MODE) == DF_MERGE) || |
| (CmpCommon::getDefault(TRAF_UPSERT_MODE) == DF_OPTIMAL)) && |
| (((Assign*)assignExpr)->canBeSkipped()) && |
| (NOT col->isSystemColumn()) && |
| (NOT col->isClusteringKey()) && |
| (NOT col->isIdentityColumn())) |
| { |
| if (listOfOmittedColNames == NULL) |
| listOfOmittedColNames = new(space) Queue(space); |
| NAString cnInList; |
| HbaseAccess::createHbaseColId(col, cnInList); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfOmittedColNames->insert(colNameInList); |
| } |
| mergeInsertOutputs.insert(tgtValueId); |
| } |
| if (getProducedMergeIUDIndicator() != NULL_VALUE_ID) |
| mergeInsertOutputs.insert(getProducedMergeIUDIndicator()); |
| |
| MapTable * returnedMergeInsertedMapTable = 0; |
| ExpTupleDesc * returnedMergeInsertedTupleDesc = NULL; |
| ValueIdList tgtConvValueIdList; |
| |
| if (getTableDesc()->getNATable()->hasSerializedEncodedColumn()) |
| { |
| // if serialized columns are present, then create a new row with |
| // deserialized columns before returning it. |
| expGen->generateDeserializedMoveExpr |
| (mergeInsertOutputs, |
| 0, |
| returnedUpdatedTuppIndex, |
| generator->getInternalFormat(), |
| returnedMergeInsertedRowLen, |
| &returnMergeInsertExpr, |
| &returnedMergeInsertedTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| tgtConvValueIdList, |
| alreadyDeserialized); |
| } |
| else |
| { |
| expGen->generateContiguousMoveExpr |
| (mergeInsertOutputs, |
| 1, // add conv nodes |
| 0, |
| returnedUpdatedTuppIndex, |
| generator->getInternalFormat(), |
| returnedMergeInsertedRowLen, |
| &returnMergeInsertExpr, |
| &returnedMergeInsertedTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| &returnedMergeInsertedMapTable, |
| &tgtConvValueIdList); |
| } |
| if (getProducedMergeIUDIndicator() != NULL_VALUE_ID) |
| iudIndicatorAttr->copyLocationAttrs(tgtIUDMergeColConvAttr); |
| } |
| } |
| |
| Cardinality expectedRows = (Cardinality) getEstRowsUsed().getValue(); |
| ULng32 buffersize = getDefault(GEN_DPSO_BUFFER_SIZE); |
| buffersize = MAXOF(3*convertRowLen, buffersize); |
| |
| queue_index upqueuelength = (queue_index)getDefault(GEN_DPSO_SIZE_UP); |
| queue_index downqueuelength = (queue_index)getDefault(GEN_DPSO_SIZE_DOWN); |
| Int32 numBuffers = getDefault(GEN_DPUO_NUM_BUFFERS); |
| |
| char * tablename = NULL; |
| if ((getTableDesc()->getNATable()->isHbaseRowTable()) || |
| (getTableDesc()->getNATable()->isHbaseCellTable())) |
| { |
| if (getIndexDesc() && getIndexDesc()->getNAFileSet()) |
| tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getNAFileSet()->getFileSetName().getObjectName()), 0); |
| } |
| else |
| { |
| if (getIndexDesc() && getIndexDesc()->getNAFileSet()) |
| tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getNAFileSet()->getFileSetName()), 0); |
| } |
| |
| if (! tablename) |
| tablename = |
| space->AllocateAndCopyToAlignedSpace( |
| GenGetQualifiedName(getTableName()), 0); |
| |
| NAString serverNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_SERVER); |
| NAString zkPortNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_ZOOKEEPER_PORT); |
| char * server = space->allocateAlignedSpace(serverNAS.length() + 1); |
| strcpy(server, serverNAS.data()); |
| char * zkPort = space->allocateAlignedSpace(zkPortNAS.length() + 1); |
| strcpy(zkPort, zkPortNAS.data()); |
| |
| ComTdbHbaseAccess::HbasePerfAttributes * hbpa = |
| new(space) ComTdbHbaseAccess::HbasePerfAttributes(); |
| if (CmpCommon::getDefault(HBASE_CACHE_BLOCKS) != DF_OFF) |
| hbpa->setCacheBlocks(TRUE); |
| // estrowsaccessed is 0 for now, so cache size will be set to minimum |
| generator->setHBaseNumCacheRows(getEstRowsAccessed().getValue(), hbpa,asciiRowLen) ; |
| |
| |
| // create hdfsscan_tdb |
| ComTdbHbaseAccess *hbasescan_tdb = new(space) |
| ComTdbHbaseAccess( |
| (isMerge() ? ComTdbHbaseAccess::MERGE_ : ComTdbHbaseAccess::UPDATE_), |
| tablename, |
| |
| convert_expr, |
| scanExpr, |
| rowIdExpr, |
| updateExpr, |
| mergeInsertExpr, |
| mergeInsertRowIdExpr, |
| mergeUpdScanExpr, |
| projExpr, |
| returnUpdateExpr, |
| returnMergeInsertExpr, |
| NULL, // encodedKeyExpr |
| keyColValExpr, |
| NULL, // hbaseFilterValExpr |
| |
| asciiRowLen, |
| convertRowLen, |
| updateRowLen, |
| mergeInsertRowLen, |
| returnedFetchedRowLen, |
| returnedUpdatedRowLen, |
| |
| ((rowIdLength > 0) ? rowIdLength : mergeInsertRowIdLen), |
| convertRowLen, |
| rowIdAsciiRowLen, |
| (keyInfo ? keyInfo->getKeyLength() : 0), |
| keyColValLen, |
| 0, // hbaseFilterValRowLen |
| |
| asciiTuppIndex, |
| convertTuppIndex, |
| updateTuppIndex, |
| mergeInsertTuppIndex, |
| mergeInsertRowIdTuppIndex, |
| mergeIUDIndicatorTuppIndex, |
| returnedFetchedTuppIndex, |
| returnedUpdatedTuppIndex, |
| |
| rowIdTuppIndex, |
| returnedDesc->noTuples()-1, |
| rowIdAsciiTuppIndex, |
| 0, // keyTuppIndex, |
| keyColValTuppIndex, |
| 0, // hbaseFilterValTuppIndex |
| |
| 0, // hbaseTimestamp |
| 0, // hbaseVersion |
| |
| tdbListOfRangeRows, |
| tdbListOfUniqueRows, |
| listOfFetchedColNames, |
| listOfUpdatedColNames, |
| listOfMergedColNames, |
| |
| keyInfo, |
| keyColName, |
| |
| work_cri_desc, |
| givenDesc, |
| returnedDesc, |
| downqueuelength, |
| upqueuelength, |
| expectedRows, |
| numBuffers, |
| buffersize, |
| |
| server, |
| zkPort, |
| hbpa |
| ); |
| |
| generator->initTdbFields(hbasescan_tdb); |
| hbasescan_tdb->setListOfOmittedColNames(listOfOmittedColNames); |
| |
| if (getTableDesc()->getNATable()->isHbaseRowTable()) |
| hbasescan_tdb->setRowwiseFormat(TRUE); |
| |
| if (updConstraintExpr) |
| hbasescan_tdb->setUpdConstraintExpr(updConstraintExpr); |
| |
| if (insConstraintExpr) |
| hbasescan_tdb->setInsConstraintExpr(insConstraintExpr); |
| |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| hbasescan_tdb->setSQHbaseTable(TRUE); |
| |
| if (isAlignedFormat) |
| hbasescan_tdb->setAlignedFormat(TRUE); |
| |
| if (CmpCommon::getDefault(HBASE_SQL_IUD_SEMANTICS) == DF_ON) |
| hbasescan_tdb->setHbaseSqlIUD(TRUE); |
| |
| if (getTableDesc()->getNATable()->isEnabledForDDLQI()) |
| generator->objectUids().insert( |
| getTableDesc()->getNATable()->objectUid().get_value()); |
| } |
| |
| if (keyInfo && getSearchKey() && getSearchKey()->isUnique()) |
| hbasescan_tdb->setUniqueKeyInfo(TRUE); |
| |
| if (returnRow) |
| hbasescan_tdb->setReturnRow(TRUE); |
| |
| if (rowsAffected() != GenericUpdate::DO_NOT_COMPUTE_ROWSAFFECTED) |
| hbasescan_tdb->setComputeRowsAffected(TRUE); |
| |
| if (! tdbListOfUniqueRows) |
| { |
| hbasescan_tdb->setSubsetOper(TRUE); |
| } |
| |
| if (uniqueRowsetHbaseOper()) { |
| hbasescan_tdb->setRowsetOper(TRUE); |
| hbasescan_tdb->setHbaseRowsetVsbbSize(getDefault(HBASE_ROWSET_VSBB_SIZE)); |
| } |
| |
| if (canDoCheckAndUpdel()) |
| hbasescan_tdb->setCanDoCheckAndUpdel(TRUE); |
| |
| if (generator->isTransactionNeeded()) |
| setTransactionRequired(generator); |
| else if (noDTMxn()) |
| hbasescan_tdb->setUseHbaseXn(TRUE); |
| else if (useRegionXn()) |
| hbasescan_tdb->setUseRegionXn(TRUE); |
| |
| if(!generator->explainDisabled()) { |
| generator->setExplainTuple( |
| addExplainInfo(hbasescan_tdb, 0, 0, generator)); |
| } |
| |
| if ((generator->computeStats()) && |
| (generator->collectStatsType() == ComTdb::PERTABLE_STATS |
| || generator->collectStatsType() == ComTdb::OPERATOR_STATS)) |
| { |
| hbasescan_tdb->setPertableStatsTdbId((UInt16)generator-> |
| getPertableStatsTdbId()); |
| } |
| |
| generator->setFoundAnUpdate(TRUE); |
| |
| generator->setCriDesc(givenDesc, Generator::DOWN); |
| generator->setCriDesc(returnedDesc, Generator::UP); |
| generator->setGenObj(this, hbasescan_tdb); |
| |
| return 0; |
| } |
| |
| // Ordering predicate handed to std::sort for prefixed HBase column names. |
| // Each argument is expected to start with a UInt32 followed by a short; |
| // HbaseInsert::codeGen prepends the UInt32 (column position) for load prep, |
| // and the short is presumably the name length (confirm against |
| // HbaseAccess::createHbaseColId). The prefix is skipped and the remaining |
| // bytes are compared as NUL-terminated strings. |
| // Returns true when the name held in a sorts strictly before the one in b. |
| bool compHBaseQualif ( NAString a , NAString b) |
| { |
|   const UInt32 prefixBytes = (UInt32)(sizeof(UInt32) + sizeof(short)); |
|   const char * nameOfA = a.data() + prefixBytes; |
|   const char * nameOfB = b.data() + prefixBytes; |
| |
|   return strcmp(nameOfA, nameOfB) < 0; |
| } |
| |
| /* Forward declarations of the sample-size helpers. HbaseInsert::codeGen calls |
| one of them (chosen by USTAT_USE_SLIDING_SAMPLE_RATIO) with the input row |
| count to derive the sampling rate when a ustat sample is created. */ extern Int64 getDefaultSlidingSampleSize(Int64 tblRowCount); |
| extern Int64 getDefaultSampleSize(Int64 tblRowCount); |
| |
| /* |
| * HbaseInsert::codeGen |
| * |
| * Generates the ComTdbHbaseAccess TDB for this insert node and hands it to |
| * the generator with setGenObj(). The TDB access type is UPSERT_LOAD_, |
| * UPSERT_ or INSERT_, chosen from isUpsert() and getInsertType(). |
| * |
| * Main steps, in order: |
| * 1. Walk newRecExprArray(): collect the names of columns that can be |
| * omitted, record the target columns, and wrap each source in a Cast |
| * (plus an optional varchar Cast and CompEncode) to form insertVIDList. |
| * 2. Generate the insert-row move expression and, when |
| * TRAF_LOAD_LOG_ERROR_ROWS is ON, the error-logging expression. |
| * 3. Map target and index column value ids onto the generated source |
| * values, then generate the precondition, row id and check-constraint |
| * expressions. |
| * 4. Build the list of column names written by the operator. |
| * 5. If a row is returned, set up the returned-row layout. |
| * 6. Construct the TDB and set its option flags. |
| * |
| * generator: code generator context (space, expression generator, CRI |
| * descriptors, map table). |
| * Returns 0. On exit the DOWN CRI desc of the generator is the given desc |
| * and the UP CRI desc is returnedDesc. |
| */ short HbaseInsert::codeGen(Generator *generator) |
| { |
| Space * space = generator->getSpace(); |
| ExpGenerator * expGen = generator->getExpGenerator(); |
| |
| // fetch the current last map table of the generator (not referenced again in this method) |
| MapTable * last_map_table = generator->getLastMapTable(); |
| NABoolean inlinedActions = FALSE; |
| if ((getInliningInfo().hasInlinedActions()) || |
| (getInliningInfo().isEffectiveGU())) |
| inlinedActions = TRUE; |
| /* Decide whether a row is returned to the parent. Load prep, and upserts with |
| inlined actions, take the answer from isReturnRow() instead. */ |
| NABoolean returnRow = getReturnRow(this, getIndexDesc()); |
| if (getIsTrafLoadPrep() || (isUpsert() && inlinedActions)) |
| returnRow = isReturnRow(); |
| /* The returned CRI desc is the given one unless a row is returned, in which |
| case it has one more tupp. returnRowTuppIndex is always the last tupp of |
| returnedDesc. */ |
| ex_cri_desc * givenDesc = generator->getCriDesc(Generator::DOWN); |
| ex_cri_desc * returnedDesc = givenDesc; |
| |
| if (returnRow) |
| returnedDesc = new(space) ex_cri_desc(givenDesc->noTuples() + 1, space); |
| |
| const Int32 returnRowTuppIndex = returnedDesc->noTuples() - 1; |
| /* Work ATP (atp number 1): a 6-entry CRI desc in which tupps 2 to 5 are used |
| for the insert row, the row id, the logging row and the projected row. */ |
| const Int32 work_atp = 1; |
| ex_cri_desc * workCriDesc = NULL; |
| const UInt16 insertTuppIndex = 2; |
| const UInt16 rowIdTuppIndex = 3; |
| const UInt16 loggingTuppIndex = 4; |
| const UInt16 projRowTuppIndex = 5; |
| workCriDesc = new(space) ex_cri_desc(6, space); |
| |
| ULng32 loggingRowLen = 0; |
| |
| NABoolean hasAddedColumns = FALSE; |
| if (getTableDesc()->getNATable()->hasAddedColumn()) |
| hasAddedColumns = TRUE; |
| |
| ValueIdList insertVIDList; |
| ValueIdList keyVIDList; |
| NAColumnArray colArray; |
| NAColumn *col; |
| |
| ValueIdList returnRowVIDList; |
| Queue *listOfOmittedColNames = NULL; |
| |
| NABoolean isAlignedFormat = getTableDesc()->getNATable()->isAlignedFormat(getIndexDesc()); |
| NABoolean isHbaseMapFormat = getTableDesc()->getNATable()->isHbaseMapTable(); |
| Int16 colIndexOfPK1 = -1; |
| /* First pass over the assignments: fills listOfOmittedColNames, colArray, |
| returnRowVIDList (when a row is returned) and insertVIDList. */ |
| for (CollIndex ii = 0; ii < newRecExprArray().entries(); ii++) |
| { |
| const ItemExpr *assignExpr = newRecExprArray()[ii].getItemExpr(); |
| ValueId tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| ValueId srcValueId = assignExpr->child(1)->castToItemExpr()->getValueId(); |
| /* In MERGE or OPTIMAL upsert mode with non-aligned format, a skippable |
| assignment to a column that is not a system, clustering key or identity |
| column has its HBase column id added to listOfOmittedColNames. */ |
| col = tgtValueId.getNAColumn( TRUE ); |
| if ((NOT isAlignedFormat) && |
| ((CmpCommon::getDefault(TRAF_UPSERT_MODE) == DF_MERGE) || |
| (CmpCommon::getDefault(TRAF_UPSERT_MODE) == DF_OPTIMAL)) && |
| (((Assign*)assignExpr)->canBeSkipped()) && |
| (NOT col->isSystemColumn()) && |
| (NOT col->isClusteringKey()) && |
| (NOT col->isIdentityColumn())) |
| { |
| if (listOfOmittedColNames == NULL) |
| listOfOmittedColNames = new(space) Queue(space); |
| NAString cnInList; |
| HbaseAccess::createHbaseColId(col, cnInList); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| listOfOmittedColNames->insert(colNameInList); |
| } |
| else |
| { /* record the index of the first clustering key column (non-aligned format |
| only), reduced by the number of omitted columns seen so far */ |
| if (col->isClusteringKey() && !isAlignedFormat && colIndexOfPK1 == -1) |
| colIndexOfPK1 = (listOfOmittedColNames == NULL) ? ii : |
| ii - listOfOmittedColNames->entries(); |
| } |
| colArray.insert( col ); |
| |
| if (returnRow) |
| returnRowVIDList.insert(tgtValueId); |
| /* Cast the source value to the type of the target column. */ |
| ItemExpr * child1Expr = assignExpr->child(1); |
| const NAType &givenType = tgtValueId.getType(); |
| |
| ItemExpr * ie = new(generator->wHeap()) |
| Cast(child1Expr, &givenType); |
| /* HBase-mapped tables stored in string data format: non-character values get |
| a further Cast to a varchar sized by the display length of the type. */ |
| if ((isHbaseMapFormat) && |
| (getTableDesc()->getNATable()->isHbaseDataFormatString()) && |
| (NOT (DFS2REC::isAnyCharacter(givenType.getFSDatatype())))) |
| { |
| Lng32 cvl = givenType.getDisplayLength(); |
| |
| NAType * asciiType = |
| new (generator->wHeap()) SQLVarChar(generator->wHeap(), cvl, givenType.supportsSQLnull()); |
| ie = new(generator->wHeap()) Cast(ie, asciiType); |
| } |
| /* Non-aligned format: add a CompEncode when the target column needs encoding |
| for serialization. */ |
| if ((NOT isAlignedFormat) && HbaseAccess::isEncodingNeededForSerialization |
| (assignExpr->child(0)->castToItemExpr())) |
| { |
| ie = new(generator->wHeap()) CompEncode |
| (ie, FALSE, -1, CollationInfo::Sort, TRUE); |
| } |
| |
| ie->bindNode(generator->getBindWA()); |
| if (generator->getBindWA()->errStatus()) |
| { |
| GenAssert(0,"bindNode failed"); |
| } |
| insertVIDList.insert(ie->getValueId()); |
| } |
| |
| const NATable *naTable = getTableDesc()->getNATable(); |
| |
| ULng32 insertRowLen = 0; |
| ExpTupleDesc * tupleDesc = 0; |
| ExpTupleDesc::TupleDataFormat tupleFormat; |
| |
| if (isAlignedFormat) |
| tupleFormat = ExpTupleDesc::SQLMX_ALIGNED_FORMAT; |
| else |
| tupleFormat = ExpTupleDesc::SQLARK_EXPLODED_FORMAT; |
| /* Generate the move expression that builds the insert row in work-atp tupp |
| insertTuppIndex, using the tuple format chosen above. */ |
| ex_expr *insertExpr = 0; |
| expGen->generateContiguousMoveExpr( |
| insertVIDList, |
| 0, // dont add convert nodes |
| 1, // work atp |
| insertTuppIndex, |
| tupleFormat, |
| insertRowLen, |
| &insertExpr, |
| &tupleDesc, |
| ExpTupleDesc::LONG_FORMAT, |
| NULL, |
| NULL, |
| 0, |
| NULL, |
| NULL, |
| &colArray); |
| |
| ex_expr * loggingDataExpr = NULL; |
| ExpTupleDesc * loggingDataTupleDesc = NULL; |
| //--bulk load error logging |
| if (CmpCommon::getDefault(TRAF_LOAD_LOG_ERROR_ROWS) == DF_ON) { |
| CmpContext *cmpContext = generator->currentCmpContext(); |
| |
| ValueIdList loggingDataVids; |
| |
| for (CollIndex i = 0; i < insertVIDList.entries(); i++) |
| { |
| ItemExpr &inputExpr = *(insertVIDList[i].getItemExpr()); |
| const NAType &formalType = insertVIDList[i].getType(); |
| ItemExpr *lmExpr = NULL; |
| ItemExpr *lmExpr2 = NULL; |
| int res; |
| /* NOTE(review): the GenAssert below tests lmExpr, which is the input and is |
| always non-NULL here; the output lmExpr2 is only checked by the if that |
| follows. Confirm this is intended. */ |
| lmExpr = &inputExpr; |
| res = CreateAllCharsExpr(formalType, // [IN] Child output type |
| *lmExpr, // [IN] Actual input value |
| cmpContext, // [IN] Compilation context |
| lmExpr2 // [OUT] Returned expression |
| ); |
| GenAssert(res == 0 && lmExpr != NULL, |
| "Error building expression tree for LM child Input value"); |
| if (lmExpr2) |
| { |
| lmExpr2->bindNode(generator->getBindWA()); |
| loggingDataVids.insert(lmExpr2->getValueId()); |
| } |
| } // for (i = 0; i < insertVIDList.entries(); i++) |
| |
| if (loggingDataVids.entries()>0) |
| { |
| expGen->generateContiguousMoveExpr ( |
| loggingDataVids, // [IN] source ValueIds |
| FALSE, // [IN] add convert nodes? |
| 1, // [IN] target atp number (work atp 1) |
| loggingTuppIndex, // [IN] target tupp index |
| // The target format should be exploded format always because the column delimiter |
| // added during execution assumes exploded format |
| ExpTupleDesc::SQLARK_EXPLODED_FORMAT, // [IN] target tuple data format |
| loggingRowLen, // [OUT] target tuple length |
| &loggingDataExpr, // [OUT] move expression |
| &loggingDataTupleDesc, // [optional OUT] target tuple desc |
| ExpTupleDesc::LONG_FORMAT // [optional IN] target desc format |
| ); |
| |
| } |
| // Add the tuple descriptor for request values to the work ATP |
| workCriDesc->setTupleDescriptor(loggingTuppIndex, loggingDataTupleDesc); |
| } |
| //////////// |
| // If constraints are present, generate constraint expression. |
| // Only works for base tables because the constraint information is |
| // stored with the table descriptor which doesn't exist for indexes. |
| // |
| ex_expr * insConstraintExpr = NULL; |
| Queue * listOfUpdatedColNames = NULL; |
| Lng32 keyAttrPos = -1; |
| ex_expr * rowIdExpr = NULL; |
| ULng32 rowIdLen = 0; |
| /* Give each target column value id and the index column at the same position |
| the location attributes of the generated source value in the insert row. |
| NOTE(review): indexes the index column list by assignment position, so it |
| assumes both lists are in the same order; confirm. */ |
| const ValueIdList &indexVIDlist = getIndexDesc()->getIndexColumns(); |
| for (CollIndex ii = 0; ii < newRecExprArray().entries(); ii++) |
| { |
| const ItemExpr *assignExpr = newRecExprArray()[ii].getItemExpr(); |
| const ValueId &tgtValueId = assignExpr->child(0)->castToItemExpr()->getValueId(); |
| const ValueId &indexValueId = indexVIDlist[ii]; |
| col = tgtValueId.getNAColumn( TRUE ); |
| ValueId &srcValId = insertVIDList[ii]; |
| |
| Attributes * colAttr = (generator->addMapInfo(tgtValueId, 0))->getAttr(); |
| Attributes * indexAttr = (generator->addMapInfo(indexValueId, 0))->getAttr(); |
| Attributes * castAttr = (generator->getMapInfo(srcValId, 0))->getAttr(); |
| |
| colAttr->copyLocationAttrs(castAttr); |
| indexAttr->copyLocationAttrs(castAttr); |
| } |
| /* Optional precondition, generated as a scan predicate. */ |
| ex_expr* preCondExpr = NULL; |
| if (! getPrecondition().isEmpty()) |
| { |
| ItemExpr * preCondTree = getPrecondition().rebuildExprTree(ITM_AND, |
| TRUE,TRUE); |
| expGen->generateExpr(preCondTree->getValueId(), ex_expr::exp_SCAN_PRED, |
| &preCondExpr); |
| } |
| /* Generate the key-encode expression that builds the row id in work-atp tupp |
| rowIdTuppIndex. f is passed uninitialized and is not read afterwards |
| (presumably an output parameter). */ |
| ULng32 f; |
| expGen->generateKeyEncodeExpr( |
| getIndexDesc(), // describes the columns |
| work_atp, // work Atp |
| rowIdTuppIndex, // work Atp entry |
| ExpTupleDesc::SQLMX_KEY_FORMAT, // Tuple format |
| rowIdLen, // Key length |
| &rowIdExpr, // Encode expression |
| FALSE, |
| f, |
| NULL, |
| TRUE); // handle serialization |
| /* Check constraints are generated only for the clustering index. With |
| serialized columns the tree is first passed through |
| addCompDecodeForDerialization(). */ |
| if (getIndexDesc()->isClusteringIndex() && getCheckConstraints().entries()) |
| { |
| ItemExpr *constrTree = |
| getCheckConstraints().rebuildExprTree(ITM_AND, TRUE, TRUE); |
| |
| if (getTableDesc()->getNATable()->hasSerializedEncodedColumn()) |
| constrTree = generator->addCompDecodeForDerialization(constrTree, isAlignedFormat); |
| |
| expGen->generateExpr(constrTree->getValueId(), ex_expr::exp_SCAN_PRED, |
| &insConstraintExpr); |
| } |
| /* Build listOfUpdatedColNames: one entry per column of colArray for |
| non-aligned format, or a single entry for aligned format. */ |
| listOfUpdatedColNames = new(space) Queue(space); |
| std::vector<NAString> columNamesVec; |
| |
| if (NOT isAlignedFormat) |
| { |
| for (CollIndex c = 0; c < colArray.entries(); c++) |
| { |
| const NAColumn * nac = colArray[c]; |
| |
| NAString cnInList; |
| if (isHbaseMapFormat) |
| { |
| cnInList = nac->getHbaseColFam(); |
| cnInList += ":"; |
| cnInList += nac->getColName(); |
| |
| short len = cnInList.length(); |
| cnInList.prepend((char*)&len, sizeof(short)); |
| } |
| else |
| { |
| HbaseAccess::createHbaseColId(nac, cnInList, |
| (getIndexDesc()->getNAFileSet()->getKeytag() != 0)); |
| } |
| /* Load prep: prefix the 1-based column position and defer the insertion |
| until the names have been sorted below. */ |
| if (getIsTrafLoadPrep()) |
| { |
| UInt32 pos = (UInt32)c +1; |
| cnInList.prepend((char*)&pos, sizeof(UInt32)); |
| columNamesVec.push_back(cnInList); |
| } |
| else |
| { |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfUpdatedColNames->insert(colNameInList); |
| } |
| } |
| /* Load prep: sort the collected names with compHBaseQualif, then copy them |
| into generated space in that order. */ |
| if (getIsTrafLoadPrep()) |
| { |
| std::sort(columNamesVec.begin(), columNamesVec.end(),compHBaseQualif); |
| for (std::vector<NAString>::iterator it = columNamesVec.begin() ; it != columNamesVec.end(); ++it) |
| { |
| NAString cnInList2 = *it; |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList2, 0); |
| |
| listOfUpdatedColNames->insert(colNameInList); |
| } |
| } |
| } |
| else |
| { /* aligned format: a single entry made of the default column family, a |
| colon and one byte with value 1, prefixed by its length as a short */ |
| NAString cnInList(SEABASE_DEFAULT_COL_FAMILY); |
| cnInList += ":"; |
| unsigned char c = 1; |
| cnInList.append((char*)&c, 1); |
| short len = cnInList.length(); |
| cnInList.prepend((char*)&len, sizeof(short)); |
| |
| char * colNameInList = |
| space->AllocateAndCopyToAlignedSpace(cnInList, 0); |
| |
| listOfUpdatedColNames->insert(colNameInList); |
| } |
| |
| // Assign attributes to the ASSIGN nodes of the newRecExpArray() |
| // This is not the same as the generateContiguousMoveExpr() call |
| // above since different valueId's are added to the mapTable. |
| // |
| ULng32 tempInsertRowLen = 0; |
| ExpTupleDesc * tempTupleDesc = 0; |
| expGen->processValIdList(newRecExprArray(), |
| tupleFormat, |
| tempInsertRowLen, |
| 0, |
| returnRowTuppIndex, // insertTuppIndex, |
| &tempTupleDesc, |
| ExpTupleDesc::LONG_FORMAT, |
| 0, NULL, &colArray); |
| |
| // Add the inserted tuple descriptor to the work cri descriptor. |
| // |
| if (workCriDesc) |
| workCriDesc->setTupleDescriptor(insertTuppIndex, tupleDesc); |
| /* Returned-row setup, done only when returnRow is set. */ |
| ex_expr * projExpr = NULL; |
| ULng32 projRowLen = 0; |
| ExpTupleDesc * projRowTupleDesc = 0; |
| if (returnRow) |
| { |
| if (getTableDesc()->getNATable()->hasSerializedEncodedColumn()) |
| { |
| ValueIdList deserColVIDList; |
| ValueIdSet dummy; |
| |
| // if serialized columns are present, then create a new row with |
| // deserialized columns before returning it. |
| expGen->generateDeserializedMoveExpr |
| (returnRowVIDList, |
| 0,//work_atp, |
| returnRowTuppIndex, //projRowTuppIndex, |
| generator->getInternalFormat(), |
| projRowLen, |
| &projExpr, |
| &projRowTupleDesc, |
| ExpTupleDesc::SHORT_FORMAT, |
| deserColVIDList, |
| dummy); |
| |
| workCriDesc->setTupleDescriptor(projRowTuppIndex, projRowTupleDesc); |
| |
| // make the location of returnRowVIDlist point to the newly generated values. |
| for (CollIndex ii = 0; ii < returnRowVIDList.entries(); ii++) |
| { |
| const ValueId &retColVID = returnRowVIDList[ii]; |
| const ValueId &deserColVID = deserColVIDList[ii]; |
| |
| Attributes * retColAttr = (generator->getMapInfo(retColVID, 0))->getAttr(); |
| Attributes * deserColAttr = (generator->addMapInfo(deserColVID, 0))->getAttr(); |
| |
| retColAttr->copyLocationAttrs(deserColAttr); |
| } |
| |
| expGen->assignAtpAndAtpIndex(returnRowVIDList, |
| 0, returnRowTuppIndex); |
| } |
| else |
| { |
| expGen->processValIdList(returnRowVIDList, |
| tupleFormat, |
| tempInsertRowLen, |
| 0, |
| returnRowTuppIndex); |
| } |
| } |
| /* Classify the file set. KEY_SEQ_WITH_SYSKEY_ is what later enables |
| setAddSyskeyTS() on the TDB. */ |
| ComTdbDp2Oper::SqlTableType stt = ComTdbDp2Oper::NOOP_; |
| if (getIndexDesc()->getNAFileSet()->isKeySequenced()) |
| { |
| const NAColumnArray & column_array = getIndexDesc()->getAllColumns(); |
| |
| if ((column_array[0]->isSyskeyColumn()) && |
| (column_array[0]->getType()->getNominalSize() >= 4)) { |
| stt = ComTdbDp2Oper::KEY_SEQ_WITH_SYSKEY_; |
| } |
| else |
| stt = ComTdbDp2Oper::KEY_SEQ_; |
| } |
| /* Queue sizes and buffer settings come from the GEN_DP2I_* defaults; the |
| buffer size is raised to at least three insert rows. */ |
| Cardinality expectedRows = (Cardinality) getEstRowsUsed().getValue(); |
| ULng32 buffersize = getDefault(GEN_DP2I_BUFFER_SIZE); |
| buffersize = MAXOF(3*insertRowLen, buffersize); |
| |
| queue_index upqueuelength = (queue_index)getDefault(GEN_DP2I_SIZE_UP); |
| queue_index downqueuelength = (queue_index)getDefault(GEN_DP2I_SIZE_DOWN); |
| Int32 numBuffers = getDefault(GEN_DP2I_NUM_BUFFERS); |
| /* For VSBB_INSERT_USER or UPSERT_LOAD inserts where oltOptInfo reports |
| multipleRowsReturned(), the down queue length is taken from |
| HBASE_ROWSET_VSBB_SIZE and rounded up to a power of two. */ |
| if ((getInsertType() == Insert::VSBB_INSERT_USER || // covers upsert |
| getInsertType() == Insert::UPSERT_LOAD) && // covers upsert using load and bulk load |
| generator->oltOptInfo()->multipleRowsReturned()) |
| { |
| downqueuelength = getDefault(HBASE_ROWSET_VSBB_SIZE); |
| queue_index dq = 1; |
| queue_index bits = downqueuelength; |
| while (bits && dq < downqueuelength) { |
| bits = bits >> 1; |
| dq = dq << 1; |
| } |
| downqueuelength = dq; |
| } |
| /* HBase row or cell tables and HBase-mapped names pass only the object name |
| part of the index name; other tables pass the full index name. */ char * tablename = NULL; |
| if ((getTableDesc()->getNATable()->isHbaseRowTable()) || |
| (getTableDesc()->getNATable()->isHbaseCellTable()) || |
| (getTableName().getQualifiedNameObj().isHbaseMappedName())) |
| { |
| tablename = space->AllocateAndCopyToAlignedSpace(GenGetQualifiedName(getIndexDesc()->getIndexName().getObjectName()), 0); |
| } |
| else |
| { |
| tablename = space->AllocateAndCopyToAlignedSpace( |
| GenGetQualifiedName(getIndexDesc()->getIndexName()), 0); |
| } |
| /* Copy the HBASE_SERVER and HBASE_ZOOKEEPER_PORT default values into |
| generated space. */ |
| NAString serverNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_SERVER); |
| NAString zkPortNAS = ActiveSchemaDB()->getDefaults().getValue(HBASE_ZOOKEEPER_PORT); |
| char * server = space->allocateAlignedSpace(serverNAS.length() + 1); |
| strcpy(server, serverNAS.data()); |
| char * zkPort = space->allocateAlignedSpace(zkPortNAS.length() + 1); |
| strcpy(zkPort, zkPortNAS.data()); |
| |
| ComTdbHbaseAccess::HbasePerfAttributes * hbpa = |
| new(space) ComTdbHbaseAccess::HbasePerfAttributes(); |
| /* Select the TDB access type. */ |
| ComTdbHbaseAccess::ComTdbAccessType t; |
| if (isUpsert()) |
| { |
| if (getInsertType() == Insert::UPSERT_LOAD) |
| t = ComTdbHbaseAccess::UPSERT_LOAD_; |
| else |
| t = ComTdbHbaseAccess::UPSERT_; |
| } |
| else |
| t = ComTdbHbaseAccess::INSERT_; |
| |
| // create hbasescan_tdb |
| ComTdbHbaseAccess *hbasescan_tdb = new(space) |
| ComTdbHbaseAccess( |
| t, |
| tablename, |
| insertExpr, |
| NULL, |
| rowIdExpr, |
| loggingDataExpr, // logging expr |
| NULL, // mergeInsertExpr |
| NULL, // mergeInsertRowIdExpr |
| NULL, // mergeUpdScanExpr |
| NULL, // projExpr |
| projExpr, // returnedUpdatedExpr |
| NULL, // returnMergeUpdateExpr |
| NULL, // encodedKeyExpr, |
| NULL, // keyColValExpr |
| NULL, // hbaseFilterValExpr |
| |
| 0, //asciiRowLen, |
| insertRowLen, |
| loggingRowLen, //loggingRowLen |
| 0, // mergeInsertRowLen |
| 0, // fetchedRowLen |
| projRowLen, // returnedUpdatedRowLen |
| |
| rowIdLen, |
| 0, //outputRowLen, |
| 0, //rowIdAsciiRowLen |
| 0, // keyLen |
| 0, // keyColValLen |
| 0, // hbaseFilterValRowLen |
| |
| 0, //asciiTuppIndex, |
| insertTuppIndex, |
| loggingTuppIndex, //loggingTuppIndex |
| 0, // mergeInsertTuppIndex |
| 0, // mergeInsertRowIdTuppIndex |
| 0, // mergeIUDIndicatorTuppIndex |
| 0, // returnedFetchedTuppIndex |
| projRowTuppIndex, // returnedUpdatedTuppIndex |
| |
| rowIdTuppIndex, |
| returnRowTuppIndex, //returnedDesc->noTuples()-1, |
| 0, // rowIdAsciiTuppIndex |
| 0, // keyTuppIndex |
| 0, // keyColValTuppIndex |
| 0, // hbaseFilterValTuppIndex |
| |
| 0, // hbaseTimestamp |
| 0, // hbaseVersion |
| |
| NULL, |
| NULL, //tdbListOfDelRows, |
| NULL, |
| listOfUpdatedColNames, |
| NULL, |
| |
| NULL, |
| NULL, |
| |
| workCriDesc, |
| givenDesc, |
| returnedDesc, |
| |
| downqueuelength, |
| upqueuelength, |
| expectedRows, |
| numBuffers, |
| buffersize, |
| |
| server, |
| zkPort, |
| hbpa |
| ); |
| |
| generator->initTdbFields(hbasescan_tdb); |
| hbasescan_tdb->setListOfOmittedColNames(listOfOmittedColNames); |
| /* Asynchronous operations are enabled only when HBASE_ASYNC_OPERATIONS is ON |
| and getInliningInfo().isIMGU() holds. */ |
| if ((CmpCommon::getDefault(HBASE_ASYNC_OPERATIONS) == DF_ON) |
| && getInliningInfo().isIMGU()) |
| hbasescan_tdb->setAsyncOperations(TRUE); |
| |
| if (preCondExpr) |
| hbasescan_tdb->setInsDelPreCondExpr(preCondExpr); |
| |
| if (insConstraintExpr) |
| hbasescan_tdb->setInsConstraintExpr(insConstraintExpr); |
| |
| if (getTableDesc()->getNATable()->isSeabaseTable()) |
| { |
| hbasescan_tdb->setSQHbaseTable(TRUE); |
| |
| if (isAlignedFormat) |
| hbasescan_tdb->setAlignedFormat(TRUE); |
| |
| if (isHbaseMapFormat) |
| { |
| hbasescan_tdb->setHbaseMapTable(TRUE); |
| |
| if (getTableDesc()->getNATable()->getClusteringIndex()->hasSingleColVarcharKey()) |
| hbasescan_tdb->setKeyInVCformat(TRUE); |
| } |
| |
| if (CmpCommon::getDefault(HBASE_SQL_IUD_SEMANTICS) == DF_ON) |
| hbasescan_tdb->setHbaseSqlIUD(TRUE); |
| /* Upserts and noCheck() inserts always clear the SQL IUD flag, overriding the |
| default-driven setting just above. */ |
| if ((isUpsert()) || |
| (noCheck())) |
| hbasescan_tdb->setHbaseSqlIUD(FALSE); |
| /* VSBB insert: a VSBB_INSERT_USER insert or an upsert for which multiple rows |
| may be returned, or any UPSERT_LOAD. */ |
| if (((((getInsertType() == Insert::VSBB_INSERT_USER) || isUpsert() )&& |
| generator->oltOptInfo()->multipleRowsReturned())) || |
| (getInsertType() == Insert::UPSERT_LOAD)) |
| { |
| hbasescan_tdb->setVsbbInsert(TRUE); |
| hbasescan_tdb->setHbaseRowsetVsbbSize(getDefault(HBASE_ROWSET_VSBB_SIZE)); |
| // Set the VSBB flag in the RelExpr to display in explain |
| if (! hbasescan_tdb->hbaseSqlIUD()) |
| setVsbbInsert(TRUE); |
| } |
| |
| |
| if (isUpsert() && (getInsertType() == Insert::UPSERT_LOAD)) |
| { |
| // this will cause tupleflow operator to send in an EOD to this upsert |
| // operator. On seeing that, executor will flush the buffers. |
| generator->setVSBBInsert(TRUE); |
| } |
| |
| // setting parameters for hbase bulk load integration |
| hbasescan_tdb->setIsTrafodionLoadPrep(this->getIsTrafLoadPrep()); |
| if (hbasescan_tdb->getIsTrafodionLoadPrep()) |
| { |
| NAString tlpTmpLocationNAS = ActiveSchemaDB()->getDefaults().getValue(TRAF_LOAD_PREP_TMP_LOCATION); |
| char * tlpTmpLocation = space->allocateAlignedSpace(tlpTmpLocationNAS.length() + 1); |
| strcpy(tlpTmpLocation, tlpTmpLocationNAS.data()); |
| hbasescan_tdb->setLoadPrepLocation(tlpTmpLocation); |
| hbasescan_tdb->setNoDuplicates(CmpCommon::getDefault(TRAF_LOAD_PREP_SKIP_DUPLICATES) == DF_OFF); |
| hbasescan_tdb->setMaxHFileSize(CmpCommon::getDefaultLong(TRAF_LOAD_MAX_HFILE_SIZE)); |
| /* Convert the flush size from KB to rows using the TDB row length, then clamp |
| it to the range 1 .. (USHRT_MAX/2)-1. |
| NOTE(review): assumes getRowLen() is non-zero; confirm. */ |
| ULng32 loadFlushSizeinKB = getDefault(TRAF_LOAD_FLUSH_SIZE_IN_KB); |
| ULng32 loadFlushSizeinRows = 0; |
| loadFlushSizeinRows = (loadFlushSizeinKB*1024)/hbasescan_tdb->getRowLen() ; |
| // largest flush size, runtime cannot handle higher values |
| // without code change |
| if (loadFlushSizeinRows >= USHRT_MAX/2) |
| loadFlushSizeinRows = ((USHRT_MAX/2)-1); |
| else if (loadFlushSizeinRows < 1) // make sure we don't fall to zero on really long rows |
| loadFlushSizeinRows = 1; |
| hbasescan_tdb->setTrafLoadFlushSize(loadFlushSizeinRows); |
| |
| // For sample file, set the sample location in HDFS and the sampling rate. |
| // Move later, when sampling not limited to bulk loads. |
| if (getCreateUstatSample()) |
| { |
| NAString sampleLocationNAS = ActiveSchemaDB()->getDefaults().getValue(TRAF_SAMPLE_TABLE_LOCATION); |
| char * sampleLocation = space->allocateAlignedSpace(sampleLocationNAS.length() + 1); |
| strcpy(sampleLocation, sampleLocationNAS.data()); |
| hbasescan_tdb->setSampleLocation(sampleLocation); |
| /* The sampling rate is sampleRows divided by the input cardinality. |
| NOTE(review): a totalRows of 0 would make the division below yield a |
| non-finite rate; confirm it cannot occur. */ |
| Int64 totalRows = (Int64)(getInputCardinality().getValue()); |
| //printf("*** Incoming cardinality is " PF64 ".\n", totalRows); |
| Int64 sampleRows; |
| if (CmpCommon::getDefault(USTAT_USE_SLIDING_SAMPLE_RATIO) == DF_ON) |
| sampleRows = getDefaultSlidingSampleSize(totalRows); |
| else |
| sampleRows = getDefaultSampleSize(totalRows); |
| Float32 sampleRate = (Float32)sampleRows / (Float32)totalRows; |
| //printf("*** In HbaseInsert::codeGen(): Sample percentage is %.2f.\n", sampleRate); |
| hbasescan_tdb->setSamplingRate(sampleRate); |
| } |
| hbasescan_tdb->setContinueOnError(CmpCommon::getDefault(TRAF_LOAD_CONTINUE_ON_ERROR) == DF_ON); |
| hbasescan_tdb->setLogErrorRows(CmpCommon::getDefault(TRAF_LOAD_LOG_ERROR_ROWS) == DF_ON); |
| hbasescan_tdb->setMaxErrorRows((UInt32)CmpCommon::getDefaultNumeric(TRAF_LOAD_MAX_ERROR_ROWS)); |
| NAString errCountRowIdNAS = CmpCommon::getDefaultString(TRAF_LOAD_ERROR_COUNT_ID); |
| char * errCountRowId = NULL; |
| if (errCountRowIdNAS.length() > 0) |
| { |
| errCountRowId = space->allocateAlignedSpace(errCountRowIdNAS.length() + 1); |
| strcpy(errCountRowId, errCountRowIdNAS.data()); |
| hbasescan_tdb->setErrCountRowId(errCountRowId); |
| } |
| |
| NAString errCountTabNAS = ActiveSchemaDB()->getDefaults().getValue(TRAF_LOAD_ERROR_COUNT_TABLE); |
| char * errCountTab = NULL; |
| if (errCountTabNAS.length() > 0) |
| { |
| errCountTab = space->allocateAlignedSpace(errCountTabNAS.length() + 1); |
| strcpy(errCountTab, errCountTabNAS.data()); |
| hbasescan_tdb->setErrCountTab(errCountTab); |
| } |
| NAString loggingLocNAS = ActiveSchemaDB()->getDefaults().getValue(TRAF_LOAD_ERROR_LOGGING_LOCATION); |
| char * loggingLoc = NULL; |
| if (loggingLocNAS.length() > 0) |
| { |
| loggingLoc = space->allocateAlignedSpace(loggingLocNAS.length() + 1); |
| strcpy(loggingLoc, loggingLocNAS.data()); |
| hbasescan_tdb->setLoggingLocation(loggingLoc); |
| } |
| } |
| |
| // setting parameters for upsert statement; not related to the hbase bulk load integration |
| NABoolean traf_upsert_adjust_params = |
| (CmpCommon::getDefault(TRAF_UPSERT_ADJUST_PARAMS) == DF_ON); |
| if (traf_upsert_adjust_params) |
| { |
| ULng32 wbSize = getDefault(TRAF_UPSERT_WB_SIZE); |
| NABoolean traf_write_toWAL = |
| (CmpCommon::getDefault(TRAF_UPSERT_WRITE_TO_WAL) == DF_ON); |
| |
| hbasescan_tdb->setTrafWriteToWAL(traf_write_toWAL); |
| |
| hbasescan_tdb->setCanAdjustTrafParams(true); |
| |
| hbasescan_tdb->setWBSize(wbSize); |
| |
| } |
| if (getTableDesc()->getNATable()->isEnabledForDDLQI()) |
| generator->objectUids().insert( |
| getTableDesc()->getNATable()->objectUid().get_value()); |
| /* colIndexOfPK1 is passed on only for the plain INSERT_ access type. */ |
| if (colIndexOfPK1 >=0 && t == ComTdbHbaseAccess::INSERT_) |
| hbasescan_tdb->setColIndexOfPK1(colIndexOfPK1); |
| } |
| else |
| { |
| if (getTableDesc()->getNATable()->isHbaseRowTable()) //rowwiseHbaseFormat()) |
| hbasescan_tdb->setRowwiseFormat(TRUE); |
| } |
| |
| if (returnRow) |
| hbasescan_tdb->setReturnRow(TRUE); |
| |
| if (rowsAffected() != GenericUpdate::DO_NOT_COMPUTE_ROWSAFFECTED) |
| hbasescan_tdb->setComputeRowsAffected(TRUE); |
| |
| if (stt == ComTdbDp2Oper::KEY_SEQ_WITH_SYSKEY_) |
| hbasescan_tdb->setAddSyskeyTS(TRUE); |
| /* If the generator reports that a transaction is needed, call |
| setTransactionRequired(); otherwise set the HBase-xn or region-xn flag |
| when noDTMxn() or useRegionXn() holds. */ |
| if (generator->isTransactionNeeded()) |
| setTransactionRequired(generator); |
| else if (noDTMxn()) |
| hbasescan_tdb->setUseHbaseXn(TRUE); |
| else if (useRegionXn()) |
| hbasescan_tdb->setUseRegionXn(TRUE); |
| |
| if(!generator->explainDisabled()) { |
| generator->setExplainTuple( |
| addExplainInfo(hbasescan_tdb, 0, 0, generator)); |
| } |
| /* Record the per-table stats TDB id when stats are collected as |
| PERTABLE_STATS or OPERATOR_STATS. */ |
| if ((generator->computeStats()) && |
| (generator->collectStatsType() == ComTdb::PERTABLE_STATS |
| || generator->collectStatsType() == ComTdb::OPERATOR_STATS)) |
| { |
| hbasescan_tdb->setPertableStatsTdbId((UInt16)generator-> |
| getPertableStatsTdbId()); |
| } |
| |
| generator->setFoundAnUpdate(TRUE); |
| /* Publish the CRI descriptors and the generated TDB. */ |
| generator->setCriDesc(givenDesc, Generator::DOWN); |
| generator->setCriDesc(returnedDesc, Generator::UP); |
| generator->setGenObj(this, hbasescan_tdb); |
| |
| return 0; |
| } |