| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.util; |
| |
| import static java.nio.charset.StandardCharsets.UTF_8; |
| import static org.apache.bookkeeper.server.Main.storageDirectoriesFromConf; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE; |
| import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY; |
| |
| import com.google.common.collect.Lists; |
| import java.io.BufferedReader; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.InputStreamReader; |
| import java.io.OutputStream; |
| import java.io.PrintWriter; |
| import java.net.InetAddress; |
| import java.net.Socket; |
| import java.util.ArrayList; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.function.Supplier; |
| import java.util.stream.Collectors; |
| import org.apache.bookkeeper.bookie.Bookie; |
| import org.apache.bookkeeper.bookie.BookieImpl; |
| import org.apache.bookkeeper.bookie.BookieResources; |
| import org.apache.bookkeeper.bookie.CookieValidation; |
| import org.apache.bookkeeper.bookie.LedgerDirsManager; |
| import org.apache.bookkeeper.bookie.LedgerStorage; |
| import org.apache.bookkeeper.bookie.LegacyCookieValidation; |
| import org.apache.bookkeeper.bookie.UncleanShutdownDetection; |
| import org.apache.bookkeeper.bookie.UncleanShutdownDetectionImpl; |
| import org.apache.bookkeeper.common.allocator.ByteBufAllocatorWithOomHandler; |
| import org.apache.bookkeeper.common.component.ComponentInfoPublisher; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.discover.BookieServiceInfo; |
| import org.apache.bookkeeper.discover.BookieServiceInfo.Endpoint; |
| import org.apache.bookkeeper.discover.RegistrationManager; |
| import org.apache.bookkeeper.meta.LedgerManager; |
| import org.apache.bookkeeper.meta.LedgerManagerFactory; |
| import org.apache.bookkeeper.meta.MetadataBookieDriver; |
| import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; |
| import org.apache.bookkeeper.proto.BookieServer; |
| import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; |
| import org.apache.bookkeeper.shims.zk.ZooKeeperServerShimFactory; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.zookeeper.ZooKeeperClient; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.Op; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Local Bookkeeper. |
| */ |
| public class LocalBookKeeper implements AutoCloseable { |
| protected static final Logger LOG = LoggerFactory.getLogger(LocalBookKeeper.class); |
| public static final int CONNECTION_TIMEOUT = 30000; |
| |
| private static String newMetadataServiceUri(String zkServers, int port, String layout, String ledgerPath) { |
| return "zk+" + layout + "://" + zkServers + ":" + port + ledgerPath; |
| } |
| |
| int numberOfBookies; |
| |
| public LocalBookKeeper( |
| int numberOfBookies, |
| ServerConfiguration baseConf, |
| String localBookiesConfigDirName, |
| boolean stopOnExit, String dirSuffix, |
| String zkHost, int zkPort) { |
| this.numberOfBookies = numberOfBookies; |
| this.localBookiesConfigDir = new File(localBookiesConfigDirName); |
| this.baseConf = baseConf; |
| this.localBookies = new ArrayList<>(); |
| this.stopOnExit = stopOnExit; |
| this.dirSuffix = dirSuffix; |
| this.zkHost = zkHost; |
| this.zkPort = zkPort; |
| this.dirsToCleanUp = new ArrayList<>(); |
| LOG.info("Running {} bookie(s) on zk ensemble = '{}:{}'.", this.numberOfBookies, |
| zooKeeperDefaultHost, zooKeeperDefaultPort); |
| } |
| |
| private static String zooKeeperDefaultHost = "127.0.0.1"; |
| private static int zooKeeperDefaultPort = 2181; |
| private static int zkSessionTimeOut = 5000; |
| private static String defaultLocalBookiesConfigDir = "/tmp/localbookies-config"; |
| |
| //BookKeeper variables |
| List<LocalBookie> localBookies; |
| ZooKeeperServerShim zks; |
| String zkHost; |
| int zkPort; |
| String dirSuffix; |
| ByteBufAllocatorWithOomHandler allocator; |
| private ServerConfiguration baseConf; |
| File localBookiesConfigDir; |
| List<File> dirsToCleanUp; |
| boolean stopOnExit; |
| |
| /** |
| * @param maxCC |
| * Max Concurrency of Client |
| * @param zookeeperPort |
| * ZooKeeper Server Port |
| */ |
| public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort) throws IOException { |
| File zkTmpDir = IOUtils.createTempDir("zookeeper", "localbookkeeper"); |
| return runZookeeper(maxCC, zookeeperPort, zkTmpDir); |
| } |
| |
| public static ZooKeeperServerShim runZookeeper(int maxCC, int zookeeperPort, File zkDir) throws IOException { |
| LOG.info("Starting ZK server"); |
| ZooKeeperServerShim server = ZooKeeperServerShimFactory.createServer(zkDir, zkDir, zookeeperPort, maxCC); |
| server.start(); |
| |
| boolean b = waitForServerUp(InetAddress.getLoopbackAddress().getHostAddress() + ":" + zookeeperPort, |
| CONNECTION_TIMEOUT); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("ZooKeeper server up: {}", b); |
| } |
| return server; |
| } |
| |
| private void initializeZookeeper() throws IOException { |
| LOG.info("Instantiate ZK Client"); |
| //initialize the zk client with values |
| try (ZooKeeperClient zkc = ZooKeeperClient.newBuilder() |
| .connectString(zkHost + ":" + zkPort) |
| .sessionTimeoutMs(zkSessionTimeOut) |
| .build()) { |
| String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseConf); |
| ZkUtils.createFullPathOptimistic(zkc, zkLedgersRootPath, new byte[0], Ids.OPEN_ACL_UNSAFE, |
| CreateMode.PERSISTENT); |
| List<Op> multiOps = Lists.newArrayListWithExpectedSize(2); |
| multiOps.add( |
| Op.create(zkLedgersRootPath + "/" + AVAILABLE_NODE, |
| new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); |
| multiOps.add( |
| Op.create(zkLedgersRootPath + "/" + AVAILABLE_NODE + "/" + READONLY, |
| new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)); |
| zkc.multi(multiOps); |
| // No need to create an entry for each requested bookie anymore as the |
| // BookieServers will register themselves with ZooKeeper on startup. |
| } catch (KeeperException e) { |
| LOG.error("Exception while creating znodes", e); |
| throw new IOException("Error creating znodes : ", e); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| LOG.error("Interrupted while creating znodes", e); |
| throw new IOException("Error creating znodes : ", e); |
| } |
| } |
| |
| private static void cleanupDirectories(List<File> dirs) throws IOException { |
| for (File dir : dirs) { |
| FileUtils.deleteDirectory(dir); |
| } |
| } |
| |
| private void runBookies() |
| throws Exception { |
| LOG.info("Starting Bookie(s)"); |
| // Create Bookie Servers (B1, B2, B3) |
| |
| if (localBookiesConfigDir.exists() && localBookiesConfigDir.isFile()) { |
| throw new IOException("Unable to create LocalBookiesConfigDir, since there is a file at " |
| + localBookiesConfigDir.getAbsolutePath()); |
| } |
| if (!localBookiesConfigDir.exists() && !localBookiesConfigDir.mkdirs()) { |
| throw new IOException( |
| "Unable to create LocalBookiesConfigDir - " + localBookiesConfigDir.getAbsolutePath()); |
| } |
| allocator = BookieResources.createAllocator(baseConf); |
| for (int i = 0; i < numberOfBookies; i++) { |
| runBookie(i); |
| } |
| |
| /* |
| * baseconf.conf is needed because for executing any BookieShell command |
| * of Metadata/Zookeeper Operation nature we need a valid conf file |
| * having correct zk details and this could be used for running any such |
| * bookieshell commands if bookieid is not provided as parameter to |
| * bookkeeper shell operation. for eg: |
| * "./bookkeeper shell localbookie listbookies -rw". But for execution |
| * shell command of bookie Operation nature we need to provide bookieid, |
| * for eg "./bookkeeper shell -localbookie 10.3.27.190:5000 lastmark", |
| * so this shell command would use '10.3.27.190:5000.conf' file |
| */ |
| ServerConfiguration baseConfWithCorrectZKServers = new ServerConfiguration( |
| (ServerConfiguration) baseConf.clone()); |
| if (null == baseConf.getMetadataServiceUriUnchecked()) { |
| baseConfWithCorrectZKServers.setMetadataServiceUri(baseConf.getMetadataServiceUri()); |
| } |
| serializeLocalBookieConfig(baseConfWithCorrectZKServers, "baseconf.conf"); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private void runBookie(int bookieIndex) throws Exception { |
| File journalDirs; |
| if (null == baseConf.getJournalDirNameWithoutDefault()) { |
| journalDirs = IOUtils.createTempDir("localbookkeeper" + bookieIndex, dirSuffix); |
| dirsToCleanUp.add(journalDirs); |
| } else { |
| journalDirs = new File(baseConf.getJournalDirName(), "bookie" + bookieIndex); |
| } |
| if (journalDirs.exists()) { |
| if (journalDirs.isDirectory()) { |
| FileUtils.deleteDirectory(journalDirs); |
| } else if (!journalDirs.delete()) { |
| throw new IOException("Couldn't cleanup bookie journal dir " + journalDirs); |
| } |
| } |
| if (!journalDirs.mkdirs()) { |
| throw new IOException("Couldn't create bookie journal dir " + journalDirs); |
| } |
| |
| String[] ledgerDirs = baseConf.getLedgerDirWithoutDefault(); |
| if ((null == ledgerDirs) || (0 == ledgerDirs.length)) { |
| ledgerDirs = new String[]{journalDirs.getPath()}; |
| } else { |
| for (int l = 0; l < ledgerDirs.length; l++) { |
| File dir = new File(ledgerDirs[l], "bookie" + bookieIndex); |
| if (dir.exists()) { |
| if (dir.isDirectory()) { |
| FileUtils.deleteDirectory(dir); |
| } else if (!dir.delete()) { |
| throw new IOException("Couldn't cleanup bookie ledger dir " + dir); |
| } |
| } |
| if (!dir.mkdirs()) { |
| throw new IOException("Couldn't create bookie ledger dir " + dir); |
| } |
| dirsToCleanUp.add(dir); |
| ledgerDirs[l] = dir.getPath(); |
| } |
| } |
| ServerConfiguration conf = new ServerConfiguration((ServerConfiguration) baseConf.clone()); |
| |
| conf.setBookiePort(PortManager.nextFreePort()); |
| |
| if (null == baseConf.getMetadataServiceUriUnchecked()) { |
| conf.setMetadataServiceUri(baseConf.getMetadataServiceUri()); |
| } |
| |
| conf.setJournalDirName(journalDirs.getPath()); |
| conf.setLedgerDirNames(ledgerDirs); |
| |
| // write config into file before start so we can know what's wrong if start failed |
| String fileName = BookieImpl.getBookieId(conf).toString() + ".conf"; |
| serializeLocalBookieConfig(conf, fileName); |
| |
| LocalBookie b = new LocalBookie(conf); |
| b.start(); |
| localBookies.add(b); |
| } |
| |
| private void setZooKeeperShim(ZooKeeperServerShim zks, File zkTmpDir) { |
| this.zks = zks; |
| this.dirsToCleanUp.add(zkTmpDir); |
| } |
| |
| public static LocalBookKeeper getLocalBookies(String zkHost, |
| int zkPort, |
| int numBookies, |
| boolean shouldStartZK, |
| ServerConfiguration conf) throws Exception { |
| return getLocalBookiesInternal( |
| conf, zkHost, zkPort, numBookies, shouldStartZK, |
| true, "test", null, defaultLocalBookiesConfigDir); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private static LocalBookKeeper getLocalBookiesInternal(ServerConfiguration conf, |
| String zkHost, |
| int zkPort, |
| int numBookies, |
| boolean shouldStartZK, |
| boolean stopOnExit, |
| String dirSuffix, |
| String zkDataDir, |
| String localBookiesConfigDirName) throws Exception { |
| conf.setMetadataServiceUri( |
| newMetadataServiceUri( |
| zkHost, |
| zkPort, |
| conf.getLedgerManagerLayoutStringFromFactoryClass(), |
| conf.getZkLedgersRootPath())); |
| LocalBookKeeper lb = new LocalBookKeeper(numBookies, conf, localBookiesConfigDirName, stopOnExit, |
| dirSuffix, zkHost, zkPort); |
| if (shouldStartZK) { |
| File zkDataDirFile = null; |
| if (zkDataDir != null) { |
| zkDataDirFile = new File(zkDataDir); |
| if (zkDataDirFile.exists() && zkDataDirFile.isFile()) { |
| throw new IOException("Unable to create zkDataDir, since there is a file at " |
| + zkDataDirFile.getAbsolutePath()); |
| } |
| if (!zkDataDirFile.exists() && !zkDataDirFile.mkdirs()) { |
| throw new IOException("Unable to create zkDataDir - " + zkDataDirFile.getAbsolutePath()); |
| } |
| } |
| File zkTmpDir = IOUtils.createTempDir("zookeeper", dirSuffix, zkDataDirFile); |
| lb.setZooKeeperShim(LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir), zkTmpDir); |
| } |
| |
| return lb; |
| } |
| |
| /** |
| * Serializes the config object to the specified file in localBookiesConfigDir. |
| * |
| * @param localBookieConfig |
| * config object which has to be serialized |
| * @param fileName |
| * name of the file |
| * @throws IOException |
| */ |
| private void serializeLocalBookieConfig(ServerConfiguration localBookieConfig, String fileName) throws IOException { |
| if (StringUtils.isBlank(fileName) |
| || fileName.contains("..") |
| || fileName.contains("/") |
| || fileName.contains("\\")) { |
| throw new IllegalArgumentException("Invalid filename: " + fileName); |
| } |
| |
| File localBookieConfFile = new File(localBookiesConfigDir, fileName); |
| if (localBookieConfFile.exists() && !localBookieConfFile.delete()) { |
| throw new IOException( |
| "Unable to delete the existing LocalBookieConfigFile - " + localBookieConfFile.getAbsolutePath()); |
| } |
| if (!localBookieConfFile.createNewFile()) { |
| throw new IOException("Unable to create new File - " + localBookieConfFile.getAbsolutePath()); |
| } |
| Iterator<String> keys = localBookieConfig.getKeys(); |
| try (PrintWriter writer = new PrintWriter(localBookieConfFile, "UTF-8")) { |
| while (keys.hasNext()) { |
| String key = keys.next(); |
| String[] values = localBookieConfig.getStringArray(key); |
| StringBuilder concatenatedValue = new StringBuilder(values[0]); |
| for (int i = 1; i < values.length; i++) { |
| concatenatedValue.append(",").append(values[i]); |
| } |
| writer.println(key + "=" + concatenatedValue); |
| } |
| } |
| } |
| |
| public static void main(String[] args) { |
| System.setProperty("zookeeper.4lw.commands.whitelist", "*"); |
| try { |
| if (args.length < 1) { |
| usage(); |
| System.exit(-1); |
| } |
| |
| int numBookies = 0; |
| try { |
| numBookies = Integer.parseInt(args[0]); |
| } catch (NumberFormatException nfe) { |
| LOG.error("Unrecognized number-of-bookies: {}", args[0]); |
| usage(); |
| System.exit(-1); |
| } |
| |
| ServerConfiguration conf = new ServerConfiguration(); |
| conf.setAllowLoopback(true); |
| if (args.length >= 2) { |
| String confFile = args[1]; |
| try { |
| conf.loadConf(new File(confFile).toURI().toURL()); |
| LOG.info("Using configuration file {}", confFile); |
| } catch (Exception e) { |
| // load conf failed |
| LOG.warn("Error loading configuration file {}", confFile, e); |
| } |
| } |
| |
| String zkDataDir = null; |
| if (args.length >= 3) { |
| zkDataDir = args[2]; |
| } |
| |
| String localBookiesConfigDirName = defaultLocalBookiesConfigDir; |
| if (args.length >= 4) { |
| localBookiesConfigDirName = args[3]; |
| } |
| |
| try (LocalBookKeeper lb = getLocalBookiesInternal(conf, zooKeeperDefaultHost, zooKeeperDefaultPort, |
| numBookies, true, false, "test", zkDataDir, |
| localBookiesConfigDirName)) { |
| try { |
| lb.start(); |
| while (true) { |
| Thread.sleep(1000); |
| } |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| throw ie; |
| } |
| } |
| } catch (Exception e) { |
| LOG.error("Exiting LocalBookKeeper because of exception in main method", e); |
| /* |
| * This is needed because, some non-daemon thread (probably in ZK or |
| * some other dependent service) is preventing the JVM from exiting, though |
| * there is exception in main thread. |
| */ |
| System.exit(-1); |
| } |
| } |
| |
| private static void usage() { |
| System.err.println( |
| "Usage: LocalBookKeeper number-of-bookies [path to bookie config] " |
| + "[path to create ZK data directory at] [path to LocalBookiesConfigDir]"); |
| } |
| |
| public static boolean waitForServerUp(String hp, long timeout) { |
| long start = System.currentTimeMillis(); |
| String[] split = hp.split(":"); |
| String host = split[0]; |
| int port = Integer.parseInt(split[1]); |
| while (true) { |
| try (Socket sock = new Socket(host, port); |
| BufferedReader reader = new BufferedReader(new InputStreamReader(sock.getInputStream(), UTF_8))) { |
| OutputStream outstream = sock.getOutputStream(); |
| outstream.write("stat".getBytes(UTF_8)); |
| outstream.flush(); |
| |
| String line = reader.readLine(); |
| if (line != null && line.startsWith("Zookeeper version:")) { |
| LOG.info("Server UP"); |
| return true; |
| } |
| } catch (IOException e) { |
| // ignore as this is expected |
| LOG.info("server " + hp + " not up " + e); |
| } |
| |
| if (System.currentTimeMillis() > start + timeout) { |
| break; |
| } |
| try { |
| Thread.sleep(250); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| // ignore |
| } |
| } |
| return false; |
| } |
| |
| public void start() throws Exception { |
| initializeZookeeper(); |
| runBookies(); |
| } |
| |
| public void addBookie() throws Exception { |
| int bookieIndex = localBookies.size() + 1; |
| runBookie(bookieIndex); |
| } |
| |
| public void removeBookie() throws Exception { |
| int index = localBookies.size() - 1; |
| LocalBookie bookie = localBookies.get(index); |
| bookie.shutdown(); |
| localBookies.remove(index); |
| } |
| |
| public void shutdownBookies() throws Exception { |
| for (LocalBookie b : localBookies) { |
| b.shutdown(); |
| } |
| } |
| |
| @Override |
| public void close() throws Exception { |
| if (stopOnExit) { |
| shutdownBookies(); |
| |
| if (null != zks) { |
| zks.stop(); |
| } |
| } |
| |
| cleanupDirectories(dirsToCleanUp); |
| } |
| |
| private class LocalBookie { |
| final BookieServer server; |
| final Bookie bookie; |
| final MetadataBookieDriver metadataDriver; |
| final RegistrationManager registrationManager; |
| final LedgerManagerFactory lmFactory; |
| final LedgerManager ledgerManager; |
| |
| LocalBookie(ServerConfiguration conf) throws Exception { |
| metadataDriver = BookieResources.createMetadataDriver(conf, NullStatsLogger.INSTANCE); |
| registrationManager = metadataDriver.createRegistrationManager(); |
| lmFactory = metadataDriver.getLedgerManagerFactory(); |
| ledgerManager = lmFactory.newLedgerManager(); |
| |
| DiskChecker diskChecker = BookieResources.createDiskChecker(conf); |
| LedgerDirsManager ledgerDirsManager = BookieResources.createLedgerDirsManager( |
| conf, diskChecker, NullStatsLogger.INSTANCE); |
| LedgerDirsManager indexDirsManager = BookieResources.createIndexDirsManager( |
| conf, diskChecker, NullStatsLogger.INSTANCE, ledgerDirsManager); |
| LedgerStorage storage = BookieResources.createLedgerStorage( |
| conf, ledgerManager, ledgerDirsManager, indexDirsManager, |
| NullStatsLogger.INSTANCE, allocator); |
| |
| CookieValidation cookieValidation = new LegacyCookieValidation(conf, registrationManager); |
| cookieValidation.checkCookies(storageDirectoriesFromConf(conf)); |
| |
| UncleanShutdownDetection shutdownManager = new UncleanShutdownDetectionImpl(ledgerDirsManager); |
| |
| final ComponentInfoPublisher componentInfoPublisher = new ComponentInfoPublisher(); |
| final Supplier<BookieServiceInfo> bookieServiceInfoProvider = |
| () -> buildBookieServiceInfo(componentInfoPublisher); |
| |
| componentInfoPublisher.startupFinished(); |
| bookie = new BookieImpl(conf, registrationManager, storage, diskChecker, |
| ledgerDirsManager, indexDirsManager, |
| NullStatsLogger.INSTANCE, allocator, bookieServiceInfoProvider); |
| server = new BookieServer(conf, bookie, NullStatsLogger.INSTANCE, allocator, |
| shutdownManager); |
| } |
| |
| void start() throws Exception { |
| server.start(); |
| } |
| |
| void shutdown() throws Exception { |
| server.shutdown(); |
| ledgerManager.close(); |
| lmFactory.close(); |
| registrationManager.close(); |
| metadataDriver.close(); |
| } |
| } |
| |
| /** |
| * Create the {@link BookieServiceInfo} starting from the published endpoints. |
| * |
| * @see ComponentInfoPublisher |
| * @param componentInfoPublisher the endpoint publisher |
| * @return the created bookie service info |
| */ |
| private static BookieServiceInfo buildBookieServiceInfo(ComponentInfoPublisher componentInfoPublisher) { |
| List<Endpoint> endpoints = componentInfoPublisher.getEndpoints().values() |
| .stream().map(e -> { |
| return new Endpoint( |
| e.getId(), |
| e.getPort(), |
| e.getHost(), |
| e.getProtocol(), |
| e.getAuth(), |
| e.getExtensions() |
| ); |
| }).collect(Collectors.toList()); |
| return new BookieServiceInfo(componentInfoPublisher.getProperties(), endpoints); |
| } |
| } |