// blob: 08f35d336b42d220dd9a6dde1f1ad8b309bb343b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DRILL_CLIENT_IMPL_H
#define DRILL_CLIENT_IMPL_H
#include "drill/common.hpp"
// Define some BOOST defines
// WIN32_SHUTDOWN_ON_TIMEOUT is defined in "drill/common.hpp" for Windows 32 bit platform
#ifndef WIN32_SHUTDOWN_ON_TIMEOUT
#define BOOST_ASIO_ENABLE_CANCELIO
#endif //WIN32_SHUTDOWN_ON_TIMEOUT
#include <algorithm>
#include <cstdlib>
#include <map>
#include <queue>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#if defined _WIN32 || defined _WIN64
//Windows header files redefine 'random'
#ifdef random
#undef random
#endif
#endif
#include <boost/asio/deadline_timer.hpp>
#include <boost/function.hpp>
#include <boost/thread.hpp>
#include "drill/drillClient.hpp"
#include "drill/drillConfig.hpp"
#include "drill/drillError.hpp"
#include "drill/preparedStatement.hpp"
#include "channel.hpp"
#include "collectionsImpl.hpp"
#include "metadata.hpp"
#include "rpcMessage.hpp"
#include "utils.hpp"
#include "User.pb.h"
#include "UserBitShared.pb.h"
#include "saslAuthenticatorImpl.hpp"
namespace Drill {
// Forward declarations for names that this header references before their full definitions.
// RecordBatch is not defined in this file; it is only used through pointers here.
class DrillClientImpl;
class DrillClientQueryHandle;
class DrillClientPrepareHandle;
class RecordBatch;
/*
* Defines the interface used by DrillClient and implemented by DrillClientImpl and PooledDrillClientImpl
* */
class DrillClientImplBase {
    public:
        DrillClientImplBase() {}
        virtual ~DrillClientImplBase() {}

        // Connects either through Zookeeper or directly to a drillbit.
        // A successful initial connection adds the first drillbit to the pool.
        virtual connectionStatus_t connect(const char* connStr, DrillUserProperties* props) = 0;

        // Reports whether the client is active, i.e. whether any one of the
        // underlying connections is active.
        virtual bool Active() = 0;

        // Closes every open connection.
        virtual void Close() = 0;

        // Last error encountered by any of the underlying executing queries or connections.
        virtual DrillClientError* getError() = 0;

        // Submits a query to a drillbit.
        virtual DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t,
                                                    const std::string& plan,
                                                    pfnQueryResultsListener listener,
                                                    void* listenerCtx) = 0;

        virtual DrillClientPrepareHandle* PrepareQuery(const std::string& plan,
                                                       pfnPreparedStatementListener listener,
                                                       void* listenerCtx) = 0;

        virtual DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt,
                                                     pfnQueryResultsListener listener,
                                                     void* listenerCtx) = 0;

        // Waits as long as a connection has results pending.
        virtual void waitForResults() = 0;

        // Validates the handshake at connect time.
        virtual connectionStatus_t validateHandshake(DrillUserProperties* props) = 0;

        virtual void freeQueryResources(DrillClientQueryHandle* pQryHandle) = 0;

        virtual meta::DrillMetadata* getMetadata() = 0;

        virtual void freeMetadata(meta::DrillMetadata* metadata) = 0;
};
/**
* Base type for query handles
*/
class DrillClientQueryHandle {
    friend class DrillClientImpl;

