| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.util; |
| |
| import java.util.Date; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ThreadFactory; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.logging.log4j.Logger; |
| import org.springframework.scheduling.support.CronSequenceGenerator; |
| |
| import org.apache.geode.GemFireConfigException; |
| import org.apache.geode.annotations.Experimental; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheClosedException; |
| import org.apache.geode.cache.Declarable; |
| import org.apache.geode.cache.GemFireCache; |
| import org.apache.geode.cache.control.RebalanceOperation; |
| import org.apache.geode.cache.control.RebalanceResults; |
| import org.apache.geode.cache.partition.PartitionMemberInfo; |
| import org.apache.geode.distributed.DistributedLockService; |
| import org.apache.geode.distributed.internal.locks.DLockService; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.PartitionedRegion; |
| import org.apache.geode.internal.cache.partitioned.InternalPRInfo; |
| import org.apache.geode.internal.cache.partitioned.LoadProbe; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Re-balancing operation relocates data from heavily loaded members to lightly loaded members. In |
| * most cases, the decision to re-balance is based on the size of the member and a few other |
| * statistics. {@link AutoBalancer} monitors these statistics and if necessary, triggers a |
| * re-balancing request. Auto-Balancing is expected to prevent failures and data loss. |
| * |
| * <P> |
| * This implementation is based on {@code ConfigInitialization} implementation. By default |
| * auto-balancing is disabled. A user needs to configure {@link AutoBalancer} during cache |
| * initialization {@link GemFireCache#getInitializer()} |
| * |
| * <P> |
| * In a cluster only one member owns auto-balancing responsibility. This is achieved by grabbing a |
| * distributed lock. In case of a failure a new member will grab the lock and manage auto balancing. |
| * |
| * <P> |
| * {@link AutoBalancer} can be controlled using the following configurations |
| * <OL> |
| * <LI>{@link AutoBalancer#SCHEDULE} |
| * <LI>TBD THRESHOLDS |
| */ |
| @Experimental("The autobalancer may be removed or the API may change in future releases") |
| public class AutoBalancer implements Declarable { |
| /** |
| * Use this configuration to manage out-of-balance audit frequency. If the auditor finds the |
| * system to be out-of-balance, it will trigger re-balancing. Any valid cron string is accepted. |
| * The sub-expressions represent the following: |
| * <OL> |
| * <LI>Seconds |
| * <LI>Minutes |
| * <LI>Hours |
| * <LI>Day-of-Month |
| * <LI>Month |
| * <LI>Day-of-Week |
| * <LI>Year (optional field) |
| * |
| * <P> |
| * For. e.g. {@code 0 0 * * * ?} for auditing the system every hour |
| */ |
| public static final String SCHEDULE = "schedule"; |
| |
| /** |
| * Use this configuration to manage re-balance invocation. Rebalance operation will be triggered |
| * if the total number of bytes rebalance operation may move is more than this threshold, in |
| * percentage of the total data size. |
| * <P> |
| * Default value {@link #DEFAULT_SIZE_THRESHOLD_PERCENT} |
| */ |
| public static final String SIZE_THRESHOLD_PERCENT = "size-threshold-percent"; |
| |
| /** |
| * Default value of {@link AutoBalancer#SIZE_THRESHOLD_PERCENT}. If 10% of data is misplaced, its |
| * a good time to redistribute buckets |
| */ |
| public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10; |
| |
| /** |
| * In the initial data load phases, {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance |
| * invocation may be unnecessary. Rebalance should not be triggered if the total data size managed |
| * by cluster is too small. Rebalance operation will be triggered if the total number of bytes |
| * rebalance operation may move is more than this number of bytes. |
| * <P> |
| * Default value {@link #DEFAULT_MINIMUM_SIZE} |
| */ |
| public static final String MINIMUM_SIZE = "minimum-size"; |
| |
| /** |
| * Default value of {@link AutoBalancer#MINIMUM_SIZE}. In the initial data load phases, |
| * {@link AutoBalancer#SIZE_THRESHOLD_PERCENT} based rebalance invocation may be unnecessary. Do |
| * not rebalance if the data to be moved is less than 100MB |
| */ |
| public static final int DEFAULT_MINIMUM_SIZE = 100 * 1024 * 1024; |
| |
| /** |
| * Name of the DistributedLockService that {@link AutoBalancer} will use to guard against |
| * concurrent maintenance activity |
| */ |
| public static final String AUTO_BALANCER_LOCK_SERVICE_NAME = "__AUTO_B"; |
| |
| public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK"; |
| |
| private final AuditScheduler scheduler; |
| private final OOBAuditor auditor; |
| private final TimeProvider clock; |
| private final CacheOperationFacade cacheFacade; |
| |
| private boolean initialized; |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| public AutoBalancer() { |
| this(null, null, null, null); |
| } |
| |
| public AutoBalancer(AuditScheduler scheduler, OOBAuditor auditor, TimeProvider clock, |
| CacheOperationFacade cacheFacade) { |
| this.cacheFacade = cacheFacade == null ? new GeodeCacheFacade() : cacheFacade; |
| this.scheduler = scheduler == null ? new CronScheduler() : scheduler; |
| this.auditor = auditor == null ? new SizeBasedOOBAuditor(this.cacheFacade) : auditor; |
| this.clock = clock == null ? new SystemClockTimeProvider() : clock; |
| } |
| |
| @Override |
| public void initialize(Cache cache, Properties props) { |
| this.cacheFacade.setCache(cache); |
| internalInitialize(props); |
| } |
| |
| /** |
| * @deprecated as of Geode 1.5 use initialize instead. |
| */ |
| @Override |
| public void init(Properties props) { |
| internalInitialize(props); |
| } |
| |
| private void internalInitialize(Properties props) { |
| if (this.initialized) { |
| // For backwards compatibility we need to keep the external |
| // init method. But the product will call both initialize and |
| // init. So if we are already initialized subsequent calls |
| // are a noop. Once the deprecated init method is removed, this |
| // boolean check can also be removed. |
| return; |
| } |
| this.initialized = true; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + props); |
| } |
| |
| auditor.init(props); |
| |
| String schedule = null; |
| if (props != null) { |
| schedule = props.getProperty(SCHEDULE); |
| } |
| scheduler.init(schedule); |
| } |
| |
| /** |
| * Invokes audit triggers based on a cron schedule. |
| * <OL> |
| * <LI>computes delay = next slot - current time |
| * <LI>schedules a out-of-balance audit task to be started after delay computed earlier |
| * <LI>once the audit task completes, it repeats delay computation and task submission |
| */ |
| private class CronScheduler implements AuditScheduler { |
| final ScheduledExecutorService trigger; |
| CronSequenceGenerator generator; |
| |
| CronScheduler() { |
| trigger = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() { |
| @Override |
| public Thread newThread(Runnable r) { |
| Thread thread = new Thread(r, "AutoBalancer"); |
| thread.setDaemon(true); |
| return thread; |
| } |
| }); |
| } |
| |
| @Override |
| public void init(String schedule) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Initializing " + this.getClass().getSimpleName() + " with " + schedule); |
| } |
| |
| if (schedule == null || schedule.isEmpty()) { |
| throw new GemFireConfigException("Missing configuration: " + SCHEDULE); |
| } |
| |
| try { |
| generator = new CronSequenceGenerator(schedule); |
| } catch (Exception e) { |
| throw new GemFireConfigException("Cron expression could not be parsed: " + schedule, e); |
| } |
| |
| submitNext(); |
| } |
| |
| private void submitNext() { |
| long currentTime = clock.currentTimeMillis(); |
| Date nextSchedule = generator.next(new Date(currentTime)); |
| long delay = nextSchedule.getTime() - currentTime; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Now={}, next audit time={}, delay={} ms", new Date(currentTime), nextSchedule, |
| delay); |
| } |
| |
| trigger.schedule(new Runnable() { |
| @Override |
| public void run() { |
| try { |
| auditor.execute(); |
| } catch (CacheClosedException e) { |
| logger.warn("Cache closed while attempting to rebalance the cluster. Abort future jobs", |
| e); |
| return; |
| } catch (Exception e) { |
| logger.warn("Error while executing out-of-balance audit.", e); |
| } |
| submitNext(); |
| } |
| }, delay, TimeUnit.MILLISECONDS); |
| } |
| |
| @Override |
| public void destroy() { |
| trigger.shutdownNow(); |
| } |
| } |
| |
| /** |
| * Queries member statistics and health to determine if a re-balance operation is needed |
| * <OL> |
| * <LI>acquires distributed lock |
| * <LI>queries member health |
| * <LI>updates auto-balance stat |
| * <LI>release lock |
| */ |
| static class SizeBasedOOBAuditor implements OOBAuditor { |
| private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT; |
| private int sizeMinimum = DEFAULT_MINIMUM_SIZE; |
| |
| final CacheOperationFacade cache; |
| |
| public SizeBasedOOBAuditor(CacheOperationFacade cache) { |
| this.cache = cache; |
| } |
| |
| @Override |
| public void init(Properties props) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Initializing " + this.getClass().getSimpleName()); |
| } |
| |
| if (props != null) { |
| if (props.getProperty(SIZE_THRESHOLD_PERCENT) != null) { |
| sizeThreshold = Integer.valueOf(props.getProperty(SIZE_THRESHOLD_PERCENT)); |
| if (sizeThreshold <= 0 || sizeThreshold >= 100) { |
| throw new GemFireConfigException( |
| SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99"); |
| } |
| } |
| if (props.getProperty(MINIMUM_SIZE) != null) { |
| sizeMinimum = Integer.valueOf(props.getProperty(MINIMUM_SIZE)); |
| if (sizeMinimum <= 0) { |
| throw new GemFireConfigException(MINIMUM_SIZE + " should be greater than 0"); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void execute() { |
| boolean result = cache.acquireAutoBalanceLock(); |
| if (!result) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); |
| } |
| return; |
| } |
| |
| cache.incrementAttemptCounter(); |
| result = needsRebalancing(); |
| if (!result) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Rebalancing is not needed"); |
| } |
| return; |
| } |
| |
| cache.rebalance(); |
| } |
| |
| /** |
| * By default auto-balancer will avoid rebalancing, because a user can always trigger rebalance |
| * manually. So in case of error or inconsistent data, return false. Return true if |
| * <OL> |
| * <LI>total transfer size is above threshold percent of total data size at cluster level |
| * <LI>If some smaller capacity nodes are heavily loaded while bigger capacity nodes are |
| * balanced. In such a scenario transfer size based trigger may not cause rebalance. |
| */ |
| boolean needsRebalancing() { |
| // test cluster level status |
| long transferSize = cache.getTotalTransferSize(); |
| if (transferSize <= sizeMinimum) { |
| return false; |
| } |
| |
| Map<PartitionedRegion, InternalPRInfo> details = cache.getRegionMemberDetails(); |
| long totalSize = cache.getTotalDataSize(details); |
| |
| if (totalSize > 0) { |
| int transferPercent = (int) ((100.0 * transferSize) / totalSize); |
| if (transferPercent >= sizeThreshold) { |
| return true; |
| } |
| } |
| |
| // TODO test member level skew |
| |
| return false; |
| } |
| |
| int getSizeThreshold() { |
| return sizeThreshold; |
| } |
| |
| public long getSizeMinimum() { |
| return sizeMinimum; |
| } |
| } |
| |
| /** |
| * Hides cache level details and exposes simple methods relevant for auto-balancing |
| */ |
| static class GeodeCacheFacade implements CacheOperationFacade { |
| private final AtomicBoolean isLockAcquired = new AtomicBoolean(false); |
| |
| private InternalCache cache; |
| |
| public GeodeCacheFacade() { |
| this(null); |
| } |
| |
| public GeodeCacheFacade(InternalCache cache) { |
| this.cache = cache; |
| } |
| |
| @Override |
| public Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails() { |
| InternalCache cache = getCache(); |
| Map<PartitionedRegion, InternalPRInfo> detailsMap = new HashMap<>(); |
| for (PartitionedRegion region : cache.getPartitionedRegions()) { |
| LoadProbe probe = cache.getInternalResourceManager().getLoadProbe(); |
| InternalPRInfo info = |
| region.getRedundancyProvider().buildPartitionedRegionInfo(true, probe); |
| detailsMap.put(region, info); |
| } |
| return detailsMap; |
| } |
| |
| @Override |
| public long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details) { |
| long totalSize = 0; |
| if (details != null) { |
| for (PartitionedRegion region : details.keySet()) { |
| InternalPRInfo info = details.get(region); |
| Set<PartitionMemberInfo> membersInfo = info.getPartitionMemberInfo(); |
| for (PartitionMemberInfo member : membersInfo) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Region:{}, Member: {}, Size: {}", region.getFullPath(), member, |
| member.getSize()); |
| } |
| totalSize += member.getSize(); |
| } |
| } |
| } |
| return totalSize; |
| } |
| |
| @Override |
| public long getTotalTransferSize() { |
| try { |
| RebalanceOperation operation = |
| getCache().getResourceManager().createRebalanceFactory().simulate(); |
| RebalanceResults result = operation.getResults(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes=" |
| + result.getTotalBucketCreateBytes() + ", TotalBucketCreatesCompleted=" |
| + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes=" |
| + result.getTotalBucketTransferBytes() + ", TotalBucketTransfersCompleted=" |
| + result.getTotalBucketTransfersCompleted() + ", TotalPrimaryTransfersCompleted=" |
| + result.getTotalPrimaryTransfersCompleted() + "]"); |
| } |
| return result.getTotalBucketTransferBytes(); |
| } catch (CancellationException e) { |
| logger.info("Error while trying to estimate rebalance cost ", e); |
| } catch (InterruptedException e) { |
| logger.info("Error while trying to estimate rebalance cost ", e); |
| } |
| return 0; |
| } |
| |
| @Override |
| public void incrementAttemptCounter() { |
| InternalCache cache = getCache(); |
| try { |
| cache.getInternalResourceManager().getStats().incAutoRebalanceAttempts(); |
| } catch (Exception e) { |
| logger.warn("Failed to increment AutoBalanceAttempts counter"); |
| } |
| } |
| |
| @Override |
| public void rebalance() { |
| try { |
| RebalanceOperation operation = |
| getCache().getResourceManager().createRebalanceFactory().start(); |
| RebalanceResults result = operation.getResults(); |
| logger |
| .info("Rebalance result: [TotalBucketCreateBytes=" + result.getTotalBucketCreateBytes() |
| + ", TotalBucketCreateTime=" + result.getTotalBucketCreateTime() |
| + ", TotalBucketCreatesCompleted=" + result.getTotalBucketCreatesCompleted() |
| + ", TotalBucketTransferBytes=" + result.getTotalBucketTransferBytes() |
| + ", TotalBucketTransferTime=" + result.getTotalBucketTransferTime() |
| + ", TotalBucketTransfersCompleted=" + +result.getTotalBucketTransfersCompleted() |
| + ", TotalPrimaryTransferTime=" + result.getTotalPrimaryTransferTime() |
| + ", TotalPrimaryTransfersCompleted=" + result.getTotalPrimaryTransfersCompleted() |
| + ", TotalTime=" + result.getTotalTime() + "]"); |
| } catch (CancellationException e) { |
| logger.info("Error rebalancing the cluster", e); |
| } catch (InterruptedException e) { |
| logger.info("Error rebalancing the cluster", e); |
| } |
| } |
| |
| InternalCache getCache() { |
| InternalCache result = cache; |
| if (result == null) { |
| throw new IllegalStateException("Missing cache instance."); |
| } |
| if (result.isClosed()) { |
| throw new CacheClosedException(); |
| } |
| return result; |
| } |
| |
| @Override |
| public boolean acquireAutoBalanceLock() { |
| // TODO: delete this double-checking |
| if (!isLockAcquired.get()) { |
| synchronized (isLockAcquired) { |
| if (!isLockAcquired.get()) { |
| DistributedLockService dls = getDLS(); |
| |
| boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1); |
| if (result) { |
| isLockAcquired.set(true); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Grabbed AutoBalancer lock"); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Another member owns auto-balance lock. Skip this attempt to rebalance the cluster"); |
| } |
| } |
| } |
| } |
| } |
| return isLockAcquired.get(); |
| } |
| |
| @Override |
| public DistributedLockService getDLS() { |
| InternalCache cache = getCache(); |
| DistributedLockService dls = |
| DistributedLockService.getServiceNamed(AUTO_BALANCER_LOCK_SERVICE_NAME); |
| if (dls == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Creating DistributeLockService"); |
| } |
| dls = DLockService.create(AUTO_BALANCER_LOCK_SERVICE_NAME, |
| cache.getInternalDistributedSystem(), true, true, true); |
| } |
| |
| return dls; |
| } |
| |
| @Override |
| public void setCache(Cache cache) { |
| this.cache = (InternalCache) cache; |
| } |
| } |
| |
| private static class SystemClockTimeProvider implements TimeProvider { |
| @Override |
| public long currentTimeMillis() { |
| return System.currentTimeMillis(); |
| } |
| } |
| |
| interface AuditScheduler { |
| void init(String schedule); |
| |
| void destroy(); |
| } |
| |
| interface OOBAuditor { |
| void init(Properties props); |
| |
| void execute(); |
| } |
| |
| interface TimeProvider { |
| long currentTimeMillis(); |
| } |
| |
| interface CacheOperationFacade { |
| boolean acquireAutoBalanceLock(); |
| |
| void setCache(Cache cache); |
| |
| DistributedLockService getDLS(); |
| |
| void rebalance(); |
| |
| void incrementAttemptCounter(); |
| |
| Map<PartitionedRegion, InternalPRInfo> getRegionMemberDetails(); |
| |
| long getTotalDataSize(Map<PartitionedRegion, InternalPRInfo> details); |
| |
| long getTotalTransferSize(); |
| } |
| |
| OOBAuditor getOOBAuditor() { |
| return auditor; |
| } |
| |
| public CacheOperationFacade getCacheOperationFacade() { |
| return this.cacheFacade; |
| } |
| |
| public void destroy() { |
| scheduler.destroy(); |
| } |
| } |