blob: 565e398d35c9c2870047891d1af8a31e13862c46 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.test;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.junit.Assert.assertTrue;
import com.google.common.base.Stopwatch;
import io.netty.buffer.ByteBufAllocator;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.metastore.InMemoryMetaStore;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.replication.Auditor;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.PortManager;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A class runs several bookie servers for testing.
*/
public abstract class BookKeeperClusterTestCase {
static final Logger LOG = LoggerFactory.getLogger(BookKeeperClusterTestCase.class);
@Rule
public final TestName runtime = new TestName();
@Rule
public final Timeout globalTimeout;
// Metadata service related variables
protected final ZooKeeperCluster zkUtil;
protected ZooKeeper zkc;
protected String metadataServiceUri;
// BookKeeper related variables
protected final List<File> tmpDirs = new LinkedList<File>();
protected final List<BookieServer> bs = new LinkedList<BookieServer>();
protected final List<ServerConfiguration> bsConfs = new LinkedList<ServerConfiguration>();
private final Map<BookieId, TestStatsProvider> bsLoggers = new HashMap<>();
protected int numBookies;
protected BookKeeperTestClient bkc;
protected boolean useUUIDasBookieId = true;
/*
* Loopback interface is set as the listening interface and allowloopback is
* set to true in this server config. So bookies in this test process would
* bind to loopback address.
*/
protected final ServerConfiguration baseConf = TestBKConfiguration.newServerConfiguration();
protected final ClientConfiguration baseClientConf = TestBKConfiguration.newClientConfiguration();
private final Map<BookieServer, AutoRecoveryMain> autoRecoveryProcesses = new HashMap<>();
private boolean isAutoRecoveryEnabled;
SynchronousQueue<Throwable> asyncExceptions = new SynchronousQueue<>();
protected void captureThrowable(Runnable c) {
try {
c.run();
} catch (Throwable e) {
LOG.error("Captured error: {}", e);
asyncExceptions.add(e);
}
}
public BookKeeperClusterTestCase(int numBookies) {
this(numBookies, 120);
}
public BookKeeperClusterTestCase(int numBookies, int testTimeoutSecs) {
this(numBookies, 1, testTimeoutSecs);
}
public BookKeeperClusterTestCase(int numBookies, int numOfZKNodes, int testTimeoutSecs) {
this.numBookies = numBookies;
this.globalTimeout = Timeout.seconds(testTimeoutSecs);
if (numOfZKNodes == 1) {
zkUtil = new ZooKeeperUtil();
} else {
try {
zkUtil = new ZooKeeperClusterUtil(numOfZKNodes);
} catch (IOException | KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}
}
}
@Before
public void setUp() throws Exception {
setUp("/ledgers");
}
protected void setUp(String ledgersRootPath) throws Exception {
LOG.info("Setting up test {}", getClass());
InMemoryMetaStore.reset();
setMetastoreImplClass(baseConf);
setMetastoreImplClass(baseClientConf);
Stopwatch sw = Stopwatch.createStarted();
try {
// start zookeeper service
startZKCluster();
// start bookkeeper service
this.metadataServiceUri = getMetadataServiceUri(ledgersRootPath);
startBKCluster(metadataServiceUri);
LOG.info("Setup testcase {} @ metadata service {} in {} ms.",
runtime.getMethodName(), metadataServiceUri, sw.elapsed(TimeUnit.MILLISECONDS));
} catch (Exception e) {
LOG.error("Error setting up", e);
throw e;
}
}
protected String getMetadataServiceUri(String ledgersRootPath) {
return zkUtil.getMetadataServiceUri(ledgersRootPath);
}
@After
public void tearDown() throws Exception {
boolean failed = false;
for (Throwable e : asyncExceptions) {
LOG.error("Got async exception: {}", e);
failed = true;
}
assertTrue("Async failure", !failed);
Stopwatch sw = Stopwatch.createStarted();
LOG.info("TearDown");
Exception tearDownException = null;
// stop bookkeeper service
try {
stopBKCluster();
} catch (Exception e) {
LOG.error("Got Exception while trying to stop BKCluster", e);
tearDownException = e;
}
// stop zookeeper service
try {
stopZKCluster();
} catch (Exception e) {
LOG.error("Got Exception while trying to stop ZKCluster", e);
tearDownException = e;
}
// cleanup temp dirs
try {
cleanupTempDirs();
} catch (Exception e) {
LOG.error("Got Exception while trying to cleanupTempDirs", e);
tearDownException = e;
}
LOG.info("Tearing down test {} in {} ms.", runtime.getMethodName(), sw.elapsed(TimeUnit.MILLISECONDS));
if (tearDownException != null) {
throw tearDownException;
}
}
protected File createTempDir(String prefix, String suffix) throws IOException {
File dir = IOUtils.createTempDir(prefix, suffix);
tmpDirs.add(dir);
return dir;
}
/**
* Start zookeeper cluster.
*
* @throws Exception
*/
protected void startZKCluster() throws Exception {
zkUtil.startCluster();
zkc = zkUtil.getZooKeeperClient();
}
/**
* Stop zookeeper cluster.
*
* @throws Exception
*/
protected void stopZKCluster() throws Exception {
zkUtil.killCluster();
}
/**
* Start cluster. Also, starts the auto recovery process for each bookie, if
* isAutoRecoveryEnabled is true.
*
* @throws Exception
*/
protected void startBKCluster(String metadataServiceUri) throws Exception {
baseConf.setMetadataServiceUri(metadataServiceUri);
baseClientConf.setMetadataServiceUri(metadataServiceUri);
baseClientConf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
if (numBookies > 0) {
bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
}
// Create Bookie Servers (B1, B2, B3)
for (int i = 0; i < numBookies; i++) {
startNewBookie();
}
}
/**
* Stop cluster. Also, stops all the auto recovery processes for the bookie
* cluster, if isAutoRecoveryEnabled is true.
*
* @throws Exception
*/
protected void stopBKCluster() throws Exception {
if (bkc != null) {
bkc.close();
}
for (BookieServer server : bs) {
server.shutdown();
AutoRecoveryMain autoRecovery = autoRecoveryProcesses.get(server);
if (autoRecovery != null && isAutoRecoveryEnabled()) {
autoRecovery.shutdown();
LOG.debug("Shutdown auto recovery for bookieserver:"
+ server.getBookieId());
}
}
bs.clear();
bsLoggers.clear();
}
protected void cleanupTempDirs() throws Exception {
for (File f : tmpDirs) {
FileUtils.deleteDirectory(f);
}
}
protected ServerConfiguration newServerConfiguration() throws Exception {
File f = createTempDir("bookie", "test");
int port;
if (baseConf.isEnableLocalTransport() || !baseConf.getAllowEphemeralPorts()) {
port = PortManager.nextFreePort();
} else {
port = 0;
}
return newServerConfiguration(port, f, new File[] { f });
}
protected ClientConfiguration newClientConfiguration() {
return new ClientConfiguration(baseConf);
}
protected ServerConfiguration newServerConfiguration(int port, File journalDir, File[] ledgerDirs) {
ServerConfiguration conf = new ServerConfiguration(baseConf);
conf.setBookiePort(port);
conf.setJournalDirName(journalDir.getPath());
String[] ledgerDirNames = new String[ledgerDirs.length];
for (int i = 0; i < ledgerDirs.length; i++) {
ledgerDirNames[i] = ledgerDirs[i].getPath();
}
conf.setLedgerDirNames(ledgerDirNames);
conf.setEnableTaskExecutionStats(true);
conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
return conf;
}
protected void stopAllBookies() throws Exception {
stopAllBookies(true);
}
protected void stopAllBookies(boolean shutdownClient) throws Exception {
for (BookieServer server : bs) {
server.shutdown();
}
bsConfs.clear();
bs.clear();
if (shutdownClient && bkc != null) {
bkc.close();
bkc = null;
}
}
protected void startAllBookies() throws Exception {
for (ServerConfiguration conf : bsConfs) {
bs.add(startBookie(conf));
}
}
protected String newMetadataServiceUri(String ledgersRootPath) {
return zkUtil.getMetadataServiceUri(ledgersRootPath);
}
protected String newMetadataServiceUri(String ledgersRootPath, String type) {
return zkUtil.getMetadataServiceUri(ledgersRootPath, type);
}
/**
* Get bookie address for bookie at index.
*/
public BookieId getBookie(int index) throws Exception {
if (bs.size() <= index || index < 0) {
throw new IllegalArgumentException("Invalid index, there are only " + bs.size()
+ " bookies. Asked for " + index);
}
return bs.get(index).getBookieId();
}
public BookieSocketAddress getBookieAddress(int index) throws Exception {
return bkc.getBookieAddressResolver().resolve(getBookie(index));
}
/**
* Get bookie configuration for bookie.
*/
public ServerConfiguration getBkConf(BookieId addr) throws Exception {
int bkIndex = 0;
for (BookieServer server : bs) {
if (server.getBookieId().equals(addr)) {
break;
}
++bkIndex;
}
if (bkIndex < bs.size()) {
return bsConfs.get(bkIndex);
}
return null;
}
/**
* Kill a bookie by its socket address. Also, stops the autorecovery process
* for the corresponding bookie server, if isAutoRecoveryEnabled is true.
*
* @param addr
* Socket Address
* @return the configuration of killed bookie
* @throws InterruptedException
*/
public ServerConfiguration killBookie(BookieId addr) throws Exception {
BookieServer toRemove = null;
int toRemoveIndex = 0;
for (BookieServer server : bs) {
if (server.getBookieId().equals(addr)) {
server.shutdown();
toRemove = server;
break;
}
++toRemoveIndex;
}
if (toRemove != null) {
stopAutoRecoveryService(toRemove);
bs.remove(toRemove);
bsLoggers.remove(addr);
return bsConfs.remove(toRemoveIndex);
}
return null;
}
/**
* Set the bookie identified by its socket address to readonly.
*
* @param addr
* Socket Address
* @throws InterruptedException
*/
public void setBookieToReadOnly(BookieId addr) throws InterruptedException, UnknownHostException {
for (BookieServer server : bs) {
if (server.getBookieId().equals(addr)) {
((BookieImpl) server.getBookie()).getStateManager().doTransitionToReadOnlyMode();
break;
}
}
}
/**
* Kill a bookie by index. Also, stops the respective auto recovery process
* for this bookie, if isAutoRecoveryEnabled is true.
*
* @param index
* Bookie Index
* @return the configuration of killed bookie
* @throws InterruptedException
* @throws IOException
*/
public ServerConfiguration killBookie(int index) throws Exception {
if (index >= bs.size()) {
throw new IOException("Bookie does not exist");
}
BookieServer server = bs.get(index);
server.shutdown();
stopAutoRecoveryService(server);
bs.remove(server);
bsLoggers.remove(server.getBookieId());
return bsConfs.remove(index);
}
/**
* Kill bookie by index and verify that it's stopped.
*
* @param index index of bookie to kill
*
* @return configuration of killed bookie
*/
public ServerConfiguration killBookieAndWaitForZK(int index) throws Exception {
if (index >= bs.size()) {
throw new IOException("Bookie does not exist");
}
BookieServer server = bs.get(index);
ServerConfiguration ret = killBookie(index);
while (zkc.exists(ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf) + "/" + AVAILABLE_NODE + "/"
+ server.getBookieId().toString(), false) != null) {
Thread.sleep(500);
}
return ret;
}
/**
* Sleep a bookie.
*
* @param addr
* Socket Address
* @param seconds
* Sleep seconds
* @return Count Down latch which will be counted down just after sleep begins
* @throws InterruptedException
* @throws IOException
*/
public CountDownLatch sleepBookie(BookieId addr, final int seconds)
throws Exception {
for (final BookieServer bookie : bs) {
if (bookie.getBookieId().equals(addr)) {
final CountDownLatch l = new CountDownLatch(1);
Thread sleeper = new Thread() {
@Override
public void run() {
try {
bookie.suspendProcessing();
LOG.info("bookie {} is asleep", bookie.getBookieId());
l.countDown();
Thread.sleep(seconds * 1000);
bookie.resumeProcessing();
LOG.info("bookie {} is awake", bookie.getBookieId());
} catch (Exception e) {
LOG.error("Error suspending bookie", e);
}
}
};
sleeper.start();
return l;
}
}
throw new IOException("Bookie not found");
}
/**
* Sleep a bookie until I count down the latch.
*
* @param addr
* Socket Address
* @param l
* Latch to wait on
* @throws InterruptedException
* @throws IOException
*/
public void sleepBookie(BookieId addr, final CountDownLatch l)
throws InterruptedException, IOException {
final CountDownLatch suspendLatch = new CountDownLatch(1);
sleepBookie(addr, l, suspendLatch);
suspendLatch.await();
}
public void sleepBookie(BookieId addr, final CountDownLatch l, final CountDownLatch suspendLatch)
throws InterruptedException, IOException {
for (final BookieServer bookie : bs) {
if (bookie.getBookieId().equals(addr)) {
LOG.info("Sleep bookie {}.", addr);
Thread sleeper = new Thread() {
@Override
public void run() {
try {
bookie.suspendProcessing();
if (null != suspendLatch) {
suspendLatch.countDown();
}
l.await();
bookie.resumeProcessing();
} catch (Exception e) {
LOG.error("Error suspending bookie", e);
}
}
};
sleeper.start();
return;
}
}
throw new IOException("Bookie not found");
}
/**
* Restart bookie servers. Also restarts all the respective auto recovery
* process, if isAutoRecoveryEnabled is true.
*
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
* @throws BookieException
*/
public void restartBookies()
throws Exception {
restartBookies(null);
}
/**
* Restart a bookie. Also restart the respective auto recovery process,
* if isAutoRecoveryEnabled is true.
*
* @param addr
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
* @throws BookieException
*/
public void restartBookie(BookieId addr) throws Exception {
BookieServer toRemove = null;
int toRemoveIndex = 0;
for (BookieServer server : bs) {
if (server.getBookieId().equals(addr)) {
toRemove = server;
break;
}
++toRemoveIndex;
}
if (toRemove != null) {
ServerConfiguration newConfig = bsConfs.get(toRemoveIndex);
killBookie(toRemoveIndex);
Thread.sleep(1000);
bs.add(startBookie(newConfig));
bsConfs.add(newConfig);
return;
}
throw new IOException("Bookie not found");
}
/**
* Restart bookie servers using new configuration settings. Also restart the
* respective auto recovery process, if isAutoRecoveryEnabled is true.
*
* @param newConf
* New Configuration Settings
* @throws InterruptedException
* @throws IOException
* @throws KeeperException
* @throws BookieException
*/
public void restartBookies(ServerConfiguration newConf)
throws Exception {
// shut down bookie server
for (BookieServer server : bs) {
server.shutdown();
stopAutoRecoveryService(server);
}
bs.clear();
bsLoggers.clear();
Thread.sleep(1000);
// restart them to ensure we can't
for (ServerConfiguration conf : bsConfs) {
String bookieId = conf.getBookieId();
// ensure the bookie id or port is loaded correctly
int port = conf.getBookiePort();
if (null != newConf) {
conf.loadConf(newConf);
}
if (bookieId != null) {
conf.setBookieId(bookieId);
} else {
conf.setBookiePort(port);
}
bs.add(startBookie(conf));
}
}
/**
* Helper method to startup a new bookie server with the indicated port
* number. Also, starts the auto recovery process, if the
* isAutoRecoveryEnabled is set true.
*
* @throws IOException
*/
public int startNewBookie()
throws Exception {
ServerConfiguration conf = newServerConfiguration();
// use a random BookieId
if (useUUIDasBookieId) {
conf.setBookieId(UUID.randomUUID().toString());
}
bsConfs.add(conf);
LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
BookieServer server = startBookie(conf);
bs.add(server);
return server.getLocalAddress().getPort();
}
public BookieId startNewBookieAndReturnBookieId()
throws Exception {
ServerConfiguration conf = newServerConfiguration();
bsConfs.add(conf);
LOG.info("Starting new bookie on port: {}", conf.getBookiePort());
BookieServer server = startBookie(conf);
bs.add(server);
return server.getBookieId();
}
/**
* Helper method to startup a bookie server using a configuration object.
* Also, starts the auto recovery process if isAutoRecoveryEnabled is true.
*
* @param conf
* Server Configuration Object
*
*/
protected BookieServer startBookie(ServerConfiguration conf)
throws Exception {
TestStatsProvider provider = new TestStatsProvider();
BookieServer server = new BookieServer(conf, provider.getStatsLogger(""), null);
BookieId address = BookieImpl.getBookieId(conf);
bsLoggers.put(address, provider);
if (bkc == null) {
bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
}
Future<?> waitForBookie = conf.isForceReadOnlyBookie()
? bkc.waitForReadOnlyBookie(address)
: bkc.waitForWritableBookie(address);
server.start();
waitForBookie.get(30, TimeUnit.SECONDS);
LOG.info("New bookie '{}' has been created.", address);
try {
startAutoRecovery(server, conf);
} catch (CompatibilityException ce) {
LOG.error("Exception while starting AutoRecovery!", ce);
} catch (UnavailableException ue) {
LOG.error("Exception while starting AutoRecovery!", ue);
}
return server;
}
/**
* Start a bookie with the given bookie instance. Also, starts the auto
* recovery for this bookie, if isAutoRecoveryEnabled is true.
*/
protected BookieServer startBookie(ServerConfiguration conf, final Bookie b)
throws Exception {
TestStatsProvider provider = new TestStatsProvider();
BookieServer server = new BookieServer(conf, provider.getStatsLogger(""), null) {
@Override
protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator,
Supplier<BookieServiceInfo> s) {
return b;
}
};
BookieId address = BookieImpl.getBookieId(conf);
if (bkc == null) {
bkc = new BookKeeperTestClient(baseClientConf, new TestStatsProvider());
}
Future<?> waitForBookie = conf.isForceReadOnlyBookie()
? bkc.waitForReadOnlyBookie(address)
: bkc.waitForWritableBookie(address);
server.start();
bsLoggers.put(server.getBookieId(), provider);
waitForBookie.get(30, TimeUnit.SECONDS);
LOG.info("New bookie '{}' has been created.", address);
try {
startAutoRecovery(server, conf);
} catch (CompatibilityException ce) {
LOG.error("Exception while starting AutoRecovery!", ce);
} catch (UnavailableException ue) {
LOG.error("Exception while starting AutoRecovery!", ue);
}
return server;
}
public void setMetastoreImplClass(AbstractConfiguration conf) {
conf.setMetastoreImplClass(InMemoryMetaStore.class.getName());
}
/**
* Flags used to enable/disable the auto recovery process. If it is enabled,
* starting the bookie server will starts the auto recovery process for that
* bookie. Also, stopping bookie will stops the respective auto recovery
* process.
*
* @param isAutoRecoveryEnabled
* Value true will enable the auto recovery process. Value false
* will disable the auto recovery process
*/
public void setAutoRecoveryEnabled(boolean isAutoRecoveryEnabled) {
this.isAutoRecoveryEnabled = isAutoRecoveryEnabled;
}
/**
* Flag used to check whether auto recovery process is enabled/disabled. By
* default the flag is false.
*
* @return true, if the auto recovery is enabled. Otherwise return false.
*/
public boolean isAutoRecoveryEnabled() {
return isAutoRecoveryEnabled;
}
private void startAutoRecovery(BookieServer bserver,
ServerConfiguration conf) throws Exception {
if (isAutoRecoveryEnabled()) {
AutoRecoveryMain autoRecoveryProcess = new AutoRecoveryMain(conf);
autoRecoveryProcess.start();
autoRecoveryProcesses.put(bserver, autoRecoveryProcess);
LOG.debug("Starting Auditor Recovery for the bookie:"
+ bserver.getBookieId());
}
}
private void stopAutoRecoveryService(BookieServer toRemove) throws Exception {
AutoRecoveryMain autoRecoveryMain = autoRecoveryProcesses
.remove(toRemove);
if (null != autoRecoveryMain && isAutoRecoveryEnabled()) {
autoRecoveryMain.shutdown();
LOG.debug("Shutdown auto recovery for bookieserver:"
+ toRemove.getBookieId());
}
}
/**
* Will starts the auto recovery process for the bookie servers. One auto
* recovery process per each bookie server, if isAutoRecoveryEnabled is
* enabled.
*/
public void startReplicationService() throws Exception {
int index = -1;
for (BookieServer bserver : bs) {
startAutoRecovery(bserver, bsConfs.get(++index));
}
}
/**
* Will stops all the auto recovery processes for the bookie cluster, if
* isAutoRecoveryEnabled is true.
*/
public void stopReplicationService() throws Exception{
if (!isAutoRecoveryEnabled()){
return;
}
for (Entry<BookieServer, AutoRecoveryMain> autoRecoveryProcess : autoRecoveryProcesses
.entrySet()) {
autoRecoveryProcess.getValue().shutdown();
LOG.debug("Shutdown Auditor Recovery for the bookie:"
+ autoRecoveryProcess.getKey().getBookieId());
}
}
public Auditor getAuditor(int timeout, TimeUnit unit) throws Exception {
final long timeoutAt = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
while (System.nanoTime() < timeoutAt) {
for (AutoRecoveryMain p : autoRecoveryProcesses.values()) {
Auditor a = p.getAuditor();
if (a != null) {
return a;
}
}
Thread.sleep(100);
}
throw new Exception("No auditor found");
}
/**
* Check whether the InetSocketAddress was created using a hostname or an IP
* address. Represent as 'hostname/IPaddress' if the InetSocketAddress was
* created using hostname. Represent as '/IPaddress' if the
* InetSocketAddress was created using an IPaddress
*
* @param bookieId id
* @return true if the address was created using an IP address, false if the
* address was created using a hostname
*/
public boolean isCreatedFromIp(BookieId bookieId) {
BookieSocketAddress addr = bkc.getBookieAddressResolver().resolve(bookieId);
return addr.getSocketAddress().toString().startsWith("/");
}
public void resetBookieOpLoggers() {
for (TestStatsProvider provider : bsLoggers.values()) {
provider.clear();
}
}
public TestStatsProvider getStatsProvider(BookieId addr) {
return bsLoggers.get(addr);
}
public TestStatsProvider getStatsProvider(int index) throws Exception {
return getStatsProvider(bs.get(index).getBookieId());
}
}