| /*========================================================================= |
| * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| |
| package com.gemstone.gemfire.internal.cache; |
| |
| /* enumerate each imported class because conflict with dl.u.c.TimeoutException */ |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.NotSerializableException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import com.gemstone.gemfire.CancelCriterion; |
| import com.gemstone.gemfire.CancelException; |
| import com.gemstone.gemfire.DataSerializer; |
| import com.gemstone.gemfire.GemFireException; |
| import com.gemstone.gemfire.InternalGemFireException; |
| import com.gemstone.gemfire.SystemFailure; |
| import com.gemstone.gemfire.cache.CacheEvent; |
| import com.gemstone.gemfire.cache.CacheFactory; |
| import com.gemstone.gemfire.cache.CacheLoader; |
| import com.gemstone.gemfire.cache.CacheLoaderException; |
| import com.gemstone.gemfire.cache.CacheWriter; |
| import com.gemstone.gemfire.cache.CacheWriterException; |
| import com.gemstone.gemfire.cache.DataPolicy; |
| import com.gemstone.gemfire.cache.EntryEvent; |
| import com.gemstone.gemfire.cache.LoaderHelper; |
| import com.gemstone.gemfire.cache.Operation; |
| import com.gemstone.gemfire.cache.RegionAttributes; |
| import com.gemstone.gemfire.cache.RegionDestroyedException; |
| import com.gemstone.gemfire.cache.RegionEvent; |
| import com.gemstone.gemfire.cache.Scope; |
| import com.gemstone.gemfire.cache.TimeoutException; |
| import com.gemstone.gemfire.cache.util.ObjectSizer; |
| import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; |
| import com.gemstone.gemfire.distributed.internal.DM; |
| import com.gemstone.gemfire.distributed.internal.DistributionManager; |
| import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.MembershipListener; |
| import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.ProcessorKeeper21; |
| import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; |
| import com.gemstone.gemfire.distributed.internal.SerialDistributionMessage; |
| import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; |
| import com.gemstone.gemfire.internal.Assert; |
| import com.gemstone.gemfire.internal.InternalDataSerializer; |
| import com.gemstone.gemfire.internal.cache.versions.DiskVersionTag; |
| import com.gemstone.gemfire.internal.cache.versions.VersionStamp; |
| import com.gemstone.gemfire.internal.cache.versions.VersionTag; |
| import com.gemstone.gemfire.internal.i18n.LocalizedStrings; |
| import com.gemstone.gemfire.internal.logging.LogService; |
| import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; |
| import com.gemstone.gemfire.internal.offheap.StoredObject; |
| |
| |
/**
 * Implementation for distributed search, load and write operations in the
 * GemFire system. Provides an API for doing netSearch, netLoad, netSearchAndLoad and
 * netWrite. The class uses the DistributionAdvisor to route requests to the
 * appropriate members. It also uses the DistributionAdvisor to get the region scope
 * and applies rules based on that scope. It makes use of intelligent acceptors
 * that allow netSearch to happen as a one-phase operation at all times. netLoad happens
 * as a one-phase operation in all cases except where the scope is GLOBAL.
 * At the receiving end, the request is converted into an appropriate message
 * whose process method responds to the request.
 *
 * @author Sudhir Menon
 */
| |
| public class SearchLoadAndWriteProcessor implements MembershipListener { |
/** Logger used for the debug traces emitted by this class. */
private static final Logger logger = LogService.getLogger();

/**
 * Byte limit read from the "DistributionManager.OptimizedUpdateByteLimit"
 * system property; 2000 when the property is not set.
 */
public static final int SMALL_BLOB_SIZE =
    Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000).intValue();

/**
 * Retry interval read from the "gemfire.search-retry-interval" system
 * property; 2000 when the property is not set.
 * NOTE(review): presumably milliseconds -- confirm against the code that waits on it.
 */
static final long RETRY_TIME = Long.getLong("gemfire.search-retry-interval", 2000).longValue();


/** Member the current request is addressed to; nulled by memberDeparted when that member leaves. */
private InternalDistributedMember selectedNode;
/** Set by memberDeparted when the selected member left while a remote get was in progress. */
private boolean selectedNodeDead = false;
/** Value of getSearchTimeout() captured in initialize (distributed scopes only). */
private int timeout;
/** Once true, netSearchForBlob returns immediately without searching again. */
private boolean netSearchDone = false;
/** Once true, doNetLoad returns null immediately without loading again. */
private boolean netLoadDone = false;
/** Last-modified time delivered with the response accepted in incomingResponse. */
private long lastModified =0;
/** Id returned by the processor keeper when this processor was registered in getProcessor. */
protected int processorId;
/** Callback argument given to initialize; handed to the loader helper and to net-load requests. */
private Object aCallbackArgument;
/** Full path of the region, captured in initialize. */
private String regionName;
/** Key being searched, loaded or locked. */
private Object key;
/** Region on whose behalf this processor works; set in initialize. */
protected LocalRegion region;
/** Value found so far; a byte[] when isSerialized is true. */
private Object result = null;
private boolean isSerialized = false; // is result serialized?
/** Advisor of the region; assigned in initialize only when the scope is distributed. */
private CacheDistributionAdvisor advisor = null;
/** Exception examined by doNetLoad and netWrite after waiting; assigned outside this section. */
protected Exception remoteException = null;
/** Distribution manager of the region; assigned in initialize only when the scope is distributed. */
public DM distributionManager = null;
/** True while a request is outstanding; cleared by signalDone. */
private volatile boolean requestInProgress = false;
/** True while a value request to one specific member is outstanding. */
private boolean remoteGetInProgress = false;
/**
 * Checked by netSearchForBlob after querying a replicate: when true the
 * search stops there. NOTE(review): assigned outside this section.
 */
private volatile boolean authorative = false;
/** remoteLoadInProgress is volatile to make sure response threads see the current value */
private volatile boolean remoteLoadInProgress = false;
/** Registry mapping processor ids to live processors. */
private static final ProcessorKeeper21 processorKeeper = new ProcessorKeeper21(false);
//private static Set availableAcceptHelperSet = new HashSet();
/** The members that haven't replied yet */
// changed pendingResponders to synchronized Set to fix bug 38972
private final Set pendingResponders = Collections.synchronizedSet(new HashSet());
/** Members that reported the key as present without a value; created lazily in incomingResponse. */
private List responseQueue = null;
/** Checked by netWrite after waiting for a reply; assigned outside this section. */
private boolean netWriteSucceeded = false;
/** Timeout handed to waitForObject2 and to outgoing requests; maintained outside this section. */
private int remainingTimeout = 0;
// NOTE(review): the two snapshots are not used in this section; presumably they feed remainingTimeout.
private long startTimeSnapShot = 0;
private long endTimeSnapShot = 0;

/** Set when netSearchForBlob produced a non-null result. */
private boolean netSearch = false;
/** Set when doNetLoad produced a non-null result. */
private boolean netLoad = false;
private boolean attemptedLocalLoad = false; // added for bug 39738
/** Set when the local CacheLoader returned a non-null value. */
private boolean localLoad = false;
/** Set after a local CacheWriter callback completed in doLocalWrite. */
private boolean localWrite = false;
/** Set when a remote CacheWriter reported success in netWrite. */
private boolean netWrite = false;

/** Held while pendingResponders is updated in memberDeparted and netSearchForBlob. */
private final Object membersLock = new Object();

private Lock lock = null; // if non-null, then needs to be unlocked in release

// Request type codes. NOTE(review): not referenced in this section.
static final int NETSEARCH = 0;
static final int NETLOAD = 1;
static final int NETWRITE = 2;

// CacheWriter action codes; doLocalWrite dispatches on them.
static final int BEFORECREATE = 0;
static final int BEFOREDESTROY = 1;
static final int BEFOREUPDATE = 2;
static final int BEFOREREGIONDESTROY = 3;
static final int BEFOREREGIONCLEAR = 4;
| |
| |
| /************** Public Methods ************************/ |
| |
| Object doNetSearch() |
| throws TimeoutException { |
| |
| resetResults(); |
| RegionAttributes attrs = region.getAttributes(); |
| this.requestInProgress=true; |
| Scope scope = attrs.getScope(); |
| Assert.assertTrue(scope != Scope.LOCAL); |
| netSearchForBlob(); |
| this.requestInProgress=false; |
| return this.result; |
| } |
| |
| |
| |
| void doSearchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue) |
| throws CacheLoaderException, TimeoutException { |
| this.requestInProgress = true; |
| RegionAttributes attrs = region.getAttributes(); |
| Scope scope = attrs.getScope(); |
| CacheLoader loader = ((AbstractRegion)region).basicGetLoader(); |
| if (scope.isLocal()) { |
| Object obj = doLocalLoad(loader, false); |
| event.setNewValue(obj); |
| } |
| else { |
| searchAndLoad(event, txState, localValue); |
| } |
| this.requestInProgress = false; |
| if (this.netSearch) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.SEARCH_CREATE); |
| } else { |
| event.setOperation(Operation.SEARCH_UPDATE); |
| } |
| } else if (this.netLoad) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.NET_LOAD_CREATE); |
| } else { |
| event.setOperation(Operation.NET_LOAD_UPDATE); |
| } |
| } else if (this.localLoad) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.LOCAL_LOAD_CREATE); |
| } else { |
| event.setOperation(Operation.LOCAL_LOAD_UPDATE); |
| } |
| } |
| } |
| |
| /** return true if a CacheWriter was actually invoked */ |
| boolean doNetWrite(CacheEvent event, Set netWriteRecipients, |
| CacheWriter localWriter, int paction) |
| throws CacheWriterException, TimeoutException { |
| |
| int action = paction; |
| this.requestInProgress = true; |
| Scope scope = this.region.scope; |
| if (localWriter != null) { |
| doLocalWrite(localWriter, event, action); |
| this.requestInProgress = false; |
| return true; |
| } |
| if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) { |
| return false; |
| } |
| CacheEvent listenerEvent = getEventForListener(event); |
| if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) { |
| action = BEFORECREATE; |
| } |
| boolean cacheWrote = netWrite(getEventForListener(event), action, netWriteRecipients); |
| this.requestInProgress = false; |
| return cacheWrote; |
| |
| } |
| |
| public void memberJoined(InternalDistributedMember id) { |
| // Ignore - if they just joined, they don't have what we want |
| } |
| |
| public void memberSuspect(InternalDistributedMember id, |
| InternalDistributedMember whoSuspected) { |
| } |
| |
| public void quorumLost(Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) { |
| } |
| |
| public void memberDeparted(final InternalDistributedMember id, final boolean crashed) { |
| |
| synchronized (membersLock) { |
| pendingResponders.remove(id); |
| } |
| synchronized (this) { |
| if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) { |
| selectedNode = null; |
| selectedNodeDead = true; |
| computeRemainingTimeout(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: processing loss of member {}", this, id); |
| } |
| this.lastNotifySpot = 3; |
| notifyAll(); // signal the waiter; we are not done; but we need the waiter to call sendNetSearchRequest |
| } |
| if(responseQueue != null) responseQueue.remove(id); |
| checkIfDone(); |
| } |
| } |
| |
| int getProcessorId() { |
| return this.processorId; |
| } |
| |
| synchronized void checkIfDone() { |
| if (!this.remoteGetInProgress |
| && this.pendingResponders.isEmpty()) { |
| // Synchronize in case a different response/reply is still |
| // in progress, and it's the one thats got the goods (bug 28741) |
| signalDone(); |
| } |
| |
| |
| } |
| |
| synchronized void signalDone() { |
| this.requestInProgress = false; |
| this.lastNotifySpot = 1; |
| notifyAll(); |
| } |
| synchronized void signalTimedOut() { |
| this.lastNotifySpot = 2; |
| notifyAll(); |
| } |
/**
 * Records which code path issued the most recent notifyAll: 0 after a reset,
 * 1 from signalDone, 2 from signalTimedOut, 3 from memberDeparted.
 */
