blob: aadd71e602149391c40a8bc7362dc94098f20420 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.service;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LocalDLMEmulator;
import org.apache.distributedlog.client.routing.SingleHostRoutingService;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.service.placement.EqualLoadAppraiser;
import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.finagle.builder.Server;
import java.io.File;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.LocalBookKeeper;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* DistributedLog Cluster is an emulator to run distributedlog components.
*/
public class DistributedLogCluster {
private static final Logger LOG = LoggerFactory.getLogger(DistributedLogCluster.class);
public static Builder newBuilder() {
return new Builder();
}
/**
* Builder to build distributedlog cluster.
*/
public static class Builder {
int numBookies = 3;
boolean shouldStartZK = true;
String zkHost = "127.0.0.1";
int zkPort = 0;
boolean shouldStartProxy = true;
int proxyPort = 7000;
boolean thriftmux = false;
DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
.setLockTimeout(10)
.setOutputBufferSize(0)
.setImmediateFlushEnabled(true);
ServerConfiguration bkConf = new ServerConfiguration();
private Builder() {}
/**
* How many bookies to run. By default is 3.
*
* @return builder
*/
public Builder numBookies(int numBookies) {
this.numBookies = numBookies;
return this;
}
/**
* Whether to start zookeeper? By default is true.
*
* @param startZK
* flag to start zookeeper?
* @return builder
*/
public Builder shouldStartZK(boolean startZK) {
this.shouldStartZK = startZK;
return this;
}
/**
* ZooKeeper server to run. By default it runs locally on '127.0.0.1'.
*
* @param zkServers
* zk servers
* @return builder
*/
public Builder zkServers(String zkServers) {
this.zkHost = zkServers;
return this;
}
/**
* ZooKeeper server port to listen on. By default it listens on 2181.
*
* @param zkPort
* zookeeper server port.
* @return builder.
*/
public Builder zkPort(int zkPort) {
this.zkPort = zkPort;
return this;
}
/**
* Whether to start proxy or not. By default is true.
*
* @param startProxy
* whether to start proxy or not.
* @return builder
*/
public Builder shouldStartProxy(boolean startProxy) {
this.shouldStartProxy = startProxy;
return this;
}
/**
* Port that proxy server to listen on. By default is 7000.
*
* @param proxyPort
* port that proxy server to listen on.
* @return builder
*/
public Builder proxyPort(int proxyPort) {
this.proxyPort = proxyPort;
return this;
}
/**
* Set the distributedlog configuration.
*
* @param dlConf
* distributedlog configuration
* @return builder
*/
public Builder dlConf(DistributedLogConfiguration dlConf) {
this.dlConf = dlConf;
return this;
}
/**
* Set the Bookkeeper server configuration.
*
* @param bkConf
* bookkeeper server configuration
* @return builder
*/
public Builder bkConf(ServerConfiguration bkConf) {
this.bkConf = bkConf;
return this;
}
/**
* Enable thriftmux for the dl server.
*
* @param enabled flag to enable thriftmux
* @return builder
*/
public Builder thriftmux(boolean enabled) {
this.thriftmux = enabled;
return this;
}
public DistributedLogCluster build() throws Exception {
// build the cluster
return new DistributedLogCluster(
dlConf,
bkConf,
numBookies,
shouldStartZK,
zkHost,
zkPort,
shouldStartProxy,
proxyPort,
thriftmux);
}
}
/**
* Run a distributedlog proxy server.
*/
public static class DLServer {
static final int MAX_RETRIES = 20;
static final int MIN_PORT = 1025;
static final int MAX_PORT = 65535;
int proxyPort;
public final InetSocketAddress address;
public final Pair<DistributedLogServiceImpl, Server> dlServer;
private final SingleHostRoutingService routingService = SingleHostRoutingService.of(null);
protected DLServer(DistributedLogConfiguration dlConf,
URI uri,
int basePort,
boolean thriftmux) throws Exception {
proxyPort = basePort;
boolean success = false;
int retries = 0;
Pair<DistributedLogServiceImpl, Server> serverPair = null;
while (!success) {
try {
org.apache.distributedlog.service.config.ServerConfiguration serverConf =
new org.apache.distributedlog.service.config.ServerConfiguration();
serverConf.loadConf(dlConf);
serverConf.setServerShardId(proxyPort);
serverPair = DistributedLogServer.runServer(
serverConf,
dlConf,
uri,
new IdentityStreamPartitionConverter(),
routingService,
new NullStatsProvider(),
proxyPort,
thriftmux,
new EqualLoadAppraiser());
routingService.setAddress(DLSocketAddress.getSocketAddress(proxyPort));
routingService.startService();
serverPair.getLeft().startPlacementPolicy();
success = true;
} catch (BindException be) {
retries++;
if (retries > MAX_RETRIES) {
throw be;
}
proxyPort++;
if (proxyPort > MAX_PORT) {
proxyPort = MIN_PORT;
}
}
}
LOG.info("Running DL on port {}", proxyPort);
dlServer = serverPair;
address = DLSocketAddress.getSocketAddress(proxyPort);
}
public InetSocketAddress getAddress() {
return address;
}
public void shutdown() {
DistributedLogServer.closeServer(dlServer, 0, TimeUnit.MILLISECONDS);
routingService.stopService();
}
}
private final DistributedLogConfiguration dlConf;
private final ZooKeeperServerShim zks;
private final LocalDLMEmulator dlmEmulator;
private DLServer dlServer;
private final boolean shouldStartProxy;
private final int proxyPort;
private final boolean thriftmux;
private final List<File> tmpDirs = new ArrayList<File>();
private DistributedLogCluster(DistributedLogConfiguration dlConf,
ServerConfiguration bkConf,
int numBookies,
boolean shouldStartZK,
String zkServers,
int zkPort,
boolean shouldStartProxy,
int proxyPort,
boolean thriftmux) throws Exception {
this.dlConf = dlConf;
if (shouldStartZK) {
File zkTmpDir = IOUtils.createTempDir("zookeeper", "distrlog");
tmpDirs.add(zkTmpDir);
if (0 == zkPort) {
Pair<ZooKeeperServerShim, Integer> serverAndPort = LocalDLMEmulator.runZookeeperOnAnyPort(zkTmpDir);
this.zks = serverAndPort.getLeft();
zkPort = serverAndPort.getRight();
} else {
this.zks = LocalBookKeeper.runZookeeper(1000, zkPort, zkTmpDir);
}
} else {
this.zks = null;
}
this.dlmEmulator = LocalDLMEmulator.newBuilder()
.numBookies(numBookies)
.zkHost(zkServers)
.zkPort(zkPort)
.serverConf(bkConf)
.shouldStartZK(false)
.build();
this.shouldStartProxy = shouldStartProxy;
this.proxyPort = proxyPort;
this.thriftmux = thriftmux;
}
public void start() throws Exception {
this.dlmEmulator.start();
BKDLConfig bkdlConfig = new BKDLConfig(this.dlmEmulator.getZkServers(), "/ledgers").setACLRootPath(".acl");
DLMetadata.create(bkdlConfig).update(this.dlmEmulator.getUri());
if (shouldStartProxy) {
this.dlServer = new DLServer(
dlConf,
this.dlmEmulator.getUri(),
proxyPort,
thriftmux);
} else {
this.dlServer = null;
}
}
public void stop() throws Exception {
if (null != dlServer) {
this.dlServer.shutdown();
}
this.dlmEmulator.teardown();
if (null != this.zks) {
this.zks.stop();
}
for (File dir : tmpDirs) {
FileUtils.deleteDirectory(dir);
}
}
public URI getUri() {
return this.dlmEmulator.getUri();
}
public String getZkServers() {
return this.dlmEmulator.getZkServers();
}
public String getProxyFinagleStr() {
return "inet!" + (dlServer == null ? "127.0.0.1:" + proxyPort : dlServer.getAddress().toString());
}
}