blob: 5d49db6e2865a9c75147f3d1d1c72ded5b91441d [file] [log] [blame]
/*
* =========================================================================
* (c)Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
* =========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.InternalGemFireException;
import com.gemstone.gemfire.cache.InterestPolicy;
import com.gemstone.gemfire.cache.LowMemoryException;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.ProfileListener;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.cache.BucketAdvisor;
import com.gemstone.gemfire.internal.cache.BucketAdvisor.BucketProfile;
import com.gemstone.gemfire.internal.cache.BucketAdvisor.ServerBucketProfile;
import com.gemstone.gemfire.internal.cache.BucketPersistenceAdvisor;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.BucketServerLocation66;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.FixedPartitionAttributesImpl;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.InternalRegionArguments;
import com.gemstone.gemfire.internal.cache.Node;
import com.gemstone.gemfire.internal.cache.PRHARedundancyProvider.DataStoreBuckets;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionStats;
import com.gemstone.gemfire.internal.cache.ProxyBucketRegion;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds;
import com.gemstone.gemfire.internal.cache.control.ResourceAdvisor;
import com.gemstone.gemfire.internal.cache.persistence.PersistenceAdvisor;
import com.gemstone.gemfire.internal.cache.persistence.PersistentStateListener;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
public class RegionAdvisor extends CacheDistributionAdvisor
{
private static final Logger logger = LogService.getLogger();
/**
 * Number of threads allowed to concurrently volunteer for bucket primary.
 * Read once from the system property
 * {@code gemfire.RegionAdvisor.volunteeringThreadCount}; defaults to 1.
 */
public static final short VOLUNTEERING_THREAD_COUNT = Integer.getInteger(
"gemfire.RegionAdvisor.volunteeringThreadCount", 1).shortValue();
/**
 * Queue for volunteering for primary bucket. Each BucketAdvisor for this PR
 * uses this queue (it is handed out by {@link #getVolunteeringQueue()}).
 * The thread that uses this queue is a waiting pool thread.
 * <p>
 * NOTE(review): this comment used to call the queue non-thread safe and
 * required callers to synchronize on it, yet the instance created here is a
 * ConcurrentLinkedQueue. Presumably the synchronization requirement still
 * matters for compound operations performed by the BucketAdvisors -- confirm
 * against the callers before relaxing it.
 */
private final Queue volunteeringQueue = new ConcurrentLinkedQueue();
/**
 * Semaphore with {@link #VOLUNTEERING_THREAD_COUNT} number of permits to
 * control number of threads volunteering for bucket primaries.
 */
private final Semaphore volunteeringSemaphore = new Semaphore(VOLUNTEERING_THREAD_COUNT);
// Profile count observed the last time numDataStores was recomputed; see getNumDataStores().
private volatile int lastActiveProfiles = 0;
// Cached size of adviseDataStore(); refreshed by getNumDataStores() when the profile count changes.
private volatile int numDataStores = 0;
// One proxy per bucket, indexed by bucket id; null until initializeRegionAdvisor() has run.
protected volatile ProxyBucketRegion[] buckets;
// Holds QueuedBucketProfile entries while non-null; set to null by
// processProfilesQueuedDuringInitialization(), after which changes are applied directly.
private Queue preInitQueue;
// Monitor guarding reads and writes of the preInitQueue reference.
private final Object preInitQueueMonitor = new Object();
/**
 * Used to generate a redundancy loss alert only once even if more than one
 * bucket or PR has lost redundancy. lowRedundancyFlags[0] is true if any
 * bucket in this partitioned region has lower than configured redundancy.
 * lowRedundancyFlags[1] is true if a warning has been generated for the
 * current actual redundancy of this partitioned region. The caller must
 * synchronize on lowRedundancyFlags in order to maintain atomicity of
 * overall redundancy status and alert.
 */
private final boolean[] lowRedundancyFlags = new boolean[2];
// Bucket id -> set of ServerBucketProfile; read and written through
// getClientBucketProfiles / setClientBucketProfiles.
private ConcurrentHashMap<Integer, Set<ServerBucketProfile>> clientBucketProfilesMap;
/**
 * Exposes the two-element low redundancy flag array of this partitioned
 * region. The array itself is returned (not a copy), so the caller must
 * synchronize on the returned value while reading or updating it.
 *
 * @return the low redundancy flags for this partitioned region
 */
public boolean[] getLowRedundancyFlags() {
  return this.lowRedundancyFlags;
}
private RegionAdvisor(PartitionedRegion region) {
super(region);
synchronized (this.preInitQueueMonitor) {
this.preInitQueue = new ConcurrentLinkedQueue();
}
this.clientBucketProfilesMap = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();
}
/**
 * Factory method: constructs a RegionAdvisor for the region and calls
 * {@code initialize()} on it before handing it back.
 *
 * @param region the partitioned region to advise
 * @return the new, initialized advisor
 */
public static RegionAdvisor createRegionAdvisor(PartitionedRegion region) {
  final RegionAdvisor created = new RegionAdvisor(region);
  created.initialize();
  return created;
}
/**
 * @return the statistics object of the advised partitioned region
 */
public PartitionedRegionStats getPartitionedRegionStats() {
  final PartitionedRegion advised = getPartitionedRegion();
  return advised.getPrStats();
}
/**
 * Creates and initializes one ProxyBucketRegion per configured bucket. Does
 * nothing when the bucket array already exists. The array is published to
 * {@code buckets} only after every proxy has been initialized.
 */
public synchronized void initializeRegionAdvisor() {
  if (this.buckets == null) {
    final PartitionedRegion pr = getPartitionedRegion();
    final int totalBuckets =
        pr.getAttributes().getPartitionAttributes().getTotalNumBuckets();
    final ProxyBucketRegion[] proxies = new ProxyBucketRegion[totalBuckets];
    final InternalRegionArguments regionArgs = new InternalRegionArguments();
    regionArgs.setPartitionedRegionAdvisor(this);
    for (int bucketId = 0; bucketId < proxies.length; bucketId++) {
      final ProxyBucketRegion proxy = new ProxyBucketRegion(bucketId, pr, regionArgs);
      proxies[bucketId] = proxy;
      proxy.initialize();
    }
    this.buckets = proxies;
  }
}
/**
 * Process those profiles which were received during the initialization period.
 * It is safe to process these profiles potentially out of order due to the profiles
 * version which is established on the sender.
 * <p>
 * Runs entirely under {@code preInitQueueMonitor}. Each queued entry is one of:
 * a profile addition for a single bucket, a removal for a member that departed
 * (or is no longer a current member), or a serial-based removal for a member
 * still in the view. Whether or not the loop completes, the queue reference is
 * set to null afterwards so that later changes are applied directly, and
 * threads waiting on the monitor are notified.
 * <p>
 * NOTE(review): the queue is dereferenced without a null check, so a second
 * invocation after the queue has been nulled would fail -- confirm this method
 * is only ever called once per advisor.
 */
public void processProfilesQueuedDuringInitialization() {
synchronized(this.preInitQueueMonitor) {
Iterator pi = this.preInitQueue.iterator();
// Stays false if an exception escapes the loop; checked in the finally block.
boolean finishedInitQueue = false;
try {
while(pi.hasNext()) {
Object o = pi.next();
QueuedBucketProfile qbp = (QueuedBucketProfile) o;
if (!qbp.isRemoval) {
// Queued addition: install the profile on the advisor of the bucket it names.
if (logger.isTraceEnabled(LogMarker.DA)) {
logger.trace(LogMarker.DA, "applying queued profile addition for bucket {}", qbp.bucketId);
}
getBucket(qbp.bucketId).getBucketAdvisor().putProfile(
qbp.bucketProfile);
}
else
// Queued removal for a member that departed, or that is no longer a current member.
if (qbp.memberDeparted ||
!getDistributionManager().isCurrentMember(qbp.memberId)) {
boolean crashed;
if (qbp.memberDeparted) {
// The departure event recorded whether the member crashed.
crashed = qbp.crashed;
}
else {
// TODO not necessarily accurate, but how important is this?
crashed = !stillInView(qbp.memberId);
}
if (logger.isTraceEnabled(LogMarker.DA)) {
logger.trace(LogMarker.DA, "applying queued member departure for all buckets for {}", qbp.memberId);
}
// Remove the member from every bucket advisor.
for (int i = 0; i < this.buckets.length; i++) {
BucketAdvisor ba = this.buckets[i].getBucketAdvisor();
ba.removeId(qbp.memberId, crashed, qbp.destroyed, qbp.fromMembershipListener);
} // for
}
else { // apply removal for member still in the view
if (logger.isTraceEnabled(LogMarker.DA)) {
logger.trace(LogMarker.DA, "applying queued profile removal for all buckets for {}", qbp.memberId);
}
// qbp.serials is indexed by bucket id; ILLEGAL_SERIAL entries are skipped.
for (int i = 0; i < this.buckets.length; i++) {
BucketAdvisor ba = this.buckets[i].getBucketAdvisor();
int serial = qbp.serials[i];
if (serial != ILLEGAL_SERIAL) {
ba.removeIdWithSerial(qbp.memberId, serial, qbp.destroyed);
}
} // for
} // apply removal for member still in the view
} // while
finishedInitQueue = true;
}
finally {
this.preInitQueue = null; // prevent further additions to the queue
this.preInitQueueMonitor.notifyAll();
// Report an incomplete drain unless a cancellation is already in progress.
if (!finishedInitQueue &&
getAdvisee().getCancelCriterion().cancelInProgress() == null) {
logger.error(LocalizedMessage.create(LocalizedStrings.RegionAdvisor_FAILED_TO_PROCESS_ALL_QUEUED_BUCKETPROFILES_FOR_0, getAdvisee()));
}
}
}
}
/**
 * Creates the profile type used by this advisor: a {@link PartitionProfile}
 * for the given member and version.
 */
@Override
protected Profile instantiateProfile(
    InternalDistributedMember memberId, int version) {
  final PartitionProfile created = new PartitionProfile(memberId, version);
  return created;
}
/**
 * Gives this PR's BucketAdvisors access to the shared
 * {@link #volunteeringQueue} on which they queue primary volunteering tasks.
 *
 * @return the volunteering queue for use by this PR's BucketAdvisors
 */
