blob: 4ac948aadd692fbb743c090daf5e970d2b2e4c30 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.benchmark;
import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Stopwatch;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.common.zookeeper.ServerSet;
import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogRecordSet;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.benchmark.thrift.Message;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.SchedulerUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration$;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The benchmark for core library reader.
 *
 * <p>Opens an {@link AsyncLogReader} on every stream in {@code [startStreamId, endStreamId)}
 * under {@code streamPrefix}, reads records in a continuous bulk-read loop, and reports
 * end-to-end and delivery latency stats. When {@code truncationIntervalInSeconds > 0} it also
 * periodically truncates each stream up to the last DLSN read, using a proxy client built from
 * the supplied server sets / finagle names.
 */
public class ReaderWorker implements Worker {

    private static final Logger LOG = LoggerFactory.getLogger(ReaderWorker.class);

    /** Delay (in milliseconds) before retrying a failed stream (re)initialization. */
    static final int BACKOFF_MS = 200;

    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final ScheduledExecutorService executorService;
    final ExecutorService callbackExecutor;
    final Namespace namespace;
    final DistributedLogManager[] dlms;
    final AsyncLogReader[] logReaders;
    final StreamReader[] streamReaders;
    final int numStreams;
    final boolean readFromHead;
    final int truncationIntervalInSeconds;

    // DL Client Related Variables (only used for truncation; dlc is null when
    // truncation is disabled or no server sets / finagle names are configured).
    final DLZkServerSet[] serverSets;
    final List<String> finagleNames;
    final DistributedLogClient dlc;

    // Flipped to false by close() to stop all read loops.
    volatile boolean running = true;

    final StatsReceiver statsReceiver;
    final StatsLogger statsLogger;
    final OpStatsLogger e2eStat;
    final OpStatsLogger deliveryStat;
    final OpStatsLogger negativeE2EStat;
    final OpStatsLogger negativeDeliveryStat;
    final OpStatsLogger truncationStat;
    final Counter invalidRecordsCounter;
    final Counter outOfOrderSequenceIdCounter;

    /**
     * Per-stream reader: drives the async read loop, records latency stats for each record,
     * exposes the last seen sequence id as a {@code sequence_id} gauge, and (as a
     * {@link Runnable}) performs the periodic truncation when scheduled.
     */
    class StreamReader implements
            org.apache.distributedlog.common.concurrent.FutureEventListener<List<LogRecordWithDLSN>>,
            Runnable, Gauge<Number> {

        final int streamIdx;
        final String streamName;
        // Last DLSN successfully processed; the truncation task truncates up to it.
        DLSN prevDLSN = null;
        // Last sequence id successfully processed; reported by the gauge. Guarded by
        // synchronized(this) together with getSample().
        long prevSequenceId = Long.MIN_VALUE;
        private static final String gaugeLabel = "sequence_id";

        StreamReader(int idx, StatsLogger statsLogger) {
            this.streamIdx = idx;
            int streamId = startStreamId + streamIdx;
            streamName = String.format("%s_%d", streamPrefix, streamId);
            statsLogger.scope(streamName).registerGauge(gaugeLabel, this);
        }

        @Override
        public void onSuccess(final List<LogRecordWithDLSN> records) {
            for (final LogRecordWithDLSN record : records) {
                if (record.isRecordSet()) {
                    try {
                        processRecordSet(record);
                    } catch (IOException e) {
                        onFailure(e);
                    }
                } else {
                    processRecord(record);
                }
            }
            // Issue the next bulk read.
            readLoop();
        }

        /**
         * Processes every record packed inside a record set.
         *
         * @param record the record-set record to unpack
         * @throws IOException if the record set cannot be read
         */
        public void processRecordSet(final LogRecordWithDLSN record) throws IOException {
            LogRecordSet.Reader reader = LogRecordSet.of(record);
            // FIX: release the record-set buffer even if processing an individual
            // record throws; the original leaked the reader on exception.
            try {
                LogRecordWithDLSN nextRecord = reader.nextRecord();
                while (null != nextRecord) {
                    processRecord(nextRecord);
                    nextRecord = reader.nextRecord();
                }
            } finally {
                reader.release();
            }
        }

        /**
         * Records latency stats for a single record and tracks its DLSN / sequence id.
         */
        public void processRecord(final LogRecordWithDLSN record) {
            Message msg;
            try {
                msg = Utils.parseMessage(record.getPayload());
            } catch (TException e) {
                invalidRecordsCounter.inc();
                LOG.warn("Failed to parse record {} for stream {} : size = {} , ",
                        new Object[] { record, streamIdx, record.getPayload().length, e });
                return;
            }
            long curTimeMillis = System.currentTimeMillis();
            long e2eLatency = curTimeMillis - msg.getPublishTime();
            long deliveryLatency = curTimeMillis - record.getTransactionId();
            // Clock skew between writer and reader can yield negative latencies;
            // record those in separate stats rather than discarding them.
            if (e2eLatency >= 0) {
                e2eStat.registerSuccessfulEvent(e2eLatency, TimeUnit.MILLISECONDS);
            } else {
                negativeE2EStat.registerSuccessfulEvent(-e2eLatency, TimeUnit.MILLISECONDS);
            }
            if (deliveryLatency >= 0) {
                deliveryStat.registerSuccessfulEvent(deliveryLatency, TimeUnit.MILLISECONDS);
            } else {
                negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency, TimeUnit.MILLISECONDS);
            }
            prevDLSN = record.getDlsn();
            // FIX: the original never updated prevSequenceId, leaving the
            // "sequence_id" gauge stuck at Long.MIN_VALUE and the
            // out_of_order_seq_id counter dead. Track it here, synchronized to
            // pair with the synchronized getSample() reader.
            long sequenceId = record.getSequenceId();
            synchronized (this) {
                if (sequenceId < prevSequenceId) {
                    outOfOrderSequenceIdCounter.inc();
                    LOG.warn("Received out-of-order sequence id {} (previous = {}) from stream {}",
                            new Object[] { sequenceId, prevSequenceId, streamName });
                }
                prevSequenceId = sequenceId;
            }
        }

