blob: 3934eb51bda3e0590cfc6d8d3b952bb642f5b9e2 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.service.tools;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import org.apache.distributedlog.service.ClientUtils;
import org.apache.distributedlog.service.DLSocketAddress;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.apache.distributedlog.tools.Tool;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tools to interact with proxies.
*/
public class ProxyTool extends Tool {
private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
/**
* Abstract Cluster level command.
*/
protected abstract static class ClusterCommand extends OptsCommand {
protected Options options = new Options();
protected URI uri;
protected final List<String> streams = new ArrayList<String>();
protected ClusterCommand(String name, String description) {
super(name, description);
options.addOption("u", "uri", true, "DistributedLog URI");
options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
options.addOption("e", "expression", true, "Expression to generate stream suffix. "
+ "Currently we support range '0-9', list '1,2,3' and name '143'");
}
@Override
protected int runCmd(CommandLine commandLine) throws Exception {
try {
parseCommandLine(commandLine);
} catch (ParseException pe) {
System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
printUsage();
return -1;
}
DLZkServerSet serverSet = DLZkServerSet.of(uri, 60000);
logger.info("Created serverset for {}", uri);
try {
DistributedLogClient client = DistributedLogClientBuilder.newBuilder()
.name("proxy_tool")
.clientId(ClientId$.MODULE$.apply("proxy_tool"))
.maxRedirects(2)
.serverSet(serverSet.getServerSet())
.clientBuilder(ClientBuilder.get()
.connectionTimeout(Duration.fromSeconds(2))
.tcpConnectTimeout(Duration.fromSeconds(2))
.requestTimeout(Duration.fromSeconds(10))
.hostConnectionLimit(1)
.hostConnectionCoresize(1)
.keepAlive(true)
.failFast(false))
.build();
try {
return runCmd(client);
} finally {
client.close();
}
} finally {
serverSet.close();
}
}
protected abstract int runCmd(DistributedLogClient client) throws Exception;
@Override
protected Options getOptions() {
return options;
}
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
if (!cmdline.hasOption("u")) {
throw new ParseException("No distributedlog uri provided.");
}
this.uri = URI.create(cmdline.getOptionValue("u"));
// get stream names
String streamPrefix = cmdline.hasOption("r") ? cmdline.getOptionValue("r") : "";
String streamExpression = null;
if (cmdline.hasOption("e")) {
streamExpression = cmdline.getOptionValue("e");
}
if (null == streamPrefix || null == streamExpression) {
throw new ParseException("Please specify stream prefix & expression.");
}
// parse the stream expression
if (streamExpression.contains("-")) {
// a range expression
String[] parts = streamExpression.split("-");
if (parts.length != 2) {
throw new ParseException("Invalid stream index range : " + streamExpression);
}
try {
int start = Integer.parseInt(parts[0]);
int end = Integer.parseInt(parts[1]);
if (start > end) {
throw new ParseException("Invalid stream index range : " + streamExpression);
}
for (int i = start; i <= end; i++) {
streams.add(streamPrefix + i);
}
} catch (NumberFormatException nfe) {
throw new ParseException("Invalid stream index range : " + streamExpression);
}
} else if (streamExpression.contains(",")) {
// a list expression
String[] parts = streamExpression.split(",");
try {
for (String part : parts) {
streams.add(streamPrefix + part);
}
} catch (NumberFormatException nfe) {
throw new ParseException("Invalid stream suffix list : " + streamExpression);
}
} else {
streams.add(streamPrefix + streamExpression);
}
}
}
/**
* Command to release ownership of a log stream.
*/
static class ReleaseCommand extends ClusterCommand {
double rate = 100f;
ReleaseCommand() {
super("release", "Release Stream Ownerships");
options.addOption("t", "rate", true, "Rate to release streams");
}
@Override
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
if (cmdline.hasOption("t")) {
rate = Double.parseDouble(cmdline.getOptionValue("t", "100"));
}
}
@Override
protected int runCmd(DistributedLogClient client) throws Exception {
RateLimiter rateLimiter = RateLimiter.create(rate);
for (String stream : streams) {
rateLimiter.acquire();
try {
Await.result(client.release(stream));
System.out.println("Release ownership of stream " + stream);
} catch (Exception e) {
System.err.println("Failed to release ownership of stream " + stream);
throw e;
}
}
return 0;
}
@Override
protected String getUsage() {
return "release [options]";
}
}
/**
* Command to truncate a log stream.
*/
static class TruncateCommand extends ClusterCommand {
DLSN dlsn = DLSN.InitialDLSN;
TruncateCommand() {
super("truncate", "Truncate streams until given dlsn.");
options.addOption("d", "dlsn", true, "DLSN to truncate until");
}
@Override
protected int runCmd(DistributedLogClient client) throws Exception {
System.out.println("Truncating streams : " + streams);
for (String stream : streams) {
boolean success = Await.result(client.truncate(stream, dlsn));
System.out.println("Truncate " + stream + " to " + dlsn + " : " + success);
}
return 0;
}
@Override
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
if (!cmdline.hasOption("d")) {
throw new ParseException("No DLSN provided");
}
String[] dlsnStrs = cmdline.getOptionValue("d").split(",");
if (dlsnStrs.length != 3) {
throw new ParseException("Invalid DLSN : " + cmdline.getOptionValue("d"));
}
dlsn = new DLSN(Long.parseLong(dlsnStrs[0]), Long.parseLong(dlsnStrs[1]), Long.parseLong(dlsnStrs[2]));
}
@Override
protected String getUsage() {
return "truncate [options]";
}
}
/**
* Abstract command to operate on a single proxy server.
*/
protected abstract static class ProxyCommand extends OptsCommand {
protected Options options = new Options();
protected InetSocketAddress address;
protected ProxyCommand(String name, String description) {
super(name, description);
options.addOption("H", "host", true, "Single Proxy Address");
}
@Override
protected Options getOptions() {
return options;
}
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
if (!cmdline.hasOption("H")) {
throw new ParseException("No proxy address provided");
}
address = DLSocketAddress.parseSocketAddress(cmdline.getOptionValue("H"));
}
@Override
protected int runCmd(CommandLine commandLine) throws Exception {
try {
parseCommandLine(commandLine);
} catch (ParseException pe) {
System.err.println("ERROR: failed to parse commandline : '" + pe.getMessage() + "'");
printUsage();
return -1;
}
DistributedLogClientBuilder clientBuilder = DistributedLogClientBuilder.newBuilder()
.name("proxy_tool")
.clientId(ClientId$.MODULE$.apply("proxy_tool"))
.maxRedirects(2)
.host(address)
.clientBuilder(ClientBuilder.get()
.connectionTimeout(Duration.fromSeconds(2))
.tcpConnectTimeout(Duration.fromSeconds(2))
.requestTimeout(Duration.fromSeconds(10))
.hostConnectionLimit(1)
.hostConnectionCoresize(1)
.keepAlive(true)
.failFast(false));
Pair<DistributedLogClient, MonitorServiceClient> clientPair =
ClientUtils.buildClient(clientBuilder);
try {
return runCmd(clientPair);
} finally {
clientPair.getLeft().close();
}
}
protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
}
/**
* Command to enable/disable accepting new streams.
*/
static class AcceptNewStreamCommand extends ProxyCommand {
boolean enabled = false;
AcceptNewStreamCommand() {
super("accept-new-stream", "Enable/Disable accepting new streams for one proxy");
options.addOption("e", "enabled", true, "Enable/Disable accepting new streams");
}
@Override
protected void parseCommandLine(CommandLine cmdline) throws ParseException {
super.parseCommandLine(cmdline);
if (!cmdline.hasOption("e")) {
throw new ParseException("No action 'enable/disable' provided");
}
enabled = Boolean.parseBoolean(cmdline.getOptionValue("e"));
}
@Override
protected int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client)
throws Exception {
Await.result(client.getRight().setAcceptNewStream(enabled));
return 0;
}
@Override
protected String getUsage() {
return "accept-new-stream [options]";
}
}
public ProxyTool() {
super();
addCommand(new ReleaseCommand());
addCommand(new TruncateCommand());
addCommand(new AcceptNewStreamCommand());
}
@Override
protected String getName() {
return "proxy_tool";
}
}