public Queue getVolunteeringQueue() {
  final Queue shared = this.volunteeringQueue;
  return shared;
}
/**
 * Gives this PR's BucketAdvisors access to the shared
 * {@link #volunteeringSemaphore}, which limits how many threads may
 * volunteer for primary at the same time.
 *
 * @return the semaphore for controlling number of volunteering threads
 */
public Semaphore getVolunteeringSemaphore() {
  final Semaphore shared = this.volunteeringSemaphore;
  return shared;
}
/**
 * Builds a map from bucket id to the server locations hosting that bucket.
 * <p>
 * First, every bucket in {@code clientBucketProfilesMap} gets a list with
 * the server locations of its hosting profiles. Then, when this member is a
 * data store, the server locations of each hosting local bucket profile are
 * merged in (removed first, then appended, so they are not listed twice).
 *
 * @return a new map of bucket id to server locations
 */
public Map<Integer, List<BucketServerLocation66>> getAllClientBucketProfiles() {
  final Map<Integer, List<BucketServerLocation66>> locationsByBucket =
      new HashMap<Integer, List<BucketServerLocation66>>();

  // Locations taken from the profiles recorded in clientBucketProfilesMap.
  for (Integer bucketId : this.clientBucketProfilesMap.keySet()) {
    final ArrayList<BucketServerLocation66> hosted = new ArrayList<BucketServerLocation66>();
    for (BucketProfile recorded : this.clientBucketProfilesMap.get(bucketId)) {
      if (!recorded.isHosting) {
        continue;
      }
      final ServerBucketProfile serverProfile = (ServerBucketProfile)recorded;
      hosted.addAll(serverProfile.getBucketServerLocations());
    }
    locationsByBucket.put(bucketId, hosted);
  }

  if (!getPartitionedRegion().isDataStore()) {
    return locationsByBucket;
  }

  // Locations taken from the local profile of each locally hosted bucket.
  for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
    final BucketProfile local = getBucketAdvisor(bucketId).getLocalProfile();
    if (logger.isDebugEnabled()) {
      logger.debug("The local profile is : {}", local);
    }
    if (local == null) {
      continue;
    }
    List<BucketServerLocation66> hosted = locationsByBucket.get(bucketId);
    if (hosted == null) {
      hosted = new ArrayList<BucketServerLocation66>();
      locationsByBucket.put(bucketId, hosted);
    }
    if ((local instanceof ServerBucketProfile) && local.isHosting) {
      final Set<BucketServerLocation66> localLocations =
          ((ServerBucketProfile)local).getBucketServerLocations();
      // Remove before adding so an already listed location is not duplicated.
      hosted.removeAll(localLocations);
      hosted.addAll(localLocations);
    }
  }
  return locationsByBucket;
}
/**
 * Test accessor: returns, for every bucket reported by
 * {@link #getAllClientBucketProfiles()}, the profile set recorded in
 * {@code clientBucketProfilesMap}, and adds the hosting local profile of each
 * locally hosted bucket when this member is a data store.
 * <p>
 * A bucket that is known only through its local profile has no entry in
 * {@code clientBucketProfilesMap}; it gets a fresh empty set here, because
 * ConcurrentHashMap does not accept null values and the previous code failed
 * with a NullPointerException in that case.
 * <p>
 * Note that sets obtained from {@code clientBucketProfilesMap} are placed in
 * the result as-is, so adding the local profile modifies the advisor's own
 * set for that bucket.
 *
 * @return a new map of bucket id to server bucket profiles
 */
public ConcurrentHashMap<Integer, Set<ServerBucketProfile>> getAllClientBucketProfilesTest() {
  ConcurrentHashMap<Integer, Set<ServerBucketProfile>> map = new ConcurrentHashMap<Integer, Set<ServerBucketProfile>>();
  Map<Integer, List<BucketServerLocation66>> testMap = this.getAllClientBucketProfiles();
  for (Integer bucketId : testMap.keySet()) {
    Set<ServerBucketProfile> parr = this.clientBucketProfilesMap.get(bucketId);
    if (parr == null) {
      // No recorded profiles for this bucket: null values are rejected by the map.
      parr = new HashSet<ServerBucketProfile>();
    }
    map.put(bucketId, parr);
  }
  if (getPartitionedRegion().isDataStore()) {
    for (Integer bucketId : getPartitionedRegion().getDataStore().getAllLocalBucketIds()) {
      BucketProfile profile = getBucketAdvisor(bucketId).getLocalProfile();
      if ((profile instanceof ServerBucketProfile) && profile.isHosting) {
        Set<ServerBucketProfile> profiles = map.get(bucketId);
        if (profiles == null) {
          // The set of local buckets may have grown since testMap was computed.
          profiles = new HashSet<ServerBucketProfile>();
          map.put(bucketId, profiles);
        }
        profiles.add((ServerBucketProfile)profile);
      }
    }
  }
  if (logger.isDebugEnabled()) {
    logger.debug("This maps is sksk {} and size is {}", map, map.keySet().size());
  }
  return map;
}
/**
 * Looks up the server bucket profiles recorded for a bucket.
 *
 * @param bucketId the bucket to look up
 * @return the recorded set, or null when nothing is recorded for the bucket
 */
public Set<ServerBucketProfile> getClientBucketProfiles(Integer bucketId) {
  final Set<ServerBucketProfile> recorded = this.clientBucketProfilesMap.get(bucketId);
  return recorded;
}
/**
 * Records the given profile set for a bucket, replacing any previous entry.
 *
 * @param bucketId the bucket whose profiles are being recorded
 * @param oldProfiles the profile set to store for the bucket
 */
public void setClientBucketProfiles(Integer bucketId,
    Set<ServerBucketProfile> oldProfiles) {
  final ConcurrentHashMap<Integer, Set<ServerBucketProfile>> profilesByBucket =
      this.clientBucketProfilesMap;
  profilesByBucket.put(bucketId, oldProfiles);
}
/**
 * Closes every proxy bucket region, recording beforehand which of them were
 * primary.
 *
 * @return the ids (as Integer) of the buckets that were primary when closed;
 *         the shared immutable {@code Collections.EMPTY_LIST} when there were
 *         none or the buckets have not been created yet
 */
public List closeBucketAdvisors() {
  if (this.buckets == null) {
    return Collections.EMPTY_LIST;
  }
  List heldPrimaries = Collections.EMPTY_LIST;
  for (int bucketId = 0; bucketId < this.buckets.length; bucketId++) {
    final ProxyBucketRegion proxy = this.buckets[bucketId];
    if (proxy.isPrimary()) {
      // Allocate a mutable list lazily, on the first primary found.
      if (heldPrimaries == Collections.EMPTY_LIST) {
        heldPrimaries = new ArrayList();
      }
      heldPrimaries.add(Integer.valueOf(bucketId));
    }
    proxy.close();
  }
  return heldPrimaries;
}
/**
 * Closes this advisor first and then each proxy bucket region, if the
 * buckets have been created.
 */
@Override
public void close() {
  super.close();
  if (this.buckets == null) {
    return;
  }
  for (int bucketId = 0; bucketId < this.buckets.length; bucketId++) {
    final ProxyBucketRegion proxy = this.buckets[bucketId];
    proxy.close();
  }
}
/**
 * Removes a member from this advisor and, unless the change had to be queued,
 * from every bucket advisor.
 * <p>
 * While the pre-init queue exists, the bucket-level removal is queued as a
 * QueuedBucketProfile and only the region-level removal
 * ({@code super.removeId}) is performed immediately. Otherwise each bucket
 * advisor is updated first; for an InternalDistributedMember the bucket
 * advisor is asked whether a sync is needed, and if so the member's profile
 * is captured before removal and passed to {@code syncForCrashedMember} once
 * the removal succeeded.
 *
 * @param memberId the id being removed
 * @param crashed passed through to the bucket advisors and the superclass
 * @param regionDestroyed passed through to the bucket advisors and the superclass
 * @param fromMembershipListener passed through to the bucket advisors and the superclass
 * @return the result of {@code super.removeId}
 */
@Override
public boolean removeId(ProfileId memberId, boolean crashed,
boolean regionDestroyed, boolean fromMembershipListener) {
//It's important that we remove member from the bucket advisors first
//Calling super.removeId triggers redundancy satisfaction, so the bucket
//advisors must have up to date information at that point.
boolean removeBuckets = true;
synchronized (this.preInitQueueMonitor) {
if (this.preInitQueue != null) {
// Queue profile during pre-initialization
assert memberId instanceof InternalDistributedMember;
QueuedBucketProfile qbf = new QueuedBucketProfile(
(InternalDistributedMember)memberId, crashed, regionDestroyed,
fromMembershipListener);
this.preInitQueue.add(qbf);
// The queued entry is applied to the buckets later, not in this call.
removeBuckets = false;
}
} // synchronized
if (removeBuckets && this.buckets != null) {
for (int i = 0; i < this.buckets.length; i++) {
ProxyBucketRegion pbr = this.buckets[i];
BucketAdvisor pbra = pbr.getBucketAdvisor();
boolean shouldSync = false;
Profile profile = null;
InternalDistributedMember mbr = null;
if (memberId instanceof InternalDistributedMember) {
mbr = (InternalDistributedMember)memberId;
shouldSync = pbra.shouldSyncForCrashedMember(mbr);
if (shouldSync) {
// Capture the profile now; it is looked up before the removal below.
profile = pbr.getBucketAdvisor().getProfile(memberId);
}
}
boolean removed = pbr.getBucketAdvisor().removeId(memberId, crashed, regionDestroyed, fromMembershipListener);
// Sync only when this bucket actually held the member.
if (removed && shouldSync) {
pbra.syncForCrashedMember(mbr, profile);
}
}
}
boolean removedId = false;
removedId = super.removeId(memberId, crashed, regionDestroyed, fromMembershipListener);
if (logger.isTraceEnabled(LogMarker.DA)) {
logger.trace(LogMarker.DA, "RegionAdvisor#removeId: removing member from region {}: {}; removed = {}; crashed = {}",
this.getPartitionedRegion().getName(), memberId, removedId, crashed);
}
return removedId;
}
/**
 * Clear the knowledge of given member from this advisor.
 * In particular, clear the knowledge of remote Bucket locations so that
 * we avoid sending partition messages to buckets that will soon be destroyed.
 * <p>
 * While the pre-init queue exists the request is queued and nothing else
 * happens. Otherwise, if the buckets exist, the member is removed from each
 * bucket advisor whose serial is not ILLEGAL_SERIAL and then from this
 * advisor using {@code prSerial}.
 * <p>
 * NOTE(review): the trace message tolerates a null {@code serials} array,
 * but the removal loop indexes it unconditionally -- confirm callers never
 * pass null once initialization has finished.
 *
 * @param memberId member that has closed the region
 * @param prSerial serial number of this partitioned region
 * @param serials serial numbers of buckets that need to be removed, indexed by bucket id
 * @param regionDestroyed passed through to the serial-based removals
 */
