blob: 9bcf2d48a28e78ac2b90c4c5559bd091bd208cac [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.phoenix.loadbalancer.service;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.CloseableUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.ConnectException;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.io.Closeable;
import java.util.List;
/**
* LoadBalancer class is singleton , used by the client
* to find out about various location of PQS servers.
* The client needs to configure the HBase-site.xml with the needed
* properties i.e. location of zookeeper ( or service locator), username, password etc.
*/
public class LoadBalancer {
private static final LoadBalanceZookeeperConf CONFIG = new LoadBalanceZookeeperConfImpl(HBaseConfiguration.create());
private static CuratorFramework curaFramework = null;
protected static final Logger LOG = LoggerFactory.getLogger(LoadBalancer.class);
private static PathChildrenCache cache = null;
private static final LoadBalancer loadBalancer = new LoadBalancer();
private ConnectionStateListener connectionStateListener = null;
private UnhandledErrorListener unhandledErrorListener = null;
private List<Closeable> closeAbles = Lists.newArrayList();
private LoadBalancer() {
try {
start();
}catch(Exception ex){
LOG.error("Exception while creating a zookeeper clients and cache",ex);
if ((curaFramework != null) && (connectionStateListener != null)){
curaFramework.getConnectionStateListenable()
.removeListener(connectionStateListener);
}
if ((curaFramework != null) && (unhandledErrorListener != null)){
curaFramework.getUnhandledErrorListenable()
.removeListener(unhandledErrorListener);
}
for (Closeable closeable : closeAbles) {
CloseableUtils.closeQuietly(closeable);
}
}
}
/**
* Return Singleton Load Balancer every single time.
* @return LoadBalancer
*/
public static LoadBalancer getLoadBalancer() {
return loadBalancer;
}
/**
* It returns the location of Phoenix Query Server
* in form of Guava <a href="https://google.github.io/guava/releases/19.0/api/docs/com/google/common/net/HostAndPort.html">HostAndPort</a>
* from the cache. The client should catch Exception incase
* the method is unable to fetch PQS location due to network failure or
* in-correct configuration issues.
* @return - return Guava HostAndPort. See <a href="http://google.com">http://google.com</a>
* @throws Exception
*/
public HostAndPort getSingleServiceLocation() throws Exception{
List<HostAndPort> childNodes = conductSanityCheckAndReturn();
// get an random connect string
int i = ThreadLocalRandom.current().nextInt(0, childNodes.size());
return childNodes.get(i);
}
/**
* return locations of all Phoenix Query Servers
* in the form of a List of PQS servers <a href="https://google.github.io/guava/releases/19.0/api/docs/com/google/common/net/HostAndPort.html">HostAndPort</a>
* @return - HostAndPort
* @throws Exception
*/
public List<HostAndPort> getAllServiceLocation() throws Exception{
return conductSanityCheckAndReturn();
}
private List<HostAndPort> conductSanityCheckAndReturn() throws Exception{
Preconditions.checkNotNull(curaFramework
," curator framework in not initialized ");
Preconditions.checkNotNull(cache," cache value is not initialized");
boolean connected = curaFramework.getZookeeperClient().isConnected();
if (!connected) {
String message = " Zookeeper seems to be down. The data is stale ";
ConnectException exception =
new ConnectException(message);
LOG.error(message, exception);
throw exception;
}
List<String> currentNodes = curaFramework.getChildren().forPath(CONFIG.getParentPath());
List<HostAndPort> returnNodes = new ArrayList<>();
String nodeAsString = null;
for(String node:currentNodes) {
try {
returnNodes.add(HostAndPort.fromString(node));
} catch(Throwable ex) {
LOG.error(" something wrong with node string "+nodeAsString,ex);
}
}
return returnNodes;
}
private String getZkConnectString(){
return CONFIG.getZkConnectString();
}
private ConnectionStateListener getConnectionStateListener(){
return new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (!newState.isConnected()) {
LOG.error( " connection to zookeeper broken. It is in "+ newState.name()+" state.");
}
}
};
}
private UnhandledErrorListener getUnhandledErrorListener(){
return new UnhandledErrorListener() {
@Override
public void unhandledError(String message, Throwable e) {
LOG.error("unhandled exception: "+ message,e);
}
};
}
private void start() throws Exception{
curaFramework = CuratorFrameworkFactory.newClient(getZkConnectString(),
new ExponentialBackoffRetry(1000, 3));
curaFramework.start();
curaFramework.setACL().withACL(CONFIG.getAcls());
connectionStateListener = getConnectionStateListener();
curaFramework.getConnectionStateListenable()
.addListener(connectionStateListener);
unhandledErrorListener = getUnhandledErrorListener();
curaFramework.getUnhandledErrorListenable()
.addListener(unhandledErrorListener);
cache = new PathChildrenCache(curaFramework, CONFIG.getParentPath(), true);
cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
closeAbles.add(cache);
closeAbles.add(curaFramework);
}
}