    public:
        // Stores the owning client, the coordination id, a copy of the query text, the
        // caller-supplied context pointer and the RPC type expected in the response.
        // The handle starts with QRY_SUCCESS status, not cancelled and without an error.
        DrillClientQueryHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, void* context, int in_expectedRPCType)
            : m_client(client)
            , m_coordinationId(coordId)
            , m_query(query)
            , m_status(QRY_SUCCESS)
            , m_bCancel(false)
            , m_bHasError(false)
            , m_expectedRPCType(in_expectedRPCType)
            , m_pError(NULL)
            , m_pApplicationContext(context)
        {}

        // Note: a virtual call made from a destructor resolves to this class's own
        // clearAndDestroy(), not to an override in a more derived class.
        virtual ~DrillClientQueryHandle() {
            clearAndDestroy();
        }

        virtual void cancel();

        bool isCancelled() const { return m_bCancel; }

        int32_t getCoordinationId() const { return m_coordinationId; }

        const std::string& getQuery() const { return m_query; }

        bool hasError() const { return m_bHasError; }

        // Clears only the error flag; the stored error object is left untouched.
        void resetError() { m_bHasError = false; }

        // Status carried by the stored error, or QRY_SUCCESS when no error is stored.
        status_t getErrorStatus() const {
            if (m_pError == NULL) {
                return QRY_SUCCESS;
            }
            return (status_t)m_pError->status;
        }

        const DrillClientError* getError() const { return m_pError; }

        void setQueryStatus(status_t s) { m_status = s; }

        status_t getQueryStatus() const { return m_status; }

        DrillClientImpl& client() const { return m_client; }

        int getExpectedRPCType() const { return m_expectedRPCType; }

        void* getApplicationContext() const { return m_pApplicationContext; }

    protected:
        virtual void signalError(DrillClientError* pErr);
        virtual void clearAndDestroy();

    private:
        DrillClientImpl& m_client;
        int32_t m_coordinationId;
        std::string m_query;
        status_t m_status;
        bool m_bCancel;
        bool m_bHasError;
        int m_expectedRPCType;
        const DrillClientError* m_pError;
        void* m_pApplicationContext;
};
/*
 * Query handle that additionally carries an application listener.
 * Listener is the callback type stored by the handle; ListenerValue is the
 * value type handed to notifyListener().
 */
template<typename Listener, typename ListenerValue>
class DrillClientBaseHandle: public DrillClientQueryHandle {
    friend class DrillClientImpl;

    public:
        DrillClientBaseHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* context, int in_expectedRPCType)
            : DrillClientQueryHandle(client, coordId, query, context, in_expectedRPCType)
            , m_pApplicationListener(listener)
        {}

        virtual ~DrillClientBaseHandle() {
            this->clearAndDestroy();
        }

        Listener getApplicationListener() const { return m_pApplicationListener; }

    protected:
        virtual status_t notifyListener(ListenerValue v, DrillClientError* pErr);
        virtual void signalError(DrillClientError* pErr);

    private:
        Listener m_pApplicationListener;
};
/*
 * Handle for a submitted query: holds the received buffers and record batches,
 * the current column definitions and the last query state reported by the server.
 */
class DrillClientQueryResult: public DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>{
    friend class DrillClientImpl;

    public:
        // A new result is marked as pending, with an empty column definition vector,
        // no query id, no schema listener and the STARTING query state.
        DrillClientQueryResult(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnQueryResultsListener listener, void* listenerCtx)
            : DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>(client, coordId, query, listener, listenerCtx, exec::user::QUERY_HANDLE)
            , m_numBatches(0)
            , m_columnDefs(new std::vector<Drill::FieldMetadata*>)
            , m_bIsQueryPending(true)
            , m_bIsLastChunk(false)
            , m_bHasSchemaChanged(false)
            , m_bHasData(false)
            , m_queryState(exec::shared::QueryResult_QueryState_STARTING)
            , m_pQueryId(NULL)
            , m_pSchemaListener(NULL)
        {}

        virtual ~DrillClientQueryResult() {
            clearAndDestroy();
        }

        // Registers the callback to use for schema changes (asynchronous data retrieval).
        void registerSchemaChangeListener(pfnSchemaListener l) {
            m_pSchemaListener = l;
        }

        void cancel();

        // Synchronous fetch of the next record batch. The caller takes ownership of the
        // returned batch, which is assumed to have been consumed.
        RecordBatch* getNext();

        // Synchronous look at the next record batch without advancing the current
        // position: repeated calls return the same batch until getNext() is called.
        RecordBatch* peekNext();

        // Blocks until data is available.
        void waitForData();