public void removeIdAndBuckets(InternalDistributedMember memberId,
    int prSerial, int serials[], boolean regionDestroyed)
{
  final boolean tracing = logger.isTraceEnabled(LogMarker.DA);
  if (tracing) {
    logger.trace(LogMarker.DA, "RegionAdvisor#removeIdAndBuckets: removing member from region {}: {}; buckets = ({}) serials",
        this.getPartitionedRegion().getName(), memberId, (serials == null ? "null" : serials.length));
  }

  synchronized (this.preInitQueueMonitor) {
    if (this.preInitQueue != null) {
      // Still initializing: remember the request and apply it later.
      this.preInitQueue.add(new QueuedBucketProfile(memberId, serials, regionDestroyed));
      return;
    }
  }

  // Not queued, so apply the update immediately.
  if (this.buckets == null) {
    return;
  }
  if (logger.isTraceEnabled(LogMarker.DA)) {
    logger.trace(LogMarker.DA, "RegionAdvisor#removeIdAndBuckets: removing buckets for member{};{}", memberId, this);
  }
  for (int bucketId = 0; bucketId < this.buckets.length; bucketId++) {
    final int serial = serials[bucketId];
    if (serial == ILLEGAL_SERIAL) {
      continue;
    }
    if (logger.isTraceEnabled(LogMarker.DA)) {
      logger.trace(LogMarker.DA, "RegionAdvisor#removeIdAndBuckets: removing bucket #{} serial {}", bucketId, serial);
    }
    this.buckets[bucketId].getBucketAdvisor().removeIdWithSerial(memberId, serial, regionDestroyed);
  }
  super.removeIdWithSerial(memberId, prSerial, regionDestroyed);
}
/**
 * Updates the sick state of the given member on the buckets of this region.
 * When marking sick, only buckets whose owners include the member are
 * touched; when marking healthy, every bucket is updated. Does nothing if
 * the buckets have not been created yet.
 *
 * @param member the member whose state changed
 * @param sick true if the bucket should be marked sick, false if healthy
 */
public void markBucketsOnMember(DistributedMember member, boolean sick){
  // The health profile exchange at cache level should take care of preInitQueue
  if (buckets == null) {
    return;
  }
  for (int bucketId = 0; bucketId < buckets.length; bucketId++) {
    final boolean applies =
        !sick || this.buckets[bucketId].getBucketOwners().contains(member);
    if (applies) {
      this.buckets[bucketId].setBucketSick(member, sick);
      if (logger.isDebugEnabled()) {
        logger.debug("Marked bucket ({}) {}", getPartitionedRegion().bucketStringForLogs(bucketId),
            (this.buckets[bucketId].isBucketSick() ? "sick" : "healthy"));
      }
    }
  }
}
/**
 * Refreshes the sick flag of one member on one bucket. When the member's
 * profile was removed the member is marked healthy for the bucket; otherwise
 * the flag is set according to whether the cache's ResourceAdvisor lists the
 * member as critical.
 *
 * @param bucketId index of the bucket to update
 * @param member the member whose profile changed
 * @param profileRemoved true if the member's profile was removed
 */
public void updateBucketStatus(int bucketId, DistributedMember member,
    boolean profileRemoved) {
  if (profileRemoved) {
    this.buckets[bucketId].setBucketSick(member, false);
    return;
  }
  final ResourceAdvisor resourceAdvisor =
      getPartitionedRegion().getCache().getResourceAdvisor();
  final boolean sick = resourceAdvisor.adviseCritialMembers().contains(member);
  if (logger.isDebugEnabled()) {
    logger.debug("updateBucketStatus:({}):member:{}:sick:{}",
        getPartitionedRegion().bucketStringForLogs(bucketId), member, sick);
  }
  this.buckets[bucketId].setBucketSick(member, sick);
}
/**
 * Throws LowMemoryException if the given bucket is marked sick and has at
 * least one sick member. The check is skipped entirely when
 * {@code MemoryThresholds.isLowMemoryExceptionDisabled()} returns true.
 *
 * @param bucketId index of the bucket to check
 * @param key key being operated on; only used in the exception message
 * @throws LowMemoryException if the bucket is sick and its set of sick
 *         members is not empty
 */
public void checkIfBucketSick(final int bucketId, final Object key) throws LowMemoryException{
  if (MemoryThresholds.isLowMemoryExceptionDisabled()) {
    return;
  }
  assert this.buckets != null;
  assert this.buckets[bucketId] != null;
  if (this.buckets[bucketId].isBucketSick()) {
    Set<DistributedMember> sm = this.buckets[bucketId].getSickMembers();
    if (sm.isEmpty()) {
      // check again as this list is obtained under synchronization
      // fixes bug 50845
      return;
    }
    if (logger.isDebugEnabled()) {
      // Two placeholders for two arguments; the member set used to be dropped from the message.
      logger.debug("For bucket {} sick members are {}", getPartitionedRegion().bucketStringForLogs(bucketId), sm);
    }
    throw new LowMemoryException(LocalizedStrings.ResourceManager_LOW_MEMORY_PR_0_KEY_1_MEMBERS_2.toLocalizedString(
        new Object[] {getPartitionedRegion().getFullPath(), key, sm}), sm);
  }
}
/**
 * Profile information for a remote counterpart.
 * <p>
 * On the wire a profile consists of the superclass data followed by
 * {@code localMaxMemory}, {@code fixedPAttrs} and {@code shutDownAllStatus},
 * in that order. {@code requiresNotification} travels as a bit of the int
 * info; {@code isDataStore} and {@code numBuckets} are not serialized.
 */
public static final class PartitionProfile extends CacheProfile {
  /**
   * The number of Mb the VM is allowed to use for the PR
   * {@link PartitionedRegion#getLocalMaxMemory()}
   */
  public int localMaxMemory;

  /**
   * A data store is a VM that has a non-zero local max memory. Because
   * localMaxMemory is already sent, this state is not sent; it is derived
   * from localMaxMemory when the profile is deserialized.
   */
  public transient boolean isDataStore = false;

  /**
   * requiresNotification determines whether a member needs to be notified of cache
   * operations so that cache listeners and other hooks can be engaged
   * @since 5.1
   */
  public boolean requiresNotification = false;

  /**
   * Track the number of buckets this data store may have, implies isDataStore == true
   * This value is NOT sent directly but updated when
   * {@link com.gemstone.gemfire.internal.cache.BucketAdvisor}s receive updates
   */
  public transient short numBuckets = 0;

  /**
   * represents the list of the fixed partitions defined for this region.
   */
  public List<FixedPartitionAttributesImpl> fixedPAttrs;

  /** Indicates the status of a shutdown request. */
  public int shutDownAllStatus = PartitionedRegion.RUNNING_MODE;

  /** for internal use, required for DataSerializer.readObject */
  public PartitionProfile() {
  }

  /**
   * Creates a profile for the given member and version and flags it as
   * partitioned.
   */
  public PartitionProfile(InternalDistributedMember memberId, int version) {
    super(memberId, version);
    this.isPartitioned = true;
  }

  /** Adds the requiresNotification bit to the flags built by the superclass. */
  @Override
  protected final int getIntInfo() {
    final int inherited = super.getIntInfo();
    return this.requiresNotification
        ? (inherited | REQUIRES_NOTIFICATION_MASK)
        : inherited;
  }

  /** Restores the superclass flags and the requiresNotification bit. */
  @Override
  protected final void setIntInfo(int s)
  {
    super.setIntInfo(s);
    final int notificationBit = s & REQUIRES_NOTIFICATION_MASK;
    this.requiresNotification = notificationBit != 0;
  }

  /** Reads the fields in the order written by {@link #toData(DataOutput)}. */
  @Override
  public void fromData(DataInput in) throws IOException,
      ClassNotFoundException {
    super.fromData(in);
    final int maxMemory = in.readInt();
    this.localMaxMemory = maxMemory;
    // isDataStore is not on the wire; it follows from localMaxMemory.
    this.isDataStore = maxMemory > 0;
    this.fixedPAttrs = DataSerializer.readObject(in);
    this.shutDownAllStatus = in.readInt();
  }

  /** Writes localMaxMemory, fixedPAttrs and shutDownAllStatus after the superclass data. */
  @Override
  public void toData(DataOutput out) throws IOException {
    super.toData(out);
    out.writeInt(this.localMaxMemory);
    DataSerializer.writeObject(this.fixedPAttrs, out);
    out.writeInt(this.shutDownAllStatus);
  }

  @Override
  public StringBuilder getToStringHeader() {
    final StringBuilder header = new StringBuilder("RegionAdvisor.PartitionProfile");
    return header;
  }

  /** Appends this profile's fields after those of the superclass. */
  @Override
  public void fillInToString(StringBuilder sb) {
    super.fillInToString(sb);
    sb.append("; isDataStore=").append(this.isDataStore);
    sb.append("; requiresNotification=").append(this.requiresNotification);
    sb.append("; localMaxMemory=").append(this.localMaxMemory);
    sb.append("; numBuckets=").append(this.numBuckets);
    if (this.fixedPAttrs != null) {
      sb.append("; FixedPartitionAttributes=").append(this.fixedPAttrs);
    }
    sb.append("; filterProfile=").append(this.filterProfile);
    sb.append("; shutDownAllStatus=").append(this.shutDownAllStatus);
  }

