blob: 3959c3d137981a8da4d20fcebbfe0efafb21f8b2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <boost/bind.hpp>
#include <drill/drillClient.hpp>
#include <drill/drillConfig.hpp>
#include "zookeeperClient.hpp"
#include "errmsgs.hpp"
#include "logger.hpp"
namespace Drill {
std::string ZookeeperClient::s_defaultDrillPath("/drill/drillbits1");
static void watcherCallback(zhandle_t *zzh, int type, int state, const char *path, void* context) {
static_cast<ZookeeperClient*>(context)->watcher(zzh, type, state, path, context);
}
ZookeeperClient::ZookeeperClient(const std::string& drillPath)
: p_zh(), m_state(), m_path(drillPath) {
m_bConnecting=true;
memset(&m_id, 0, sizeof(m_id));
}
ZookeeperClient::~ZookeeperClient(){
}
ZooLogLevel ZookeeperClient::getZkLogLevel(){
//typedef enum {ZOO_LOG_LEVEL_ERROR=1,
// ZOO_LOG_LEVEL_WARN=2,
// ZOO_LOG_LEVEL_INFO=3,
// ZOO_LOG_LEVEL_DEBUG=4
//} ZooLogLevel;
switch(DrillClientConfig::getLogLevel()){
case LOG_TRACE:
case LOG_DEBUG:
return ZOO_LOG_LEVEL_DEBUG;
case LOG_INFO:
return ZOO_LOG_LEVEL_INFO;
case LOG_WARNING:
return ZOO_LOG_LEVEL_WARN;
case LOG_ERROR:
case LOG_FATAL:
default:
return ZOO_LOG_LEVEL_ERROR;
}
return ZOO_LOG_LEVEL_ERROR;
}
void ZookeeperClient::watcher(zhandle_t *zzh, int type, int state, const char *path, void*) {
//From cli.c
/* Be careful using zh here rather than zzh - as this may be mt code
* the client lib may call the watcher before zookeeper_init returns */
this->m_state=state;
if (type == ZOO_SESSION_EVENT) {
if (state == ZOO_CONNECTED_STATE) {
} else if (state == ZOO_AUTH_FAILED_STATE) {
this->m_err= getMessage(ERR_CONN_ZKNOAUTH);
this->close();
} else if (state == ZOO_EXPIRED_SESSION_STATE) {
this->m_err= getMessage(ERR_CONN_ZKEXP);
this->close();
}
}
// signal the cond var
{
if (state == ZOO_CONNECTED_STATE){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Connected to Zookeeper." << std::endl;)
}
boost::lock_guard<boost::mutex> bufferLock(this->m_cvMutex);
this->m_bConnecting=false;
}
this->m_cv.notify_one();
}
int ZookeeperClient::getAllDrillbits(const std::string& connectStr, std::vector<std::string>& drillbits){
uint32_t waitTime=30000; // 10 seconds
zoo_set_debug_level(getZkLogLevel());
zoo_deterministic_conn_order(1); // enable deterministic order
p_zh = boost::shared_ptr<zhandle_t>(zookeeper_init(connectStr.c_str(), &watcherCallback, waitTime, &m_id, this, 0), zookeeper_close);
if(!p_zh) {
m_err = getMessage(ERR_CONN_ZKFAIL);
return -1;
}
m_err="";
//Wait for the completion handler to signal successful connection
boost::unique_lock<boost::mutex> bufferLock(this->m_cvMutex);
boost::system_time const timeout=boost::get_system_time()+ boost::posix_time::milliseconds(waitTime);
while(this->m_bConnecting) {
if(!this->m_cv.timed_wait(bufferLock, timeout)){
m_err = getMessage(ERR_CONN_ZKTIMOUT);
return -1;
}
}
if(m_state!=ZOO_CONNECTED_STATE){
return -1;
}
int rc = ZOK;
struct String_vector drillbitsVector;
rc=zoo_get_children(p_zh.get(), m_path.c_str(), 0, &drillbitsVector);
if(rc!=ZOK){
m_err=getMessage(ERR_CONN_ZKERR, rc);
p_zh.reset();
return -1;
}
// Make sure we deallocate drillbitsVector properly when we exit
boost::shared_ptr<String_vector> guard(&drillbitsVector, deallocate_String_vector);
if(drillbitsVector.count > 0){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "Found " << drillbitsVector.count << " drillbits in cluster ("
<< connectStr << "/" << m_path
<< ")." <<std::endl;)
for(int i=0; i<drillbitsVector.count; i++){
drillbits.push_back(drillbitsVector.data[i]);
}
for(int i=0; i<drillbits.size(); i++){
DRILL_MT_LOG(DRILL_LOG(LOG_TRACE) << "\t Unshuffled Drillbit id: " << drillbits[i] << std::endl;)
}
}
return 0;
}
int ZookeeperClient::getEndPoint(const std::string& drillbit, exec::DrillbitEndpoint& endpoint){
int rc = ZOK;
// pick the drillbit at 'index'
std::string s(m_path + "/" + drillbit);
int buffer_len=MAX_CONNECT_STR;
char buffer[MAX_CONNECT_STR+1];
struct Stat stat;
buffer[MAX_CONNECT_STR]=0;
rc= zoo_get(p_zh.get(), s.c_str(), 0, buffer, &buffer_len, &stat);
if(rc!=ZOK){
m_err=getMessage(ERR_CONN_ZKDBITERR, rc);
return -1;
}
exec::DrillServiceInstance drillServiceInstance;
drillServiceInstance.ParseFromArray(buffer, buffer_len);
endpoint=drillServiceInstance.endpoint();
if(p_zh!=NULL && m_state==ZOO_CONNECTED_STATE){
DRILL_LOG(LOG_TRACE) << drillServiceInstance.DebugString() << std::endl;
}
return 0;
}
void ZookeeperClient::close(){
p_zh.reset();
}
} /* namespace Drill */