blob: 721bfbf08acc57263ac5f4ab79ed5f25a7023e55 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.distributedlog.impl;
import com.google.common.collect.Lists;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientUtils;
import org.apache.distributedlog.callback.LogSegmentNamesListener;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
/**
* Test ZK based log segment metadata store.
*/
public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
private static final Logger logger = LoggerFactory.getLogger(TestZKLogSegmentMetadataStore.class);
private final static int zkSessionTimeoutMs = 2000;
private LogSegmentMetadata createLogSegment(
long logSegmentSequenceNumber) {
return createLogSegment(logSegmentSequenceNumber, 99L);
}
private LogSegmentMetadata createLogSegment(
long logSegmentSequenceNumber,
long lastEntryId) {
return DLMTestUtil.completedLogSegment(
"/" + runtime.getMethodName(),
logSegmentSequenceNumber,
logSegmentSequenceNumber,
1L,
100,
logSegmentSequenceNumber,
lastEntryId,
0L,
LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION);
}
@Rule
public TestName runtime = new TestName();
protected final DistributedLogConfiguration baseConf =
new DistributedLogConfiguration();
protected ZooKeeperClient zkc;
protected ZKLogSegmentMetadataStore lsmStore;
protected OrderedScheduler scheduler;
protected URI uri;
protected String rootZkPath;
@Before
public void setup() throws Exception {
zkc = TestZooKeeperClientBuilder.newBuilder()
.uri(createDLMURI("/"))
.sessionTimeoutMs(zkSessionTimeoutMs)
.build();
scheduler = OrderedScheduler.newBuilder()
.name("test-zk-logsegment-metadata-store")
.corePoolSize(1)
.build();
DistributedLogConfiguration conf = new DistributedLogConfiguration();
conf.addConfiguration(baseConf);
this.uri = createDLMURI("/" + runtime.getMethodName());
lsmStore = new ZKLogSegmentMetadataStore(conf, zkc, scheduler);
zkc.get().create(
"/" + runtime.getMethodName(),
new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
this.rootZkPath = "/" + runtime.getMethodName();
}
@After
public void teardown() throws Exception {
if (null != zkc) {
zkc.close();
}
if (null != scheduler) {
scheduler.shutdown();
}
}
@Test(timeout = 60000)
public void testCreateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata segment2 = createLogSegment(1L);
Transaction<Object> createTxn2 = lsmStore.transaction();
lsmStore.createLogSegment(createTxn2, segment2, null);
try {
Utils.ioResult(createTxn2.execute());
fail("Should fail if log segment exists");
} catch (Throwable t) {
// expected
assertTrue("Should throw NodeExistsException if log segment exists",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NodeExistsException if log segment exists",
KeeperException.Code.NODEEXISTS, zke.getKeeperExceptionCode());
}
}
@Test(timeout = 60000)
public void testDeleteLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
Transaction<Object> deleteTxn = lsmStore.transaction();
lsmStore.deleteLogSegment(deleteTxn, segment, null);
Utils.ioResult(deleteTxn.execute());
assertNull("LogSegment " + segment + " should be deleted",
zkc.get().exists(segment.getZkPath(), false));
}
@Test(timeout = 60000)
public void testDeleteNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> deleteTxn = lsmStore.transaction();
lsmStore.deleteLogSegment(deleteTxn, segment, null);
try {
Utils.ioResult(deleteTxn.execute());
fail("Should fail deletion if log segment doesn't exist");
} catch (Throwable t) {
assertTrue("Should throw NoNodeException if log segment doesn't exist",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NoNodeException if log segment doesn't exist",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
}
}
@Test(timeout = 60000)
public void testUpdateNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> updateTxn = lsmStore.transaction();
lsmStore.updateLogSegment(updateTxn, segment);
try {
Utils.ioResult(updateTxn.execute());
fail("Should fail update if log segment doesn't exist");
} catch (Throwable t) {
assertTrue("Should throw NoNodeException if log segment doesn't exist",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Should throw NoNodeException if log segment doesn't exist",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
}
}
@Test(timeout = 60000)
public void testUpdateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata modifiedSegment = createLogSegment(1L, 999L);
Transaction<Object> updateTxn = lsmStore.transaction();
lsmStore.updateLogSegment(updateTxn, modifiedSegment);
Utils.ioResult(updateTxn.execute());
// the log segment should be updated
LogSegmentMetadata readSegment =
Utils.ioResult(LogSegmentMetadata.read(zkc, segment.getZkPath(), true));
assertEquals("Last entry id should be changed from 99L to 999L",
999L, readSegment.getLastEntryId());
}
@Test(timeout = 60000)
public void testCreateDeleteLogSegmentSuccess() throws Exception {
LogSegmentMetadata segment1 = createLogSegment(1L);
LogSegmentMetadata segment2 = createLogSegment(2L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment1, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and create log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
lsmStore.createLogSegment(createDeleteTxn, segment2, null);
lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
Utils.ioResult(createDeleteTxn.execute());
// segment 1 should be deleted, segment 2 should be created
assertNull("LogSegment " + segment1 + " should be deleted",
zkc.get().exists(segment1.getZkPath(), false));
assertNotNull("LogSegment " + segment2 + " should be created",
zkc.get().exists(segment2.getZkPath(), false));
}
@Test(timeout = 60000)
public void testCreateDeleteLogSegmentFailure() throws Exception {
LogSegmentMetadata segment1 = createLogSegment(1L);
LogSegmentMetadata segment2 = createLogSegment(2L);
LogSegmentMetadata segment3 = createLogSegment(3L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment1, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and delete log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
lsmStore.createLogSegment(createDeleteTxn, segment3, null);
try {
Utils.ioResult(createDeleteTxn.execute());
fail("Should fail transaction if one operation failed");
} catch (Throwable t) {
assertTrue("Transaction is aborted",
t instanceof ZKException);
ZKException zke = (ZKException) t;
assertEquals("Transaction is aborted",
KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
}
// segment 1 should not be deleted
assertNotNull("LogSegment " + segment1 + " should not be deleted",
zkc.get().exists(segment1.getZkPath(), false));
// segment 3 should not be created
assertNull("LogSegment " + segment3 + " should be created",
zkc.get().exists(segment3.getZkPath(), false));
}
@Test(timeout = 60000)
public void testGetLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
lsmStore.createLogSegment(createTxn, segment, null);
Utils.ioResult(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata readSegment =
Utils.ioResult(lsmStore.getLogSegment(segment.getZkPath()));
assertEquals("Log segment should match",
segment, readSegment);
}
@Test(timeout = 60000)
public void testGetLogSegmentNames() throws Exception {
Transaction<Object> createTxn = lsmStore.transaction();
List<LogSegmentMetadata> createdSegments = Lists.newArrayListWithExpectedSize(10);
for (int i = 0; i < 10; i++) {
LogSegmentMetadata segment = createLogSegment(i);
createdSegments.add(segment);
lsmStore.createLogSegment(createTxn, segment, null);
}
Utils.ioResult(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
List<String> children = zkc.get().getChildren(rootPath, false);
Collections.sort(children);
assertEquals("Should find 10 log segments",
10, children.size());
List<String> logSegmentNames =
Utils.ioResult(lsmStore.getLogSegmentNames(rootPath, null)).getValue();
Collections.sort(logSegmentNames);
assertEquals("Should find 10 log segments",
10, logSegmentNames.size());
assertEquals(children, logSegmentNames);
List<CompletableFuture<LogSegmentMetadata>> getFutures = Lists.newArrayListWithExpectedSize(10);
for (int i = 0; i < 10; i++) {
getFutures.add(lsmStore.getLogSegment(rootPath + "/" + logSegmentNames.get(i)));
}
List<LogSegmentMetadata> segments =
Utils.ioResult(FutureUtils.collect(getFutures));
for (int i = 0; i < 10; i++) {
assertEquals(createdSegments.get(i), segments.get(i));
}
}
@Test(timeout = 60000)
public void testRegisterListenerAfterLSMStoreClosed() throws Exception {
lsmStore.close();
LogSegmentMetadata segment = createLogSegment(1L);
lsmStore.getLogSegmentNames(segment.getZkPath(), new LogSegmentNamesListener() {
@Override
public void onSegmentsUpdated(Versioned<List<String>> segments) {
// no-op;
}
@Override
public void onLogStreamDeleted() {
// no-op;
}
});
assertTrue("No listener is registered",
lsmStore.listeners.isEmpty());
}
@Test(timeout = 60000)
public void testLogSegmentNamesListener() throws Exception {
int numSegments = 3;
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(createTxn, segment, null);
}
Utils.ioResult(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
List<String> children = zkc.get().getChildren(rootPath, false);
Collections.sort(children);
final AtomicInteger numNotifications = new AtomicInteger(0);
final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
LogSegmentNamesListener listener = new LogSegmentNamesListener() {
@Override
public void onSegmentsUpdated(Versioned<List<String>> segments) {
logger.info("Received segments : {}", segments);
segmentLists.add(segments.getValue());
numNotifications.incrementAndGet();
}
@Override
public void onLogStreamDeleted() {
// no-op;
}
};
lsmStore.getLogSegmentNames(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive one segment list update",
1, numNotifications.get());
List<String> firstSegmentList = segmentLists.get(0);
Collections.sort(firstSegmentList);
assertEquals("List of segments should be same",
children, firstSegmentList);
logger.info("Create another {} segments.", numSegments);
// create another log segment, it should trigger segment list updated
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
Utils.ioResult(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
Collections.sort(newChildren);
logger.info("All log segments become {}", newChildren);
while (numNotifications.get() < 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive second segment list update",
2, numNotifications.get());
List<String> secondSegmentList = segmentLists.get(1);
Collections.sort(secondSegmentList);
assertEquals("List of segments should be updated",
2 * numSegments, secondSegmentList.size());
assertEquals("List of segments should be updated",
newChildren, secondSegmentList);
}
@Test(timeout = 60000)
public void testLogSegmentNamesListenerOnDeletion() throws Exception {
int numSegments = 3;
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(createTxn, segment, null);
}
Utils.ioResult(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
List<String> children = zkc.get().getChildren(rootPath, false);
Collections.sort(children);
final AtomicInteger numNotifications = new AtomicInteger(0);
final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
LogSegmentNamesListener listener = new LogSegmentNamesListener() {
@Override
public void onSegmentsUpdated(Versioned<List<String>> segments) {
logger.info("Received segments : {}", segments);
segmentLists.add(segments.getValue());
numNotifications.incrementAndGet();
}
@Override
public void onLogStreamDeleted() {
// no-op;
}
};
lsmStore.getLogSegmentNames(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive one segment list update",
1, numNotifications.get());
List<String> firstSegmentList = segmentLists.get(0);
Collections.sort(firstSegmentList);
assertEquals("List of segments should be same",
children, firstSegmentList);
// delete all log segments, it should trigger segment list updated
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
Utils.ioResult(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
Collections.sort(newChildren);
while (numNotifications.get() < 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive second segment list update",
2, numNotifications.get());
List<String> secondSegmentList = segmentLists.get(1);
Collections.sort(secondSegmentList);
assertEquals("List of segments should be updated",
0, secondSegmentList.size());
assertEquals("List of segments should be updated",
newChildren, secondSegmentList);
// delete the root path
zkc.get().delete(rootPath, -1);
while (!lsmStore.listeners.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertTrue("listener should be removed after root path is deleted",
lsmStore.listeners.isEmpty());
}
@Test(timeout = 60000)
public void testLogSegmentNamesListenerOnSessionExpired() throws Exception {
int numSegments = 3;
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(createTxn, segment, null);
}
Utils.ioResult(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
List<String> children = zkc.get().getChildren(rootPath, false);
Collections.sort(children);
final AtomicInteger numNotifications = new AtomicInteger(0);
final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
LogSegmentNamesListener listener = new LogSegmentNamesListener() {
@Override
public void onSegmentsUpdated(Versioned<List<String>> segments) {
logger.info("Received segments : {}", segments);
segmentLists.add(segments.getValue());
numNotifications.incrementAndGet();
}
@Override
public void onLogStreamDeleted() {
// no-op;
}
};
lsmStore.getLogSegmentNames(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive one segment list update",
1, numNotifications.get());
List<String> firstSegmentList = segmentLists.get(0);
Collections.sort(firstSegmentList);
assertEquals("List of segments should be same",
children, firstSegmentList);
ZooKeeperClientUtils.expireSession(zkc,
BKNamespaceDriver.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
logger.info("Create another {} segments.", numSegments);
// create another log segment, it should trigger segment list updated
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
Utils.ioResult(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
Collections.sort(newChildren);
logger.info("All log segments become {}", newChildren);
while (numNotifications.get() < 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive third segment list update",
2, numNotifications.get());
List<String> thirdSegmentList = segmentLists.get(1);
Collections.sort(thirdSegmentList);
assertEquals("List of segments should be updated",
2 * numSegments, thirdSegmentList.size());
assertEquals("List of segments should be updated",
newChildren, thirdSegmentList);
}
@Test(timeout = 60000)
public void testLogSegmentNamesListenerOnDeletingLogStream() throws Exception {
int numSegments = 3;
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.createLogSegment(createTxn, segment, null);
}
Utils.ioResult(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
List<String> children = zkc.get().getChildren(rootPath, false);
Collections.sort(children);
final AtomicInteger numNotifications = new AtomicInteger(0);
final List<List<String>> segmentLists = Lists.newArrayListWithExpectedSize(2);
final CountDownLatch deleteLatch = new CountDownLatch(1);
LogSegmentNamesListener listener = new LogSegmentNamesListener() {
@Override
public void onSegmentsUpdated(Versioned<List<String>> segments) {
logger.info("Received segments : {}", segments);
segmentLists.add(segments.getValue());
numNotifications.incrementAndGet();
}
@Override
public void onLogStreamDeleted() {
deleteLatch.countDown();
}
};
lsmStore.getLogSegmentNames(rootPath, listener);
assertEquals(1, lsmStore.listeners.size());
assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
while (numNotifications.get() < 1) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive one segment list update",
1, numNotifications.get());
List<String> firstSegmentList = segmentLists.get(0);
Collections.sort(firstSegmentList);
assertEquals("List of segments should be same",
children, firstSegmentList);
// delete all log segments, it should trigger segment list updated
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
Utils.ioResult(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
Collections.sort(newChildren);
while (numNotifications.get() < 2) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertEquals("Should receive second segment list update",
2, numNotifications.get());
List<String> secondSegmentList = segmentLists.get(1);
Collections.sort(secondSegmentList);
assertEquals("List of segments should be updated",
0, secondSegmentList.size());
assertEquals("List of segments should be updated",
newChildren, secondSegmentList);
// delete the root path
zkc.get().delete(rootPath, -1);
while (!lsmStore.listeners.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(10);
}
assertTrue("listener should be removed after root path is deleted",
lsmStore.listeners.isEmpty());
deleteLatch.await();
}
@Test(timeout = 60000)
public void testStoreMaxLogSegmentSequenceNumber() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
LogMetadata metadata = mock(LogMetadata.class);
when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
Utils.ioResult(updateTxn.execute());
assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion());
Stat stat = new Stat();
byte[] data = zkc.get().getData(rootZkPath, false, stat);
assertEquals(999L, DLUtils.deserializeLogSegmentSequenceNumber(data));
assertEquals(1, stat.getVersion());
}
@Test(timeout = 60000)
public void testStoreMaxLogSegmentSequenceNumberBadVersion() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
LogMetadata metadata = mock(LogMetadata.class);
when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
try {
Utils.ioResult(updateTxn.execute());
fail("Should fail on storing log segment sequence number if providing bad version");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
}
try {
Utils.ioResult(result);
fail("Should fail on storing log segment sequence number if providing bad version");
} catch (ZKException ze) {
assertEquals(KeeperException.Code.BADVERSION, ze.getKeeperExceptionCode());
}
Stat stat = new Stat();
byte[] data = zkc.get().getData(rootZkPath, false, stat);
assertEquals(0, stat.getVersion());
assertEquals(0, data.length);
}
@Test(timeout = 60000)
public void testStoreMaxLogSegmentSequenceNumberOnNonExistentPath() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
LogMetadata metadata = mock(LogMetadata.class);
when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
try {
Utils.ioResult(updateTxn.execute());
fail("Should fail on storing log segment sequence number if path doesn't exist");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
}
try {
Utils.ioResult(result);
fail("Should fail on storing log segment sequence number if path doesn't exist");
} catch (ZKException ke) {
assertEquals(KeeperException.Code.NONODE, ke.getKeeperExceptionCode());
}
}
@Test(timeout = 60000)
public void testStoreMaxTxnId() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
Utils.ioResult(updateTxn.execute());
assertEquals(1, ((ZkVersion) Utils.ioResult(result)).getZnodeVersion());
Stat stat = new Stat();
byte[] data = zkc.get().getData(rootZkPath, false, stat);
assertEquals(999L, DLUtils.deserializeTransactionId(data));
assertEquals(1, stat.getVersion());
}
@Test(timeout = 60000)
public void testStoreMaxTxnIdBadVersion() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
try {
Utils.ioResult(updateTxn.execute());
fail("Should fail on storing log record transaction id if providing bad version");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
}
try {
Utils.ioResult(result);
fail("Should fail on storing log record transaction id if providing bad version");
} catch (ZKException ze) {
assertEquals(KeeperException.Code.BADVERSION, ze.getKeeperExceptionCode());
}
Stat stat = new Stat();
byte[] data = zkc.get().getData(rootZkPath, false, stat);
assertEquals(0, stat.getVersion());
assertEquals(0, data.length);
}
@Test(timeout = 60000)
public void testStoreMaxTxnIdOnNonExistentPath() throws Exception {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final CompletableFuture<Version> result = new CompletableFuture<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
LogMetadataForWriter metadata = mock(LogMetadataForWriter.class);
when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
result.complete(r);
}
@Override
public void onAbort(Throwable t) {
result.completeExceptionally(t);
}
});
try {
Utils.ioResult(updateTxn.execute());
fail("Should fail on storing log record transaction id if path doesn't exist");
} catch (ZKException zke) {
assertEquals(KeeperException.Code.NONODE, zke.getKeeperExceptionCode());
}
try {
Utils.ioResult(result);
fail("Should fail on storing log record transaction id if path doesn't exist");
} catch (ZKException ze) {
assertEquals(KeeperException.Code.NONODE, ze.getKeeperExceptionCode());
}
}
}