private volatile int lastNotifySpot = 0;

/** Version tag stored with the accepted response in incomingResponse and applied to the event in searchAndLoad. */
private VersionTag versionTag;
| |
| synchronized void nackResponseComplete() { |
| /*if (this.requestInProgress && currentHelper != null && |
| optimizer != null && optimizer.allRepliesReceived(currentHelper.nackServerChannel)) { |
| this.requestInProgress = false; |
| signalDone(); |
| }*/ |
| |
| } |
| |
| static ProcessorKeeper21 getProcessorKeeper() { |
| return processorKeeper; |
| } |
| |
| boolean isNetSearch() { |
| return netSearch; |
| } |
| |
| boolean isNetLoad() { |
| return netLoad; |
| } |
| |
| boolean isLocalLoad() { |
| return localLoad; |
| } |
| |
| boolean isNetWrite() { |
| return netWrite; |
| } |
| |
| boolean isLocalWrite() { |
| return localWrite; |
| } |
| |
| long getLastModified() { |
| return lastModified; |
| } |
| |
| boolean resultIsSerialized() { |
| return this.isSerialized; |
| } |
| |
| static SearchLoadAndWriteProcessor getProcessor() { |
| SearchLoadAndWriteProcessor processor = new SearchLoadAndWriteProcessor(); |
| processor.processorId = getProcessorKeeper().put(processor); |
| return processor; |
| } |
| |
| void release() { |
| try { |
| if (this.lock != null) { |
| try { |
| this.lock.unlock(); |
| } |
| catch (CancelException ignore) { |
| } |
| this.lock = null; |
| } |
| } |
| finally { |
| try { |
| if (this.advisor != null) { |
| this.advisor.removeMembershipListener(this); |
| } |
| } |
| catch (IllegalArgumentException e) { |
| } |
| finally { |
| getProcessorKeeper().remove(this.processorId); |
| } |
| } |
| } |
| |
| void remove() { |
| getProcessorKeeper().remove( this.processorId); |
| |
| } |
| |
| |
| |
| void initialize(LocalRegion theRegion, Object theKey, Object theCallbackArg) { |
| |
| this.region = theRegion; |
| this.regionName = theRegion.getFullPath(); |
| this.key = theKey; |
| this.aCallbackArgument = theCallbackArg; |
| RegionAttributes attrs = theRegion.getAttributes(); |
| Scope scope = attrs.getScope(); |
| if (scope.isDistributed()) { |
| this.advisor = ((CacheDistributionAdvisee)this.region).getCacheDistributionAdvisor(); |
| this.distributionManager= ((CacheDistributionAdvisee)theRegion).getDistributionManager(); |
| this.timeout = getSearchTimeout(); |
| this.advisor.addMembershipListener(this); |
| |
| } |
| } |
| |
| void setKey(Object key) { |
| this.key = key; |
| } |
| |
| /************** Protected Methods ********************/ |
| protected void setSelectedNode(InternalDistributedMember selectedNode) { |
| this.selectedNode = selectedNode; |
| this.selectedNodeDead = false; |
| } |
| |
| protected int getTimeout() { |
| return this.timeout; |
| } |
| |
| protected Object getKey() { |
| return this.key; |
| } |
| |
| /************** Package Methods **********************/ |
| |
| /************** Private Methods **********************/ |
/**
 * Creates a processor in its idle state; instances are obtained through
 * getProcessor.
 *
 * Although a SearchLoadAndWriteProcessor may be invoked in the context of a
 * local region, most of the services it provides are relevant to
 * distribution only. Those services are netSearch, netLoad and netWrite.
 */
private SearchLoadAndWriteProcessor() {
  resetResults();
  this.pendingResponders.clear();
  this.responseQueue = null;

  // request state
  this.requestInProgress = false;
  this.remoteGetInProgress = false;
  this.netSearchDone = false;
  this.attemptedLocalLoad = false;
  this.netWriteSucceeded = false;

  // result state
  this.key = null;
  this.result = null;
  this.isSerialized = false;
}
| |
| |
/**
 * Finds a value for the event on a non-LOCAL-only path, trying the cheapest
 * source first:
 * <ol>
 * <li>If a transaction is active and its view of the entry says there is no
 *     value anywhere in the system, only a load is attempted.</li>
 * <li>If the local value is Token.INVALID, or the region's data policy uses
 *     replication, the net search is skipped and only a load is attempted.</li>
 * <li>If the scope is not GLOBAL and a local loader exists, the loader is
 *     invoked (with net search allowed for its helper) and its answer, even
 *     a null one, is final.</li>
 * <li>If the scope is LOCAL and there is no loader, nothing more is done.</li>
 * <li>Otherwise a net search is performed; a non-null result is stored in the
 *     event (serialized or not) together with its version tag.</li>
 * <li>If the net search found nothing, a load is attempted.</li>
 * </ol>
 *
 * @param event receives the new value
 * @param txState the transactional state, or null when not in a transaction
 * @param localValue the value currently held locally
 * @throws CacheLoaderException propagated from a loader
 * @throws TimeoutException propagated from the search or load
 */
private void searchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue)
    throws CacheLoaderException, TimeoutException {

  RegionAttributes attrs = region.getAttributes();
  Scope scope = attrs.getScope();
  DataPolicy dataPolicy = attrs.getDataPolicy();

  if (txState != null) {
    TXEntryState tx = txState.txReadEntry(event.getKeyInfo(), region, false,true/*create txEntry is absent*/);
    if (tx != null) {
      if (tx.noValueInSystem()) {
        // If the tx view has it invalid or destroyed everywhere
        // then don't do a netsearch. We want to see the
        // transactional view.
        load(event);
        return;
      }
    }
  }

  // If the region is replicated we can skip the net search, and we also
  // skip it when we find an INVALID token, since that token means the
  // invalidation was distributed (otherwise it would be LOCAL_INVALID).
  {
    if(localValue == Token.INVALID || dataPolicy.withReplication()) {
      load(event);
      return;
    }
  }

  Object obj = null;
  if (!scope.isGlobal()) {
    // copy into local var to prevent race condition
    CacheLoader loader = ((AbstractRegion)region).basicGetLoader();
    if (loader != null) {
      // The loader's answer is final here, even when it is null.
      obj = doLocalLoad(loader, true);
      Assert.assertTrue(obj != Token.INVALID &&
          obj != Token.LOCAL_INVALID);
      event.setNewValue(obj);
      this.isSerialized = false;
      this.result = obj;
      return;
    }
    if (scope.isLocal()) {
      // No loader and nobody to ask.
      return;
    }
  }
  netSearchForBlob();
  if (this.result != null) {
    Assert.assertTrue(this.result != Token.INVALID &&
        this.result != Token.LOCAL_INVALID);
    if (this.isSerialized) {
      event.setSerializedNewValue((byte[])this.result);
    } else {
      event.setNewValue(this.result);
    }
    event.setVersionTag(this.versionTag);
    return;
  }

  // The net search found nothing; fall back to loading.
  load(event);
}
| |
/**
 * Performs a net search, leaving the object found (if any) in this.result.
 * The search runs only while netSearchDone is still false. It has two phases:
 * <ol>
 * <li>Each initialized replicate reported by the advisor is asked for the
 *     value, one at a time and in random order. If the "authorative" flag is
 *     set after a reply, the search ends there, whether or not a value was
 *     returned.</li>
 * <li>Otherwise a QueryMessage is sent to all net-search recipients reported
 *     by the advisor and the method waits for their responses, re-issuing
 *     the request when the selected member dies during a remote get.</li>
 * </ol>
 * this.netSearch is set when the search ends with a non-null result.
 * Net-search statistics are started and ended around the whole operation.
 *
 * NOTE(review): advisor is dereferenced without a null check, so this
 * assumes initialize ran for a distributed scope.
 *
 * @throws TimeoutException declared for the waits performed here
 */
private void netSearchForBlob() throws TimeoutException {
  if (this.netSearchDone) return;
  this.netSearchDone = true;
  CachePerfStats stats = region.getCachePerfStats();
  long start = 0;
  Set sendSet = null;

  this.result = null;
  RegionAttributes attrs = region.getAttributes();
  // Object aCallbackArgument = null;
  this.requestInProgress = true;
  this.selectedNodeDead = false;
  initRemainingTimeout();
  start = stats.startNetsearch();
  try {
    // Phase 1: ask the initialized replicates directly, in random order.
    List<InternalDistributedMember> replicates = new ArrayList(advisor.adviseInitializedReplicates());
    if (replicates.size() > 1) {
      Collections.shuffle(replicates);
    }
    for(InternalDistributedMember replicate : replicates) {
      synchronized (this.pendingResponders) {
        this.pendingResponders.clear();
      }
      this.requestInProgress = true;
      this.remoteGetInProgress = true;
      synchronized(this) {
        setSelectedNode(replicate);
        this.lastNotifySpot = 0;
      }
      sendValueRequest(replicate);
      waitForObject2(this.remainingTimeout);
      if(this.authorative) {
        // The reply is final, even if it carried no value.
        if(this.result != null) {
          this.netSearch = true;
        }
        return;
      } else {
        //clear anything that might have been set by our query.
        this.selectedNode = null;
        this.selectedNodeDead = false;
        this.lastNotifySpot = 0;
        this.result = null;
      }
    }
    // Phase 2: query every net-search recipient at once.
    synchronized (membersLock) {
      Set recipients = this.advisor.adviseNetSearch();
      if (recipients.isEmpty()) {
        return;
      }
      ArrayList list = new ArrayList(recipients);
      Collections.shuffle(list);
      sendSet = new HashSet(list);
      synchronized (this.pendingResponders) {
        this.pendingResponders.clear();
        this.pendingResponders.addAll(list);
      }
    }

    boolean useMulticast = region.getMulticastEnabled()
        && (region instanceof DistributedRegion)
        && ((DistributedRegion)region).getSystem().getConfig().getMcastPort() != 0;

    // moved outside the sync to fix bug 39458
    QueryMessage.sendMessage(this, this.regionName,this.key,useMulticast, sendSet, this.remainingTimeout,
        attrs.getEntryTimeToLive().getTimeout(),
        attrs.getEntryIdleTimeout().getTimeout());

    synchronized (this) {
      // Historical note: the send above was once moved into this sync block
      // for bug 37132 and later moved back out for bug 39458.
      boolean done = false;
      do {
        waitForObject2(this.remainingTimeout);
        if (this.selectedNodeDead && remoteGetInProgress) {
          // The member serving the remote get left; ask again.
          sendNetSearchRequest();
        }
        else
          done = true;
      } while (!done);

      if (this.result != null) {
        this.netSearch = true;
      }
      return;
    }
  } finally {
    stats.endNetsearch(start);
  }
}
| |
/**
 * Loads a value for the event on a distributed region and stores it in the
 * event.
 * <ul>
 * <li>Non-GLOBAL scope with a local loader: the local loader is used.</li>
 * <li>GLOBAL scope: if there is neither a local loader nor any net-load
 *     candidate, nothing is done. Otherwise the distributed lock for the key
 *     is acquired first (waiting up to the cache's lock timeout, in seconds),
 *     then the local loader is used when present, else a net load.
 *     The lock stays held until release() is called on this processor.</li>
 * <li>Non-GLOBAL scope without a local loader: a net load is performed.</li>
 * </ul>
 *
 * @param event receives the loaded value
 * @throws CacheLoaderException propagated from the local or remote loader
 * @throws TimeoutException when the distributed lock cannot be acquired in
 *         time, or propagated from the net load
 */
