blob: fb58b54ffd2cf6105e90a6ae105d8617887a6462 [file] [log] [blame]
// **********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
// **********************************************************************
#include "ComTdbHdfsScan.h"
#include "ComTdbCommon.h"
// Dummy default constructor used by the "unpack" routines. It only tags the
// ComTdb base with the HDFS scan node type and eye catcher; no other member
// appears in the initializer list.
ComTdbHdfsScan::ComTdbHdfsScan()
  : ComTdb(ComTdb::ex_HDFS_SCAN, eye_HDFS_SCAN)
{
}
// Full constructor.
//
// Every argument is either forwarded to the ComTdb base class (row count
// estimate, given/returned cri descriptors, down/up queue sizes, buffer
// count and buffer size) or copied as-is into the member of the matching
// name. flags_ and hdfsIoByteArraySizeInKB_ are not parameters; both start
// out as 0.
ComTdbHdfsScan::ComTdbHdfsScan(
     // table and expressions
     char * tableName,
     short type,
     ex_expr * select_pred,
     ex_expr * move_expr,
     ex_expr * convert_expr,
     ex_expr * move_convert_expr,
     UInt16 convertSkipListSize,
     Int16 * convertSkipList,
     // HDFS endpoint and the file ranges to scan
     char * hostName,
     tPort port,
     Queue * hdfsFileInfoList,
     Queue * hdfsFileRangeBeginList,
     Queue * hdfsFileRangeNumList,
     // text format and buffer sizing
     char recordDelimiter,
     char columnDelimiter,
     char * nullFormat,
     Int64 hdfsBufSize,
     UInt32 rangeTailIOSize,
     Int64 hdfsSqlMaxRecLen,
     // row lengths
     Int64 outputRowLength,
     Int64 asciiRowLen,
     Int64 moveColsRowLen,
     // tupp / atp indexes
     const unsigned short tuppIndex,
     const unsigned short asciiTuppIndex,
     const unsigned short workAtpIndex,
     const unsigned short moveColsTuppIndex,
     const unsigned short origTuppIndex,
     // cri descriptors and queue sizes
     ex_cri_desc * work_cri_desc,
     ex_cri_desc * given_cri_desc,
     ex_cri_desc * returned_cri_desc,
     queue_index down,
     queue_index up,
     Cardinality estimatedRowCount,
     Int32 numBuffers,
     UInt32 bufferSize,
     // error logging, root directory and partition information
     char * errCountTable,
     char * loggingLocation,
     char * errCountId,
     char * hdfsRootDir,
     Int64 modTSforDir,
     Lng32 numOfPartCols,
     Queue * hdfsDirsToCheck
     )
  : ComTdb(ComTdb::ex_HDFS_SCAN,
           eye_HDFS_SCAN,
           estimatedRowCount,
           given_cri_desc,
           returned_cri_desc,
           down,
           up,
           numBuffers,   // passed through to the base class unchanged
           bufferSize),  // passed through to the base class unchanged
    tableName_(tableName),
    type_(type),
    selectPred_(select_pred),
    moveExpr_(move_expr),
    convertExpr_(convert_expr),
    moveColsConvertExpr_(move_convert_expr),
    convertSkipListSize_(convertSkipListSize),
    convertSkipList_(convertSkipList),
    hostName_(hostName),
    port_(port),
    hdfsFileInfoList_(hdfsFileInfoList),
    hdfsFileRangeBeginList_(hdfsFileRangeBeginList),
    hdfsFileRangeNumList_(hdfsFileRangeNumList),
    recordDelimiter_(recordDelimiter),
    columnDelimiter_(columnDelimiter),
    nullFormat_(nullFormat),
    hdfsBufSize_(hdfsBufSize),
    rangeTailIOSize_(rangeTailIOSize),
    hdfsSqlMaxRecLen_(hdfsSqlMaxRecLen),
    outputRowLength_(outputRowLength),
    asciiRowLen_(asciiRowLen),
    moveExprColsRowLength_(moveColsRowLen),
    origTuppIndex_(origTuppIndex),
    tuppIndex_(tuppIndex),
    asciiTuppIndex_(asciiTuppIndex),
    workAtpIndex_(workAtpIndex),
    moveExprColsTuppIndex_(moveColsTuppIndex),
    workCriDesc_(work_cri_desc),
    flags_(0),
    errCountTable_(errCountTable),
    loggingLocation_(loggingLocation),
    errCountRowId_(errCountId),
    hdfsRootDir_(hdfsRootDir),
    modTSforDir_(modTSforDir),
    numOfPartCols_(numOfPartCols),
    hdfsIoByteArraySizeInKB_(0),
    hdfsDirsToCheck_(hdfsDirsToCheck)
{
}
// Destructor. The body is empty: nothing is released explicitly here.
ComTdbHdfsScan::~ComTdbHdfsScan()
{
}
// Intentionally a no-op. The textual description of this TDB is produced by
// displayContents().
void ComTdbHdfsScan::display() const
{
}
// Packs this TDB by calling pack(space) on each of its pointer members and
// finally on the ComTdb base class.
//
// The file name of every HdfsFileInfo entry is packed before the list that
// holds the entries is packed itself, because the list is walked through
// getHdfsFileInfoList() and must still be usable at that point.
//
// space: passed unchanged to every pack() call.
// Returns whatever ComTdb::pack(space) returns.
Long ComTdbHdfsScan::pack(void * space)
{
  tableName_.pack(space);
  selectPred_.pack(space);
  moveExpr_.pack(space);
  convertExpr_.pack(space);
  moveColsConvertExpr_.pack(space);
  convertSkipList_.pack(space);
  hostName_.pack(space);
  workCriDesc_.pack(space);

  // pack elements in hdfsFileInfoList
  // The list may be absent (displayContents() also allows for that), so
  // only walk it when it exists instead of dereferencing a NULL pointer.
  if (getHdfsFileInfoList())
    {
      getHdfsFileInfoList()->position();
      for (Lng32 i = 0; i < getHdfsFileInfoList()->numEntries(); i++)
        {
          HdfsFileInfo * hdf = (HdfsFileInfo*)getHdfsFileInfoList()->getNext();
          hdf->fileName_.pack(space);
        }
    }

  hdfsFileInfoList_.pack(space);
  hdfsFileRangeBeginList_.pack(space);
  hdfsFileRangeNumList_.pack(space);
  nullFormat_.pack(space);
  errCountTable_.pack(space);
  loggingLocation_.pack(space);
  errCountRowId_.pack(space);
  hdfsRootDir_.pack(space);
  hdfsDirsToCheck_.pack(space);

  return ComTdb::pack(space);
}
// Unpacks this TDB by calling unpack() on each pointer member and finally on
// the ComTdb base class.
//
// The file info list is unpacked first and only then walked, so that the
// file name of each HdfsFileInfo entry can be unpacked through it.
//
// base, reallocator: passed unchanged to the unpack() calls.
// Returns -1 as soon as any member unpack() reports a non-zero result,
// otherwise the result of ComTdb::unpack(base, reallocator).
Lng32 ComTdbHdfsScan::unpack(void * base, void * reallocator)
{
  if (tableName_.unpack(base)) return -1;
  if (selectPred_.unpack(base, reallocator)) return -1;
  if (moveExpr_.unpack(base, reallocator)) return -1;
  if (convertExpr_.unpack(base, reallocator)) return -1;
  if (moveColsConvertExpr_.unpack(base, reallocator)) return -1;
  if (convertSkipList_.unpack(base)) return -1;
  if (hostName_.unpack(base)) return -1;
  if (workCriDesc_.unpack(base, reallocator)) return -1;
  if (hdfsFileInfoList_.unpack(base, reallocator)) return -1;

  // unpack elements in hdfsFileInfoList
  // The list may be absent (displayContents() also allows for that), so
  // only walk it when it exists instead of dereferencing a NULL pointer.
  if (getHdfsFileInfoList())
    {
      getHdfsFileInfoList()->position();
      for (Lng32 i = 0; i < getHdfsFileInfoList()->numEntries(); i++)
        {
          HdfsFileInfo * hdf = (HdfsFileInfo*)getHdfsFileInfoList()->getNext();
          if (hdf->fileName_.unpack(base)) return -1;
        }
    }

  if (hdfsFileRangeBeginList_.unpack(base, reallocator)) return -1;
  if (hdfsFileRangeNumList_.unpack(base, reallocator)) return -1;
  if (nullFormat_.unpack(base)) return -1;
  if (errCountTable_.unpack(base)) return -1;
  if (loggingLocation_.unpack(base)) return -1;
  if (errCountRowId_.unpack(base)) return -1;
  if (hdfsRootDir_.unpack(base)) return -1;
  if (hdfsDirsToCheck_.unpack(base, reallocator)) return -1;

  return ComTdb::unpack(base, reallocator);
}
// Writes a textual description of this TDB into 'space', one formatted line
// per allocateAndCopyToAlignedSpace() call.
//
// space: destination for the formatted lines.
// flag:  bit mask selecting what is shown:
//          0x00000008 - the ComTdbHdfsScan specific section (scalar members,
//                       the per-range table, the per-ESP byte summary and
//                       the root directory information);
//          0x00000001 - expressions and children, displayed at the end.
//        The base class is called with bit 0x00000001 cleared.
//
// The per-ESP byte counters live in a fixed-size array on the stack. Entries
// belonging to an instance number beyond that array are still listed in the
// range table and counted in the total, but they are not accumulated or
// shown in the per-ESP summary (the original code wrote past the end of the
// array in that case).
void ComTdbHdfsScan::displayContents(Space * space,ULng32 flag)
{
  ComTdb::displayContents(space,flag & 0xFFFFFFFE);

  if(flag & 0x00000008)
    {
      char buf[16384];

      str_sprintf(buf, "\nFor ComTdbHdfsScan :");
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "tableName_ = %s", (char*)tableName_);
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      if (hostName_)
        str_sprintf(buf, "hostName_ = %s, port_ = %d",
                    (char *) hostName_, port_);
      else
        str_sprintf(buf, "hostName_ = NULL (empty scan)");
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "recordDelimiter_ = %d, columnDelimiter_ = %d",
                  recordDelimiter_ , columnDelimiter_ );
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "hdfsBufSize_ = %ld, rangeTailIOSize_ = %d, "
                  "hdfsSqlMaxRecLen_ = %ld",
                  hdfsBufSize_ , rangeTailIOSize_, hdfsSqlMaxRecLen_ );
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "tuppIndex_ = %d, workAtpIndex_ = %d",
                  tuppIndex_ , workAtpIndex_ );
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "asciiTuppIndex_ = %d, asciiRowLen_ = %ld",
                  asciiTuppIndex_ , asciiRowLen_);
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "moveExprColsTuppIndex_ = %d, moveExprColsRowLength_ = %ld",
                  moveExprColsTuppIndex_ , moveExprColsRowLength_);
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      if (convertSkipListSize_)
        {
          // only the first element of the skip list is printed for now
          str_sprintf(buf, "convertSkipListSize_ = %d, convertSkipList_ = %d",
                      convertSkipListSize_ , convertSkipList_[0]);
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));
        }

      str_sprintf(buf, "outputRowLength_ = %ld", outputRowLength_);
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      str_sprintf(buf, "Flag = %x",flags_);
      space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

      Queue *hdfsFileInfoList = hdfsFileInfoList_;
      if (hdfsFileInfoList)
        {
          UInt32 dataElems = hdfsFileInfoList->numEntries();
          str_sprintf(buf, "\nNumber of ranges to scan: %d",
                      (Int32) dataElems);
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

          Lng32 currInstNum = -1;
          Lng32 currEntryNum = 0;
          Lng32 startOfNextInst = 0;
          HdfsFileInfo * hdfo;

          // First pass over the list: only count how many instances (ESPs)
          // the entries are spread over. An entry starts a new instance when
          // its position reaches begin + num of the current instance, as
          // read from the range begin / range num lists.
          hdfsFileInfoList->position();
          while ((hdfo = (HdfsFileInfo*)hdfsFileInfoList->getNext()) != NULL)
            {
              if (currEntryNum >= startOfNextInst)
                {
                  currInstNum++;
                  startOfNextInst =
                    *((Lng32 *) getHdfsFileRangeBeginList()->get(currInstNum)) +
                    *((Lng32 *) getHdfsFileRangeNumList()->get(currInstNum));
                }
              currEntryNum++;
            }

          Int32 numESPs = currInstNum+1;
          str_sprintf(buf, "Number of esps to scan: %d\n",
                      numESPs);
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

          // Second pass: print one table row per entry and add up the bytes
          // per instance.
          currInstNum = -1;
          currEntryNum = 0;
          startOfNextInst = 0;

          // Capacity of the per-ESP byte counters below.
          const Int32 maxTrackedESPs = 4096;
          Int64 bytesPerESP[maxTrackedESPs];
          Int64 totalBytes = 0;
          for (Int32 i=0; i<maxTrackedESPs; i++)
            bytesPerESP[i] = 0;

          if (isOrcFile())
            str_sprintf(
                 buf,
                 "%6s %6s %12s %12s %s",
                 " Esp#", "Range#", " StartRowNum", " NumRows", " FileName");
          else
            str_sprintf(
                 buf,
                 "%6s %6s %12s %12s %s",
                 " Esp#", "Range#", " StartOffset", " BytesRead", " FileName");
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf),
                                               sizeof(short));

          str_sprintf(
               buf, "====== ====== ============ ============ ==============================\n");
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf),
                                               sizeof(short));

          hdfsFileInfoList->position();
          while ((hdfo = (HdfsFileInfo*)hdfsFileInfoList->getNext()) != NULL)
            {
              if (currEntryNum >= startOfNextInst)
                {
                  currInstNum++;
                  startOfNextInst =
                    *((Lng32 *) getHdfsFileRangeBeginList()->get(currInstNum)) +
                    *((Lng32 *) getHdfsFileRangeNumList()->get(currInstNum));
                }

              // Build the trailing annotation for this row, e.g.
              // "(local, split_b/e), seq".
              NABoolean printStr = FALSE;
              NABoolean isLocal = FALSE;
              char splitInfo[200];
              strcpy(splitInfo, " ");
              if (hdfo->fileIsLocal())
                {
                  isLocal = TRUE;
                  strcpy(splitInfo, "(local");
                }
              if ((hdfo->fileIsSplitBegin()) ||
                  (hdfo->fileIsSplitEnd()))
                {
                  if (isLocal)
                    strcat(splitInfo, ", ");
                  else
                    strcpy(splitInfo, "(");
                  printStr = TRUE;
                  strcat(splitInfo, "split_");
                  if (hdfo->fileIsSplitBegin())
                    {
                      strcat(splitInfo, "b");
                      if (hdfo->fileIsSplitEnd())
                        strcat(splitInfo, "/");
                    }
                  if (hdfo->fileIsSplitEnd())
                    strcat(splitInfo, "e");
                  strcat(splitInfo, ")");
                }
              else if (isLocal)
                strcat(splitInfo, ")");
              if (isSequenceFile())
                {
                  if ( isLocal ||
                       (hdfo->fileIsSplitBegin()) ||
                       (hdfo->fileIsSplitEnd()))
                    strcat(splitInfo, ", ");
                  strcat(splitInfo, "seq");
                }

              // filename is fully qualified addr and looks like this:
              // hdfs://....com/hive/warehouse/so_dtl_f_test/000030_0
              // To make it little easier to read, if it starts with "hdfs://", then pick
              // the last 2 parts, or maybe 3 parts at some point.
              // 'i' ends up as the offset into the file name from which it
              // is printed; it stays 0 for names without the "hdfs://" prefix.
              Lng32 i = 0;
              if (str_cmp(hdfo->fileName(), "hdfs://", strlen("hdfs://")) == 0)
                {
                  // scan backwards from the last character, counting '/'
                  i = strlen(hdfo->fileName()) - 1;
                  Lng32 numParts = 2;
                  NABoolean done = FALSE;
                  while (NOT done)
                    {
                      if ((i > 0) && (numParts > 0) && (hdfo->fileName()[i] != '/'))
                        i--;
                      else
                        {
                          if (i <= 0)
                            done = TRUE;
                          else
                            {
                              numParts--;
                              if (numParts == 0)
                                {
                                  // step past the '/' so that it is not printed
                                  i++;
                                  done = TRUE;
                                }
                              else
                                i--;
                            } // else
                        } // else
                    } // while
                } // if

              str_sprintf(
                   buf,
                   "%6d %6d %12ld %12ld %s %s",
                   currInstNum,
                   currEntryNum,
                   hdfo->getStartOffset(),
                   hdfo->getBytesToRead(),
                   &hdfo->fileName()[i],
                   splitInfo);
              space->allocateAndCopyToAlignedSpace(buf, str_len(buf),
                                                   sizeof(short));

              // Negative byte counts are treated as 0 in the statistics.
              Int64 bytesForEntry =
                (hdfo->getBytesToRead() >= 0 ? hdfo->getBytesToRead() : 0);
              // Never index past the end of the fixed-size counter array.
              if ((currInstNum >= 0) && (currInstNum < maxTrackedESPs))
                bytesPerESP[currInstNum] += bytesForEntry;
              totalBytes += bytesForEntry;
              currEntryNum++;
            }

          str_sprintf(buf, "\nSummary of bytes read per ESP (%ld = 100 percent):\n",
                      totalBytes);
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf),
                                               sizeof(short));

          if (totalBytes == 0)
            totalBytes = 1; // avoid divide by zero below

          numESPs = currInstNum+1;
          // Only instances that have a counter can be reported.
          Int32 numReportedESPs =
            (numESPs < maxTrackedESPs ? numESPs : maxTrackedESPs);
          for (Int32 e=0; e<numReportedESPs; e++)
            {
              // The (Int32) cast binds to the literal 100 only; the product
              // is evaluated in the wider type of bytesPerESP[e].
              str_sprintf(buf,"ESP %4d reads %18ld bytes (%4ld percent of avg)",
                          e,
                          bytesPerESP[e],
                          (Int32) 100 * bytesPerESP[e] * numESPs / totalBytes);
              space->allocateAndCopyToAlignedSpace(buf, str_len(buf),
                                                   sizeof(short));
            }
        }

      if (hdfsRootDir_)
        {
          str_sprintf(buf, "hdfsRootDir: %s", hdfsRootDir());
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

          str_sprintf(buf, "modTSforDir_ = %ld, numOfPartCols_ = %d",
                      modTSforDir_, numOfPartCols_);
          space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));

          if (hdfsDirsToCheck())
            {
              hdfsDirsToCheck()->position();
              char * dir = NULL;
              while ((dir = (char*)hdfsDirsToCheck()->getNext()) != NULL)
                {
                  str_sprintf(buf, "Dir Name: %s", dir);
                  space->allocateAndCopyToAlignedSpace(buf, str_len(buf), sizeof(short));
                }
            }
        }
    }

  if(flag & 0x00000001)
    {
      displayExpression(space,flag);
      displayChildren(space,flag);
    }
}
// Always returns 1 (presumably meaning that this operator follows the
// ordered queue protocol - confirm against the ComTdb declaration).
Int32 ComTdbHdfsScan::orderedQueueProtocol() const
{
  const Int32 protocolIndicator = 1;
  return protocolIndicator;
}
///////////////////////////////////////////////////////////////
// ComTdbOrcFastAggr
///////////////////////////////////////////////////////////////
// Dummy default constructor used by the "unpack" routines. It runs the
// ComTdbHdfsScan default constructor and then replaces the node type and
// eye catcher with the ORC aggregate ones.
ComTdbOrcFastAggr::ComTdbOrcFastAggr()
  : ComTdbHdfsScan()
{
  setNodeType(ComTdb::ex_ORC_AGGR);
  setEyeCatcher(eye_ORC_AGGR);
}
// Full constructor.
//
// Builds the ComTdbHdfsScan base with only the pieces this operator
// supplies: table name, type, the three file range queues, the projection
// expression and row length, two tupp indexes, the cri descriptors, queue
// sizes and buffer settings. Every other base constructor parameter is
// passed as 0 or NULL. Afterwards the node type and eye catcher set by the
// base constructor are replaced with the ORC aggregate ones.
ComTdbOrcFastAggr::ComTdbOrcFastAggr(
     char * tableName,
     OrcAggrType type,
     Queue * hdfsFileInfoList,
     Queue * hdfsFileRangeBeginList,
     Queue * hdfsFileRangeNumList,
     ex_expr * proj_expr,
     Int64 projRowLen,
     const unsigned short projTuppIndex,
     const unsigned short returnedTuppIndex,
     ex_cri_desc * work_cri_desc,
     ex_cri_desc * given_cri_desc,
     ex_cri_desc * returned_cri_desc,
     queue_index down,
     queue_index up,
     Int32 numBuffers,
     UInt32 bufferSize
     )
  : ComTdbHdfsScan(
         tableName,
         type,
         NULL, NULL,               // select_pred, move_expr
         proj_expr,                // convert_expr
         NULL,                     // move_convert_expr
         0, NULL, NULL, 0,         // convertSkipListSize, convertSkipList,
                                   // hostName, port
         hdfsFileInfoList,
         hdfsFileRangeBeginList,
         hdfsFileRangeNumList,
         0, 0, NULL, 0, 0, 0,      // recordDelimiter, columnDelimiter,
                                   // nullFormat, hdfsBufSize,
                                   // rangeTailIOSize, hdfsSqlMaxRecLen
         projRowLen,               // outputRowLength
         0, 0,                     // asciiRowLen, moveColsRowLen
         returnedTuppIndex,        // tuppIndex
         0,                        // asciiTuppIndex
         projTuppIndex,            // workAtpIndex
         0, 0,                     // moveColsTuppIndex, origTuppIndex
         work_cri_desc,
         given_cri_desc,
         returned_cri_desc,
         down,
         up,
         0,                        // estimatedRowCount
         numBuffers,
         bufferSize),
    type_(type)                    // this class keeps its own copy of the type
{
  setNodeType(ComTdb::ex_ORC_AGGR);
  setEyeCatcher(eye_ORC_AGGR);
}
// Destructor. The body is empty: nothing is released explicitly here.
ComTdbOrcFastAggr::~ComTdbOrcFastAggr() {}