  @Override
  public int getDSFID() {
    return PARTITION_PROFILE;
  }
} // end class PartitionProfile
/**
 * Returns the number of data stores known to this advisor. The value is
 * cached and recomputed from {@link #adviseDataStore()} only when the number
 * of profiles differs from the count seen at the last recomputation.
 *
 * @return the (possibly cached) number of data stores
 */
public int getNumDataStores() {
  final int currentProfileCount = getNumProfiles();
  if (currentProfileCount != this.lastActiveProfiles) {
    this.numDataStores = adviseDataStore().size();
    this.lastActiveProfiles = currentProfileCount;
  }
  return this.numDataStores;
}
/**
 * Shorthand for {@code adviseDataStore(false)}.
 *
 * @return the members whose profile is flagged as a data store
 */
public Set<InternalDistributedMember> adviseDataStore()
{
  final boolean realHashSet = false;
  return this.adviseDataStore(realHashSet);
}
/**
 * Returns the set of data stores that have finished initialization: members
 * whose profile is a data store and either has no persistence or reports the
 * region as initialized.
 */
public Set<InternalDistributedMember> adviseInitializedDataStore()
{
  final Filter initializedDataStores = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are Partition profiles
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile)profile;
      if (!pp.isDataStore) {
        return false;
      }
      return !pp.dataPolicy.withPersistence() || pp.regionInitialized;
    }
  };
  return adviseFilter(initializedDataStores);
}
/**
 * Returns the data store members whose shutDownAll status is still below the
 * given status.
 *
 * @param status the shutDownAll status to compare against
 */
public Set<InternalDistributedMember> adviseNotAtShutDownAllStatus(final int status)
{
  final Filter laggingDataStores = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are Partition profiles
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile)profile;
      return pp.isDataStore && pp.shutDownAllStatus < status;
    }
  };
  return adviseFilter(laggingDataStores);
}
/**
 * Blocks until no data store member remains below the given shutDownAll
 * status, or until the partitioned region can no longer be found in the
 * cache or is destroyed. A profile change listener is registered for the
 * duration of the wait and always removed afterwards.
 *
 * @param status the shutDownAll status to wait for
 */
public void waitForProfileStatus(int status) {
  final ProfileShutdownListener shutdownListener = new ProfileShutdownListener();
  addProfileChangeListener(shutdownListener);
  try {
    final String regionPath = getPartitionedRegion().getFullPath();
    while (true) {
      final Region pr = getPartitionedRegion().getCache().getRegion(regionPath);
      if (pr == null || pr.isDestroyed()) {
        break;
      }
      final Set pending = adviseNotAtShutDownAllStatus(status);
      final int pendingCount = pending.size();
      if (pendingCount <= 0) {
        break;
      }
      if (logger.isDebugEnabled()) {
        logger.debug("waitForProfileStatus {} at PR:{}, expecting {} members: {}",
            status, getPartitionedRegion().getFullPath(), pendingCount, pending);
      }
      shutdownListener.waitForChange();
    }
  } finally {
    removeProfileChangeListener(shutdownListener);
  }
}
/**
 * Returns the members whose profile is flagged as a data store.
 *
 * @param realHashSet true if a real set is needed: the shared
 *        {@code Collections.EMPTY_SET} is then replaced by a new HashSet
 * @return the data store members; depending on the realHashSet value may be a HashSet
 */
public Set<InternalDistributedMember> adviseDataStore(boolean realHashSet) {
  final Filter dataStoresOnly = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are Partition profiles
      return (profile instanceof PartitionProfile)
          && ((PartitionProfile)profile).isDataStore;
    }
  };
  Set<InternalDistributedMember> dataStores = adviseFilter(dataStoresOnly);
  if (realHashSet && dataStores == Collections.EMPTY_SET) {
    dataStores = new HashSet<InternalDistributedMember>();
  }
  if (logger.isTraceEnabled(LogMarker.DA)) {
    logger.trace(LogMarker.DA, "adviseDataStore returning {} from {}", dataStores, toStringWithProfiles());
  }
  return dataStores;
}
/**
 * Returns the set of distributed members on which the given partition name
 * is defined. The shared {@code Collections.EMPTY_SET} is replaced by a new
 * HashSet before returning.
 *
 * @param partitionName name of the fixed partition to look for
 */
public Set<InternalDistributedMember> adviseFixedPartitionDataStores(
    final String partitionName) {
  final Filter definesPartition = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are
      // Partition profiles
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final List<FixedPartitionAttributesImpl> declared =
          ((PartitionProfile)profile).fixedPAttrs;
      if (declared == null) {
        return false;
      }
      for (FixedPartitionAttributesImpl fpa : declared) {
        if (fpa.getPartitionName().equals(partitionName)) {
          return true;
        }
      }
      return false;
    }
  };
  Set<InternalDistributedMember> members = adviseFilter(definesPartition);
  if (members == Collections.EMPTY_SET) {
    members = new HashSet<InternalDistributedMember>();
  }
  if (logger.isTraceEnabled(LogMarker.DA)) {
    logger.trace(LogMarker.DA, "adviseFixedPartitionDataStore returning {} from {}", members, toStringWithProfiles());
  }
  return members;
}
/**
 * Returns a distributed member on which the primary partition for the given
 * bucket is defined. Each matching profile is inserted at the head of a
 * list, so when several profiles match the one visited last is returned.
 *
 * @param bucketId the bucket to look for
 * @return a member declaring a primary fixed partition that has the bucket,
 *         or null when no profile matches
 */
public InternalDistributedMember adviseFixedPrimaryPartitionDataStore(
    final int bucketId) {
  final List<InternalDistributedMember> matches =
      new ArrayList<InternalDistributedMember>(1);
  final Filter primaryForBucket = new Filter() {
    public boolean include(Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile)profile;
      if (pp.fixedPAttrs == null) {
        return false;
      }
      for (FixedPartitionAttributesImpl fpa : pp.fixedPAttrs) {
        if (fpa.isPrimary() && fpa.hasBucket(bucketId)) {
          matches.add(0, pp.getDistributedMember());
          return true;
        }
      }
      return false;
    }
  };
  fetchProfiles(primaryForBucket);
  if (logger.isTraceEnabled(LogMarker.DA)) {
    logger.trace(LogMarker.DA, "adviseFixedPartitionDataStore returning {} from {}", matches, toStringWithProfiles());
  }
  return matches.isEmpty() ? null : matches.get(0);
}
/**
 * Returns the list of all FixedPartitionAttributes found in the profiles
 * held by this advisor (primary as well as secondary).
 *
 * @return list of all partitions(primary as well as secondary) defined on
 *         remote nodes
 */
public List<FixedPartitionAttributesImpl> adviseAllFixedPartitionAttributes() {
  final List<FixedPartitionAttributesImpl> collected =
      new ArrayList<FixedPartitionAttributesImpl>();
  final Filter collector = new Filter() {
    public boolean include(final Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final List<FixedPartitionAttributesImpl> declared =
          ((PartitionProfile)profile).fixedPAttrs;
      if (declared == null) {
        return false;
      }
      collected.addAll(declared);
      return true;
    }
  };
  fetchProfiles(collector);
  return collected;
}
/**
 * Collects, from every profile that declares fixed partitions, the entry
 * that {@code equals} the given FixedPartitionAttributes (at most one per
 * profile: the first match found by {@code indexOf}).
 *
 * @param fpa the fixed partition attributes to look for
 * @return the list of same partitions defined on other nodes(can be primary
 *         or secondary)
 */
public List<FixedPartitionAttributesImpl> adviseSameFPAs(final FixedPartitionAttributesImpl fpa){
  final List<FixedPartitionAttributesImpl> matches =
      new ArrayList<FixedPartitionAttributesImpl>();
  final Filter collector = new Filter() {
    public boolean include(final Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final List<FixedPartitionAttributesImpl> declared =
          ((PartitionProfile)profile).fixedPAttrs;
      if (declared == null) {
        return false;
      }
      final int position = declared.indexOf(fpa);
      if (position != -1) {
        matches.add(declared.get(position));
      }
      return true;
    }
  };
  fetchProfiles(collector);
  return matches;
}
/**
 * Collects primary FixedPartitionAttributes from the profiles held by this
 * advisor. Scanning of a profile stops at its first primary entry, so each
 * profile contributes at most one element.
 *
 * @return list of primary partitions defined on remote nodes
 */
public List<FixedPartitionAttributesImpl> adviseRemotePrimaryFPAs() {
  final List<FixedPartitionAttributesImpl> primaries =
      new ArrayList<FixedPartitionAttributesImpl>();
  final Filter collector = new Filter() {
    public boolean include(final Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final List<FixedPartitionAttributesImpl> declared =
          ((PartitionProfile)profile).fixedPAttrs;
      if (declared == null) {
        return false;
      }
      for (FixedPartitionAttributesImpl candidate : declared) {
        if (candidate.isPrimary()) {
          primaries.add(candidate);
          return true;
        }
      }
      return false;
    }
  };
  fetchProfiles(collector);
  return primaries;
}
/**
 * TODO remove this when Primary Bucket impl. is permanently in place
 * <p>
 * Among the given nodes, picks the one whose profile reports the fewest
 * buckets. Only bucket counts below {@code Short.MAX_VALUE} are considered,
 * and on a tie the profile visited first wins.
 *
 * @param limitNodeList the candidate {@link Node}s
 * @return the chosen node, or null when no candidate has a qualifying profile
 */
public Node adviseSmallestDataStore(final List limitNodeList) {
  final HashMap nodesByMember = new HashMap(limitNodeList.size());
  for (Object candidate : limitNodeList) {
    final Node node = (Node) candidate;
    nodesByMember.put(node.getMemberId(), node);
  }
  // Single-slot holder so the anonymous filter can report its pick.
  final Object[] winner = new Object[1];
  adviseFilter(new Filter() {
    short fewestBuckets = Short.MAX_VALUE;
    public boolean include(Profile profile) {
      if (profile instanceof PartitionProfile) {
        final PartitionProfile pp = (PartitionProfile)profile;
        final boolean isCandidate = nodesByMember.containsKey(pp.getDistributedMember());
        if (isCandidate && pp.numBuckets < this.fewestBuckets) {
          winner[0] = pp.getDistributedMember();
          this.fewestBuckets = pp.numBuckets;
        }
      }
      // The filter is used only for its side effect; nothing is selected.
      return false;
    }
  });
  return (Node) nodesByMember.get(winner[0]);
}
/**
 * Orders the given members by bucket count, in the iteration order of a
 * TreeSet of NodeBucketSize (members with more buckets come first). For members
 * with a profile the count is the profile's {@code numBuckets}; for this
 * member, when it is contained in {@code nodes}, it is the size of
 * {@code getBucketSet()}.
 * <p>
 * The local member is added exactly once, after the profiles have been
 * scanned. Previously it was also added for every profile that was not in
 * {@code nodes}, and the {@code TreeSet.contains} guard never detected the
 * earlier copies because NodeBucketSize.compareTo never returns 0, so the
 * local member could appear several times in the result.
 *
 * @param nodes the members to order
 * @return the members of {@code nodes} that have a profile, plus the local
 *         member when it is in {@code nodes}, ordered by bucket count
 */
