/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#ifndef _HDFSHOOK_H
#define _HDFSHOOK_H
// HDFS-level statistics for Hive tables (Hive HDFS = HHDFS)
//
// We assume a Hive table looks like this in HDFS:
//
// + top-level directory for the Hive table
//   |
//   ---> + partition directories (for partitioned tables only)
//          |
//          ---> + buckets (_<bucket> file suffixes, for bucketed tables only)
//                 |
//                 ---> + files (one or more HDFS files with data)
//                        |
//                        ---> + blocks of the file
//                               |
//                               ---> list of hosts with replicas for a given block
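//
// A traversal sketch (in comments only; names like "ts" are hypothetical):
// the statistics classes below mirror this hierarchy, so summing a counter
// over all files of a table could look like
//
//   HHDFSTableStats *ts = ...;                          // one per Hive table
//   Int64 totalRows = 0;
//   for (CollIndex p = 0; p < ts->entries(); p++) {     // partitions
//     const HHDFSListPartitionStats *ps = (*ts)[p];
//     for (Int32 b = 0; b <= ps->getLastValidBucketIndx(); b++)
//       if (const HHDFSBucketStats *bs = (*ps)[b])      // NULL if bucket unused
//         for (CollIndex f = 0; f < bs->entries(); f++)
//           totalRows += (*bs)[f]->getTotalRows();      // per-file stats
//   }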
#include "Collections.h"
#include "NAString.h"
#include "CmpContext.h"
#include "hdfs.h"
#include <stdio.h>
// forward declarations
struct hive_tbl_desc;
class HivePartitionAndBucketKey;
class HHDFSTableStats;
class OsimHHDFSStatsBase;
class OptimizerSimulator;
class ExLobGlobals;
typedef CollIndex HostId;
typedef Int64 BucketNum;
typedef Int64 BlockNum;
typedef Int64 Offset;
class HHDFSMasterHostList : public NABasicObject
{
friend class OptimizerSimulator;
public:
HHDFSMasterHostList(NAMemory *heap) {}
~HHDFSMasterHostList();
// translate a host name to a number (add host if needed)
static HostId getHostNum(const char *hostName);
// get host name from host number
static const char * getHostName(HostId hostNum);
static CollIndex entries() { return getHosts()->entries(); }
static CollIndex getNumSQNodes() { return numSQNodes_; }
static void resetNumSQNodes() { numSQNodes_ = 0; }
static void resethasVirtualSQNodes() { hasVirtualSQNodes_ = FALSE; }
static CollIndex getNumNonSQNodes() { return getHosts()->entries()-numSQNodes_; }
static NABoolean usesRemoteHDFS() { return getNumNonSQNodes() > 0; }
static NABoolean hasVirtualSQNodes() { return hasVirtualSQNodes_; }
static const CollIndex InvalidHostId = NULL_COLL_INDEX;
// make sure all SeaQuest nodes are recorded with their SQ node numbers 0...n-1
static NABoolean initializeWithSeaQuestNodes();
private:
static HostId getHostNumInternal(const char *hostName);
static ARRAY(const char *) *getHosts()
{
return CmpCommon::context()->getHosts();
}
  // the array that translates host names into integer indexes lives
  // in the CmpContext and is accessed through getHosts() above
  static THREAD_P CollIndex numSQNodes_;
  static THREAD_P NABoolean hasVirtualSQNodes_;
};
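// Usage sketch (host name made up): host names are interned into dense
// integer ids, so per-block host lists can store compact HostIds instead
// of strings:
//
//   HostId h = HHDFSMasterHostList::getHostNum("node17.example.com");
//   const char *n = HHDFSMasterHostList::getHostName(h); // back to the name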
class HHDFSDiags
{
public:
HHDFSDiags() { success_ = TRUE; }
NABoolean isSuccess() const { return success_; }
const char *getErrLoc() const { return errLoc_; }
const char *getErrMsg() const { return errMsg_; }
void recordError(const char *errMsg,
const char *errLoc = NULL);
void reset() { errLoc_ = ""; errMsg_ = ""; success_ = TRUE; }
private:
NABoolean success_;
NAString errLoc_;
NAString errMsg_;
};
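// Usage sketch: callers reset the diags before a call tree that may record
// errors and test isSuccess() afterwards (error handling below is made up):
//
//   HHDFSDiags diags;
//   diags.reset();
//   // ... code that may call diags.recordError("msg", "location") ...
//   if (!diags.isSuccess())
//     fprintf(stderr, "%s: %s\n", diags.getErrLoc(), diags.getErrMsg());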
class HHDFSStatsBase : public NABasicObject
{
friend class OsimHHDFSStatsBase;
public:
  HHDFSStatsBase(HHDFSTableStats *table) : numBlocks_(0),
                                           numFiles_(0),
                                           totalRows_(0),
                                           numStripes_(0),
                                           totalStringLengths_(0),
                                           totalSize_(0),
                                           modificationTS_(0),
                                           sampledBytes_(0),
                                           sampledRows_(0),
                                           table_(table) {}
void add(const HHDFSStatsBase *o);
void subtract(const HHDFSStatsBase *o);
Int64 getTotalSize() const { return totalSize_; }
Int64 getNumFiles() const { return numFiles_; }
Int64 getNumBlocks() const { return numBlocks_; }
Int64 getTotalRows() const { return totalRows_; }
  Int64 getTotalStringLengths() const { return totalStringLengths_; }
Int64 getNumStripes() const { return numStripes_; }
Int64 getSampledBytes() const { return sampledBytes_; }
Int64 getSampledRows() const { return sampledRows_; }
time_t getModificationTS() const { return modificationTS_; }
Int64 getEstimatedBlockSize() const;
Int64 getEstimatedRowCount() const;
Int64 getEstimatedRecordLength() const;
void print(FILE *ofd, const char *msg);
const HHDFSTableStats *getTable() const { return table_; }
HHDFSTableStats *getTable() { return table_; }
virtual OsimHHDFSStatsBase* osimSnapShot(NAMemory * heap){ return NULL; }
protected:
Int64 numBlocks_;
Int64 numFiles_;
Int64 totalRows_; // for ORC files
Int64 numStripes_; // for ORC files
Int64 totalStringLengths_; // for ORC files
Int64 totalSize_;
time_t modificationTS_; // last modification time of this object (file, partition/directory, bucket or table)
Int64 sampledBytes_;
Int64 sampledRows_;
HHDFSTableStats *table_;
};
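// A sketch of how the getEstimated*() values can be derived from the
// members above (an assumption for illustration; the actual formulas live
// in the .cpp file): extrapolate from the sampled portion of the data,
//
//   estRecordLength ~= sampledBytes_ / sampledRows_
//   estRowCount     ~= totalSize_ / estRecordLength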
class HHDFSFileStats : public HHDFSStatsBase
{
friend class OsimHHDFSFileStats;
public:
  HHDFSFileStats(NAMemory *heap,
                 HHDFSTableStats *table) :
       HHDFSStatsBase(table),
       fileName_(heap),
       replication_(0),
       blockSize_(0),
       blockHosts_(NULL),
       heap_(heap) {}
~HHDFSFileStats();
void populate(hdfsFS fs,
hdfsFileInfo *fileInfo,
Int32& samples,
HHDFSDiags &diags,
NABoolean doEstimation = TRUE,
char recordTerminator = '\n'
);
const NAString & getFileName() const { return fileName_; }
Int32 getReplication() const { return replication_; }
Int64 getBlockSize() const { return blockSize_; }
HostId getHostId(Int32 replicate, Int64 blockNum) const
{ return blockHosts_[replicate*numBlocks_+blockNum]; }
virtual void print(FILE *ofd);
virtual OsimHHDFSStatsBase* osimSnapShot(NAMemory * heap);
private:
NAString fileName_;
Int32 replication_;
Int64 blockSize_;
// list of blocks for this file
HostId *blockHosts_;
NAMemory *heap_;
};
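// blockHosts_ layout, as implied by getHostId() above: a flat array of
// replication_ * numBlocks_ entries, with one replica's hosts for all
// blocks stored contiguously. A made-up example, replication 3, 2 blocks:
//
//   index:  [0]     [1]     [2]     [3]     [4]     [5]
//   holds:  r0,b0   r0,b1   r1,b0   r1,b1   r2,b0   r2,b1
//
//   HostId h = fileStats->getHostId(1, 0);  // replica 1, block 0 -> index 2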
class HHDFSBucketStats : public HHDFSStatsBase
{
friend class OsimHHDFSBucketStats;
public:
  HHDFSBucketStats(NAMemory *heap,
                   HHDFSTableStats *table) :
       HHDFSStatsBase(table),
       fileStatsList_(heap), heap_(heap), scount_(0) {}
~HHDFSBucketStats();
const CollIndex entries() const { return fileStatsList_.entries(); }
const HHDFSFileStats * operator[](CollIndex i) const
{ return fileStatsList_[i]; }
HHDFSFileStats * operator[](CollIndex i) { return fileStatsList_[i]; }
void addFile(hdfsFS fs, hdfsFileInfo *fileInfo,
HHDFSDiags &diags,
NABoolean doEstimate = TRUE,
char recordTerminator = '\n',
CollIndex pos = NULL_COLL_INDEX);
void removeAt(CollIndex i);
void print(FILE *ofd);
void insertAt(Int32 pos, HHDFSFileStats* st){ fileStatsList_.insertAt(pos, st); }
virtual OsimHHDFSStatsBase* osimSnapShot(NAMemory * heap);
private:
// list of files in this bucket
LIST(HHDFSFileStats *) fileStatsList_;
NAMemory *heap_;
Int32 scount_; // total # of sampled files associated with this bucket
};
class HHDFSListPartitionStats : public HHDFSStatsBase
{
friend class OsimHHDFSListPartitionStats;
public:
  HHDFSListPartitionStats(NAMemory *heap,
                          HHDFSTableStats *table) :
       HHDFSStatsBase(table),
       partitionDir_(heap),
       partitionKeyValues_(heap),
       partIndex_(-1),
       defaultBucketIdx_(0),
       bucketStatsList_(heap),
       doEstimation_(FALSE),
       recordTerminator_(0),
       heap_(heap)
  {}
~HHDFSListPartitionStats();
const NAString &getDirName() const { return partitionDir_; }
const CollIndex entries() const { return bucketStatsList_.entries(); }
const HHDFSBucketStats * operator[](CollIndex i) const
{ return (bucketStatsList_.used(i) ? bucketStatsList_[i] : NULL); }
Int32 getNumOfBuckets() const { return (defaultBucketIdx_ ? defaultBucketIdx_ : 1); }
Int32 getLastValidBucketIndx() const { return defaultBucketIdx_; }
const hdfsFileInfo * dirInfo() const {return &dirInfo_; }
void populate(hdfsFS fs, const NAString &dir, Int32 numOfBuckets,
HHDFSDiags &diags,
                NABoolean doEstimation, char recordTerminator);
NABoolean validateAndRefresh(hdfsFS fs, HHDFSDiags &diags, NABoolean refresh);
Int32 determineBucketNum(const char *fileName);
void print(FILE *ofd);
void insertAt(Int32 pos, HHDFSBucketStats* st){ bucketStatsList_.insertAt(pos, st); }
virtual OsimHHDFSStatsBase* osimSnapShot(NAMemory * heap);
private:
// directory of the partition
NAString partitionDir_;
NAString partitionKeyValues_;
  int partIndex_; // index in the HHDFSTableStats partition list
// number of buckets (from table DDL) or 0 if partition is not bucketed
// Note this value can never be 1. This value indicates the last
// valid index in bucketStatsList_ below.
Int32 defaultBucketIdx_;
// array of buckets in this partition (index is bucket #)
ARRAY(HHDFSBucketStats *) bucketStatsList_;
NABoolean doEstimation_;
char recordTerminator_;
hdfsFileInfo dirInfo_;
NAMemory *heap_;
};
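// Usage sketch (file name made up; see the "_<bucket> file suffixes" note
// at the top of this file and determineBucketNum() for the actual rule):
//
//   Int32 b = partStats->determineBucketNum("00000_2"); // presumably bucket 2
//   const HHDFSBucketStats *bs = (*partStats)[b];       // NULL if unused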
// HDFS file-level statistics for a Hive table. This includes
// partitioned and bucketed tables, binary and compressed tables
class HHDFSTableStats : public HHDFSStatsBase
{
friend class HivePartitionAndBucketKey; // to be able to make a subarray of the partitions
friend class OsimHHDFSTableStats;
friend class OptimizerSimulator;
public:
  HHDFSTableStats(NAMemory *heap) : HHDFSStatsBase(this),
                                    currHdfsHost_(heap),
                                    currHdfsPort_(-1),
                                    fs_(NULL),
                                    hdfsPortOverride_(-1),
                                    tableDir_(heap),
                                    numOfPartCols_(0),
                                    totalNumPartitions_(0),
                                    recordTerminator_(0),
                                    fieldTerminator_(0),
                                    nullFormat_(NULL),
                                    validationJTimestamp_(-1),
                                    hiveStatsSize_(0),
                                    listPartitionStatsList_(heap),
                                    heap_(heap),
                                    type_(UNKNOWN_),
                                    modificationTSInMillisec_(-1)
  {}
~HHDFSTableStats();
const CollIndex entries() const { return listPartitionStatsList_.entries(); }
const HHDFSListPartitionStats * operator[](CollIndex i) const
{ return listPartitionStatsList_[i]; }
// Populate the HDFS statistics for this table and set diagnostics info
// return success status: TRUE = success, FALSE = failure.
// Use getDiags() method to get error details.
NABoolean populate(struct hive_tbl_desc *htd);
// Check for HDFS file changes, return TRUE if stats are still valid
// or could be refreshed. Diagnostics could be set if returning FALSE,
// checking for diagnostics is optional.
NABoolean validateAndRefresh(Int64 expirationTimestamp=-1, NABoolean refresh=TRUE);
// Split a location into its parts.
// If you want to set a ComDiagsArea for errors, use
// TableDesc::splitHiveLocation
static NABoolean splitLocation(const char *tableLocation,
NAString &hdfsHost,
Int32 &hdfsPort,
NAString &tableDir,
HHDFSDiags &diags,
int hdfsPortOverride);
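  // Example (made-up location; exact scheme/default handling is in the
  // implementation):
  //   splitLocation("hdfs://myhost:8020/hive/warehouse/t1", ...) presumably
  //   yields hdfsHost="myhost", hdfsPort=8020, tableDir="/hive/warehouse/t1"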
void processDirectory(const NAString &dir, Int32 numOfBuckets,
NABoolean doEstimation, char recordTerminator);
void setPortOverride(Int32 portOverride) { hdfsPortOverride_ = portOverride; }
Int32 getPortOverride() const {return hdfsPortOverride_;}
char getRecordTerminator() const {return recordTerminator_;}
char getFieldTerminator() const {return fieldTerminator_;}
char *getNullFormat() const { return nullFormat_; }
Int32 getNumPartitions() const {return totalNumPartitions_;}
Int64 getValidationTimestamp() const { return validationJTimestamp_; }
  // return # of buckets if all partitions are consistently bucketed, 0 otherwise
// caller has to check for same bucket cols
Int32 getNumOfConsistentBuckets() const;
// for the NATable cache
void setupForStatement();
void resetAfterStatement();
void print(FILE *ofd);
// heap size used by the hive stats
Int32 getHiveStatsUsedHeap()
{
return (hiveStatsSize_);
}
const HHDFSDiags &getDiags() const { return diags_; }
const NABoolean hasError() const { return !diags_.isSuccess(); }
const NABoolean isTextFile() const { return (type_ == TEXT_);}
const NABoolean isSequenceFile() const { return (type_ == SEQUENCE_);}
const NABoolean isOrcFile() const { return (type_ == ORC_);}
const NAString &tableDir() const { return tableDir_; }
void insertAt(Int32 pos, HHDFSListPartitionStats * st) { listPartitionStatsList_.insertAt(pos, st); }
virtual OsimHHDFSStatsBase* osimSnapShot(NAMemory * heap);
void captureHiveTableStats(const NAString &tableName, Int64 lastModificationTs, hive_tbl_desc* hvt_desc );
static HHDFSTableStats * restoreHiveTableStats(const NAString & tableName, Int64 lastModificationTs, hive_tbl_desc* hvt_desc, NAMemory* heap);
const Lng32 numOfPartCols() const { return numOfPartCols_; }
const Lng32 totalNumPartitions() const { return totalNumPartitions_; }
// finer-resolution timestamp for entire table
// (can remove this once we use JNI to collect this info
// for all HDFS files)
Int64 getModificationTSmsec() const { return modificationTSInMillisec_; }
void computeModificationTSmsec();
private:
enum FileType
{
UNKNOWN_ = 0,
TEXT_ = 1,
SEQUENCE_ = 2,
ORC_ = 3
};
NABoolean connectHDFS(const NAString &host, Int32 port);
void disconnectHDFS();
  // host/port currently used to connect to HDFS, and the connection itself
  NAString currHdfsHost_;
  Int32 currHdfsPort_;
  hdfsFS fs_;
  Int32 hdfsPortOverride_; // if > -1, use this port # instead of what's in the Hive metadata
// HDFS directory name of the table
NAString tableDir_;
// indicates how many levels of partitioning directories the table has
int numOfPartCols_;
// total number of actual partitions, at all levels,
// or 1 if the table is not partitioned
int totalNumPartitions_;
char recordTerminator_ ;
char fieldTerminator_ ;
char *nullFormat_;
Int64 validationJTimestamp_;
// heap size used by the hive stats
Int32 hiveStatsSize_;
  // List of partitions of this table. The reason why this is
// an array and not a list is that we want to use an NASubarray
// for it in class HivePartitionAndBucketKey (file SearchKey.h).
ARRAY(HHDFSListPartitionStats *) listPartitionStatsList_;
// These diags get reset and populated by the two top-level calls
// for HDFS statistics: populate() and validateAndRefresh().
// Check the diags after these two calls.
// For validateAndRefresh() the best way to deal with any errors is
// to treat them as an invalid cache entry (FALSE return value) and
// retry reading all stats - it may succeed with fresh information.
HHDFSDiags diags_;
NAMemory *heap_;
FileType type_;
Int64 modificationTSInMillisec_;
};
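// Top-level usage sketch (names hypothetical): populate() and
// validateAndRefresh() are the two entry points that set the diags; treat
// a failing validateAndRefresh() as a stale cache entry and re-read:
//
//   HHDFSTableStats *ts = new (heap) HHDFSTableStats(heap);
//   if (!ts->populate(htd))                   // htd: struct hive_tbl_desc *
//     ... // report ts->getDiags().getErrMsg()
//   ...
//   if (!ts->validateAndRefresh())            // stale or HDFS error
//     ... // discard the cache entry and re-collect the stats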
#endif // _HDFSHOOK_H