blob: 52b50e3ea4b084c7eba40d1dcd43d236421058e5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.metadata.bookkeeper;
import static org.apache.bookkeeper.util.BookKeeperConstants.AVAILABLE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.COOKIE_NODE;
import static org.apache.bookkeeper.util.BookKeeperConstants.READONLY;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.discover.RegistrationClient;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.NotificationType;
public class PulsarRegistrationClient implements RegistrationClient {
private final MetadataStore store;
private final String ledgersRootPath;
// registration paths
private final String bookieRegistrationPath;
private final String bookieAllRegistrationPath;
private final String bookieReadonlyRegistrationPath;
private final Map<RegistrationListener, Boolean> writableBookiesWatchers = new ConcurrentHashMap<>();
private final Map<RegistrationListener, Boolean> readOnlyBookiesWatchers = new ConcurrentHashMap<>();
private final ScheduledExecutorService executor;
public PulsarRegistrationClient(MetadataStore store,
String ledgersRootPath) {
this.store = store;
this.ledgersRootPath = ledgersRootPath;
// Following Bookie Network Address Changes is an expensive operation
// as it requires additional ZooKeeper watches
// we can disable this feature, in case the BK cluster has only
// static addresses
this.bookieRegistrationPath = ledgersRootPath + "/" + AVAILABLE_NODE;
this.bookieAllRegistrationPath = ledgersRootPath + "/" + COOKIE_NODE;
this.bookieReadonlyRegistrationPath = this.bookieRegistrationPath + "/" + READONLY;
this.executor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-registration-client"));
store.registerListener(this::updatedBookies);
}
@Override
public void close() {
executor.shutdownNow();
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getWritableBookies() {
return getChildren(bookieRegistrationPath);
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getAllBookies() {
CompletableFuture<Versioned<Set<BookieId>>> wb = getWritableBookies();
CompletableFuture<Versioned<Set<BookieId>>> rb = getReadOnlyBookies();
return wb.thenCombine(rb, (rw, ro) -> {
Set<BookieId> res = new HashSet<>();
res.addAll(rw.getValue());
res.addAll(ro.getValue());
return new Versioned<>(res, Version.NEW);
});
}
@Override
public CompletableFuture<Versioned<Set<BookieId>>> getReadOnlyBookies() {
return getChildren(bookieReadonlyRegistrationPath);
}
private CompletableFuture<Versioned<Set<BookieId>>> getChildren(String path) {
return store.getChildren(path)
.thenApply(PulsarRegistrationClient::convertToBookieAddresses)
.thenApply(s -> new Versioned<>(s, Version.NEW));
}
@Override
public CompletableFuture<Void> watchWritableBookies(RegistrationListener registrationListener) {
writableBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getWritableBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
@Override
public void unwatchWritableBookies(RegistrationListener registrationListener) {
writableBookiesWatchers.remove(registrationListener);
}
@Override
public CompletableFuture<Void> watchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.put(registrationListener, Boolean.TRUE);
return getReadOnlyBookies()
.thenAcceptAsync(registrationListener::onBookiesChanged, executor);
}
@Override
public void unwatchReadOnlyBookies(RegistrationListener registrationListener) {
readOnlyBookiesWatchers.remove(registrationListener);
}
private void updatedBookies(Notification n) {
if (n.getType() == NotificationType.Created || n.getType() == NotificationType.Deleted) {
if (n.getPath().startsWith(bookieReadonlyRegistrationPath)) {
getReadOnlyBookies().thenAccept(bookies ->
readOnlyBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
} else if (n.getPath().startsWith(bookieRegistrationPath)) {
getWritableBookies().thenAccept(bookies ->
writableBookiesWatchers.keySet()
.forEach(w -> executor.execute(() -> w.onBookiesChanged(bookies))));
}
}
}
private static Set<BookieId> convertToBookieAddresses(List<String> children) {
// Read the bookie addresses into a set for efficient lookup
HashSet<BookieId> newBookieAddrs = new HashSet<>();
for (String bookieAddrString : children) {
if (READONLY.equals(bookieAddrString)) {
continue;
}
BookieId bookieAddr = BookieId.parse(bookieAddrString);
newBookieAddrs.add(bookieAddr);
}
return newBookieAddrs;
}
}