blob: 4e6a7821d44a5d4351176abbd37f86a4026e04cd [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.proto;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_SCOPE;
import static org.apache.bookkeeper.conf.AbstractConfiguration.PERMITTED_STARTUP_USERS;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBufAllocator;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.net.UnknownHostException;
import java.security.AccessControlException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieCriticalThread;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.ExitCode;
import org.apache.bookkeeper.bookie.ReadOnlyBookie;
import org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.bookkeeper.common.util.JsonUtil.ParseJsonException;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.processor.RequestProcessor;
import org.apache.bookkeeper.replication.ReplicationException.CompatibilityException;
import org.apache.bookkeeper.replication.ReplicationException.UnavailableException;
import org.apache.bookkeeper.server.Main;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.tls.SecurityException;
import org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.bookkeeper.tls.SecurityProviderFactoryFactory;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implements the server-side part of the BookKeeper protocol.
*
*/
public class BookieServer {
final ServerConfiguration conf;
BookieNettyServer nettyServer;
private volatile boolean running = false;
private final Bookie bookie;
DeathWatcher deathWatcher;
private static final Logger LOG = LoggerFactory.getLogger(BookieServer.class);
int exitCode = ExitCode.OK;
// request processor
private final RequestProcessor requestProcessor;
// Expose Stats
private final StatsLogger statsLogger;
// Exception handler
private volatile UncaughtExceptionHandler uncaughtExceptionHandler = null;
public BookieServer(ServerConfiguration conf) throws IOException,
KeeperException, InterruptedException, BookieException,
UnavailableException, CompatibilityException, SecurityException {
this(conf, NullStatsLogger.INSTANCE, null);
}
public BookieServer(ServerConfiguration conf, StatsLogger statsLogger,
Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, KeeperException, InterruptedException,
BookieException, UnavailableException, CompatibilityException, SecurityException {
if (bookieServiceInfoProvider == null) {
bookieServiceInfoProvider = () -> {
try {
return BookieServiceInfoUtils
.buildLegacyBookieServiceInfo(this.getLocalAddress().toBookieId().toString());
} catch (IOException err) {
throw new RuntimeException(err);
}
};
}
this.conf = conf;
validateUser(conf);
String configAsString;
try {
configAsString = conf.asJson();
LOG.info(configAsString);
} catch (ParseJsonException pe) {
LOG.error("Got ParseJsonException while converting Config to JSONString", pe);
}
ByteBufAllocator allocator = getAllocator(conf);
this.statsLogger = statsLogger;
this.nettyServer = new BookieNettyServer(this.conf, null, allocator);
try {
this.bookie = newBookie(conf, allocator, bookieServiceInfoProvider);
} catch (IOException | KeeperException | InterruptedException | BookieException e) {
// interrupted on constructing a bookie
this.nettyServer.shutdown();
throw e;
}
final SecurityHandlerFactory shFactory;
shFactory = SecurityProviderFactoryFactory
.getSecurityProviderFactory(conf.getTLSProviderFactoryClass());
this.requestProcessor = new BookieRequestProcessor(conf, bookie,
statsLogger.scope(SERVER_SCOPE), shFactory, allocator);
this.nettyServer.setRequestProcessor(this.requestProcessor);
}
/**
* Currently the uncaught exception handler is used for DeathWatcher to notify
* lifecycle management that a bookie is dead for some reasons.
*
* <p>in future, we can register this <tt>exceptionHandler</tt> to critical threads
* so when those threads are dead, it will automatically trigger lifecycle management
* to shutdown the process.
*/
public void setExceptionHandler(UncaughtExceptionHandler exceptionHandler) {
this.uncaughtExceptionHandler = exceptionHandler;
}
protected Bookie newBookie(ServerConfiguration conf, ByteBufAllocator allocator,
Supplier<BookieServiceInfo> bookieServiceInfoProvider)
throws IOException, KeeperException, InterruptedException, BookieException {
return conf.isForceReadOnlyBookie()
? new ReadOnlyBookie(conf, statsLogger.scope(BOOKIE_SCOPE), allocator, bookieServiceInfoProvider)
: new BookieImpl(conf, statsLogger.scope(BOOKIE_SCOPE), allocator, bookieServiceInfoProvider);
}
public void start() throws InterruptedException {
this.bookie.start();
// fail fast, when bookie startup is not successful
if (!this.bookie.isRunning()) {
exitCode = bookie.getExitCode();
this.requestProcessor.close();
return;
}
this.nettyServer.start();
running = true;
deathWatcher = new DeathWatcher(conf);
if (null != uncaughtExceptionHandler) {
deathWatcher.setUncaughtExceptionHandler(uncaughtExceptionHandler);
}
deathWatcher.start();
// fixes test flappers at random places until ISSUE#1400 is resolved
// https://github.com/apache/bookkeeper/issues/1400
TimeUnit.MILLISECONDS.sleep(250);
}
@VisibleForTesting
public BookieSocketAddress getLocalAddress() throws UnknownHostException {
return BookieImpl.getBookieAddress(conf);
}
@VisibleForTesting
public BookieId getBookieId() throws UnknownHostException {
return BookieImpl.getBookieId(conf);
}
@VisibleForTesting
public Bookie getBookie() {
return bookie;
}
@VisibleForTesting
public BookieRequestProcessor getBookieRequestProcessor() {
return (BookieRequestProcessor) requestProcessor;
}
/**
* Suspend processing of requests in the bookie (for testing).
*/
@VisibleForTesting
public void suspendProcessing() {
if (LOG.isDebugEnabled()) {
LOG.debug("Suspending bookie server, port is {}", conf.getBookiePort());
}
nettyServer.suspendProcessing();
}
/**
* Resume processing requests in the bookie (for testing).
*/
@VisibleForTesting
public void resumeProcessing() {
if (LOG.isDebugEnabled()) {
LOG.debug("Resuming bookie server, port is {}", conf.getBookiePort());
}
nettyServer.resumeProcessing();
}
public synchronized void shutdown() {
LOG.info("Shutting down BookieServer");
this.nettyServer.shutdown();
if (!running) {
return;
}
exitCode = bookie.shutdown();
this.requestProcessor.close();
running = false;
}
/**
* Ensure the current user can start-up the process if it's restricted.
*/
private void validateUser(ServerConfiguration conf) throws AccessControlException {
if (conf.containsKey(PERMITTED_STARTUP_USERS)) {
String currentUser = System.getProperty("user.name");
String[] propertyValue = conf.getPermittedStartupUsers();
for (String s : propertyValue) {
if (s.equals(currentUser)) {
return;
}
}
String errorMsg =
"System cannot start because current user isn't in permittedStartupUsers."
+ " Current user: " + currentUser + " permittedStartupUsers: "
+ Arrays.toString(propertyValue);
LOG.error(errorMsg);
throw new AccessControlException(errorMsg);
}
}
public boolean isRunning() {
return bookie.isRunning() && nettyServer.isRunning() && running;
}
/**
* Whether bookie is running?
*
* @return true if bookie is running, otherwise return false
*/
public boolean isBookieRunning() {
return bookie.isRunning();
}
public void join() throws InterruptedException {
bookie.join();
}
public int getExitCode() {
return exitCode;
}
/**
* A thread to watch whether bookie and nioserver are still alive.
*/
private class DeathWatcher extends BookieCriticalThread {
private final int watchInterval;
DeathWatcher(ServerConfiguration conf) {
super("BookieDeathWatcher-" + conf.getBookiePort());
watchInterval = conf.getDeathWatchInterval();
// set a default uncaught exception handler to shutdown the bookie server
// when it notices the bookie is not running any more.
setUncaughtExceptionHandler((thread, cause) -> {
LOG.info("BookieDeathWatcher exited loop due to uncaught exception from thread {}",
thread.getName(), cause);
shutdown();
});
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(watchInterval);
} catch (InterruptedException ie) {
// do nothing
Thread.currentThread().interrupt();
}
if (!isBookieRunning()) {
LOG.info("BookieDeathWatcher noticed the bookie is not running any more, exiting the watch loop!");
// death watcher has noticed that bookie is not running any more
// throw an exception to fail the death watcher thread and it will
// trigger the uncaught exception handler to handle this "bookie not running" situation.
throw new RuntimeException("Bookie is not running any more");
}
}
}
}
private ByteBufAllocator getAllocator(ServerConfiguration conf) {
return ByteBufAllocatorBuilder.create()
.poolingPolicy(conf.getAllocatorPoolingPolicy())
.poolingConcurrency(conf.getAllocatorPoolingConcurrency())
.outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy())
.outOfMemoryListener((ex) -> {
try {
LOG.error("Unable to allocate memory, exiting bookie", ex);
} finally {
if (uncaughtExceptionHandler != null) {
uncaughtExceptionHandler.uncaughtException(Thread.currentThread(), ex);
}
}
})
.leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy())
.build();
}
/**
* Legacy Method to run bookie server.
*/
public static void main(String[] args) {
Main.main(args);
}
@Override
public String toString() {
String addr = "UNKNOWN";
String id = "?";
try {
addr = BookieImpl.getBookieAddress(conf).toString();
id = getBookieId().toString();
} catch (UnknownHostException e) {
//Ignored...
}
return "Bookie Server listening on " + addr + " with id " + id;
}
}