blob: 40ecbde3c0899a1ef59d621910be378279db8185 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.testng.annotations.Test;
/**
 * Tests for {@link MetaStoreImpl} error handling and basic metadata operations.
 *
 * <p>Every test builds a fresh {@code MetaStoreImpl} on top of the {@code metadataStore} and
 * {@code executor} supplied by {@link MockedBookKeeperTestCase}. Failures are provoked either by
 * writing unparsable bytes into a node or by registering a conditional fault with
 * {@code metadataStore.failConditional(...)}.
 *
 * <p>Asynchronous callbacks report their outcome through a {@link CompletableFuture} (or a latch
 * plus a captured exception) that the test thread waits on. Assertions are never thrown from a
 * callback, because an {@code AssertionError} raised on a callback thread would not reach TestNG.
 */
public class MetaStoreImplTest extends MockedBookKeeperTestCase {

    /** Plain value type used as the payload class of the metadata cache in {@link #testGetChildrenWatch()}. */
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    static class MyClass {
        String a;
        int b;
    }

    /**
     * Listing managed ledgers must throw {@link MetaStoreException} when the GET_CHILDREN
     * operation on {@code /managed-ledgers} is made to fail.
     */
    @Test
    void getMLList() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        metadataStore.failConditional(new MetadataStoreException("error"), (op, path) ->
                op == FaultInjectionMetadataStore.OperationType.GET_CHILDREN
                        && path.equals("/managed-ledgers")
        );

