blob: 248bcf7b1ed0e972b8b913b86af1ff8de584a8ea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.service;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.service.announcer.Announcer;
import com.twitter.distributedlog.service.announcer.NOPAnnouncer;
import com.twitter.distributedlog.service.announcer.ServerSetAnnouncer;
import com.twitter.distributedlog.service.config.DefaultStreamConfigProvider;
import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.ServiceStreamConfigProvider;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
import com.twitter.distributedlog.service.placement.LoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.DistributedLogService;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.Stack;
import com.twitter.finagle.ThriftMuxServer$;
import com.twitter.finagle.builder.Server;
import com.twitter.finagle.builder.ServerBuilder;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientIdRequiredFilter;
import com.twitter.finagle.thrift.ThriftServerFramedCodec;
import com.twitter.finagle.transport.Transport;
import com.twitter.util.Duration;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
/**
* Running the distributedlog proxy server.
*/
public class DistributedLogServer {
private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
private DistributedLogServiceImpl dlService = null;
private Server server = null;
private RoutingService routingService;
private StatsProvider statsProvider;
private Announcer announcer = null;
private ScheduledExecutorService configExecutorService;
private long gracefulShutdownMs = 0L;
private final StatsReceiver statsReceiver;
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
private final Optional<String> uri;
private final Optional<String> conf;
private final Optional<String> streamConf;
private final Optional<Integer> port;
private final Optional<Integer> statsPort;
private final Optional<Integer> shardId;
private final Optional<Boolean> announceServerSet;
private final Optional<String> loadAppraiserClassStr;
private final Optional<Boolean> thriftmux;
DistributedLogServer(Optional<String> uri,
Optional<String> conf,
Optional<String> streamConf,
Optional<Integer> port,
Optional<Integer> statsPort,
Optional<Integer> shardId,
Optional<Boolean> announceServerSet,
Optional<String> loadAppraiserClass,
Optional<Boolean> thriftmux,
RoutingService routingService,
StatsReceiver statsReceiver,
StatsProvider statsProvider) {
this.uri = uri;
this.conf = conf;
this.streamConf = streamConf;
this.port = port;
this.statsPort = statsPort;
this.shardId = shardId;
this.announceServerSet = announceServerSet;
this.thriftmux = thriftmux;
this.routingService = routingService;
this.statsReceiver = statsReceiver;
this.statsProvider = statsProvider;
this.loadAppraiserClassStr = loadAppraiserClass;
}
public void runServer()
throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
if (!uri.isPresent()) {
throw new IllegalArgumentException("No distributedlog uri provided.");
}
URI dlUri = URI.create(uri.get());
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
if (conf.isPresent()) {
String configFile = conf.get();
try {
dlConf.loadConf(new File(configFile).toURI().toURL());
} catch (ConfigurationException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ configFile + ".");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ configFile + ".");
}
}
this.configExecutorService = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder()
.setNameFormat("DistributedLogService-Dyncfg-%d")
.setDaemon(true)
.build());
// server configuration and dynamic configuration
ServerConfiguration serverConf = new ServerConfiguration();
serverConf.loadConf(dlConf);
// overwrite the shard id if it is provided in the args
if (shardId.isPresent()) {
serverConf.setServerShardId(shardId.get());
}
serverConf.validate();
DynamicDistributedLogConfiguration dynDlConf = getServiceDynConf(dlConf);
logger.info("Starting stats provider : {}", statsProvider.getClass());
statsProvider.start(dlConf);
if (announceServerSet.isPresent() && announceServerSet.get()) {
announcer = new ServerSetAnnouncer(
dlUri,
port.or(0),
statsPort.or(0),
shardId.or(0));
} else {
announcer = new NOPAnnouncer();
}
// Build the stream partition converter
StreamPartitionConverter converter;
try {
converter = ReflectionUtils.newInstance(serverConf.getStreamPartitionConverterClass());
} catch (ConfigurationException e) {
logger.warn("Failed to load configured stream-to-partition converter. Fallback to use {}",
IdentityStreamPartitionConverter.class.getName());
converter = new IdentityStreamPartitionConverter();
}
Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get()
+ " Instantiated " + loadAppraiser.getClass().getCanonicalName());
StreamConfigProvider streamConfProvider =
getStreamConfigProvider(dlConf, converter);
// pre-run
preRun(dlConf, serverConf);
Pair<DistributedLogServiceImpl, Server> serverPair = runServer(
serverConf,
dlConf,
dynDlConf,
dlUri,
converter,
routingService,
statsProvider,
port.or(0),
keepAliveLatch,
statsReceiver,
thriftmux.isPresent(),
streamConfProvider,
loadAppraiser);
this.dlService = serverPair.getLeft();
this.server = serverPair.getRight();
// announce the service
announcer.announce();
// start the routing service after announced
routingService.startService();
logger.info("Started the routing service.");
dlService.startPlacementPolicy();
logger.info("Started the placement policy.");
}
protected void preRun(DistributedLogConfiguration conf, ServerConfiguration serverConf) {
this.gracefulShutdownMs = serverConf.getGracefulShutdownPeriodMs();
if (!serverConf.isDurableWriteEnabled()) {
conf.setDurableWriteEnabled(false);
}
}
private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
throws ConfigurationException {
Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
if (conf.isPresent()) {
DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
configExecutorService, dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
dynConf = configFactory.getDynamicConfiguration(conf.get());
}
if (dynConf.isPresent()) {
return dynConf.get();
} else {
return ConfUtils.getConstDynConf(dlConf);
}
}
private StreamConfigProvider getStreamConfigProvider(DistributedLogConfiguration dlConf,
StreamPartitionConverter partitionConverter)
throws ConfigurationException {
StreamConfigProvider streamConfProvider = new NullStreamConfigProvider();
if (streamConf.isPresent() && conf.isPresent()) {
String dynConfigPath = streamConf.get();
String defaultConfigFile = conf.get();
streamConfProvider = new ServiceStreamConfigProvider(
dynConfigPath,
defaultConfigFile,
partitionConverter,
configExecutorService,
dlConf.getDynamicConfigReloadIntervalSec(),
TimeUnit.SECONDS);
} else if (conf.isPresent()) {
String configFile = conf.get();
streamConfProvider = new DefaultStreamConfigProvider(configFile, configExecutorService,
dlConf.getDynamicConfigReloadIntervalSec(), TimeUnit.SECONDS);
}
return streamConfProvider;
}
static Pair<DistributedLogServiceImpl, Server> runServer(
ServerConfiguration serverConf,
DistributedLogConfiguration dlConf,
URI dlUri,
StreamPartitionConverter converter,
RoutingService routingService,
StatsProvider provider,
int port,
boolean thriftmux,
LoadAppraiser loadAppraiser) throws IOException {
return runServer(serverConf,
dlConf,
ConfUtils.getConstDynConf(dlConf),
dlUri,
converter,
routingService,
provider,
port,
new CountDownLatch(0),
new NullStatsReceiver(),
thriftmux,
new NullStreamConfigProvider(),
loadAppraiser);
}
static Pair<DistributedLogServiceImpl, Server> runServer(
ServerConfiguration serverConf,
DistributedLogConfiguration dlConf,
DynamicDistributedLogConfiguration dynDlConf,
URI dlUri,
StreamPartitionConverter partitionConverter,
RoutingService routingService,
StatsProvider provider,
int port,
CountDownLatch keepAliveLatch,
StatsReceiver statsReceiver,
boolean thriftmux,
StreamConfigProvider streamConfProvider,
LoadAppraiser loadAppraiser) throws IOException {
logger.info("Running server @ uri {}.", dlUri);
boolean perStreamStatsEnabled = serverConf.isPerStreamStatEnabled();
StatsLogger perStreamStatsLogger;
if (perStreamStatsEnabled) {
perStreamStatsLogger = provider.getStatsLogger("stream");
} else {
perStreamStatsLogger = NullStatsLogger.INSTANCE;
}
// dl service
DistributedLogServiceImpl dlService = new DistributedLogServiceImpl(
serverConf,
dlConf,
dynDlConf,
streamConfProvider,
dlUri,
partitionConverter,
routingService,
provider.getStatsLogger(""),
perStreamStatsLogger,
keepAliveLatch,
loadAppraiser);
StatsReceiver serviceStatsReceiver = statsReceiver.scope("service");
StatsLogger serviceStatsLogger = provider.getStatsLogger("service");
ServerBuilder serverBuilder = ServerBuilder.get()
.name("DistributedLogServer")
.codec(ThriftServerFramedCodec.get())
.reportTo(statsReceiver)
.keepAlive(true)
.bindTo(new InetSocketAddress(port));
if (thriftmux) {
logger.info("Using thriftmux.");
Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
serverBuilder = serverBuilder.stack(
ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
}
logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
// starts dl server
Server server = ServerBuilder.safeBuild(
new ClientIdRequiredFilter<byte[], byte[]>(serviceStatsReceiver).andThen(
new StatsFilter<byte[], byte[]>(serviceStatsLogger).andThen(
new DistributedLogService.Service(dlService, new TBinaryProtocol.Factory()))),
serverBuilder);
logger.info("Started DistributedLog Server.");
return Pair.of(dlService, server);
}
static void closeServer(Pair<DistributedLogServiceImpl, Server> pair,
long gracefulShutdownPeriod,
TimeUnit timeUnit) {
if (null != pair.getLeft()) {
pair.getLeft().shutdown();
if (gracefulShutdownPeriod > 0) {
try {
timeUnit.sleep(gracefulShutdownPeriod);
} catch (InterruptedException e) {
logger.info("Interrupted on waiting service shutting down state propagated to all clients : ", e);
}
}
}
if (null != pair.getRight()) {
logger.info("Closing dl thrift server.");
pair.getRight().close();
logger.info("Closed dl thrift server.");
}
}
/**
* Close the server.
*/
public void close() {
if (null != announcer) {
try {
announcer.unannounce();
} catch (IOException e) {
logger.warn("Error on unannouncing service : ", e);
}
announcer.close();
}
closeServer(Pair.of(dlService, server), gracefulShutdownMs, TimeUnit.MILLISECONDS);
routingService.stopService();
if (null != statsProvider) {
statsProvider.stop();
}
SchedulerUtils.shutdownScheduler(configExecutorService, 60, TimeUnit.SECONDS);
keepAliveLatch.countDown();
}
public void join() throws InterruptedException {
keepAliveLatch.await();
}
/**
* Running distributedlog server.
*
* @param uri distributedlog namespace
* @param conf distributedlog configuration file location
* @param streamConf per stream configuration dir location
* @param port listen port
* @param statsPort stats port
* @param shardId shard id
* @param announceServerSet whether to announce itself to server set
* @param thriftmux flag to enable thrift mux
* @param statsReceiver receiver to receive finagle stats
* @param statsProvider provider to receive dl stats
* @return distributedlog server
* @throws ConfigurationException
* @throws IllegalArgumentException
* @throws IOException
* @throws ClassNotFoundException
*/
public static DistributedLogServer runServer(
Optional<String> uri,
Optional<String> conf,
Optional<String> streamConf,
Optional<Integer> port,
Optional<Integer> statsPort,
Optional<Integer> shardId,
Optional<Boolean> announceServerSet,
Optional<String> loadAppraiserClass,
Optional<Boolean> thriftmux,
RoutingService routingService,
StatsReceiver statsReceiver,
StatsProvider statsProvider)
throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
final DistributedLogServer server = new DistributedLogServer(
uri,
conf,
streamConf,
port,
statsPort,
shardId,
announceServerSet,
loadAppraiserClass,
thriftmux,
routingService,
statsReceiver,
statsProvider);
server.runServer();
return server;
}
}