blob: 501ad8b8b2e6133f9a5a399dd0040670ffaef77b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A chore which computes the size of each {@link HRegion} on the FileSystem hosted by the given
 * {@link HRegionServer}. The results of this computation are stored in the
 * {@link RegionServerSpaceQuotaManager}'s {@link RegionSizeStore} object.
 * <p>
 * A single invocation is time-boxed by {@link #FS_UTILIZATION_MAX_ITERATION_DURATION_KEY}. When
 * the budget is exceeded, the partially consumed iterator is retained and the next invocation
 * resumes from it instead of starting over with the current set of online Regions.
 */
@InterfaceAudience.Private
public class FileSystemUtilizationChore extends ScheduledChore {
  private static final Logger LOG = LoggerFactory.getLogger(FileSystemUtilizationChore.class);

  /** Configuration key for the chore period (interpreted in the configured time unit). */
  static final String FS_UTILIZATION_CHORE_PERIOD_KEY =
    "hbase.regionserver.quotas.fs.utilization.chore.period";
  /** Default chore period: 5 minutes, expressed in milliseconds (the default time unit). */
  static final int FS_UTILIZATION_CHORE_PERIOD_DEFAULT = 1000 * 60 * 5;

  /** Configuration key for the chore initial delay (interpreted in the configured time unit). */
  static final String FS_UTILIZATION_CHORE_DELAY_KEY =
    "hbase.regionserver.quotas.fs.utilization.chore.delay";
  /** Default initial delay: 1 minute, expressed in milliseconds (the default time unit). */
  static final long FS_UTILIZATION_CHORE_DELAY_DEFAULT = 1000L * 60L;

  /** Configuration key naming the {@link TimeUnit} used for the period and initial delay. */
  static final String FS_UTILIZATION_CHORE_TIMEUNIT_KEY =
    "hbase.regionserver.quotas.fs.utilization.chore.timeunit";
  /** Default time unit for the period and initial delay. */
  static final String FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT = TimeUnit.MILLISECONDS.name();

  /** Configuration key for the maximum time, in milliseconds, one invocation may keep looping. */
  static final String FS_UTILIZATION_MAX_ITERATION_DURATION_KEY =
    "hbase.regionserver.quotas.fs.utilization.chore.max.iteration.millis";
  /** Default maximum iteration duration: 5 seconds. */
  static final long FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT = 5000L;

  /** The RegionServer whose Regions are measured. */
  private final HRegionServer rs;
  /** Time budget, in milliseconds, for a single invocation of {@link #chore()}. */
  private final long maxIterationMillis;
  /** Unfinished iterator from a preempted invocation, or null when there is nothing pending. */
  private Iterator<Region> leftoverRegions;

  /**
   * Creates the chore, reading its period, initial delay, time unit and per-invocation time
   * budget from the configuration of the given RegionServer.
   *
   * @param rs The RegionServer hosting the Regions to measure.
   */
  public FileSystemUtilizationChore(HRegionServer rs) {
    super(FileSystemUtilizationChore.class.getSimpleName(), rs, getPeriod(rs.getConfiguration()),
      getInitialDelay(rs.getConfiguration()), getTimeUnit(rs.getConfiguration()));
    this.rs = rs;
    this.maxIterationMillis = rs.getConfiguration().getLong(
      FS_UTILIZATION_MAX_ITERATION_DURATION_KEY, FS_UTILIZATION_MAX_ITERATION_DURATION_DEFAULT);
  }

  /**
   * Computes and records the size of the Regions on this RegionServer, resuming from the leftovers
   * of a preempted invocation when there are any. Split parents, region replicas and (when
   * resuming) Regions which are no longer online are skipped.
   */
  @Override
  protected void chore() {
    final RegionSizeStore regionSizeStore = getRegionSizeStore();
    final Set<Region> onlineRegions = new HashSet<>(rs.getRegions());

    // Process the regions from the last run if we have any. If we are somehow having difficulty
    // processing the Regions, we want to avoid creating a backlog in memory of Region objs.
    final Iterator<Region> oldRegionsToProcess = getLeftoverRegions();
    final boolean processingLeftovers = oldRegionsToProcess != null;
    final Iterator<Region> iterator =
      processingLeftovers ? oldRegionsToProcess : onlineRegions.iterator();

    // Reset the leftoverRegions and let the loop re-assign if necessary.
    setLeftoverRegions(null);

    long regionSizesCalculated = 0L;
    long offlineRegionsSkipped = 0L;
    long skippedSplitParents = 0L;
    long skippedRegionReplicas = 0L;
    final long start = EnvironmentEdgeManager.currentTime();
    while (iterator.hasNext()) {
      // Make sure this chore doesn't hog the thread.
      final long timeRunning = EnvironmentEdgeManager.currentTime() - start;
      if (timeRunning > maxIterationMillis) {
        LOG.debug("Preempting execution of FileSystemUtilizationChore because it exceeds the"
          + " maximum iteration configuration value. Will process remaining Regions"
          + " on a subsequent invocation.");
        // Keep the partially consumed iterator so the next run resumes where this one stopped.
        setLeftoverRegions(iterator);
        break;
      }

      final Region region = iterator.next();
      // If we're processing leftover regions, the region may no-longer be online.
      // If so, we can skip it.
      if (processingLeftovers && !onlineRegions.contains(region)) {
        offlineRegionsSkipped++;
        continue;
      }
      final RegionInfo regionInfo = region.getRegionInfo();
      // Avoid computing the size of regions which are the parent of split.
      if (regionInfo.isSplitParent()) {
        skippedSplitParents++;
        continue;
      }
      // Avoid computing the size of region replicas.
      if (RegionInfo.DEFAULT_REPLICA_ID != regionInfo.getReplicaId()) {
        skippedRegionReplicas++;
        continue;
      }

      final long sizeInBytes = computeSize(region);
      regionSizeStore.put(region.getRegionInfo(), sizeInBytes);
      regionSizesCalculated++;
    }

    // Parameterized logging: the message is only assembled when TRACE is enabled. Note the
    // space after "and", which was previously missing and glued the word to the count.
    LOG.trace("Computed the size of {} Regions. Skipped computation of {} regions due to not"
      + " being online on this RS, {} regions due to being the parent of a split, and {}"
      + " regions due to being region replicas.", regionSizesCalculated, offlineRegionsSkipped,
      skippedSplitParents, skippedRegionReplicas);
  }

  /**
   * Returns an {@link Iterator} over the Regions which were skipped last invocation of the chore.
   *
   * @return Regions from the previous invocation to process, or null.
   */
  Iterator<Region> getLeftoverRegions() {
    return leftoverRegions;
  }

  /**
   * Sets the iterator over Regions which still need to be processed by a later invocation.
   *
   * @param newLeftovers The remaining Regions to process, or null to clear the leftovers.
   */
  void setLeftoverRegions(Iterator<Region> newLeftovers) {
    this.leftoverRegions = newLeftovers;
  }

  /**
   * Computes total FileSystem size for the given {@link Region} by summing
   * {@link Store#getHFilesSize()} over each of the Region's stores.
   *
   * @param r The region
   * @return The size, in bytes, of the Region.
   */
  long computeSize(Region r) {
    long regionSize = 0L;
    for (Store store : r.getStores()) {
      regionSize += store.getHFilesSize();
    }
    LOG.trace("Size of {} is {}", r, regionSize);
    return regionSize;
  }

  /**
   * Returns the {@link RegionSizeStore} of the RegionServer's space quota manager, into which
   * computed sizes are written. Visible for testing.
   *
   * @return The store which receives the computed Region sizes.
   */
  RegionSizeStore getRegionSizeStore() {
    return rs.getRegionServerSpaceQuotaManager().getRegionSizeStore();
  }

  /**
   * Extracts the period for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore period or the default value.
   */
  static int getPeriod(Configuration conf) {
    return conf.getInt(FS_UTILIZATION_CHORE_PERIOD_KEY, FS_UTILIZATION_CHORE_PERIOD_DEFAULT);
  }

  /**
   * Extracts the initial delay for the chore from the configuration.
   *
   * @param conf The configuration object.
   * @return The configured chore initial delay or the default value.
   */
  static long getInitialDelay(Configuration conf) {
    return conf.getLong(FS_UTILIZATION_CHORE_DELAY_KEY, FS_UTILIZATION_CHORE_DELAY_DEFAULT);
  }

  /**
   * Extracts the time unit for the chore period and initial delay from the configuration. The
   * configuration value for {@link #FS_UTILIZATION_CHORE_TIMEUNIT_KEY} must correspond to a
   * {@link TimeUnit} value.
   *
   * @param conf The configuration object.
   * @return The configured time unit for the chore period and initial delay or the default value.
   * @throws IllegalArgumentException if the configured value is not the name of a
   *         {@link TimeUnit} constant.
   */
  static TimeUnit getTimeUnit(Configuration conf) {
    final String unitName =
      conf.get(FS_UTILIZATION_CHORE_TIMEUNIT_KEY, FS_UTILIZATION_CHORE_TIMEUNIT_DEFAULT);
    return TimeUnit.valueOf(unitName);
  }
}