blob: 55da127bfbf7b3f1b79d35f56ac0ee6923e0aa7c [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.stream.server;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import java.io.File;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl;
import org.apache.bookkeeper.common.component.ComponentStarter;
import org.apache.bookkeeper.common.component.LifecycleComponent;
import org.apache.bookkeeper.common.component.LifecycleComponentStack;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.server.conf.BookieConfiguration;
import org.apache.bookkeeper.stream.server.conf.DLConfiguration;
import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration;
import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec;
import org.apache.bookkeeper.stream.server.service.BookieService;
import org.apache.bookkeeper.stream.server.service.BookieWatchService;
import org.apache.bookkeeper.stream.server.service.ClusterControllerService;
import org.apache.bookkeeper.stream.server.service.CuratorProviderService;
import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService;
import org.apache.bookkeeper.stream.server.service.GrpcService;
import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider;
import org.apache.bookkeeper.stream.server.service.RegistrationStateService;
import org.apache.bookkeeper.stream.server.service.StatsProviderService;
import org.apache.bookkeeper.stream.server.service.StorageService;
import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder;
import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor;
import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController;
import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl;
import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.distributedlog.DistributedLogConfiguration;
/**
* A storage server is a server that run storage service and serving rpc requests.
*/
@Slf4j
public class StorageServer {
private static class ServerArguments {
@Parameter(names = {"-c", "--conf"}, description = "Configuration file for storage server")
private String serverConfigFile;
@Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server")
private int port = 4181;
@Parameter(names = {"-h", "--help"}, description = "Show this help message")
private boolean help = false;
}
private static void loadConfFile(CompositeConfiguration conf, String confFile)
throws IllegalArgumentException {
try {
Configuration loadedConf = new PropertiesConfiguration(
new File(confFile).toURI().toURL());
conf.addConfiguration(loadedConf);
} catch (MalformedURLException e) {
log.error("Could not open configuration file {}", confFile, e);
throw new IllegalArgumentException("Could not open configuration file " + confFile, e);
} catch (ConfigurationException e) {
log.error("Malformed configuration file {}", confFile, e);
throw new IllegalArgumentException("Malformed configuration file " + confFile, e);
}
log.info("Loaded configuration file {}", confFile);
}
public static Endpoint createLocalEndpoint(int port, boolean useHostname) throws UnknownHostException {
String hostname;
if (useHostname) {
hostname = InetAddress.getLocalHost().getCanonicalHostName();
} else {
hostname = InetAddress.getLocalHost().getHostAddress();
}
return Endpoint.newBuilder()
.setHostname(hostname)
.setPort(port)
.build();
}
public static void main(String[] args) {
int retCode = doMain(args);
Runtime.getRuntime().exit(retCode);
}
static int doMain(String[] args) {
// register thread uncaught exception handler
Thread.setDefaultUncaughtExceptionHandler((thread, exception) ->
log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage()));
// parse the commandline
ServerArguments arguments = new ServerArguments();
JCommander jCommander = new JCommander(arguments);
jCommander.setProgramName("StorageServer");
jCommander.parse(args);
if (arguments.help) {
jCommander.usage();
return ExitCode.INVALID_CONF.code();
}
CompositeConfiguration conf = new CompositeConfiguration();
if (null != arguments.serverConfigFile) {
loadConfFile(conf, arguments.serverConfigFile);
}
int grpcPort = arguments.port;
LifecycleComponent storageServer;
try {
storageServer = buildStorageServer(
conf,
grpcPort);
} catch (ConfigurationException e) {
log.error("Invalid storage configuration", e);
return ExitCode.INVALID_CONF.code();
} catch (UnknownHostException e) {
log.error("Unknonw host name", e);
return ExitCode.UNKNOWN_HOSTNAME.code();
}
CompletableFuture<Void> liveFuture =
ComponentStarter.startComponent(storageServer);
try {
liveFuture.get();
} catch (InterruptedException e) {
// the server is interrupted.
Thread.currentThread().interrupt();
log.info("Storage server is interrupted. Exiting ...");
} catch (ExecutionException e) {
log.info("Storage server is exiting ...");
}
return ExitCode.OK.code();
}
public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort)
throws UnknownHostException, ConfigurationException {
return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE);
}
public static LifecycleComponent buildStorageServer(CompositeConfiguration conf,
int grpcPort,
boolean startBookieAndStartProvider,
StatsLogger externalStatsLogger)
throws ConfigurationException, UnknownHostException {
LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder()
.withName("storage-server");
BookieConfiguration bkConf = BookieConfiguration.of(conf);
bkConf.validate();
DLConfiguration dlConf = DLConfiguration.of(conf);
dlConf.validate();
StorageServerConfiguration serverConf = StorageServerConfiguration.of(conf);
serverConf.validate();
StorageConfiguration storageConf = new StorageConfiguration(conf);
storageConf.validate();
// Get my local endpoint
Endpoint myEndpoint = createLocalEndpoint(grpcPort, false);
// Create shared resources
StorageResources storageResources = StorageResources.create();
// Create the stats provider
StatsLogger rootStatsLogger;
if (startBookieAndStartProvider) {
StatsProviderService statsProviderService = new StatsProviderService(bkConf);
rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger("");
serverBuilder.addComponent(statsProviderService);
log.info("Bookie configuration : {}", bkConf.asJson());
} else {
rootStatsLogger = checkNotNull(externalStatsLogger,
"External stats logger is not provided while not starting stats provider");
}
// dump configurations
log.info("Dlog configuration : {}", dlConf.asJson());
log.info("Storage configuration : {}", storageConf.asJson());
log.info("Server configuration : {}", serverConf.asJson());
// Create the bookie service
ServerConfiguration bkServerConf;
if (startBookieAndStartProvider) {
BookieService bookieService = new BookieService(bkConf, rootStatsLogger);
serverBuilder.addComponent(bookieService);
bkServerConf = bookieService.serverConf();
} else {
bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(bkConf.getUnderlyingConf());
}
// Create the bookie watch service
BookieWatchService bkWatchService;
{
DistributedLogConfiguration dlogConf = new DistributedLogConfiguration();
dlogConf.loadConf(dlConf);
bkWatchService = new BookieWatchService(
dlogConf.getEnsembleSize(),
bkConf,
NullStatsLogger.INSTANCE);
}
// Create the curator provider service
CuratorProviderService curatorProviderService = new CuratorProviderService(
bkServerConf, dlConf, rootStatsLogger.scope("curator"));
// Create the distributedlog namespace service
DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService(
bkServerConf,
dlConf,
rootStatsLogger.scope("dlog"));
// client settings for the proxy channels
StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder()
.serviceUri("bk://localhost:" + grpcPort)
.build();
// Create range (stream) store
StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder()
.withStatsLogger(rootStatsLogger.scope("storage"))
.withStorageConfiguration(storageConf)
// the storage resources shared across multiple components
.withStorageResources(storageResources)
// the placement policy
.withStorageContainerPlacementPolicyFactory(() -> {
long numStorageContainers;
try (ZkClusterMetadataStore store = new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH)) {
numStorageContainers = store.getClusterMetadata().getNumStorageContainers();
}
return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers);
})
// the default log backend uri
.withDefaultBackendUri(dlNamespaceProvider.getDlogUri())
// with zk-based storage container manager
.withStorageContainerManagerFactory((storeConf, registry) ->
new ZkStorageContainerManager(
myEndpoint,
storageConf,
new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH),
registry,
rootStatsLogger.scope("sc").scope("manager")))
// with the inter storage container client manager
.withRangeStoreFactory(
new MVCCStoreFactoryImpl(
dlNamespaceProvider,
() -> new DLCheckpointStore(dlNamespaceProvider.get()),
storageConf.getRangeStoreDirs(),
storageResources,
storageConf.getServeReadOnlyTables()))
// with client manager for proxying grpc requests
.withStorageServerClientManager(() -> new StorageServerClientManagerImpl(
proxyClientSettings,
storageResources.scheduler(),
StorageServerChannel.factory(proxyClientSettings)
// intercept the channel to attach routing header
.andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor()))
));
StorageService storageService = new StorageService(
storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage"));
// Create gRPC server
StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc");
GrpcServerSpec serverSpec = GrpcServerSpec.builder()
.storeSupplier(storageService)
.storeServerConf(serverConf)
.endpoint(myEndpoint)
.statsLogger(rpcStatsLogger)
.build();
GrpcService grpcService = new GrpcService(
serverConf, serverSpec, rpcStatsLogger);
// Create a registration service provider
RegistrationServiceProvider regService = new RegistrationServiceProvider(
bkServerConf,
dlConf,
rootStatsLogger.scope("registration").scope("provider"));
// Create a registration state service only when service is ready.
RegistrationStateService regStateService = new RegistrationStateService(
myEndpoint,
bkServerConf,
bkConf,
regService,
rootStatsLogger.scope("registration"));
// Create a cluster controller service
ClusterControllerService clusterControllerService = new ClusterControllerService(
storageConf,
() -> new ClusterControllerImpl(
new ZkClusterMetadataStore(
curatorProviderService.get(),
ZKMetadataDriverBase.resolveZkServers(bkServerConf),
ZK_METADATA_ROOT_PATH),
regService.get(),
new DefaultStorageContainerController(),
new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH),
storageConf),
rootStatsLogger.scope("cluster_controller"));
// Create all the service stack
return serverBuilder
.addComponent(bkWatchService) // service that watches bookies
.addComponent(curatorProviderService) // service that provides curator client
.addComponent(dlNamespaceProvider) // service that provides dl namespace
.addComponent(storageService) // range (stream) store
.addComponent(grpcService) // range (stream) server (gRPC)
.addComponent(regService) // service that provides registration client
.addComponent(regStateService) // service that manages server state
.addComponent(clusterControllerService) // service that run cluster controller service
.build();
}
}