/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.RegionStateListener;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.MasterSwitchType;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.FailedServerException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
import org.apache.hadoop.hbase.master.normalizer.NormalizationPlan.PlanType;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.quotas.QuotaExceededException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
/**
* Manages and performs region assignment.
* Related communication with region servers is all done over RPC.
*/
@InterfaceAudience.Private
public class AssignmentManager {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
protected final MasterServices server;
private ServerManager serverManager;
private boolean shouldAssignRegionsWithFavoredNodes;
private LoadBalancer balancer;
private final MetricsAssignmentManager metricsAssignmentManager;
private final TableLockManager tableLockManager;
private AtomicInteger numRegionsOpened = new AtomicInteger(0);
private final KeyLocker<String> locker = new KeyLocker<String>();
Set<HRegionInfo> replicasToClose = Collections.synchronizedSet(new HashSet<HRegionInfo>());
/**
* Map of regions to reopen after the schema of a table is changed. Key -
* encoded region name, value - HRegionInfo
*/
private final Map <String, HRegionInfo> regionsToReopen;
/*
* Maximum times we recurse an assignment/unassignment.
* See below in {@link #assign()} and {@link #unassign()}.
*/
private final int maximumAttempts;
/**
* The sleep time for which the assignment will wait before retrying in case of an
* hbase:meta assignment failure, due to a missing or bad region plan
*/
private final long sleepTimeBeforeRetryingMetaAssignment;
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
// shutdown processing -- St.Ack
// All access to this Map must be synchronized.
final NavigableMap<String, RegionPlan> regionPlans =
new TreeMap<String, RegionPlan>();
private final TableStateManager tableStateManager;
private final ExecutorService executorService;
// Thread pool executor service. TODO, consolidate with executorService?
private java.util.concurrent.ExecutorService threadPoolExecutorService;
private final RegionStates regionStates;
// The thresholds for using bulk assignment. Bulk assignment is used
// only if assigning at least this many regions to at least this
// many servers. If assigning fewer regions to fewer servers,
// bulk assigning may not be as efficient.
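// Illustrative tuning only (hypothetical values; the keys and their defaults
// are read in the constructor below):
//   conf.setInt("hbase.bulk.assignment.threshold.regions", 20);
//   conf.setInt("hbase.bulk.assignment.threshold.servers", 5);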
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or until it times out? This is useful for measuring bulk assignment
// performance, but not needed in most use cases.
private final boolean bulkAssignWaitTillAllAssigned;
/**
* Indicator that AssignmentManager has recovered the region states so
* that ServerShutdownHandler can be fully enabled and re-assign regions
* of dead servers. So that when re-assignment happens, AssignmentManager
* has proper region states.
*
* Protected to ease testing.
*/
protected final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
/**
* A map to track how many times in a row a region has failed to open,
* so that we don't try to open a region forever if the failure is
* unrecoverable. We don't put this information in region states
* because we don't expect this to happen frequently; we don't
* want to copy this information over during each state transition either.
*/
private final ConcurrentHashMap<String, AtomicInteger>
failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
// In case not using ZK for region assignment, region states
// are persisted in meta with a state store
private final RegionStateStore regionStateStore;
/**
* For testing only! Set to true to skip handling of split.
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="MS_SHOULD_BE_FINAL")
public static boolean TEST_SKIP_SPLIT_HANDLING = false;
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
private RegionStateListener regionStateListener;
/**
* Constructs a new assignment manager.
*
* @param server instance of HMaster this AM is running inside
* @param serverManager serverManager for associated HMaster
* @param balancer implementation of {@link LoadBalancer}
* @param service Executor service
* @param metricsMaster metrics manager
* @param tableLockManager TableLock manager
* @throws IOException
*/
public AssignmentManager(MasterServices server, ServerManager serverManager,
final LoadBalancer balancer,
final ExecutorService service, MetricsMaster metricsMaster,
final TableLockManager tableLockManager,
final TableStateManager tableStateManager)
throws IOException {
this.server = server;
this.serverManager = serverManager;
this.executorService = service;
this.regionStateStore = new RegionStateStore(server);
this.regionsToReopen = Collections.synchronizedMap
(new HashMap<String, HRegionInfo> ());
Configuration conf = server.getConfiguration();
// Only read favored nodes if using the favored nodes load balancer.
this.shouldAssignRegionsWithFavoredNodes = conf.getClass(
HConstants.HBASE_MASTER_LOADBALANCER_CLASS, Object.class).equals(
FavoredNodeLoadBalancer.class);
this.tableStateManager = tableStateManager;
// This is the max attempts, not retries, so it should be at least 1.
this.maximumAttempts = Math.max(1,
this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10));
this.sleepTimeBeforeRetryingMetaAssignment = this.server.getConfiguration().getLong(
"hbase.meta.assignment.retry.sleeptime", 1000l);
this.balancer = balancer;
int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
this.regionStates = new RegionStates(
server, tableStateManager, serverManager, regionStateStore);
this.bulkAssignWaitTillAllAssigned =
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
this.bulkPerRegionOpenTimeGuesstimate =
conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
this.metricsAssignmentManager = new MetricsAssignmentManager();
this.tableLockManager = tableLockManager;
}
MetricsAssignmentManager getAssignmentManagerMetrics() {
return this.metricsAssignmentManager;
}
/**
* Add the listener to the notification list.
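* <p>
* A minimal sketch (assuming the open/close callbacks of
* {@link AssignmentListener}):
* <pre>{@code
* am.registerListener(new AssignmentListener() {
*   @Override
*   public void regionOpened(HRegionInfo regionInfo, ServerName serverName) {
*     // react to the region coming online
*   }
*   @Override
*   public void regionClosed(HRegionInfo regionInfo) {
*     // react to the region going offline
*   }
* });
* }</pre>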
* @param listener The AssignmentListener to register
*/
public void registerListener(final AssignmentListener listener) {
this.listeners.add(listener);
}
/**
* Remove the listener from the notification list.
* @param listener The AssignmentListener to unregister
*/
public boolean unregisterListener(final AssignmentListener listener) {
return this.listeners.remove(listener);
}
/**
* @return Instance of TableStateManager.
*/
public TableStateManager getTableStateManager() {
// These are 'expensive' to make, involving a trip to the zk ensemble, so
// allow sharing.
return this.tableStateManager;
}
/**
* This SHOULD not be public. It is public now
* because of some unit tests.
*
* TODO: make it package private and keep RegionStates in the master package
*/
public RegionStates getRegionStates() {
return regionStates;
}
/**
* Used in some tests to mock up region state in meta
*/
@VisibleForTesting
RegionStateStore getRegionStateStore() {
return regionStateStore;
}
public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
}
/**
* Add a regionPlan for the specified region.
* @param encodedName
* @param plan
*/
public void addPlan(String encodedName, RegionPlan plan) {
synchronized (regionPlans) {
regionPlans.put(encodedName, plan);
}
}
/**
* Add a map of region plans.
*/
public void addPlans(Map<String, RegionPlan> plans) {
synchronized (regionPlans) {
regionPlans.putAll(plans);
}
}
/**
* Set the list of regions that will be reopened
* because of an update in table schema
*
* @param regions
* list of regions that should be tracked for reopen
*/
public void setRegionsToReopen(List <HRegionInfo> regions) {
for(HRegionInfo hri : regions) {
regionsToReopen.put(hri.getEncodedName(), hri);
}
}
/**
* Used by the client to check whether all regions have received the schema updates
*
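* A minimal sketch of interpreting the result (hypothetical caller): the first
* element is the number of regions still pending reopen, the second the total:
* <pre>{@code
* Pair<Integer, Integer> status = am.getReopenStatus(tableName);
* boolean alterComplete = status.getFirst() == 0;
* }</pre>
*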
* @param tableName
* @return Pair indicating the status of the alter command
* @throws IOException
*/
public Pair<Integer, Integer> getReopenStatus(TableName tableName)
throws IOException {
List<HRegionInfo> hris;
if (TableName.META_TABLE_NAME.equals(tableName)) {
hris = new MetaTableLocator().getMetaRegions(server.getZooKeeper());
} else {
hris = MetaTableAccessor.getTableRegions(server.getConnection(), tableName, true);
}
Integer pending = 0;
for (HRegionInfo hri : hris) {
String name = hri.getEncodedName();
// No lock; concurrent access is ok: sequential consistency is respected.
if (regionsToReopen.containsKey(name)
|| regionStates.isRegionInTransition(name)) {
pending++;
}
}
return new Pair<Integer, Integer>(pending, hris.size());
}
/**
* Used by ServerShutdownHandler to make sure AssignmentManager has completed
* the failover cleanup before re-assigning regions of dead servers, so that
* when re-assignment happens, AssignmentManager has proper region states.
*/
public boolean isFailoverCleanupDone() {
return failoverCleanupDone.get();
}
/**
* To avoid racing with AM, external entities may need to lock a region,
* for example, when SSH checks what regions to skip re-assigning.
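* <p>
* The returned lock should be released in a finally block, mirroring the
* pattern this class uses internally:
* <pre>{@code
* Lock lock = am.acquireRegionLock(hri.getEncodedName());
* try {
*   // examine or act on the region while holding the lock
* } finally {
*   lock.unlock();
* }
* }</pre>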
*/
public Lock acquireRegionLock(final String encodedName) {
return locker.acquireLock(encodedName);
}
/**
* Called when failover cleanup is completed. Notifies the server manager to
* process queued-up dead servers, if any.
*/
void failoverCleanupDone() {
failoverCleanupDone.set(true);
serverManager.processQueuedDeadServers();
}
/**
* Called on startup.
* Figures out whether this is a fresh cluster start or we are joining an extant running cluster.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
* @throws CoordinatedStateException
*/
void joinCluster()
throws IOException, KeeperException, InterruptedException, CoordinatedStateException {
long startTime = System.currentTimeMillis();
// Concurrency note: In the below the accesses on regionsInTransition are
// outside of a synchronization block where usually all accesses to RIT are
// synchronized. The presumption is that in this case it is safe since this
// method is being played by a single thread on startup.
// TODO: Regions that have a null location and are not in regionsInTransitions
// need to be handled.
// Scan hbase:meta to build list of existing regions, servers, and assignment
// Returns servers that have not checked in (assumed dead) but to which some
// regions were assigned (according to meta).
Set<ServerName> deadServers = rebuildUserRegions();
// This method will assign all user regions if a clean server startup or
// it will reconstruct master state and cleanup any leftovers from previous master process.
boolean failover = processDeadServersAndRegionsInTransition(deadServers);
LOG.info("Joined the cluster in " + (System.currentTimeMillis()
- startTime) + "ms, failover=" + failover);
}
/**
* Process all regions that are in transition in zookeeper and also
* processes the list of dead servers.
* Used by a master joining a cluster. If we figure this is a clean cluster
* startup, will assign all user regions.
* @param deadServers Set of servers that are offline (probably legitimately) and were
* carrying regions according to a scan of hbase:meta. Can be null.
* @throws IOException
* @throws InterruptedException
*/
boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
// TODO Needed? List<String> nodes = ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
boolean failover = !serverManager.getDeadServers().isEmpty();
if (failover) {
// This may not be a failover actually, especially if meta is on this master.
if (LOG.isDebugEnabled()) {
LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
}
// Check if there are any regions on these servers
failover = false;
for (ServerName serverName : serverManager.getDeadServers().copyServerNames()) {
if (regionStates.getRegionAssignments().values().contains(serverName)) {
LOG.debug("Found regions on dead server: " + serverName);
failover = true;
break;
}
}
}
Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
if (!failover) {
// If any one region except meta is assigned, it's a failover.
for (Map.Entry<HRegionInfo, ServerName> en:
regionStates.getRegionAssignments().entrySet()) {
HRegionInfo hri = en.getKey();
if (!hri.isMetaTable()
&& onlineServers.contains(en.getValue())) {
LOG.debug("Found region " + hri + " out on cluster");
failover = true;
break;
}
}
}
if (!failover) {
// If any region except meta is in transition on a live server, it's a failover.
Set<RegionState> regionsInTransition = regionStates.getRegionsInTransition();
if (!regionsInTransition.isEmpty()) {
for (RegionState regionState: regionsInTransition) {
ServerName serverName = regionState.getServerName();
if (!regionState.getRegion().isMetaRegion()
&& serverName != null && onlineServers.contains(serverName)) {
LOG.debug("Found " + regionState + " for region " +
regionState.getRegion().getRegionNameAsString() + " for server " +
serverName + "in RITs");
failover = true;
break;
}
}
}
}
if (!failover) {
// If we get here, we have a full cluster restart. It is a failover only
// if some WALs have not been split yet. Meta WALs, if any, should have
// been split already. We can walk through the queued dead servers; if
// they don't have any WALs, this restart should be considered a clean one.
Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
if (!queuedDeadServers.isEmpty()) {
Configuration conf = server.getConfiguration();
Path rootdir = FSUtils.getRootDir(conf);
FileSystem fs = rootdir.getFileSystem(conf);
for (ServerName serverName: queuedDeadServers) {
// In the case of a clean exit, the shutdown handler would have presplit any WALs and
// removed empty directories.
Path logDir = new Path(rootdir,
AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
if (checkWals(fs, logDir) || checkWals(fs, splitDir)) {
LOG.debug("Found queued dead server " + serverName);
failover = true;
break;
}
}
if (!failover) {
// We figured that it's not a failover, so no need to
// work on these re-queued dead servers any more.
LOG.info("AM figured that it's not a failover and cleaned up "
+ queuedDeadServers.size() + " queued dead servers");
serverManager.removeRequeuedDeadServers();
}
}
}
Set<TableName> disabledOrDisablingOrEnabling = null;
Map<HRegionInfo, ServerName> allRegions = null;
if (!failover) {
disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
TableState.State.DISABLED, TableState.State.DISABLING,
TableState.State.ENABLING);
// Clean re/start, mark all user regions closed before reassignment
allRegions = regionStates.closeAllUserRegions(
disabledOrDisablingOrEnabling);
}
// Now region states are restored
regionStateStore.start();
if (failover) {
if (deadServers != null && !deadServers.isEmpty()) {
for (ServerName serverName: deadServers) {
if (!serverManager.isServerDead(serverName)) {
serverManager.expireServer(serverName); // Let SSH do region re-assign
}
}
}
processRegionsInTransition(regionStates.getRegionsInTransition());
}
// Now we can safely claim failover cleanup completed and enable
// ServerShutdownHandler for further processing. The nodes (below)
// in transition, if any, are for regions not related to those
// dead servers at all, and can be done in parallel to SSH.
failoverCleanupDone();
if (!failover) {
// Fresh cluster startup.
LOG.info("Clean cluster startup. Don't reassign user regions");
assignAllUserRegions(allRegions);
} else {
LOG.info("Failover! Reassign user regions");
}
// Unassign replicas of the split parents and the merged regions.
// The daughter replicas are opened in assignAllUserRegions if they were
// not already opened.
for (HRegionInfo h : replicasToClose) {
unassign(h);
}
replicasToClose.clear();
return failover;
}
private boolean checkWals(FileSystem fs, Path dir) throws IOException {
if (!fs.exists(dir)) {
LOG.debug(dir + " doesn't exist");
return false;
}
if (!fs.getFileStatus(dir).isDirectory()) {
LOG.warn(dir + " is not a directory");
return false;
}
FileStatus[] files = FSUtils.listStatus(fs, dir);
if (files == null || files.length == 0) {
LOG.debug(dir + " has no files");
return false;
}
for (int i = 0; i < files.length; i++) {
if (files[i].isFile() && files[i].getLen() > 0) {
LOG.debug(dir + " has a non-empty file: " + files[i].getPath());
return true;
// Recurse into the subdirectory rather than re-checking the same dir
} else if (files[i].isDirectory() && checkWals(fs, files[i].getPath())) {
LOG.debug(dir + " is a directory and has a non-empty file: " + files[i].getPath());
return true;
}
}
LOG.debug("Found 0 non-empty wal files for :" + dir);
return false;
}
/**
* When a region is closed, it should be removed from the regionsToReopen
* @param hri HRegionInfo of the region which was closed
*/
public void removeClosedRegion(HRegionInfo hri) {
if (regionsToReopen.remove(hri.getEncodedName()) != null) {
LOG.debug("Removed region from reopening regions because it was closed");
}
}
// TODO: processFavoredNodes might throw an exception, e.g. if the
// meta could not be contacted/updated. We need to decide how seriously to
// treat this problem. Should we fail the current assignment? We should be
// able to recover from this eventually (if the meta couldn't be updated,
// things should work normally and eventually get fixed up).
void processFavoredNodes(List<HRegionInfo> regions) throws IOException {
if (!shouldAssignRegionsWithFavoredNodes) return;
// The AM gets the favored nodes info for each region and updates the meta
// table with that info
Map<HRegionInfo, List<ServerName>> regionToFavoredNodes =
new HashMap<HRegionInfo, List<ServerName>>();
for (HRegionInfo region : regions) {
regionToFavoredNodes.put(region,
((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region));
}
FavoredNodeAssignmentHelper.updateMetaWithFavoredNodesInfo(regionToFavoredNodes,
this.server.getConnection());
}
/**
* Marks the region as online. Removes it from regions in transition and
* updates the in-memory assignment information.
* <p>
* Used when a region has been successfully opened on a region server.
* @param regionInfo
* @param sn
*/
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
}
void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
numRegionsOpened.incrementAndGet();
regionStates.regionOnline(regionInfo, sn, openSeqNum);
// Remove plan if one.
clearRegionPlan(regionInfo);
balancer.regionOnline(regionInfo, sn);
// Tell our listeners that a region was opened
sendRegionOpenedNotification(regionInfo, sn);
}
/**
* Marks the region as offline. Removes it from regions in transition and
* removes in-memory assignment information.
* <p>
* Used when a region has been closed and should remain closed.
* @param regionInfo
*/
public void regionOffline(final HRegionInfo regionInfo) {
regionOffline(regionInfo, null);
}
public void offlineDisabledRegion(HRegionInfo regionInfo) {
replicasToClose.remove(regionInfo);
regionOffline(regionInfo);
}
// Assignment methods
/**
* Assigns the specified region.
* <p>
* If a RegionPlan is available with a valid destination then it will be used
* to determine what server region is assigned to. If no RegionPlan is
* available, region will be assigned to a random available server.
* <p>
* Updates the RegionState and sends the OPEN RPC.
* <p>
* This will only succeed if the region is in transition and in a CLOSED or
* OFFLINE state, or not in transition at all, and of course, the
* chosen server is up and running (it may have just crashed!).
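* <p>
* Typical invocation (illustrative):
* <pre>{@code
* am.assign(hri); // equivalent to am.assign(hri, false)
* }</pre>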
*
* @param region the region to be assigned
*/
public void assign(HRegionInfo region) {
assign(region, false);
}
/**
* Use care with forceNewPlan. It could cause double assignment.
*/
public void assign(HRegionInfo region, boolean forceNewPlan) {
if (isDisabledorDisablingRegionInRIT(region)) {
return;
}
String encodedName = region.getEncodedName();
Lock lock = locker.acquireLock(encodedName);
try {
RegionState state = forceRegionStateToOffline(region, forceNewPlan);
if (state != null) {
if (regionStates.wasRegionOnDeadServer(encodedName)) {
LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ " is dead but not processed yet");
return;
}
assign(state, forceNewPlan);
}
} finally {
lock.unlock();
}
}
/**
* Bulk assign regions to <code>destination</code>.
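* <p>
* Illustrative use, mirroring the non-bulk caller later in this class: when this
* returns false, callers re-invoke assignment per region.
* <pre>{@code
* if (!assign(destination, regionsForServer) && !server.isStopped()) {
*   // fall back to assigning each region individually
* }
* }</pre>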
* @param destination
* @param regions Regions to assign.
* @return true if successful
*/
boolean assign(final ServerName destination, final List<HRegionInfo> regions)
throws InterruptedException {
long startTime = EnvironmentEdgeManager.currentTime();
try {
int regionCount = regions.size();
if (regionCount == 0) {
return true;
}
LOG.info("Assigning " + regionCount + " region(s) to " + destination.toString());
Set<String> encodedNames = new HashSet<String>(regionCount);
for (HRegionInfo region : regions) {
encodedNames.add(region.getEncodedName());
}
List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
Map<String, Lock> locks = locker.acquireLocks(encodedNames);
try {
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
List<RegionState> states = new ArrayList<RegionState>(regionCount);
for (HRegionInfo region : regions) {
String encodedName = region.getEncodedName();
if (!isDisabledorDisablingRegionInRIT(region)) {
RegionState state = forceRegionStateToOffline(region, false);
boolean onDeadServer = false;
if (state != null) {
if (regionStates.wasRegionOnDeadServer(encodedName)) {
LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ " is dead but not processed yet");
onDeadServer = true;
} else {
RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
plans.put(encodedName, plan);
states.add(state);
continue;
}
}
// Reassign if the region wasn't on a dead server
if (!onDeadServer) {
LOG.info("failed to force region state to offline, "
+ "will reassign later: " + region);
failedToOpenRegions.add(region); // assign individually later
}
}
// Release the lock, this region is excluded from bulk assign because
// we can't update its state, or set its znode to offline.
Lock lock = locks.remove(encodedName);
lock.unlock();
}
if (server.isStopped()) {
return false;
}
// Add region plans, so we can updateTimers when one region is opened,
// reducing unnecessary timeouts on RIT.
this.addPlans(plans);
List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
for (RegionState state: states) {
HRegionInfo region = state.getRegion();
regionStates.updateRegionState(
region, State.PENDING_OPEN, destination);
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
if (this.shouldAssignRegionsWithFavoredNodes) {
favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
}
regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
region, favoredNodes));
}
// Move on to open regions.
try {
// Send OPEN RPC. If it fails on an IOE or RemoteException,
// regions will be assigned individually.
Configuration conf = server.getConfiguration();
long maxWaitTime = System.currentTimeMillis() +
conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000);
for (int i = 1; i <= maximumAttempts && !server.isStopped(); i++) {
try {
List<RegionOpeningState> regionOpeningStateList = serverManager
.sendRegionOpen(destination, regionOpenInfos);
for (int k = 0, n = regionOpeningStateList.size(); k < n; k++) {
RegionOpeningState openingState = regionOpeningStateList.get(k);
if (openingState != RegionOpeningState.OPENED) {
HRegionInfo region = regionOpenInfos.get(k).getFirst();
LOG.info("Got opening state " + openingState
+ ", will reassign later: " + region);
// Failed opening this region, reassign it later
forceRegionStateToOffline(region, true);
failedToOpenRegions.add(region);
}
}
break;
} catch (IOException e) {
if (e instanceof RemoteException) {
e = ((RemoteException)e).unwrapRemoteException();
}
if (e instanceof RegionServerStoppedException) {
LOG.warn("The region server was shut down, ", e);
// No need to retry, the region server is a goner.
return false;
} else if (e instanceof ServerNotRunningYetException) {
long now = System.currentTimeMillis();
if (now < maxWaitTime) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server is not yet up; waiting up to " +
(maxWaitTime - now) + "ms", e);
}
Thread.sleep(100);
i--; // reset the try count
continue;
}
} else if (e instanceof java.net.SocketTimeoutException
&& this.serverManager.isServerOnline(destination)) {
// In case socket is timed out and the region server is still online,
// the openRegion RPC could have been accepted by the server and
// just the response didn't go through. So we will retry to
// open the region on the same server.
if (LOG.isDebugEnabled()) {
LOG.debug("Bulk assigner openRegion() to " + destination
+ " has timed out, but the regions might"
+ " already be opened on it.", e);
}
// wait and reset the re-try count, server might be just busy.
Thread.sleep(100);
i--;
continue;
} else if (e instanceof FailedServerException && i < maximumAttempts) {
// In case the server is in the failed server list, no point to
// retry too soon. Retry after the failed_server_expiry time
long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(destination + " is on failed server list; waiting "
+ sleepTime + "ms", e);
}
Thread.sleep(sleepTime);
continue;
}
throw e;
}
}
} catch (IOException e) {
// Can be a socket timeout, EOF, NoRouteToHost, etc
LOG.info("Unable to communicate with " + destination
+ " in order to assign regions, ", e);
for (RegionState state: states) {
HRegionInfo region = state.getRegion();
forceRegionStateToOffline(region, true);
}
return false;
}
} finally {
for (Lock lock : locks.values()) {
lock.unlock();
}
}
if (!failedToOpenRegions.isEmpty()) {
for (HRegionInfo region : failedToOpenRegions) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
}
}
}
// wait for assignment completion
ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
for (HRegionInfo region: regions) {
if (!region.getTable().isSystemTable()) {
userRegionSet.add(region);
}
}
if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
System.currentTimeMillis())) {
LOG.debug("some user regions are still in transition: " + userRegionSet);
}
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
metricsAssignmentManager.updateBulkAssignTime(EnvironmentEdgeManager.currentTime() - startTime);
}
}
/**
* Send CLOSE RPC if the server is online, otherwise, offline the region.
*
* The RPC will be sent only to the region server found in the region state
* if it is passed in; otherwise, to the src server specified. If region
* state is not specified, we don't update the region state at all; instead
* we just send the RPC call. This is useful for some cleanup without
* messing around with the region states (see handleRegion, on the
* region-opened-on-an-unexpected-server scenario, for an example)
*/
private void unassign(final HRegionInfo region,
final ServerName server, final ServerName dest) {
for (int i = 1; i <= this.maximumAttempts; i++) {
if (this.server.isStopped() || this.server.isAborted()) {
LOG.debug("Server stopped/aborted; skipping unassign of " + region);
return;
}
if (!serverManager.isServerOnline(server)) {
LOG.debug("Offline " + region.getRegionNameAsString()
+ ", no need to unassign since it's on a dead server: " + server);
regionStates.updateRegionState(region, State.OFFLINE);
return;
}
try {
// Send CLOSE RPC
if (serverManager.sendRegionClose(server, region, dest)) {
LOG.debug("Sent CLOSE to " + server + " for region " +
region.getRegionNameAsString());
return;
}
// This rarely happens. Currently regionserver close always returns true.
// TODO: this can now happen (since 0.96) if there is an exception in a coprocessor.
LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
region.getRegionNameAsString());
} catch (Throwable t) {
long sleepTime = 0;
Configuration conf = this.server.getConfiguration();
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof RegionServerAbortedException
|| t instanceof RegionServerStoppedException
|| t instanceof ServerNotRunningYetException) {
// RS is aborting, we cannot offline the region since the region may need to do WAL
// recovery. Until we see the RS expiration, we should retry.
sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else if (t instanceof NotServingRegionException) {
LOG.debug("Offline " + region.getRegionNameAsString()
+ ", it's not any more on " + server, t);
regionStates.updateRegionState(region, State.OFFLINE);
return;
} else if (t instanceof FailedServerException && i < maximumAttempts) {
// In case the server is in the failed server list, no point to
// retry too soon. Retry after the failed_server_expiry time
sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(server + " is on failed server list; waiting " + sleepTime + "ms", t);
}
}
try {
if (sleepTime > 0) {
Thread.sleep(sleepTime);
}
} catch (InterruptedException ie) {
LOG.warn("Interrupted unassign " + region.getRegionNameAsString(), ie);
Thread.currentThread().interrupt();
regionStates.updateRegionState(region, State.FAILED_CLOSE);
return;
}
LOG.info("Server " + server + " returned " + t + " for "
+ region.getRegionNameAsString() + ", try=" + i
+ " of " + this.maximumAttempts, t);
}
}
// Run out of attempts
regionStates.updateRegionState(region, State.FAILED_CLOSE);
}
/**
* Set region to OFFLINE unless it is opening and forceNewPlan is false.
*/
private RegionState forceRegionStateToOffline(
final HRegionInfo region, final boolean forceNewPlan) {
RegionState state = regionStates.getRegionState(region);
if (state == null) {
LOG.warn("Assigning but not in region states: " + region);
state = regionStates.createRegionState(region);
}
if (forceNewPlan && LOG.isDebugEnabled()) {
LOG.debug("Force region state offline " + state);
}
switch (state.getState()) {
case OPEN:
case OPENING:
case PENDING_OPEN:
case CLOSING:
case PENDING_CLOSE:
if (!forceNewPlan) {
LOG.debug("Skip assigning " +
region + ", it is already " + state);
return null;
}
// Fall through: force the region closed, then reassign it
case FAILED_CLOSE:
case FAILED_OPEN:
regionStates.updateRegionState(region, State.PENDING_CLOSE);
unassign(region, state.getServerName(), null);
state = regionStates.getRegionState(region);
if (!state.isOffline() && !state.isClosed()) {
// If the region isn't offline, we can't re-assign
// it now. It will be assigned automatically after
// the regionserver reports it's closed.
return null;
}
case OFFLINE:
case CLOSED:
break;
default:
LOG.error("Trying to assign region " + region
+ ", which is " + state);
return null;
}
return state;
}
/**
* Caller must hold lock on the passed <code>state</code> object.
* @param state
* @param forceNewPlan
*/
private void assign(RegionState state, boolean forceNewPlan) {
long startTime = EnvironmentEdgeManager.currentTime();
try {
Configuration conf = server.getConfiguration();
RegionPlan plan = null;
long maxWaitTime = -1;
HRegionInfo region = state.getRegion();
Throwable previousException = null;
for (int i = 1; i <= maximumAttempts; i++) {
if (server.isStopped() || server.isAborted()) {
LOG.info("Skip assigning " + region.getRegionNameAsString()
+ ", the server is stopped/aborted");
return;
}
if (plan == null) { // Get a server for the region at first
try {
plan = getRegionPlan(region, forceNewPlan);
} catch (HBaseIOException e) {
LOG.warn("Failed to get region plan", e);
}
}
if (plan == null) {
LOG.warn("Unable to determine a plan to assign " + region);
// For meta region, we have to keep retrying until succeeding
if (region.isMetaRegion()) {
if (i == maximumAttempts) {
i = 0; // re-set attempt count to 0 for at least 1 retry
LOG.warn("Unable to determine a plan to assign a hbase:meta region " + region +
" after maximumAttempts (" + this.maximumAttempts +
"). Reset attempts count and continue retrying.");
}
waitForRetryingMetaAssignment();
continue;
}
regionStates.updateRegionState(region, State.FAILED_OPEN);
return;
}
LOG.info("Assigning " + region.getRegionNameAsString() +
" to " + plan.getDestination());
// Transition RegionState to PENDING_OPEN
regionStates.updateRegionState(region,
State.PENDING_OPEN, plan.getDestination());
boolean needNewPlan = false;
final String assignMsg = "Failed assignment of " + region.getRegionNameAsString() +
" to " + plan.getDestination();
try {
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
if (this.shouldAssignRegionsWithFavoredNodes) {
favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
}
serverManager.sendRegionOpen(plan.getDestination(), region, favoredNodes);
return; // we're done
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
previousException = t;
// Should we wait a little before retrying? If the server is starting, yes.
boolean hold = (t instanceof ServerNotRunningYetException);
// In case socket is timed out and the region server is still online,
// the openRegion RPC could have been accepted by the server and
// just the response didn't go through. So we will retry to
// open the region on the same server.
boolean retry = !hold && (t instanceof java.net.SocketTimeoutException
&& this.serverManager.isServerOnline(plan.getDestination()));
if (hold) {
LOG.warn(assignMsg + ", waiting a little before trying on the same region server " +
"try=" + i + " of " + this.maximumAttempts, t);
if (maxWaitTime < 0) {
maxWaitTime = EnvironmentEdgeManager.currentTime()
+ this.server.getConfiguration().getLong(
"hbase.regionserver.rpc.startup.waittime", 60000);
}
try {
long now = EnvironmentEdgeManager.currentTime();
if (now < maxWaitTime) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server is not yet up; waiting up to "
+ (maxWaitTime - now) + "ms", t);
}
Thread.sleep(100);
i--; // reset the try count
} else {
LOG.debug("Server is not up for a while; try a new one", t);
needNewPlan = true;
}
} catch (InterruptedException ie) {
LOG.warn("Failed to assign "
+ region.getRegionNameAsString() + " since interrupted", ie);
regionStates.updateRegionState(region, State.FAILED_OPEN);
Thread.currentThread().interrupt();
return;
}
} else if (retry) {
i--; // we want to retry as many times as needed as long as the RS is not dead.
if (LOG.isDebugEnabled()) {
LOG.debug(assignMsg + ", trying to assign to the same region server due ", t);
}
} else {
needNewPlan = true;
LOG.warn(assignMsg + ", trying to assign elsewhere instead;" +
" try=" + i + " of " + this.maximumAttempts, t);
}
}
if (i == this.maximumAttempts) {
// For meta region, we have to keep retrying until succeeding
if (region.isMetaRegion()) {
i = 0; // re-set attempt count to 0 for at least 1 retry
LOG.warn(assignMsg +
", assigning the hbase:meta region has reached maximumAttempts (" +
this.maximumAttempts + "). Reset attempt count and continue retrying.");
waitForRetryingMetaAssignment();
}
else {
// Don't reset the region state or get a new plan any more.
// This is the last try.
continue;
}
}
// If the region opened on the destination of the present plan, reassigning to a
// new RS may cause double assignments. In case of RegionAlreadyInTransitionException,
// reassign to the same RS.
if (needNewPlan) {
// Force a new plan and reassign. Will return null if no servers.
// The new plan could be the same as the existing plan since we don't
// exclude the server of the original plan, which should not be
// excluded since it could be the only server up now.
RegionPlan newPlan = null;
try {
newPlan = getRegionPlan(region, true);
} catch (HBaseIOException e) {
LOG.warn("Failed to get region plan", e);
}
if (newPlan == null) {
regionStates.updateRegionState(region, State.FAILED_OPEN);
LOG.warn("Unable to find a viable location to assign region " +
region.getRegionNameAsString());
return;
}
if (plan != newPlan && !plan.getDestination().equals(newPlan.getDestination())) {
// Clean out the plan we failed to execute and one that doesn't look like
// it'll succeed anyway; we need a new plan!
// Transition back to OFFLINE
regionStates.updateRegionState(region, State.OFFLINE);
plan = newPlan;
} else if(plan.getDestination().equals(newPlan.getDestination()) &&
previousException instanceof FailedServerException) {
try {
LOG.info("Trying to re-assign " + region.getRegionNameAsString() +
" to the same failed server.");
Thread.sleep(1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT));
} catch (InterruptedException ie) {
LOG.warn("Failed to assign "
+ region.getRegionNameAsString() + " since interrupted", ie);
regionStates.updateRegionState(region, State.FAILED_OPEN);
Thread.currentThread().interrupt();
return;
}
}
}
}
// Run out of attempts
regionStates.updateRegionState(region, State.FAILED_OPEN);
} finally {
metricsAssignmentManager.updateAssignmentTime(EnvironmentEdgeManager.currentTime() - startTime);
}
}
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
if (this.tableStateManager.isTableState(region.getTable(),
TableState.State.DISABLED,
TableState.State.DISABLING) || replicasToClose.contains(region)) {
LOG.info("Table " + region.getTable() + " is disabled or disabling;"
+ " skipping assign of " + region.getRegionNameAsString());
offlineDisabledRegion(region);
return true;
}
return false;
}
/**
* @param region the region to assign
* @param forceNewPlan If true, a new plan is generated even if one
* already exists.
* @return Plan for the passed <code>region</code> (if none exists, one is created;
* if there are no servers to assign to, returns null).
*/
private RegionPlan getRegionPlan(final HRegionInfo region,
final boolean forceNewPlan) throws HBaseIOException {
// Pickup existing plan or make a new one
final String encodedName = region.getEncodedName();
final List<ServerName> destServers =
serverManager.createDestinationServersList();
if (destServers.isEmpty()){
LOG.warn("Can't move " + encodedName +
", there is no destination server available.");
return null;
}
RegionPlan randomPlan = null;
boolean newPlan = false;
RegionPlan existingPlan;
synchronized (this.regionPlans) {
existingPlan = this.regionPlans.get(encodedName);
if (existingPlan != null && existingPlan.getDestination() != null) {
LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+ " destination server is " + existingPlan.getDestination() +
" accepted as a dest server = " + destServers.contains(existingPlan.getDestination()));
}
if (forceNewPlan
|| existingPlan == null
|| existingPlan.getDestination() == null
|| !destServers.contains(existingPlan.getDestination())) {
newPlan = true;
try {
randomPlan = new RegionPlan(region, null,
balancer.randomAssignment(region, destServers));
} catch (IOException ex) {
LOG.warn("Failed to create new plan.",ex);
return null;
}
if (!region.isMetaTable() && shouldAssignRegionsWithFavoredNodes) {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(1);
regions.add(region);
try {
processFavoredNodes(regions);
} catch (IOException ie) {
LOG.warn("Ignoring exception in processFavoredNodes " + ie);
}
}
this.regionPlans.put(encodedName, randomPlan);
}
}
if (newPlan) {
if (randomPlan.getDestination() == null) {
LOG.warn("Can't find a destination for " + encodedName);
return null;
}
if (LOG.isDebugEnabled()) {
LOG.debug("No previous transition plan found (or ignoring " +
"an existing plan) for " + region.getRegionNameAsString() +
"; generated random plan=" + randomPlan + "; " + destServers.size() +
" (online=" + serverManager.getOnlineServers().size() +
") available servers, forceNewPlan=" + forceNewPlan);
}
return randomPlan;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Using pre-existing plan for " +
region.getRegionNameAsString() + "; plan=" + existingPlan);
}
return existingPlan;
}
/**
* Wait for some time before retrying meta table region assignment
*/
private void waitForRetryingMetaAssignment() {
try {
Thread.sleep(this.sleepTimeBeforeRetryingMetaAssignment);
} catch (InterruptedException e) {
LOG.error("Got exception while waiting for hbase:meta assignment");
Thread.currentThread().interrupt();
}
}
/**
* Unassigns the specified region.
* <p>
* Updates the RegionState and sends the CLOSE RPC unless region is being
* split by regionserver; then the unassign fails (silently) because we
* presume the region being unassigned no longer exists (it has been split out
* of existence). TODO: What to do if split fails and is rolled back and
* parent is revivified?
* <p>
* If a RegionPlan is already set, it will remain.
*
* @param region the region to be unassigned
*/
public void unassign(HRegionInfo region) {
unassign(region, null);
}
/**
* Unassigns the specified region.
* <p>
* Updates the RegionState and sends the CLOSE RPC unless region is being
* split by regionserver; then the unassign fails (silently) because we
* presume the region being unassigned no longer exists (it has been split out
* of existence). TODO: What to do if split fails and is rolled back and
* parent is revivified?
* <p>
* If a RegionPlan is already set, it will remain.
*
* @param region the region to be unassigned
* @param dest the destination server of the region
*/
public void unassign(HRegionInfo region, ServerName dest) {
// TODO: Method needs refactoring. Ugly buried returns throughout. Beware!
LOG.debug("Starting unassign of " + region.getRegionNameAsString()
+ " (offlining), current state: " + regionStates.getRegionState(region));
String encodedName = region.getEncodedName();
// Grab the state of this region and synchronize on it
// We need a lock here as we're going to do a put later and we don't want
// multiple states created for the same region
ReentrantLock lock = locker.acquireLock(encodedName);
RegionState state = regionStates.getRegionTransitionState(encodedName);
try {
if (state == null || state.isFailedClose()) {
if (state == null) {
// Region is not in transition.
// We can unassign it only if it's not SPLIT/MERGED.
state = regionStates.getRegionState(encodedName);
if (state != null && state.isUnassignable()) {
LOG.info("Attempting to unassign " + state + ", ignored");
// Offline region will be reassigned below
return;
}
if (state == null || state.getServerName() == null) {
// We don't know where the region is, offline it.
// No need to send CLOSE RPC
LOG.warn("Attempting to unassign a region not in RegionStates "
+ region.getRegionNameAsString() + ", offlined");
regionOffline(region);
return;
}
}
state = regionStates.updateRegionState(
region, State.PENDING_CLOSE);
} else if (state.isFailedOpen()) {
// The region is not open yet
regionOffline(region);
return;
} else {
LOG.debug("Attempting to unassign " +
region.getRegionNameAsString() + " but it is " +
"already in transition (" + state.getState());
return;
}
unassign(region, state.getServerName(), dest);
} finally {
lock.unlock();
// Region is expected to be reassigned afterwards
if (!replicasToClose.contains(region)
&& regionStates.isRegionInState(region, State.OFFLINE)) {
assign(region);
}
}
}
/**
* Used by unit tests. Return the number of regions opened so far in the life
* of the master. Increases by one every time the master opens a region
* @return the counter value of the number of regions opened so far
*/
public int getNumRegionsOpened() {
return numRegionsOpened.get();
}
/**
* Waits until the specified region has completed assignment.
* <p>
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
* @return true if the region is assigned, false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
regionSet.add(regionInfo);
return waitForAssignment(regionSet, true, Long.MAX_VALUE);
}
/**
* Waits until the specified regions have completed assignment, or the deadline is reached.
*/
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
final boolean waitTillAllAssigned, final int reassigningRegions,
final long minEndTime) throws InterruptedException {
long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
if (deadline < 0) { // Overflow
deadline = Long.MAX_VALUE; // wait forever
}
return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
}
/**
* Waits until the specified regions have completed assignment, or the deadline is reached.
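* <p>
* A minimal sketch (hypothetical caller; the set is drained as regions come online):
* <pre>{@code
* Set<HRegionInfo> pending = new HashSet<HRegionInfo>(regionsOfInterest);
* boolean allAssigned = waitForAssignment(pending, true,
*     System.currentTimeMillis() + 60000); // hypothetical 60s deadline
* }</pre>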
* @param regionSet set of regions to wait on; the set is modified and the assigned regions removed
* @param waitTillAllAssigned true if we should wait all the regions to be assigned
* @param deadline the timestamp after which the wait is aborted
* @return true if all the regions are assigned, false otherwise.
* @throws InterruptedException
*/
protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
// We're not synchronizing on regionsInTransition now because we don't use any iterator.
while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
int failedOpenCount = 0;
Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
while (regionInfoIterator.hasNext()) {
HRegionInfo hri = regionInfoIterator.next();
if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
regionInfoIterator.remove();
} else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
failedOpenCount++;
}
}
if (!waitTillAllAssigned) {
// No need to wait; let assignment go on asynchronously
break;
}
if (!regionSet.isEmpty()) {
if (failedOpenCount == regionSet.size()) {
// all the regions we are waiting on had an error on open.
break;
}
regionStates.waitForUpdate(100);
}
}
return regionSet.isEmpty();
}
/**
* Assigns the hbase:meta region or a replica.
* <p>
* Assumes that hbase:meta is currently closed and is not being actively served by
* any RegionServer.
* @param hri TODO
*/
public void assignMeta(HRegionInfo hri) throws KeeperException {
regionStates.updateRegionState(hri, State.OFFLINE);
assign(hri);
}
/**
* Assigns specified regions retaining assignments, if any.
* <p>
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown.
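* <p>
* A hedged sketch (the map values are the servers the regions were last on,
* which the balancer tries to retain):
* <pre>{@code
* Map<HRegionInfo, ServerName> lastAssignments = regionStates.getRegionAssignments();
* am.assign(lastAssignments);
* }</pre>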
* @throws InterruptedException
* @throws IOException
*/
public void assign(Map<HRegionInfo, ServerName> regions)
throws IOException, InterruptedException {
if (regions == null || regions.isEmpty()) {
return;
}
List<ServerName> servers = serverManager.createDestinationServersList();
if (servers == null || servers.isEmpty()) {
throw new IOException("Found no destination server to assign region(s)");
}
// Reuse existing assignment info
Map<ServerName, List<HRegionInfo>> bulkPlan =
balancer.retainAssignment(regions, servers);
if (bulkPlan == null) {
throw new IOException("Unable to determine a plan to assign region(s)");
}
processBogusAssignments(bulkPlan);
assign(regions.size(), servers.size(),
"retainAssignment=true", bulkPlan);
}
/**
* Assigns specified regions round robin, if any.
* <p>
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown.
* @throws InterruptedException
* @throws IOException
*/
public void assign(List<HRegionInfo> regions)
throws IOException, InterruptedException {
if (regions == null || regions.isEmpty()) {
return;
}
List<ServerName> servers = serverManager.createDestinationServersList();
if (servers == null || servers.isEmpty()) {
throw new IOException("Found no destination server to assign region(s)");
}
// Generate a round-robin bulk assignment plan
Map<ServerName, List<HRegionInfo>> bulkPlan = balancer.roundRobinAssignment(regions, servers);
if (bulkPlan == null) {
throw new IOException("Unable to determine a plan to assign region(s)");
}
processBogusAssignments(bulkPlan);
processFavoredNodes(regions);
assign(regions.size(), servers.size(), "round-robin=true", bulkPlan);
}
private void assign(int regions, int totalServers,
String message, Map<ServerName, List<HRegionInfo>> bulkPlan)
throws InterruptedException, IOException {
int servers = bulkPlan.size();
if (servers == 1 || (regions < bulkAssignThresholdRegions
&& servers < bulkAssignThresholdServers)) {
// Don't use bulk assignment. This could be more efficient in a small
// cluster, especially a mini cluster for testing, so that tests won't time out.
if (LOG.isTraceEnabled()) {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
// invoke assignment (async)
ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
if (!region.getTable().isSystemTable()) {
userRegionSet.add(region);
}
}
}
}
}
// wait for assignment completion
if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
System.currentTimeMillis())) {
LOG.debug("some user regions are still in transition: " + userRegionSet);
}
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
// Use fixed count thread pool assigning.
BulkAssigner ba = new GeneralBulkAssigner(
this.server, bulkPlan, this, bulkAssignWaitTillAllAssigned);
ba.bulkAssign();
LOG.info("Bulk assigning done");
}
}
/**
* Assigns all user regions, if any exist. Used during cluster startup.
* <p>
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown and the cluster
* should be shut down.
* @throws InterruptedException
* @throws IOException
*/
private void assignAllUserRegions(Map<HRegionInfo, ServerName> allRegions)
throws IOException, InterruptedException {
if (allRegions == null || allRegions.isEmpty()) return;
// Determine what type of assignment to do on startup
boolean retainAssignment = server.getConfiguration().
getBoolean("hbase.master.startup.retainassign", true);
Set<HRegionInfo> regionsFromMetaScan = allRegions.keySet();
if (retainAssignment) {
assign(allRegions);
} else {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(regionsFromMetaScan);
assign(regions);
}
for (HRegionInfo hri : regionsFromMetaScan) {
TableName tableName = hri.getTable();
if (!tableStateManager.isTableState(tableName,
TableState.State.ENABLED)) {
setEnabledTable(tableName);
}
}
// assign all the replicas that were not recorded in the meta
assign(replicaRegionsNotRecordedInMeta(regionsFromMetaScan, (MasterServices)server));
}
/**
* Get a list of replica regions that are not recorded in meta yet. We might
* not have recorded the locations for the replicas since the replicas may not
* have been online yet, the master may have restarted in the middle of
* assigning, ZK may have been erased, etc.
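* <p>
* Replica infos are derived from the table descriptor's region replication
* count; e.g. the second replica of a default region <code>hri</code> is
* <pre>{@code
* HRegionInfo replicaTwo = RegionReplicaUtil.getRegionInfoForReplica(hri, 2);
* }</pre>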
* @param regionsRecordedInMeta the list of regions we know are recorded in meta
* either as a default, or, as the location of a replica
* @param master the master services, used to look up table descriptors
* @return list of replica regions
* @throws IOException
*/
public static List<HRegionInfo> replicaRegionsNotRecordedInMeta(
Set<HRegionInfo> regionsRecordedInMeta, MasterServices master)throws IOException {
List<HRegionInfo> regionsNotRecordedInMeta = new ArrayList<HRegionInfo>();
for (HRegionInfo hri : regionsRecordedInMeta) {
TableName table = hri.getTable();
HTableDescriptor htd = master.getTableDescriptors().get(table);
// look at the HTD for the replica count. That's the source of truth
int desiredRegionReplication = htd.getRegionReplication();
for (int i = 0; i < desiredRegionReplication; i++) {
HRegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(hri, i);
if (regionsRecordedInMeta.contains(replica)) continue;
regionsNotRecordedInMeta.add(replica);
}
}
return regionsNotRecordedInMeta;
}
/**
* Rebuild the list of user regions and assignment information.
* Updates regionStates with findings as we go through the list of regions.
* @return set of servers not online that hosted some regions according to a scan of hbase:meta
* @throws IOException
*/
Set<ServerName> rebuildUserRegions() throws
IOException, KeeperException {
Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
TableState.State.DISABLED, TableState.State.ENABLING);
Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
TableState.State.DISABLED,
TableState.State.DISABLING,
TableState.State.ENABLING);
// Region assignment from META
List<Result> results = MetaTableAccessor.fullScanRegions(server.getConnection());
// Get the set of currently online servers, including any new but
// slow-to-check-in region servers that have joined the cluster
Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
// Set of offline servers to be returned
Set<ServerName> offlineServers = new HashSet<ServerName>();
// Iterate regions in META
for (Result result : results) {
if (result == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("null result from meta - ignoring but this is strange.");
}
continue;
}
// Keep track of replicas to close. These were the replicas of the originally
// unmerged regions. The master might have closed them already, but it might
// not have, e.g. because it crashed.
PairOfSameType<HRegionInfo> p = MetaTableAccessor.getMergeRegions(result);
if (p.getFirst() != null && p.getSecond() != null) {
int numReplicas = ((MasterServices)server).getTableDescriptors().get(p.getFirst().
getTable()).getRegionReplication();
for (HRegionInfo merge : p) {
for (int i = 1; i < numReplicas; i++) {
replicasToClose.add(RegionReplicaUtil.getRegionInfoForReplica(merge, i));
}
}
}
RegionLocations rl = MetaTableAccessor.getRegionLocations(result);
if (rl == null) {
continue;
}
HRegionLocation[] locations = rl.getRegionLocations();
if (locations == null) {
continue;
}
for (HRegionLocation hrl : locations) {
if (hrl == null) continue;
HRegionInfo regionInfo = hrl.getRegionInfo();
if (regionInfo == null) continue;
int replicaId = regionInfo.getReplicaId();
State state = RegionStateStore.getRegionState(result, replicaId);
// Keep track of replicas to close. These were the replicas of the split parents
// from the previous life of the master. The master should have closed them already,
// but it may not have managed to, e.g. because it crashed.
if (replicaId == 0 && state.equals(State.SPLIT)) {
for (HRegionLocation h : locations) {
replicasToClose.add(h.getRegionInfo());
}
}
ServerName lastHost = hrl.getServerName();
ServerName regionLocation = RegionStateStore.getRegionServer(result, replicaId);
regionStates.createRegionState(regionInfo, state, regionLocation, lastHost);
if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
// Region is not open (either offline or in transition), skip
continue;
}
TableName tableName = regionInfo.getTable();
if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
offlineServers.add(regionLocation);
} else if (!disabledOrEnablingTables.contains(tableName)) {
// Region is being served and on an active server
// add only if region not in disabled or enabling table
regionStates.regionOnline(regionInfo, regionLocation);
balancer.regionOnline(regionInfo, regionLocation);
}
// Need to enable the table if it is not disabled, disabling or enabling;
// this matters during rolling restarts
if (!disabledOrDisablingOrEnabling.contains(tableName)
&& !getTableStateManager().isTableState(tableName,
TableState.State.ENABLED)) {
setEnabledTable(tableName);
}
}
}
return offlineServers;
}
/**
* Processes the list of regions in transition at startup.
*/
void processRegionsInTransition(Collection<RegionState> regionsInTransition) {
// We need to send the RPC call again for PENDING_OPEN/PENDING_CLOSE regions
// in case the RPC call was not sent out before the master was shut down,
// since we update the state before we send the RPC call. We can't update
// the state after the RPC call; otherwise, we wouldn't know what happened
// to the region if the master died right after the RPC call went out.
for (RegionState regionState: regionsInTransition) {
LOG.info("Processing " + regionState);
ServerName serverName = regionState.getServerName();
// Server could be null in case of FAILED_OPEN when master cannot find a region plan. In that
// case, try assigning it here.
if (serverName != null && !serverManager.getOnlineServers().containsKey(serverName)) {
LOG.info("Server " + serverName + " isn't online. SSH will handle this");
continue; // SSH will handle it
}
HRegionInfo regionInfo = regionState.getRegion();
RegionState.State state = regionState.getState();
switch (state) {
case CLOSED:
invokeAssign(regionState.getRegion());
break;
case PENDING_OPEN:
retrySendRegionOpen(regionState);
break;
case PENDING_CLOSE:
retrySendRegionClose(regionState);
break;
case FAILED_CLOSE:
case FAILED_OPEN:
invokeUnAssign(regionInfo);
break;
default:
// No process for other states
break;
}
}
}
/**
* At master failover, for a pending_open region, make sure the
* sendRegionOpen RPC call is sent to the target regionserver
*/
private void retrySendRegionOpen(final RegionState regionState) {
this.executorService.submit(
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
@Override
public void process() throws IOException {
HRegionInfo hri = regionState.getRegion();
ServerName serverName = regionState.getServerName();
ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
try {
for (int i = 1; i <= maximumAttempts; i++) {
if (!serverManager.isServerOnline(serverName)
|| server.isStopped() || server.isAborted()) {
return; // No longer needed
}
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
if (shouldAssignRegionsWithFavoredNodes) {
favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
}
serverManager.sendRegionOpen(serverName, hri, favoredNodes);
return; // we're done
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
if (t instanceof FailedServerException && i < maximumAttempts) {
// The server is in the failed server list, so there is no point
// retrying too soon; retry after the failed_server_expiry time
try {
Configuration conf = this.server.getConfiguration();
long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(serverName + " is on failed server list; waiting "
+ sleepTime + "ms", t);
}
Thread.sleep(sleepTime);
continue;
} catch (InterruptedException ie) {
LOG.warn("Failed to assign "
+ hri.getRegionNameAsString() + " since interrupted", ie);
regionStates.updateRegionState(hri, State.FAILED_OPEN);
Thread.currentThread().interrupt();
return;
}
}
if (serverManager.isServerOnline(serverName)
&& t instanceof java.net.SocketTimeoutException) {
i--; // don't count this attempt; socket timeouts on a live server are retried
} else {
LOG.info("Got exception in retrying sendRegionOpen for "
+ regionState + "; try=" + i + " of " + maximumAttempts, t);
}
Threads.sleep(100);
}
}
// Ran out of attempts
regionStates.updateRegionState(hri, State.FAILED_OPEN);
} finally {
lock.unlock();
}
}
});
}
/**
* At master failover, for a pending_close region, make sure the
* sendRegionClose RPC call is sent to the target regionserver
*/
private void retrySendRegionClose(final RegionState regionState) {
this.executorService.submit(
new EventHandler(server, EventType.M_MASTER_RECOVERY) {
@Override
public void process() throws IOException {
HRegionInfo hri = regionState.getRegion();
ServerName serverName = regionState.getServerName();
ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
try {
for (int i = 1; i <= maximumAttempts; i++) {
if (!serverManager.isServerOnline(serverName)
|| server.isStopped() || server.isAborted()) {
return; // No longer needed
}
try {
if (!regionState.equals(regionStates.getRegionState(hri))) {
return; // Region is not in the expected state any more
}
serverManager.sendRegionClose(serverName, hri, null);
return; // Done.
} catch (Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
if (t instanceof FailedServerException && i < maximumAttempts) {
// The server is in the failed server list, so there is no point
// retrying too soon; retry after the failed_server_expiry time
try {
Configuration conf = this.server.getConfiguration();
long sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
if (LOG.isDebugEnabled()) {
LOG.debug(serverName + " is on failed server list; waiting "
+ sleepTime + "ms", t);
}
Thread.sleep(sleepTime);
continue;
} catch (InterruptedException ie) {
LOG.warn("Failed to unassign "
+ hri.getRegionNameAsString() + " since interrupted", ie);
regionStates.updateRegionState(hri, RegionState.State.FAILED_CLOSE);
Thread.currentThread().interrupt();
return;
}
}
if (serverManager.isServerOnline(serverName)
&& t instanceof java.net.SocketTimeoutException) {
i--; // don't count this attempt; socket timeouts on a live server are retried
} else {
LOG.info("Got exception in retrying sendRegionClose for "
+ regionState + "; try=" + i + " of " + maximumAttempts, t);
}
Threads.sleep(100);
}
}
// Ran out of attempts
regionStates.updateRegionState(hri, State.FAILED_CLOSE);
} finally {
lock.unlock();
}
}
});
}
/**
* Set regions-in-transition metrics.
* This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
* The iterator is not fail-fast, which may lead to stale reads; but that's better than
* creating a copy of the map for metrics computation, as this method is invoked
* at a frequent interval.
*/
public void updateRegionsInTransitionMetrics() {
long currentTime = System.currentTimeMillis();
int totalRITs = 0;
int totalRITsOverThreshold = 0;
long oldestRITTime = 0;
int ritThreshold = this.server.getConfiguration().
getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
for (RegionState state: regionStates.getRegionsInTransition()) {
totalRITs++;
long ritTime = currentTime - state.getStamp();
if (ritTime > ritThreshold) { // more than the threshold
totalRITsOverThreshold++;
}
if (oldestRITTime < ritTime) {
oldestRITTime = ritTime;
}
}
if (this.metricsAssignmentManager != null) {
this.metricsAssignmentManager.updateRITOldestAge(oldestRITTime);
this.metricsAssignmentManager.updateRITCount(totalRITs);
this.metricsAssignmentManager.updateRITCountOverThreshold(totalRITsOverThreshold);
}
}
/**
* Clear any cached plan for the given region.
* @param region Region whose plan we are to clear.
*/
private void clearRegionPlan(final HRegionInfo region) {
synchronized (this.regionPlans) {
this.regionPlans.remove(region.getEncodedName());
}
}
/**
* Wait on region to clear regions-in-transition.
* @param hri Region to wait on.
* @throws IOException
*/
public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
throws IOException, InterruptedException {
waitOnRegionToClearRegionsInTransition(hri, -1L);
}
/**
* Wait on a region to clear regions-in-transition, or time out.
* @param hri Region to wait on.
* @param timeOut Milliseconds to wait for the region to be out of the transition state.
* @return True when the region clears regions-in-transition before the timeout, otherwise false.
* @throws InterruptedException
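*
* <p>A minimal usage sketch (hypothetical caller; {@code am} is an
* AssignmentManager instance):
* <pre>
* if (!am.waitOnRegionToClearRegionsInTransition(hri, 60000)) {
*   // Still in transition (or the master is stopping) after a minute.
* }
* </pre>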
*/
public boolean waitOnRegionToClearRegionsInTransition(final HRegionInfo hri, long timeOut)
throws InterruptedException {
if (!regionStates.isRegionInTransition(hri)) {
return true;
}
long end = (timeOut <= 0) ? Long.MAX_VALUE : EnvironmentEdgeManager.currentTime()
+ timeOut;
// There is already a timeout monitor on regions in transition, so we
// should not need another one here.
LOG.info("Waiting for " + hri.getEncodedName() +
" to leave regions-in-transition, timeOut=" + timeOut + " ms.");
while (!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
regionStates.waitForUpdate(100);
if (EnvironmentEdgeManager.currentTime() > end) {
LOG.info("Timed out on waiting for " + hri.getEncodedName() + " to be assigned.");
return false;
}
}
if (this.server.isStopped()) {
LOG.info("Giving up wait on regions in transition because stoppable.isStopped is set");
return false;
}
return true;
}
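/**
* Asynchronously assign the given region via the thread pool.
*/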
void invokeAssign(HRegionInfo regionInfo) {
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
}
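/**
* Asynchronously unassign the given region via the thread pool.
*/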
void invokeUnAssign(HRegionInfo regionInfo) {
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
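/** @return true if the given server is carrying the default replica of hbase:meta */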
public boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
}
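/** @return true if the given server is carrying the specified replica of hbase:meta */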
public boolean isCarryingMetaReplica(ServerName serverName, int replicaId) {
return isCarryingRegion(serverName,
RegionReplicaUtil.getRegionInfoForReplica(HRegionInfo.FIRST_META_REGIONINFO, replicaId));
}
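/** @return true if the given server is carrying the given hbase:meta replica region */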
public boolean isCarryingMetaReplica(ServerName serverName, HRegionInfo metaHri) {
return isCarryingRegion(serverName, metaHri);
}
/**
* Check whether the given (shutdown) server is carrying the specified region.
* @return whether the serverName currently hosts the region
*/
private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
RegionState regionState = regionStates.getRegionTransitionState(hri);
ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
if (transitionAddr != null) {
boolean matchTransitionAddr = transitionAddr.equals(serverName);
LOG.debug("Checking region=" + hri.getRegionNameAsString()
+ ", transitioning on server=" + matchTransitionAddr
+ " server being checked: " + serverName
+ ", matches=" + matchTransitionAddr);
return matchTransitionAddr;
}
ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
boolean matchAssignedAddr = serverName.equals(assignedAddr);
LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
+ " is on server=" + assignedAddr + ", server being checked: "
+ serverName);
return matchAssignedAddr;
}
/**
* Clean out a crashed server, removing any assignments.
* @param sn Server that went down.
* @return list of regions in transition on this server
*/
public List<HRegionInfo> cleanOutCrashedServerReferences(final ServerName sn) {
// Clean out any existing assignment plans for this server
synchronized (this.regionPlans) {
for (Iterator <Map.Entry<String, RegionPlan>> i = this.regionPlans.entrySet().iterator();
i.hasNext();) {
Map.Entry<String, RegionPlan> e = i.next();
ServerName otherSn = e.getValue().getDestination();
// The name will be null if the region is planned for a random assign.
if (otherSn != null && otherSn.equals(sn)) {
// Use the iterator's remove(), else we'll get a ConcurrentModificationException
i.remove();
}
}
}
List<HRegionInfo> rits = regionStates.serverOffline(sn);
for (Iterator<HRegionInfo> it = rits.iterator(); it.hasNext(); ) {
HRegionInfo hri = it.next();
String encodedName = hri.getEncodedName();
// We need a lock on the region as we could update it
Lock lock = locker.acquireLock(encodedName);
try {
RegionState regionState = regionStates.getRegionTransitionState(encodedName);
if (regionState == null
|| (regionState.getServerName() != null && !regionState.isOnServer(sn))
|| !RegionStates.isOneOfStates(regionState, State.PENDING_OPEN,
State.OPENING, State.FAILED_OPEN, State.FAILED_CLOSE, State.OFFLINE)) {
LOG.info("Skip " + regionState + " since it is not opening/failed_close"
+ " on the dead server any more: " + sn);
it.remove();
} else {
if (tableStateManager.isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
regionStates.regionOffline(hri);
it.remove();
continue;
}
// Mark the region offline; SSH will assign it again
regionStates.updateRegionState(hri, State.OFFLINE);
}
} finally {
lock.unlock();
}
}
return rits;
}
/**
* Execute the given region plan: unassign the region so that it can be
* reassigned to the plan's destination.
* @param plan Plan to execute.
*/
public void balance(final RegionPlan plan) {
HRegionInfo hri = plan.getRegionInfo();
TableName tableName = hri.getTable();
if (tableStateManager.isTableState(tableName,
TableState.State.DISABLED, TableState.State.DISABLING)) {
LOG.info("Ignored moving region of disabling/disabled table "
+ tableName);
return;
}
// Move the region only if it's assigned
String encodedName = hri.getEncodedName();
ReentrantLock lock = locker.acquireLock(encodedName);
try {
if (!regionStates.isRegionOnline(hri)) {
RegionState state = regionStates.getRegionState(encodedName);
LOG.info("Ignored moving region not assigned: " + hri + ", "
+ (state == null ? "not in region states" : state));
return;
}
synchronized (this.regionPlans) {
this.regionPlans.put(plan.getRegionName(), plan);
}
unassign(hri, plan.getDestination());
} finally {
lock.unlock();
}
}
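/**
* Shut down the thread pool executor service and the region state store.
*/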
public void stop() {
// Shutdown the threadpool executor service
threadPoolExecutorService.shutdownNow();
regionStateStore.stop();
}
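/**
* Mark the given table as ENABLED in the table state manager, aborting
* the master if the state cannot be set.
*/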
protected void setEnabledTable(TableName tableName) {
try {
this.tableStateManager.setTableState(tableName,
TableState.State.ENABLED);
} catch (IOException e) {
// Here we can abort as it is the startup flow
String errorMsg = "Unable to ensure that the table " + tableName
+ " will be enabled because of a ZooKeeper issue";
LOG.error(errorMsg, e);
this.server.abort(errorMsg, e);
}
}
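/**
* Handle a FAILED_OPEN transition reported by a region server: track the
* failed-open count and either give up (FAILED_OPEN) after too many attempts
* or treat the region as closed and queue another assign.
* @return null on success, or an error message if the transition is rejected
*/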
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
justification="Worth fixing but not the end of the world.")
private String onRegionFailedOpen(final RegionState current,
final HRegionInfo hri, final ServerName serverName) {
// The region must be opening on this server.
// If current state is failed_open on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOpeningOrFailedOpenOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
// Just return in case of retrying
if (current.isFailedOpen()) {
return null;
}
String encodedName = hri.getEncodedName();
// FindBugs: AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION Worth fixing!!!
AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
if (failedOpenCount == null) {
failedOpenCount = new AtomicInteger();
// No need to use putIfAbsent, or extra synchronization since
// this whole handleRegion block is locked on the encoded region
// name, and failedOpenTracker is updated only in this block
failedOpenTracker.put(encodedName, failedOpenCount);
}
if (failedOpenCount.incrementAndGet() >= maximumAttempts && !hri.isMetaRegion()) {
regionStates.updateRegionState(hri, State.FAILED_OPEN);
// remove the tracking info to save memory; this also resets
// the count for the next open attempt
failedOpenTracker.remove(encodedName);
} else {
if (hri.isMetaRegion() && failedOpenCount.get() >= maximumAttempts) {
// Log a warning message if a meta region failedOpenCount exceeds maximumAttempts
// so that we are aware of potential problem if it persists for a long time.
LOG.warn("Failed to open the hbase:meta region " +
hri.getRegionNameAsString() + " after" +
failedOpenCount.get() + " retries. Continue retrying.");
}
// Handle this the same as if it were opened and then closed.
RegionState regionState = regionStates.updateRegionState(hri, State.CLOSED);
if (regionState != null) {
// When there is more than one region server, a new RS is selected as the
// destination and the region plan is updated accordingly. (HBASE-5546)
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING) ||
replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return null;
}
regionStates.updateRegionState(hri, RegionState.State.CLOSED);
// This below has to do w/ online enable/disable of a table
removeClosedRegion(hri);
try {
getRegionPlan(hri, true);
} catch (HBaseIOException e) {
LOG.warn("Failed to get region plan", e);
}
invokeAssign(hri);
}
}
// Null means no error
return null;
}
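/**
* Handle an OPENED transition reported by a region server: mark the region
* online and unassign it again if its table has since been disabled.
* @return null on success, or an error message if the transition is rejected
*/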
private String onRegionOpen(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be opening on this server.
// If current state is already opened on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isOpeningOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
long openSeqNum = transition.hasOpenSeqNum()
? transition.getOpenSeqNum() : HConstants.NO_SEQNUM;
if (openSeqNum < 0) {
return "Newly opened region has invalid open seq num " + openSeqNum;
}
regionOnline(hri, serverName, openSeqNum);
// reset the count, if any
failedOpenTracker.remove(hri.getEncodedName());
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
invokeUnAssign(hri);
}
return null;
}
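/**
* Handle a CLOSED transition reported by a region server: mark the region
* closed and queue a reassign, or offline it if its table is disabled.
* @return null on success, or an error message if the transition is rejected
*/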
private String onRegionClosed(final RegionState current,
final HRegionInfo hri, final ServerName serverName) {
// The region will usually be assigned right after it is closed. When an RPC retry comes
// in, the region may already have moved away from the closed state. However, on the
// region server side, we don't care much about the response for this transition.
// We only make sure the master has got and processed this report, either
// successfully or not. So this is fine, not a problem at all.
if (current == null || !current.isClosingOrClosedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not closing on " + serverName;
}
// Just return in case of retrying
if (current.isClosed()) {
return null;
}
if (getTableStateManager().isTableState(hri.getTable(), TableState.State.DISABLED,
TableState.State.DISABLING) || replicasToClose.contains(hri)) {
offlineDisabledRegion(hri);
return null;
}
regionStates.updateRegionState(hri, RegionState.State.CLOSED);
sendRegionClosedNotification(hri);
// This below has to do w/ online enable/disable of a table
removeClosedRegion(hri);
invokeAssign(hri);
return null;
}
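/**
* Handle a READY_TO_SPLIT transition: mark the parent SPLITTING and create
* the two daughters in SPLITTING_NEW state.
* @return null on success, or an error message if the transition is rejected
*/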
private String onRegionReadyToSplit(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be opened on this server.
// If current state is already splitting on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not opening on " + serverName;
}
if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
MasterSwitchType.SPLIT)) {
return "split switch is off!";
}
// Just return in case of retrying
if (current.isSplitting()) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a != null || rs_b != null) {
return "Some daughter is already existing. "
+ "a=" + rs_a + ", b=" + rs_b;
}
// The server holding the region is not updated at this stage.
// That is done after the PONR (point of no return).
regionStates.updateRegionState(hri, State.SPLITTING);
regionStates.createRegionState(
a, State.SPLITTING_NEW, serverName, null);
regionStates.createRegionState(
b, State.SPLITTING_NEW, serverName, null);
return null;
}
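/**
* Handle a SPLIT_PONR transition: record the split in hbase:meta once the
* point of no return has been passed.
* @return null on success, or an error message if the transition is rejected
*/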
private String onRegionSplitPONR(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be splitting on this server, and the daughters must be in
// splitting_new state. To detect an RPC retry, we use the server holding info.
if (current == null || !current.isSplittingOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting on " + serverName;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
// The master could have restarted and lost the new region
// states; if so, they must have been lost together
if (rs_a == null && rs_b == null) {
rs_a = regionStates.createRegionState(
a, State.SPLITTING_NEW, serverName, null);
rs_b = regionStates.createRegionState(
b, State.SPLITTING_NEW, serverName, null);
}
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
// Just return in case of retrying
if (!regionStates.isRegionOnServer(hri, serverName)) {
return null;
}
try {
regionStates.splitRegion(hri, a, b, serverName);
} catch (IOException ioe) {
LOG.info("Failed to record split region " + hri.getShortNameToLog());
return "Failed to record the splitting in meta";
}
return null;
}
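/**
* Handle a SPLIT transition: take the parent offline and bring the two
* daughters online on the reporting server.
* @return null on success, or an error message if the transition is rejected
*/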
private String onRegionSplit(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be splitting on this server, and the daughters must be in
// splitting_new state.
// If current state is already split on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isSplittingOrSplitOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting on " + serverName;
}
// Just return in case of retrying
if (current.isSplit()) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
if (TEST_SKIP_SPLIT_HANDLING) {
return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
}
regionOffline(hri, State.SPLIT);
regionOnline(a, serverName, 1);
regionOnline(b, serverName, 1);
// User could disable the table before master knows the new regions.
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
invokeUnAssign(a);
invokeUnAssign(b);
} else {
Callable<Object> splitReplicasCallable = new Callable<Object>() {
@Override
public Object call() {
doSplittingOfReplicas(hri, a, b);
return null;
}
};
threadPoolExecutorService.submit(splitReplicasCallable);
}
return null;
}
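/**
* Handle a SPLIT_REVERTED transition: bring the parent back online and take
* the would-be daughters offline.
* @return null on success, or an error message if the transition is rejected
*/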
private String onRegionSplitReverted(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be splitting on this server, and the daughters must be in
// splitting_new state.
// If the region is in open state, it could be an RPC retry.
if (current == null || !current.isSplittingOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not splitting or opened on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isSplittingNewOnServer(serverName)
|| rs_b == null || !rs_b.isSplittingNewOnServer(serverName)) {
return "Some daughter is not known to be splitting on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
regionOnline(hri, serverName);
regionOffline(a);
regionOffline(b);
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
invokeUnAssign(hri);
}
return null;
}
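/**
* Handle a READY_TO_MERGE transition: mark the two regions MERGING and create
* the merged region in MERGING_NEW state.
* @return null on success, or an error message if the transition is rejected
*/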
private String onRegionReadyToMerge(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be new, and the daughters must be open on this server.
// If the region is in merging_new state, it could be an RPC retry.
if (current != null && !current.isMergingNewOnServer(serverName)) {
return "Merging daughter region already exists, p=" + current;
}
if (!((HMaster)server).getSplitOrMergeTracker().isSplitOrMergeEnabled(
MasterSwitchType.MERGE)) {
return "merge switch is off!";
}
// Just return in case of retrying
if (current != null) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
Set<String> encodedNames = new HashSet<String>(2);
encodedNames.add(a.getEncodedName());
encodedNames.add(b.getEncodedName());
Map<String, Lock> locks = locker.acquireLocks(encodedNames);
try {
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isOpenedOnServer(serverName)
|| rs_b == null || !rs_b.isOpenedOnServer(serverName)) {
return "Some daughter is not in a state to merge on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
regionStates.updateRegionState(a, State.MERGING);
regionStates.updateRegionState(b, State.MERGING);
regionStates.createRegionState(
hri, State.MERGING_NEW, serverName, null);
return null;
} finally {
for (Lock lock: locks.values()) {
lock.unlock();
}
}
}
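/**
* Handle a MERGE_PONR transition: record the merge in hbase:meta once the
* point of no return has been passed.
* @return null on success, or an error message if the transition is rejected
*/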
private String onRegionMergePONR(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be in merging_new state, and the daughters must be
// merging. To detect an RPC retry, we use the server holding info.
if (current != null && !current.isMergingNewOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
// The master could have restarted and lost the new region state
if (current == null) {
regionStates.createRegionState(
hri, State.MERGING_NEW, serverName, null);
}
// Just return in case of retrying
if (regionStates.isRegionOnServer(hri, serverName)) {
return null;
}
try {
regionStates.mergeRegions(hri, a, b, serverName);
} catch (IOException ioe) {
LOG.info("Failed to record merged region " + hri.getShortNameToLog());
return "Failed to record the merging in meta";
}
return null;
}
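/**
* Handle a MERGED transition: take the two merging regions offline and bring
* the merged region online on the reporting server.
* @return null on success, or an error message if the transition is rejected
*/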
private String onRegionMerged(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be in merging_new state, and the daughters must be
// merging on this server.
// If current state is already opened on the same server,
// it could be a reportRegionTransition RPC retry.
if (current == null || !current.isMergingNewOrOpenedOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
// Just return in case of retrying
if (current.isOpened()) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
regionOffline(a, State.MERGED);
regionOffline(b, State.MERGED);
regionOnline(hri, serverName, 1);
// User could disable the table before master knows the new region.
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
invokeUnAssign(hri);
} else {
Callable<Object> mergeReplicasCallable = new Callable<Object>() {
@Override
public Object call() {
doMergingOfReplicas(hri, a, b);
return null;
}
};
threadPoolExecutorService.submit(mergeReplicasCallable);
}
return null;
}
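/**
* Handle a MERGE_REVERTED transition: bring the two merging regions back
* online and take the would-be merged region offline.
* @return null on success, or an error message if the transition is rejected
*/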
private String onRegionMergeReverted(final RegionState current, final HRegionInfo hri,
final ServerName serverName, final RegionStateTransition transition) {
// The region must be in merging_new state, and the daughters must be
// merging on this server.
// If the region is in offline state, it could be an RPC retry.
if (current == null || !current.isMergingNewOrOfflineOnServer(serverName)) {
return hri.getShortNameToLog() + " is not merging on " + serverName;
}
// Just return in case of retrying
if (current.isOffline()) {
return null;
}
final HRegionInfo a = HRegionInfo.convert(transition.getRegionInfo(1));
final HRegionInfo b = HRegionInfo.convert(transition.getRegionInfo(2));
RegionState rs_a = regionStates.getRegionState(a);
RegionState rs_b = regionStates.getRegionState(b);
if (rs_a == null || !rs_a.isMergingOnServer(serverName)
|| rs_b == null || !rs_b.isMergingOnServer(serverName)) {
return "Some daughter is not known to be merging on " + serverName
+ ", a=" + rs_a + ", b=" + rs_b;
}
regionOnline(a, serverName);
regionOnline(b, serverName);
regionOffline(hri);
if (getTableStateManager().isTableState(hri.getTable(),
TableState.State.DISABLED, TableState.State.DISABLING)) {
invokeUnAssign(a);
invokeUnAssign(b);
}
return null;
}
private void doMergingOfReplicas(HRegionInfo mergedHri, final HRegionInfo hri_a,
final HRegionInfo hri_b) {
// Close replicas for the original unmerged regions. Create/assign new replicas
// for the merged parent.
List<HRegionInfo> unmergedRegions = new ArrayList<HRegionInfo>();
unmergedRegions.add(hri_a);
unmergedRegions.add(hri_b);
Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(unmergedRegions);
Collection<List<HRegionInfo>> c = map.values();
for (List<HRegionInfo> l : c) {
for (HRegionInfo h : l) {
if (!RegionReplicaUtil.isDefaultReplica(h)) {
LOG.debug("Unassigning un-merged replica " + h);
unassign(h);
}
}
}
int numReplicas = 1;
try {
numReplicas = ((MasterServices)server).getTableDescriptors().get(mergedHri.getTable()).
getRegionReplication();
} catch (IOException e) {
LOG.warn("Couldn't get the replication attribute of the table " + mergedHri.getTable() +
" due to " + e.getMessage() + ". The assignment of replicas for the merged region " +
"will not be done");
}
List<HRegionInfo> regions = new ArrayList<HRegionInfo>();
for (int i = 1; i < numReplicas; i++) {
regions.add(RegionReplicaUtil.getRegionInfoForReplica(mergedHri, i));
}
try {
assign(regions);
} catch (IOException ioe) {
LOG.warn("Couldn't assign all replica(s) of region " + mergedHri + " because of " +
ioe.getMessage());
} catch (InterruptedException ie) {
LOG.warn("Couldn't assign all replica(s) of region " + mergedHri+ " because of " +
ie.getMessage());
}
}
private void doSplittingOfReplicas(final HRegionInfo parentHri, final HRegionInfo hri_a,
final HRegionInfo hri_b) {
// Create new replica regions for the daughters, and assign them to match the
// current replica assignments: if replica 1 of the parent is assigned to RS1,
// replica 1 of each daughter will be placed on the same server
int numReplicas = 1;
try {
numReplicas = ((MasterServices)server).getTableDescriptors().get(parentHri.getTable()).
getRegionReplication();
} catch (IOException e) {
LOG.warn("Couldn't get the replication attribute of the table " + parentHri.getTable() +
" due to " + e.getMessage() + ". The assignment of daughter replicas " +
"replicas will not be done");
}
// unassign the old replicas
List<HRegionInfo> parentRegion = new ArrayList<HRegionInfo>();
parentRegion.add(parentHri);
Map<ServerName, List<HRegionInfo>> currentAssign =
regionStates.getRegionAssignments(parentRegion);
Collection<List<HRegionInfo>> c = currentAssign.values();
for (List<HRegionInfo> l : c) {
for (HRegionInfo h : l) {
if (!RegionReplicaUtil.isDefaultReplica(h)) {
LOG.debug("Unassigning parent's replica " + h);
unassign(h);
}
}
}
// assign daughter replicas
Map<HRegionInfo, ServerName> map = new HashMap<HRegionInfo, ServerName>();
for (int i = 1; i < numReplicas; i++) {
prepareDaughterReplicaForAssignment(hri_a, parentHri, i, map);
prepareDaughterReplicaForAssignment(hri_b, parentHri, i, map);
}
try {
assign(map);
} catch (IOException e) {
LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
} catch (InterruptedException e) {
LOG.warn("Caught exception " + e + " while trying to assign replica(s) of daughter(s)");
}
}
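/**
* Pick a target server for the given daughter replica: the server hosting the
* corresponding parent replica if known, otherwise a random online server.
*/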
private void prepareDaughterReplicaForAssignment(HRegionInfo daughterHri, HRegionInfo parentHri,
int replicaId, Map<HRegionInfo, ServerName> map) {
HRegionInfo parentReplica = RegionReplicaUtil.getRegionInfoForReplica(parentHri, replicaId);
HRegionInfo daughterReplica = RegionReplicaUtil.getRegionInfoForReplica(daughterHri,
replicaId);
LOG.debug("Created replica region for daughter " + daughterReplica);
ServerName sn;
if ((sn = regionStates.getRegionServerOfRegion(parentReplica)) != null) {
map.put(daughterReplica, sn);
} else {
List<ServerName> servers = serverManager.getOnlineServersList();
sn = servers.get((new Random(System.currentTimeMillis())).nextInt(servers.size()));
map.put(daughterReplica, sn);
}
}
public Set<HRegionInfo> getReplicasToClose() {
return replicasToClose;
}
/**
* Marks a region offline. The new state is the specified one,
* if not null. If the specified state is null, the new state is Offline.
* The specified state can only be Split, Merged, Offline, or null.
*/
private void regionOffline(final HRegionInfo regionInfo, final State state) {
regionStates.regionOffline(regionInfo, state);
removeClosedRegion(regionInfo);
// remove the region plan as well just in case.
clearRegionPlan(regionInfo);
balancer.regionOffline(regionInfo);
// Tell our listeners that a region was closed
sendRegionClosedNotification(regionInfo);
// also note that all the replicas of the primary should be closed
if (state != null && (state.equals(State.SPLIT) || state.equals(State.MERGED))) {
Collection<HRegionInfo> c = new ArrayList<HRegionInfo>(1);
c.add(regionInfo);
Map<ServerName, List<HRegionInfo>> map = regionStates.getRegionAssignments(c);
Collection<List<HRegionInfo>> allReplicas = map.values();
for (List<HRegionInfo> list : allReplicas) {
replicasToClose.addAll(list);
}
}
}
private void sendRegionOpenedNotification(final HRegionInfo regionInfo,
final ServerName serverName) {
if (!this.listeners.isEmpty()) {
for (AssignmentListener listener : this.listeners) {
listener.regionOpened(regionInfo, serverName);
}
}
}
private void sendRegionClosedNotification(final HRegionInfo regionInfo) {
if (!this.listeners.isEmpty()) {
for (AssignmentListener listener : this.listeners) {
listener.regionClosed(regionInfo);
}
}
}
/**
* Try to update some region states. If the state machine prevents
* such an update, an error message is returned to explain the reason.
*
* It's expected that each transition involves just one
* region for opening/closing, and three regions for splitting/merging.
* These regions should be on the server that requested the change.
*
* Region state machine. Only these transitions
* are expected to be triggered by a region server.
*
* On the state transition:
* (1) Open/Close should be initiated by master
* (a) Master sets the region to pending_open/pending_close
* in memory and hbase:meta after sending the request
* to the region server
* (b) Region server reports back to the master
* after open/close is done (either success/failure)
* (c) If the region server has a problem reporting the status
* to the master, it must be because the master is down or there is some
* temporary network issue. Otherwise, the region server should
* abort since it must be a bug. If the master is not accessible,
* the region server should keep trying until the server is
* stopped or the status is reported to the (new) master
* (d) If region server dies in the middle of opening/closing
* a region, SSH picks it up and finishes it
* (e) If master dies in the middle, the new master recovers
* the state during initialization from hbase:meta. Region server
* can report any transition that has not been reported to
* the previous active master yet
* (2) Split/merge is initiated by region servers
* (a) To split a region, a region server sends a request
* to the master to try to set the region to splitting, together with the
* two daughters (to be created) to splitting_new. If approved
* by the master, the splitting can then move ahead
* (b) To merge two regions, a region server sends a request to
* the master to try to set the new merged region (to be created) to
* merging_new, together with the two regions (to be merged) to merging.
* If the master approves, the merge can then move ahead
* (c) Once the splitting/merging is done, the region server
* reports the status (success or failure) back to the master
* (d) Other scenarios should be handled similarly to
* region open/close
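*
* <p>A minimal sketch of how a report is consumed (hypothetical caller; the
* real entry point is the master's region server status RPC service):
* <pre>
* String error = onRegionTransition(serverName, transition);
* if (error != null) {
*   // The transition was rejected; the message explains why.
* }
* </pre>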
*/
protected String onRegionTransition(final ServerName serverName,
final RegionStateTransition transition) {
TransitionCode code = transition.getTransitionCode();
HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
Lock lock = locker.acquireLock(hri.getEncodedName());
try {
RegionState current = regionStates.getRegionState(hri);
if (LOG.isDebugEnabled()) {
LOG.debug("Got transition " + code + " for "
+ (current != null ? current.toString() : hri.getShortNameToLog())
+ " from " + serverName);
}
String errorMsg = null;
switch (code) {
case OPENED:
errorMsg = onRegionOpen(current, hri, serverName, transition);
break;
case FAILED_OPEN:
errorMsg = onRegionFailedOpen(current, hri, serverName);
break;
case CLOSED:
errorMsg = onRegionClosed(current, hri, serverName);
break;
case READY_TO_SPLIT:
try {
regionStateListener.onRegionSplit(hri);
errorMsg = onRegionReadyToSplit(current, hri, serverName, transition);
} catch (IOException exp) {
if (exp instanceof QuotaExceededException) {
server.getRegionNormalizer().planSkipped(hri, PlanType.SPLIT);
}
errorMsg = StringUtils.stringifyException(exp);
}
break;
case SPLIT_PONR:
errorMsg = onRegionSplitPONR(current, hri, serverName, transition);
break;
case SPLIT:
errorMsg = onRegionSplit(current, hri, serverName, transition);
break;
case SPLIT_REVERTED:
errorMsg = onRegionSplitReverted(current, hri, serverName, transition);
if (org.apache.commons.lang.StringUtils.isEmpty(errorMsg)) {
try {
regionStateListener.onRegionSplitReverted(hri);
} catch (IOException exp) {
LOG.warn(StringUtils.stringifyException(exp));
}
}
break;
case READY_TO_MERGE:
errorMsg = onRegionReadyToMerge(current, hri, serverName, transition);
break;
case MERGE_PONR:
errorMsg = onRegionMergePONR(current, hri, serverName, transition);
break;
case MERGED:
try {
errorMsg = onRegionMerged(current, hri, serverName, transition);
regionStateListener.onRegionMerged(hri);
} catch (IOException exp) {
errorMsg = StringUtils.stringifyException(exp);
}
break;
case MERGE_REVERTED:
errorMsg = onRegionMergeReverted(current, hri, serverName, transition);
break;
default:
errorMsg = "Unexpected transition code " + code;
}
if (errorMsg != null) {
LOG.info("Could not transition region from " + current + " on "
+ code + " by " + serverName + ": " + errorMsg);
}
return errorMsg;
} finally {
lock.unlock();
}
}
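/**
* Move regions the balancer could not find a plan for (those assigned to the
* bogus server) into FAILED_OPEN, and drop them from the bulk plan.
*/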
private void processBogusAssignments(Map<ServerName, List<HRegionInfo>> bulkPlan) {
if (bulkPlan.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
// Found no plan for some regions, put those regions in RIT
for (HRegionInfo hri : bulkPlan.get(LoadBalancer.BOGUS_SERVER_NAME)) {
regionStates.updateRegionState(hri, State.FAILED_OPEN);
}
bulkPlan.remove(LoadBalancer.BOGUS_SERVER_NAME);
}
}
/**
* @return Instance of load balancer
*/
public LoadBalancer getBalancer() {
return this.balancer;
}
public Map<ServerName, List<HRegionInfo>>
getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
void setRegionStateListener(RegionStateListener listener) {
this.regionStateListener = listener;
}
}