blob: 946616f30b6f751c8e869c8f36697d50b5652858 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.tools.perf.dlog;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import com.beust.jcommander.Parameter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.google.common.util.concurrent.RateLimiter;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.Unpooled;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.Recorder;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.bookkeeper.tools.framework.CliFlags;
import org.apache.bookkeeper.tools.perf.utils.PaddingDecimalFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
/**
* A perf writer to evaluate write performance.
*/
@Slf4j
public class PerfWriter implements Runnable {
/**
* Flags for the write command.
*/
public static class Flags extends CliFlags {

    // Target aggregate write rate in bytes/s; 0 disables rate limiting.
    @Parameter(
        names = {
            "-r", "--rate"
        },
        description = "Write rate bytes/s across log streams")
    public int writeRate = 0;

    // Size in bytes of each log record payload.
    @Parameter(
        names = {
            "-rs", "--record-size"
        },
        description = "Log record size")
    public int recordSize = 1024;

    // Log name, or a format pattern expanded with the log index when numLogs > 1.
    @Parameter(
        names = {
            "-ln", "--log-name"
        },
        description = "Log name or log name pattern if more than 1 log is specified at `--num-logs`")
    public String logName = "test-log-%06d";

    // Number of log streams to write to.
    @Parameter(
        names = {
            "-l", "--num-logs"
        },
        description = "Number of log streams")
    public int numLogs = 1;

    // Number of writer threads; logs are partitioned across them round-robin.
    @Parameter(
        names = {
            "-t", "--threads"
        },
        description = "Number of threads writing")
    public int numThreads = 1;

    // Cap on un-acknowledged (outstanding) data, in megabytes, across all threads.
    // BUGFIX: description was a copy-paste of the --threads help text
    // ("Number of threads writing"); corrected to describe this flag.
    @Parameter(
        names = {
            "-mob", "--max-outstanding-megabytes"
        },
        description = "Maximum number of outstanding megabytes")
    public long maxOutstandingMB = 200;

    // Total record budget across all threads; 0 means write forever.
    @Parameter(
        names = {
            "-n", "--num-records"
        },
        description = "Number of records to write in total. If 0, it will keep writing")
    public long numRecords = 0;

    // Maximum size of a single log segment before rolling.
    @Parameter(
        names = {
            "-mlss", "--max-log-segment-size"
        },
        description = "Max log segment size")
    public int maxLogSegmentSize = 64 * 1024 * 1024;

    // Total byte budget across all threads; 0 means write forever.
    @Parameter(
        names = {
            "-b", "--num-bytes"
        },
        description = "Number of bytes to write in total. If 0, it will keep writing")
    public long numBytes = 0;

    // BookKeeper ledger ensemble size.
    @Parameter(
        names = {
            "-e", "--ensemble-size"
        },
        description = "Ledger ensemble size")
    public int ensembleSize = 1;

    // BookKeeper ledger write quorum size.
    @Parameter(
        names = {
            "-w", "--write-quorum-size"
        },
        description = "Ledger write quorum size")
    public int writeQuorumSize = 1;

