| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package com.twitter.distributedlog; |
| |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException; |
| import com.twitter.distributedlog.exceptions.BKTransmitException; |
| import com.twitter.distributedlog.exceptions.LogEmptyException; |
| import com.twitter.distributedlog.exceptions.LogNotFoundException; |
| import com.twitter.distributedlog.exceptions.LogReadException; |
| import com.twitter.distributedlog.impl.ZKLogSegmentMetadataStore; |
| import com.twitter.distributedlog.io.Abortables; |
| import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore; |
| import com.twitter.distributedlog.util.FutureUtils; |
| import com.twitter.distributedlog.util.OrderedScheduler; |
| import com.twitter.distributedlog.util.Utils; |
| import org.apache.bookkeeper.client.BKException; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import com.twitter.distributedlog.callback.LogSegmentListener; |
| import com.twitter.distributedlog.exceptions.EndOfStreamException; |
| import com.twitter.distributedlog.exceptions.InvalidStreamNameException; |
| import com.twitter.distributedlog.exceptions.LogRecordTooLongException; |
| import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException; |
| import com.twitter.distributedlog.metadata.LogMetadata; |
| import com.twitter.distributedlog.metadata.MetadataUpdater; |
| import com.twitter.distributedlog.metadata.LogSegmentMetadataStoreUpdater; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespace; |
| import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder; |
| import com.twitter.distributedlog.subscription.SubscriptionsStore; |
| import com.twitter.util.Await; |
| import com.twitter.util.Duration; |
| import com.twitter.util.Future; |
| |
| import static org.junit.Assert.*; |
| import static org.junit.Assert.assertEquals; |
| |
| public class TestBKDistributedLogManager extends TestDistributedLogBase { |
| static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogManager.class); |
| |
| private static final Random RAND = new Random(System.currentTimeMillis()); |
| |
| @Rule |
| public TestName testNames = new TestName(); |
| |
| private static final long DEFAULT_SEGMENT_SIZE = 1000; |
| |
| private void testNonPartitionedWritesInternal(String name, DistributedLogConfiguration conf) throws Exception { |
| BKDistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter(); |
| writer.closeAndComplete(); |
| BKLogWriteHandler blplm = dlm.createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| LogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| writer.setReadyToFlush(); |
| writer.flushAndSync(); |
| writer.close(); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| LogRecord record = reader.readNext(false); |
| long lastTxId = -1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| reader.close(); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleWrite() throws Exception { |
| BKDistributedLogManager dlm = createNewDLM(conf, "distrlog-simplewrite"); |
| BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= 100; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| |
| BKLogWriteHandler blplm = dlm.createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(1, 100, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNumberOfTransactions() throws Exception { |
| String name = "distrlog-txncount"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= 100; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| long numTrans = DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1); |
| assertEquals(100, numTrans); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testContinuousReaders() throws Exception { |
| String name = "distrlog-continuous"; |
| BKDistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| BKLogWriteHandler blplm = dlm.createWriteHandler(true); |
| |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.setReadyToFlush(); |
| out.flushAndSync(); |
| out.close(); |
| dlm.close(); |
| |
| dlm = createNewDLM(conf, name); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| LogRecord record = reader.readNext(false); |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| assertEquals((txid - 1), numTrans); |
| assertEquals(txid - 1, dlm.getLogRecordCount()); |
| reader.close(); |
| dlm.close(); |
| } |
| |
| /** |
| * Create a bkdlm namespace, write a journal from txid 1, close stream. |
| * Try to create a new journal from txid 1. Should throw an exception. |
| */ |
| @Test(timeout = 60000) |
| public void testWriteRestartFrom1() throws Exception { |
| DistributedLogManager dlm = createNewDLM(conf, "distrlog-restartFrom1"); |
| long txid = 1; |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| txid = 1; |
| try { |
| out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| fail("Shouldn't be able to start another journal from " + txid |
| + " when one already exists"); |
| } catch (Exception ioe) { |
| LOG.info("Caught exception as expected", ioe); |
| } finally { |
| out.close(); |
| } |
| |
| // test border case |
| txid = DEFAULT_SEGMENT_SIZE - 1; |
| try { |
| out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| fail("Shouldn't be able to start another journal from " + txid |
| + " when one already exists"); |
| } catch (TransactionIdOutOfOrderException rste) { |
| LOG.info("Caught exception as expected", rste); |
| } finally { |
| out.close(); |
| } |
| |
| // open journal continuing from before |
| txid = DEFAULT_SEGMENT_SIZE + 1; |
| out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| assertNotNull(out); |
| |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| // open journal arbitarily far in the future |
| txid = DEFAULT_SEGMENT_SIZE * 4; |
| out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| out.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testTwoWritersOnLockDisabled() throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(conf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setWriteLockEnabled(false); |
| String name = "distrlog-two-writers-lock-disabled"; |
| DistributedLogManager manager = createNewDLM(confLocal, name); |
| AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter()); |
| FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L))); |
| AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter()); |
| FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L))); |
| |
| // write a record to writer 1 again |
| try { |
| FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L))); |
| fail("Should fail writing record to writer 1 again as writer 2 took over the ownership"); |
| } catch (BKTransmitException bkte) { |
| assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode()); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleRead() throws Exception { |
| String name = "distrlog-simpleread"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| final long numTransactions = 10000; |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= numTransactions; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| assertEquals(numTransactions, DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1)); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { |
| String name = "distrlog-inprogressAtEnd"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.setReadyToFlush(); |
| out.flushAndSync(); |
| out.close(); |
| |
| long numTrans = DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
| @Test(timeout = 60000) |
| public void testContinuousReaderBulk() throws Exception { |
| String name = "distrlog-continuous-bulk"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| } |
| |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.setReadyToFlush(); |
| out.flushAndSync(); |
| out.close(); |
| dlm.close(); |
| |
| dlm = createNewDLM(conf, name); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| List<LogRecordWithDLSN> recordList = reader.readBulk(false, 13); |
| long lastTxId = -1; |
| while (!recordList.isEmpty()) { |
| for (LogRecord record : recordList) { |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| DLMTestUtil.verifyLogRecord(record); |
| numTrans++; |
| } |
| recordList = reader.readBulk(false, 13); |
| } |
| reader.close(); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
| @Test(timeout = 60000) |
| public void testContinuousReadersWithEmptyLedgers() throws Exception { |
| String name = "distrlog-continuous-emptyledgers"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| BKLogSegmentWriter writer = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| writer.getLogSegmentSequenceNumber()), false)); |
| BKLogSegmentWriter perStreamLogWriter = blplm.startLogSegment(txid - 1); |
| blplm.completeAndCloseLogSegment(perStreamLogWriter.getLogSegmentSequenceNumber(), |
| perStreamLogWriter.getLogSegmentId(), txid - 1, txid - 1, 0); |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.setReadyToFlush(); |
| out.flushAndSync(); |
| out.close(); |
| dlm.close(); |
| |
| dlm = createNewDLM(conf, name); |
| |
| AsyncLogReader asyncreader = dlm.getAsyncLogReader(DLSN.InvalidDLSN); |
| long numTrans = 0; |
| LogRecordWithDLSN record = Await.result(asyncreader.readNext()); |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| numTrans++; |
| if (numTrans >= (txid - 1)) { |
| break; |
| } |
| record = Await.result(asyncreader.readNext()); |
| } |
| assertEquals((txid - 1), numTrans); |
| Utils.close(asyncreader); |
| |
| LogReader reader = dlm.getInputStream(1); |
| numTrans = 0; |
| record = reader.readNext(false); |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| assertEquals((txid - 1), numTrans); |
| reader.close(); |
| assertEquals(txid - 1, dlm.getLogRecordCount()); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNonPartitionedWrites() throws Exception { |
| String name = "distrlog-non-partitioned-bulk"; |
| testNonPartitionedWritesInternal(name, conf); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCheckLogExists() throws Exception { |
| String name = "distrlog-check-log-exists"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| LogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| writer.setReadyToFlush(); |
| writer.flushAndSync(); |
| writer.close(); |
| dlm.close(); |
| |
| URI uri = createDLMURI("/" + name); |
| DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf).uri(uri).build(); |
| assertTrue(namespace.logExists(name)); |
| assertFalse(namespace.logExists("non-existent-log")); |
| URI nonExistentUri = createDLMURI("/" + "non-existent-ns"); |
| DistributedLogNamespace nonExistentNS = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf).uri(nonExistentUri).build(); |
| assertFalse(nonExistentNS.logExists(name)); |
| |
| int logCount = 0; |
| Iterator<String> logIter = namespace.getLogs(); |
| while(logIter.hasNext()) { |
| String log = logIter.next(); |
| logCount++; |
| assertEquals(name, log); |
| } |
| assertEquals(1, logCount); |
| |
| namespace.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMetadataAccessor() throws Exception { |
| String name = "distrlog-metadata-accessor"; |
| MetadataAccessor metadata = DLMTestUtil.createNewMetadataAccessor(conf, name, createDLMURI("/" + name)); |
| assertEquals(name, metadata.getStreamName()); |
| metadata.createOrUpdateMetadata(name.getBytes()); |
| assertEquals(name, new String(metadata.getMetadata())); |
| metadata.deleteMetadata(); |
| assertEquals(null, metadata.getMetadata()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSubscriptionsStore() throws Exception { |
| String name = "distrlog-subscriptions-store"; |
| String subscriber0 = "subscriber-0"; |
| String subscriber1 = "subscriber-1"; |
| String subscriber2 = "subscriber-2"; |
| |
| DLSN commitPosition0 = new DLSN(4, 33, 5); |
| DLSN commitPosition1 = new DLSN(4, 34, 5); |
| DLSN commitPosition2 = new DLSN(5, 34, 5); |
| DLSN commitPosition3 = new DLSN(6, 35, 6); |
| |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| SubscriptionsStore store = dlm.getSubscriptionsStore(); |
| |
| // no data |
| assertEquals(Await.result(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound); |
| assertEquals(Await.result(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound); |
| assertEquals(Await.result(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound); |
| // empty |
| assertTrue(Await.result(store.getLastCommitPositions()).isEmpty()); |
| |
| // subscriber 0 advance |
| Await.result(store.advanceCommitPosition(subscriber0, commitPosition0)); |
| assertEquals(commitPosition0, Await.result(store.getLastCommitPosition(subscriber0))); |
| Map<String, DLSN> committedPositions = Await.result(store.getLastCommitPositions()); |
| assertEquals(1, committedPositions.size()); |
| assertEquals(commitPosition0, committedPositions.get(subscriber0)); |
| |
| // subscriber 1 advance |
| Await.result(store.advanceCommitPosition(subscriber1, commitPosition1)); |
| assertEquals(commitPosition1, Await.result(store.getLastCommitPosition(subscriber1))); |
| committedPositions = Await.result(store.getLastCommitPositions()); |
| assertEquals(2, committedPositions.size()); |
| assertEquals(commitPosition0, committedPositions.get(subscriber0)); |
| assertEquals(commitPosition1, committedPositions.get(subscriber1)); |
| |
| // subscriber 2 advance |
| Await.result(store.advanceCommitPosition(subscriber2, commitPosition2)); |
| assertEquals(commitPosition2, Await.result(store.getLastCommitPosition(subscriber2))); |
| committedPositions = Await.result(store.getLastCommitPositions()); |
| assertEquals(3, committedPositions.size()); |
| assertEquals(commitPosition0, committedPositions.get(subscriber0)); |
| assertEquals(commitPosition1, committedPositions.get(subscriber1)); |
| assertEquals(commitPosition2, committedPositions.get(subscriber2)); |
| |
| // subscriber 2 advance again |
| DistributedLogManager newDLM = createNewDLM(conf, name); |
| SubscriptionsStore newStore = newDLM.getSubscriptionsStore(); |
| Await.result(newStore.advanceCommitPosition(subscriber2, commitPosition3)); |
| newStore.close(); |
| newDLM.close(); |
| |
| committedPositions = Await.result(store.getLastCommitPositions()); |
| assertEquals(3, committedPositions.size()); |
| assertEquals(commitPosition0, committedPositions.get(subscriber0)); |
| assertEquals(commitPosition1, committedPositions.get(subscriber1)); |
| assertEquals(commitPosition3, committedPositions.get(subscriber2)); |
| |
| dlm.close(); |
| |
| } |
| |
| private long writeAndMarkEndOfStream(DistributedLogManager dlm, long txid) throws Exception { |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| |
| BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter(); |
| |
| if (i < 2) { |
| writer.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } else { |
| writer.markEndOfStream(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, DistributedLogConstants.MAX_TXID, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| } |
| return txid; |
| } |
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStream() throws Exception { |
| String name = "distrlog-mark-end-of-stream"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| txid = writeAndMarkEndOfStream(dlm, txid); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| boolean exceptionEncountered = false; |
| LogRecord record = null; |
| try { |
| record = reader.readNext(false); |
| long expectedTxId = 1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assertEquals(expectedTxId, record.getTransactionId()); |
| expectedTxId++; |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| } catch (EndOfStreamException exc) { |
| LOG.info("Encountered EndOfStream on reading records after {}", record); |
| exceptionEncountered = true; |
| } |
| assertEquals((txid - 1), numTrans); |
| assertTrue(exceptionEncountered); |
| exceptionEncountered = false; |
| try { |
| reader.readNext(false); |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| reader.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteFailsAfterMarkEndOfStream() throws Exception { |
| String name = "distrlog-mark-end-failure"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| txid = writeAndMarkEndOfStream(dlm, txid); |
| |
| assertEquals(txid - 1, dlm.getLastTxId()); |
| LogRecord last = dlm.getLastLogRecord(); |
| assertEquals(txid - 1, last.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(last); |
| assertTrue(dlm.isEndOfStreamMarked()); |
| |
| LogWriter writer = null; |
| boolean exceptionEncountered = false; |
| try { |
| writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| writer.close(); |
| assertTrue(exceptionEncountered); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStreamOnEmptyStream() throws Exception { |
| markEndOfStreamOnEmptyLogSegment(0); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStreamOnClosedStream() throws Exception { |
| markEndOfStreamOnEmptyLogSegment(3); |
| } |
| |
| private void markEndOfStreamOnEmptyLogSegment(int numCompletedSegments) throws Exception { |
| String name = "distrlog-mark-end-empty-" + numCompletedSegments; |
| |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| DLMTestUtil.generateCompletedLogSegments(dlm, conf, numCompletedSegments, DEFAULT_SEGMENT_SIZE); |
| |
| BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| writer.markEndOfStream(); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| boolean exceptionEncountered = false; |
| try { |
| LogRecord record = reader.readNext(false); |
| long lastTxId = -1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertEquals(numCompletedSegments * DEFAULT_SEGMENT_SIZE, numTrans); |
| assertTrue(exceptionEncountered); |
| exceptionEncountered = false; |
| try { |
| reader.readNext(false); |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| reader.close(); |
| } |
| |
| @Test(timeout = 60000, expected = LogRecordTooLongException.class) |
| public void testMaxLogRecSize() throws Exception { |
| DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize"); |
| AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter()); |
| FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString( |
| DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes()))); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMaxTransmissionSize() throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setOutputBufferSize(1024 * 1024); |
| BKDistributedLogManager dlm = |
| createNewDLM(confLocal, "distrlog-transmissionSize"); |
| AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter()); |
| boolean exceptionEncountered = false; |
| byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2]; |
| RAND.nextBytes(largePayload); |
| try { |
| LogRecord op = new LogRecord(1L, largePayload); |
| Future<DLSN> firstWriteFuture = out.write(op); |
| op = new LogRecord(2L, largePayload); |
| // the second write will flush the first one, since we reached the maximum transmission size. |
| out.write(op); |
| FutureUtils.result(firstWriteFuture); |
| } catch (LogRecordTooLongException exc) { |
| exceptionEncountered = true; |
| } finally { |
| FutureUtils.result(out.asyncClose()); |
| } |
| assertFalse(exceptionEncountered); |
| Abortables.abortQuietly(out); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void deleteDuringRead() throws Exception { |
| String name = "distrlog-delete-with-reader"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| |
| BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter(); |
| |
| writer.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 1; |
| LogRecord record = reader.readNext(false); |
| assert (null != record); |
| DLMTestUtil.verifyLogRecord(record); |
| long lastTxId = record.getTransactionId(); |
| |
| dlm.delete(); |
| |
| boolean exceptionEncountered = false; |
| try { |
| record = reader.readNext(false); |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| // make sure the exception is thrown from readahead |
| while (true) { |
| reader.readNext(false); |
| } |
| } catch (LogReadException readexc) { |
| exceptionEncountered = true; |
| } catch (LogNotFoundException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| reader.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testImmediateFlush() throws Exception { |
| String name = "distrlog-immediate-flush"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setOutputBufferSize(0); |
| testNonPartitionedWritesInternal(name, confLocal); |
| } |
| |
| @Test(timeout = 60000) |
| public void testLastLogRecordWithEmptyLedgers() throws Exception { |
| String name = "distrlog-lastLogRec-emptyledgers"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| BKLogSegmentWriter writer = blplm.startLogSegment(txid - 1); |
| blplm.completeAndCloseLogSegment(writer.getLogSegmentSequenceNumber(), |
| writer.getLogSegmentId(), txid - 1, txid - 1, 0); |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1, |
| writer.getLogSegmentSequenceNumber()), false)); |
| FutureUtils.result(blplm.asyncClose()); |
| } |
| |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid); |
| op.setControl(); |
| out.write(op); |
| out.setReadyToFlush(); |
| out.flushAndSync(); |
| out.abort(); |
| dlm.close(); |
| |
| dlm = createNewDLM(conf, name); |
| |
| assertEquals(txid - 1, dlm.getLastTxId()); |
| LogRecord last = dlm.getLastLogRecord(); |
| assertEquals(txid - 1, last.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(last); |
| assertEquals(txid - 1, dlm.getLogRecordCount()); |
| |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testLogSegmentListener() throws Exception { |
| String name = "distrlog-logsegment-listener"; |
| int numSegments = 3; |
| final CountDownLatch[] latches = new CountDownLatch[numSegments + 1]; |
| for (int i = 0; i < numSegments + 1; i++) { |
| latches[i] = new CountDownLatch(1); |
| } |
| |
| final AtomicInteger numFailures = new AtomicInteger(0); |
| final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams = |
| new AtomicReference<Collection<LogSegmentMetadata>>(); |
| |
| BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name); |
| |
| FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true)); |
| dlm.registerListener(new LogSegmentListener() { |
| @Override |
| public void onSegmentsUpdated(List<LogSegmentMetadata> segments) { |
| int updates = segments.size(); |
| boolean hasIncompletedLogSegments = false; |
| for (LogSegmentMetadata l : segments) { |
| if (l.isInProgress()) { |
| hasIncompletedLogSegments = true; |
| break; |
| } |
| } |
| if (hasIncompletedLogSegments) { |
| return; |
| } |
| if (updates >= 1) { |
| if (segments.get(segments.size() - 1).getLogSegmentSequenceNumber() != updates) { |
| numFailures.incrementAndGet(); |
| } |
| } |
| receivedStreams.set(segments); |
| latches[updates].countDown(); |
| } |
| |
| @Override |
| public void onLogStreamDeleted() { |
| // no-op |
| } |
| }); |
| long txid = 1; |
| for (int i = 0; i < numSegments; i++) { |
| LOG.info("Waiting for creating log segment {}.", i); |
| latches[i].await(); |
| LOG.info("Creating log segment {}.", i); |
| BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned(); |
| LOG.info("Created log segment {}.", i); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| LOG.info("Completed log segment {}.", i); |
| } |
| latches[numSegments].await(); |
| assertEquals(0, numFailures.get()); |
| assertNotNull(receivedStreams.get()); |
| assertEquals(numSegments, receivedStreams.get().size()); |
| int seqno = 1; |
| for (LogSegmentMetadata m : receivedStreams.get()) { |
| assertEquals(seqno, m.getLogSegmentSequenceNumber()); |
| assertEquals((seqno - 1) * DEFAULT_SEGMENT_SIZE + 1, m.getFirstTxId()); |
| assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId()); |
| ++seqno; |
| } |
| |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testGetLastDLSN() throws Exception { |
| String name = "distrlog-get-last-dlsn"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setFirstNumEntriesPerReadLastRecordScan(2); |
| confLocal.setMaxNumEntriesPerReadLastRecordScan(4); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setOutputBufferSize(0); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| long txid = 1; |
| LOG.info("Writing 10 control records"); |
| for (int i = 0; i < 10; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); |
| record.setControl(); |
| Await.result(writer.writeControlRecord(record)); |
| } |
| LOG.info("10 control records are written"); |
| |
| try { |
| dlm.getLastDLSN(); |
| fail("Should fail on getting last dlsn from an empty log."); |
| } catch (LogEmptyException lee) { |
| // expected |
| } |
| |
| writer.closeAndComplete(); |
| LOG.info("Completed first log segment"); |
| |
| writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); |
| LOG.info("Completed second log segment"); |
| |
| LOG.info("Writing another 10 control records"); |
| for (int i = 1; i < 10; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); |
| record.setControl(); |
| Await.result(writer.write(record)); |
| } |
| |
| assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN()); |
| |
| writer.closeAndComplete(); |
| LOG.info("Completed third log segment"); |
| |
| assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN()); |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testGetLogRecordCountAsync() throws Exception { |
| DistributedLogManager dlm = createNewDLM(conf, testNames.getMethodName()); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| DLMTestUtil.generateCompletedLogSegments(dlm, conf, 2, 10); |
| |
| Future<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN); |
| Long count = Await.result(futureCount, Duration.fromSeconds(2)); |
| assertEquals(20, count.longValue()); |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testInvalidStreamFromInvalidZkPath() throws Exception { |
| String baseName = testNames.getMethodName(); |
| String streamName = "\0blah"; |
| URI uri = createDLMURI("/" + baseName); |
| DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder() |
| .conf(conf).uri(uri).build(); |
| |
| DistributedLogManager dlm = null; |
| AsyncLogWriter writer = null; |
| try { |
| dlm = namespace.openLog(streamName); |
| writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| fail("should have thrown"); |
| } catch (InvalidStreamNameException e) { |
| } finally { |
| if (null != writer) { |
| Utils.close(writer); |
| } |
| if (null != dlm) { |
| dlm.close(); |
| } |
| namespace.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testTruncationValidation() throws Exception { |
| String name = "distrlog-truncation-validation"; |
| URI uri = createDLMURI("/" + name); |
| ZooKeeperClient zookeeperClient = TestZooKeeperClientBuilder.newBuilder() |
| .uri(uri) |
| .build(); |
| OrderedScheduler scheduler = OrderedScheduler.newBuilder() |
| .name("test-truncation-validation") |
| .corePoolSize(1) |
| .build(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setLogSegmentCacheEnabled(false); |
| |
| LogSegmentMetadataStore metadataStore = new ZKLogSegmentMetadataStore(confLocal, zookeeperClient, scheduler); |
| |
| BKDistributedLogManager dlm = createNewDLM(confLocal, name); |
| DLSN truncDLSN = DLSN.InitialDLSN; |
| DLSN beyondTruncDLSN = DLSN.InitialDLSN; |
| long beyondTruncTxId = 1; |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| for (long j = 1; j <= 10; j++) { |
| LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| Future<DLSN> dlsn = writer.write(record); |
| |
| if (i == 1 && j == 2) { |
| truncDLSN = Await.result(dlsn); |
| } else if (i == 2 && j == 3) { |
| beyondTruncDLSN = Await.result(dlsn); |
| beyondTruncTxId = record.getTransactionId(); |
| } else if (j == 10) { |
| Await.ready(dlsn); |
| } |
| } |
| |
| writer.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue((record != null) && (record.getDlsn().compareTo(DLSN.InitialDLSN) == 0)); |
| reader.close(); |
| } |
| |
| Map<Long, LogSegmentMetadata> segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments before truncating first segment : {}", segmentList); |
| |
| MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater( |
| confLocal, metadataStore); |
| FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after truncated first segment : {}", segmentList); |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue("Unexpected record : " + record, |
| (record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(1); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue((record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); |
| reader.close(); |
| } |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); |
| FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after marked first segment as active : {}", segmentList); |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); |
| FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after truncated second segment : {}", segmentList); |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| long expectedTxId = 1L; |
| boolean exceptionEncountered = false; |
| try { |
| for (int i = 0; i < 3 * 10; i++) { |
| LogRecordWithDLSN record = Await.result(reader.readNext()); |
| DLMTestUtil.verifyLargeLogRecord(record); |
| assertEquals(expectedTxId, record.getTransactionId()); |
| expectedTxId++; |
| } |
| } catch (AlreadyTruncatedTransactionException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| Utils.close(reader); |
| } |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); |
| FutureUtils.result(updater.setLogSegmentActive(segmentList.get(2L))); |
| |
| BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| Assert.assertTrue(Await.result(writer.truncate(truncDLSN))); |
| BKLogWriteHandler handler = writer.getCachedWriteHandler(); |
| List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR); |
| for (LogSegmentMetadata segment: cachedSegments) { |
| if (segment.getLastDLSN().compareTo(truncDLSN) < 0) { |
| Assert.assertTrue(segment.isTruncated()); |
| Assert.assertTrue(!segment.isPartiallyTruncated()); |
| } else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) { |
| Assert.assertTrue(!segment.isTruncated()); |
| Assert.assertTrue(segment.isPartiallyTruncated()); |
| } else { |
| Assert.assertTrue(!segment.isTruncated()); |
| Assert.assertTrue(!segment.isPartiallyTruncated()); |
| } |
| } |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName())); |
| |
| Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo()).getMinActiveDLSN().compareTo(truncDLSN) == 0); |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(1); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = Await.result(reader.readNext()); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| Utils.close(reader); |
| } |
| |
| |
| { |
| LogReader reader = dlm.getInputStream(beyondTruncDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(beyondTruncTxId); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| assertEquals(beyondTruncTxId, record.getTransactionId()); |
| reader.close(); |
| } |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(beyondTruncDLSN); |
| LogRecordWithDLSN record = Await.result(reader.readNext()); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| Utils.close(reader); |
| } |
| |
| zookeeperClient.close(); |
| } |
| } |