blob: d27209c62b7df54ba8802bd36cb2b4185a66f755 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.bookie;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.BOOKIE_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.SERVER_STATUS;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.UnknownHostException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.DiskChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation of StateManager.
*/
@Slf4j
@StatsDoc(
name = BOOKIE_SCOPE,
category = CATEGORY_SERVER,
help = "Bookie state manager related stats"
)
public class BookieStateManager implements StateManager {
private static final Logger LOG = LoggerFactory.getLogger(BookieStateManager.class);
private final ServerConfiguration conf;
private final Supplier<BookieServiceInfo> bookieServiceInfoProvider;
private final List<File> statusDirs;
// use an executor to execute the state changes task
final ExecutorService stateService = Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("BookieStateManagerService-%d").build());
// Running flag
private volatile boolean running = false;
// Flag identify whether it is in shutting down progress
private volatile boolean shuttingdown = false;
// Bookie status
private final BookieStatus bookieStatus = new BookieStatus();
private final AtomicBoolean rmRegistered = new AtomicBoolean(false);
private final AtomicBoolean forceReadOnly = new AtomicBoolean(false);
private volatile boolean availableForHighPriorityWrites = true;
private final BookieId bookieId;
private ShutdownHandler shutdownHandler;
private final Supplier<RegistrationManager> rm;
// Expose Stats
@StatsDoc(
name = SERVER_STATUS,
help = "Bookie status (1: up, 0: readonly, -1: unregistered)"
)
private final Gauge<Number> serverStatusGauge;
public BookieStateManager(ServerConfiguration conf,
StatsLogger statsLogger,
MetadataBookieDriver metadataDriver,
LedgerDirsManager ledgerDirsManager,
Supplier<BookieServiceInfo> bookieServiceInfoProvider) throws IOException {
this(
conf,
statsLogger,
() -> null == metadataDriver ? null : metadataDriver.getRegistrationManager(),
ledgerDirsManager.getAllLedgerDirs(),
() -> {
try {
return BookieImpl.getBookieId(conf);
} catch (UnknownHostException e) {
throw new UncheckedIOException("Failed to resolve bookie id", e);
}
},
bookieServiceInfoProvider);
}
public BookieStateManager(ServerConfiguration conf,
StatsLogger statsLogger,
Supplier<RegistrationManager> rm,
List<File> statusDirs,
Supplier<BookieId> bookieIdSupplier,
Supplier<BookieServiceInfo> bookieServiceInfoProvider) throws IOException {
this.conf = conf;
this.rm = rm;
this.statusDirs = statusDirs;
// ZK ephemeral node for this Bookie.
this.bookieId = bookieIdSupplier.get();
this.bookieServiceInfoProvider = bookieServiceInfoProvider;
// 1 : up, 0 : readonly, -1 : unregistered
this.serverStatusGauge = new Gauge<Number>() {
@Override
public Number getDefaultValue() {
return 0;
}
@Override
public Number getSample() {
if (!rmRegistered.get()){
return -1;
} else if (forceReadOnly.get() || bookieStatus.isInReadOnlyMode()) {
return 0;
} else {
return 1;
}
}
};
statsLogger.registerGauge(SERVER_STATUS, serverStatusGauge);
}
private boolean isRegistrationManagerDisabled() {
return null == rm || null == rm.get();
}
@VisibleForTesting
BookieStateManager(ServerConfiguration conf, MetadataBookieDriver metadataDriver) throws IOException {
this(conf, NullStatsLogger.INSTANCE, metadataDriver, new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold()),
NullStatsLogger.INSTANCE), BookieServiceInfo.NO_INFO);
}
@Override
public void initState(){
if (forceReadOnly.get()) {
this.bookieStatus.setToReadOnlyMode();
} else if (conf.isPersistBookieStatusEnabled()) {
this.bookieStatus.readFromDirectories(statusDirs);
}
running = true;
}
@Override
public void forceToShuttingDown(){
// mark bookie as in shutting down progress
shuttingdown = true;
}
@Override
public void forceToReadOnly(){
this.forceReadOnly.set(true);
}
@Override
public void forceToUnregistered(){
this.rmRegistered.set(false);
}
@Override
public boolean isReadOnly(){
return forceReadOnly.get() || bookieStatus.isInReadOnlyMode();
}
@Override
public boolean isAvailableForHighPriorityWrites() {
return availableForHighPriorityWrites;
}
@Override
public void setHighPriorityWritesAvailability(boolean available) {
if (this.availableForHighPriorityWrites && !available) {
log.info("Disable high priority writes on readonly bookie.");
} else if (!this.availableForHighPriorityWrites && available) {
log.info("Enable high priority writes on readonly bookie.");
}
this.availableForHighPriorityWrites = available;
}
@Override
public boolean isRunning(){
return running;
}
@Override
public boolean isShuttingDown(){
return shuttingdown;
}
@Override
public void close() {
this.running = false;
stateService.shutdown();
}
@Override
public Future<Void> registerBookie(final boolean throwException) {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
try {
doRegisterBookie();
} catch (IOException ioe) {
if (throwException) {
throw ioe;
} else {
LOG.error("Couldn't register bookie with zookeeper, shutting down : ", ioe);
shutdownHandler.shutdown(ExitCode.ZK_REG_FAIL);
}
}
return null;
}
});
}
@Override
public Future<Void> transitionToWritableMode() {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception{
doTransitionToWritableMode();
return null;
}
});
}
@Override
public Future<Void> transitionToReadOnlyMode() {
return stateService.submit(new Callable<Void>() {
@Override
public Void call() throws Exception{
doTransitionToReadOnlyMode();
return null;
}
});
}
void doRegisterBookie() throws IOException {
doRegisterBookie(forceReadOnly.get() || bookieStatus.isInReadOnlyMode());
}
private void doRegisterBookie(boolean isReadOnly) throws IOException {
if (isRegistrationManagerDisabled()) {
// registration manager is null, means not register itself to metadata store.
LOG.info("null registration manager while do register");
return;
}
rmRegistered.set(false);
try {
rm.get().registerBookie(bookieId, isReadOnly, bookieServiceInfoProvider.get());
rmRegistered.set(true);
} catch (BookieException e) {
throw new IOException(e);
}
}
@VisibleForTesting
public void doTransitionToWritableMode() {
if (shuttingdown || forceReadOnly.get()) {
return;
}
if (!bookieStatus.setToWritableMode()) {
// do nothing if already in writable mode
return;
}
LOG.info("Transitioning Bookie to Writable mode and will serve read/write requests.");
if (conf.isPersistBookieStatusEnabled()) {
bookieStatus.writeToDirectories(statusDirs);
}
// change zookeeper state only when using zookeeper
if (isRegistrationManagerDisabled()) {
return;
}
try {
doRegisterBookie(false);
} catch (IOException e) {
LOG.warn("Error in transitioning back to writable mode : ", e);
transitionToReadOnlyMode();
return;
}
// clear the readonly state
try {
rm.get().unregisterBookie(bookieId, true);
} catch (BookieException e) {
// if we failed when deleting the readonly flag in zookeeper, it is OK since client would
// already see the bookie in writable list. so just log the exception
LOG.warn("Failed to delete bookie readonly state in zookeeper : ", e);
return;
}
}
@VisibleForTesting
public void doTransitionToReadOnlyMode() {
if (shuttingdown) {
return;
}
if (!bookieStatus.setToReadOnlyMode()) {
return;
}
if (!conf.isReadOnlyModeEnabled()) {
LOG.warn("ReadOnly mode is not enabled. "
+ "Can be enabled by configuring "
+ "'readOnlyModeEnabled=true' in configuration."
+ " Shutting down bookie");
shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
return;
}
LOG.info("Transitioning Bookie to ReadOnly mode,"
+ " and will serve only read requests from clients!");
// persist the bookie status if we enable this
if (conf.isPersistBookieStatusEnabled()) {
this.bookieStatus.writeToDirectories(statusDirs);
}
// change zookeeper state only when using zookeeper
if (isRegistrationManagerDisabled()) {
return;
}
try {
rm.get().registerBookie(bookieId, true, bookieServiceInfoProvider.get());
} catch (BookieException e) {
LOG.error("Error in transition to ReadOnly Mode."
+ " Shutting down", e);
shutdownHandler.shutdown(ExitCode.BOOKIE_EXCEPTION);
return;
}
}
@Override
public void setShutdownHandler(ShutdownHandler handler){
shutdownHandler = handler;
}
@VisibleForTesting
public ShutdownHandler getShutdownHandler(){
return shutdownHandler;
}
@VisibleForTesting
boolean isRegistered(){
return rmRegistered.get();
}
}