| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.statistics.StatisticsClockFactory.disabledClock; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Lock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.InternalGemFireError; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.cache.AttributesFactory; |
| import org.apache.geode.cache.AttributesMutator; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheListener; |
| import org.apache.geode.cache.CacheLoader; |
| import org.apache.geode.cache.CacheRuntimeException; |
| import org.apache.geode.cache.CustomExpiry; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.EntryExistsException; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.EvictionAttributes; |
| import org.apache.geode.cache.ExpirationAction; |
| import org.apache.geode.cache.ExpirationAttributes; |
| import org.apache.geode.cache.InterestRegistrationEvent; |
| import org.apache.geode.cache.PartitionAttributes; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.Region.Entry; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionEvent; |
| import org.apache.geode.cache.RegionExistsException; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.execute.Function; |
| import org.apache.geode.cache.execute.FunctionException; |
| import org.apache.geode.cache.execute.ResultSender; |
| import org.apache.geode.cache.persistence.PartitionOfflineException; |
| import org.apache.geode.cache.query.QueryInvalidException; |
| import org.apache.geode.cache.query.internal.QCompiler; |
| import org.apache.geode.cache.query.internal.index.IndexCreationData; |
| import org.apache.geode.cache.query.internal.index.PartitionedIndex; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.OperationExecutors; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.BucketRegion.RawValue; |
| import org.apache.geode.internal.cache.PartitionedRegion.BucketLock; |
| import org.apache.geode.internal.cache.PartitionedRegion.SizeEntry; |
| import org.apache.geode.internal.cache.backup.BackupService; |
| import org.apache.geode.internal.cache.execute.BucketMovedException; |
| import org.apache.geode.internal.cache.execute.PartitionedRegionFunctionResultSender; |
| import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl; |
| import org.apache.geode.internal.cache.execute.metrics.FunctionStats; |
| import org.apache.geode.internal.cache.execute.metrics.FunctionStatsManager; |
| import org.apache.geode.internal.cache.partitioned.Bucket; |
| import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionFunctionStreamingMessage; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserver; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionObserverHolder; |
| import org.apache.geode.internal.cache.partitioned.RedundancyAlreadyMetException; |
| import org.apache.geode.internal.cache.partitioned.RemoveBucketMessage; |
| import org.apache.geode.internal.cache.partitioned.RemoveBucketMessage.RemoveBucketResponse; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.ServerConnection; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySender; |
| import org.apache.geode.internal.cache.wan.AbstractGatewaySenderEventProcessor; |
| import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl; |
| import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableReadLock; |
| import org.apache.geode.internal.util.concurrent.StoppableReentrantReadWriteLock.StoppableWriteLock; |
| |
/**
 * Implementation of DataStore (DS) for a PartitionedRegion (PR). This will be accessed via the
 * accessor of PartitionedRegion or a PartitionService thread, which will handle remote calls to
 * this DataStore from other nodes participating in this PartitionedRegion.
 */
| public class PartitionedRegionDataStore implements HasCachePerfStats { |
/** Logger shared by every instance of this class. */
private static final Logger logger = LogService.getLogger();

/** PR reference for this DataStore */
protected final PartitionedRegion partitionedRegion;

/**
 * Total memory used by this partition. Only used for statistics.
 */
private final AtomicLong bytesInUse = new AtomicLong(0);

/**
 * CacheLoader of this PRDataStore. It is taken from the partitioned region's attributes at
 * construction time (when one is configured) and installed on each bucket region that this data
 * store creates.
 */
private CacheLoader loader;

/**
 * Anyone creating a bucket must hold the read lock. Anyone deleting a bucket must hold the write
 * lock.
 */
final StoppableReentrantReadWriteLock bucketCreationLock;

/**
 * Maps each bucket to a real region that contains actual key/value entries for this PR instance
 * <p>
 * Keys are instances of {@link Integer}. Values are instances of {@link BucketRegion}.
 */
private final ConcurrentMap<Integer, BucketRegion> localBucket2RegionMap;

/**
 * A counter of the number of concurrent bucket creates in progress on this node
 */
final AtomicInteger bucketCreatesInProgress = new AtomicInteger();

/**
 * Variable used to determine when the data store is over the local max memory limit
 */
private boolean exceededLocalMaxMemoryLimit = false;

/**
 * Maximum number of bytes this dataStore has for storage. Computed in the constructor as the
 * region's local max memory multiplied by {@code PartitionedRegionHelper.BYTES_PER_MB}; the
 * test-only constructor sets it to -1.
 */
private final long maximumLocalBytes;

/**
 * Statistics object created for this data store (a {@code RegionPerfStats} named
 * "RegionStats-partition-" plus the region name); {@code null} when the test-only constructor is
 * used.
 */
private final CachePerfStats bucketStats;

/**
 * The keysOfInterest contains a set of all keys in which any client has interest in this PR.
 */
final ConcurrentMap keysOfInterest;

/**
 * Monitor object for {@link #keysOfInterest}.
 * NOTE(review): presumably guards compound updates of that map; its usage is not visible in this
 * part of the file, so confirm before relying on it.
 */
private final Object keysOfInterestLock = new Object();

/** Clock handed to the statistics object created for this data store. */
private final StatisticsClock statisticsClock;

/**
 * Update an entry's last access time if a client is interested in the entry. Read once, at class
 * initialization, from the boolean system property
 * {@code DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest"}.
 */
private static final boolean UPDATE_ACCESS_TIME_ON_INTEREST =
    Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "updateAccessTimeOnClientInterest");
| |
| |
/**
 * Test-only constructor. Builds a data store that is not attached to any partitioned region:
 * the region, creation lock, statistics and interest map are all {@code null}, the byte limit is
 * -1, the bucket map starts empty and statistics timing uses a disabled clock.
 */
PartitionedRegionDataStore() {
  partitionedRegion = null;
  bucketCreationLock = null;
  bucketStats = null;
  keysOfInterest = null;
  maximumLocalBytes = -1;
  localBucket2RegionMap = new ConcurrentHashMap<>();
  statisticsClock = disabledClock();
}
| |
/**
 * Builds the data store that holds the local buckets of the given partitioned region.
 * <p>
 * The bucket map is pre-sized to the region's total bucket count, the bucket creation lock is
 * tied to the region's cancel criterion, a cache loader configured on the region attributes is
 * adopted, and a dedicated statistics object is created for the buckets.
 *
 * @param pr PartitionedRegion associated with this DataStore.
 * @param statisticsClock clock used by the statistics object created here
 */
PartitionedRegionDataStore(final PartitionedRegion pr, StatisticsClock statisticsClock) {
  this.statisticsClock = statisticsClock;
  localBucket2RegionMap = new ConcurrentHashMap<>(pr.getTotalNumberOfBuckets());
  partitionedRegion = pr;
  bucketCreationLock = new StoppableReentrantReadWriteLock(pr.getCancelCriterion());

  // Adopt the loader configured on the region, if there is one.
  final CacheLoader configuredLoader = pr.getAttributes().getCacheLoader();
  if (configuredLoader != null) {
    loader = configuredLoader;
    if (logger.isDebugEnabled()) {
      logger.debug("Installing cache loader from partitioned region attributes: {}", loader);
    }
  }

  // The local max memory setting is expressed in megabytes; keep the limit in bytes.
  maximumLocalBytes = (pr.getLocalMaxMemory() * PartitionedRegionHelper.BYTES_PER_MB);

  bucketStats =
      new RegionPerfStats(pr.getCache().getInternalDistributedSystem().getStatisticsManager(),
          "RegionStats-partition-" + pr.getName(), pr.getCachePerfStats(), pr,
          pr.getCache().getMeterRegistry(), statisticsClock);
  keysOfInterest = new ConcurrentHashMap();
}
| |
| /** |
| * This method creates a PartitionedRegionDataStore be invoking the PRDS Constructor. |
| * |
| * @return @throws PartitionedRegionException |
| */ |
| static PartitionedRegionDataStore createDataStore(Cache cache, PartitionedRegion pr, |
| PartitionAttributes pa, StatisticsClock statisticsClock) throws PartitionedRegionException { |
| return new PartitionedRegionDataStore(pr, statisticsClock); |
| } |
| |
| ConcurrentMap<Integer, BucketRegion> getLocalBucket2RegionMap() { |
| return this.localBucket2RegionMap; |
| } |
| |
| /** |
| * Test to determine if this data store is managing a bucket |
| * |
| * @param bucketId the id of the bucket |
| * @return true if the provided bucket is being managed |
| */ |
| public boolean isManagingBucket(int bucketId) { |
| BucketRegion buk = this.localBucket2RegionMap.get(Integer.valueOf(bucketId)); |
| if (buk != null && !buk.isDestroyed()) { |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Report the number of buckets currently managed by this DataStore |
| */ |
| public short getBucketsManaged() { |
| return (short) this.localBucket2RegionMap.size(); |
| } |
| |
| /** |
| * Report an estimate of the number of primary buckets managed locally The result of this method |
| * is not stable. |
| */ |
| public int getNumberOfPrimaryBucketsManaged() { |
| final AtomicInteger numPrimaries = new AtomicInteger(); |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| BucketRegion br = (BucketRegion) r; |
| if (br.getBucketAdvisor().isPrimary()) { |
| numPrimaries.incrementAndGet(); |
| } |
| } |
| }); |
| return numPrimaries.get(); |
| } |
| |
| public boolean isPartitionedRegionReady(PartitionedRegion partitionedRegion, final int bucketId) { |
| List<PartitionedRegion> colocatedWithList = getColocatedChildRegions(partitionedRegion); |
| if (colocatedWithList.size() == 0) { |
| return partitionedRegion.isInitialized(); |
| } |
| return areAllColocatedPartitionedRegionsReady(bucketId, colocatedWithList); |
| } |
| |
| private boolean areAllColocatedPartitionedRegionsReady(int bucketId, |
| List<PartitionedRegion> colocatedWithList) { |
| return colocatedWithList.stream().allMatch( |
| partitionedRegion -> isColocatedPartitionedRegionInitialized(partitionedRegion, bucketId)); |
| } |
| |
| private boolean isColocatedPartitionedRegionInitialized(PartitionedRegion partitionedRegion, |
| final int bucketId) { |
| if (!partitionedRegion.isInitialized()) { |
| return false; |
| } |
| if (!partitionedRegion.getDataStore().isColocationComplete(bucketId)) { |
| return false; |
| } |
| List<PartitionedRegion> colocatedWithList = getColocatedChildRegions(partitionedRegion); |
| return areAllColocatedPartitionedRegionsReady(bucketId, colocatedWithList); |
| } |
| |
| List<PartitionedRegion> getColocatedChildRegions(PartitionedRegion partitionedRegion) { |
| return ColocationHelper.getColocatedChildRegions(partitionedRegion); |
| } |
| |
| /** |
| * Try to grab buckets for all the colocated regions /* In case we can't grab buckets there is no |
| * going back |
| * |
| */ |
| |
| protected CreateBucketResult grabFreeBucketRecursively(final int bucketId, |
| final PartitionedRegion pr, final InternalDistributedMember moveSource, |
| final boolean forceCreation, final boolean isRebalance, final boolean replaceOfflineData, |
| final InternalDistributedMember creationRequestor, final boolean isDiskRecovery) { |
| CreateBucketResult grab; |
| DistributedMember dm = pr.getMyId(); |
| List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(pr); |
| // make sure we force creation and ignore redundancy checks for the child region. |
| // if we created the parent bucket, we want to make sure we create the child bucket. |
| grab = pr.getDataStore().grabFreeBucket(bucketId, dm, null, true, isRebalance, true, |
| replaceOfflineData, creationRequestor); |
| if (!grab.nowExists()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed grab for bucketId = {}{}{}", pr.getPRId(), pr.BUCKET_ID_SEPARATOR, |
| bucketId); |
| } |
| // Assert.assertTrue(nList.contains(partitionedRegion.getNode().getMemberId()) |
| // , |
| // " grab returned false and b2n does not contains this member."); |
| } |
| if (colocatedWithList != null) { |
| Iterator<PartitionedRegion> itr = colocatedWithList.iterator(); |
| while (itr.hasNext()) { |
| PartitionedRegion coLocatedWithPr = itr.next(); |
| if ((isDiskRecovery || coLocatedWithPr.isInitialized()) |
| && coLocatedWithPr.getDataStore().isColocationComplete(bucketId)) { |
| grab = coLocatedWithPr.getDataStore().grabFreeBucketRecursively(bucketId, coLocatedWithPr, |
| moveSource, forceCreation, isRebalance, replaceOfflineData, creationRequestor, |
| isDiskRecovery); |
| |
| if (!grab.nowExists()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed grab for bucketId = {}{}{}", pr.getPRId(), |
| pr.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| // Assert.assertTrue(nList.contains(partitionedRegion.getNode().getMemberId()) |
| // , |
| // " grab returned false and b2n does not contains this member."); |
| } |
| } |
| } |
| } |
| return grab; |
| } |
| |
/**
 * Attempts to host the given bucket on this member by creating the real bucket region and
 * registering it in the local bucket map.
 * <p>
 * Locks are taken in this order: the read side of the colocated parent's bucket creation lock
 * (only when this region is colocated with another), the read side of this data store's bucket
 * creation lock, the monitor of the bucket's {@link ProxyBucketRegion}, and finally the
 * redundancy lock obtained from {@link #lockRedundancyLock}. Bucket-create statistics are started
 * on entry and always ended on exit.
 *
 * @param possiblyFreeBucketId the identity of the bucket
 * @param sender the member requesting the bucket; not referenced in this method's body
 * @param moveSource Where we are moving the bucket from, if this is a move.
 * @param forceCreation avoid any checks (within reason) which might prevent bucket creation;
 *        when false, {@code canAccommodateAnotherBucket()} must agree before a bucket is created
 * @param isRebalance true if bucket creation is directed by rebalancing
 * @param lockRedundancyLock not referenced in this method's body (the redundancy lock is always
 *        taken)
 * @param replaceOffineData forwarded to the redundancy check
 * @param creationRequestor handed to the proxy bucket's primary elector initialization
 * @return {@code ALREADY_EXISTS} if the bucket is already hosted here, {@code CREATED} if this
 *         call created it, {@code REDUNDANCY_ALREADY_SATISFIED} if the redundancy check refused
 *         the grab, otherwise {@code FAILED}
 * @throws RegionDestroyedException re-created with this region's full path when one is raised
 *         while creating the bucket
 * @throws PartitionOfflineException rethrown after the leader region's bucket has been removed
 *         (when this region is not the colocation leader)
 */
