| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hbase.client; |
| |
| import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow; |
| import static org.apache.hadoop.hbase.util.Bytes.BYTES_COMPARATOR; |
| import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ConcurrentNavigableMap; |
| import java.util.concurrent.ConcurrentSkipListMap; |
| import java.util.concurrent.ThreadLocalRandom; |
| import java.util.function.IntSupplier; |
| import org.apache.commons.lang3.builder.ToStringBuilder; |
| import org.apache.commons.lang3.builder.ToStringStyle; |
| import org.apache.hadoop.hbase.HRegionLocation; |
| import org.apache.hadoop.hbase.ScheduledChore; |
| import org.apache.hadoop.hbase.Stoppable; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.util.Bytes; |
| import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; |
| /** |
| * <p>CatalogReplicaLoadBalanceReplicaSimpleSelector implements a simple catalog replica load |
| * balancing algorithm. It maintains a stale location cache for each table. Whenever client looks |
| * up location, it first check if the row is the stale location cache. If yes, the location from |
| * catalog replica is stale, it will go to the primary region to look up update-to-date location; |
| * otherwise, it will randomly pick up a replica region for lookup. When clients receive |
| * RegionNotServedException from region servers, it will add these region locations to the stale |
| * location cache. The stale cache will be cleaned up periodically by a chore.</p> |
| * |
| * It follows a simple algorithm to choose a replica to go: |
| * |
| * <ol> |
| * <li>If there is no stale location entry for rows it looks up, it will randomly |
| * pick a replica region to do lookup. </li> |
| * <li>If the location from the replica region is stale, client gets RegionNotServedException |
| * from region server, in this case, it will create StaleLocationCacheEntry in |
| * CatalogReplicaLoadBalanceReplicaSimpleSelector.</li> |
| * <li>When client tries to do location lookup, it checks StaleLocationCache first for rows it |
| * tries to lookup, if entry exists, it will go with primary meta region to do lookup; |
| * otherwise, it will follow step 1.</li> |
| * <li>A chore will periodically run to clean up cache entries in the StaleLocationCache.</li> |
| * </ol> |
| */ |
| class CatalogReplicaLoadBalanceSimpleSelector implements |
| CatalogReplicaLoadBalanceSelector, Stoppable { |
| private static final Logger LOG = |
| LoggerFactory.getLogger(CatalogReplicaLoadBalanceSimpleSelector.class); |
| private final long STALE_CACHE_TIMEOUT_IN_MILLISECONDS = 3000; // 3 seconds |
| private final int STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS = 1500; // 1.5 seconds |
| private final int REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS = 60000; // 1 minute |
| |
| /** |
| * StaleLocationCacheEntry is the entry when a stale location is reported by an client. |
| */ |
| private static final class StaleLocationCacheEntry { |
| // timestamp in milliseconds |
| private final long timestamp; |
| |
| private final byte[] endKey; |
| |
| StaleLocationCacheEntry(final byte[] endKey) { |
| this.endKey = endKey; |
| timestamp = EnvironmentEdgeManager.currentTime(); |
| } |
| |
| public byte[] getEndKey() { |
| return this.endKey; |
| } |
| |
| public long getTimestamp() { |
| return this.timestamp; |
| } |
| |
| @Override |
| public String toString() { |
| return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE) |
| .append("endKey", endKey) |
| .append("timestamp", timestamp) |
| .toString(); |
| } |
| } |
| |
| private final ConcurrentMap<TableName, ConcurrentNavigableMap<byte[], StaleLocationCacheEntry>> |
| staleCache = new ConcurrentHashMap<>(); |
| private volatile int numOfReplicas; |
| private final AsyncConnectionImpl conn; |
| private final TableName tableName; |
| private final IntSupplier getNumOfReplicas; |
| private volatile boolean isStopped = false; |
| private final static int UNINITIALIZED_NUM_OF_REPLICAS = -1; |
| |
| CatalogReplicaLoadBalanceSimpleSelector(TableName tableName, AsyncConnectionImpl conn, |
| IntSupplier getNumOfReplicas) { |
| this.conn = conn; |
| this.tableName = tableName; |
| this.getNumOfReplicas = getNumOfReplicas; |
| |
| // This numOfReplicas is going to be lazy initialized. |
| this.numOfReplicas = UNINITIALIZED_NUM_OF_REPLICAS; |
| // Start chores |
| this.conn.getChoreService().scheduleChore(getCacheCleanupChore(this)); |
| this.conn.getChoreService().scheduleChore(getRefreshReplicaCountChore(this)); |
| } |
| |
| /** |
| * When a client runs into RegionNotServingException, it will call this method to |
| * update Selector's internal state. |
| * @param loc the location which causes exception. |
| */ |
| public void onError(HRegionLocation loc) { |
| ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = |
| computeIfAbsent(staleCache, loc.getRegion().getTable(), |
| () -> new ConcurrentSkipListMap<>(BYTES_COMPARATOR)); |
| byte[] startKey = loc.getRegion().getStartKey(); |
| tableCache.putIfAbsent(startKey, |
| new StaleLocationCacheEntry(loc.getRegion().getEndKey())); |
| LOG.debug("Add entry to stale cache for table {} with startKey {}, {}", |
| loc.getRegion().getTable(), startKey, loc.getRegion().getEndKey()); |
| } |
| |
| /** |
| * Select an random replica id. In case there is no replica region configured, return |
| * the primary replica id. |
| * @return Replica id |
| */ |
| private int getRandomReplicaId() { |
| int cachedNumOfReplicas = this.numOfReplicas; |
| if (cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) { |
| cachedNumOfReplicas = refreshCatalogReplicaCount(); |
| this.numOfReplicas = cachedNumOfReplicas; |
| } |
| // In case of no replica configured, return the primary region id. |
| if (cachedNumOfReplicas <= 1) { |
| return RegionInfo.DEFAULT_REPLICA_ID; |
| } |
| return 1 + ThreadLocalRandom.current().nextInt(cachedNumOfReplicas - 1); |
| } |
| |
| /** |
| * When it looks up a location, it will call this method to find a replica region to go. |
| * For a normal case, > 99% of region locations from catalog/meta replica will be up to date. |
| * In extreme cases such as region server crashes, it will depends on how fast replication |
| * catches up. |
| * |
| * @param tablename table name it looks up |
| * @param row key it looks up. |
| * @param locateType locateType, Only BEFORE and CURRENT will be passed in. |
| * @return catalog replica id |
| */ |
| public int select(final TableName tablename, final byte[] row, |
| final RegionLocateType locateType) { |
| Preconditions.checkArgument(locateType == RegionLocateType.BEFORE || |
| locateType == RegionLocateType.CURRENT, |
| "Expected type BEFORE or CURRENT but got: %s", locateType); |
| |
| ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache = staleCache.get(tablename); |
| |
| // If there is no entry in StaleCache, select a random replica id. |
| if (tableCache == null) { |
| return getRandomReplicaId(); |
| } |
| |
| Map.Entry<byte[], StaleLocationCacheEntry> entry; |
| boolean isEmptyStopRow = isEmptyStopRow(row); |
| // Only BEFORE and CURRENT are passed in. |
| if (locateType == RegionLocateType.BEFORE) { |
| entry = isEmptyStopRow ? tableCache.lastEntry() : tableCache.lowerEntry(row); |
| } else { |
| entry = tableCache.floorEntry(row); |
| } |
| |
| // It is not in the stale cache, return a random replica id. |
| if (entry == null) { |
| return getRandomReplicaId(); |
| } |
| |
| // The entry here is a possible match for the location. Check if the entry times out first as |
| // long comparing is faster than comparing byte arrays(in most cases). It could remove |
| // stale entries faster. If the possible match entry does not time out, it will check if |
| // the entry is a match for the row passed in and select the replica id accordingly. |
| if ((EnvironmentEdgeManager.currentTime() - entry.getValue().getTimestamp()) >= |
| STALE_CACHE_TIMEOUT_IN_MILLISECONDS) { |
| LOG.debug("Entry for table {} with startKey {}, {} times out", tablename, entry.getKey(), |
| entry); |
| tableCache.remove(entry.getKey()); |
| return getRandomReplicaId(); |
| } |
| |
| byte[] endKey = entry.getValue().getEndKey(); |
| |
| // The following logic is borrowed from AsyncNonMetaRegionLocator. |
| if (isEmptyStopRow(endKey)) { |
| LOG.debug("Lookup {} goes to primary region", row); |
| return RegionInfo.DEFAULT_REPLICA_ID; |
| } |
| |
| if (locateType == RegionLocateType.BEFORE) { |
| if (!isEmptyStopRow && Bytes.compareTo(endKey, row) >= 0) { |
| LOG.debug("Lookup {} goes to primary meta", row); |
| return RegionInfo.DEFAULT_REPLICA_ID; |
| } |
| } else { |
| if (Bytes.compareTo(row, endKey) < 0) { |
| LOG.debug("Lookup {} goes to primary meta", row); |
| return RegionInfo.DEFAULT_REPLICA_ID; |
| } |
| } |
| |
| // Not in stale cache, return a random replica id. |
| return getRandomReplicaId(); |
| } |
| |
| // This class implements the Stoppable interface as chores needs a Stopable object, there is |
| // no-op on this Stoppable object currently. |
| @Override |
| public void stop(String why) { |
| isStopped = true; |
| } |
| |
| @Override |
| public boolean isStopped() { |
| return isStopped; |
| } |
| |
| private void cleanupReplicaReplicaStaleCache() { |
| long curTimeInMills = EnvironmentEdgeManager.currentTime(); |
| for (ConcurrentNavigableMap<byte[], StaleLocationCacheEntry> tableCache : staleCache.values()) { |
| Iterator<Map.Entry<byte[], StaleLocationCacheEntry>> it = |
| tableCache.entrySet().iterator(); |
| while (it.hasNext()) { |
| Map.Entry<byte[], StaleLocationCacheEntry> entry = it.next(); |
| if (curTimeInMills - entry.getValue().getTimestamp() >= |
| STALE_CACHE_TIMEOUT_IN_MILLISECONDS) { |
| LOG.debug("clean entry {}, {} from stale cache", entry.getKey(), entry.getValue()); |
| it.remove(); |
| } |
| } |
| } |
| } |
| |
| private int refreshCatalogReplicaCount() { |
| int newNumOfReplicas = this.getNumOfReplicas.getAsInt(); |
| LOG.debug("Refreshed replica count {}", newNumOfReplicas); |
| if (newNumOfReplicas == 1) { |
| LOG.warn("Table {}'s region replica count is 1, maybe a misconfiguration or failure to " |
| + "fetch the replica count", tableName); |
| } |
| int cachedNumOfReplicas = this.numOfReplicas; |
| |
| // If the returned number of replicas is 1, it is mostly caused by failure to fetch the |
| // replica count. Do not update the numOfReplicas in this case. |
| if ((cachedNumOfReplicas == UNINITIALIZED_NUM_OF_REPLICAS) || |
| ((cachedNumOfReplicas != newNumOfReplicas) && (newNumOfReplicas != 1))) { |
| this.numOfReplicas = newNumOfReplicas; |
| } |
| return newNumOfReplicas; |
| } |
| |
| private ScheduledChore getCacheCleanupChore( |
| final CatalogReplicaLoadBalanceSimpleSelector selector) { |
| return new ScheduledChore("CleanupCatalogReplicaStaleCache", this, |
| STALE_CACHE_CLEAN_CHORE_INTERVAL_IN_MILLISECONDS) { |
| @Override |
| protected void chore() { |
| selector.cleanupReplicaReplicaStaleCache(); |
| } |
| }; |
| } |
| |
| private ScheduledChore getRefreshReplicaCountChore( |
| final CatalogReplicaLoadBalanceSimpleSelector selector) { |
| return new ScheduledChore("RefreshReplicaCountChore", this, |
| REFRESH_REPLICA_COUNT_CHORE_INTERVAL_IN_MILLISECONDS) { |
| @Override |
| protected void chore() { |
| selector.refreshCatalogReplicaCount(); |
| } |
| }; |
| } |
| } |