        // Placeholder returned as an empty column definition vector when calls are made out of order.
        static FieldDefPtr s_emptyColDefs;

        // Returns the current column definitions; the read is done under the schema mutex.
        FieldDefPtr getColumnDefs() {
            boost::lock_guard<boost::mutex> schemaGuard(m_schemaMutex);
            return m_columnDefs;
        }

        bool hasSchemaChanged() const { return m_bHasSchemaChanged; }

        void setQueryId(exec::shared::QueryId* q) { m_pQueryId = q; }

        // Dereferences the stored query id pointer, so a query id must have been set first.
        exec::shared::QueryId& getQueryId() const { return *m_pQueryId; }

        void setQueryState(exec::shared::QueryResult_QueryState s) { m_queryState = s; }

        exec::shared::QueryResult_QueryState getQueryState() const { return m_queryState; }

        // Updates the pending flag while holding the condition-variable mutex.
        void setIsQueryPending(bool isPending) {
            boost::lock_guard<boost::mutex> pendingGuard(m_cvMutex);
            m_bIsQueryPending = isPending;
        }

    protected:
        virtual status_t notifyListener(RecordBatch* batch, DrillClientError* pErr);
        virtual void signalError(DrillClientError* pErr);
        virtual void clearAndDestroy();

    private:
        status_t setupColumnDefs(exec::shared::QueryData* pQueryData);
        status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);

        // Per the note kept with this declaration: sets the appropriate state and signals
        // listeners and condition variables; used when a query is cancelled or when a
        // query-completed response is received. An error object handed to this handle is
        // owned by the handle from then on.
        void signalComplete();

        // Number of record batches received so far.
        size_t m_numBatches;

        // Buffers holding data returned by the server; each one is decoded into a RecordBatch.
        std::vector<ByteBuf_t> m_dataBuffers;
        std::queue<RecordBatch*> m_recordBatches;
        FieldDefPtr m_columnDefs;

        // Protects the schema definitions.
        boost::mutex m_schemaMutex;

        // Mutex paired with m_cv for reads and writes of the batch container.
        boost::mutex m_cvMutex;

        // Signals the arrival of more data, i.e. that the record batch queue is not empty.
        boost::condition_variable m_cv;

        // State flags. While m_bIsQueryPending is true we keep waiting for results.
        bool m_bIsQueryPending;
        bool m_bIsLastChunk;
        bool m_bHasSchemaChanged;
        bool m_bHasData;

        // State carried by the last query result received from the server.
        exec::shared::QueryResult_QueryState m_queryState;
        exec::shared::QueryId* m_pQueryId;

        // Listener for schema changes.
        pfnSchemaListener m_pSchemaListener;
};
/*
 * Handle for a prepared statement request. It is both a query handle (so the client can
 * track the request by coordination id) and the PreparedStatement object handed to the
 * application.
 */
class DrillClientPrepareHandle: public DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>, public PreparedStatement {
    public:
        // Creates a handle expecting a PREPARED_STATEMENT response, with an empty
        // column definition vector.
        DrillClientPrepareHandle(DrillClientImpl& client, int32_t coordId, const std::string& query, pfnPreparedStatementListener listener, void* listenerCtx):
            DrillClientBaseHandle<pfnPreparedStatementListener, PreparedStatement*>(client, coordId, query, listener, listenerCtx, exec::user::PREPARED_STATEMENT),
            PreparedStatement(),
            m_columnDefs(new std::vector<Drill::FieldMetadata*>) {
        }

        // The base-class destructors also call clearAndDestroy(), but by the time they run the
        // call resolves to the base-class version, so this class's override would never be
        // reached on destruction. Call it here, mirroring ~DrillClientQueryResult().
        // NOTE(review): assumes the override (defined outside this header) tolerates being
        // called on an already-cleared handle - confirm against its definition.
        virtual ~DrillClientPrepareHandle(){
            this->clearAndDestroy();
        }

        // PreparedStatement overrides

        // Number of column definitions currently stored.
        virtual std::size_t getNumFields() const { return m_columnDefs->size(); }