public List<DistributedMember> orderDataStoresUsingBucketCount(final Set nodes) {
  final Set<NodeBucketSize> orderedSet = new TreeSet<NodeBucketSize>();
  final List<DistributedMember> orderedList = new ArrayList<DistributedMember>();
  final DistributedMember self = getDistributionManager().getDistributionManagerId();
  adviseFilter(new Filter() {
    public boolean include(Profile profile) {
      if (profile instanceof PartitionProfile
          && nodes.contains(profile.getDistributedMember())) {
        PartitionProfile p = (PartitionProfile)profile;
        orderedSet.add(new NodeBucketSize(p.numBuckets, p
            .getDistributedMember()));
        return true;
      }
      return false;
    }
  });
  // Add the local member once, after the scan.
  if (nodes.contains(self)) {
    orderedSet.add(new NodeBucketSize(getBucketSet().size(), self));
  }
  for (NodeBucketSize node : orderedSet) {
    orderedList.add(node.member);
  }
  return orderedList;
}
/**
 * Pairs a member with a bucket count so that members can be sorted by the
 * number of buckets.
 * <p>
 * {@code equals} and {@code hashCode} are both based on the member id alone.
 * {@code compareTo} looks only at the bucket count and never returns 0, so
 * it is not consistent with {@code equals}: a sorted set keeps entries with
 * equal counts, but its {@code contains} never reports a match.
 */
private class NodeBucketSize implements Comparable {
  private final int numBuckets;
  private final DistributedMember member;

  public NodeBucketSize(final int numBuckets, final DistributedMember member) {
    this.numBuckets = numBuckets;
    this.member = member;
  }

  /**
   * Sorts entries with more buckets first: returns 1 when the other entry has
   * more buckets than this one, otherwise -1 (also for equal counts).
   */
  public int compareTo(Object o) {
    assert o instanceof NodeBucketSize;
    NodeBucketSize node = (NodeBucketSize)o;
    if (node.numBuckets > this.numBuckets) {
      return 1;
    }
    return -1;
  }

  @Override
  public String toString() {
    return "NodeBucketSize [ member =" + member + " numBuckets = "
        + numBuckets + "]";
  }

  /**
   * Hashes the member id, the same value {@link #equals(Object)} compares,
   * so that equal instances have equal hash codes.
   */
  @Override
  public int hashCode() {
    return this.member.getId().hashCode();
  }

  /** Two instances are equal when their members have equal ids. */
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof NodeBucketSize)) {
      return false;
    }
    NodeBucketSize node = (NodeBucketSize)obj;
    return this.member.getId().equals(node.member.getId());
  }
}
/**
 * Returns the result of {@code adviseFilter} for a filter that accepts every
 * profile whose {@code isPartitioned} flag is set.
 */
public Set adviseAllPRNodes() {
  final Filter partitionedOnly = new Filter() {
    public boolean include(Profile profile) {
      // every profile is expected to be a CacheProfile here; the cast fails otherwise
      return ((CacheProfile) profile).isPartitioned;
    }
  };
  return adviseFilter(partitionedOnly);
}
/**
 * Returns the set of all members who must receive operation notifications.
 * A partitioned {@code PartitionProfile} qualifies when it has a cache
 * listener with interest policy {@code ALL}, or when its
 * {@code requiresNotification} flag is set.
 *
 * @param event not consulted by the filter
 * @since 5.1
 */
public Set adviseRequiresNotification(final EntryEventImpl event) {
  final Filter needsNotification = new Filter() {
    public boolean include(Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile) profile;
      if (!pp.isPartitioned) {
        return false;
      }
      // the interest policy is only looked up when a listener is present
      final boolean listenerWantsAll = pp.hasCacheListener
          && pp.subscriptionAttributes.getInterestPolicy() == InterestPolicy.ALL;
      return listenerWantsAll || pp.requiresNotification;
    }
  };
  return adviseFilter(needsNotification);
}
/**
 * Stores the given profile after updating the partitioned region: the region
 * is told about a cache loader when the profile has one, and the profile's
 * filter profile is dropped when the local region is not a data store.
 *
 * @param p the profile to store; asserted to be a {@code CacheProfile}
 * @return whatever the superclass {@code putProfile} returns
 */
@Override
final synchronized public boolean putProfile(Profile p) {
  assert p instanceof CacheProfile;
  final CacheProfile cacheProfile = (CacheProfile) p;
  final PartitionedRegion region = getPartitionedRegion();
  if (cacheProfile.hasCacheLoader) {
    region.setHaveCacheLoader();
  }
  // don't keep FilterProfiles around in accessors. They're needed only for
  // routing messages in data stores
  if (cacheProfile.filterProfile != null && !region.isDataStore()) {
    cacheProfile.filterProfile = null;
  }
  return super.putProfile(cacheProfile);
}
/**
 * Looks up the profile stored for the given member and returns it as a
 * {@code PartitionProfile}.
 *
 * @param id the member whose profile is wanted
 * @return the result of {@code getProfile(id)}, cast to {@code PartitionProfile}
 */
final public PartitionProfile getPartitionProfile(InternalDistributedMember id) {
  final Profile stored = getProfile(id);
  return (PartitionProfile) stored;
}
/**
 * Reports whether the proxy bucket at the given index says it is primary.
 *
 * @param bucketId the index of the bucket to check
 * @return false when the bucket array has not been set up, otherwise the
 *         bucket's {@code isPrimary()} answer
 */
public boolean isPrimaryForBucket(int bucketId) {
  return this.buckets != null && this.buckets[bucketId].isPrimary();
}
/**
 * Returns true if the bucket is currently being hosted locally, i.e. its
 * proxy reports a non-null hosted bucket region. The answer is a snapshot:
 * hosting may start right after this call returns, so two calls in a row
 * may differ.
 *
 * @param bucketId the index of the bucket to check
 * @return true if the bucket is currently being hosted locally; false also
 *         when the bucket array has not been set up
 */
public boolean isBucketLocal(int bucketId) {
  return this.buckets != null
      && this.buckets[bucketId].getHostedBucketRegion() != null;
}
/**
 * @return true once the bucket array of this advisor is non-null
 */
public boolean areBucketsInitialized() {
  final boolean initialized = this.buckets != null;
  return initialized;
}
/**
 * Returns the real BucketRegion if it is currently hosted locally; otherwise
 * the ProxyBucketRegion is returned. This member may be in the process of
 * hosting the real bucket; until that completes the proxy keeps being
 * returned.
 *
 * @param bucketId the index of the bucket to retrieve
 * @return the bucket identified by bucketId
 */
public Bucket getBucket(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final ProxyBucketRegion proxy = this.buckets[bucketId];
  final Bucket hosted = proxy.getHostedBucketRegion();
  return hosted != null ? hosted : proxy;
}
/**
 * Returns the BucketAdvisor for the specified bucket, taken from the hosted
 * bucket region when there is one and from the proxy otherwise.
 *
 * @param bucketId the index of the bucket to retrieve the advisor for
 * @return the bucket advisor identified by bucketId
 */
public BucketAdvisor getBucketAdvisor(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final ProxyBucketRegion proxy = this.buckets[bucketId];
  final Bucket hosted = proxy.getHostedBucketRegion();
  final Bucket source = (hosted != null) ? hosted : proxy;
  return source.getBucketAdvisor();
}
/**
 * Collects the advisors of all buckets that currently have a hosted bucket
 * region, keyed by the id reported by that region.
 *
 * @return a new map from bucket id to bucket advisor; empty when nothing is
 *         hosted
 */
public Map<Integer,BucketAdvisor> getAllBucketAdvisors() {
  Assert.assertTrue(this.buckets != null);
  final Map<Integer,BucketAdvisor> advisorsById = new HashMap<Integer,BucketAdvisor>();
  for (ProxyBucketRegion proxy : this.buckets) {
    final Bucket hosted = proxy.getHostedBucketRegion();
    if (hosted == null) {
      continue;
    }
    advisorsById.put(hosted.getId(), hosted.getBucketAdvisor());
  }
  return advisorsById;
}
/**
 * Builds an array with one entry per bucket slot: the serial number of the
 * created bucket region, or {@code ILLEGAL_SERIAL} where none was created.
 *
 * @return array of serial numbers for buckets created locally; an empty
 *         array when the bucket array has not been set up
 */
public int[] getBucketSerials() {
  if (this.buckets == null) {
    return new int[0];
  }
  final int[] serials = new int[this.buckets.length];
  for (int idx = 0; idx < serials.length; idx++) {
    final Bucket created = this.buckets[idx].getCreatedBucketRegion();
    serials[idx] = (created == null) ? ILLEGAL_SERIAL : created.getSerialNumber();
  }
  return serials;
}
/**
 * Resets the parent advisor of every proxy bucket's advisor. Needed for
 * SQLFabric ALTER TABLE when the colocated region changes. Does nothing
 * when the bucket array has not been set up.
 *
 * @throws InternalGemFireException if a bucket region has already been
 *         created for any proxy that is reached
 */
