blob: 07512d16fd60b0585fa09992e9017b46a3466dca [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ThreadFactory;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
/**
 * A cache of meta region location metadata. Registers a listener on ZK to track changes to the
 * meta table znodes. Clients are expected to retry if the meta information is stale. This class
 * is thread-safe (a single instance of this class can be shared by multiple threads without race
 * conditions).
 */
@InterfaceAudience.Private
public class MetaRegionLocationCache extends ZKListener {

  private static final Logger LOG = LoggerFactory.getLogger(MetaRegionLocationCache.class);

  /**
   * Maximum number of times we retry a ZK operation that fails with a KeeperException.
   */
  private static final int MAX_ZK_META_FETCH_RETRIES = 10;

  /**
   * Sleep interval ms between ZK operation retries.
   */
  private static final int SLEEP_INTERVAL_MS_BETWEEN_RETRIES = 1000;

  /**
   * Third argument handed to the RetryCounterFactory used for the initial background load;
   * presumably the upper bound (ms) on the sleep between retries -- confirm against
   * RetryCounterFactory.
   */
  private static final int SLEEP_INTERVAL_MS_MAX = 10000;

  /**
   * Factory for the bounded retry counters used by listener-driven updates (as opposed to the
   * initial load, which builds its own practically unbounded counter in the constructor).
   */
  private final RetryCounterFactory retryCounterFactory =
    new RetryCounterFactory(MAX_ZK_META_FETCH_RETRIES, SLEEP_INTERVAL_MS_BETWEEN_RETRIES);

  /**
   * Cached meta region locations indexed by replica ID.
   * CopyOnWriteArrayMap ensures synchronization during updates and a consistent snapshot during
   * client requests. Even though CopyOnWriteArrayMap copies the data structure for every write,
   * that should be OK since the size of the list is often small and mutations are not too often
   * and we do not need to block client requests while mutations are in progress.
   */
  private final CopyOnWriteArrayMap<Integer, HRegionLocation> cachedMetaLocations;

  /**
   * The kind of event that triggered a cache refresh. Only DELETED changes the control flow (see
   * {@link #updateMetaLocation(String, ZNodeOpType)}); the other values are used for logging.
   */
  private enum ZNodeOpType {
    INIT,
    CREATED,
    CHANGED,
    DELETED
  }

  /**
   * Creates the cache, registers it as a listener on the given watcher and kicks off a daemon
   * thread that loads the initial meta locations from ZK.
   * @param zkWatcher watcher used for all ZK reads and on which this listener is registered.
   */
  public MetaRegionLocationCache(ZKWatcher zkWatcher) {
    super(zkWatcher);
    cachedMetaLocations = new CopyOnWriteArrayMap<>();
    watcher.registerListener(this);
    // Populate the initial snapshot of data from meta znodes.
    // This is needed because stand-by masters can potentially start after the initial znode
    // creation. It blocks forever until the initial meta locations are loaded from ZK and watchers
    // are established. Subsequent updates are handled by the registered listener. Also, this runs
    // in a separate thread in the background to not block master init.
    ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build();
    RetryCounterFactory retryFactory = new RetryCounterFactory(
      Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX);
    threadFactory.newThread(
      () -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start();
  }

  /**
   * Populates the current snapshot of meta locations from ZK. If no meta znodes exist, it registers
   * a watcher on base znode to check for any CREATE/DELETE events on the children.
   * @param retryCounter controls the number of retries and sleep between retries.
   * @param opType the event type that triggered this load; forwarded to the per-znode update.
   */
  private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
    List<String> znodes = null;
    while (retryCounter.shouldRetry()) {
      try {
        znodes = watcher.getMetaReplicaNodesAndWatchChildren();
        break;
      } catch (KeeperException ke) {
        LOG.debug("Error populating initial meta locations", ke);
        if (!retryCounter.shouldRetry()) {
          // Retries exhausted and watchers not set. This is not a desirable state since the cache
          // could remain stale forever. Propagate the exception.
          watcher.abort("Error populating meta locations", ke);
          return;
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          LOG.error("Interrupted while loading meta locations from ZK", ie);
          // Restore the interrupt status for whoever owns this thread.
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
    if (znodes == null || znodes.isEmpty()) {
      // No meta znodes exist at this point but we registered a watcher on the base znode to listen
      // for updates. They will be handled via nodeChildrenChanged().
      return;
    }
    if (znodes.size() == cachedMetaLocations.size()) {
      // No new meta znodes got added.
      return;
    }
    for (String znode : znodes) {
      String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
      updateMetaLocation(path, opType);
    }
  }

  /**
   * Gets the HRegionLocation for a given meta replica ID. Renews the watch on the znode for
   * future updates.
   * @param replicaId ReplicaID of the region.
   * @return HRegionLocation for the meta replica.
   * @throws KeeperException if there is any issue fetching/parsing the serialized data.
   */
  private HRegionLocation getMetaRegionLocation(int replicaId)
      throws KeeperException {
    RegionState metaRegionState;
    try {
      byte[] data = ZKUtil.getDataAndWatch(watcher,
        watcher.getZNodePaths().getZNodeForReplica(replicaId));
      metaRegionState = ProtobufUtil.parseMetaRegionStateFrom(data, replicaId);
    } catch (DeserializationException e) {
      // Surface parse failures through the same exception type as ZK failures so that callers
      // only have to deal with KeeperException.
      throw ZKUtil.convert(e);
    }
    return new HRegionLocation(metaRegionState.getRegion(), metaRegionState.getServerName());
  }

  /**
   * Refreshes the cached location for the meta replica identified by the given znode path.
   * Paths that are not meta znodes are ignored. If no location could be obtained (the znode is
   * gone, or all retries failed) the cached entry for that replica is removed. If the thread is
   * interrupted while waiting between retries, the cache entry is left untouched.
   * @param path znode path the event was delivered for.
   * @param opType the event type; DELETED first re-sets the watch and checks for existence.
   */
  private void updateMetaLocation(String path, ZNodeOpType opType) {
    if (!isValidMetaPath(path)) {
      return;
    }
    LOG.debug("Updating meta znode for path {}: {}", path, opType.name());
    int replicaId = watcher.getZNodePaths().getMetaReplicaIdFromPath(path);
    RetryCounter retryCounter = retryCounterFactory.create();
    HRegionLocation location = null;
    while (retryCounter.shouldRetry()) {
      try {
        if (opType == ZNodeOpType.DELETED) {
          if (!ZKUtil.watchAndCheckExists(watcher, path)) {
            // The path does not exist, we've set the watcher and we can break for now.
            break;
          }
          // If it is a transient error and the node appears right away, we fetch the
          // latest meta state.
        }
        location = getMetaRegionLocation(replicaId);
        break;
      } catch (KeeperException e) {
        LOG.debug("Error getting meta location for path {}", path, e);
        if (!retryCounter.shouldRetry()) {
          LOG.warn("Error getting meta location for path {}. Retries exhausted.", path, e);
          break;
        }
        try {
          retryCounter.sleepUntilNextRetry();
        } catch (InterruptedException ie) {
          // Log like loadMetaLocationsFromZk() does, so an aborted refresh is not silent.
          LOG.error("Interrupted while updating meta location for path {}", path, ie);
          Thread.currentThread().interrupt();
          return;
        }
      }
    }
    if (location == null) {
      cachedMetaLocations.remove(replicaId);
      return;
    }
    cachedMetaLocations.put(replicaId, location);
  }

  /**
   * Returns a point-in-time copy of the cached meta replica locations.
   * @return the HRegionLocations for the meta replica(s) wrapped in an Optional, or
   *         {@link Optional#empty()} if the cache is empty. Never returns {@code null}.
   */
  public Optional<List<HRegionLocation>> getMetaRegionLocations() {
    // NOTE(review): this relies on firstKey()/tailMap() tolerating an empty map -- confirm
    // against CopyOnWriteArrayMap.
    ConcurrentNavigableMap<Integer, HRegionLocation> snapshot =
      cachedMetaLocations.tailMap(cachedMetaLocations.firstKey());
    if (snapshot.isEmpty()) {
      // This could be possible if the master has not successfully initialized yet or meta region
      // is stuck in some weird state.
      return Optional.empty();
    }
    List<HRegionLocation> result = new ArrayList<>();
    // Explicitly iterate instead of new ArrayList<>(snapshot.values()) because the underlying
    // ArrayValueCollection does not implement toArray().
    snapshot.values().forEach(result::add);
    return Optional.of(result);
  }

  /**
   * Helper to check if the given 'path' corresponds to a meta znode. This listener is only
   * interested in changes to meta znodes.
   */
  private boolean isValidMetaPath(String path) {
    return watcher.getZNodePaths().isMetaZNodePath(path);
  }

  /** Refreshes the cached location for a newly created znode (non-meta paths are ignored). */
  @Override
  public void nodeCreated(String path) {
    updateMetaLocation(path, ZNodeOpType.CREATED);
  }

  /** Handles a znode deletion; the cached entry is dropped unless the znode reappeared. */
  @Override
  public void nodeDeleted(String path) {
    updateMetaLocation(path, ZNodeOpType.DELETED);
  }

  /** Refreshes the cached location after the data of a znode changed. */
  @Override
  public void nodeDataChanged(String path) {
    updateMetaLocation(path, ZNodeOpType.CHANGED);
  }

  /** Reloads the meta locations when the children of the base znode changed. */
  @Override
  public void nodeChildrenChanged(String path) {
    if (!path.equals(watcher.getZNodePaths().baseZNode)) {
      // Only children of the base znode can be meta replica znodes.
      return;
    }
    loadMetaLocationsFromZk(retryCounterFactory.create(), ZNodeOpType.CHANGED);
  }
}