        // Metadata of the column at the given index; uses the bounds-checked at().
        virtual const Drill::FieldMetadata& getFieldMetadata(std::size_t index) const { return *m_columnDefs->at(index);}

    protected:
        virtual void clearAndDestroy();

    private:
        friend class DrillClientImpl;
        status_t setupPreparedStatement(const exec::user::PreparedStatement& pstmt);

        FieldDefPtr m_columnDefs;
        ::exec::user::PreparedStatementHandle m_preparedStatementHandle;
};
// Callback signature used to deliver server metadata (or an error) to a listener.
typedef status_t (*pfnServerMetaListener)(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err);

// Handle for a server metadata request; its query text is the fixed label "server meta"
// and the expected response type is SERVER_META.
class DrillClientServerMetaHandle: public DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*> {
    friend class DrillClientImpl;

    public:
        DrillClientServerMetaHandle(DrillClientImpl& client, int32_t coordId, pfnServerMetaListener listener, void* listenerCtx)
            : DrillClientBaseHandle<pfnServerMetaListener, const exec::user::ServerMeta*>(
                  client, coordId, "server meta", listener, listenerCtx, exec::user::SERVER_META)
        {}
};
/*
 * Common base for the metadata request handles (catalogs, schemas, tables, columns).
 * The listener receives a DrillCollection of MetaType; MetaImpl is the element type stored
 * in the handle and MetadataResult is the response object the handle keeps alive.
 */
template<typename Listener, typename MetaType, typename MetaImpl, typename MetadataResult>
class DrillClientMetadataResult: public DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*> {
    friend class DrillClientImpl;

    public:
        DrillClientMetadataResult(DrillClientImpl& client, int32_t coordId, const std::string& query, Listener listener, void* listenerCtx, int in_expectedRPCType)
            : DrillClientBaseHandle<Listener, const DrillCollection<MetaType>*>(client, coordId, query, listener, listenerCtx, in_expectedRPCType)
        {}

        // Hands the response object over to the handle's shared pointer.
        void attachMetadataResult(MetadataResult* result) {
            m_pMetadata.reset(result);
        }

    private:
        // Metadata entries returned to the user, tied to this handle.
        DrillVector<MetaType, MetaImpl> m_meta;

