| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.distributedlog; |
| |
| |
| import static com.google.common.base.Charsets.UTF_8; |
| import static org.junit.Assert.*; |
| import static org.junit.Assert.assertEquals; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.bookkeeper.client.BKException; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.distributedlog.api.AsyncLogReader; |
| import org.apache.distributedlog.api.AsyncLogWriter; |
| import org.apache.distributedlog.api.DistributedLogManager; |
| import org.apache.distributedlog.api.LogReader; |
| import org.apache.distributedlog.api.LogWriter; |
| import org.apache.distributedlog.api.MetadataAccessor; |
| import org.apache.distributedlog.api.namespace.Namespace; |
| import org.apache.distributedlog.api.namespace.NamespaceBuilder; |
| import org.apache.distributedlog.api.subscription.SubscriptionsStore; |
| import org.apache.distributedlog.callback.LogSegmentListener; |
| import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException; |
| import org.apache.distributedlog.exceptions.BKTransmitException; |
| import org.apache.distributedlog.exceptions.EndOfStreamException; |
| import org.apache.distributedlog.exceptions.InvalidStreamNameException; |
| import org.apache.distributedlog.exceptions.LogEmptyException; |
| import org.apache.distributedlog.exceptions.LogNotFoundException; |
| import org.apache.distributedlog.exceptions.LogReadException; |
| import org.apache.distributedlog.exceptions.LogRecordTooLongException; |
| import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException; |
| import org.apache.distributedlog.impl.BKNamespaceDriver; |
| import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore; |
| import org.apache.distributedlog.io.Abortables; |
| import org.apache.distributedlog.logsegment.LogSegmentMetadataStore; |
| import org.apache.distributedlog.metadata.LogMetadata; |
| import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater; |
| import org.apache.distributedlog.metadata.MetadataUpdater; |
| import org.apache.distributedlog.util.OrderedScheduler; |
| import org.apache.distributedlog.util.Utils; |
| import org.junit.Assert; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TestName; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| |
| |
| /** |
| * Test Cases for {@link DistributedLogManager}. |
| */ |
| public class TestBKDistributedLogManager extends TestDistributedLogBase { |
    static final Logger LOG = LoggerFactory.getLogger(TestBKDistributedLogManager.class);

    // shared randomness source, used to generate large payloads
    private static final Random RAND = new Random(System.currentTimeMillis());

    // exposes the currently running test's method name
    @Rule
    public TestName testNames = new TestName();

    // number of records written into each completed log segment
    private static final long DEFAULT_SEGMENT_SIZE = 1000;
| |
| private void testNonPartitionedWritesInternal(String name, DistributedLogConfiguration conf) throws Exception { |
| BKDistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter(); |
| writer.closeAndComplete(); |
| BKLogWriteHandler blplm = dlm.createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| Utils.ioResult(blplm.asyncClose()); |
| } |
| |
| LogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| writer.flush(); |
| writer.commit(); |
| writer.close(); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| LogRecord record = reader.readNext(false); |
| long lastTxId = -1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| reader.close(); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleWrite() throws Exception { |
| BKDistributedLogManager dlm = createNewDLM(conf, "distrlog-simplewrite"); |
| BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= 100; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| |
| BKLogWriteHandler blplm = dlm.createWriteHandler(true); |
| assertNotNull(zkc.exists(blplm.completedLedgerZNode(1, 100, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| Utils.ioResult(blplm.asyncClose()); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNumberOfTransactions() throws Exception { |
| String name = "distrlog-txncount"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= 100; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| long numTrans = DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1); |
| assertEquals(100, numTrans); |
| dlm.close(); |
| } |
| |
    /**
     * Writes three completed segments and one inprogress segment, reopens the
     * log, and verifies all records are readable and the record count matches.
     */
    @Test(timeout = 60000)
    public void testContinuousReaders() throws Exception {
        String name = "distrlog-continuous";
        BKDistributedLogManager dlm = createNewDLM(conf, name);
        long txid = 1;
        // write three complete segments of DEFAULT_SEGMENT_SIZE records each
        for (long i = 0; i < 3; i++) {
            long start = txid;
            BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                out.write(op);
            }
            BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter();
            out.closeAndComplete();
            BKLogWriteHandler blplm = dlm.createWriteHandler(true);

            // the completed segment znode must exist in zookeeper
            assertNotNull(
                zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                    perStreamLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(blplm.asyncClose());
        }

        // leave a half-size inprogress segment at the tail
        BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) {
            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
            out.write(op);
        }
        out.flush();
        out.commit();
        out.close();
        dlm.close();

        // reopen the log and read everything back from txid 1
        dlm = createNewDLM(conf, name);

        LogReader reader = dlm.getInputStream(1);
        long numTrans = 0;
        LogRecord record = reader.readNext(false);
        while (null != record) {
            DLMTestUtil.verifyLogRecord(record);
            numTrans++;
            record = reader.readNext(false);
        }
        // every written record (txid - 1 of them) should have been read
        assertEquals((txid - 1), numTrans);
        assertEquals(txid - 1, dlm.getLogRecordCount());
        reader.close();
        dlm.close();
    }
| |
| /** |
| * Create a bkdlm namespace, write a journal from txid 1, close stream. |
| * Try to create a new journal from txid 1. Should throw an exception. |
| */ |
| @Test(timeout = 60000) |
| public void testWriteRestartFrom1() throws Exception { |
| DistributedLogManager dlm = createNewDLM(conf, "distrlog-restartFrom1"); |
| long txid = 1; |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| txid = 1; |
| try { |
| out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| fail("Shouldn't be able to start another journal from " + txid |
| + " when one already exists"); |
| } catch (Exception ioe) { |
| LOG.info("Caught exception as expected", ioe); |
| } finally { |
| out.close(); |
| } |
| |
| // test border case |
| txid = DEFAULT_SEGMENT_SIZE - 1; |
| try { |
| out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| fail("Shouldn't be able to start another journal from " + txid |
| + " when one already exists"); |
| } catch (TransactionIdOutOfOrderException rste) { |
| LOG.info("Caught exception as expected", rste); |
| } finally { |
| out.close(); |
| } |
| |
| // open journal continuing from before |
| txid = DEFAULT_SEGMENT_SIZE + 1; |
| out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| assertNotNull(out); |
| |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| // open journal arbitarily far in the future |
| txid = DEFAULT_SEGMENT_SIZE * 4; |
| out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| out.write(DLMTestUtil.getLogRecordInstance(txid)); |
| out.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testTwoWritersOnLockDisabled() throws Exception { |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.addConfiguration(conf); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setWriteLockEnabled(false); |
| String name = "distrlog-two-writers-lock-disabled"; |
| DistributedLogManager manager = createNewDLM(confLocal, name); |
| AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter()); |
| Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L))); |
| AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter()); |
| Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L))); |
| |
| // write a record to writer 1 again |
| try { |
| Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(3L))); |
| fail("Should fail writing record to writer 1 again as writer 2 took over the ownership"); |
| } catch (BKTransmitException bkte) { |
| assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode()); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testSimpleRead() throws Exception { |
| String name = "distrlog-simpleread"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| final long numTransactions = 10000; |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long i = 1; i <= numTransactions; i++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(i); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| |
| assertEquals(numTransactions, DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1)); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testNumberOfTransactionsWithInprogressAtEnd() throws Exception { |
| String name = "distrlog-inprogressAtEnd"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter(); |
| out.closeAndComplete(); |
| BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true); |
| assertNotNull( |
| zkc.exists(blplm.completedLedgerZNode(start, txid - 1, |
| perStreamLogWriter.getLogSegmentSequenceNumber()), false)); |
| Utils.ioResult(blplm.asyncClose()); |
| } |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.flush(); |
| out.commit(); |
| out.close(); |
| |
| long numTrans = DLMTestUtil.getNumberofLogRecords(createNewDLM(conf, name), 1); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
| @Test(timeout = 60000) |
| public void testContinuousReaderBulk() throws Exception { |
| String name = "distrlog-continuous-bulk"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.closeAndComplete(); |
| } |
| |
| BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| LogRecord op = DLMTestUtil.getLogRecordInstance(txid++); |
| out.write(op); |
| } |
| out.flush(); |
| out.commit(); |
| out.close(); |
| dlm.close(); |
| |
| dlm = createNewDLM(conf, name); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| List<LogRecordWithDLSN> recordList = reader.readBulk(false, 13); |
| long lastTxId = -1; |
| while (!recordList.isEmpty()) { |
| for (LogRecord record : recordList) { |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| DLMTestUtil.verifyLogRecord(record); |
| numTrans++; |
| } |
| recordList = reader.readBulk(false, 13); |
| } |
| reader.close(); |
| assertEquals((txid - 1), numTrans); |
| } |
| |
    /**
     * Interleaves normal completed segments with empty completed segments
     * (start txid == end txid, zero records) and verifies both async and sync
     * readers skip the empty segments and read every real record.
     */
    @Test(timeout = 60000)
    public void testContinuousReadersWithEmptyLedgers() throws Exception {
        String name = "distrlog-continuous-emptyledgers";
        DistributedLogManager dlm = createNewDLM(conf, name);
        long txid = 1;
        for (long i = 0; i < 3; i++) {
            long start = txid;
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                out.write(op);
            }
            BKLogSegmentWriter writer = out.getCachedLogWriter();
            out.closeAndComplete();
            BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);

            // the normal completed segment znode must exist
            assertNotNull(
                zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                    writer.getLogSegmentSequenceNumber()), false));
            // immediately start and complete an EMPTY segment: first and last
            // txid are both (txid - 1) and no records are written into it
            BKLogSegmentWriter perStreamLogWriter = blplm.startLogSegment(txid - 1);
            blplm.completeAndCloseLogSegment(perStreamLogWriter.getLogSegmentSequenceNumber(),
                    perStreamLogWriter.getLogSegmentId(), txid - 1, txid - 1, 0);
            assertNotNull(
                zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
                    perStreamLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(blplm.asyncClose());
        }

        // leave a half-size inprogress segment at the tail
        BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) {
            LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
            out.write(op);
        }
        out.flush();
        out.commit();
        out.close();
        dlm.close();

        dlm = createNewDLM(conf, name);

        // async read path: should deliver exactly (txid - 1) records
        AsyncLogReader asyncreader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
        long numTrans = 0;
        LogRecordWithDLSN record = Utils.ioResult(asyncreader.readNext());
        while (null != record) {
            DLMTestUtil.verifyLogRecord(record);
            numTrans++;
            // stop once all written records are seen; readNext would block
            // indefinitely on the inprogress segment otherwise
            if (numTrans >= (txid - 1)) {
                break;
            }
            record = Utils.ioResult(asyncreader.readNext());
        }
        assertEquals((txid - 1), numTrans);
        Utils.close(asyncreader);

        // sync read path: should also deliver exactly (txid - 1) records
        LogReader reader = dlm.getInputStream(1);
        numTrans = 0;
        record = reader.readNext(false);
        while (null != record) {
            DLMTestUtil.verifyLogRecord(record);
            numTrans++;
            record = reader.readNext(false);
        }
        assertEquals((txid - 1), numTrans);
        reader.close();
        assertEquals(txid - 1, dlm.getLogRecordCount());
        dlm.close();
    }
