blob: 500de5dd13879265e36faa02f0f676afb48b8863 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.CURSOR_INTERNAL_PROPERTY_PREFIX;
import static org.apache.bookkeeper.mledger.util.Futures.executeWithRetry;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Tests for the two kinds of properties exposed by a {@link ManagedCursor}:
 * the {@code Map<String, Long>} passed to {@code markDelete(...)} and read back through
 * {@code getProperties()}, and the {@code Map<String, String>} read back through
 * {@code getCursorProperties()}.
 *
 * <p>Each test checks that the values observed right after the write are still observed after the
 * managed ledger is reopened, either through the same factory or through a freshly created one.
 */
public class ManagedCursorPropertiesTest extends MockedBookKeeperTestCase {

    /**
     * Verifies that the properties of the most recent {@code markDelete} call replace the earlier
     * ones and are still returned after the ledger is closed and reopened.
     */
    @Test(timeOut = 20000)
    void testPropertiesClose() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor c1 = ledger.openCursor("c1");

        assertEquals(c1.getProperties(), Collections.emptyMap());

        ledger.addEntry("entry-1".getBytes());
        ledger.addEntry("entry-2".getBytes());
        Position p3 = ledger.addEntry("entry-3".getBytes());
        ledger.addEntry("entry-4".getBytes());

        Map<String, Long> properties = new TreeMap<>();
        properties.put("a", 1L);
        properties.put("b", 2L);
        properties.put("c", 3L);
        c1.markDelete(p3, properties);
        assertEquals(c1.getProperties(), properties);

        // Mark-delete the same position again with different values for the same keys.
        Map<String, Long> properties2 = new TreeMap<>();
        properties2.put("a", 4L);
        properties2.put("b", 5L);
        properties2.put("c", 6L);
        c1.markDelete(p3, properties2);
        assertEquals(c1.getProperties(), properties2);

        ledger.close();

        // Reopen the managed ledger
        ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
        c1 = ledger.openCursor("c1");

