blob: d73932415bc68ad6a48c375975b93fe7c0bd57e9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.Reader;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
/**
* This is a version of the HBase ZK cluster cut out to be standalone
*/
public class MiniZooKeeperCluster {
private static final Logger LOG = LoggerFactory.getLogger(
MiniZooKeeperCluster.class);
private static final int TICK_TIME = 2000;
private static final int CONNECTION_TIMEOUT = 30000;
public static final int MAX_CLIENT_CONNECTIONS = 1000;
private boolean started;
/** The default port. If zero, we use a random port. */
private int defaultClientPort = 0;
private int clientPort;
private List<NIOServerCnxnFactory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList;
private int activeZKServerIndex;
private int tickTime = 0;
private Configuration configuration;
public MiniZooKeeperCluster() {
this(new Configuration());
}
public MiniZooKeeperCluster(Configuration configuration) {
this.started = false;
this.configuration = configuration;
activeZKServerIndex = -1;
zooKeeperServers = new ArrayList<ZooKeeperServer>();
clientPortList = new ArrayList<Integer>();
standaloneServerFactoryList = new ArrayList<NIOServerCnxnFactory>();
}
public void setDefaultClientPort(int clientPort) {
if (clientPort <= 0) {
throw new IllegalArgumentException("Invalid default ZK client port: "
+ clientPort);
}
this.defaultClientPort = clientPort;
}
/**
* Selects a ZK client port. Returns the default port if specified.
* Otherwise, returns a random port. The random port is selected from the
* range between 49152 to 65535. These ports cannot be registered with IANA
* and are intended for dynamic allocation (see http://bit.ly/dynports).
*/
private int selectClientPort() {
if (defaultClientPort > 0) {
return defaultClientPort;
}
return 0xc000 + new Random().nextInt(0x3f00);
}
public void setTickTime(int tickTime) {
this.tickTime = tickTime;
}
public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size() - 1;
}
public int getZooKeeperServerNum() {
return zooKeeperServers.size();
}
// / XXX: From o.a.zk.t.ClientBase
private static void setupTestEnv() {
// during the tests we run with 100K prealloc in the logs.
// on windows systems prealloc of 64M was seen to take ~15seconds
// resulting in test failure (client timeout on first session).
// set env and directly in order to handle static init/gc issues
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100 * 1024);
}
public int startup(File baseDir) throws IOException, InterruptedException {
return startup(baseDir, 1);
}
/**
* @param baseDir
* @param numZooKeeperServers
* @return ClientPort server bound to, -1 if there was a
* binding problem and we couldn't pick another port.
* @throws IOException
* @throws InterruptedException
*/
public int startup(File baseDir, int numZooKeeperServers) throws IOException,
InterruptedException {
if (numZooKeeperServers <= 0)
return -1;
setupTestEnv();
shutdown();
int tentativePort = selectClientPort();
// running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
recreateDir(dir);
int tickTimeToUse;
if (this.tickTime > 0) {
tickTimeToUse = this.tickTime;
} else {
tickTimeToUse = TICK_TIME;
}
ZooKeeperServer server = new ZooKeeperServer(dir, dir, tickTimeToUse);
NIOServerCnxnFactory standaloneServerFactory;
while (true) {
try {
standaloneServerFactory = new NIOServerCnxnFactory();
standaloneServerFactory.configure(
new InetSocketAddress(tentativePort),
MAX_CLIENT_CONNECTIONS
);
} catch (BindException e) {
LOG.debug("Failed binding ZK Server to client port: " +
tentativePort, e);
// We're told to use some port but it's occupied, fail
if (defaultClientPort > 0) return -1;
// This port is already in use, try to use another.
tentativePort = selectClientPort();
continue;
}
break;
}
// Start up this ZK server
standaloneServerFactory.startup(server);
if (!waitForServerUp(tentativePort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for startup of standalone server");
}
// We have selected this port as a client port.
clientPortList.add(tentativePort);
standaloneServerFactoryList.add(standaloneServerFactory);
zooKeeperServers.add(server);
tentativePort++; //for the next server
}
// set the first one to be active ZK; Others are backups
activeZKServerIndex = 0;
started = true;
clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Started MiniZK Cluster and connect 1 ZK server " +
"on client port: " + clientPort);
return clientPort;
}
private void recreateDir(File dir) throws IOException {
if (dir.exists()) {
if (!FileUtil.fullyDelete(dir)) {
throw new IOException("Could not delete zk base directory: " + dir);
}
}
try {
dir.mkdirs();
} catch (SecurityException e) {
throw new IOException("creating dir: " + dir, e);
}
}
/**
* @throws IOException
*/
public void shutdown() throws IOException {
if (!started) {
return;
}
// shut down all the zk servers
for (int i = 0; i < standaloneServerFactoryList.size(); i++) {
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(i);
int clientPort = clientPortList.get(i);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
}
for (ZooKeeperServer zkServer : zooKeeperServers) {
//explicitly close ZKDatabase since ZookeeperServer does not close them
zkServer.getZKDatabase().close();
}
// clear everything
started = false;
activeZKServerIndex = 0;
standaloneServerFactoryList.clear();
clientPortList.clear();
zooKeeperServers.clear();
LOG.info("Shutdown MiniZK cluster with all ZK servers");
}
/**@return clientPort return clientPort if there is another ZK backup can run
* when killing the current active; return -1, if there is no backups.
* @throws IOException
* @throws InterruptedException
*/
public int killCurrentActiveZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0) {
return -1;
}
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(activeZKServerIndex);
int clientPort = clientPortList.get(activeZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
zooKeeperServers.get(activeZKServerIndex).getZKDatabase().close();
// remove the current active zk server
standaloneServerFactoryList.remove(activeZKServerIndex);
clientPortList.remove(activeZKServerIndex);
zooKeeperServers.remove(activeZKServerIndex);
LOG.info("Kill the current active ZK servers in the cluster " +
"on client port: " + clientPort);
if (standaloneServerFactoryList.size() == 0) {
// there is no backup servers;
return -1;
}
clientPort = clientPortList.get(activeZKServerIndex);
LOG.info("Activate a backup zk server in the cluster " +
"on client port: " + clientPort);
// return the next back zk server's port
return clientPort;
}
/**
* Kill one back up ZK servers
* @throws IOException
* @throws InterruptedException
*/
public void killOneBackupZooKeeperServer() throws IOException,
InterruptedException {
if (!started || activeZKServerIndex < 0 ||
standaloneServerFactoryList.size() <= 1) {
return;
}
int backupZKServerIndex = activeZKServerIndex + 1;
// Shutdown the current active one
NIOServerCnxnFactory standaloneServerFactory =
standaloneServerFactoryList.get(backupZKServerIndex);
int clientPort = clientPortList.get(backupZKServerIndex);
standaloneServerFactory.shutdown();
if (!waitForServerDown(clientPort, CONNECTION_TIMEOUT)) {
throw new IOException("Waiting for shutdown of standalone server");
}
zooKeeperServers.get(backupZKServerIndex).getZKDatabase().close();
// remove this backup zk server
standaloneServerFactoryList.remove(backupZKServerIndex);
clientPortList.remove(backupZKServerIndex);
zooKeeperServers.remove(backupZKServerIndex);
LOG.info("Kill one backup ZK servers in the cluster " +
"on client port: " + clientPort);
}
// XXX: From o.a.zk.t.ClientBase
private static boolean waitForServerDown(int port, long timeout) {
long start = System.currentTimeMillis();
while (true) {
try {
Socket sock = new Socket("localhost", port);
try {
OutputStream outstream = sock.getOutputStream();
outstream.write("stat".getBytes());
outstream.flush();
} finally {
sock.close();
}
} catch (IOException e) {
return true;
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
}
return false;
}
// XXX: From o.a.zk.t.ClientBase
private static boolean waitForServerUp(int port, long timeout) {
long start = System.currentTimeMillis();
while (true) {
try {
Socket sock = new Socket("localhost", port);
BufferedReader reader = null;
try {
OutputStream outstream = sock.getOutputStream();
outstream.write("stat".getBytes());
outstream.flush();
Reader isr = new InputStreamReader(sock.getInputStream());
reader = new BufferedReader(isr);
String line = reader.readLine();
if (line != null && line.startsWith("Zookeeper version:")) {
return true;
}
} finally {
sock.close();
if (reader != null) {
reader.close();
}
}
} catch (IOException e) {
// ignore as this is expected
LOG.info("server localhost:" + port + " not up " + e);
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
}
return false;
}
public int getClientPort() {
return clientPort;
}
}