blob: 1d77b8d4c96b1e8214ed63164adf58d854ad5c2c [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bookkeeper.tools.perf.dlog;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.common.net.ServiceURI;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
import org.apache.distributedlog.namespace.NamespaceDriver.Role;
/**
* A perf writer to evaluate write performance.
*/
@Slf4j
public class PerfSegmentReader extends PerfReaderBase {
@Data
static class Split {
final DistributedLogManager manager;
final LogSegmentMetadata segment;
final long startEntryId;
final long endEntryId;
}
PerfSegmentReader(ServiceURI serviceURI, Flags flags) {
super(serviceURI, flags);
}
@Override
protected void execute(Namespace namespace) throws Exception {
List<DistributedLogManager> managers = new ArrayList<>(flags.numLogs);
for (int i = 0; i < flags.numLogs; i++) {
String logName = String.format(flags.logName, i);
managers.add(namespace.openLog(logName));
}
log.info("Successfully open {} logs", managers.size());
// Get all the log segments
final List<Pair<DistributedLogManager, LogSegmentMetadata>> segments = managers.stream()
.flatMap(manager -> {
try {
return manager.getLogSegments().stream().map(segment -> Pair.of(manager, segment));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
})
.collect(Collectors.toList());
final List<Split> splits = segments.stream()
.flatMap(entry -> getNumSplits(entry.getLeft(), entry.getRight()).stream())
.collect(Collectors.toList());
// register shutdown hook to aggregate stats
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
isDone.set(true);
printAggregatedStats(cumulativeRecorder);
}));
ExecutorService executor = Executors.newFixedThreadPool(flags.numThreads);
try {
for (int i = 0; i < flags.numThreads; i++) {
final int idx = i;
final List<Split> splitsThisThread = splits
.stream()
.filter(split -> splits.indexOf(split) % flags.numThreads == idx)
.collect(Collectors.toList());
executor.submit(() -> {
try {
read(splitsThisThread);
} catch (Exception e) {
log.error("Encountered error at writing records", e);
}
});
}
log.info("Started {} write threads", flags.numThreads);
reportStats();
} finally {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
managers.forEach(manager -> manager.asyncClose());
}
}
void read(List<Split> splits) throws Exception {
log.info("Read thread started with : splits = {}",
splits.stream()
.map(l -> "(log = " + l.manager.getStreamName() + ", segment = "
+ l.segment.getLogSegmentSequenceNumber() + " [" + l.startEntryId + ", " + l.endEntryId + "])")
.collect(Collectors.toList()));
splits.forEach(entry -> {
try {
readSegmentSplit(entry);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
void readSegmentSplit(Split split) throws Exception {
LogSegmentEntryReader reader = result(split.manager.getNamespaceDriver().getLogSegmentEntryStore(Role.READER)
.openReader(split.segment, split.getStartEntryId()));
reader.start();
try {
MutableBoolean isDone = new MutableBoolean(false);
while (!isDone.booleanValue()) {
// 100 is just an indicator
List<Entry.Reader> entries = result(reader.readNext(100));
entries.forEach(entry -> {
LogRecordWithDLSN record;
try {
while ((record = entry.nextRecord()) != null) {
recordsRead.increment();
bytesRead.add(record.getPayloadBuf().readableBytes());
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
} finally {
entry.release();
}
if (split.getEndEntryId() >= 0 && entry.getEntryId() >= split.getEndEntryId()) {
isDone.setValue(true);
}
});
}
} catch (EndOfLogSegmentException e) {
// we reached end of log segment
return;
} finally {
reader.asyncClose();
}
}
List<Split> getNumSplits(DistributedLogManager manager, LogSegmentMetadata segment) {
if (flags.numSplitsPerSegment <= 1) {
// do split
return Lists.newArrayList(
new Split(
manager,
segment,
0L,
-1L)
);
} else {
long lastEntryId = segment.getLastEntryId();
long numEntriesPerSplit = (lastEntryId + 1) / 2;
long nextEntryId = 0L;
List<Split> splitsInSegment = new ArrayList<>(flags.numSplitsPerSegment);
for (int i = 0; i < flags.numSplitsPerSegment; i++) {
long startEntryId = nextEntryId;
long endEntryId;
if (i == flags.numSplitsPerSegment - 1) {
endEntryId = lastEntryId;
} else {
endEntryId = nextEntryId + numEntriesPerSplit - 1;
}
splitsInSegment.add(new Split(
manager,
segment,
startEntryId,
endEntryId
));
nextEntryId = endEntryId + 1;
}
return splitsInSegment;
}
}
}