    // BookKeeper ledger ack quorum size.
    @Parameter(
        names = {
            "-a", "--ack-quorum-size"
        },
        description = "Ledger ack quorum size")
    public int ackQuorumSize = 1;
}
// stats
// Count of records acknowledged by the write callbacks since the last stats reset.
private final LongAdder recordsWritten = new LongAdder();
// Count of payload bytes acknowledged by the write callbacks since the last stats reset.
private final LongAdder bytesWritten = new LongAdder();
// Single pre-generated random payload, reused for every record written.
private final byte[] payload;
// Service endpoint of the dlog namespace being benchmarked.
private final ServiceURI serviceURI;
// Parsed command-line flags for this run.
private final Flags flags;
// Per-interval latency recorder (values recorded in microseconds); reset every report cycle.
// NOTE(review): TimeUnit.SECONDS.toMillis(120000) = 120,000,000, i.e. ~120s when
// interpreted as microseconds — the "120000" looks like it may have been intended
// to be milliseconds or plain seconds; confirm the intended trackable range.
private final Recorder recorder = new Recorder(
TimeUnit.SECONDS.toMillis(120000), 5
);
// Whole-run latency recorder, printed by the shutdown hook and markPerfDone().
private final Recorder cumulativeRecorder = new Recorder(
TimeUnit.SECONDS.toMillis(120000), 5
);
// Set once the run is finished; terminates the reportStats() loop.
private final AtomicBoolean isDone = new AtomicBoolean(false);
PerfWriter(ServiceURI serviceURI, Flags flags) {
this.serviceURI = serviceURI;
this.flags = flags;
this.payload = new byte[flags.recordSize];
ThreadLocalRandom.current().nextBytes(payload);
}
@Override
public void run() {
// Runnable entry point: delegate to execute() and make sure no exception
// escapes the worker thread unlogged.
try {
execute();
} catch (Exception e) {
log.error("Encountered exception at running dlog perf writer", e);
}
}
/**
 * Log the effective configuration, build the dlog namespace and run the
 * benchmark against it, closing the namespace when done.
 */
void execute() throws Exception {
    // Dump the flags as pretty-printed JSON so the run is reproducible.
    ObjectWriter prettyWriter = new ObjectMapper().writerWithDefaultPrettyPrinter();
    log.info("Starting dlog perf writer with config : {}", prettyWriter.writeValueAsString(flags));

    DistributedLogConfiguration dlogConf = newDlogConf(flags);
    // try-with-resources guarantees the namespace is closed even on failure.
    try (Namespace namespace = NamespaceBuilder.newBuilder()
            .conf(dlogConf)
            .uri(serviceURI.getUri())
            .build()) {
        execute(namespace);
    }
}
// Open the configured logs, partition them across the writer threads, start
// the threads, then block reporting stats until the run finishes.
void execute(Namespace namespace) throws Exception {
// Open numLogs logs, keeping each log's index for round-robin partitioning below.
List<Pair<Integer, DistributedLogManager>> managers = new ArrayList<>(flags.numLogs);
for (int i = 0; i < flags.numLogs; i++) {
String logName = String.format(flags.logName, i);
managers.add(Pair.of(i, namespace.openLog(logName)));
}
log.info("Successfully open {} logs", managers.size());
// register shutdown hook to aggregate stats
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
isDone.set(true);
printAggregatedStats(cumulativeRecorder);
}));
ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
try {
for (int i = 0; i < flags.numThreads; i++) {
final int idx = i;
// Thread idx owns every log whose index is congruent to idx mod numThreads.
final List<DistributedLogManager> logsThisThread = managers
.stream()
.filter(pair -> pair.getLeft() % flags.numThreads == idx)
.map(pair -> pair.getRight())
.collect(Collectors.toList());
// Split the global record/byte/rate/outstanding budgets evenly per thread.
final long numRecordsForThisThread = flags.numRecords / flags.numThreads;
final long numBytesForThisThread = flags.numBytes / flags.numThreads;
final double writeRateForThisThread = flags.writeRate / (double) flags.numThreads;
final long maxOutstandingBytesForThisThread = flags.maxOutstandingMB * 1024 * 1024 / flags.numThreads;
executor.submit(() -> {
try {
// NOTE(review): the (int) cast below overflows when
// maxOutstandingMB / numThreads exceeds ~2047 MB — verify
// large configurations.
write(
logsThisThread,
writeRateForThisThread,
(int) maxOutstandingBytesForThisThread,
numRecordsForThisThread,
numBytesForThisThread);
} catch (Exception e) {
log.error("Encountered error at writing records", e);
}
});
}
log.info("Started {} write threads", flags.numThreads);
// Blocks until isDone is set (or the reporting thread is interrupted).
reportStats();
} finally {
// Best-effort teardown: the writer tasks loop forever, so give them 5s
// then force shutdown; process exit is handled by markPerfDone()/hook.
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
managers.forEach(manager -> manager.getRight().asyncClose());
}
}
/**
 * Write records round-robin across this thread's logs until the per-thread
 * record or byte quota (if any) is reached; quota exhaustion terminates the
 * whole process via {@link #markPerfDone()}.
 *
 * @param logs logs owned by this writer thread
 * @param writeRate target rate in bytes/s for this thread; &lt;= 0 disables rate limiting
 * @param maxOutstandingBytesForThisThread cap on un-acknowledged payload bytes; &lt;= 0 disables throttling
 * @param numRecordsForThisThread stop after this many records; 0 means unbounded
 * @param numBytesForThisThread stop after this many payload bytes; 0 means unbounded
 */
