Revert "Make BookieId work with PulsarRegistrationDriver (#17762)" (#17914)
This reverts commit 8d7ac33751c62383b510a04ec223981bd70cd4db.
(cherry picked from commit 9d6c34ea5d77bb96ecc21b1ec3a18fa4b730e7bd)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
index b7e3024..78a3317 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/BookieServiceInfoSerde.java
@@ -24,7 +24,6 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.discover.BookieServiceInfo;
-import org.apache.bookkeeper.discover.BookieServiceInfoUtils;
import org.apache.bookkeeper.proto.DataFormats.BookieServiceInfoFormat;
import org.apache.pulsar.metadata.api.MetadataSerde;
import org.apache.pulsar.metadata.api.Stat;
@@ -64,57 +63,7 @@
}
@Override
- public BookieServiceInfo deserialize(String path, byte[] bookieServiceInfo, Stat stat) throws IOException {
- // see https://github.com/apache/bookkeeper/blob/
- // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
- // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L311
- String bookieId = extractBookiedIdFromPath(path);
- if (bookieServiceInfo == null || bookieServiceInfo.length == 0) {
- return BookieServiceInfoUtils.buildLegacyBookieServiceInfo(bookieId);
- }
-
- BookieServiceInfoFormat builder = BookieServiceInfoFormat.parseFrom(bookieServiceInfo);
- BookieServiceInfo bsi = new BookieServiceInfo();
- List<BookieServiceInfo.Endpoint> endpoints = builder.getEndpointsList().stream()
- .map(e -> {
- BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
- endpoint.setId(e.getId());
- endpoint.setPort(e.getPort());
- endpoint.setHost(e.getHost());
- endpoint.setProtocol(e.getProtocol());
- endpoint.setAuth(e.getAuthList());
- endpoint.setExtensions(e.getExtensionsList());
- return endpoint;
- })
- .collect(Collectors.toList());
-
- bsi.setEndpoints(endpoints);
- bsi.setProperties(builder.getPropertiesMap());
-
- return bsi;
-
- }
-
- /**
- * Extract the BookieId
- * The path should look like /ledgers/available/bookieId
- * or /ledgers/available/readonly/bookieId.
- * But the prefix depends on the configuration.
- * @param path
- * @return the bookieId
- */
- private static String extractBookiedIdFromPath(String path) throws IOException {
- // https://github.com/apache/bookkeeper/blob/
- // 034ef8566ad037937a4d58a28f70631175744f53/bookkeeper-server/
- // src/main/java/org/apache/bookkeeper/discover/ZKRegistrationClient.java#L258
- if (path == null) {
- path = "";
- }
- int last = path.lastIndexOf("/");
- if (last >= 0) {
- return path.substring(last + 1);
- } else {
- throw new IOException("The path " + path + " doesn't look like a valid path for a BookieServiceInfo node");
- }
+ public BookieServiceInfo deserialize(String path, byte[] content, Stat stat) throws IOException {
+ return null;
}
}
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
index 1c69240..52b50e3 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClient.java
@@ -25,21 +25,15 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import org.apache.bookkeeper.client.BKException;
-import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
-import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
@@ -55,14 +49,12 @@
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
- private final MetadataCache<BookieServiceInfo> bookieServiceInfoMetadataCache;
private final ScheduledExecutorService executor;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
- this.bookieServiceInfoMetadataCache = store.getMetadataCache(BookieServiceInfoSerde.INSTANCE);
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
@@ -161,32 +153,4 @@
}
return newBookieAddrs;
}
-
- @Override
- public CompletableFuture<Versioned<BookieServiceInfo>> getBookieServiceInfo(BookieId bookieId) {
- String asWritable = bookieRegistrationPath + "/" + bookieId;
-
- return bookieServiceInfoMetadataCache.get(asWritable)
- .thenCompose((Optional<BookieServiceInfo> getResult) -> {
- if (getResult.isPresent()) {
- return CompletableFuture.completedFuture(new Versioned<>(getResult.get(),
- new LongVersion(-1)));
- } else {
- return readBookieInfoAsReadonlyBookie(bookieId);
- }
- }
- );
- }
-
- final CompletableFuture<Versioned<BookieServiceInfo>> readBookieInfoAsReadonlyBookie(BookieId bookieId) {
- String asReadonly = bookieReadonlyRegistrationPath + "/" + bookieId;
- return bookieServiceInfoMetadataCache.get(asReadonly)
- .thenApply((Optional<BookieServiceInfo> getResultAsReadOnly) -> {
- if (getResultAsReadOnly.isPresent()) {
- return new Versioned<>(getResultAsReadOnly.get(), new LongVersion(-1));
- } else {
- throw new CompletionException(new BKException.BKBookieHandleNotAvailableException());
- }
- });
- }
}
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
index 914e70e..38195b2 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/bookkeeper/PulsarRegistrationClientTest.java
@@ -23,8 +23,6 @@
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.mock;
import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -117,63 +115,6 @@
}
@Test(dataProvider = "impl")
- public void testGetBookieServiceInfo(String provider, Supplier<String> urlSupplier) throws Exception {
- @Cleanup
- MetadataStoreExtended store =
- MetadataStoreExtended.create(urlSupplier.get(), MetadataStoreConfig.builder().build());
-
- String ledgersRoot = "/test/ledgers-" + UUID.randomUUID();
-
- @Cleanup
- RegistrationManager rm = new PulsarRegistrationManager(store, ledgersRoot, mock(AbstractConfiguration.class));
-
- @Cleanup
- RegistrationClient rc = new PulsarRegistrationClient(store, ledgersRoot);
-
- List<BookieId> addresses = new ArrayList<>(prepareNBookies(10));
- List<BookieServiceInfo> bookieServiceInfos = new ArrayList<>();
- int port = 223;
- for (BookieId address : addresses) {
- BookieServiceInfo info = new BookieServiceInfo();
- BookieServiceInfo.Endpoint endpoint = new BookieServiceInfo.Endpoint();
- endpoint.setAuth(Collections.emptyList());
- endpoint.setExtensions(Collections.emptyList());
- endpoint.setId("id");
- endpoint.setHost("localhost");
- endpoint.setPort(port++);
- endpoint.setProtocol("bookie-rpc");
- info.setEndpoints(Arrays.asList(endpoint));
- bookieServiceInfos.add(info);
- // some readonly, some writable
- boolean readOnly = port % 2 == 0;
- rm.registerBookie(address, readOnly, info);
- }
-
- int i = 0;
- for (BookieId address : addresses) {
- BookieServiceInfo bookieServiceInfo = rc.getBookieServiceInfo(address).get().getValue();
- compareBookieServiceInfo(bookieServiceInfo, bookieServiceInfos.get(i++));
- }
-
- }
-
- private void compareBookieServiceInfo(BookieServiceInfo a, BookieServiceInfo b) {
- assertEquals(a.getProperties(), b.getProperties());
- assertEquals(a.getEndpoints().size(), b.getEndpoints().size());
- for (int i = 0; i < a.getEndpoints().size(); i++) {
- BookieServiceInfo.Endpoint e1 = a.getEndpoints().get(i);
- BookieServiceInfo.Endpoint e2 = b.getEndpoints().get(i);
- assertEquals(e1.getHost(), e2.getHost());
- assertEquals(e1.getPort(), e2.getPort());
- assertEquals(e1.getId(), e2.getId());
- assertEquals(e1.getProtocol(), e2.getProtocol());
- assertEquals(e1.getExtensions(), e2.getExtensions());
- assertEquals(e1.getAuth(), e2.getAuth());
- }
-
- }
-
- @Test(dataProvider = "impl")
public void testGetAllBookies(String provider, Supplier<String> urlSupplier) throws Exception {
@Cleanup
MetadataStoreExtended store =