| |
| @Test(timeout = 60000) |
| public void testNonPartitionedWrites() throws Exception { |
| String name = "distrlog-non-partitioned-bulk"; |
| testNonPartitionedWritesInternal(name, conf); |
| } |
| |
| @Test(timeout = 60000) |
| public void testCheckLogExists() throws Exception { |
| String name = "distrlog-check-log-exists"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| LogWriter writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| writer.flush(); |
| writer.commit(); |
| writer.close(); |
| dlm.close(); |
| |
| URI uri = createDLMURI("/" + name); |
| Namespace namespace = NamespaceBuilder.newBuilder() |
| .conf(conf).uri(uri).build(); |
| assertTrue(namespace.logExists(name)); |
| assertFalse(namespace.logExists("non-existent-log")); |
| URI nonExistentUri = createDLMURI("/" + "non-existent-ns"); |
| Namespace nonExistentNS = NamespaceBuilder.newBuilder() |
| .conf(conf).uri(nonExistentUri).build(); |
| assertFalse(nonExistentNS.logExists(name)); |
| |
| int logCount = 0; |
| Iterator<String> logIter = namespace.getLogs(); |
| while (logIter.hasNext()) { |
| String log = logIter.next(); |
| logCount++; |
| assertEquals(name, log); |
| } |
| assertEquals(1, logCount); |
| |
| namespace.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testMetadataAccessor() throws Exception { |
| String name = "distrlog-metadata-accessor"; |
| MetadataAccessor metadata = DLMTestUtil.createNewMetadataAccessor(conf, name, createDLMURI("/" + name)); |
| assertEquals(name, metadata.getStreamName()); |
| metadata.createOrUpdateMetadata(name.getBytes()); |
| assertEquals(name, new String(metadata.getMetadata())); |
| metadata.deleteMetadata(); |
| assertEquals(null, metadata.getMetadata()); |
| } |
| |
    /**
     * Exercises the per-log {@link SubscriptionsStore}: initial lower-bound
     * positions, advancing commit positions for several subscribers, and
     * visibility of advances made through a second manager instance.
     */
    @Test(timeout = 60000)
    public void testSubscriptionsStore() throws Exception {
        String name = "distrlog-subscriptions-store";
        String subscriber0 = "subscriber-0";
        String subscriber1 = "subscriber-1";
        String subscriber2 = "subscriber-2";

        DLSN commitPosition0 = new DLSN(4, 33, 5);
        DLSN commitPosition1 = new DLSN(4, 34, 5);
        DLSN commitPosition2 = new DLSN(5, 34, 5);
        DLSN commitPosition3 = new DLSN(6, 35, 6);

        DistributedLogManager dlm = createNewDLM(conf, name);

        SubscriptionsStore store = dlm.getSubscriptionsStore();

        // no data: every subscriber starts at the non-inclusive lower bound
        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound);
        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound);
        assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound);
        // empty
        assertTrue(Utils.ioResult(store.getLastCommitPositions()).isEmpty());

        // subscriber 0 advance
        Utils.ioResult(store.advanceCommitPosition(subscriber0, commitPosition0));
        assertEquals(commitPosition0, Utils.ioResult(store.getLastCommitPosition(subscriber0)));
        Map<String, DLSN> committedPositions = Utils.ioResult(store.getLastCommitPositions());
        assertEquals(1, committedPositions.size());
        assertEquals(commitPosition0, committedPositions.get(subscriber0));

        // subscriber 1 advance
        Utils.ioResult(store.advanceCommitPosition(subscriber1, commitPosition1));
        assertEquals(commitPosition1, Utils.ioResult(store.getLastCommitPosition(subscriber1)));
        committedPositions = Utils.ioResult(store.getLastCommitPositions());
        assertEquals(2, committedPositions.size());
        assertEquals(commitPosition0, committedPositions.get(subscriber0));
        assertEquals(commitPosition1, committedPositions.get(subscriber1));

        // subscriber 2 advance
        Utils.ioResult(store.advanceCommitPosition(subscriber2, commitPosition2));
        assertEquals(commitPosition2, Utils.ioResult(store.getLastCommitPosition(subscriber2)));
        committedPositions = Utils.ioResult(store.getLastCommitPositions());
        assertEquals(3, committedPositions.size());
        assertEquals(commitPosition0, committedPositions.get(subscriber0));
        assertEquals(commitPosition1, committedPositions.get(subscriber1));
        assertEquals(commitPosition2, committedPositions.get(subscriber2));

        // subscriber 2 advance again, through a second manager instance,
        // to verify cross-instance visibility of the stored position
        DistributedLogManager newDLM = createNewDLM(conf, name);
        SubscriptionsStore newStore = newDLM.getSubscriptionsStore();
        Utils.ioResult(newStore.advanceCommitPosition(subscriber2, commitPosition3));
        newStore.close();
        newDLM.close();

        // the original store must observe the position advanced elsewhere
        committedPositions = Utils.ioResult(store.getLastCommitPositions());
        assertEquals(3, committedPositions.size());
        assertEquals(commitPosition0, committedPositions.get(subscriber0));
        assertEquals(commitPosition1, committedPositions.get(subscriber1));
        assertEquals(commitPosition3, committedPositions.get(subscriber2));

        dlm.close();

    }