private void load(EntryEventImpl event)
    throws CacheLoaderException, TimeoutException {
  Object obj = null;
  RegionAttributes attrs = this.region.getAttributes();
  Scope scope = attrs.getScope();
  CacheLoader loader = ((AbstractRegion)region).basicGetLoader();
  Assert.assertTrue(scope.isDistributed());

  if ((loader != null) && (!scope.isGlobal())) {
    obj = doLocalLoad(loader, false);
    event.setNewValue(obj);
    Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
    return;
  }

  if (scope.isGlobal()) {
    Assert.assertTrue(this.lock == null);
    Set loadCandidatesSet = advisor.adviseNetLoad();
    if ((loader == null) && (loadCandidatesSet.isEmpty())) {
      //no one has a data Loader. No point getting a lock
      return;
    }

    this.lock = region.getDistributedLock(this.key);
    boolean locked = false;
    try {
      final CancelCriterion stopper = region.getCancelCriterion();
      // Retry the lock attempt if interrupted; the interrupt status is
      // restored after each attempt.
      for (;;) {
        stopper.checkCancelInProgress(null);
        boolean interrupted = Thread.interrupted();
        try {
          locked = this.lock.tryLock(region.getCache().getLockTimeout(),
              TimeUnit.SECONDS);
          if (!locked) {
            throw new TimeoutException(LocalizedStrings.SearchLoadAndWriteProcessor_TIMED_OUT_LOCKING_0_BEFORE_LOAD.toLocalizedString(key));
          }
          break;
        }
        catch (InterruptedException e) {
          interrupted = true;
          region.getCancelCriterion().checkCancelInProgress(null);
          // continue;
        }
        finally {
          if (interrupted) {
            Thread.currentThread().interrupt();
          }
        }
      } // for
      if (loader == null) {
        this.localLoad = false;
        if (scope.isDistributed()) {
          this.isSerialized = false;
          obj = doNetLoad();
          Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
          if (this.isSerialized && obj != null) {
            event.setSerializedNewValue((byte[])obj);
          } else {
            event.setNewValue(obj);
          }
        }
      }
      else {
        obj = doLocalLoad(loader, false);
        Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
        event.setNewValue(obj);
      }
      return;
    }
    finally {
      // The lock will not actually be released until release is
      // called on this processor
      if (!locked) this.lock = null;
    }
  }
  if (scope.isDistributed()) {
    // Non-GLOBAL distributed scope without a local loader.
    obj = doNetLoad();
    if (this.isSerialized && obj != null) {
      event.setSerializedNewValue((byte[])obj);
    }
    else {
      Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
      event.setNewValue(obj);
    }
  }
}
| |
| Object doNetLoad() |
| throws CacheLoaderException, TimeoutException { |
| if (this.netLoadDone) return null; |
| this.netLoadDone = true; |
| if (advisor != null) { |
| Set loadCandidatesSet = advisor.adviseNetLoad(); |
| if (loadCandidatesSet.isEmpty()) { |
| return null; |
| } |
| CachePerfStats stats = region.getCachePerfStats(); |
| long start = stats.startNetload(); |
| ArrayList list = new ArrayList(loadCandidatesSet); |
| Collections.shuffle(list); |
| InternalDistributedMember[] loadCandidates = (InternalDistributedMember[])list. |
| toArray(new InternalDistributedMember[list.size()]); |
| initRemainingTimeout(); |
| |
| RegionAttributes attrs = region.getAttributes(); |
| int index = 0; |
| boolean stayInLoop = false; // never set to true! |
| this.remoteLoadInProgress = true; |
| try { |
| do { // the only time this loop repeats is when continue is called |
| InternalDistributedMember next = loadCandidates[index++]; |
| setSelectedNode(next); |
| this.lastNotifySpot = 0; |
| this.requestInProgress = true; |
| if (this.remainingTimeout <= 0) { // @todo this looks wrong; why not a timeout exception? |
| break; |
| } |
| this.remoteException = null; |
| NetLoadRequestMessage.sendMessage(this, this.regionName, this.key, |
| this.aCallbackArgument, next,this.remainingTimeout, |
| attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout()); |
| waitForObject2(this.remainingTimeout); |
| if (this.remoteException == null) { |
| if (!this.requestInProgress) { |
| // note even if result is null we are done; see bug 39738 |
| this.localLoad = false; |
| if (this.result != null) { |
| this.netLoad = true; |
| } |
| return this.result; |
| } |
| else { |
| // Why does the following test for selectedNodeDead? |
| // Seems like this will cause us to quit trying netLoad |
| // even if we don't have a result yet and have not tried everyone. |
| if ((this.selectedNodeDead) && (index < loadCandidates.length)) { |
| continue; |
| } |
| // otherwise we are done |
| } |
| } |
| else { |
| Throwable cause; |
| if (this.remoteException instanceof TryAgainException) { |
| if (index < loadCandidates.length) { |
| continue; |
| } else { |
| break; |
| } |
| } |
| if (this.remoteException instanceof CacheLoaderException) { |
| cause = this.remoteException.getCause(); |
| } |
| else { |
| cause = this.remoteException; |
| } |
| throw new CacheLoaderException(LocalizedStrings.SearchLoadAndWriteProcessor_WHILE_INVOKING_A_REMOTE_NETLOAD_0.toLocalizedString(cause), cause); |
| } |
| |
| } while(stayInLoop); |
| } finally { |
| stats.endNetload(start); |
| } |
| |
| } |
| return null; |
| } |
| |
| /** |
| * This exception is just used in the class to tell the caller |
| * that it should try again. InternalGemFireException used to be |
| * used for this which seems dangerous. |
| */ |
| private static class TryAgainException extends GemFireException { |
| |
| ////////////////////// Constructors ////////////////////// |
| |
| public TryAgainException() { |
| super(); |
| } |
| |
| /** |
| * Creates a new <code>TryAgainException</code>. |
| */ |
| public TryAgainException(String message) { |
| super(message); |
| } |
| } |
| |
| |
| private Object doLocalLoad(CacheLoader loader, boolean netSearchAllowed) |
| throws CacheLoaderException { |
| Object obj = null; |
| if (loader != null && !this.attemptedLocalLoad) { |
| this.attemptedLocalLoad = true; |
| CachePerfStats stats = region.getCachePerfStats(); |
| LoaderHelper loaderHelper = this.region.loaderHelperFactory.createLoaderHelper(this.key, this.aCallbackArgument, netSearchAllowed, |
| true /*netLoadAllowed*/, this); |
| long statStart = stats.startLoad(); |
| try { |
| obj = loader.load(loaderHelper); |
| } |
| finally { |
| stats.endLoad(statStart); |
| } |
| if (obj != null) { |
| this.localLoad = true; |
| } |
| } |
| return obj; |
| } |
| |
| /** |
| * Returns an event for listener notification. The event's operation |
| * may be altered to conform to the ConcurrentMap implementation specification |
| * @param event the original event |
| * @return the original event or a new event having a change in operation |
| */ |
| private CacheEvent getEventForListener(CacheEvent event) { |
| Operation op = event.getOperation(); |
| if (!op.isEntry()) { |
| return event; |
| } else { |
| EntryEventImpl r = (EntryEventImpl)event; |
| EntryEventImpl result = r; |
| if (r.isSingleHop()) { |
| // fix for bug #46130 - origin remote incorrect for one-hop operation in receiver |
| result = new EntryEventImpl(r); |
| result.setOriginRemote(true); |
| // if this is from a non-replicate and it's not in a tx it should be seen as a Create |
| // because that's what the sender would use in notifying listeners. bug #46955 |
| if (result.getOperation().isUpdate() && (result.getTransactionId() == null)) { |
| result.makeCreate(); |
| } |
| } |
| if (op == Operation.REPLACE) { |
| if (result == r) result = new EntryEventImpl(r); |
| result.setOperation(Operation.UPDATE); |
| } else if (op == Operation.PUT_IF_ABSENT) { |
| if (result == r) result = new EntryEventImpl(r); |
| result.setOperation(Operation.CREATE); |
| } else if (op == Operation.REMOVE) { |
| if (result == r) result = new EntryEventImpl(r); |
| result.setOperation(Operation.DESTROY); |
| } |
| return result; |
| } |
| } |
| |
| private boolean doLocalWrite(CacheWriter writer,CacheEvent pevent,int paction) |
| throws CacheWriterException { |
| // Return if the inhibit all notifications flag is set |
| if (pevent instanceof EntryEventImpl) { |
| if (((EntryEventImpl)pevent).inhibitAllNotifications()){ |
| if (logger.isDebugEnabled()) { |
| logger.debug("Notification inhibited for key {}", pevent); |
| } |
| return false; |
| } |
| } |
| CacheEvent event = getEventForListener(pevent); |
| |
| int action = paction; |
| if (event.getOperation().isCreate() && action == BEFOREUPDATE) { |
| action = BEFORECREATE; |
| } |
| try { |
| switch(action) { |
| case BEFORECREATE: |
| writer.beforeCreate((EntryEvent)event); |
| break; |
| case BEFOREDESTROY: |
| writer.beforeDestroy((EntryEvent)event); |
| break; |
| case BEFOREUPDATE: |
| writer.beforeUpdate((EntryEvent)event); |
| break; |
| case BEFOREREGIONDESTROY: |
| writer.beforeRegionDestroy((RegionEvent)event); |
| break; |
| case BEFOREREGIONCLEAR: |
| writer.beforeRegionClear((RegionEvent)event); |
| break; |
| default: |
| break; |
| |
| } |
| } finally { |
| if (event != pevent) { |
| if (event instanceof EntryEventImpl) { |
| ((EntryEventImpl) event).release(); |
| } |
| } |
| } |
| this.localWrite = true; |
| return true; |
| |
| } |
| |
/**
 * Asks remote members to run their cache writer for the event. Candidates
 * are tried one at a time in random order until one reports success, a
 * remote failure is raised, the timeout is exhausted or no candidate is left.
 *
 * NOTE(review): an empty writeCandidateSet makes the first array access
 * fail; the caller is presumably expected to pass a non-empty set.
 *
 * @param event the event to send
 * @param action one of the BEFORE* action codes
 * @param writeCandidateSet members that may host a cache writer
 * @return true if a remote cache writer was invoked successfully
 * @throws CacheWriterException when a remote writer failed; the cause is the
 *         remote exception, or its cause when it is itself a
 *         CacheWriterException that carries one
 * @throws TimeoutException declared for the waits performed here
 */
