| /* |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.bookkeeper.stream.server; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.bookkeeper.stream.storage.StorageConstants.ZK_METADATA_ROOT_PATH; |
| |
| import com.beust.jcommander.JCommander; |
| import com.beust.jcommander.Parameter; |
| import java.io.File; |
| import java.net.InetAddress; |
| import java.net.MalformedURLException; |
| import java.net.UnknownHostException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.bookkeeper.clients.config.StorageClientSettings; |
| import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel; |
| import org.apache.bookkeeper.clients.impl.internal.StorageServerClientManagerImpl; |
| import org.apache.bookkeeper.common.component.ComponentStarter; |
| import org.apache.bookkeeper.common.component.LifecycleComponent; |
| import org.apache.bookkeeper.common.component.LifecycleComponentStack; |
| import org.apache.bookkeeper.conf.ServerConfiguration; |
| import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase; |
| import org.apache.bookkeeper.statelib.impl.rocksdb.checkpoint.dlog.DLCheckpointStore; |
| import org.apache.bookkeeper.stats.NullStatsLogger; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stream.proto.common.Endpoint; |
| import org.apache.bookkeeper.stream.server.conf.BookieConfiguration; |
| import org.apache.bookkeeper.stream.server.conf.DLConfiguration; |
| import org.apache.bookkeeper.stream.server.conf.StorageServerConfiguration; |
| import org.apache.bookkeeper.stream.server.grpc.GrpcServerSpec; |
| import org.apache.bookkeeper.stream.server.service.BookieService; |
| import org.apache.bookkeeper.stream.server.service.BookieWatchService; |
| import org.apache.bookkeeper.stream.server.service.ClusterControllerService; |
| import org.apache.bookkeeper.stream.server.service.CuratorProviderService; |
| import org.apache.bookkeeper.stream.server.service.DLNamespaceProviderService; |
| import org.apache.bookkeeper.stream.server.service.GrpcService; |
| import org.apache.bookkeeper.stream.server.service.RegistrationServiceProvider; |
| import org.apache.bookkeeper.stream.server.service.RegistrationStateService; |
| import org.apache.bookkeeper.stream.server.service.StatsProviderService; |
| import org.apache.bookkeeper.stream.server.service.StorageService; |
| import org.apache.bookkeeper.stream.storage.StorageContainerStoreBuilder; |
| import org.apache.bookkeeper.stream.storage.StorageResources; |
| import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration; |
| import org.apache.bookkeeper.stream.storage.impl.cluster.ClusterControllerImpl; |
| import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterControllerLeaderSelector; |
| import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore; |
| import org.apache.bookkeeper.stream.storage.impl.routing.RoutingHeaderProxyInterceptor; |
| import org.apache.bookkeeper.stream.storage.impl.sc.DefaultStorageContainerController; |
| import org.apache.bookkeeper.stream.storage.impl.sc.StorageContainerPlacementPolicyImpl; |
| import org.apache.bookkeeper.stream.storage.impl.sc.ZkStorageContainerManager; |
| import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactoryImpl; |
| import org.apache.commons.configuration.CompositeConfiguration; |
| import org.apache.commons.configuration.Configuration; |
| import org.apache.commons.configuration.ConfigurationException; |
| import org.apache.commons.configuration.PropertiesConfiguration; |
| import org.apache.distributedlog.DistributedLogConfiguration; |
| |
| /** |
| * A storage server is a server that run storage service and serving rpc requests. |
| */ |
| @Slf4j |
| public class StorageServer { |
| |
| private static class ServerArguments { |
| |
| @Parameter(names = {"-c", "--conf"}, description = "Configuration file for storage server") |
| private String serverConfigFile; |
| |
| @Parameter(names = {"-p", "--port"}, description = "Port to listen on for gPRC server") |
| private int port = 4181; |
| |
| @Parameter(names = {"-h", "--help"}, description = "Show this help message") |
| private boolean help = false; |
| |
| } |
| |
| private static void loadConfFile(CompositeConfiguration conf, String confFile) |
| throws IllegalArgumentException { |
| try { |
| Configuration loadedConf = new PropertiesConfiguration( |
| new File(confFile).toURI().toURL()); |
| conf.addConfiguration(loadedConf); |
| } catch (MalformedURLException e) { |
| log.error("Could not open configuration file {}", confFile, e); |
| throw new IllegalArgumentException("Could not open configuration file " + confFile, e); |
| } catch (ConfigurationException e) { |
| log.error("Malformed configuration file {}", confFile, e); |
| throw new IllegalArgumentException("Malformed configuration file " + confFile, e); |
| } |
| log.info("Loaded configuration file {}", confFile); |
| } |
| |
| public static Endpoint createLocalEndpoint(int port, boolean useHostname) throws UnknownHostException { |
| String hostname; |
| if (useHostname) { |
| hostname = InetAddress.getLocalHost().getCanonicalHostName(); |
| } else { |
| hostname = InetAddress.getLocalHost().getHostAddress(); |
| } |
| return Endpoint.newBuilder() |
| .setHostname(hostname) |
| .setPort(port) |
| .build(); |
| } |
| |
| public static void main(String[] args) { |
| int retCode = doMain(args); |
| Runtime.getRuntime().exit(retCode); |
| } |
| |
| static int doMain(String[] args) { |
| // register thread uncaught exception handler |
| Thread.setDefaultUncaughtExceptionHandler((thread, exception) -> |
| log.error("Uncaught exception in thread {}: {}", thread.getName(), exception.getMessage())); |
| |
| // parse the commandline |
| ServerArguments arguments = new ServerArguments(); |
| JCommander jCommander = new JCommander(arguments); |
| jCommander.setProgramName("StorageServer"); |
| jCommander.parse(args); |
| |
| if (arguments.help) { |
| jCommander.usage(); |
| return ExitCode.INVALID_CONF.code(); |
| } |
| |
| CompositeConfiguration conf = new CompositeConfiguration(); |
| if (null != arguments.serverConfigFile) { |
| loadConfFile(conf, arguments.serverConfigFile); |
| } |
| |
| int grpcPort = arguments.port; |
| |
| LifecycleComponent storageServer; |
| try { |
| storageServer = buildStorageServer( |
| conf, |
| grpcPort); |
| } catch (ConfigurationException e) { |
| log.error("Invalid storage configuration", e); |
| return ExitCode.INVALID_CONF.code(); |
| } catch (UnknownHostException e) { |
| log.error("Unknonw host name", e); |
| return ExitCode.UNKNOWN_HOSTNAME.code(); |
| } |
| |
| CompletableFuture<Void> liveFuture = |
| ComponentStarter.startComponent(storageServer); |
| try { |
| liveFuture.get(); |
| } catch (InterruptedException e) { |
| // the server is interrupted. |
| Thread.currentThread().interrupt(); |
| log.info("Storage server is interrupted. Exiting ..."); |
| } catch (ExecutionException e) { |
| log.info("Storage server is exiting ..."); |
| } |
| return ExitCode.OK.code(); |
| } |
| |
| public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, |
| int grpcPort) |
| throws UnknownHostException, ConfigurationException { |
| return buildStorageServer(conf, grpcPort, true, NullStatsLogger.INSTANCE); |
| } |
| |
| public static LifecycleComponent buildStorageServer(CompositeConfiguration conf, |
| int grpcPort, |
| boolean startBookieAndStartProvider, |
| StatsLogger externalStatsLogger) |
| throws ConfigurationException, UnknownHostException { |
| |
| LifecycleComponentStack.Builder serverBuilder = LifecycleComponentStack.newBuilder() |
| .withName("storage-server"); |
| |
| BookieConfiguration bkConf = BookieConfiguration.of(conf); |
| bkConf.validate(); |
| |
| DLConfiguration dlConf = DLConfiguration.of(conf); |
| dlConf.validate(); |
| |
| StorageServerConfiguration serverConf = StorageServerConfiguration.of(conf); |
| serverConf.validate(); |
| |
| StorageConfiguration storageConf = new StorageConfiguration(conf); |
| storageConf.validate(); |
| |
| // Get my local endpoint |
| Endpoint myEndpoint = createLocalEndpoint(grpcPort, false); |
| |
| // Create shared resources |
| StorageResources storageResources = StorageResources.create(); |
| |
| // Create the stats provider |
| StatsLogger rootStatsLogger; |
| if (startBookieAndStartProvider) { |
| StatsProviderService statsProviderService = new StatsProviderService(bkConf); |
| rootStatsLogger = statsProviderService.getStatsProvider().getStatsLogger(""); |
| serverBuilder.addComponent(statsProviderService); |
| log.info("Bookie configuration : {}", bkConf.asJson()); |
| } else { |
| rootStatsLogger = checkNotNull(externalStatsLogger, |
| "External stats logger is not provided while not starting stats provider"); |
| } |
| |
| // dump configurations |
| log.info("Dlog configuration : {}", dlConf.asJson()); |
| log.info("Storage configuration : {}", storageConf.asJson()); |
| log.info("Server configuration : {}", serverConf.asJson()); |
| |
| // Create the bookie service |
| ServerConfiguration bkServerConf; |
| if (startBookieAndStartProvider) { |
| BookieService bookieService = new BookieService(bkConf, rootStatsLogger); |
| serverBuilder.addComponent(bookieService); |
| bkServerConf = bookieService.serverConf(); |
| } else { |
| bkServerConf = new ServerConfiguration(); |
| bkServerConf.loadConf(bkConf.getUnderlyingConf()); |
| } |
| |
| // Create the bookie watch service |
| BookieWatchService bkWatchService; |
| { |
| DistributedLogConfiguration dlogConf = new DistributedLogConfiguration(); |
| dlogConf.loadConf(dlConf); |
| bkWatchService = new BookieWatchService( |
| dlogConf.getEnsembleSize(), |
| bkConf, |
| NullStatsLogger.INSTANCE); |
| } |
| |
| // Create the curator provider service |
| CuratorProviderService curatorProviderService = new CuratorProviderService( |
| bkServerConf, dlConf, rootStatsLogger.scope("curator")); |
| |
| // Create the distributedlog namespace service |
| DLNamespaceProviderService dlNamespaceProvider = new DLNamespaceProviderService( |
| bkServerConf, |
| dlConf, |
| rootStatsLogger.scope("dlog")); |
| |
| // client settings for the proxy channels |
| StorageClientSettings proxyClientSettings = StorageClientSettings.newBuilder() |
| .serviceUri("bk://localhost:" + grpcPort) |
| .build(); |
| // Create range (stream) store |
| StorageContainerStoreBuilder storageContainerStoreBuilder = StorageContainerStoreBuilder.newBuilder() |
| .withStatsLogger(rootStatsLogger.scope("storage")) |
| .withStorageConfiguration(storageConf) |
| // the storage resources shared across multiple components |
| .withStorageResources(storageResources) |
| // the placement policy |
| .withStorageContainerPlacementPolicyFactory(() -> { |
| long numStorageContainers; |
| try (ZkClusterMetadataStore store = new ZkClusterMetadataStore( |
| curatorProviderService.get(), |
| ZKMetadataDriverBase.resolveZkServers(bkServerConf), |
| ZK_METADATA_ROOT_PATH)) { |
| numStorageContainers = store.getClusterMetadata().getNumStorageContainers(); |
| } |
| return StorageContainerPlacementPolicyImpl.of((int) numStorageContainers); |
| }) |
| // the default log backend uri |
| .withDefaultBackendUri(dlNamespaceProvider.getDlogUri()) |
| // with zk-based storage container manager |
| .withStorageContainerManagerFactory((storeConf, registry) -> |
| new ZkStorageContainerManager( |
| myEndpoint, |
| storageConf, |
| new ZkClusterMetadataStore( |
| curatorProviderService.get(), |
| ZKMetadataDriverBase.resolveZkServers(bkServerConf), |
| ZK_METADATA_ROOT_PATH), |
| registry, |
| rootStatsLogger.scope("sc").scope("manager"))) |
| // with the inter storage container client manager |
| .withRangeStoreFactory( |
| new MVCCStoreFactoryImpl( |
| dlNamespaceProvider, |
| () -> new DLCheckpointStore(dlNamespaceProvider.get()), |
| storageConf.getRangeStoreDirs(), |
| storageResources, |
| storageConf.getServeReadOnlyTables())) |
| // with client manager for proxying grpc requests |
| .withStorageServerClientManager(() -> new StorageServerClientManagerImpl( |
| proxyClientSettings, |
| storageResources.scheduler(), |
| StorageServerChannel.factory(proxyClientSettings) |
| // intercept the channel to attach routing header |
| .andThen(channel -> channel.intercept(new RoutingHeaderProxyInterceptor())) |
| )); |
| StorageService storageService = new StorageService( |
| storageConf, storageContainerStoreBuilder, rootStatsLogger.scope("storage")); |
| |
| // Create gRPC server |
| StatsLogger rpcStatsLogger = rootStatsLogger.scope("grpc"); |
| GrpcServerSpec serverSpec = GrpcServerSpec.builder() |
| .storeSupplier(storageService) |
| .storeServerConf(serverConf) |
| .endpoint(myEndpoint) |
| .statsLogger(rpcStatsLogger) |
| .build(); |
| GrpcService grpcService = new GrpcService( |
| serverConf, serverSpec, rpcStatsLogger); |
| |
| // Create a registration service provider |
| RegistrationServiceProvider regService = new RegistrationServiceProvider( |
| bkServerConf, |
| dlConf, |
| rootStatsLogger.scope("registration").scope("provider")); |
| |
| // Create a registration state service only when service is ready. |
| RegistrationStateService regStateService = new RegistrationStateService( |
| myEndpoint, |
| bkServerConf, |
| bkConf, |
| regService, |
| rootStatsLogger.scope("registration")); |
| |
| // Create a cluster controller service |
| ClusterControllerService clusterControllerService = new ClusterControllerService( |
| storageConf, |
| () -> new ClusterControllerImpl( |
| new ZkClusterMetadataStore( |
| curatorProviderService.get(), |
| ZKMetadataDriverBase.resolveZkServers(bkServerConf), |
| ZK_METADATA_ROOT_PATH), |
| regService.get(), |
| new DefaultStorageContainerController(), |
| new ZkClusterControllerLeaderSelector(curatorProviderService.get(), ZK_METADATA_ROOT_PATH), |
| storageConf), |
| rootStatsLogger.scope("cluster_controller")); |
| |
| // Create all the service stack |
| return serverBuilder |
| .addComponent(bkWatchService) // service that watches bookies |
| .addComponent(curatorProviderService) // service that provides curator client |
| .addComponent(dlNamespaceProvider) // service that provides dl namespace |
| .addComponent(storageService) // range (stream) store |
| .addComponent(grpcService) // range (stream) server (gRPC) |
| .addComponent(regService) // service that provides registration client |
| .addComponent(regStateService) // service that manages server state |
| .addComponent(clusterControllerService) // service that run cluster controller service |
| .build(); |
| } |
| |
| } |