blob: 7365632bfb1bfa4e2c1efc40b9b32ad2c97e5196 [file] [log] [blame]
/**********************************************************************
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
**************************************************************************
*
* File: RelFastTransport.cpp
* Description: RelExprs related to support FastTransport (Load and Extract)
* Created: 9/29/12
* Language: C++
*
*************************************************************************
*/
#include "CostMethod.h"
#include "AllRelExpr.h"
#include "Globals.h"
#include "HDFSHook.h"
// -----------------------------------------------------------------------
// methods for class RelFastExtract
// -----------------------------------------------------------------------
//! FastExtract::FastExtract Copy Constructor
FastExtract::FastExtract(const FastExtract & other)
: RelExpr(other)
{
targetType_ = other.targetType_;
targetName_ = other.targetName_;
hdfsHostName_ = other.hdfsHostName_;
hdfsPort_ = other.hdfsPort_;
hiveTableDesc_ = other.hiveTableDesc_;
hiveTableName_ = other.hiveTableName_;
delimiter_ = other.delimiter_;
isAppend_ = other.isAppend_;
includeHeader_ = other.includeHeader_;
header_ = other.header_;
cType_ = other.cType_,
nullString_ = other.nullString_,
recordSeparator_ = other.recordSeparator_;
selectList_ = other.selectList_;
isSequenceFile_ = other.isSequenceFile_;
overwriteHiveTable_ = other.overwriteHiveTable_;
isMainQueryOperator_ = other.isMainQueryOperator_;
}
//! FastExtract::~FastExtract Destructor
FastExtract::~FastExtract()
{
}
RelExpr *FastExtract::makeFastExtractTree(
TableDesc *tableDesc,
RelExpr *child,
NABoolean overwriteTable,
NABoolean calledFromBinder,
NABoolean tempTableForCSE,
BindWA *bindWA)
{
RelExpr *result = NULL;
const HHDFSTableStats* hTabStats =
tableDesc->getNATable()->getClusteringIndex()->getHHDFSTableStats();
const char * hiveTablePath;
NAString hostName;
Int32 hdfsPort;
NAString tableDir;
char fldSep[2];
char recSep[2];
memset(fldSep,'\0',2);
memset(recSep,'\0',2);
fldSep[0] = hTabStats->getFieldTerminator();
recSep[0] = hTabStats->getRecordTerminator();
// don't rely on timeouts to invalidate the HDFS stats for the target table,
// make sure that we invalidate them right after compiling this statement,
// at least for this process
((NATable*)(tableDesc->getNATable()))->setClearHDFSStatsAfterStmt(TRUE);
// inserting into tables with multiple partitions is not yet supported
CMPASSERT(hTabStats->entries() == 1);
hiveTablePath = (*hTabStats)[0]->getDirName();
NABoolean splitSuccess = TableDesc::splitHiveLocation(
hiveTablePath,
hostName,
hdfsPort,
tableDir,
CmpCommon::diags(),
hTabStats->getPortOverride());
if (!splitSuccess) {
*CmpCommon::diags() << DgSqlCode(-4224)
<< DgString0(hiveTablePath);
bindWA->setErrStatus();
return NULL;
}
const NABoolean isSequenceFile = hTabStats->isSequenceFile();
FastExtract * unloadRelExpr =
new (bindWA->wHeap()) FastExtract(
child,
new (bindWA->wHeap()) NAString(hiveTablePath, bindWA->wHeap()),
new (bindWA->wHeap()) NAString(hostName, bindWA->wHeap()),
hdfsPort,
tableDesc,
new (bindWA->wHeap()) NAString(
tableDesc->getCorrNameObj().getQualifiedNameObj().getObjectName(),
bindWA->wHeap()),
FastExtract::FILE,
bindWA->wHeap());
unloadRelExpr->setRecordSeparator(recSep);
unloadRelExpr->setDelimiter(fldSep);
unloadRelExpr->setOverwriteHiveTable(overwriteTable);
unloadRelExpr->setSequenceFile(isSequenceFile);
unloadRelExpr->setIsMainQueryOperator(calledFromBinder);
result = unloadRelExpr;
// keeping older Hive Truncate around for time being.
// Once newer method is tested, legacy will be removed.
NABoolean legacyHiveTruncate = FALSE;
char * leg = getenv("TRUNC_LEGACY");
if (leg)
legacyHiveTruncate = TRUE;
if (overwriteTable && legacyHiveTruncate)
{
ExeUtilHiveTruncateLegacy *trunc = new (bindWA->wHeap())
ExeUtilHiveTruncateLegacy(tableDesc->getCorrNameObj(),
NULL,
bindWA->wHeap());
trunc->setNoSecurityCheck(TRUE);
RelExpr * newRelExpr = trunc;
if (tempTableForCSE)
{
trunc->setSuppressModCheck();
// This table gets created at compile time, unlike most
// other tables. It gets dropped when the statement is
// deallocated. Note that there are three problems:
// a) Statement gets never executed
// b) Process exits before deallocating the statement
// c) Statement gets deallocated, then gets executed again
//
// Todo: CSE: Handle these issues.
// Cases a) and b) are handled like volatile tables, there
// is a cleanup mechanism.
// Case c) gets handled by AQR.
trunc->setDropTableOnDealloc();
}
if (calledFromBinder)
//new root to prevent error 4056 when binding
newRelExpr = new (bindWA->wHeap()) RelRoot(newRelExpr);
else
// this node must be bound, even outside the binder,
// to set some values
newRelExpr = newRelExpr->bindNode(bindWA);
Union *blockedUnion = new (bindWA->wHeap()) Union(newRelExpr, result);
blockedUnion->setBlockedUnion();
blockedUnion->setSerialUnion();
result = blockedUnion;
}
if (overwriteTable && (NOT legacyHiveTruncate))
{
NAString hiveName = ComConvertTrafHiveNameToNativeHiveName
(tableDesc->getCorrNameObj().getQualifiedNameObj().getCatalogName(),
tableDesc->getCorrNameObj().getQualifiedNameObj().getSchemaName(),
tableDesc->getCorrNameObj().getQualifiedNameObj().getObjectName());
if (hiveName.isNull())
{
*CmpCommon::diags()
<< DgSqlCode(-3242)
<< DgString0("Invalid Hive name specified.");
bindWA->setErrStatus();
return NULL;
}
NAString hiveTruncQuery("truncate table ");
hiveTruncQuery += hiveName;
ExeUtilHiveTruncate *trunc = new (bindWA->wHeap())
ExeUtilHiveTruncate(tableDesc->getCorrNameObj(),
hiveName,
hiveTruncQuery,
bindWA->wHeap());
trunc->setNoSecurityCheck(TRUE);
RelExpr * newRelExpr = trunc;
if (tempTableForCSE)
{
// This table gets created at compile time, unlike most
// other tables. It gets dropped when the statement is
// deallocated. Note that there are three problems:
// a) Statement gets never executed
// b) Process exits before deallocating the statement
// c) Statement gets deallocated, then gets executed again
//
// Todo: CSE: Handle these issues.
// Cases a) and b) are handled like volatile tables, there
// is a cleanup mechanism.
// Case c) gets handled by AQR.
trunc->setDropTableOnDealloc();
}
if (calledFromBinder)
//new root to prevent error 4056 when binding
newRelExpr = new (bindWA->wHeap()) RelRoot(newRelExpr);
else
// this node must be bound, even outside the binder,
// to set some values
newRelExpr = newRelExpr->bindNode(bindWA);
Union *blockedUnion = new (bindWA->wHeap()) Union(newRelExpr, result);
blockedUnion->setBlockedUnion();
blockedUnion->setSerialUnion();
result = blockedUnion;
}
return result;
}
//! FastExtract::copyTopNode method
RelExpr * FastExtract::copyTopNode(RelExpr *derivedNode,
CollHeap* outHeap)
{
FastExtract *result;
if (derivedNode == NULL)
{
result = new (outHeap) FastExtract(NULL, outHeap);
}
else
result = (FastExtract *) derivedNode;
result->targetType_ = targetType_;
result->targetName_ = targetName_;
result->hdfsHostName_ = hdfsHostName_;
result->hdfsPort_ = hdfsPort_;
result->hiveTableDesc_= hiveTableDesc_;
result->hiveTableName_ = hiveTableName_;
result->delimiter_ = delimiter_;
result->isAppend_ = isAppend_;
result->includeHeader_ = includeHeader_;
result->header_ = header_;
result->cType_ = cType_;
result->nullString_ = nullString_;
result->recordSeparator_ = recordSeparator_ ;
result->selectList_ = selectList_;
result->isSequenceFile_ = isSequenceFile_;
result->overwriteHiveTable_ = overwriteHiveTable_;
result->isMainQueryOperator_ = isMainQueryOperator_;
return RelExpr::copyTopNode(result, outHeap);
}
//! FastExtract::getText method
const NAString FastExtract::getText() const
{
if (isHiveInsert())
{
NAString op(CmpCommon::statementHeap());
op = "hive_insert";
NAString tname(hiveTableName_,CmpCommon::statementHeap());
return op + " " + tname;
}
else
return "UNLOAD";
}
void FastExtract::addLocalExpr(LIST(ExprNode *) &xlist,
LIST(NAString) &llist) const
{
if (NOT selectList_.isEmpty())
{
xlist.insert(selectList_.rebuildExprTree());
llist.insert("select_list");
}
RelExpr::addLocalExpr(xlist,llist);
};
void FastExtract::transformNode(NormWA & normWARef,
ExprGroupId & locationOfPointerToMe)
{
RelExpr::transformNode(normWARef, locationOfPointerToMe);
};
void FastExtract::rewriteNode(NormWA & normWARef)
{
selectList_.normalizeNode(normWARef);
// rewrite group attributes and selection pred
RelExpr::rewriteNode(normWARef);
} ;
RelExpr * FastExtract::normalizeNode(NormWA & normWARef)
{
return RelExpr::normalizeNode(normWARef);
};
void FastExtract::getPotentialOutputValues(ValueIdSet & vs) const
{
vs.clear();
};
void FastExtract::pullUpPreds()
{
// A FastExtract never pulls up predicates from its children.
for (Int32 i = 0; i < getArity(); i++)
child(i)->recomputeOuterReferences();
};
void FastExtract::pushdownCoveredExpr(
const ValueIdSet & outputExprOnOperator,
const ValueIdSet & newExternalInputs,
ValueIdSet& predOnOperator,
const ValueIdSet *
nonPredNonOutputExprOnOperator,
Lng32 childId)
{
ValueIdSet exprOnParent;
if(nonPredNonOutputExprOnOperator)
exprOnParent = *nonPredNonOutputExprOnOperator;
exprOnParent.insertList(getSelectList());
RelExpr::pushdownCoveredExpr(outputExprOnOperator,
newExternalInputs,
predOnOperator,
&exprOnParent,
childId);
};
void FastExtract::synthEstLogProp(const EstLogPropSharedPtr& inputLP)
{
// get child histograms
EstLogPropSharedPtr childrenInputEstLogProp;
CostScalar inputFromParentCard = inputLP->getResultCardinality();
EstLogPropSharedPtr myEstProps(new (HISTHEAP) EstLogProp(*inputLP));
childrenInputEstLogProp = child(0).outputLogProp(inputLP);
CostScalar extractCard = childrenInputEstLogProp->getResultCardinality();
myEstProps->setResultCardinality(extractCard);
// attach histograms to group attributes
getGroupAttr()->addInputOutputLogProp (inputLP, myEstProps);
};
short FastExtract::setOptions(NAList<UnloadOption*> *
fastExtractOptionList,
ComDiagsArea * da)
{
if (!fastExtractOptionList)
return 0;
for (CollIndex i = 0; i < fastExtractOptionList->entries(); i++)
{
UnloadOption * feo = (*fastExtractOptionList)[i];
switch (feo->option_)
{
case UnloadOption::DELIMITER_:
{
if (delimiter_.length() == 0)
delimiter_ = feo->stringVal_;
else
{
*da << DgSqlCode(-4376) << DgString0("DELIMITER");
return 1;
}
}
break;
case UnloadOption::NULL_STRING_:
{
if (nullString_.length() == 0)
nullString_ = feo->stringVal_;
else
{
*da << DgSqlCode(-4376) << DgString0("NULL_STRING");
return 1;
}
nullStringSpec_ = TRUE;
}
break;
case UnloadOption::RECORD_SEP_:
{
if (recordSeparator_.length() == 0)
recordSeparator_ = feo->stringVal_;
else
{
*da << DgSqlCode(-4376) << DgString0("RECORD_SEPARATOR");
return 1;
}
}
break;
case UnloadOption::APPEND_:
{
if (!isAppend_)
isAppend_ = TRUE;
else
{
*da << DgSqlCode(-4376) << DgString0("APPEND");
return 1;
}
}
break;
case UnloadOption::HEADER_:
{
if (includeHeader_)
includeHeader_ = FALSE;
else
{
*da << DgSqlCode(-4376) << DgString0("HEADER");
return 1;
}
}
break;
case UnloadOption::COMPRESSION_:
{
if (cType_ == NONE)
cType_ = (CompressionType) feo->numericVal_;
else
{
*da << DgSqlCode(-4376) << DgString0("COMPRESSION");
return 1;
}
}
break;
default:
return 1;
}
}
return 0;
};
///////////////////////////////////////////////////////////
// ---------------------------------------------------------------------
// comparison, hash, and copy methods
// ---------------------------------------------------------------------
HashValue FastExtract::topHash()
{
HashValue result = RelExpr::topHash();
result ^= getSelectList() ;
return result;
};
NABoolean FastExtract::duplicateMatch(const RelExpr & other) const
{
if (!RelExpr::duplicateMatch(other))
return FALSE;
FastExtract &o = (FastExtract &) other;
if (NOT (getSelectList() == o.getSelectList()))
return FALSE;
return TRUE;
};
// -----------------------------------------------------------------------
// methods for class PhysicalFastExtract
// -----------------------------------------------------------------------
PhysicalProperty* PhysicalFastExtract::synthPhysicalProperty(const Context* myContext,
const Lng32 planNumber,
PlanWorkSpace *pws)
{
PartitioningFunction* myPartFunc = NULL;
const IndexDesc* myIndexDesc = NULL;
// simply propagate the physical property
const PhysicalProperty * const sppOfChild =
myContext->getPhysicalPropertyOfSolutionForChild(0);
myPartFunc = sppOfChild->getPartitioningFunction();
myIndexDesc = sppOfChild->getIndexDesc();
PhysicalProperty * sppForMe =
new(CmpCommon::statementHeap()) PhysicalProperty(
myPartFunc,
EXECUTE_IN_MASTER_AND_ESP,
SOURCE_VIRTUAL_TABLE);
// remove anything that's not covered by the group attributes
sppForMe->enforceCoverageByGroupAttributes (getGroupAttr()) ;
sppForMe->setIndexDesc(myIndexDesc);
return sppForMe ;
};
double PhysicalFastExtract::getEstimatedRunTimeMemoryUsage(Generator *generator, ComTdb * tdb)
{
// The executor attempts to get buffers, each of size 1 MB. This memory
// is not from the SQLMXBufferSpace though, but is got directly from TSE
// through EXECUTOR_DP2_ADD_MEMORY. We may not get 1MB of memory for each
// buffer at runtime. This value is interpreted as a potential/likely
// maximum. totalMemory is per TSE now.
double totalMemory = ((ActiveSchemaDB()->getDefaults()).
getAsULong(FAST_EXTRACT_IO_BUFFERS))*1024*1024;
const PhysicalProperty* const phyProp = getPhysicalProperty();
if (phyProp != NULL)
{
PartitioningFunction * partFunc = phyProp -> getPartitioningFunction() ;
// totalMemory is for all TSEs at this point of time.
totalMemory *= partFunc->getCountOfPartitions();
}
return totalMemory;
}
// -----------------------------------------------------------------------
// PhysicalFastExtract::costMethod()
// Obtain a pointer to a CostMethod object providing access
// to the cost estimation functions for nodes of this class.
// -----------------------------------------------------------------------
CostMethod* PhysicalFastExtract::costMethod() const
{
static THREAD_P CostMethodFastExtract *m = NULL;
if (m == NULL)
m = new (GetCliGlobals()->exCollHeap()) CostMethodFastExtract();
return m;
}