blob: 856c789b54ede8a024a27575135df06062979d0d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.MetadataStoreException.BadVersionException;
import org.apache.pulsar.metadata.api.MetadataStoreException.NotFoundException;
import org.apache.pulsar.metadata.api.MetadataStoreFactory;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
import org.apache.pulsar.metadata.api.Stat;
import org.assertj.core.util.Lists;
import org.testng.annotations.Test;
@Slf4j
public class MetadataStoreTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void emptyStoreTest(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
assertFalse(store.exists("/non-existing-key").join());
assertFalse(store.exists("/non-existing-key/child").join());
assertFalse(store.get("/non-existing-key").join().isPresent());
assertFalse(store.get("/non-existing-key/child").join().isPresent());
assertEquals(store.getChildren("/non-existing-key").join(), Collections.emptyList());
assertEquals(store.getChildren("/non-existing-key/child").join(), Collections.emptyList());
try {
store.delete("/non-existing-key", Optional.empty()).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, NotFoundException.class);
}
try {
store.delete("/non-existing-key", Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertTrue(NotFoundException.class.isInstance(e.getCause()) || BadVersionException.class.isInstance(
e.getCause()));
}
}
@Test(dataProvider = "impl")
public void insertionTestWithExpectedVersion(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String key1 = newKey();
try {
store.put(key1, "value-1".getBytes(), Optional.of(0L)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
}
try {
store.put(key1, "value-1".getBytes(), Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
}
store.put(key1, "value-1".getBytes(), Optional.of(-1L)).join();
assertTrue(store.exists(key1).join());
Optional<GetResult> optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-1".getBytes());
assertEquals(optRes.get().getStat().getVersion(), 0);
try {
store.put(key1, "value-2".getBytes(), Optional.of(-1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
}
try {
store.put(key1, "value-2".getBytes(), Optional.of(1L)).join();
fail("Should have failed");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
}
assertTrue(store.exists(key1).join());
optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-1".getBytes());
assertEquals(optRes.get().getStat().getVersion(), 0);
store.put(key1, "value-2".getBytes(), Optional.of(0L)).join();
assertTrue(store.exists(key1).join());
optRes = store.get(key1).join();
assertTrue(optRes.isPresent());
assertEquals(optRes.get().getValue(), "value-2".getBytes());
assertEquals(optRes.get().getStat().getVersion(), 1);
}
@Test(dataProvider = "impl")
public void getChildrenTest(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String key = newKey();
int n = 10;
List<String> expectedChildren = new ArrayList<>();
assertEquals(store.getChildren(key).join(), Collections.emptyList());
for (int i = 0; i < n; i++) {
store.put(key + "/c-" + i, new byte[0], Optional.empty()).join();
expectedChildren.add("c-" + i);
}
assertEquals(store.getChildren(key).join(), expectedChildren);
// Nested children
for (int i = 0; i < n; i++) {
store.put(key + "/c-0/cc-" + i, new byte[0], Optional.empty()).join();
}
assertEquals(store.getChildren(key).join(), expectedChildren);
for (int i = 0; i < n; i++) {
store.deleteRecursive(key + "/c-" + i).join();
}
assertEquals(store.getChildren(key).join(), Collections.emptyList());
}
@Test(dataProvider = "impl")
public void navigateChildrenTest(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String key = newKey();
// Nested children
store.put(key + "/c-0/cc-1", new byte[0], Optional.empty()).join();
store.put(key + "/c-0/cc-2/ccc-1", new byte[0], Optional.empty()).join();
assertEquals(store.getChildren(key).join(), Collections.singletonList("c-0"));
assertEquals(store.getChildren(key + "/c-0").join(),
Lists.newArrayList("cc-1", "cc-2"));
assertEquals(store.getChildren(key + "/c-0/cc-2").join(),
Lists.newArrayList("ccc-1"));
}
@Test(dataProvider = "impl")
public void deletionTest(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String key = newKey();
int n = 10;
List<String> expectedChildren = new ArrayList<>();
for (int i = 0; i < n; i++) {
store.put(key + "/c-" + i, new byte[0], Optional.empty()).join();
expectedChildren.add("c-" + i);
}
try {
store.delete(key, Optional.empty()).join();
fail("The key has children");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
for (int i = 0; i < n; i++) {
try {
store.delete(key + "/c-" + i, Optional.of(1L)).join();
fail("The key has children");
} catch (CompletionException e) {
assertException(e, BadVersionException.class);
}
store.delete(key + "/c-" + i, Optional.empty()).join();
}
}
@Test(dataProvider = "impl")
public void emptyKeyTest(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
try {
store.delete("", Optional.empty()).join();
fail("The key cannot be empty");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
try {
store.getChildren("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
try {
store.get("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
try {
store.exists("").join();
fail("The key cannot be empty");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
try {
store.put("", new byte[0], Optional.empty()).join();
fail("The key cannot be empty");
} catch (CompletionException e) {
assertException(e, MetadataStoreException.class);
}
}
@Test(dataProvider = "impl")
public void notificationListeners(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
BlockingQueue<Notification> notifications = new LinkedBlockingDeque<>();
store.registerListener(n -> {
notifications.add(n);
});
String key1 = newKey();
assertFalse(store.get(key1).join().isPresent());
// Trigger created notification
Stat stat = store.put(key1, "value-1".getBytes(), Optional.empty()).join();
assertTrue(store.get(key1).join().isPresent());
assertEquals(store.getChildren(key1).join(), Collections.emptyList());
assertEquals(stat.getVersion(), 0);
Notification n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Created);
assertEquals(n.getPath(), key1);
// Trigger modified notification
stat = store.put(key1, "value-2".getBytes(), Optional.empty()).join();
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Modified);
assertEquals(n.getPath(), key1);
assertEquals(stat.getVersion(), 1);
// Trigger modified notification on the parent
String key1Child = key1 + "/xx";
assertFalse(store.get(key1Child).join().isPresent());
store.put(key1Child, "value-2".getBytes(), Optional.empty()).join();
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Created);
assertEquals(n.getPath(), key1Child);
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.ChildrenChanged);
assertEquals(n.getPath(), key1);
assertTrue(store.exists(key1Child).join());
assertEquals(store.getChildren(key1).join(), Collections.singletonList("xx"));
store.delete(key1Child, Optional.empty()).join();
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.Deleted);
assertEquals(n.getPath(), key1Child);
// Parent should be notified of the deletion
n = notifications.poll(3, TimeUnit.SECONDS);
assertNotNull(n);
assertEquals(n.getType(), NotificationType.ChildrenChanged);
assertEquals(n.getPath(), key1);
}
@Test(dataProvider = "impl")
public void testDeleteRecursive(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String prefix = newKey();
String key1 = newKey();
store.put(prefix + key1, "value-1".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c1", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c2", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c1/x1", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c1/x2", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c2/y2", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + key1 + "/c3", "value".getBytes(), Optional.of(-1L)).join();
String key2 = newKey();
store.put(prefix + key2, "value-2".getBytes(), Optional.of(-1L)).join();
store.deleteRecursive(prefix + key1).join();
assertEquals(store.getChildren(prefix).join(), Collections.singletonList(key2.substring(1)));
}
@Test(dataProvider = "impl")
public void testDeleteUnusedDirectories(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
String prefix = newKey();
store.put(prefix + "/a1/b1/c1", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + "/a1/b1/c2", "value".getBytes(), Optional.of(-1L)).join();
store.put(prefix + "/a1/b2/c1", "value".getBytes(), Optional.of(-1L)).join();
store.delete(prefix + "/a1/b1/c1", Optional.empty()).join();
store.delete(prefix + "/a1/b1/c2", Optional.empty()).join();
zks.checkContainers();
assertFalse(store.exists(prefix + "/a1/b1").join());
store.delete(prefix + "/a1/b2/c1", Optional.empty()).join();
zks.checkContainers();
assertFalse(store.exists(prefix + "/a1/b2").join());
zks.checkContainers();
assertFalse(store.exists(prefix + "/a1").join());
zks.checkContainers();
assertFalse(store.exists(prefix).join());
}
@Test(dataProvider = "impl")
public void testPersistent(String provider, Supplier<String> urlSupplier) throws Exception {
String metadataUrl = urlSupplier.get();
MetadataStore store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
byte[] data = "testPersistent".getBytes(StandardCharsets.UTF_8);
String key = newKey() + "/a/b/c";
store.put(key, data, Optional.of(-1L)).join();
store.close();
store = MetadataStoreFactory.create(metadataUrl, MetadataStoreConfig.builder().build());
Optional<GetResult> result = store.get(key).get();
assertTrue(result.isPresent());
assertEquals(result.get().getValue(), data);
store.close();
}
@Test(dataProvider = "impl")
public void testConcurrentPutGetOneKey(String provider, Supplier<String> urlSupplier) throws Exception {
MetadataStore store = MetadataStoreFactory.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
byte[] data = new byte[]{0};
String path = newKey();
int maxValue = 100;
store.put(path, data, Optional.of(-1L)).join();
AtomicInteger successWrites = new AtomicInteger(0);
Runnable task = new Runnable() {
@SneakyThrows
@Override
public void run() {
byte value;
while (true) {
GetResult readResult = store.get(path).get().get();
value = (byte) (readResult.getValue()[0] + 1);
if (value <= maxValue) {
CompletableFuture<Void> putResult =
store.put(path, new byte[]{value}, Optional.of(readResult.getStat().getVersion()))
.thenRun(successWrites::incrementAndGet);
try {
putResult.get();
} catch (Exception ignore) {
}
log.info("Put value {} success:{}. ", value, !putResult.isCompletedExceptionally());
} else {
break;
}
}
}
};
CompletableFuture<Void> t1 = CompletableFuture.completedFuture(null).thenRunAsync(task);
CompletableFuture<Void> t2 = CompletableFuture.completedFuture(null).thenRunAsync(task);
task.run();
t1.join();
t2.join();
assertFalse(t1.isCompletedExceptionally());
assertFalse(t2.isCompletedExceptionally());
assertEquals(successWrites.get(), maxValue);
assertEquals(store.get(path).get().get().getValue()[0], maxValue);
}
}