public void resetBucketAdvisorParents() {
  if (this.buckets == null) {
    return;
  }
  for (ProxyBucketRegion proxy : this.buckets) {
    final boolean alreadyCreated = proxy.getCreatedBucketRegion() != null;
    if (alreadyCreated) {
      final Object[] args = new Object[] {
          proxy.getPartitionedRegion().getFullPath(),
          proxy.getBucketId() };
      throw new InternalGemFireException(
          LocalizedStrings.RegionAdvisor_CANNOT_RESET_EXISTING_BUCKET
              .toLocalizedString(args));
    }
    proxy.getBucketAdvisor().resetParentAdvisor(proxy.getBucketId());
  }
}
/**
* Returns the bucket identified by bucketId after waiting for initialization
* to finish processing queued profiles. The call synchronizes on
* {@link #preInitQueueMonitor} and waits on it for as long as
* {@code preInitQueue} is non-null, then delegates to {@link #getBucket(int)}.
* <p>
* If the waiting thread is interrupted, the cancel criterion of the advisee
* is consulted and the thread's interrupt status is restored before the
* monitor is released.
*
* @param bucketId the bucket identifier
* @return the bucket identified by bucketId
* @throws com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException if interrupted
* for shutdown cancellation (presumably raised by checkCancelInProgress)
*/
public Bucket getBucketPostInit(int bucketId) {
synchronized (this.preInitQueueMonitor) {
// remembers an interrupt seen while waiting so it can be re-asserted below
boolean interrupted = false;
try {
// a non-null preInitQueue means profiles are still being queued
while (this.preInitQueue != null) {
try {
this.preInitQueueMonitor.wait(); // spurious wakeup ok
}
catch (InterruptedException e) {
interrupted = true;
// gives the cancel criterion a chance to abort the wait; otherwise keep looping
this.getAdvisee().getCancelCriterion().checkCancelInProgress(e);
}
}
}
finally {
if (interrupted) {
// restore the interrupt status that wait() cleared
Thread.currentThread().interrupt();
}
}
}
return getBucket(bucketId);
}
/**
 * Get the most recent primary node for the bucketId, as reported by the
 * bucket advisor's {@code getPrimary()}. Returns null if no primary can be
 * found within
 * {@link com.gemstone.gemfire.distributed.internal.DistributionConfig#getMemberTimeout}.
 *
 * @param bucketId the index of the bucket
 * @return the Node managing the primary copy of the bucket
 */
final public InternalDistributedMember getPrimaryMemberForBucket(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final Bucket bucket = this.buckets[bucketId];
  final BucketAdvisor advisor = bucket.getBucketAdvisor();
  return advisor.getPrimary();
}
/**
 * Return the node favored for reading for the given bucket, as chosen by the
 * bucket advisor's {@code getPreferredNode()}.
 *
 * @param bucketId the bucket we want to read
 * @return the member, possibly null if no member is available
 */
public InternalDistributedMember getPreferredNode(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final Bucket bucket = this.buckets[bucketId];
  final BucketAdvisor advisor = bucket.getBucketAdvisor();
  return advisor.getPreferredNode();
}
/**
 * Reports whether the given bucket has storage assigned, which is the case
 * when its redundancy is zero or more.
 *
 * @param bucketId the index of the bucket to check
 * @return true when the bucket's redundancy is not negative
 */
public boolean isStorageAssignedForBucket(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final int redundancy = this.buckets[bucketId].getBucketRedundancy();
  return redundancy >= 0;
}
/**
 * Checks storage for a bucket, optionally waiting for redundancy.
 *
 * @param bucketId the bucket to check redundancy on
 * @param minRedundancy the amount of expected redundancy; ignored if wait is false
 * @param wait true if caller wants us to wait for redundancy
 * @return true if redundancy on given bucket is detected
 */
public boolean isStorageAssignedForBucket(int bucketId,
    int minRedundancy,
    boolean wait) {
  if (wait) {
    Assert.assertTrue(this.buckets != null);
    final BucketAdvisor advisor = this.buckets[bucketId].getBucketAdvisor();
    return advisor.waitForRedundancy(minRedundancy);
  }
  // no waiting requested: answer from the current redundancy only
  return isStorageAssignedForBucket(bucketId);
}
/**
 * Delegates to the bucket advisor's {@code waitForStorage()} for the given
 * bucket.
 *
 * @param bucketId the index of the bucket to wait on
 * @return the result of {@code waitForStorage()}
 */
public boolean waitForLocalBucketStorage(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final BucketAdvisor advisor = this.buckets[bucketId].getBucketAdvisor();
  return advisor.waitForStorage();
}
/**
 * Get the redundancy of this bucket, taking into account the local bucket,
 * if any.
 *
 * @param bucketId the index of the bucket
 * @return number of redundant copies for a given bucket, or -1 if
 *         there are no instances of the bucket.
 */
public int getBucketRedundancy(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final ProxyBucketRegion proxy = this.buckets[bucketId];
  return proxy.getBucketRedundancy();
}
/**
 * Return the set of all members who currently own the bucket, including the
 * local owner, if applicable.
 *
 * @param bucketId the index of the bucket
 * @return a set of {@link InternalDistributedMember}s that own the bucket
 */
public Set<InternalDistributedMember> getBucketOwners(int bucketId) {
  Assert.assertTrue(this.buckets != null);
  final ProxyBucketRegion proxy = this.buckets[bucketId];
  return proxy.getBucketOwners();
}
/**
 * Return the set of buckets which have storage assigned. The result is a
 * fresh {@code BucketSet} over this advisor's bucket array.
 *
 * @return set of Integer bucketIds
 */
public Set<Integer> getBucketSet() {
  Assert.assertTrue(this.buckets != null);
  final Set<Integer> bucketIds = new BucketSet();
  return bucketIds;
}
/**
 * @return the bucket array held by this advisor itself (not a copy); may be
 *         null
 */
public ProxyBucketRegion[] getProxyBucketArray() {
  final ProxyBucketRegion[] proxies = this.buckets;
  return proxies;
}
/**
* A set view of bucket ids ({@code Integer}s) backed by the advisor's
* ProxyBucketRegion array as it was when the set was constructed.
* <p>
* The view is weakly consistent: {@link #size()} reports the number of bucket
* slots, while iteration may yield fewer elements. Iterating is not
* side-effect free: {@code next()} calls {@code createBucket} on the
* partitioned region for ids that have no storage assigned yet.
*/
private class BucketSet extends AbstractSet {
// snapshot of the advisor's bucket array taken at construction time
final ProxyBucketRegion[] pbrs;
public BucketSet() {
this.pbrs = RegionAdvisor.this.buckets;
Assert.assertTrue(this.pbrs != null);
}
/**
* Collects every id the iterator yields into a new array.
*/
@Override
public Object[] toArray() {
// A somewhat wasteful impl. but required because the size is not fixed
ArrayList ar = new ArrayList(this.pbrs.length);
try {
for (Iterator e = iterator(); e.hasNext(); ) {
ar.add(e.next());
}
} catch (NoSuchElementException allDone) {
// next() may run out of buckets even after hasNext() returned true;
// whatever was collected so far is the result
}
return ar.toArray();
}
/**
* Copies the ids into the supplied array, allocating a new array of the
* same component type when the supplied one is too small. When the array
* is larger than needed, the slot after the last id is set to null.
*/
@Override
public Object[] toArray(Object p_a[]) {
Object a[] = p_a;
// Some what wasteful, but needed because size is not fixed
Object[] oa = toArray();
if (a.length < oa.length) {
a = (Object[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), oa.length);
System.arraycopy(oa, 0, a, 0, oa.length);
}
// NOTE(review): repeats the arraycopy above when a new array was allocated
for (int i=0; i<oa.length; i++)
a[i] = oa[i];
if (a.length > oa.length)
a[oa.length] = null;
return a;
}
/*
* Note: The consistency between size(), hasNext() and next() is weak, meaning
* that the state of the backing Set may change causing more or less elements
* to be available after calling size()
*/
/**
* @return the length of the bucket array, i.e. the number of bucket slots
*/
@Override
public int size()
{
return this.pbrs.length;
}
@Override
public Iterator iterator()
{
return new BucketSetIterator();
}
/**
* Iterator over bucket ids in increasing order. {@code remove()} is not
* supported.
*/
class BucketSetIterator implements Iterator {
// index of the id returned last; -1 before the first call to next()
private int currentItem = -1;
public void remove()
{
throw new UnsupportedOperationException();
}
/*
* Note: The consistency guarantee between hasNext() and next() is
* weak. It's possible hasNext() will return true
* and a following call to next() may throw NoSuchElementException (due to loss
* of bucket storage). Its also
* equally possible for hasNext() to return false and a subsequent call to next() will return
* a valid bucketid.
*/
public boolean hasNext()
{
if (getPartitionedRegion().isFixedPartitionedRegion()) {
if (this.currentItem + 1 < BucketSet.this.pbrs.length) {
// fixed partitioning: look ahead for an id that some fixed partition
// (advised or local) claims, without moving currentItem
int possibleBucketId = this.currentItem;
boolean bucketExists = false;
List<FixedPartitionAttributesImpl> fpaList = adviseAllFixedPartitionAttributes();
List<FixedPartitionAttributesImpl> localFpas = getPartitionedRegion().getFixedPartitionAttributesImpl();
if(localFpas != null){
// NOTE(review): appends to the list returned by adviseAllFixedPartitionAttributes();
// presumably that is a fresh list - confirm
fpaList.addAll(localFpas);
}
while (++possibleBucketId < BucketSet.this.pbrs.length
&& !bucketExists) {
for (FixedPartitionAttributesImpl fpa : fpaList) {
if (fpa.hasBucket(possibleBucketId)) {
bucketExists = true;
break;
}
}
}
return bucketExists;
}
else {
return false;
}
}
else {
// not fixed-partitioned: more ids remain while the index is in range
return this.currentItem + 1 < BucketSet.this.pbrs.length;
}
}
/**
* Advances to the next id and returns it as an Integer. An id without
* assigned storage is created via {@code createBucket} first; for fixed
* partitioned regions, ids that no fixed partition claims are skipped.
*
* @throws NoSuchElementException when no further id is available
*/
public Object next() {
if (++this.currentItem < BucketSet.this.pbrs.length) {
if (isStorageAssignedForBucket(this.currentItem)) {
return Integer.valueOf(this.currentItem);
}
else {
if (getPartitionedRegion().isFixedPartitionedRegion()) {
boolean bucketExists = false;
List<FixedPartitionAttributesImpl> fpaList = adviseAllFixedPartitionAttributes();
List<FixedPartitionAttributesImpl> localFpas = getPartitionedRegion().getFixedPartitionAttributesImpl();
if(localFpas != null){
fpaList.addAll(localFpas);
}
// move currentItem forward until an id claimed by a fixed partition is found
do {
for (FixedPartitionAttributesImpl fpa : fpaList) {
if (fpa.hasBucket(this.currentItem)) {
bucketExists = true;
break;
}
}
if (!bucketExists) {
this.currentItem++;
}
} while (this.currentItem < BucketSet.this.pbrs.length && !bucketExists);
if (bucketExists) {
getPartitionedRegion().createBucket(this.currentItem, 0, null);
return Integer.valueOf(this.currentItem);
}
// otherwise fall through to the NoSuchElementException below
} else {
getPartitionedRegion().createBucket(this.currentItem, 0, null);
return Integer.valueOf(this.currentItem);
}
}
}
throw new NoSuchElementException();
}
}
}
/**
 * Obtain the {@link ArrayList} of data stores limited to those specified in
 * the provided memberFilter. Each entry carries the member, its bucket
 * count, the number of buckets for which it is currently recorded as primary,
 * and its local max memory.
 *
 * @param memberFilter the set of members allowed to be in the list.
 * @return a list of DataStoreBuckets
 */
