blob: 5c9b2a9cf55df56fd439d45886bc0d6bd3f7ceae [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.benchmark;
import static com.google.common.base.Preconditions.checkArgument;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.SchedulerUtils;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The benchmark for core library writer.
*/
public class DLWriterWorker implements Worker {

    private static final Logger LOG = LoggerFactory.getLogger(DLWriterWorker.class);

    // Delay before retrying writer recovery after a failed rescue attempt.
    static final int BACKOFF_MS = 200;

    final String streamPrefix;
    final int startStreamId;
    final int endStreamId;
    final int writeConcurrency;
    final int messageSizeBytes;
    final ExecutorService executorService;
    final ScheduledExecutorService rescueService;
    final ShiftableRateLimiter rateLimiter;
    final Random random;
    final Namespace namespace;
    final List<DistributedLogManager> dlms;
    // Indexed by (streamId - startStreamId); entries are swapped in place on rescue.
    final List<AsyncLogWriter> streamWriters;
    final int numStreams;

    volatile boolean running = true;

    final StatsLogger statsLogger;
    final OpStatsLogger requestStat;

    /**
     * Create a writer worker that writes to streams {@code streamPrefix_[startStreamId, endStreamId)}.
     *
     * <p>Opens one {@link DistributedLogManager} and one {@link AsyncLogWriter} per stream
     * (concurrently) and blocks until every writer is ready.
     *
     * @param conf distributedlog configuration
     * @param uri namespace uri
     * @param streamPrefix prefix of the stream names to write to
     * @param startStreamId first stream id (inclusive)
     * @param endStreamId last stream id (exclusive)
     * @param rateLimiter rate limiter shared by all writer tasks
     * @param writeConcurrency number of concurrent writer tasks
     * @param messageSizeBytes size of each generated message payload
     * @param statsLogger stats logger for exposing write latency stats
     * @throws IOException if interrupted, or if any stream writer could not be initialized
     */
    public DLWriterWorker(DistributedLogConfiguration conf,
                          URI uri,
                          String streamPrefix,
                          int startStreamId,
                          int endStreamId,
                          ShiftableRateLimiter rateLimiter,
                          int writeConcurrency,
                          int messageSizeBytes,
                          StatsLogger statsLogger) throws IOException {
        checkArgument(startStreamId <= endStreamId);
        this.streamPrefix = streamPrefix;
        this.startStreamId = startStreamId;
        this.endStreamId = endStreamId;
        this.rateLimiter = rateLimiter;
        this.writeConcurrency = writeConcurrency;
        this.messageSizeBytes = messageSizeBytes;
        this.statsLogger = statsLogger;
        this.requestStat = this.statsLogger.getOpStatsLogger("requests");
        this.executorService = Executors.newCachedThreadPool();
        this.rescueService = Executors.newSingleThreadScheduledExecutor();
        this.random = new Random(System.currentTimeMillis());

        this.namespace = NamespaceBuilder.newBuilder()
                .conf(conf)
                .uri(uri)
                .statsLogger(statsLogger.scope("dl"))
                .build();
        this.numStreams = endStreamId - startStreamId;
        dlms = new ArrayList<DistributedLogManager>(numStreams);
        streamWriters = new ArrayList<AsyncLogWriter>(numStreams);
        final ConcurrentMap<String, AsyncLogWriter> writers = new ConcurrentHashMap<String, AsyncLogWriter>();
        final CountDownLatch latch = new CountDownLatch(this.numStreams);
        for (int i = startStreamId; i < endStreamId; i++) {
            final String streamName = String.format("%s_%d", streamPrefix, i);
            final DistributedLogManager dlm = namespace.openLog(streamName);
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
                        if (null != writers.putIfAbsent(streamName, writer)) {
                            // Another task already registered a writer for this stream;
                            // close the duplicate to avoid leaking a log segment writer.
                            FutureUtils.result(writer.asyncClose());
                        }
                    } catch (Exception e) {
                        LOG.error("Failed to initialize writer for stream : {}", streamName, e);
                    } finally {
                        // Always count down, even on failure, so the constructor below
                        // cannot block forever on latch.await(). A missing writer is
                        // detected afterwards and surfaced as an IOException.
                        latch.countDown();
                    }
                }
            });
            dlms.add(dlm);
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt status before translating to IOException.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted on initializing writers for streams.", e);
        }
        for (int i = startStreamId; i < endStreamId; i++) {
            final String streamName = String.format("%s_%d", streamPrefix, i);
            AsyncLogWriter writer = writers.get(streamName);
            if (null == writer) {
                throw new IOException("Writer for " + streamName + " never initialized.");
            }
            streamWriters.add(writer);
        }
        LOG.info("Writing to {} streams.", numStreams);
    }

    /**
     * Replace the writer at {@code idx} with a freshly opened one.
     *
     * <p>No-op if the writer at {@code idx} is no longer {@code writer} (i.e. another
     * rescue already replaced it). If opening the replacement fails, a retry is
     * scheduled after {@link #BACKOFF_MS} and the slot is left untouched so the
     * retry's identity check still matches.
     *
     * @param idx index of the stream writer to rescue
     * @param writer the writer instance believed to be broken
     */
    void rescueWriter(int idx, AsyncLogWriter writer) {
        if (streamWriters.get(idx) == writer) {
            try {
                FutureUtils.result(writer.asyncClose());
            } catch (Exception e) {
                LOG.error("Failed to close writer for stream {}.", idx, e);
            }
            AsyncLogWriter newWriter;
            try {
                newWriter = dlms.get(idx).startAsyncLogSegmentNonPartitioned();
            } catch (IOException e) {
                LOG.error("Failed to create new writer for stream {}, backoff for {} ms.",
                        idx, BACKOFF_MS, e);
                scheduleRescue(idx, writer, BACKOFF_MS);
                // Don't install a null writer; leave the old entry in place so the
                // scheduled retry (which compares against `writer`) can take over.
                return;
            }
            streamWriters.set(idx, newWriter);
        } else {
            LOG.warn("AsyncLogWriter for stream {} was already rescued.", idx);
        }
    }

    /**
     * Schedule {@link #rescueWriter(int, AsyncLogWriter)} on the rescue executor.
     *
     * @param idx index of the stream writer to rescue
     * @param writer the writer instance believed to be broken
     * @param delayMs delay before running the rescue; {@code <= 0} runs it immediately
     */
    void scheduleRescue(final int idx, final AsyncLogWriter writer, int delayMs) {
        Runnable r = new Runnable() {
            @Override
            public void run() {
                rescueWriter(idx, writer);
            }
        };
        if (delayMs > 0) {
            rescueService.schedule(r, delayMs, TimeUnit.MILLISECONDS);
        } else {
            rescueService.submit(r);
        }
    }

    /**
     * Stop the writer tasks, shut down the executors and close all writers,
     * log managers and the namespace.
     *
     * @throws IOException if closing any writer or log manager fails
     */
    @Override
    public void close() throws IOException {
        this.running = false;
        SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
        SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
        for (AsyncLogWriter writer : streamWriters) {
            org.apache.distributedlog.util.Utils.ioResult(writer.asyncClose());
        }
        for (DistributedLogManager dlm : dlms) {
            dlm.close();
        }
        namespace.close();
    }

    /**
     * Start {@link #writeConcurrency} writer tasks, each writing to randomly
     * selected streams until {@link #close()} is called.
     */
    @Override
    public void run() {
        LOG.info("Starting dlwriter (concurrency = {}, prefix = {}, numStreams = {})",
            new Object[] { writeConcurrency, streamPrefix, numStreams });
        for (int i = 0; i < writeConcurrency; i++) {
            executorService.submit(new Writer(i));
        }
    }

    /**
     * A single writer task: picks a random stream, rate-limits, writes one
     * timestamped record, and records success/failure latency. On write failure
     * it triggers a rescue of the affected stream writer.
     */
    class Writer implements Runnable {

        final int idx;

        Writer(int idx) {
            this.idx = idx;
        }

        @Override
        public void run() {
            LOG.info("Started writer {}.", idx);
            while (running) {
                final int streamIdx = random.nextInt(numStreams);
                final AsyncLogWriter writer = streamWriters.get(streamIdx);
                rateLimiter.getLimiter().acquire();
                final long requestMillis = System.currentTimeMillis();
                final byte[] data;
                try {
                    data = Utils.generateMessage(requestMillis, messageSizeBytes);
                } catch (TException e) {
                    LOG.error("Error on generating message : ", e);
                    break;
                }
                writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
                    @Override
                    public void onSuccess(DLSN value) {
                        requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
                    }

                    @Override
                    public void onFailure(Throwable cause) {
                        requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis);
                        LOG.error("Failed to publish, rescue it : ", cause);
                        scheduleRescue(streamIdx, writer, 0);
                    }
                });
            }
        }
    }
}