| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| |
| #include <boost/assign.hpp> |
| #include "drill/common.hpp" |
| #include "drill/drillClient.hpp" |
| #include "drill/recordBatch.hpp" |
| #include "drillClientImpl.hpp" |
| #include "errmsgs.hpp" |
| #include "logger.hpp" |
| |
| #include "Types.pb.h" |
| |
| namespace Drill{ |
| |
| DrillClientError* DrillClientError::getErrorObject(const exec::shared::DrillPBError& e){ |
| std::string s=Drill::getMessage(ERR_QRY_FAILURE, e.message().c_str()); |
| DrillClientError* err=NULL; |
| err=new DrillClientError(QRY_FAILURE, QRY_ERROR_START+QRY_FAILURE, s); |
| return err; |
| } |
| |
| DrillClientInitializer::DrillClientInitializer(){ |
| GOOGLE_PROTOBUF_VERIFY_VERSION; |
| } |
| |
| DrillClientInitializer::~DrillClientInitializer(){ |
| google::protobuf::ShutdownProtobufLibrary(); |
| } |
| |
| // Initialize static member of DrillClientConfig |
| logLevel_t DrillClientConfig::s_logLevel=LOG_ERROR; |
| uint64_t DrillClientConfig::s_bufferLimit=MAX_MEM_ALLOC_SIZE; |
| int32_t DrillClientConfig::s_socketTimeout=0; |
| int32_t DrillClientConfig::s_handshakeTimeout=5; |
| int32_t DrillClientConfig::s_queryTimeout=180; |
| int32_t DrillClientConfig::s_heartbeatFrequency=15; // 15 seconds |
| |
| boost::mutex DrillClientConfig::s_mutex; |
| |
| DrillClientConfig::DrillClientConfig(){ |
| initLogging(NULL); |
| } |
| |
| DrillClientConfig::~DrillClientConfig(){ |
| Logger::close(); |
| } |
| |
| void DrillClientConfig::initLogging(const char* path){ |
| Logger::init(path); |
| } |
| |
| void DrillClientConfig::setLogLevel(logLevel_t l){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_logLevel=l; |
| Logger::s_level=l; |
| //boost::log::core::get()->set_filter(boost::log::trivial::severity >= s_logLevel); |
| } |
| |
| void DrillClientConfig::setBufferLimit(uint64_t l){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_bufferLimit=l; |
| } |
| |
| uint64_t DrillClientConfig::getBufferLimit(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_bufferLimit; |
| } |
| |
| void DrillClientConfig::setSocketTimeout(int32_t t){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_socketTimeout=t; |
| } |
| |
| void DrillClientConfig::setHandshakeTimeout(int32_t t){ |
| if (t > 0) { |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_handshakeTimeout = t; |
| } |
| } |
| |
| void DrillClientConfig::setQueryTimeout(int32_t t){ |
| if (t>0){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_queryTimeout=t; |
| } |
| } |
| |
| void DrillClientConfig::setHeartbeatFrequency(int32_t t){ |
| if (t>0){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| s_heartbeatFrequency=t; |
| } |
| } |
| |
| int32_t DrillClientConfig::getSocketTimeout(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_socketTimeout; |
| } |
| |
| int32_t DrillClientConfig::getHandshakeTimeout(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_handshakeTimeout; |
| } |
| |
| int32_t DrillClientConfig::getQueryTimeout(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_queryTimeout; |
| } |
| |
| int32_t DrillClientConfig::getHeartbeatFrequency(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_heartbeatFrequency; |
| } |
| |
| logLevel_t DrillClientConfig::getLogLevel(){ |
| boost::lock_guard<boost::mutex> configLock(DrillClientConfig::s_mutex); |
| return s_logLevel; |
| } |
| |
| //Using boost assign to initialize maps. |
| const std::map<std::string, uint32_t> DrillUserProperties::USER_PROPERTIES=boost::assign::map_list_of |
| ( USERPROP_USERNAME, USERPROP_FLAGS_SERVERPROP|USERPROP_FLAGS_USERNAME|USERPROP_FLAGS_STRING ) |
| ( USERPROP_PASSWORD, USERPROP_FLAGS_SERVERPROP|USERPROP_FLAGS_PASSWORD) |
| ( USERPROP_SCHEMA, USERPROP_FLAGS_SERVERPROP|USERPROP_FLAGS_STRING) |
| ( USERPROP_USESSL, USERPROP_FLAGS_BOOLEAN|USERPROP_FLAGS_SSLPROP) |
| ( USERPROP_FILEPATH, USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILEPATH) |
| ( USERPROP_FILENAME, USERPROP_FLAGS_STRING|USERPROP_FLAGS_SSLPROP|USERPROP_FLAGS_FILENAME) |
| ; |
| |
| bool DrillUserProperties::validate(std::string& err){ |
| bool ret=true; |
| //We can add additional validation for any params here |
| return ret; |
| } |
| |
| RecordIterator::~RecordIterator(){ |
| if(m_pColDefs!=NULL){ |
| for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin(); |
| it!=m_pColDefs->end(); |
| ++it){ |
| delete *it; |
| } |
| } |
| delete this->m_pQueryResult; |
| this->m_pQueryResult=NULL; |
| if(this->m_pCurrentRecordBatch!=NULL){ |
| DRILL_LOG(LOG_TRACE) << "Deleted last Record batch " << (void*) m_pCurrentRecordBatch << std::endl; |
| delete this->m_pCurrentRecordBatch; this->m_pCurrentRecordBatch=NULL; |
| } |
| } |
| |
| FieldDefPtr RecordIterator::getColDefs(){ |
| if(m_pQueryResult->hasError()){ |
| return DrillClientQueryResult::s_emptyColDefs; |
| } |
| //NOTE: if query is cancelled, return whatever you have. Client applications job to deal with it. |
| if(this->m_pColDefs==NULL || this->hasSchemaChanged()){ |
| if(this->m_pCurrentRecordBatch==NULL){ |
| this->m_pQueryResult->waitForData(); |
| if(m_pQueryResult->hasError()){ |
| return DrillClientQueryResult::s_emptyColDefs; |
| } |
| } |
| if(this->hasSchemaChanged()){ |
| if(m_pColDefs!=NULL){ |
| for(std::vector<Drill::FieldMetadata*>::iterator it=m_pColDefs->begin(); |
| it!=m_pColDefs->end(); |
| ++it){ |
| delete *it; |
| } |
| m_pColDefs->clear(); |
| //delete m_pColDefs; m_pColDefs=NULL; |
| } |
| } |
| FieldDefPtr pColDefs( new std::vector<Drill::FieldMetadata*>); |
| { //lock after we come out of the wait. |
| boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); |
| boost::shared_ptr< std::vector<Drill::FieldMetadata*> > currentColDefs=DrillClientQueryResult::s_emptyColDefs; |
| if(this->m_pCurrentRecordBatch!=NULL){ |
| currentColDefs=this->m_pCurrentRecordBatch->getColumnDefs(); |
| }else{ |
| // This is reached only when the first results have been received but |
| // the getNext call has not been made to retrieve the record batch |
| RecordBatch* pR=this->m_pQueryResult->peekNext(); |
| if(pR!=NULL){ |
| currentColDefs=pR->getColumnDefs(); |
| } |
| } |
| for(std::vector<Drill::FieldMetadata*>::iterator it=currentColDefs->begin(); it!=currentColDefs->end(); ++it){ |
| Drill::FieldMetadata* fmd= new Drill::FieldMetadata; |
| fmd->copy(*(*it));//Yup, that's 2 stars |
| pColDefs->push_back(fmd); |
| } |
| } |
| this->m_pColDefs = pColDefs; |
| } |
| return this->m_pColDefs; |
| } |
| |
| status_t RecordIterator::next(){ |
| status_t ret=QRY_SUCCESS; |
| this->m_currentRecord++; |
| |
| if(!this->m_pQueryResult->isCancelled()){ |
| if(this->m_pCurrentRecordBatch==NULL || this->m_currentRecord==this->m_pCurrentRecordBatch->getNumRecords()){ |
| boost::lock_guard<boost::mutex> bufferLock(this->m_recordBatchMutex); |
| if(this->m_pCurrentRecordBatch !=NULL){ |
| DRILL_LOG(LOG_TRACE) << "Deleted old Record batch " << (void*) m_pCurrentRecordBatch << std::endl; |
| delete this->m_pCurrentRecordBatch; //free the previous record batch |
| this->m_pCurrentRecordBatch=NULL; |
| } |
| this->m_currentRecord=0; |
| this->m_pQueryResult->waitForData(); |
| if(m_pQueryResult->hasError()){ |
| return m_pQueryResult->getErrorStatus(); |
| } |
| this->m_pCurrentRecordBatch=this->m_pQueryResult->getNext(); |
| if(this->m_pCurrentRecordBatch != NULL){ |
| DRILL_LOG(LOG_TRACE) << "Fetched new Record batch " << std::endl; |
| }else{ |
| DRILL_LOG(LOG_TRACE) << "No new Record batch found " << std::endl; |
| } |
| if(this->m_pCurrentRecordBatch==NULL || this->m_pCurrentRecordBatch->getNumRecords()==0){ |
| DRILL_LOG(LOG_TRACE) << "No more data." << std::endl; |
| ret = QRY_NO_MORE_DATA; |
| }else if(this->m_pCurrentRecordBatch->hasSchemaChanged()){ |
| ret=QRY_SUCCESS_WITH_INFO; |
| } |
| } |
| }else{ |
| ret=QRY_CANCEL; |
| } |
| return ret; |
| } |
| |
| /* Gets the ith column in the current record. */ |
| status_t RecordIterator::getCol(size_t i, void** b, size_t* sz){ |
| //TODO: check fields out of bounds without calling getColDefs |
| //if(i>=getColDefs().size()) return QRY_OUT_OF_BOUNDS; |
| //return raw byte buffer |
| if(!this->m_pQueryResult->isCancelled()){ |
| const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); |
| if(!pVector->isNull(this->m_currentRecord)){ |
| *b=pVector->getRaw(this->m_currentRecord); |
| *sz=pVector->getSize(this->m_currentRecord); |
| }else{ |
| *b=NULL; |
| *sz=0; |
| |
| } |
| return QRY_SUCCESS; |
| }else{ |
| return QRY_CANCEL; |
| } |
| } |
| |
| /* true if ith column in the current record is NULL. */ |
| bool RecordIterator::isNull(size_t i){ |
| if(!this->m_pQueryResult->isCancelled()){ |
| const ValueVectorBase* pVector=this->m_pCurrentRecordBatch->getFields()[i]->getVector(); |
| return pVector->isNull(this->m_currentRecord); |
| }else{ |
| return false; |
| } |
| } |
| |
| status_t RecordIterator::cancel(){ |
| this->m_pQueryResult->cancel(); |
| return QRY_CANCEL; |
| } |
| |
| bool RecordIterator::hasSchemaChanged(){ |
| return m_currentRecord==0 && m_pCurrentRecordBatch!=NULL && m_pCurrentRecordBatch->hasSchemaChanged(); |
| } |
| |
| void RecordIterator::registerSchemaChangeListener(pfnSchemaListener l){ |
| assert(m_pQueryResult!=NULL); |
| this->m_pQueryResult->registerSchemaChangeListener(l); |
| } |
| |
| bool RecordIterator::hasError(){ |
| return m_pQueryResult->hasError(); |
| } |
| |
| const std::string& RecordIterator::getError(){ |
| return m_pQueryResult->getError()->msg; |
| } |
| |
| DrillClientInitializer DrillClient::s_init; |
| |
| DrillClientConfig DrillClient::s_config; |
| |
| void DrillClient::initLogging(const char* path, logLevel_t l){ |
| if(path!=NULL) s_config.initLogging(path); |
| s_config.setLogLevel(l); |
| } |
| |
| DrillClient::DrillClient(){ |
| this->m_pImpl=new DrillClientImpl; |
| } |
| |
| DrillClient::~DrillClient(){ |
| delete this->m_pImpl; |
| } |
| |
| connectionStatus_t DrillClient::connect(const char* connectStr, const char* defaultSchema){ |
| connectionStatus_t ret=CONN_SUCCESS; |
| ret=this->m_pImpl->connect(connectStr); |
| DrillUserProperties props; |
| std::string schema(defaultSchema); |
| props.setProperty(USERPROP_SCHEMA, schema); |
| if(ret==CONN_SUCCESS){ |
| if(defaultSchema!=NULL){ |
| ret=this->m_pImpl->validateHandshake(&props); |
| }else{ |
| ret=this->m_pImpl->validateHandshake(NULL); |
| } |
| } |
| return ret; |
| } |
| |
| connectionStatus_t DrillClient::connect(const char* connectStr, DrillUserProperties* properties){ |
| connectionStatus_t ret=CONN_SUCCESS; |
| ret=this->m_pImpl->connect(connectStr); |
| if(ret==CONN_SUCCESS){ |
| if(properties!=NULL){ |
| ret=this->m_pImpl->validateHandshake(properties); |
| }else{ |
| ret=this->m_pImpl->validateHandshake(NULL); |
| } |
| } |
| return ret; |
| } |
| |
| bool DrillClient::isActive(){ |
| return this->m_pImpl->Active(); |
| } |
| |
| void DrillClient::close() { |
| this->m_pImpl->Close(); |
| } |
| |
| status_t DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, pfnQueryResultsListener listener, void* listenerCtx, QueryHandle_t* qHandle){ |
| |
| ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); |
| DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, listener, listenerCtx); |
| *qHandle=(QueryHandle_t)pResult; |
| return QRY_SUCCESS; |
| } |
| |
| RecordIterator* DrillClient::submitQuery(Drill::QueryType t, const std::string& plan, DrillClientError* err){ |
| RecordIterator* pIter=NULL; |
| ::exec::shared::QueryType castedType = static_cast< ::exec::shared::QueryType> (t); |
| DrillClientQueryResult* pResult=this->m_pImpl->SubmitQuery(castedType, plan, NULL, NULL); |
| if(pResult){ |
| pIter=new RecordIterator(pResult); |
| } |
| return pIter; |
| } |
| |
| void* DrillClient::getApplicationContext(QueryHandle_t handle){ |
| return ((DrillClientQueryResult*)handle)->getListenerContext(); |
| } |
| |
| status_t DrillClient::getQueryStatus(QueryHandle_t handle){ |
| return ((DrillClientQueryResult*)handle)->getQueryStatus(); |
| } |
| |
| std::string& DrillClient::getError(){ |
| return m_pImpl->getError()->msg; |
| } |
| |
| |
| void DrillClient::waitForResults(){ |
| this->m_pImpl->waitForResults(); |
| } |
| |
| void DrillClient::registerSchemaChangeListener(QueryHandle_t* handle, pfnSchemaListener l){ |
| if(handle!=NULL){ |
| ((DrillClientQueryResult*)(*handle))->registerSchemaChangeListener(l); |
| } |
| } |
| |
| void DrillClient::freeQueryResources(QueryHandle_t* handle){ |
| delete (DrillClientQueryResult*)(*handle); |
| *handle=NULL; |
| } |
| |
| void DrillClient::freeRecordBatch(RecordBatch* pRecordBatch){ |
| delete pRecordBatch; |
| } |
| |
| } // namespace Drill |