blob: 53e1a683c553aea70fa4a73a33f00b5f69550e6f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.bookie.rackawareness;
import static org.apache.pulsar.metadata.bookkeeper.AbstractMetadataDriver.METADATA_STORE_SCHEME;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.client.ITopologyAwareEnsemblePlacementPolicy;
import org.apache.bookkeeper.client.RackChangeNotifier;
import org.apache.bookkeeper.meta.exceptions.Code;
import org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.bookkeeper.net.AbstractDNSToSwitchMapping;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.net.BookieNode;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.policies.data.BookieInfo;
import org.apache.pulsar.common.policies.data.BookiesRackConfiguration;
import org.apache.pulsar.metadata.api.MetadataCache;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Notification;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* It provides the mapping of bookies to its rack from zookeeper.
*/
public class BookieRackAffinityMapping extends AbstractDNSToSwitchMapping
implements RackChangeNotifier {
private static final Logger LOG = LoggerFactory.getLogger(BookieRackAffinityMapping.class);
public static final String BOOKIE_INFO_ROOT_PATH = "/bookies";
public static final String METADATA_STORE_INSTANCE = "METADATA_STORE_INSTANCE";
private MetadataCache<BookiesRackConfiguration> bookieMappingCache = null;
private ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy = null;
private List<BookieId> bookieAddressListLastTime = new ArrayList<>();
private BookiesRackConfiguration racksWithHost = new BookiesRackConfiguration();
private Map<String, BookieInfo> bookieInfoMap = new HashMap<>();
public static MetadataStore createMetadataStore(Configuration conf) throws MetadataException {
MetadataStore store;
Object storeProperty = conf.getProperty(METADATA_STORE_INSTANCE);
if (storeProperty != null) {
if (!(storeProperty instanceof MetadataStore)) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " is not an instance of MetadataStore");
}
store = (MetadataStore) storeProperty;
} else {
String url;
String metadataServiceUri = ConfigurationStringUtil.castToString(conf.getProperty("metadataServiceUri"));
if (StringUtils.isNotBlank(metadataServiceUri)) {
try {
url = metadataServiceUri.replaceFirst(METADATA_STORE_SCHEME + ":", "")
.replace(";", ",");
} catch (Exception e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
} else {
String zkServers = ConfigurationStringUtil.castToString(conf.getProperty("zkServers"));
if (StringUtils.isBlank(zkServers)) {
String errorMsg = String.format("Neither %s configuration set in the BK client configuration nor "
+ "metadataServiceUri/zkServers set in bk server configuration", METADATA_STORE_INSTANCE);
throw new RuntimeException(errorMsg);
}
url = zkServers;
}
try {
int zkTimeout = Integer.parseInt((String) conf.getProperty("zkTimeout"));
store = MetadataStoreExtended.create(url,
MetadataStoreConfig.builder()
.sessionTimeoutMillis(zkTimeout)
.build());
} catch (MetadataStoreException e) {
throw new MetadataException(Code.METADATA_SERVICE_ERROR, e);
}
}
return store;
}
@Override
public synchronized void setConf(Configuration conf) {
super.setConf(conf);
MetadataStore store;
try {
store = createMetadataStore(conf);
bookieMappingCache = store.getMetadataCache(BookiesRackConfiguration.class);
store.registerListener(this::handleUpdates);
racksWithHost = bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH).get()
.orElseGet(BookiesRackConfiguration::new);
updateRacksWithHost(racksWithHost);
for (Map<String, BookieInfo> bookieMapping : racksWithHost.values()) {
for (String address : bookieMapping.keySet()) {
bookieAddressListLastTime.add(BookieId.parse(address));
}
if (LOG.isDebugEnabled()) {
LOG.debug("BookieRackAffinityMapping init, bookieAddressListLastTime {}",
bookieAddressListLastTime);
}
}
} catch (InterruptedException | ExecutionException | MetadataException e) {
throw new RuntimeException(METADATA_STORE_INSTANCE + " failed to init BookieId list");
}
}
private synchronized void updateRacksWithHost(BookiesRackConfiguration racks) {
// In config z-node, the bookies are added in the `ip:port` notation, while BK will ask
// for just the IP/hostname when trying to get the rack for a bookie.
// To work around this issue, we insert in the map the bookie ip/hostname with same rack-info
BookiesRackConfiguration newRacksWithHost = new BookiesRackConfiguration();
Map<String, BookieInfo> newBookieInfoMap = new HashMap<>();
racks.forEach((group, bookies) ->
bookies.forEach((addr, bi) -> {
try {
BookieId bookieId = BookieId.parse(addr);
BookieAddressResolver addressResolver = getBookieAddressResolver();
if (addressResolver == null) {
LOG.warn("Bookie address resolver not yet initialized, skipping resolution");
} else {
BookieSocketAddress bsa = addressResolver.resolve(bookieId);
newRacksWithHost.updateBookie(group, bsa.toString(), bi);
String hostname = bsa.getSocketAddress().getHostName();
newBookieInfoMap.put(hostname, bi);
InetAddress address = bsa.getSocketAddress().getAddress();
if (null != address) {
String hostIp = address.getHostAddress();
if (null != hostIp) {
newBookieInfoMap.put(hostIp, bi);
}
} else {
LOG.info("Network address for {} is unresolvable yet.", addr);
}
}
} catch (BookieAddressResolver.BookieIdNotResolvedException e) {
LOG.info("Network address for {} is unresolvable yet. error is {}", addr, e);
}
})
);
racksWithHost = newRacksWithHost;
bookieInfoMap = newBookieInfoMap;
}
@Override
public synchronized List<String> resolve(List<String> bookieAddressList) {
List<String> racks = new ArrayList<>(bookieAddressList.size());
for (String bookieAddress : bookieAddressList) {
racks.add(getRack(bookieAddress));
}
return racks;
}
private String getRack(String bookieAddress) {
BookieInfo bi = bookieInfoMap.get(bookieAddress);
if (bi == null) {
bi = racksWithHost.getBookie(bookieAddress).orElse(null);
}
if (bi != null
&& !StringUtils.isEmpty(bi.getRack())
&& !bi.getRack().trim().equals("/")) {
String rack = bi.getRack();
if (!rack.startsWith("/")) {
rack = "/" + rack;
}
return rack;
} else {
// since different placement policy will have different default rack,
// don't be smart here and just return null
return null;
}
}
@Override
public String toString() {
return "zk based bookie rack affinity mapping";
}
@Override
public void reloadCachedMappings() {
// no-op
}
private void handleUpdates(Notification n) {
if (!n.getPath().equals(BOOKIE_INFO_ROOT_PATH)) {
return;
}
bookieMappingCache.get(BOOKIE_INFO_ROOT_PATH)
.thenAccept(optVal -> {
synchronized (this) {
LOG.info("Bookie rack info updated to {}. Notifying rackaware policy.", optVal);
this.updateRacksWithHost(optVal.orElseGet(BookiesRackConfiguration::new));
List<BookieId> bookieAddressList = new ArrayList<>();
for (Map<String, BookieInfo> bookieMapping : optVal.map(Map::values).orElse(
Collections.emptyList())) {
for (String addr : bookieMapping.keySet()) {
bookieAddressList.add(BookieId.parse(addr));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Bookies with rack update from {} to {}", bookieAddressListLastTime,
bookieAddressList);
}
Set<BookieId> bookieIdSet = new HashSet<>(bookieAddressList);
bookieIdSet.addAll(bookieAddressListLastTime);
bookieAddressListLastTime = bookieAddressList;
if (rackawarePolicy != null) {
rackawarePolicy.onBookieRackChange(new ArrayList<>(bookieIdSet));
}
}
});
}
@Override
public void registerRackChangeListener(ITopologyAwareEnsemblePlacementPolicy<BookieNode> rackawarePolicy) {
this.rackawarePolicy = rackawarePolicy;
}
}