blob: 552ceecde7491b336a43cf8268078fe4875f10e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import static org.apache.zookeeper.client.FourLetterWordMain.send4LetterWord;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.BindException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.common.X509Exception;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* TODO: Most of the code in this class is ripped from ZooKeeper tests. Instead
* of redoing it, we should contribute updates to their code which let us more
* easily access testing helper objects.
*/
@InterfaceAudience.Public
public class MiniZooKeeperCluster {
private static final Logger LOG = LoggerFactory.getLogger(MiniZooKeeperCluster.class);
private static final int TICK_TIME = 2000;
private static final int TIMEOUT = 1000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");
private final int connectionTimeout;
public static final String LOOPBACK_HOST = InetAddress.getLoopbackAddress().getHostName();
public static final String HOST = LOOPBACK_HOST;
private boolean started;
/**
* The default port. If zero, we use a random port.
*/
private int defaultClientPort = 0;
private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
private final List<ZooKeeperServer> zooKeeperServers;
private final List<Integer> clientPortList;
private int activeZKServerIndex;
private int tickTime = 0;
private final Configuration configuration;
public MiniZooKeeperCluster() {
this(new Configuration());
}
public MiniZooKeeperCluster(Configuration configuration) {
this.started = false;
this.configuration = configuration;
activeZKServerIndex = -1;
zooKeeperServers = new ArrayList<>();
clientPortList = new ArrayList<>();
standaloneServerFactoryList = new ArrayList<>();
connectionTimeout = configuration
.getInt(HConstants.ZK_SESSION_TIMEOUT + ".localHBaseCluster", DEFAULT_CONNECTION_TIMEOUT);
}
/**
* Add a client port to the list.
*
* @param clientPort the specified port
*/
public void addClientPort(int clientPort) {
clientPortList.add(clientPort);
}
/**
* Get the list of client ports.
*
* @return clientPortList the client port list
*/
@InterfaceAudience.Private
public List<Integer> getClientPortList() {
return clientPortList;
}
/**
* Check whether the client port in a specific position of the client port list is valid.
*
* @param index the specified position
*/
private boolean hasValidClientPortInList(int index) {
return (clientPortList.size() > index && clientPortList.get(index) > 0);
}
public void setDefaultClientPort(int clientPort) {
if (clientPort <= 0) {
throw new IllegalArgumentException("Invalid default ZK client port: " + clientPort);
}
this.defaultClientPort = clientPort;
}
/**
* Selects a ZK client port.
*
* @param seedPort the seed port to start with; -1 means first time.
* @return a valid and unused client port
*/
private int selectClientPort(int seedPort) {
int i;
int returnClientPort = seedPort + 1;
if (returnClientPort == 0) {
// If the new port is invalid, find one - starting with the default client port.
// If the default client port is not specified, starting with a random port.
// The random port is selected from the range between 49152 to 65535. These ports cannot be
// registered with IANA and are intended for dynamic allocation (see http://bit.ly/dynports).
if (defaultClientPort > 0) {
returnClientPort = defaultClientPort;
} else {
returnClientPort = 0xc000 + new Random().nextInt(0x3f00);
}
}
// Make sure that the port is unused.
// break when an unused port is found
do {
for (i = 0; i < clientPortList.size(); i++) {
if (returnClientPort == clientPortList.get(i)) {
// Already used. Update the port and retry.
returnClientPort++;
break;
}
}
} while (i != clientPortList.size());
return returnClientPort;
}
public void setTickTime(int tickTime) {
this.tickTime = tickTime;
}
public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size() - 1;
}
public int getZooKeeperServerNum() {
return zooKeeperServers.size();
}
// / XXX: From o.a.zk.t.ClientBase
private static void setupTestEnv() {
// during the tests we run with 100K prealloc in the logs.
// on windows systems prealloc of 64M was seen to take ~15seconds
// resulting in test failure (client timeout on first session).
// set env and directly in order to handle static init/gc issues
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100 * 1024);
// allow all 4 letter words
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
}
public int startup(File baseDir) throws IOException, InterruptedException {
int numZooKeeperServers = clientPortList.size();
if (numZooKeeperServers == 0) {
numZooKeeperServers = 1; // need at least 1 ZK server for testing
}
return startup(baseDir, numZooKeeperServers);
}
/**
* @param baseDir the base directory to use
* @param numZooKeeperServers the number of ZooKeeper servers
* @return ClientPort server bound to, -1 if there was a binding problem and we couldn't pick
* another port.
* @throws IOException if an operation fails during the startup
* @throws InterruptedException if the startup fails
*/
public int startup(File baseDir, int numZooKeeperServers)
throws IOException, InterruptedException {
if (numZooKeeperServers <= 0) {
return -1;
}
setupTestEnv();
shutdown();
int tentativePort = -1; // the seed port
int currentClientPort;
// running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
createDir(dir);
int tickTimeToUse;
if (this.tickTime > 0) {
tickTimeToUse = this.tickTime;
} else {
tickTimeToUse = TICK_TIME;
}
// Set up client port - if we have already had a list of valid ports, use it.
if (hasValidClientPortInList(i)) {
currentClientPort = clientPortList.get(i);
} else {
tentativePort = selectClientPort(tentativePort); // update the seed
currentClientPort = tentativePort;
}
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
// Setting {min,max}SessionTimeout defaults to be the same as in Zookeeper
server.setMinSessionTimeout(configuration.getInt("hbase.zookeeper.property.minSessionTimeout",
-1));
server.setMaxSessionTimeout(configuration.getInt("hbase.zookeeper.property.maxSessionTimeout",
-1));
NIOServerCnxnFactory standaloneServerFactory;
while (true) {
try {
standaloneServerFactory = new NIOServerCnxnFactory();
standaloneServerFactory.configure(new InetSocketAddress(LOOPBACK_HOST, currentClientPort),
configuration.getInt(HConstants.ZOOKEEPER_MAX_CLIENT_CNXNS,
HConstants.DEFAULT_ZOOKEEPER_MAX_CLIENT_CNXNS));
} catch (BindException e) {
LOG.debug("Failed binding ZK Server to client port: " + currentClientPort, e);
// We're told to use some port but it's occupied, fail
if (hasValidClientPortInList(i)) {
return -1;
}
// This port is already in use, try to use another.
tentativePort = selectClientPort(tentativePort);
currentClientPort = tentativePort;
continue;
}
break;
}
// Start up this ZK server. Dump its stats.
standaloneServerFactory.startup(server);
LOG.info("Started connectionTimeout={}, dir={}, {}", connectionTimeout, dir,
getServerConfigurationOnOneLine(server));
// Runs a 'stat' against the servers.
if (!waitForServerUp(currentClientPort, connectionTimeout)) {
Threads.printThreadInfo(System.out,
"Why is zk standalone server not coming up?");
throw new IOException("Waiting for startup of standalone server; " +
"server isRunning=" + server.isRunning());
}
// We have selected a port as a client port. Update clientPortList if necessary.
if (clientPortList.size() <= i) { // it is not in the list, add the port
clientPortList.add(currentClientPort);
} else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
clientPortList.remove(i);
clientPortList.add(i, currentClientPort);
}
standaloneServerFactoryList.add(standaloneServerFactory);
zooKeeperServers.add(server);
}
// set the first one to be active ZK; Others are backups
activeZKServerIndex = 0;
started = true;
int clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Started MiniZooKeeperCluster and ran 'stat' on client port={}", clientPort);
return clientPort;
}
private String getServerConfigurationOnOneLine(ZooKeeperServer server) {
StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw) {
@Override public void println(int x) {
super.print(x);
super.print(", ");
}
@Override public void println(String x) {
super.print(x);
super.print(", ");
}
}) {
server.dumpConf(pw);
}
return sw.toString();
}
private void createDir(File dir) throws IOException {
try {
if (!dir.exists()) {
dir.mkdirs();
}
} catch (SecurityException e) {
throw new IOException("creating dir: " + dir, e);
}
}
/**
* @throws IOException if waiting for the shutdown of a server fails
*/
public void shutdown() throws IOException {
// shut down all the zk servers
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
NIOServerCnxnFactory standaloneServerFactory = standaloneServerFactoryList.get(i);
int clientPort = clientPortList.get(i);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, connectionTimeout)) {
throw new IOException("Waiting for shutdown of standalone server at port=" + clientPort +
", timeout=" + this.connectionTimeout);
}
}
standaloneServerFactoryList.clear();
for (ZooKeeperServer zkServer: zooKeeperServers) {
// Explicitly close ZKDatabase since ZookeeperServer does not close them
zkServer.getZKDatabase().close();
}
zooKeeperServers.clear();
// clear everything
if (started) {
started = false;
activeZKServerIndex = 0;
clientPortList.clear();
LOG.info("Shutdown MiniZK cluster with all ZK servers");
}
}
/**
* @return clientPort return clientPort if there is another ZK backup can run
* when killing the current active; return -1, if there is no backups.
* @throws IOException if waiting for the shutdown of a server fails
*/
public int killCurrentActiveZooKeeperServer() throws IOException, InterruptedException {
if (!started || activeZKServerIndex < 0) {
return -1;
}
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(activeZKServerIndex);
int clientPort = clientPortList.get(activeZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, connectionTimeout)) {
throw new IOException("Waiting for shutdown of standalone server");
}
zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
// remove the current active zk server
standaloneServerFactoryList.remove(activeZKServerIndex);
clientPortList.remove(activeZKServerIndex);
zooKeeperServers.remove(activeZKServerIndex);
LOG.info("Kill the current active ZK servers in the cluster on client port: {}", clientPort);
if (standaloneServerFactoryList.isEmpty()) {
// there is no backup servers;
return -1;
}
clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Activate a backup zk server in the cluster on client port: {}", clientPort);
// return the next back zk server's port
return clientPort;
}
/**
* Kill one back up ZK servers.
*
* @throws IOException if waiting for the shutdown of a server fails
*/
public void killOneBackupZooKeeperServer() throws IOException, InterruptedException {
if (!started || activeZKServerIndex < 0 || standaloneServerFactoryList.size() <= 1) {
return ;
}
int backupZKServerIndex = activeZKServerIndex+1;
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, connectionTimeout)) {
throw new IOException("Waiting for shutdown of standalone server");
}
zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
// remove this backup zk server
standaloneServerFactoryList.remove(backupZKServerIndex);
clientPortList.remove(backupZKServerIndex);
zooKeeperServers.remove(backupZKServerIndex);
LOG.info("Kill one backup ZK servers in the cluster on client port: {}", clientPort);
}
// XXX: From o.a.zk.t.ClientBase. We just dropped the check for ssl/secure.
private static boolean waitForServerDown(int port, long timeout) throws IOException {
long start = System.currentTimeMillis();
while (true) {
try {
send4LetterWord(HOST, port, "stat", false, (int)timeout);
} catch (IOException | X509Exception.SSLContextException e) {
return true;
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(TIMEOUT);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
return false;
}
// XXX: From o.a.zk.t.ClientBase. Its in the test jar but we don't depend on zk test jar.
// We remove the SSL/secure bit. Not used in here.
private static boolean waitForServerUp(int port, long timeout) throws IOException {
long start = System.currentTimeMillis();
while (true) {
try {
String result = send4LetterWord(HOST, port, "stat", false, (int)timeout);
if (result.startsWith("Zookeeper version:") && !result.contains("READ-ONLY")) {
return true;
} else {
LOG.debug("Read {}", result);
}
} catch (ConnectException e) {
// ignore as this is expected, do not log stacktrace
LOG.info("{}:{} not up: {}", HOST, port, e.toString());
} catch (IOException | X509Exception.SSLContextException e) {
// ignore as this is expected
LOG.info("{}:{} not up", HOST, port, e);
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(TIMEOUT);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
}
return false;
}
public int getClientPort() {
return activeZKServerIndex < 0 || activeZKServerIndex >= clientPortList.size() ? -1
: clientPortList.get(activeZKServerIndex);
}
/**
* @return Address for this cluster instance.
*/
public Address getAddress() {
return Address.fromParts(HOST, getClientPort());
}
List<ZooKeeperServer> getZooKeeperServers() {
return zooKeeperServers;
}
}