| |
    /**
     * Writes three segments' worth of records starting from {@code txid}; the
     * first two segments are completed normally, the last one is sealed with
     * an end-of-stream marker.
     *
     * @param dlm log manager to write through
     * @param txid first transaction id to write
     * @return the next unused transaction id
     */
    private long writeAndMarkEndOfStream(DistributedLogManager dlm, long txid) throws Exception {
        for (long i = 0; i < 3; i++) {
            long start = txid;
            BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                writer.write(DLMTestUtil.getLogRecordInstance(txid++));
            }

            BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();

            if (i < 2) {
                // complete the segment normally and verify its znode exists
                writer.closeAndComplete();
                BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
                assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                    perStreamLogWriter.getLogSegmentSequenceNumber()), false));
                Utils.ioResult(blplm.asyncClose());
            } else {
                // seal the stream: the completed segment znode is recorded
                // with MAX_TXID as its last transaction id
                writer.markEndOfStream();
                BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
                assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, DistributedLogConstants.MAX_TXID,
                    perStreamLogWriter.getLogSegmentSequenceNumber()), false));
                Utils.ioResult(blplm.asyncClose());
            }
        }
        return txid;
    }
| |
| @Test(timeout = 60000) |
| public void testMarkEndOfStream() throws Exception { |
| String name = "distrlog-mark-end-of-stream"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| txid = writeAndMarkEndOfStream(dlm, txid); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| boolean exceptionEncountered = false; |
| LogRecord record = null; |
| try { |
| record = reader.readNext(false); |
| long expectedTxId = 1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assertEquals(expectedTxId, record.getTransactionId()); |
| expectedTxId++; |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| } catch (EndOfStreamException exc) { |
| LOG.info("Encountered EndOfStream on reading records after {}", record); |
| exceptionEncountered = true; |
| } |
| assertEquals((txid - 1), numTrans); |
| assertTrue(exceptionEncountered); |
| exceptionEncountered = false; |
| try { |
| reader.readNext(false); |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| reader.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testWriteFailsAfterMarkEndOfStream() throws Exception { |
| String name = "distrlog-mark-end-failure"; |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| |
| long txid = 1; |
| txid = writeAndMarkEndOfStream(dlm, txid); |
| |
| assertEquals(txid - 1, dlm.getLastTxId()); |
| LogRecord last = dlm.getLastLogRecord(); |
| assertEquals(txid - 1, last.getTransactionId()); |
| DLMTestUtil.verifyLogRecord(last); |
| assertTrue(dlm.isEndOfStreamMarked()); |
| |
| LogWriter writer = null; |
| boolean exceptionEncountered = false; |
| try { |
| writer = dlm.startLogSegmentNonPartitioned(); |
| for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) { |
| writer.write(DLMTestUtil.getLogRecordInstance(txid++)); |
| } |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| writer.close(); |
| assertTrue(exceptionEncountered); |
| } |
| |
    // mark end-of-stream on a log with no completed segments at all
    @Test(timeout = 60000)
    public void testMarkEndOfStreamOnEmptyStream() throws Exception {
        markEndOfStreamOnEmptyLogSegment(0);
    }
| |
    // mark end-of-stream on a log that already has three completed segments
    @Test(timeout = 60000)
    public void testMarkEndOfStreamOnClosedStream() throws Exception {
        markEndOfStreamOnEmptyLogSegment(3);
    }
| |
| private void markEndOfStreamOnEmptyLogSegment(int numCompletedSegments) throws Exception { |
| String name = "distrlog-mark-end-empty-" + numCompletedSegments; |
| |
| DistributedLogManager dlm = createNewDLM(conf, name); |
| DLMTestUtil.generateCompletedLogSegments(dlm, conf, numCompletedSegments, DEFAULT_SEGMENT_SIZE); |
| |
| BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned(); |
| writer.markEndOfStream(); |
| |
| LogReader reader = dlm.getInputStream(1); |
| long numTrans = 0; |
| boolean exceptionEncountered = false; |
| try { |
| LogRecord record = reader.readNext(false); |
| long lastTxId = -1; |
| while (null != record) { |
| DLMTestUtil.verifyLogRecord(record); |
| assert (lastTxId < record.getTransactionId()); |
| lastTxId = record.getTransactionId(); |
| numTrans++; |
| record = reader.readNext(false); |
| } |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertEquals(numCompletedSegments * DEFAULT_SEGMENT_SIZE, numTrans); |
| assertTrue(exceptionEncountered); |
| exceptionEncountered = false; |
| try { |
| reader.readNext(false); |
| } catch (EndOfStreamException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| reader.close(); |
| } |
| |
    /**
     * A single record larger than the maximum record size must be rejected
     * with {@link LogRecordTooLongException}.
     */
    @Test(timeout = 60000, expected = LogRecordTooLongException.class)
    public void testMaxLogRecSize() throws Exception {
        DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize");
        AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
        // 8 chars * 256 * 512 = 1 MB payload, beyond the maximum record size
        Utils.ioResult(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
                DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes())));
    }
| |
    /**
     * Two records that together exceed the maximum transmission size must be
     * split into separate transmissions rather than rejected: the second write
     * triggers a flush of the first, and neither throws
     * {@link LogRecordTooLongException}.
     */
    @Test(timeout = 60000)
    public void testMaxTransmissionSize() throws Exception {
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.loadConf(conf);
        confLocal.setOutputBufferSize(1024 * 1024);
        BKDistributedLogManager dlm =
                createNewDLM(confLocal, "distrlog-transmissionSize");
        AsyncLogWriter out = Utils.ioResult(dlm.openAsyncLogWriter());
        boolean exceptionEncountered = false;
        // each payload is just over half the max record-set size, so two of
        // them cannot fit into one transmission
        byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2];
        RAND.nextBytes(largePayload);
        try {
            LogRecord op = new LogRecord(1L, largePayload);
            CompletableFuture<DLSN> firstWriteFuture = out.write(op);
            op = new LogRecord(2L, largePayload);
            // the second write will flush the first one, since we reached the maximum transmission size.
            out.write(op);
            Utils.ioResult(firstWriteFuture);
        } catch (LogRecordTooLongException exc) {
            exceptionEncountered = true;
        } finally {
            Utils.ioResult(out.asyncClose());
        }
        assertFalse(exceptionEncountered);
        // abort after close is a no-op safety net for the writer's resources
        Abortables.abortQuietly(out);
        dlm.close();
    }
| |
    /**
     * Deletes the log while a reader is mid-stream and verifies the reader
     * eventually surfaces a {@link LogReadException} or
     * {@link LogNotFoundException} instead of hanging or returning bad data.
     */
    @Test(timeout = 60000)
    public void deleteDuringRead() throws Exception {
        String name = "distrlog-delete-with-reader";
        DistributedLogManager dlm = createNewDLM(conf, name);

        long txid = 1;
        // write three completed segments
        for (long i = 0; i < 3; i++) {
            long start = txid;
            BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                writer.write(DLMTestUtil.getLogRecordInstance(txid++));
            }

            BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();

            writer.closeAndComplete();
            BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
            assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                perStreamLogWriter.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(blplm.asyncClose());
        }

        // read a single record first so the reader is actively attached
        LogReader reader = dlm.getInputStream(1);
        long numTrans = 1;
        LogRecord record = reader.readNext(false);
        assert (null != record);
        DLMTestUtil.verifyLogRecord(record);
        long lastTxId = record.getTransactionId();

        // delete the log out from under the reader
        dlm.delete();

        boolean exceptionEncountered = false;
        try {
            // drain whatever the readahead already buffered
            record = reader.readNext(false);
            while (null != record) {
                DLMTestUtil.verifyLogRecord(record);
                assert (lastTxId < record.getTransactionId());
                lastTxId = record.getTransactionId();
                numTrans++;
                record = reader.readNext(false);
            }
            // make sure the exception is thrown from readahead
            while (true) {
                reader.readNext(false);
            }
        } catch (LogReadException readexc) {
            exceptionEncountered = true;
        } catch (LogNotFoundException exc) {
            exceptionEncountered = true;
        }
        assertTrue(exceptionEncountered);
        reader.close();
    }