CreateBucketResult grabFreeBucket(final int possiblyFreeBucketId, final DistributedMember sender,
    final InternalDistributedMember moveSource, final boolean forceCreation,
    final boolean isRebalance, final boolean lockRedundancyLock, boolean replaceOffineData,
    InternalDistributedMember creationRequestor) {

  final boolean isDebugEnabled = logger.isDebugEnabled();

  long startTime = this.partitionedRegion.getPrStats().startBucketCreate(isRebalance);
  boolean createdBucket = false;
  PartitionedRegionObserver observer = PartitionedRegionObserverHolder.getInstance();
  observer.beforeBucketCreation(this.partitionedRegion, possiblyFreeBucketId);
  try {

    CreateBucketResult result = CreateBucketResult.FAILED;
    // Fast path, taken without any lock: the bucket is already hosted here.
    if (isManagingBucket(possiblyFreeBucketId)) {
      if (isDebugEnabled) {
        logger.debug("grabFreeBucket: VM {} already contains the bucket with bucketId={}{}{}",
            this.partitionedRegion.getMyId(), partitionedRegion.getPRId(),
            PartitionedRegion.BUCKET_ID_SEPARATOR, possiblyFreeBucketId);
      }
      this.partitionedRegion.checkReadiness();
      return CreateBucketResult.ALREADY_EXISTS;
    }
    // Null when this region is not colocated with a parent region.
    StoppableReadLock parentBucketCreationLock = getParentBucketCreationLock();
    if (parentBucketCreationLock != null) {
      parentBucketCreationLock.lock();
    }

    try {
      if (!okToCreateChildBucket(possiblyFreeBucketId)) {
        return CreateBucketResult.FAILED;
      }
      StoppableReadLock lock = this.bucketCreationLock.readLock();
      lock.lock(); // prevent destruction while creating buckets
      try {
        // This counter is used by canAccommodateAnotherBucket to estimate if this member should
        // accept another bucket
        bucketCreatesInProgress.incrementAndGet();
        if (this.partitionedRegion.isDestroyed()) {
          return CreateBucketResult.FAILED;
        }

        if (isDebugEnabled) {
          this.logger.debug("grabFreeBucket: node list {} for bucketId={}{}{}",
              PartitionedRegionHelper.printCollection(this.partitionedRegion.getRegionAdvisor()
                  .getBucketOwners(possiblyFreeBucketId)),
              partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR,
              possiblyFreeBucketId);
        }


        // Final accommodation check under synchronization for
        // a stable view of bucket creation
        if (!forceCreation) {
          // Currently the balancing check should only run when creating buckets,
          // as opposed to during bucket recovery; callers doing bucket recovery are
          // expected to pass forceCreation=true.
          if (!canAccommodateAnotherBucket()) {
            result = CreateBucketResult.FAILED;
            return result;
          }
        }

        ProxyBucketRegion buk =
            partitionedRegion.getRegionAdvisor().getProxyBucketArray()[possiblyFreeBucketId];
        // Prevent other threads on the same VM from creating this bucket.
        // It doesn't look like the redundancy lock actually correctly
        // handles multiple threads, and the isManagingBucket call
        // also needs to be done under this lock
        synchronized (buk) {
          // This just needs to be done holding a lock for this particular bucket
          if (!verifyBucketBeforeGrabbing(possiblyFreeBucketId)) {
            result = CreateBucketResult.FAILED;
            return result;
          }

          // Re-check under the bucket monitor: another thread may have created it meanwhile.
          if (!this.isManagingBucket(possiblyFreeBucketId)) {
            Integer possiblyFreeBucketIdInt = Integer.valueOf(possiblyFreeBucketId);

            BucketRegion bukReg = null;
            // Throws RedundancyAlreadyMetException (handled below) when no further copy of
            // the bucket is needed.
            Object redundancyLock =
                lockRedundancyLock(moveSource, possiblyFreeBucketId, replaceOffineData);
            // NOTE(review): the original author was unsure whether this is safe without the
            // bucket admin lock.
            try {
              buk.initializePrimaryElector(creationRequestor);
              if (getPartitionedRegion().getColocatedWith() == null) {
                buk.getBucketAdvisor().setShadowBucketDestroyed(false);
              }
              if (getPartitionedRegion().isShadowPR()) {
                getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor()
                    .getBucketAdvisor(possiblyFreeBucketId).setShadowBucketDestroyed(false);
              }
              bukReg = createBucketRegion(possiblyFreeBucketId);
              // Mark the bucket as hosting and distribute to peers
              // before we release the dlock. This makes sure that our peers
              // won't think they need to satisfy redundancy
              if (bukReg != null) {
                // Let the data store know about the real bucket at this point
                // so that other VMs which discover the real bucket via a
                // profile exchange can send messages to the data store and
                // safely use the bucket.
                observer.beforeAssignBucket(this.partitionedRegion, possiblyFreeBucketId);
                assignBucketRegion(bukReg.getId(), bukReg);
                buk.setHosting(true);
                bukReg.invokePartitionListenerAfterBucketCreated();
              } else {
                if (buk.getPartitionedRegion().getColocatedWith() == null) {
                  buk.getBucketAdvisor().setShadowBucketDestroyed(true);
                  // clear tempQueue for all the shadowPR buckets
                  clearAllTempQueueForShadowPR(buk.getBucketId());
                }
              }
            } finally {
              releaseRedundancyLock(redundancyLock);
              // No bucket region was produced (failure or exception): undo the primary
              // elector set above.
              if (bukReg == null) {
                buk.clearPrimaryElector();
              }
            }

            if (bukReg != null) {
              if (isDebugEnabled) {
                logger.debug("grabFreeBucket: mapped bucketId={}{}{} on node = {}",
                    this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR,
                    possiblyFreeBucketId, this.partitionedRegion.getMyId());
              }

              createdBucket = true;
              result = CreateBucketResult.CREATED;
            } else {
              Assert.assertTrue(this.localBucket2RegionMap.get(possiblyFreeBucketIdInt) == null);
              result = CreateBucketResult.FAILED;
            }
          } else {
            // Handle the case where another concurrent thread decided to manage
            // the bucket and the creator may have died
            if (isDebugEnabled) {
              logger.debug("grabFreeBucket: bucketId={}{}{} already mapped on VM = {}",
                  this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR,
                  possiblyFreeBucketId, partitionedRegion.getMyId());
            }
            result = CreateBucketResult.ALREADY_EXISTS;
          }
          // Note: this message is logged for every outcome that reaches this point,
          // not only for CREATED.
          if (isDebugEnabled) {
            logger.debug("grabFreeBucket: Mapped bucketId={}{}{}",
                this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR,
                possiblyFreeBucketId);
          }
        }
      } catch (RegionDestroyedException rde) {
        // Re-create the exception so that it names this partitioned region's path,
        // keeping the original as the cause.
        RegionDestroyedException rde2 =
            new RegionDestroyedException(toString(), this.partitionedRegion.getFullPath());
        rde2.initCause(rde);
        throw rde2;
      } catch (RedundancyAlreadyMetException e) {
        if (isDebugEnabled) {
          logger.debug("Redundancy already met {}{}{} assignment {}",
              this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR,
              possiblyFreeBucketId,
              localBucket2RegionMap.get(Integer.valueOf(possiblyFreeBucketId)));
        }
        result = CreateBucketResult.REDUNDANCY_ALREADY_SATISFIED;
      } finally {
        bucketCreatesInProgress.decrementAndGet();
        lock.unlock(); // bucket destruction may proceed again
      }
    } finally {
      if (parentBucketCreationLock != null) {
        parentBucketCreationLock.unlock();
      }
    }
    this.partitionedRegion.checkReadiness();
    this.partitionedRegion.checkClosed();

    return result;

  } catch (PartitionOfflineException validationException) {
    // GEODE-3055
    // When this region is not the colocation leader, remove the leader's bucket before
    // rethrowing.
    PartitionedRegion leader = ColocationHelper.getLeaderRegion(this.partitionedRegion);
    boolean isLeader = leader.equals(this.partitionedRegion);
    if (!isLeader) {
      leader.getDataStore().removeBucket(possiblyFreeBucketId, true);
      if (isDebugEnabled) {
        logger.debug("For bucket " + possiblyFreeBucketId
            + ", failed to create cololcated child bucket for "
            + this.partitionedRegion.getFullPath() + ", removed leader region "
            + leader.getFullPath() + " bucket.");
      }
    }
    throw validationException;
  } finally {
    this.partitionedRegion.getPrStats().endBucketCreate(startTime, createdBucket, isRebalance);
  }
}
| |
| protected void clearAllTempQueueForShadowPR(final int bucketId) { |
| List<PartitionedRegion> colocatedWithList = |
| ColocationHelper.getColocatedChildRegions(partitionedRegion); |
| for (PartitionedRegion childRegion : colocatedWithList) { |
| if (childRegion.isShadowPR()) { |
| AbstractGatewaySender sender = childRegion.getParallelGatewaySender(); |
| if (sender == null) { |
| return; |
| } |
| AbstractGatewaySenderEventProcessor eventProcessor = sender.getEventProcessor(); |
| if (eventProcessor == null) { |
| return; |
| } |
| |
| ConcurrentParallelGatewaySenderQueue queue = |
| (ConcurrentParallelGatewaySenderQueue) eventProcessor.getQueue(); |
| |
| if (queue == null) { |
| return; |
| } |
| BlockingQueue<GatewaySenderEventImpl> tempQueue = queue.getBucketTmpQueue(bucketId); |
| if (tempQueue != null) { |
| synchronized (tempQueue) { |
| for (GatewaySenderEventImpl event : tempQueue) { |
| event.release(); |
| } |
| tempQueue.clear(); |
| } |
| } |
| } |
| } |
| } |
| |
| public Object lockRedundancyLock(InternalDistributedMember moveSource, int bucketId, |
| boolean replaceOffineData) { |
| // TODO prperist - Make this thing easier to find! |
| final PartitionedRegion.BucketLock bl = partitionedRegion.getRegionAdvisor() |
| .getBucketAdvisor(bucketId).getProxyBucketRegion().getBucketLock(); |
| bl.lock(); |
| boolean succeeded = false; |
| try { |
| ProxyBucketRegion buk = partitionedRegion.getRegionAdvisor().getProxyBucketArray()[bucketId]; |
| if (!buk.checkBucketRedundancyBeforeGrab(moveSource, replaceOffineData)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Redundancy already satisfied. current owners=", |
| partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId)); |
| } |
| throw new RedundancyAlreadyMetException(); |
| } |
| succeeded = true; |
| } finally { |
| if (!succeeded) { |
| bl.unlock(); |
| } |
| } |
| |
| return bl; |
| } |
| |
| public void releaseRedundancyLock(Object lock) { |
| try { |
| |
| } finally { |
| PartitionedRegion.BucketLock bl = (BucketLock) lock; |
| bl.unlock(); |
| } |
| } |
| |
| private StoppableReadLock getParentBucketCreationLock() { |
| PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(this.partitionedRegion); |
| StoppableReadLock parentLock = null; |
| if (colocatedRegion != null) { |
| parentLock = colocatedRegion.getDataStore().bucketCreationLock.readLock(); |
| return parentLock; |
| } |
| return null; |
| } |
| |
| /** |
| * Returns false if this region is colocated and parent bucket does not exist. |
| * |
| * @return true if ok to make bucket |
| */ |
| private boolean okToCreateChildBucket(int bucketId) { |
| PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(this.partitionedRegion); |
| if (colocatedRegion != null && !colocatedRegion.getDataStore().isManagingBucket(bucketId)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("okToCreateChildBucket - we don't manage the parent bucket"); |
| } |
| return false; |
| } |
| if (!isColocationComplete(bucketId)) { |
| return false; |
| } |
| |
| return true; |
| } |
| |
| boolean isColocationComplete(int bucketId) { |
| |
| if (!ColocationHelper.isColocationComplete(this.partitionedRegion)) { |
| ProxyBucketRegion pb = |
| this.partitionedRegion.getRegionAdvisor().getProxyBucketArray()[bucketId]; |
| BucketPersistenceAdvisor persistenceAdvisor = pb.getPersistenceAdvisor(); |
| |
| // Don't worry about colocation if we're recovering a persistent |
| // bucket. The bucket must have been properly colocated earlier. |
| if (persistenceAdvisor != null && persistenceAdvisor.wasHosting()) { |
| return true; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Colocation is incomplete"); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| |
| |
| /** |
| * This method creates bucket regions, based on redundancy level. If redundancy level is: a) = 1 |
| * it creates a local region b) >1 it creates a distributed region |
| * |
| * @return @throws CacheException |
| */ |
| private BucketRegion createBucketRegion(int bucketId) { |
| this.partitionedRegion.checkReadiness(); |
| |
| BucketAttributesFactory factory = new BucketAttributesFactory(); |
| |
| boolean isPersistent = this.partitionedRegion.getDataPolicy().withPersistence(); |
| if (isPersistent) { |
| factory.setDataPolicy(DataPolicy.PERSISTENT_REPLICATE); |
| setDiskAttributes(factory); |
| } else { |
| factory.setDataPolicy(DataPolicy.REPLICATE); |
| } |
| |
| if (PartitionedRegion.DISABLE_SECONDARY_BUCKET_ACK) { |
| factory.setScope(Scope.DISTRIBUTED_NO_ACK); |
| } else { |
| factory.setScope(Scope.DISTRIBUTED_ACK); |
| } |
| factory.setConcurrencyChecksEnabled(this.partitionedRegion.getConcurrencyChecksEnabled()); |
| factory.setIndexMaintenanceSynchronous(this.partitionedRegion.getIndexMaintenanceSynchronous()); |
| |
| if (this.partitionedRegion.getValueConstraint() != null) { |
| factory.setValueConstraint(this.partitionedRegion.getValueConstraint()); |
| } |
| if (this.loader != null) { |
| factory.setCacheLoader(this.loader); |
| } |
| factory.setEnableAsyncConflation(true); |
| |
| if (Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) { |
| factory.addCacheListener(createDebugBucketListener()); |
| } |
| |
| if (this.partitionedRegion.getStatisticsEnabled()) { |
| factory.setStatisticsEnabled(true); |
| } |
| |
| factory.setCloningEnabled(this.partitionedRegion.getCloningEnabled()); |
| |
| ExpirationAttributes ea = this.partitionedRegion.getAttributes().getEntryIdleTimeout(); |
| if (ea != null) { |
| factory.setEntryIdleTimeout(ea); |
| } |
| ea = this.partitionedRegion.getAttributes().getEntryTimeToLive(); |
| if (ea != null) { |
| factory.setEntryTimeToLive(ea); |
| } |
| ea = this.partitionedRegion.getAttributes().getRegionIdleTimeout(); |
| if (ea != null) { |
| if (ea.getAction() != ExpirationAction.DESTROY) |
| factory.setRegionIdleTimeout(ea); |
| } |
| ea = this.partitionedRegion.getAttributes().getRegionTimeToLive(); |
| if (ea != null) { |
| if (ea.getAction() != ExpirationAction.DESTROY) |
| factory.setRegionTimeToLive(ea); |
| } |
| CustomExpiry ce = this.partitionedRegion.getAttributes().getCustomEntryIdleTimeout(); |
| if (ce != null) { |
| factory.setCustomEntryIdleTimeout(ce); |
| } |
| ce = this.partitionedRegion.getAttributes().getCustomEntryTimeToLive(); |
| if (ce != null) { |
| factory.setCustomEntryTimeToLive(ce); |
| } |
| |
| if (this.partitionedRegion.getStatisticsEnabled()) { |
| factory.setStatisticsEnabled(true); |
| } |
| EvictionAttributesImpl eva = |
| (EvictionAttributesImpl) this.partitionedRegion.getEvictionAttributes(); |
| if (eva != null) { |
| EvictionAttributes evBucket = eva; |
| factory.setEvictionAttributes(evBucket); |
| if (evBucket.getAction().isOverflowToDisk()) { |
| setDiskAttributes(factory); |
| } |
| } |
| |
| factory.setCompressor(this.partitionedRegion.getCompressor()); |
| factory.setOffHeap(this.partitionedRegion.getOffHeap()); |
| |
| factory.setBucketRegion(true); // prevent validation problems |
| RegionAttributes attributes = factory.create(); |
| |
| final String bucketRegionName = this.partitionedRegion.getBucketName(bucketId); |
| |
| LocalRegion rootRegion = PartitionedRegionHelper.getPRRoot(this.partitionedRegion.getCache()); |
| BucketRegion bucketRegion = null; |
| |
| if (Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) { |
| logger.info("createBucketRegion: Creating bucketId, {} name, {}.", |
| this.partitionedRegion.bucketStringForLogs(bucketId), |
| bucketRegionName); |
| } |
| try { |
| final Bucket proxyBucket = this.partitionedRegion.getRegionAdvisor().getBucket(bucketId); |
| bucketRegion = (BucketRegion) rootRegion.createSubregion(bucketRegionName, attributes, |
| new InternalRegionArguments() |
| .setPartitionedRegionBucketRedundancy(this.partitionedRegion.getRedundantCopies()) |
| .setBucketAdvisor(proxyBucket.getBucketAdvisor()) |
| .setPersistenceAdvisor(proxyBucket.getPersistenceAdvisor()) |
| .setDiskRegion(proxyBucket.getDiskRegion()).setCachePerfStatsHolder(this) |
| .setLoaderHelperFactory(this.partitionedRegion) |
| .setPartitionedRegion(this.partitionedRegion) |
| .setIndexes(getIndexes(rootRegion.getFullPath(), bucketRegionName))); |
| this.partitionedRegion.getPrStats().incBucketCount(1); |
| } catch (RegionExistsException ex) { |
| // Bucket Region is already created, so do nothing. |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "PartitionedRegionDataStore#createBucketRegion: Bucket region already created for bucketId={}{}{}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, ex); |
| } |
| bucketRegion = (BucketRegion) rootRegion.getSubregion(bucketRegionName); |
| } catch (IOException ieo) { |
| logger.debug("Unexpected error creating bucket in region", ieo); |
| Assert.assertTrue(false, "IOException creating bucket Region: " + ieo); |
| } catch (ClassNotFoundException cne) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Unexpected error creating bucket in region", cne); |
| } |
| Assert.assertTrue(false, "ClassNotFoundException creating bucket Region: " + cne); |
| } catch (InternalGemFireError e) { |
| if (logger.isDebugEnabled()) { |
| logger.info("Assertion error creating bucket in region", |
| e); |
| } |
| this.getPartitionedRegion().checkReadiness(); |
| throw e; |
| } |
| // Determine the size of the bucket (the Region in this case is mirrored, |
| // get initial image has populated the bucket, compute the size of the |
| // region) |
| if (Boolean.getBoolean(DistributionConfig.GEMFIRE_PREFIX + "PRDebug")) { |
| dumpBuckets(); |
| dumpBucket(bucketId, bucketRegion); |
| } |
| |
| // Iterator i = localRegion.entrySet().iterator(); |
| // while (i.hasNext()) { |
| // try { |
| // NonTXEntry nte = (NonTXEntry) i.next(); |
| // // updateBucket2Size(bucketId.longValue(), localRegion, null); |
| // // nte.getRegionEntry().getValueInVM(); |
| // } catch (EntryDestroyedException ignore) {} |
| // } |
| |
| return bucketRegion; |
| } |
| |
| private List getIndexes(String rootRegion, String bucketRegion) { |
| List indexes = null; |
| if (!this.partitionedRegion.isIndexed()) { |
| return indexes; |
| } |
| |
| // Get PR indexes. |
| Map indexMap = this.partitionedRegion.getIndex(); |
| if (indexMap == null || indexMap.isEmpty()) { |
| return indexes; |
| } |
| |
| // Build index info thats used to create indexes on bucket regions. |
| indexes = new ArrayList(); |
| Set indexSet = indexMap.entrySet(); |
| for (Iterator it = indexSet.iterator(); it.hasNext();) { |
| try { |
| Map.Entry indexEntry = (Map.Entry) it.next(); |
| PartitionedIndex index = (PartitionedIndex) indexEntry.getValue(); |
| IndexCreationData icd = new IndexCreationData(index.getName()); |
| new QCompiler(); |
| String imports = index.getImports(); |
| icd.setIndexData(index.getType(), index.getCanonicalizedFromClause(), |
| index.getCanonicalizedIndexedExpression(), index.getImports()); |
| icd.setPartitionedIndex(index); |
| indexes.add(icd); |
| } catch (Exception ignor) { |
| // since bucket creation should not fail. |
| logger.info(String.format("Excpetion in bucket index creation : %s", |
| ignor.getLocalizedMessage()), |
| ignor); |
| } |
| } |
| return indexes; |
| } |
| |
| |
| private void setDiskAttributes(BucketAttributesFactory factory) { |
| factory.setDiskSynchronous(this.partitionedRegion.getAttributes().isDiskSynchronous()); |
| factory.setDiskStoreName(this.partitionedRegion.getAttributes().getDiskStoreName()); |
| } |
| |
| public void assignBucketRegion(int bucketId, BucketRegion bukReg) { |
| final Object oldbukReg = |
| this.localBucket2RegionMap.putIfAbsent(Integer.valueOf(bucketId), bukReg); |
| if (logger.isDebugEnabled()) { |
| logger.debug("assigning bucket {} old assignment: {}", bukReg, oldbukReg); |
| } |
| Assert.assertTrue(oldbukReg == null); |
| } |
| |
| /* |
| * public void removeBucketRegion(int bucketId) { |
| * Assert.assertHoldsLock(this.bucketAdminLock,true); |
| * this.localBucket2RegionMap.remove(Long.valueOf(bucketId)); } |
| */ |
| |
| private CacheListener createDebugBucketListener() { |
| return new CacheListener() { |
| @Override |
| public void afterCreate(EntryEvent event) { |
| EntryEventImpl ee = (EntryEventImpl) event; |
| logger.debug("BucketListener: o={}, r={}, k={}, nv={}, dm={}", event.getOperation(), |
| event.getRegion().getFullPath(), event.getKey(), ee.getRawNewValue(), |
| event.getDistributedMember()); |
| } |
| |
| @Override |
| public void afterUpdate(EntryEvent event) { |
| EntryEventImpl ee = (EntryEventImpl) event; |
| logger.debug("BucketListener: o={}, r={}, k={}, ov={}, nv={}, dm={}", event.getOperation(), |
| event.getRegion().getFullPath(), event.getKey(), ee.getRawOldValue(), |
| ee.getRawNewValue(), event.getDistributedMember()); |
| } |
| |
| @Override |
| public void afterInvalidate(EntryEvent event) { |
| logger.debug("BucketListener: o={}, r={}, k={}, dm={}", event.getOperation(), |
| event.getRegion().getFullPath(), event.getKey(), event.getDistributedMember()); |
| } |
| |
| @Override |
| public void afterDestroy(EntryEvent event) { |
| logger.debug("BucketListener: o={}, r={}, k={}, dm={}", event.getOperation(), |
| event.getRegion().getFullPath(), event.getKey(), event.getDistributedMember()); |
| } |
| |
| @Override |
| public void afterRegionInvalidate(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionDestroy(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionClear(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionCreate(RegionEvent event) {} |
| |
| @Override |
| public void afterRegionLive(RegionEvent event) {} |
| |
| @Override |
| public void close() {} |
| }; |
| } |
| |
| public CacheLoader getCacheLoader() { |
| return this.loader; |
| } |
| |
| /** |
| * sent by the partitioned region when its loader has changed |
| */ |
| protected void cacheLoaderChanged(final CacheLoader newLoader, final CacheLoader oldLoader) { |
| StoppableWriteLock lock = this.bucketCreationLock.writeLock(); |
| lock.lock(); |
| try { |
| this.loader = newLoader; |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| AttributesMutator mut = r.getAttributesMutator(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("setting new cache loader in bucket region: {}", newLoader); |
| } |
| mut.setCacheLoader(newLoader); |
| } |
| }); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| protected void lockBucketCreationAndVisit(BucketVisitor visitor) { |
| StoppableWriteLock lock = this.bucketCreationLock.writeLock(); |
| lock.lock(); |
| try { |
| visitBuckets(visitor); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Gets the total amount of memory in bytes allocated for all values for this PR in this VM. This |
| * is the current memory (MB) watermark for data in this PR. |
| * |
| * If eviction to disk is enabled, this does not reflect the size of entries on disk. |
| * |
| * @return the total memory size in bytes for all the Map's values |
| */ |
| public long currentAllocatedMemory() { |
| return this.bytesInUse.get(); |
| } |
| |
| /** |
| * Checks if this PartitionedRegionDataStore has the capacity to handle the bucket creation |
| * request. If so, creates the real storage for the bucket. |
| * |
| * @param bucketId the bucket id |
| * @param size the size in bytes of the bucket to create locally |
| * @param forceCreation ignore local maximum buckets check |
| * @return true if already managing the bucket or if the bucket has been created |
| */ |
| public boolean handleManageBucketRequest(int bucketId, int size, InternalDistributedMember sender, |
| boolean forceCreation) { |
| |
| // check maxMemory setting |
| if (!this.partitionedRegion.isDataStore()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleRemoteManageBucket: local max memory is zero"); |
| } |
| return false; |
| } |
| if (!forceCreation && !canAccommodateMoreBytesSafely(size)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Partitioned Region {} has exceeded local maximum memory configuration {} Mb, current size is {} Mb", |
| this.partitionedRegion.getFullPath(), this.partitionedRegion.getLocalMaxMemory(), |
| (this.bytesInUse.get() / PartitionedRegionHelper.BYTES_PER_MB)); |
| |
| logger.debug("Refusing remote bucket creation request for bucketId={}{}{} of size {} Mb.", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| (size / PartitionedRegionHelper.BYTES_PER_MB)); |
| } |
| return false; |
| } |
| |
| if (!forceCreation && !canAccommodateAnotherBucket()) { |
| return false; |
| } |
| |
| boolean createdBucket = false; |
| if (grabBucket(bucketId, null, forceCreation, false, true, sender, false).nowExists()) { |
| this.partitionedRegion.checkReadiness(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "handleManageBucketRequest: successful, returning:{} bucketId={}{}{} for PR = {}", |
| this.partitionedRegion.getMyId(), this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, this.getName()); |
| } |
| createdBucket = true; |
| } else { |
| // somebody else got it already |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleManageBucketRequest: someone else grabbed this bucket"); |
| } |
| } |
| |
| return createdBucket; |
| } |
| |
| /** |
| * Determine if the ratio of buckets this VM should host is appropriate given its localMaxMemory |
| * setting as compared to others |
| * |
| * @return true if this data store can host another bucket |
| */ |
| boolean canAccommodateAnotherBucket() { |
| final int localMax = this.partitionedRegion.getLocalMaxMemory(); |
| |
| double totalMax = |
| (double) this.partitionedRegion.getRegionAdvisor().adviseTotalMemoryAllocation() + localMax; |
| Assert.assertTrue(totalMax > 0.0); |
| |
| final double memoryRatio = localMax / totalMax; |
| |
| Assert.assertTrue(memoryRatio > 0.0); |
| Assert.assertTrue(memoryRatio <= 1.0); |
| |
| final int totalBucketInstances = this.partitionedRegion.getTotalNumberOfBuckets() |
| * (this.partitionedRegion.getRedundantCopies() + 1); |
| |
| final double numBucketsToHostLocally = Math.ceil(memoryRatio * totalBucketInstances); |
| |
| Assert.assertTrue(numBucketsToHostLocally > 0.0); |
| |
| // Pessimistically assume that all concurrent bucket creates will succeed. |
| // -1 because we've already incremented bucketCreatesInProgress to include this thread. |
| final int currentNumBuckets = |
| this.localBucket2RegionMap.size() + bucketCreatesInProgress.intValue() - 1; |
| boolean ret = numBucketsToHostLocally > currentNumBuckets; |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "canAccomodateAnotherBucket: local VM can host {} does host {} concurrent creates {}", |
| numBucketsToHostLocally, this.localBucket2RegionMap.size(), |
| (bucketCreatesInProgress.intValue() - 1)); |
| } |
| |
| if (!ret) { |
| // TODO make this an info message when bucket creation requests |
| // arrive in a proportional fashion e.g. if a VM's localMaxMemory is 1/2 of it's |
| // peer, it should receive half as many bucket creation requests |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Partitioned Region {} potentially unbalanced; maximum number of buckets, {}, has been reached", |
| this.partitionedRegion.getFullPath(), numBucketsToHostLocally); |
| logger.debug("Total max: {} memoryRatio: {}", totalMax, memoryRatio); |
| } |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Checks if this PartitionedRegionDataStore has the capacity to handle the rebalancing size. |
| * |
| * @param size the size in bytes of the bucket to be rebalanced |
| * @return true if size can be accommodated without exceeding ratioFull |
| */ |
| boolean handleRemoteCanRebalance(long size) { |
| return false; |
| } |
| |
| // ///////////////////////////////////// |
| // /////////// Empty methods ////////// |
| // //////////////////////////////////// |
| |
| /** |
| * Handles rebalance by accepting new bucket contents and storing it. |
| * |
| * @param bucketId the id of the bucket to rebalance |
| * @param obj the contents of the bucket |
| * @param regionName the name of the PR |
| * @return true if successful |
| */ |
| boolean handleRemoteRebalance(int bucketId, Object obj, String regionName) { |
| return false; |
| } |
| |
| /** |
| * Checks if this PR has the capacity to handle the rebalancing size. If so, creates the real |
| * storage for the bucket and a bucket2Node Region mapping. These two operations are done as a |
| * logical unit so that the node can immediately begin handling remote requests once the |
| * bucket2Node mapping becomes visible. |
| * |
| * @param bucketId the bucket id |
| */ |
| boolean handleRemoteCreateBackupRegion(int bucketId) { |
| return false; |
| } |
| |
| /** |
| * Return the size in bytes for a given bucket. |
| * |
| * @return size in bytes |
| */ |
| public long getBucketSize(int bucketId) { |
| // This is an approximate calculation, so we don't require the |
| // bucket to be fully initialized... |
| // getInitializedBucketForId(Long.valueOf(bucketId)); // wait for the bucket to finish |
| // initializing |
| final BucketRegion bucketRegion = this.localBucket2RegionMap.get(Integer.valueOf(bucketId)); |
| if (bucketRegion == null) { |
| return 0; |
| } |
| return bucketRegion.getTotalBytes(); |
| } |
| |
| /** |
| * This method returns name of this Partitioned Region |
| * |
| * @return Partitioned Region name |
| */ |
| private String getName() { |
| return this.partitionedRegion.getName(); |
| } |
| |
| |
| |
| // ///////////////////////////////////// |
| // /////////// Local put ////////////// |
| // //////////////////////////////////// |
| |
| /** |
| * Puts the object with the given key locally. <br> |
| * Step: <br> |
| * 1) It finds out the bucket region for the bucket id. <br> |
| * 2) If from step 1 it gets null, that means the bucket is re-mapped. <br> |
| * 3) If it finds the bucket region from step 1, it tries to put the key-value on the region. <br> |
| * 4) updateBucket2Size if bucket is on more than 1 node or else bucket listeners would take care |
| * of size update. <br> |
| * |
| * @param bucketId the bucket id of the key |
| * @param event the operation event |
| * @param ifNew whether a create must be performed |
| * @param ifOld whether an existing entry must be updated |
| * @param lastModified time stamp for update operations |
| * @throws ForceReattemptException if bucket region is null |
| * @throws PrimaryBucketException if the bucket in this data store is not the primary bucket |
| * @return true if put happened |
| */ |
| public boolean putLocally(final Integer bucketId, final EntryEventImpl event, boolean ifNew, |
| boolean ifOld, Object expectedOldValue, boolean requireOldValue, final long lastModified) |
| throws PrimaryBucketException, ForceReattemptException { |
| final BucketRegion br = getInitializedBucketForId(event.getKey(), bucketId); |
| return putLocally(br, event, ifNew, ifOld, expectedOldValue, requireOldValue, lastModified); |
| } |
| |
| public boolean putLocally(final BucketRegion bucketRegion, final EntryEventImpl event, |
| boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue, |
| final long lastModified) throws PrimaryBucketException, ForceReattemptException { |
| boolean didPut = false; // false if entry put fails |
| |
| // final BucketRegion bucketRegion = getInitializedBucketForId(event.getKey(), bucketId); |
| |
| try { |
| event.setRegion(bucketRegion); |
| |
| if (event.isOriginRemote()) { |
| didPut = bucketRegion.basicUpdate(event, ifNew, ifOld, lastModified, false); |
| } else { |
| // Skip yet another validation |
| didPut = bucketRegion.virtualPut(event, ifNew, ifOld, expectedOldValue, requireOldValue, |
| lastModified, false); |
| } |
| // bug 34361: don't send a reply if bucket was destroyed during the op |
| bucketRegion.checkReadiness(); |
| } catch (RegionDestroyedException rde) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), rde); |
| } |
| |
| return didPut; |
| } |
| |
| protected boolean hasClientInterest(EntryEventImpl event) { |
| return UPDATE_ACCESS_TIME_ON_INTEREST && this.keysOfInterest.containsKey(event.getKey()); |
| } |
| |
| protected void updateMemoryStats(final long memoryDelta) { |
| this.partitionedRegion.getPrStats().incBytesInUse(memoryDelta); |
| final long locBytes = this.bytesInUse.addAndGet(memoryDelta); |
| |
| // only check for exceeding local max memory if we're not evicting entries. |
| if (this.partitionedRegion.isEntryEvictionPossible()) { |
| return; |
| } |
| |
| if (this.exceededLocalMaxMemoryLimit) { // previously over limit |
| if (locBytes <= this.maximumLocalBytes) { // not over limit now |
| this.exceededLocalMaxMemoryLimit = false; |
| logger.info( |
| "Partitioned Region {} is at or below local maximum memory configuration {} Mb, current size is {} Mb", |
| this.partitionedRegion.getFullPath(), |
| this.partitionedRegion.getLocalMaxMemory(), |
| locBytes / PartitionedRegionHelper.BYTES_PER_MB); |
| } |
| } else { // previously not over limit |
| if (locBytes > this.maximumLocalBytes) { // over limit now |
| this.exceededLocalMaxMemoryLimit = true; |
| logger.warn( |
| "Partitioned Region {} has exceeded local maximum memory configuration {} Mb, current size is {} Mb", |
| this.partitionedRegion.getFullPath(), |
| this.partitionedRegion.getLocalMaxMemory(), |
| locBytes / PartitionedRegionHelper.BYTES_PER_MB); |
| } |
| } |
| } |
| |
| /** |
| * Checks whether there is room in this Map to accommodate more data. |
| * |
| * @param bytes the size to check in bytes |
| */ |
| boolean canAccommodateMoreBytesSafely(int bytes) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| if (this.partitionedRegion.getLocalMaxMemory() == 0) { |
| return false; |
| } |
| if (this.partitionedRegion.isEntryEvictionPossible()) { |
| // assume that since we're evicting entries, we're allowed to go over localMaxMemory and |
| // eviction will take care of keeping us from running out of memory. |
| return true; |
| } |
| // long allocatedMemory = currentAllocatedMemory(); |
| // precision coercion from int to long on bytes |
| final long curBytes = this.bytesInUse.get(); |
| if (isDebugEnabled) { |
| logger.debug( |
| "canAccomodateMoreBytes: bytes = {} allocatedMemory = {} newAllocatedSize = {} thresholdSize = ", |
| bytes, curBytes, (curBytes + bytes), this.maximumLocalBytes); |
| } |
| if ((curBytes + bytes) < this.maximumLocalBytes) { |
| if (isDebugEnabled) { |
| logger.debug("canAccomodateMoreBytes: returns true"); |
| } |
| return true; |
| } else { |
| if (isDebugEnabled) { |
| logger.debug("canAccomodateMoreBytes: returns false"); |
| } |
| return false; |
| } |
| } |
| |
| // static void update |
| /** |
| * Returns the PartitionRegion of Data store. |
| */ |
| public PartitionedRegion getPartitionedRegion() { |
| return this.partitionedRegion; |
| } |
| |
| /** |
| * Handles the remote request to remove the key from this Map. <br> |
| * Step: <br> |
| * 1) Locates the bucket region. If it doesnt find the actual bucket, it means that this bucket is |
| * remapped to some other node and remappedBucketException is thrown <br> |
| * 2) Invokes destroy on that bucket region <br> |
| * 3) updateBucket2Size if bucket is on more than 1 node or else bucket listners would take care |
| * of size update. |
| * |
| * @param bucketId for the key |
| * @param event the event causing this action |
| * @param expectedOldValue if non-null, then only succeed if current value |
| * @return the removed value |
| * @throws EntryNotFoundException if entry is not found for the key or expectedOldValue is not |
| * null and current value is not equal to it |
| * @throws PrimaryBucketException if the locally managed bucket is not the primary |
| * @throws ForceReattemptException if the bucket region is null |
| */ |
| public Object destroyLocally(Integer bucketId, EntryEventImpl event, Object expectedOldValue) |
| throws EntryNotFoundException, PrimaryBucketException, ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("destroyLocally: key={} bucketId={}{}{}", event.getKey(), |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| Object obj = null; |
| final BucketRegion bucketRegion = getInitializedBucketForId(event.getKey(), bucketId); |
| try { |
| event.setRegion(bucketRegion); |
| |
| // ?? :ezoerner:20080815 the reason why the old value used to be set here |
| // early (before sync on RegionEntry) is unknown. |
| // However, it is necessary to set it "early" in the case of |
| // distribution without destroying locally (hasSeenEvent), but this is |
| // unnecessary (and wrong) otherwise. |
| // Setting the value is deferred until just before |
| // distribution in BucketRegion#basicDestroy |
| // @see BucketRegion#basicDestroy |
| // event.setOldValueFromRegion(); |
| |
| bucketRegion.basicDestroy(event, true, expectedOldValue); |
| // bug 34361: don't send a reply if bucket was destroyed during the op |
| bucketRegion.checkReadiness(); |
| |
| } catch (EntryNotFoundException enf) { |
| if (this.partitionedRegion.isDestroyed()) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), |
| new RegionDestroyedException( |
| "Region has been destroyed", |
| this.partitionedRegion.getFullPath())); |
| } |
| |
| // ???:ezoerner:20080815 why throw a new exception here and lose the |
| // stack trace when there was a perfectly good one already? |
| // throw new EntryNotFoundException("Entry not found for key = " + |
| // event.getKey()); |
| |
| throw enf; // rethrow |
| } catch (RegionDestroyedException rde) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), rde); |
| } |
| |
| // this is done elsewhere now |
| // event.setRegion(this.partitionedRegion); |
| // this.partitionedRegion.notifyBridgeClients(EnumListenerEvent.AFTER_DESTROY, |
| // event); |
| // this.partitionedRegion.notifyGatewayHub(EnumListenerEvent.AFTER_DESTROY, |
| // event); |
| |
| return obj; |
| } |
| |
| /** |
| * This method does the cleaning up of the closed/locallyDestroyed PartitionedRegion. This cleans |
| * up the reference of the close PartitionedRegion's node from the b2n region and locallyDestroys |
| * the b2n region (if removeBucketMapping is true). It locallyDestroys the bucket region and |
| * cleans up the localBucketRegion map to avoid any stale references to locally destroyed bucket |
| * region. |
| * |
| */ |
| void cleanUp(boolean removeBucketMapping, boolean removeFromDisk) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("cleanUp: Starting cleanup for {}", this.partitionedRegion); |
| } |
| try { |
| if (removeBucketMapping) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("cleanUp: Done destroyBucket2NodeRegionLocally for {}", |
| this.partitionedRegion); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("cleanUp: not removing node from b2n region"); |
| } |
| } |
| |
| // Lock out bucket creation while doing this :-) |
| StoppableWriteLock lock = this.bucketCreationLock.writeLock(); |
| lock.lock(); |
| try { |
| ProxyBucketRegion[] proxyBuckets = |
| getPartitionedRegion().getRegionAdvisor().getProxyBucketArray(); |
| if (proxyBuckets != null) { |
| for (ProxyBucketRegion pbr : proxyBuckets) { |
| Integer bucketId = Integer.valueOf(pbr.getBucketId()); |
| BucketRegion buk = localBucket2RegionMap.get(bucketId); |
| // concurrent entry iterator does not guarantee value, key pairs |
| if (buk != null) { |
| try { |
| buk.getBucketAdvisor().getProxyBucketRegion().setHosting(false); |
| if (removeFromDisk) { |
| buk.localDestroyRegion(); |
| } else { |
| buk.close(); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("cleanup: Locally destroyed bucket {}", buk.getFullPath()); |
| } |
| // Fix for defect #49012 |
| if (buk instanceof AbstractBucketRegionQueue |
| && buk.getPartitionedRegion().isShadowPR()) { |
| if (buk.getPartitionedRegion().getColocatedWithRegion() != null) { |
| buk.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor() |
| .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true); |
| } |
| } |
| } catch (RegionDestroyedException ignore) { |
| } catch (Exception ex) { |
| logger.warn( |
| String.format("PartitionedRegion %s: cleanUp problem destroying bucket %s", |
| new Object[] {this.partitionedRegion.getFullPath(), |
| Integer.valueOf(buk.getId())}), |
| ex); |
| } |
| |
| localBucket2RegionMap.remove(bucketId); |
| } else if (removeFromDisk) { |
| DiskRegion diskRegion = pbr.getDiskRegion(); |
| if (diskRegion != null) { |
| diskRegion.beginDestroy(null); |
| diskRegion.endDestroy(null); |
| } |
| } |
| } // while |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } catch (Exception ex) { |
| logger.warn( |
| String.format("PartitionedRegion %s: caught unexpected exception during data cleanup", |
| this.partitionedRegion.getFullPath()), |
| ex); |
| } finally { |
| this.partitionedRegion.getPrStats().setBucketCount(0); |
| this.bucketStats.close(); |
| } |
| } |
| |
| public boolean isRemotePrimaryReadyForColocatedChildren(int bucketId) { |
| boolean isRemotePrimaryReady = true; |
| InternalDistributedMember myId = |
| this.partitionedRegion.getDistributionManager().getDistributionManagerId(); |
| |
| List<PartitionedRegion> colocatedChildPRs = |
| ColocationHelper.getColocatedChildRegions(this.partitionedRegion); |
| if (colocatedChildPRs != null) { |
| for (PartitionedRegion pr : colocatedChildPRs) { |
| InternalDistributedMember primaryChild = pr.getBucketPrimary(bucketId); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Checking colocated child bucket " + pr + ", bucketId=" + bucketId |
| + ", primary is " + primaryChild); |
| } |
| if (primaryChild == null || myId.equals(primaryChild)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Colocated bucket region " + pr + " " + bucketId |
| + " does not have a remote primary yet. Not to remove."); |
| } |
| return false; |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger |
| .debug(pr + " bucketId=" + bucketId + " has remote primary, checking its children"); |
| } |
| isRemotePrimaryReady = isRemotePrimaryReady |
| && pr.getDataStore().isRemotePrimaryReadyForColocatedChildren(bucketId); |
| } |
| } |
| } |
| return isRemotePrimaryReady; |
| } |
| |
| /** |
| * Removes a redundant bucket hosted by this data store. The rebalancer invokes this method |
| * directly or sends this member a message to invoke it. |
| * |
| * From the spec: |
| * |
| * How to Remove a Redundant Bucket |
| * |
| * This operation is done by the rebalancer (REB) and can only be done on non-primary buckets. If |
| * you want to remove a primary bucket first send one of its peers "become primary" and then send |
| * it "unhost" (we could offer a "unhost" option on "become primary" or a "becomePrimary" option |
| * on "create redundant"). The member that has the bucket being removed is called the bucket host |
| * (BH). |
| * |
| * 1. REB sends an "unhostBucket" message to BH. This message will be rejected if the member finds |
| * itself to be the primary or if it doesn't host the bucket by sending a failure reply to REB. 2. |
| * BH marks itself as "not-hosting". This causes any read operations that come in to not start and |
| * retry. BH also updates the advisor to know that it is no longer hosting the bucket. 3. BH then |
| * waits for any in-progress reads (which read ops to wait for are TBD) to complete. 4. BH then |
| * removes the bucket region from its cache. 5. BH then sends a success reply to REB. |
| * |
| * This method is now also used by the PartitionManager. For the PartitionManager, it does remove |
| * the primary bucket. |
| * |
| * @param bucketId the id of the bucket to remove |
| * |
| * @return true if the bucket was removed; false if unable to remove or if bucket is not hosted |
| */ |
| public boolean removeBucket(int bucketId, boolean forceRemovePrimary) { |
| waitForInProgressBackup(); |
| |
| // Don't allow the removal of a bucket if we haven't |
| // finished recovering from disk |
| if (!forceRemovePrimary |
| && !this.partitionedRegion.getRedundancyProvider().isPersistentRecoveryComplete()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Returning false from removeBucket because we have not finished recovering all colocated regions from disk"); |
| } |
| return false; |
| } |
| // Lock out bucket creation while doing this :-) |
| StoppableWriteLock lock = this.bucketCreationLock.writeLock(); |
| lock.lock(); |
| try { |
| BucketRegion bucketRegion = this.localBucket2RegionMap.get(Integer.valueOf(bucketId)); |
| if (bucketRegion == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Returning true from removeBucket because we don't have the bucket we've been told to remove"); |
| } |
| return true; |
| |
| } |
| |
| PartitionedRegion leader = ColocationHelper.getLeaderRegion(this.partitionedRegion); |
| boolean isLeader = leader.equals(this.partitionedRegion); |
| BucketAdvisor bucketAdvisor = bucketRegion.getBucketAdvisor(); |
| InternalDistributedMember myId = |
| this.partitionedRegion.getDistributionManager().getDistributionManagerId(); |
| Lock primaryMoveReadLock = bucketAdvisor.getPrimaryMoveReadLock(); |
| |
| // Fix for 43613 - don't remove the bucket |
| // if we are primary. We hold the lock here |
| // to prevent this member from becoming primary until this |
| // member is no longer hosting the bucket. |
| primaryMoveReadLock.lock(); |
| try { |
| // forceRemovePrimary==true will enable remove the bucket even when: |
| // 1) it's primary |
| // 2) no other primary ready yet |
| // 3) colocated bucket and its child is not completely ready |
| if (!forceRemovePrimary && bucketAdvisor.isPrimary()) { |
| return false; |
| } |
| |
| if (isLeader) { |
| if (!forceRemovePrimary && !isRemotePrimaryReadyForColocatedChildren(bucketId)) { |
| return false; |
| } |
| |
| InternalDistributedMember primary = bucketAdvisor.getPrimary(); |
| if (!forceRemovePrimary && (primary == null || myId.equals(primary))) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket region " + bucketRegion |
| + " does not have a remote primary yet. Not to remove."); |
| } |
| return false; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket region " + bucketRegion + " has primary at " + primary); |
| } |
| } |
| |
| // recurse down to each tier of children to remove first |
| removeBucketForColocatedChildren(bucketId, forceRemovePrimary); |
| |
| if (bucketRegion.getPartitionedRegion().isShadowPR()) { |
| if (bucketRegion.getPartitionedRegion().getColocatedWithRegion() != null) { |
| bucketRegion.getPartitionedRegion().getColocatedWithRegion().getRegionAdvisor() |
| .getBucketAdvisor(bucketId).setShadowBucketDestroyed(true); |
| } |
| } |
| bucketAdvisor.getProxyBucketRegion().removeBucket(); |
| } finally { |
| primaryMoveReadLock.unlock(); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Removed bucket {} from advisor", bucketRegion); |
| } |
| |
| // Flush the state of the primary. This make sure we have processed |
| // all operations were sent out before we removed our profile from |
| // our peers. |
| // |
| // Another option, instead of using the StateFlushOperation, could |
| // be to send a message which waits until it acquires the |
| // activePrimaryMoveLock on primary the bucket region. That would also |
| // wait for in progress writes. I choose to use the StateFlushOperation |
| // because it won't block write operations while we're trying to acquire |
| // the activePrimaryMoveLock |
| InternalDistributedMember primary = bucketAdvisor.getPrimary(); |
| if (!myId.equals(primary)) { |
| StateFlushOperation flush = new StateFlushOperation(bucketRegion); |
| int executor = OperationExecutors.WAITING_POOL_EXECUTOR; |
| try { |
| flush.flush(Collections.singleton(primary), myId, executor, false); |
| } catch (InterruptedException e) { |
| this.partitionedRegion.getCancelCriterion().checkCancelInProgress(e); |
| Thread.currentThread().interrupt(); |
| throw new InternalGemFireException("Interrupted while flushing state"); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Finished state flush for removal of {}", bucketRegion); |
| } |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("We became primary while destroying the bucket. Too late to stop now."); |
| } |
| } |
| bucketRegion.invokePartitionListenerAfterBucketRemoved(); |
| bucketRegion.preDestroyBucket(bucketId); |
| bucketRegion.localDestroyRegion(); |
| bucketAdvisor.getProxyBucketRegion().finishRemoveBucket(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Destroyed {}", bucketRegion); |
| } |
| this.localBucket2RegionMap.remove(Integer.valueOf(bucketId)); |
| this.partitionedRegion.getPrStats().incBucketCount(-1); |
| return true; |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Wait for in an progress backup. When we backup the whole DS, we need to make sure we don't miss |
| * a bucket because it is in the process of rebalancing. This doesn't wait for the whole backup to |
| * complete, it only makes sure that this destroy will wait until the point when we know that we |
| * that this bucket won't be destroyed on this member in the backup unless it was backed up on the |
| * target member. |
| */ |
| private void waitForInProgressBackup() { |
| BackupService backupService = getPartitionedRegion().getGemFireCache().getBackupService(); |
| if (getPartitionedRegion().getDataPolicy().withPersistence()) { |
| backupService.waitForBackup(); |
| } |
| |
| } |
| |
| /** |
| * This calls removeBucket on every colocated child that is directly colocated to this bucket's |
| * PR. Those each in turn do the same to their child buckets and so on before returning. |
| * |
| * @param bucketId the bucket to remove |
| * @param forceRemovePrimary true if we should remove the bucket, even if it is primary. |
| * |
| * @return true if bucket was removed from all children |
| */ |
| private boolean removeBucketForColocatedChildren(int bucketId, boolean forceRemovePrimary) { |
| boolean removedChildBucket = true; |
| |
| // getColocatedChildRegions returns only the child PRs directly colocated |
| // with thisPR... |
| List<PartitionedRegion> colocatedChildPRs = |
| ColocationHelper.getColocatedChildRegions(this.partitionedRegion); |
| if (colocatedChildPRs != null) { |
| for (PartitionedRegion pr : colocatedChildPRs) { |
| removedChildBucket = |
| pr.getDataStore().removeBucket(bucketId, forceRemovePrimary) && removedChildBucket; |
| } |
| } |
| |
| return removedChildBucket; |
| } |
| |
| /** |
| * Create a new backup of the bucket, allowing redundancy to be exceeded. All colocated child |
| * buckets will also be created. |
| * |
| * @param bucketId the bucket to create |
| * @param isRebalance true if bucket creation is directed by rebalancing |
| * @return true if the bucket and its colocated chain of children are created |
| */ |
| public CreateBucketResult createRedundantBucket(int bucketId, boolean isRebalance, |
| InternalDistributedMember moveSource) { |
| // recurse down to create each tier of children after creating leader bucket |
| return grabBucket(bucketId, moveSource, true, false, isRebalance, null, false); |
| } |
| |
| /** |
| * Moves the bucket from the <code>source</code> member to this datastore. |
| * |
| * If the bucket is the leader bucket then it will recursively create all colocated children and |
| * then remove all colocated children as well from the <code>source</code> member. |
| * |
| * @param bucketId the bucket to move |
| * @param source the member to move the bucket from |
| * @return true if bucket was successfully moved to this datastore |
| */ |
| public boolean moveBucket(int bucketId, InternalDistributedMember source, |
| final boolean isRebalance) { |
| |
| if (createRedundantBucket(bucketId, isRebalance, source) != CreateBucketResult.CREATED) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to move bucket {} to {}", bucketId, this); |
| } |
| return false; |
| } |
| |
| BucketAdvisor bucketAdvisor = |
| this.partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucketId); |
| |
| if (source.equals(bucketAdvisor.getPrimary())) { |
| if (!bucketAdvisor.becomePrimary(true)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to become primary for bucket {} on {}", bucketId, this); |
| } |
| } |
| } |
| |
| RemoveBucketResponse response = |
| RemoveBucketMessage.send(source, this.partitionedRegion, bucketId, false); |
| if (response != null) { |
| boolean removed = response.waitForResponse(); |
| if (removed == false) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Successfully created bucket {} in {} but failed to remove it from {}", |
| bucketId, this, source); |
| } |
| } |
| // TODO rebalance - perhaps we should thow an error if we |
| // can't remove the bucket?? |
| } |
| // The new bucket's size is counted in when GII |
| return true; |
| } |
| |
| /** |
| * Fetch a BucketRegion, but do not return until it is initialized |
| * |
| * @param key optional for error reporting; if none, no key available. |
| * @param bucketId the bucket to fetch |
| * @return the region |
| */ |
| public BucketRegion getInitializedBucketForId(Object key, Integer bucketId) |
| throws ForceReattemptException { |
| final BucketRegion bucketRegion = this.localBucket2RegionMap.get(bucketId); |
| if (null == bucketRegion) { |
| this.partitionedRegion.checkReadiness(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Got null bucket region for bucketId={}{}{} for PartitionedRegion = {}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| this.partitionedRegion); |
| } |
| ForceReattemptException fre = new BucketNotFoundException( |
| String.format("Bucket id %s not found on VM %s", |
| |
| new Object[] {this.partitionedRegion.bucketStringForLogs(bucketId.intValue()), |
| this.partitionedRegion.getMyId()})); |
| if (key != null) { |
| fre.setHash(key.hashCode()); |
| } |
| throw fre; |
| } |
| bucketRegion.waitForData(); |
| return bucketRegion; |
| } |
| |
| /** |
| * Returns the local BucketRegion given an bucketId. Returns null if no BucketRegion exists. |
| * |
| * @since GemFire 6.1.2.9 |
| */ |
| public BucketRegion getLocalBucketById(Integer bucketId) { |
| final BucketRegion bucketRegion = this.localBucket2RegionMap.get(bucketId); |
| if (bucketRegion != null) { |
| bucketRegion.waitForData(); |
| } |
| return bucketRegion; |
| } |
| |
| |
| /* |
| * @return an initialized local bucket or null |
| */ |
| public BucketRegion getLocalBucketByKey(Object key) { |
| Integer bucketId = Integer |
| .valueOf(PartitionedRegionHelper.getHashKey(this.partitionedRegion, null, key, null, null)); |
| return getLocalBucketById(bucketId); |
| } |
| |
| /** |
| * Test hook to return the per entry overhead for a bucket region. PRECONDITION: a bucket must |
| * exist and be using LRU. |
| * |
| * @since GemFire 6.1.2.9 |
| */ |
| public int getPerEntryLRUOverhead() { |
| BucketRegion br = (localBucket2RegionMap.values().iterator().next()); |
| return br.getRegionMap().getEntryOverhead(); |
| } |
| |
| /** |
| * Fetch a BucketRegion, but do not return until it is initialized and the primary is known. |
| * |
| * @see #getInitializedBucketForId(Object, Integer) |
| * @return the initialized region |
| */ |
| public BucketRegion getInitializedBucketWithKnownPrimaryForId(Object key, Integer bucketId) |
| throws ForceReattemptException { |
| final BucketRegion br = getInitializedBucketForId(key, bucketId); |
| br.getBucketAdvisor().getPrimary();// waits until the primary is initialized |
| return br; |
| } |
| |
| /** |
| * Checks if this instance contains a value for the key locally. |
| * |
| * @param bucketId for the key |
| * @param key the key, whose value needs to be checks |
| * @throws ForceReattemptException if bucket region is null |
| * @return true if there is a non-null value for the given key |
| * @throws PrimaryBucketException if the locally managed bucket is not the primary |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public boolean containsValueForKeyLocally(Integer bucketId, Object key) |
| throws PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { |
| final BucketRegion bucketRegion = getInitializedBucketForId(key, bucketId); |
| invokeBucketReadHook(); |
| boolean ret = false; |
| try { |
| ret = bucketRegion.containsValueForKey(key); |
| checkIfBucketMoved(bucketRegion); |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during containsValueForKey", rde); |
| } else { |
| throw new RegionDestroyedException( |
| "Unable to get value for key.", |
| this.partitionedRegion.toString(), rde); |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("containsValueForKeyLocally: key {} returning {}", key, ret); |
| } |
| |
| return ret; |
| } |
| |
| /** |
| * Throw a ForceReattemptException if bucket has been moved out of this data store. |
| */ |
| private void checkIfBucketMoved(BucketRegion br) throws ForceReattemptException { |
| if (br.isBucketDestroyed()) { |
| this.partitionedRegion.checkReadiness(); |
| throw new ForceReattemptException("bucket moved to other member during read op"); |
| } |
| } |
| |
| /** |
| * Checks if this instance contains a key. |
| * |
| * @param bucketId the bucketId for the key |
| * @param key the key to look for |
| * @throws ForceReattemptException if bucket region is null |
| * @return true if there is an entry with the given key |
| * @throws PrimaryBucketException if the bucket is not primary |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public boolean containsKeyLocally(Integer bucketId, Object key) |
| throws PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { |
| final BucketRegion bucketRegion = getInitializedBucketForId(key, bucketId); |
| invokeBucketReadHook(); |
| try { |
| final boolean ret = bucketRegion.containsKey(key); |
| checkIfBucketMoved(bucketRegion); |
| if (logger.isDebugEnabled()) { |
| logger.debug("containsKeyLocally: key {}) bucketId={}{}{} region {} returns {}", key, |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| bucketRegion.getName(), ret); |
| } |
| return ret; |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during containsKey", rde); |
| } else { |
| throw new RegionDestroyedException("Unable to do containsKey on", |
| this.partitionedRegion.toString(), rde); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Test hook that will be invoked before any bucket read. It is invoked after the bucket is |
| * acquired but before the bucket is locked and before the read operation is done. |
| */ |
| private Runnable bucketReadHook; |
| |
| /** |
| * invokes a test hook, if it was installed, and removes it. |
| */ |
| public void invokeBucketReadHook() { |
| Runnable r = this.bucketReadHook; |
| if (r != null) { |
| setBucketReadHook(null); |
| r.run(); |
| } |
| } |
| |
| public void setBucketReadHook(Runnable r) { |
| this.bucketReadHook = r; |
| } |
| |
| /** |
| * Returns value corresponding to this key. |
| * |
| * @param key the key to look for |
| * @param requestingClient the client making the request, or null |
| * @param clientEvent client's event (for returning version tag) |
| * @param returnTombstones whether tombstones should be returned |
| * @throws ForceReattemptException if bucket region is null |
| * @return value from the bucket region |
| * @throws PrimaryBucketException if the locally managed bucket is not primary |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public Object getLocally(int bucketId, final Object key, final Object aCallbackArgument, |
| boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, |
| EntryEventImpl clientEvent, boolean returnTombstones) |
| throws PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { |
| return getLocally(bucketId, key, aCallbackArgument, disableCopyOnRead, preferCD, |
| requestingClient, clientEvent, returnTombstones, false); |
| } |
| |
| /** |
| * Returns value corresponding to this key. |
| * |
| * @param key the key to look for |
| * @param requestingClient the client making the request, or null |
| * @param clientEvent client's event (for returning version tag) |
| * @param returnTombstones whether tombstones should be returned |
| * @param opScopeIsLocal if true then just check local storage for a value; if false then try to |
| * find the value if it is not local |
| * @throws ForceReattemptException if bucket region is null |
| * @return value from the bucket region |
| * @throws PrimaryBucketException if the locally managed bucket is not primary |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public Object getLocally(int bucketId, final Object key, final Object aCallbackArgument, |
| boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, |
| EntryEventImpl clientEvent, boolean returnTombstones, boolean opScopeIsLocal) |
| throws PrimaryBucketException, ForceReattemptException, PRLocallyDestroyedException { |
| final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId)); |
| // check for primary (when a loader is present) done deeper in the BucketRegion |
| Object ret = null; |
| if (logger.isDebugEnabled()) { |
| logger.debug("getLocally: key {}) bucketId={}{}{} region {} returnTombstones {} ", key, |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| bucketRegion.getName(), returnTombstones); |
| } |
| invokeBucketReadHook(); |
| try { |
| ret = bucketRegion.get(key, aCallbackArgument, true, disableCopyOnRead, preferCD, |
| requestingClient, clientEvent, returnTombstones, opScopeIsLocal, false); |
| checkIfBucketMoved(bucketRegion); |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during get", rde); |
| } else { |
| throw new InternalGemFireError( |
| "Got region destroyed message, but neither bucket nor PR was destroyed", rde); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Return a value from the bucket region, always serialized |
| * |
| * @param keyInfo TODO |
| * @param clientEvent a "client" event that will hold version information about the entry |
| * @param returnTombstones TODO |
| * @throws ForceReattemptException if bucket region is null |
| * @return value from the bucket region |
| * @throws PrimaryBucketException if the locally managed bucket is not primary |
| * @see #getLocally(int, Object, Object, boolean, boolean, ClientProxyMembershipID, |
| * EntryEventImpl, boolean) |
| */ |
| public RawValue getSerializedLocally(KeyInfo keyInfo, boolean doNotLockEntry, |
| ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, |
| boolean returnTombstones) throws PrimaryBucketException, ForceReattemptException { |
| final BucketRegion bucketRegion = |
| getInitializedBucketForId(keyInfo.getKey(), keyInfo.getBucketId()); |
| // check for primary (when loader is present) done deeper in the BucketRegion |
| if (logger.isDebugEnabled()) { |
| logger.debug("getSerializedLocally: key {}) bucketId={}{}{} region {}", keyInfo.getKey(), |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, |
| keyInfo.getBucketId(), bucketRegion.getName()); |
| } |
| invokeBucketReadHook(); |
| |
| try { |
| RawValue result = bucketRegion.getSerialized(keyInfo, true, doNotLockEntry, requestingClient, |
| clientEvent, returnTombstones); |
| checkIfBucketMoved(bucketRegion); |
| return result; |
| } catch (RegionDestroyedException rde) { |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during get", rde); |
| } else { |
| throw rde; |
| } |
| } catch (IOException e) { |
| throw new ForceReattemptException( |
| "Unable to serialize value", |
| e); |
| } |
| } |
| |
| /** |
| * Finds the local bucket corresponding to the given key and retrieves the key's Region.Entry |
| * |
| * @param key the key to look for |
| * @param access true if caller wants last accessed time updated |
| * @param allowTombstones whether a tombstoned entry can be returned |
| * |
| * @throws ForceReattemptException if bucket region is not present in this process |
| * @return a RegionEntry for the given key, which will be null if the key is not in the bucket |
| * @throws EntryNotFoundException TODO-javadocs |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public EntrySnapshot getEntryLocally(int bucketId, final Object key, boolean access, |
| boolean allowTombstones) throws EntryNotFoundException, PrimaryBucketException, |
| ForceReattemptException, PRLocallyDestroyedException { |
| final BucketRegion bucketRegion = getInitializedBucketForId(key, Integer.valueOf(bucketId)); |
| if (logger.isDebugEnabled()) { |
| logger.debug("getEntryLocally: key {}) bucketId={}{}{} region {}", key, |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| bucketRegion.getName()); |
| } |
| invokeBucketReadHook(); |
| EntrySnapshot res = null; |
| RegionEntry ent = null; |
| try { |
| ent = bucketRegion.entries.getEntry(key); |
| |
| if (ent == null) { |
| this.getPartitionedRegion().checkReadiness(); |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during getEntry"); |
| } |
| throw new EntryNotFoundException( |
| "entry not found"); |
| |
| } else if ((ent.isTombstone() && allowTombstones) || !ent.isDestroyedOrRemoved()) { |
| res = new EntrySnapshot(ent, bucketRegion, partitionedRegion, allowTombstones); |
| } |
| checkIfBucketMoved(bucketRegion); |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (bucketRegion.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during getEntry", rde); |
| } else { |
| throw new RegionDestroyedException( |
| "Unable to get Entry.", |
| this.partitionedRegion.toString(), rde); |
| } |
| } |
| } finally { |
| if (access) { |
| bucketRegion.updateStatsForGet(ent, res != null); |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("getEntryLocally returning {}", res); |
| } |
| return res; |
| } |
| |
| /** |
| * Handle a remote request for keys for the provided bucketId |
| * |
| * @param allowTombstones whether to return destroyed entries |
| * @return The <code>Set</code> of keys for bucketId or {@link Collections#EMPTY_SET}if no keys |
| * are present |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public Set handleRemoteGetKeys(Integer bucketId, int interestType, Object interestArg, |
| boolean allowTombstones) throws PRLocallyDestroyedException, ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleRemoteGetKeys: bucketId: {}{}{} with tombstones {}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| allowTombstones); |
| } |
| Set ret = Collections.EMPTY_SET; |
| final BucketRegion r = getInitializedBucketForId(null, bucketId); |
| try { |
| if (r != null) { |
| invokeBucketReadHook(); |
| if (!r.isEmpty() || (allowTombstones && r.getTombstoneCount() > 0)) { |
| ret = r.getKeysWithInterest(interestType, interestArg, allowTombstones); |
| } |
| checkIfBucketMoved(r); |
| } |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (r.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during remoteGetKeys", rde); |
| } else { |
| throw new RegionDestroyedException( |
| String.format("Unable to fetch keys on %s", |
| this.partitionedRegion.toString()), |
| this.partitionedRegion.getFullPath(), rde); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| /** |
| * Get the local keys for a given bucket. This operation should be as efficient as possible, by |
| * avoiding making copies of the returned set. The returned set can and should reflect concurrent |
| * changes (no ConcurrentModificationExceptions). |
| * |
| * @param allowTombstones whether to include destroyed entries in the result |
| * @return The <code>Set</code> of keys for bucketId or {@link Collections#EMPTY_SET} if no keys |
| * are present |
| * @throws PRLocallyDestroyedException if the PartitionRegion is locally destroyed |
| */ |
| public Set getKeysLocally(Integer bucketId, boolean allowTombstones) |
| throws PRLocallyDestroyedException, ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleRemoteGetKeys: bucketId: {}{}{}", this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| Set ret = Collections.EMPTY_SET; |
| final BucketRegion r = getInitializedBucketForId(null, bucketId); |
| invokeBucketReadHook(); |
| try { |
| if (r != null) { |
| // A copy is made so that the bucket is free to move |
| ret = new HashSet(r.keySet(allowTombstones)); |
| checkIfBucketMoved(r); |
| } |
| } catch (RegionDestroyedException rde) { |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new PRLocallyDestroyedException(rde); |
| } else { |
| this.getPartitionedRegion().checkReadiness(); |
| if (r.isBucketDestroyed()) { |
| // bucket moved by rebalance |
| throw new ForceReattemptException("Bucket removed during keySet", rde); |
| } else { |
| throw new RegionDestroyedException( |
| String.format("Unable to fetch keys on %s", |
| this.partitionedRegion), |
| this.partitionedRegion.getFullPath(), rde); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| |
| @Override |
| public String toString() { |
| if (this.partitionedRegion != null) { |
| String rName = this.partitionedRegion.getFullPath(); |
| return this.partitionedRegion.getMyId() + "@" + getClass().getName() + "@" |
| + System.identityHashCode(this) + " name: " + rName + " bucket count: " |
| + this.localBucket2RegionMap.size(); |
| } |
| return null; |
| } |
| |
| /** |
| * Creates the entry with the given key locally. <br> |
| * Steps: <br> |
| * 1) It finds out the bucket region for the bucket id. <br> |
| * 2) If from step 1 it gets null, that means the bucket is remapped. <br> |
| * 3) If it finds the bucket region from step 1, it tries to creates the entry on the region. <br> |
| * 4) If entry already exists, for the key, step 3 would throw EntryExistsException or else it |
| * will create an entry <br> |
| * 5) updateBucket2Size if bucket is on more than 1 node or else bucket listners would take care |
| * of size update. <br> |
| * |
| * @param bucketRegion the bucket to do the create in |
| * @param event the particulars of the operation |
| * @param ifNew whether a new entry can be created |
| * @param ifOld whether an existing entry can be updated |
| * @param lastModified timestamp |
| * @throws ForceReattemptException if bucket region is null |
| * @throws EntryExistsException if an entry with this key already exists |
| */ |
| public boolean createLocally(final BucketRegion bucketRegion, final EntryEventImpl event, |
| boolean ifNew, boolean ifOld, boolean requireOldValue, final long lastModified) |
| throws ForceReattemptException { |
| boolean result = false; |
| try { |
| event.setRegion(bucketRegion); // convert to the bucket region |
| if (event.isOriginRemote()) { |
| result = bucketRegion.basicUpdate(event, ifNew, ifOld, lastModified, true); |
| } else { |
| // Skip validating again |
| result = bucketRegion.virtualPut(event, ifNew, ifOld, null, // expectedOldValue |
| requireOldValue, lastModified, false); |
| } |
| // if (shouldThrowExists && !posDup) { |
| // throw new EntryExistsException(event.getKey().toString()); |
| // } |
| // bug 34361: don't send a reply if bucket was destroyed during the op |
| bucketRegion.checkReadiness(); |
| } catch (RegionDestroyedException rde) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), rde); |
| } |
| |
| |
| return result; |
| |
| // this is now done elsewhere |
| // event.setRegion(this.partitionedRegion); |
| // this.partitionedRegion.notifyBridgeClients(EnumListenerEvent.AFTER_CREATE, |
| // event); |
| // this.partitionedRegion.notifyGatewayHub(EnumListenerEvent.AFTER_CREATE, |
| // event); |
| } |
| |
| /** |
| * Handles the local request to invalidate the key from this region. <br> |
| * Steps: <br> |
| * 1) Locates the bucket region. If it doesnt find the actual bucket, it means that this bucket is |
| * remapped to some other node and remappedBucketException is thrown <br> |
| * 2) get the existing value for the key from bucket region <br> |
| * 3) if step 2 returns null, throw EntryNotFoundException <br> |
| * 4) if step 2 returns non-null value, perform invalidate on the bucket region and use value from |
| * step 2 in step 5 <br> |
| * 5) updateBucket2Size if bucket is on more than 1 node or else bucket listners would take care |
| * of size update. <br> |
| * |
| * @param bucketId the bucketId for the key |
| * @param event the event that prompted this action |
| * @throws ForceReattemptException if bucket region is null |
| * @throws EntryNotFoundException if entry is not found for the key |
| * @throws PrimaryBucketException if the locally managed buffer is not primary |
| */ |
| protected void invalidateLocally(Integer bucketId, EntryEventImpl event) |
| throws EntryNotFoundException, PrimaryBucketException, ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("invalidateLocally: bucketId={}{}{} for key={}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| event.getKey()); |
| } |
| final BucketRegion bucketRegion = getInitializedBucketForId(event.getKey(), bucketId); |
| try { |
| event.setRegion(bucketRegion); |
| event.setOldValueFromRegion(); |
| bucketRegion.basicInvalidate(event); |
| |
| // bug 34361: don't send a reply if bucket was destroyed during the op |
| bucketRegion.checkReadiness(); |
| |
| // this is now done elsewhere |
| // event.setRegion(this.partitionedRegion); |
| // this.partitionedRegion.notifyBridgeClients( |
| // EnumListenerEvent.AFTER_INVALIDATE, event); |
| } catch (RegionDestroyedException rde) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), rde); |
| } |
| } |
| |
| /** |
| * This method iterates over localBucket2RegionMap and returns collective size of the bucket |
| * regions. <br> |
| * Steps: <br> |
| * 1) Check if localBucket2RegionMap is empty. If it is, return 0.<br> |
| * 2) If localBucket2RegionMap is not empty, get keyset of all these bucket IDs. <br> |
| * 3) Get the nodeList for the bucket ID from Bucket2Node region. <br> |
| * 4) If first node from the node list is current node, increment the size counter. <br> |
| * 5) Step#4 takes care of the problem of recounting the size of redundant buckets. <br> |
| * |
| * |
| * @return the map of bucketIds and their associated sizes, or {@link Collections#EMPTY_MAP}when |
| * the size is zero |
| */ |
| public Map<Integer, Integer> getSizeLocally() { |
| return getSizeLocally(false); |
| } |
| |
| /** |
| * @see #getSizeLocally() |
| * @param primaryOnly true if sizes for primary buckets are desired |
| * @return the map of bucketIds and their associated sizes |
| */ |
| public Map<Integer, Integer> getSizeLocally(boolean primaryOnly) { |
| HashMap<Integer, Integer> mySizeMap; |
| if (this.localBucket2RegionMap.isEmpty()) { |
| return Collections.EMPTY_MAP; |
| } |
| mySizeMap = new HashMap<>(this.localBucket2RegionMap.size()); |
| Map.Entry<Integer, BucketRegion> me; |
| BucketRegion r; |
| for (Iterator<Map.Entry<Integer, BucketRegion>> itr = |
| this.localBucket2RegionMap.entrySet().iterator(); itr.hasNext();) { |
| me = itr.next(); |
| try { |
| r = me.getValue(); |
| if (null != r) { // fix for bug#35497 |
| r.waitForData(); |
| if (primaryOnly) { |
| if (r.getBucketAdvisor().isPrimary()) { |
| mySizeMap.put(me.getKey(), r.size()); |
| } |
| } else { |
| mySizeMap.put(me.getKey(), r.size()); |
| } |
| } |
| } catch (CacheRuntimeException skip) { |
| } |
| } // for |
| if (logger.isDebugEnabled()) { |
| logger.debug("getSizeLocally: returns bucketSizes={}", mySizeMap); |
| } |
| |
| return mySizeMap; |
| } |
| |
| /** |
| * This method iterates over localBucket2RegionMap and returns collective size of the primary |
| * bucket regions. |
| * |
| * @return the map of bucketIds and their associated sizes, or {@link Collections#EMPTY_MAP}when |
| * the size is zero |
| */ |
| public Map<Integer, SizeEntry> getSizeForLocalBuckets() { |
| return getSizeLocallyForBuckets(this.localBucket2RegionMap.keySet()); |
| } |
| |
| public Map<Integer, SizeEntry> getSizeForLocalPrimaryBuckets() { |
| return getSizeLocallyForBuckets(getAllLocalPrimaryBucketIds()); |
| } |
| |
| public Map<Integer, SizeEntry> getSizeEstimateForLocalPrimaryBuckets() { |
| return getSizeEstimateLocallyForBuckets(getAllLocalPrimaryBucketIds()); |
| } |
| |
| /** |
| * This calculates size of all the primary bucket regions for the list of bucketIds. |
| * |
| * @return the size of all the primary bucket regions for the list of bucketIds. |
| */ |
| public Map<Integer, SizeEntry> getSizeLocallyForBuckets(Collection<Integer> bucketIds) { |
| return getSizeLocallyForPrimary(bucketIds, false); |
| } |
| |
| public Map<Integer, SizeEntry> getSizeEstimateLocallyForBuckets(Collection<Integer> bucketIds) { |
| return getSizeLocallyForPrimary(bucketIds, true); |
| } |
| |
| private Map<Integer, SizeEntry> getSizeLocallyForPrimary(Collection<Integer> bucketIds, |
| boolean estimate) { |
| Map<Integer, SizeEntry> mySizeMap; |
| if (this.localBucket2RegionMap.isEmpty()) { |
| return Collections.emptyMap(); |
| } |
| mySizeMap = new HashMap<Integer, SizeEntry>(this.localBucket2RegionMap.size()); |
| BucketRegion r = null; |
| for (Integer bucketId : bucketIds) { |
| try { |
| r = getInitializedBucketForId(null, bucketId); |
| mySizeMap.put(bucketId, new SizeEntry(estimate ? r.sizeEstimate() : r.size(), |
| r.getBucketAdvisor().isPrimary())); |
| // if (getLogWriter().fineEnabled() && r.getBucketAdvisor().isPrimary()) { |
| // r.verifyTombstoneCount(); |
| // } |
| } catch (PrimaryBucketException skip) { |
| // sizeEstimate() will throw this exception as it will not try to read from HDFS on a |
| // secondary bucket, |
| // this bucket will be retried in PartitionedRegion.getSizeForHDFS() fixes bug 49033 |
| continue; |
| } catch (ForceReattemptException skip) { |
| continue; |
| } catch (RegionDestroyedException skip) { |
| continue; |
| } |
| } // while |
| return mySizeMap; |
| } |
| |
| public int getSizeOfLocalPrimaryBuckets() { |
| int sizeOfLocalPrimaries = 0; |
| Set<BucketRegion> primaryBuckets = getAllLocalPrimaryBucketRegions(); |
| for (BucketRegion br : primaryBuckets) { |
| sizeOfLocalPrimaries += br.size(); |
| } |
| return sizeOfLocalPrimaries; |
| } |
| |
| public int getSizeOfLocalBuckets() { |
| int sizeOfLocal = 0; |
| Set<BucketRegion> allLocalBuckets = getAllLocalBucketRegions(); |
| for (BucketRegion br : allLocalBuckets) { |
| sizeOfLocal += br.size(); |
| } |
| return sizeOfLocal; |
| } |
| |
| /** |
| * Interface for visiting buckets |
| */ |
| // public visibility for tests |
| public interface BucketVisitor { |
| void visit(Integer bucketId, Region r); |
| } |
| |
| // public visibility for tests |
| public void visitBuckets(final BucketVisitor bv) { |
| if (this.localBucket2RegionMap.size() > 0) { |
| Map.Entry me; |
| for (Iterator i = this.localBucket2RegionMap.entrySet().iterator(); i.hasNext();) { |
| me = (Map.Entry) i.next(); |
| Region r = (Region) me.getValue(); |
| // ConcurrentHashMap entrySet iterator does not guarantee an atomic snapshot |
| // of an entry. Specifically, getValue() performs a CHM.get() and as a result |
| // may return null if the entry was removed, but yet always returns a key |
| // under the same circumstances... Ouch. Due to the fact that entries are |
| // removed as part of data store performs cleanup, a null check is required |
| // to protect BucketVisitors, in the event iteration occurs during data |
| // store cleanup. Bug fix 38680. |
| if (r != null) { |
| bv.visit((Integer) me.getKey(), r); |
| } |
| } |
| } |
| } |
| |
| private void visitBucket(final Integer bucketId, final LocalRegion bucket, |
| final EntryVisitor ev) { |
| try { |
| for (Iterator ei = bucket.entrySet().iterator(); ei.hasNext();) { |
| ev.visit(bucketId, (Region.Entry) ei.next()); |
| } |
| } catch (CacheRuntimeException ignore) { |
| } |
| ev.finishedVisiting(); |
| } |
| |
| /** |
| * Test class and method for visiting Entries NOTE: This class will only give a partial view if a |
| * visited bucket is moved by a rebalance while a visit is in progress on that bucket. |
| */ |
| protected abstract static class EntryVisitor { |
| public abstract void visit(Integer bucketId, Region.Entry re); |
| |
| public abstract void finishedVisiting(); |
| } |
| |
| private void visitEntries(final EntryVisitor knock) { |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region buk) { |
| try { |
| ((LocalRegion) buk).waitForData(); |
| for (Iterator ei = buk.entrySet().iterator(); ei.hasNext();) { |
| knock.visit(bucketId, (Region.Entry) ei.next()); |
| } |
| } catch (CacheRuntimeException ignore) { |
| } |
| knock.finishedVisiting(); |
| } |
| }); |
| } |
| |
| /** |
| * <i>Test Method</i> Return the list of PartitionedRegion entries contained in this data store |
| * |
| * @return a List of all entries gathered across all buckets in this data store |
| */ |
| public List getEntries() { |
| final ArrayList al = new ArrayList(); |
| visitEntries(new EntryVisitor() { |
| @Override |
| public void visit(Integer bucketId, Entry re) { |
| if (re.getValue() != Token.TOMBSTONE) { |
| al.add(re); |
| } |
| } |
| |
| @Override |
| public void finishedVisiting() {} |
| }); |
| return al; |
| } |
| |
| /** |
| * <i>Test Method</i> Dump all the entries in all the buckets to the logger, validate that the |
| * bucket-to-node meta region contains all bhe buckets managed by this data store |
| * |
| * @param validateOnly only perform bucket-to-node validation |
| */ |
| public void dumpEntries(final boolean validateOnly) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("[dumpEntries] dumping {}", this); |
| } |
| final StringBuffer buf; |
| if (validateOnly) { |
| buf = null; |
| |
| // If we're doing a validation, make sure the region is initialized |
| // before insisting that its metadata be correct :-) |
| this.partitionedRegion.waitForData(); |
| } else { |
| dumpBackingMaps(); |
| } |
| } |
| |
| public void dumpBackingMaps() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket maps in {}\n", this); |
| } |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region buk) { |
| try { |
| LocalRegion lbuk = (LocalRegion) buk; |
| lbuk.waitForData(); |
| int size = lbuk.size(); |
| int keySetSize = (new HashSet(lbuk.keySet())).size(); |
| if (size != keySetSize) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Size is not consistent with keySet size! size={} but keySet size={} region={}", |
| size, keySetSize, lbuk); |
| } |
| } |
| lbuk.dumpBackingMap(); |
| } catch (CacheRuntimeException ignore) { |
| } |
| } |
| }); |
| } |
| |
| |
| /** |
| * <i>Test Method</i> Dump all the bucket names in this data store to the logger |
| * |
| */ |
| public void dumpBuckets() { |
| final StringBuffer buf = new StringBuffer("Buckets in ").append(this).append("\n"); |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| buf.append("bucketId: ").append(partitionedRegion.bucketStringForLogs(bucketId.intValue())) |
| .append(" bucketName: ").append(r).append("\n"); |
| } |
| }); |
| logger.debug(buf.toString()); |
| } |
| |
| /** |
| * <i>Test Method</i> Return the list of all the bucket names in this data store. |
| * |
| */ |
| public List getLocalBucketsListTestOnly() { |
| final List bucketList = new ArrayList(); |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| bucketList.add(bucketId); |
| } |
| }); |
| return bucketList; |
| } |
| |
| /** |
| * <i>Test Method</i> Return the list of all the primary bucket ids in this data store. |
| * |
| */ |
| public List getLocalPrimaryBucketsListTestOnly() { |
| final List primaryBucketList = new ArrayList(); |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| BucketRegion br = (BucketRegion) r; |
| BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor(); |
| if (ba.isPrimary()) { |
| primaryBucketList.add(bucketId); |
| } |
| } |
| }); |
| return primaryBucketList; |
| } |
| |
| /** |
| * <i>Test Method</i> Return the list of all the non primary bucket ids in this data store. |
| * |
| */ |
| public List getLocalNonPrimaryBucketsListTestOnly() { |
| final List nonPrimaryBucketList = new ArrayList(); |
| visitBuckets(new BucketVisitor() { |
| @Override |
| public void visit(Integer bucketId, Region r) { |
| BucketRegion br = (BucketRegion) r; |
| BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor(); |
| if (!ba.isPrimary()) { |
| nonPrimaryBucketList.add(bucketId); |
| } |
| } |
| }); |
| return nonPrimaryBucketList; |
| } |
| |
| /** |
| * <i>Test Method</i> Dump the entries in this given bucket to the logger |
| * |
| * @param bucketId the id of the bucket to dump |
| * @param bucket the Region containing the bucket data |
| */ |
| public void dumpBucket(int bucketId, final LocalRegion bucket) { |
| Integer buckId = Integer.valueOf(bucketId); |
| visitBucket(buckId, bucket, new EntryVisitor() { |
| final StringBuffer buf = new StringBuffer("Entries in bucket ").append(bucket).append("\n"); |
| |
| @Override |
| public void visit(Integer bid, Entry re) { |
| buf.append(re.getKey()).append(" => ").append(re.getValue()).append("\n"); |
| } |
| |
| @Override |
| public void finishedVisiting() { |
| logger.debug(buf.toString()); |
| } |
| }); |
| |
| } |
| |
| |
| /** |
| * Fetch the entries for the given bucket |
| * |
| * @param bucketId the id of the bucket |
| * @return a Map containing all the entries |
| */ |
| public BucketRegion handleRemoteGetEntries(int bucketId) throws ForceReattemptException { |
| if (logger.isDebugEnabled()) { |
| logger.debug("handleRemoteGetEntries: bucketId: {}{}{}", this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| BucketRegion br = getInitializedBucketForId(null, Integer.valueOf(bucketId)); |
| // NOTE: this is a test method that does not take a snapshot so it does not |
| // give a stable set of entries if the bucket is moved during a rebalance |
| return br; |
| } |
| |
| @Override |
| public CachePerfStats getCachePerfStats() { |
| return this.bucketStats; |
| } |
| |
| /** |
| * Return a set of local buckets. Iterators may include entries with null values (but non-null |
| * keys). |
| * |
| * @return an unmodifiable set of Map.Entry objects |
| */ |
| public Set<Map.Entry<Integer, BucketRegion>> getAllLocalBuckets() { |
| return Collections.unmodifiableSet(localBucket2RegionMap.entrySet()); |
| } |
| |
| public Set<Integer> getAllLocalBucketIds() { |
| return Collections.unmodifiableSet(localBucket2RegionMap.keySet()); |
| } |
| |
| /** |
| * Returns a set of local buckets. |
| * |
| * @return an unmodifiable set of BucketRegion |
| */ |
| public Set<BucketRegion> getAllLocalBucketRegions() { |
| Set<BucketRegion> retVal = new HashSet<BucketRegion>(); |
| for (BucketRegion br : localBucket2RegionMap.values()) { |
| retVal.add(br); |
| } |
| return Collections.unmodifiableSet(retVal); |
| } |
| |
| public boolean isLocalBucketRegionPresent() { |
| return localBucket2RegionMap.size() > 0; |
| } |
| |
| public Set<BucketRegion> getAllLocalPrimaryBucketRegions() { |
| Set<BucketRegion> retVal = new HashSet<BucketRegion>(); |
| for (BucketRegion br : localBucket2RegionMap.values()) { |
| if (br.getBucketAdvisor().isPrimary()) { |
| retVal.add(br); |
| } |
| } |
| return Collections.unmodifiableSet(retVal); |
| } |
| |
| public Set<Integer> getAllLocalPrimaryBucketIds() { |
| Set<Integer> bucketIds = new HashSet<Integer>(); |
| for (Map.Entry<Integer, BucketRegion> bucketEntry : getAllLocalBuckets()) { |
| BucketRegion bucket = bucketEntry.getValue(); |
| if (bucket.getBucketAdvisor().isPrimary()) { |
| bucketIds.add(Integer.valueOf(bucket.getId())); |
| } |
| } |
| return bucketIds; |
| } |
| |
| public Set<Integer> getAllLocalPrimaryBucketIdsBetweenProvidedIds(int low, int high) { |
| Set<Integer> bucketIds = new HashSet<Integer>(); |
| for (Map.Entry<Integer, BucketRegion> bucketEntry : getAllLocalBuckets()) { |
| BucketRegion bucket = bucketEntry.getValue(); |
| if (bucket.getBucketAdvisor().isPrimary() && (bucket.getId() >= low) |
| && (bucket.getId() < high)) { |
| bucketIds.add(Integer.valueOf(bucket.getId())); |
| } |
| } |
| return bucketIds; |
| } |
| |
| /** a fast estimate of total bucket size */ |
| public long getEstimatedLocalBucketSize(boolean primaryOnly) { |
| long size = 0; |
| for (BucketRegion br : localBucket2RegionMap.values()) { |
| if (!primaryOnly || br.getBucketAdvisor().isPrimary()) { |
| size += br.getEstimatedLocalSize(); |
| } |
| } |
| return size; |
| } |
| |
| /** a fast estimate of total bucket size */ |
| public long getEstimatedLocalBucketSize(Set<Integer> bucketIds) { |
| long size = 0; |
| for (Integer bid : bucketIds) { |
| BucketRegion br = localBucket2RegionMap.get(bid); |
| if (br != null) { |
| size += br.getEstimatedLocalSize(); |
| } |
| } |
| return size; |
| } |
| |
| public Object getLocalValueInVM(final Object key, int bucketId) { |
| try { |
| BucketRegion br = getInitializedBucketForId(key, Integer.valueOf(bucketId)); |
| return br.getValueInVM(key); |
| } catch (ForceReattemptException e) { |
| e.checkKey(key); |
| return null; |
| } |
| } |
| |
| /** |
| * This method is intended for testing purposes only. DO NOT use in product code. |
| */ |
| public Object getLocalValueOnDisk(final Object key, int bucketId) { |
| try { |
| BucketRegion br = getInitializedBucketForId(key, Integer.valueOf(bucketId)); |
| return br.getValueOnDisk(key); |
| } catch (ForceReattemptException e) { |
| e.checkKey(key); |
| return null; |
| } |
| } |
| |
| public Object getLocalValueOnDiskOrBuffer(final Object key, int bucketId) { |
| try { |
| BucketRegion br = getInitializedBucketForId(key, Integer.valueOf(bucketId)); |
| return br.getValueOnDiskOrBuffer(key); |
| } catch (ForceReattemptException e) { |
| e.checkKey(key); |
| return null; |
| } |
| } |
| |
| /** |
| * Checks for RegionDestroyedException in case of remoteEvent & localDestroy OR isClosed throws a |
| * ForceReattemptException |
| * |
| * @param br the bucket that we are trying to operate on |
| * @param isOriginRemote true the event we are processing has a remote origin. |
| * |
| */ |
| public void checkRegionDestroyedOnBucket(final BucketRegion br, final boolean isOriginRemote, |
| RegionDestroyedException rde) throws ForceReattemptException { |
| if (isOriginRemote) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Operation failed due to RegionDestroyedException", rde); |
| } |
| if (this.partitionedRegion.isLocallyDestroyed || this.partitionedRegion.isClosed) { |
| throw new ForceReattemptException( |
| "Operation failed due to RegionDestroyedException :" + rde, rde); |
| } else { |
| this.partitionedRegion.checkReadiness(); |
| if (br.isBucketDestroyed()) { |
| throw new ForceReattemptException("Bucket moved", rde); |
| } |
| } |
| } else { |
| // this can now happen due to a rebalance removing a bucket |
| this.partitionedRegion.checkReadiness(); |
| if (br.isBucketDestroyed()) { |
| throw new ForceReattemptException("Bucket moved", rde); |
| } |
| } |
| throw new InternalGemFireError( |
| "Got region destroyed message, but neither bucket nor PR was destroyed", rde); |
| } |
| |
| /** |
| * Create a redundancy bucket on this member |
| * |
| * @param bucketId the id of the bucket to create |
| * @param moveSource the member id of where the bucket is being copied from, if this is a bucket |
| * move. Setting this field means that we are allowed to go over redundancy. |
| * @param forceCreation force the bucket creation, even if it would provide better balance if the |
| * bucket was placed on another node. |
| * @param replaceOffineData create the bucket, even if redundancy is satisfied when considering |
| * offline members. |
| * @param isRebalance true if this is a rebalance |
| * @param creationRequestor the id of the member that is atomically creating this bucket on all |
| * members, if this is an atomic bucket creation. |
| * @return the status of the bucket creation. |
| */ |
| public CreateBucketResult grabBucket(final int bucketId, |
| final InternalDistributedMember moveSource, final boolean forceCreation, |
| final boolean replaceOffineData, final boolean isRebalance, |
| final InternalDistributedMember creationRequestor, final boolean isDiskRecovery) { |
| CreateBucketResult grab = grabFreeBucket(bucketId, partitionedRegion.getMyId(), moveSource, |
| forceCreation, isRebalance, true, replaceOffineData, creationRequestor); |
| if (!grab.nowExists()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed grab for bucketId = {}{}{}", this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| // Assert.assertTrue(nList.contains(partitionedRegion.getNode().getMemberId()) , |
| // " grab returned false and b2n does not contains this member."); |
| } else { |
| // try grabbing buckets for all the PR which are colocated with it |
| List colocatedWithList = ColocationHelper.getColocatedChildRegions(partitionedRegion); |
| Iterator itr = colocatedWithList.iterator(); |
| while (itr.hasNext()) { |
| PartitionedRegion pr = (PartitionedRegion) itr.next(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("For bucketId = {} isInitialized {} iscolocation complete {} pr name {}", |
| bucketId, pr.isInitialized(), pr.getDataStore().isColocationComplete(bucketId), |
| pr.getFullPath()); |
| } |
| if ((isDiskRecovery || pr.isInitialized()) |
| && (pr.getDataStore().isColocationComplete(bucketId))) { |
| try { |
| grab = pr.getDataStore().grabFreeBucketRecursively(bucketId, pr, moveSource, |
| forceCreation, replaceOffineData, isRebalance, creationRequestor, isDiskRecovery); |
| } catch (RegionDestroyedException rde) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to grab, colocated region for bucketId = {}{}{} is destroyed.", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, |
| bucketId); |
| } |
| } |
| if (!grab.nowExists()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed grab for bucketId = {}{}{}", this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId); |
| } |
| // Should Throw Exception-- As discussed in weekly call |
| // " grab returned false and b2n does not contains this member."); |
| } |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Grab attempt on bucketId={}{}{}; grab:{}", this.partitionedRegion.getPRId(), |
| PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, grab); |
| } |
| return grab; |
| } |
| |
| /** |
| * Checks consistency of bucket and meta data before attempting to grab the bucket. |
| * |
| * @return false if bucket should not be grabbed, else true. TODO prpersist - move this to |
| * BucketRegion |
| */ |
| public boolean verifyBucketBeforeGrabbing(final int buckId) { |
| // Consistency checks |
| final boolean isNodeInMetaData = partitionedRegion.getRegionAdvisor().isBucketLocal(buckId); |
| if (isManagingBucket(buckId)) { |
| if (!isNodeInMetaData) { |
| partitionedRegion.checkReadiness(); |
| Set owners = partitionedRegion.getRegionAdvisor().getBucketOwners(buckId); |
| logger.info("Verified nodelist for bucketId={} is {}", |
| partitionedRegion.bucketStringForLogs(buckId), |
| PartitionedRegionHelper.printCollection(owners)); |
| Assert.assertTrue(false, |
| " This node " + partitionedRegion.getNode() + " is managing the bucket with bucketId= " |
| + partitionedRegion.bucketStringForLogs(buckId) + " but doesn't have an entry in " |
| + "b2n region for PR " + partitionedRegion); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("BR#verifyBucketBeforeGrabbing We already host {}{}{}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, buckId); |
| } |
| // It's ok to return true here, we do another check later |
| // to make sure we don't host the bucket. |
| return true; |
| } else { |
| if (partitionedRegion.isDestroyed() || partitionedRegion.getGemFireCache().isClosed()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("BR#verifyBucketBeforeGrabbing: Exiting early due to Region destruction"); |
| } |
| return false; |
| } |
| if (isNodeInMetaData) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "PartitionedRegionDataStore: grabBackupBuckets: This node is not managing the bucket with Id = {} but has an entry in the b2n region for PartitionedRegion {} because destruction of this PartitionedRegion is initiated on other node", |
| buckId, partitionedRegion); |
| } |
| } |
| } // End consistency checks |
| return true; |
| } |
| |
| public void executeOnDataStore(final Set localKeys, final Function function, final Object object, |
| final int prid, final int[] bucketArray, final boolean isReExecute, |
| final PartitionedRegionFunctionStreamingMessage msg, long time, ServerConnection servConn, |
| int transactionID) { |
| |
| if (!areAllBucketsHosted(bucketArray)) { |
| throw new BucketMovedException( |
| "Bucket migrated to another node. Please retry."); |
| } |
| final DistributionManager dm = this.partitionedRegion.getDistributionManager(); |
| |
| ResultSender resultSender = new PartitionedRegionFunctionResultSender(dm, |
| this.partitionedRegion, time, msg, function, bucketArray); |
| |
| final RegionFunctionContextImpl prContext = |
| new RegionFunctionContextImpl(getPartitionedRegion().getCache(), function.getId(), |
| this.partitionedRegion, object, localKeys, ColocationHelper |
| .constructAndGetAllColocatedLocalDataSet(this.partitionedRegion, bucketArray), |
| bucketArray, resultSender, isReExecute); |
| |
| FunctionStats stats = FunctionStatsManager.getFunctionStats(function.getId(), dm.getSystem()); |
| long start = stats.startFunctionExecution(function.hasResult()); |
| try { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Executing Function: {} on Remote Node with context: ", function.getId(), |
| prContext); |
| } |
| function.execute(prContext); |
| stats.endFunctionExecution(start, function.hasResult()); |
| } catch (FunctionException functionException) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("FunctionException occurred on remote node while executing Function: {}", |
| function.getId(), functionException); |
| } |
| stats.endFunctionExecutionWithException(start, function.hasResult()); |
| if (functionException.getCause() instanceof QueryInvalidException) { |
| // Handle this exception differently since it can contain |
| // non-serializable objects. |
| // java.io.NotSerializableException: antlr.CommonToken |
| // create a new FunctionException on the original one's message (not cause). |
| throw new FunctionException(functionException.getLocalizedMessage()); |
| } |
| throw functionException; |
| } |
| } |
| |
| public boolean areAllBucketsHosted(final int[] bucketArray) { |
| // boolean arr[] = new boolean[]{false, true, false, true , false , false , false , false }; |
| // Random random = new Random(); |
| // int num = random.nextInt(7); |
| // System.out.println("PRDS.verifyBuckets returning " + arr[num]); |
| // return arr[num]; |
| int bucketlength = BucketSetHelper.length(bucketArray); |
| if (bucketlength == 0) { |
| return true; |
| } |
| int bucket; |
| for (int i = 0; i < bucketlength; i++) { |
| bucket = BucketSetHelper.get(bucketArray, i); |
| if (!this.partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucket).isHosting()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| |
| /* |
| * @return true if there is a local bucket for the event and it has seen the event |
| */ |
| public boolean hasSeenEvent(EntryEventImpl event) { |
| BucketRegion bucket = getLocalBucketById(event.getKeyInfo().getBucketId()); |
| if (bucket == null) { |
| return false; |
| } else { |
| return bucket.hasSeenEvent(event); |
| } |
| } |
| |
| public void handleInterestEvent(InterestRegistrationEvent event) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("PartitionedRegionDataStore for {} handling {}", |
| this.partitionedRegion.getFullPath(), event); |
| } |
| synchronized (keysOfInterestLock) { |
| boolean isRegister = event.isRegister(); |
| for (Iterator i = event.getKeysOfInterest().iterator(); i.hasNext();) { |
| Object key = i.next(); |
| // Get the reference counter for this key |
| AtomicInteger references = (AtomicInteger) this.keysOfInterest.get(key); |
| int newNumberOfReferences = 0; |
| // If this is a registration event, add interest for this key |
| if (isRegister) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("PartitionedRegionDataStore for {} adding interest for: ", |
| this.partitionedRegion.getFullPath(), key); |
| } |
| if (references == null) { |
| references = new AtomicInteger(); |
| this.keysOfInterest.put(key, references); |
| } |
| newNumberOfReferences = references.incrementAndGet(); |
| } else { |
| // If this is an unregistration event, remove interest for this key |
| if (logger.isDebugEnabled()) { |
| logger.debug("PartitionedRegionDataStore for {} removing interest for: ", |
| this.partitionedRegion.getFullPath(), key); |
| } |
| if (references != null) { |
| newNumberOfReferences = references.decrementAndGet(); |
| // If there are no more references, remove this key |
| if (newNumberOfReferences == 0) { |
| this.keysOfInterest.remove(key); |
| } |
| } |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "PartitionedRegionDataStore for {} now has {} client(s) interested in key {}", |
| this.partitionedRegion.getFullPath(), newNumberOfReferences, key); |
| } |
| } |
| } |
| } |
| |
| class BucketAttributesFactory extends AttributesFactory { |
| @Override |
| protected void setBucketRegion(boolean b) { |
| super.setBucketRegion(b); |
| } |
| } |
| |
/**
 * Outcome of an attempt to create a bucket on this node. Each constant records whether the
 * bucket exists on this node after the attempt.
 */