void write(List<DistributedLogManager> logs,
           double writeRate,
           int maxOutstandingBytesForThisThread,
           long numRecordsForThisThread,
           long numBytesForThisThread) throws Exception {
    log.info("Write thread started with : logs = {}, rate = {},"
        + " num records = {}, num bytes = {}, max outstanding bytes = {}",
        logs.stream().map(l -> l.getStreamName()).collect(Collectors.toList()),
        writeRate,
        numRecordsForThisThread,
        numBytesForThisThread,
        maxOutstandingBytesForThisThread);
    // Open one async writer per log and wait for all opens to complete.
    List<CompletableFuture<AsyncLogWriter>> writerFutures = logs.stream()
        .map(manager -> manager.openAsyncLogWriter())
        .collect(Collectors.toList());
    List<AsyncLogWriter> writers = result(FutureUtils.collect(writerFutures));
    // Resume transaction ids after the largest txid previously written to any log.
    long txid = writers
        .stream()
        .mapToLong(writer -> writer.getLastTxId())
        .max()
        .orElse(0L);
    txid = Math.max(0L, txid);
    RateLimiter limiter;
    if (writeRate > 0) {
        limiter = RateLimiter.create(writeRate);
    } else {
        limiter = null;
    }
    final Semaphore semaphore;
    if (maxOutstandingBytesForThisThread > 0) {
        semaphore = new Semaphore(maxOutstandingBytesForThisThread);
    } else {
        semaphore = null;
    }
    // Acquire 1 second worth of records to have a slower ramp-up.
    // BUGFIX: guard against requesting zero permits — when the per-thread
    // rate is positive but below 1 byte/s, (int) writeRate truncates to 0
    // and Guava's RateLimiter.acquire(0) throws IllegalArgumentException
    // ("Requested permits must be positive").
    if (limiter != null) {
        int warmupPermits = (int) writeRate;
        if (warmupPermits > 0) {
            limiter.acquire(warmupPermits);
        }
    }
    long totalWritten = 0L;
    long totalBytesWritten = 0L;
    final int numLogs = logs.size();
    while (true) {
        for (int i = 0; i < numLogs; i++) {
            // Quota checks happen before each send; markPerfDone() exits the process.
            if (numRecordsForThisThread > 0
                && totalWritten >= numRecordsForThisThread) {
                markPerfDone();
            }
            if (numBytesForThisThread > 0
                && totalBytesWritten >= numBytesForThisThread) {
                markPerfDone();
            }
            // Throttle on outstanding bytes; permits are released in the
            // completion callback below.
            if (null != semaphore) {
                semaphore.acquire(payload.length);
            }
            totalWritten++;
            totalBytesWritten += payload.length;
            // Rate limit in bytes/s: one permit per payload byte.
            if (null != limiter) {
                limiter.acquire(payload.length);
            }
            final long sendTime = System.nanoTime();
            writers.get(i).write(
                new LogRecord(++txid, Unpooled.wrappedBuffer(payload))
            ).thenAccept(dlsn -> {
                if (null != semaphore) {
                    semaphore.release(payload.length);
                }
                recordsWritten.increment();
                bytesWritten.add(payload.length);
                long latencyMicros = TimeUnit.NANOSECONDS.toMicros(
                    System.nanoTime() - sendTime
                );
                recorder.recordValue(latencyMicros);
                cumulativeRecorder.recordValue(latencyMicros);
            }).exceptionally(cause -> {
                // Any write failure aborts the benchmark immediately.
                log.warn("Error at writing records", cause);
                System.exit(-1);
                return null;
            });
        }
    }
}
// Print the cumulative latency stats and terminate the process once a write
// thread exhausts its record/byte quota. DM_EXIT is suppressed because
// exiting here is intentional.
@SuppressFBWarnings("DM_EXIT")
void markPerfDone() throws Exception {
log.info("------------------- DONE -----------------------");
printAggregatedStats(cumulativeRecorder);
isDone.set(true);
// Pause briefly so the reporting loop and in-flight writes can observe
// isDone before the process is torn down.
Thread.sleep(5000);
System.exit(0);
}
/**
 * Periodically (every 10s) log the write rate, throughput and interval
 * latency percentiles until the run finishes or the thread is interrupted.
 * Latencies are recorded in microseconds and printed in milliseconds.
 */