private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet)
    throws CacheWriterException, TimeoutException {

  // assert !writeCandidateSet.isEmpty();
  ArrayList list = new ArrayList(writeCandidateSet);
  Collections.shuffle(list);
  InternalDistributedMember[] writeCandidates = (InternalDistributedMember[])list.
      toArray(new InternalDistributedMember[list.size()]);
  initRemainingTimeout();
  int index =0;
  do {
    InternalDistributedMember next = writeCandidates[index++];
    // Each request goes to exactly one member.
    Set set = new HashSet();
    set.add(next);
    this.netWriteSucceeded = false;
    this.requestInProgress = true;
    this.remoteException = null;
    NetWriteRequestMessage.sendMessage(this, this.regionName,this.remainingTimeout,
        event,set,action);
    // Note that the timeout is checked after the request has been sent.
    if (this.remainingTimeout <= 0) { // @todo: should this throw a timeout exception?
      break;
    }
    waitForObject2(this.remainingTimeout);
    if (this.netWriteSucceeded) {
      this.netWrite = true;
      break;
    }
    if (this.remoteException != null) {
      Throwable cause;
      if (this.remoteException instanceof TryAgainException) {
        // The remote side asked us to try someone else, if anyone is left.
        if (index < writeCandidates.length) {
          continue;
        } else {
          break;
        }
      }
      if (this.remoteException instanceof CacheWriterException &&
          this.remoteException.getCause() != null) {
        cause = this.remoteException.getCause();
      }
      else {
        cause = this.remoteException;
      }
      throw new CacheWriterException(LocalizedStrings.SearchLoadAndWriteProcessor_WHILE_INVOKING_A_REMOTE_NETWRITE_0.toLocalizedString(cause), cause);
    }
  } while(index < writeCandidates.length);

  return this.netWriteSucceeded;
}
| |
| |
/**
 * Processes a QueryMessage netsearch response.
 * <ul>
 * <li>Responses are ignored while a net load is in progress, when nobody is
 *     pending, when the sender is not among the pending responders, or when
 *     a result has already been recorded.</li>
 * <li>A response that carries the value stores it (with its last-modified
 *     time, serialized flag and version tag) and signals completion.</li>
 * <li>A response that reports the key as present without a value triggers a
 *     value request to the sender on the waiting thread pool, unless a remote
 *     get is already running; in that case, or when the pool rejects the
 *     task, the sender is remembered in the response queue.</li>
 * <li>Finally, a timeout is signalled if the requestor timed out, and
 *     completion is signalled once nobody is pending and no remote get is
 *     running.</li>
 * </ul>
 *
 * @param obj the value, or null when the responder did not send one
 * @param lastModifiedTime last-modified time of the value on the responder
 * @param isPresent whether the responder has the key
 * @param serialized whether obj is in serialized form
 * @param requestorTimedOut whether the responder reports that the requestor timed out
 * @param sender the responding member
 * @param dm the distribution manager whose waiting thread pool is used
 * @param versionTag the version tag stored alongside an accepted value
 */
