blob: 0e87e015e16eef9a283594810e0628791cb3c497 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.util;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import com.google.common.collect.Lists;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.common.component.ComponentInfoPublisher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.BookieServiceInfo.Endpoint;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.server.service.BookieService;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShimFactory;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Local Bookkeeper.
*/
public class LocalBookKeeper {
protected static final Logger LOG = LoggerFactory.getLogger(LocalBookKeeper.class);
public static final int CONNECTION_TIMEOUT = 30000;
private static String newMetadataServiceUri(String zkServers, int port, String layout, String ledgerPath) {
return "zk+" + layout + "://" + zkServers + ":" + port + ledgerPath;
}
int numberOfBookies;
public LocalBookKeeper() {
this(3);
}
public LocalBookKeeper(int numberOfBookies) {
this(numberOfBookies, 5000, new ServerConfiguration(), defaultLocalBookiesConfigDir);
}
public LocalBookKeeper(
int numberOfBookies,
int initialPort,
ServerConfiguration baseConf,
String localBookiesConfigDirName) {
this.numberOfBookies = numberOfBookies;
this.initialPort = initialPort;
this.localBookiesConfigDir = new File(localBookiesConfigDirName);
this.baseConf = baseConf;
LOG.info("Running {} bookie(s) on zk ensemble = '{}:{}'.", this.numberOfBookies,
zooKeeperDefaultHost, zooKeeperDefaultPort);
}
private static String zooKeeperDefaultHost = "127.0.0.1";
private static int zooKeeperDefaultPort = 2181;
private static int zkSessionTimeOut = 5000;
private static Integer bookieDefaultInitialPort = 5000;
private static String defaultLocalBookiesConfigDir = "/tmp/localbookies-config";
//BookKeeper variables
File[] journalDirs;
BookieServer[] bs;
ServerConfiguration[] bsConfs;
Integer initialPort = 5000;
private ServerConfiguration baseConf;
File localBookiesConfigDir;
/**
* @param maxCC
* Max Concurrency of Client
* @param zookeeperPort
* ZooKeeper Server Port
*/
public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort) throws IOException {
File zkTmpDir = IOUtils.createTempDir("zookeeper", "localbookkeeper");
return runZookeeper(maxCC, zookeeperPort, zkTmpDir);
}
public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort, File zkDir) throws IOException {
LOG.info("Starting ZK server");
ZooKeeperServerShim server = ZooKeeperServerShimFactory.createServer(zkDir, zkDir, zookeeperPort, maxCC);
server.start();
boolean b = waitForServerUp(InetAddress.getLoopbackAddress().getHostAddress() + ":" + zookeeperPort,
CONNECTION_TIMEOUT);
if (LOG.isDebugEnabled()) {
LOG.debug("ZooKeeper server up: {}", b);
}
return server;
}
@SuppressWarnings("deprecation")
private void initializeZookeeper(String zkHost, int zkPort) throws IOException {
LOG.info("Instantiate ZK Client");
//initialize the zk client with values
try (ZooKeeperClient zkc = ZooKeeperClient.newBuilder()
.connectString(zkHost + ":" + zkPort)
.sessionTimeoutMs(zkSessionTimeOut)
.build()) {
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf);
ZkUtils.createFullPathOptimistic(zkc, zkLedgersRootPath, new byte[0], Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
List<Op> multiOps = Lists.newArrayListWithExpectedSize(2);
multiOps.add(
Op.create(zkLedgersRootPath + "/" + AVAILABLE_NODE,
new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
multiOps.add(
Op.create(zkLedgersRootPath + "/" + AVAILABLE_NODE + "/" + READONLY,
new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
zkc.multi(multiOps);
// No need to create an entry for each requested bookie anymore as the
// BookieServers will register themselves with ZooKeeper on startup.
} catch (KeeperException e) {
LOG.error("Exception while creating znodes", e);
throw new IOException("Error creating znodes : ", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while creating znodes", e);
throw new IOException("Error creating znodes : ", e);
}
}
private static void cleanupDirectories(List<File> dirs) throws IOException {
for (File dir : dirs) {
FileUtils.deleteDirectory(dir);
}
}
private List<File> runBookies(String dirSuffix)
throws Exception {
List<File> tempDirs = new ArrayList<>();
try {
runBookies(tempDirs, dirSuffix);
return tempDirs;
} catch (Exception ioe) {
cleanupDirectories(tempDirs);
throw ioe;
}
}
@SuppressWarnings("deprecation")
private void runBookies(List<File> tempDirs, String dirSuffix)
throws Exception {
LOG.info("Starting Bookie(s)");
// Create Bookie Servers (B1, B2, B3)
journalDirs = new File[numberOfBookies];
bs = new BookieServer[numberOfBookies];
bsConfs = new ServerConfiguration[numberOfBookies];
if (localBookiesConfigDir.exists() && localBookiesConfigDir.isFile()) {
throw new IOException("Unable to create LocalBookiesConfigDir, since there is a file at "
+ localBookiesConfigDir.getAbsolutePath());
}
if (!localBookiesConfigDir.exists() && !localBookiesConfigDir.mkdirs()) {
throw new IOException(
"Unable to create LocalBookiesConfigDir - " + localBookiesConfigDir.getAbsolutePath());
}
for (int i = 0; i < numberOfBookies; i++) {
if (null == baseConf.getJournalDirNameWithoutDefault()) {
journalDirs[i] = IOUtils.createTempDir("localbookkeeper" + Integer.toString(i), dirSuffix);
tempDirs.add(journalDirs[i]);
} else {
journalDirs[i] = new File(baseConf.getJournalDirName(), "bookie" + Integer.toString(i));
}
if (journalDirs[i].exists()) {
if (journalDirs[i].isDirectory()) {
FileUtils.deleteDirectory(journalDirs[i]);
} else if (!journalDirs[i].delete()) {
throw new IOException("Couldn't cleanup bookie journal dir " + journalDirs[i]);
}
}
if (!journalDirs[i].mkdirs()) {
throw new IOException("Couldn't create bookie journal dir " + journalDirs[i]);
}
String [] ledgerDirs = baseConf.getLedgerDirWithoutDefault();
if ((null == ledgerDirs) || (0 == ledgerDirs.length)) {
ledgerDirs = new String[] { journalDirs[i].getPath() };
} else {
for (int l = 0; l < ledgerDirs.length; l++) {
File dir = new File(ledgerDirs[l], "bookie" + Integer.toString(i));
if (dir.exists()) {
if (dir.isDirectory()) {
FileUtils.deleteDirectory(dir);
} else if (!dir.delete()) {
throw new IOException("Couldn't cleanup bookie ledger dir " + dir);
}
}
if (!dir.mkdirs()) {
throw new IOException("Couldn't create bookie ledger dir " + dir);
}
ledgerDirs[l] = dir.getPath();
}
}
bsConfs[i] = new ServerConfiguration((ServerConfiguration) baseConf.clone());
// If the caller specified ephemeral ports then use ephemeral ports for all
// the bookies else use numBookie ports starting at initialPort
PortManager.initPort(initialPort);
if (0 == initialPort) {
bsConfs[i].setBookiePort(0);
} else {
bsConfs[i].setBookiePort(PortManager.nextFreePort());
}
if (null == baseConf.getMetadataServiceUriUnchecked()) {
bsConfs[i].setMetadataServiceUri(baseConf.getMetadataServiceUri());
}
bsConfs[i].setJournalDirName(journalDirs[i].getPath());
bsConfs[i].setLedgerDirNames(ledgerDirs);
// write config into file before start so we can know what's wrong if start failed
String fileName = BookieImpl.getBookieId(bsConfs[i]).toString() + ".conf";
serializeLocalBookieConfig(bsConfs[i], fileName);
// Mimic BookKeeper Main
final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher();
final Supplier<BookieServiceInfo> bookieServiceInfoProvider =
() -> buildBookieServiceInfo(componentInfoPublisher);
BookieService bookieService = new BookieService(new BookieConfiguration(bsConfs[i]),
NullStatsLogger.INSTANCE,
bookieServiceInfoProvider
);
bs[i] = bookieService.getServer();
bookieService.publishInfo(componentInfoPublisher);
componentInfoPublisher.startupFinished();
bookieService.start();
}
/*
* baseconf.conf is needed because for executing any BookieShell command
* of Metadata/Zookeeper Operation nature we need a valid conf file
* having correct zk details and this could be used for running any such
* bookieshell commands if bookieid is not provided as parameter to
* bookkeeper shell operation. for eg:
* "./bookkeeper shell localbookie listbookies -rw". But for execution
* shell command of bookie Operation nature we need to provide bookieid,
* for eg "./bookkeeper shell -localbookie 10.3.27.190:5000 lastmark",
* so this shell command would use '10.3.27.190:5000.conf' file
*/
ServerConfiguration baseConfWithCorrectZKServers = new ServerConfiguration(
(ServerConfiguration) baseConf.clone());
if (null == baseConf.getMetadataServiceUriUnchecked()) {
baseConfWithCorrectZKServers.setMetadataServiceUri(baseConf.getMetadataServiceUri());
}
serializeLocalBookieConfig(baseConfWithCorrectZKServers, "baseconf.conf");
}
public static void startLocalBookies(String zkHost,
int zkPort,
int numBookies,
boolean shouldStartZK,
int initialBookiePort)
throws Exception {
ServerConfiguration conf = new ServerConfiguration();
startLocalBookiesInternal(
conf, zkHost, zkPort, numBookies, shouldStartZK,
initialBookiePort, true, "test", null, defaultLocalBookiesConfigDir);
}
public static void startLocalBookies(String zkHost,
int zkPort,
int numBookies,
boolean shouldStartZK,
int initialBookiePort,
ServerConfiguration conf)
throws Exception {
startLocalBookiesInternal(
conf, zkHost, zkPort, numBookies, shouldStartZK,
initialBookiePort, true, "test", null, defaultLocalBookiesConfigDir);
}
public static void startLocalBookies(String zkHost,
int zkPort,
int numBookies,
boolean shouldStartZK,
int initialBookiePort,
String dirSuffix)
throws Exception {
ServerConfiguration conf = new ServerConfiguration();
startLocalBookiesInternal(
conf, zkHost, zkPort, numBookies, shouldStartZK,
initialBookiePort, true, dirSuffix, null, defaultLocalBookiesConfigDir);
}
@SuppressWarnings("deprecation")
static void startLocalBookiesInternal(ServerConfiguration conf,
String zkHost,
int zkPort,
int numBookies,
boolean shouldStartZK,
int initialBookiePort,
boolean stopOnExit,
String dirSuffix,
String zkDataDir,
String localBookiesConfigDirName)
throws Exception {
conf.setMetadataServiceUri(
newMetadataServiceUri(
zkHost,
zkPort,
conf.getLedgerManagerLayoutStringFromFactoryClass(),
conf.getZkLedgersRootPath()));
LocalBookKeeper lb = new LocalBookKeeper(numBookies, initialBookiePort, conf, localBookiesConfigDirName);
ZooKeeperServerShim zks = null;
File zkTmpDir = null;
List<File> bkTmpDirs = null;
try {
if (shouldStartZK) {
File zkDataDirFile = null;
if (zkDataDir != null) {
zkDataDirFile = new File(zkDataDir);
if (zkDataDirFile.exists() && zkDataDirFile.isFile()) {
throw new IOException("Unable to create zkDataDir, since there is a file at "
+ zkDataDirFile.getAbsolutePath());
}
if (!zkDataDirFile.exists() && !zkDataDirFile.mkdirs()) {
throw new IOException("Unable to create zkDataDir - " + zkDataDirFile.getAbsolutePath());
}
}
zkTmpDir = IOUtils.createTempDir("zookeeper", dirSuffix, zkDataDirFile);
zkTmpDir.deleteOnExit();
zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
}
lb.initializeZookeeper(zkHost, zkPort);
bkTmpDirs = lb.runBookies(dirSuffix);
try {
while (true) {
Thread.sleep(5000);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
if (stopOnExit) {
lb.shutdownBookies();
if (null != zks) {
zks.stop();
}
}
throw ie;
}
} catch (Exception e) {
LOG.error("Failed to run {} bookies : zk ensemble = '{}:{}'",
numBookies, zkHost, zkPort, e);
throw e;
} finally {
if (stopOnExit) {
if (null != bkTmpDirs) {
cleanupDirectories(bkTmpDirs);
}
if (null != zkTmpDir) {
FileUtils.deleteDirectory(zkTmpDir);
}
}
}
}
/**
* Serializes the config object to the specified file in localBookiesConfigDir.
*
* @param localBookieConfig
* config object which has to be serialized
* @param fileName
* name of the file
* @throws IOException
*/
private void serializeLocalBookieConfig(ServerConfiguration localBookieConfig, String fileName) throws IOException {
File localBookieConfFile = new File(localBookiesConfigDir, fileName);
if (localBookieConfFile.exists() && !localBookieConfFile.delete()) {
throw new IOException(
"Unable to delete the existing LocalBookieConfigFile - " + localBookieConfFile.getAbsolutePath());
}
if (!localBookieConfFile.createNewFile()) {
throw new IOException("Unable to create new File - " + localBookieConfFile.getAbsolutePath());
}
Iterator<String> keys = localBookieConfig.getKeys();
try (PrintWriter writer = new PrintWriter(localBookieConfFile, "UTF-8")) {
while (keys.hasNext()) {
String key = keys.next();
String[] values = localBookieConfig.getStringArray(key);
StringBuilder concatenatedValue = new StringBuilder(values[0]);
for (int i = 1; i < values.length; i++) {
concatenatedValue.append(",").append(values[i]);
}
writer.println(key + "=" + concatenatedValue.toString());
}
}
}
public static void main(String[] args) {
try {
if (args.length < 1) {
usage();
System.exit(-1);
}
int numBookies = 0;
try {
numBookies = Integer.parseInt(args[0]);
} catch (NumberFormatException nfe) {
LOG.error("Unrecognized number-of-bookies: {}", args[0]);
usage();
System.exit(-1);
}
ServerConfiguration conf = new ServerConfiguration();
conf.setAllowLoopback(true);
if (args.length >= 2) {
String confFile = args[1];
try {
conf.loadConf(new File(confFile).toURI().toURL());
LOG.info("Using configuration file {}", confFile);
} catch (Exception e) {
// load conf failed
LOG.warn("Error loading configuration file {}", confFile, e);
}
}
String zkDataDir = null;
if (args.length >= 3) {
zkDataDir = args[2];
}
String localBookiesConfigDirName = defaultLocalBookiesConfigDir;
if (args.length >= 4) {
localBookiesConfigDirName = args[3];
}
startLocalBookiesInternal(conf, zooKeeperDefaultHost, zooKeeperDefaultPort, numBookies, true,
bookieDefaultInitialPort, false, "test", zkDataDir, localBookiesConfigDirName);
} catch (Exception e) {
LOG.error("Exiting LocalBookKeeper because of exception in main method", e);
/*
* This is needed because, some non-daemon thread (probably in ZK or
* some other dependent service) is preventing the JVM from exiting, though
* there is exception in main thread.
*/
System.exit(-1);
}
}
private static void usage() {
System.err.println(
"Usage: LocalBookKeeper number-of-bookies [path to bookie config] "
+ "[path to create ZK data directory at] [path to LocalBookiesConfigDir]");
}
public static boolean waitForServerUp(String hp, long timeout) {
long start = System.currentTimeMillis();
String[] split = hp.split(":");
String host = split[0];
int port = Integer.parseInt(split[1]);
while (true) {
try {
Socket sock = new Socket(host, port);
BufferedReader reader = null;
try {
OutputStream outstream = sock.getOutputStream();
outstream.write("stat".getBytes(UTF_8));
outstream.flush();
reader =
new BufferedReader(
new InputStreamReader(sock.getInputStream(), UTF_8));
String line = reader.readLine();
if (line != null && line.startsWith("Zookeeper version:")) {
LOG.info("Server UP");
return true;
}
} finally {
sock.close();
if (reader != null) {
reader.close();
}
}
} catch (IOException e) {
// ignore as this is expected
LOG.info("server " + hp + " not up " + e);
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// ignore
}
}
return false;
}
public void shutdownBookies() {
for (BookieServer bookieServer: bs) {
bookieServer.shutdown();
}
}
/**
* Create the {@link BookieServiceInfo} starting from the published endpoints.
*
* @see ComponentInfoPublisher
* @param componentInfoPublisher the endpoint publisher
* @return the created bookie service info
*/
private static BookieServiceInfo buildBookieServiceInfo(ComponentInfoPublisher componentInfoPublisher) {
List<Endpoint> endpoints = componentInfoPublisher.getEndpoints().values()
.stream().map(e -> {
return new Endpoint(
e.getId(),
e.getPort(),
e.getHost(),
e.getProtocol(),
e.getAuth(),
e.getExtensions()
);
}).collect(Collectors.toList());
return new BookieServiceInfo(componentInfoPublisher.getProperties(), endpoints);
}
}