blob: 48430df4f3bb9256fc9b7cb5a382678aaf957757 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog.service.balancer;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.distributedlog.impl.BKNamespaceDriver;
import com.twitter.distributedlog.service.ClientUtils;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogClientBuilder;
import com.twitter.distributedlog.tools.Tool;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import java.net.InetSocketAddress;
import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tool to rebalance cluster.
*/
public class BalancerTool extends Tool {
private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
return DistributedLogClientBuilder.newBuilder()
.name("rebalancer_tool")
.clientId(ClientId$.MODULE$.apply("rebalancer_tool"))
.maxRedirects(2)
.serverSet(serverSet)
.clientBuilder(ClientBuilder.get()
.connectionTimeout(Duration.fromSeconds(2))
.tcpConnectTimeout(Duration.fromSeconds(2))
.requestTimeout(Duration.fromSeconds(10))
.hostConnectionLimit(1)
.hostConnectionCoresize(1)
.keepAlive(true)
.failFast(false));
}
/**
* Base Command to run balancer.
*/
protected abstract static class BalancerCommand extends OptsCommand {
protected Options options = new Options();
protected int rebalanceWaterMark = 0;
protected double rebalanceTolerancePercentage = 0.0f;
protected int rebalanceConcurrency = 1;
protected Double rate = null;
protected Optional<RateLimiter> rateLimiter;
BalancerCommand(String name, String description) {
super(name, description);
options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
options.addOption("rtp", "rebalance-tolerance-percentage", true,
"Rebalance tolerance percentage per proxy");
options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
options.addOption("r", "rate", true, "Rebalance rate");
}
Optional<RateLimiter> getRateLimiter() {
return rateLimiter;
}
@Override
protected Options getOptions() {
return options;
}
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
if (cmdline.hasOption("rwm")) {
this.rebalanceWaterMark = Integer.parseInt(cmdline.getOptionValue("rwm"));
}
if (cmdline.hasOption("rtp")) {
this.rebalanceTolerancePercentage = Double.parseDouble(cmdline.getOptionValue("rtp"));
}
if (cmdline.hasOption("rc")) {
this.rebalanceConcurrency = Integer.parseInt(cmdline.getOptionValue("rc"));
}
if (cmdline.hasOption("r")) {
this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
}
checkArgument(rebalanceWaterMark >= 0,
"Rebalance Water Mark should be a non-negative number");
checkArgument(rebalanceTolerancePercentage >= 0.0f,
"Rebalance Tolerance Percentage should be a non-negative number");
checkArgument(rebalanceConcurrency > 0,
"Rebalance Concurrency should be a positive number");
if (null == rate || rate <= 0.0f) {
rateLimiter = Optional.absent();
} else {
rateLimiter = Optional.of(RateLimiter.create(rate));
}
}
@Override
protected int runCmd(CommandLine cmdline) throws Exception {
try {
parseCommandLine(cmdline);
} catch (ParseException pe) {
println("ERROR: fail to parse commandline : '" + pe.getMessage() + "'");
printUsage();
return -1;
}
return executeCommand(cmdline);
}
protected abstract int executeCommand(CommandLine cmdline) throws Exception;
}
/**
* Command to balance streams within a cluster.
*/
protected static class ClusterBalancerCommand extends BalancerCommand {
protected URI uri;
protected String source = null;
protected ClusterBalancerCommand() {
super("clusterbalancer", "Balance streams inside a cluster");
options.addOption("u", "uri", true, "DistributedLog URI");
options.addOption("sp", "source-proxy", true, "Source proxy to balance");
}
@Override
protected String getUsage() {
return "clusterbalancer [options]";
}
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
if (!cmdline.hasOption("u")) {
throw new ParseException("No proxy serverset provided.");
}
uri = URI.create(cmdline.getOptionValue("u"));
if (cmdline.hasOption("sp")) {
String sourceProxyStr = cmdline.getOptionValue("sp");
try {
DLSocketAddress.parseSocketAddress(sourceProxyStr);
} catch (IllegalArgumentException iae) {
throw new ParseException("Invalid source proxy " + sourceProxyStr + " : " + iae.getMessage());
}
this.source = sourceProxyStr;
}
}
@Override
protected int executeCommand(CommandLine cmdline) throws Exception {
DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
logger.info("Created serverset for {}", uri);
try {
DistributedLogClientBuilder clientBuilder =
createDistributedLogClientBuilder(serverSet.getServerSet());
ClusterBalancer balancer = new ClusterBalancer(clientBuilder);
try {
return runBalancer(clientBuilder, balancer);
} finally {
balancer.close();
}
} finally {
serverSet.close();
}
}
protected int runBalancer(DistributedLogClientBuilder clientBuilder,
ClusterBalancer balancer)
throws Exception {
if (null == source) {
balancer.balance(
rebalanceWaterMark,
rebalanceTolerancePercentage,
rebalanceConcurrency,
getRateLimiter());
} else {
balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
}
return 0;
}
protected void balanceFromSource(DistributedLogClientBuilder clientBuilder,
ClusterBalancer balancer,
String source,
Optional<RateLimiter> rateLimiter)
throws Exception {
InetSocketAddress sourceAddr = DLSocketAddress.parseSocketAddress(source);
DistributedLogClientBuilder sourceClientBuilder =
DistributedLogClientBuilder.newBuilder(clientBuilder)
.host(sourceAddr);
Pair<DistributedLogClient, MonitorServiceClient> clientPair =
ClientUtils.buildClient(sourceClientBuilder);
try {
Await.result(clientPair.getRight().setAcceptNewStream(false));
logger.info("Disable accepting new stream on proxy {}.", source);
balancer.balanceAll(source, rebalanceConcurrency, rateLimiter);
} finally {
clientPair.getLeft().close();
}
}
}
/**
* Command to balance streams between regions.
*/
protected static class RegionBalancerCommand extends BalancerCommand {
protected URI region1;
protected URI region2;
protected String source = null;
protected RegionBalancerCommand() {
super("regionbalancer", "Balance streams between regions");
options.addOption("rs", "regions", true, "DistributedLog Region URI: uri1[,uri2]");
options.addOption("s", "source", true, "DistributedLog Source Region to balance");
}
@Override
protected String getUsage() {
return "regionbalancer [options]";
}
@Override
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
if (!cmdline.hasOption("rs")) {
throw new ParseException("No regions provided.");
}
String regionsStr = cmdline.getOptionValue("rs");
String[] regions = regionsStr.split(",");
if (regions.length != 2) {
throw new ParseException("Invalid regions provided. Expected : serverset1[,serverset2]");
}
region1 = URI.create(regions[0]);
region2 = URI.create(regions[1]);
if (cmdline.hasOption("s")) {
source = cmdline.getOptionValue("s");
}
}
@Override
protected int executeCommand(CommandLine cmdline) throws Exception {
DLZkServerSet serverSet1 = DLZkServerSet.of(region1, 60000);
logger.info("Created serverset for {}", region1);
DLZkServerSet serverSet2 = DLZkServerSet.of(region2, 60000);
logger.info("Created serverset for {}", region2);
try {
DistributedLogClientBuilder builder1 =
createDistributedLogClientBuilder(serverSet1.getServerSet());
Pair<DistributedLogClient, MonitorServiceClient> pair1 =
ClientUtils.buildClient(builder1);
DistributedLogClientBuilder builder2 =
createDistributedLogClientBuilder(serverSet2.getServerSet());
Pair<DistributedLogClient, MonitorServiceClient> pair2 =
ClientUtils.buildClient(builder2);
try {
SimpleBalancer balancer = new SimpleBalancer(
BKNamespaceDriver.getZKServersFromDLUri(region1), pair1.getLeft(), pair1.getRight(),
BKNamespaceDriver.getZKServersFromDLUri(region2), pair2.getLeft(), pair2.getRight());
try {
return runBalancer(balancer);
} finally {
balancer.close();
}
} finally {
pair1.getLeft().close();
pair2.getLeft().close();
}
} finally {
serverSet1.close();
serverSet2.close();
}
}
protected int runBalancer(SimpleBalancer balancer) throws Exception {
if (null == source) {
balancer.balance(
rebalanceWaterMark,
rebalanceTolerancePercentage,
rebalanceConcurrency,
getRateLimiter());
} else {
balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
}
return 0;
}
}
public BalancerTool() {
super();
addCommand(new ClusterBalancerCommand());
addCommand(new RegionBalancerCommand());
}
@Override
protected String getName() {
return "balancer";
}
}