blob: 37cbb841ad1ead8dfe935872e4f359eda949b4dc [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* This file is derived from LocalBookkeeperEnsemble from Apache BookKeeper
* http://bookkeeper.apache.org
*/
package org.apache.pulsar.zookeeper;
import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_STREAM_CONF;
import static org.apache.commons.io.FileUtils.cleanDirectory;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.bookkeeper.bookie.BookieException.InvalidCookieException;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorage;
import org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.bookkeeper.clients.admin.StorageAdminClient;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.exceptions.NamespaceExistsException;
import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.common.allocator.PoolingPolicy;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.Backoff;
import org.apache.bookkeeper.common.util.Backoff.Jitter.Type;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.NamespaceProperties;
import org.apache.bookkeeper.stream.server.StreamStorageLifecycleComponent;
import org.apache.bookkeeper.stream.storage.api.cluster.ClusterInitializer;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterInitializer;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.io.FileUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.DatadirCleanupManager;
import org.apache.zookeeper.server.NIOServerCnxnFactory;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LocalBookkeeperEnsemble {
protected static final Logger LOG = LoggerFactory.getLogger(LocalBookkeeperEnsemble.class);
public static final int CONNECTION_TIMEOUT = 30000;
int numberOfBookies;
private final boolean clearOldData;
private static class BasePortManager implements Supplier<Integer> {
private int port;
public BasePortManager(int basePort) {
this.port = basePort;
}
@Override
public synchronized Integer get() {
return port++;
}
}
private final Supplier<Integer> portManager;
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, Supplier<Integer> portManager) {
this(numberOfBookies, zkPort, 4181, null, null, true, null, portManager);
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, null);
}
public LocalBookkeeperEnsemble(int numberOfBookies, int zkPort, int bkBasePort, String zkDataDirName,
String bkDataDirName, boolean clearOldData, String advertisedAddress) {
this(numberOfBookies, zkPort, bkBasePort, 4181, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress);
}
public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int bkBasePort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress) {
this(numberOfBookies, zkPort, streamStoragePort, zkDataDirName, bkDataDirName, clearOldData, advertisedAddress,
new BasePortManager(bkBasePort));
}
public LocalBookkeeperEnsemble(int numberOfBookies,
int zkPort,
int streamStoragePort,
String zkDataDirName,
String bkDataDirName,
boolean clearOldData,
String advertisedAddress,
Supplier<Integer> portManager) {
this.numberOfBookies = numberOfBookies;
this.portManager = portManager;
this.streamStoragePort = streamStoragePort;
this.zkDataDirName = zkDataDirName;
this.bkDataDirName = bkDataDirName;
this.clearOldData = clearOldData;
this.zkPort = zkPort;
this.advertisedAddress = null == advertisedAddress ? "127.0.0.1" : advertisedAddress;
LOG.info("Running {} bookie(s) and advertised them at {}.", this.numberOfBookies, this.advertisedAddress);
}
private String hostPort;
private final String advertisedAddress;
private int zkPort;
NIOServerCnxnFactory serverFactory;
ZooKeeperServer zks;
DatadirCleanupManager zkDataCleanupManager;
ZooKeeper zkc;
static int zkSessionTimeOut = 5000;
String zkDataDirName;
// BookKeeper variables
String bkDataDirName;
BookieServer bs[];
ServerConfiguration bsConfs[];
// Stream/Table Storage
StreamStorageLifecycleComponent streamStorage;
Integer streamStoragePort = 4181;
// directories created by this instance
// it is safe to drop them on stop
List<File> temporaryDirectories = new ArrayList<>();
private File createTempDirectory(String seed) throws IOException {
File res = Files.createTempDirectory(seed).toFile();
temporaryDirectories.add(res);
return res;
}
private void runZookeeper(int maxCC) throws IOException {
// create a ZooKeeper server(dataDir, dataLogDir, port)
LOG.info("Starting ZK server");
// ServerStats.registerAsConcrete();
// ClientBase.setupTestEnv();
File zkDataDir = isNotBlank(zkDataDirName) ? Files.createDirectories(Paths.get(zkDataDirName)).toFile()
: createTempDirectory("zktest");
if (this.clearOldData) {
cleanDirectory(zkDataDir);
}
try {
// Allow all commands on ZK control port
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
zks = new ZooKeeperServer(zkDataDir, zkDataDir, ZooKeeperServer.DEFAULT_TICK_TIME);
serverFactory = new NIOServerCnxnFactory();
serverFactory.configure(new InetSocketAddress(zkPort), maxCC);
serverFactory.startup(zks);
zkDataCleanupManager = new DatadirCleanupManager(zkDataDir, zkDataDir, 3, 1 /* hour */);
zkDataCleanupManager.start();
} catch (Exception e) {
LOG.error("Exception while instantiating ZooKeeper", e);
if (serverFactory != null) {
serverFactory.shutdown();
}
throw new IOException(e);
}
this.zkPort = serverFactory.getLocalPort();
this.hostPort = "127.0.0.1:" + zkPort;
boolean b = waitForServerUp(hostPort, CONNECTION_TIMEOUT);
LOG.info("ZooKeeper server up: {}", b);
LOG.debug("Local ZK started (port: {}, data_directory: {})", zkPort, zkDataDir.getAbsolutePath());
}
public void disconnectZookeeper(ZooKeeper zooKeeper) {
ServerCnxn serverCnxn = getZookeeperServerConnection(zooKeeper);
try {
LOG.info("disconnect ZK server side connection {}", serverCnxn);
Class disconnectReasonClass = Class.forName("org.apache.zookeeper.server.ServerCnxn$DisconnectReason");
Method method = serverCnxn.getClass().getMethod("close", disconnectReasonClass);
method.invoke(serverCnxn, Stream.of(disconnectReasonClass.getEnumConstants()).filter(s ->
s.toString().equals("CONNECTION_CLOSE_FORCED")).findFirst().get());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public ServerCnxn getZookeeperServerConnection(ZooKeeper zooKeeper) {
return StreamSupport.stream(serverFactory.getConnections().spliterator(), false)
.filter((cnxn) -> cnxn.getSessionId() == zooKeeper.getSessionId())
.findFirst()
.orElse(null);
}
private void initializeZookeper() throws IOException {
LOG.info("Instantiate ZK Client");
// initialize the zk client with values
try {
ZKConnectionWatcher zkConnectionWatcher = new ZKConnectionWatcher();
zkc = new ZooKeeper(hostPort, zkSessionTimeOut, zkConnectionWatcher);
zkConnectionWatcher.waitForConnection();
if (zkc.exists("/ledgers", false) == null) {
zkc.create("/ledgers", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zkc.exists("/ledgers/available", false) == null) {
zkc.create("/ledgers/available", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zkc.exists("/ledgers/available/readonly", false) == null) {
zkc.create("/ledgers/available/readonly", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
if (zkc.exists("/bookies", false) == null) {
zkc.create("/bookies", "{}".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
}
// No need to create an entry for each requested bookie anymore as the
// BookieServers will register themselves with ZooKeeper on startup.
} catch (KeeperException e) {
LOG.error("Exception while creating znodes", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Interrupted while creating znodes", e);
}
}
private void runBookies(ServerConfiguration baseConf) throws Exception {
LOG.info("Starting Bookie(s)");
// Create Bookie Servers (B1, B2, B3)
bs = new BookieServer[numberOfBookies];
bsConfs = new ServerConfiguration[numberOfBookies];
for (int i = 0; i < numberOfBookies; i++) {
File bkDataDir = isNotBlank(bkDataDirName)
? Files.createDirectories(Paths.get(bkDataDirName + i)).toFile()
: createTempDirectory("bk" + Integer.toString(i) + "test");
if (this.clearOldData) {
cleanDirectory(bkDataDir);
}
int bookiePort = portManager.get();
// Ensure registration Z-nodes are cleared when standalone service is restarted ungracefully
String registrationZnode = String.format("/ledgers/available/%s:%d",
baseConf.getAdvertisedAddress(), bookiePort);
if (zkc.exists(registrationZnode, null) != null) {
try {
zkc.delete(registrationZnode, -1);
} catch (NoNodeException nne) {
// Ignore if z-node was just expired
}
}
bsConfs[i] = new ServerConfiguration(baseConf);
// override settings
bsConfs[i].setBookiePort(bookiePort);
bsConfs[i].setZkServers("127.0.0.1:" + zkPort);
bsConfs[i].setJournalDirName(bkDataDir.getPath());
bsConfs[i].setLedgerDirNames(new String[] { bkDataDir.getPath() });
bsConfs[i].setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
bsConfs[i].setAllowEphemeralPorts(true);
try {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
} catch (InvalidCookieException e) {
// InvalidCookieException can happen if the machine IP has changed
// Since we are running here a local bookie that is always accessed
// from localhost, we can ignore the error
for (String path : zkc.getChildren("/ledgers/cookies", false)) {
zkc.delete("/ledgers/cookies/" + path, -1);
}
// Also clean the on-disk cookie
new File(new File(bkDataDir, "current"), "VERSION").delete();
// Retry to start the bookie after cleaning the old left cookie
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
}
bs[i].start();
LOG.debug("Local BK[{}] started (port: {}, data_directory: {})", i, bookiePort,
bkDataDir.getAbsolutePath());
}
}
public void runStreamStorage(CompositeConfiguration conf) throws Exception {
String zkServers = "127.0.0.1:" + zkPort;
String metadataServiceUriStr = "zk://" + zkServers + "/ledgers";
URI metadataServiceUri = URI.create(metadataServiceUriStr);
// zookeeper servers
conf.setProperty("metadataServiceUri", metadataServiceUriStr);
// dlog settings
conf.setProperty("dlog.bkcEnsembleSize", 1);
conf.setProperty("dlog.bkcWriteQuorumSize", 1);
conf.setProperty("dlog.bkcAckQuorumSize", 1);
// stream storage port
conf.setProperty("storageserver.grpc.port", streamStoragePort);
// storage server settings
conf.setProperty("storage.range.store.dirs", bkDataDirName + "/ranges/data");
// initialize the stream storage metadata
ClusterInitializer initializer = new ZkClusterInitializer(zkServers);
initializer.initializeCluster(metadataServiceUri, 2);
// load the stream storage component
ServerConfiguration serverConf = new ServerConfiguration();
serverConf.loadConf(conf);
BookieConfiguration bkConf = new BookieConfiguration(serverConf);
this.streamStorage = new StreamStorageLifecycleComponent(bkConf, NullStatsLogger.INSTANCE);
this.streamStorage.start();
LOG.debug("Local BK stream storage started (port: {})", streamStoragePort);
// create a default namespace
try (StorageAdminClient admin = StorageClientBuilder.newBuilder()
.withSettings(StorageClientSettings.newBuilder()
.serviceUri("bk://localhost:4181")
.backoffPolicy(Backoff.Jitter.of(
Type.EXPONENTIAL,
1000,
10000,
30
))
.build())
.buildAdmin()) {
try {
NamespaceProperties ns = FutureUtils.result(admin.getNamespace("default"));
LOG.info("'default' namespace for table service : {}", ns);
} catch (NamespaceNotFoundException nnfe) {
LOG.info("Creating default namespace");
try {
NamespaceProperties ns =
FutureUtils.result(admin.createNamespace("default", NamespaceConfiguration.newBuilder()
.setDefaultStreamConf(DEFAULT_STREAM_CONF)
.build()));
LOG.info("Successfully created 'default' namespace :\n{}", ns);
} catch (NamespaceExistsException nee) {
// namespace already exists
LOG.warn("Namespace 'default' already existed.");
}
}
}
}
public void start(boolean enableStreamStorage) throws Exception {
LOG.debug("Local ZK/BK starting ...");
ServerConfiguration conf = new ServerConfiguration();
// Use minimal configuration requiring less memory for unit tests
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setProperty("dbStorage_writeCacheMaxSizeMb", 2);
conf.setProperty("dbStorage_readAheadCacheMaxSizeMb", 1);
conf.setProperty("dbStorage_rocksDB_writeBufferSizeMB", 1);
conf.setProperty("dbStorage_rocksDB_blockCacheSize", 1024 * 1024);
conf.setFlushInterval(60000);
conf.setJournalSyncData(false);
conf.setProperty("journalMaxGroupWaitMSec", 0L);
conf.setAllowLoopback(true);
conf.setGcWaitTime(60000);
conf.setNumAddWorkerThreads(0);
conf.setNumReadWorkerThreads(0);
conf.setNumHighPriorityWorkerThreads(0);
conf.setNumJournalCallbackThreads(0);
conf.setServerNumIOThreads(1);
conf.setNumLongPollWorkerThreads(1);
conf.setAllocatorPoolingPolicy(PoolingPolicy.UnpooledHeap);
runZookeeper(1000);
initializeZookeper();
runBookies(conf);
if (enableStreamStorage) {
runStreamStorage(new CompositeConfiguration());
}
}
public void start() throws Exception {
start(false);
}
public void startStandalone() throws Exception {
startStandalone(new ServerConfiguration(), false);
}
public void startStandalone(ServerConfiguration conf, boolean enableStreamStorage) throws Exception {
LOG.debug("Local ZK/BK starting ...");
conf.setAdvertisedAddress(advertisedAddress);
runZookeeper(1000);
initializeZookeper();
runBookies(conf);
if (enableStreamStorage) {
runStreamStorage(new CompositeConfiguration());
}
}
public void stopBK(int i) {
bs[i].shutdown();
}
public void stopBK() {
LOG.debug("Local ZK/BK stopping ...");
for (BookieServer bookie : bs) {
bookie.shutdown();
}
}
public void startBK(int i) throws Exception {
try {
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
} catch (InvalidCookieException e) {
// InvalidCookieException can happen if the machine IP has changed
// Since we are running here a local bookie that is always accessed
// from localhost, we can ignore the error
for (String path : zkc.getChildren("/ledgers/cookies", false)) {
zkc.delete("/ledgers/cookies/" + path, -1);
}
// Also clean the on-disk cookie
new File(new File(bsConfs[i].getJournalDirNames()[0], "current"), "VERSION").delete();
// Retry to start the bookie after cleaning the old left cookie
bs[i] = new BookieServer(bsConfs[i], NullStatsLogger.INSTANCE, null);
}
bs[i].start();
}
public void startBK() throws Exception {
for (int i = 0; i < numberOfBookies; i++) {
startBK(i);
}
}
public void stop() throws Exception {
if (null != streamStorage) {
LOG.debug("Local bk stream storage stopping ...");
streamStorage.close();
}
LOG.debug("Local ZK/BK stopping ...");
for (BookieServer bookie : bs) {
try {
bookie.shutdown();
} catch (Exception e) {
LOG.warn("failed to shutdown bookie", e);
}
}
zkc.close();
zks.shutdown();
serverFactory.shutdown();
if (zkDataCleanupManager != null) {
zkDataCleanupManager.shutdown();
}
LOG.debug("Local ZK/BK stopped");
for (File managedDir : temporaryDirectories) {
LOG.info("deleting test directory {}", managedDir);
FileUtils.deleteDirectory(managedDir);
}
temporaryDirectories.clear();
}
/* Watching SyncConnected event from ZooKeeper */
public static class ZKConnectionWatcher implements Watcher {
private final CountDownLatch clientConnectLatch = new CountDownLatch(1);
@Override
public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
clientConnectLatch.countDown();
}
}
// Waiting for the SyncConnected event from the ZooKeeper server
public void waitForConnection() throws IOException {
try {
if (!clientConnectLatch.await(zkSessionTimeOut, TimeUnit.MILLISECONDS)) {
throw new IOException("Couldn't connect to zookeeper server");
}
} catch (InterruptedException e) {
throw new IOException("Interrupted when connecting to zookeeper server", e);
}
}
}
public static boolean waitForServerUp(String hp, long timeout) {
long start = System.currentTimeMillis();
String[] split = hp.split(":");
String host = split[0];
int port = Integer.parseInt(split[1]);
while (true) {
try {
Socket sock = new Socket(host, port);
BufferedReader reader = null;
try {
OutputStream outstream = sock.getOutputStream();
outstream.write("stat".getBytes());
outstream.flush();
reader = new BufferedReader(new InputStreamReader(sock.getInputStream()));
String line = reader.readLine();
if (line != null && line.startsWith("Zookeeper version:")) {
LOG.info("Server UP");
return true;
}
} finally {
sock.close();
if (reader != null) {
reader.close();
}
}
} catch (IOException e) {
// ignore as this is expected
LOG.info("server " + hp + " not up " + e);
}
if (System.currentTimeMillis() > start + timeout) {
break;
}
try {
Thread.sleep(250);
} catch (InterruptedException e) {
// ignore
}
}
return false;
}
public ZooKeeper getZkClient() {
return zkc;
}
public ZooKeeperServer getZkServer() {
return zks;
}
public BookieServer[] getBookies() {
return bs;
}
public int getZookeeperPort() {
return zkPort;
}
}