| |
| @Test(timeout = 60000) |
| public void testImmediateFlush() throws Exception { |
| String name = "distrlog-immediate-flush"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setOutputBufferSize(0); |
| testNonPartitionedWritesInternal(name, confLocal); |
| } |
| |
    /**
     * Interleaves completed segments with empty completed segments, writes a
     * control record into an aborted inprogress segment, and verifies the last
     * txid / last record / record count all ignore the empty segments and the
     * control record.
     */
    @Test(timeout = 60000)
    public void testLastLogRecordWithEmptyLedgers() throws Exception {
        String name = "distrlog-lastLogRec-emptyledgers";
        DistributedLogManager dlm = createNewDLM(conf, name);
        long txid = 1;
        for (long i = 0; i < 3; i++) {
            long start = txid;
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                out.write(op);
            }
            BKLogSegmentWriter perStreamLogWriter = out.getCachedLogWriter();
            out.closeAndComplete();
            BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);

            // the normal completed segment znode must exist
            assertNotNull(
                zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
                    perStreamLogWriter.getLogSegmentSequenceNumber()), false));
            // immediately start and complete an EMPTY segment (no records,
            // first txid == last txid == txid - 1)
            BKLogSegmentWriter writer = blplm.startLogSegment(txid - 1);
            blplm.completeAndCloseLogSegment(writer.getLogSegmentSequenceNumber(),
                    writer.getLogSegmentId(), txid - 1, txid - 1, 0);
            assertNotNull(
                zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
                    writer.getLogSegmentSequenceNumber()), false));
            Utils.ioResult(blplm.asyncClose());
        }

        // write a CONTROL record into a new segment and abort it; control
        // records must not count as user records
        BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
        LogRecord op = DLMTestUtil.getLogRecordInstance(txid);
        op.setControl();
        out.write(op);
        out.flush();
        out.commit();
        out.abort();
        dlm.close();

        dlm = createNewDLM(conf, name);

        // last txid / last record / count should reflect only user records
        assertEquals(txid - 1, dlm.getLastTxId());
        LogRecord last = dlm.getLastLogRecord();
        assertEquals(txid - 1, last.getTransactionId());
        DLMTestUtil.verifyLogRecord(last);
        assertEquals(txid - 1, dlm.getLogRecordCount());

        dlm.close();
    }