        @Override
        public void onFailure(Throwable cause) {
            // Tear down and re-open the reader after a backoff; the read loop
            // resumes once re-initialization completes.
            scheduleReinitStream(streamIdx).map(new Function<Void, Void>() {
                @Override
                public Void apply(Void value) {
                    prevDLSN = null;
                    prevSequenceId = Long.MIN_VALUE;
                    readLoop();
                    return null;
                }
            });
        }

        void readLoop() {
            if (!running) {
                return;
            }
            logReaders[streamIdx].readBulk(10).whenComplete(this);
        }

        /**
         * Truncation task: truncates the stream up to the last DLSN read. Only ever
         * scheduled when {@code truncationIntervalInSeconds > 0}, i.e. when {@code dlc}
         * is non-null.
         */
        @Override
        public void run() {
            final DLSN dlsnToTruncate = prevDLSN;
            if (null == dlsnToTruncate) {
                // Nothing read yet on this stream.
                return;
            }
            final Stopwatch stopwatch = Stopwatch.createStarted();
            dlc.truncate(streamName, dlsnToTruncate).addEventListener(
                    new FutureEventListener<Boolean>() {
                        @Override
                        public void onSuccess(Boolean value) {
                            truncationStat.registerSuccessfulEvent(
                                    stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                        }

                        @Override
                        public void onFailure(Throwable cause) {
                            truncationStat.registerFailedEvent(
                                    stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
                            LOG.error("Failed to truncate stream {} to {} : ",
                                    new Object[]{streamName, dlsnToTruncate, cause});
                        }
                    });
        }

        @Override
        public Number getDefaultValue() {
            return Long.MIN_VALUE;
        }

        @Override
        public synchronized Number getSample() {
            return prevSequenceId;
        }

        void unregisterGauge() {
            statsLogger.scope(streamName).unregisterGauge(gaugeLabel, this);
        }
    }

    /**
     * Creates a reader worker over streams {@code [startStreamId, endStreamId)} named
     * {@code streamPrefix_<id>}.
     *
     * @param conf distributedlog configuration (record-set deserialization is disabled on it)
     * @param uri distributedlog namespace uri
     * @param streamPrefix prefix of the stream names to read
     * @param startStreamId first stream id (inclusive)
     * @param endStreamId last stream id (exclusive); must be &gt;= startStreamId
     * @param readThreadPoolSize size of the scheduled read/init thread pool
     * @param serverSetPaths zookeeper server-set paths for building the truncation client
     * @param finagleNames finagle name strings for building the truncation client
     * @param truncationIntervalInSeconds interval between truncations; non-positive disables truncation
     * @param readFromHead whether to read from the earliest data of each log
     * @param statsReceiver finagle stats receiver for the truncation client
     * @param statsLogger stats logger for benchmark metrics
     * @throws IOException if the namespace cannot be built or initialization is interrupted
     */
    public ReaderWorker(DistributedLogConfiguration conf,
                        URI uri,
                        String streamPrefix,
                        int startStreamId,
                        int endStreamId,
                        int readThreadPoolSize,
                        List<String> serverSetPaths,
                        List<String> finagleNames,
                        int truncationIntervalInSeconds,
                        boolean readFromHead, /* read from the earliest data of log */
                        StatsReceiver statsReceiver,
                        StatsLogger statsLogger) throws IOException {
        checkArgument(startStreamId <= endStreamId);
        this.streamPrefix = streamPrefix;
        this.startStreamId = startStreamId;
        this.endStreamId = endStreamId;
        this.truncationIntervalInSeconds = truncationIntervalInSeconds;
        this.readFromHead = readFromHead;
        this.statsReceiver = statsReceiver;
        this.statsLogger = statsLogger;
        this.e2eStat = this.statsLogger.getOpStatsLogger("e2e");
        this.negativeE2EStat = this.statsLogger.getOpStatsLogger("e2eNegative");
        this.deliveryStat = this.statsLogger.getOpStatsLogger("delivery");
        this.negativeDeliveryStat = this.statsLogger.getOpStatsLogger("deliveryNegative");
        this.truncationStat = this.statsLogger.getOpStatsLogger("truncation");
        this.invalidRecordsCounter = this.statsLogger.getCounter("invalid_records");
        this.outOfOrderSequenceIdCounter = this.statsLogger.getCounter("out_of_order_seq_id");
        this.executorService = Executors.newScheduledThreadPool(
            readThreadPoolSize, new ThreadFactoryBuilder().setNameFormat("benchmark.reader-%d").build());
        this.callbackExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            new ThreadFactoryBuilder().setNameFormat("benchmark.reader-callback-%d").build());
        this.finagleNames = finagleNames;
        this.serverSets = createServerSets(serverSetPaths);

        // Record sets are unpacked by StreamReader.processRecordSet itself.
        conf.setDeserializeRecordSetOnReads(false);

        if (truncationIntervalInSeconds > 0 && (!finagleNames.isEmpty() || !serverSetPaths.isEmpty())) {
            // Construct client for truncation
            DistributedLogClientBuilder builder = DistributedLogClientBuilder.newBuilder()
                    .clientId(ClientId$.MODULE$.apply("dlog_loadtest_reader"))
                    .clientBuilder(ClientBuilder.get()
                        .hostConnectionLimit(10)
                        .hostConnectionCoresize(10)
                        .tcpConnectTimeout(Duration$.MODULE$.fromSeconds(1))
                        .requestTimeout(Duration$.MODULE$.fromSeconds(2)))
                    .redirectBackoffStartMs(100)
                    .redirectBackoffMaxMs(500)
                    .requestTimeoutMs(2000)
                    .statsReceiver(statsReceiver)
                    .thriftmux(true)
                    .name("reader");

            if (serverSetPaths.isEmpty()) {
                // Prepare finagle names: first entry is the local region, the rest are remotes.
                String local = finagleNames.get(0);
                String[] remotes = new String[finagleNames.size() - 1];
                finagleNames.subList(1, finagleNames.size()).toArray(remotes);

                builder = builder.finagleNameStrs(local, remotes);
                LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
            } else if (serverSets.length != 0) {
                // Same convention for server sets: index 0 is local, the rest are remotes.
                ServerSet local = this.serverSets[0].getServerSet();
                ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
                for (int i = 1; i < serverSets.length; i++) {
                    remotes[i - 1] = serverSets[i].getServerSet();
                }
                builder = builder.serverSets(local, remotes);
                LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
            } else {
                builder = builder.uri(uri);
                LOG.info("Initialized distributedlog client for namespace {}", uri);
            }
            dlc = builder.build();
        } else {
            dlc = null;
        }

        // construct the factory
        this.namespace = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(uri)
                .statsLogger(statsLogger.scope("dl"))
                .build();

        this.numStreams = endStreamId - startStreamId;
        this.dlms = new DistributedLogManager[numStreams];
        this.logReaders = new AsyncLogReader[numStreams];
        // Open all stream readers in parallel on the scheduler and wait for completion.
        final CountDownLatch latch = new CountDownLatch(numStreams);
        for (int i = 0; i < numStreams; i++) {
            final int idx = i;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    reinitStream(idx).map(new Function<Void, Void>() {
                        @Override
                        public Void apply(Void value) {
                            LOG.info("Initialized stream reader {}.", idx);
                            latch.countDown();
                            return null;
                        }
                    });
                }
            });
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // FIX: restore the interrupt status before translating the exception,
            // so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DLInterruptedException("Failed to intialize benchmark readers : ", e);
        }
        this.streamReaders = new StreamReader[numStreams];
        for (int i = 0; i < numStreams; i++) {
            streamReaders[i] = new StreamReader(i, statsLogger.scope("perstream"));
            if (truncationIntervalInSeconds > 0) {
                // Schedule the periodic truncation task for this stream.
                executorService.scheduleWithFixedDelay(streamReaders[i],
                        truncationIntervalInSeconds, truncationIntervalInSeconds, TimeUnit.SECONDS);
            }
        }
        LOG.info("Initialized benchmark reader on {} streams {} : [{} - {})",
                new Object[] { numStreams, streamPrefix, startStreamId, endStreamId });
    }

    /**
     * Creates a {@link DLZkServerSet} for each configured server-set path.
     */
    protected DLZkServerSet[] createServerSets(List<String> serverSetPaths) {
        DLZkServerSet[] serverSets = new DLZkServerSet[serverSetPaths.size()];
        for (int i = 0; i < serverSets.length; i++) {
            String serverSetPath = serverSetPaths.get(i);
            serverSets[i] = DLZkServerSet.of(URI.create(serverSetPath), 60000);
        }
        return serverSets;
    }

    /** (Re)initializes the reader for stream {@code idx}, retrying with backoff on failure. */
    private Future<Void> reinitStream(int idx) {
        Promise<Void> promise = new Promise<Void>();
        reinitStream(idx, promise);
        return promise;
    }

    private void reinitStream(int idx, Promise<Void> promise) {
        int streamId = startStreamId + idx;
        String streamName = String.format("%s_%d", streamPrefix, streamId);

        // Close any existing reader / manager for this slot before re-opening.
        if (logReaders[idx] != null) {
            try {
                FutureUtils.result(logReaders[idx].asyncClose());
            } catch (Exception e) {
                LOG.warn("Failed on closing stream reader {} : ", streamName, e);
            }
            logReaders[idx] = null;
        }
        if (dlms[idx] != null) {
            try {
                dlms[idx].close();
            } catch (IOException e) {
                LOG.warn("Failed on closing dlm {} : ", streamName, e);
            }
            dlms[idx] = null;
        }

        try {
            dlms[idx] = namespace.openLog(streamName);
        } catch (IOException ioe) {
            LOG.error("Failed on creating dlm {} : ", streamName, ioe);
            scheduleReinitStream(idx, promise);
            return;
        }
        DLSN lastDLSN;
        if (readFromHead) {
            lastDLSN = DLSN.InitialDLSN;
        } else {
            try {
                lastDLSN = dlms[idx].getLastDLSN();
            } catch (IOException ioe) {
                LOG.error("Failed on getting last dlsn from stream {} : ", streamName, ioe);
                scheduleReinitStream(idx, promise);
                return;
            }
        }
        try {
            logReaders[idx] = dlms[idx].getAsyncLogReader(lastDLSN);
        } catch (IOException ioe) {
            LOG.error("Failed on opening reader for stream {} starting from {} : ",
                    new Object[] { streamName, lastDLSN, ioe });
            scheduleReinitStream(idx, promise);
            return;
        }
        LOG.info("Opened reader for stream {}, starting from {}.", streamName, lastDLSN);
        promise.setValue(null);
    }

    Future<Void> scheduleReinitStream(int idx) {
        Promise<Void> promise = new Promise<Void>();
        scheduleReinitStream(idx, promise);
        return promise;
    }

    void scheduleReinitStream(final int idx, final Promise<Void> promise) {
        // Retry the (re)initialization after a fixed backoff.
        executorService.schedule(new Runnable() {
            @Override
            public void run() {
                reinitStream(idx, promise);
            }
        }, BACKOFF_MS, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() throws IOException {
        // Stop the read loops first so no new reads are issued while closing.
        this.running = false;
        for (AsyncLogReader reader : logReaders) {
            if (null != reader) {
                org.apache.distributedlog.util.Utils.ioResult(reader.asyncClose());
            }
        }
        for (DistributedLogManager dlm : dlms) {
            if (null != dlm) {
                dlm.close();
            }
        }
        namespace.close();
        SchedulerUtils.shutdownScheduler(executorService, 2, TimeUnit.MINUTES);
        SchedulerUtils.shutdownScheduler(callbackExecutor, 2, TimeUnit.MINUTES);
        if (this.dlc != null) {
            this.dlc.close();
        }
        for (DLZkServerSet serverSet: serverSets) {
            serverSet.close();
        }
        // Unregister gauges to prevent GC spirals
        for (StreamReader sr : streamReaders) {
            sr.unregisterGauge();
        }
    }

    @Override
    public void run() {
        LOG.info("Starting reader (prefix = {}, numStreams = {}).",
                streamPrefix, numStreams);
        for (StreamReader sr : streamReaders) {
            sr.readLoop();
        }
    }
}