        assertEquals(c1.getMarkDeletedPosition(), p3);
        assertEquals(c1.getProperties(), properties2);
    }

    /**
     * Verifies that the mark-delete position, the mark-delete properties and the cursor properties
     * are all visible when the ledger is opened through a second factory while the first one was
     * never closed.
     */
    @Test(timeOut = 20000)
    void testPropertiesRecoveryAfterCrash() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());

        Map<String, String> cursorProperties = new TreeMap<>();
        cursorProperties.put("custom1", "one");
        cursorProperties.put("custom2", "two");
        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, Collections.emptyMap(), cursorProperties);

        assertEquals(c1.getProperties(), Collections.emptyMap());
        assertEquals(c1.getCursorProperties(), cursorProperties);

        ledger.addEntry("entry-1".getBytes());
        ledger.addEntry("entry-2".getBytes());
        Position p3 = ledger.addEntry("entry-3".getBytes());
        ledger.addEntry("entry-4".getBytes());

        Map<String, Long> properties = new TreeMap<>();
        properties.put("a", 1L);
        properties.put("b", 2L);
        properties.put("c", 3L);
        c1.markDelete(p3, properties);

        // Create a new factory to force a managed ledger close and recovery
        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
        try {
            // Reopen the managed ledger
            ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig());
            c1 = ledger.openCursor("c1");

            assertEquals(c1.getMarkDeletedPosition(), p3);
            assertEquals(c1.getProperties(), properties);
            assertEquals(c1.getCursorProperties(), cursorProperties);
        } finally {
            // Shut the extra factory down even when an assertion above fails.
            factory2.shutdown();
        }
    }

    /**
     * Verifies that properties supplied with successive {@code markDelete} calls on advancing
     * positions are returned by {@code getProperties()} and are still returned after a reopen.
     */
    @Test(timeOut = 20000)
    void testPropertiesOnDelete() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor c1 = ledger.openCursor("c1");

        assertEquals(c1.getProperties(), Collections.emptyMap());

        ledger.addEntry("entry-1".getBytes());
        Position p2 = ledger.addEntry("entry-2".getBytes());
        Position p3 = ledger.addEntry("entry-3".getBytes());
        ledger.addEntry("entry-4".getBytes());

        Map<String, Long> properties = new TreeMap<>();
        properties.put("a", 1L);
        properties.put("b", 2L);
        properties.put("c", 3L);
        c1.markDelete(p2, properties);
        assertEquals(c1.getProperties(), properties);

        // Delete p3 and ensure the properties are carried over
        c1.markDelete(p3, properties);
        assertEquals(c1.getProperties(), properties);

        ledger.close();

        // Reopen the managed ledger
        ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
        c1 = ledger.openCursor("c1");

        assertEquals(c1.getMarkDeletedPosition(), p3);
        assertEquals(c1.getProperties(), properties);
    }

    /**
     * Verifies that both property maps handed to {@code openCursor(...)} when the cursor is first
     * opened are returned immediately and again after the ledger is closed and reopened.
     */
    @Test
    void testPropertiesAtCreation() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());

        Map<String, Long> properties = new TreeMap<>();
        properties.put("a", 1L);
        properties.put("b", 2L);
        properties.put("c", 3L);

        Map<String, String> cursorProperties = new TreeMap<>();
        cursorProperties.put("custom1", "one");
        cursorProperties.put("custom2", "two");

        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
        assertEquals(c1.getProperties(), properties);
        assertEquals(c1.getCursorProperties(), cursorProperties);

        ledger.addEntry("entry-1".getBytes());
        ledger.close();

        // Reopen the managed ledger
        ledger = factory.open("my_test_ledger_at_creation", new ManagedLedgerConfig());
        c1 = ledger.openCursor("c1");

        assertEquals(c1.getProperties(), properties);
        assertEquals(c1.getCursorProperties(), cursorProperties);
    }

    /**
     * Exercises {@code setCursorProperties}, {@code putCursorProperty} and
     * {@code removeCursorProperty}, checking the result after a plain reopen and after reopening
     * through two further factories. The last part covers keys that start with
     * {@code CURSOR_INTERNAL_PROPERTY_PREFIX}.
     */
    @Test
    void testUpdateCursorProperties() throws Exception {
        ManagedLedger ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());

        Map<String, Long> properties = new TreeMap<>();
        properties.put("a", 1L);

        Map<String, String> cursorProperties = new TreeMap<>();
        cursorProperties.put("custom1", "one");
        cursorProperties.put("custom2", "two");

        ManagedCursor c1 = ledger.openCursor("c1", InitialPosition.Latest, properties, cursorProperties);
        assertEquals(c1.getProperties(), properties);
        assertEquals(c1.getCursorProperties(), cursorProperties);

        ledger.addEntry("entry-1".getBytes());

        Map<String, String> cursorPropertiesUpdated = new TreeMap<>();
        cursorPropertiesUpdated.put("custom1", "three");
        cursorPropertiesUpdated.put("custom2", "four");
        c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);

        ledger.close();

        // Reopen the managed ledger
        ledger = factory.open("testUpdateCursorProperties", new ManagedLedgerConfig());
        c1 = ledger.openCursor("c1");

        assertEquals(c1.getProperties(), properties);
        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

        // Bounded waits, like every other future in this test, so a stuck update fails the test
        // instead of hanging it.
        c1.putCursorProperty("custom3", "Five").get(10, TimeUnit.SECONDS);
        cursorPropertiesUpdated.put("custom3", "Five");
        c1.removeCursorProperty("custom1").get(10, TimeUnit.SECONDS);
        cursorPropertiesUpdated.remove("custom1");
        assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

        // Create a new factory to force a managed ledger close and recovery
        ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
        try {
            // Reopen the managed ledger
            ledger = factory2.open("testUpdateCursorProperties", new ManagedLedgerConfig());
            c1 = ledger.openCursor("c1");

            assertEquals(c1.getProperties(), properties);
            assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

            ledger.close();
        } finally {
            // Shut the extra factory down even when an assertion above fails.
            factory2.shutdown();
        }

        // Create a new factory to force a managed ledger close and recovery
        ManagedLedgerFactory factory3 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
        try {
            // Reopen the managed ledger
            ledger = factory3.open("testUpdateCursorProperties", new ManagedLedgerConfig());
            c1 = ledger.openCursor("c1");

            // Both single-key puts are expected to complete, including the prefixed key.
            c1.putCursorProperty(CURSOR_INTERNAL_PROPERTY_PREFIX + "test", "test").get(10, TimeUnit.SECONDS);
            c1.putCursorProperty("custom4", "custom4").get(10, TimeUnit.SECONDS);

            // Replace the properties with a map that holds neither "custom4" nor the prefixed key.
            c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);

            // A map that carries a prefixed key must be rejected by setCursorProperties.
            cursorPropertiesUpdated.put(CURSOR_INTERNAL_PROPERTY_PREFIX + "test", "test");
            try {
                c1.setCursorProperties(cursorPropertiesUpdated).get(10, TimeUnit.SECONDS);
                // Assert.fail throws an AssertionError, which the catch below does not intercept.
                Assert.fail("Should fail");
            } catch (Exception e) {
                String message = FutureUtil.unwrapCompletionException(e).getMessage();
                assertTrue(message.contains("The property key can't start with"));
            }

            // Expected state: the prefixed key written through putCursorProperty is still present
            // and "custom4" is gone, which is exactly what the local map now contains.
            assertEquals(c1.getCursorProperties(), cursorPropertiesUpdated);

            ledger.close();
        } finally {
            // Shut the extra factory down even when an assertion above fails.
            factory3.shutdown();
        }
    }

    /**
     * Submits a set, a put and a remove of cursor properties without waiting in between, each
     * wrapped in {@code executeWithRetry} for {@code BadVersionException}, and checks the
     * resulting values once all three futures have completed.
     */
    @Test
    public void testUpdateCursorPropertiesConcurrent() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor c1 = ledger.openCursor("c1");

        Map<String, String> map = new HashMap<>();
        map.put("a", "1");
        map.put("b", "2");
        map.put("c", "3");

        List<CompletableFuture<Void>> futures = new ArrayList<>();
        futures.add(executeWithRetry(() -> c1.setCursorProperties(map),
                ManagedLedgerException.BadVersionException.class, 3));
        futures.add(executeWithRetry(() -> c1.putCursorProperty("a", "2"),
                ManagedLedgerException.BadVersionException.class, 3));
        futures.add(executeWithRetry(() -> c1.removeCursorProperty("c"),
                ManagedLedgerException.BadVersionException.class, 3));

        for (CompletableFuture<Void> future : futures) {
            future.get(10, TimeUnit.SECONDS);
        }

        assertEquals(c1.getCursorProperties().get("a"), "2");
        assertEquals(c1.getCursorProperties().get("b"), "2");
        assertNull(c1.getCursorProperties().get("c"));
    }
}