        try {
            store.getManagedLedgers();
            fail("should fail in getting the list");
        } catch (MetaStoreException expected) {
            // ok
        }
    }

    /**
     * Removing a managed ledger named {@code "non-existing"} must invoke {@code operationFailed}
     * with a non-null exception.
     */
    @Test(timeOut = 20000)
    void deleteNonExistingML() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        AtomicReference<MetaStoreException> exception = new AtomicReference<>();
        CountDownLatch counter = new CountDownLatch(1);

        store.removeManagedLedger("non-existing", new MetaStoreCallback<Void>() {
            @Override
            public void operationComplete(Void result, Stat version) {
                // Release the waiter; the assertion below then fails because no exception was recorded.
                counter.countDown();
            }

            @Override
            public void operationFailed(MetaStoreException e) {
                exception.set(e);
                counter.countDown();
            }
        });

        // The timeOut on @Test bounds this wait if neither callback is ever invoked.
        counter.await();
        assertNotNull(exception.get());
    }

    /**
     * Reading managed-ledger info from a node whose content is the bytes of {@code "non-valid"}
     * must be reported through {@code operationFailed}.
     */
    @Test(timeOut = 20000)
    void readMalformedML() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        metadataStore.put("/managed-ledgers/my_test", "non-valid".getBytes(), Optional.empty()).join();

        final CompletableFuture<Void> promise = new CompletableFuture<>();

        store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
            @Override
            public void operationFailed(MetaStoreException e) {
                // Ok
                promise.complete(null);
            }

            @Override
            public void operationComplete(ManagedLedgerInfo result, Stat version) {
                // Surface the unexpected success on the test thread instead of throwing here.
                promise.completeExceptionally(new Exception("Operation should have failed"));
            }
        });

        promise.get();
    }

    /**
     * Reading cursor info from a cursor node whose content is the bytes of {@code "non-valid"}
     * must be reported through {@code operationFailed}.
     */
    @Test(timeOut = 20000)
    void readMalformedCursorNode() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        metadataStore.put("/managed-ledgers/my_test", "".getBytes(), Optional.empty()).join();
        metadataStore.put("/managed-ledgers/my_test/c1", "non-valid".getBytes(), Optional.empty()).join();

        final CompletableFuture<Void> promise = new CompletableFuture<>();

        store.asyncGetCursorInfo("my_test", "c1", new MetaStoreCallback<MLDataFormats.ManagedCursorInfo>() {
            @Override
            public void operationFailed(MetaStoreException e) {
                // Ok
                promise.complete(null);
            }

            @Override
            public void operationComplete(ManagedCursorInfo result, Stat version) {
                // Surface the unexpected success on the test thread instead of throwing here.
                promise.completeExceptionally(new Exception("Operation should have failed"));
            }
        });

        promise.get();
    }

    /**
     * With every PUT operation made to fail, fetching managed-ledger info for {@code "my_test"}
     * must be reported through {@code operationFailed}.
     *
     * <p>NOTE(review): the node is never written in this test, so the lookup may fail for a reason
     * other than the injected PUT fault; confirm against {@code MetaStoreImpl}.
     */
    @Test(timeOut = 20000)
    void failInCreatingMLnode() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        final CompletableFuture<Void> promise = new CompletableFuture<>();

        metadataStore.failConditional(new MetadataStoreException("error"), (op, path) ->
                op == FaultInjectionMetadataStore.OperationType.PUT
        );

        store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
            @Override
            public void operationFailed(MetaStoreException e) {
                promise.complete(null);
            }

            @Override
            public void operationComplete(ManagedLedgerInfo result, Stat version) {
                promise.completeExceptionally(new Exception("Operation should have failed"));
            }
        });

        promise.get();
    }

    /**
     * A first cursor update (with a {@code null} version) must succeed; a second update using the
     * returned version must fail once PUTs on the cursor path are made to fail.
     */
    @Test(timeOut = 20000)
    void updatingCursorNode() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        metadataStore.put("/managed-ledgers/my_test", "".getBytes(), Optional.empty()).join();

        final CompletableFuture<Void> promise = new CompletableFuture<>();

        ManagedCursorInfo info = ManagedCursorInfo.newBuilder().setCursorsLedgerId(1).build();
        store.asyncUpdateCursorInfo("my_test", "c1", info, null, new MetaStoreCallback<Void>() {
            @Override
            public void operationFailed(MetaStoreException e) {
                promise.completeExceptionally(e);
            }

            @Override
            public void operationComplete(Void result, Stat version) {
                // Update again using the version
                metadataStore.failConditional(new MetadataStoreException("error"), (op, path) ->
                        op == FaultInjectionMetadataStore.OperationType.PUT
                                && path.contains("my_test") && path.contains("c1")
                );

                ManagedCursorInfo updatedInfo = ManagedCursorInfo.newBuilder().setCursorsLedgerId(2).build();
                store.asyncUpdateCursorInfo("my_test", "c1", updatedInfo, version, new MetaStoreCallback<Void>() {
                    @Override
                    public void operationFailed(MetaStoreException e) {
                        // ok
                        promise.complete(null);
                    }

                    @Override
                    public void operationComplete(Void result, Stat newVersion) {
                        promise.completeExceptionally(new Exception("should have failed"));
                    }
                });
            }
        });

        promise.get();
    }

    /**
     * Reading the managed-ledger node must succeed; the following ledger-ids update must fail
     * once PUTs on the {@code "my_test"} path are made to fail with a
     * {@link MetadataStoreException.BadVersionException}.
     */
    @Test(timeOut = 20000)
    void updatingMLNode() throws Exception {
        MetaStore store = new MetaStoreImpl(metadataStore, executor);

        // Wait for the write to finish so the read below cannot race with it.
        metadataStore.put("/managed-ledgers/my_test", "".getBytes(), Optional.empty()).join();

        final CompletableFuture<Void> promise = new CompletableFuture<>();

        store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<ManagedLedgerInfo>() {
            @Override
            public void operationFailed(MetaStoreException e) {
                promise.completeExceptionally(e);
            }

            @Override
            public void operationComplete(ManagedLedgerInfo mlInfo, Stat version) {
                // Update again using the version
                metadataStore.failConditional(new MetadataStoreException.BadVersionException("error"), (op, path) ->
                        op == FaultInjectionMetadataStore.OperationType.PUT
                                && path.contains("my_test")
                );

                store.asyncUpdateLedgerIds("my_test", mlInfo, version, new MetaStoreCallback<Void>() {
                    @Override
                    public void operationFailed(MetaStoreException e) {
                        // ok
                        promise.complete(null);
                    }

                    @Override
                    public void operationComplete(Void result, Stat newVersion) {
                        promise.completeExceptionally(new Exception("should have failed"));
                    }
                });
            }
        });

        promise.get();
    }

    /**
     * A metadata cache that initially reports no children for a path must report a non-empty
     * child list after a child node is written under that path.
     */
    @Test
    public void testGetChildrenWatch() throws Exception {
        MetadataCache<MyClass> objCache1 = metadataStore.getMetadataCache(MyClass.class);

        String path = "/managed-ledgers/prop-xyz/ns1/persistent";
        assertTrue(objCache1.getChildren(path).get().isEmpty());

        metadataStore.put("/managed-ledgers/prop-xyz/ns1/persistent/t1", "".getBytes(), Optional.empty()).join();

        // Poll until the cache observes the new child; presumably 5 attempts with a 1000 ms
        // pause, TODO confirm against ManagedLedgerTest.retryStrategically.
        ManagedLedgerTest.retryStrategically((unused) -> {
            try {
                return !objCache1.getChildren(path).get().isEmpty();
            } catch (Exception ignored) {
                // Ok: a failed lookup counts as "not yet visible"; the assertion below is authoritative.
            }
            return false;
        }, 5, 1000);

        assertFalse(objCache1.getChildren(path).get().isEmpty());
    }
}