| |
    @Test(timeout = 60000)
    public void testLogSegmentListener() throws Exception {
        // Verifies that a registered LogSegmentListener is notified each time a
        // log segment completes, and that each notification carries the full,
        // ordered list of completed segments.
        String name = "distrlog-logsegment-listener";
        int numSegments = 3;
        // latches[k] opens once a notification arrives containing exactly k
        // completed segments (snapshots with an in-progress segment are ignored).
        final CountDownLatch[] latches = new CountDownLatch[numSegments + 1];
        for (int i = 0; i < numSegments + 1; i++) {
            latches[i] = new CountDownLatch(1);
        }

        // Failures observed inside the listener callback (asserted at the end,
        // since JUnit assertions cannot propagate out of the callback thread).
        final AtomicInteger numFailures = new AtomicInteger(0);
        final AtomicReference<Collection<LogSegmentMetadata>> receivedStreams =
                new AtomicReference<Collection<LogSegmentMetadata>>();

        BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);

        // Create the log metadata up front so the listener can be registered
        // before any segment exists.
        Utils.ioResult(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
        dlm.registerListener(new LogSegmentListener() {
            @Override
            public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
                int updates = segments.size();
                boolean hasIncompletedLogSegments = false;
                for (LogSegmentMetadata l : segments) {
                    if (l.isInProgress()) {
                        hasIncompletedLogSegments = true;
                        break;
                    }
                }
                // Only react to snapshots composed entirely of completed segments.
                if (hasIncompletedLogSegments) {
                    return;
                }
                if (updates >= 1) {
                    // The newest completed segment's sequence number should equal
                    // the number of completed segments observed so far.
                    if (segments.get(segments.size() - 1).getLogSegmentSequenceNumber() != updates) {
                        numFailures.incrementAndGet();
                    }
                }
                receivedStreams.set(segments);
                latches[updates].countDown();
            }

            @Override
            public void onLogStreamDeleted() {
                // no-op
            }
        });
        long txid = 1;
        for (int i = 0; i < numSegments; i++) {
            LOG.info("Waiting for creating log segment {}.", i);
            // Wait until the listener has observed i completed segments before
            // starting the next one, so notifications arrive one at a time.
            latches[i].await();
            LOG.info("Creating log segment {}.", i);
            BKSyncLogWriter out = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
            LOG.info("Created log segment {}.", i);
            for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
                out.write(op);
            }
            out.closeAndComplete();
            LOG.info("Completed log segment {}.", i);
        }
        latches[numSegments].await();
        assertEquals(0, numFailures.get());
        assertNotNull(receivedStreams.get());
        assertEquals(numSegments, receivedStreams.get().size());
        // The final notification must list segments 1..numSegments in order,
        // each covering DEFAULT_SEGMENT_SIZE consecutive transaction ids.
        int seqno = 1;
        for (LogSegmentMetadata m : receivedStreams.get()) {
            assertEquals(seqno, m.getLogSegmentSequenceNumber());
            assertEquals((seqno - 1) * DEFAULT_SEGMENT_SIZE + 1, m.getFirstTxId());
            assertEquals(seqno * DEFAULT_SEGMENT_SIZE, m.getLastTxId());
            ++seqno;
        }

        dlm.close();
    }
