blob: 67810295e234fc9cdc87ef1609fc0b76217f1977 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.util;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.Logger;
import org.springframework.scheduling.support.CronSequenceGenerator;
import org.apache.geode.GemFireConfigException;
import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheClosedException;
import org.apache.geode.cache.Declarable;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.cache.control.RebalanceOperation;
import org.apache.geode.cache.control.RebalanceResults;
import org.apache.geode.cache.partition.PartitionMemberInfo;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.logging.LogService;
/**
* Re-balancing operation relocates data from heavily loaded members to lightly loaded members. In
* most cases, the decision to re-balance is based on the size of the member and a few other
* statistics. {@link AutoBalancer} monitors these statistics and if necessary, triggers a
* re-balancing request. Auto-Balancing is expected to prevent failures and data loss.
*
* <P>
* This implementation is based on {@code ConfigInitialization} implementation. By default
* auto-balancing is disabled. A user needs to configure {@link AutoBalancer} during cache
* initialization {@link GemFireCache#getInitializer()}
*
* <P>
* In a cluster only one member owns auto-balancing responsibility. This is achieved by grabbing a
* distributed lock. In case of a failure a new member will grab the lock and manage auto balancing.
*
* <P>
* {@link AutoBalancer} can be controlled using the following configurations
* <OL>
* <LI>{@link AutoBalancer#SCHEDULE}
* <LI>TBD THRESHOLDS
*/
@Experimental("The autobalancer may be removed or the API may change in future releases")
public class AutoBalancer implements Declarable {
/**
* Use this configuration to manage out-of-balance audit frequency. If the auditor finds the
* system to be out-of-balance, it will trigger re-balancing. Any valid cron string is accepted.
* The sub-expressions represent the following:
* <OL>
* <LI>Seconds
* <LI>Minutes
* <LI>Hours
* <LI>Day-of-Month
* <LI>Month
* <LI>Day-of-Week
* <LI>Year (optional field)
*
* <P>
* For. e.g. {@code 0 0 * * * ?} for auditing the system every hour
*/
public static final String SCHEDULE = "schedule";
/**
* Use this configuration to manage re-balance invocation. Rebalance operation will be triggered
* if the total number of bytes rebalance operation may move is more than this threshold, in
* percentage of the total data size.
* <P>
* Default value {@link #DEFAULT_SIZE_THRESHOLD_PERCENT}
*/
public static final String SIZE_THRESHOLD_PERCENT = "size-threshold-percent";
/**
* Default value of {@link AutoBalancer#SIZE_THRESHOLD_PERCENT}. If 10% of data is misplaced, its
* a good time to redistribute buckets
*/
public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10;
/**
* In the initial data load phases, {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance
* invocation may be unnecessary. Rebalance should not be triggered if the total data size managed
* by cluster is too small. Rebalance operation will be triggered if the total number of bytes
* rebalance operation may move is more than this number of bytes.
* <P>
* Default value {@link #DEFAULT_MINIMUM_SIZE}
*/
public static final String MINIMUM_SIZE = "minimum-size";
/**
* Default value of {@link AutoBalancer#MINIMUM_SIZE}. In the initial data load phases,
* {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance invocation may be unnecessary. Do
* not rebalance if the data to be moved is less than 100MB
*/
public static final int DEFAULT_MINIMUM_SIZE = 100 * 1024 * 1024;
/**
* Name of the DistributedLockService that {@link AutoBalancer} will use to guard against
* concurrent maintenance activity
*/
public static final String AUTO_BALANCER_LOCK_SERVICE_NAME = "__AUTO_B";
public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK";
private final AuditScheduler scheduler;
private final OOBAuditor auditor;
private final TimeProvider clock;
private final CacheOperationFacade cacheFacade;
private boolean initialized;
private static final Logger logger = LogService.getLogger();
public AutoBalancer() {
this(null, null, null, null);
}
public AutoBalancer(AuditScheduler scheduler, OOBAuditor auditor, TimeProvider clock,
CacheOperationFacade cacheFacade) {
this.cacheFacade = cacheFacade == null ? new GeodeCacheFacade() : cacheFacade;
this.scheduler = scheduler == null ? new CronScheduler() : scheduler;
this.auditor = auditor == null ? new SizeBasedOOBAuditor(this.cacheFacade) : auditor;
this.clock = clock == null ? new SystemClockTimeProvider() : clock;
}
@Override
public void initialize(Cache cache, Properties props) {
this.cacheFacade.setCache(cache);
internalInitialize(props);
}
/**
* @deprecated as of Geode 1.5 use initialize instead.
*/
@Override
public void init(Properties props) {
internalInitialize(props);
}
private void internalInitialize(Properties props) {
if (this.initialized) {
// For backwards compatibility we need to keep the external
// init method. But the product will call both initialize and
// init. So if we are already initialized subsequent calls
// are a noop. Once the deprecated init method is removed, this
// boolean check can also be removed.
return;
}
this.initialized = true;
if (logger.isDebugEnabled()) {
logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + props);
}
auditor.init(props);
String schedule = null;
if (props != null) {
schedule = props.getProperty(SCHEDULE);
}
scheduler.init(schedule);
}
/**
* Invokes audit triggers based on a cron schedule.
* <OL>
* <LI>computes delay = next slot - current time
* <LI>schedules a out-of-balance audit task to be started after delay computed earlier
* <LI>once the audit task completes, it repeats delay computation and task submission
*/
private class CronScheduler implements AuditScheduler {
final ScheduledExecutorService trigger;
CronSequenceGenerator generator;
CronScheduler() {
trigger = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "AutoBalancer");
thread.setDaemon(true);
return thread;
}
});
}
@Override
public void init(String schedule) {
if (logger.isDebugEnabled()) {
logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + schedule);
}
if (schedule == null || schedule.isEmpty()) {
throw new GemFireConfigException("Missing configuration: " + SCHEDULE);
}
try {
generator = new CronSequenceGenerator(schedule);
} catch (Exception e) {
throw new GemFireConfigException("Cron expression could not be parsed: " + schedule, e);
}
submitNext();
}
private void submitNext() {
long currentTime = clock.currentTimeMillis();
Date nextSchedule = generator.next(new Date(currentTime));
long delay = nextSchedule.getTime() - currentTime;
if (logger.isDebugEnabled()) {
logger.debug("Now={}, next audit time={}, delay={} ms", new Date(currentTime), nextSchedule,
delay);
}
trigger.schedule(new Runnable() {
@Override
public void run() {
try {
auditor.execute();
} catch (CacheClosedException e) {
logger.warn("Cache closed while attempting to rebalance the cluster. Abort future jobs",
e);
return;
} catch (Exception e) {
logger.warn("Error while executing out-of-balance audit.", e);
}
submitNext();
}
}, delay, TimeUnit.MILLISECONDS);
}
@Override
public void destroy() {
trigger.shutdownNow();
}
}
/**
* Queries member statistics and health to determine if a re-balance operation is needed
* <OL>
* <LI>acquires distributed lock
* <LI>queries member health
* <LI>updates auto-balance stat
* <LI>release lock
*/
static class SizeBasedOOBAuditor implements OOBAuditor {
private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT;
private int sizeMinimum = DEFAULT_MINIMUM_SIZE;
final CacheOperationFacade cache;
public SizeBasedOOBAuditor(CacheOperationFacade cache) {
this.cache = cache;
}
@Override
public void init(Properties props) {
if (logger.isDebugEnabled()) {
logger.debug("Initializing " + this.getClass().getSimpleName());
}
if (props != null) {
if (props.getProperty(SIZE_THRESHOLD_PERCENT) != null) {
sizeThreshold = Integer.valueOf(props.getProperty(SIZE_THRESHOLD_PERCENT));
if (sizeThreshold <= 0 || sizeThreshold >= 100) {
throw new GemFireConfigException(
SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99");
}
}
if (props.getProperty(MINIMUM_SIZE) != null) {
sizeMinimum = Integer.valueOf(props.getProperty(MINIMUM_SIZE));
if (sizeMinimum <= 0) {
throw new GemFireConfigException(MINIMUM_SIZE + " should be greater than 0");
}
}
}
}
@Override
public void execute() {
boolean result = cache.acquireAutoBalanceLock();
if (!result) {
if (logger.isDebugEnabled()) {
logger.debug(
"Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
}
return;
}
cache.incrementAttemptCounter();
result = needsRebalancing();
if (!result) {
if (logger.isDebugEnabled()) {
logger.debug("Rebalancing is not needed");
}
return;
}
cache.rebalance();
}
/**
* By default auto-balancer will avoid rebalancing, because a user can always trigger rebalance
* manually. So in case of error or inconsistent data, return false. Return true if
* <OL>
* <LI>total transfer size is above threshold percent of total data size at cluster level
* <LI>If some smaller capacity nodes are heavily loaded while bigger capacity nodes are
* balanced. In such a scenario transfer size based trigger may not cause rebalance.
*/
boolean needsRebalancing() {
// test cluster level status
long transferSize = cache.getTotalTransferSize();
if (transferSize <= sizeMinimum) {
return false;
}
Map<PartitionedRegion, InternalPRInfo> details = cache.getRegionMemberDetails();
long totalSize = cache.getTotalDataSize(details);
if (totalSize > 0) {
int transferPercent = (int) ((100.0 * transferSize) / totalSize);
if (transferPercent >= sizeThreshold) {
return true;
}
}
// TODO test member level skew
return false;
}
int getSizeThreshold() {
return sizeThreshold;
}
public long getSizeMinimum() {
return sizeMinimum;
}
}
/**
* Hides cache level details and exposes simple methods relevant for auto-balancing
*/
static class GeodeCacheFacade implements CacheOperationFacade {
private final AtomicBoolean isLockAcquired = new AtomicBoolean(false);
private InternalCache cache;
public GeodeCacheFacade() {
this(null);
}
public GeodeCacheFacade(InternalCache cache) {
this.cache = cache;
}
@Override
public Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails() {
InternalCache cache = getCache();
Map<PartitionedRegion, InternalPRInfo> detailsMap = new HashMap<>();
for (PartitionedRegion region : cache.getPartitionedRegions()) {
LoadProbe probe = cache.getInternalResourceManager().getLoadProbe();
InternalPRInfo info =
region.getRedundancyProvider().buildPartitionedRegionInfo(true, probe);
detailsMap.put(region, info);
}
return detailsMap;
}
@Override
public long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details) {
long totalSize = 0;
if (details != null) {
for (PartitionedRegion region : details.keySet()) {
InternalPRInfo info = details.get(region);
Set<PartitionMemberInfo> membersInfo = info.getPartitionMemberInfo();
for (PartitionMemberInfo member : membersInfo) {
if (logger.isDebugEnabled()) {
logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member,
member.getSize());
}
totalSize += member.getSize();
}
}
}
return totalSize;
}
@Override
public long getTotalTransferSize() {
try {
RebalanceOperation operation =
getCache().getResourceManager().createRebalanceFactory().simulate();
RebalanceResults result = operation.getResults();
if (logger.isDebugEnabled()) {
logger.debug("Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes="
+ result.getTotalBucketCreateBytes() + ", TotalBucketCreatesCompleted="
+ result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes="
+ result.getTotalBucketTransferBytes() + ", TotalBucketTransfersCompleted="
+ result.getTotalBucketTransfersCompleted() + ", TotalPrimaryTransfersCompleted="
+ result.getTotalPrimaryTransfersCompleted() + "]");
}
return result.getTotalBucketTransferBytes();
} catch (CancellationException e) {
logger.info("Error while trying to estimate rebalance cost ", e);
} catch (InterruptedException e) {
logger.info("Error while trying to estimate rebalance cost ", e);
}
return 0;
}
@Override
public void incrementAttemptCounter() {
InternalCache cache = getCache();
try {
cache.getInternalResourceManager().getStats().incAutoRebalanceAttempts();
} catch (Exception e) {
logger.warn("Failed to increment AutoBalanceAttempts counter");
}
}
@Override
public void rebalance() {
try {
RebalanceOperation operation =
getCache().getResourceManager().createRebalanceFactory().start();
RebalanceResults result = operation.getResults();
logger
.info("Rebalance result: [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes()
+ ", TotalBucketCreateTime=" + result.getTotalBucketCreateTime()
+ ", TotalBucketCreatesCompleted=" + result.getTotalBucketCreatesCompleted()
+ ", TotalBucketTransferBytes=" + result.getTotalBucketTransferBytes()
+ ", TotalBucketTransferTime=" + result.getTotalBucketTransferTime()
+ ", TotalBucketTransfersCompleted=" + +result.getTotalBucketTransfersCompleted()
+ ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime()
+ ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted()
+ ", TotalTime=" + result.getTotalTime() + "]");
} catch (CancellationException e) {
logger.info("Error rebalancing the cluster", e);
} catch (InterruptedException e) {
logger.info("Error rebalancing the cluster", e);
}
}
InternalCache getCache() {
InternalCache result = cache;
if (result == null) {
throw new IllegalStateException("Missing cache instance.");
}
if (result.isClosed()) {
throw new CacheClosedException();
}
return result;
}
@Override
public boolean acquireAutoBalanceLock() {
// TODO: delete this double-checking
if (!isLockAcquired.get()) {
synchronized (isLockAcquired) {
if (!isLockAcquired.get()) {
DistributedLockService dls = getDLS();
boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1);
if (result) {
isLockAcquired.set(true);
if (logger.isDebugEnabled()) {
logger.debug("Grabbed AutoBalancer lock");
}
} else {
if (logger.isDebugEnabled()) {
logger.debug(
"Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
}
}
}
}
}
return isLockAcquired.get();
}
@Override
public DistributedLockService getDLS() {
InternalCache cache = getCache();
DistributedLockService dls =
DistributedLockService.getServiceNamed(AUTO_BALANCER_LOCK_SERVICE_NAME);
if (dls == null) {
if (logger.isDebugEnabled()) {
logger.debug("Creating DistributeLockService");
}
dls = DLockService.create(AUTO_BALANCER_LOCK_SERVICE_NAME,
cache.getInternalDistributedSystem(), true, true, true);
}
return dls;
}
@Override
public void setCache(Cache cache) {
this.cache = (InternalCache) cache;
}
}
private static class SystemClockTimeProvider implements TimeProvider {
@Override
public long currentTimeMillis() {
return System.currentTimeMillis();
}
}
interface AuditScheduler {
void init(String schedule);
void destroy();
}
interface OOBAuditor {
void init(Properties props);
void execute();
}
interface TimeProvider {
long currentTimeMillis();
}
interface CacheOperationFacade {
boolean acquireAutoBalanceLock();
void setCache(Cache cache);
DistributedLockService getDLS();
void rebalance();
void incrementAttemptCounter();
Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails();
long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details);
long getTotalTransferSize();
}
OOBAuditor getOOBAuditor() {
return auditor;
}
public CacheOperationFacade getCacheOperationFacade() {
return this.cacheFacade;
}
public void destroy() {
scheduler.destroy();
}
}