blob: 0e7f26703e0bf4734a29aab1400176cf2296d8f7 [file] [log] [blame]
/**
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rsgroup;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.util.ReflectionUtils;
/**
* GroupBasedLoadBalancer, used when Region Server Grouping is configured (HBase-6721)
* It does region balance based on a table's group membership.
*
* Most assignment methods contain two exclusive code paths: Online - when the group
* table is online and Offline - when it is unavailable.
*
* During Offline, assignments are assigned based on cached information in zookeeper.
* If unavailable (ie bootstrap) then regions are assigned randomly.
*
* Once the GROUP table has been assigned, the balancer switches to Online and will then
* start providing appropriate assignments for user tables.
*
*/
@InterfaceAudience.Private
public class RSGroupBasedLoadBalancer implements RSGroupableBalancer, LoadBalancer {
/** Config for pluggable load balancers */
public static final String HBASE_GROUP_LOADBALANCER_CLASS = "hbase.group.grouploadbalancer.class";
private static final Log LOG = LogFactory.getLog(RSGroupBasedLoadBalancer.class);
private Configuration config;
private ClusterStatus clusterStatus;
private MasterServices masterServices;
private RSGroupInfoManager RSGroupInfoManager;
private LoadBalancer internalBalancer;
//used during reflection by LoadBalancerFactory
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer() {
}
//This constructor should only be used for unit testing
@InterfaceAudience.Private
public RSGroupBasedLoadBalancer(RSGroupInfoManager RSGroupInfoManager) {
this.RSGroupInfoManager = RSGroupInfoManager;
}
@Override
public Configuration getConf() {
return config;
}
@Override
public void setConf(Configuration conf) {
this.config = conf;
}
@Override
public void setClusterStatus(ClusterStatus st) {
this.clusterStatus = st;
}
@Override
public void setMasterServices(MasterServices masterServices) {
this.masterServices = masterServices;
}
@Override
public List<RegionPlan> balanceCluster(TableName tableName, Map<ServerName, List<HRegionInfo>>
clusterState) throws HBaseIOException {
return balanceCluster(clusterState);
}
@Override
public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState)
throws HBaseIOException {
if (!isOnline()) {
throw new ConstraintException(RSGroupInfoManager.RSGROUP_TABLE_NAME +
" is not online, unable to perform balance");
}
Map<ServerName,List<HRegionInfo>> correctedState = correctAssignments(clusterState);
List<RegionPlan> regionPlans = new ArrayList<RegionPlan>();
List<HRegionInfo> misplacedRegions = correctedState.get(LoadBalancer.BOGUS_SERVER_NAME);
for (HRegionInfo regionInfo : misplacedRegions) {
regionPlans.add(new RegionPlan(regionInfo, null, null));
}
try {
for (RSGroupInfo info : RSGroupInfoManager.listRSGroups()) {
Map<ServerName, List<HRegionInfo>> groupClusterState =
new HashMap<ServerName, List<HRegionInfo>>();
for (HostAndPort sName : info.getServers()) {
for(ServerName curr: clusterState.keySet()) {
if(curr.getHostPort().equals(sName)) {
groupClusterState.put(curr, correctedState.get(curr));
}
}
}
List<RegionPlan> groupPlans = this.internalBalancer
.balanceCluster(groupClusterState);
if (groupPlans != null) {
regionPlans.addAll(groupPlans);
}
}
} catch (IOException exp) {
LOG.warn("Exception while balancing cluster.", exp);
regionPlans.clear();
}
return regionPlans;
}
@Override
public Map<ServerName, List<HRegionInfo>> roundRobinAssignment(
List<HRegionInfo> regions, List<ServerName> servers) throws HBaseIOException {
Map<ServerName, List<HRegionInfo>> assignments = Maps.newHashMap();
ListMultimap<String,HRegionInfo> regionMap = ArrayListMultimap.create();
ListMultimap<String,ServerName> serverMap = ArrayListMultimap.create();
generateGroupMaps(regions, servers, regionMap, serverMap);
for(String groupKey : regionMap.keySet()) {
if (regionMap.get(groupKey).size() > 0) {
Map<ServerName, List<HRegionInfo>> result =
this.internalBalancer.roundRobinAssignment(
regionMap.get(groupKey),
serverMap.get(groupKey));
if(result != null) {
assignments.putAll(result);
}
}
}
return assignments;
}
@Override
public Map<ServerName, List<HRegionInfo>> retainAssignment(
Map<HRegionInfo, ServerName> regions, List<ServerName> servers) throws HBaseIOException {
try {
Map<ServerName, List<HRegionInfo>> assignments = new TreeMap<ServerName, List<HRegionInfo>>();
ListMultimap<String, HRegionInfo> groupToRegion = ArrayListMultimap.create();
Set<HRegionInfo> misplacedRegions = getMisplacedRegions(regions);
for (HRegionInfo region : regions.keySet()) {
if (!misplacedRegions.contains(region)) {
String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
groupToRegion.put(groupName, region);
}
}
// Now the "groupToRegion" map has only the regions which have correct
// assignments.
for (String key : groupToRegion.keySet()) {
Map<HRegionInfo, ServerName> currentAssignmentMap = new TreeMap<HRegionInfo, ServerName>();
List<HRegionInfo> regionList = groupToRegion.get(key);
RSGroupInfo info = RSGroupInfoManager.getRSGroup(key);
List<ServerName> candidateList = filterOfflineServers(info, servers);
for (HRegionInfo region : regionList) {
currentAssignmentMap.put(region, regions.get(region));
}
if(candidateList.size() > 0) {
assignments.putAll(this.internalBalancer.retainAssignment(
currentAssignmentMap, candidateList));
}
}
for (HRegionInfo region : misplacedRegions) {
String groupName = RSGroupInfoManager.getRSGroupOfTable(
region.getTable());
RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupName);
List<ServerName> candidateList = filterOfflineServers(info, servers);
ServerName server = this.internalBalancer.randomAssignment(region,
candidateList);
if (server != null) {
if (!assignments.containsKey(server)) {
assignments.put(server, new ArrayList<HRegionInfo>());
}
assignments.get(server).add(region);
} else {
//if not server is available assign to bogus so it ends up in RIT
if(!assignments.containsKey(LoadBalancer.BOGUS_SERVER_NAME)) {
assignments.put(LoadBalancer.BOGUS_SERVER_NAME, new ArrayList<HRegionInfo>());
}
assignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
}
}
return assignments;
} catch (IOException e) {
throw new HBaseIOException("Failed to do online retain assignment", e);
}
}
@Override
public ServerName randomAssignment(HRegionInfo region,
List<ServerName> servers) throws HBaseIOException {
ListMultimap<String,HRegionInfo> regionMap = LinkedListMultimap.create();
ListMultimap<String,ServerName> serverMap = LinkedListMultimap.create();
generateGroupMaps(Lists.newArrayList(region), servers, regionMap, serverMap);
List<ServerName> filteredServers = serverMap.get(regionMap.keySet().iterator().next());
return this.internalBalancer.randomAssignment(region, filteredServers);
}
private void generateGroupMaps(
List<HRegionInfo> regions,
List<ServerName> servers,
ListMultimap<String, HRegionInfo> regionMap,
ListMultimap<String, ServerName> serverMap) throws HBaseIOException {
try {
for (HRegionInfo region : regions) {
String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
if(groupName == null) {
LOG.warn("Group for table "+region.getTable()+" is null");
}
regionMap.put(groupName, region);
}
for (String groupKey : regionMap.keySet()) {
RSGroupInfo info = RSGroupInfoManager.getRSGroup(groupKey);
serverMap.putAll(groupKey, filterOfflineServers(info, servers));
if(serverMap.get(groupKey).size() < 1) {
serverMap.put(groupKey, LoadBalancer.BOGUS_SERVER_NAME);
}
}
} catch(IOException e) {
throw new HBaseIOException("Failed to generate group maps", e);
}
}
private List<ServerName> filterOfflineServers(RSGroupInfo RSGroupInfo,
List<ServerName> onlineServers) {
if (RSGroupInfo != null) {
return filterServers(RSGroupInfo.getServers(), onlineServers);
} else {
LOG.debug("Group Information found to be null. Some regions might be unassigned.");
return Collections.EMPTY_LIST;
}
}
/**
* Filter servers based on the online servers.
*
* @param servers
* the servers
* @param onlineServers
* List of servers which are online.
* @return the list
*/
private List<ServerName> filterServers(Collection<HostAndPort> servers,
Collection<ServerName> onlineServers) {
ArrayList<ServerName> finalList = new ArrayList<ServerName>();
for (HostAndPort server : servers) {
for(ServerName curr: onlineServers) {
if(curr.getHostPort().equals(server)) {
finalList.add(curr);
}
}
}
return finalList;
}
private ListMultimap<String, HRegionInfo> groupRegions(
List<HRegionInfo> regionList) throws IOException {
ListMultimap<String, HRegionInfo> regionGroup = ArrayListMultimap
.create();
for (HRegionInfo region : regionList) {
String groupName = RSGroupInfoManager.getRSGroupOfTable(region.getTable());
regionGroup.put(groupName, region);
}
return regionGroup;
}
private Set<HRegionInfo> getMisplacedRegions(
Map<HRegionInfo, ServerName> regions) throws IOException {
Set<HRegionInfo> misplacedRegions = new HashSet<HRegionInfo>();
for (HRegionInfo region : regions.keySet()) {
ServerName assignedServer = regions.get(region);
RSGroupInfo info =
RSGroupInfoManager.getRSGroup(RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
if (assignedServer != null &&
(info == null || !info.containsServer(assignedServer.getHostPort()))) {
LOG.debug("Found misplaced region: " + region.getRegionNameAsString() +
" on server: " + assignedServer +
" found in group: " +
RSGroupInfoManager.getRSGroupOfServer(assignedServer.getHostPort()) +
" outside of group: " + (info == null ? "UNKNOWN" : info.getName()));
misplacedRegions.add(region);
}
}
return misplacedRegions;
}
private Map<ServerName, List<HRegionInfo>> correctAssignments(
Map<ServerName, List<HRegionInfo>> existingAssignments){
Map<ServerName, List<HRegionInfo>> correctAssignments =
new TreeMap<ServerName, List<HRegionInfo>>();
List<HRegionInfo> misplacedRegions = new LinkedList<HRegionInfo>();
correctAssignments.put(LoadBalancer.BOGUS_SERVER_NAME, new LinkedList<HRegionInfo>());
for (ServerName sName : existingAssignments.keySet()) {
correctAssignments.put(sName, new LinkedList<HRegionInfo>());
List<HRegionInfo> regions = existingAssignments.get(sName);
for (HRegionInfo region : regions) {
RSGroupInfo info = null;
try {
info = RSGroupInfoManager.getRSGroup(
RSGroupInfoManager.getRSGroupOfTable(region.getTable()));
} catch (IOException exp) {
LOG.debug("Group information null for region of table " + region.getTable(),
exp);
}
if ((info == null) || (!info.containsServer(sName.getHostPort()))) {
correctAssignments.get(LoadBalancer.BOGUS_SERVER_NAME).add(region);
} else {
correctAssignments.get(sName).add(region);
}
}
}
//TODO bulk unassign?
//unassign misplaced regions, so that they are assigned to correct groups.
for(HRegionInfo info: misplacedRegions) {
this.masterServices.getAssignmentManager().unassign(info);
}
return correctAssignments;
}
@Override
public void initialize() throws HBaseIOException {
try {
if (RSGroupInfoManager == null) {
List<RSGroupAdminEndpoint> cps =
masterServices.getMasterCoprocessorHost().findCoprocessors(RSGroupAdminEndpoint.class);
if (cps.size() != 1) {
String msg = "Expected one implementation of GroupAdminEndpoint but found " + cps.size();
LOG.error(msg);
throw new HBaseIOException(msg);
}
RSGroupInfoManager = cps.get(0).getGroupInfoManager();
}
} catch (IOException e) {
throw new HBaseIOException("Failed to initialize GroupInfoManagerImpl", e);
}
// Create the balancer
Class<? extends LoadBalancer> balancerKlass = config.getClass(
HBASE_GROUP_LOADBALANCER_CLASS,
StochasticLoadBalancer.class, LoadBalancer.class);
internalBalancer = ReflectionUtils.newInstance(balancerKlass, config);
internalBalancer.setClusterStatus(clusterStatus);
internalBalancer.setMasterServices(masterServices);
internalBalancer.setConf(config);
internalBalancer.initialize();
}
public boolean isOnline() {
return RSGroupInfoManager != null && RSGroupInfoManager.isOnline();
}
@Override
public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
}
@Override
public void regionOffline(HRegionInfo regionInfo) {
}
@Override
public void onConfigurationChange(Configuration conf) {
//DO nothing for now
}
@Override
public void stop(String why) {
}
@Override
public boolean isStopped() {
return false;
}
}