public ArrayList<DataStoreBuckets> adviseFilteredDataStores(final Set<InternalDistributedMember> memberFilter) {
  // first pass: count primaries per member across all buckets
  final HashMap<InternalDistributedMember, Integer> primariesPerMember =
      new HashMap<InternalDistributedMember, Integer>();
  for (ProxyBucketRegion proxy : this.buckets) {
    // quick dirty check
    final InternalDistributedMember primary = proxy.getBucketAdvisor().basicGetPrimaryMember();
    if (primary == null) {
      continue;
    }
    final Integer soFar = primariesPerMember.get(primary);
    final int updated = (soFar == null) ? 1 : soFar.intValue() + 1;
    primariesPerMember.put(primary, Integer.valueOf(updated));
  }

  // second pass: one DataStoreBuckets per allowed member with a partition profile
  final ArrayList<DataStoreBuckets> stores = new ArrayList<DataStoreBuckets>(memberFilter.size());
  adviseFilter(new Filter() {
    public boolean include(Profile profile) {
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile) profile;
      if (memberFilter.contains(pp.getDistributedMember())) {
        final Integer counted = primariesPerMember.get(pp.getDistributedMember());
        final int primaryCount = (counted == null) ? 0 : counted.intValue();
        stores.add(new DataStoreBuckets(pp.getDistributedMember(), pp.numBuckets,
            primaryCount, pp.localMaxMemory));
      }
      // the filter is only used for its side effect on the list
      return false;
    }
  });
  return stores;
}
/**
 * Increments the bucket count of the profile stored for the member of
 * {@code p}. Does nothing when no such profile is stored.
 *
 * @param p a profile identifying the member whose count is updated
 */
public void incrementBucketCount(Profile p) {
  final PartitionProfile stored = (PartitionProfile) getProfile(p.getDistributedMember());
  if (stored == null) {
    return;
  }
  Assert.assertTrue(stored.isDataStore);
  stored.numBuckets++;
}
/**
 * Decrements the bucket count of the profile stored for the member of
 * {@code p}, never letting it drop below zero. Does nothing when no such
 * profile is stored.
 *
 * @param p a profile identifying the member whose count is updated
 */
public void decrementsBucketCount(Profile p) {
  final PartitionProfile stored = (PartitionProfile) getProfile(p.getDistributedMember());
  if (stored == null) {
    return;
  }
  Assert.assertTrue(stored.isDataStore);
  stored.numBuckets--;
  if (stored.numBuckets < 0) {
    // clamp at zero
    stored.numBuckets = 0;
  }
}
/**
 * Dumps out all profiles in this advisor AND all buckets, including each
 * bucket's persistence advisor when it has one. Callers should check for
 * debug enabled.
 *
 * @param infoMsg prefix message to log
 */
@Override
public void dumpProfiles(String infoMsg) {
  if (logger.isDebugEnabled()) {
    logger.debug("[dumpProfiles] dumping {}", this.toStringWithProfiles());
  }
  // profiles of this RegionAdvisor come first
  super.dumpProfiles(infoMsg);

  // then the profiles held by each BucketAdvisor
  final ProxyBucketRegion[] proxies = this.buckets;
  if (proxies == null) {
    return;
  }
  for (ProxyBucketRegion proxy : proxies) {
    proxy.getBucketAdvisor().dumpProfiles(infoMsg);
    final BucketPersistenceAdvisor persistence = proxy.getPersistenceAdvisor();
    if (persistence != null) {
      persistence.dump(infoMsg);
    }
  }
}
/**
 * Forwards to the bucket advisor's {@code notPrimary} for the given bucket.
 *
 * @param bucketId the index of the bucket
 * @param wasPrimary the member passed on to the bucket advisor
 */
public void notPrimary(int bucketId, InternalDistributedMember wasPrimary) {
  Assert.assertTrue(this.buckets != null);
  final BucketAdvisor advisor = this.buckets[bucketId].getBucketAdvisor();
  advisor.notPrimary(wasPrimary);
}
/**
 * Find the set of members which own primary buckets, including the local
 * member. Only buckets with storage assigned are considered.
 *
 * @return set of InternalDistributedMember ids
 */
public Set advisePrimaryOwners() {
  Assert.assertTrue(this.buckets != null);
  final ProxyBucketRegion[] proxies = this.buckets;
  final HashSet owners = new HashSet();
  for (int bucketId = 0; bucketId < proxies.length; bucketId++) {
    if (!isStorageAssignedForBucket(bucketId)) {
      continue;
    }
    final InternalDistributedMember primary =
        proxies[bucketId].getBucketAdvisor().getPrimary();
    if (primary != null) {
      owners.add(primary);
    }
  }
  return owners;
}
/**
* A visitor interface for the buckets of this region used by
* {@link RegionAdvisor#accept(BucketVisitor, Object)}.
*
* @param <T> the type of the aggregate object handed to every visit
*/
public static interface BucketVisitor<T> {
/**
* Visit a given {@link ProxyBucketRegion} accumulating the results in the
* given aggregate. Returns false when the visit has to be terminated.
*
* @param advisor the advisor whose buckets are being visited
* @param pbr the bucket currently being visited
* @param aggregate the object in which results are accumulated
* @return true to continue with the next bucket, false to stop the visit
*/
boolean visit(RegionAdvisor advisor, ProxyBucketRegion pbr, T aggregate);
}
/**
 * Invoke the given {@link BucketVisitor} on each {@link ProxyBucketRegion}
 * in array order, stopping as soon as {@link BucketVisitor#visit} returns
 * false.
 *
 * @param <T>
 *          the type of object used for aggregation of results
 * @param visitor
 *          the {@link BucketVisitor} to use for the visit
 * @param aggregate
 *          an aggregate object passed to every {@link BucketVisitor#visit}
 *          call; keeping the results here lets the visitor itself stay
 *          stateless, so in most situations a global static visitor object
 *          will work
 *
 * @return true when the full visit completed, and false if it was terminated
 *         due to {@link BucketVisitor#visit} returning false
 */
public <T> boolean accept(BucketVisitor<T> visitor, T aggregate) {
  final ProxyBucketRegion[] proxies = this.buckets;
  Assert.assertTrue(proxies != null);
  for (int idx = 0; idx < proxies.length; idx++) {
    final boolean keepGoing = visitor.visit(this, proxies[idx], aggregate);
    if (!keepGoing) {
      return false;
    }
  }
  return true;
}
/**
 * @return the advisee of this advisor, cast to {@code PartitionedRegion}
 */
public PartitionedRegion getPartitionedRegion() {
  final Object advisee = getAdvisee();
  return (PartitionedRegion) advisee;
}
/**
 * Update or create a bucket's meta-data.
 * While this advisor is still initializing (its pre-init queue exists) the
 * profile is only enqueued for processing during initialization; afterwards
 * it is handed straight to the bucket's advisor. This limits how long
 * threads are blocked during initialization.
 *
 * @param bucketId the unique identifier of the bucket
 * @param profile the bucket meta-data from a particular member with the bucket
 */
public void putBucketProfile(int bucketId, BucketProfile profile) {
  synchronized (this.preInitQueueMonitor) {
    final boolean stillInitializing = this.preInitQueue != null;
    if (stillInitializing) {
      // Queue profile during pre-initialization
      this.preInitQueue.add(new QueuedBucketProfile(bucketId, profile));
      return;
    }
  }
  // Directly process profile post-initialization
  final BucketAdvisor advisor = getBucket(bucketId).getBucketAdvisor();
  advisor.putProfile(profile);
}
/**
 * Immutable record of a bucket profile operation that is queued for later
 * processing. Three shapes exist, one per constructor: an addition, a
 * removal because a member left the view, and a removal because of a region
 * destroy.
 */
static class QueuedBucketProfile {
  protected final int bucketId;
  protected final BucketProfile bucketProfile;
  /** true means that this member has departed the view */
  protected final boolean memberDeparted;
  /** true means that this profile needs to be removed */
  protected final boolean isRemoval;
  /** true means that the peer crashed */
  protected final boolean crashed;
  /**
   * true means that this QueuedBucketProfile was created
   * because of MembershipListener invocation
   */
  protected final boolean fromMembershipListener;
  protected final boolean destroyed;
  protected final InternalDistributedMember memberId;
  protected final int[] serials;

  /**
   * Queue up an addition
   * @param bId the bucket being added
   * @param p the profile to add
   */
  public QueuedBucketProfile(int bId, BucketProfile p) {
    // what is being added
    this.bucketId = bId;
    this.bucketProfile = p;
    // an addition carries no member or serial information
    this.memberId = null;
    this.serials = null;
    // none of the removal flags apply
    this.isRemoval = false;
    this.memberDeparted = false;
    this.crashed = false;
    this.destroyed = false;
    this.fromMembershipListener = false;
  }

