blob: 830e0596c69855426fbfa5062e82902c982dd81a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.distributedlog;
import com.twitter.distributedlog.util.FutureUtils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
 * Tests that read two independent log streams side by side while they are being
 * written.
 *
 * <p>Every test writes to two streams named {@code <name>-0} and {@code <name>-1}.
 * Transaction ids are handed out from a single counter starting at 1, and each
 * iteration writes to stream 1 first and stream 0 second, so stream 1 only ever
 * holds odd transaction ids and stream 0 only even ones. {@link #drainStreams}
 * relies on that parity to check that a reader never sees the other stream's data.
 *
 * <p>All handles opened by a test are released in a {@code finally} block so a
 * failing assertion does not leave readers or log managers open.
 */
public class TestInterleavedReaders extends TestDistributedLogBase {
    static final Logger LOG = LoggerFactory.getLogger(TestInterleavedReaders.class);

    static {
        // Adjusts the configuration object inherited from the base class so that
        // written records are not held back in an output buffer.
        // NOTE(review): conf appears to be shared state of TestDistributedLogBase;
        // confirm that mutating it here cannot affect other test classes.
        conf.setOutputBufferSize(0);
        conf.setImmediateFlushEnabled(true);
    }

    /**
     * Reads {@code num0} records from {@code reader0} and then {@code num1} records
     * from {@code reader1}, verifying each record.
     *
     * <p>Records from {@code reader0} must carry even transaction ids and records
     * from {@code reader1} odd ones. The readers are polled without blocking until
     * the requested number of records has been seen; if the records never arrive
     * the loop only ends through the timeout of the enclosing test.
     *
     * @param reader0 reader over the even-txid stream
     * @param num0 number of records to read from {@code reader0}
     * @param reader1 reader over the odd-txid stream
     * @param num1 number of records to read from {@code reader1}
     * @return total number of records read, i.e. {@code num0 + num1}
     * @throws Exception if reading or record verification fails
     */
    private int drainStreams(LogReader reader0, int num0, LogReader reader1, int num1)
            throws Exception {
        // Allow time for watches to fire
        Thread.sleep(15);
        // Operands are evaluated left to right, so reader0 is fully drained first.
        return drainStream(reader0, num0, 0) + drainStream(reader1, num1, 1);
    }

    /**
     * Polls {@code reader} until {@code expected} records have been read, checking
     * the transaction-id parity and the payload of each one.
     *
     * @param reader reader to poll
     * @param expected number of records to read before returning
     * @param parity required value of {@code txid % 2} for every record
     * @return the number of records read (always {@code expected} on normal return)
     * @throws Exception if reading or record verification fails
     */
    private static int drainStream(LogReader reader, int expected, int parity) throws Exception {
        int read = 0;
        while (read < expected) {
            LogRecord record = reader.readNext(false);
            if (null == record) {
                // Nothing available yet; poll again.
                continue;
            }
            long txid = record.getTransactionId();
            assertTrue("Expected txid % 2 == " + parity + " but read txid " + txid,
                    txid % 2 == parity);
            DLMTestUtil.verifyLogRecord(record);
            read++;
            LOG.info("Read record {}", record);
        }
        return read;
    }

    /**
     * Writes a control record carrying {@code txid} to {@code writer1} and then to
     * {@code writer0}, waiting for each write to complete.
     *
     * @param writer0 writer of the even-txid stream
     * @param writer1 writer of the odd-txid stream
     * @param txid transaction id to put into both control records
     * @throws Exception if either write fails
     */
    private static void writeControlRecords(BKAsyncLogWriter writer0,
                                            BKAsyncLogWriter writer1,
                                            long txid) throws Exception {
        FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid)));
        FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid)));
    }

    /**
     * Closes the given readers and then the given managers, in argument order.
     *
     * <p>{@code null} entries are skipped so this can be called from a
     * {@code finally} block regardless of how far the test got. Every handle is
     * attempted even if an earlier close fails; the first failure is rethrown once
     * all handles have been attempted, later failures are only logged.
     *
     * @param reader0 first reader to close, may be {@code null}
     * @param reader1 second reader to close, may be {@code null}
     * @param managers log managers to close, entries may be {@code null}
     * @throws Exception the first exception raised by any {@code close()} call
     */
    private static void closeAll(LogReader reader0, LogReader reader1,
                                 DistributedLogManager... managers) throws Exception {
        Exception firstFailure = null;
        LogReader[] readers = {reader0, reader1};
        for (LogReader reader : readers) {
            if (null == reader) {
                continue;
            }
            try {
                reader.close();
            } catch (Exception e) {
                LOG.warn("Failed to close log reader", e);
                if (null == firstFailure) {
                    firstFailure = e;
                }
            }
        }
        for (DistributedLogManager manager : managers) {
            if (null == manager) {
                continue;
            }
            try {
                manager.close();
            } catch (Exception e) {
                LOG.warn("Failed to close log manager", e);
                if (null == firstFailure) {
                    firstFailure = e;
                }
            }
        }
        if (null != firstFailure) {
            throw firstFailure;
        }
    }

    /**
     * Writes four rounds of ten records to each stream and, after every round,
     * reads the ten new records back from each stream through readers that stay
     * open across rounds.
     */
    @Test(timeout = 60000)
    public void testInterleavedReaders() throws Exception {
        String name = "distrlog-interleaved";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmreader0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        BKDistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            dlmreader1 = createNewDLM(conf, name + "-1");

            long txid = 1;
            int numTrans = 0;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 4; j++) {
                for (int k = 1; k <= 10; k++) {
                    FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                }
                writeControlRecords(writer0, writer1, txid - 1);
                // Readers are opened lazily, after the first round has been written.
                if (null == reader0) {
                    reader0 = dlmreader0.getInputStream(1);
                }
                if (null == reader1) {
                    reader1 = dlmreader1.getInputStream(1);
                }
                numTrans += drainStreams(reader0, 10, reader1, 10);
                assertEquals((txid - 1), numTrans);
            }
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }

    /**
     * Writes four rounds of two records per stream, requesting a forced roll on
     * both writers at the start of every round after the first, and reads each
     * round back before writing the next.
     */
    @Test(timeout = 60000)
    public void testInterleavedReadersWithRollingEdge() throws Exception {
        String name = "distrlog-interleaved-rolling-edge";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmreader0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        BKDistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            dlmreader1 = createNewDLM(conf, name + "-1");

            long txid = 1;
            int numTrans = 0;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 4; j++) {
                if (j > 1) {
                    writer0.setForceRolling(true);
                    writer1.setForceRolling(true);
                }
                for (int k = 1; k <= 2; k++) {
                    FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    // The flag is cleared after the first pair so that only the first
                    // write of the round is issued with it set.
                    writer0.setForceRolling(false);
                    writer1.setForceRolling(false);
                }
                writeControlRecords(writer0, writer1, txid - 1);
                LOG.info("Completed {} write", j);
                if (null == reader0) {
                    reader0 = dlmreader0.getInputStream(1);
                }
                if (null == reader1) {
                    reader1 = dlmreader1.getInputStream(1);
                }
                numTrans += drainStreams(reader0, 2, reader1, 2);
                assertEquals((txid - 1), numTrans);
            }
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }

    /**
     * Writes two rounds of six records per stream, requesting a forced roll on
     * both writers for the third pair of every round, and reads each round back
     * before writing the next.
     */
    @Test(timeout = 60000)
    public void testInterleavedReadersWithRolling() throws Exception {
        String name = "distrlog-interleaved-rolling";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmreader0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        BKDistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            dlmreader1 = createNewDLM(conf, name + "-1");

            long txid = 1;
            int numTrans = 0;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 2; j++) {
                for (int k = 1; k <= 6; k++) {
                    if (k == 3) {
                        writer0.setForceRolling(true);
                        writer1.setForceRolling(true);
                    }
                    FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    writer0.setForceRolling(false);
                    writer1.setForceRolling(false);
                }
                writeControlRecords(writer0, writer1, txid - 1);
                if (null == reader0) {
                    reader0 = dlmreader0.getInputStream(1);
                }
                if (null == reader1) {
                    reader1 = dlmreader1.getInputStream(1);
                }
                numTrans += drainStreams(reader0, 6, reader1, 6);
                assertEquals((txid - 1), numTrans);
            }
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }

    /**
     * Writes four rounds of ten records per stream with a forced roll and a
     * minimum-timestamp-to-keep override on the fifth pair of every round, closes
     * the writers, and then checks that fifteen records can be read from each
     * stream starting at transaction id 1.
     *
     * <p>The override passed on the first round is {@code null}; afterwards it is
     * the wall-clock time captured right after the previous round's fifth pair.
     */
    @Test(timeout = 60000)
    public void testInterleavedReadersWithCleanup() throws Exception {
        String name = "distrlog-interleaved-cleanup";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        DistributedLogManager dlmreader0 = null;
        DistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            long txid = 1;
            Long retentionPeriodOverride = null;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 4; j++) {
                for (int k = 1; k <= 10; k++) {
                    if (k == 5) {
                        writer0.setForceRolling(true);
                        writer0.overRideMinTimeStampToKeep(retentionPeriodOverride);
                        writer1.setForceRolling(true);
                        writer1.overRideMinTimeStampToKeep(retentionPeriodOverride);
                    }
                    DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    LOG.info("writer1 write record {}", dlsn1);
                    DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    LOG.info("writer0 write record {}", dlsn0);
                    if (k == 5) {
                        writer0.setForceRolling(false);
                        writer1.setForceRolling(false);
                        retentionPeriodOverride = System.currentTimeMillis();
                    }
                    // Space the writes out in wall-clock time.
                    Thread.sleep(5);
                }
                writeControlRecords(writer0, writer1, txid - 1);
            }
            writer0.close();
            writer1.close();

            // Readers are created only after all writing has finished.
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmreader1 = createNewDLM(conf, name + "-1");
            reader0 = dlmreader0.getInputStream(1);
            reader1 = dlmreader1.getInputStream(1);
            int numTrans = drainStreams(reader0, 15, reader1, 15);
            assertEquals(30, numTrans);
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }

    /**
     * Writes two rounds of six records per stream, requesting forced recovery on
     * both writers for the third pair of every round, reads each round back, and
     * finally checks that the record counts reported by the two reader-side
     * managers add up to the number of records written.
     */
    @Test(timeout = 60000)
    public void testInterleavedReadersWithRecovery() throws Exception {
        String name = "distrlog-interleaved-recovery";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmreader0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        BKDistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            dlmreader1 = createNewDLM(conf, name + "-1");

            long txid = 1;
            int numTrans = 0;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 2; j++) {
                for (int k = 1; k <= 6; k++) {
                    if (k == 3) {
                        writer0.setForceRecovery(true);
                        writer1.setForceRecovery(true);
                    }
                    DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    LOG.info("writer1 write record {} - txid = {}", dlsn1, txid-1);
                    DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    LOG.info("writer0 write record {} - txid = {}", dlsn0, txid-1);
                    writer0.setForceRecovery(false);
                    writer1.setForceRecovery(false);
                }
                writeControlRecords(writer0, writer1, txid - 1);
                if (null == reader0) {
                    reader0 = dlmreader0.getInputStream(1);
                }
                if (null == reader1) {
                    reader1 = dlmreader1.getInputStream(1);
                }
                numTrans += drainStreams(reader0, 6, reader1, 6);
                assertEquals((txid - 1), numTrans);
            }

            // The readers are closed before the counts are queried; the references
            // are cleared so the finally block does not close them a second time.
            reader0.close();
            reader0 = null;
            reader1.close();
            reader1 = null;
            assertEquals(txid - 1,
                    dlmreader0.getLogRecordCount() + dlmreader1.getLogRecordCount());
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }

    /**
     * Same write/read pattern as {@link #testInterleavedReadersWithRollingEdge()},
     * run against its own pair of streams.
     *
     * <p>The writer-side managers are closed together with the reader-side ones so
     * that this test does not leave them open.
     */
    @Test(timeout = 60000)
    public void testInterleavedReadersWithRollingEdgeUnPartitioned() throws Exception {
        String name = "distrlog-interleaved-rolling-edge-unpartitioned";
        BKDistributedLogManager dlmwrite0 = null;
        BKDistributedLogManager dlmreader0 = null;
        BKDistributedLogManager dlmwrite1 = null;
        BKDistributedLogManager dlmreader1 = null;
        LogReader reader0 = null;
        LogReader reader1 = null;
        try {
            dlmwrite0 = createNewDLM(conf, name + "-0");
            dlmreader0 = createNewDLM(conf, name + "-0");
            dlmwrite1 = createNewDLM(conf, name + "-1");
            dlmreader1 = createNewDLM(conf, name + "-1");

            long txid = 1;
            int numTrans = 0;

            BKAsyncLogWriter writer0 = dlmwrite0.startAsyncLogSegmentNonPartitioned();
            BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
            for (long j = 1; j <= 4; j++) {
                if (j > 1) {
                    writer0.setForceRolling(true);
                    writer1.setForceRolling(true);
                }
                for (int k = 1; k <= 2; k++) {
                    FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
                    writer0.setForceRolling(false);
                    writer1.setForceRolling(false);
                }
                writeControlRecords(writer0, writer1, txid - 1);
                if (null == reader0) {
                    reader0 = dlmreader0.getInputStream(1);
                }
                if (null == reader1) {
                    reader1 = dlmreader1.getInputStream(1);
                }
                numTrans += drainStreams(reader0, 2, reader1, 2);
                assertEquals((txid - 1), numTrans);
            }
        } finally {
            closeAll(reader0, reader1, dlmreader0, dlmwrite0, dlmreader1, dlmwrite1);
        }
    }
}