| /** |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.bookkeeper.proto; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.MalformedURLException; |
| import java.net.UnknownHostException; |
| |
| import org.apache.bookkeeper.bookie.Bookie; |
| import org.apache.bookkeeper.bookie.BookieCriticalThread; |
| import org.apache.bookkeeper.bookie.BookieException; |
| import org.apache.bookkeeper.bookie.ExitCode; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.jmx.BKMBeanRegistry; |
| import org.apache.bookkeeper.net.BookieSocketAddress; |
| import org.apache.bookkeeper.processor.RequestProcessor; |
| import org.apache.bookkeeper.replication.AutoRecoveryMain; |
| import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException; |
| import org.apache.bookkeeper.replication.ReplicationException.UnavailableException; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stats.StatsProvider; |
| import org.apache.bookkeeper.util.ReflectionUtils; |
| import org.apache.commons.cli.BasicParser; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.commons.configuration.ConfigurationException; |
| import org.apache.zookeeper.KeeperException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE; |
| import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE; |
| import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_SCOPE; |
| |
| /** |
| * Implements the server-side part of the BookKeeper protocol. |
| * |
| */ |
| public class BookieServer { |
| final ServerConfiguration conf; |
| BookieNettyServer nettyServer; |
| private volatile boolean running = false; |
| Bookie bookie; |
| DeathWatcher deathWatcher; |
| private final static Logger LOG = LoggerFactory.getLogger(BookieServer.class); |
| |
| int exitCode = ExitCode.OK; |
| |
| // operation stats |
| protected BookieServerBean jmxBkServerBean; |
| AutoRecoveryMain autoRecoveryMain = null; |
| private boolean isAutoRecoveryDaemonEnabled; |
| |
| // request processor |
| private final RequestProcessor requestProcessor; |
| |
| // Expose Stats |
| private final StatsLogger statsLogger; |
| |
| public BookieServer(ServerConfiguration conf) throws IOException, |
| KeeperException, InterruptedException, BookieException, |
| UnavailableException, CompatibilityException { |
| this(conf, NullStatsLogger.INSTANCE); |
| } |
| |
| public BookieServer(ServerConfiguration conf, StatsLogger statsLogger) |
| throws IOException, KeeperException, InterruptedException, |
| BookieException, UnavailableException, CompatibilityException { |
| this.conf = conf; |
| this.statsLogger = statsLogger; |
| this.bookie = newBookie(conf); |
| this.requestProcessor = new BookieRequestProcessor(conf, bookie, |
| statsLogger.scope(SERVER_SCOPE)); |
| this.nettyServer = new BookieNettyServer(this.conf, requestProcessor); |
| isAutoRecoveryDaemonEnabled = conf.isAutoRecoveryDaemonEnabled(); |
| if (isAutoRecoveryDaemonEnabled) { |
| this.autoRecoveryMain = new AutoRecoveryMain(conf, statsLogger.scope(REPLICATION_SCOPE)); |
| } |
| } |
| |
| protected Bookie newBookie(ServerConfiguration conf) |
| throws IOException, KeeperException, InterruptedException, BookieException { |
| return new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE)); |
| } |
| |
| public void start() throws IOException, UnavailableException { |
| this.bookie.start(); |
| // fail fast, when bookie startup is not successful |
| if (!this.bookie.isRunning()) { |
| exitCode = bookie.getExitCode(); |
| return; |
| } |
| if (isAutoRecoveryDaemonEnabled && this.autoRecoveryMain != null) { |
| this.autoRecoveryMain.start(); |
| } |
| this.nettyServer.start(); |
| |
| running = true; |
| deathWatcher = new DeathWatcher(conf); |
| deathWatcher.start(); |
| |
| // register jmx |
| registerJMX(); |
| } |
| |
| @VisibleForTesting |
| public BookieSocketAddress getLocalAddress() throws UnknownHostException { |
| return Bookie.getBookieAddress(conf); |
| } |
| |
| @VisibleForTesting |
| public Bookie getBookie() { |
| return bookie; |
| } |
| |
| /** |
| * Suspend processing of requests in the bookie (for testing) |
| */ |
| @VisibleForTesting |
| public void suspendProcessing() { |
| LOG.debug("Suspending bookie server, port is {}", conf.getBookiePort()); |
| nettyServer.suspendProcessing(); |
| } |
| |
| /** |
| * Resume processing requests in the bookie (for testing) |
| */ |
| @VisibleForTesting |
| public void resumeProcessing() { |
| LOG.debug("Resuming bookie server, port is {}", conf.getBookiePort()); |
| nettyServer.resumeProcessing(); |
| } |
| |
| public synchronized void shutdown() { |
| if (!running) { |
| return; |
| } |
| LOG.info("Shutting down BookieServer"); |
| this.nettyServer.shutdown(); |
| exitCode = bookie.shutdown(); |
| if (isAutoRecoveryDaemonEnabled && this.autoRecoveryMain != null) { |
| this.autoRecoveryMain.shutdown(); |
| } |
| this.requestProcessor.close(); |
| running = false; |
| |
| // unregister JMX |
| unregisterJMX(); |
| } |
| |
| protected void registerJMX() { |
| try { |
| jmxBkServerBean = new BookieServerBean(conf, this); |
| BKMBeanRegistry.getInstance().register(jmxBkServerBean, null); |
| |
| bookie.registerJMX(jmxBkServerBean); |
| } catch (Exception e) { |
| LOG.warn("Failed to register with JMX", e); |
| jmxBkServerBean = null; |
| } |
| } |
| |
| protected void unregisterJMX() { |
| try { |
| bookie.unregisterJMX(); |
| if (jmxBkServerBean != null) { |
| BKMBeanRegistry.getInstance().unregister(jmxBkServerBean); |
| } |
| } catch (Exception e) { |
| LOG.warn("Failed to unregister with JMX", e); |
| } |
| jmxBkServerBean = null; |
| } |
| |
| public boolean isRunning() { |
| return bookie.isRunning() && nettyServer.isRunning() && running; |
| } |
| |
| /** |
| * Whether bookie is running? |
| * |
| * @return true if bookie is running, otherwise return false |
| */ |
| public boolean isBookieRunning() { |
| return bookie.isRunning(); |
| } |
| |
| /** |
| * Whether auto-recovery service running with Bookie? |
| * |
| * @return true if auto-recovery service is running, otherwise return false |
| */ |
| public boolean isAutoRecoveryRunning() { |
| return this.autoRecoveryMain != null |
| && this.autoRecoveryMain.isAutoRecoveryRunning(); |
| } |
| |
| public void join() throws InterruptedException { |
| bookie.join(); |
| } |
| |
| public int getExitCode() { |
| return exitCode; |
| } |
| |
| /** |
| * A thread to watch whether bookie & nioserver is still alive |
| */ |
| private class DeathWatcher extends BookieCriticalThread { |
| |
| private final int watchInterval; |
| |
| DeathWatcher(ServerConfiguration conf) { |
| super("BookieDeathWatcher-" + conf.getBookiePort()); |
| watchInterval = conf.getDeathWatchInterval(); |
| } |
| |
| @Override |
| public void run() { |
| while(true) { |
| try { |
| Thread.sleep(watchInterval); |
| } catch (InterruptedException ie) { |
| // do nothing |
| } |
| if (!isBookieRunning()) { |
| shutdown(); |
| break; |
| } |
| if (isAutoRecoveryDaemonEnabled && !isAutoRecoveryRunning()) { |
| LOG.error("Autorecovery daemon has stopped. Please check the logs"); |
| isAutoRecoveryDaemonEnabled = false; // to avoid spamming the logs |
| } |
| } |
| LOG.info("BookieDeathWatcher exited loop!"); |
| } |
| } |
| |
| static final Options bkOpts = new Options(); |
| static { |
| bkOpts.addOption("c", "conf", true, "Configuration for Bookie Server"); |
| bkOpts.addOption("withAutoRecovery", false, |
| "Start Autorecovery service Bookie server"); |
| bkOpts.addOption("h", "help", false, "Print help message"); |
| } |
| |
| /** |
| * Print usage |
| */ |
| private static void printUsage() { |
| HelpFormatter hf = new HelpFormatter(); |
| hf.printHelp("BookieServer [options]\n\tor\n" |
| + "BookieServer <bookie_port> <zk_servers> <journal_dir> <ledger_dir [ledger_dir]>", bkOpts); |
| } |
| |
| private static void loadConfFile(ServerConfiguration conf, String confFile) |
| throws IllegalArgumentException { |
| try { |
| conf.loadConf(new File(confFile).toURI().toURL()); |
| conf.validate(); |
| } catch (MalformedURLException e) { |
| LOG.error("Could not open configuration file: " + confFile, e); |
| throw new IllegalArgumentException(); |
| } catch (ConfigurationException e) { |
| LOG.error("Malformed configuration file: " + confFile, e); |
| throw new IllegalArgumentException(); |
| } |
| LOG.info("Using configuration file " + confFile); |
| } |
| |
| private static ServerConfiguration parseArgs(String[] args) |
| throws IllegalArgumentException { |
| try { |
| BasicParser parser = new BasicParser(); |
| CommandLine cmdLine = parser.parse(bkOpts, args); |
| |
| if (cmdLine.hasOption('h')) { |
| throw new IllegalArgumentException(); |
| } |
| |
| ServerConfiguration conf = new ServerConfiguration(); |
| String[] leftArgs = cmdLine.getArgs(); |
| |
| if (cmdLine.hasOption('c')) { |
| if (null != leftArgs && leftArgs.length > 0) { |
| throw new IllegalArgumentException(); |
| } |
| String confFile = cmdLine.getOptionValue("c"); |
| loadConfFile(conf, confFile); |
| return conf; |
| } |
| |
| if (cmdLine.hasOption("withAutoRecovery")) { |
| conf.setAutoRecoveryDaemonEnabled(true); |
| } |
| |
| if (leftArgs.length < 4) { |
| throw new IllegalArgumentException(); |
| } |
| |
| // command line arguments overwrite settings in configuration file |
| conf.setBookiePort(Integer.parseInt(leftArgs[0])); |
| conf.setZkServers(leftArgs[1]); |
| conf.setJournalDirName(leftArgs[2]); |
| String[] ledgerDirNames = new String[leftArgs.length - 3]; |
| System.arraycopy(leftArgs, 3, ledgerDirNames, 0, ledgerDirNames.length); |
| conf.setLedgerDirNames(ledgerDirNames); |
| |
| return conf; |
| } catch (ParseException e) { |
| LOG.error("Error parsing command line arguments : ", e); |
| throw new IllegalArgumentException(e); |
| } |
| } |
| |
| /** |
| * @param args |
| * @throws IOException |
| * @throws InterruptedException |
| */ |
| public static void main(String[] args) { |
| ServerConfiguration conf = null; |
| try { |
| conf = parseArgs(args); |
| } catch (IllegalArgumentException iae) { |
| LOG.error("Error parsing command line arguments : ", iae); |
| System.err.println(iae.getMessage()); |
| printUsage(); |
| System.exit(ExitCode.INVALID_CONF); |
| } |
| |
| StringBuilder sb = new StringBuilder(); |
| String[] ledgerDirNames = conf.getLedgerDirNames(); |
| for (int i = 0; i < ledgerDirNames.length; i++) { |
| if (i != 0) { |
| sb.append(','); |
| } |
| sb.append(ledgerDirNames[i]); |
| } |
| |
| String hello = String.format( |
| "Hello, I'm your bookie, listening on port %1$s. ZKServers are on %2$s. Journals are in %3$s. Ledgers are stored in %4$s.", |
| conf.getBookiePort(), conf.getZkServers(), |
| conf.getJournalDirName(), sb); |
| LOG.info(hello); |
| try { |
| // Initialize Stats Provider |
| Class<? extends StatsProvider> statsProviderClass = |
| conf.getStatsProviderClass(); |
| final StatsProvider statsProvider = ReflectionUtils.newInstance(statsProviderClass); |
| statsProvider.start(conf); |
| |
| final BookieServer bs = new BookieServer(conf, statsProvider.getStatsLogger("")); |
| bs.start(); |
| Runtime.getRuntime().addShutdownHook(new Thread() { |
| @Override |
| public void run() { |
| bs.shutdown(); |
| LOG.info("Shut down bookie server successfully"); |
| } |
| }); |
| LOG.info("Register shutdown hook successfully"); |
| bs.join(); |
| |
| statsProvider.stop(); |
| LOG.info("Stop stats provider"); |
| System.exit(bs.getExitCode()); |
| } catch (Exception e) { |
| LOG.error("Exception running bookie server : ", e); |
| System.exit(ExitCode.SERVER_EXCEPTION); |
| } |
| } |
| } |