blob: cffbd164303d11d4698da3e60727eb3022ae5178 [file] [log] [blame]
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.ArrayList;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Collections;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionHistorian;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Writables;
/**
* Class to manage assigning regions to servers, state of root and meta, etc.
*/
class RegionManager implements HConstants {
protected static final Log LOG = LogFactory.getLog(RegionManager.class);
private AtomicReference<HServerAddress> rootRegionLocation =
new AtomicReference<HServerAddress>(null);
private volatile boolean safeMode = true;
final Lock splitLogLock = new ReentrantLock();
private final RootScanner rootScannerThread;
final MetaScanner metaScannerThread;
/** Set by root scanner to indicate the number of meta regions */
private final AtomicInteger numberOfMetaRegions = new AtomicInteger();
/** These are the online meta regions */
private final NavigableMap<byte [], MetaRegion> onlineMetaRegions =
new ConcurrentSkipListMap<byte [], MetaRegion>(Bytes.BYTES_COMPARATOR);
private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded");
/**
* Map of region name to RegionState for regions that are in transition such as
*
* unassigned -> pendingOpen -> open
* closing -> pendingClose -> closed; if (closed && !offline) -> unassigned
*
* At the end of a transition, removeRegion is used to remove the region from
* the map (since it is no longer in transition)
*
* Note: Needs to be SortedMap so we can specify a comparator
*
* @see RegionState inner-class below
*/
private final SortedMap<byte[], RegionState> regionsInTransition =
Collections.synchronizedSortedMap(
new TreeMap<byte[], RegionState>(Bytes.BYTES_COMPARATOR));
// How many regions to assign a server at a time.
private final int maxAssignInOneGo;
private final HMaster master;
private final RegionHistorian historian;
private final float slop;
/** Set of regions to split. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToSplit =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToCompact =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to major compact. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToMajorCompact =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
/** Set of regions to flush. */
private final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> regionsToFlush =
Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR));
RegionManager(HMaster master) {
this.master = master;
this.historian = RegionHistorian.getInstance();
this.maxAssignInOneGo = this.master.getConfiguration().
getInt("hbase.regions.percheckin", 10);
this.slop = this.master.getConfiguration().getFloat("hbase.regions.slop",
(float)0.1);
// The root region
rootScannerThread = new RootScanner(master);
// Scans the meta table
metaScannerThread = new MetaScanner(master);
reassignRootRegion();
}
void start() {
Threads.setDaemonThreadRunning(rootScannerThread,
"RegionManager.rootScanner");
Threads.setDaemonThreadRunning(metaScannerThread,
"RegionManager.metaScanner");
}
void unsetRootRegion() {
synchronized (regionsInTransition) {
rootRegionLocation.set(null);
regionsInTransition.remove(HRegionInfo.ROOT_REGIONINFO.getRegionName());
}
}
void reassignRootRegion() {
unsetRootRegion();
if (!master.shutdownRequested) {
synchronized (regionsInTransition) {
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO);
s.setUnassigned();
regionsInTransition.put(HRegionInfo.ROOT_REGIONINFO.getRegionName(), s);
}
}
}
/*
* Assigns regions to region servers attempting to balance the load across
* all region servers. Note that no synchronization is necessary as the caller
* (ServerManager.processMsgs) already owns the monitor for the RegionManager.
*
* @param info
* @param serverName
* @param returnMsgs
*/
void assignRegions(HServerInfo info, String serverName,
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
HServerLoad thisServersLoad = info.getLoad();
// figure out what regions need to be assigned and aren't currently being
// worked on elsewhere.
Set<RegionState> regionsToAssign = regionsAwaitingAssignment();
if (regionsToAssign.size() == 0) {
// There are no regions waiting to be assigned.
if (!inSafeMode()) {
// We only do load balancing once all regions are assigned.
// This prevents churn while the cluster is starting up.
double avgLoad = master.serverManager.getAverageLoad();
double avgLoadWithSlop = avgLoad +
((this.slop != 0)? avgLoad * this.slop: avgLoad);
if (avgLoad > 2.0 &&
thisServersLoad.getNumberOfRegions() > avgLoadWithSlop) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + serverName +
" is overloaded. Server load: " +
thisServersLoad.getNumberOfRegions() + " avg: " + avgLoad +
", slop: " + this.slop);
}
unassignSomeRegions(serverName, thisServersLoad,
avgLoad, mostLoadedRegions, returnMsgs);
}
}
} else {
// if there's only one server, just give it all the regions
if (master.serverManager.numServers() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
} else {
// otherwise, give this server a few regions taking into account the
// load of all the other servers.
assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
serverName, returnMsgs);
}
}
}
/*
* Make region assignments taking into account multiple servers' loads.
*
* Note that no synchronization is needed while we iterate over
* regionsInTransition because this method is only called by assignRegions
* whose caller owns the monitor for RegionManager
*/
private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
final Set<RegionState> regionsToAssign, final String serverName,
final ArrayList<HMsg> returnMsgs) {
int nRegionsToAssign = regionsToAssign.size();
int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= nregions;
if (nRegionsToAssign > 0) {
// We still have more regions to assign. See how many we can assign
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
HServerLoad heavierLoad = new HServerLoad();
int nservers = computeNextHeaviestLoad(thisServersLoad, heavierLoad);
nregions = 0;
// Advance past any less-loaded servers
for (HServerLoad load = new HServerLoad(thisServersLoad);
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
// continue;
}
if (nregions < nRegionsToAssign) {
// There are some more heavily loaded servers
// but we can't assign all the regions to this server.
if (nservers > 0) {
// There are other servers that can share the load.
// Split regions that need assignment across the servers.
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * nservers));
} else {
// No other servers with same load.
// Split regions over all available servers
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * master.serverManager.numServers()));
}
} else {
// Assign all regions to this server
nregions = nRegionsToAssign;
}
if (nregions > this.maxAssignInOneGo) {
nregions = this.maxAssignInOneGo;
}
for (RegionState s: regionsToAssign) {
LOG.info("assigning region " + Bytes.toString(s.getRegionName())+
" to server " + serverName);
s.setPendingOpen(serverName);
this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo()));
if (--nregions <= 0) {
break;
}
}
}
}
/*
* @param nRegionsToAssign
* @param thisServersLoad
* @return How many regions we can assign to more lightly loaded servers
*/
private int regionsPerServer(final int numUnassignedRegions,
final HServerLoad thisServersLoad) {
SortedMap<HServerLoad, Set<String>> lightServers =
new TreeMap<HServerLoad, Set<String>>();
// Get all the servers who are more lightly loaded than this one.
synchronized (master.serverManager.loadToServers) {
lightServers.putAll(master.serverManager.loadToServers.headMap(thisServersLoad));
}
// Examine the list of servers that are more lightly loaded than this one.
// Pretend that we will assign regions to these more lightly loaded servers
// until they reach load equal with ours. Then, see how many regions are left
// unassigned. That is how many regions we should assign to this server.
int nRegions = 0;
for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
nRegions += 1;
} while (lightLoad.compareTo(thisServersLoad) <= 0
&& nRegions < numUnassignedRegions);
nRegions *= e.getValue().size();
if (nRegions >= numUnassignedRegions) {
break;
}
}
return nRegions;
}
/*
* Get the set of regions that should be assignable in this pass.
*
* Note that no synchronization on regionsInTransition is needed because the
* only caller (assignRegions, whose caller is ServerManager.processMsgs) owns
* the monitor for RegionManager
*/
private Set<RegionState> regionsAwaitingAssignment() {
// set of regions we want to assign to this server
Set<RegionState> regionsToAssign = new HashSet<RegionState>();
// Look over the set of regions that aren't currently assigned to
// determine which we should assign to this server.
for (RegionState s: regionsInTransition.values()) {
HRegionInfo i = s.getRegionInfo();
if (i == null) {
continue;
}
if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
!i.isMetaRegion()) {
// Can't assign user regions until all meta regions have been assigned
// and are on-line
continue;
}
if (s.isUnassigned()) {
regionsToAssign.add(s);
}
}
return regionsToAssign;
}
/*
* Figure out the load that is next highest amongst all regionservers. Also,
* return how many servers exist at that load.
*/
private int computeNextHeaviestLoad(HServerLoad referenceLoad,
HServerLoad heavierLoad) {
SortedMap<HServerLoad, Set<String>> heavyServers =
new TreeMap<HServerLoad, Set<String>>();
synchronized (master.serverManager.loadToServers) {
heavyServers.putAll(
master.serverManager.loadToServers.tailMap(referenceLoad));
}
int nservers = 0;
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
Set<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(referenceLoad) == 0) {
// This is the load factor of the server we are considering
nservers -= 1;
continue;
}
// If we get here, we are at the first load entry that is a
// heavier load than the server we are considering
heavierLoad.setNumberOfRequests(e.getKey().getNumberOfRequests());
heavierLoad.setNumberOfRegions(e.getKey().getNumberOfRegions());
break;
}
return nservers;
}
/*
* Assign all to the only server. An unlikely case but still possible.
*
* Note that no synchronization is needed on regionsInTransition while
* iterating on it because the only caller is assignRegions whose caller owns
* the monitor for RegionManager
*
* @param regionsToAssign
* @param serverName
* @param returnMsgs
*/
private void assignRegionsToOneServer(final Set<RegionState> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) {
for (RegionState s: regionsToAssign) {
LOG.info("assigning region " + Bytes.toString(s.getRegionName()) +
" to the only server " + serverName);
s.setPendingOpen(serverName);
this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo()));
}
}
/*
* The server checking in right now is overloaded. We will tell it to close
* some or all of its most loaded regions, allowing it to reduce its load.
* The closed regions will then get picked up by other underloaded machines.
*
* Note that no synchronization is needed because the only caller
* (assignRegions) whose caller owns the monitor for RegionManager
*/
private void unassignSomeRegions(final String serverName,
final HServerLoad load, final double avgLoad,
final HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
int numRegionsToClose = load.getNumberOfRegions() - (int)Math.ceil(avgLoad);
LOG.debug("Choosing to reassign " + numRegionsToClose
+ " regions. mostLoadedRegions has " + mostLoadedRegions.length
+ " regions in it.");
int regionIdx = 0;
int regionsClosed = 0;
int skipped = 0;
while (regionsClosed < numRegionsToClose &&
regionIdx < mostLoadedRegions.length) {
HRegionInfo currentRegion = mostLoadedRegions[regionIdx];
regionIdx++;
// skip the region if it's meta or root
if (currentRegion.isRootRegion() || currentRegion.isMetaTable()) {
continue;
}
byte[] regionName = currentRegion.getRegionName();
if (regionIsInTransition(regionName)) {
skipped++;
continue;
}
LOG.debug("Going to close region " +
currentRegion.getRegionNameAsString());
// make a message to close the region
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion,
OVERLOADED));
// mark the region as closing
setClosing(serverName, currentRegion, false);
setPendingClose(currentRegion.getRegionName());
// increment the count of regions we've marked
regionsClosed++;
}
LOG.info("Skipped " + skipped + " region(s) that are in transition states");
}
/**
* @return Read-only map of online regions.
*/
public Map<byte [], MetaRegion> getOnlineMetaRegions() {
synchronized (onlineMetaRegions) {
return Collections.unmodifiableMap(onlineMetaRegions);
}
}
/**
* Stop the root and meta scanners so that the region servers serving meta
* regions can shut down.
*/
public void stopScanners() {
if (LOG.isDebugEnabled()) {
LOG.debug("telling root scanner to stop");
}
rootScannerThread.interruptIfAlive();
if (LOG.isDebugEnabled()) {
LOG.debug("telling meta scanner to stop");
}
metaScannerThread.interruptIfAlive();
if (LOG.isDebugEnabled()) {
LOG.debug("meta and root scanners notified");
}
}
/** Stop the region assigner */
public void stop() {
try {
if (rootScannerThread.isAlive()) {
rootScannerThread.join(); // Wait for the root scanner to finish.
}
} catch (Exception iex) {
LOG.warn("root scanner", iex);
}
try {
if (metaScannerThread.isAlive()) {
metaScannerThread.join(); // Wait for meta scanner to finish.
}
} catch(Exception iex) {
LOG.warn("meta scanner", iex);
}
}
/**
* Block until meta regions are online or we're shutting down.
* @return true if we found meta regions, false if we're closing.
*/
public boolean areAllMetaRegionsOnline() {
synchronized (onlineMetaRegions) {
return (rootRegionLocation.get() != null &&
numberOfMetaRegions.get() == onlineMetaRegions.size());
}
}
/**
* Search our map of online meta regions to find the first meta region that
* should contain a pointer to <i>newRegion</i>.
* @param newRegion
* @return MetaRegion where the newRegion should live
*/
public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
synchronized (onlineMetaRegions) {
if (onlineMetaRegions.size() == 0) {
return null;
} else if (onlineMetaRegions.size() == 1) {
return onlineMetaRegions.get(onlineMetaRegions.firstKey());
} else {
if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
return onlineMetaRegions.get(newRegion.getRegionName());
}
return onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey());
}
}
}
/**
* Get a set of all the meta regions that contain info about a given table.
* @param tableName Table you need to know all the meta regions for
* @return set of MetaRegion objects that contain the table
* @throws NotAllMetaRegionsOnlineException
*/
public Set<MetaRegion> getMetaRegionsForTable(byte [] tableName)
throws NotAllMetaRegionsOnlineException {
byte [] firstMetaRegion = null;
Set<MetaRegion> metaRegions = new HashSet<MetaRegion>();
if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
if (rootRegionLocation.get() == null) {
throw new NotAllMetaRegionsOnlineException(
Bytes.toString(HConstants.ROOT_TABLE_NAME));
}
metaRegions.add(new MetaRegion(rootRegionLocation.get(),
HRegionInfo.ROOT_REGIONINFO.getRegionName()));
} else {
if (!areAllMetaRegionsOnline()) {
throw new NotAllMetaRegionsOnlineException();
}
synchronized (onlineMetaRegions) {
if (onlineMetaRegions.size() == 1) {
firstMetaRegion = onlineMetaRegions.firstKey();
} else if (onlineMetaRegions.containsKey(tableName)) {
firstMetaRegion = tableName;
} else {
firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
}
metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
}
}
return metaRegions;
}
/**
* Get metaregion that would host passed in row.
* @param row Row need to know all the meta regions for
* @return set of MetaRegion objects that contain the table
* @throws NotAllMetaRegionsOnlineException
*/
public MetaRegion getMetaRegionForRow(final byte [] row)
throws NotAllMetaRegionsOnlineException {
if (!areAllMetaRegionsOnline()) {
throw new NotAllMetaRegionsOnlineException();
}
return this.onlineMetaRegions.floorEntry(row).getValue();
}
/**
* Create a new HRegion, put a row for it into META (or ROOT), and mark the
* new region unassigned so that it will get assigned to a region server.
* @param newRegion HRegionInfo for the region to create
* @param server server hosting the META (or ROOT) region where the new
* region needs to be noted
* @param metaRegionName name of the meta region where new region is to be
* written
* @throws IOException
*/
public void createRegion(HRegionInfo newRegion, HRegionInterface server,
byte [] metaRegionName)
throws IOException {
// 2. Create the HRegion
HRegion region = HRegion.createHRegion(newRegion, master.rootdir,
master.getConfiguration());
// 3. Insert into meta
HRegionInfo info = region.getRegionInfo();
byte [] regionName = region.getRegionName();
BatchUpdate b = new BatchUpdate(regionName);
b.put(COL_REGIONINFO, Writables.getBytes(info));
server.batchUpdate(metaRegionName, b, -1L);
// 4. Close the new region to flush it to disk. Close its log file too.
region.close();
region.getLog().closeAndDelete();
// 5. Get it assigned to a server
setUnassigned(info, true);
}
/**
* Set a MetaRegion as online.
* @param metaRegion
*/
public void putMetaRegionOnline(MetaRegion metaRegion) {
onlineMetaRegions.put(metaRegion.getStartKey(), metaRegion);
}
/**
* Get a list of online MetaRegions
* @return list of MetaRegion objects
*/
public List<MetaRegion> getListOfOnlineMetaRegions() {
List<MetaRegion> regions = null;
synchronized(onlineMetaRegions) {
regions = new ArrayList<MetaRegion>(onlineMetaRegions.values());
}
return regions;
}
/**
* Count of online meta regions
* @return count of online meta regions
*/
public int numOnlineMetaRegions() {
return onlineMetaRegions.size();
}
/**
* Check if a meta region is online by its name
* @param startKey name of the meta region to check
* @return true if the region is online, false otherwise
*/
public boolean isMetaRegionOnline(byte [] startKey) {
return onlineMetaRegions.containsKey(startKey);
}
/**
* Set an online MetaRegion offline - remove it from the map.
* @param startKey region name
*/
public void offlineMetaRegion(byte [] startKey) {
onlineMetaRegions.remove(startKey);
}
/**
* Remove a region from the region state map.
*
* @param info
*/
public void removeRegion(HRegionInfo info) {
regionsInTransition.remove(info.getRegionName());
}
/**
* @param regionName
* @return true if the named region is in a transition state
*/
public boolean regionIsInTransition(byte[] regionName) {
return regionsInTransition.containsKey(regionName);
}
/**
* @param regionName
* @return true if the region is unassigned, pendingOpen or open
*/
public boolean regionIsOpening(byte[] regionName) {
RegionState state = regionsInTransition.get(regionName);
if (state != null) {
return state.isOpening();
}
return false;
}
/**
* Set a region to unassigned
* @param info Region to set unassigned
* @param force if true mark region unassigned whatever its current state
*/
public void setUnassigned(HRegionInfo info, boolean force) {
synchronized(this.regionsInTransition) {
RegionState s = regionsInTransition.get(info.getRegionName());
if (s == null) {
s = new RegionState(info);
regionsInTransition.put(info.getRegionName(), s);
}
if (force || (!s.isPendingOpen() && !s.isOpen())) {
s.setUnassigned();
}
}
}
/**
* Check if a region is on the unassigned list
* @param info HRegionInfo to check for
* @return true if on the unassigned list, false if it isn't. Note that this
* means a region could not be on the unassigned list AND not be assigned, if
* it happens to be between states.
*/
public boolean isUnassigned(HRegionInfo info) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(info.getRegionName());
if (s != null) {
return s.isUnassigned();
}
}
return false;
}
/**
* Check if a region has been assigned and we're waiting for a response from
* the region server.
*
* @param regionName name of the region
* @return true if open, false otherwise
*/
public boolean isPendingOpen(byte [] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isPendingOpen();
}
}
return false;
}
/**
* Region has been assigned to a server and the server has told us it is open
* @param regionName
*/
public void setOpen(byte [] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
s.setOpen();
}
}
}
/**
* @param regionName
* @return true if region is marked to be offlined.
*/
public boolean isOfflined(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isOfflined();
}
}
return false;
}
/**
* Mark a region as closing
* @param serverName
* @param regionInfo
* @param setOffline
*/
public void setClosing(final String serverName, final HRegionInfo regionInfo,
final boolean setOffline) {
synchronized (this.regionsInTransition) {
RegionState s = this.regionsInTransition.get(regionInfo.getRegionName());
if (s == null) {
s = new RegionState(regionInfo);
}
s.setClosing(serverName, setOffline);
this.regionsInTransition.put(regionInfo.getRegionName(), s);
}
}
/**
* Remove the map of region names to region infos waiting to be offlined for a
* given server
*
* @param serverName
* @return set of infos to close
*/
public Set<HRegionInfo> getMarkedToClose(String serverName) {
Set<HRegionInfo> result = new HashSet<HRegionInfo>();
synchronized (regionsInTransition) {
for (RegionState s: regionsInTransition.values()) {
if (s.isClosing() && !s.isPendingClose() && !s.isClosed() &&
s.getServerName().compareTo(serverName) == 0) {
result.add(s.getRegionInfo());
}
}
}
return result;
}
/**
* Called when we have told a region server to close the region
*
* @param regionName
*/
public void setPendingClose(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
s.setPendingClose();
}
}
}
/**
* @param regionName
*/
public void setClosed(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
s.setClosed();
}
}
}
/**
* Add a meta region to the scan queue
* @param m MetaRegion that needs to get scanned
*/
public void addMetaRegionToScan(MetaRegion m) {
metaScannerThread.addMetaRegionToScan(m);
}
/**
* Check if the initial root scan has been completed.
* @return true if scan completed, false otherwise
*/
public boolean isInitialRootScanComplete() {
return rootScannerThread.isInitialScanComplete();
}
/**
* Check if the initial meta scan has been completed.
* @return true if meta completed, false otherwise
*/
public boolean isInitialMetaScanComplete() {
return metaScannerThread.isInitialScanComplete();
}
/**
* @return true if the initial meta scan is complete and there are no
* unassigned or pending regions
*/
public boolean inSafeMode() {
if (safeMode) {
if(isInitialMetaScanComplete() && regionsInTransition.size() == 0) {
safeMode = false;
LOG.info("exiting safe mode");
} else {
LOG.info("in safe mode");
}
}
return safeMode;
}
/**
* Get the root region location.
* @return HServerAddress describing root region server.
*/
public HServerAddress getRootRegionLocation() {
return rootRegionLocation.get();
}
/**
* Block until either the root region location is available or we're shutting
* down.
*/
public void waitForRootRegionLocation() {
synchronized (rootRegionLocation) {
while (!master.closed.get() && rootRegionLocation.get() == null) {
// rootRegionLocation will be filled in when we get an 'open region'
// regionServerReport message from the HRegionServer that has been
// allocated the ROOT region below.
try {
rootRegionLocation.wait();
} catch (InterruptedException e) {
// continue
}
}
}
}
/**
* Return the number of meta regions.
* @return number of meta regions
*/
public int numMetaRegions() {
return numberOfMetaRegions.get();
}
/**
* Bump the count of meta regions up one
*/
public void incrementNumMetaRegions() {
numberOfMetaRegions.incrementAndGet();
}
/**
* Set the root region location.
* @param address Address of the region server where the root lives
*/
public void setRootRegionLocation(HServerAddress address) {
synchronized (rootRegionLocation) {
rootRegionLocation.set(new HServerAddress(address));
rootRegionLocation.notifyAll();
}
}
/**
* Set the number of meta regions.
* @param num Number of meta regions
*/
public void setNumMetaRegions(int num) {
numberOfMetaRegions.set(num);
}
/**
* @param regionName
* @param info
* @param server
* @param op
*/
public void startAction(byte[] regionName, HRegionInfo info,
HServerAddress server, int op) {
switch (op) {
case HConstants.MODIFY_TABLE_SPLIT:
startAction(regionName, info, server, this.regionsToSplit);
break;
case HConstants.MODIFY_TABLE_COMPACT:
startAction(regionName, info, server, this.regionsToCompact);
break;
case HConstants.MODIFY_TABLE_MAJOR_COMPACT:
startAction(regionName, info, server, this.regionsToMajorCompact);
break;
case HConstants.MODIFY_TABLE_FLUSH:
startAction(regionName, info, server, this.regionsToFlush);
break;
default:
throw new IllegalArgumentException("illegal table action " + op);
}
}
private void startAction(final byte[] regionName, final HRegionInfo info,
final HServerAddress server,
final SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> map) {
map.put(regionName, new Pair<HRegionInfo,HServerAddress>(info, server));
}
/**
* @param regionName
* @param op
*/
public void endAction(byte[] regionName, int op) {
switch (op) {
case HConstants.MODIFY_TABLE_SPLIT:
this.regionsToSplit.remove(regionName);
break;
case HConstants.MODIFY_TABLE_COMPACT:
this.regionsToCompact.remove(regionName);
break;
case HConstants.MODIFY_TABLE_MAJOR_COMPACT:
this.regionsToMajorCompact.remove(regionName);
break;
case HConstants.MODIFY_TABLE_FLUSH:
this.regionsToFlush.remove(regionName);
break;
default:
throw new IllegalArgumentException("illegal table action " + op);
}
}
/**
* @param regionName
*/
public void endActions(byte[] regionName) {
regionsToSplit.remove(regionName);
regionsToCompact.remove(regionName);
}
/**
* Send messages to the given region server asking it to split any
* regions in 'regionsToSplit', etc.
* @param serverInfo
* @param returnMsgs
*/
public void applyActions(HServerInfo serverInfo, ArrayList<HMsg> returnMsgs) {
applyActions(serverInfo, returnMsgs, this.regionsToCompact,
HMsg.Type.MSG_REGION_COMPACT);
applyActions(serverInfo, returnMsgs, this.regionsToSplit,
HMsg.Type.MSG_REGION_SPLIT);
applyActions(serverInfo, returnMsgs, this.regionsToFlush,
HMsg.Type.MSG_REGION_FLUSH);
applyActions(serverInfo, returnMsgs, this.regionsToMajorCompact,
HMsg.Type.MSG_REGION_MAJOR_COMPACT);
}
private void applyActions(final HServerInfo serverInfo,
final ArrayList<HMsg> returnMsgs,
SortedMap<byte[], Pair<HRegionInfo,HServerAddress>> map,
final HMsg.Type msg) {
HServerAddress addr = serverInfo.getServerAddress();
Iterator<Pair<HRegionInfo, HServerAddress>> i =
map.values().iterator();
synchronized (map) {
while (i.hasNext()) {
Pair<HRegionInfo,HServerAddress> pair = i.next();
if (addr.equals(pair.getSecond())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending " + msg + " " + pair.getFirst() + " to " + addr);
}
returnMsgs.add(new HMsg(msg, pair.getFirst()));
i.remove();
}
}
}
}
/*
* State of a Region as it transitions from closed to open, etc. See
* note on regionsInTransition data member above for listing of state
* transitions.
*/
private static class RegionState implements Comparable<RegionState> {
private final HRegionInfo regionInfo;
private volatile boolean unassigned = false;
private volatile boolean pendingOpen = false;
private volatile boolean open = false;
private volatile boolean closing = false;
private volatile boolean pendingClose = false;
private volatile boolean closed = false;
private volatile boolean offlined = false;
/* Set when region is assigned or closing */
private volatile String serverName = null;
/* Constructor */
RegionState(HRegionInfo info) {
this.regionInfo = info;
}
synchronized HRegionInfo getRegionInfo() {
return this.regionInfo;
}
synchronized byte [] getRegionName() {
return this.regionInfo.getRegionName();
}
/*
* @return Server this region was assigned to
*/
synchronized String getServerName() {
return this.serverName;
}
/*
* @return true if the region is being opened
*/
synchronized boolean isOpening() {
return this.unassigned || this.pendingOpen || this.open;
}
/*
* @return true if region is unassigned
*/
synchronized boolean isUnassigned() {
return unassigned;
}
/*
* Note: callers of this method (reassignRootRegion,
* regionsAwaitingAssignment, setUnassigned) ensure that this method is not
* called unless it is safe to do so.
*/
synchronized void setUnassigned() {
this.unassigned = true;
this.pendingOpen = false;
this.open = false;
this.closing = false;
this.pendingClose = false;
this.closed = false;
this.offlined = false;
this.serverName = null;
}
synchronized boolean isPendingOpen() {
return pendingOpen;
}
/*
* @param serverName Server region was assigned to.
*/
synchronized void setPendingOpen(final String serverName) {
if (!this.unassigned) {
throw new IllegalStateException(
"Cannot assign a region that is not currently unassigned. State: " +
toString());
}
this.unassigned = false;
this.pendingOpen = true;
this.open = false;
this.closing = false;
this.pendingClose = false;
this.closed = false;
this.offlined = false;
this.serverName = serverName;
}
synchronized boolean isOpen() {
return open;
}
synchronized void setOpen() {
if (!pendingOpen) {
throw new IllegalStateException(
"Cannot set a region as open if it has not been pending. State: " +
toString());
}
this.unassigned = false;
this.pendingOpen = false;
this.open = true;
this.closing = false;
this.pendingClose = false;
this.closed = false;
this.offlined = false;
}
synchronized boolean isClosing() {
return closing;
}
synchronized void setClosing(String serverName, boolean setOffline) {
this.unassigned = false;
this.pendingOpen = false;
this.open = false;
this.closing = true;
this.pendingClose = false;
this.closed = false;
this.offlined = setOffline;
this.serverName = serverName;
}
synchronized boolean isPendingClose() {
return this.pendingClose;
}
synchronized void setPendingClose() {
if (!closing) {
throw new IllegalStateException(
"Cannot set a region as pending close if it has not been closing. " +
"State: " + toString());
}
this.unassigned = false;
this.pendingOpen = false;
this.open = false;
this.closing = false;
this.pendingClose = true;
this.closed = false;
}
synchronized boolean isClosed() {
return this.closed;
}
synchronized void setClosed() {
if (!pendingClose && !pendingOpen) {
throw new IllegalStateException(
"Cannot set a region to be closed if it was not already marked as" +
" pending close or pending open. State: " + toString());
}
this.unassigned = false;
this.pendingOpen = false;
this.open = false;
this.closing = false;
this.pendingClose = false;
this.closed = true;
}
synchronized boolean isOfflined() {
return this.offlined;
}
@Override
public synchronized String toString() {
return ("name=" + Bytes.toString(getRegionName()) +
", unassigned=" + this.unassigned +
", pendingOpen=" + this.pendingOpen +
", open=" + this.open +
", closing=" + this.closing +
", pendingClose=" + this.pendingClose +
", closed=" + this.closed +
", offlined=" + this.offlined);
}
@Override
public boolean equals(Object o) {
return this.compareTo((RegionState) o) == 0;
}
@Override
public int hashCode() {
return Bytes.toString(getRegionName()).hashCode();
}
@Override
public int compareTo(RegionState o) {
if (o == null) {
return 1;
}
return Bytes.compareTo(getRegionName(), o.getRegionName());
}
}
}