| |
| @Test(timeout = 60000) |
| public void testGetLastDLSN() throws Exception { |
| String name = "distrlog-get-last-dlsn"; |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setFirstNumEntriesPerReadLastRecordScan(2); |
| confLocal.setMaxNumEntriesPerReadLastRecordScan(4); |
| confLocal.setImmediateFlushEnabled(true); |
| confLocal.setOutputBufferSize(0); |
| DistributedLogManager dlm = createNewDLM(confLocal, name); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| long txid = 1; |
| LOG.info("Writing 10 control records"); |
| for (int i = 0; i < 10; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); |
| record.setControl(); |
| Utils.ioResult(writer.writeControlRecord(record)); |
| } |
| LOG.info("10 control records are written"); |
| |
| try { |
| dlm.getLastDLSN(); |
| fail("Should fail on getting last dlsn from an empty log."); |
| } catch (LogEmptyException lee) { |
| // expected |
| } |
| |
| writer.closeAndComplete(); |
| LOG.info("Completed first log segment"); |
| |
| writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++))); |
| LOG.info("Completed second log segment"); |
| |
| LOG.info("Writing another 10 control records"); |
| for (int i = 1; i < 10; i++) { |
| LogRecord record = DLMTestUtil.getLogRecordInstance(txid++); |
| record.setControl(); |
| Utils.ioResult(writer.write(record)); |
| } |
| |
| assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN()); |
| |
| writer.closeAndComplete(); |
| LOG.info("Completed third log segment"); |
| |
| assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN()); |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testGetLogRecordCountAsync() throws Exception { |
| DistributedLogManager dlm = createNewDLM(conf, testNames.getMethodName()); |
| BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned(); |
| DLMTestUtil.generateCompletedLogSegments(dlm, conf, 2, 10); |
| |
| CompletableFuture<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN); |
| Long count = Utils.ioResult(futureCount, 2, TimeUnit.SECONDS); |
| assertEquals(20, count.longValue()); |
| |
| writer.close(); |
| dlm.close(); |
| } |
| |
| @Test(timeout = 60000) |
| public void testInvalidStreamFromInvalidZkPath() throws Exception { |
| String baseName = testNames.getMethodName(); |
| String streamName = "\0blah"; |
| URI uri = createDLMURI("/" + baseName); |
| Namespace namespace = NamespaceBuilder.newBuilder() |
| .conf(conf).uri(uri).build(); |
| |
| DistributedLogManager dlm = null; |
| AsyncLogWriter writer = null; |
| try { |
| dlm = namespace.openLog(streamName); |
| writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| fail("should have thrown"); |
| } catch (InvalidStreamNameException e) { |
| } finally { |
| if (null != writer) { |
| Utils.close(writer); |
| } |
| if (null != dlm) { |
| dlm.close(); |
| } |
| namespace.close(); |
| } |
| } |
| |
| @Test(timeout = 60000) |
| public void testTruncationValidation() throws Exception { |
| String name = "distrlog-truncation-validation"; |
| URI uri = createDLMURI("/" + name); |
| ZooKeeperClient zookeeperClient = TestZooKeeperClientBuilder.newBuilder() |
| .uri(uri) |
| .build(); |
| OrderedScheduler scheduler = OrderedScheduler.newBuilder() |
| .name("test-truncation-validation") |
| .corePoolSize(1) |
| .build(); |
| DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); |
| confLocal.loadConf(conf); |
| confLocal.setDLLedgerMetadataLayoutVersion(LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION); |
| confLocal.setOutputBufferSize(0); |
| confLocal.setLogSegmentCacheEnabled(false); |
| |
| LogSegmentMetadataStore metadataStore = new ZKLogSegmentMetadataStore(confLocal, zookeeperClient, scheduler); |
| |
| BKDistributedLogManager dlm = createNewDLM(confLocal, name); |
| DLSN truncDLSN = DLSN.InitialDLSN; |
| DLSN beyondTruncDLSN = DLSN.InitialDLSN; |
| long beyondTruncTxId = 1; |
| long txid = 1; |
| for (long i = 0; i < 3; i++) { |
| long start = txid; |
| BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| for (long j = 1; j <= 10; j++) { |
| LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++); |
| CompletableFuture<DLSN> dlsn = writer.write(record); |
| |
| if (i == 1 && j == 2) { |
| truncDLSN = Utils.ioResult(dlsn); |
| } else if (i == 2 && j == 3) { |
| beyondTruncDLSN = Utils.ioResult(dlsn); |
| beyondTruncTxId = record.getTransactionId(); |
| } else if (j == 10) { |
| Utils.ioResult(dlsn); |
| } |
| } |
| |
| writer.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue((record != null) && (record.getDlsn().compareTo(DLSN.InitialDLSN) == 0)); |
| reader.close(); |
| } |
| |
| Map<Long, LogSegmentMetadata> segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments before truncating first segment : {}", segmentList); |
| |
| MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater( |
| confLocal, metadataStore); |
| Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(1L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after truncated first segment : {}", segmentList); |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue("Unexpected record : " + record, |
| (record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(1); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue((record != null) && (record.getDlsn().compareTo(new DLSN(2, 0, 0)) == 0)); |
| reader.close(); |
| } |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); |
| Utils.ioResult(updater.setLogSegmentActive(segmentList.get(1L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after marked first segment as active : {}", segmentList); |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore); |
| Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(2L))); |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName())); |
| |
| LOG.info("Read segments after truncated second segment : {}", segmentList); |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| long expectedTxId = 1L; |
| boolean exceptionEncountered = false; |
| try { |
| for (int i = 0; i < 3 * 10; i++) { |
| LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); |
| DLMTestUtil.verifyLargeLogRecord(record); |
| assertEquals(expectedTxId, record.getTransactionId()); |
| expectedTxId++; |
| } |
| } catch (AlreadyTruncatedTransactionException exc) { |
| exceptionEncountered = true; |
| } |
| assertTrue(exceptionEncountered); |
| Utils.close(reader); |
| } |
| |
| updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore); |
| Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L))); |
| |
| BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned(); |
| Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN))); |
| BKLogWriteHandler handler = writer.getCachedWriteHandler(); |
| List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR); |
| for (LogSegmentMetadata segment: cachedSegments) { |
| if (segment.getLastDLSN().compareTo(truncDLSN) < 0) { |
| Assert.assertTrue(segment.isTruncated()); |
| Assert.assertTrue(!segment.isPartiallyTruncated()); |
| } else if (segment.getFirstDLSN().compareTo(truncDLSN) < 0) { |
| Assert.assertTrue(!segment.isTruncated()); |
| Assert.assertTrue(segment.isPartiallyTruncated()); |
| } else { |
| Assert.assertTrue(!segment.isTruncated()); |
| Assert.assertTrue(!segment.isPartiallyTruncated()); |
| } |
| } |
| |
| segmentList = DLMTestUtil.readLogSegments(zookeeperClient, |
| LogMetadata.getLogSegmentsPath(uri, name, conf.getUnpartitionedStreamName())); |
| |
| Assert.assertTrue(segmentList.get(truncDLSN.getLogSegmentSequenceNo()) |
| .getMinActiveDLSN().compareTo(truncDLSN) == 0); |
| |
| { |
| LogReader reader = dlm.getInputStream(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(1); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN); |
| LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); |
| assertTrue(record != null); |
| assertEquals(truncDLSN, record.getDlsn()); |
| Utils.close(reader); |
| } |
| |
| |
| { |
| LogReader reader = dlm.getInputStream(beyondTruncDLSN); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| reader.close(); |
| } |
| |
| { |
| LogReader reader = dlm.getInputStream(beyondTruncTxId); |
| LogRecordWithDLSN record = reader.readNext(false); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| assertEquals(beyondTruncTxId, record.getTransactionId()); |
| reader.close(); |
| } |
| |
| { |
| AsyncLogReader reader = dlm.getAsyncLogReader(beyondTruncDLSN); |
| LogRecordWithDLSN record = Utils.ioResult(reader.readNext()); |
| assertTrue(record != null); |
| assertEquals(beyondTruncDLSN, record.getDlsn()); |
| Utils.close(reader); |
| } |
| |
| zookeeperClient.close(); |
| } |
| |
    @Test(timeout = 60000)
    public void testDeleteLog() throws Exception {
        // Verifies that deleting a log also deletes its underlying bookkeeper
        // ledgers, and that deleting an already-deleted log does not throw.
        String name = "delete-log-should-delete-ledgers";
        DistributedLogManager dlm = createNewDLM(conf, name);
        long txid = 1;
        // Create the log and write some records
        BKSyncLogWriter writer = (BKSyncLogWriter) dlm.startLogSegmentNonPartitioned();
        for (long j = 1; j <= DEFAULT_SEGMENT_SIZE; j++) {
            writer.write(DLMTestUtil.getLogRecordInstance(txid++));
        }
        BKLogSegmentWriter perStreamLogWriter = writer.getCachedLogWriter();
        writer.closeAndComplete();
        BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
        // NOTE(review): sibling tests pass the segment's *first* txid (here 1)
        // as the first argument to completedLedgerZNode; passing the post-loop
        // `txid` looks inconsistent. It may be harmless if, for the current
        // metadata layout, the znode name depends only on the sequence number —
        // confirm before changing.
        assertNotNull(zkc.exists(blplm.completedLedgerZNode(txid, txid - 1,
                perStreamLogWriter.getLogSegmentSequenceNumber()), false));
        Utils.ioResult(blplm.asyncClose());

        // Should be able to open the underline ledger using BK client
        long ledgerId = perStreamLogWriter.getLogSegmentId();
        BKNamespaceDriver driver = (BKNamespaceDriver) dlm.getNamespaceDriver();
        driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
                BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
        // Delete the log and we shouldn't be able the open the ledger
        dlm.delete();
        try {
            driver.getReaderBKC().get().openLedgerNoRecovery(ledgerId,
                    BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8));
            fail("Should fail to open ledger after we delete the log");
        } catch (BKException.BKNoSuchLedgerExistsException e) {
            // ignore
        }
        // delete again should not throw any exception
        try {
            dlm.delete();
        } catch (IOException ioe) {
            fail("Delete log twice should not throw any exception");
        }
    }
| } |