        // Keeps a reference on the underlying metadata object so that it is
        // cleaned up when this handle is destroyed.
        boost::shared_ptr<MetadataResult> m_pMetadata;
};
// Handle for a catalog metadata request (query label "getCatalog", expects a CATALOGS response).
class DrillClientCatalogResult
    : public DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener,
                                       meta::CatalogMetadata,
                                       meta::DrillCatalogMetadata,
                                       exec::user::GetCatalogsResp> {
    friend class DrillClientImpl;

    public:
        DrillClientCatalogResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx)
            : DrillClientMetadataResult<Metadata::pfnCatalogMetadataListener,
                                        meta::CatalogMetadata,
                                        meta::DrillCatalogMetadata,
                                        exec::user::GetCatalogsResp>(
                  client, coordId, "getCatalog", listener, listenerCtx, exec::user::CATALOGS)
        {}
};
// Handle for a schema metadata request (query label "getSchemas", expects a SCHEMAS response).
class DrillClientSchemaResult
    : public DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener,
                                       meta::SchemaMetadata,
                                       meta::DrillSchemaMetadata,
                                       exec::user::GetSchemasResp> {
    friend class DrillClientImpl;

    public:
        DrillClientSchemaResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx)
            : DrillClientMetadataResult<Metadata::pfnSchemaMetadataListener,
                                        meta::SchemaMetadata,
                                        meta::DrillSchemaMetadata,
                                        exec::user::GetSchemasResp>(
                  client, coordId, "getSchemas", listener, listenerCtx, exec::user::SCHEMAS)
        {}
};
// Handle for a table metadata request (query label "getTables", expects a TABLES response).
class DrillClientTableResult
    : public DrillClientMetadataResult<Metadata::pfnTableMetadataListener,
                                       meta::TableMetadata,
                                       meta::DrillTableMetadata,
                                       exec::user::GetTablesResp> {
    friend class DrillClientImpl;

    public:
        DrillClientTableResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnTableMetadataListener listener, void* listenerCtx)
            : DrillClientMetadataResult<Metadata::pfnTableMetadataListener,
                                        meta::TableMetadata,
                                        meta::DrillTableMetadata,
                                        exec::user::GetTablesResp>(
                  client, coordId, "getTables", listener, listenerCtx, exec::user::TABLES)
        {}
};
// Handle for a column metadata request (query label "getColumns", expects a COLUMNS response).
class DrillClientColumnResult
    : public DrillClientMetadataResult<Metadata::pfnColumnMetadataListener,
                                       meta::ColumnMetadata,
                                       meta::DrillColumnMetadata,
                                       exec::user::GetColumnsResp> {
    friend class DrillClientImpl;

    public:
        DrillClientColumnResult(DrillClientImpl& client, int32_t coordId, Metadata::pfnColumnMetadataListener listener, void* listenerCtx)
            : DrillClientMetadataResult<Metadata::pfnColumnMetadataListener,
                                        meta::ColumnMetadata,
                                        meta::DrillColumnMetadata,
                                        exec::user::GetColumnsResp>(
                  client, coordId, "getColumns", listener, listenerCtx, exec::user::COLUMNS)
        {}
};
// Pointer-to-member type for the DrillClientImpl length decode handlers: a handler takes the
// input buffer and an output length (uint32_t&) and returns a size_t.
// It is the type of the last parameter of DrillClientImpl::readLenBytesFromSocket.
typedef size_t (DrillClientImpl::*lengthDecoder)(const ByteBuf_t, uint32_t&);
/*
 * Implementation of DrillClientImplBase on top of a single Channel.
 * As the members below show, an instance holds one channel, a boost::asio io_service with
 * a listener thread pointer and work object, a deadline timer and a heartbeat timer, the
 * SASL/encryption state, and maps from coordination id / query id to in-flight handles.
 * Most member functions are only declared here; their definitions are outside this header.
 */
