blob: 519561468f775ed02c111486f0632f744aa64fa6 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializable;
import com.gemstone.gemfire.cache.CacheLoader;
import com.gemstone.gemfire.cache.CacheLoaderException;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.ExpirationAction;
import com.gemstone.gemfire.cache.ExpirationAttributes;
import com.gemstone.gemfire.cache.LoaderHelper;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.RegionAttributes;
import com.gemstone.gemfire.cache.RegionExistsException;
import com.gemstone.gemfire.cache.TimeoutException;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.cache.ha.HARegionQueue;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.HAEventWrapper;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
/**
* This region is being implemented to suppress distribution of puts and to
* allow localDestroys on mirrored regions.
*
* @author Mitul Bid
* @since 4.3
*
*/
public final class HARegion extends DistributedRegion
{
private static final Logger logger = LogService.getLogger();
CachePerfStats haRegionStats;
// Prevent this region from participating in a TX, bug 38709
@Override
protected boolean isSecret() {
return true;
}
@Override
protected boolean isCopyOnRead() {
return false;
}
@Override
public boolean doesNotDistribute() {
return true;
}
@Override
protected StringBuilder getStringBuilder() {
StringBuilder buf = new StringBuilder();
buf.append("HARegion");
buf.append("[path='")
.append(getFullPath());
return buf;
}
// protected Object conditionalCopy(Object o) {
// return o;
// }
private volatile HARegionQueue owningQueue;
// private Map giiProviderStates;
/**
* @param regionName
* @param attrs
* @param parentRegion
* @param cache
*/
private HARegion(String regionName, RegionAttributes attrs,
LocalRegion parentRegion, GemFireCacheImpl cache) {
super(regionName, attrs, parentRegion, cache, new InternalRegionArguments()
.setDestroyLockFlag(true).setRecreateFlag(false)
.setSnapshotInputStream(null).setImageTarget(null));
this.haRegionStats = new DummyCachePerfStats();
}
@Override
public boolean allowsPersistence() {
return false;
}
/**
* Updates never distributed from buckets.
* @since 5.7
*/
@Override
protected void distributeUpdate(EntryEventImpl event, long lastModifiedTime, boolean ifNew, boolean ifOld, Object expectedOldValue, boolean requireOldValue) {
}
@Override
public void createEventTracker() {
// event trackers aren't needed for HARegions
}
/**
* void implementation over-riding the method to allow localDestroy on
* mirrored regions
*
* @param event
*/
@Override
protected void checkIfReplicatedAndLocalDestroy(EntryEventImpl event) {
}
@Override
void checkEntryTimeoutAction(String mode, ExpirationAction ea) {
}
/**
* Overriding this method so as to allow expiry action of local invalidate
* even if the scope is distributed mirrored.
*
* <p>author Asif
*/
@Override
public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes timeToLive)
{
//checkReadiness();
if (timeToLive == null) {
throw new IllegalArgumentException(LocalizedStrings.HARegion_TIMETOLIVE_MUST_NOT_BE_NULL.toLocalizedString());
}
if ((timeToLive.getAction() == ExpirationAction.LOCAL_DESTROY && this.dataPolicy
.withReplication())) {
throw new IllegalArgumentException(LocalizedStrings.HARegion_TIMETOLIVE_ACTION_IS_INCOMPATIBLE_WITH_THIS_REGIONS_MIRROR_TYPE.toLocalizedString());
}
if (!this.statisticsEnabled) {
throw new IllegalStateException(LocalizedStrings.HARegion_CANNOT_SET_TIME_TO_LIVE_WHEN_STATISTICS_ARE_DISABLED.toLocalizedString());
}
ExpirationAttributes oldAttrs = getEntryTimeToLive();
this.entryTimeToLive = timeToLive.getTimeout();
this.entryTimeToLiveExpirationAction = timeToLive.getAction();
setEntryTimeToLiveAtts();
updateEntryExpiryPossible();
timeToLiveChanged(oldAttrs);
return oldAttrs;
}
/**
* Before invalidating , check if the entry being invalidated has a key as
* Long . If yes check if the key is still present in availableIDs . If yes
* remove & allow invalidation to proceed. But if the key (Long)is absent do
* not allow invalidation to proceed.
*
* <p>author Asif
*/
@Override
protected void basicInvalidate(final EntryEventImpl event,
boolean invokeCallbacks, final boolean forceNewEntry)
throws EntryNotFoundException
{
Object key = event.getKey();
if (key instanceof Long) {
boolean removedFromAvID = false;
Conflatable conflatable = null;
try {
conflatable = (Conflatable)this.get(key);
removedFromAvID = !this.owningQueue.isPrimary()
&& this.owningQueue.destroyFromAvailableIDs((Long)key);
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
getCancelCriterion().checkCancelInProgress(ie);
return;
}
if (!removedFromAvID) {
return;
}
// <HA overflow>
if(conflatable instanceof HAEventWrapper) {
this.owningQueue
.decAndRemoveFromHAContainer((HAEventWrapper)conflatable);
}
// </HA overflow>
//update the stats
this.owningQueue.stats.incEventsExpired();
}
this.entries.invalidate(event, invokeCallbacks, forceNewEntry,false);
return;
}
/**
* This method is over-ridden since we do not want GII of ThreadIdentifier
* objects to happen
*/
@Override
protected boolean checkEntryNotValid(RegionEntry mapEntry)
{
return (super.checkEntryNotValid(mapEntry) || mapEntry.getKey() instanceof ThreadIdentifier);
}
@Override
public final Object put(Object key, Object value, Object aCallbackArgument)
throws TimeoutException, CacheWriterException {
checkReadiness();
EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key,
value, aCallbackArgument, false, getMyId());
try {
Object oldValue = null;
if (basicPut(event, false, // ifNew
false, // ifOld
null, // expectedOldValue
false // requireOldValue
)) {
oldValue = event.getOldValue();
}
return handleNotAvailable(oldValue);
} finally {
event.release();
}
}
/**
*
* Returns an instance of HARegion after it has properly initialized
*
* @param regionName
* name of the region to be created
* @param cache
* the cache that owns this region
* @param ra
* attributes of the region
* @return an instance of an HARegion
* @throws TimeoutException
* @throws RegionExistsException
* if a region of the same name exists in the same Cache
* @throws IOException
* @throws ClassNotFoundException
*/
public static HARegion getInstance(String regionName, GemFireCacheImpl cache,
HARegionQueue hrq, RegionAttributes ra) throws TimeoutException,
RegionExistsException, IOException, ClassNotFoundException
{
HARegion haRegion = new HARegion(regionName, ra, null, cache);
haRegion.setOwner(hrq);
Region region = cache.createVMRegion(regionName, ra,
new InternalRegionArguments().setInternalMetaRegion(haRegion)
.setDestroyLockFlag(true).setSnapshotInputStream(null)
.setImageTarget(null));
return (HARegion)region;
}
public boolean isPrimaryQueue() {
if (this.owningQueue != null) {
return this.owningQueue.isPrimary();
}
return false;
}
public HARegionQueue getOwner()
{
// fix for bug #41634 - don't release a reference to the owning queue until
// it is fully initialized. The previous implementation of this rule did
// not protect subclasses of HARegionQueue and caused the bug.
return this.owningQueue.isQueueInitialized()? this.owningQueue : null;
}
@Override
public CachePerfStats getCachePerfStats() {
return this.haRegionStats;
}
/**
* This method is used to set the HARegionQueue owning the HARegion. It is set
* after the HARegionQueue is properly constructed
*
* @param hrq
* The owning HARegionQueue instance
*/
public void setOwner(HARegionQueue hrq)
{
this.owningQueue = hrq;
}
@Override
final protected boolean shouldNotifyBridgeClients()
{
return false;
}
// re-implemented from LocalRegion to avoid recording the event in GemFireCache
// before it's applied to the cache's region
// public boolean hasSeenClientEvent(InternalCacheEvent event) {
// return false;
// }
protected void notifyGatewayHub(EnumListenerEvent operation,
EntryEventImpl event)
{
}
/**
* This method is overriden so as to make isOriginRemote true always so that
* the operation is never propagated to other nodes
*
* @see com.gemstone.gemfire.internal.cache.AbstractRegion#destroyRegion()
*/
@Override
public void destroyRegion(Object aCallbackArgument)
throws CacheWriterException, TimeoutException
{
//Do not generate EventID
RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_DESTROY,
aCallbackArgument, true /* isOriginRemote */, getMyId());
basicDestroyRegion(event, true);
}
/**
* Never genearte EventID for any Entry or Region operation on the HARegion
*/
@Override
final public boolean generateEventID()
{
return false;
}
@Override
protected void initialize(InputStream snapshotInputStream,
InternalDistributedMember imageTarget,
InternalRegionArguments internalRegionArgs)
throws TimeoutException, IOException, ClassNotFoundException
{
// Set this region in the ProxyBucketRegion early so that profile exchange will
// perform the correct fillInProfile method
// try {
super.initialize(snapshotInputStream, imageTarget, internalRegionArgs);
// } finally {
// this.giiProviderStates = null;
// }
}
/**
* @return the deserialized value
* @see DistributedRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean, boolean)
*
*/
@Override
protected Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate,
TXStateInterface txState, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead,
boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones, boolean allowReadFromHDFS)
throws CacheLoaderException, TimeoutException {
Object value = null;
final Object key = keyInfo.getKey();
final Object aCallbackArgument = keyInfo.getCallbackArg();
// copy into local var to prevent race condition
RegionEntry re = null;
Assert.assertTrue(!hasServerProxy());
CacheLoader loader = basicGetLoader();
if (loader != null) {
final LoaderHelper loaderHelper = loaderHelperFactory.createLoaderHelper(
key, aCallbackArgument, false /* netSearchAllowed */,
true /* netloadallowed */, null/* searcher */);
try {
value = loader.load(loaderHelper);
}
finally {
}
if (value != null) {
try {
validateKey(key);
Operation op;
if (isCreate) {
op = Operation.LOCAL_LOAD_CREATE;
}
else {
op = Operation.LOCAL_LOAD_UPDATE;
}
EntryEventImpl event = EntryEventImpl.create(
this, op, key, value,
aCallbackArgument, false, getMyId(), generateCallbacks);
try {
re = basicPutEntry(event, 0L);
} finally {
event.release();
}
if (txState == null) {
}
}
catch (CacheWriterException cwe) {
// @todo smenon Log the exception
}
}
}
if (isCreate) {
recordMiss(re, key);
}
return value;
}
/**
* invoked when we start providing a GII image
*/
public void startServingGIIRequest() {
// some of our dunit tests create HARegions in odd ways that cause owningQueue
// to be null during GII
if (this.owningQueue == null) {
if (logger.isDebugEnabled()) {
logger.debug("found that owningQueue was null during GII of {} which could lead to event loss (see #41681)", this);
}
return;
}
this.owningQueue.startGiiQueueing();
}
/**
* invoked when we finish providing a GII image
*/
public void endServingGIIRequest() {
if (this.owningQueue != null) {
this.owningQueue.endGiiQueueing();
}
}
@Override
protected CacheDistributionAdvisor createDistributionAdvisor(InternalRegionArguments internalRegionArgs) {
return HARegionAdvisor.createHARegionAdvisor(this); // Warning: potential early escape of object before full construction
}
@Override
public void fillInProfile(Profile p) {
super.fillInProfile(p);
HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile)p;
// dunit tests create HARegions without encapsulating them in queues
if (this.owningQueue != null) {
h.isPrimary = this.owningQueue.isPrimary();
h.hasRegisteredInterest = this.owningQueue.getHasRegisteredInterest();
}
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.LocalRegion#getEventState()
*/
@Override
public Map<? extends DataSerializable, ? extends DataSerializable> getEventState() {
if (this.owningQueue != null) {
return this.owningQueue.getEventMapForGII();
}
return null;
}
/*
* Record cache event state for a potential initial image provider. This is
* used to install event state when the sender is selected as initial image
* provider.
* @param sender
* @param eventState
*/
@Override
public void recordEventState(InternalDistributedMember sender, Map eventState) {
if (eventState != null && this.owningQueue != null) {
this.owningQueue.recordEventState(sender, eventState);
}
}
/** send a distribution advisor profile update to other members */
public void sendProfileUpdate() {
new UpdateAttributesProcessor(this).distribute(false);
}
/**
* whether the primary queue for the client has registered interest, or
* there is no primary present
*/
public boolean noPrimaryOrHasRegisteredInterest() {
return ((HARegionAdvisor)this.distAdvisor).noPrimaryOrHasRegisteredInterest();
}
/** HARegions have their own advisors so that interest registration state can be tracked */
public static class HARegionAdvisor extends CacheDistributionAdvisor {
/**
* @param region
*/
private HARegionAdvisor(CacheDistributionAdvisee region) {
super(region);
}
public static HARegionAdvisor createHARegionAdvisor(CacheDistributionAdvisee region) {
HARegionAdvisor advisor = new HARegionAdvisor(region);
advisor.initialize();
return advisor;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor#adviseInitialImage(com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.InitialImageAdvice)
*/
@Override
public InitialImageAdvice adviseInitialImage(
InitialImageAdvice previousAdvice) {
InitialImageAdvice r = super.adviseInitialImage(previousAdvice);
r.setOthers(this.getAdvisee().getDistributionManager().getOtherDistributionManagerIds());
return r;
}
@Override
protected Profile instantiateProfile(
InternalDistributedMember memberId, int version) {
return new HAProfile(memberId, version);
}
public boolean noPrimaryOrHasRegisteredInterest() {
Profile[] locProfiles = this.profiles; // grab current profiles
for (int i=0; i<locProfiles.length; i++) {
HAProfile p = (HAProfile)locProfiles[i];
if (p.isPrimary) {
return p.hasRegisteredInterest;
}
}
// if no primary, we want to accept events
return true;
}
public static class HAProfile extends CacheProfile {
private static int HAS_REGISTERED_INTEREST_BIT = 0x01;
private static int IS_PRIMARY_BIT = 0x02;
boolean hasRegisteredInterest;
boolean isPrimary;
public HAProfile() {
// for deserialization only
}
public HAProfile(InternalDistributedMember memberId, int version) {
super(memberId, version);
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile#fromData(java.io.DataInput)
*/
@Override
public void fromData(DataInput in) throws IOException,
ClassNotFoundException {
super.fromData(in);
int flags = in.readByte();
hasRegisteredInterest = (flags & HAS_REGISTERED_INTEREST_BIT) != 0;
isPrimary = (flags & IS_PRIMARY_BIT) != 0;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile#toData(java.io.DataOutput)
*/
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
int flags = 0;
if (hasRegisteredInterest) { flags |= HAS_REGISTERED_INTEREST_BIT; }
if (isPrimary) { flags |= IS_PRIMARY_BIT; }
out.writeByte(flags & 0xff);
}
@Override
public int getDSFID() {
return HA_PROFILE;
}
@Override
public StringBuilder getToStringHeader() {
return new StringBuilder("HAProfile");
}
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
sb.append("; isPrimary=").append(this.isPrimary);
sb.append("; hasRegisteredInterest=").append(this.hasRegisteredInterest);
}
}
}
}