| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Set; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.GemFireException; |
| import org.apache.geode.InternalGemFireException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.CacheEvent; |
| import org.apache.geode.cache.CacheLoader; |
| import org.apache.geode.cache.CacheLoaderException; |
| import org.apache.geode.cache.CacheWriter; |
| import org.apache.geode.cache.CacheWriterException; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.LoaderHelper; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.RegionAttributes; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.RegionEvent; |
| import org.apache.geode.cache.Scope; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.util.ObjectSizer; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.internal.ClusterDistributionManager; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.HighPriorityDistributionMessage; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.PooledDistributionMessage; |
| import org.apache.geode.distributed.internal.ProcessorKeeper21; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.SerialDistributionMessage; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.cache.LocalRegion.InitializationLevel; |
| import org.apache.geode.internal.cache.versions.DiskVersionTag; |
| import org.apache.geode.internal.cache.versions.VersionStamp; |
| import org.apache.geode.internal.cache.versions.VersionTag; |
| import org.apache.geode.internal.offheap.Releasable; |
| import org.apache.geode.internal.offheap.annotations.Released; |
| import org.apache.geode.internal.offheap.annotations.Retained; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.util.internal.GeodeGlossary; |
| |
| /** |
| * Implementation for distributed search, load and write operations in the GemFire system. Provides |
| * an API for doing netSearch, netLoad, netSearchAndLoad and netWrite. The class uses the |
| * DistributionAdvisor to route requests to the appropriate members. It also uses the |
| * DistributionAdvisor to get region scope and applies rules based on the scope. It makes use of |
| * intelligent acceptors that allow netSearch to happen as a one-phase operation at all |
| * times. netLoad happens as a one-phase operation in all cases except where the scope is GLOBAL. At |
| * the receiving end, the request is converted into an appropriate message whose process method |
| * responds to the request. |
| */ |
| public class SearchLoadAndWriteProcessor implements MembershipListener { |
| /** Logger shared by all processor instances. */ |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Limit read from the {@code DistributionManager.OptimizedUpdateByteLimit} system property; |
| * defaults to 2000 when the property is not set. Presumably a size in bytes below which a value |
| * is treated as "small" -- TODO confirm against the code that consumes this constant. |
| */ |
| public static final int SMALL_BLOB_SIZE = |
| Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000); |
| |
| /** |
| * Retry interval read from the {@code search-retry-interval} system property (prefixed with |
| * {@link GeodeGlossary#GEMFIRE_PREFIX}); defaults to 2000. Presumably milliseconds -- TODO |
| * confirm against the code that waits on it. |
| */ |
| static final long RETRY_TIME = |
| Long.getLong(GeodeGlossary.GEMFIRE_PREFIX + "search-retry-interval", 2000); |
| |
| // Member the current request was sent to; set to null when that member departs. |
| private volatile InternalDistributedMember selectedNode; |
| // Set by memberDeparted when the selected member left while a remote get was in progress. |
| private boolean selectedNodeDead = false; |
| // Search timeout, taken from getSearchTimeout() in initialize() for distributed regions. |
| private int timeout; |
| // Guards netSearchForBlob() so the net search runs at most once per processor. |
| private boolean netSearchDone = false; |
| // Guards doNetLoad() so the net load runs at most once per processor. |
| private boolean netLoadDone = false; |
| private long lastModified = 0; |
| // Id under which this processor is registered in processorKeeper (see getProcessor()). |
| protected int processorId; |
| // Callback argument, region path, key and region are all supplied through initialize(). |
| private Object aCallbackArgument; |
| private String regionName; |
| private Object key; |
| protected LocalRegion region; |
| // Outcome of the most recent search/load; a byte[] when isSerialized is true. |
| private Object result = null; |
| private boolean isSerialized = false; // is result serialized? |
| // Distribution advisor of the region; only assigned when the region scope is distributed. |
| private CacheDistributionAdvisor advisor = null; |
| // Checked by doNetLoad()/netWrite() after each wait to detect a failure reported remotely. |
| protected Exception remoteException = null; |
| public DistributionManager distributionManager = null; |
| // True while a request is outstanding; cleared by signalDone(). |
| private volatile boolean requestInProgress = false; |
| // True while netSearchForBlob() is waiting on a value request sent to a selected member. |
| private boolean remoteGetInProgress = false; |
| // (sic) When set after a replicate query, netSearchForBlob() stops searching immediately. |
| private volatile boolean authorative = false; |
| /** remoteLoadInProgress is volatile to make sure response threads see the current value */ |
| private volatile boolean remoteLoadInProgress = false; |
| // Registry that hands out processor ids and maps them back to processor instances. |
| @MakeNotStatic |
| private static final ProcessorKeeper21 processorKeeper = new ProcessorKeeper21(false); |
| // private static Set availableAcceptHelperSet = new HashSet(); |
| /** The members that haven't replied yet */ |
| // changed pendingResponders to synchronized Set to fix bug 38972 |
| private final Set pendingResponders = Collections.synchronizedSet(new HashSet()); |
| private List responseQueue = null; |
| // Checked by netWrite() after each wait to decide whether a remote writer was invoked. |
| private boolean netWriteSucceeded = false; |
| // Time budget handed to waitForObject2() and to outgoing request messages. |
| private int remainingTimeout = 0; |
| private long startTimeSnapShot = 0; |
| private long endTimeSnapShot = 0; |
| |
| // The flags below record which mechanism produced the result or performed the write. |
| private boolean netSearch = false; |
| private boolean netLoad = false; |
| private boolean attemptedLocalLoad = false; // added for bug 39738 |
| private boolean localLoad = false; |
| private boolean localWrite = false; |
| private boolean netWrite = false; |
| |
| // Held while the set of pending responders is being rebuilt or pruned. |
| private final Object membersLock = new Object(); |
| |
| // Lazily created list of selected members that departed during a remote get. |
| private ArrayList<InternalDistributedMember> departedMembers; |
| |
| private Lock lock = null; // if non-null, then needs to be unlocked in release |
| |
| // Request-type codes. Their consumers are not visible in this section of the file; |
| // presumably they tag outgoing requests -- TODO confirm. |
| static final int NETSEARCH = 0; |
| static final int NETLOAD = 1; |
| static final int NETWRITE = 2; |
| |
| // Action codes passed to doNetWrite()/doLocalWrite(); each one selects the CacheWriter |
| // callback of the same name (beforeCreate, beforeDestroy, beforeUpdate, ...). |
| static final int BEFORECREATE = 0; |
| static final int BEFOREDESTROY = 1; |
| static final int BEFOREUPDATE = 2; |
| static final int BEFOREREGIONDESTROY = 3; |
| static final int BEFOREREGIONCLEAR = 4; |
| |
| |
| /************** Public Methods ************************/ |
| |
| Object doNetSearch() throws TimeoutException { |
| |
| resetResults(); |
| RegionAttributes attrs = region.getAttributes(); |
| this.requestInProgress = true; |
| Scope scope = attrs.getScope(); |
| Assert.assertTrue(scope != Scope.LOCAL); |
| netSearchForBlob(); |
| this.requestInProgress = false; |
| return this.result; |
| } |
| |
| |
| |
| void doSearchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue, |
| boolean preferCD) throws CacheLoaderException, TimeoutException { |
| this.requestInProgress = true; |
| RegionAttributes attrs = region.getAttributes(); |
| Scope scope = attrs.getScope(); |
| CacheLoader loader = ((AbstractRegion) region).basicGetLoader(); |
| if (scope.isLocal()) { |
| Object obj = doLocalLoad(loader, false, preferCD); |
| event.setNewValue(obj); |
| } else { |
| searchAndLoad(event, txState, localValue, preferCD); |
| } |
| this.requestInProgress = false; |
| if (this.netSearch) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.SEARCH_CREATE); |
| } else { |
| event.setOperation(Operation.SEARCH_UPDATE); |
| } |
| } else if (this.netLoad) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.NET_LOAD_CREATE); |
| } else { |
| event.setOperation(Operation.NET_LOAD_UPDATE); |
| } |
| } else if (this.localLoad) { |
| if (event.getOperation().isCreate()) { |
| event.setOperation(Operation.LOCAL_LOAD_CREATE); |
| } else { |
| event.setOperation(Operation.LOCAL_LOAD_UPDATE); |
| } |
| } |
| } |
| |
| /** |
|  * Runs the cache-writer step for an event. A supplied local writer is used directly; otherwise |
|  * the write is forwarded to one of the given recipients via {@code netWrite}. |
|  * |
|  * <p> |
|  * The {@code requestInProgress} flag is raised for the duration of the call and is always |
|  * lowered again on exit, including the early return for LOCAL-scope regions and when the |
|  * writer or the remote write throws. |
|  * |
|  * <p> |
|  * NOTE(review): the local-writer path returns true even when {@code doLocalWrite} reports that |
|  * notifications were inhibited; this is kept unchanged for callers. |
|  * |
|  * @param event the event to hand to the writer |
|  * @param netWriteRecipients members that may run a remote writer |
|  * @param localWriter the local writer, or null if there is none |
|  * @param paction one of the BEFORE* action codes |
|  * @return true if a CacheWriter was actually invoked |
|  * @throws CacheWriterException if the writer rejects the operation |
|  * @throws TimeoutException declared by the remote write |
|  */ |
| boolean doNetWrite(CacheEvent event, Set netWriteRecipients, CacheWriter localWriter, int paction) |
|     throws CacheWriterException, TimeoutException { |
| |
|   this.requestInProgress = true; |
|   try { |
|     Scope scope = this.region.getScope(); |
|     if (localWriter != null) { |
|       doLocalWrite(localWriter, event, paction); |
|       return true; |
|     } |
|     if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) { |
|       // Nothing to distribute to; the finally block below still clears the flag. |
|       return false; |
|     } |
|     @Released |
|     CacheEvent listenerEvent = getEventForListener(event); |
|     try { |
|       int action = paction; |
|       if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) { |
|         action = BEFORECREATE; |
|       } |
|       return netWrite(listenerEvent, action, netWriteRecipients); |
|     } finally { |
|       // getEventForListener may have created a copy; only a copy is ours to release. |
|       if (event != listenerEvent && listenerEvent instanceof EntryEventImpl) { |
|         ((Releasable) listenerEvent).release(); |
|       } |
|     } |
|   } finally { |
|     this.requestInProgress = false; |
|   } |
| } |
| |
| /** |
|  * No-op: a member that has only just joined cannot hold the value being looked for. |
|  */ |
| @Override |
| public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) { |
|   // Ignore - if they just joined, they don't have what we want |
| } |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id, |
| InternalDistributedMember whoSuspected, String reason) {} |
| |
| /** No-op: this processor takes no action on quorum loss. */ |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
|     Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) { |
|   // intentionally empty |
| } |
| |
| /** |
|  * Reacts to a member leaving while this processor is registered as a membership listener. |
|  * The member is dropped from the pending responders and from the response queue. If it was the |
|  * member currently selected for an in-progress remote get, the selection is cleared, the |
|  * member is remembered in {@code departedMembers}, and the waiting thread is woken so that it |
|  * can issue a new request. Finally {@code checkIfDone()} is consulted. |
|  * |
|  * @param id the member that departed |
|  * @param crashed not used by this implementation |
|  */ |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| final InternalDistributedMember id, final boolean crashed) { |
| |
| // The departed member can no longer reply, so stop waiting for it. |
| synchronized (membersLock) { |
| pendingResponders.remove(id); |
| } |
| synchronized (this) { |
| // Only the loss of the member we are actively fetching from needs special handling. |
| if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) { |
| if (departedMembers == null) { |
| departedMembers = new ArrayList<InternalDistributedMember>(); |
| } |
| departedMembers.add(id); |
| selectedNode = null; |
| selectedNodeDead = true; |
| computeRemainingTimeout(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("{}: processing loss of member {}", this, id); |
| } |
| // 3 marks this method as the source of the wake-up. |
| this.lastNotifySpot = 3; |
| notifyAll(); // signal the waiter; we are not done; but we need the waiter to call |
| // sendNetSearchRequest |
| } |
| if (responseQueue != null) { |
| responseQueue.remove(id); |
| } |
| checkIfDone(); |
| } |
| } |
| |
| /** @return the id under which this processor is registered with the processor keeper */ |
| int getProcessorId() { |
|   return processorId; |
| } |
| |
| /** |
|  * Signals completion once no remote get is in progress and no responder is still pending. |
|  * Synchronized in case a different response/reply is still in progress, and it's the one |
|  * that's got the goods (bug 28741). |
|  */ |
| synchronized void checkIfDone() { |
|   if (remoteGetInProgress || !pendingResponders.isEmpty()) { |
|     return; |
|   } |
|   signalDone(); |
| } |
| |
| /** |
|  * Marks the request as finished and wakes every thread waiting on this processor. |
|  * {@code lastNotifySpot} is set to 1 to identify this method as the source of the wake-up. |
|  */ |
| synchronized void signalDone() { |
|   requestInProgress = false; |
|   lastNotifySpot = 1; |
|   notifyAll(); |
| } |
| |
| /** |
|  * Wakes every thread waiting on this processor without marking the request as finished. |
|  * {@code lastNotifySpot} is set to 2 to identify this method as the source of the wake-up. |
|  */ |
| synchronized void signalTimedOut() { |
|   lastNotifySpot = 2; |
|   notifyAll(); |
| } |
| |
| // Records which code path last called notifyAll(): 1 = signalDone, 2 = signalTimedOut, |
| // 3 = memberDeparted. Reset to 0 before each new request is sent. |
| private volatile int lastNotifySpot = 0; |
| |
| // Version tag applied to the event by searchAndLoad() when a net search yields a value. |
| private VersionTag versionTag; |
| |
| /** |
|  * Currently a no-op. The only content is the disabled implementation preserved in the comment |
|  * below. |
|  */ |
| synchronized void nackResponseComplete() { |
| /* |
| * if (this.requestInProgress && currentHelper != null && optimizer != null && |
| * optimizer.allRepliesReceived(currentHelper.nackServerChannel)) { this.requestInProgress = |
| * false; signalDone(); } |
| */ |
| |
| } |
| |
| /** @return the shared registry that maps processor ids to processor instances */ |
| static ProcessorKeeper21 getProcessorKeeper() { |
|   return SearchLoadAndWriteProcessor.processorKeeper; |
| } |
| |
| /** @return true if a net search produced a non-null result */ |
| boolean isNetSearch() { |
|   return this.netSearch; |
| } |
| |
| /** @return true if a net load produced a non-null result */ |
| boolean isNetLoad() { |
|   return this.netLoad; |
| } |
| |
| /** @return true if the local cache loader produced a non-null value */ |
| boolean isLocalLoad() { |
|   return this.localLoad; |
| } |
| |
| /** @return true if a remote cache writer was successfully invoked */ |
| boolean isNetWrite() { |
|   return this.netWrite; |
| } |
| |
| /** @return true if the local cache writer was invoked */ |
| boolean isLocalWrite() { |
|   return this.localWrite; |
| } |
| |
| /** @return the current value of the {@code lastModified} field */ |
| long getLastModified() { |
|   return this.lastModified; |
| } |
| |
| /** @return true if the current result is held in serialized form */ |
| boolean resultIsSerialized() { |
|   return isSerialized; |
| } |
| |
| static SearchLoadAndWriteProcessor getProcessor() { |
| SearchLoadAndWriteProcessor processor = new SearchLoadAndWriteProcessor(); |
| processor.processorId = getProcessorKeeper().put(processor); |
| return processor; |
| } |
| |
| void release() { |
| try { |
| if (this.lock != null) { |
| try { |
| this.lock.unlock(); |
| } catch (CancelException ignore) { |
| } |
| this.lock = null; |
| } |
| } finally { |
| try { |
| if (this.advisor != null) { |
| this.advisor.removeMembershipListener(this); |
| } |
| } catch (IllegalArgumentException ignore) { |
| } finally { |
| getProcessorKeeper().remove(this.processorId); |
| } |
| } |
| } |
| |
| void remove() { |
| getProcessorKeeper().remove(this.processorId); |
| } |
| |
| void initialize(LocalRegion theRegion, Object theKey, Object theCallbackArg) { |
| this.region = theRegion; |
| this.regionName = theRegion.getFullPath(); |
| this.key = theKey; |
| this.aCallbackArgument = theCallbackArg; |
| RegionAttributes attrs = theRegion.getAttributes(); |
| Scope scope = attrs.getScope(); |
| if (scope.isDistributed()) { |
| this.advisor = ((CacheDistributionAdvisee) this.region).getCacheDistributionAdvisor(); |
| this.distributionManager = theRegion.getDistributionManager(); |
| this.timeout = getSearchTimeout(); |
| this.advisor.addMembershipListener(this); |
| } |
| } |
| |
| void setKey(Object key) { |
| this.key = key; |
| } |
| |
| protected void setSelectedNode(InternalDistributedMember selectedNode) { |
| this.selectedNode = selectedNode; |
| this.selectedNodeDead = false; |
| } |
| |
| /** @return the search timeout captured in {@code initialize} */ |
| protected int getTimeout() { |
|   return timeout; |
| } |
| |
| /** @return the key this processor operates on */ |
| protected Object getKey() { |
|   return key; |
| } |
| |
| /** @return the member currently selected for the request, or null if none is selected */ |
| InternalDistributedMember getSelectedNode() { |
|   return selectedNode; |
| } |
| |
| /** |
|  * Even though SearchLoadAndWriteProcessor may be invoked in the context of a local region, |
|  * most of the services it provides are relevant to distribution only. The three services it |
|  * provides are netSearch, netLoad and netWrite. Instances are obtained through |
|  * {@code getProcessor()}; the constructor only puts the processor into its idle state. |
|  */ |
| private SearchLoadAndWriteProcessor() { |
|   resetResults(); |
|   pendingResponders.clear(); |
| |
|   // request state |
|   key = null; |
|   result = null; |
|   isSerialized = false; |
|   responseQueue = null; |
| |
|   // progress flags |
|   attemptedLocalLoad = false; |
|   netSearchDone = false; |
|   netWriteSucceeded = false; |
|   remoteGetInProgress = false; |
|   requestInProgress = false; |
| } |
| |
| /** |
| * If we have a local cache loader and the region is not global, then invoke the loader If the |
| * region is local, or the result is non-null, then return whatever the loader returned do a |
| * netSearch amongst selected peers if netSearch returns a blob, deserialize the blob and return |
| * that as the result netSearch failed, so all we can do at this point is do a load return result |
| * from load |
| * |
| */ |
| private void searchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue, |
| boolean preferCD) throws CacheLoaderException, TimeoutException { |
| |
| RegionAttributes attrs = region.getAttributes(); |
| Scope scope = attrs.getScope(); |
| DataPolicy dataPolicy = attrs.getDataPolicy(); |
| |
| if (txState != null) { |
| TXEntryState tx = txState.txReadEntry(event.getKeyInfo(), region, false, |
| true/* create txEntry is absent */); |
| if (tx != null) { |
| if (tx.noValueInSystem()) { |
| // If the tx view has it invalid or destroyed everywhere |
| // then don't do a netsearch. We want to see the |
| // transactional view. |
| load(event, preferCD); |
| return; |
| } |
| } |
| } |
| |
| // if mirrored then we can optimize by skipping netsearch in some cases, |
| // and if also skip netSearch if we find an INVALID token since we |
| // know it was distributed. (Otherwise it would be a LOCAL_INVALID token) |
| { |
| if (localValue == Token.INVALID || dataPolicy.withReplication()) { |
| load(event, preferCD); |
| return; |
| } |
| } |
| |
| Object obj = null; |
| if (!scope.isGlobal()) { |
| // copy into local var to prevent race condition |
| CacheLoader loader = ((AbstractRegion) region).basicGetLoader(); |
| if (loader != null) { |
| obj = doLocalLoad(loader, true, preferCD); |
| Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
| event.setNewValue(obj); |
| this.isSerialized = false; |
| this.result = obj; |
| return; |
| } |
| if (scope.isLocal()) { |
| return; |
| } |
| } |
| netSearchForBlob(); |
| if (this.result != null) { |
| Assert.assertTrue(this.result != Token.INVALID && this.result != Token.LOCAL_INVALID); |
| if (this.isSerialized) { |
| event.setSerializedNewValue((byte[]) this.result); |
| } else { |
| event.setNewValue(this.result); |
| } |
| event.setVersionTag(this.versionTag); |
| return; |
| } |
| |
| load(event, preferCD); |
| } |
| |
| /** |
|  * Performs a net search, setting this.result to the object found in the search. Runs at most |
|  * once per processor (guarded by netSearchDone). |
|  * |
|  * <p> |
|  * Phase 1 asks each initialized replicate, in shuffled order, for the value and stops as soon |
|  * as the {@code authorative} flag is set after a wait. Phase 2 sends a QueryMessage to the |
|  * members returned by {@code adviseNetSearch()} and waits for a reply, re-sending a request |
|  * when the selected member died during a remote get. {@code netSearch} is set to true only if |
|  * a non-null result was obtained. |
|  * |
|  * @throws TimeoutException declared by the wait/send helpers |
|  */ |
| private void netSearchForBlob() throws TimeoutException { |
| if (this.netSearchDone) |
| return; |
| this.netSearchDone = true; |
| CachePerfStats stats = region.getCachePerfStats(); |
| long start = 0; |
| Set sendSet = null; |
| |
| this.result = null; |
| RegionAttributes attrs = region.getAttributes(); |
| // Object aCallbackArgument = null; |
| this.requestInProgress = true; |
| this.selectedNodeDead = false; |
| initRemainingTimeout(); |
| start = stats.startNetsearch(); |
| try { |
| // Phase 1: query initialized replicates one at a time, in random order. |
| List<InternalDistributedMember> replicates = |
| new ArrayList(advisor.adviseInitializedReplicates()); |
| if (replicates.size() > 1) { |
| Collections.shuffle(replicates); |
| } |
| for (InternalDistributedMember replicate : replicates) { |
| synchronized (this.pendingResponders) { |
| this.pendingResponders.clear(); |
| } |
| |
| // Send and wait while holding this processor's monitor. |
| synchronized (this) { |
| this.requestInProgress = true; |
| this.remoteGetInProgress = true; |
| setSelectedNode(replicate); |
| this.lastNotifySpot = 0; |
| |
| sendValueRequest(replicate); |
| waitForObject2(this.remainingTimeout); |
| |
| // Once the flag is set the search ends here, whether or not a value came back. |
| if (this.authorative) { |
| if (this.result != null) { |
| this.netSearch = true; |
| } |
| return; |
| } else { |
| // clear anything that might have been set by our query. |
| this.selectedNode = null; |
| this.selectedNodeDead = false; |
| this.lastNotifySpot = 0; |
| this.result = null; |
| } |
| } |
| } |
| // Phase 2: query the members the advisor recommends for a net search. |
| synchronized (membersLock) { |
| Set recipients = this.advisor.adviseNetSearch(); |
| if (recipients.isEmpty()) { |
| return; |
| } |
| ArrayList list = new ArrayList(recipients); |
| Collections.shuffle(list); |
| sendSet = new HashSet(list); |
| synchronized (this.pendingResponders) { |
| this.pendingResponders.clear(); |
| this.pendingResponders.addAll(list); |
| } |
| } |
| |
| // Multicast is used only when enabled on a DistributedRegion with a non-zero mcast port. |
| boolean useMulticast = region.getMulticastEnabled() && (region instanceof DistributedRegion) |
| && ((DistributedRegion) region).getSystem().getConfig().getMcastPort() != 0; |
| |
| // moved outside the sync to fix bug 39458 |
| QueryMessage.sendMessage(this, this.regionName, this.key, useMulticast, sendSet, |
| this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout()); |
| |
| synchronized (this) { |
| // moved this send back into sync to fix bug 37132 |
| // QueryMessage.sendMessage(this, this.regionName,this.key,useMulticast, |
| // sendSet,this.remainingTimeout , |
| // attrs.getEntryTimeToLive().getTimeout(), |
| // attrs.getEntryIdleTimeout().getTimeout()); |
| boolean done = false; |
| // Keep waiting; whenever the selected member died mid-get, send a fresh request first. |
| do { |
| waitForObject2(this.remainingTimeout); |
| if (this.selectedNodeDead && remoteGetInProgress) { |
| sendNetSearchRequest(); |
| } else |
| done = true; |
| } while (!done); |
| |
| if (this.result != null) { |
| this.netSearch = true; |
| } |
| return; |
| } |
| } finally { |
| // Runs on every exit path, including the early returns above. |
| stats.endNetsearch(start); |
| } |
| } |
| |
| /** |
|  * Loads a value for the event in a distributed region. |
|  * <ul> |
|  * <li>Non-GLOBAL scope with a local loader: invoke the local loader and return.</li> |
|  * <li>GLOBAL scope: if neither a local loader nor any net-load candidate exists, return without |
|  * locking. Otherwise acquire the distributed lock for the key, then use the local loader if |
|  * there is one, or a net load if not. The lock stays held and is released later by |
|  * {@code release()}.</li> |
|  * <li>Any other distributed scope without a local loader: perform a net load.</li> |
|  * </ul> |
|  * |
|  * @throws TimeoutException if the distributed lock cannot be acquired within the cache's lock |
|  *         timeout |
|  */ |
| private void load(EntryEventImpl event, boolean preferCD) |
| throws CacheLoaderException, TimeoutException { |
| Object obj = null; |
| RegionAttributes attrs = this.region.getAttributes(); |
| Scope scope = attrs.getScope(); |
| CacheLoader loader = ((AbstractRegion) region).basicGetLoader(); |
| Assert.assertTrue(scope.isDistributed()); |
| |
| if ((loader != null) && (!scope.isGlobal())) { |
| obj = doLocalLoad(loader, false, preferCD); |
| event.setNewValue(obj); |
| Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
| return; |
| } |
| |
| if (scope.isGlobal()) { |
| Assert.assertTrue(this.lock == null); |
| Set loadCandidatesSet = advisor.adviseNetLoad(); |
| if ((loader == null) && (loadCandidatesSet.isEmpty())) { |
| // no one has a data Loader. No point getting a lock |
| return; |
| } |
| |
| this.lock = region.getDistributedLock(this.key); |
| boolean locked = false; |
| try { |
| final CancelCriterion stopper = region.getCancelCriterion(); |
| // Retry the lock attempt after an interrupt, unless the cache is being cancelled. |
| for (;;) { |
| stopper.checkCancelInProgress(null); |
| // Clear and remember the interrupt status so tryLock does not fail immediately. |
| boolean interrupted = Thread.interrupted(); |
| try { |
| locked = this.lock.tryLock(region.getCache().getLockTimeout(), TimeUnit.SECONDS); |
| if (!locked) { |
| throw new TimeoutException( |
| String.format("Timed out locking %s before load", |
| key)); |
| } |
| break; |
| } catch (InterruptedException ignore) { |
| interrupted = true; |
| region.getCancelCriterion().checkCancelInProgress(null); |
| // continue; |
| } finally { |
| // Restore the interrupt status for the caller. |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } // for |
| if (loader == null) { |
| this.localLoad = false; |
| if (scope.isDistributed()) { |
| this.isSerialized = false; |
| obj = doNetLoad(); |
| Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
| if (this.isSerialized && obj != null) { |
| event.setSerializedNewValue((byte[]) obj); |
| } else { |
| event.setNewValue(obj); |
| } |
| } |
| } else { |
| obj = doLocalLoad(loader, false, preferCD); |
| Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
| event.setNewValue(obj); |
| } |
| return; |
| } finally { |
| // The lock will not actually be released until release is |
| // called on this processor |
| // If the lock was never acquired, forget it so release() does not try to unlock it. |
| if (!locked) |
| this.lock = null; |
| } |
| } |
| if (scope.isDistributed()) { |
| // long start = System.currentTimeMillis(); |
| obj = doNetLoad(); |
| if (this.isSerialized && obj != null) { |
| event.setSerializedNewValue((byte[]) obj); |
| } else { |
| Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
| event.setNewValue(obj); |
| } |
| // long end = System.currentTimeMillis(); |
| } |
| } |
| |
| /** |
|  * Asks remote members that advertise a cache loader to load the value for this processor's |
|  * key. Runs at most once per processor (guarded by netLoadDone) and does nothing when no |
|  * advisor is set or no member offers a loader. |
|  * |
|  * <p> |
|  * Candidates are tried one at a time in shuffled order. The next candidate is tried when the |
|  * current one answers with a TryAgainException, or when the request is still outstanding |
|  * after the wait and the selected node is marked dead. The previous {@code do ... while |
|  * (false)} form could never do this: in a do-while, {@code continue} jumps to the loop |
|  * condition, which was constantly false, so the first candidate was always the only one tried. |
|  * |
|  * <p> |
|  * NOTE(review): retrying relies on remainingTimeout being reduced as time passes (checked at |
|  * the top of every iteration); confirm against waitForObject2/computeRemainingTimeout. |
|  * |
|  * @return the loaded value (possibly null), or null if no load was attempted or every |
|  *         candidate was exhausted |
|  * @throws CacheLoaderException if a remote member reports a failure other than "try again" |
|  * @throws TimeoutException declared by the wait/send helpers |
|  */ |
| Object doNetLoad() throws CacheLoaderException, TimeoutException { |
|   if (this.netLoadDone) { |
|     return null; |
|   } |
|   this.netLoadDone = true; |
|   if (advisor == null) { |
|     return null; |
|   } |
|   Set loadCandidatesSet = advisor.adviseNetLoad(); |
|   if (loadCandidatesSet.isEmpty()) { |
|     return null; |
|   } |
|   CachePerfStats stats = region.getCachePerfStats(); |
|   long start = stats.startNetload(); |
|   ArrayList list = new ArrayList(loadCandidatesSet); |
|   Collections.shuffle(list); |
|   InternalDistributedMember[] loadCandidates = |
|       (InternalDistributedMember[]) list.toArray(new InternalDistributedMember[0]); |
|   initRemainingTimeout(); |
| |
|   RegionAttributes attrs = region.getAttributes(); |
|   int index = 0; |
|   this.remoteLoadInProgress = true; |
|   try { |
|     // Bounded by the candidate list: "continue" moves on to the next candidate and the loop |
|     // ends by itself once every candidate has been tried. |
|     while (index < loadCandidates.length) { |
|       InternalDistributedMember next = loadCandidates[index++]; |
|       setSelectedNode(next); |
|       this.lastNotifySpot = 0; |
|       this.requestInProgress = true; |
|       if (this.remainingTimeout <= 0) { // @todo this looks wrong; why not a timeout exception? |
|         break; |
|       } |
|       this.remoteException = null; |
|       NetLoadRequestMessage.sendMessage(this, this.regionName, this.key, this.aCallbackArgument, |
|           next, this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(), |
|           attrs.getEntryIdleTimeout().getTimeout()); |
|       waitForObject2(this.remainingTimeout); |
| |
|       if (this.remoteException != null) { |
|         if (this.remoteException instanceof TryAgainException) { |
|           // this member could not serve the load; move on to the next one, if any |
|           continue; |
|         } |
|         // Unwrap a remote CacheLoaderException, but never lose the exception when it has |
|         // no cause (same rule netWrite applies to CacheWriterException). |
|         Throwable cause; |
|         if (this.remoteException instanceof CacheLoaderException |
|             && this.remoteException.getCause() != null) { |
|           cause = this.remoteException.getCause(); |
|         } else { |
|           cause = this.remoteException; |
|         } |
|         throw new CacheLoaderException( |
|             String.format("While invoking a remote netLoad: %s", |
|                 cause), |
|             cause); |
|       } |
| |
|       if (!this.requestInProgress) { |
|         // note even if result is null we are done; see bug 39738 |
|         this.localLoad = false; |
|         if (this.result != null) { |
|           this.netLoad = true; |
|         } |
|         return this.result; |
|       } |
| |
|       // The request is still outstanding after the wait. Only a dead selected node justifies |
|       // trying another member; otherwise we are done. |
|       if (!this.selectedNodeDead) { |
|         break; |
|       } |
|     } |
|   } finally { |
|     stats.endNetload(start); |
|   } |
|   return null; |
| } |
| |
| /** |
|  * Signal used only inside this class to tell the caller that the request should be retried |
|  * with another member. InternalGemFireException used to be used for this, which seems |
|  * dangerous. |
|  */ |
| private static class TryAgainException extends GemFireException { |
| |
|   /** Creates a <code>TryAgainException</code> without a detail message. */ |
|   public TryAgainException() { |
|     super(); |
|   } |
| |
|   /** |
|    * Creates a <code>TryAgainException</code> with the given detail message. |
|    * |
|    * @param message the detail message |
|    */ |
|   public TryAgainException(String message) { |
|     super(message); |
|   } |
| } |
| |
| |
| private Object doLocalLoad(CacheLoader loader, boolean netSearchAllowed, boolean preferCD) |
| throws CacheLoaderException { |
| Object obj = null; |
| if (loader != null && !this.attemptedLocalLoad) { |
| this.attemptedLocalLoad = true; |
| CachePerfStats stats = region.getCachePerfStats(); |
| LoaderHelper loaderHelper = this.region.loaderHelperFactory.createLoaderHelper(this.key, |
| this.aCallbackArgument, netSearchAllowed, true /* netLoadAllowed */, this); |
| long statStart = stats.startLoad(); |
| try { |
| obj = loader.load(loaderHelper); |
| obj = this.region.getCache().convertPdxInstanceIfNeeded(obj, preferCD); |
| } finally { |
| stats.endLoad(statStart); |
| } |
| if (obj != null) { |
| this.localLoad = true; |
| } |
| } |
| return obj; |
| } |
| |
| /** |
| * Returns an event for listener notification. The event's operation may be altered to conform to |
| * the ConcurrentMap implementation specification. If the returned value is not == to the event |
| * parameter then the caller is responsible for releasing it. |
| * |
| * @param event the original event |
| * @return the original event or a new event having a change in operation |
| */ |
| @Retained |
| private CacheEvent getEventForListener(CacheEvent event) { |
| Operation op = event.getOperation(); |
| if (!op.isEntry()) { |
| return event; |
| } else { |
| EntryEventImpl r = (EntryEventImpl) event; |
| @Retained |
| EntryEventImpl result = r; |
| if (r.isSingleHop()) { |
| // fix for bug #46130 - origin remote incorrect for one-hop operation in receiver |
| result = new EntryEventImpl(r); |
| result.setOriginRemote(true); |
| // if this is from a non-replicate and it's not in a tx it should be seen as a Create |
| // because that's what the sender would use in notifying listeners. bug #46955 |
| if (result.getOperation().isUpdate() && (result.getTransactionId() == null)) { |
| result.makeCreate(); |
| } |
| } |
| if (op == Operation.REPLACE) { |
| if (result == r) |
| result = new EntryEventImpl(r); |
| result.setOperation(Operation.UPDATE); |
| } else if (op == Operation.PUT_IF_ABSENT) { |
| if (result == r) |
| result = new EntryEventImpl(r); |
| result.setOperation(Operation.CREATE); |
| } else if (op == Operation.REMOVE) { |
| if (result == r) |
| result = new EntryEventImpl(r); |
| result.setOperation(Operation.DESTROY); |
| } |
| return result; |
| } |
| } |
| |
| private boolean doLocalWrite(CacheWriter writer, CacheEvent pevent, int paction) |
| throws CacheWriterException { |
| // Return if the inhibit all notifications flag is set |
| if (pevent instanceof EntryEventImpl) { |
| if (((EntryEventImpl) pevent).inhibitAllNotifications()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Notification inhibited for key {}", pevent); |
| } |
| return false; |
| } |
| } |
| @Released |
| CacheEvent event = getEventForListener(pevent); |
| |
| int action = paction; |
| if (event.getOperation().isCreate() && action == BEFOREUPDATE) { |
| action = BEFORECREATE; |
| } |
| if (event instanceof EntryEventImpl) { |
| ((EntryEventImpl) event).setReadOldValueFromDisk(true); |
| } |
| try { |
| switch (action) { |
| case BEFORECREATE: |
| writer.beforeCreate((EntryEvent) event); |
| break; |
| case BEFOREDESTROY: |
| writer.beforeDestroy((EntryEvent) event); |
| break; |
| case BEFOREUPDATE: |
| writer.beforeUpdate((EntryEvent) event); |
| break; |
| case BEFOREREGIONDESTROY: |
| writer.beforeRegionDestroy((RegionEvent) event); |
| break; |
| case BEFOREREGIONCLEAR: |
| writer.beforeRegionClear((RegionEvent) event); |
| break; |
| default: |
| break; |
| |
| } |
| } finally { |
| if (event instanceof EntryEventImpl) { |
| ((EntryEventImpl) event).setReadOldValueFromDisk(false); |
| } |
| if (event != pevent) { |
| if (event instanceof EntryEventImpl) { |
| ((Releasable) event).release(); |
| } |
| } |
| } |
| this.localWrite = true; |
| return true; |
| |
| } |
| |
| /** |
|  * Forwards the write to remote members until one of them invokes its cache writer. Candidates |
|  * are tried one at a time in shuffled order; a TryAgainException from a candidate moves on to |
|  * the next one, any other remote exception is rethrown as a CacheWriterException. |
|  * |
|  * @param event the event to send to the remote writer |
|  * @param action one of the BEFORE* action codes |
|  * @param writeCandidateSet members that may run a cache writer; may be null or empty |
|  * @return true if cache writer was invoked |
|  * @throws CacheWriterException if a remote member reports a failure other than "try again" |
|  * @throws TimeoutException declared by the wait/send helpers |
|  */ |
| private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet) |
| throws CacheWriterException, TimeoutException { |
| if (writeCandidateSet == null || writeCandidateSet.isEmpty()) { |
| return false; |
| } |
| ArrayList list = new ArrayList(writeCandidateSet); |
| Collections.shuffle(list); |
| InternalDistributedMember[] writeCandidates = |
| (InternalDistributedMember[]) list.toArray(new InternalDistributedMember[0]); |
| initRemainingTimeout(); |
| int index = 0; |
| do { |
| InternalDistributedMember next = writeCandidates[index++]; |
| // The request goes to exactly one member per attempt. |
| Set set = new HashSet(); |
| set.add(next); |
| this.netWriteSucceeded = false; |
| this.requestInProgress = true; |
| this.remoteException = null; |
| NetWriteRequestMessage.sendMessage(this, this.regionName, this.remainingTimeout, event, set, |
| action); |
| // NOTE(review): this check happens after the message has already been sent. |
| if (this.remainingTimeout <= 0) { // @todo: should this throw a timeout exception? |
| break; |
| } |
| waitForObject2(this.remainingTimeout); |
| if (this.netWriteSucceeded) { |
| this.netWrite = true; |
| break; |
| } |
| if (this.remoteException != null) { |
| Throwable cause; |
| if (this.remoteException instanceof TryAgainException) { |
| // "continue" re-evaluates the loop condition, so this advances to the next candidate. |
| if (index < writeCandidates.length) { |
| continue; |
| } else { |
| break; |
| } |
| } |
| // Unwrap a remote CacheWriterException when it carries a cause; otherwise report it as is. |
| if (this.remoteException instanceof CacheWriterException |
| && this.remoteException.getCause() != null) { |
| cause = this.remoteException.getCause(); |
| } else { |
| cause = this.remoteException; |
| } |
| throw new CacheWriterException( |
| String.format("While invoking a remote netWrite: %s", |
| cause), |
| cause); |
| } |
| } while (index < writeCandidates.length); |
| |
| return this.netWriteSucceeded; |
| } |
| |
| |
| /** |
|  * Processes one member's response to a QueryMessage netsearch. |
|  * <p> |
|  * The response is ignored when a netload is in progress, when no responses are outstanding, when |
|  * the sender is not one we are waiting for, or when a result has already been stored. A response |
|  * that carries a value completes the search. A response that only reports the value as present |
|  * either triggers a value request to that sender or queues the sender for a later request. |
|  * |
|  * @param obj the value sent by the responder, or null if no value was sent |
|  * @param lastModifiedTime the last-modified time reported by the responder |
|  * @param isPresent whether the responder reports having the value |
|  * @param serialized whether obj is in serialized form |
|  * @param requestorTimedOut whether the responder reports that the request timed out |
|  * @param sender the member that sent the response |
|  * @param dm supplies the waiting thread pool used for the follow-up value request |
|  * @param versionTag the version reported for the value; stored with the result, may be null |
|  */ |
| protected synchronized void incomingResponse(Object obj, long lastModifiedTime, boolean isPresent, |
|     boolean serialized, final boolean requestorTimedOut, final InternalDistributedMember sender, |
|     ClusterDistributionManager dm, VersionTag versionTag) { |
|   // NOTE: keep this method efficient since it is optimized |
|   // by executing it in the p2p reader. |
|   // This is done with this line in DistributionMessage.java: |
|   // || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class) |
| |
|   // bug 35266 - don't pay attention to late breaking "get" responses |
|   if (this.remoteLoadInProgress) { |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("Ignoring netsearch response from {} because we're now doing a netload", |
|           sender); |
|     } |
|     return; |
|   } |
| |
|   // nothing outstanding, or this sender is not (or no longer) one we are waiting for |
|   if (this.pendingResponders.isEmpty() || !this.pendingResponders.remove(sender)) { |
|     return; |
|   } |
|   if (logger.isDebugEnabled()) { |
|     // only log the message if it's from a recipient we're waiting for. |
|     // it could have been multicast to all members, which would give us |
|     // one response per member |
|     logger.debug( |
|         "Processing response for processorId={}, isPresent is {}, sender is {}, key is {}, value is {}, version is {}", |
|         this.processorId, isPresent, sender, this.key, serialized, versionTag); |
|   } |
| |
|   // An earlier response already supplied the value, so this one is ignored. |
|   if (this.result != null) { |
|     return; |
|   } |
| |
|   if (isPresent && obj != null) { |
|     // the responder sent the value itself; the method-level lock is already held here |
|     Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID); |
|     this.result = obj; |
|     this.lastModified = lastModifiedTime; |
|     this.isSerialized = serialized; |
|     this.versionTag = versionTag; |
|     signalDone(); |
|     return; |
|   } |
| |
|   if (isPresent) { |
|     // the responder has the value but did not send it |
|     if (!remoteGetInProgress) { |
|       // send a message to this responder asking for the value |
|       // do this on the waiting pool in case the send blocks |
|       try { |
|         dm.getExecutors().getWaitingThreadPool().execute(new Runnable() { |
|           @Override |
|           public void run() { |
|             sendValueRequest(sender); |
|           } |
|         }); |
|         // need to do this here before releasing sync to fix bug 37132 |
|         this.requestInProgress = true; |
|         this.remoteGetInProgress = true; |
|         setSelectedNode(sender); |
|         return; // sendValueRequest does the rest of the work |
|       } catch (RejectedExecutionException ignore) { |
|         // just fall through since we must be shutting down. |
|       } |
|     } |
|     // remember this responder so it can be asked later |
|     if (responseQueue == null) { |
|       responseQueue = new LinkedList(); |
|     } |
|     if (logger.isDebugEnabled()) { |
|       logger.debug("Saving isPresent response, requestInProgress {}", sender); |
|     } |
|     responseQueue.add(sender); |
|   } |
| |
|   if (requestorTimedOut) { |
|     signalTimedOut(); |
|   } |
| |
|   // finish once every responder has answered and no value request is outstanding |
|   if (this.pendingResponders.isEmpty() && !remoteGetInProgress) { |
|     signalDone(); |
|   } |
| } |
| |
| protected synchronized void sendValueRequest(final InternalDistributedMember sender) { |
| // send a message to this responder asking for the value |
| // do this on the waiting pool in case the send blocks |
| // Always attempt to send the message to fix bug 37149 |
| RegionAttributes attrs = this.region.getAttributes(); |
| NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, sender, |
| this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout()); |
| // if it turns out that we can't send a message to this member then |
| // our membership listener should save the day and schedule a send |
| // to someone else or give up and report a failed search. |
| } |
| |
| // @todo creation times need to be handled properly |
| /** |
|  * Handles the reply from the accepted responder of a netload: stores the result and signals |
|  * completion. Unlike 2.0, where the response was a two-phase operation, here it is a single-phase |
|  * operation. |
|  * |
|  * @param obj the loaded value |
|  * @param lastModifiedTime the last-modified time reported with the value |
|  * @param callbackArg the callback argument returned by the responder |
|  * @param e the exception reported by the responder, if any |
|  * @param serialized whether obj is in serialized form |
|  * @param requestorTimedOut if true nothing is stored and the timed-out signal is raised instead |
|  */ |
| protected void incomingNetLoadReply(Object obj, long lastModifiedTime, Object callbackArg, |
|     Exception e, boolean serialized, boolean requestorTimedOut) { |
|   synchronized (this) { |
|     if (!requestorTimedOut) { |
|       this.result = obj; |
|       this.lastModified = lastModifiedTime; |
|       this.remoteException = e; |
|       this.aCallbackArgument = callbackArg; |
|       computeRemainingTimeout(); |
|       this.isSerialized = serialized; |
|       signalDone(); |
|     } else { |
|       signalTimedOut(); |
|     } |
|   } |
| } |
| |
| /** |
|  * Handles the reply to a NetSearchRequestMessage. |
|  * <p> |
|  * Replies from departed members are dropped. While a request is in progress, a reply that |
|  * carries a value or is authoritative becomes the result; a reply without a value either ends |
|  * the search when no time remains or moves on to the next queued responder. |
|  * |
|  * @param value the value bytes returned by the responder, or null |
|  * @param lastModifiedTime the last-modified time reported with the value |
|  * @param serialized whether value is in serialized form |
|  * @param requestorTimedOut whether the responder reports that the request timed out |
|  * @param authorative whether the responder's answer is authoritative even without a value |
|  * @param versionTag the version reported for the value |
|  * @param responder the member that sent the reply |
|  */ |
| @SuppressWarnings("hiding") |
| protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime, |
|     boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag, |
|     InternalDistributedMember responder) { |
|   final boolean debugOn = logger.isDebugEnabled(); |
|   if (departedMembers != null && departedMembers.contains(responder)) { |
|     if (debugOn) { |
|       logger.debug("ignore the reply received from a departed member"); |
|     } |
|     return; |
|   } |
| |
|   if (!this.requestInProgress) { |
|     if (debugOn) { |
|       logger.debug("incomingNetSearchReply() - requestInProgress is false {}", this); |
|     } |
|     // no request is outstanding; still check whether the search as a whole is finished |
|     checkIfDone(); |
|     return; |
|   } |
| |
|   if (requestorTimedOut) { |
|     // Force a timeout exception. |
|     if (debugOn) { |
|       logger.debug("incomingNetSearchReply() - requestorTimedOut {}", this); |
|     } |
|     signalTimedOut(); |
|   } |
|   computeRemainingTimeout(); |
| |
|   if (value != null || authorative) { |
|     // the method-level lock is already held here |
|     this.result = value; |
|     this.lastModified = lastModifiedTime; |
|     this.isSerialized = serialized; |
|     this.remoteGetInProgress = false; |
|     this.authorative = authorative; |
|     this.versionTag = versionTag; |
|     if (debugOn) { |
|       logger.debug("incomingNetSearchReply() - got obj {}", this); |
|     } |
|     signalDone(); |
|     return; |
|   } |
| |
|   if (this.remainingTimeout <= 0) { |
|     this.remoteGetInProgress = false; |
|     if (debugOn) { |
|       logger.debug("incomingNetSearchReply() - null obj, no more time {}", this); |
|     } |
|     signalDone(); // @todo: is this a bug? should call signalTimedOut? |
|     return; |
|   } |
| |
|   if (debugOn) { |
|     logger.debug("incomingNetSearchReply() - null obj, sendNetSearchRequest {}", this); |
|   } |
|   sendNetSearchRequest(); |
| } |
| |
| /** |
|  * Removes and returns the responder at the head of the responseQueue. |
|  * |
|  * @return the next queued responder, or null if the queue is missing or empty |
|  */ |
| private InternalDistributedMember nextAppropriateResponder() { |
|   if (responseQueue == null || responseQueue.isEmpty()) { |
|     return null; |
|   } |
|   return (InternalDistributedMember) responseQueue.remove(0); |
| } |
| |
| private synchronized void sendNetSearchRequest() { |
| InternalDistributedMember nextResponder = nextAppropriateResponder(); |
| if (nextResponder != null) { |
| // Make a request to the next responder in the queue |
| RegionAttributes attrs = this.region.getAttributes(); |
| setSelectedNode(nextResponder); |
| this.requestInProgress = true; |
| this.remoteGetInProgress = true; |
| NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, nextResponder, |
| this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(), |
| attrs.getEntryIdleTimeout().getTimeout()); |
| |
| } else { |
| this.remoteGetInProgress = false; |
| checkIfDone(); |
| |
| } |
| } |
| |
| /** |
|  * Handles the reply from the accepted responder of a netWrite: records the outcome, updates the |
|  * remaining timeout and signals completion. |
|  * |
|  * @param netWriteSuccessful whether the responder reports that the write succeeded |
|  * @param e the exception reported by the responder, if any |
|  * @param exe not read by this method |
|  */ |
| protected void incomingNetWriteReply(boolean netWriteSuccessful, Exception e, boolean exe) { |
|   synchronized (this) { |
|     // publish the outcome before waking the waiter |
|     this.remoteException = e; |
|     this.netWriteSucceeded = netWriteSuccessful; |
|     computeRemainingTimeout(); |
|     signalDone(); |
|   } |
| } |
| |
| private synchronized void initRemainingTimeout() { |
| this.remainingTimeout = this.timeout * 1000; |
| this.startTimeSnapShot = this.distributionManager.cacheTimeMillis(); |
| } |
| |
| private synchronized void computeRemainingTimeout() { |
| if (this.startTimeSnapShot > 0) { // @todo this should be an assertion |
| this.endTimeSnapShot = this.distributionManager.cacheTimeMillis(); |
| long delta = this.endTimeSnapShot - this.startTimeSnapShot; |
| if (delta > 0) { |
| this.remainingTimeout -= delta; |
| } |
| this.startTimeSnapShot = this.endTimeSnapShot; |
| } |
| } |
| |
| /** |
|  * Waits on this processor's monitor until the request in progress completes, a notification is |
|  * recorded in lastNotifySpot, or the given time runs out. Returns immediately when no request is |
|  * in progress. The remaining timeout is recomputed on every exit path once waiting was entered. |
|  * <p> |
|  * The wait is sliced into chunks of at most RETRY_TIME so cancellation is checked between |
|  * slices. Interrupts do not end the wait; the interrupt status is restored before returning. |
|  * |
|  * @param timeoutMs the maximum time to wait, in milliseconds |
|  * @throws TimeoutException if the time runs out, or if waiting ends while the request is still |
|  *         in progress and the selected node is not known to be dead |
|  */ |
| private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException { |
|   if (!this.requestInProgress) { |
|     return; |
|   } |
|   try { |
|     // NOTE(review): this value is never read below; the lookup is kept because the call chain |
|     // may have side effects - confirm before removing. |
|     final DistributionManager unusedDm = |
|         this.region.getCache().getInternalDistributedSystem().getDistributionManager(); |
|     long msLeft = timeoutMs; |
|     final long deadline = System.currentTimeMillis() + msLeft; |
|     while (true) { |
|       if (!this.requestInProgress) { |
|         return; |
|       } |
|       if (msLeft <= 0) { |
|         throw new TimeoutException( |
|             String.format( |
|                 "Timed out while doing netsearch/netload/netwrite processorId= %s Key is %s", |
|                 new Object[] {this.processorId, this.key})); |
|       } |
| |
|       // clear the interrupt status so wait() can block; it is restored in the finally below |
|       boolean interrupted = Thread.interrupted(); |
|       int notifySpot = this.lastNotifySpot; |
|       try { |
|         while (notifySpot == 0 && msLeft > 0) { |
|           this.region.getCancelCriterion().checkCancelInProgress(null); |
|           interrupted = Thread.interrupted() || interrupted; |
|           wait(Math.min(RETRY_TIME, msLeft)); // spurious wakeup ok |
|           notifySpot = this.lastNotifySpot; |
|           if (notifySpot == 0) { |
|             // calc remaing wait time to fix bug 37196 |
|             msLeft = deadline - System.currentTimeMillis(); |
|           } |
|         } |
|         if (notifySpot != 0) { |
|           // consume the notification |
|           this.lastNotifySpot = 0; |
|         } |
|         if (this.requestInProgress && !this.selectedNodeDead) { |
|           // added the test of "!this.selectedNodeDead" for bug 37196 |
|           StringBuilder details = new StringBuilder(200); |
|           details.append("processorId=").append(this.processorId); |
|           details.append(" Key is ").append(this.key); |
|           details.append(" searchTimeoutMs ").append(timeoutMs); |
|           if (msLeft > 0) { |
|             details.append(" msRemaining=").append(msLeft); |
|           } |
|           if (notifySpot != 0) { |
|             details.append(" lastNotifySpot=").append(notifySpot); |
|           } |
|           throw new TimeoutException( |
|               String.format("Timeout during netsearch/netload/netwrite. Details: %s", |
|                   details)); |
|         } |
|         return; |
|       } catch (InterruptedException ignore) { |
|         interrupted = true; |
|         region.getCancelCriterion().checkCancelInProgress(null); |
|         // keep waiting until we are done |
|         msLeft = deadline - System.currentTimeMillis(); |
|       } finally { |
|         if (interrupted) { |
|           Thread.currentThread().interrupt(); |
|         } |
|       } |
|     } |
|   } finally { |
|     computeRemainingTimeout(); |
|   } |
| } |
| |
| private int getSearchTimeout() { |
| return region.getCache().getSearchTimeout(); |
| } |
| |
| /** |
|  * Clears the flags that record which kind of search, load or write took place, and resets the |
|  * last-modified time and the serialized marker. |
|  */ |
| private void resetResults() { |
|   // which operation produced the outcome |
|   this.netSearch = false; |
|   this.netLoad = false; |
|   this.localLoad = false; |
|   this.localWrite = false; |
|   this.netWrite = false; |
|   // metadata of the result |
|   this.lastModified = 0; |
|   this.isSerialized = false; |
| } |
| |
| |
| // private AcceptHelper getAcceptHelper(boolean ackPortInit) { |
| // AcceptHelper helper = null; |
| // synchronized(availableAcceptHelperSet) { |
| // if (availableAcceptHelperSet.size() <= 0) { |
| // helper = new AcceptHelper(); |
| // } |
| // else { |
| // helper = (AcceptHelper)availableAcceptHelperSet.iterator().next(); |
| // availableAcceptHelperSet.remove(helper); |
| // } |
| // } |
| // helper.reset(ackPortInit); |
| // return helper; |
| // |
| // } |
| |
| // private void releaseAcceptHelper(AcceptHelper helper) { |
| // synchronized(availableAcceptHelperSet) { |
| // if (!availableAcceptHelperSet.contains(helper)) |
| // availableAcceptHelperSet.add(helper); |
| // } |
| // |
| // } |
| |
| /** Returns the superclass description followed by this processor's id. */ |
| @Override |
| public String toString() { |
|   final String base = super.toString(); |
|   return base + " processorId " + this.processorId; |
| } |
| |
| /** |
| * methods to set/remove htree reference (Bug 40299). |
| */ |
| protected static void setClearCountReference(LocalRegion rgn) { |
| DiskRegion dr = rgn.getDiskRegion(); |
| if (dr != null) { |
| dr.setClearCountReference(); |
| } |
| } |
| |
| protected static void removeClearCountReference(LocalRegion rgn) { |
| DiskRegion dr = rgn.getDiskRegion(); |
| if (dr != null) { |
| dr.removeClearCountReference(); |
| } |
| } |
| |
| /** |
|  * A QueryMessage is broadcast to every node that has the region defined, to find out who has a |
|  * valid copy of the requested object. Receivers answer with a ResponseMessage. |
|  */ |
| public static class QueryMessage extends SerialDistributionMessage { |
| |
|   // using available bitmask flags |
|   private static final short HAS_TTL = UNRESERVED_FLAGS_START; |
|   private static final short HAS_IDLE_TIME = (HAS_TTL << 1); |
|   private static final short ALWAYS_SEND_RESULT = (HAS_IDLE_TIME << 1); |
| |
|   /** |
|    * The object id of the processor object on the initiator node. This will be communicated back |
|    * in the response to enable transferring the result to the initiating VM. |
|    */ |
|   private int processorId; |
| |
|   /** The fully qualified name of the Region */ |
|   private String regionName; |
| |
|   /** The object name */ |
|   private Object key; |
| |
|   /** Amount of time to wait before giving up */ |
|   private int timeoutMs; |
| |
|   /** The originator's time-to-live expiration criterion */ |
|   private int ttl; |
| |
|   /** The originator's idle-time expiration criterion */ |
|   private int idleTime; |
| |
|   /** |
|    * if true then always send back value even if it is large. Added for bug 35942. |
|    */ |
|   private boolean alwaysSendResult; |
| |
|   public QueryMessage() { |
|     // do nothing |
|   } |
| |
|   /** |
|    * Creates a query for the given key and sends it to the recipients. |
|    * |
|    * @param processor the processor whose id is carried in the message |
|    * @param regionName the full name of the region to search |
|    * @param key the key being searched for |
|    * @param multicast whether the message is sent by multicast |
|    * @param recipients the members to query |
|    * @param timeoutMs how long the receiver may take before reporting a timeout |
|    * @param ttl the originator's time-to-live criterion |
|    * @param idleTime the originator's idle-time criterion |
|    */ |
|   public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName, |
|       Object key, boolean multicast, Set recipients, int timeoutMs, int ttl, int idleTime) { |
|     final QueryMessage query = new QueryMessage(); |
|     query.initialize(processor, regionName, key, multicast, timeoutMs, ttl, idleTime); |
|     query.setRecipients(recipients); |
|     // when only one recipient is searched, tell it to send the value |
|     query.alwaysSendResult = !multicast && recipients.size() == 1; |
|     processor.distributionManager.putOutgoing(query); |
|   } |
| |
|   /** Copies the request parameters into this message and asserts the region is distributed. */ |
|   private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName, |
|       Object theKey, boolean multicast, int theTimeoutMs, int theTtl, int theIdleTime) { |
|     this.processorId = processor.processorId; |
|     this.regionName = theRegionName; |
|     setMulticast(multicast); |
|     this.key = theKey; |
|     this.timeoutMs = theTimeoutMs; |
|     this.ttl = theTtl; |
|     this.idleTime = theIdleTime; |
|     Assert.assertTrue(processor.region.getScope().isDistributed()); |
|   } |
| |
|   /** |
|    * Runs on the receiver's node: checks whether the requested object exists on this node and |
|    * answers the sender with a ResponseMessage. |
|    */ |
|   @Override |
|   protected void process(ClusterDistributionManager dm) { |
|     doGet(dm); |
|   } |
| |
|   @Override |
|   public int getDSFID() { |
|     return QUERY_MESSAGE; |
|   } |
| |
|   /** |
|    * Writes a flags short followed by the fields. The processor id, ttl and idle time are written |
|    * only when they are non-zero; the flags record which of them are present. |
|    */ |
|   @Override |
|   public void toData(DataOutput out, |
|       SerializationContext context) throws IOException { |
|     super.toData(out, context); |
| |
|     final boolean hasProcessorId = this.processorId != 0; |
|     final boolean hasTtl = this.ttl != 0; |
|     final boolean hasIdleTime = this.idleTime != 0; |
| |
|     short flags = 0; |
|     if (hasProcessorId) { |
|       flags |= HAS_PROCESSOR_ID; |
|     } |
|     if (hasTtl) { |
|       flags |= HAS_TTL; |
|     } |
|     if (hasIdleTime) { |
|       flags |= HAS_IDLE_TIME; |
|     } |
|     if (this.alwaysSendResult) { |
|       flags |= ALWAYS_SEND_RESULT; |
|     } |
|     out.writeShort(flags); |
| |
|     if (hasProcessorId) { |
|       out.writeInt(this.processorId); |
|     } |
|     out.writeUTF(this.regionName); |
|     DataSerializer.writeObject(this.key, out); |
|     out.writeInt(this.timeoutMs); |
|     if (hasTtl) { |
|       InternalDataSerializer.writeSignedVL(this.ttl, out); |
|     } |
|     if (hasIdleTime) { |
|       InternalDataSerializer.writeSignedVL(this.idleTime, out); |
|     } |
|   } |
| |
|   /** Reads the fields in the order toData wrote them, guided by the flags short. */ |
|   @Override |
|   public void fromData(DataInput in, |
|       DeserializationContext context) throws IOException, ClassNotFoundException { |
|     super.fromData(in, context); |
|     final short flags = in.readShort(); |
|     if ((flags & HAS_PROCESSOR_ID) != 0) { |
|       this.processorId = in.readInt(); |
|       ReplyProcessor21.setMessageRPId(this.processorId); |
|     } |
|     this.regionName = in.readUTF(); |
|     this.key = DataSerializer.readObject(in); |
|     this.timeoutMs = in.readInt(); |
|     if ((flags & HAS_TTL) != 0) { |
|       this.ttl = (int) InternalDataSerializer.readSignedVL(in); |
|     } |
|     if ((flags & HAS_IDLE_TIME) != 0) { |
|       this.idleTime = (int) InternalDataSerializer.readSignedVL(in); |
|     } |
|     this.alwaysSendResult = (flags & ALWAYS_SEND_RESULT) != 0; |
|   } |
| |
|   @Override |
|   public String toString() { |
|     final StringBuilder text = new StringBuilder(); |
|     text.append("SearchLoadAndWriteProcessor.QueryMessage for \"").append(this.key); |
|     text.append("\" in region \"").append(this.regionName); |
|     text.append("\", processorId ").append(processorId); |
|     text.append(", timeoutMs=").append(this.timeoutMs); |
|     text.append(", ttl=").append(this.ttl); |
|     text.append(", idleTime=").append(this.idleTime); |
|     return text.toString(); |
|   } |
| |
|   /** |
|    * Looks the key up in the local region and replies to the sender. The value itself is included |
|    * only when alwaysSendResult is set or the value is smaller than SMALL_BLOB_SIZE; otherwise |
|    * the reply only says whether the value is present. Failures are answered with a null reply. |
|    * No reply at all is sent from an admin-only member or to a query that this member sent. |
|    */ |
|   private void doGet(ClusterDistributionManager dm) { |
|     final long startTime = dm.cacheTimeMillis(); |
|     boolean isPresent = false; |
|     boolean sendResult = false; |
|     boolean isSer = false; |
|     long lastModifiedCacheTime = 0; |
|     boolean requestorTimedOut = false; |
|     VersionTag tag = null; |
| |
|     final boolean adminOnly = dm.getDMType() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE; |
|     if (adminOnly || getSender().equals(dm.getDistributionManagerId())) { |
|       // this was probably a multicast message |
|       // replyWithNull(dm); - bug 35266: don't send a reply |
|       return; |
|     } |
| |
|     final InitializationLevel oldLevel = |
|         LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
|     try { |
|       // check to see if we would have to wait on initialization latch (if global) |
|       // if so abort and reply with null |
|       InternalCache cache = dm.getExistingCache(); |
|       if (cache.isGlobalRegionInitializing(this.regionName)) { |
|         replyWithNull(dm); |
|         if (logger.isDebugEnabled()) { |
|           logger.debug("Global Region not initialized yet"); |
|         } |
|         return; |
|       } |
| |
|       LocalRegion region = (LocalRegion) dm.getExistingCache().getRegion(this.regionName); |
|       Object o = null; |
| |
|       if (region != null) { |
|         setClearCountReference(region); |
|         try { |
|           RegionEntry entry = region.basicGetEntry(this.key); |
|           if (entry == null) { |
|             if (logger.isDebugEnabled()) { |
|               logger.debug("Entry is null"); |
|             } |
|           } else { |
|             synchronized (entry) { |
|               assert region.isInitialized(); |
|               if (dm.cacheTimeMillis() - startTime >= timeoutMs) { |
|                 requestorTimedOut = true; |
|               } else { |
|                 // OFFHEAP: incrc, copy bytes, decrc |
|                 o = region.getNoLRU(this.key, false, true, true); |
|                 final boolean usable = o != null && !Token.isInvalid(o) && !Token.isRemoved(o) |
|                     && !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime); |
|                 if (usable) { |
|                   isPresent = true; |
|                   VersionStamp stamp = entry.getVersionStamp(); |
|                   if (stamp != null && stamp.hasValidVersion()) { |
|                     tag = stamp.asVersionTag(); |
|                   } |
|                   lastModifiedCacheTime = entry.getLastModified(); |
|                   isSer = o instanceof CachedDeserializable; |
|                   if (isSer) { |
|                     o = ((CachedDeserializable) o).getSerializedValue(); |
|                   } |
|                   // send the value when asked to, or when it is small |
|                   sendResult = this.alwaysSendResult |
|                       || (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE); |
|                 } |
|               } |
|             } |
|           } |
|         } finally { |
|           removeClearCountReference(region); |
|         } |
|       } |
|       ResponseMessage.sendMessage(this.key, this.getSender(), processorId, |
|           (sendResult ? o : null), lastModifiedCacheTime, isPresent, isSer, requestorTimedOut, dm, |
|           tag); |
|     } catch (RegionDestroyedException ignore) { |
|       logger.debug("Region Destroyed Exception in QueryMessage doGet, null"); |
|       replyWithNull(dm); |
|     } catch (CancelException ignore) { |
|       logger.debug("CacheClosedException in QueryMessage doGet, null"); |
|       replyWithNull(dm); |
|     } catch (VirtualMachineError err) { |
|       SystemFailure.initiateFailure(err); |
|       // If this ever returns, rethrow the error. We're poisoned |
|       // now, so don't let this thread continue. |
|       throw err; |
|     } catch (Throwable t) { |
|       // Whenever you catch Error or Throwable, you must also |
|       // catch VirtualMachineError (see above). However, there is |
|       // _still_ a possibility that you are dealing with a cascading |
|       // error condition, so you also need to check to see if the JVM |
|       // is still usable: |
|       SystemFailure.checkFailure(); |
|       logger.debug("Throwable in QueryMessage doGet, null", t); |
|       replyWithNull(dm); |
|     } finally { |
|       LocalRegion.setThreadInitLevelRequirement(oldLevel); |
|     } |
|   } |
| |
|   /** Replies to the sender with an empty "not present" response. */ |
|   private void replyWithNull(ClusterDistributionManager dm) { |
|     ResponseMessage.sendMessage(this.key, this.getSender(), processorId, null, 0, false, false, |
|         false, dm, null); |
|   } |
| } |
| |
| /** |
|  * The ResponseMessage is a reply to a QueryMessage, and contains the object's value, if it is |
|  * below the byte limit, otherwise an indication of whether the sender has the value. |
|  */ |
| public static class ResponseMessage extends HighPriorityDistributionMessage { |
| |
|   /** The key the query was for */ |
|   private Object key; |
| |
|   /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
|   private int processorId; |
| |
|   /** The value being transferred */ |
|   private Object result; |
| |
|   /** Object creation time on remote node */ |
|   private long lastModified; |
| |
|   /** is the value present */ |
|   private boolean isPresent; |
| |
|   /** Is blob serialized? */ |
|   private boolean isSerialized; |
| |
|   /** did the request time out at the sender */ |
|   private boolean requestorTimedOut; |
| |
|   /** the version of the object being returned */ |
|   private VersionTag versionTag; |
| |
|   public ResponseMessage() {} |
| |
|   /** |
|    * Creates a response carrying the given values and sends it to the recipient. |
|    * |
|    * @param key the key the query was for |
|    * @param recipient the member that sent the query |
|    * @param processorId the id of the processor waiting for this response |
|    * @param result the value to transfer, or null |
|    * @param lastModified the last-modified time of the value |
|    * @param isPresent whether the value is present on this member |
|    * @param isSerialized whether result is in serialized form |
|    * @param requestorTimedOut whether the query timed out while being handled |
|    * @param distributionManager the manager used to send the message |
|    * @param versionTag the version of the value, or null |
|    */ |
|   public static void sendMessage(Object key, InternalDistributedMember recipient, int processorId, |
|       Object result, long lastModified, boolean isPresent, boolean isSerialized, |
|       boolean requestorTimedOut, ClusterDistributionManager distributionManager, |
|       VersionTag versionTag) { |
|     final ResponseMessage response = new ResponseMessage(); |
|     response.initialize(key, processorId, result, lastModified, isPresent, isSerialized, |
|         requestorTimedOut, versionTag); |
|     response.setRecipient(recipient); |
|     distributionManager.putOutgoing(response); |
|   } |
| |
|   /** Copies the response values into this message. */ |
|   private void initialize(Object theKey, int theProcessorId, Object theResult, |
|       long lastModifiedTime, boolean ispresent, boolean isserialized, |
|       boolean requestorTimedOutFlag, VersionTag versionTag) { |
|     this.key = theKey; |
|     this.processorId = theProcessorId; |
|     this.result = theResult; |
|     this.lastModified = lastModifiedTime; |
|     this.isPresent = ispresent; |
|     this.isSerialized = isserialized; |
|     this.requestorTimedOut = requestorTimedOutFlag; |
|     this.versionTag = versionTag; |
|   } |
| |
|   /** |
|    * Invoked on the receiver - which, in this case, was the initiator of the QueryMessage. Hands |
|    * the response to the waiting processor, or drops it if that processor no longer exists. |
|    */ |
|   @Override |
|   protected void process(ClusterDistributionManager dm) { |
|     // NOTE: keep this method efficient since it is optimized |
|     // by executing it in the p2p reader. |
|     // This is done with this line in DistributionMessage.java: |
|     // || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class) |
|     final SearchLoadAndWriteProcessor waiting = |
|         (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(this.processorId); |
|     if (waiting == null) { |
|       if (logger.isDebugEnabled()) { |
|         logger.debug("Response() SearchLoadAndWriteProcessor no longer exists"); |
|       } |
|       return; |
|     } |
|     if (this.versionTag != null) { |
|       this.versionTag.replaceNullIDs(getSender()); |
|     } |
| |
|     waiting.incomingResponse(this.result, this.lastModified, this.isPresent, |
|         this.isSerialized, this.requestorTimedOut, this.getSender(), dm, versionTag); |
|   } |
| |
|   @Override |
|   public boolean getInlineProcess() { // optimization for bug 37075 |
|     return true; |
|   } |
| |
|   @Override |
|   public int getDSFID() { |
|     return RESPONSE_MESSAGE; |
|   } |
| |
|   /** Writes all fields in a fixed order; fromData reads them back in the same order. */ |
|   @Override |
|   public void toData(DataOutput out, |
|       SerializationContext context) throws IOException { |
|     super.toData(out, context); |
|     DataSerializer.writeObject(this.key, out); |
|     out.writeInt(this.processorId); |
|     DataSerializer.writeObject(this.result, out); |
|     out.writeLong(this.lastModified); |
|     out.writeBoolean(this.isPresent); |
|     out.writeBoolean(this.isSerialized); |
|     out.writeBoolean(this.requestorTimedOut); |
|     DataSerializer.writeObject(this.versionTag, out); |
|   } |
| |
|   /** Reads all fields in the order toData wrote them. */ |
|   @Override |
|   public void fromData(DataInput in, |
|       DeserializationContext context) throws IOException, ClassNotFoundException { |
|     super.fromData(in, context); |
|     this.key = DataSerializer.readObject(in); |
|     this.processorId = in.readInt(); |
|     this.result = DataSerializer.readObject(in); |
|     this.lastModified = in.readLong(); |
|     this.isPresent = in.readBoolean(); |
|     this.isSerialized = in.readBoolean(); |
|     this.requestorTimedOut = in.readBoolean(); |
|     this.versionTag = (VersionTag) DataSerializer.readObject(in); |
|   } |
| |
|   @Override |
|   public String toString() { |
|     final StringBuilder text = new StringBuilder(); |
|     text.append("SearchLoadAndWriteProcessor.ResponseMessage for processorId ") |
|         .append(processorId); |
|     text.append(", blob is ").append(this.result); |
|     text.append(", isPresent is ").append(isPresent); |
|     text.append(", requestorTimedOut is ").append(requestorTimedOut); |
|     text.append(", version is ").append(versionTag); |
|     return text.toString(); |
|   } |
| |
| } |
| |
| /********************* NetSearchRequestMessage ***************************************/ |
| |
| /** |
|  * A NetSearchRequestMessage asks one specific member for the value of a key. The receiver |
|  * answers with a NetSearchReplyMessage. |
|  */ |
| public static class NetSearchRequestMessage extends PooledDistributionMessage { |
| |
|   /** |
|    * The object id of the processor object on the initiator node. This will be communicated back |
|    * in the response to enable transferring the result to the initiating VM. |
|    */ |
|   private int processorId; |
| |
|   /** The fully qualified name of the Region */ |
|   private String regionName; |
| |
|   /** The object name */ |
|   private Object key; |
| |
|   /** Amount of time to wait before giving up */ |
|   private int timeoutMs; |
| |
|   /** The originator's expiration criteria */ |
|   private int ttl, idleTime; |
| |
|   // using available bitmask flags |
|   private static final short HAS_TTL = UNRESERVED_FLAGS_START; |
|   private static final short HAS_IDLE_TIME = (HAS_TTL << 1); |
| |
|   public NetSearchRequestMessage() {} |
| |
|   /** |
|    * Creates a request for the value of the given key and sends it to the specified member. |
|    * |
|    * @param processor the processor whose id is carried in the message |
|    * @param regionName the full name of the region holding the key |
|    * @param key the key whose value is requested |
|    * @param recipient the member to ask |
|    * @param timeoutMs how long the receiver may take before reporting a timeout |
|    * @param ttl the originator's time-to-live criterion |
|    * @param idleTime the originator's idle-time criterion |
|    */ |
|   public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName, |
|       Object key, InternalDistributedMember recipient, int timeoutMs, int ttl, int idleTime) { |
| |
|     // create a message |
|     NetSearchRequestMessage msg = new NetSearchRequestMessage(); |
|     msg.initialize(processor, regionName, key, timeoutMs, ttl, idleTime); |
|     msg.setRecipient(recipient); |
|     processor.distributionManager.putOutgoing(msg); |
| |
|   } |
| |
|   /** Copies the request parameters into this message and asserts the region is distributed. */ |
|   void initialize(SearchLoadAndWriteProcessor processor, String theRegionName, Object theKey, |
|       int timeoutMS, int ttlMS, int idleTimeMS) { |
|     this.processorId = processor.processorId; |
|     this.regionName = theRegionName; |
|     this.key = theKey; |
|     this.timeoutMs = timeoutMS; |
|     this.ttl = ttlMS; |
|     this.idleTime = idleTimeMS; |
|     Assert.assertTrue(processor.region.getScope().isDistributed()); |
|   } |
| |
|   /** Invoked on the node that has the object */ |
|   @Override |
|   protected void process(ClusterDistributionManager dm) { |
|     doGet(dm); |
|   } |
| |
|   @Override |
|   public int getDSFID() { |
|     return NET_SEARCH_REQUEST_MESSAGE; |
|   } |
| |
|   /** |
|    * Writes a flags short followed by the fields. The processor id, ttl and idle time are written |
|    * only when they are non-zero; the flags record which of them are present. |
|    */ |
|   @Override |
|   public void toData(DataOutput out, |
|       SerializationContext context) throws IOException { |
|     super.toData(out, context); |
| |
|     short flags = 0; |
|     if (this.processorId != 0) { |
|       flags |= HAS_PROCESSOR_ID; |
|     } |
|     if (this.ttl != 0) { |
|       flags |= HAS_TTL; |
|     } |
|     if (this.idleTime != 0) { |
|       flags |= HAS_IDLE_TIME; |
|     } |
|     out.writeShort(flags); |
| |
|     if (this.processorId != 0) { |
|       out.writeInt(this.processorId); |
|     } |
|     out.writeUTF(this.regionName); |
|     DataSerializer.writeObject(this.key, out); |
|     out.writeInt(this.timeoutMs); |
|     if (this.ttl != 0) { |
|       InternalDataSerializer.writeSignedVL(this.ttl, out); |
|     } |
|     if (this.idleTime != 0) { |
|       InternalDataSerializer.writeSignedVL(this.idleTime, out); |
|     } |
|   } |
| |
|   /** Reads the fields in the order toData wrote them, guided by the flags short. */ |
|   @Override |
|   public void fromData(DataInput in, |
|       DeserializationContext context) throws IOException, ClassNotFoundException { |
|     super.fromData(in, context); |
|     short flags = in.readShort(); |
|     if ((flags & HAS_PROCESSOR_ID) != 0) { |
|       this.processorId = in.readInt(); |
|       ReplyProcessor21.setMessageRPId(this.processorId); |
|     } |
|     this.regionName = in.readUTF(); |
|     this.key = DataSerializer.readObject(in); |
|     this.timeoutMs = in.readInt(); |
|     if ((flags & HAS_TTL) != 0) { |
|       this.ttl = (int) InternalDataSerializer.readSignedVL(in); |
|     } |
|     if ((flags & HAS_IDLE_TIME) != 0) { |
|       this.idleTime = (int) InternalDataSerializer.readSignedVL(in); |
|     } |
|   } |
| |
|   @Override |
|   public String toString() { |
|     return "SearchLoadAndWriteProcessor.NetSearchRequestMessage for \"" + this.key |
|         + "\" in region \"" + this.regionName + "\", processorId " + processorId; |
|   } |
| |
|   /** |
|    * Looks the key up in the local region and sends the value (or the lack of one) back to the |
|    * sender in a NetSearchReplyMessage. Failures while building the reply are answered with a |
|    * null reply. |
|    * <p> |
|    * The thread's init-level requirement is lowered for the duration of the lookup and restored |
|    * in an enclosing finally, so it is restored even when the region or statistics lookup throws |
|    * before the reply handling starts. Such an exception still propagates to the caller. |
|    */ |
|   void doGet(ClusterDistributionManager dm) { |
|     long startTime = dm.cacheTimeMillis(); |
|     byte[] ebv = null; |
|     Object ebvObj = null; |
|     int ebvLen = 0; |
|     long lastModifiedCacheTime = 0; |
|     boolean isSer = false; |
|     boolean requestorTimedOut = false; |
|     boolean authoritative = false; |
|     VersionTag versionTag = null; |
| |
|     InternalCache cache = dm.getExistingCache(); |
| |
|     final InitializationLevel oldLevel = |
|         LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
|     try { |
|       LocalRegion region = (LocalRegion) cache.getRegion(this.regionName); |
|       CachePerfStats stats = |
|           region == null ? cache.getCachePerfStats() : region.getRegionPerfStats(); |
|       long startHandlingTime = stats.startHandlingNetsearch(); |
|       boolean handlingSuccess = false; |
|       try { |
|         if (region != null) { |
|           setClearCountReference(region); |
|           try { |
|             boolean initialized = region.isInitialized(); |
|             RegionEntry entry = region.basicGetEntry(this.key); |
|             if (entry != null) { |
|               synchronized (entry) { |
|                 // get the value and version under synchronization so they don't change |
|                 VersionStamp versionStamp = entry.getVersionStamp(); |
|                 if (versionStamp != null) { |
|                   versionTag = versionStamp.asVersionTag(); |
|                 } |
|                 // OFFHEAP: incrc, copy bytes, decrc |
|                 Object eov = region.getNoLRU(this.key, false, true, true); |
|                 if (eov != null) { |
|                   if (eov == Token.INVALID || eov == Token.LOCAL_INVALID) { |
|                     // nothing? |
|                   } else if (dm.cacheTimeMillis() - startTime < timeoutMs) { |
|                     if (!region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) { |
|                       lastModifiedCacheTime = entry.getLastModified(); |
|                       if (eov instanceof CachedDeserializable) { |
|                         CachedDeserializable cd = (CachedDeserializable) eov; |
|                         if (!cd.isSerialized()) { |
|                           isSer = false; |
|                           ebv = (byte[]) cd.getDeserializedForReading(); |
|                           ebvLen = ebv.length; |
|                         } else { |
|                           // don't serialize here if it is not already serialized |
|                           Object tmp = cd.getValue(); |
|                           if (tmp instanceof byte[]) { |
|                             byte[] bb = (byte[]) tmp; |
|                             ebv = bb; |
|                             ebvLen = bb.length; |
|                           } else { |
|                             ebvObj = tmp; |
|                           } |
|                           isSer = true; |
|                         } |
|                       } else if (!(eov instanceof byte[])) { |
|                         ebvObj = eov; |
|                         isSer = true; |
|                       } else { |
|                         ebv = (byte[]) eov; |
|                         ebvLen = ebv.length; |
|                       } |
|                     } |
|                     handlingSuccess = true; |
|                   } else { |
|                     requestorTimedOut = true; |
|                   } |
|                 } |
|               } |
|             } |
|             authoritative = |
|                 region.getDataPolicy().withReplication() && initialized && !region.isDestroyed; |
|           } finally { |
|             removeClearCountReference(region); |
|           } |
|         } |
|         NetSearchReplyMessage.sendMessage(getSender(), processorId, |
|             this.key, ebv, ebvObj, ebvLen, lastModifiedCacheTime, isSer, requestorTimedOut, |
|             authoritative, dm, versionTag); |
|       } catch (RegionDestroyedException ignore) { |
|         replyWithNull(dm); |
|       } catch (CancelException ignore) { |
|         replyWithNull(dm); |
|       } catch (VirtualMachineError err) { |
|         SystemFailure.initiateFailure(err); |
|         // If this ever returns, rethrow the error. We're poisoned |
|         // now, so don't let this thread continue. |
|         throw err; |
|       } catch (Throwable t) { |
|         // Whenever you catch Error or Throwable, you must also |
|         // catch VirtualMachineError (see above). However, there is |
|         // _still_ a possibility that you are dealing with a cascading |
|         // error condition, so you also need to check to see if the JVM |
|         // is still usable: |
|         SystemFailure.checkFailure(); |
|         logger.warn("Unexpected exception creating net search reply", t); |
|         replyWithNull(dm); |
|       } finally { |
|         stats.endHandlingNetsearch(startHandlingTime, handlingSuccess); |
|       } |
|     } finally { |
|       // always undo the init-level change made for this message |
|       LocalRegion.setThreadInitLevelRequirement(oldLevel); |
|     } |
|   } |
| |
|   /** Replies to the sender with an empty, non-authoritative reply. */ |
|   private void replyWithNull(ClusterDistributionManager dm) { |
|     NetSearchReplyMessage.sendMessage(getSender(), processorId, |
|         this.key, null, null, 0, 0, false, false, false, dm, null); |
|   } |
| } |
| |
| /** |
| * The NetSearchReplyMessage is a reply to a NetSearchRequestMessage, and contains the object's |
| * value. |
| */ |
| public static class NetSearchReplyMessage extends HighPriorityDistributionMessage { |
| private static final byte SERIALIZED = 0x01; |
| private static final byte REQUESTOR_TIMEOUT = 0x02; |
| private static final byte AUTHORATIVE = 0x04; |
| private static final byte VERSIONED = 0x08; |
| private static final byte PERSISTENT = 0x10; |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** The object value being transferred */ |
| private byte[] value; |
| |
| private transient Object valueObj; // only used by toData |
| private transient int valueLen; // only used by toData |
| |
| /** Object creation time on remote node */ |
| private long lastModified; |
| |
| /** Is blob serialized? */ |
| private boolean isSerialized; |
| |
| /** did the request time out at the sender */ |
| private boolean requestorTimedOut; |
| |
| /** |
| * Does this member authoritatively know the value? This is used to distinguish a null response |
| * indicating the region was missing vs. a null value. |
| */ |
| private boolean authoritative; |
| |
| /** the version of the returned entry */ |
| private VersionTag versionTag; |
| |
| public NetSearchReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, int processorId, Object key, |
| byte[] value, Object valueObj, int valueLen, long lastModified, boolean isSerialized, |
| boolean requestorTimedOut, boolean authoritative, |
| ClusterDistributionManager distributionManager, VersionTag versionTag) { |
| // create a message |
| NetSearchReplyMessage msg = new NetSearchReplyMessage(); |
| msg.initialize(processorId, value, valueObj, valueLen, lastModified, isSerialized, |
| requestorTimedOut, authoritative, versionTag); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| |
| } |
| |
| @SuppressWarnings("hiding") |
| private void initialize(int procId, byte[] theValue, Object theValueObj, int valueObjLen, |
| long lastModifiedTime, boolean isserialized, boolean requestorTimedout, |
| boolean authoritative, VersionTag versionTag) { |
| this.processorId = procId; |
| this.value = theValue; |
| this.valueObj = theValueObj; |
| this.valueLen = valueObjLen; |
| this.lastModified = lastModifiedTime; |
| this.isSerialized = isserialized; |
| this.requestorTimedOut = requestorTimedout; |
| this.authoritative = authoritative; |
| this.versionTag = versionTag; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of the |
| * NetSearchRequestMessage. This concludes the net request, by communicating an object value. |
| */ |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetSearchReplyMessage() SearchLoadAndWriteProcessor {} no longer exists", |
| processorId); |
| } |
| return; |
| } |
| long lastModifiedSystemTime = 0; |
| if (this.lastModified != 0) { |
| lastModifiedSystemTime = this.lastModified; |
| } |
| if (this.versionTag != null) { |
| this.versionTag.replaceNullIDs(getSender()); |
| } |
| processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized, |
| this.requestorTimedOut, this.authoritative, this.versionTag, getSender()); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return NET_SEARCH_REPLY_MESSAGE; |
| } |
| |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeInt(this.processorId); |
| if (this.valueObj != null) { |
| DataSerializer.writeObjectAsByteArray(this.valueObj, out); |
| } else { |
| DataSerializer.writeByteArray(this.value, this.valueLen, out); |
| } |
| out.writeLong(this.lastModified); |
| byte booleans = 0; |
| if (this.isSerialized) |
| booleans |= SERIALIZED; |
| if (this.requestorTimedOut) |
| booleans |= REQUESTOR_TIMEOUT; |
| if (this.authoritative) |
| booleans |= AUTHORATIVE; |
| if (this.versionTag != null) |
| booleans |= VERSIONED; |
| if (this.versionTag instanceof DiskVersionTag) |
| booleans |= PERSISTENT; |
| out.writeByte(booleans); |
| if (this.versionTag != null) { |
| InternalDataSerializer.invokeToData(this.versionTag, out); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.processorId = in.readInt(); |
| this.value = DataSerializer.readByteArray(in); |
| if (this.value != null) { |
| this.valueLen = this.value.length; |
| } |
| this.lastModified = in.readLong(); |
| byte booleans = in.readByte(); |
| |
| this.isSerialized = (booleans & SERIALIZED) != 0; |
| this.requestorTimedOut = (booleans & REQUESTOR_TIMEOUT) != 0; |
| this.authoritative = (booleans & AUTHORATIVE) != 0; |
| if ((booleans & VERSIONED) != 0) { |
| boolean persistentTag = (booleans & PERSISTENT) != 0; |
| this.versionTag = VersionTag.create(persistentTag, in); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetSearchReplyMessage for processorId " + processorId |
| + ", blob is " + |
| // this.value |
| (this.value == null ? "null" : "(" + this.value.length + " bytes)") + " authorative=" |
| + authoritative + " versionTag=" + this.versionTag; |
| } |
| |
| } |
| |
| |
| /******************************** NetLoadRequestMessage **********************/ |
| |
| public static class NetLoadRequestMessage extends PooledDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the initiator node. This will be communicated back |
| * in the response to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The object name */ |
| private Object key; |
| |
| /** Parameter to use when invoking loader */ |
| private Object aCallbackArgument; |
| |
| /** Amount of time to wait before giving up */ |
| private int timeoutMs; |
| |
| |
| /** The originator's expiration criteria */ |
| private int ttl, idleTime; |
| |
| public NetLoadRequestMessage() {} |
| |
| /** |
| * Using a new or pooled message instance, create and send the request for object value to the |
| * specified node. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName, |
| Object key, Object aCallbackArgument, InternalDistributedMember recipient, int timeoutMs, |
| int ttl, int idleTime) { |
| |
| // create a message |
| NetLoadRequestMessage msg = new NetLoadRequestMessage(); |
| msg.initialize(processor, regionName, key, aCallbackArgument, timeoutMs, ttl, idleTime); |
| msg.setRecipient(recipient); |
| |
| try { |
| processor.distributionManager.putOutgoing(msg); |
| } catch (InternalGemFireException e) { |
| throw new IllegalArgumentException( |
| "Message not serializable"); |
| } |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName, |
| Object theKey, Object callbackArgument, int timeoutMS, int ttlMS, int idleTimeMS) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| this.key = theKey; |
| this.aCallbackArgument = callbackArgument; |
| this.timeoutMs = timeoutMS; |
| this.ttl = ttlMS; |
| this.idleTime = idleTimeMS; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| /** Invoked on the node that has the object */ |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| doLoad(dm); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return NET_LOAD_REQUEST_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeInt(this.processorId); |
| out.writeUTF(this.regionName); |
| DataSerializer.writeObject(this.key, out); |
| DataSerializer.writeObject(this.aCallbackArgument, out); |
| out.writeInt(this.timeoutMs); |
| out.writeInt(this.ttl); |
| out.writeInt(this.idleTime); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.processorId = in.readInt(); |
| this.regionName = in.readUTF(); |
| this.key = DataSerializer.readObject(in); |
| this.aCallbackArgument = DataSerializer.readObject(in); |
| this.timeoutMs = in.readInt(); |
| this.ttl = in.readInt(); |
| this.idleTime = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetLoadRequestMessage for \"" + this.key |
| + "\" in region \"" + this.regionName + "\", processorId " + processorId; |
| } |
| |
| private void doLoad(ClusterDistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| final InitializationLevel oldLevel = |
| LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
| try { |
| InternalCache gfc = dm.getExistingCache(); |
| LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName); |
| if (region != null && region.isInitialized() |
| && (dm.cacheTimeMillis() - startTime < timeoutMs)) { |
| CacheLoader loader = ((AbstractRegion) region).basicGetLoader(); |
| if (loader != null) { |
| LoaderHelper loaderHelper = region.loaderHelperFactory.createLoaderHelper(this.key, |
| this.aCallbackArgument, false, false, null); |
| CachePerfStats stats = region.getCachePerfStats(); |
| long start = stats.startLoad(); |
| try { |
| Object o = loader.load(loaderHelper); |
| // no need to call convertPdxInstanceIfNeeded since we are serializing |
| // this into the NetLoadRequestMessage. The loaded object will be deserialized |
| // on the other side and have the correct form in that member. |
| Assert.assertTrue(o != Token.INVALID && o != Token.LOCAL_INVALID); |
| NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId, |
| o, dm, loaderHelper.getArgument(), null, false, false); |
| |
| } catch (Exception e) { |
| replyWithException(e, dm); |
| } finally { |
| stats.endLoad(start); |
| } |
| } else { |
| replyWithException(new TryAgainException("No loader defined"), |
| dm); |
| } |
| |
| } else { |
| replyWithException(new TryAgainException( |
| "Timeout expired or region not ready"), |
| dm); |
| } |
| |
| } catch (RegionDestroyedException rde) { |
| replyWithException(rde, dm); |
| |
| } catch (CancelException cce) { |
| replyWithException(cce, dm); |
| |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| replyWithException(new InternalGemFireException( |
| "Error processing request", |
| t), dm); |
| } finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| |
| } |
| |
| void replyWithException(Exception e, ClusterDistributionManager dm) { |
| NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId, null, dm, |
| this.aCallbackArgument, e, false, false); |
| } |
| } |
| |
| /** |
| * The NetLoadReplyMessage is a reply to a RequestMessage, and contains the object's value. |
| */ |
| public static class NetLoadReplyMessage extends HighPriorityDistributionMessage { |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** The object value being transferred */ |
| private Object result; |
| |
| /** Loader parameter returned to sender */ |
| private Object aCallbackArgument; |
| |
| /** Exception thrown by remote node */ |
| private Exception e; |
| |
| /** Is blob serialized? */ |
| private boolean isSerialized; |
| |
| /** ??? */ |
| private boolean requestorTimedOut; |
| |
| public NetLoadReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, int processorId, Object obj, |
| ClusterDistributionManager distributionManager, Object aCallbackArgument, Exception e, |
| boolean isSerialized, boolean requestorTimedOut) { |
| // create a message |
| NetLoadReplyMessage msg = new NetLoadReplyMessage(); |
| msg.initialize(processorId, obj, aCallbackArgument, e, isSerialized, requestorTimedOut); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| } |
| |
| private void initialize(int procId, Object obj, Object callbackArgument, Exception exe, |
| boolean isserialized, boolean requestorTimedout) { |
| this.processorId = procId; |
| this.result = obj; |
| this.e = exe; |
| this.aCallbackArgument = callbackArgument; |
| this.isSerialized = isserialized; |
| this.requestorTimedOut = requestorTimedout; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of the |
| * NetLoadRequestMessage. This concludes the net request, by communicating an object value. |
| */ |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetLoadReplyMessage() SearchLoadAndWriteProcessor no longer exists"); |
| } |
| return; |
| } |
| processor.incomingNetLoadReply(this.result, 0, this.aCallbackArgument, this.e, |
| this.isSerialized, this.requestorTimedOut); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return NET_LOAD_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeInt(this.processorId); |
| boolean isSerialized = this.isSerialized; |
| if (result instanceof byte[]) { |
| DataSerializer.writeByteArray((byte[]) this.result, out); |
| } else { |
| DataSerializer.writeObjectAsByteArray(this.result, out); |
| isSerialized = true; |
| } |
| DataSerializer.writeObject(this.aCallbackArgument, out); |
| DataSerializer.writeObject(this.e, out); |
| out.writeBoolean(isSerialized); |
| out.writeBoolean(this.requestorTimedOut); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.processorId = in.readInt(); |
| this.result = DataSerializer.readByteArray(in); |
| this.aCallbackArgument = DataSerializer.readObject(in); |
| this.e = (Exception) DataSerializer.readObject(in); |
| this.isSerialized = in.readBoolean(); |
| this.requestorTimedOut = in.readBoolean(); |
| |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetLoadReplyMessage for processorId " + processorId |
| + ", blob is " + this.result; |
| } |
| |
| } |
| |
| /********************* NetWriteRequestMessage *******************************/ |
| |
| public static class NetWriteRequestMessage extends PooledDistributionMessage { |
| |
| /** |
| * The object id of the processor object on the initiator node. This will be communicated back |
| * in the response to enable transferring the result to the initiating VM. |
| */ |
| private int processorId; |
| |
| /** The fully qualified name of the Region */ |
| private String regionName; |
| |
| /** The event being sent over to the remote writer */ |
| CacheEvent event; |
| |
| private int timeoutMs; |
| /** Action requested by sender */ |
| int action; |
| |
| public NetWriteRequestMessage() {} |
| |
| /** |
| * Using a new or pooled message instance, create and send the request for object value to the |
| * specified node. |
| */ |
| public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName, |
| int timeoutMs, CacheEvent event, Set recipients, int action) { |
| |
| NetWriteRequestMessage msg = new NetWriteRequestMessage(); |
| msg.initialize(processor, regionName, timeoutMs, event, action); |
| msg.setRecipients(recipients); |
| processor.distributionManager.putOutgoing(msg); |
| |
| } |
| |
| private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName, |
| int timeoutMS, CacheEvent theEvent, int actionType) { |
| this.processorId = processor.processorId; |
| this.regionName = theRegionName; |
| this.timeoutMs = timeoutMS; |
| this.event = theEvent; |
| this.action = actionType; |
| Assert.assertTrue(processor.region.getScope().isDistributed()); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return NET_WRITE_REQUEST_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeInt(this.processorId); |
| out.writeUTF(this.regionName); |
| out.writeInt(this.timeoutMs); |
| DataSerializer.writeObject(this.event, out); |
| out.writeInt(this.action); |
| |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.processorId = in.readInt(); |
| this.regionName = in.readUTF(); |
| this.timeoutMs = in.readInt(); |
| this.event = (CacheEvent) DataSerializer.readObject(in); |
| this.action = in.readInt(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetWriteRequestMessage " + " for region \"" |
| + this.regionName + "\", processorId " + processorId; |
| } |
| |
| /** Invoked on the node that has the object */ |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| long startTime = dm.cacheTimeMillis(); |
| final InitializationLevel oldLevel = |
| LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE); |
| try { |
| InternalCache gfc = dm.getExistingCache(); |
| LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName); |
| if (region != null && region.isInitialized() |
| && (dm.cacheTimeMillis() - startTime < timeoutMs)) { |
| CacheWriter writer = region.basicGetWriter(); |
| EntryEventImpl entryEvtImpl = null; |
| RegionEventImpl regionEvtImpl = null; |
| if (this.event instanceof EntryEventImpl) { |
| entryEvtImpl = (EntryEventImpl) this.event; |
| entryEvtImpl.setRegion(region); |
| Operation op = entryEvtImpl.getOperation(); |
| if (op == Operation.REPLACE) { |
| entryEvtImpl.setOperation(Operation.UPDATE); |
| } else if (op == Operation.PUT_IF_ABSENT) { |
| entryEvtImpl.setOperation(Operation.CREATE); |
| } else if (op == Operation.REMOVE) { |
| entryEvtImpl.setOperation(Operation.DESTROY); |
| } |
| // [bruce] even if this event was transmitted from another VM, its operation may have |
| // originated in this VM. PartitionedRegion.put() with a cacheWriter is one |
| // situation where this can occur |
| entryEvtImpl.setOriginRemote(event.getDistributedMember() == null |
| || !event.getDistributedMember().equals(dm.getDistributionManagerId())); |
| } else if (this.event instanceof RegionEventImpl) { |
| regionEvtImpl = (RegionEventImpl) this.event; |
| regionEvtImpl.region = region; |
| regionEvtImpl.originRemote = true; |
| } |
| |
| if (writer != null) { |
| if (entryEvtImpl != null) { |
| entryEvtImpl.setReadOldValueFromDisk(true); |
| } |
| try { |
| switch (action) { |
| case BEFORECREATE: |
| writer.beforeCreate(entryEvtImpl); |
| break; |
| case BEFOREDESTROY: |
| writer.beforeDestroy(entryEvtImpl); |
| break; |
| case BEFOREUPDATE: |
| writer.beforeUpdate(entryEvtImpl); |
| break; |
| case BEFOREREGIONDESTROY: |
| writer.beforeRegionDestroy(regionEvtImpl); |
| break; |
| case BEFOREREGIONCLEAR: |
| writer.beforeRegionClear(regionEvtImpl); |
| break; |
| default: |
| break; |
| |
| } |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, |
| dm, true, null, false); |
| } catch (CacheWriterException cwe) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, |
| dm, false, cwe, true); |
| } catch (Exception e) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, |
| dm, false, e, false); |
| } finally { |
| if (entryEvtImpl != null) { |
| entryEvtImpl.setReadOldValueFromDisk(false); |
| } |
| } |
| |
| } else { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, |
| dm, false, |
| new TryAgainException( |
| "No cachewriter defined"), |
| true); |
| } |
| |
| } else { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm, |
| false, |
| new TryAgainException("Timeout expired or region not ready"), |
| true); |
| |
| } |
| } catch (RegionDestroyedException ignore) { |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm, |
| false, null, false); |
| |
| } catch (DistributedSystemDisconnectedException e) { |
| // shutdown condition |
| throw e; |
| } catch (CancelException cce) { |
| dm.getCancelCriterion().checkCancelInProgress(cce); // TODO anyway to find the region or |
| // cache here? |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm, |
| false, null, false); |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm, |
| false, |
| new InternalGemFireException( |
| "Error processing request", |
| t), |
| true); |
| } finally { |
| LocalRegion.setThreadInitLevelRequirement(oldLevel); |
| } |
| } |
| } |
| |
| /** |
| * The NetWriteReplyMessage is a reply to a NetWriteRequestMessage, and contains the success code |
| * or exception that is propagated back to the requestor |
| */ |
| public static class NetWriteReplyMessage extends HighPriorityDistributionMessage { |
| |
| /** The gemfire id of the SearchLoadAndWrite object waiting for response */ |
| private int processorId; |
| |
| /** Indicates whether the write succeeded */ |
| private boolean netWriteSucceeded; |
| |
| |
| /** Exception thrown by remote node */ |
| private Exception e; |
| |
| /** Is the exception a cacheLoaderException */ |
| private boolean cacheWriterException; |
| |
| public NetWriteReplyMessage() {} |
| |
| public static void sendMessage(InternalDistributedMember recipient, int processorId, |
| ClusterDistributionManager distributionManager, boolean netWriteSucceeded, Exception e, |
| boolean cacheWriterException) { |
| // create a message |
| NetWriteReplyMessage msg = new NetWriteReplyMessage(); |
| msg.initialize(processorId, netWriteSucceeded, e, cacheWriterException); |
| msg.setRecipient(recipient); |
| distributionManager.putOutgoing(msg); |
| } |
| |
| private void initialize(int procId, boolean netwriteSucceeded, Exception except, |
| boolean cacheWriterExcept) { |
| this.processorId = procId; |
| this.netWriteSucceeded = netwriteSucceeded; |
| this.e = except; |
| this.cacheWriterException = cacheWriterExcept; |
| } |
| |
| /** |
| * Invoked on the receiver - which, in this case, was the initiator of the |
| * NetWriteRequestMessage. This concludes the net write request, by communicating an object |
| * value. |
| */ |
| @Override |
| protected void process(ClusterDistributionManager dm) { |
| SearchLoadAndWriteProcessor processor = null; |
| processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId); |
| if (processor == null) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("NetWriteReplyMessage() SearchLoadAndWriteProcessor no longer exists"); |
| } |
| return; |
| } |
| processor.incomingNetWriteReply(this.netWriteSucceeded, this.e, this.cacheWriterException); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return NET_WRITE_REPLY_MESSAGE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeInt(this.processorId); |
| out.writeBoolean(this.netWriteSucceeded); |
| DataSerializer.writeObject(this.e, out); |
| out.writeBoolean(this.cacheWriterException); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| this.processorId = in.readInt(); |
| this.netWriteSucceeded = in.readBoolean(); |
| this.e = (Exception) DataSerializer.readObject(in); |
| this.cacheWriterException = in.readBoolean(); |
| } |
| |
| @Override |
| public String toString() { |
| return "SearchLoadAndWriteProcessor.NetWriteReplyMessage for processorId " + processorId; |
| } |
| } |
| } |