blob: bc8264050149d8455f9aba82424a19fa962eef9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.IntSupplier;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
/**
 * <p>CatalogReplicaLoadBalanceSimpleSelector implements a simple catalog replica load
 * balancing algorithm. It maintains a stale location cache for each table. Whenever a client
 * looks up a location, it first checks whether the row is covered by the stale location cache.
 * If it is, the location from the catalog replica is considered stale and the lookup goes to the
 * primary region to fetch an up-to-date location; otherwise, a replica region is picked at random
 * for the lookup. When clients receive NotServingRegionException from region servers, they add
 * these region locations to the stale location cache. The stale cache is cleaned up periodically
 * by a chore.</p>
 *
 * It follows a simple algorithm to choose a replica to go:
 *
 * <ol>
 *  <li>If there is no stale location entry for the row being looked up, it randomly picks a
 *  replica region to do the lookup.</li>
 *  <li>If the location from the replica region is stale, the client gets
 *  NotServingRegionException from the region server; in this case, it creates a
 *  StaleLocationCacheEntry in CatalogReplicaLoadBalanceSimpleSelector.</li>
 *  <li>When the client tries to do a location lookup, it checks the StaleLocationCache first for
 *  the row it tries to look up; if an entry exists, it goes to the primary meta region to do the
 *  lookup; otherwise, it follows step 1.</li>
 *  <li>A chore periodically runs to clean up expired entries in the StaleLocationCache.</li>
 * </ol>
 */
