blob: ada63e1137b01c503d2383e255afafc95bdf6bac [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef DRILL_CLIENT_IMPL_H
#define DRILL_CLIENT_IMPL_H
#include "drill/common.hpp"
// Define some BOOST defines
// WIN32_SHUTDOWN_ON_TIMEOUT is defined in "drill/common.hpp" for Windows 32 bit platform
#ifndef WIN32_SHUTDOWN_ON_TIMEOUT
#define BOOST_ASIO_ENABLE_CANCELIO
#endif //WIN32_SHUTDOWN_ON_TIMEOUT
#include <stdlib.h>
#include <time.h>
#include <queue>
#include <vector>
#include <boost/asio.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/thread.hpp>
#ifdef _WIN32
#include <zookeeper.h>
#else
#include <zookeeper/zookeeper.h>
#endif
#include "drill/drillClient.hpp"
#include "rpcEncoder.hpp"
#include "rpcDecoder.hpp"
#include "utils.hpp"
#include "User.pb.h"
#include "UserBitShared.pb.h"
namespace Drill {
class DrillClientImpl;
class InBoundRpcMessage;
class OutBoundRpcMessage;
class RecordBatch;
class RpcEncoder;
class RpcDecoder;
class DrillClientQueryResult{
friend class DrillClientImpl;
public:
DrillClientQueryResult(DrillClientImpl * pClient, uint64_t coordId):
m_pClient(pClient),
m_coordinationId(coordId),
m_numBatches(0),
m_columnDefs(new std::vector<Drill::FieldMetadata*>),
m_bIsQueryPending(true),
m_bIsLastChunk(false),
m_bCancel(false),
m_bHasSchemaChanged(false),
m_bHasData(false),
m_bHasError(false),
m_queryState(exec::shared::QueryResult_QueryState_PENDING),
m_pError(NULL),
m_pQueryId(NULL),
m_pSchemaListener(NULL),
m_pResultsListener(NULL),
m_pListenerCtx(NULL) {
};
~DrillClientQueryResult(){
this->clearAndDestroy();
};
// get data asynchronously
void registerListener(pfnQueryResultsListener listener, void* listenerCtx){
this->m_pResultsListener=listener;
this->m_pListenerCtx = listenerCtx;
}
void registerSchemaChangeListener(pfnSchemaListener l){
m_pSchemaListener=l;
}
// Synchronous call to get data. Caller assumes ownership of the recod batch
// returned and it is assumed to have been consumed.
RecordBatch* getNext();
// Synchronous call to get a look at the next Record Batch. This
// call does not move the current pointer forward. Repeatied calls
// to peekNext return the same value until getNext is called.
RecordBatch* peekNext();
// Blocks until data is available.
void waitForData();
// placeholder to return an empty col def vector when calls are made out of order.
static FieldDefPtr s_emptyColDefs;
FieldDefPtr getColumnDefs(){
boost::lock_guard<boost::mutex> bufferLock(this->m_schemaMutex);
return this->m_columnDefs;
}
void cancel();
bool isCancelled(){return this->m_bCancel;};
bool hasSchemaChanged(){return this->m_bHasSchemaChanged;};
int32_t getCoordinationId(){ return this->m_coordinationId;}
void setQueryId(exec::shared::QueryId* q){this->m_pQueryId=q;}
void* getListenerContext() {return this->m_pListenerCtx;}
exec::shared::QueryId& getQueryId(){ return *(this->m_pQueryId); }
bool hasError(){ return m_bHasError;}
status_t getErrorStatus(){ return m_pError!=NULL?(status_t)m_pError->status:QRY_SUCCESS;}
const DrillClientError* getError(){ return m_pError;}
void setQueryStatus(status_t s){ m_status = s;}
status_t getQueryStatus(){ return m_status;}
void setQueryState(exec::shared::QueryResult_QueryState s){ m_queryState = s;}
exec::shared::QueryResult_QueryState getQueryState(){ return m_queryState;}
void setIsQueryPending(bool isPending){
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
m_bIsQueryPending=isPending;
}
private:
status_t setupColumnDefs(exec::shared::QueryData* pQueryData);
status_t defaultQueryResultsListener(void* ctx, RecordBatch* b, DrillClientError* err);
// Construct a DrillClientError object, set the appropriate state and signal any listeners, condition variables.
// Also used when a query is cancelled or when a query completed response is received.
// Error object is now owned by the DrillClientQueryResult object.
void signalError(DrillClientError* pErr);
void signalComplete();
void clearAndDestroy();
DrillClientImpl* m_pClient;
int32_t m_coordinationId;
size_t m_numBatches; // number of record batches received so far
// Vector of Buffers holding data returned by the server
// Each data buffer is decoded into a RecordBatch
std::vector<ByteBuf_t> m_dataBuffers;
std::queue<RecordBatch*> m_recordBatches;
FieldDefPtr m_columnDefs;
// Mutex to protect schema definitions
boost::mutex m_schemaMutex;
// Mutex for Cond variable for read write to batch vector
boost::mutex m_cvMutex;
// Condition variable to signal arrival of more data. Condition variable is signaled
// if the recordBatches queue is not empty
boost::condition_variable m_cv;
// state
// if m_bIsQueryPending is true, we continue to wait for results
bool m_bIsQueryPending;
bool m_bIsLastChunk;
bool m_bCancel;
bool m_bHasSchemaChanged;
bool m_bHasData;
bool m_bHasError;
// state in the last query result received from the server.
exec::shared::QueryResult_QueryState m_queryState;
const DrillClientError* m_pError;
exec::shared::QueryId* m_pQueryId;
status_t m_status;
// Schema change listener
pfnSchemaListener m_pSchemaListener;
// Results callback
pfnQueryResultsListener m_pResultsListener;
// Listener context
void * m_pListenerCtx;
};
class DrillClientImpl{
public:
DrillClientImpl():
m_coordinationId(1),
m_handshakeVersion(0),
m_handshakeStatus(exec::user::SUCCESS),
m_bIsConnected(false),
m_pendingRequests(0),
m_pError(NULL),
m_pListenerThread(NULL),
m_pWork(NULL),
m_socket(m_io_service),
m_deadlineTimer(m_io_service),
m_heartbeatTimer(m_io_service),
m_rbuf(NULL),
m_wbuf(MAX_SOCK_RD_BUFSIZE)
{
srand(time(NULL));
m_coordinationId=rand()%1729+1;
};
~DrillClientImpl(){
//TODO: Cleanup.
//Free any record batches or buffers remaining
//Cancel any pending requests
//Clear and destroy DrillClientQueryResults vector?
if(this->m_pWork!=NULL){
delete this->m_pWork;
this->m_pWork = NULL;
}
m_heartbeatTimer.cancel();
m_deadlineTimer.cancel();
m_io_service.stop();
boost::system::error_code ignorederr;
m_socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ignorederr);
m_socket.close();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
//Terminate and free the heartbeat thread
//if(this->m_pHeartbeatThread!=NULL){
// this->m_pHeartbeatThread->interrupt();
// this->m_pHeartbeatThread->join();
// delete this->m_pHeartbeatThread;
// this->m_pHeartbeatThread = NULL;
//}
//Terminate and free the listener thread
if(this->m_pListenerThread!=NULL){
this->m_pListenerThread->interrupt();
this->m_pListenerThread->join();
delete this->m_pListenerThread;
this->m_pListenerThread = NULL;
}
};
//Connect via Zookeeper or directly
connectionStatus_t connect(const char* connStr);
// test whether the client is active
bool Active();
void Close() ;
DrillClientError* getError(){ return m_pError;}
DrillClientQueryResult* SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx);
void waitForResults();
connectionStatus_t validateHandshake(DrillUserProperties* props);
private:
friend class DrillClientQueryResult;
struct compareQueryId{
bool operator()(const exec::shared::QueryId* q1, const exec::shared::QueryId* q2) const {
return q1->part1()<q2->part1() || (q1->part1()==q2->part1() && q1->part2() < q2->part2());
}
};
// Direct connection to a drillbit
// host can be name or ip address, port can be port number or name of service in /etc/services
connectionStatus_t connect(const char* host, const char* port);
void startHeartbeatTimer();// start a heartbeat timer
connectionStatus_t sendHeartbeat(); // send a heartbeat to the server
void resetHeartbeatTimer(); // reset the heartbeat timer (called every time one sends a message to the server (after sendAck, or submitQuery)
void handleHeartbeatTimeout(const boost::system::error_code & err); // send a heartbeat. If send fails, broadcast error, close connection and bail out.
int32_t getNextCoordinationId(){ return ++m_coordinationId; };
void parseConnectStr(const char* connectStr, std::string& pathToDrill, std::string& protocol, std::string& hostPortStr);
// send synchronous messages
//connectionStatus_t recvSync(InBoundRpcMessage& msg);
connectionStatus_t sendSync(OutBoundRpcMessage& msg);
// handshake
connectionStatus_t recvHandshake();
void handleHandshake(ByteBuf_t b, const boost::system::error_code& err, std::size_t bytes_transferred );
void handleHShakeReadTimeout(const boost::system::error_code & err);
// starts the listener thread that receives responses/messages from the server
void startMessageListener();
// Query results
void getNextResult();
status_t readMsg(
ByteBuf_t _buf,
AllocatedBufferPtr* allocatedBuffer,
InBoundRpcMessage& msg,
boost::system::error_code& error);
status_t processQueryResult(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
status_t processQueryData(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg);
status_t processCancelledQueryResult( exec::shared::QueryId& qid, exec::shared::QueryResult* qr);
status_t processQueryId(AllocatedBufferPtr allocatedBuffer, InBoundRpcMessage& msg );
DrillClientQueryResult* findQueryResult(exec::shared::QueryId& qid);
status_t processQueryStatusResult( exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult);
void handleReadTimeout(const boost::system::error_code & err);
void handleRead(ByteBuf_t _buf, const boost::system::error_code & err, size_t bytes_transferred) ;
status_t validateDataMessage(InBoundRpcMessage& msg, exec::shared::QueryData& qd, std::string& valError);
status_t validateResultMessage(InBoundRpcMessage& msg, exec::shared::QueryResult& qr, std::string& valError);
connectionStatus_t handleConnError(connectionStatus_t status, std::string msg);
status_t handleQryError(status_t status, std::string msg, DrillClientQueryResult* pQueryResult);
status_t handleQryError(status_t status,
const exec::shared::DrillPBError& e,
DrillClientQueryResult* pQueryResult);
// handle query state indicating query is COMPELTED or CANCELED
// (i.e., COMPELTED or CANCELED)
status_t handleTerminatedQryState(status_t status,
std::string msg,
DrillClientQueryResult* pQueryResult);
void broadcastError(DrillClientError* pErr);
void clearMapEntries(DrillClientQueryResult* pQueryResult);
void sendAck(InBoundRpcMessage& msg, bool isOk);
void sendCancel(exec::shared::QueryId* pQueryId);
void shutdownSocket();
static RpcEncoder s_encoder;
static RpcDecoder s_decoder;
int32_t m_coordinationId;
int32_t m_handshakeVersion;
exec::user::HandshakeStatus m_handshakeStatus;
std::string m_handshakeErrorId;
std::string m_handshakeErrorMsg;
bool m_bIsConnected;
// number of outstanding read requests.
// handleRead will keep asking for more results as long as this number is not zero.
size_t m_pendingRequests;
//mutex to protect m_pendingRequests
boost::mutex m_prMutex;
// Error Object. NULL if no error. Set if the error is valid for ALL running queries.
// All the query result objects will
// also have the error object set.
// If the error is query specific, only the query results object will have the error set.
DrillClientError* m_pError;
//Started after the connection is established and sends heartbeat messages after {heartbeat frequency} seconds
//The thread is killed on disconnect.
//boost::thread * m_pHeartbeatThread;
// for boost asio
boost::thread * m_pListenerThread;
boost::asio::io_service m_io_service;
// the work object prevent io_service running out of work
boost::asio::io_service::work * m_pWork;
boost::asio::ip::tcp::socket m_socket;
boost::asio::deadline_timer m_deadlineTimer; // to timeout async queries that never return
boost::asio::deadline_timer m_heartbeatTimer; // to send heartbeat messages
//for synchronous messages, like validate handshake
ByteBuf_t m_rbuf; // buffer for receiving synchronous messages
DataBuf m_wbuf; // buffer for sending synchronous message
// Mutex to protect drill client operations
boost::mutex m_dcMutex;
// Map of coordination id to Query Ids.
std::map<int, DrillClientQueryResult*> m_queryIds;
// Map of query id to query result for currently executing queries
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId> m_queryResults;
// Condition variable to signal completion of all queries.
boost::condition_variable m_cv;
};
inline bool DrillClientImpl::Active() {
return this->m_bIsConnected;;
}
class ZookeeperImpl{
public:
ZookeeperImpl();
~ZookeeperImpl();
static ZooLogLevel getZkLogLevel();
// comma separated host:port pairs, each corresponding to a zk
// server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
int connectToZookeeper(const char* connectStr, const char* pathToDrill);
void close();
static void watcher(zhandle_t *zzh, int type, int state, const char *path, void* context);
void debugPrint();
std::string& getError(){return m_err;}
const exec::DrillbitEndpoint& getEndPoint(){ return m_drillServiceInstance.endpoint();}
private:
static char s_drillRoot[];
static char s_defaultCluster[];
zhandle_t* m_zh;
clientid_t m_id;
int m_state;
std::string m_err;
struct String_vector* m_pDrillbits;
boost::mutex m_cvMutex;
// Condition variable to signal connection callback has been processed
boost::condition_variable m_cv;
bool m_bConnecting;
exec::DrillServiceInstance m_drillServiceInstance;
};
} // namespace Drill
#endif