protected synchronized void incomingResponse(Object obj,
    long lastModifiedTime,
    boolean isPresent,
    boolean serialized,
    final boolean requestorTimedOut,
    final InternalDistributedMember sender,
    DistributionManager dm, VersionTag versionTag) {
  // NOTE: keep this method efficient since it is optimized
  // by executing it in the p2p reader.
  // This is done with this line in DistributionMessage.java:
  // || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class)

  // bug 35266 - don't pay attention to late breaking "get" responses
  if (this.remoteLoadInProgress) {
    if (logger.isDebugEnabled()) {
      logger.debug("Ignoring netsearch response from {} because we're now doing a netload", sender);
    }
    return;
  }

  if (this.pendingResponders.isEmpty()) {
    return;
  }
  if (!this.pendingResponders.remove(sender)) {
    return;
  } else {
    if (logger.isDebugEnabled()) {
      // only log the message if it's from a recipient we're waiting for.
      // it could have been multicast to all members, which would give us
      // one response per member
      // NOTE(review): the "value is {}" placeholder receives the serialized flag.
      logger.debug("Processing response for processorId={}, isPresent is {}, sender is {}, key is {}, value is {}, version is {}",
          this.processorId, isPresent, sender, this.key, serialized, versionTag);
    }
  }

  //Another thread got a response and that contained the value.
  // Ignore this response.
  if (this.result != null) {
    return;
  }

  if ( isPresent ) {
    if (obj != null) {
      Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
      // The method already holds this monitor; the nested block is reentrant.
      synchronized(this) {
        this.result = obj;
        this.lastModified = lastModifiedTime;
        this.isSerialized = serialized;
        this.versionTag = versionTag;
        signalDone();
        return;
      }
    }
    else {
      if (!remoteGetInProgress) {
        // send a message to this responder asking for the value
        // do this on the waiting pool in case the send blocks
        try {
          dm.getWaitingThreadPool().execute(new Runnable() {
            public void run() {
              sendValueRequest(sender);
            }
          });
          // need to do this here before releasing sync to fix bug 37132
          this.requestInProgress = true;
          this.remoteGetInProgress = true;
          setSelectedNode(sender);
          return; // sendValueRequest does the rest of the work
        }
        catch (RejectedExecutionException ex) {
          // just fall through since we must be shutting down.
        }
      }
      // Remember the responder that reported the key as present.
      if (responseQueue == null) responseQueue = new LinkedList();
      if (logger.isDebugEnabled()) {
        // NOTE(review): the placeholder after "requestInProgress" receives the sender.
        logger.debug("Saving isPresent response, requestInProgress {}", sender);
      }
      responseQueue.add(sender);
    }
  }
  if (requestorTimedOut) {
    signalTimedOut();
  }

  // Finish the request once nobody is pending and no remote get is running.
  boolean endRequest = false;
  if (this.pendingResponders.isEmpty()
      && (!remoteGetInProgress)) {
    endRequest = true;
  }
  if (endRequest) {
    signalDone();
  }
}
| |
| protected synchronized void sendValueRequest(final InternalDistributedMember sender) { |
| // send a message to this responder asking for the value |
| // do this on the waiting pool in case the send blocks |
| // Always attempt to send the message to fix bug 37149 |
| RegionAttributes attrs = this.region.getAttributes(); |
| NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, |
| sender, this.remainingTimeout, |
| attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout()); |
| // if it turns out that we can't send a message to this member then |
| // our membership listener should save the day and schedule a send |
| // to someone else or give up and report a failed search. |
| } |
| |
| // @todo creation times need to be handled properly |
| /** |
| * This is the response from the accepted responder. |
| * Grab the result and store it.Unlike 2.0 where the |
| * the response was a 2 phase operation, here it is a |
| * single phase operation. |
| */ |
| protected void incomingNetLoadReply(Object obj, long lastModifiedTime, |
| Object callbackArg, Exception e, |
| boolean serialized, boolean requestorTimedOut) { |
| synchronized (this) { |
| if (requestorTimedOut) { |
| signalTimedOut(); |
| return; |
| } |
| this.result = obj; |
| this.lastModified = lastModifiedTime; |
| this.remoteException = e; |
| this.aCallbackArgument = callbackArg; |
| computeRemainingTimeout(); |
| this.isSerialized = serialized; |
| signalDone(); |
| } |
| |
| |
| } |
| @SuppressWarnings("hiding") |
| protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime, |
| boolean serialized, |
| boolean requestorTimedOut, |
| boolean authorative, |
| VersionTag versionTag) { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| if (this.requestInProgress) { |
| if (requestorTimedOut) { |
| //Force a timeout exception. |
| if (isDebugEnabled) { |
| logger.debug("incomingNetSearchReply() - requestorTimedOut {}", this); |
| } |
| signalTimedOut(); |
| } |
| computeRemainingTimeout(); |
| if (value != null || authorative) { |
| synchronized (this) { |
| this.result = value; |
| this.lastModified = lastModifiedTime; |
| this.isSerialized = serialized; |
| this.remoteGetInProgress = false; |
| this.authorative = authorative; |
| this.versionTag = versionTag; |
| if (isDebugEnabled) { |
| logger.debug("incomingNetSearchReply() - got obj {}", this); |
| } |
| signalDone(); |
| } |
| } |
| else if(this.remainingTimeout <= 0) { |
| this.remoteGetInProgress = false; |
| if (isDebugEnabled) { |
| logger.debug("incomingNetSearchReply() - null obj, no more time {}", this); |
| } |
| signalDone(); // @todo: is this a bug? should call signalTimedOut? |
| } |
| else { |
| if (isDebugEnabled) { |
| logger.debug("incomingNetSearchReply() - null obj, sendNetSearchRequest {}", this); |
| } |
| sendNetSearchRequest(); |
| } |
| } else { |
| if (isDebugEnabled) { |
| logger.debug("incomingNetSearchReply() - requestInProgress is false {}", this); |
| } |
| // should we checkIfDone? |
| // sure why not? |
| checkIfDone(); |
| } |
| } |
| |
| /** |
| * Return the next responder on the responseQueue, or null if empty |
| */ |
| private InternalDistributedMember nextAppropriateResponder() { |
| if (responseQueue != null && responseQueue.size() > 0) { |
| return (InternalDistributedMember)responseQueue.remove(0); |
| } else { |
| return null; |
| } |
| } |
| |
| private synchronized void sendNetSearchRequest() { |
| InternalDistributedMember nextResponder = nextAppropriateResponder(); |
| if (nextResponder != null) { |
| // Make a request to the next responder in the queue |
| RegionAttributes attrs = this.region.getAttributes(); |
| setSelectedNode(nextResponder); |
| this.requestInProgress = true; |
| this.remoteGetInProgress = true; |
| NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, |
| nextResponder,this.remainingTimeout, |
| attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout() |
| ); |
| |
| } |
| else { |
| this.remoteGetInProgress = false; |
| checkIfDone(); |
| |
| } |
| } |
| /** |
| * This is the response from the accepted responder. |
| */ |
| protected void incomingNetWriteReply(boolean netWriteSuccessful,Exception e, |
| boolean exe) { |
| synchronized (this) { |
| this.remoteException = e; |
| this.netWriteSucceeded = netWriteSuccessful; |
| computeRemainingTimeout(); |
| signalDone(); |
| } |
| } |
| |
| private synchronized void initRemainingTimeout() { |
| this.remainingTimeout = this.timeout * 1000; |
| this.startTimeSnapShot = this.distributionManager.cacheTimeMillis(); |
| } |
| |
| private synchronized void computeRemainingTimeout() { |
| if (this.startTimeSnapShot > 0) { // @todo this should be an assertion |
| this.endTimeSnapShot = this.distributionManager.cacheTimeMillis(); |
| long delta = this.endTimeSnapShot - this.startTimeSnapShot; |
| if (delta > 0) { |
| this.remainingTimeout -= delta; |
| } |
| this.startTimeSnapShot = this.endTimeSnapShot; |
| } |
| } |
| |
| private synchronized void waitForObject2(final int timeoutMs) |
| throws TimeoutException { |
| if (this.requestInProgress) { |
| try { |
| final DM dm = this.region.cache.getDistributedSystem() |
| .getDistributionManager(); |
| long waitTimeMs = timeoutMs; |
| final long endTime = System.currentTimeMillis() + waitTimeMs; |
| for (;;) { |
| if (!this.requestInProgress) { |
| return; |
| } |
| if (waitTimeMs <= 0) { |
| throw new TimeoutException(LocalizedStrings.SearchLoadAndWriteProcessor_TIMED_OUT_WHILE_DOING_NETSEARCHNETLOADNETWRITE_PROCESSORID_0_KEY_IS_1.toLocalizedString(new Object[] {Integer.valueOf(this.processorId), this.key})); |
| } |
| |
| boolean interrupted = Thread.interrupted(); |
| int lastNS = this.lastNotifySpot; |
| try { |
| { |
| boolean done = (lastNS != 0); |
| while (!done && waitTimeMs > 0) { |
| this.region.getCancelCriterion().checkCancelInProgress(null); |
| interrupted = Thread.interrupted() || interrupted; |
| long wt = Math.min(RETRY_TIME, waitTimeMs); |
| wait(wt); // spurious wakeup ok |
| lastNS = this.lastNotifySpot; |
| done = (lastNS != 0); |
| if (!done) { |
| // calc remaing wait time to fix bug 37196 |
| waitTimeMs = endTime - System.currentTimeMillis(); |
| } |
| } // while |
| if (done) { |
| this.lastNotifySpot = 0; |
| } |
| } |
| if (this.requestInProgress && !this.selectedNodeDead) { |
| // added the test of "!this.selectedNodeDead" for bug 37196 |
| StringBuilder sb = new StringBuilder(200); |
| sb.append("processorId=").append(this.processorId); |
| sb.append(" Key is ").append(this.key); |
| sb.append(" searchTimeoutMs ").append(timeoutMs); |
| if (waitTimeMs > 0) { |
| sb.append(" msRemaining=").append(waitTimeMs); |
| } |
| if (lastNS != 0) { |
| sb.append(" lastNotifySpot=" + lastNS); |
| } |
| throw new TimeoutException(LocalizedStrings.SearchLoadAndWriteProcessor_TIMEOUT_DURING_NETSEARCHNETLOADNETWRITE_DETAILS_0.toLocalizedString(sb)); |
| } |
| return; |
| } |
| catch (InterruptedException e) { |
| interrupted = true; |
| region.getCancelCriterion().checkCancelInProgress(null); |
| // keep waiting until we are done |
| waitTimeMs = endTime - System.currentTimeMillis(); |
| // continue |
| } |
| finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| } finally { |
| computeRemainingTimeout(); |
| } |
| } // requestInProgress |
| } |
| |
| private int getSearchTimeout() { |
| return region.getCache().getSearchTimeout(); //CacheFactory.getInstance(((DistributedRegion)this.region).getSystem()).getSearchTimeout(); |
| } |
| |
| private void resetResults() { |
| this.netSearch = false; |
| this.netLoad = false; |
| this.localLoad = false; |
| this.localWrite = false; |
| this.netWrite = false; |
| this.lastModified = 0; |
| this.isSerialized = false; |
| } |
| |
| |
| // private AcceptHelper getAcceptHelper(boolean ackPortInit) { |
| // AcceptHelper helper = null; |
| // synchronized(availableAcceptHelperSet) { |
| // if (availableAcceptHelperSet.size() <= 0) { |
| // helper = new AcceptHelper(); |
| // } |
| // else { |
| // helper = (AcceptHelper)availableAcceptHelperSet.iterator().next(); |
| // availableAcceptHelperSet.remove(helper); |
| // } |
| // } |
| // helper.reset(ackPortInit); |
| // return helper; |
| // |
| // } |
| |
| // private void releaseAcceptHelper(AcceptHelper helper) { |
| // synchronized(availableAcceptHelperSet) { |
| // if (!availableAcceptHelperSet.contains(helper)) |
| // availableAcceptHelperSet.add(helper); |
| // } |
| // |
| // } |
| |
| @Override |
| public String toString() { |
| return super.toString() + " processorId " + this.processorId; |
| } |
| |
| /** |
| * methods to set/remove htree reference (Bug 40299). |
| */ |
| protected static void setClearCountReference(LocalRegion rgn) { |
| DiskRegion dr = rgn.getDiskRegion(); |
| if (dr != null) { |
| dr.setClearCountReference(); |
| }; |
| } |
| |
| protected static void removeClearCountReference(LocalRegion rgn) { |
| DiskRegion dr = rgn.getDiskRegion(); |
| if (dr != null) { |
| dr.removeClearCountReference(); |
| }; |
| } |
| |
| /** |
| * Test method for bug 40299. |
| */ |
| @SuppressWarnings("synthetic-access") |
| public void testNetSearchMessageDoGet(String theRegionName, |
| Object theKey, |
| int theTimeoutMs, |
| int theTtl, |
| int theIdleTime) { |
| NetSearchRequestMessage nMsg = new NetSearchRequestMessage(); |
| nMsg.initialize(this, theRegionName, theKey, theTimeoutMs, theTtl, theIdleTime); |
| nMsg.doGet((DistributionManager)this.distributionManager); |
| } |
| |
| /***************************************************************************** |
| * INNER CLASSES |
| *****************************************************************************/ |
| |
| |
| |
| /** |
| * A QueryMessage is broadcast to every node that has the region defined, |
| * to find out who has a valid copy of the requested object. |
| */ |
| public static final class QueryMessage extends SerialDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the |
| * initiator node. This will be communicated back in the response |
| * to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The object name */ |
| private Object key; |
| |
| /** Amount of time to wait before giving up */ |
| private int timeoutMs; |
| |
| |
| |
| /** The originator's expiration criteria */ |
| private int ttl, idleTime; |
| |
| /** if true then always send back value even if it is large. |
| * Added for bug 35942. |
| */ |
| private boolean alwaysSendResult; |
| |
| // using available bitmask flags |
| private static final short HAS_TTL = UNRESERVED_FLAGS_START; |
| private static final short HAS_IDLE_TIME = (HAS_TTL << 1); |
| private static final short ALWAYS_SEND_RESULT = (HAS_IDLE_TIME << 1); |
| |
| public QueryMessage() {}; |
| |
| /** |
| * Using a new or pooled message instance, create and send |
| * the query to all nodes. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, |
| String regionName, |
| Object key, |
| boolean multicast, |
| Set recipients, |
| int timeoutMs, |
| int ttl, |
| int idleTime) { |
| |
| // create a message |
| QueryMessage msg = new QueryMessage(); |
| msg.initialize(processor, regionName, key, multicast, timeoutMs,ttl,idleTime); |
| msg.setRecipients(recipients); |
| if (!multicast && recipients.size() == 1) { |
| // we are only searching one guy so tell him to send the value |
| msg.alwaysSendResult = true; |
| } |
| processor.distributionManager.putOutgoing(msg); |
| |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, |
| String theRegionName, |
| Object theKey, |
| boolean multicast, |
| int theTimeoutMs, |
| int theTtl, |
| int theIdleTime) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| setMulticast(multicast); |
| this.key = theKey; |
| this.timeoutMs = theTimeoutMs; |
| this.ttl = theTtl; |
| this.idleTime = theIdleTime; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| |
| /** |
| * This method execute's on the receiver's node, and |
| * checks to see if the requested object exists in |
| * shared memory on this node, and if so, sends back |
| * a ResponseMessage. |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| doGet(dm); |
| |
| } |
| |
| public int getDSFID() { |
| return QUERY_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| |
| short flags = 0; |
| if (this.processorId != 0) flags |= HAS_PROCESSOR_ID; |
| if (this.ttl != 0) flags |= HAS_TTL; |
| if (this.idleTime != 0) flags |= HAS_IDLE_TIME; |
| if (this.alwaysSendResult) flags |= ALWAYS_SEND_RESULT; |
| out.writeShort(flags); |
| |
| if (this.processorId != 0) { |
| out.writeInt(this.processorId); |
| } |
| out.writeUTF(this.regionName); |
| DataSerializer.writeObject(this.key,out); |
| out.writeInt(this.timeoutMs); |
| if (this.ttl != 0) { |
| InternalDataSerializer.writeSignedVL(this.ttl, out); |
| } |
| if (this.idleTime != 0) { |
| InternalDataSerializer.writeSignedVL(this.idleTime, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| short flags = in.readShort(); |
| if ((flags & HAS_PROCESSOR_ID) != 0) { |
| this.processorId = in.readInt(); |
| ReplyProcessor21.setMessageRPId(this.processorId); |
| } |
| this.regionName = in.readUTF(); |
| this.key = DataSerializer.readObject(in); |
| this.timeoutMs = in.readInt(); |
| if ((flags & HAS_TTL) != 0) { |
| this.ttl = (int)InternalDataSerializer.readSignedVL(in); |
| } |
| if ((flags & HAS_IDLE_TIME) != 0) { |
| this.idleTime = (int)InternalDataSerializer.readSignedVL(in); |
| } |
| this.alwaysSendResult = (flags & ALWAYS_SEND_RESULT) != 0; |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.QueryMessage for \"" + this.key |
| + "\" in region \"" + this.regionName + "\", processorId " + processorId |
| + ", timeoutMs=" + this.timeoutMs |
| + ", ttl=" + this.ttl + ", idleTime=" + this.idleTime; |
| } |
| |
| private void doGet(DistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| // boolean retVal = true; |
| boolean isPresent = false; |
| boolean sendResult = false; |
| boolean isSer = false; |
| long lastModifiedCacheTime = 0; |
| boolean requestorTimedOut = false; |
| VersionTag tag = null; |
| |
| if (dm.getDMType() == DistributionManager.ADMIN_ONLY_DM_TYPE |
| || getSender().equals(dm.getDistributionManagerId()) ) { |
| // this was probably a multicast message |
| //replyWithNull(dm); - bug 35266: don't send a reply |
| return; |
| } |
| |
| int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); |
| try { |
| // check to see if we would have to wait on initialization latch (if global) |
| // if so abort and reply with null |
| GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); |
| if (gfc.isGlobalRegionInitializing(this.regionName)) { |
| replyWithNull(dm); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Global Region not initialized yet"); |
| } |
| return; |
| } |
| |
| LocalRegion region = (LocalRegion)CacheFactory |
| .getInstance(dm.getSystem()).getRegion(this.regionName); |
| Object o = null; |
| |
| if (region != null) { |
| setClearCountReference(region); |
| try { |
| RegionEntry entry = region.basicGetEntry(this.key); |
| if (entry != null) { |
| synchronized(entry) { |
| assert region.isInitialized(); |
| { |
| if (dm.cacheTimeMillis() - startTime < timeoutMs) { |
| o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes, decrc |
| if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o) && |
| !region.isExpiredWithRegardTo(this.key, |
| this.ttl, |
| this.idleTime)) { |
| isPresent = true; |
| VersionStamp stamp = entry.getVersionStamp(); |
| if (stamp != null && stamp.hasValidVersion()) { |
| tag = stamp.asVersionTag(); |
| } |
| long lastModified = entry.getLastModified(); |
| lastModifiedCacheTime = lastModified; |
| isSer = o instanceof CachedDeserializable; |
| if (isSer) { |
| o = ((CachedDeserializable)o).getSerializedValue(); |
| } |
| if (isPresent |
| && (this.alwaysSendResult |
| || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) { |
| sendResult = true; |
| } |
| } |
| } |
| else { |
| requestorTimedOut = true; |
| } |
| } |
| } |
| } |
| else |
| if (logger.isDebugEnabled()) { |
| logger.debug("Entry is null"); |
| } |
| } |
| finally { |
| removeClearCountReference(region); |
| } |
| } |
| ResponseMessage.sendMessage( this.key, this.getSender(),processorId, |
| (sendResult ? o : null), |
| lastModifiedCacheTime, isPresent, isSer, |
| requestorTimedOut, dm, tag); |
| } |
| catch (RegionDestroyedException rde) { |
| logger.debug("Region Destroyed Exception in QueryMessage doGet, null"); |
| replyWithNull(dm); |
| } |
| catch (CancelException cce) { |
| logger.debug("CacheClosedException in QueryMessage doGet, null"); |
| replyWithNull(dm); |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.debug("Throwable in QueryMessage doGet, null", t); |
| replyWithNull(dm); |
| } |
| finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| } |
| |
| private void replyWithNull(DistributionManager dm) { |
| ResponseMessage.sendMessage(this.key,this.getSender(),processorId, null, |
| 0,false, false, false,dm, null); |
| |
| } |
| |
| } |
| |
| /********************* ResponseMessage ***************************************/ |
| |
| |
| /** |
| * The ResponseMessage is a reply to a QueryMessage, and contains the |
| * object's value, if it is below the byte limit, otherwise an indication |
| * of whether the sender has the value. |
| */ |
| public static final class ResponseMessage extends HighPriorityDistributionMessage { |
| |
| private Object key; |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** The value being transferred */ |
| private Object result; |
| |
| /** Object creation time on remote node */ |
| private long lastModified; |
| |
| /** is the value present*/ |
| private boolean isPresent; |
| |
| |
| /** Is blob serialized? */ |
| private boolean isSerialized; |
| |
| /** did the request time out at the sender*/ |
| private boolean requestorTimedOut; |
| |
| /** the version of the object being returned */ |
| private VersionTag versionTag; |
| |
| |
| public ResponseMessage() {} |
| |
| public static void sendMessage(Object key, |
| InternalDistributedMember recipient, |
| int processorId, |
| Object result, long lastModified, |
| boolean isPresent, boolean isSerialized, |
| boolean requestorTimedOut, |
| DistributionManager distributionManager, VersionTag versionTag) { |
| |
| // create a message |
| ResponseMessage msg = new ResponseMessage(); |
| msg.initialize(key,processorId, result, lastModified,isPresent,isSerialized,requestorTimedOut, versionTag); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| |
| } |
| |
| private void initialize(Object theKey, int theProcessorId, Object theResult, |
| long lastModifiedTime, boolean ispresent, boolean isserialized, |
| boolean requestorTimedOutFlag, VersionTag versionTag) { |
| this.key = theKey; |
| this.processorId = theProcessorId; |
| this.result = theResult; |
| this.lastModified = lastModifiedTime; |
| this.isPresent = ispresent; |
| this.isSerialized = isserialized; |
| this.requestorTimedOut = requestorTimedOutFlag; |
| this.versionTag = versionTag; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of |
| * the QueryMessage . |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| // NOTE: keep this method efficient since it is optimized |
| // by executing it in the p2p reader. |
| // This is done with this line in DistributionMessage.java: |
| // || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class) |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor)getProcessorKeeper().retrieve(this.processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Response() SearchLoadAndWriteProcessor no longer exists"); |
| } |
| return; |
| } |
| long lastModifiedSystemTime = 0; |
| if (this.lastModified != 0) { |
| lastModifiedSystemTime = this.lastModified; |
| } |
| if (this.versionTag != null) { |
| this.versionTag.replaceNullIDs(getSender()); |
| } |
| |
| processor.incomingResponse(this.result, lastModifiedSystemTime, |
| this.isPresent, this.isSerialized, |
| this.requestorTimedOut, |
| this.getSender(), |
| dm, versionTag); |
| } |
| |
| @Override |
| public boolean getInlineProcess() { // optimization for bug 37075 |
| return true; |
| } |
| |
| public int getDSFID() { |
| return RESPONSE_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| DataSerializer.writeObject(this.key, out); |
| out.writeInt(this.processorId); |
| DataSerializer.writeObject(this.result,out); |
| out.writeLong(this.lastModified); |
| out.writeBoolean(this.isPresent); |
| out.writeBoolean(this.isSerialized); |
| out.writeBoolean(this.requestorTimedOut); |
| DataSerializer.writeObject(this.versionTag, out); |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.key = DataSerializer.readObject(in); |
| this.processorId = in.readInt(); |
| this.result = DataSerializer.readObject(in); |
| this.lastModified = in.readLong(); |
| this.isPresent = in.readBoolean(); |
| this.isSerialized = in.readBoolean(); |
| this.requestorTimedOut = in.readBoolean(); |
| this.versionTag = (VersionTag)DataSerializer.readObject(in); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.ResponseMessage for processorId " + |
| processorId + ", blob is " + this.result + ", isPresent is " + isPresent |
| + ", requestorTimedOut is " + requestorTimedOut + ", version is " + versionTag; |
| } |
| |
| } |
| |
| /********************* NetSearchRequestMessage ***************************************/ |
| |
| public static final class NetSearchRequestMessage extends PooledDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the |
| * initiator node. This will be communicated back in the response |
| * to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The object name */ |
| private Object key; |
| |
| /** Amount of time to wait before giving up */ |
| private int timeoutMs; |
| |
| /** The originator's expiration criteria */ |
| private int ttl, idleTime; |
| |
| // using available bitmask flags |
| private static final short HAS_TTL = UNRESERVED_FLAGS_START; |
| private static final short HAS_IDLE_TIME = (HAS_TTL << 1); |
| |
| public NetSearchRequestMessage() { |
| } |
| |
| /** |
| * Using a new or pooled message instance, create and send |
| * the request for object value to the specified node. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, |
| String regionName, |
| Object key, |
| InternalDistributedMember recipient, |
| int timeoutMs, |
| int ttl, |
| int idleTime) { |
| |
| // create a message |
| NetSearchRequestMessage msg = new NetSearchRequestMessage(); |
| msg.initialize(processor, regionName, key,timeoutMs, ttl,idleTime); |
| msg.setRecipient(recipient); |
| processor.distributionManager.putOutgoing(msg); |
| |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, |
| String theRegionName, |
| Object theKey, |
| int timeoutMS, |
| int ttlMS, |
| int idleTimeMS) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| this.key = theKey; |
| this.timeoutMs = timeoutMS; |
| this.ttl = ttlMS; |
| this.idleTime = idleTimeMS; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| /** Invoked on the node that has the object */ |
| @Override |
| protected void process(DistributionManager dm) { |
| doGet(dm); |
| } |
| |
| public int getDSFID() { |
| return NET_SEARCH_REQUEST_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| |
| short flags = 0; |
| if (this.processorId != 0) flags |= HAS_PROCESSOR_ID; |
| if (this.ttl != 0) flags |= HAS_TTL; |
| if (this.idleTime != 0) flags |= HAS_IDLE_TIME; |
| out.writeShort(flags); |
| |
| if (this.processorId != 0) { |
| out.writeInt(this.processorId); |
| } |
| out.writeUTF(this.regionName); |
| DataSerializer.writeObject(this.key,out); |
| out.writeInt(this.timeoutMs); |
| if (this.ttl != 0) { |
| InternalDataSerializer.writeSignedVL(this.ttl, out); |
| } |
| if (this.idleTime != 0) { |
| InternalDataSerializer.writeSignedVL(this.idleTime, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| short flags = in.readShort(); |
| if ((flags & HAS_PROCESSOR_ID) != 0) { |
| this.processorId = in.readInt(); |
| ReplyProcessor21.setMessageRPId(this.processorId); |
| } |
| this.regionName = in.readUTF(); |
| this.key = DataSerializer.readObject(in); |
| this.timeoutMs = in.readInt(); |
| if ((flags & HAS_TTL) != 0) { |
| this.ttl = (int)InternalDataSerializer.readSignedVL(in); |
| } |
| if ((flags & HAS_IDLE_TIME) != 0) { |
| this.idleTime = (int)InternalDataSerializer.readSignedVL(in); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetSearchRequestMessage for \"" + this.key |
| + "\" in region \"" + this.regionName + "\", processorId " + processorId; |
| } |
| |
| private void doGet(DistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| // boolean retVal = true; |
| byte[] ebv = null; |
| Object ebvObj = null; |
| int ebvLen = 0; |
| long lastModifiedCacheTime =0; |
| boolean isSer = false; |
| boolean requestorTimedOut = false; |
| boolean authoritative = false; |
| VersionTag versionTag = null; |
| |
| int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); |
| try { |
| LocalRegion region = (LocalRegion)CacheFactory |
| .getInstance(dm.getSystem()).getRegion(this.regionName); |
| if (region != null) { |
| setClearCountReference(region); |
| try { |
| boolean initialized = region.isInitialized(); |
| if(region.keyRequiresRegionContext()) { |
| ((KeyWithRegionContext)this.key).setRegionContext(region); |
| } |
| RegionEntry entry = region.basicGetEntry(this.key); |
| if (entry != null) { |
| synchronized (entry) { |
| // get the value and version under synchronization so they don't change |
| VersionStamp versionStamp = entry.getVersionStamp(); |
| if (versionStamp != null) { |
| versionTag = versionStamp.asVersionTag(); |
| } |
| Object eov = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes, decrc |
| if (eov != null) { |
| if (eov == Token.INVALID || eov == Token.LOCAL_INVALID) { |
| // ebv = null; (redundant assignment) |
| } |
| else if (dm.cacheTimeMillis() - startTime < timeoutMs) { |
| if (!region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) { |
| long lastModified = entry.getLastModified(); |
| lastModifiedCacheTime = lastModified; |
| if (eov instanceof CachedDeserializable) { |
| if (eov instanceof StoredObject && !((StoredObject) eov).isSerialized()) { |
| isSer = false; |
| ebv = (byte[]) ((StoredObject)eov).getDeserializedForReading(); |
| ebvLen = ebv.length; |
| } else { |
| // don't serialize here if it is not already serialized |
| Object tmp = ((CachedDeserializable)eov).getValue(); |
| if (tmp instanceof byte[]) { |
| byte[] bb = (byte[])tmp; |
| ebv = bb; |
| ebvLen = bb.length; |
| } else { |
| ebvObj = tmp; |
| } |
| isSer = true; |
| } |
| } |
| else if (!(eov instanceof byte[])) { |
| ebvObj = eov; |
| isSer = true; |
| } |
| else { |
| ebv = (byte[])eov; |
| ebvLen = ebv.length; |
| } |
| } |
| } |
| else { |
| requestorTimedOut = true; |
| } |
| } |
| } |
| } |
| authoritative = region.getDataPolicy().withReplication() && initialized && !region.isDestroyed; |
| } |
| finally { |
| removeClearCountReference(region); |
| } |
| } |
| NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), |
| processorId, this.key, ebv, ebvObj, ebvLen, |
| lastModifiedCacheTime, isSer, |
| requestorTimedOut, authoritative, dm, versionTag); |
| } |
| catch (RegionDestroyedException rde) { |
| replyWithNull(dm); |
| |
| } |
| catch (CancelException cce) { |
| replyWithNull(dm); |
| |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.warn(LocalizedMessage.create(LocalizedStrings.SearchLoadAndWriteProcessor_UNEXPECTED_EXCEPTION), t); |
| replyWithNull(dm); |
| } |
| finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| } |
| |
| private void replyWithNull(DistributionManager dm) { |
| NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), |
| processorId, this.key, null, null, 0, 0, false, |
| false,false,dm, null); |
| |
| } |
| |
| } |
| |
| /*********************NetSearchReplyMessage ***************************************/ |
| |
| /** |
| * The NetSearchReplyMessage is a reply to a NetSearchRequestMessage, and contains the |
| * object's value. |
| */ |
| public static final class NetSearchReplyMessage extends HighPriorityDistributionMessage { |
| private static final byte SERIALIZED = 0x01; |
| private static final byte REQUESTOR_TIMEOUT = 0x02; |
| private static final byte AUTHORATIVE = 0x04; |
| private static final byte VERSIONED = 0x08; |
| private static final byte PERSISTENT = 0x10; |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| |
| |
| /** The object value being transferred */ |
| private byte[] value; |
| |
| private transient Object valueObj; // only used by toData |
| private transient int valueLen; // only used by toData |
| |
| /** Object creation time on remote node */ |
| private long lastModified; |
| |
| /** Is blob serialized? */ |
| private boolean isSerialized; |
| |
| /** did the request time out at the sender*/ |
| private boolean requestorTimedOut; |
| |
| /** Does this member authoritatively know the value? This is used to distinguish |
| * a null response indicating the region was missing vs. a null value. */ |
| private boolean authoritative; |
| |
| /** the version of the returned entry */ |
| private VersionTag versionTag; |
| |
| public NetSearchReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, |
| int processorId, |
| Object key, |
| byte[] value, Object valueObj, int valueLen, long lastModified, |
| boolean isSerialized, |
| boolean requestorTimedOut, |
| boolean authoritative, |
| DistributionManager distributionManager, VersionTag versionTag |
| ) { |
| // create a message |
| NetSearchReplyMessage msg = new NetSearchReplyMessage(); |
| msg.initialize(processorId, value, valueObj, valueLen, lastModified,isSerialized,requestorTimedOut, authoritative, versionTag); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| |
| } |
| |
| @SuppressWarnings("hiding") |
| private void initialize(int procId, |
| byte[] theValue, Object theValueObj, int valueObjLen, |
| long lastModifiedTime ,boolean isserialized, boolean requestorTimedout, |
| boolean authoritative, VersionTag versionTag) { |
| this.processorId = procId; |
| this.value = theValue; |
| this.valueObj = theValueObj; |
| this.valueLen = valueObjLen; |
| this.lastModified = lastModifiedTime; |
| this.isSerialized = isserialized; |
| this.requestorTimedOut = requestorTimedout; |
| this.authoritative = authoritative; |
| this.versionTag = versionTag; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of |
| * the NetSearchRequestMessage. This concludes the net request, by |
| * communicating an object value. |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor)getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetSearchReplyMessage() SearchLoadAndWriteProcessor {} no longer exists", processorId); |
| } |
| return; |
| } |
| long lastModifiedSystemTime = 0; |
| if (this.lastModified != 0) { |
| lastModifiedSystemTime = this.lastModified; |
| } |
| if (this.versionTag != null) { |
| this.versionTag.replaceNullIDs(getSender()); |
| } |
| processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, |
| this.isSerialized, this.requestorTimedOut, |
| this.authoritative, this.versionTag); |
| } |
| |
| public int getDSFID() { |
| return NET_SEARCH_REPLY_MESSAGE; |
| } |
| |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| if (this.valueObj != null) { |
| DataSerializer.writeObjectAsByteArray(this.valueObj, out); |
| } else { |
| DataSerializer.writeByteArray(this.value, this.valueLen, out); |
| } |
| out.writeLong(this.lastModified); |
| byte booleans = 0; |
| if(this.isSerialized) booleans |= SERIALIZED; |
| if(this.requestorTimedOut) booleans |= REQUESTOR_TIMEOUT; |
| if(this.authoritative) booleans |= AUTHORATIVE; |
| if (this.versionTag != null) booleans |= VERSIONED; |
| if (this.versionTag instanceof DiskVersionTag) booleans |= PERSISTENT; |
| out.writeByte(booleans); |
| if (this.versionTag != null) { |
| InternalDataSerializer.invokeToData(this.versionTag, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.value = DataSerializer.readByteArray(in); |
| if (this.value != null) { |
| this.valueLen = this.value.length; |
| } |
| this.lastModified = in.readLong(); |
| byte booleans = in.readByte(); |
| |
| this.isSerialized = (booleans & SERIALIZED )!= 0; |
| this.requestorTimedOut = (booleans & REQUESTOR_TIMEOUT)!= 0; |
| this.authoritative = (booleans & AUTHORATIVE)!= 0; |
| if ((booleans & VERSIONED) != 0) { |
| boolean persistentTag = (booleans & PERSISTENT) != 0; |
| this.versionTag = VersionTag.create(persistentTag, in); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetSearchReplyMessage for processorId " + |
| processorId + ", blob is " + |
| // this.value |
| (this.value == null ? "null" : "(" + this.value.length + " bytes)") + |
| " authorative=" + authoritative + " versionTag=" + this.versionTag |
| ; |
| } |
| |
| } |
| |
| |
| /********************************NetLoadRequestMessage**********************/ |
| |
| public static final class NetLoadRequestMessage extends PooledDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the |
| * initiator node. This will be communicated back in the response |
| * to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The object name */ |
| private Object key; |
| |
| /** Parameter to use when invoking loader */ |
| private Object aCallbackArgument; |
| |
| /** Amount of time to wait before giving up */ |
| private int timeoutMs; |
| |
| |
| /** The originator's expiration criteria */ |
| private int ttl, idleTime; |
| |
| public NetLoadRequestMessage() {} |
| |
| /** |
| * Using a new or pooled message instance, create and send |
| * the request for object value to the specified node. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, |
| String regionName, |
| Object key, |
| Object aCallbackArgument, |
| InternalDistributedMember recipient, |
| int timeoutMs, |
| int ttl, |
| int idleTime) { |
| |
| // create a message |
| NetLoadRequestMessage msg = new NetLoadRequestMessage(); |
| msg.initialize(processor, regionName, key,aCallbackArgument,timeoutMs,ttl,idleTime); |
| msg.setRecipient(recipient); |
| |
| try { |
| processor.distributionManager.putOutgoingUserData(msg); |
| } catch (NotSerializableException e) { |
| throw new IllegalArgumentException(LocalizedStrings.SearchLoadAndWriteProcessor_MESSAGE_NOT_SERIALIZABLE.toLocalizedString()); |
| } |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, |
| String theRegionName, |
| Object theKey, |
| Object callbackArgument, |
| int timeoutMS, |
| int ttlMS, |
| int idleTimeMS) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| this.key = theKey; |
| this.aCallbackArgument = callbackArgument; |
| this.timeoutMs = timeoutMS; |
| this.ttl = ttlMS; |
| this.idleTime = idleTimeMS; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| /** Invoked on the node that has the object */ |
| @Override |
| protected void process(DistributionManager dm) { |
| doLoad(dm); |
| } |
| |
| public int getDSFID() { |
| return NET_LOAD_REQUEST_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| out.writeUTF(this.regionName); |
| DataSerializer.writeObject(this.key,out); |
| DataSerializer.writeObject(this.aCallbackArgument,out); |
| out.writeInt(this.timeoutMs); |
| out.writeInt(this.ttl); |
| out.writeInt(this.idleTime); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.regionName = in.readUTF(); |
| this.key = DataSerializer.readObject(in); |
| this.aCallbackArgument = DataSerializer.readObject(in); |
| this.timeoutMs = in.readInt(); |
| this.ttl = in.readInt(); |
| this.idleTime = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetLoadRequestMessage for \"" + this.key |
| + "\" in region \"" + this.regionName + "\", processorId " + processorId; |
| } |
| |
| |
| |
| private void doLoad(DistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); |
| try { |
| GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); |
| LocalRegion region = (LocalRegion)gfc.getRegion(this.regionName); |
| if (region != null |
| && region.isInitialized() |
| && (dm.cacheTimeMillis() - startTime < timeoutMs)) { |
| CacheLoader loader = ((AbstractRegion)region).basicGetLoader(); |
| if (loader != null) { |
| LoaderHelper loaderHelper = region.loaderHelperFactory.createLoaderHelper(this.key, |
| this.aCallbackArgument, false, false, null); |
| CachePerfStats stats = region.getCachePerfStats(); |
| long start = stats.startLoad(); |
| try { |
| Object o = loader.load(loaderHelper); |
| Assert.assertTrue(o != Token.INVALID && o != Token.LOCAL_INVALID); |
| NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), |
| processorId, o,dm,loaderHelper.getArgument(),null,false, false); |
| |
| } |
| catch(Exception e) { |
| replyWithException(e,dm); |
| } |
| finally { |
| stats.endLoad(start); |
| } |
| } |
| else { |
| replyWithException(new TryAgainException(LocalizedStrings.SearchLoadAndWriteProcessor_NO_LOADER_DEFINED_0.toLocalizedString()), dm); |
| } |
| |
| } |
| else { |
| replyWithException(new TryAgainException(LocalizedStrings.SearchLoadAndWriteProcessor_TIMEOUT_EXPIRED_OR_REGION_NOT_READY_0.toLocalizedString()), dm); |
| } |
| |
| } |
| catch (RegionDestroyedException rde) { |
| replyWithException(rde,dm); |
| |
| } |
| catch (CancelException cce) { |
| replyWithException(cce,dm); |
| |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| replyWithException(new InternalGemFireException(LocalizedStrings.SearchLoadAndWriteProcessor_ERROR_PROCESSING_REQUEST.toLocalizedString(), t), dm); |
| } |
| finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| |
| } |
| |
| void replyWithException(Exception e, DistributionManager dm) { |
| NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), |
| processorId, null , |
| dm,this.aCallbackArgument, |
| e, |
| false,false); |
| |
| } |
| |
| |
| } |
| |
| |
| |
| /********************* NetLoadReplyMessage ***************************************/ |
| |
| /** |
| * The NetLoadReplyMessage is a reply to a RequestMessage, and contains the |
| * object's value. |
| */ |
| public static final class NetLoadReplyMessage extends HighPriorityDistributionMessage { |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** The object value being transferred */ |
| private Object result; |
| |
| |
| /** Loader parameter returned to sender*/ |
| private Object aCallbackArgument; |
| |
| /** Exception thrown by remote node */ |
| private Exception e; |
| |
| /** Is blob serialized? */ |
| private boolean isSerialized; |
| |
| /** ??? */ |
| private boolean requestorTimedOut; |
| |
| public NetLoadReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, |
| int processorId, |
| Object obj, |
| DistributionManager distributionManager, |
| Object aCallbackArgument, |
| Exception e, |
| boolean isSerialized, |
| boolean requestorTimedOut) { |
| // create a message |
| NetLoadReplyMessage msg = new NetLoadReplyMessage(); |
| msg.initialize(processorId, obj,aCallbackArgument,e, isSerialized,requestorTimedOut); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| } |
| |
| private void initialize(int procId, Object obj, |
| Object callbackArgument, Exception exe, |
| boolean isserialized, |
| boolean requestorTimedout) { |
| this.processorId = procId; |
| this.result = obj; |
| this.e = exe; |
| this.aCallbackArgument = callbackArgument; |
| this.isSerialized = isserialized; |
| this.requestorTimedOut = requestorTimedout; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of |
| * the NetLoadRequestMessage. This concludes the net request, by |
| * communicating an object value. |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor)getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetLoadReplyMessage() SearchLoadAndWriteProcessor no longer exists"); |
| } |
| return; |
| } |
| processor.incomingNetLoadReply(this.result, 0,this.aCallbackArgument, |
| this.e,this.isSerialized, this.requestorTimedOut); |
| } |
| |
| public int getDSFID() { |
| return NET_LOAD_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| boolean isSerialized = this.isSerialized; |
| if(result instanceof byte[]) { |
| DataSerializer.writeByteArray((byte[]) this.result,out); |
| } else { |
| DataSerializer.writeObjectAsByteArray(this.result,out); |
| isSerialized = true; |
| } |
| DataSerializer.writeObject(this.aCallbackArgument,out); |
| DataSerializer.writeObject(this.e,out); |
| out.writeBoolean(isSerialized); |
| out.writeBoolean(this.requestorTimedOut); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.result = DataSerializer.readByteArray(in); |
| this.aCallbackArgument = DataSerializer.readObject(in); |
| this.e = (Exception) DataSerializer.readObject(in); |
| this.isSerialized = in.readBoolean(); |
| this.requestorTimedOut = in.readBoolean(); |
| |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetLoadReplyMessage for processorId " + |
| processorId + ", blob is " + this.result; |
| } |
| |
| } |
| |
| /********************* NetWriteRequestMessage *******************************/ |
| |
| public static final class NetWriteRequestMessage extends PooledDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the |
| * initiator node. This will be communicated back in the response |
| * to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The event being sent over to the remote writer*/ |
| CacheEvent event; |
| |
| private int timeoutMs; |
| /**Action requested by sender*/ |
| int action; |
| |
| public NetWriteRequestMessage() {} |
| |
| /** |
| * Using a new or pooled message instance, create and send |
| * the request for object value to the specified node. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, |
| String regionName,int timeoutMs, |
| CacheEvent event, Set recipients, |
| int action) { |
| |
| NetWriteRequestMessage msg = new NetWriteRequestMessage(); |
| msg.initialize(processor, regionName,timeoutMs, event,action); |
| msg.setRecipients(recipients); |
| processor.distributionManager.putOutgoing(msg); |
| |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, |
| String theRegionName,int timeoutMS, |
| CacheEvent theEvent, int actionType) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| this.timeoutMs = timeoutMS; |
| this.event = theEvent; |
| this.action = actionType; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| public int getDSFID() { |
| return NET_WRITE_REQUEST_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| out.writeUTF(this.regionName); |
| out.writeInt(this.timeoutMs); |
| DataSerializer.writeObject(this.event,out); |
| out.writeInt(this.action); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.regionName = in.readUTF(); |
| this.timeoutMs = in.readInt(); |
| this.event = (CacheEvent) DataSerializer.readObject(in); |
| this.action = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetWriteRequestMessage " |
| + " for region \"" + this.regionName + "\", processorId " + processorId; |
| } |
| |
| /** Invoked on the node that has the object */ |
| @Override |
| protected void process(DistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE); |
| try { |
| GemFireCacheImpl gfc = (GemFireCacheImpl)CacheFactory.getInstance(dm.getSystem()); |
| LocalRegion region = (LocalRegion)gfc.getRegion(this.regionName); |
| if (region != null && region.isInitialized() && |
| (dm.cacheTimeMillis() - startTime < timeoutMs)) { |
| CacheWriter writer = region.basicGetWriter(); |
| EntryEventImpl entryEvtImpl = null; |
| RegionEventImpl regionEvtImpl = null; |
| if (this.event instanceof EntryEventImpl) { |
| entryEvtImpl = (EntryEventImpl) this.event; |
| entryEvtImpl.region = region; |
| Operation op = entryEvtImpl.getOperation(); |
| if (op == Operation.REPLACE) { |
| entryEvtImpl.setOperation(Operation.UPDATE); |
| } else if (op == Operation.PUT_IF_ABSENT) { |
| entryEvtImpl.setOperation(Operation.CREATE); |
| } else if (op == Operation.REMOVE) { |
| entryEvtImpl.setOperation(Operation.DESTROY); |
| } |
| // [bruce] even if this event was transmitted from another VM, its operation may have |
| // originated in this VM. PartitionedRegion.put() with a cacheWriter is one |
| // situation where this can occur |
| entryEvtImpl.setOriginRemote(event.getDistributedMember() == null |
| || !event.getDistributedMember().equals(dm.getDistributionManagerId())); |
| } |
| else if (this.event instanceof RegionEventImpl) { |
| regionEvtImpl = (RegionEventImpl) this.event; |
| regionEvtImpl.region = region; |
| regionEvtImpl.originRemote = true; |
| } |
| |
| if (writer != null) { |
| try { |
| switch(action) { |
| case BEFORECREATE: |
| writer.beforeCreate(entryEvtImpl); |
| break; |
| case BEFOREDESTROY: |
| writer.beforeDestroy(entryEvtImpl); |
| break; |
| case BEFOREUPDATE: |
| writer.beforeUpdate(entryEvtImpl); |
| break; |
| case BEFOREREGIONDESTROY: |
| writer.beforeRegionDestroy(regionEvtImpl); |
| break; |
| case BEFOREREGIONCLEAR: |
| writer.beforeRegionClear(regionEvtImpl); |
| break; |
| default: |
| break; |
| |
| } |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId,dm,true,null,false); |
| } |
| catch(CacheWriterException cwe) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false,cwe,true); |
| } |
| catch(Exception e) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false,e,false); |
| } |
| |
| } |
| else { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false, |
| new TryAgainException(LocalizedStrings.SearchLoadAndWriteProcessor_NO_CACHEWRITER_DEFINED_0.toLocalizedString()), true); |
| } |
| |
| } |
| else { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm, false, |
| new TryAgainException(LocalizedStrings.SearchLoadAndWriteProcessor_TIMEOUT_EXPIRED_OR_REGION_NOT_READY_0.toLocalizedString()), true); |
| |
| } |
| } |
| catch (RegionDestroyedException rde) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false,null,false); |
| |
| } |
| catch (DistributedSystemDisconnectedException e) { |
| // shutdown condition |
| throw e; |
| } |
| catch (CancelException cce) { |
| dm.getCancelCriterion().checkCancelInProgress(cce); // TODO anyway to find the region or cache here? |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false,null,false); |
| } |
| catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } |
| catch (Throwable t){ |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), |
| processorId, dm,false, |
| new InternalGemFireException(LocalizedStrings.SearchLoadAndWriteProcessor_ERROR_PROCESSING_REQUEST.toLocalizedString(), t), true); |
| } |
| finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| |
| |
| |
| } |
| |
| |
| } |
| |
| /********************* NetWriteReplyMessage *********************************/ |
| |
| /** |
| * The NetWriteReplyMessage is a reply to a NetWriteRequestMessage, and contains the |
| * success code or exception that is propagated back to the requestor |
| */ |
| public static final class NetWriteReplyMessage extends HighPriorityDistributionMessage { |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** Indicates whether the write succeeded */ |
| private boolean netWriteSucceeded; |
| |
| |
| /** Exception thrown by remote node */ |
| private Exception e; |
| |
| /** Is the exception a cacheLoaderException */ |
| private boolean cacheWriterException; |
| |
| public NetWriteReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, |
| int processorId, |
| DistributionManager distributionManager, |
| boolean netWriteSucceeded, |
| Exception e, |
| boolean cacheWriterException) { |
| // create a message |
| NetWriteReplyMessage msg = new NetWriteReplyMessage(); |
| msg.initialize(processorId, netWriteSucceeded, e, cacheWriterException); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| } |
| |
| private void initialize(int procId, boolean netwriteSucceeded, |
| Exception except, boolean cacheWriterExcept) { |
| this.processorId = procId; |
| this.netWriteSucceeded = netwriteSucceeded; |
| this.e = except; |
| this.cacheWriterException = cacheWriterExcept; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of |
| * the NetWriteRequestMessage. This concludes the net write request, by |
| * communicating an object value. |
| */ |
| @Override |
| protected void process(DistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor)getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetWriteReplyMessage() SearchLoadAndWriteProcessor no longer exists"); |
| } |
| return; |
| } |
| processor.incomingNetWriteReply(this.netWriteSucceeded, this.e, |
| this.cacheWriterException); |
| } |
| |
| public int getDSFID() { |
| return NET_WRITE_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| super.toData(out); |
| out.writeInt(this.processorId); |
| out.writeBoolean(this.netWriteSucceeded); |
| DataSerializer.writeObject(this.e,out); |
| out.writeBoolean(this.cacheWriterException); |
| } |
| |
| @Override |
| public void fromData(DataInput in) |
| throws IOException, ClassNotFoundException { |
| super.fromData(in); |
| this.processorId = in.readInt(); |
| this.netWriteSucceeded = in.readBoolean(); |
| this.e = (Exception) DataSerializer.readObject(in); |
| this.cacheWriterException = in.readBoolean(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetWriteReplyMessage for processorId " + |
| processorId ; |
| } |
| } |
| } |