blob: 5bf1219cc38b1e768b9cf694ee34b4a96da14742 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/*
* ExFastTransportIO.h
*
* Created on: Nov 05, 2012
*/
#ifndef __EX_FAST_TRANSPORT_H
#define __EX_FAST_TRANSPORT_H
#include "ComTdbFastTransport.h"
#include "ex_tcb.h"
#include "ComSmallDefs.h"
#include "ExStats.h"
#include "HdfsClient_JNI.h"
#include "ExpLOBinterface.h"
#include "ex_exe_stmt_globals.h"
// -----------------------------------------------------------------------
// Forward class declarations
// -----------------------------------------------------------------------
class sql_buffer;
class ExExeStmtGlobals;
class SequenceFileWriter;
class HdfsClient;
// -----------------------------------------------------------------------
// Classes defined in this file
// -----------------------------------------------------------------------
class ExFastExtractTdb;
class ExFastExtractTcb;
class IOBuffer
{
public:
friend class ExFastExtractTcb;
enum BufferStatus {PARTIAL = 0, FULL, EMPTY, ERR};
IOBuffer(char *buffer, Int32 bufSize)
: bytesLeft_(bufSize),
bufSize_(bufSize),
status_(EMPTY)
{
data_ = buffer;
//memset(data_, '\0', bufSize);
}
~IOBuffer()
{
}
void setStatus(BufferStatus val)
{
status_ = val;
}
BufferStatus getStatus()
{
return status_ ;
}
char* data_;
Int32 bytesLeft_;
Int32 bufSize_;
BufferStatus status_;
};
//----------------------------------------------------------------------
// Task control block
// -----------------------------------------------------------------------
// ExFastExtractTdb
// -----------------------------------------------------------------------
class ExFastExtractTdb : public ComTdbFastExtract
{
public:
// ---------------------------------------------------------------------
// Constructor is only called to instantiate an object used for
// retrieval of the virtual table function pointer of the class while
// unpacking. An empty constructor is enough.
// ---------------------------------------------------------------------
ExFastExtractTdb()
{
}
virtual ~ExFastExtractTdb()
{
}
// ---------------------------------------------------------------------
// Build a TCB for this TDB. Redefined in the Executor project.
// ---------------------------------------------------------------------
virtual ex_tcb *build(ex_globals *globals);
// ---------------------------------------------------------------------
// Public accessor functions
// ---------------------------------------------------------------------
// CRI desc for the work ATP. Will be NULL if the UDR
// has no input or output parameters.
inline ex_cri_desc *getWorkCriDesc() const
{
return workCriDesc_;
}
inline ULng32 getFlags() const
{
return flags_;
}
// Expressions to copy data values to/from message buffers
inline ex_expr *getInputExpression() const
{
return inputExpr_;
}
inline ex_expr *getOutputExpression() const
{
return outputExpr_;
}
// expression for copying child table input into a sqlbuffer
inline ex_expr *getChildDataExpr() const
{
return childDataExpr_;
}
// Defaults for the output buffer pool
inline ULng32 getNumOutputBuffers() const
{
return numBuffers_; // this field comes from the superclass
}
inline ULng32 getOutputSqlBufferSize() const
{
return bufferSize_; // this field comes from the superclass
}
inline ULng32 getNumInputBuffers() const
{
return 1;
}
inline ULng32 getInputSqlBufferSize() const
{
return bufferSize_; // this field comes from the superclass
} // keep it the same as output bufefr size for now
// Size of a single request/reply/output row
inline ULng32 getRequestRowLen() const
{
return requestRowLen_;
}
inline ULng32 getOutputRowLen() const
{
return outputRowLen_;
}
// Number of tupps in the output row
inline unsigned short getNumOutputTupps() const
{
return criDescUp_->noTuples();
}
inline ExpTupleDesc *getChildTuple() const
{
return workCriDesc_->getTupleDescriptor(childDataTuppIndex_);
}
inline ExpTupleDesc *getChildTuple2() const
{
return workCriDesc_->getTupleDescriptor(cnvChildDataTuppIndex_);
}
inline Attributes *getChildTableAttr( UInt32 colInd) const
{
return workCriDesc_->getTupleDescriptor(childDataTuppIndex_)->getAttr(colInd);
}
inline Attributes *getChildTableAttr2( UInt32 colInd) const
{
return workCriDesc_->getTupleDescriptor(cnvChildDataTuppIndex_)->getAttr(colInd);
}
private:
// ---------------------------------------------------------------------
// !!!!!!! IMPORTANT -- NO DATA MEMBERS ALLOWED IN EXECUTOR TDB !!!!!!!!
// *********************************************************************
// The Executor TDB's are only used for the sole purpose of providing a
// way to supplement the Compiler TDB's (in comexe) with methods whose
// implementation depends on Executor objects. This is done so as to
// decouple the Compiler from linking in Executor objects unnecessarily.
//
// When a Compiler generated TDB arrives at the Executor, the same data
// image is "cast" as an Executor TDB after unpacking. Therefore, it is
// a requirement that a Compiler TDB has the same object layout as its
// corresponding Executor TDB. As a result of this, all Executor TDB's
// must have absolutely NO data members, but only member functions. So,
// if you reach here with an intention to add data members to a TDB, ask
// yourself two questions:
//
// 1. Are those data members Compiler-generated?
// If yes, put them in the ComTdbUdr instead.
// If no, they should probably belong to someplace else (like TCB).
//
// 2. Are the classes those data members belong defined in the executor
// project?
// If your answer to both questions is yes, you might need to move
// the classes to the comexe project.
// ---------------------------------------------------------------------
};
//----------------------------------------------------------------------
// Task control block
//----------------------------------------------------------------------
class ExFastExtractTcb : public ex_tcb
{
typedef ex_tcb super;
friend class ExFastExtractTdb;
private:
public:
enum FastExtractStates
{
EXTRACT_NOT_STARTED = 0,
EXTRACT_CHECK_MOD_TS,
EXTRACT_INITIALIZE,
EXTRACT_PASS_REQUEST_TO_CHILD,
EXTRACT_RETURN_ROWS_FROM_CHILD,
EXTRACT_DATA_READY_TO_SEND,
EXTRACT_SYNC_WITH_IO_THREAD,
EXTRACT_ERROR,
EXTRACT_DONE,
EXTRACT_CANCELED
};
enum IOSyncState {PENDING_NONE = 0,
PENDING_FILEOPEN,
PENDING_FREEBUFFER,
PENDING_NOTFULL_QUEUE,
PENDING_THREAD_EXIT};
static const Int32 BUFSZ = 1024 * 1024;
ExFastExtractTcb(const ExFastExtractTdb &tdb,
const ex_tcb & childTcb,
ex_globals *glob);
~ExFastExtractTcb();
virtual void freeResources();
// ---------------------------------------------------------------------
// Standard TCB methods
// ---------------------------------------------------------------------
virtual ExWorkProcRetcode work();
void registerSubtasks();
ex_queue_pair getParentQueue() const
{
return qParent_;
}
Int32 numChildren() const
{
return myTdb().numChildren();
}
virtual const ex_tcb *getChild(Int32 pos) const
{
ex_assert((pos >= 0) , "");
if (pos ==0 )
return childTcb_;
else
return NULL;
}
ex_tcb_private_state *allocatePstates(
Lng32 &numElems, // [IN/OUT] desired/actual elements
Lng32 &pstateLength); // [OUT] length of one element
void updateWorkATPDiagsArea(ex_queue_entry * centry);
void updateWorkATPDiagsArea(ComDiagsArea *da);
void updateWorkATPDiagsArea(ExeErrorCode rc, const char* msg = NULL);
void updateWorkATPDiagsArea(const char *file, int line, const char *msg);
virtual NABoolean needStatsEntry();
virtual ExOperStats *doAllocateStatsEntry(CollHeap *heap,
ComTdb *tdb);
ExFastExtractStats * getFastExtractStats()
{
if (getStatsEntry())
return getStatsEntry()->castToExFastExtractStats();
else
return NULL;
}
protected:
inline const ExFastExtractTdb &myTdb() const { return (const ExFastExtractTdb &) tdb; }
ExExeStmtGlobals *myExeStmtGlobals() const;
virtual void insertUpQueueEntry(ex_queue::up_status status,
ComDiagsArea *diags,
NABoolean popDownQueue);
static const char *getTcbStateString(FastExtractStates s);
//
// Protected data members
//
ex_queue_pair qParent_;
atp_struct *workAtp_;
sql_buffer_pool *outputPool_;
sql_buffer_pool *inputPool_;
IOBuffer *bufferPool_[10];
Int32 numBuffers_;
IOBuffer *currBuffer_;
const ex_tcb *childTcb_;
ex_queue_pair qChild_;
int *sourceFieldsConvIndex_;
SqlBuffer *inSqlBuffer_;
UInt32 maxExtractRowLength_;
tupp_descriptor *childOutputTD_;
NABoolean endOfData_;
CollHeap *heap_;
ExFastExtractStats *feStats_;
time_t tstart_;
UInt32 bufferAllocFailuresCount_;
// modification timestamp of root dir location.
Int64 modTS_;
}; // class ExFastExtractTcb
/////////////////////////////////////////////////////
class ExHdfsFastExtractTcb : public ExFastExtractTcb
{
typedef ex_tcb super;
void convertSQRowToString(ULng32 nullLen,
ULng32 recSepLen,
ULng32 delimLen,
tupp_descriptor* dataDesc,
char* targetData,
NABoolean & convError);
friend class ExFastExtractTdb;
public:
ExHdfsFastExtractTcb(const ExFastExtractTdb &tdb,
const ex_tcb & childTcb,
ex_globals *glob);
~ExHdfsFastExtractTcb();
// ---------------------------------------------------------------------
// Standard TCB methods
// ---------------------------------------------------------------------
virtual Int32 fixup();
virtual ExWorkProcRetcode work();
protected:
Lng32 lobInterfaceDataModCheck(Int64 &failedModTS,
char * failedLocBuf,
Int32 &failedLocBufLen);
virtual void insertUpQueueEntry(ex_queue::up_status status,
ComDiagsArea *diags,
NABoolean popDownQueue);
NABoolean isSequenceFile();
void createSequenceFileError(Int32 sfwRetCode);
void createHdfsClientFileError(Int32 hdfsClientRetCode);
NABoolean isHdfsCompressed();
NABoolean getEmptyNullString()
{
if (myTdb().getNullString() == NULL)
return TRUE;
return strlen(myTdb().getNullString()) == 0;
}
ExLobGlobals * lobGlob_;
char hdfsHost_[500];
int hdfsPort_;
char fileName_[1000];
char targetLocation_[1000];
NABoolean errorOccurred_;
SequenceFileWriter* sequenceFileWriter_;
HdfsClient *hdfsClient_;
}; // class ExHdfsFastExtractTcb
//----------------------------------------------------------------------
// class ExFastExtractPrivateState
//----------------------------------------------------------------------
class ExFastExtractPrivateState : public ex_tcb_private_state
{
friend class ExFastExtractTcb;
friend class ExHdfsFastExtractTcb;
void init()
{
step_ = ExFastExtractTcb::EXTRACT_NOT_STARTED;
matchCount_ = 0;
successRowCount_ = 0;
errorRowCount_ = 0;
processingStarted_ = FALSE;
}
public:
ExFastExtractPrivateState();
~ExFastExtractPrivateState();
protected:
ExFastExtractTcb::FastExtractStates step_;
NABoolean processingStarted_;
Int64 matchCount_;
Int64 successRowCount_;
Int64 errorRowCount_;
};
#endif // __EX_FAST_TRANSPORT_H