blob: 7e1464c4da6d8bcb6dfe0b723746dad76cb154e0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.versioning.Version;
import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Utility class for setting up bookkeeper ensembles
* and bringing individual bookies up and down.
*/
public class DLMTestUtil {
protected static final Logger LOG = LoggerFactory.getLogger(DLMTestUtil.class);
private static final byte[] payloadStatic = repeatString("abc", 512).getBytes();
static String repeatString(String s, int n) {
String ret = s;
for (int i = 1; i < n; i++) {
ret += s;
}
return ret;
}
public static Map<Long, LogSegmentMetadata> readLogSegments(ZooKeeperClient zkc, String ledgerPath)
throws Exception {
List<String> children = zkc.get().getChildren(ledgerPath, false);
LOG.info("Children under {} : {}", ledgerPath, children);
Map<Long, LogSegmentMetadata> segments =
new HashMap<Long, LogSegmentMetadata>(children.size());
for (String child : children) {
LogSegmentMetadata segment =
Utils.ioResult(LogSegmentMetadata.read(zkc, ledgerPath + "/" + child));
LOG.info("Read segment {} : {}", child, segment);
segments.put(segment.getLogSegmentSequenceNumber(), segment);
}
return segments;
}
public static URI createDLMURI(int port, String path) throws Exception {
return LocalDLMEmulator.createDLMURI("127.0.0.1:" + port, path);
}
public static DistributedLogManager createNewDLM(String name,
DistributedLogConfiguration conf,
URI uri) throws Exception {
Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
return namespace.openLog(name);
}
static MetadataAccessor createNewMetadataAccessor(DistributedLogConfiguration conf,
String name,
URI uri) throws Exception {
// TODO: Metadata Accessor seems to be a legacy object which only used by kestrel
// (we might consider deprecating this)
Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
return namespace.getNamespaceDriver().getMetadataAccessor(name);
}
public static void fenceStream(DistributedLogConfiguration conf, URI uri, String name) throws Exception {
DistributedLogManager dlm = createNewDLM(name, conf, uri);
try {
List<LogSegmentMetadata> logSegmentList = dlm.getLogSegments();
LogSegmentMetadata lastSegment = logSegmentList.get(logSegmentList.size() - 1);
LogSegmentEntryStore entryStore =
dlm.getNamespaceDriver().getLogSegmentEntryStore(NamespaceDriver.Role.READER);
Utils.close(Utils.ioResult(entryStore.openRandomAccessReader(lastSegment, true)));
} finally {
dlm.close();
}
}
static long getNumberofLogRecords(DistributedLogManager bkdlm, long startTxId) throws IOException {
long numLogRecs = 0;
LogReader reader = bkdlm.getInputStream(startTxId);
LogRecord record = reader.readNext(false);
while (null != record) {
numLogRecs++;
verifyLogRecord(record);
record = reader.readNext(false);
}
reader.close();
return numLogRecs;
}
public static LogRecord getLogRecordInstance(long txId) {
return new LogRecord(txId, generatePayload(txId));
}
public static LogRecord getLogRecordInstance(long txId, int size) {
ByteBuffer buf = ByteBuffer.allocate(size);
return new LogRecord(txId, buf.array());
}
public static void verifyLogRecord(LogRecord record) {
assertEquals(generatePayload(record.getTransactionId()).length, record.getPayload().length);
assertArrayEquals(generatePayload(record.getTransactionId()), record.getPayload());
assertTrue(!record.isControl());
verifyPayload(record.getTransactionId(), record.getPayload());
}
static byte[] generatePayload(long txId) {
return String.format("%d;%d", txId, txId).getBytes();
}
static void verifyPayload(long txId, byte[] payload) {
String[] txIds = new String(payload).split(";");
assertEquals(Long.valueOf(txIds[0]), Long.valueOf(txIds[0]));
}
static LogRecord getLargeLogRecordInstance(long txId, boolean control) {
LogRecord record = new LogRecord(txId, payloadStatic);
if (control) {
record.setControl();
}
return record;
}
static LogRecord getLargeLogRecordInstance(long txId) {
return new LogRecord(txId, payloadStatic);
}
static List<LogRecord> getLargeLogRecordInstanceList(long firstTxId, int count) {
List<LogRecord> logrecs = new ArrayList<LogRecord>(count);
for (long i = 0; i < count; i++) {
logrecs.add(getLargeLogRecordInstance(firstTxId + i));
}
return logrecs;
}
static List<LogRecord> getLogRecordInstanceList(long firstTxId, int count, int size) {
List<LogRecord> logrecs = new ArrayList<LogRecord>(count);
for (long i = 0; i < count; i++) {
logrecs.add(getLogRecordInstance(firstTxId + i, size));
}
return logrecs;
}
static void verifyLargeLogRecord(LogRecord record) {
verifyLargeLogRecord(record.getPayload());
}
static void verifyLargeLogRecord(byte[] payload) {
assertArrayEquals(payloadStatic, payload);
}
static LogRecord getEmptyLogRecordInstance(long txId) {
return new LogRecord(txId, new byte[0]);
}
static void verifyEmptyLogRecord(LogRecord record) {
assertEquals(record.getPayload().length, 0);
}
public static LogRecordWithDLSN getLogRecordWithDLSNInstance(DLSN dlsn, long txId) {
return getLogRecordWithDLSNInstance(dlsn, txId, false);
}
public static LogRecordWithDLSN getLogRecordWithDLSNInstance(DLSN dlsn, long txId, boolean isControlRecord) {
LogRecordWithDLSN record = new LogRecordWithDLSN(dlsn, txId, generatePayload(txId), 1L);
record.setPositionWithinLogSegment((int) txId + 1);
if (isControlRecord) {
record.setControl();
}
return record;
}
public static String inprogressZNodeName(long logSegmentSeqNo) {
return String.format("%s_%018d", DistributedLogConstants.INPROGRESS_LOGSEGMENT_PREFIX, logSegmentSeqNo);
}
public static String completedLedgerZNodeNameWithVersion(long ledgerId,
long firstTxId, long lastTxId, long logSegmentSeqNo) {
return String.format("%s_%018d_%018d_%018d_v%dl%d_%04d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX,
firstTxId, lastTxId, logSegmentSeqNo, DistributedLogConstants.LOGSEGMENT_NAME_VERSION, ledgerId,
DistributedLogConstants.LOCAL_REGION_ID);
}
public static String completedLedgerZNodeNameWithTxID(long firstTxId, long lastTxId) {
return String.format("%s_%018d_%018d",
DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, firstTxId, lastTxId);
}
public static String completedLedgerZNodeNameWithLogSegmentSequenceNumber(long logSegmentSeqNo) {
return String.format("%s_%018d", DistributedLogConstants.COMPLETED_LOGSEGMENT_PREFIX, logSegmentSeqNo);
}
public static LogSegmentMetadata inprogressLogSegment(String ledgerPath,
long ledgerId,
long firstTxId,
long logSegmentSeqNo) {
return inprogressLogSegment(ledgerPath, ledgerId, firstTxId, logSegmentSeqNo,
LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
}
public static LogSegmentMetadata inprogressLogSegment(String ledgerPath,
long ledgerId,
long firstTxId,
long logSegmentSeqNo,
int version) {
return new LogSegmentMetadata.LogSegmentMetadataBuilder(
ledgerPath + "/" + inprogressZNodeName(logSegmentSeqNo),
version,
ledgerId,
firstTxId)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.build();
}
public static LogSegmentMetadata completedLogSegment(String ledgerPath,
long ledgerId,
long firstTxId,
long lastTxId,
int recordCount,
long logSegmentSeqNo,
long lastEntryId,
long lastSlotId) {
return completedLogSegment(ledgerPath, ledgerId, firstTxId, lastTxId,
recordCount, logSegmentSeqNo, lastEntryId, lastSlotId,
LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
}
public static LogSegmentMetadata completedLogSegment(String ledgerPath,
long ledgerId,
long firstTxId,
long lastTxId,
int recordCount,
long logSegmentSeqNo,
long lastEntryId,
long lastSlotId,
int version) {
LogSegmentMetadata metadata =
new LogSegmentMetadata.LogSegmentMetadataBuilder(
ledgerPath + "/" + inprogressZNodeName(logSegmentSeqNo),
version,
ledgerId,
firstTxId)
.setInprogress(false)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.build();
return metadata.completeLogSegment(ledgerPath + "/"
+ completedLedgerZNodeNameWithLogSegmentSequenceNumber(logSegmentSeqNo),
lastTxId, recordCount, lastEntryId, lastSlotId, firstTxId);
}
public static void generateCompletedLogSegments(DistributedLogManager manager, DistributedLogConfiguration conf,
long numCompletedSegments, long segmentSize) throws Exception {
BKDistributedLogManager dlm = (BKDistributedLogManager) manager;
long txid = 1L;
for (long i = 0; i < numCompletedSegments; i++) {
BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned();
for (long j = 1; j <= segmentSize; j++) {
writer.write(DLMTestUtil.getLogRecordInstance(txid++));
}
writer.closeAndComplete();
}
}
public static long generateLogSegmentNonPartitioned(DistributedLogManager dlm,
int controlEntries, int userEntries, long startTxid) throws Exception {
return generateLogSegmentNonPartitioned(dlm, controlEntries, userEntries, startTxid, 1L);
}
public static long generateLogSegmentNonPartitioned(DistributedLogManager dlm, int controlEntries,
int userEntries, long startTxid, long txidStep) throws Exception {
AsyncLogWriter out = dlm.startAsyncLogSegmentNonPartitioned();
long txid = startTxid;
for (int i = 0; i < controlEntries; ++i) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
record.setControl();
Utils.ioResult(out.write(record));
txid += txidStep;
}
for (int i = 0; i < userEntries; ++i) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
Utils.ioResult(out.write(record));
txid += txidStep;
}
Utils.close(out);
return txid - startTxid;
}
public static ZooKeeperClient getZooKeeperClient(BKDistributedLogManager dlm) {
return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getWriterZKC();
}
public static BookKeeperClient getBookKeeperClient(BKDistributedLogManager dlm) {
return ((BKNamespaceDriver) dlm.getNamespaceDriver()).getReaderBKC();
}
public static void injectLogSegmentWithGivenLogSegmentSeqNo(DistributedLogManager manager,
DistributedLogConfiguration conf, long logSegmentSeqNo, long startTxID,
boolean writeEntries, long segmentSize, boolean completeLogSegment)
throws Exception {
BKDistributedLogManager dlm = (BKDistributedLogManager) manager;
BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
Utils.ioResult(writeHandler.lockHandler());
// Start a log segment with a given ledger seq number.
BookKeeperClient bkc = getBookKeeperClient(dlm);
LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(),
conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes());
String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo);
String znodePath = writeHandler.inprogressZNode(lh.getId(), startTxID, logSegmentSeqNo);
int logSegmentMetadataVersion = conf.getDLLedgerMetadataLayoutVersion();
LogSegmentMetadata l =
new LogSegmentMetadata.LogSegmentMetadataBuilder(znodePath,
logSegmentMetadataVersion, lh.getId(), startTxID)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.setEnvelopeEntries(LogSegmentMetadata.supportsEnvelopedEntries(logSegmentMetadataVersion))
.build();
l.write(getZooKeeperClient(dlm));
writeHandler.maxTxId.update(Version.ANY, startTxID);
writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
BKLogSegmentWriter writer = new BKLogSegmentWriter(
writeHandler.getFullyQualifiedName(),
inprogressZnodeName,
conf,
conf.getDLLedgerMetadataLayoutVersion(),
new BKLogSegmentEntryWriter(lh),
writeHandler.lock,
startTxID,
logSegmentSeqNo,
writeHandler.scheduler,
writeHandler.statsLogger,
writeHandler.statsLogger,
writeHandler.alertStatsLogger,
PermitLimiter.NULL_PERMIT_LIMITER,
new SettableFeatureProvider("", 0),
ConfUtils.getConstDynConf(conf));
if (writeEntries) {
long txid = startTxID;
for (long j = 1; j <= segmentSize; j++) {
writer.write(DLMTestUtil.getLogRecordInstance(txid++));
}
Utils.ioResult(writer.flushAndCommit());
}
if (completeLogSegment) {
Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer));
}
Utils.ioResult(writeHandler.unlockHandler());
}
public static void injectLogSegmentWithLastDLSN(DistributedLogManager manager, DistributedLogConfiguration conf,
long logSegmentSeqNo, long startTxID, long segmentSize,
boolean recordWrongLastDLSN) throws Exception {
BKDistributedLogManager dlm = (BKDistributedLogManager) manager;
BKLogWriteHandler writeHandler = dlm.createWriteHandler(false);
Utils.ioResult(writeHandler.lockHandler());
// Start a log segment with a given ledger seq number.
BookKeeperClient bkc = getBookKeeperClient(dlm);
LedgerHandle lh = bkc.get().createLedger(conf.getEnsembleSize(), conf.getWriteQuorumSize(),
conf.getAckQuorumSize(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes());
String inprogressZnodeName = writeHandler.inprogressZNodeName(lh.getId(), startTxID, logSegmentSeqNo);
String znodePath = writeHandler.inprogressZNode(lh.getId(), startTxID, logSegmentSeqNo);
LogSegmentMetadata l =
new LogSegmentMetadata.LogSegmentMetadataBuilder(znodePath,
conf.getDLLedgerMetadataLayoutVersion(), lh.getId(), startTxID)
.setLogSegmentSequenceNo(logSegmentSeqNo)
.setInprogress(false)
.build();
l.write(getZooKeeperClient(dlm));
writeHandler.maxTxId.update(Version.ANY, startTxID);
writeHandler.addLogSegmentToCache(inprogressZnodeName, l);
BKLogSegmentWriter writer = new BKLogSegmentWriter(
writeHandler.getFullyQualifiedName(),
inprogressZnodeName,
conf,
conf.getDLLedgerMetadataLayoutVersion(),
new BKLogSegmentEntryWriter(lh),
writeHandler.lock,
startTxID,
logSegmentSeqNo,
writeHandler.scheduler,
writeHandler.statsLogger,
writeHandler.statsLogger,
writeHandler.alertStatsLogger,
PermitLimiter.NULL_PERMIT_LIMITER,
new SettableFeatureProvider("", 0),
ConfUtils.getConstDynConf(conf));
long txid = startTxID;
DLSN wrongDLSN = null;
for (long j = 1; j <= segmentSize; j++) {
DLSN dlsn = Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(txid++)));
if (j == (segmentSize - 1)) {
wrongDLSN = dlsn;
}
}
assertNotNull(wrongDLSN);
if (recordWrongLastDLSN) {
Utils.ioResult(writer.asyncClose());
writeHandler.completeAndCloseLogSegment(
writeHandler.inprogressZNodeName(writer.getLogSegmentId(),
writer.getStartTxId(), writer.getLogSegmentSequenceNumber()),
writer.getLogSegmentSequenceNumber(),
writer.getLogSegmentId(),
writer.getStartTxId(),
startTxID + segmentSize - 2,
writer.getPositionWithinLogSegment() - 1,
wrongDLSN.getEntryId(),
wrongDLSN.getSlotId());
} else {
Utils.ioResult(writeHandler.completeAndCloseLogSegment(writer));
}
Utils.ioResult(writeHandler.unlockHandler());
}
public static void updateSegmentMetadata(ZooKeeperClient zkc, LogSegmentMetadata segment) throws Exception {
byte[] finalisedData = segment.getFinalisedData().getBytes(UTF_8);
zkc.get().setData(segment.getZkPath(), finalisedData, -1);
}
public static ServerConfiguration loadTestBkConf() {
ServerConfiguration conf = new ServerConfiguration();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
URL confUrl = classLoader.getResource("bk_server.conf");
try {
if (null != confUrl) {
conf.loadConf(confUrl);
LOG.info("loaded bk_server.conf from resources");
}
} catch (org.apache.commons.configuration.ConfigurationException ex) {
LOG.warn("loading conf failed", ex);
}
conf.setAllowLoopback(true);
return conf;
}
public static <T> void validateFutureFailed(CompletableFuture<T> future, Class exClass) {
try {
Utils.ioResult(future);
} catch (Exception ex) {
LOG.info("Expected: {} Actual: {}", exClass.getName(), ex.getClass().getName());
assertTrue("exceptions types equal", exClass.isInstance(ex));
}
}
public static <T> T validateFutureSucceededAndGetResult(CompletableFuture<T> future) throws Exception {
try {
return Utils.ioResult(future, 10, TimeUnit.SECONDS);
} catch (Exception ex) {
fail("unexpected exception " + ex.getClass().getName());
throw ex;
}
}
}