void reportStats() {
    // Print report stats
    long oldTime = System.nanoTime();
    Histogram reportHistogram = null;
    while (true) {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            // BUGFIX: restore the interrupt status before exiting the loop so
            // callers up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            break;
        }
        if (isDone.get()) {
            break;
        }
        long now = System.nanoTime();
        double elapsed = (now - oldTime) / 1e9;
        // sumThenReset keeps the counters interval-scoped.
        double rate = recordsWritten.sumThenReset() / elapsed;
        double throughput = bytesWritten.sumThenReset() / elapsed / 1024 / 1024;
        // Recycle the histogram instance across intervals to avoid reallocation.
        reportHistogram = recorder.getIntervalHistogram(reportHistogram);
        log.info(
            "Throughput written : {} records/s --- {} MB/s --- Latency: mean:"
            + " {} ms - med: {} - 95pct: {} - 99pct: {} - 99.9pct: {} - 99.99pct: {} - Max: {}",
            throughputFormat.format(rate), throughputFormat.format(throughput),
            dec.format(reportHistogram.getMean() / 1000.0),
            dec.format(reportHistogram.getValueAtPercentile(50) / 1000.0),
            dec.format(reportHistogram.getValueAtPercentile(95) / 1000.0),
            dec.format(reportHistogram.getValueAtPercentile(99) / 1000.0),
            dec.format(reportHistogram.getValueAtPercentile(99.9) / 1000.0),
            dec.format(reportHistogram.getValueAtPercentile(99.99) / 1000.0),
            dec.format(reportHistogram.getMaxValue() / 1000.0));
        reportHistogram.reset();
        oldTime = now;
    }
}
/**
 * Build a DistributedLog configuration for the benchmark from the
 * command-line flags plus fixed throughput-oriented settings.
 *
 * @param flags parsed command-line options supplying the replication settings
 * @return the benchmark configuration
 */
private static DistributedLogConfiguration newDlogConf(Flags flags) {
    DistributedLogConfiguration conf = new DistributedLogConfiguration();
    // Replication settings come straight from the flags.
    conf.setEnsembleSize(flags.ensembleSize);
    conf.setWriteQuorumSize(flags.writeQuorumSize);
    conf.setAckQuorumSize(flags.ackQuorumSize);
    // Fixed benchmark settings: 512 KB output buffer, 2 ms periodic flush,
    // no write lock, 1-minute segment rolling, application-driven truncation.
    conf.setOutputBufferSize(512 * 1024);
    conf.setPeriodicFlushFrequencyMilliSeconds(2);
    conf.setWriteLockEnabled(false);
    conf.setMaxLogSegmentBytes(flags.maxLogSegmentSize);
    conf.setLogSegmentRollingIntervalMinutes(1);
    conf.setExplicitTruncationByApplication(true);
    return conf;
}
// Decimal formatters for stats log lines, padded to a minimum width so the
// periodic report columns line up (PaddingDecimalFormat is a project helper).
private static final DecimalFormat throughputFormat = new PaddingDecimalFormat("0.0", 8);
private static final DecimalFormat dec = new PaddingDecimalFormat("0.000", 7);
/**
 * Log the aggregated latency distribution captured by the given recorder.
 * Recorded values are in microseconds; they are printed in milliseconds.
 *
 * @param recorder recorder holding the whole-run latency samples
 */
private static void printAggregatedStats(Recorder recorder) {
    Histogram histogram = recorder.getIntervalHistogram();
    // Convert each statistic from micros to millis up front.
    double meanMs = histogram.getMean() / 1000.0;
    double p50Ms = histogram.getValueAtPercentile(50) / 1000.0;
    double p95Ms = histogram.getValueAtPercentile(95) / 1000.0;
    double p99Ms = histogram.getValueAtPercentile(99) / 1000.0;
    double p999Ms = histogram.getValueAtPercentile(99.9) / 1000.0;
    double p9999Ms = histogram.getValueAtPercentile(99.99) / 1000.0;
    double p99999Ms = histogram.getValueAtPercentile(99.999) / 1000.0;
    double maxMs = histogram.getMaxValue() / 1000.0;
    log.info("Aggregated latency stats --- Latency: mean: {} ms - med: {} - 95pct: {} - 99pct: {}"
        + " - 99.9pct: {} - 99.99pct: {} - 99.999pct: {} - Max: {}",
        dec.format(meanMs),
        dec.format(p50Ms),
        dec.format(p95Ms),
        dec.format(p99Ms),
        dec.format(p999Ms),
        dec.format(p9999Ms),
        dec.format(p99999Ms),
        dec.format(maxMs));
}
}