class DrillClientImpl : public DrillClientImplBase{
public:
// Starts in a disconnected state: no channel, no listener thread, no pending requests,
// no error. Both timers are bound to m_io_service, which is declared before them.
DrillClientImpl():
m_handshakeVersion(0),
m_handshakeStatus(exec::user::SUCCESS),
m_bIsConnected(false),
m_saslAuthenticator(NULL),
m_saslResultCode(SASL_OK),
m_saslDone(false),
m_pendingRequests(0),
m_pError(NULL),
m_pListenerThread(NULL),
m_pWork(NULL),
m_pChannel(NULL),
m_pChannelContext(NULL),
m_deadlineTimer(m_io_service),
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
m_wbuf(MAX_SOCK_RD_BUFSIZE),
m_bIsDirectConnection(false)
{
// Initial coordination id is a rand()-derived value in the range [1, 1729].
m_coordinationId=rand()%1729+1;
// Plain (unencrypted) read and send handlers are the defaults.
m_fpCurrentReadMsgHandler = &DrillClientImpl::readMsg;
m_fpCurrentSendHandler = &DrillClientImpl::sendSyncPlain;
};
// Teardown sequence: cancel both timers, stop the io_service, release the work object and
// the SASL authenticator, close and free the channel and its context under m_channelMutex,
// free the read buffer and the error object, and finally interrupt and join the listener
// thread.
// NOTE(review): the listener thread is joined only after the channel and buffers have been
// released - presumably closing the channel is what lets the thread finish; confirm before
// reordering any of these steps.
~DrillClientImpl(){
//Cancel any pending requests
m_heartbeatTimer.cancel();
m_deadlineTimer.cancel();
m_io_service.stop();
//Free any record batches or buffers remaining
//Clear and destroy DrillClientQueryResults vector?
if(this->m_pWork!=NULL){
delete this->m_pWork;
this->m_pWork = NULL;
}
if(this->m_saslAuthenticator!=NULL){
delete this->m_saslAuthenticator;
this->m_saslAuthenticator = NULL;
}
{
// Scope limits how long m_channelMutex is held.
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if (this->m_pChannel != NULL) {
m_pChannel->close();
delete this->m_pChannel;
this->m_pChannel = NULL;
}
if (this->m_pChannelContext != NULL) {
delete this->m_pChannelContext;
this->m_pChannelContext = NULL;
}
}
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
// The heartbeat thread teardown below is disabled; the m_pHeartbeatThread member is
// commented out as well and heartbeats use m_heartbeatTimer instead.
//Terminate and free the heartbeat thread
//if(this->m_pHeartbeatThread!=NULL){
// this->m_pHeartbeatThread->interrupt();
// this->m_pHeartbeatThread->join();
// delete this->m_pHeartbeatThread;
// this->m_pHeartbeatThread = NULL;
//}
//Terminate and free the listener thread
if(this->m_pListenerThread!=NULL){
this->m_pListenerThread->interrupt();
this->m_pListenerThread->join();
delete this->m_pListenerThread;
this->m_pListenerThread = NULL;
}
};
//Connect via Zookeeper or directly
connectionStatus_t connect(const char* connStr, DrillUserProperties* props);
connectionStatus_t connect(const char* host, const char* port, DrillUserProperties* props);
// test whether the client is active
bool Active();
void Close() ;
// Returns the connection-level error object, or NULL if none is set.
DrillClientError* getError(){ return m_pError;}
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx);
DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx);
void waitForResults();
connectionStatus_t validateHandshake(DrillUserProperties* props);
// Destroys the handle; the handle's virtual destructor performs its own cleanup.
void freeQueryResources(DrillClientQueryHandle* pQryHandle){
delete pQryHandle;
};
const exec::user::RpcEndpointInfos& getServerInfos() const { return m_serverInfos; }
meta::DrillMetadata* getMetadata();
void freeMetadata(meta::DrillMetadata* metadata);
static bool clientNeedsAuthentication(const DrillUserProperties* userProperties);
bool handleComplexTypes(const DrillUserProperties* userProperties);
private:
friend class meta::DrillMetadata;
friend class DrillClientQueryHandle;
friend class DrillClientQueryResult;
friend class PooledDrillClientImpl;
// Strict weak ordering on query ids used as the comparator of m_queryResults:
// compares part1 first, then part2 when the part1 values are equal.
struct compareQueryId{
bool operator()(const exec::shared::QueryId* q1, const exec::shared::QueryId* q2) const {
return q1->part1()<q2->part1() || (q1->part1()==q2->part1() && q1->part2() < q2->part2());
}
};
// Direct connection to a drillbit
// host can be name or ip address, port can be port number or name of service in /etc/services
connectionStatus_t connect(const char* host, const char* port);
void startHeartbeatTimer();// start or restart the heartbeat timer
connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
// Pre-increments and returns the coordination id; no locking is done here.
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
// synchronous message send handlers
connectionStatus_t sendSyncCommon(rpc::OutBoundRpcMessage& msg);
connectionStatus_t sendSyncPlain();
connectionStatus_t sendSyncEncrypted();
// handshake
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
void handleHShakeReadTimeout(const boost::system::error_code & err);
// starts the listener thread that receives responses/messages from the server
void startMessageListener();
// Query results
void getNextResult();
// Read Message Handlers
status_t readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
status_t readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
status_t readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler);
void doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead, boost::system::error_code& errorCode);
void doWriteToSocket(const char* dataPtr, size_t bytesToWrite, boost::system::error_code& errorCode);
// Length decode handlers
size_t lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
size_t rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen);
// Per-response-type processing of inbound messages
status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
status_t processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
status_t processServerMetaResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg );
DrillClientQueryResult* findQueryResult(const exec::shared::QueryId& qid);
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
void handleRead(ByteBuf_t inBuf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valError);
bool validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg);
// Error handling helpers for connection-level and query-level failures
connectionStatus_t handleConnError(connectionStatus_t status, const std::string& msg);
connectionStatus_t handleConnError(DrillClientError* err);
status_t handleQryCancellation(status_t status, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle);
status_t handleQryError(status_t status, const exec::shared::DrillPBError& e, DrillClientQueryHandle* pQueryHandle);
// handle a terminal query state, i.e. the query is COMPLETED or CANCELED
status_t handleTerminatedQryState(status_t status,
const std::string& msg,
DrillClientQueryResult* pQueryResult);
void broadcastError(DrillClientError* pErr);
void removeQueryHandle(DrillClientQueryHandle* pQueryHandle);
void sendAck(const rpc::InBoundRpcMessage& msg, bool isOk);
void sendCancel(const exec::shared::QueryId* pQueryId);
// Generic request sender: handleFactory receives an int32_t (presumably the coordination
// id - confirm against the definition) and returns the handle for the request.
template<typename Handle>
Handle* sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& msg);
// metadata requests
DrillClientCatalogResult* getCatalogs(const std::string& catalogPattern, const std::string& searchEscapeString, Metadata::pfnCatalogMetadataListener listener, void* listenerCtx);
DrillClientSchemaResult* getSchemas(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& searchEscapeString, Metadata::pfnSchemaMetadataListener listener, void* listenerCtx);
DrillClientTableResult* getTables(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::vector<std::string>* tableTypes, const std::string& searchEscapeString, Metadata::pfnTableMetadataListener listener, void* listenerCtx);
DrillClientColumnResult* getColumns(const std::string& catalogPattern, const std::string& schemaPattern, const std::string& tablePattern, const std::string& columnPattern, const std::string& searchEscapeString, Metadata::pfnColumnMetadataListener listener, void* listenerCtx);
// SASL exchange
connectionStatus_t handleAuthentication(const DrillUserProperties *userProperties);
void initiateAuthentication();
void sendSaslResponse(const exec::shared::SaslMessage& response);
void processSaslChallenge(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg);
void finishAuthentication();
void shutdownSocket();
bool clientNeedsEncryption(const DrillUserProperties* userProperties);
// Data members. Their declaration order fixes construction and destruction order
// (for example, m_io_service is declared before the timers constructed from it).
int32_t m_coordinationId;
int32_t m_handshakeVersion;
exec::user::HandshakeStatus m_handshakeStatus;
std::string m_handshakeErrorId;
std::string m_handshakeErrorMsg;
exec::user::RpcEndpointInfos m_serverInfos;
std::vector<exec::user::RpcType> m_supportedMethods;
bool m_bIsConnected;
std::vector<std::string> m_serverAuthMechanisms;
SaslAuthenticatorImpl* m_saslAuthenticator;
int m_saslResultCode;
bool m_saslDone;
boost::mutex m_saslMutex; // mutex to protect m_saslDone
// mutex to protect deallocation of sasl connection
boost::mutex m_sasl_dispose_mutex;
boost::condition_variable m_saslCv; // to signal completion of SASL exchange
// Used for encryption and is set when server notifies in first handshake response.
EncryptionContext m_encryptionCtxt;
// Function pointers for the read and send handlers. By default they refer to the handlers for plain message read/send. When encryption is enabled,
// after a successful handshake these pointers refer to the handlers for encrypted message read/send over the wire.
status_t (DrillClientImpl::*m_fpCurrentReadMsgHandler)(ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer, rpc::InBoundRpcMessage& msg);
connectionStatus_t (DrillClientImpl::*m_fpCurrentSendHandler)();
std::string m_connectStr;
//
// number of outstanding read requests.
// handleRead will keep asking for more results as long as this number is not zero.
size_t m_pendingRequests;
//mutex to protect m_pendingRequests
boost::mutex m_prMutex;
// Error Object. NULL if no error. Set if the error is valid for ALL running queries.
// All the query result objects will
// also have the error object set.
// If the error is query specific, only the query results object will have the error set.
DrillClientError* m_pError;
//Started after the connection is established and sends heartbeat messages after {heartbeat frequency} seconds
//The thread is killed on disconnect.
//boost::thread * m_pHeartbeatThread;
// for boost asio
boost::thread * m_pListenerThread;
boost::asio::io_service m_io_service;
// the work object prevents the io_service from running out of work
boost::asio::io_service::work * m_pWork;
// Mutex to protect channel
boost::mutex m_channelMutex;
Channel* m_pChannel;
ChannelContext_t* m_pChannelContext;
boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
std::string m_connectedHost; // The hostname and port the socket is connected to.
//for synchronous messages, like validate handshake
ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
DataBuf m_wbuf; // buffer for sending synchronous message
// Mutex to protect drill client operations
boost::mutex m_dcMutex;
// Map of coordination id to Query handles.
std::map<int, DrillClientQueryHandle*> m_queryHandles;
// Map of query id to query result for currently executing queries
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
// Condition variable to signal completion of all queries.
boost::condition_variable m_cv;
bool m_bIsDirectConnection;
};
inline bool DrillClientImpl::Active() {
return this->m_bIsConnected;
}
/* *
* Provides the same public interface as a DrillClientImpl but holds a pool of DrillClientImpls.
* Every submitQuery uses a different DrillClientImpl to distribute the load.
* DrillClient can use this class instead of DrillClientImpl to get better load balancing.
* */
class PooledDrillClientImpl : public DrillClientImplBase{
public:
PooledDrillClientImpl():
m_lastConnection(-1),
m_queriesExecuted(0),
m_maxConcurrentConnections(DEFAULT_MAX_CONCURRENT_CONNECTIONS),
m_bIsDirectConnection(false),
m_pError(NULL),
m_pUserProperties() {
char* maxConn=std::getenv(MAX_CONCURRENT_CONNECTIONS_ENV);
if(maxConn!=NULL){
m_maxConcurrentConnections=atoi(maxConn);
}
}
~PooledDrillClientImpl(){
for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
delete *it;
}
m_clientConnections.clear();
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
}
//Connect via Zookeeper or directly.
//Makes an initial connection to a drillbit. successful connect adds the first drillbit to the pool.
connectionStatus_t connect(const char* connStr, DrillUserProperties* props);
// Test whether the client is active. Returns true if any one of the underlying connections is active
bool Active();
// Closes all open connections.
void Close() ;
// Returns the last error encountered by any of the underlying executing queries or connections
DrillClientError* getError();
// Submits a query to a drillbit. If more than one query is to be sent, we may choose a
// a different drillbit in the pool. No more than m_maxConcurrentConnections will be allowed.
// Connections once added to the pool will be removed only when the DrillClient is closed.
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
DrillClientPrepareHandle* PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx);
DrillClientQueryResult* ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx);
//Waits as long as any one drillbit connection has results pending
void waitForResults();
//Validates handshake only against the first drillbit connected to.
connectionStatus_t validateHandshake(DrillUserProperties* props);
void freeQueryResources(DrillClientQueryHandle* pQueryHandle);
int getDrillbitCount() const { return m_drillbits.size();};
meta::DrillMetadata* getMetadata();
void freeMetadata(meta::DrillMetadata* metadata);
private:
std::string m_connectStr;
std::string m_lastQuery;
// A list of all the current client connections. We choose a new one for every query.
// When picking a drillClientImpl to use, we see how many queries each drillClientImpl
// is currently executing. If none,
std::vector<DrillClientImpl*> m_clientConnections;
boost::mutex m_poolMutex; // protect access to the vector
// Use this to decide which drillbit to select next from the list of drillbits.
size_t m_lastConnection;
boost::mutex m_cMutex;
// Number of queries executed so far. Can be used to select a new Drillbit from the pool.
size_t m_queriesExecuted;
size_t m_maxConcurrentConnections;
bool m_bIsDirectConnection;
DrillClientError* m_pError;
connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
// get a connection from the pool or create a new one. Return NULL if none is found
DrillClientImpl* getOneConnection();
std::vector<std::string> m_drillbits;
boost::shared_ptr<DrillUserProperties> m_pUserProperties;//Keep a copy of user properties
};
} // namespace Drill
#endif