  /**
   * Queue up a removal due to member leaving the view
   * @param mbr the member being removed
   * @param crashed true if the peer crashed
   * @param destroyed stored as the {@code destroyed} flag
   * @param fromMembershipListener true if created because of a
   *        MembershipListener invocation
   */
  public QueuedBucketProfile(InternalDistributedMember mbr, boolean crashed,
      boolean destroyed, boolean fromMembershipListener) {
    // no bucket payload for a removal
    this.bucketId = 0;
    this.bucketProfile = null;
    this.serials = null;
    // who left, and how
    this.memberId = mbr;
    this.isRemoval = true;
    this.memberDeparted = true;
    this.crashed = crashed;
    this.destroyed = destroyed;
    this.fromMembershipListener = fromMembershipListener;
  }

  /**
   * Queue up a removal due to region destroy
   * @param mbr the member being removed
   * @param serials the serials it had
   * @param destroyed stored as the {@code destroyed} flag
   */
  public QueuedBucketProfile(InternalDistributedMember mbr, int[] serials, boolean destroyed) {
    // no bucket payload for a removal
    this.bucketId = 0;
    this.bucketProfile = null;
    // whose profiles go away, and which serials they had
    this.memberId = mbr;
    this.serials = serials;
    this.isRemoval = true;
    this.destroyed = destroyed;
    // the member itself is still in the view
    this.memberDeparted = false;
    this.crashed = false;
    this.fromMembershipListener = false;
  }
}
/**
 * @return the members to exchange bucket profiles with, which is exactly
 *         the result of {@code adviseDataStore()}
 */
public Set adviseBucketProfileExchange() {
  final Set dataStores = adviseDataStore();
  return dataStores;
}
/**
 * Sums {@code localMaxMemory} over all partition profiles known to this
 * advisor.
 *
 * @return the accumulated local max memory
 */
public long adviseTotalMemoryAllocation() {
  final AtomicLong sum = new AtomicLong();
  final Filter accumulate = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are Partition profiles
      if (profile instanceof PartitionProfile) {
        sum.addAndGet(((PartitionProfile) profile).localMaxMemory);
      }
      // the filter is only used for its side effect on the sum
      return false;
    }
  };
  adviseFilter(accumulate);
  return sum.get();
}
/**
 * Sums {@code localMaxMemory} over the partition profiles that carry fixed
 * partition attributes.
 *
 * @return the accumulated local max memory of those profiles
 */
public long adviseTotalMemoryAllocationForFPR() {
  final AtomicLong sum = new AtomicLong();
  final Filter accumulate = new Filter() {
    public boolean include(Profile profile) {
      // probably not needed as all profiles for a partitioned region are Partition profiles
      if (!(profile instanceof PartitionProfile)) {
        return false;
      }
      final PartitionProfile pp = (PartitionProfile) profile;
      if (pp.fixedPAttrs != null) {
        sum.addAndGet(pp.localMaxMemory);
      }
      // the filter is only used for its side effect on the sum
      return false;
    }
  };
  adviseFilter(accumulate);
  return sum.get();
}
/**
 * Returns true if there are any buckets created anywhere in the distributed
 * system for this partitioned region, i.e. at least one bucket reports an
 * owner count above zero.
 */
public boolean hasCreatedBuckets() {
  final ProxyBucketRegion[] proxies = this.buckets;
  if (proxies == null) {
    return false;
  }
  for (ProxyBucketRegion proxy : proxies) {
    if (proxy.getBucketOwnersCount() > 0) {
      return true;
    }
  }
  return false;
}
/**
 * Returns the total number of buckets created anywhere in the distributed
 * system for this partitioned region, counting every bucket that reports an
 * owner count above zero.
 *
 * @return the total number of buckets created anywhere for this PR; 0 when
 *         the bucket array has not been set up
 */
public int getCreatedBucketsCount() {
  final ProxyBucketRegion[] proxies = this.buckets;
  int created = 0;
  if (proxies != null) {
    for (ProxyBucketRegion proxy : proxies) {
      if (proxy.getBucketOwnersCount() > 0) {
        created++;
      }
    }
  }
  return created;
}
/**
 * Returns a possibly null list of this advisor's real bucket profiles.
 * A real bucket profile is one for a bucket that actually has a created
 * bucket region in this vm.
 *
 * @return a list of BucketProfileAndId instances; null when the bucket array
 *         has not been set up or no bucket region has been created
 * @since 5.5
 */
public ArrayList getBucketRegionProfiles() {
  final ProxyBucketRegion[] proxies = this.buckets;
  if (proxies == null) {
    return null;
  }
  final ArrayList profiles = new ArrayList(proxies.length);
  for (int bucketId = 0; bucketId < proxies.length; bucketId++) {
    //Fix for 41436 - we need to include buckets that are still initializing here
    //we must start including buckets in this list *before* those buckets exchange
    //profiles.
    final BucketRegion created = proxies[bucketId].getCreatedBucketRegion();
    if (created != null) {
      profiles.add(new BucketProfileAndId(created.getProfile(), bucketId));
    }
  }
  return profiles.isEmpty() ? null : profiles;
}
/**
 * Takes a list of BucketProfileAndId and adds each profile to the advisor of
 * the bucket it names.
 *
 * @param l the BucketProfileAndId entries to apply
 * @since 5.5
 */
public void putBucketRegionProfiles(ArrayList l) {
  for (Object element : l) {
    final BucketProfileAndId entry = (BucketProfileAndId) element;
    final BucketAdvisor advisor = getBucket(entry.getId()).getBucketAdvisor();
    advisor.putProfile(entry.getBucketProfile());
  }
}
/**
 * Reacts to the removal of a profile: the member is taken off the
 * partitioned region's critical list, and every bucket advisor is asked to
 * check whether it lost its primary elector.
 *
 * @param profile the profile that was removed
 */
@Override
protected void profileRemoved(Profile profile) {
  if (logger.isDebugEnabled()) {
    logger.debug("RA: removing profile {}", profile);
  }
  final Object advisee = getAdvisee();
  if (advisee instanceof PartitionedRegion) {
    ((PartitionedRegion) advisee).removeMemberFromCriticalList(profile.peerMemberId);
  }
  if (this.buckets == null) {
    return;
  }
  for (ProxyBucketRegion proxy : this.buckets) {
    proxy.getBucketAdvisor().checkForLostPrimaryElector(profile);
  }
}
/**
 * Registers the listener with the persistence advisor of every bucket that
 * has one.
 *
 * @param listener the listener to register
 */
public void addPersistenceListener(PersistentStateListener listener) {
  for (ProxyBucketRegion proxy : buckets) {
    final PersistenceAdvisor persistence = proxy.getPersistenceAdvisor();
    if (persistence == null) {
      continue;
    }
    persistence.addListener(listener);
  }
}
/**
 * Serializable pairing of a bucket profile with the id of the bucket it
 * belongs to. The wire format is: the id, a flag telling whether the profile
 * is a {@code ServerBucketProfile}, then the profile's own data.
 */
public static class BucketProfileAndId implements DataSerializable {
  private static final long serialVersionUID = 332892607792421553L;
  /*final*/ private int id; // bid = bucket id
  /*final*/ private BucketProfile bp;
  private boolean isServerBucketProfile = false;

  /**
   * @param bp the profile to carry; must be a {@code BucketProfile}
   * @param id the id of the bucket the profile belongs to
   */
  public BucketProfileAndId(Profile bp, int id) {
    this.id = id;
    this.bp = (BucketProfile) bp;
    // remembered so that fromData can create the matching profile type
    this.isServerBucketProfile = bp instanceof ServerBucketProfile;
  }

  /** No-arg constructor; the fields are filled in by {@link #fromData}. */
  public BucketProfileAndId() {
  }

  public int getId() {
    return this.id;
  }

  public BucketProfile getBucketProfile() {
    return this.bp;
  }

  /**
   * Reads the id and the type flag, creates an empty profile of the flagged
   * type and lets it read its own data.
   */
  public void fromData(DataInput in) throws IOException, ClassNotFoundException {
    this.id = in.readInt();
    this.isServerBucketProfile = in.readBoolean();
    if (this.isServerBucketProfile) {
      this.bp = new ServerBucketProfile();
    } else {
      this.bp = new BucketProfile();
    }
    InternalDataSerializer.invokeFromData(this.bp, in);
  }

  /** Writes the id, the type flag and then the profile's own data. */
  public void toData(DataOutput out) throws IOException {
    out.writeInt(this.id);
    out.writeBoolean(this.isServerBucketProfile);
    InternalDataSerializer.invokeToData(this.bp, out);
  }

  @Override
  public String toString() {
    return "BucketProfileAndId (profile=" + bp + "; id=" + id + ")";
  }
}
/**
 * Profile listener to monitor remote member unexpected leave during
 * shutdownAll. Any profile creation, update or removal flags a change and
 * wakes up threads blocked in {@link #waitForChange()}.
 */
private class ProfileShutdownListener implements ProfileListener {
  ProfileShutdownListener() {
  }

  // guarded by this listener's monitor
  private boolean profileChanged = false;

  /**
   * Blocks until a profile change has been flagged or the partitioned region
   * is destroyed, re-checking at least once per second, and then clears the
   * flag.
   * <p>
   * An interrupt does not end the wait, but the thread's interrupt status is
   * restored before returning instead of being silently dropped.
   */
  public void waitForChange() {
    Region pr = getPartitionedRegion();
    boolean interrupted = false;
    try {
      synchronized (this) {
        while (!profileChanged && pr != null && !pr.isDestroyed()) {
          // the advisee might have been destroyed due to initialization failure
          try {
            this.wait(1000);
          } catch (InterruptedException e) {
            // keep waiting, but remember the interrupt for the caller
            interrupted = true;
          }
        }
        this.profileChanged = false;
      }
    } finally {
      if (interrupted) {
        // restored only after the loop; doing it inside would make wait() throw again at once
        Thread.currentThread().interrupt();
      }
    }
  }

  public void profileCreated(Profile profile) {
    profileUpdated(profile);
  }

  public void profileRemoved(Profile profile, boolean regionDestroyed) {
    // if a profile is gone, notify
    signalChange();
  }

  public void profileUpdated(Profile profile) {
    // when updated, notify the loop in GFC to check the list again
    signalChange();
  }

  /** Flags a profile change and wakes up all waiting threads. */
  private void signalChange() {
    synchronized (this) {
      this.profileChanged = true;
      this.notifyAll();
    }
  }
}
}