| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.util.Map; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.cache.CacheLoader; |
| import org.apache.geode.cache.CacheLoaderException; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.EntryNotFoundException; |
| import org.apache.geode.cache.ExpirationAction; |
| import org.apache.geode.cache.ExpirationAttributes; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionExistsException; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.event.EventTracker; |
| import org.apache.geode.internal.cache.event.NonDistributedEventTracker; |
| import org.apache.geode.internal.cache.ha.HARegionQueue; |
| import org.apache.geode.internal.cache.ha.ThreadIdentifier; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.HAEventWrapper; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.statistics.StatisticsClock; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * This region is being implemented to suppress distribution of puts and to allow localDestroys on |
| * mirrored regions. |
| * |
| * @since GemFire 4.3 |
| * |
| */ |
| public class HARegion extends DistributedRegion { |
| private static final Logger logger = LogService.getLogger(); |
| |
| CachePerfStats haRegionStats; |
| |
| // Prevent this region from participating in a TX, bug 38709 |
| @Override |
| public boolean isSecret() { |
| return true; |
| } |
| |
| @Override |
| public boolean isCopyOnRead() { |
| return false; |
| } |
| |
| @Override |
| public boolean doesNotDistribute() { |
| return true; |
| } |
| |
| @Override |
| protected StringBuilder getStringBuilder() { |
| StringBuilder buf = new StringBuilder(); |
| buf.append("HARegion"); |
| buf.append("[path='").append(getFullPath()); |
| return buf; |
| } |
| |
| // protected Object conditionalCopy(Object o) { |
| // return o; |
| // } |
| |
| private volatile HARegionQueue owningQueue; |
| |
| HARegion(String regionName, RegionAttributes attrs, LocalRegion parentRegion, |
| InternalCache cache, StatisticsClock statisticsClock) { |
| super(regionName, attrs, parentRegion, cache, |
| new InternalRegionArguments().setDestroyLockFlag(true).setRecreateFlag(false) |
| .setSnapshotInputStream(null).setImageTarget(null), |
| statisticsClock); |
| this.haRegionStats = new DummyCachePerfStats(); |
| } |
| |
| @Override |
| public boolean allowsPersistence() { |
| return false; |
| } |
| |
| /** |
| * Updates never distributed from buckets. |
| * |
| * @since GemFire 5.7 |
| */ |
| @Override |
| protected void distributeUpdate(EntryEventImpl event, long lastModifiedTime, boolean ifNew, |
| boolean ifOld, Object expectedOldValue, boolean requireOldValue) {} |
| |
| @Override |
| public EventTracker createEventTracker() { |
| // event trackers aren't needed for HARegions |
| return NonDistributedEventTracker.getInstance(); |
| } |
| |
| /** |
| * void implementation over-riding the method to allow localDestroy on mirrored regions |
| * |
| */ |
| @Override |
| void checkIfReplicatedAndLocalDestroy(EntryEventImpl event) {} |
| |
| @Override |
| void checkEntryTimeoutAction(String mode, ExpirationAction ea) {} |
| |
| /** |
| * Overriding this method so as to allow expiry action of local invalidate even if the scope is |
| * distributed mirrored. |
| * |
| * <p> |
| * author Asif |
| */ |
| @Override |
| public ExpirationAttributes setEntryTimeToLive(ExpirationAttributes timeToLive) { |
| // checkReadiness(); |
| if (timeToLive == null) { |
| throw new IllegalArgumentException( |
| "timeToLive must not be null"); |
| } |
| if ((timeToLive.getAction() == ExpirationAction.LOCAL_DESTROY |
| && this.getDataPolicy().withReplication())) { |
| throw new IllegalArgumentException( |
| "timeToLive action is incompatible with this region's mirror type"); |
| } |
| if (!this.statisticsEnabled) { |
| throw new IllegalStateException( |
| "Cannot set time to live when statistics are disabled"); |
| } |
| ExpirationAttributes oldAttrs = getEntryTimeToLive(); |
| this.entryTimeToLive = timeToLive.getTimeout(); |
| this.entryTimeToLiveExpirationAction = timeToLive.getAction(); |
| setEntryTimeToLiveAttributes(); |
| updateEntryExpiryPossible(); |
| timeToLiveChanged(oldAttrs); |
| return oldAttrs; |
| } |
| |
| /** |
| * Before invalidating , check if the entry being invalidated has a key as Long . If yes check if |
| * the key is still present in availableIDs . If yes remove & allow invalidation to proceed. But |
| * if the key (Long)is absent do not allow invalidation to proceed. |
| * |
| * <p> |
| * author Asif |
| */ |
| @Override |
| protected void basicInvalidate(final EntryEventImpl event, boolean invokeCallbacks, |
| final boolean forceNewEntry) throws EntryNotFoundException { |
| Object key = event.getKey(); |
| if (key instanceof Long) { |
| boolean removedFromAvID = false; |
| Conflatable conflatable = null; |
| try { |
| conflatable = (Conflatable) this.get(key); |
| removedFromAvID = |
| !this.owningQueue.isPrimary() && this.owningQueue.destroyFromAvailableIDs((Long) key); |
| } catch (InterruptedException ie) { |
| Thread.currentThread().interrupt(); |
| getCancelCriterion().checkCancelInProgress(ie); |
| return; |
| } |
| if (!removedFromAvID) { |
| return; |
| } |
| |
| // <HA overflow> |
| if (conflatable instanceof HAEventWrapper) { |
| this.owningQueue.decAndRemoveFromHAContainer((HAEventWrapper) conflatable, "Invalidate"); |
| } |
| // </HA overflow> |
| // update the stats |
| this.owningQueue.stats.incEventsExpired(); |
| } |
| this.entries.invalidate(event, invokeCallbacks, forceNewEntry, false); |
| return; |
| |
| } |
| |
| /** |
| * This method is over-ridden since we do not want GII of ThreadIdentifier objects to happen |
| */ |
| @Override |
| protected boolean checkEntryNotValid(RegionEntry mapEntry) { |
| return (super.checkEntryNotValid(mapEntry) || mapEntry.getKey() instanceof ThreadIdentifier); |
| } |
| |
| @Override |
| public Object put(Object key, Object value, Object aCallbackArgument) |
| throws TimeoutException, CacheWriterException { |
| checkReadiness(); |
| |
| @Released |
| EntryEventImpl event = EntryEventImpl.create(this, Operation.UPDATE, key, value, |
| aCallbackArgument, false, getMyId()); |
| try { |
| |
| Object oldValue = null; |
| |
| if (basicPut(event, false, // ifNew |
| false, // ifOld |
| null, // expectedOldValue |
| false // requireOldValue |
| )) { |
| oldValue = event.getOldValue(); |
| } |
| return handleNotAvailable(oldValue); |
| } finally { |
| event.release(); |
| } |
| } |
| |
| /** |
| * |
| * Returns an instance of HARegion after it has properly initialized |
| * |
| * @param regionName name of the region to be created |
| * @param cache the cache that owns this region |
| * @param ra attributes of the region |
| * @return an instance of an HARegion |
| * @throws RegionExistsException if a region of the same name exists in the same Cache |
| */ |
| public static HARegion getInstance(String regionName, InternalCache cache, HARegionQueue hrq, |
| RegionAttributes ra, StatisticsClock statisticsClock) |
| throws TimeoutException, RegionExistsException, IOException, ClassNotFoundException { |
| |
| HARegion haRegion = new HARegion(regionName, ra, null, cache, statisticsClock); |
| haRegion.setOwner(hrq); |
| Region region = cache.createVMRegion(regionName, ra, |
| new InternalRegionArguments().setInternalMetaRegion(haRegion).setDestroyLockFlag(true) |
| .setSnapshotInputStream(null).setInternalRegion(true).setImageTarget(null)); |
| |
| return (HARegion) region; |
| } |
| |
| public boolean isPrimaryQueue() { |
| if (this.owningQueue != null) { |
| return this.owningQueue.isPrimary(); |
| } |
| return false; |
| } |
| |
| public HARegionQueue getOwner() { |
| // fix for bug #41634 - don't release a reference to the owning queue until |
| // it is fully initialized. The previous implementation of this rule did |
| // not protect subclasses of HARegionQueue and caused the bug. |
| return this.owningQueue.isQueueInitialized() ? this.owningQueue : null; |
| } |
| |
| public HARegionQueue getOwnerWithWait(long timeout) { |
| if (owningQueue.isQueueInitializedWithWait(timeout)) { |
| return owningQueue; |
| } else { |
| if (logger.isDebugEnabled()) { |
| logger.debug("After waiting for {} seconds, queue is still not initialized", |
| TimeUnit.MILLISECONDS.toSeconds(timeout)); |
| } |
| return null; |
| } |
| } |
| |
| @Override |
| public CachePerfStats getCachePerfStats() { |
| return this.haRegionStats; |
| } |
| |
| /** |
| * This method is used to set the HARegionQueue owning the HARegion. It is set after the |
| * HARegionQueue is properly constructed |
| * |
| * @param hrq The owning HARegionQueue instance |
| */ |
| public void setOwner(HARegionQueue hrq) { |
| this.owningQueue = hrq; |
| } |
| |
| @Override |
| public boolean shouldNotifyBridgeClients() { |
| return false; |
| } |
| |
| // re-implemented from LocalRegion to avoid recording the event in GemFireCache |
| // before it's applied to the cache's region |
| // public boolean hasSeenClientEvent(InternalCacheEvent event) { |
| // return false; |
| // } |
| |
| protected void notifyGatewayHub(EnumListenerEvent operation, EntryEventImpl event) {} |
| |
| /** |
| * This method is overriden so as to make isOriginRemote true always so that the operation is |
| * never propagated to other nodes |
| * |
| * @see org.apache.geode.internal.cache.AbstractRegion#destroyRegion() |
| */ |
| @Override |
| public void destroyRegion(Object aCallbackArgument) |
| throws CacheWriterException, TimeoutException { |
| // Do not generate EventID |
| RegionEventImpl event = new RegionEventImpl(this, Operation.REGION_DESTROY, aCallbackArgument, |
| true /* isOriginRemote */, getMyId()); |
| |
| basicDestroyRegion(event, true); |
| } |
| |
| /** |
| * Never genearte EventID for any Entry or Region operation on the HARegion |
| */ |
| @Override |
| public boolean generateEventID() { |
| return false; |
| } |
| |
| @Override |
| public void initialize(InputStream snapshotInputStream, InternalDistributedMember imageTarget, |
| InternalRegionArguments internalRegionArgs) |
| throws TimeoutException, IOException, ClassNotFoundException { |
| // Set this region in the ProxyBucketRegion early so that profile exchange will |
| // perform the correct fillInProfile method |
| // try { |
| super.initialize(snapshotInputStream, imageTarget, internalRegionArgs); |
| // } finally { |
| // this.giiProviderStates = null; |
| // } |
| } |
| |
| /** |
| * @return the deserialized value |
| * @see LocalRegion#findObjectInSystem(KeyInfo, boolean, TXStateInterface, boolean, Object, |
| * boolean, boolean, ClientProxyMembershipID, EntryEventImpl, boolean) |
| * |
| */ |
| @Override |
| Object findObjectInSystem(KeyInfo keyInfo, boolean isCreate, TXStateInterface txState, |
| boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, |
| ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, |
| boolean returnTombstones) throws CacheLoaderException, TimeoutException { |
| |
| Object value = null; |
| final Object key = keyInfo.getKey(); |
| final Object aCallbackArgument = keyInfo.getCallbackArg(); |
| // copy into local var to prevent race condition |
| RegionEntry re = null; |
| Assert.assertTrue(!hasServerProxy()); |
| CacheLoader loader = basicGetLoader(); |
| if (loader != null) { |
| value = callCacheLoader(loader, key, aCallbackArgument, preferCD); |
| |
| if (value != null) { |
| try { |
| validateKey(key); |
| Operation op; |
| if (isCreate) { |
| op = Operation.LOCAL_LOAD_CREATE; |
| } else { |
| op = Operation.LOCAL_LOAD_UPDATE; |
| } |
| |
| @Released |
| EntryEventImpl event = EntryEventImpl.create(this, op, key, value, aCallbackArgument, |
| false, getMyId(), generateCallbacks); |
| try { |
| re = basicPutEntry(event, 0L); |
| } finally { |
| event.release(); |
| } |
| if (txState == null) { |
| } |
| } catch (CacheWriterException cwe) { |
| // @todo smenon Log the exception |
| } |
| } |
| } |
| if (isCreate) { |
| recordMiss(re, key); |
| } |
| return value; |
| } |
| |
| /** |
| * invoked when we start providing a GII image |
| */ |
| public void startServingGIIRequest() { |
| // some of our dunit tests create HARegions in odd ways that cause owningQueue |
| // to be null during GII |
| if (this.owningQueue == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "found that owningQueue was null during GII of {} which could lead to event loss (see #41681)", |
| this); |
| } |
| return; |
| } |
| this.owningQueue.startGiiQueueing(); |
| } |
| |
| /** |
| * invoked when we finish providing a GII image |
| */ |
| public void endServingGIIRequest() { |
| if (this.owningQueue != null) { |
| this.owningQueue.endGiiQueueing(); |
| } |
| } |
| |
| @Override |
| protected CacheDistributionAdvisor createDistributionAdvisor( |
| InternalRegionArguments internalRegionArgs) { |
| return HARegionAdvisor.createHARegionAdvisor(this); // Warning: potential early escape of object |
| // before full construction |
| } |
| |
| @Override |
| public void fillInProfile(Profile profile) { |
| super.fillInProfile(profile); |
| HARegionAdvisor.HAProfile h = (HARegionAdvisor.HAProfile) profile; |
| // dunit tests create HARegions without encapsulating them in queues |
| if (this.owningQueue != null) { |
| h.isPrimary = this.owningQueue.isPrimary(); |
| h.hasRegisteredInterest = this.owningQueue.getHasRegisteredInterest(); |
| } |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.cache.LocalRegion#getEventState() |
| */ |
| @Override |
| public Map<? extends DataSerializable, ? extends DataSerializable> getEventState() { |
| if (this.owningQueue != null) { |
| return this.owningQueue.getEventMapForGII(); |
| } |
| return null; |
| } |
| |
| /* |
| * Record cache event state for a potential initial image provider. This is used to install event |
| * state when the sender is selected as initial image provider. |
| * |
| * |
| */ |
| @Override |
| void recordEventState(InternalDistributedMember sender, Map eventState) { |
| if (eventState != null && this.owningQueue != null) { |
| this.owningQueue.recordEventState(sender, eventState); |
| } |
| } |
| |
| |
| /** send a distribution advisor profile update to other members */ |
| public void sendProfileUpdate() { |
| new UpdateAttributesProcessor(this).distribute(false); |
| } |
| |
| |
| /** |
| * whether the primary queue for the client has registered interest, or there is no primary |
| * present |
| */ |
| public boolean noPrimaryOrHasRegisteredInterest() { |
| return ((HARegionAdvisor) this.distAdvisor).noPrimaryOrHasRegisteredInterest(); |
| } |
| |
| public Object updateHAEventWrapper(InternalDistributedMember sender, |
| CachedDeserializable newValueCd) { |
| return this.owningQueue.updateHAEventWrapper(sender, newValueCd, getName()); |
| } |
| |
| /** HARegions have their own advisors so that interest registration state can be tracked */ |
| public static class HARegionAdvisor extends CacheDistributionAdvisor { |
| private HARegionAdvisor(CacheDistributionAdvisee region) { |
| super(region); |
| } |
| |
| public static HARegionAdvisor createHARegionAdvisor(CacheDistributionAdvisee region) { |
| HARegionAdvisor advisor = new HARegionAdvisor(region); |
| advisor.initialize(); |
| return advisor; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.apache.geode.internal.cache.CacheDistributionAdvisor#adviseInitialImage(org.apache.geode. |
| * internal.cache.CacheDistributionAdvisor.InitialImageAdvice) |
| */ |
| @Override |
| public InitialImageAdvice adviseInitialImage(InitialImageAdvice previousAdvice) { |
| InitialImageAdvice r = super.adviseInitialImage(previousAdvice); |
| r.setOthers(this.getAdvisee().getDistributionManager().getOtherDistributionManagerIds()); |
| return r; |
| } |
| |
| @Override |
| protected Profile instantiateProfile(InternalDistributedMember memberId, int version) { |
| return new HAProfile(memberId, version); |
| } |
| |
| public boolean noPrimaryOrHasRegisteredInterest() { |
| Profile[] locProfiles = this.profiles; // grab current profiles |
| for (int i = 0; i < locProfiles.length; i++) { |
| HAProfile p = (HAProfile) locProfiles[i]; |
| if (p.isPrimary) { |
| return p.hasRegisteredInterest; |
| } |
| } |
| // if no primary, we want to accept events |
| return true; |
| } |
| |
| public static class HAProfile extends CacheProfile { |
| private static final int HAS_REGISTERED_INTEREST_BIT = 0x01; |
| private static final int IS_PRIMARY_BIT = 0x02; |
| |
| boolean hasRegisteredInterest; |
| |
| boolean isPrimary; |
| |
| public HAProfile() { |
| // for deserialization only |
| } |
| |
| public HAProfile(InternalDistributedMember memberId, int version) { |
| super(memberId, version); |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see |
| * org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile#fromData(java.io. |
| * DataInput) |
| */ |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| int flags = in.readByte(); |
| hasRegisteredInterest = (flags & HAS_REGISTERED_INTEREST_BIT) != 0; |
| isPrimary = (flags & IS_PRIMARY_BIT) != 0; |
| } |
| |
| /* |
| * (non-Javadoc) |
| * |
| * @see org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile#toData(java.io. |
| * DataOutput) |
| */ |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| int flags = 0; |
| if (hasRegisteredInterest) { |
| flags |= HAS_REGISTERED_INTEREST_BIT; |
| } |
| if (isPrimary) { |
| flags |= IS_PRIMARY_BIT; |
| } |
| out.writeByte(flags & 0xff); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return HA_PROFILE; |
| } |
| |
| @Override |
| public StringBuilder getToStringHeader() { |
| return new StringBuilder("HAProfile"); |
| } |
| |
| @Override |
| public void fillInToString(StringBuilder sb) { |
| super.fillInToString(sb); |
| sb.append("; isPrimary=").append(this.isPrimary); |
| sb.append("; hasRegisteredInterest=").append(this.hasRegisteredInterest); |
| } |
| } |
| } |
| } |