| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog.benchmark; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.twitter.distributedlog.DistributedLogConfiguration; |
| import com.twitter.distributedlog.benchmark.utils.ShiftableRateLimiter; |
| import com.twitter.finagle.stats.OstrichStatsReceiver; |
| import com.twitter.finagle.stats.StatsReceiver; |
| import java.io.File; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.concurrent.TimeUnit; |
| import org.apache.bookkeeper.stats.NullStatsProvider; |
| import org.apache.bookkeeper.stats.StatsLogger; |
| import org.apache.bookkeeper.stats.StatsProvider; |
| import org.apache.bookkeeper.util.ReflectionUtils; |
| import org.apache.commons.cli.BasicParser; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.lang.StringUtils; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The launcher for benchmarks. |
| */ |
| public class Benchmarker { |
| |
| private static final Logger logger = LoggerFactory.getLogger(Benchmarker.class); |
| |
| static final String USAGE = "Benchmarker [-u <uri>] [-c <conf>] [-s serverset] [-m (read|write|dlwrite)]"; |
| |
| final String[] args; |
| final Options options = new Options(); |
| |
| int rate = 100; |
| int maxRate = 1000; |
| int changeRate = 100; |
| int changeRateSeconds = 1800; |
| int concurrency = 10; |
| String streamPrefix = "dlog-loadtest"; |
| int shardId = -1; |
| int numStreams = 10; |
| List<String> serversetPaths = new ArrayList<String>(); |
| List<String> finagleNames = new ArrayList<String>(); |
| int msgSize = 256; |
| String mode = null; |
| int durationMins = 60; |
| URI dlUri = null; |
| int batchSize = 0; |
| int readersPerStream = 1; |
| Integer maxStreamId = null; |
| int truncationInterval = 3600; |
| Integer startStreamId = null; |
| Integer endStreamId = null; |
| int hostConnectionCoreSize = 10; |
| int hostConnectionLimit = 10; |
| boolean thriftmux = false; |
| boolean handshakeWithClientInfo = false; |
| boolean readFromHead = false; |
| int sendBufferSize = 1024 * 1024; |
| int recvBufferSize = 1024 * 1024; |
| boolean enableBatching = false; |
| int batchBufferSize = 256 * 1024; |
| int batchFlushIntervalMicros = 2000; |
| String routingServiceFinagleNameString; |
| |
| final DistributedLogConfiguration conf = new DistributedLogConfiguration(); |
| final StatsReceiver statsReceiver = new OstrichStatsReceiver(); |
| StatsProvider statsProvider = null; |
| |
| Benchmarker(String[] args) { |
| this.args = args; |
| // prepare options |
| options.addOption("s", "serverset", true, "Proxy Server Set (separated by ',')"); |
| options.addOption("fn", "finagle-name", true, "Write proxy finagle name (separated by ',')"); |
| options.addOption("c", "conf", true, "DistributedLog Configuration File"); |
| options.addOption("u", "uri", true, "DistributedLog URI"); |
| options.addOption("i", "shard", true, "Shard Id"); |
| options.addOption("p", "provider", true, "DistributedLog Stats Provider"); |
| options.addOption("d", "duration", true, "Duration (minutes)"); |
| options.addOption("sp", "streamprefix", true, "Stream Prefix"); |
| options.addOption("sc", "streamcount", true, "Number of Streams"); |
| options.addOption("ms", "messagesize", true, "Message Size (bytes)"); |
| options.addOption("bs", "batchsize", true, "Batch Size"); |
| options.addOption("r", "rate", true, "Rate limit (requests/second)"); |
| options.addOption("mr", "max-rate", true, "Maximum Rate limit (requests/second)"); |
| options.addOption("cr", "change-rate", true, "Rate to increase each change period (requests/second)"); |
| options.addOption("ci", "change-interval", true, "Rate to increase period, seconds"); |
| options.addOption("t", "concurrency", true, "Concurrency (number of threads)"); |
| options.addOption("m", "mode", true, "Benchmark mode (read/write)"); |
| options.addOption("rps", "readers-per-stream", true, "Number readers per stream"); |
| options.addOption("msid", "max-stream-id", true, "Max Stream ID"); |
| options.addOption("ti", "truncation-interval", true, "Truncation interval in seconds"); |
| options.addOption("ssid", "start-stream-id", true, "Start Stream ID"); |
| options.addOption("esid", "end-stream-id", true, "Start Stream ID"); |
| options.addOption("hccs", "host-connection-core-size", true, "Finagle hostConnectionCoreSize"); |
| options.addOption("hcl", "host-connection-limit", true, "Finagle hostConnectionLimit"); |
| options.addOption("mx", "thriftmux", false, "Enable thriftmux (write mode only)"); |
| options.addOption("hsci", "handshake-with-client-info", false, "Enable handshaking with client info"); |
| options.addOption("rfh", "read-from-head", false, "Read from head of the stream"); |
| options.addOption("sb", "send-buffer", true, "Channel send buffer size, in bytes"); |
| options.addOption("rb", "recv-buffer", true, "Channel recv buffer size, in bytes"); |
| options.addOption("bt", "enable-batch", false, "Enable batching on writers"); |
| options.addOption("bbs", "batch-buffer-size", true, "The batch buffer size in bytes"); |
| options.addOption("bfi", "batch-flush-interval", true, "The batch buffer flush interval in micros"); |
| options.addOption("rs", "routing-service", true, "The routing service finagle name for server-side routing"); |
| options.addOption("h", "help", false, "Print usage."); |
| } |
| |
| void printUsage() { |
| HelpFormatter helpFormatter = new HelpFormatter(); |
| helpFormatter.printHelp(USAGE, options); |
| } |
| |
| void run() throws Exception { |
| logger.info("Running benchmark."); |
| |
| BasicParser parser = new BasicParser(); |
| CommandLine cmdline = parser.parse(options, args); |
| if (cmdline.hasOption("h")) { |
| printUsage(); |
| System.exit(0); |
| } |
| if (cmdline.hasOption("s")) { |
| String serversetPathStr = cmdline.getOptionValue("s"); |
| serversetPaths = Arrays.asList(StringUtils.split(serversetPathStr, ',')); |
| } |
| if (cmdline.hasOption("fn")) { |
| String finagleNameStr = cmdline.getOptionValue("fn"); |
| finagleNames = Arrays.asList(StringUtils.split(finagleNameStr, ',')); |
| } |
| if (cmdline.hasOption("i")) { |
| shardId = Integer.parseInt(cmdline.getOptionValue("i")); |
| } |
| if (cmdline.hasOption("d")) { |
| durationMins = Integer.parseInt(cmdline.getOptionValue("d")); |
| } |
| if (cmdline.hasOption("sp")) { |
| streamPrefix = cmdline.getOptionValue("sp"); |
| } |
| if (cmdline.hasOption("sc")) { |
| numStreams = Integer.parseInt(cmdline.getOptionValue("sc")); |
| } |
| if (cmdline.hasOption("ms")) { |
| msgSize = Integer.parseInt(cmdline.getOptionValue("ms")); |
| } |
| if (cmdline.hasOption("r")) { |
| rate = Integer.parseInt(cmdline.getOptionValue("r")); |
| } |
| if (cmdline.hasOption("mr")) { |
| maxRate = Integer.parseInt(cmdline.getOptionValue("mr")); |
| } |
| if (cmdline.hasOption("cr")) { |
| changeRate = Integer.parseInt(cmdline.getOptionValue("cr")); |
| } |
| if (cmdline.hasOption("ci")) { |
| changeRateSeconds = Integer.parseInt(cmdline.getOptionValue("ci")); |
| } |
| if (cmdline.hasOption("t")) { |
| concurrency = Integer.parseInt(cmdline.getOptionValue("t")); |
| } |
| if (cmdline.hasOption("m")) { |
| mode = cmdline.getOptionValue("m"); |
| } |
| if (cmdline.hasOption("u")) { |
| dlUri = URI.create(cmdline.getOptionValue("u")); |
| } |
| if (cmdline.hasOption("bs")) { |
| batchSize = Integer.parseInt(cmdline.getOptionValue("bs")); |
| checkArgument("write" != mode, "batchSize supported only for mode=write"); |
| } |
| if (cmdline.hasOption("c")) { |
| String configFile = cmdline.getOptionValue("c"); |
| conf.loadConf(new File(configFile).toURI().toURL()); |
| } |
| if (cmdline.hasOption("rps")) { |
| readersPerStream = Integer.parseInt(cmdline.getOptionValue("rps")); |
| } |
| if (cmdline.hasOption("msid")) { |
| maxStreamId = Integer.parseInt(cmdline.getOptionValue("msid")); |
| } |
| if (cmdline.hasOption("ti")) { |
| truncationInterval = Integer.parseInt(cmdline.getOptionValue("ti")); |
| } |
| if (cmdline.hasOption("ssid")) { |
| startStreamId = Integer.parseInt(cmdline.getOptionValue("ssid")); |
| } |
| if (cmdline.hasOption("esid")) { |
| endStreamId = Integer.parseInt(cmdline.getOptionValue("esid")); |
| } |
| if (cmdline.hasOption("hccs")) { |
| hostConnectionCoreSize = Integer.parseInt(cmdline.getOptionValue("hccs")); |
| } |
| if (cmdline.hasOption("hcl")) { |
| hostConnectionLimit = Integer.parseInt(cmdline.getOptionValue("hcl")); |
| } |
| if (cmdline.hasOption("sb")) { |
| sendBufferSize = Integer.parseInt(cmdline.getOptionValue("sb")); |
| } |
| if (cmdline.hasOption("rb")) { |
| recvBufferSize = Integer.parseInt(cmdline.getOptionValue("rb")); |
| } |
| if (cmdline.hasOption("rs")) { |
| routingServiceFinagleNameString = cmdline.getOptionValue("rs"); |
| } |
| thriftmux = cmdline.hasOption("mx"); |
| handshakeWithClientInfo = cmdline.hasOption("hsci"); |
| readFromHead = cmdline.hasOption("rfh"); |
| enableBatching = cmdline.hasOption("bt"); |
| if (cmdline.hasOption("bbs")) { |
| batchBufferSize = Integer.parseInt(cmdline.getOptionValue("bbs")); |
| } |
| if (cmdline.hasOption("bfi")) { |
| batchFlushIntervalMicros = Integer.parseInt(cmdline.getOptionValue("bfi")); |
| } |
| |
| checkArgument(shardId >= 0, "shardId must be >= 0"); |
| checkArgument(numStreams > 0, "numStreams must be > 0"); |
| checkArgument(durationMins > 0, "durationMins must be > 0"); |
| checkArgument(streamPrefix != null, "streamPrefix must be defined"); |
| checkArgument(hostConnectionCoreSize > 0, "host connection core size must be > 0"); |
| checkArgument(hostConnectionLimit > 0, "host connection limit must be > 0"); |
| |
| if (cmdline.hasOption("p")) { |
| statsProvider = ReflectionUtils.newInstance(cmdline.getOptionValue("p"), StatsProvider.class); |
| } else { |
| statsProvider = new NullStatsProvider(); |
| } |
| |
| logger.info("Starting stats provider : {}.", statsProvider.getClass()); |
| statsProvider.start(conf); |
| |
| Worker w = null; |
| if (mode.startsWith("read")) { |
| w = runReader(); |
| } else if (mode.startsWith("write")) { |
| w = runWriter(); |
| } else if (mode.startsWith("dlwrite")) { |
| w = runDLWriter(); |
| } else if (mode.startsWith("dlread")) { |
| w = runDLReader(); |
| } |
| |
| if (w == null) { |
| throw new IOException("Unknown mode " + mode + " to run the benchmark."); |
| } |
| |
| Thread workerThread = new Thread(w, mode + "-benchmark-thread"); |
| workerThread.start(); |
| |
| TimeUnit.MINUTES.sleep(durationMins); |
| |
| logger.info("{} minutes passed, exiting...", durationMins); |
| w.close(); |
| |
| if (null != statsProvider) { |
| statsProvider.stop(); |
| } |
| |
| Runtime.getRuntime().exit(0); |
| } |
| |
| Worker runWriter() { |
| checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, |
| "either serverset paths, finagle-names or uri required"); |
| checkArgument(msgSize > 0, "messagesize must be greater than 0"); |
| checkArgument(rate > 0, "rate must be greater than 0"); |
| checkArgument(maxRate >= rate, "max rate must be greater than rate"); |
| checkArgument(changeRate >= 0, "change rate must be positive"); |
| checkArgument(changeRateSeconds >= 0, "change rate must be positive"); |
| checkArgument(concurrency > 0, "concurrency must be greater than 0"); |
| |
| ShiftableRateLimiter rateLimiter = |
| new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); |
| return createWriteWorker( |
| streamPrefix, |
| dlUri, |
| null == startStreamId ? shardId * numStreams : startStreamId, |
| null == endStreamId ? (shardId + 1) * numStreams : endStreamId, |
| rateLimiter, |
| concurrency, |
| msgSize, |
| batchSize, |
| hostConnectionCoreSize, |
| hostConnectionLimit, |
| serversetPaths, |
| finagleNames, |
| statsReceiver.scope("write_client"), |
| statsProvider.getStatsLogger("write"), |
| thriftmux, |
| handshakeWithClientInfo, |
| sendBufferSize, |
| recvBufferSize, |
| enableBatching, |
| batchBufferSize, |
| batchFlushIntervalMicros, |
| routingServiceFinagleNameString); |
| } |
| |
| protected WriterWorker createWriteWorker( |
| String streamPrefix, |
| URI uri, |
| int startStreamId, |
| int endStreamId, |
| ShiftableRateLimiter rateLimiter, |
| int writeConcurrency, |
| int messageSizeBytes, |
| int batchSize, |
| int hostConnectionCoreSize, |
| int hostConnectionLimit, |
| List<String> serverSetPaths, |
| List<String> finagleNames, |
| StatsReceiver statsReceiver, |
| StatsLogger statsLogger, |
| boolean thriftmux, |
| boolean handshakeWithClientInfo, |
| int sendBufferSize, |
| int recvBufferSize, |
| boolean enableBatching, |
| int batchBufferSize, |
| int batchFlushIntervalMicros, |
| String routingServiceFinagleNameString) { |
| return new WriterWorker( |
| streamPrefix, |
| uri, |
| startStreamId, |
| endStreamId, |
| rateLimiter, |
| writeConcurrency, |
| messageSizeBytes, |
| batchSize, |
| hostConnectionCoreSize, |
| hostConnectionLimit, |
| serverSetPaths, |
| finagleNames, |
| statsReceiver, |
| statsLogger, |
| thriftmux, |
| handshakeWithClientInfo, |
| sendBufferSize, |
| recvBufferSize, |
| enableBatching, |
| batchBufferSize, |
| batchFlushIntervalMicros, |
| routingServiceFinagleNameString); |
| } |
| |
| Worker runDLWriter() throws IOException { |
| checkNotNull(dlUri, "dlUri must be defined"); |
| checkArgument(rate > 0, "rate must be greater than 0"); |
| checkArgument(maxRate >= rate, "max rate must be greater than rate"); |
| checkArgument(changeRate >= 0, "change rate must be positive"); |
| checkArgument(changeRateSeconds >= 0, "change rate must be positive"); |
| checkArgument(concurrency > 0, "concurrency must be greater than 0"); |
| |
| ShiftableRateLimiter rateLimiter = |
| new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS); |
| |
| return new DLWriterWorker(conf, |
| dlUri, |
| streamPrefix, |
| shardId * numStreams, |
| (shardId + 1) * numStreams, |
| rateLimiter, |
| concurrency, |
| msgSize, |
| statsProvider.getStatsLogger("dlwrite")); |
| } |
| |
| Worker runReader() throws IOException { |
| checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri, |
| "either serverset paths, finagle-names or dlUri required"); |
| checkArgument(concurrency > 0, "concurrency must be greater than 0"); |
| checkArgument(truncationInterval > 0, "truncation interval should be greater than 0"); |
| return runReaderInternal(serversetPaths, finagleNames, truncationInterval); |
| } |
| |
| Worker runDLReader() throws IOException { |
| return runReaderInternal(new ArrayList<String>(), new ArrayList<String>(), 0); |
| } |
| |
| private Worker runReaderInternal(List<String> serversetPaths, |
| List<String> finagleNames, |
| int truncationInterval) throws IOException { |
| checkNotNull(dlUri); |
| |
| int ssid = null == startStreamId ? shardId * numStreams : startStreamId; |
| int esid = null == endStreamId ? (shardId + readersPerStream) * numStreams : endStreamId; |
| if (null != maxStreamId) { |
| esid = Math.min(esid, maxStreamId); |
| } |
| |
| return createReaderWorker( |
| conf, |
| dlUri, |
| streamPrefix, |
| ssid, |
| esid, |
| concurrency, |
| serversetPaths, |
| finagleNames, |
| truncationInterval, |
| readFromHead, |
| statsReceiver, |
| statsProvider.getStatsLogger("dlreader")); |
| } |
| |
| protected ReaderWorker createReaderWorker( |
| DistributedLogConfiguration conf, |
| URI uri, |
| String streamPrefix, |
| int startStreamId, |
| int endStreamId, |
| int readThreadPoolSize, |
| List<String> serverSetPaths, |
| List<String> finagleNames, |
| int truncationIntervalInSeconds, |
| boolean readFromHead, /* read from the earliest data of log */ |
| StatsReceiver statsReceiver, |
| StatsLogger statsLogger) throws IOException { |
| return new ReaderWorker( |
| conf, |
| uri, |
| streamPrefix, |
| startStreamId, |
| endStreamId, |
| readThreadPoolSize, |
| serverSetPaths, |
| finagleNames, |
| truncationIntervalInSeconds, |
| readFromHead, |
| statsReceiver, |
| statsLogger); |
| } |
| |
| public static void main(String[] args) { |
| Benchmarker benchmarker = new Benchmarker(args); |
| try { |
| benchmarker.run(); |
| } catch (Exception e) { |
| logger.info("Benchmark quit due to : ", e); |
| } |
| } |
| |
| } |