blob: d809554cd201a598730e47d82ec65bdcf98fa149 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.control;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.control.ResourceManager;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisee;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.DSCODE;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.UpdateAttributesProcessor;
import com.gemstone.gemfire.internal.cache.control.InternalResourceManager.ResourceType;
import com.gemstone.gemfire.internal.cache.control.MemoryThresholds.MemoryState;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
/**
* The advisor associated with a {@link ResourceManager}. Allows knowledge of
* remote {@link ResourceManager} state and distribution of local {@link ResourceManager} state.
*
* @author Mitch Thomas
* @since 6.0
*/
public class ResourceAdvisor extends DistributionAdvisor {
private static final Logger logger = LogService.getLogger();
/**
* Message used to push event updates to remote VMs
*/
public static class ResourceProfileMessage extends
HighPriorityDistributionMessage {
// As of 9.0 this message will only ever have a single profile.
// But to be compatible with previous releases we still support
// multiple profiles so that we can handle these messages during
// a rolling upgrade.
private volatile ResourceManagerProfile[] profiles;
private volatile int processorId;
/**
* Default constructor used for de-serialization (used during receipt)
*/
public ResourceProfileMessage() {}
/**
* Constructor used to send profiles to other members.
*
* @param recips Members to send the profile to.
* @param profile Profile to send.
*/
private ResourceProfileMessage(final Set<InternalDistributedMember> recips,
final ResourceManagerProfile profile) {
setRecipients(recips);
this.processorId = 0;
this.profiles = new ResourceManagerProfile[]{profile};
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.distributed.internal.DistributionMessage#process(com.gemstone.gemfire.distributed.internal.DistributionManager)
*/
@Override
protected void process(DistributionManager dm) {
Throwable thr = null;
ResourceManagerProfile p = null;
try {
final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
final ResourceAdvisor ra = cache.getResourceManager().getResourceAdvisor();
if (this.profiles != null) {
// Early reply to avoid waiting for the following putProfile call
// to fire (remote) listeners so that the origin member can proceed with
// firing its (local) listeners
for (int i=0; i<this.profiles.length; i++) {
ra.putProfile(this.profiles[i]);
}
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("No cache: {}", this);
}
}
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("Cache closed: {}", this);
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
thr = t;
} finally {
if (thr != null) {
dm.getCancelCriterion().checkCancelInProgress(null);
logger.info(LocalizedMessage.create(
LocalizedStrings.ResourceAdvisor_MEMBER_CAUGHT_EXCEPTION_PROCESSING_PROFILE,
new Object[] {p, toString()}), thr);
}
}
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.DataSerializableFixedID#getDSFID()
*/
@Override
public int getDSFID() {
return RESOURCE_PROFILE_MESSAGE;
}
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
super.fromData(in);
this.processorId = in.readInt();
final int l = in.readInt();
if (l != -1) {
this.profiles = new ResourceManagerProfile[l];
for (int i=0; i<this.profiles.length; i++) {
final ResourceManagerProfile r = new ResourceManagerProfile();
InternalDataSerializer.invokeFromData(r, in);
this.profiles[i] = r;
}
} else {
this.profiles = null;
}
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
out.writeInt(this.processorId);
if (this.profiles != null) {
out.writeInt(this.profiles.length);
for (int i=0; i<this.profiles.length; i++) {
InternalDataSerializer.invokeToData(this.profiles[i], out);
}
} else {
out.writeInt(-1);
}
}
/**
* Send profiles to the provided members
* @param irm The resource manager which is requesting distribution
* @param recips The recipients of the message
* @param profile Profile to send in this message
* @throws InterruptedException
* @throws ReplyException
*/
public static void send(final InternalResourceManager irm, Set<InternalDistributedMember> recips,
ResourceManagerProfile profile) {
final DM dm = irm.getResourceAdvisor().getDistributionManager();
ResourceProfileMessage r = new ResourceProfileMessage(recips, profile);
dm.putOutgoing(r);
}
@Override
public String getShortClassName() {
return "ResourceProfileMessage";
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getShortClassName())
.append(" (processorId=").append(this.processorId)
.append("; profiles=[");
for (int i=0; i<this.profiles.length; i++) {
sb.append(this.profiles[i]);
if (i < this.profiles.length-1) {
sb.append(", ");
}
}
sb.append("]");
sb.append(")");
return sb.toString();
}
}
/**
* @param advisee The owner of this advisor
* @see ResourceManager
*/
private ResourceAdvisor(DistributionAdvisee advisee) {
super(advisee);
}
public static ResourceAdvisor createResourceAdvisor(DistributionAdvisee advisee) {
ResourceAdvisor advisor = new ResourceAdvisor(advisee);
advisor.initialize();
return advisor;
}
@Override
protected Profile instantiateProfile(InternalDistributedMember memberId,
int version) {
return new ResourceManagerProfile(memberId, version);
}
private InternalResourceManager getResourceManager() {
return ((GemFireCacheImpl) getAdvisee()).getResourceManager(false);
}
@SuppressWarnings("synthetic-access")
@Override
protected boolean evaluateProfiles(final Profile newProfile,
final Profile oldProfile) {
ResourceManagerProfile oldRMProfile = (ResourceManagerProfile) oldProfile;
ResourceManagerProfile newRMProfile = (ResourceManagerProfile) newProfile;
List<ResourceEvent> eventsToDeliver = new ArrayList<ResourceEvent>();
if (oldRMProfile == null) {
eventsToDeliver.add(new MemoryEvent(ResourceType.HEAP_MEMORY, MemoryState.DISABLED, newRMProfile.heapState, newRMProfile.getDistributedMember(),
newRMProfile.heapBytesUsed, false, newRMProfile.heapThresholds));
eventsToDeliver.add(new MemoryEvent(ResourceType.OFFHEAP_MEMORY, MemoryState.DISABLED, newRMProfile.offHeapState, newRMProfile.getDistributedMember(),
newRMProfile.offHeapBytesUsed, false, newRMProfile.offHeapThresholds));
} else {
if (oldRMProfile.heapState != newRMProfile.heapState) {
eventsToDeliver.add(new MemoryEvent(ResourceType.HEAP_MEMORY, oldRMProfile.heapState, newRMProfile.heapState, newRMProfile.getDistributedMember(),
newRMProfile.heapBytesUsed, false, newRMProfile.heapThresholds));
}
if (newRMProfile.heapState == MemoryState.DISABLED) {
newRMProfile.setHeapData(oldRMProfile.heapBytesUsed, oldRMProfile.heapState, oldRMProfile.heapThresholds);
}
if (oldRMProfile.offHeapState != newRMProfile.offHeapState) {
eventsToDeliver.add(new MemoryEvent(ResourceType.OFFHEAP_MEMORY, oldRMProfile.offHeapState, newRMProfile.offHeapState, newRMProfile.getDistributedMember(),
newRMProfile.offHeapBytesUsed, false, newRMProfile.offHeapThresholds));
}
if (newRMProfile.offHeapState == MemoryState.DISABLED) {
newRMProfile.setOffHeapData(oldRMProfile.offHeapBytesUsed, oldRMProfile.offHeapState, oldRMProfile.offHeapThresholds);
}
}
for (ResourceEvent event : eventsToDeliver) {
getResourceManager().deliverEventFromRemote(event);
}
return true;
}
@Override
public String toString() {
return new StringBuilder().append("ResourceAdvisor for ResourceManager "
+ getAdvisee()).toString();
}
/**
* Profile which shares state with other ResourceManagers.
* The data available in this profile should be enough to
* deliver a {@link MemoryEvent} for any of the CRITICAL {@link MemoryState}s
* @author Mitch Thomas
* @author David Hoots
* @since 6.0
*/
public static class ResourceManagerProfile extends Profile {
//Resource manager related fields
private long heapBytesUsed;
private MemoryState heapState;
private MemoryThresholds heapThresholds;
private long offHeapBytesUsed;
private MemoryState offHeapState;
private MemoryThresholds offHeapThresholds;
// Constructor for de-serialization
public ResourceManagerProfile() {}
// Constructor for sending purposes
public ResourceManagerProfile(InternalDistributedMember memberId,
int version) {
super(memberId, version);
}
public synchronized ResourceManagerProfile setHeapData(final long heapBytesUsed, final MemoryState heapState,
final MemoryThresholds heapThresholds) {
this.heapBytesUsed = heapBytesUsed;
this.heapState = heapState;
this.heapThresholds = heapThresholds;
return this;
}
public synchronized ResourceManagerProfile setOffHeapData(final long offHeapBytesUsed, final MemoryState offHeapState,
final MemoryThresholds offHeapThresholds) {
this.offHeapBytesUsed = offHeapBytesUsed;
this.offHeapState = offHeapState;
this.offHeapThresholds = offHeapThresholds;
return this;
}
public synchronized MemoryEvent createDisabledMemoryEvent(ResourceType resourceType) {
if (resourceType == ResourceType.HEAP_MEMORY) {
return new MemoryEvent(ResourceType.HEAP_MEMORY, this.heapState, MemoryState.DISABLED, getDistributedMember(), this.heapBytesUsed,
false, this.heapThresholds);
}
return new MemoryEvent(ResourceType.OFFHEAP_MEMORY, this.offHeapState, MemoryState.DISABLED, getDistributedMember(), this.offHeapBytesUsed,
false, this.offHeapThresholds);
}
/**
* Used to process incoming Resource Manager profiles. A reply is expected
* to contain a profile with state of the local Resource Manager.
*
* @since 6.0
*/
@Override
public void processIncoming(DistributionManager dm, String adviseePath,
boolean removeProfile, boolean exchangeProfiles,
final List<Profile> replyProfiles) {
final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
if (cache != null && !cache.isClosed()) {
handleDistributionAdvisee(cache, removeProfile,
exchangeProfiles, replyProfiles);
}
}
@Override
public StringBuilder getToStringHeader() {
return new StringBuilder("ResourceAdvisor.ResourceManagerProfile");
}
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
synchronized (this) {
sb.append("; heapState=").append(this.heapState)
.append("; heapBytesUsed=").append(this.heapBytesUsed)
.append("; heapThresholds=").append(this.heapThresholds)
.append("; offHeapState=").append(this.offHeapState)
.append("; offHeapBytesUsed=").append(this.offHeapBytesUsed)
.append("; offHeapThresholds=").append(this.offHeapThresholds);
}
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GFE_90) >= 0) {
final long heapBytesUsed = in.readLong();
MemoryState heapState = MemoryState.fromData(in);
MemoryThresholds heapThresholds = MemoryThresholds.fromData(in);
setHeapData(heapBytesUsed, heapState, heapThresholds);
final long offHeapBytesUsed = in.readLong();
MemoryState offHeapState = MemoryState.fromData(in);
MemoryThresholds offHeapThresholds = MemoryThresholds.fromData(in);
setOffHeapData(offHeapBytesUsed, offHeapState, offHeapThresholds);
} else {
// pre 9.0
in.readInt(); // currentHeapUsagePercent
long currentHeapBytesUsed = in.readLong();
//enum MemoryEventType (the byte GEMFIRE_ENUM followed by two Strings. The enums: UNKNOWN, EVICTION_UP, EVICT_MORE, EVICTION_DOWN, EVICTION_DISABLED, CRITICAL_UP, CRITICAL_DOWN, CRITICAL_DISABLED)
byte b = in.readByte();
String enumName;
if (b == DSCODE.GEMFIRE_ENUM) {
String enumClass = DataSerializer.readString(in);
assert enumClass.equals("com.gemstone.gemfire.internal.cache.control.MemoryEventType");
enumName = DataSerializer.readString(in);
} else if (b == DSCODE.SERIALIZABLE) {
final byte TC_ENUM = 0x7e;
final byte TC_CLASSDESC = 0x72;
final byte TC_ENDBLOCK = 0x78;
final byte TC_NULL = 0x70;
final byte TC_STRING = 0x74;
in.readShort(); // STREAM_MAGIC
in.readShort(); // STREAM_VERSION
b = in.readByte();
assert b == TC_ENUM : "expected " + b + " to be TC_ENUM " + TC_ENUM;
b = in.readByte();
assert b == TC_CLASSDESC : "expected " + b + " to be TC_CLASSDESC " + TC_CLASSDESC;
String cn = in.readUTF();
//System.out.println("DEBUG enum className=" + cn);
assert cn.equals("com.gemstone.gemfire.internal.cache.control.MemoryEventType");
in.readLong();
in.readByte();
int fields = in.readShort();
while (fields != 0) {
in.readByte();
in.readUTF();
fields--;
}
b = in.readByte();
assert b == TC_ENDBLOCK : "expected " + b + " to be TC_ENDBLOCK " + TC_ENDBLOCK;
// parent classDesc
b = in.readByte();
assert b == TC_CLASSDESC : "expected " + b + " to be TC_CLASSDESC " + TC_CLASSDESC;
cn = in.readUTF();
//System.out.println("DEBUG parent className=" + cn);
assert cn.equals("java.lang.Enum");
in.readLong();
in.readByte();
fields = in.readShort();
while (fields != 0) {
in.readByte();
in.readUTF();
fields--;
}
b = in.readByte();
assert b == TC_ENDBLOCK : "expected " + b + " to be TC_ENDBLOCK " + TC_ENDBLOCK;
b = in.readByte();
assert b == TC_NULL : "expected " + b + " to be TC_NULL " + TC_NULL;
b = in.readByte();
assert b == TC_STRING : "expected " + b + " to be TC_STRING " + TC_STRING;
enumName = in.readUTF();
//System.out.println("DEBUG enumName=" + enumName);
} else {
throw new IllegalStateException("Unexpected byte " + b);
}
in.readDouble(); // tenuredGenerationMaxBytes
in.readBoolean(); // hasTenuredGenerationMaxBytes
float criticalThreshold = in.readFloat();
in.readBoolean(); // hasCriticalThreshold
float evictionThreshold = in.readFloat();
in.readBoolean(); // hasEvictionThreshold
MemoryState heapState;
if (enumName.equals("CRITICAL_UP")) {
heapState = MemoryState.CRITICAL;
} else if (enumName.equals("CRITICAL_DISABLED")) {
heapState = MemoryState.CRITICAL_DISABLED;
} else if (enumName.equals("CRITICAL_DOWN")) {
heapState = MemoryState.NORMAL;
} else {
// We really don't care about the other old states so we just call them normal
heapState = MemoryState.NORMAL;
}
setHeapData(currentHeapBytesUsed, heapState, new MemoryThresholds(0, criticalThreshold, evictionThreshold));
setOffHeapData(0, MemoryState.DISABLED, new MemoryThresholds(0));
}
}
@Override
public void toData(DataOutput out) throws IOException {
final long heapBytesUsed; final MemoryState heapState; final MemoryThresholds heapThresholds;
final long offHeapBytesUsed; final MemoryState offHeapState; final MemoryThresholds offHeapThresholds;
synchronized(this) {
heapBytesUsed = this.heapBytesUsed;
heapState = this.heapState;
heapThresholds = this.heapThresholds;
offHeapBytesUsed = this.offHeapBytesUsed;
offHeapState = this.offHeapState;
offHeapThresholds = this.offHeapThresholds;
}
super.toData(out);
if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.GFE_90) >= 0) {
out.writeLong(heapBytesUsed);
heapState.toData(out);
heapThresholds.toData(out);
out.writeLong(offHeapBytesUsed);
offHeapState.toData(out);
offHeapThresholds.toData(out);
} else {
out.writeInt(0); // currentHeapUsagePercent
out.writeLong(heapBytesUsed); // currentHeapBytesUsed
String memoryEventTypeName;
switch (heapState) {
case EVICTION_CRITICAL:
case CRITICAL: memoryEventTypeName = "CRITICAL_UP"; break;
case EVICTION_CRITICAL_DISABLED:
case CRITICAL_DISABLED: memoryEventTypeName = "CRITICAL_DISABLED"; break;
default: memoryEventTypeName = "CRITICAL_DOWN";
}
out.writeByte(DSCODE.GEMFIRE_ENUM);
DataSerializer.writeString("com.gemstone.gemfire.internal.cache.control.MemoryEventType", out);
DataSerializer.writeString(memoryEventTypeName, out);
out.writeDouble(0.0); // tenuredGenerationMaxBytes
out.writeBoolean(false); // hasTenuredGenerationMaxBytes
out.writeFloat(heapThresholds.getCriticalThreshold());
out.writeBoolean(heapThresholds.isCriticalThresholdEnabled());
out.writeFloat(heapThresholds.getEvictionThreshold());
out.writeBoolean(heapThresholds.isEvictionThresholdEnabled());
}
}
@Override
public int getDSFID() {
return RESOURCE_MANAGER_PROFILE;
}
public synchronized MemoryState getHeapState() {
return this.heapState;
}
public synchronized MemoryState getoffHeapState() {
return this.offHeapState;
}
}
/**
* Get set of members whose {@linkplain ResourceManager#setCriticalHeapPercentage(float)
* critical heap threshold} has been met or exceeded. The set does not include the local VM.
* The mutability of this set only effects the elements in the set, not the state
* of the members.
* @return a mutable set of members in the critical state otherwise {@link Collections#EMPTY_SET}
*/
public Set<InternalDistributedMember> adviseCritialMembers() {
return adviseFilter(new Filter() {
@Override
public boolean include(Profile profile) {
ResourceManagerProfile rmp = (ResourceManagerProfile)profile;
return rmp.getHeapState().isCritical();
}});
}
public final boolean isHeapCritical(final InternalDistributedMember member) {
ResourceManagerProfile rmp = (ResourceManagerProfile)getProfile(member);
return rmp != null ? rmp.getHeapState().isCritical() : false;
}
public synchronized void updateRemoteProfile() {
Set<InternalDistributedMember> recips = adviseGeneric();
ResourceManagerProfile profile = new ResourceManagerProfile(getDistributionManager().getId(), incrementAndGetVersion());
getResourceManager().fillInProfile(profile);
ResourceProfileMessage.send(getResourceManager(), recips, profile);
}
@Override
protected void profileRemoved(Profile profile) {
ResourceManagerProfile oldp = (ResourceManagerProfile) profile;
getResourceManager().deliverEventFromRemote(oldp.createDisabledMemoryEvent(ResourceType.HEAP_MEMORY));
getResourceManager().deliverEventFromRemote(oldp.createDisabledMemoryEvent(ResourceType.OFFHEAP_MEMORY));
}
@Override
public void close() {
new UpdateAttributesProcessor(this.getAdvisee(), true).distribute(false);
super.close();
}
}