class CatalogReplicaLoadBalanceSimpleSelector implements
    CatalogReplicaLoadBalanceSelector, Stoppable {
  private static final Logger LOG =
    LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class);

  // How long a reported stale location stays effective before it expires (3 seconds).
  private static final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000;
  // Period of the chore that purges expired stale cache entries (1.5 seconds).
  private static final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500;
  // Period of the chore that re-fetches the catalog replica count (1 minute).
  private static final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000;
  // Sentinel held by numOfReplicas until the replica count has been fetched for the first time.
  private static final int UNINITIALIZED_NUM_OF_REPLICAS = -1;

  /**
   * StaleLocationCacheEntry is the entry when a stale location is reported by a client.
   */
  private static final class StaleLocationCacheEntry {
    // timestamp in milliseconds
    private final long timestamp;
    private final byte[] endKey;

    StaleLocationCacheEntry(final byte[] endKey) {
      this.endKey = endKey;
      timestamp = EnvironmentEdgeManager.currentTime();
    }

    public byte[] getEndKey() {
      return this.endKey;
    }

    public long getTimestamp() {
      return this.timestamp;
    }

    @Override
    public String toString() {
      // Render the key as readable binary-escaped text rather than a raw array dump.
      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
        .append("endKey", Bytes.toStringBinary(endKey))
        .append("timestamp", timestamp)
        .toString();
    }
  }

  // Per table: region start key -> stale location entry reported for the region at that key.
  private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>>
    staleCache = new ConcurrentHashMap<>();
  // Lazily initialized catalog replica count, periodically refreshed by a chore.
  private volatile int numOfReplicas;
  private final AsyncConnectionImpl conn;
  private final TableName tableName;
  private final IntSupplier getNumOfReplicas;
  private volatile boolean isStopped = false;

  CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn,
    IntSupplier getNumOfReplicas) {
    this.conn = conn;
    this.tableName = tableName;
    this.getNumOfReplicas = getNumOfReplicas;
    // This numOfReplicas is going to be lazy initialized.
    this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS;
    // Start chores
    this.conn.getChoreService().scheduleChore(getCacheCleanupChore());
    this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore());
  }

  /**
   * When a client runs into NotServingRegionException, it will call this method to
   * update Selector's internal state.
   * @param loc the location which causes exception.
   */
  public void onError(HRegionLocation loc) {
    TableName table = loc.getRegion().getTable();
    byte[] startKey = loc.getRegion().getStartKey();
    byte[] endKey = loc.getRegion().getEndKey();
    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache =
      computeIfAbsent(staleCache, table, () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR));
    // Always (re)insert: a new report must reset the expiry timer and record the latest end key.
    // With putIfAbsent, an older (possibly already expired) entry would swallow this report.
    tableCache.put(startKey, new StaleLocationCacheEntry(endKey));
    if (LOG.isDebugEnabled()) {
      LOG.debug("Add entry to stale cache for table {} with startKey {}, {}", table,
        Bytes.toStringBinary(startKey), Bytes.toStringBinary(endKey));
    }
  }

  /**
   * Select a random secondary replica id. In case there is no replica region configured,
   * return the primary replica id.
   * @return Replica id
   */
  private int getRandomReplicaId() {
    int cachedNumOfReplicas = this.numOfReplicas;
    if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) {
      // refreshCatalogReplicaCount() publishes the fetched count itself. Writing numOfReplicas
      // again here could overwrite a newer value set concurrently by the refresh chore.
      cachedNumOfReplicas = refreshCatalogReplicaCount();
    }
    // In case of no replica configured, return the primary region id.
    if (cachedNumOfReplicas <= 1) {
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    // Yields an id in [1, cachedNumOfReplicas - 1], i.e. one of the secondary replicas.
    return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1);
  }

  /**
   * When it looks up a location, it will call this method to find a replica region to go.
   * For a normal case, > 99% of region locations from catalog/meta replica will be up to date.
   * In extreme cases such as region server crashes, it will depend on how fast replication
   * catches up.
   *
   * @param tablename table name it looks up
   * @param row key it looks up.
   * @param locateType locateType, Only BEFORE and CURRENT will be passed in.
   * @return catalog replica id
   */
  public int select(final TableName tablename, final byte[] row,
    final RegionLocateType locateType) {
    Preconditions.checkArgument(locateType == RegionLocateType.BEFORE ||
        locateType == RegionLocateType.CURRENT,
      "Expected type BEFORE or CURRENT but got: %s", locateType);

    ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename);
    // If there is no entry in StaleCache, select a random replica id.
    if (tableCache == null) {
      return getRandomReplicaId();
    }

    boolean isEmptyStopRow = isEmptyStopRow(row);
    Map.Entry<byte[], StaleLocationCacheEntry> entry;
    // Only BEFORE and CURRENT are passed in.
    if (locateType == RegionLocateType.BEFORE) {
      entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row);
    } else {
      entry = tableCache.floorEntry(row);
    }
    // It is not in the stale cache, return a random replica id.
    if (entry == null) {
      return getRandomReplicaId();
    }

    // The entry here is a possible match for the location. Check if the entry times out first as
    // long comparing is faster than comparing byte arrays (in most cases). It could remove
    // stale entries faster. If the possible match entry does not time out, it will check if
    // the entry is a match for the row passed in and select the replica id accordingly.
    StaleLocationCacheEntry staleEntry = entry.getValue();
    if ((EnvironmentEdgeManager.currentTime() - staleEntry.getTimestamp()) >=
      STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Entry for table {} with startKey {}, {} times out", tablename,
          Bytes.toStringBinary(entry.getKey()), staleEntry);
      }
      // Remove only the exact entry examined above, so a fresher entry put concurrently by
      // onError() for the same start key is not lost.
      tableCache.remove(entry.getKey(), staleEntry);
      return getRandomReplicaId();
    }

    byte[] endKey = staleEntry.getEndKey();
    // The following logic is borrowed from AsyncNonMetaRegionLocator.
    if (isEmptyStopRow(endKey)) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Lookup {} goes to primary region", Bytes.toStringBinary(row));
      }
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    boolean rowCoveredByStaleEntry = locateType == RegionLocateType.BEFORE
      ? !isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0
      : Bytes.compareTo(row, endKey) < 0;
    if (rowCoveredByStaleEntry) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Lookup {} goes to primary meta", Bytes.toStringBinary(row));
      }
      return RegionInfo.DEFAULT_REPLICA_ID;
    }
    // Not in stale cache, return a random replica id.
    return getRandomReplicaId();
  }

  // This class implements the Stoppable interface as chores need a Stoppable object; there is
  // no-op on this Stoppable object currently.
  @Override
  public void stop(String why) {
    isStopped = true;
  }

  @Override
  public boolean isStopped() {
    return isStopped;
  }

  /**
   * Removes every stale cache entry, across all tables, that is older than
   * STALE_CACHE_TIMEOUT_IN_MILLISECONDS. Invoked by the cache cleanup chore.
   */
  private void cleanupReplicaStaleCache() {
    long curTimeInMills = EnvironmentEdgeManager.currentTime();
    for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) {
      Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = tableCache.entrySet().iterator();
      while (it.hasNext()) {
        Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next();
        StaleLocationCacheEntry staleEntry = entry.getValue();
        if (curTimeInMills - staleEntry.getTimestamp() < STALE_CACHE_TIMEOUT_IN_MILLISECONDS) {
          continue;
        }
        // Iterator.remove() on a ConcurrentSkipListMap drops whatever is currently mapped to the
        // key; remove conditionally so a fresh entry inserted by onError() meanwhile survives.
        if (tableCache.remove(entry.getKey(), staleEntry) && LOG.isDebugEnabled()) {
          LOG.debug("clean entry {}, {} from stale cache", Bytes.toStringBinary(entry.getKey()),
            staleEntry);
        }
      }
    }
  }

  /**
   * Fetches the current replica count and publishes it to numOfReplicas when appropriate.
   * @return the freshly fetched replica count, even if it was not published
   */
  private int refreshCatalogReplicaCount() {
    int newNumOfReplicas = this.getNumOfReplicas.getAsInt();
    LOG.debug("Refreshed replica count {}", newNumOfReplicas);
    if (newNumOfReplicas == 1) {
      LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to "
        + "fetch the replica count", tableName);
    }
    int cachedNumOfReplicas = this.numOfReplicas;

    // If the returned number of replicas is 1, it is mostly caused by failure to fetch the
    // replica count. Do not update the numOfReplicas in this case.
    if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) ||
      ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) {
      this.numOfReplicas = newNumOfReplicas;
    }
    return newNumOfReplicas;
  }

  /** Builds the chore that periodically purges expired stale cache entries. */
  private ScheduledChore getCacheCleanupChore() {
    return new ScheduledChore("CleanupCatalogReplicaStaleCache", this,
      STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        cleanupReplicaStaleCache();
      }
    };
  }

  /** Builds the chore that periodically refreshes the catalog replica count. */
  private ScheduledChore getRefreshReplicaCountChore() {
    return new ScheduledChore("RefreshReplicaCountChore", this,
      REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) {
      @Override
      protected void chore() {
        refreshCatalogReplicaCount();
      }
    };
  }
}