public enum CreateBucketResult {
  /** Indicates that the bucket was successfully created on this node */
  CREATED(true),

  /** Indicates that the bucket creation failed */
  FAILED(false),

  /** Indicates that the bucket already exists on this node */
  ALREADY_EXISTS(true),

  /** Indicates that redundancy was already satisfied */
  REDUNDANCY_ALREADY_SATISFIED(false);

  /** Whether the bucket is present on this node after the attempt. */
  private final boolean bucketPresent;

  CreateBucketResult(boolean bucketPresent) {
    this.bucketPresent = bucketPresent;
  }

  /**
   * @return true if the bucket exists on this node after the creation attempt
   */
  boolean nowExists() {
    return bucketPresent;
  }
}
| |
| public void updateEntryVersionLocally(Integer bucketId, EntryEventImpl event) |
| throws ForceReattemptException { |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("updateEntryVersionLocally: bucketId={}{}{} for key={}", |
| this.partitionedRegion.getPRId(), PartitionedRegion.BUCKET_ID_SEPARATOR, bucketId, |
| event.getKey()); |
| } |
| final BucketRegion bucketRegion = getInitializedBucketForId(event.getKey(), bucketId); |
| try { |
| event.setRegion(bucketRegion); |
| bucketRegion.basicUpdateEntryVersion(event); |
| |
| // bug 34361: don't send a reply if bucket was destroyed during the op |
| bucketRegion.checkReadiness(); |
| |
| } catch (RegionDestroyedException rde) { |
| checkRegionDestroyedOnBucket(bucketRegion, event.isOriginRemote(), rde); |
| } |
| } |
| } |