blob: bf5f438ac28647d885e0630fe3505ba93278533b [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.proto;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanRegistry;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.replication.AutoRecoveryMain;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
import static org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_SCOPE;
/**
 * Implements the server-side part of the BookKeeper protocol.
 *
 * <p>A {@code BookieServer} ties together a {@link Bookie}, the
 * {@link BookieNettyServer} that feeds requests to a {@link RequestProcessor},
 * an optional {@link AutoRecoveryMain} daemon and a watcher thread that shuts
 * the server down once the bookie is no longer running.
 */
public class BookieServer {

    private static final Logger LOG = LoggerFactory.getLogger(BookieServer.class);

    final ServerConfiguration conf;
    BookieNettyServer nettyServer;
    // Set once start() has brought everything up, cleared again by shutdown().
    private volatile boolean running = false;
    Bookie bookie;
    DeathWatcher deathWatcher;
    // Written by start() and by shutdown() (which may run on the death watcher
    // or shutdown-hook thread) and read through getExitCode(); volatile so the
    // reader sees the latest value.
    volatile int exitCode = ExitCode.OK;

    // operation stats
    protected BookieServerBean jmxBkServerBean;

    AutoRecoveryMain autoRecoveryMain = null;
    // Cleared by the death watcher thread once it has reported a stopped
    // auto-recovery daemon and read by shutdown(); volatile for visibility.
    private volatile boolean isAutoRecoveryDaemonEnabled;

    // request processor
    private final RequestProcessor requestProcessor;

    // Expose Stats
    private final StatsLogger statsLogger;

    /**
     * Creates a bookie server that does not export statistics
     * (uses {@link NullStatsLogger#INSTANCE}).
     *
     * @param conf server configuration
     */
    public BookieServer(ServerConfiguration conf) throws IOException,
            KeeperException, InterruptedException, BookieException,
            UnavailableException, CompatibilityException {
        this(conf, NullStatsLogger.INSTANCE);
    }

    /**
     * Creates the bookie, the request processor, the netty server and, when
     * enabled in {@code conf}, the auto-recovery daemon. Nothing is started
     * until {@link #start()} is called.
     *
     * @param conf server configuration
     * @param statsLogger root stats logger; sub-scopes are handed to the
     *        bookie, the request processor and the auto-recovery daemon
     */
    public BookieServer(ServerConfiguration conf, StatsLogger statsLogger)
            throws IOException, KeeperException, InterruptedException,
            BookieException, UnavailableException, CompatibilityException {
        this.conf = conf;
        // Must be assigned before newBookie() runs, which reads it.
        this.statsLogger = statsLogger;
        this.bookie = newBookie(conf);
        this.requestProcessor = new BookieRequestProcessor(conf, bookie,
                statsLogger.scope(SERVER_SCOPE));
        this.nettyServer = new BookieNettyServer(this.conf, requestProcessor);
        isAutoRecoveryDaemonEnabled = conf.isAutoRecoveryDaemonEnabled();
        if (isAutoRecoveryDaemonEnabled) {
            this.autoRecoveryMain = new AutoRecoveryMain(conf, statsLogger.scope(REPLICATION_SCOPE));
        }
    }

    /**
     * Factory hook for the {@link Bookie} instance used by this server.
     *
     * <p>Invoked from the constructor, so an overriding implementation runs
     * before the subclass' own fields are initialized.
     *
     * @param conf server configuration
     * @return the bookie to serve
     */
    protected Bookie newBookie(ServerConfiguration conf)
            throws IOException, KeeperException, InterruptedException, BookieException {
        return new Bookie(conf, statsLogger.scope(BOOKIE_SCOPE));
    }

    /**
     * Starts the bookie and, if it came up, the auto-recovery daemon (when
     * enabled), the netty server, the death watcher and the JMX beans.
     *
     * <p>If the bookie is not running after being started, its exit code is
     * recorded (see {@link #getExitCode()}) and nothing else is started.
     */
    public void start() throws IOException, UnavailableException {
        this.bookie.start();
        // fail fast, when bookie startup is not successful
        if (!this.bookie.isRunning()) {
            exitCode = bookie.getExitCode();
            return;
        }
        if (isAutoRecoveryDaemonEnabled && this.autoRecoveryMain != null) {
            this.autoRecoveryMain.start();
        }
        this.nettyServer.start();

        running = true;
        deathWatcher = new DeathWatcher(conf);
        deathWatcher.start();

        // register jmx
        registerJMX();
    }

    /**
     * @return the bookie address derived from this server's configuration
     */
    @VisibleForTesting
    public BookieSocketAddress getLocalAddress() throws UnknownHostException {
        return Bookie.getBookieAddress(conf);
    }

    /**
     * @return the bookie instance backing this server
     */
    @VisibleForTesting
    public Bookie getBookie() {
        return bookie;
    }

    /**
     * Suspend processing of requests in the bookie (for testing).
     */
    @VisibleForTesting
    public void suspendProcessing() {
        LOG.debug("Suspending bookie server, port is {}", conf.getBookiePort());
        nettyServer.suspendProcessing();
    }

    /**
     * Resume processing requests in the bookie (for testing).
     */
    @VisibleForTesting
    public void resumeProcessing() {
        LOG.debug("Resuming bookie server, port is {}", conf.getBookiePort());
        nettyServer.resumeProcessing();
    }

    /**
     * Shuts down the netty server, the bookie, the auto-recovery daemon (if
     * still marked enabled) and the request processor, then unregisters the
     * JMX beans. The bookie's shutdown result becomes this server's exit code.
     *
     * <p>Does nothing unless {@link #start()} completed successfully; calling
     * it again after a shutdown is a no-op.
     */
    public synchronized void shutdown() {
        if (!running) {
            return;
        }
        LOG.info("Shutting down BookieServer");
        this.nettyServer.shutdown();
        exitCode = bookie.shutdown();
        if (isAutoRecoveryDaemonEnabled && this.autoRecoveryMain != null) {
            this.autoRecoveryMain.shutdown();
        }
        this.requestProcessor.close();
        running = false;

        // unregister JMX
        unregisterJMX();
    }

    /**
     * Registers the server bean and the bookie's beans with JMX. A failure is
     * logged and otherwise ignored; the server keeps running without JMX.
     */
    protected void registerJMX() {
        try {
            jmxBkServerBean = new BookieServerBean(conf, this);
            BKMBeanRegistry.getInstance().register(jmxBkServerBean, null);

            bookie.registerJMX(jmxBkServerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxBkServerBean = null;
        }
    }

    /**
     * Unregisters the bookie's beans and the server bean from JMX. A failure
     * is logged and otherwise ignored; the bean reference is always cleared.
     */
    protected void unregisterJMX() {
        try {
            bookie.unregisterJMX();
            if (jmxBkServerBean != null) {
                BKMBeanRegistry.getInstance().unregister(jmxBkServerBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxBkServerBean = null;
    }

    /**
     * @return true only if the bookie and the netty server both report
     *         running and this server has been started and not shut down
     */
    public boolean isRunning() {
        return bookie.isRunning() && nettyServer.isRunning() && running;
    }

    /**
     * Whether bookie is running?
     *
     * @return true if bookie is running, otherwise return false
     */
    public boolean isBookieRunning() {
        return bookie.isRunning();
    }

    /**
     * Whether auto-recovery service running with Bookie?
     *
     * @return true if auto-recovery service is running, otherwise return false
     */
    public boolean isAutoRecoveryRunning() {
        return this.autoRecoveryMain != null
                && this.autoRecoveryMain.isAutoRecoveryRunning();
    }

    /**
     * Blocks by delegating to {@link Bookie#join()}.
     *
     * @throws InterruptedException if the bookie's join is interrupted
     */
    public void join() throws InterruptedException {
        bookie.join();
    }

    /**
     * @return {@link ExitCode#OK} initially; the bookie's exit code after a
     *         failed {@link #start()}, or the bookie's shutdown result after
     *         {@link #shutdown()}
     */
    public int getExitCode() {
        return exitCode;
    }

    /**
     * A thread that periodically checks whether the bookie is still running
     * and shuts the server down when it is not. It also reports, once, when
     * the auto-recovery daemon has stopped.
     */
    private class DeathWatcher extends BookieCriticalThread {

        // Value of conf.getDeathWatchInterval(), passed to Thread.sleep().
        private final int watchInterval;

        DeathWatcher(ServerConfiguration conf) {
            super("BookieDeathWatcher-" + conf.getBookiePort());
            watchInterval = conf.getDeathWatchInterval();
        }

        @Override
        public void run() {
            while (true) {
                try {
                    Thread.sleep(watchInterval);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt only cuts the sleep
                    // short; the liveness checks below decide whether to exit.
                }
                if (!isBookieRunning()) {
                    shutdown();
                    break;
                }
                if (isAutoRecoveryDaemonEnabled && !isAutoRecoveryRunning()) {
                    LOG.error("Autorecovery daemon has stopped. Please check the logs");
                    isAutoRecoveryDaemonEnabled = false; // to avoid spamming the logs
                }
            }
            LOG.info("BookieDeathWatcher exited loop!");
        }
    }

    /** Command line options understood by {@link #main(String[])}. */
    static final Options bkOpts = new Options();
    static {
        bkOpts.addOption("c", "conf", true, "Configuration for Bookie Server");
        bkOpts.addOption("withAutoRecovery", false,
                "Start Autorecovery service Bookie server");
        bkOpts.addOption("h", "help", false, "Print help message");
    }

    /**
     * Print usage.
     */
    private static void printUsage() {
        HelpFormatter hf = new HelpFormatter();
        hf.printHelp("BookieServer [options]\n\tor\n"
                + "BookieServer <bookie_port> <zk_servers> <journal_dir> <ledger_dir [ledger_dir]>", bkOpts);
    }

    /**
     * Loads and validates the given configuration file into {@code conf}.
     *
     * @param conf configuration object to populate
     * @param confFile path of the configuration file
     * @throws IllegalArgumentException if the path cannot be turned into a URL
     *         or the configuration cannot be loaded or fails validation; the
     *         underlying exception is attached as the cause
     */
    private static void loadConfFile(ServerConfiguration conf, String confFile)
            throws IllegalArgumentException {
        try {
            conf.loadConf(new File(confFile).toURI().toURL());
            conf.validate();
        } catch (MalformedURLException e) {
            LOG.error("Could not open configuration file: " + confFile, e);
            // Keep message and cause so callers printing getMessage() do not get "null".
            throw new IllegalArgumentException(
                    "Could not open configuration file: " + confFile, e);
        } catch (ConfigurationException e) {
            LOG.error("Malformed configuration file: " + confFile, e);
            throw new IllegalArgumentException(
                    "Malformed configuration file: " + confFile, e);
        }
        LOG.info("Using configuration file " + confFile);
    }

    /**
     * Builds a server configuration from the command line.
     *
     * <p>Two forms are accepted: {@code -c <conf_file>} with no positional
     * arguments, or the positional form
     * {@code <bookie_port> <zk_servers> <journal_dir> <ledger_dir [ledger_dir]>}.
     * {@code -withAutoRecovery} enables the auto-recovery daemon in either
     * form, overriding whatever the configuration file says.
     *
     * @param args raw command line arguments
     * @return the resulting configuration
     * @throws IllegalArgumentException if help was requested or the arguments
     *         are malformed; the message describes the problem
     */
    private static ServerConfiguration parseArgs(String[] args)
            throws IllegalArgumentException {
        try {
            BasicParser parser = new BasicParser();
            CommandLine cmdLine = parser.parse(bkOpts, args);

            if (cmdLine.hasOption('h')) {
                // Reported through the same path as an error so main() prints the usage.
                throw new IllegalArgumentException("Help requested");
            }

            ServerConfiguration conf = new ServerConfiguration();
            String[] leftArgs = cmdLine.getArgs();
            boolean withAutoRecovery = cmdLine.hasOption("withAutoRecovery");

            if (cmdLine.hasOption('c')) {
                if (null != leftArgs && leftArgs.length > 0) {
                    throw new IllegalArgumentException(
                            "Positional arguments are not allowed together with -c");
                }
                String confFile = cmdLine.getOptionValue("c");
                loadConfFile(conf, confFile);
                // Apply the flag after loading so it is not silently dropped
                // when a configuration file is used.
                if (withAutoRecovery) {
                    conf.setAutoRecoveryDaemonEnabled(true);
                }
                return conf;
            }

            if (withAutoRecovery) {
                conf.setAutoRecoveryDaemonEnabled(true);
            }

            if (null == leftArgs || leftArgs.length < 4) {
                throw new IllegalArgumentException(
                        "Expected <bookie_port> <zk_servers> <journal_dir> <ledger_dir [ledger_dir]>");
            }

            // command line arguments overwrite settings in configuration file
            try {
                conf.setBookiePort(Integer.parseInt(leftArgs[0]));
            } catch (NumberFormatException nfe) {
                throw new IllegalArgumentException("Invalid bookie port: " + leftArgs[0], nfe);
            }
            conf.setZkServers(leftArgs[1]);
            conf.setJournalDirName(leftArgs[2]);
            // Everything after the journal directory is a ledger directory.
            String[] ledgerDirNames = new String[leftArgs.length - 3];
            System.arraycopy(leftArgs, 3, ledgerDirNames, 0, ledgerDirNames.length);
            conf.setLedgerDirNames(ledgerDirNames);

            return conf;
        } catch (ParseException e) {
            LOG.error("Error parsing command line arguments : ", e);
            throw new IllegalArgumentException(e);
        }
    }

    /**
     * Command line entry point: parses the arguments, starts the stats
     * provider and the bookie server, waits for the bookie to finish and
     * exits the JVM.
     *
     * <p>Exit status is {@link ExitCode#INVALID_CONF} when the arguments
     * cannot be parsed, {@link ExitCode#SERVER_EXCEPTION} when starting or
     * running the server throws, and the server's own exit code otherwise.
     *
     * @param args command line arguments, see {@link #printUsage()}
     */
    public static void main(String[] args) {
        ServerConfiguration conf = null;
        try {
            conf = parseArgs(args);
        } catch (IllegalArgumentException iae) {
            LOG.error("Error parsing command line arguments : ", iae);
            System.err.println(iae.getMessage());
            printUsage();
            System.exit(ExitCode.INVALID_CONF);
        }

        // Comma-separated list of ledger directories for the greeting below.
        StringBuilder sb = new StringBuilder();
        String[] ledgerDirNames = conf.getLedgerDirNames();
        for (int i = 0; i < ledgerDirNames.length; i++) {
            if (i != 0) {
                sb.append(',');
            }
            sb.append(ledgerDirNames[i]);
        }

        String hello = String.format(
                "Hello, I'm your bookie, listening on port %1$s. ZKServers are on %2$s. Journals are in %3$s. Ledgers are stored in %4$s.",
                conf.getBookiePort(), conf.getZkServers(),
                conf.getJournalDirName(), sb);
        LOG.info(hello);
        try {
            // Initialize Stats Provider
            Class<? extends StatsProvider> statsProviderClass =
                    conf.getStatsProviderClass();
            final StatsProvider statsProvider = ReflectionUtils.newInstance(statsProviderClass);
            statsProvider.start(conf);

            final BookieServer bs = new BookieServer(conf, statsProvider.getStatsLogger(""));
            bs.start();
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    bs.shutdown();
                    LOG.info("Shut down bookie server successfully");
                }
            });
            LOG.info("Register shutdown hook successfully");
            bs.join();

            statsProvider.stop();
            LOG.info("Stop stats provider");
            System.exit(bs.getExitCode());
        } catch (Exception e) {
            LOG.error("Exception running bookie server : ", e);
            System.exit(ExitCode.SERVER_EXCEPTION);
        }
    }
}