blob: 24f9037d4d8b11d9da26014fcab53c0f85ba6807 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "drill/common.hpp"
#include <queue>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/assign.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/functional/factory.hpp>
#include <boost/thread.hpp>
#include "drill/drillClient.hpp"
#include "drill/fieldmeta.hpp"
#include "drill/recordBatch.hpp"
#include "drill/userProperties.hpp"
#include "drillClientImpl.hpp"
#include "errmsgs.hpp"
#include "logger.hpp"
#include "zookeeperClient.hpp"
namespace Drill{
namespace { // anonymous namespace
static std::map<exec::shared::QueryResult_QueryState, status_t> QUERYSTATE_TO_STATUS_MAP = boost::assign::map_list_of
(exec::shared::QueryResult_QueryState_STARTING, QRY_PENDING)
(exec::shared::QueryResult_QueryState_RUNNING, QRY_RUNNING)
(exec::shared::QueryResult_QueryState_COMPLETED, QRY_COMPLETED)
(exec::shared::QueryResult_QueryState_CANCELED, QRY_CANCELED)
(exec::shared::QueryResult_QueryState_FAILED, QRY_FAILED)
;
static std::string debugPrintQid(const exec::shared::QueryId& qid){
return std::string("[")+boost::lexical_cast<std::string>(qid.part1()) +std::string(":") + boost::lexical_cast<std::string>(qid.part2())+std::string("] ");
}
// Convertion helper
struct ToRpcType: public std::unary_function<google::protobuf::int32, exec::user::RpcType> {
exec::user::RpcType operator() (google::protobuf::int32 i) const {
return static_cast<exec::user::RpcType>(i);
}
};
} // anonymous
connectionStatus_t DrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
if (this->m_bIsConnected) {
if(!std::strcmp(connStr, m_connectStr.c_str())){
// trying to connect to a different address is not allowed if already connected
return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
return CONN_SUCCESS;
}
std::string val;
channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
props->getProp(USERPROP_USESSL, val) =="true") ?
CHANNEL_TYPE_SSLSTREAM :
CHANNEL_TYPE_SOCKET;
connectionStatus_t ret = CONN_SUCCESS;
m_pChannel= ChannelFactory::getChannel(type, m_io_service, connStr, props);
ret=m_pChannel->init();
if(ret!=CONN_SUCCESS){
handleConnError(m_pChannel->getError());
return ret;
}
ret= m_pChannel->connect();
if(ret!=CONN_SUCCESS){
handleConnError(m_pChannel->getError());
return ret;
}
props->setDefaultProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
m_bIsConnected = true;
return ret;
}
connectionStatus_t DrillClientImpl::connect(const char* host, const char* port, DrillUserProperties* props){
if (this->m_bIsConnected) {
std::string connStr = std::string(host)+":"+std::string(port);
if(!std::strcmp(connStr.c_str(), m_connectStr.c_str())){
// trying to connect to a different address is not allowed if already connected
return handleConnError(CONN_ALREADYCONNECTED, getMessage(ERR_CONN_ALREADYCONN));
}
return CONN_SUCCESS;
}
std::string val;
channelType_t type = ( props->isPropSet(USERPROP_USESSL) &&
props->getProp(USERPROP_USESSL, val) =="true") ?
CHANNEL_TYPE_SSLSTREAM :
CHANNEL_TYPE_SOCKET;
connectionStatus_t ret = CONN_SUCCESS;
m_pChannel= ChannelFactory::getChannel(type, m_io_service, host, port, props);
ret=m_pChannel->init();
if(ret!=CONN_SUCCESS){
handleConnError(m_pChannel->getError());
return ret;
}
ret=m_pChannel->connect();
if(ret!=CONN_SUCCESS){
handleConnError(m_pChannel->getError());
return ret;
}
props->setDefaultProperty(USERPROP_SERVICE_HOST, m_pChannel->getEndpoint()->getHost());
m_bIsConnected = true;
return ret;
}
void DrillClientImpl::startHeartbeatTimer(){
if (DrillClientConfig::getHeartbeatFrequency() > 0) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new heartbeat timer with "
<< DrillClientConfig::getHeartbeatFrequency()
<< " seconds." << std::endl;)
m_heartbeatTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getHeartbeatFrequency()));
m_heartbeatTimer.async_wait(boost::bind(
&DrillClientImpl::handleHeartbeatTimeout,
this,
boost::asio::placeholders::error
));
startMessageListener(); // start this thread early so we don't have the timer blocked
}
}
connectionStatus_t DrillClientImpl::sendHeartbeat(){
connectionStatus_t status=CONN_SUCCESS;
exec::rpc::Ack ack;
ack.set_ok(true);
rpc::OutBoundRpcMessage heartbeatMsg(exec::rpc::PING, exec::user::ACK/*can be anything */, 0, &ack);
boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Heartbeat sent." << std::endl;)
status=sendSyncCommon(heartbeatMsg);
status=status==CONN_SUCCESS?status:CONN_DEAD;
//If the server sends responses to a heartbeat, we need to increment the pending requests counter.
if(m_pendingRequests++==0){
getNextResult(); // async wait for results
}
return status;
}
void DrillClientImpl::handleHeartbeatTimeout(const boost::system::error_code & err){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: Heartbeat timer expired." << std::endl;)
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::Heartbeat Timer - Expires at: "
<< to_simple_string(m_heartbeatTimer.expires_at())
<< " and time now is: "
<< to_simple_string(boost::asio::deadline_timer::traits_type::now())
<< std::endl;)
;
if (m_heartbeatTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_heartbeatTimer.expires_at(boost::posix_time::pos_infin);
if(sendHeartbeat()==CONN_SUCCESS){
startHeartbeatTimer();
}else{
// Close connection.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl:: No heartbeat. Closing connection.";)
shutdownSocket();
//broadcast to any executing queries
handleConnError(CONN_FAILURE, getMessage(ERR_QRY_COMMERR, "Connection to drillbit lost."));
}
}
}
return;
}
void DrillClientImpl::Close() {
shutdownSocket();
}
/*
* Write bytesToWrite length data bytes pointed by dataPtr. It handles EINTR error
* occurred during write_some sys call and does a retry on that.
*
* Parameters:
* dataPtr - in param - Pointer to data bytes to write on socket.
* bytesToWrite - in param - Length of data bytes to write from dataPtr.
* errorCode - out param - Error code set by boost.
*/
void DrillClientImpl::doWriteToSocket(const char* dataPtr, size_t bytesToWrite,
boost::system::error_code& errorCode) {
if(0 == bytesToWrite) {
return;
}
// Write all the bytes to socket. In case of error when all bytes are not successfully written
// proper errorCode will be set.
while(1) {
size_t bytesWritten;
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if(m_pChannel==NULL){
return;
}
bytesWritten = m_pChannel->getSocketStream().writeSome(boost::asio::buffer(dataPtr, bytesToWrite),
errorCode);
}
if(errorCode && boost::asio::error::interrupted != errorCode){
break;
}
// Update the state
bytesToWrite -= bytesWritten;
dataPtr += bytesWritten;
// Check if all the data is written then break from loop
if(0 == bytesToWrite) break;
}
}
/*
* Common wrapper to take care of sending both plain or encrypted message. It creates a send buffer from an
* OutboundRPCMessage and then call the send handler pointing to either sendSyncPlain or sendSyncEncrypted
*
* Return:
* connectionStatus_t - CONN_SUCCESS - In case of successful send
* - CONN_FAILURE - In case of failure to send
*/
connectionStatus_t DrillClientImpl::sendSyncCommon(rpc::OutBoundRpcMessage& msg) {
encode(m_wbuf, msg);
return (this->*m_fpCurrentSendHandler)();
}
/*
* Send handler for sending plain messages over wire
*
* Return:
* connectionStatus_t - CONN_SUCCESS - In case of successful send
* - CONN_FAILURE - In case of failure to send
*/
connectionStatus_t DrillClientImpl::sendSyncPlain(){
boost::system::error_code ec;
doWriteToSocket(reinterpret_cast<char*>(m_wbuf.data()), m_wbuf.size(), ec);
if(!ec) {
return CONN_SUCCESS;
} else {
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, ec.message().c_str()));
}
}
/*
* Send handler for sending encrypted messages over wire. It encrypts the send buffer using wrap api provided by
* saslAuthenticatorImpl and then transmit the encrypted bytes over wire.
*
* Return:
* connectionStatus_t - CONN_SUCCESS - In case of successful send
* - CONN_FAILURE - In case of failure to send
*/
connectionStatus_t DrillClientImpl::sendSyncEncrypted() {
boost::system::error_code ec;
// Encoded message is encrypted into chunks of size <= WrapSizeLimit. Each encrypted chunk along with
// its encrypted length in network order (added by Cyrus-SASL plugin) is sent over wire.
const int wrapChunkSize = m_encryptionCtxt.getWrapSizeLimit();
int lengthToEncrypt = m_wbuf.size();
int currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
uint32_t currentChunkOffset = 0;
std::stringstream errorMsg;
// Encrypt and send each chunk
while(lengthToEncrypt != 0) {
const char* wrappedChunk = NULL;
uint32_t wrappedLen = 0;
const int wrapResult = m_saslAuthenticator->wrap(reinterpret_cast<const char*>(m_wbuf.data() + currentChunkOffset),
currentChunkLen, &wrappedChunk, wrappedLen);
if(SASL_OK != wrapResult) {
errorMsg << "Sasl wrap failed while encrypting chunk of length: " << currentChunkLen << " , EncodeError: "
<< wrapResult;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
<< " ,ChunkOffset: " << currentChunkOffset << ", Message Len: " << m_wbuf.size()
<< ", Closing connection.";)
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
}
// Send the encrypted chunk.
doWriteToSocket(wrappedChunk, wrappedLen, ec);
if(ec) {
errorMsg << "Failure while sending encrypted chunk. Error: " << ec.message().c_str();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::sendSyncEncrypted - " << errorMsg.str()
<< ", Chunk Length: " << currentChunkLen << ", ChunkOffset:" << currentChunkOffset
<< ", Message Len: " << m_wbuf.size() << ", Closing connection.";)
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_WFAIL, errorMsg.str().c_str()));
}
// Update variables after sending each encrypted chunk
lengthToEncrypt -= currentChunkLen;
currentChunkOffset += currentChunkLen;
currentChunkLen = std::min(wrapChunkSize, lengthToEncrypt);
}
return CONN_SUCCESS;
}
connectionStatus_t DrillClientImpl::recvHandshake(){
if(m_rbuf==NULL){
m_rbuf = Utils::allocateBuffer(MAX_SOCK_RD_BUFSIZE);
}
m_io_service.reset();
int32_t handshakeTimeout=DrillClientConfig::getHandshakeTimeout();
if (handshakeTimeout > 0){
m_deadlineTimer.expires_from_now(boost::posix_time::seconds(handshakeTimeout));
m_deadlineTimer.async_wait(boost::bind(
&DrillClientImpl::handleHShakeReadTimeout,
this,
boost::asio::placeholders::error
));
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new handshake wait timer with "
<< DrillClientConfig::getHandshakeTimeout() << " seconds." << std::endl;)
}
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if (m_pChannel == NULL) {
return CONN_NOSOCKET;
}
m_pChannel->getSocketStream().asyncRead(
boost::asio::buffer(m_rbuf, LEN_PREFIX_BUFLEN),
boost::bind(
&DrillClientImpl::handleHandshake,
this,
m_rbuf,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
}
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::recvHandshake: async read waiting for server handshake response.\n";)
m_io_service.run();
if(m_rbuf!=NULL){
Utils::freeBuffer(m_rbuf, MAX_SOCK_RD_BUFSIZE); m_rbuf=NULL;
}
if (m_pError != NULL) {
DRILL_MT_LOG(DRILL_LOG(LOG_ERROR) << "DrillClientImpl::recvHandshake: failed to complete handshake with server."
<< m_pError->msg << "\n";)
return static_cast<connectionStatus_t>(m_pError->status);
}
startHeartbeatTimer();
return CONN_SUCCESS;
}
/*
* Read bytesToRead length data bytes from socket into inBuf. It handles EINTR error
* occurred during read_some sys call and does a retry on that.
*
* Parameters:
* inBuf - out param - Pointer to buffer to read data into from socket.
* bytesToRead - in param - Length of data bytes to read from socket.
* errorCode - out param - Error code set by boost.
*/
void DrillClientImpl::doReadFromSocket(ByteBuf_t inBuf, size_t bytesToRead,
boost::system::error_code& errorCode) {
// Check if bytesToRead is zero
if(0 == bytesToRead) {
return;
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket read: reading " << bytesToRead << "data bytes" << std::endl;)
// Read all the bytes. In case when all the bytes were not read the proper
// errorCode will be set.
while(1){
size_t dataBytesRead;
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if(m_pChannel==NULL){
return;
}
dataBytesRead = m_pChannel->getSocketStream().readSome(boost::asio::buffer(inBuf, bytesToRead),
errorCode);
}
// Check if errorCode is EINTR then just retry otherwise break from loop
if(errorCode && boost::asio::error::interrupted != errorCode){
break;
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket read: actual bytes read = " << dataBytesRead << std::endl;)
// Update the state
bytesToRead -= dataBytesRead;
inBuf += dataBytesRead;
// Check if all the data is read then break from loop
if(0 == bytesToRead) break;
}
}
void DrillClientImpl::handleHandshake(ByteBuf_t inBuf,
const boost::system::error_code& err,
size_t bytes_transferred) {
boost::system::error_code error=err;
// cancel the timer
m_deadlineTimer.cancel();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Deadline timer cancelled." << std::endl;)
if(!error){
rpc::InBoundRpcMessage msg;
uint32_t length = 0;
std::size_t bytes_read = rpcLengthDecode(m_rbuf, length);
if(length>0){
const size_t leftover = LEN_PREFIX_BUFLEN - bytes_read;
const ByteBuf_t b = m_rbuf + LEN_PREFIX_BUFLEN;
const size_t bytesToRead=length - leftover;
doReadFromSocket(b, bytesToRead, error);
// Check if any error happen while reading the message bytes. If yes then return before decoding the Msg
if(error) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. "
<< " Failed to read entire handshake message. with error: "
<< error.message().c_str() << "\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Failed to read entire handshake message"));
return;
}
// Decode the bytes into a valid RPC Message
if (!decode(m_rbuf+bytes_read, length, msg)) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. Cannot decode handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "Cannot decode handshake"));
return;
}
}else{
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleHandshake: ERR_CONN_RDFAIL. No handshake.\n";)
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, "No handshake"));
return;
}
exec::user::BitToUserHandshake b2u;
b2u.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
this->m_handshakeVersion=b2u.rpc_version();
this->m_handshakeStatus=b2u.status();
this->m_handshakeErrorId=b2u.errorid();
this->m_handshakeErrorMsg=b2u.errormessage();
this->m_serverInfos = b2u.server_infos();
std::transform(b2u.supported_methods().begin(), b2u.supported_methods().end(),
std::back_inserter(this->m_supportedMethods),
ToRpcType());
for (int i=0; i<b2u.authenticationmechanisms_size(); i++) {
std::string mechanism = b2u.authenticationmechanisms(i);
boost::algorithm::to_lower(mechanism);
this->m_serverAuthMechanisms.push_back(mechanism);
}
// Updated encryption context based on server response
this->m_encryptionCtxt.setEncryptionReqd(b2u.has_encrypted() && b2u.encrypted());
if(b2u.has_maxwrappedsize()) {
this->m_encryptionCtxt.setMaxWrappedSize(b2u.maxwrappedsize());
}
}else{
// boost error
if(error==boost::asio::error::eof){ // Server broke off the connection
handleConnError(CONN_HANDSHAKE_FAILED,
getMessage(ERR_CONN_NOHSHAKE, DRILL_RPC_VERSION));
}else{
handleConnError(CONN_FAILURE, getMessage(ERR_CONN_RDFAIL, error.message().c_str()));
}
return;
}
return;
}
void DrillClientImpl::handleHShakeReadTimeout(const boost::system::error_code & err){
// if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::HandleHShakeReadTimeout: "
<< "Deadline timer expired; ERR_CONN_HSHAKETIMOUT.\n";)
handleConnError(CONN_HANDSHAKE_TIMEOUT, getMessage(ERR_CONN_HSHAKETIMOUT));
m_io_service.stop();
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if(m_pChannel != NULL) m_pChannel->close();
}
}
}
return;
}
/*
* Check's if client has explicitly expressed interest in encrypted connections only. It looks for USERPROP_SASL_ENCRYPT
* connection string property. If set to true then returns true else returns false
*/
bool DrillClientImpl::clientNeedsEncryption(const DrillUserProperties* userProperties) {
bool needsEncryption = false;
// check if userProperties is null
if(!userProperties) {
return needsEncryption;
}
std::string val;
needsEncryption = userProperties->isPropSet(USERPROP_SASL_ENCRYPT) &&
boost::iequals(userProperties->getProp(USERPROP_SASL_ENCRYPT, val), "true") ;
return needsEncryption;
}
/*
* Checks if the client has explicitly expressed interest in authenticated connections only.
* If the USERPROP_AUTH_MECHANISM connection string properties is non-empty and not equal to PLAIN,
* then it is implied that the client wants authentication.
*
* Explicitly skipping PLAIN to maintain forward compatibility with 1.9 Drillbit and it doesn't matter
* if this security check is not there for PLAIN mechanism
*/
bool DrillClientImpl::clientNeedsAuthentication(const DrillUserProperties* userProperties) {
bool needsAuthentication = false;
if(!userProperties) { return needsAuthentication; }
std::string authMech = "";
userProperties->getProp(USERPROP_AUTH_MECHANISM, authMech);
boost::algorithm::to_lower(authMech);
if(!authMech.empty() && SaslAuthenticatorImpl::PLAIN_NAME != authMech) {
needsAuthentication = true;
}
return needsAuthentication;
}
/*
* Check's if client has explicitly expressed interest in supporting complex types. It looks for USERPROP_SUPPORT_COMPLEX_TYPES
* connection string property. If set to true then returns true else returns false
*/
bool DrillClientImpl::handleComplexTypes(const DrillUserProperties* userProperties) {
bool support_complex_types = false;
// check if userProperties is null
if (!userProperties) {
return support_complex_types;
}
std::string val;
support_complex_types = userProperties->isPropSet(USERPROP_SUPPORT_COMPLEX_TYPES) &&
boost::iequals(userProperties->getProp(USERPROP_SUPPORT_COMPLEX_TYPES, val), "true");
return support_complex_types;
}
connectionStatus_t DrillClientImpl::validateHandshake(DrillUserProperties* properties){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "validateHandShake\n";)
exec::user::UserToBitHandshake u2b;
u2b.set_channel(exec::shared::USER);
u2b.set_rpc_version(DRILL_RPC_VERSION);
u2b.set_support_listening(true);
u2b.set_support_timeout(DrillClientConfig::getHeartbeatFrequency() > 0);
u2b.set_sasl_support(exec::user::SASL_PRIVACY);
u2b.set_support_complex_types(handleComplexTypes(properties));
// Adding version info
exec::user::RpcEndpointInfos* infos = u2b.mutable_client_infos();
infos->set_name(DrillClientConfig::getClientName());
infos->set_application(DrillClientConfig::getApplicationName());
infos->set_version(DRILL_VERSION_STRING);
infos->set_majorversion(DRILL_VERSION_MAJOR);
infos->set_minorversion(DRILL_VERSION_MINOR);
infos->set_patchversion(DRILL_VERSION_PATCH);
if(properties != NULL && properties->size()>0){
std::string username;
std::string err;
if(!properties->validate(err)){
DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Invalid user input:" << err << std::endl;)
}
exec::user::UserProperties* userProperties = u2b.mutable_properties();
std::map<char,int>::iterator it;
for (std::map<std::string,std::string>::const_iterator propIter=properties->begin(); propIter!=properties->end(); ++propIter){
std::string currKey=propIter->first;
std::string currVal=propIter->second;
std::map<std::string,uint32_t>::const_iterator it=DrillUserProperties::USER_PROPERTIES.find(currKey);
if(it==DrillUserProperties::USER_PROPERTIES.end()){
DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << "Connection property ("<< currKey
<< ") is unknown" << std::endl;)
exec::user::Property* connProp = userProperties->add_properties();
connProp->set_key(currKey);
connProp->set_value(currVal);
continue;
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_SERVERPROP)){
exec::user::Property* connProp = userProperties->add_properties();
connProp->set_key(currKey);
connProp->set_value(currVal);
//Username(but not the password) also needs to be set in UserCredentials
if(IS_BITSET((*it).second,USERPROP_FLAGS_USERNAME)){
exec::shared::UserCredentials* creds = u2b.mutable_credentials();
username=currVal;
creds->set_user_name(username);
//u2b.set_credentials(&creds);
}
if(IS_BITSET((*it).second,USERPROP_FLAGS_PASSWORD)){
DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << currKey << ": ********** " << std::endl;)
}else{
DRILL_MT_LOG(DRILL_LOG(LOG_INFO) << currKey << ":" << currVal << std::endl;)
}
}// Server properties
}
}
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, exec::user::HANDSHAKE, coordId, &u2b);
sendSyncCommon(out_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Sent handshake request message. Coordination id: " << coordId << "\n";)
}
connectionStatus_t ret = recvHandshake();
if(ret!=CONN_SUCCESS){
return ret;
}
switch(this->m_handshakeStatus) {
case exec::user::SUCCESS:
// Check if client needs auth/encryption and server is not requiring it
if(clientNeedsAuthentication(properties) || clientNeedsEncryption(properties)) {
return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_NOSERVERAUTH));
}
// reset io_service after handshake is validated before running queries
m_io_service.reset();
return CONN_SUCCESS;
case exec::user::RPC_VERSION_MISMATCH:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Invalid rpc version. Expected "
<< DRILL_RPC_VERSION << ", actual "<< m_handshakeVersion << "." << std::endl;)
return handleConnError(CONN_BAD_RPC_VER, getMessage(ERR_CONN_BAD_RPC_VER, DRILL_RPC_VERSION,
m_handshakeVersion,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::AUTH_FAILED:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Authentication failed." << std::endl;)
return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_AUTHFAIL,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::UNKNOWN_FAILURE:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown error during handshake." << std::endl;)
return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
case exec::user::AUTH_REQUIRED:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server requires SASL authentication." << std::endl;)
return handleAuthentication(properties);
default:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Unknown return status." << std::endl;)
return handleConnError(CONN_HANDSHAKE_FAILED, getMessage(ERR_CONN_UNKNOWN_ERR,
this->m_handshakeErrorId.c_str(),
this->m_handshakeErrorMsg.c_str()));
}
}
connectionStatus_t DrillClientImpl::handleAuthentication(const DrillUserProperties *userProperties) {
// Check if client needs encryption and server is configured for encryption or not before starting handshake
if(clientNeedsEncryption(userProperties) && !m_encryptionCtxt.isEncryptionReqd()) {
return handleConnError(CONN_AUTH_FAILED, getMessage(ERR_CONN_NOSERVERENC));
}
try {
m_saslAuthenticator = new SaslAuthenticatorImpl(userProperties);
} catch (std::runtime_error& e) {
return handleConnError(CONN_AUTH_FAILED, e.what());
}
startMessageListener();
initiateAuthentication();
{ // block until SASL exchange is complete
boost::mutex::scoped_lock lock(m_saslMutex);
while (!m_saslDone) {
m_saslCv.wait(lock);
}
}
std::stringstream logMsg;
logMsg << "DrillClientImpl::handleAuthentication: Authentication failed. [Details: ";
if (SASL_OK == m_saslResultCode) {
// Check the negotiated SSF value and change the handlers.
if(m_encryptionCtxt.isEncryptionReqd()) {
if(SASL_OK != m_saslAuthenticator->verifyAndUpdateSaslProps()) {
logMsg << m_encryptionCtxt
<< ", Mechanism: " << m_saslAuthenticator->getAuthMechanismName()
<< ", Error: " << m_saslResultCode
<< ", Cause: " << m_saslAuthenticator->getErrorMessage(m_saslResultCode);
logMsg << "]. Negotiated Parameter is invalid.";
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
}
// Successfully negotiated for encryption related security parameters.
// Start using Encrypt and Decrypt handlers.
m_fpCurrentSendHandler = &DrillClientImpl::sendSyncEncrypted;
m_fpCurrentReadMsgHandler = &DrillClientImpl::readAndDecryptMsg;
}
// Reset the errorMsg stream since this is success case.
logMsg.str(std::string());
logMsg << "DrillClientImpl::handleAuthentication: Successfully authenticated! [Details: "
<< m_encryptionCtxt << " ]";
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
m_io_service.reset();
return CONN_SUCCESS;
} else {
logMsg << m_encryptionCtxt
<< ", Mechanism: " << m_saslAuthenticator->getAuthMechanismName()
<< ", Error: " << m_saslResultCode
<< ", Cause: " << m_saslAuthenticator->getErrorMessage(m_saslResultCode);
logMsg << "]. Check connection parameters?";
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << logMsg.str() << std::endl;)
// shuts down socket as well
return handleConnError(CONN_AUTH_FAILED, logMsg.str().c_str());
}
}
void DrillClientImpl::initiateAuthentication() {
exec::shared::SaslMessage response;
m_saslResultCode = m_saslAuthenticator->init(m_serverAuthMechanisms, response, &m_encryptionCtxt);
switch (m_saslResultCode) {
case SASL_CONTINUE:
case SASL_OK: {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::initiateAuthentication: initiated. " << std::endl;)
boost::lock_guard<boost::mutex> prLock(m_prMutex);
sendSaslResponse(response); // the challenge returned by server is handled by processSaslChallenge
break;
}
case SASL_NOMECH:
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::initiateAuthentication: "
<< "Mechanism is not supported (by server/client)." << std::endl;)
default:
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::initiateAuthentication: "
<< "Failed to initiate authentication." << std::endl;)
finishAuthentication();
break;
}
}
void DrillClientImpl::sendSaslResponse(const exec::shared::SaslMessage& response) {
boost::lock_guard<boost::mutex> lock(m_dcMutex);
const int32_t coordId = getNextCoordinationId();
rpc::OutBoundRpcMessage msg(exec::rpc::REQUEST, exec::user::SASL_MESSAGE, coordId, &response);
sendSyncCommon(msg);
if (m_pendingRequests++ == 0) {
getNextResult();
}
}
void DrillClientImpl::processSaslChallenge(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg) {
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
assert(m_saslAuthenticator != NULL);
// parse challenge
exec::shared::SaslMessage challenge;
const bool parseStatus = challenge.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
if (!parseStatus) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Failed to parse challenge." << std::endl;)
m_saslResultCode = SASL_FAIL;
finishAuthentication();
m_pendingRequests--;
return;
}
// respond accordingly
exec::shared::SaslMessage response;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSaslChallenge: status: "
<< exec::shared::SaslStatus_Name(challenge.status()) << std::endl;)
switch (challenge.status()) {
case exec::shared::SASL_IN_PROGRESS:
m_saslResultCode = m_saslAuthenticator->step(challenge, response);
if (m_saslResultCode == SASL_CONTINUE || m_saslResultCode == SASL_OK) {
sendSaslResponse(response);
} else { // failure
finishAuthentication();
}
break;
case exec::shared::SASL_SUCCESS:
if (SASL_CONTINUE == m_saslResultCode) { // client may need to evaluate once more
m_saslResultCode = m_saslAuthenticator->step(challenge, response);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "SASL succeeded on client? " << m_saslResultCode << std::endl;)
}
finishAuthentication();
break;
default:
m_saslResultCode = SASL_FAIL;
finishAuthentication();
break;
}
m_pendingRequests--;
}
void DrillClientImpl::finishAuthentication() {
boost::mutex::scoped_lock lock(m_saslMutex);
m_saslDone = true;
m_saslCv.notify_one();
}
FieldDefPtr DrillClientQueryResult::s_emptyColDefs( new (std::vector<Drill::FieldMetadata*>));
void DrillClientImpl::startMessageListener() {
if(this->m_pListenerThread==NULL){
// Stopping the io_service from running out-of-work
if(m_io_service.stopped()){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: io_service is stopped. Restarting." <<std::endl;)
m_io_service.reset();
}
this->m_pWork = new boost::asio::io_service::work(m_io_service);
this->m_pListenerThread = new boost::thread(boost::bind(&boost::asio::io_service::run,
&this->m_io_service));
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::startMessageListener: Starting listener thread: "
<< this->m_pListenerThread << std::endl;)
}
}
DrillClientQueryResult* DrillClientImpl::SubmitQuery(::exec::shared::QueryType t,
const std::string& plan,
pfnQueryResultsListener l,
void* lCtx){
exec::user::RunQuery query;
query.set_results_mode(exec::user::STREAM_FULL);
query.set_type(t);
query.set_plan(plan);
boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientQueryResult*>(),
boost::ref(*this),
_1,
boost::cref(plan),
l,
lCtx);
return sendMsg(factory, ::exec::user::RUN_QUERY, query);
}
DrillClientPrepareHandle* DrillClientImpl::PrepareQuery(const std::string& plan,
pfnPreparedStatementListener l,
void* lCtx){
exec::user::CreatePreparedStatementReq query;
query.set_sql_query(plan);
boost::function<DrillClientPrepareHandle*(int32_t)> factory = boost::bind(
boost::factory<DrillClientPrepareHandle*>(),
boost::ref(*this),
_1,
boost::cref(plan),
l,
lCtx);
return sendMsg(factory, ::exec::user::CREATE_PREPARED_STATEMENT, query);
}
DrillClientQueryResult* DrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt,
pfnQueryResultsListener l,
void* lCtx){
const DrillClientPrepareHandle& handle = static_cast<const DrillClientPrepareHandle&>(pstmt);
exec::user::RunQuery query;
query.set_results_mode(exec::user::STREAM_FULL);
query.set_type(::exec::shared::PREPARED_STATEMENT);
query.set_allocated_prepared_statement_handle(new ::exec::user::PreparedStatementHandle(handle.m_preparedStatementHandle));
boost::function<DrillClientQueryResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientQueryResult*>(),
boost::ref(*this),
_1,
boost::cref(handle.m_query),
l,
lCtx);
return sendMsg(factory, ::exec::user::RUN_QUERY, query);
}
static void updateLikeFilter(exec::user::LikeFilter& likeFilter, const std::string& pattern,
const std::string& searchEscapeString) {
likeFilter.set_pattern(pattern);
likeFilter.set_escape(searchEscapeString);
}
DrillClientCatalogResult* DrillClientImpl::getCatalogs(const std::string& catalogPattern,
const std::string& searchEscapeString,
Metadata::pfnCatalogMetadataListener listener,
void* listenerCtx) {
exec::user::GetCatalogsReq query;
updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
boost::function<DrillClientCatalogResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientCatalogResult*>(),
boost::ref(*this),
_1,
listener,
listenerCtx);
return sendMsg(factory, ::exec::user::GET_CATALOGS, query);
}
DrillClientSchemaResult* DrillClientImpl::getSchemas(const std::string& catalogPattern,
const std::string& schemaPattern,
const std::string& searchEscapeString,
Metadata::pfnSchemaMetadataListener listener,
void* listenerCtx) {
exec::user::GetSchemasReq query;
updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString);
boost::function<DrillClientSchemaResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientSchemaResult*>(),
boost::ref(*this),
_1,
listener,
listenerCtx);
return sendMsg(factory, ::exec::user::GET_SCHEMAS, query);
}
DrillClientTableResult* DrillClientImpl::getTables(const std::string& catalogPattern,
const std::string& schemaPattern,
const std::string& tablePattern,
const std::vector<std::string>* tableTypes,
const std::string& searchEscapeString,
Metadata::pfnTableMetadataListener listener,
void* listenerCtx) {
exec::user::GetTablesReq query;
updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString);
updateLikeFilter(*query.mutable_table_name_filter(), tablePattern, searchEscapeString);
if (tableTypes) {
std::copy(tableTypes->begin(), tableTypes->end(),
google::protobuf::RepeatedFieldBackInserter(query.mutable_table_type_filter()));
}
boost::function<DrillClientTableResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientTableResult*>(),
boost::ref(*this),
_1,
listener,
listenerCtx);
return sendMsg(factory, ::exec::user::GET_TABLES, query);
}
DrillClientColumnResult* DrillClientImpl::getColumns(const std::string& catalogPattern,
const std::string& schemaPattern,
const std::string& tablePattern,
const std::string& columnsPattern,
const std::string& searchEscapeString,
Metadata::pfnColumnMetadataListener listener,
void* listenerCtx) {
exec::user::GetColumnsReq query;
updateLikeFilter(*query.mutable_catalog_name_filter(), catalogPattern, searchEscapeString);
updateLikeFilter(*query.mutable_schema_name_filter(), schemaPattern, searchEscapeString);
updateLikeFilter(*query.mutable_table_name_filter(), tablePattern, searchEscapeString);
updateLikeFilter(*query.mutable_column_name_filter(), columnsPattern, searchEscapeString);
boost::function<DrillClientColumnResult*(int32_t)> factory = boost::bind(
boost::factory<DrillClientColumnResult*>(),
boost::ref(*this),
_1,
listener,
listenerCtx);
return sendMsg(factory, ::exec::user::GET_COLUMNS, query);
}
template<typename Handle>
Handle* DrillClientImpl::sendMsg(boost::function<Handle*(int32_t)> handleFactory, ::exec::user::RpcType type, const ::google::protobuf::Message& message) {
int32_t coordId;
Handle* phandle=NULL;
connectionStatus_t cStatus=CONN_SUCCESS;
{
boost::lock_guard<boost::mutex> prLock(this->m_prMutex);
boost::lock_guard<boost::mutex> dcLock(this->m_dcMutex);
coordId = this->getNextCoordinationId();
rpc::OutBoundRpcMessage out_msg(exec::rpc::REQUEST, type, coordId, &message);
phandle = handleFactory(coordId);
this->m_queryHandles[coordId]=phandle;
connectionStatus_t cStatus = sendSyncCommon(out_msg);
if(cStatus == CONN_SUCCESS){
bool sendRequest=false;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " request. " << "[" << m_connectedHost << "]" << "Coordination id = " << coordId << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sent " << ::exec::user::RpcType_Name(type) << " Coordination id = " << coordId << " query: " << phandle->getQuery() << std::endl;)
if(m_pendingRequests++==0){
sendRequest=true;
}else{
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Queuing " << ::exec::user::RpcType_Name(type) << " request to server" << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Number of pending requests = " << m_pendingRequests << std::endl;)
}
if(sendRequest){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Sending " << ::exec::user::RpcType_Name(type) << " request. Number of pending requests = "
<< m_pendingRequests << std::endl;)
getNextResult(); // async wait for results
}
}
}
if(cStatus!=CONN_SUCCESS){
this->m_queryHandles.erase(coordId);
delete phandle;
return NULL;
}
//run this in a new thread
startMessageListener();
return phandle;
}
void DrillClientImpl::getNextResult(){
// This call is always made from within a function where the mutex has already been acquired
//boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
{
boost::unique_lock<boost::mutex> memLock(AllocatedBuffer::s_memCVMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Read blocked waiting for memory." << std::endl;)
while(AllocatedBuffer::s_isBufferLimitReached){
AllocatedBuffer::s_memCV.wait(memLock);
}
}
//use free, not delete to free
ByteBuf_t readBuf = Utils::allocateBuffer(LEN_PREFIX_BUFLEN);
if (DrillClientConfig::getQueryTimeout() > 0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Started new query wait timer with "
<< DrillClientConfig::getQueryTimeout() << " seconds." << std::endl;)
m_deadlineTimer.expires_from_now(boost::posix_time::seconds(DrillClientConfig::getQueryTimeout()));
m_deadlineTimer.async_wait(boost::bind(
&DrillClientImpl::handleReadTimeout,
this,
boost::asio::placeholders::error
));
}
startHeartbeatTimer();
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if (m_pChannel == NULL) {
return;
}
m_pChannel->getSocketStream().asyncRead(
boost::asio::buffer(readBuf, LEN_PREFIX_BUFLEN),
boost::bind(
&DrillClientImpl::handleRead,
this,
readBuf,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred)
);
}
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::getNextResult: async_read from the server\n";)
}
void DrillClientImpl::waitForResults(){
// The listener thread never exists because it may be sending/receiving a heartbeat. Before the heartbeat was introduced
// we could check if the listener thread has exited to tell if the queries are done. We can no longer do so now. We check
// a condition variable instead
{
boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
//if no more data, return NULL;
while(this->m_pendingRequests>0) {
this->m_cv.wait(cvLock);
}
}
}
/*
* Decode the length of the message from bufWithLen and then read entire message from the socket.
* Parameters:
* bufWithLenField - in param - buffer containing the length of the RPC message/encrypted chunk
* bufferWithDataAndLenBytes - out param - buffer pointer which points to memory allocated in this function and has the
* entire one RPC message / encrypted chunk along with the length of the message.
* Memory for this buffer is released by caller.
* lengthFieldLength - out param - bytes of bufWithLen which contains the length of the entire RPC message or
* encrypted chunk
* lengthDecodeHandler - in param - function pointer with length decoder to use. For encrypted chunk we use
* lengthDecode and for plain RPC message we use rpcLengthDecode.
* Return:
* status_t - QRY_SUCCESS - In case of success.
* - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
*/
status_t DrillClientImpl::readLenBytesFromSocket(const ByteBuf_t bufWithLenField, AllocatedBufferPtr* bufferWithDataAndLenBytes,
uint32_t& lengthFieldLength, lengthDecoder lengthDecodeHandler) {
uint32_t rmsgLen = 0;
boost::system::error_code error;
*bufferWithDataAndLenBytes = NULL;
// Decode the length field
lengthFieldLength = (this->*lengthDecodeHandler)(bufWithLenField, rmsgLen);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Length bytes = " << lengthFieldLength << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Msg Length = " << rmsgLen << std::endl;)
if(rmsgLen>0) {
const size_t leftover = LEN_PREFIX_BUFLEN - lengthFieldLength;
// Allocate a buffer for reading all the bytes in bufWithLen and length number of bytes.
const size_t bufferSizeWithLenBytes = rmsgLen + lengthFieldLength;
*bufferWithDataAndLenBytes = new AllocatedBuffer(bufferSizeWithLenBytes);
if(*bufferWithDataAndLenBytes == NULL) {
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readLenBytesFromSocket: Allocated and locked buffer: [ "
<< *bufferWithDataAndLenBytes << ", size = " << bufferSizeWithLenBytes << " ]\n";)
// Copy the memory of bufWithLen into bufferWithLenBytesSize
memcpy((*bufferWithDataAndLenBytes)->m_pBuffer, bufWithLenField, LEN_PREFIX_BUFLEN);
const size_t bytesToRead = rmsgLen - leftover;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Copied bufWithLen into bufferWithLenBytes. "
<< "Now reading data (rmsgLen - leftover) : " << bytesToRead
<< std::endl;)
// Read the entire data left from socket and copy to currentBuffer.
const ByteBuf_t b = (*bufferWithDataAndLenBytes)->m_pBuffer + LEN_PREFIX_BUFLEN;
doReadFromSocket(b, bytesToRead, error);
} else {
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
return error ? handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL)
: QRY_SUCCESS;
}
/*
* Function to read entire RPC message from socket and decode it to InboundRpcMessage
* Parameters:
* inBuf - in param - Buffer containing the length bytes.
* allocatedBuffer - out param - Buffer containing the length bytes and entire RPC message bytes.
* msg - out param - Decoded InBoundRpcMessage from the bytes in allocatedBuffer
* Return:
* status_t - QRY_SUCCESS - In case of success.
* - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
*/
status_t DrillClientImpl::readMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
rpc::InBoundRpcMessage& msg){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Read message from buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
*allocatedBuffer = NULL;
{
// We need to protect the readLength and read buffer, and the pending requests counter,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
uint32_t lengthFieldSize = 0;
// Read the message length and extract length size bytes to form InBoundRpcMessage
const status_t statusCode = readLenBytesFromSocket(inBuf, allocatedBuffer, lengthFieldSize,
&DrillClientImpl::rpcLengthDecode);
// Check for error conditions
if(QRY_SUCCESS != statusCode) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return statusCode;
}
// Get the message size
size_t msgLen = (*allocatedBuffer)->m_bufSize;
// Read data successfully, now let's try to decode the buffer and form a valid RPC message.
// allocatedBuffer also contains the length bytes which is not needed by decodes so skip that part of buffer.
// We have it since in case of encryption the unwrap function expects it
if (!decode((*allocatedBuffer)->m_pBuffer + lengthFieldSize, msgLen - lengthFieldSize, msg)) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, "Cannot decode server message"), NULL);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
<< msg.m_coord_id << std::endl;)
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readMsg: Free buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
/*
* Read ENCRYPT_LEN_PREFIX_BUFLEN bytes to decode length of one complete encrypted chunk. The length bytes are expected
* to be in network order. It is converted to host order and the value is stored in rmsgLen parameter.
* Parameters:
* inBuf - in param - ByteBuf_t containing atleast the length bytes.
* rmsgLen - out param - Contain the decoded value of length.
* Return:
* size_t - length bytes read to decode
*/
size_t DrillClientImpl::lengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
memcpy(&rmsgLen, inBuf, ENCRYPT_LEN_PREFIX_BUFLEN);
rmsgLen = ntohl(rmsgLen);
return ENCRYPT_LEN_PREFIX_BUFLEN;
}
/*
* Wrapper which uses RPC message length decoder to get length of one complete RPC message from _buf.
* Parameters:
* inBuf - in param - ByteBuf_t containing atleast the length bytes.
* rmsgLen - out param - Contain the decoded value of length.
* Return:
* size_t - length bytes read to decode
*/
size_t DrillClientImpl::rpcLengthDecode(const ByteBuf_t inBuf, uint32_t& rmsgLen) {
return rpc::lengthDecode(inBuf, rmsgLen);
}
/*
* Read all the encrypted chunk needed to form a complete RPC message. Read an entire chunk from network, decrypt it
* and put in a buffer. The same process is repeated until the entire buffer to form a completed RPC message is read.
* Parameters:
* inBuf - in param - ByteBuf_t containing atleast the length bytes.
* allocatedBuffer - out param - Buffer containing the entire RPC message bytes which is formed by reading all the
* required encrypted chunk from network and decrypting each individual chunk. The
* buffer memory is released by caller.
.* msg - out param - InBoundRpcMessage formed from bytes in allocatedBuffer
* Return:
* status_t - QRY_SUCCESS - In case of success.
* - QRY_COMM_ERROR/QRY_INTERNAL_ERROR/QRY_CLIENT_OUTOFMEM - In cases of error.
*/
status_t DrillClientImpl::readAndDecryptMsg(const ByteBuf_t inBuf, AllocatedBufferPtr* allocatedBuffer,
rpc::InBoundRpcMessage& msg) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Read message from buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
size_t leftover = 0;
uint32_t rpcMsgLen = 0;
size_t bytes_read = 0;
uint32_t writeIndex = 0;
size_t bytesToRead = 0;
*allocatedBuffer = NULL;
boost::system::error_code error;
std::stringstream errorMsg;
{
// We need to protect the readLength and read buffer, and the pending requests counter,
// but we don't have to keep the lock while we decode the rest of the buffer.
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
do{
AllocatedBufferPtr currentBuffer = NULL;
uint32_t lengthFieldSize = 0;
const status_t statusCode = readLenBytesFromSocket(inBuf, &currentBuffer, lengthFieldSize,
&DrillClientImpl::lengthDecode);
if(QRY_SUCCESS != statusCode) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
// Release the buffer allocated to hold chunk
if(currentBuffer != NULL) {
Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
currentBuffer = NULL;
}
return statusCode;
}
// read one chunk successfully. Let's try to decrypt the message
const char* unWrappedData = NULL;
uint32_t unWrappedLen = 0;
const int decryptResult = m_saslAuthenticator->unwrap(reinterpret_cast<const char*>(currentBuffer->m_pBuffer),
currentBuffer->m_bufSize, &unWrappedData, unWrappedLen);
if(SASL_OK != decryptResult) {
errorMsg << "Sasl unwrap failed for the buffer of size:" << currentBuffer->m_bufSize << " , Error: "
<< decryptResult;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: "
<< errorMsg.str() << std::endl;)
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
// Release the buffer allocated to hold chunk
Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
currentBuffer = NULL;
return handleQryError(QRY_COMM_ERROR,
getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
}
// Check for case if the unWrappedLen is 0, since Cyrus SASL plugin verifies if the length of wrapped data
// is less than the length specified by prepended 4 octets as per RFC 4422/2222. If so it just returns
// and waits for more data
if(unWrappedLen == 0 || (unWrappedData == NULL)) {
errorMsg << "Sasl unwrap failed with mismatch in length of wrapped data and the prepended length value";
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::readAndDecryptMsg: " << errorMsg.str()
<< std::endl;)
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
// Release the buffer allocated to hold chunk
Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
currentBuffer = NULL;
return handleQryError(QRY_COMM_ERROR,
getMessage(ERR_QRY_COMMERR, errorMsg.str().c_str()), NULL);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Successfully decrypted the buffer"
<< " Sizes - Before Decryption = " << currentBuffer->m_bufSize
<< " and After Decryption = " << unWrappedLen << std::endl;)
// Release the buffer allocated to hold chunk
Utils::freeBuffer(currentBuffer->m_pBuffer, currentBuffer->m_bufSize);
currentBuffer = NULL;
bytes_read = 0;
if(*allocatedBuffer == NULL) {
// This is the first chunk of the RPC message. We will decode the RPC message full length
bytes_read = rpcLengthDecode(reinterpret_cast<ByteBuf_t>(const_cast<char*>(unWrappedData)), rpcMsgLen);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length bytes = "
<< bytes_read << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Rpc Message Length = "
<< rpcMsgLen << std::endl;)
if(rpcMsgLen == 0) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVREADLEN), NULL);
}
// Allocate a buffer for storing full RPC message. This is released by the caller
*allocatedBuffer = new AllocatedBuffer(rpcMsgLen);
if(*allocatedBuffer == NULL){
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_CLIENT_OUTOFMEM, getMessage(ERR_QRY_OUTOFMEM), NULL);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Allocated and locked buffer:"
<< "[ " << *allocatedBuffer << ", size = " << rpcMsgLen << " ]\n";)
bytesToRead = rpcMsgLen;
}
// Update the leftover bytes that is not copied yet
leftover = unWrappedLen - bytes_read;
// Copy rest of decrypted message to the buffer. We can do this since it is assured that one
// entire decrypted chunk is part of the same RPC message.
if(leftover) {
memcpy((*allocatedBuffer)->m_pBuffer + writeIndex, unWrappedData + bytes_read, leftover);
}
// Update bytes left to read to form full RPC message.
bytesToRead -= leftover;
writeIndex += leftover;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Left to read unencrypted data"
<< " of length (bytesToRead) : " << bytesToRead << std::endl;)
if(bytesToRead > 0) {
// Read synchronously buffer of size LEN_PREFIX_BUFLEN to get length of next chunk
doReadFromSocket(inBuf, LEN_PREFIX_BUFLEN, error);
if(error) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
}
}
}while(bytesToRead > 0); // more chunks to read for entire RPC message
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Done decrypting entire RPC message "
<< " of length: " << rpcMsgLen << ". Now starting decode:" << std::endl;)
// Decode the buffer and form a RPC message
if (!decode((*allocatedBuffer)->m_pBuffer, rpcMsgLen, msg)) {
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR,
"Cannot decode server message into valid RPC message"), NULL);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Successfully created a RPC message with Coordination id: "
<< msg.m_coord_id << std::endl;)
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::readAndDecryptMsg: Free buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
return QRY_SUCCESS;
}
status_t DrillClientImpl::processQueryResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
exec::shared::QueryId qid;
sendAck(msg, true);
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryResult qr;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Result " << std::endl;)
qr.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr.DebugString() << std::endl;)
qid.CopyFrom(qr.query_id());
if (qr.has_query_state() &&
qr.query_state() != exec::shared::QueryResult_QueryState_RUNNING &&
qr.query_state() != exec::shared::QueryResult_QueryState_STARTING) {
pDrillClientQueryResult=findQueryResult(qid);
//Queries that have been cancelled or whose resources are freed before completion
//do not have a DrillClientQueryResult object. We need not handle the terminal message
//in that case since all it does is to free resources (and they have already been freed)
if(pDrillClientQueryResult!=NULL){
//Validate the RPC message
std::string valErr;
if( (ret=validateResultMessage(msg, qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: ERR_QRY_INVRPC." << std::endl;)
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
ret=processQueryStatusResult(&qr, pDrillClientQueryResult);
}else{
// We've received the final message for a query that has been cancelled
// or for which the resources have been freed. We no longer need to listen
// for more incoming messages for such a query.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult:" << debugPrintQid(qid)<< " completed."<< std::endl;)
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: pending requests is " << m_pendingRequests<< std::endl;)
ret=QRY_CANCELED;
}
delete allocatedBuffer;
//return ret;
}else{
// Normal query results come back with query_state not set.
// Actually this is not strictly true. The query state is set to
// 0(i.e. PENDING), but protobuf thinks this means the value is not set.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: Query State was not set.\n";)
}
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processQueryData(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
// Be a good client and send ack as early as possible.
// Drillbit pushed the query result to the client, the client should send ack
// whenever it receives the message
sendAck(msg, true);
RecordBatch* pRecordBatch=NULL;
{
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
exec::shared::QueryData* qr = new exec::shared::QueryData; //Record Batch will own this object and free it up.
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Data " << std::endl;)
qr->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << qr->DebugString() << std::endl;)
const ::exec::shared::QueryId& qid = qr->query_id();
if(qid.part1()==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: QID=0. Ignore and return QRY_SUCCESS." << std::endl;)
delete allocatedBuffer;
return QRY_SUCCESS;
}
pDrillClientQueryResult=findQueryResult(qid);
if(pDrillClientQueryResult==NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Cleaning up resources allocated for canceled query ("
<< debugPrintQid(qid) << ")." << std::endl;)
delete qr;
delete allocatedBuffer;
return ret;
}
// check if query has been cancelled
if (pDrillClientQueryResult->isCancelled()) {
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query cancellation " << std::endl;)
delete qr;
delete allocatedBuffer;
ret = QRY_CANCELED;
} else {
//Validate the RPC message
std::string valErr;
if( (ret=validateDataMessage(msg, *qr, valErr)) != QRY_SUCCESS){
delete allocatedBuffer;
delete qr;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryData: ERR_QRY_INVRPC.\n";)
pDrillClientQueryResult->setQueryStatus(ret);
return handleQryError(ret, getMessage(ERR_QRY_INVRPC, valErr.c_str()), pDrillClientQueryResult);
}
//Build Record Batch here
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Building record batch for Query Id - " << debugPrintQid(qid) << std::endl;)
pRecordBatch= new RecordBatch(qr, allocatedBuffer, msg.m_dbody);
pDrillClientQueryResult->m_numBatches++;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Allocated new Record batch." << (void*)pRecordBatch << std::endl;)
pRecordBatch->build();
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numRecords "
<< pRecordBatch->getNumRecords() << std::endl;)
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)<<"recordBatch.numFields "
<< pRecordBatch->getNumFields() << std::endl;)
ret=pDrillClientQueryResult->setupColumnDefs(qr);
if(ret==QRY_SUCCESS_WITH_INFO){
pRecordBatch->schemaChanged(true);
}
pDrillClientQueryResult->setIsQueryPending(true);
if(pDrillClientQueryResult->m_bIsLastChunk){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << debugPrintQid(qid)
<< "Received last batch. " << std::endl;)
ret=QRY_NO_MORE_DATA;
}
pDrillClientQueryResult->setQueryStatus(ret);
ret = pDrillClientQueryResult->notifyListener(pRecordBatch, NULL);
}
} // release lock
if((ret==QRY_FAILURE || ret==QRY_CANCELED) && pDrillClientQueryResult != NULL){
return handleQryCancellation(ret, pDrillClientQueryResult);
}
return ret;
}
status_t DrillClientImpl::processQueryId(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Query Handle with coordination id:" << msg.m_coord_id << std::endl;)
DrillClientQueryResult* pDrillClientQueryResult=NULL;
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
{
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryId: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
for(std::map< ::exec::shared::QueryId*, DrillClientQueryResult*>::const_iterator it=this->m_queryResults.begin();it!=this->m_queryResults.end();it++){
DrillClientQueryResult* pQueryResult=it->second;
std::string qidString = (pQueryResult->m_pQueryId!=NULL)?debugPrintQid(*pQueryResult->m_pQueryId):std::string("NULL");
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "DrillClientImpl::processQueryId: m_queryIds: coordinationId: " << pQueryResult->m_coordinationId
<< " QueryId: "<< qidString << std::endl;)
}
std::map<int, DrillClientQueryHandle*>::const_iterator it;
it=this->m_queryHandles.find(msg.m_coord_id);
if(it==this->m_queryHandles.end()){
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
pDrillClientQueryResult=dynamic_cast<DrillClientQueryResult*>((*it).second);
if (!pDrillClientQueryResult) {
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
// Check for cancellation to notify
if (pDrillClientQueryResult->isCancelled()) {
ret = QRY_CANCELED;
}
else {
exec::shared::QueryId *qid = new exec::shared::QueryId;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Query Handle " << msg.m_pbody.size() << std::endl;)
qid->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size());
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Query Id - " << debugPrintQid(*qid) << std::endl;)
m_queryResults[qid]=pDrillClientQueryResult;
//save queryId allocated here so we can free it later
pDrillClientQueryResult->setQueryId(qid);
}
}
if (ret == QRY_CANCELED && pDrillClientQueryResult != NULL) {
return handleQryCancellation(ret, pDrillClientQueryResult);
}
return ret;
}
status_t DrillClientImpl::processPreparedStatement(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing Prepared Statement with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStatement: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientPrepareHandle* pDrillClientPrepareHandle=static_cast<DrillClientPrepareHandle*>((*it).second);
if (!validateResultRPCType(pDrillClientPrepareHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for prepared statement.", pDrillClientPrepareHandle);
}
exec::user::CreatePreparedStatementResp resp;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received Prepared Statement Handle " << msg.m_pbody.size() << std::endl;)
if (!resp.ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size())) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode prepared statement", pDrillClientPrepareHandle);
}
if (resp.has_status() && resp.status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp.error(), pDrillClientPrepareHandle);
}
if (QRY_SUCCESS != pDrillClientPrepareHandle->setupPreparedStatement(resp.prepared_statement())){
return handleQryError(QRY_FAILED, "Error during prepared statement setup.", pDrillClientPrepareHandle);
}
pDrillClientPrepareHandle->notifyListener(pDrillClientPrepareHandle, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Prepared Statement handle - " << resp.prepared_statement().server_handle().DebugString() << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processPreparedStament: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processCatalogsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetCatalogsResp with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientCatalogResult* pHandle=static_cast<DrillClientCatalogResult*>((*it).second);
if (!validateResultRPCType(pHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcatalogs results.", pHandle);
}
exec::user::GetCatalogsResp* resp = new exec::user::GetCatalogsResp;
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetCatalogs result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode getcatalogs results", pHandle);
}
if (resp->status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
const ::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>& catalogs = resp->catalogs();
pHandle->m_meta.clear();
pHandle->m_meta.reserve(resp->catalogs_size());
for(::google::protobuf::RepeatedPtrField< ::exec::user::CatalogMetadata>::const_iterator it = catalogs.begin(); it != catalogs.end(); ++it) {
meta::DrillCatalogMetadata meta(*it);
pHandle->m_meta.push_back(meta);
}
pHandle->notifyListener(&pHandle->m_meta, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetCatalogs result - " << resp->catalogs_size() << " catalog(s)" << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processCatalogsResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processSchemasResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetSchemaResp with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientSchemaResult* pHandle=static_cast<DrillClientSchemaResult*>((*it).second);
if (!validateResultRPCType(pHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getschemas results.", pHandle);
}
exec::user::GetSchemasResp* resp = new exec::user::GetSchemasResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetSchemasResp result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode getschemas results", pHandle);
}
if (resp->status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
const ::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>& schemas = resp->schemas();
pHandle->m_meta.clear();
pHandle->m_meta.reserve(resp->schemas_size());
for(::google::protobuf::RepeatedPtrField< ::exec::user::SchemaMetadata>::const_iterator it = schemas.begin(); it != schemas.end(); ++it) {
meta::DrillSchemaMetadata meta(*it);
pHandle->m_meta.push_back(meta);
}
pHandle->notifyListener(&pHandle->m_meta, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetSchemaResp result - " << resp->schemas_size() << " schema(s)" << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processSchemasResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processTablesResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetTablesResp with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientTableResult* pHandle=static_cast<DrillClientTableResult*>((*it).second);
if (!validateResultRPCType(pHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for gettables results.", pHandle);
}
exec::user::GetTablesResp* resp = new exec::user::GetTablesResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GeTablesResp result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode gettables results", pHandle);
}
if (resp->status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
const ::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>& tables = resp->tables();
pHandle->m_meta.clear();
pHandle->m_meta.reserve(resp->tables_size());
for(::google::protobuf::RepeatedPtrField< ::exec::user::TableMetadata>::const_iterator it = tables.begin(); it != tables.end(); ++it) {
meta::DrillTableMetadata meta(*it);
pHandle->m_meta.push_back(meta);
}
pHandle->notifyListener(&pHandle->m_meta, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetTables result - " << resp->tables_size() << " table(s)" << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processTablesResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processColumnsResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetColumnsResp with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientColumnResult* pHandle=static_cast<DrillClientColumnResult*>((*it).second);
if (!validateResultRPCType(pHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for getcolumns results.", pHandle);
}
exec::user::GetColumnsResp* resp = new exec::user::GetColumnsResp();
pHandle->attachMetadataResult(resp);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetColumnsResp result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode getcolumns results", pHandle);
}
if (resp->status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
const ::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>& columns = resp->columns();
pHandle->m_meta.clear();
pHandle->m_meta.reserve(resp->columns_size());
for(::google::protobuf::RepeatedPtrField< ::exec::user::ColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) {
meta::DrillColumnMetadata meta(*it);
pHandle->m_meta.push_back(meta);
}
pHandle->notifyListener(&pHandle->m_meta, NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetColumnsResp result - " << resp->columns_size() << " columns(s)" << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processColumnsResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
status_t DrillClientImpl::processServerMetaResult(AllocatedBufferPtr allocatedBuffer, const rpc::InBoundRpcMessage& msg ){
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Processing GetServerMetaResp with coordination id:" << msg.m_coord_id << std::endl;)
status_t ret=QRY_SUCCESS;
// make sure to deallocate buffer
boost::shared_ptr<AllocatedBuffer> deallocationGuard(allocatedBuffer);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
if(msg.m_coord_id==0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processServerMetaResult: m_coord_id=0. Ignore and return QRY_SUCCESS." << std::endl;)
return QRY_SUCCESS;
}
std::map<int,DrillClientQueryHandle*>::const_iterator it=this->m_queryHandles.find(msg.m_coord_id);
if(it!=this->m_queryHandles.end()){
DrillClientServerMetaHandle* pHandle=static_cast<DrillClientServerMetaHandle*>((*it).second);
if (!validateResultRPCType(pHandle, msg)){
return handleQryError(QRY_COMM_ERROR, "Unexpected RPC Type for GetServerMetaResp results.", pHandle);
}
exec::user::GetServerMetaResp* resp = new exec::user::GetServerMetaResp();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received GetServerMetaResp result Handle " << msg.m_pbody.size() << std::endl;)
if (!(resp->ParseFromArray(msg.m_pbody.data(), msg.m_pbody.size()))) {
return handleQryError(QRY_COMM_ERROR, "Cannot decode GetServerMetaResp results", pHandle);
}
if (resp->status() != exec::user::OK) {
return handleQryError(QRY_FAILED, resp->error(), pHandle);
}
pHandle->notifyListener(&(resp->server_meta()), NULL);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "GetServerMetaResp result " << std::endl;)
}else{
return handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVQUERYID), NULL);
}
m_pendingRequests--;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processServerMetaResult: " << m_pendingRequests << " requests pending." << std::endl;)
if(m_pendingRequests==0){
// signal any waiting client that it can exit because there are no more any query results to arrive.
// We keep the heartbeat going though.
m_cv.notify_one();
}
return ret;
}
DrillClientQueryResult* DrillClientImpl::findQueryResult(const exec::shared::QueryId& qid){
DrillClientQueryResult* pDrillClientQueryResult=NULL;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Searching for Query Id - " << debugPrintQid(qid) << std::endl;)
std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::iterator it;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryResult: m_queryResults size: " << m_queryResults.size() << std::endl;)
if(m_queryResults.size() != 0){
for(it=m_queryResults.begin(); it!=m_queryResults.end(); it++){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::findQueryResult: m_QueryResult ids: [" << it->first->part1() << ":"
<< it->first->part2() << "]\n";)
}
}
it=this->m_queryResults.find(const_cast<exec::shared::QueryId * const>(&qid));
if(it!=this->m_queryResults.end()){
pDrillClientQueryResult=(*it).second;
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Drill Client Query Result Query Id - " <<
debugPrintQid(*pDrillClientQueryResult->m_pQueryId) << std::endl;)
}
return pDrillClientQueryResult;
}
status_t DrillClientImpl::processQueryStatusResult(exec::shared::QueryResult* qr,
DrillClientQueryResult* pDrillClientQueryResult){
status_t ret = QUERYSTATE_TO_STATUS_MAP[qr->query_state()];
if(pDrillClientQueryResult!=NULL){
pDrillClientQueryResult->setQueryStatus(ret);
pDrillClientQueryResult->setQueryState(qr->query_state());
}
switch(qr->query_state()) {
case exec::shared::QueryResult_QueryState_FAILED:
{
// get the error message from protobuf and handle errors
ret = (0 == qr->error_size()) ?
handleQryError(ret, "Unknown protobuf error.", pDrillClientQueryResult) : handleQryError(ret, qr->error(0), pDrillClientQueryResult);
}
break;
// m_pendingRequests should be decremented when the query is
// completed
case exec::shared::QueryResult_QueryState_CANCELED:
{
ret=handleTerminatedQryState(ret,
getMessage(ERR_QRY_CANCELED),
pDrillClientQueryResult);
m_pendingRequests--;
}
break;
case exec::shared::QueryResult_QueryState_COMPLETED:
{
//Not clean to call the handleTerminatedQryState method
//because it signals an error to the listener.
//The ODBC driver expects this though and the sync API
//handles this (luckily).
ret=handleTerminatedQryState(ret,
getMessage(ERR_QRY_COMPLETED),
pDrillClientQueryResult);
m_pendingRequests--;
}
break;
default:
{
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::processQueryStatusResult: Unknown Query State.\n";)
ret=handleQryError(QRY_INTERNAL_ERROR,
getMessage(ERR_QRY_UNKQRYSTATE),
pDrillClientQueryResult);
}
break;
}
return ret;
}
void DrillClientImpl::handleReadTimeout(const boost::system::error_code & err){
// if err == boost::asio::error::operation_aborted) then the caller cancelled the timer.
if(err != boost::asio::error::operation_aborted){
// Check whether the deadline has passed.
if (m_deadlineTimer.expires_at() <= boost::asio::deadline_timer::traits_type::now()){
// The deadline has passed.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleReadTimeout: Deadline timer expired; ERR_QRY_TIMOUT. \n";)
handleQryError(QRY_TIMEOUT, getMessage(ERR_QRY_TIMOUT), NULL);
// There is no longer an active deadline. The expiry is set to positive
// infinity so that the timer never expires until a new deadline is set.
// Note that at this time, the caller is not in a (async) wait for the timer.
m_deadlineTimer.expires_at(boost::posix_time::pos_infin);
// Cancel all pending async IOs.
// The cancel call _MAY_ not work on all platforms. To be a little more reliable we need
// to have the BOOST_ASIO_ENABLE_CANCELIO macro (as well as the BOOST_ASIO_DISABLE_IOCP macro?)
// defined. To be really sure, we need to close the socket. Closing the socket is a bit
// drastic and we will defer that till a later release.
#ifdef WIN32_SHUTDOWN_ON_TIMEOUT
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if(m_pChannel != NULL) m_pChannel->close();
}
m_pChannel->close();
#else // NOT WIN32_SHUTDOWN_ON_TIMEOUT
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if(m_pChannel != NULL) m_pChannel->getInnerSocket().cancel();
}
#endif // WIN32_SHUTDOWN_ON_TIMEOUT
}
}
return;
}
void DrillClientImpl::handleRead(ByteBuf_t inBuf,
const boost::system::error_code& error,
size_t bytes_transferred) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handle Read from buffer "
<< reinterpret_cast<int*>(inBuf) << std::endl;)
if(DrillClientConfig::getQueryTimeout() > 0){
// Cancel the timeout if handleRead is called
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Cancel deadline timer.\n";)
m_deadlineTimer.cancel();
}
if (error) {
// boost error
Utils::freeBuffer(inBuf, LEN_PREFIX_BUFLEN);
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_COMMERR. "
"Boost Communication Error: " << error.message() << std::endl;)
handleQryError(QRY_COMM_ERROR, getMessage(ERR_QRY_COMMERR, error.message().c_str()), NULL);
return;
}
rpc::InBoundRpcMessage msg;
boost::lock_guard<boost::mutex> lockPR(this->m_prMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting new message" << std::endl;)
AllocatedBufferPtr allocatedBuffer=NULL;
if((this->*m_fpCurrentReadMsgHandler)(inBuf, &allocatedBuffer, msg)!=QRY_SUCCESS){
delete allocatedBuffer;
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
if(msg.m_mode==exec::rpc::PONG) { //heartbeat response. Throw it away
m_pendingRequests--;
delete allocatedBuffer;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Received heartbeat from server. " << std::endl;)
if(m_pendingRequests!=0){
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}else{
boost::unique_lock<boost::mutex> cvLock(this->m_dcMutex);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "No more results expected from server. " << std::endl;)
m_cv.notify_one();
}
return;
}
if(msg.m_mode == exec::rpc::RESPONSE) {
status_t s;
switch(msg.m_rpc_type) {
case exec::user::QUERY_HANDLE:
s = processQueryId(allocatedBuffer, msg);
break;
case exec::user::PREPARED_STATEMENT:
s = processPreparedStatement(allocatedBuffer, msg);
break;
case exec::user::CATALOGS:
s = processCatalogsResult(allocatedBuffer, msg);
break;
case exec::user::SCHEMAS:
s = processSchemasResult(allocatedBuffer, msg);
break;
case exec::user::TABLES:
s = processTablesResult(allocatedBuffer, msg);
break;
case exec::user::COLUMNS:
s = processColumnsResult(allocatedBuffer, msg);
break;
case exec::user::HANDSHAKE:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: Handshake response from server. Ignore.\n";)
delete allocatedBuffer;
break;
case exec::user::SASL_MESSAGE:
processSaslChallenge(allocatedBuffer, msg);
break;
case exec::user::SERVER_META:
processServerMetaResult(allocatedBuffer, msg);
break;
case exec::user::ACK:
// Cancel requests will result in an ACK sent back.
// Consume silently
s = QRY_CANCELED;
delete allocatedBuffer;
break;
default:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << std::endl;)
delete allocatedBuffer;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
if (msg.has_mode() && msg.m_mode == exec::rpc::REQUEST) {
status_t s;
switch(msg.m_rpc_type) {
case exec::user::QUERY_RESULT:
s = processQueryResult(allocatedBuffer, msg);
break;
case exec::user::QUERY_DATA:
s = processQueryData(allocatedBuffer, msg);
break;
default:
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << std::endl;)
delete allocatedBuffer;
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
}
if (m_pendingRequests != 0) {
boost::lock_guard<boost::mutex> lock(this->m_dcMutex);
getNextResult();
}
return;
}
// If not QUERY_RESULT, then we think something serious has gone wrong?
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::handleRead: ERR_QRY_INVRPCTYPE. "
<< "QueryResult returned " << msg.m_rpc_type << " for " << msg.m_mode << std::endl;)
handleQryError(QRY_INTERNAL_ERROR, getMessage(ERR_QRY_INVRPCTYPE, msg.m_rpc_type), NULL);
delete allocatedBuffer;
}
status_t DrillClientImpl::validateDataMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryData& qd, std::string& valErr){
if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
valErr=getMessage(ERR_QRY_RESPFAIL);
return QRY_FAILURE;
}
if(qd.def().carries_two_byte_selection_vector() == true){
valErr=getMessage(ERR_QRY_SELVEC2);
return QRY_FAILURE;
}
return QRY_SUCCESS;
}
status_t DrillClientImpl::validateResultMessage(const rpc::InBoundRpcMessage& msg, const exec::shared::QueryResult& qr, std::string& valErr){
if(msg.m_mode == exec::rpc::RESPONSE_FAILURE){
valErr=getMessage(ERR_QRY_RESPFAIL);
return QRY_FAILURE;
}
if(qr.query_state()==exec::shared::QueryResult_QueryState_CANCELED){
valErr=getMessage(ERR_QRY_CANCELED);
return QRY_FAILURE;
}
return QRY_SUCCESS;
}
bool DrillClientImpl::validateResultRPCType(DrillClientQueryHandle* pQueryHandle, const rpc::InBoundRpcMessage& msg) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "DrillClientImpl::validateResultRPCType" << std::endl;)
if (NULL != pQueryHandle) {
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)
<< "DrillClientImpl::validateResultRPCType: Expected RPC Type: "
<< pQueryHandle->getExpectedRPCType()
<< " inbound RPC Type: "
<< msg.m_rpc_type
<< std::endl;)
return (pQueryHandle->getExpectedRPCType() == msg.m_rpc_type);
}
return false;
}
/*
* Called when there is failure in connect/send.
*/
connectionStatus_t DrillClientImpl::handleConnError(connectionStatus_t status, const std::string& msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
m_pendingRequests=0;
if(!m_queryHandles.empty()){
// set query error only if queries are running
broadcastError(pErr);
}else{
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
shutdownSocket();
}
return status;
}
connectionStatus_t DrillClientImpl::handleConnError(DrillClientError* err){
DrillClientError* pErr = new DrillClientError(*err);
m_pendingRequests=0;
if(!m_queryHandles.empty()){
// set query error only if queries are running
broadcastError(pErr);
}else{
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
shutdownSocket();
}
return (connectionStatus_t)pErr->status;
}
/*
* Always called with NULL QueryHandle when there is any error while reading data from socket. Once enough data is read
* and a valid RPC message is formed then it can get called with NULL/valid QueryHandle depending on if QueryHandle is found
* for the created RPC message.
*/
status_t DrillClientImpl::handleQryError(status_t status, const std::string& msg, DrillClientQueryHandle* pQueryHandle){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
// Set query error only if queries are running. If valid QueryHandle that means the bytes to form a valid
// RPC message was read successfully from socket. So there is no socket/connection issues.
if(pQueryHandle!=NULL){
m_pendingRequests--;
pQueryHandle->signalError(pErr);
}else{ // This means error was while reading from socket, hence call broadcastError which eventually closes socket.
m_pendingRequests=0;
broadcastError(pErr);
}
return status;
}
/*
* Always called with valid QueryHandle when there is any error processing Query related data.
*/
status_t DrillClientImpl::handleQryError(status_t status,
const exec::shared::DrillPBError& e,
DrillClientQueryHandle* pQueryHandle){
assert(pQueryHandle!=NULL);
DrillClientError* pErr = DrillClientError::getErrorObject(e);
pQueryHandle->signalError(pErr);
m_pendingRequests--;
return status;
}
status_t DrillClientImpl::handleQryCancellation(status_t status, DrillClientQueryResult* pQueryHandle) {
sendCancel(&pQueryHandle->getQueryId());
// Do not decrement pending requests here. We have sent a cancel and we may still receive results that are
// pushed on the wire before the cancel is processed.
pQueryHandle->setIsQueryPending(false);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Client app cancelled query." << std::endl;)
pQueryHandle->setQueryStatus(status);
removeQueryHandle(pQueryHandle);
return status;
}
void DrillClientImpl::broadcastError(DrillClientError* pErr){
if(pErr!=NULL){
std::map<int, DrillClientQueryHandle*>::const_iterator iter;
if(!m_queryHandles.empty()){
for(iter = m_queryHandles.begin(); iter != m_queryHandles.end(); iter++) {
DrillClientError* err=new DrillClientError(pErr->status, pErr->errnum, pErr->msg);
iter->second->signalError(err);
}
}
delete pErr;
}
// We have an error at the connection level. Cancel the heartbeat.
// And close the connection
m_heartbeatTimer.cancel();
m_pendingRequests=0;
m_cv.notify_one();
shutdownSocket();
return;
}
// The implementation is similar to handleQryError
status_t DrillClientImpl::handleTerminatedQryState(
status_t status,
const std::string& msg,
DrillClientQueryResult* pQueryResult){
assert(pQueryResult!=NULL);
if(status==QRY_COMPLETED){
pQueryResult->signalComplete();
}else{
// set query error only if queries did not complete successfully
DrillClientError* pErr = new DrillClientError(status, DrillClientError::QRY_ERROR_START+status, msg);
pQueryResult->signalError(pErr);
}
return status;
}
void DrillClientImpl::removeQueryHandle(DrillClientQueryHandle* pQueryHandle){
boost::lock_guard<boost::mutex> lock(m_dcMutex);
// Removing first the base handle
for(std::map<int, DrillClientQueryHandle*>::const_iterator iter=m_queryHandles.begin(); iter!=m_queryHandles.end(); iter++) {
if(pQueryHandle==(DrillClientQueryHandle*)iter->second){
m_queryHandles.erase(iter->first);
break;
}
}
// if the query handle is a result handle, m_queryResults also need to be cleaned.
DrillClientQueryResult* pQueryResult = dynamic_cast<DrillClientQueryResult*>(pQueryHandle);
if (pQueryResult) {
for(std::map<exec::shared::QueryId*, DrillClientQueryResult*, compareQueryId>::const_iterator it=m_queryResults.begin(); it!=m_queryResults.end(); it++) {
if(pQueryResult==(DrillClientQueryResult*)it->second){
m_queryResults.erase(it->first);
break;
}
}
}
}
void DrillClientImpl::sendAck(const rpc::InBoundRpcMessage& msg, bool isOk){
exec::rpc::Ack ack;
ack.set_ok(isOk);
rpc::OutBoundRpcMessage ack_msg(exec::rpc::RESPONSE, exec::user::ACK, msg.m_coord_id, &ack);
boost::lock_guard<boost::mutex> lock(m_dcMutex);
sendSyncCommon(ack_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "ACK sent" << std::endl;)
}
void DrillClientImpl::sendCancel(const exec::shared::QueryId* pQueryId){
boost::lock_guard<boost::mutex> lock(m_dcMutex);
uint64_t coordId = this->getNextCoordinationId();
rpc::OutBoundRpcMessage cancel_msg(exec::rpc::REQUEST, exec::user::CANCEL_QUERY, coordId, pQueryId);
sendSyncCommon(cancel_msg);
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "CANCEL sent" << std::endl;)
}
void DrillClientImpl::shutdownSocket(){
m_pendingRequests=0;
m_heartbeatTimer.cancel();
m_deadlineTimer.cancel();
{
boost::lock_guard<boost::mutex> lock(m_channelMutex);
if (m_pChannel != NULL) {
m_pChannel->close();
}
}
m_io_service.stop();
m_bIsConnected=false;
// Delete the saslAuthenticatorImpl instance since connection is broken. It will recreated on next
// call to connect.
if (m_saslAuthenticator != NULL) {
{
boost::mutex::scoped_lock lock(m_sasl_dispose_mutex);
if (m_saslAuthenticator != NULL) {
delete m_saslAuthenticator;
m_saslAuthenticator = NULL;
}
}
}
// Reset the SASL states.
m_saslDone = false;
m_saslResultCode = SASL_OK;
// Reset the encryption context since connection is invalid
m_encryptionCtxt.reset();
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Socket shutdown" << std::endl;)
}
namespace { // anonymous
}
namespace { // anonymous
// Helper class to wait on ServerMeta results
struct ServerMetaContext {
ServerMetaContext() : m_done(false), m_status(QRY_FAILURE)
{
; // Do nothing.
}
bool m_done;
status_t m_status;
exec::user::ServerMeta m_serverMeta;
boost::mutex m_mutex;
boost::condition_variable m_cv;
static status_t listener(void* ctx, const exec::user::ServerMeta* serverMeta, DrillClientError* err) {
ServerMetaContext* context = static_cast<ServerMetaContext*>(ctx);
if (err) {
context->m_status = QRY_FAILURE;
} else {
context->m_status = QRY_SUCCESS;
context->m_serverMeta.CopyFrom(*serverMeta);
}
{
boost::lock_guard<boost::mutex> lock(context->m_mutex);
context->m_done = true;
}
context->m_cv.notify_one();
return QRY_SUCCESS;
}
};
}
meta::DrillMetadata* DrillClientImpl::getMetadata() {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Getting metadata" << std::endl;)
if (std::find(m_supportedMethods.begin(), m_supportedMethods.end(), exec::user::GET_SERVER_META) == m_supportedMethods.end()) {
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata not supported " << m_supportedMethods.size() << ". Falling back to default." << std::endl;)
return new meta::DrillMetadata(*this, meta::DrillMetadata::s_defaultServerMeta);
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata supported." << std::endl;)
exec::user::GetServerMetaReq req;
ServerMetaContext ctx;
boost::function<DrillClientServerMetaHandle*(int32_t)> factory = boost::bind(
boost::factory<DrillClientServerMetaHandle*>(),
boost::ref(*this),
_1,
ServerMetaContext::listener,
&ctx);
// Getting a query handle, and make sure to free when done
boost::shared_ptr<DrillClientServerMetaHandle> handle = boost::shared_ptr<DrillClientServerMetaHandle>(
sendMsg(factory, exec::user::GET_SERVER_META, req),
boost::bind(&DrillClientImpl::freeQueryResources, this, _1));
{
boost::unique_lock<boost::mutex> lock(ctx.m_mutex);
while(!ctx.m_done) {
ctx.m_cv.wait(lock);
}
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Server metadata received." << std::endl;)
if (ctx.m_status != QRY_SUCCESS) {
return NULL;
}
return new meta::DrillMetadata(*this, ctx.m_serverMeta);
}
void DrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
delete metadata;
}
// This COPIES the FieldMetadata definition for the record batch. ColumnDefs held by this
// class are used by the async callbacks.
status_t DrillClientQueryResult::setupColumnDefs(exec::shared::QueryData* pQueryData) {
bool hasSchemaChanged=false;
bool isFirstIter=false;
boost::lock_guard<boost::mutex> schLock(this->m_schemaMutex);
isFirstIter=this->m_numBatches==1?true:false;
std::map<std::string, Drill::FieldMetadata*> oldSchema;
if(!m_columnDefs->empty()){
for(std::vector<Drill::FieldMetadata*>::iterator it = this->m_columnDefs->begin(); it != this->m_columnDefs->end(); ++it){
// the key is the field_name + type
char type[256];
snprintf(type, sizeof(type), ":%d:%d",(*it)->getMinorType(), (*it)->getDataMode() );
std::string k= (*it)->getName()+type;
oldSchema[k]=*it;
delete *it;
}
}
m_columnDefs->clear();
size_t numFields=pQueryData->def().field_size();
if (numFields > 0){
for(size_t i=0; i<numFields; i++){
Drill::FieldMetadata* fmd= new Drill::FieldMetadata;
fmd->set(pQueryData->def().field(i));
this->m_columnDefs->push_back(fmd);
//Look for changes in the vector and trigger a Schema change event if necessary.
//If vectors are different, then call the schema change listener.
char type[256];
snprintf(type, sizeof(type), ":%d:%d",fmd->getMinorType(), fmd->getDataMode() );
std::string k= fmd->getName()+type;
std::map<std::string, Drill::FieldMetadata*>::iterator iter=oldSchema.find(k);
if(iter==oldSchema.end()){
// not found
hasSchemaChanged=true;
}else{
oldSchema.erase(iter);
}
}
if(oldSchema.size()>0){
hasSchemaChanged=true;
oldSchema.clear();
}
}
this->m_bHasSchemaChanged=hasSchemaChanged&&!isFirstIter;
if(this->m_bHasSchemaChanged){
//invoke schema change Listener
if(m_pSchemaListener!=NULL){
m_pSchemaListener(this, m_columnDefs, NULL);
}
}
return this->m_bHasSchemaChanged?QRY_SUCCESS_WITH_INFO:QRY_SUCCESS;
}
status_t DrillClientQueryResult::defaultQueryResultsListener(void* ctx,
RecordBatch* b,
DrillClientError* err) {
//ctx; // unused, we already have the this pointer
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query result listener called" << std::endl;)
//check if the query has been canceled. IF so then return FAILURE. Caller will send cancel to the server.
if(this->isCancelled()){
if(b!=NULL) delete b;
return QRY_FAILURE;
}
if (!err) {
// signal the cond var
{
if(b!=NULL){
#ifdef DEBUG
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG)<<debugPrintQid(b->getQueryResult()->query_id())
<< "Query result listener saved result to queue." << std::endl;)
#endif
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
this->m_recordBatches.push(b);
this->m_bHasData=true;
}
}
m_cv.notify_one();
}else{
return QRY_FAILURE;
}
return QRY_SUCCESS;
}
RecordBatch* DrillClientQueryResult::peekNext(){
RecordBatch* pRecordBatch=NULL;
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending) return NULL;
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
// READ but not remove first element from queue
pRecordBatch = this->m_recordBatches.front();
return pRecordBatch;
}
void DrillClientQueryResult::cancel() {
// Calling parent class
DrillClientBaseHandle<pfnQueryResultsListener, RecordBatch*>::cancel();
// If queryId has already been received, don't wait to send the
// cancellation message
if (this->m_pQueryId) {
this->client().handleQryCancellation(QRY_CANCELED, this);
}
}
RecordBatch* DrillClientQueryResult::getNext() {
RecordBatch* pRecordBatch=NULL;
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Query is done." << std::endl;)
if(!m_recordBatches.empty()){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << " But there is a Record batch left behind." << std::endl;)
}
return NULL;
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Synchronous read waiting for data." << std::endl;)
while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending){
this->m_cv.wait(cvLock);
}
// remove first element from queue
pRecordBatch = this->m_recordBatches.front();
this->m_recordBatches.pop();
this->m_bHasData=!this->m_recordBatches.empty();
// if vector is empty, set m_bHasDataPending to false;
m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
return pRecordBatch;
}
// Blocks until data is available
void DrillClientQueryResult::waitForData() {
boost::unique_lock<boost::mutex> cvLock(this->m_cvMutex);
//if no more data, return NULL;
if(!m_bIsQueryPending) return;
while(!this->m_bHasData && !this->hasError() && m_bIsQueryPending) {
this->m_cv.wait(cvLock);
}
}
template<typename Listener, typename Value>
status_t DrillClientBaseHandle<Listener, Value>::notifyListener(Value v, DrillClientError* pErr){
return m_pApplicationListener(getApplicationContext(), v, pErr);
}
void DrillClientQueryHandle::cancel() {
this->m_bCancel=true;
}
void DrillClientQueryHandle::signalError(DrillClientError* pErr){
// Ignore return values from the listener.
if(pErr!=NULL){
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
m_pError=pErr;
// TODO should it be protected by m_cvMutex?
m_bHasError=true;
}
return;
}
template<typename Listener, typename Value>
void DrillClientBaseHandle<Listener, Value>::signalError(DrillClientError* pErr){
DrillClientQueryHandle::signalError(pErr);
// Ignore return values from the listener.
if(pErr!=NULL){
this->notifyListener(NULL, pErr);
}
}
status_t DrillClientQueryResult::notifyListener(RecordBatch* batch, DrillClientError* pErr) {
pfnQueryResultsListener pResultsListener=getApplicationListener();
if(pResultsListener!=NULL){
return pResultsListener(this, batch, pErr);
}else{
return defaultQueryResultsListener(this, batch, pErr);
}
}
void DrillClientQueryResult::signalError(DrillClientError* pErr){
DrillClientQueryHandle::signalError(pErr);
// Ignore return values from the listener.
if(pErr!=NULL){
this->notifyListener(NULL, pErr);
{
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
m_bIsQueryPending=false;
m_bHasData=false;
}
//Signal the cv in case there is a client waiting for data already.
m_cv.notify_one();
}
return;
}
void DrillClientQueryResult::signalComplete(){
this->notifyListener(NULL, NULL);
{
boost::lock_guard<boost::mutex> cvLock(this->m_cvMutex);
m_bIsQueryPending=!(this->m_recordBatches.empty()&&m_queryState==exec::shared::QueryResult_QueryState_COMPLETED);
resetError();
}
//Signal the cv in case there is a client waiting for data already.
m_cv.notify_one();
return;
}
void DrillClientQueryHandle::clearAndDestroy(){
//Tell the parent to remove this from its lists
m_client.removeQueryHandle(this);
if(m_pError!=NULL){
delete m_pError; m_pError=NULL;
}
}
void DrillClientQueryResult::clearAndDestroy(){
DrillClientQueryHandle::clearAndDestroy();
//free memory allocated for FieldMetadata objects saved in m_columnDefs;
if(!m_columnDefs->empty()){
for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
delete *it;
}
m_columnDefs->clear();
}
if(this->m_pQueryId!=NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Clearing state for Query Id - " << debugPrintQid(*this->m_pQueryId) << std::endl;)
}
//clear query id map entries.
if(this->m_pQueryId!=NULL){
delete this->m_pQueryId; this->m_pQueryId=NULL;
}
if(!m_recordBatches.empty()){
// When multiple queries execute in parallel we sometimes get an empty record batch back from the server _after_
// the last chunk has been received. We eventually delete it.
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Freeing Record batch(es) left behind "<< std::endl;)
RecordBatch* pR=NULL;
while(!m_recordBatches.empty()){
pR=m_recordBatches.front();
m_recordBatches.pop();
delete pR;
}
}
}
status_t DrillClientPrepareHandle::setupPreparedStatement(const exec::user::PreparedStatement& pstmt) {
// Get columns schema information
const ::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>& columns = pstmt.columns();
for(::google::protobuf::RepeatedPtrField< ::exec::user::ResultColumnMetadata>::const_iterator it = columns.begin(); it != columns.end(); ++it) {
FieldMetadata* metadata = new FieldMetadata;
metadata->set(*it);
m_columnDefs->push_back(metadata);
}
// Copy server handle
if (pstmt.has_server_handle()){
this->m_preparedStatementHandle.CopyFrom(pstmt.server_handle());
return QRY_SUCCESS;
}
return QRY_FAILURE;
}
void DrillClientPrepareHandle::clearAndDestroy(){
DrillClientQueryHandle::clearAndDestroy();
//free memory allocated for FieldMetadata objects saved in m_columnDefs;
if(!m_columnDefs->empty()){
for(std::vector<Drill::FieldMetadata*>::iterator it = m_columnDefs->begin(); it != m_columnDefs->end(); ++it){
delete *it;
}
m_columnDefs->clear();
}
}
connectionStatus_t PooledDrillClientImpl::connect(const char* connStr, DrillUserProperties* props){
connectionStatus_t stat = CONN_SUCCESS;
std::string pathToDrill, protocol, hostPortStr;
std::string host;
std::string port;
m_connectStr=connStr;
Utils::parseConnectStr(connStr, pathToDrill, protocol, hostPortStr);
if(!strcmp(protocol.c_str(), "zk")){
// Get a list of drillbits
ZookeeperClient zook(pathToDrill);
std::vector<std::string> drillbits;
int err = zook.getAllDrillbits(hostPortStr, drillbits);
if(!err){
if (drillbits.empty()){
return handleConnError(CONN_FAILURE, getMessage(ERR_CONN_ZKNODBIT));
}
Utils::shuffle(drillbits);
// The original shuffled order is maintained if we shuffle first and then add any missing elements
Utils::add(m_drillbits, drillbits);
exec::DrillbitEndpoint e;
size_t nextIndex=0;
{
boost::lock_guard<boost::mutex> cLock(m_cMutex);
m_lastConnection++;
nextIndex = (m_lastConnection)%(getDrillbitCount());
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Pooled Connection"
<< "(" << (void*)this << ")"
<< ": Current counter is: "
<< m_lastConnection << std::endl;)
err=zook.getEndPoint(m_drillbits[nextIndex], e);
if(!err){
host=boost::lexical_cast<std::string>(e.address());
port=boost::lexical_cast<std::string>(e.user_port());
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Choosing drillbit <" << nextIndex << ">. Selected " << e.DebugString() << std::endl;)
}
if(err){
return handleConnError(CONN_ZOOKEEPER_ERROR, getMessage(ERR_CONN_ZOOKEEPER, zook.getError().c_str()));
}
zook.close();
m_bIsDirectConnection=false;
}else if(!strcmp(protocol.c_str(), "local")){
char tempStr[MAX_CONNECT_STR+1];
strncpy(tempStr, hostPortStr.c_str(), MAX_CONNECT_STR); tempStr[MAX_CONNECT_STR]=0;
host=strtok(tempStr, ":");
port=strtok(NULL, "");
m_bIsDirectConnection=true;
}else{
return handleConnError(CONN_INVALID_INPUT, getMessage(ERR_CONN_UNKPROTO, protocol.c_str()));
}
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connecting to endpoint: (Pooled) " << host << ":" << port << std::endl;)
DrillClientImpl* pDrillClientImpl = new DrillClientImpl();
stat = pDrillClientImpl->connect(host.c_str(), port.c_str(), props);
if(stat == CONN_SUCCESS){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
m_clientConnections.push_back(pDrillClientImpl);
}else{
DrillClientError* pErr = pDrillClientImpl->getError();
handleConnError((connectionStatus_t)pErr->status, pErr->msg);
delete pDrillClientImpl;
}
return stat;
}
connectionStatus_t PooledDrillClientImpl::validateHandshake(DrillUserProperties* props){
// Assume there is one valid connection to at least one drillbit
connectionStatus_t stat=CONN_FAILURE;
// Keep a copy of the user properties
if(props!=NULL){
m_pUserProperties = boost::shared_ptr<DrillUserProperties>(new DrillUserProperties);
//for(size_t i=0; i<props->size(); i++){
for(std::map<std::string, std::string>::const_iterator propIter = props->begin(); propIter != props->end(); ++propIter){
std::string currKey=propIter->first;
std::string currVal=propIter->second;
m_pUserProperties->setProperty(
currKey,
currVal
);
}
}
DrillClientImpl* pDrillClientImpl = getOneConnection();
if(pDrillClientImpl != NULL){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Validating handshake: (Pooled) " << pDrillClientImpl->m_connectedHost << std::endl;)
stat = pDrillClientImpl->validateHandshake(m_pUserProperties.get());
}
else{
stat = handleConnError(CONN_NOTCONNECTED, getMessage(ERR_CONN_NOCONN));
}
return stat;
}
DrillClientQueryResult* PooledDrillClientImpl::SubmitQuery(::exec::shared::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx){
DrillClientQueryResult* pDrillClientQueryResult = NULL;
DrillClientImpl* pDrillClientImpl = NULL;
pDrillClientImpl = getOneConnection();
if(pDrillClientImpl != NULL){
pDrillClientQueryResult=pDrillClientImpl->SubmitQuery(t,plan,listener,listenerCtx);
m_queriesExecuted++;
}
return pDrillClientQueryResult;
}
DrillClientPrepareHandle* PooledDrillClientImpl::PrepareQuery(const std::string& plan, pfnPreparedStatementListener listener, void* listenerCtx){
DrillClientPrepareHandle* pDrillClientPrepareHandle = NULL;
DrillClientImpl* pDrillClientImpl = NULL;
pDrillClientImpl = getOneConnection();
if(pDrillClientImpl != NULL){
pDrillClientPrepareHandle=pDrillClientImpl->PrepareQuery(plan,listener,listenerCtx);
m_queriesExecuted++;
}
return pDrillClientPrepareHandle;
}
DrillClientQueryResult* PooledDrillClientImpl::ExecuteQuery(const PreparedStatement& pstmt, pfnQueryResultsListener listener, void* listenerCtx){
DrillClientQueryResult* pDrillClientQueryResult = NULL;
DrillClientImpl* pDrillClientImpl = NULL;
pDrillClientImpl = getOneConnection();
if(pDrillClientImpl != NULL){
pDrillClientQueryResult=pDrillClientImpl->ExecuteQuery(pstmt, listener, listenerCtx);
m_queriesExecuted++;
}
return pDrillClientQueryResult;
}
void PooledDrillClientImpl::freeQueryResources(DrillClientQueryHandle* pQryHandle){
// If this class ever keeps track of executing queries then it will need
// to implement this call to free any query specific resources the pool might have
// allocated
pQryHandle->client().freeQueryResources(pQryHandle);
}
meta::DrillMetadata* PooledDrillClientImpl::getMetadata() {
meta::DrillMetadata* metadata = NULL;
DrillClientImpl* pDrillClientImpl = getOneConnection();
if (pDrillClientImpl != NULL) {
metadata = pDrillClientImpl->getMetadata();
}
return metadata;
}
void PooledDrillClientImpl::freeMetadata(meta::DrillMetadata* metadata) {
metadata->client().freeMetadata(metadata);
}
bool PooledDrillClientImpl::Active(){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
for(std::vector<DrillClientImpl*>::const_iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
if((*it)->Active()){
return true;
}
}
return false;
}
void PooledDrillClientImpl::Close() {
boost::lock_guard<boost::mutex> lock(m_poolMutex);
for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
(*it)->Close();
delete *it;
}
m_clientConnections.clear();
m_pUserProperties.reset();
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_lastConnection=-1;
m_queriesExecuted=0;
}
DrillClientError* PooledDrillClientImpl::getError(){
std::string errMsg;
std::string nl="";
uint32_t stat;
boost::lock_guard<boost::mutex> lock(m_poolMutex);
for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
if((*it)->getError() != NULL){
errMsg+=nl+"Query"/*+(*it)->queryId() +*/":"+(*it)->getError()->msg;
stat=(*it)->getError()->status;
}
}
if(errMsg.length()>0){
if(m_pError!=NULL){ delete m_pError; m_pError=NULL; }
m_pError = new DrillClientError(stat, DrillClientError::QRY_ERROR_START+stat, errMsg);
}
return m_pError;
}
//Waits as long as any one drillbit connection has results pending
void PooledDrillClientImpl::waitForResults(){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
for(std::vector<DrillClientImpl*>::iterator it = m_clientConnections.begin(); it != m_clientConnections.end(); ++it){
(*it)->waitForResults();
}
return;
}
connectionStatus_t PooledDrillClientImpl::handleConnError(connectionStatus_t status, std::string msg){
DrillClientError* pErr = new DrillClientError(status, DrillClientError::CONN_ERROR_START+status, msg);
DRILL_MT_LOG(DRILL_LOG(LOG_DEBUG) << "Connection Error: (Pooled) " << pErr->msg << std::endl;)
if(m_pError!=NULL){ delete m_pError; m_pError=NULL;}
m_pError=pErr;
return status;
}
DrillClientImpl* PooledDrillClientImpl::getOneConnection(){
DrillClientImpl* pDrillClientImpl = NULL;
while(pDrillClientImpl==NULL){
if(m_queriesExecuted == 0){
// First query ever sent can use the connection already established to handleAuthentication the user
boost::lock_guard<boost::mutex> lock(m_poolMutex);
pDrillClientImpl=m_clientConnections[0];// There should be one connection in the list when the first query is executed
}else if(m_clientConnections.size() == m_maxConcurrentConnections){
// Pool is full. Use one of the already established connections
boost::lock_guard<boost::mutex> lock(m_poolMutex);
pDrillClientImpl = m_clientConnections[m_queriesExecuted%m_maxConcurrentConnections];
if(!pDrillClientImpl->Active()){
Utils::eraseRemove(m_clientConnections, pDrillClientImpl);
pDrillClientImpl=NULL;
}
}else{
int tries=0;
connectionStatus_t ret=CONN_SUCCESS;
while(pDrillClientImpl==NULL && tries++ < 3){
if((ret=connect(m_connectStr.c_str(), m_pUserProperties.get()))==CONN_SUCCESS){
boost::lock_guard<boost::mutex> lock(m_poolMutex);
pDrillClientImpl=m_clientConnections.back();
ret=pDrillClientImpl->validateHandshake(m_pUserProperties.get());
if(ret!=CONN_SUCCESS){
delete pDrillClientImpl; pDrillClientImpl=NULL;
m_clientConnections.erase(m_clientConnections.end());
}
}
} // try a few times
if(ret!=CONN_SUCCESS){
break;
}
} // need a new connection
}// while
if(pDrillClientImpl==NULL){
connectionStatus_t status = CONN_NOTCONNECTED;
handleConnError(status, getMessage(ERR_CONN_NOCONN));
}
return pDrillClientImpl;
}
} // namespace Drill