blob: c15a38478048a458026a4dfd0a1e38837dffc825 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Cleanup;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException.LockBusyException;
import org.apache.pulsar.metadata.api.coordination.CoordinationService;
import org.apache.pulsar.metadata.api.coordination.LockManager;
import org.apache.pulsar.metadata.api.coordination.ResourceLock;
import org.apache.pulsar.metadata.api.extended.CreateOption;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;
public class LockManagerTest extends BaseMetadataStoreTest {
@Test(dataProvider = "impl")
public void acquireLocks(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
String key = newKey();
assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
ResourceLock<String> lock1 = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
assertEquals(lock1.getPath(), key + "/1");
assertEquals(lock1.getValue(), "lock-1");
CountDownLatch latchLock1 = new CountDownLatch(1);
lock1.getLockExpiredFuture().thenRun(() -> {
latchLock1.countDown();
});
assertEquals(latchLock1.getCount(), 1);
assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
assertEquals(latchLock1.getCount(), 1);
lock1.release().join();
assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
// The future should have been triggered before the release is complete
latchLock1.await(0, TimeUnit.SECONDS);
// Double release shoud be a no-op
lock1.release().join();
ResourceLock<String> lock2 = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
assertEquals(lock2.getPath(), key + "/1");
assertEquals(lock2.getValue(), "lock-1");
}
@Test(dataProvider = "impl")
public void cleanupOnClose(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
String key = newKey();
assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lockManager.listLocks(key).join(), Collections.singletonList("1"));
assertEquals(lockManager.readLock(key + "/1").join(), Optional.of("lock-1"));
lockManager.acquireLock(key + "/2", "lock-2").join();
assertEquals(lockManager.listLocks(key + "").join(), new ArrayList<>(Arrays.asList("1", "2")));
assertEquals(lockManager.readLock(key + "/2").join(), Optional.of("lock-2"));
lockManager.close();
lockManager = coordinationService.getLockManager(String.class);
assertEquals(lockManager.listLocks(key).join(), Collections.emptyList());
assertEquals(lockManager.readLock(key + "/1").join(), Optional.empty());
assertEquals(lockManager.readLock(key + "/2").join(), Optional.empty());
}
@Test(dataProvider = "impl")
public void updateValue(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
String key = newKey();
ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
assertEquals(cache.get(key + "/1").join().get(), "lock-1");
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
public void updateValueWhenVersionIsOutOfSync(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
String key = newKey();
ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
assertEquals(cache.get(key + "/1").join().get(), "lock-1");
store.put(key + "/1",
ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-2"),
Optional.empty(), EnumSet.of(CreateOption.Ephemeral)).join();
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
public void updateValueWhenKeyDisappears(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
MetadataCache<String> cache = store.getMetadataCache(String.class);
@Cleanup
CoordinationService coordinationService = new CoordinationServiceImpl(store);
@Cleanup
LockManager<String> lockManager = coordinationService.getLockManager(String.class);
String key = newKey();
ResourceLock<String> lock = lockManager.acquireLock(key + "/1", "lock-1").join();
assertEquals(lock.getValue(), "lock-1");
assertEquals(cache.get(key + "/1").join().get(), "lock-1");
store.delete(key + "/1", Optional.empty()).join();
lock.updateValue("value-2").join();
assertEquals(lock.getValue(), "value-2");
assertEquals(cache.get(key + "/1").join().get(), "value-2");
}
@Test(dataProvider = "impl")
public void revalidateLockWithinSameSession(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
CoordinationService cs2 = new CoordinationServiceImpl(store);
@Cleanup
LockManager<String> lm2 = cs2.getLockManager(String.class);
String path1 = newKey();
// Simulate existing lock with same content
store.put(path1, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
EnumSet.of(CreateOption.Ephemeral)).join();
ResourceLock<String> rl2 = lm2.acquireLock(path1, "value-1").join();
assertEquals(new String(store.get(path1).join().get().getValue()), "\"value-1\"");
assertFalse(rl2.getLockExpiredFuture().isDone());
String path2 = newKey();
// Simulate existing lock with different content
store.put(path2, "\"value-1\"".getBytes(StandardCharsets.UTF_8), Optional.of(-1L),
EnumSet.of(CreateOption.Ephemeral)).join();
rl2 = lm2.acquireLock(path2, "value-2").join();
assertEquals(new String(store.get(path2).join().get().getValue()), "\"value-2\"");
assertFalse(rl2.getLockExpiredFuture().isDone());
}
@Test(dataProvider = "impl")
public void revalidateLockOnDifferentSession(String provider, Supplier<String> urlSupplier) throws Exception {
if (provider.equals("Memory") || provider.equals("RocksDB")) {
// Local memory provider doesn't really have the concept of multiple sessions
return;
}
@Cleanup
MetadataStoreExtended store1 = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
MetadataStoreExtended store2 = MetadataStoreExtended.create(urlSupplier.get(),
MetadataStoreConfig.builder().build());
@Cleanup
CoordinationService cs1 = new CoordinationServiceImpl(store1);
@Cleanup
LockManager<String> lm1 = cs1.getLockManager(String.class);
@Cleanup
CoordinationService cs2 = new CoordinationServiceImpl(store2);
@Cleanup
LockManager<String> lm2 = cs2.getLockManager(String.class);
String path1 = newKey();
// Simulate existing lock with different content
ResourceLock<String> rl1 = lm1.acquireLock(path1, "value-1").join();
try {
lm2.acquireLock(path1, "value-2").join();
} catch (CompletionException e) {
assertEquals(e.getCause().getClass(), LockBusyException.class);
}
// Lock-1 should not get invalidated
assertFalse(rl1.getLockExpiredFuture().isDone());
assertEquals(new String(store1.get(path1).join().get().getValue()), "\"value-1\"");
// Simulate existing lock with same content. The 2nd acquirer will steal the lock
String path2 = newKey();
store1.put(path2, ObjectMapperFactory.getThreadLocal().writeValueAsBytes("value-1"), Optional.of(-1L));
ResourceLock<String> rl2 = lm2.acquireLock(path2, "value-1").join();
assertFalse(rl2.getLockExpiredFuture().isDone());
Awaitility.await().untilAsserted(() -> {
// On 'store1' we might see for a short amount of time an empty result still cached while the lock is
// being reacquired.
assertEquals(new String(store1.get(path2).join().get().getValue()), "\"value-1\"");
});
}
}