blob: 8164ef8795736d3b30bf042726e149d6dc762cf3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.BEFORE_INITIAL_IMAGE;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.GemFireException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheLoader;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.CacheWriter;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.LoaderHelper;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.util.ObjectSizer;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.PooledDistributionMessage;
import org.apache.geode.distributed.internal.ProcessorKeeper21;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.SerialDistributionMessage;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.versions.DiskVersionTag;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.offheap.Releasable;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Implementation for distributed search, load and write operations in the GemFire system. Provides
* an API for doing netSearch, netLoad, netSearchAndLoad and netWrite. The class uses the
* DistributionAdvisor to route requests to the appropriate members. It also uses the
* DistributionAdvisor to get region scope and applies rules based on the scope. It makes uses of
* intelligent acceptors that allow netSearch to happen as a one phase operation at all
* times.netLoad happens as a one phase operation in all cases except where the scope is GLOBAL At
* the receiving end, the request is converted into an appropriate message whose process method
* responds to the request.
*/
public class SearchLoadAndWriteProcessor implements MembershipListener {
private static final Logger logger = LogService.getLogger();
public static final int SMALL_BLOB_SIZE =
Integer.getInteger("DistributionManager.OptimizedUpdateByteLimit", 2000);
static final long RETRY_TIME =
Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "search-retry-interval", 2000);
private volatile InternalDistributedMember selectedNode;
private boolean selectedNodeDead = false;
private int timeout;
private boolean netSearchDone = false;
private boolean netLoadDone = false;
private long lastModified = 0;
protected int processorId;
private Object aCallbackArgument;
private String regionName;
private Object key;
protected LocalRegion region;
private Object result = null;
private boolean isSerialized = false; // is result serialized?
private CacheDistributionAdvisor advisor = null;
protected Exception remoteException = null;
public DistributionManager distributionManager = null;
private volatile boolean requestInProgress = false;
private boolean remoteGetInProgress = false;
private volatile boolean authorative = false;
/** remoteLoadInProgress is volatile to make sure response threads see the current value */
private volatile boolean remoteLoadInProgress = false;
@MakeNotStatic
private static final ProcessorKeeper21 processorKeeper = new ProcessorKeeper21(false);
// private static Set availableAcceptHelperSet = new HashSet();
/** The members that haven't replied yet */
// changed pendingResponders to synchronized Set to fix bug 38972
private final Set pendingResponders = Collections.synchronizedSet(new HashSet());
private List responseQueue = null;
private boolean netWriteSucceeded = false;
private int remainingTimeout = 0;
private long startTimeSnapShot = 0;
private long endTimeSnapShot = 0;
private boolean netSearch = false;
private boolean netLoad = false;
private boolean attemptedLocalLoad = false; // added for bug 39738
private boolean localLoad = false;
private boolean localWrite = false;
private boolean netWrite = false;
private final Object membersLock = new Object();
private ArrayList<InternalDistributedMember> departedMembers;
private Lock lock = null; // if non-null, then needs to be unlocked in release
static final int NETSEARCH = 0;
static final int NETLOAD = 1;
static final int NETWRITE = 2;
static final int BEFORECREATE = 0;
static final int BEFOREDESTROY = 1;
static final int BEFOREUPDATE = 2;
static final int BEFOREREGIONDESTROY = 3;
static final int BEFOREREGIONCLEAR = 4;
/************** Public Methods ************************/
Object doNetSearch() throws TimeoutException {
resetResults();
RegionAttributes attrs = region.getAttributes();
this.requestInProgress = true;
Scope scope = attrs.getScope();
Assert.assertTrue(scope != Scope.LOCAL);
netSearchForBlob();
this.requestInProgress = false;
return this.result;
}
void doSearchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue,
boolean preferCD) throws CacheLoaderException, TimeoutException {
this.requestInProgress = true;
RegionAttributes attrs = region.getAttributes();
Scope scope = attrs.getScope();
CacheLoader loader = ((AbstractRegion) region).basicGetLoader();
if (scope.isLocal()) {
Object obj = doLocalLoad(loader, false, preferCD);
event.setNewValue(obj);
} else {
searchAndLoad(event, txState, localValue, preferCD);
}
this.requestInProgress = false;
if (this.netSearch) {
if (event.getOperation().isCreate()) {
event.setOperation(Operation.SEARCH_CREATE);
} else {
event.setOperation(Operation.SEARCH_UPDATE);
}
} else if (this.netLoad) {
if (event.getOperation().isCreate()) {
event.setOperation(Operation.NET_LOAD_CREATE);
} else {
event.setOperation(Operation.NET_LOAD_UPDATE);
}
} else if (this.localLoad) {
if (event.getOperation().isCreate()) {
event.setOperation(Operation.LOCAL_LOAD_CREATE);
} else {
event.setOperation(Operation.LOCAL_LOAD_UPDATE);
}
}
}
/** return true if a CacheWriter was actually invoked */
boolean doNetWrite(CacheEvent event, Set netWriteRecipients, CacheWriter localWriter, int paction)
throws CacheWriterException, TimeoutException {
int action = paction;
this.requestInProgress = true;
Scope scope = this.region.getScope();
if (localWriter != null) {
doLocalWrite(localWriter, event, action);
this.requestInProgress = false;
return true;
}
if (scope == Scope.LOCAL && (region.getPartitionAttributes() == null)) {
return false;
}
@Released
CacheEvent listenerEvent = getEventForListener(event);
try {
if (action == BEFOREUPDATE && listenerEvent.getOperation().isCreate()) {
action = BEFORECREATE;
}
boolean cacheWrote = netWrite(listenerEvent, action, netWriteRecipients);
this.requestInProgress = false;
return cacheWrote;
} finally {
if (event != listenerEvent) {
if (listenerEvent instanceof EntryEventImpl) {
((Releasable) listenerEvent).release();
}
}
}
}
@Override
public void memberJoined(DistributionManager distributionManager, InternalDistributedMember id) {
// Ignore - if they just joined, they don't have what we want
}
@Override
public void memberSuspect(DistributionManager distributionManager, InternalDistributedMember id,
InternalDistributedMember whoSuspected, String reason) {}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
@Override
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, final boolean crashed) {
synchronized (membersLock) {
pendingResponders.remove(id);
}
synchronized (this) {
if (id.equals(selectedNode) && (this.requestInProgress) && (this.remoteGetInProgress)) {
if (departedMembers == null) {
departedMembers = new ArrayList<InternalDistributedMember>();
}
departedMembers.add(id);
selectedNode = null;
selectedNodeDead = true;
computeRemainingTimeout();
if (logger.isDebugEnabled()) {
logger.debug("{}: processing loss of member {}", this, id);
}
this.lastNotifySpot = 3;
notifyAll(); // signal the waiter; we are not done; but we need the waiter to call
// sendNetSearchRequest
}
if (responseQueue != null) {
responseQueue.remove(id);
}
checkIfDone();
}
}
int getProcessorId() {
return this.processorId;
}
synchronized void checkIfDone() {
if (!this.remoteGetInProgress && this.pendingResponders.isEmpty()) {
// Synchronize in case a different response/reply is still
// in progress, and it's the one thats got the goods (bug 28741)
signalDone();
}
}
synchronized void signalDone() {
this.requestInProgress = false;
this.lastNotifySpot = 1;
notifyAll();
}
synchronized void signalTimedOut() {
this.lastNotifySpot = 2;
notifyAll();
}
private volatile int lastNotifySpot = 0;
private VersionTag versionTag;
synchronized void nackResponseComplete() {
/*
* if (this.requestInProgress && currentHelper != null && optimizer != null &&
* optimizer.allRepliesReceived(currentHelper.nackServerChannel)) { this.requestInProgress =
* false; signalDone(); }
*/
}
static ProcessorKeeper21 getProcessorKeeper() {
return processorKeeper;
}
boolean isNetSearch() {
return netSearch;
}
boolean isNetLoad() {
return netLoad;
}
boolean isLocalLoad() {
return localLoad;
}
boolean isNetWrite() {
return netWrite;
}
boolean isLocalWrite() {
return localWrite;
}
long getLastModified() {
return lastModified;
}
boolean resultIsSerialized() {
return this.isSerialized;
}
static SearchLoadAndWriteProcessor getProcessor() {
SearchLoadAndWriteProcessor processor = new SearchLoadAndWriteProcessor();
processor.processorId = getProcessorKeeper().put(processor);
return processor;
}
void release() {
try {
if (this.lock != null) {
try {
this.lock.unlock();
} catch (CancelException ignore) {
}
this.lock = null;
}
} finally {
try {
if (this.advisor != null) {
this.advisor.removeMembershipListener(this);
}
} catch (IllegalArgumentException ignore) {
} finally {
getProcessorKeeper().remove(this.processorId);
}
}
}
void remove() {
getProcessorKeeper().remove(this.processorId);
}
void initialize(LocalRegion theRegion, Object theKey, Object theCallbackArg) {
this.region = theRegion;
this.regionName = theRegion.getFullPath();
this.key = theKey;
this.aCallbackArgument = theCallbackArg;
RegionAttributes attrs = theRegion.getAttributes();
Scope scope = attrs.getScope();
if (scope.isDistributed()) {
this.advisor = ((CacheDistributionAdvisee) this.region).getCacheDistributionAdvisor();
this.distributionManager = theRegion.getDistributionManager();
this.timeout = getSearchTimeout();
this.advisor.addMembershipListener(this);
}
}
void setKey(Object key) {
this.key = key;
}
protected void setSelectedNode(InternalDistributedMember selectedNode) {
this.selectedNode = selectedNode;
this.selectedNodeDead = false;
}
protected int getTimeout() {
return this.timeout;
}
protected Object getKey() {
return this.key;
}
InternalDistributedMember getSelectedNode() {
return this.selectedNode;
}
/**
* Even though SearchLoadAndWriteProcessor may be in invoked in the context of a local region,
* most of the services it provides are relevant to distribution only. The 3 services it provides
* are netSearch, netLoad, netWrite
*/
private SearchLoadAndWriteProcessor() {
resetResults();
this.pendingResponders.clear();
this.attemptedLocalLoad = false;
this.netSearchDone = false;
this.isSerialized = false;
this.result = null;
this.key = null;
this.requestInProgress = false;
this.netWriteSucceeded = false;
this.remoteGetInProgress = false;
this.responseQueue = null;
}
/**
* If we have a local cache loader and the region is not global, then invoke the loader If the
* region is local, or the result is non-null, then return whatever the loader returned do a
* netSearch amongst selected peers if netSearch returns a blob, deserialize the blob and return
* that as the result netSearch failed, so all we can do at this point is do a load return result
* from load
*
*/
private void searchAndLoad(EntryEventImpl event, TXStateInterface txState, Object localValue,
boolean preferCD) throws CacheLoaderException, TimeoutException {
RegionAttributes attrs = region.getAttributes();
Scope scope = attrs.getScope();
DataPolicy dataPolicy = attrs.getDataPolicy();
if (txState != null) {
TXEntryState tx = txState.txReadEntry(event.getKeyInfo(), region, false,
true/* create txEntry is absent */);
if (tx != null) {
if (tx.noValueInSystem()) {
// If the tx view has it invalid or destroyed everywhere
// then don't do a netsearch. We want to see the
// transactional view.
load(event, preferCD);
return;
}
}
}
// if mirrored then we can optimize by skipping netsearch in some cases,
// and if also skip netSearch if we find an INVALID token since we
// know it was distributed. (Otherwise it would be a LOCAL_INVALID token)
{
if (localValue == Token.INVALID || dataPolicy.withReplication()) {
load(event, preferCD);
return;
}
}
Object obj = null;
if (!scope.isGlobal()) {
// copy into local var to prevent race condition
CacheLoader loader = ((AbstractRegion) region).basicGetLoader();
if (loader != null) {
obj = doLocalLoad(loader, true, preferCD);
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
event.setNewValue(obj);
this.isSerialized = false;
this.result = obj;
return;
}
if (scope.isLocal()) {
return;
}
}
netSearchForBlob();
if (this.result != null) {
Assert.assertTrue(this.result != Token.INVALID && this.result != Token.LOCAL_INVALID);
if (this.isSerialized) {
event.setSerializedNewValue((byte[]) this.result);
} else {
event.setNewValue(this.result);
}
event.setVersionTag(this.versionTag);
return;
}
load(event, preferCD);
}
/** perform a net-search, setting this.result to the object found in the search */
private void netSearchForBlob() throws TimeoutException {
if (this.netSearchDone)
return;
this.netSearchDone = true;
CachePerfStats stats = region.getCachePerfStats();
long start = 0;
Set sendSet = null;
this.result = null;
RegionAttributes attrs = region.getAttributes();
// Object aCallbackArgument = null;
this.requestInProgress = true;
this.selectedNodeDead = false;
initRemainingTimeout();
start = stats.startNetsearch();
try {
List<InternalDistributedMember> replicates =
new ArrayList(advisor.adviseInitializedReplicates());
if (replicates.size() > 1) {
Collections.shuffle(replicates);
}
for (InternalDistributedMember replicate : replicates) {
synchronized (this.pendingResponders) {
this.pendingResponders.clear();
}
synchronized (this) {
this.requestInProgress = true;
this.remoteGetInProgress = true;
setSelectedNode(replicate);
this.lastNotifySpot = 0;
sendValueRequest(replicate);
waitForObject2(this.remainingTimeout);
if (this.authorative) {
if (this.result != null) {
this.netSearch = true;
}
return;
} else {
// clear anything that might have been set by our query.
this.selectedNode = null;
this.selectedNodeDead = false;
this.lastNotifySpot = 0;
this.result = null;
}
}
}
synchronized (membersLock) {
Set recipients = this.advisor.adviseNetSearch();
if (recipients.isEmpty()) {
return;
}
ArrayList list = new ArrayList(recipients);
Collections.shuffle(list);
sendSet = new HashSet(list);
synchronized (this.pendingResponders) {
this.pendingResponders.clear();
this.pendingResponders.addAll(list);
}
}
boolean useMulticast = region.getMulticastEnabled() && (region instanceof DistributedRegion)
&& ((DistributedRegion) region).getSystem().getConfig().getMcastPort() != 0;
// moved outside the sync to fix bug 39458
QueryMessage.sendMessage(this, this.regionName, this.key, useMulticast, sendSet,
this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(),
attrs.getEntryIdleTimeout().getTimeout());
synchronized (this) {
// moved this send back into sync to fix bug 37132
// QueryMessage.sendMessage(this, this.regionName,this.key,useMulticast,
// sendSet,this.remainingTimeout ,
// attrs.getEntryTimeToLive().getTimeout(),
// attrs.getEntryIdleTimeout().getTimeout());
boolean done = false;
do {
waitForObject2(this.remainingTimeout);
if (this.selectedNodeDead && remoteGetInProgress) {
sendNetSearchRequest();
} else
done = true;
} while (!done);
if (this.result != null) {
this.netSearch = true;
}
return;
}
} finally {
stats.endNetsearch(start);
}
}
private void load(EntryEventImpl event, boolean preferCD)
throws CacheLoaderException, TimeoutException {
Object obj = null;
RegionAttributes attrs = this.region.getAttributes();
Scope scope = attrs.getScope();
CacheLoader loader = ((AbstractRegion) region).basicGetLoader();
Assert.assertTrue(scope.isDistributed());
if ((loader != null) && (!scope.isGlobal())) {
obj = doLocalLoad(loader, false, preferCD);
event.setNewValue(obj);
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
return;
}
if (scope.isGlobal()) {
Assert.assertTrue(this.lock == null);
Set loadCandidatesSet = advisor.adviseNetLoad();
if ((loader == null) && (loadCandidatesSet.isEmpty())) {
// no one has a data Loader. No point getting a lock
return;
}
this.lock = region.getDistributedLock(this.key);
boolean locked = false;
try {
final CancelCriterion stopper = region.getCancelCriterion();
for (;;) {
stopper.checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
locked = this.lock.tryLock(region.getCache().getLockTimeout(), TimeUnit.SECONDS);
if (!locked) {
throw new TimeoutException(
String.format("Timed out locking %s before load",
key));
}
break;
} catch (InterruptedException ignore) {
interrupted = true;
region.getCancelCriterion().checkCancelInProgress(null);
// continue;
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
if (loader == null) {
this.localLoad = false;
if (scope.isDistributed()) {
this.isSerialized = false;
obj = doNetLoad();
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
if (this.isSerialized && obj != null) {
event.setSerializedNewValue((byte[]) obj);
} else {
event.setNewValue(obj);
}
}
} else {
obj = doLocalLoad(loader, false, preferCD);
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
event.setNewValue(obj);
}
return;
} finally {
// The lock will not actually be released until release is
// called on this processor
if (!locked)
this.lock = null;
}
}
if (scope.isDistributed()) {
// long start = System.currentTimeMillis();
obj = doNetLoad();
if (this.isSerialized && obj != null) {
event.setSerializedNewValue((byte[]) obj);
} else {
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
event.setNewValue(obj);
}
// long end = System.currentTimeMillis();
}
}
Object doNetLoad() throws CacheLoaderException, TimeoutException {
if (this.netLoadDone)
return null;
this.netLoadDone = true;
if (advisor != null) {
Set loadCandidatesSet = advisor.adviseNetLoad();
if (loadCandidatesSet.isEmpty()) {
return null;
}
CachePerfStats stats = region.getCachePerfStats();
long start = stats.startNetload();
ArrayList list = new ArrayList(loadCandidatesSet);
Collections.shuffle(list);
InternalDistributedMember[] loadCandidates =
(InternalDistributedMember[]) list.toArray(new InternalDistributedMember[0]);
initRemainingTimeout();
RegionAttributes attrs = region.getAttributes();
int index = 0;
boolean stayInLoop = false; // never set to true!
this.remoteLoadInProgress = true;
try {
do { // the only time this loop repeats is when continue is called
InternalDistributedMember next = loadCandidates[index++];
setSelectedNode(next);
this.lastNotifySpot = 0;
this.requestInProgress = true;
if (this.remainingTimeout <= 0) { // @todo this looks wrong; why not a timeout exception?
break;
}
this.remoteException = null;
NetLoadRequestMessage.sendMessage(this, this.regionName, this.key, this.aCallbackArgument,
next, this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(),
attrs.getEntryIdleTimeout().getTimeout());
waitForObject2(this.remainingTimeout);
if (this.remoteException == null) {
if (!this.requestInProgress) {
// note even if result is null we are done; see bug 39738
this.localLoad = false;
if (this.result != null) {
this.netLoad = true;
}
return this.result;
} else {
// Why does the following test for selectedNodeDead?
// Seems like this will cause us to quit trying netLoad
// even if we don't have a result yet and have not tried everyone.
if ((this.selectedNodeDead) && (index < loadCandidates.length)) {
continue;
}
// otherwise we are done
}
} else {
Throwable cause;
if (this.remoteException instanceof TryAgainException) {
if (index < loadCandidates.length) {
continue;
} else {
break;
}
}
if (this.remoteException instanceof CacheLoaderException) {
cause = this.remoteException.getCause();
} else {
cause = this.remoteException;
}
throw new CacheLoaderException(
String.format("While invoking a remote netLoad: %s",
cause),
cause);
}
} while (stayInLoop);
} finally {
stats.endNetload(start);
}
}
return null;
}
/**
* This exception is just used in the class to tell the caller that it should try again.
* InternalGemFireException used to be used for this which seems dangerous.
*/
private static class TryAgainException extends GemFireException {
////////////////////// Constructors //////////////////////
public TryAgainException() {
super();
}
/**
* Creates a new <code>TryAgainException</code>.
*/
public TryAgainException(String message) {
super(message);
}
}
private Object doLocalLoad(CacheLoader loader, boolean netSearchAllowed, boolean preferCD)
throws CacheLoaderException {
Object obj = null;
if (loader != null && !this.attemptedLocalLoad) {
this.attemptedLocalLoad = true;
CachePerfStats stats = region.getCachePerfStats();
LoaderHelper loaderHelper = this.region.loaderHelperFactory.createLoaderHelper(this.key,
this.aCallbackArgument, netSearchAllowed, true /* netLoadAllowed */, this);
long statStart = stats.startLoad();
try {
obj = loader.load(loaderHelper);
obj = this.region.getCache().convertPdxInstanceIfNeeded(obj, preferCD);
} finally {
stats.endLoad(statStart);
}
if (obj != null) {
this.localLoad = true;
}
}
return obj;
}
/**
* Returns an event for listener notification. The event's operation may be altered to conform to
* the ConcurrentMap implementation specification. If the returned value is not == to the event
* parameter then the caller is responsible for releasing it.
*
* @param event the original event
* @return the original event or a new event having a change in operation
*/
@Retained
private CacheEvent getEventForListener(CacheEvent event) {
Operation op = event.getOperation();
if (!op.isEntry()) {
return event;
} else {
EntryEventImpl r = (EntryEventImpl) event;
@Retained
EntryEventImpl result = r;
if (r.isSingleHop()) {
// fix for bug #46130 - origin remote incorrect for one-hop operation in receiver
result = new EntryEventImpl(r);
result.setOriginRemote(true);
// if this is from a non-replicate and it's not in a tx it should be seen as a Create
// because that's what the sender would use in notifying listeners. bug #46955
if (result.getOperation().isUpdate() && (result.getTransactionId() == null)) {
result.makeCreate();
}
}
if (op == Operation.REPLACE) {
if (result == r)
result = new EntryEventImpl(r);
result.setOperation(Operation.UPDATE);
} else if (op == Operation.PUT_IF_ABSENT) {
if (result == r)
result = new EntryEventImpl(r);
result.setOperation(Operation.CREATE);
} else if (op == Operation.REMOVE) {
if (result == r)
result = new EntryEventImpl(r);
result.setOperation(Operation.DESTROY);
}
return result;
}
}
private boolean doLocalWrite(CacheWriter writer, CacheEvent pevent, int paction)
throws CacheWriterException {
// Return if the inhibit all notifications flag is set
if (pevent instanceof EntryEventImpl) {
if (((EntryEventImpl) pevent).inhibitAllNotifications()) {
if (logger.isDebugEnabled()) {
logger.debug("Notification inhibited for key {}", pevent);
}
return false;
}
}
@Released
CacheEvent event = getEventForListener(pevent);
int action = paction;
if (event.getOperation().isCreate() && action == BEFOREUPDATE) {
action = BEFORECREATE;
}
if (event instanceof EntryEventImpl) {
((EntryEventImpl) event).setReadOldValueFromDisk(true);
}
try {
switch (action) {
case BEFORECREATE:
writer.beforeCreate((EntryEvent) event);
break;
case BEFOREDESTROY:
writer.beforeDestroy((EntryEvent) event);
break;
case BEFOREUPDATE:
writer.beforeUpdate((EntryEvent) event);
break;
case BEFOREREGIONDESTROY:
writer.beforeRegionDestroy((RegionEvent) event);
break;
case BEFOREREGIONCLEAR:
writer.beforeRegionClear((RegionEvent) event);
break;
default:
break;
}
} finally {
if (event instanceof EntryEventImpl) {
((EntryEventImpl) event).setReadOldValueFromDisk(false);
}
if (event != pevent) {
if (event instanceof EntryEventImpl) {
((Releasable) event).release();
}
}
}
this.localWrite = true;
return true;
}
/** Return true if cache writer was invoked */
private boolean netWrite(CacheEvent event, int action, Set writeCandidateSet)
throws CacheWriterException, TimeoutException {
if (writeCandidateSet == null || writeCandidateSet.isEmpty()) {
return false;
}
ArrayList list = new ArrayList(writeCandidateSet);
Collections.shuffle(list);
InternalDistributedMember[] writeCandidates =
(InternalDistributedMember[]) list.toArray(new InternalDistributedMember[0]);
initRemainingTimeout();
int index = 0;
do {
InternalDistributedMember next = writeCandidates[index++];
Set set = new HashSet();
set.add(next);
this.netWriteSucceeded = false;
this.requestInProgress = true;
this.remoteException = null;
NetWriteRequestMessage.sendMessage(this, this.regionName, this.remainingTimeout, event, set,
action);
if (this.remainingTimeout <= 0) { // @todo: should this throw a timeout exception?
break;
}
waitForObject2(this.remainingTimeout);
if (this.netWriteSucceeded) {
this.netWrite = true;
break;
}
if (this.remoteException != null) {
Throwable cause;
if (this.remoteException instanceof TryAgainException) {
if (index < writeCandidates.length) {
continue;
} else {
break;
}
}
if (this.remoteException instanceof CacheWriterException
&& this.remoteException.getCause() != null) {
cause = this.remoteException.getCause();
} else {
cause = this.remoteException;
}
throw new CacheWriterException(
String.format("While invoking a remote netWrite: %s",
cause),
cause);
}
} while (index < writeCandidates.length);
return this.netWriteSucceeded;
}
/**
* process a QueryMessage netsearch response
*
* @param versionTag TODO
*/
protected synchronized void incomingResponse(Object obj, long lastModifiedTime, boolean isPresent,
boolean serialized, final boolean requestorTimedOut, final InternalDistributedMember sender,
ClusterDistributionManager dm, VersionTag versionTag) {
// NOTE: keep this method efficient since it is optimized
// by executing it in the p2p reader.
// This is done with this line in DistributionMessage.java:
// || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class)
// bug 35266 - don't pay attention to late breaking "get" responses
if (this.remoteLoadInProgress) {
if (logger.isDebugEnabled()) {
logger.debug("Ignoring netsearch response from {} because we're now doing a netload",
sender);
}
return;
}
if (this.pendingResponders.isEmpty()) {
return;
}
if (!this.pendingResponders.remove(sender)) {
return;
} else {
if (logger.isDebugEnabled()) {
// only log the message if it's from a recipient we're waiting for.
// it could have been multicast to all members, which would give us
// one response per member
logger.debug(
"Processing response for processorId={}, isPresent is {}, sender is {}, key is {}, value is {}, version is {}",
this.processorId, isPresent, sender, this.key, serialized, versionTag);
}
}
// Another thread got a response and that contained the value.
// Ignore this response.
if (this.result != null) {
return;
}
if (isPresent) {
if (obj != null) {
Assert.assertTrue(obj != Token.INVALID && obj != Token.LOCAL_INVALID);
synchronized (this) {
this.result = obj;
this.lastModified = lastModifiedTime;
this.isSerialized = serialized;
this.versionTag = versionTag;
signalDone();
return;
}
} else {
if (!remoteGetInProgress) {
// send a message to this responder asking for the value
// do this on the waiting pool in case the send blocks
try {
dm.getExecutors().getWaitingThreadPool().execute(new Runnable() {
@Override
public void run() {
sendValueRequest(sender);
}
});
// need to do this here before releasing sync to fix bug 37132
this.requestInProgress = true;
this.remoteGetInProgress = true;
setSelectedNode(sender);
return; // sendValueRequest does the rest of the work
} catch (RejectedExecutionException ignore) {
// just fall through since we must be shutting down.
}
}
if (responseQueue == null)
responseQueue = new LinkedList();
if (logger.isDebugEnabled()) {
logger.debug("Saving isPresent response, requestInProgress {}", sender);
}
responseQueue.add(sender);
}
}
if (requestorTimedOut) {
signalTimedOut();
}
boolean endRequest = false;
if (this.pendingResponders.isEmpty() && (!remoteGetInProgress)) {
endRequest = true;
}
if (endRequest) {
signalDone();
}
}
protected synchronized void sendValueRequest(final InternalDistributedMember sender) {
// send a message to this responder asking for the value
// do this on the waiting pool in case the send blocks
// Always attempt to send the message to fix bug 37149
RegionAttributes attrs = this.region.getAttributes();
NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, sender,
this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(),
attrs.getEntryIdleTimeout().getTimeout());
// if it turns out that we can't send a message to this member then
// our membership listener should save the day and schedule a send
// to someone else or give up and report a failed search.
}
// @todo creation times need to be handled properly
/**
* This is the response from the accepted responder. Grab the result and store it.Unlike 2.0 where
* the the response was a 2 phase operation, here it is a single phase operation.
*/
protected void incomingNetLoadReply(Object obj, long lastModifiedTime, Object callbackArg,
Exception e, boolean serialized, boolean requestorTimedOut) {
synchronized (this) {
if (requestorTimedOut) {
signalTimedOut();
return;
}
this.result = obj;
this.lastModified = lastModifiedTime;
this.remoteException = e;
this.aCallbackArgument = callbackArg;
computeRemainingTimeout();
this.isSerialized = serialized;
signalDone();
}
}
@SuppressWarnings("hiding")
protected synchronized void incomingNetSearchReply(byte[] value, long lastModifiedTime,
boolean serialized, boolean requestorTimedOut, boolean authorative, VersionTag versionTag,
InternalDistributedMember responder) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (departedMembers != null && departedMembers.contains(responder)) {
if (isDebugEnabled) {
logger.debug("ignore the reply received from a departed member");
}
return;
}
if (this.requestInProgress) {
if (requestorTimedOut) {
// Force a timeout exception.
if (isDebugEnabled) {
logger.debug("incomingNetSearchReply() - requestorTimedOut {}", this);
}
signalTimedOut();
}
computeRemainingTimeout();
if (value != null || authorative) {
synchronized (this) {
this.result = value;
this.lastModified = lastModifiedTime;
this.isSerialized = serialized;
this.remoteGetInProgress = false;
this.authorative = authorative;
this.versionTag = versionTag;
if (isDebugEnabled) {
logger.debug("incomingNetSearchReply() - got obj {}", this);
}
signalDone();
}
} else if (this.remainingTimeout <= 0) {
this.remoteGetInProgress = false;
if (isDebugEnabled) {
logger.debug("incomingNetSearchReply() - null obj, no more time {}", this);
}
signalDone(); // @todo: is this a bug? should call signalTimedOut?
} else {
if (isDebugEnabled) {
logger.debug("incomingNetSearchReply() - null obj, sendNetSearchRequest {}", this);
}
sendNetSearchRequest();
}
} else {
if (isDebugEnabled) {
logger.debug("incomingNetSearchReply() - requestInProgress is false {}", this);
}
// should we checkIfDone?
// sure why not?
checkIfDone();
}
}
/**
* Return the next responder on the responseQueue, or null if empty
*/
private InternalDistributedMember nextAppropriateResponder() {
if (responseQueue != null && responseQueue.size() > 0) {
return (InternalDistributedMember) responseQueue.remove(0);
} else {
return null;
}
}
private synchronized void sendNetSearchRequest() {
InternalDistributedMember nextResponder = nextAppropriateResponder();
if (nextResponder != null) {
// Make a request to the next responder in the queue
RegionAttributes attrs = this.region.getAttributes();
setSelectedNode(nextResponder);
this.requestInProgress = true;
this.remoteGetInProgress = true;
NetSearchRequestMessage.sendMessage(this, this.regionName, this.key, nextResponder,
this.remainingTimeout, attrs.getEntryTimeToLive().getTimeout(),
attrs.getEntryIdleTimeout().getTimeout());
} else {
this.remoteGetInProgress = false;
checkIfDone();
}
}
/**
* This is the response from the accepted responder.
*/
protected void incomingNetWriteReply(boolean netWriteSuccessful, Exception e, boolean exe) {
synchronized (this) {
this.remoteException = e;
this.netWriteSucceeded = netWriteSuccessful;
computeRemainingTimeout();
signalDone();
}
}
private synchronized void initRemainingTimeout() {
this.remainingTimeout = this.timeout * 1000;
this.startTimeSnapShot = this.distributionManager.cacheTimeMillis();
}
private synchronized void computeRemainingTimeout() {
if (this.startTimeSnapShot > 0) { // @todo this should be an assertion
this.endTimeSnapShot = this.distributionManager.cacheTimeMillis();
long delta = this.endTimeSnapShot - this.startTimeSnapShot;
if (delta > 0) {
this.remainingTimeout -= delta;
}
this.startTimeSnapShot = this.endTimeSnapShot;
}
}
private synchronized void waitForObject2(final int timeoutMs) throws TimeoutException {
if (this.requestInProgress) {
try {
final DistributionManager dm =
this.region.getCache().getInternalDistributedSystem().getDistributionManager();
long waitTimeMs = timeoutMs;
final long endTime = System.currentTimeMillis() + waitTimeMs;
for (;;) {
if (!this.requestInProgress) {
return;
}
if (waitTimeMs <= 0) {
throw new TimeoutException(
String.format(
"Timed out while doing netsearch/netload/netwrite processorId= %s Key is %s",
new Object[] {this.processorId, this.key}));
}
boolean interrupted = Thread.interrupted();
int lastNS = this.lastNotifySpot;
try {
{
boolean done = (lastNS != 0);
while (!done && waitTimeMs > 0) {
this.region.getCancelCriterion().checkCancelInProgress(null);
interrupted = Thread.interrupted() || interrupted;
long wt = Math.min(RETRY_TIME, waitTimeMs);
wait(wt); // spurious wakeup ok
lastNS = this.lastNotifySpot;
done = (lastNS != 0);
if (!done) {
// calc remaing wait time to fix bug 37196
waitTimeMs = endTime - System.currentTimeMillis();
}
} // while
if (done) {
this.lastNotifySpot = 0;
}
}
if (this.requestInProgress && !this.selectedNodeDead) {
// added the test of "!this.selectedNodeDead" for bug 37196
StringBuilder sb = new StringBuilder(200);
sb.append("processorId=").append(this.processorId);
sb.append(" Key is ").append(this.key);
sb.append(" searchTimeoutMs ").append(timeoutMs);
if (waitTimeMs > 0) {
sb.append(" msRemaining=").append(waitTimeMs);
}
if (lastNS != 0) {
sb.append(" lastNotifySpot=").append(lastNS);
}
throw new TimeoutException(
String.format("Timeout during netsearch/netload/netwrite. Details: %s",
sb));
}
return;
} catch (InterruptedException ignore) {
interrupted = true;
region.getCancelCriterion().checkCancelInProgress(null);
// keep waiting until we are done
waitTimeMs = endTime - System.currentTimeMillis();
// continue
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
} finally {
computeRemainingTimeout();
}
} // requestInProgress
}
private int getSearchTimeout() {
return region.getCache().getSearchTimeout();
}
private void resetResults() {
this.netSearch = false;
this.netLoad = false;
this.localLoad = false;
this.localWrite = false;
this.netWrite = false;
this.lastModified = 0;
this.isSerialized = false;
}
// private AcceptHelper getAcceptHelper(boolean ackPortInit) {
// AcceptHelper helper = null;
// synchronized(availableAcceptHelperSet) {
// if (availableAcceptHelperSet.size() <= 0) {
// helper = new AcceptHelper();
// }
// else {
// helper = (AcceptHelper)availableAcceptHelperSet.iterator().next();
// availableAcceptHelperSet.remove(helper);
// }
// }
// helper.reset(ackPortInit);
// return helper;
//
// }
// private void releaseAcceptHelper(AcceptHelper helper) {
// synchronized(availableAcceptHelperSet) {
// if (!availableAcceptHelperSet.contains(helper))
// availableAcceptHelperSet.add(helper);
// }
//
// }
@Override
public String toString() {
return super.toString() + " processorId " + this.processorId;
}
/**
* methods to set/remove htree reference (Bug 40299).
*/
protected static void setClearCountReference(LocalRegion rgn) {
DiskRegion dr = rgn.getDiskRegion();
if (dr != null) {
dr.setClearCountReference();
}
}
protected static void removeClearCountReference(LocalRegion rgn) {
DiskRegion dr = rgn.getDiskRegion();
if (dr != null) {
dr.removeClearCountReference();
}
}
/**
* A QueryMessage is broadcast to every node that has the region defined, to find out who has a
* valid copy of the requested object.
*/
public static class QueryMessage extends SerialDistributionMessage {
/**
* The object id of the processor object on the initiator node. This will be communicated back
* in the response to enable transferring the result to the initiating VM.
*/
private int processorId;
/** The fully qualified name of the Region */
private String regionName;
/** The object name */
private Object key;
/** Amount of time to wait before giving up */
private int timeoutMs;
/** The originator's expiration criteria */
private int ttl, idleTime;
/**
* if true then always send back value even if it is large. Added for bug 35942.
*/
private boolean alwaysSendResult;
// using available bitmask flags
private static final short HAS_TTL = UNRESERVED_FLAGS_START;
private static final short HAS_IDLE_TIME = (HAS_TTL << 1);
private static final short ALWAYS_SEND_RESULT = (HAS_IDLE_TIME << 1);
public QueryMessage() {
// do nothing
}
/**
* Using a new or pooled message instance, create and send the query to all nodes.
*/
public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName,
Object key, boolean multicast, Set recipients, int timeoutMs, int ttl, int idleTime) {
// create a message
QueryMessage msg = new QueryMessage();
msg.initialize(processor, regionName, key, multicast, timeoutMs, ttl, idleTime);
msg.setRecipients(recipients);
if (!multicast && recipients.size() == 1) {
// we are only searching one recipient, so tell it to send the value
msg.alwaysSendResult = true;
}
processor.distributionManager.putOutgoing(msg);
}
private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName,
Object theKey, boolean multicast, int theTimeoutMs, int theTtl, int theIdleTime) {
this.processorId = processor.processorId;
this.regionName = theRegionName;
setMulticast(multicast);
this.key = theKey;
this.timeoutMs = theTimeoutMs;
this.ttl = theTtl;
this.idleTime = theIdleTime;
Assert.assertTrue(processor.region.getScope().isDistributed());
}
/**
* This method execute's on the receiver's node, and checks to see if the requested object
* exists in shared memory on this node, and if so, sends back a ResponseMessage.
*/
@Override
protected void process(ClusterDistributionManager dm) {
doGet(dm);
}
@Override
public int getDSFID() {
return QUERY_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
short flags = 0;
if (this.processorId != 0)
flags |= HAS_PROCESSOR_ID;
if (this.ttl != 0)
flags |= HAS_TTL;
if (this.idleTime != 0)
flags |= HAS_IDLE_TIME;
if (this.alwaysSendResult)
flags |= ALWAYS_SEND_RESULT;
out.writeShort(flags);
if (this.processorId != 0) {
out.writeInt(this.processorId);
}
out.writeUTF(this.regionName);
DataSerializer.writeObject(this.key, out);
out.writeInt(this.timeoutMs);
if (this.ttl != 0) {
InternalDataSerializer.writeSignedVL(this.ttl, out);
}
if (this.idleTime != 0) {
InternalDataSerializer.writeSignedVL(this.idleTime, out);
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
short flags = in.readShort();
if ((flags & HAS_PROCESSOR_ID) != 0) {
this.processorId = in.readInt();
ReplyProcessor21.setMessageRPId(this.processorId);
}
this.regionName = in.readUTF();
this.key = DataSerializer.readObject(in);
this.timeoutMs = in.readInt();
if ((flags & HAS_TTL) != 0) {
this.ttl = (int) InternalDataSerializer.readSignedVL(in);
}
if ((flags & HAS_IDLE_TIME) != 0) {
this.idleTime = (int) InternalDataSerializer.readSignedVL(in);
}
this.alwaysSendResult = (flags & ALWAYS_SEND_RESULT) != 0;
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.QueryMessage for \"" + this.key + "\" in region \""
+ this.regionName + "\", processorId " + processorId + ", timeoutMs=" + this.timeoutMs
+ ", ttl=" + this.ttl + ", idleTime=" + this.idleTime;
}
private void doGet(ClusterDistributionManager dm) {
long startTime = dm.cacheTimeMillis();
// boolean retVal = true;
boolean isPresent = false;
boolean sendResult = false;
boolean isSer = false;
long lastModifiedCacheTime = 0;
boolean requestorTimedOut = false;
VersionTag tag = null;
if (dm.getDMType() == ClusterDistributionManager.ADMIN_ONLY_DM_TYPE
|| getSender().equals(dm.getDistributionManagerId())) {
// this was probably a multicast message
// replyWithNull(dm); - bug 35266: don't send a reply
return;
}
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
try {
// check to see if we would have to wait on initialization latch (if global)
// if so abort and reply with null
InternalCache cache = dm.getExistingCache();
if (cache.isGlobalRegionInitializing(this.regionName)) {
replyWithNull(dm);
if (logger.isDebugEnabled()) {
logger.debug("Global Region not initialized yet");
}
return;
}
LocalRegion region = (LocalRegion) dm.getExistingCache().getRegion(this.regionName);
Object o = null;
if (region != null) {
setClearCountReference(region);
try {
RegionEntry entry = region.basicGetEntry(this.key);
if (entry != null) {
synchronized (entry) {
assert region.isInitialized();
if (dm.cacheTimeMillis() - startTime < timeoutMs) {
o = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy bytes,
// decrc
if (o != null && !Token.isInvalid(o) && !Token.isRemoved(o)
&& !region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) {
isPresent = true;
VersionStamp stamp = entry.getVersionStamp();
if (stamp != null && stamp.hasValidVersion()) {
tag = stamp.asVersionTag();
}
lastModifiedCacheTime = entry.getLastModified();
isSer = o instanceof CachedDeserializable;
if (isSer) {
o = ((CachedDeserializable) o).getSerializedValue();
}
if (isPresent && (this.alwaysSendResult
|| (ObjectSizer.DEFAULT.sizeof(o) < SMALL_BLOB_SIZE))) {
sendResult = true;
}
}
} else {
requestorTimedOut = true;
}
}
} else if (logger.isDebugEnabled()) {
logger.debug("Entry is null");
}
} finally {
removeClearCountReference(region);
}
}
ResponseMessage.sendMessage(this.key, this.getSender(), processorId,
(sendResult ? o : null), lastModifiedCacheTime, isPresent, isSer, requestorTimedOut, dm,
tag);
} catch (RegionDestroyedException ignore) {
logger.debug("Region Destroyed Exception in QueryMessage doGet, null");
replyWithNull(dm);
} catch (CancelException ignore) {
logger.debug("CacheClosedException in QueryMessage doGet, null");
replyWithNull(dm);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.debug("Throwable in QueryMessage doGet, null", t);
replyWithNull(dm);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
private void replyWithNull(ClusterDistributionManager dm) {
ResponseMessage.sendMessage(this.key, this.getSender(), processorId, null, 0, false, false,
false, dm, null);
}
}
/**
* The ResponseMessage is a reply to a QueryMessage, and contains the object's value, if it is
* below the byte limit, otherwise an indication of whether the sender has the value.
*/
public static class ResponseMessage extends HighPriorityDistributionMessage {
private Object key;
/** The gemfire id of the SearchLoadAndWrite object waiting for response */
private int processorId;
/** The value being transferred */
private Object result;
/** Object creation time on remote node */
private long lastModified;
/** is the value present */
private boolean isPresent;
/** Is blob serialized? */
private boolean isSerialized;
/** did the request time out at the sender */
private boolean requestorTimedOut;
/** the version of the object being returned */
private VersionTag versionTag;
public ResponseMessage() {}
public static void sendMessage(Object key, InternalDistributedMember recipient, int processorId,
Object result, long lastModified, boolean isPresent, boolean isSerialized,
boolean requestorTimedOut, ClusterDistributionManager distributionManager,
VersionTag versionTag) {
// create a message
ResponseMessage msg = new ResponseMessage();
msg.initialize(key, processorId, result, lastModified, isPresent, isSerialized,
requestorTimedOut, versionTag);
msg.setRecipient(recipient);
distributionManager.putOutgoing(msg);
}
private void initialize(Object theKey, int theProcessorId, Object theResult,
long lastModifiedTime, boolean ispresent, boolean isserialized,
boolean requestorTimedOutFlag, VersionTag versionTag) {
this.key = theKey;
this.processorId = theProcessorId;
this.result = theResult;
this.lastModified = lastModifiedTime;
this.isPresent = ispresent;
this.isSerialized = isserialized;
this.requestorTimedOut = requestorTimedOutFlag;
this.versionTag = versionTag;
}
/**
* Invoked on the receiver - which, in this case, was the initiator of the QueryMessage .
*/
@Override
protected void process(ClusterDistributionManager dm) {
// NOTE: keep this method efficient since it is optimized
// by executing it in the p2p reader.
// This is done with this line in DistributionMessage.java:
// || c.equals(SearchLoadAndWriteProcessor.ResponseMessage.class)
SearchLoadAndWriteProcessor processor = null;
processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(this.processorId);
if (processor == null) {
if (logger.isDebugEnabled()) {
logger.debug("Response() SearchLoadAndWriteProcessor no longer exists");
}
return;
}
long lastModifiedSystemTime = 0;
if (this.lastModified != 0) {
lastModifiedSystemTime = this.lastModified;
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
}
processor.incomingResponse(this.result, lastModifiedSystemTime, this.isPresent,
this.isSerialized, this.requestorTimedOut, this.getSender(), dm, versionTag);
}
@Override
public boolean getInlineProcess() { // optimization for bug 37075
return true;
}
@Override
public int getDSFID() {
return RESPONSE_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeObject(this.key, out);
out.writeInt(this.processorId);
DataSerializer.writeObject(this.result, out);
out.writeLong(this.lastModified);
out.writeBoolean(this.isPresent);
out.writeBoolean(this.isSerialized);
out.writeBoolean(this.requestorTimedOut);
DataSerializer.writeObject(this.versionTag, out);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.key = DataSerializer.readObject(in);
this.processorId = in.readInt();
this.result = DataSerializer.readObject(in);
this.lastModified = in.readLong();
this.isPresent = in.readBoolean();
this.isSerialized = in.readBoolean();
this.requestorTimedOut = in.readBoolean();
this.versionTag = (VersionTag) DataSerializer.readObject(in);
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.ResponseMessage for processorId " + processorId
+ ", blob is " + this.result + ", isPresent is " + isPresent + ", requestorTimedOut is "
+ requestorTimedOut + ", version is " + versionTag;
}
}
/********************* NetSearchRequestMessage ***************************************/
public static class NetSearchRequestMessage extends PooledDistributionMessage {
/**
* The object id of the processor object on the initiator node. This will be communicated back
* in the response to enable transferring the result to the initiating VM.
*/
private int processorId;
/** The fully qualified name of the Region */
private String regionName;
/** The object name */
private Object key;
/** Amount of time to wait before giving up */
private int timeoutMs;
/** The originator's expiration criteria */
private int ttl, idleTime;
// using available bitmask flags
private static final short HAS_TTL = UNRESERVED_FLAGS_START;
private static final short HAS_IDLE_TIME = (HAS_TTL << 1);
public NetSearchRequestMessage() {}
/**
* Using a new or pooled message instance, create and send the request for object value to the
* specified node.
*/
public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName,
Object key, InternalDistributedMember recipient, int timeoutMs, int ttl, int idleTime) {
// create a message
NetSearchRequestMessage msg = new NetSearchRequestMessage();
msg.initialize(processor, regionName, key, timeoutMs, ttl, idleTime);
msg.setRecipient(recipient);
processor.distributionManager.putOutgoing(msg);
}
void initialize(SearchLoadAndWriteProcessor processor, String theRegionName, Object theKey,
int timeoutMS, int ttlMS, int idleTimeMS) {
this.processorId = processor.processorId;
this.regionName = theRegionName;
this.key = theKey;
this.timeoutMs = timeoutMS;
this.ttl = ttlMS;
this.idleTime = idleTimeMS;
Assert.assertTrue(processor.region.getScope().isDistributed());
}
/** Invoked on the node that has the object */
@Override
protected void process(ClusterDistributionManager dm) {
doGet(dm);
}
@Override
public int getDSFID() {
return NET_SEARCH_REQUEST_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
short flags = 0;
if (this.processorId != 0)
flags |= HAS_PROCESSOR_ID;
if (this.ttl != 0)
flags |= HAS_TTL;
if (this.idleTime != 0)
flags |= HAS_IDLE_TIME;
out.writeShort(flags);
if (this.processorId != 0) {
out.writeInt(this.processorId);
}
out.writeUTF(this.regionName);
DataSerializer.writeObject(this.key, out);
out.writeInt(this.timeoutMs);
if (this.ttl != 0) {
InternalDataSerializer.writeSignedVL(this.ttl, out);
}
if (this.idleTime != 0) {
InternalDataSerializer.writeSignedVL(this.idleTime, out);
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
short flags = in.readShort();
if ((flags & HAS_PROCESSOR_ID) != 0) {
this.processorId = in.readInt();
ReplyProcessor21.setMessageRPId(this.processorId);
}
this.regionName = in.readUTF();
this.key = DataSerializer.readObject(in);
this.timeoutMs = in.readInt();
if ((flags & HAS_TTL) != 0) {
this.ttl = (int) InternalDataSerializer.readSignedVL(in);
}
if ((flags & HAS_IDLE_TIME) != 0) {
this.idleTime = (int) InternalDataSerializer.readSignedVL(in);
}
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetSearchRequestMessage for \"" + this.key
+ "\" in region \"" + this.regionName + "\", processorId " + processorId;
}
void doGet(ClusterDistributionManager dm) {
long startTime = dm.cacheTimeMillis();
// boolean retVal = true;
byte[] ebv = null;
Object ebvObj = null;
int ebvLen = 0;
long lastModifiedCacheTime = 0;
boolean isSer = false;
boolean requestorTimedOut = false;
boolean authoritative = false;
VersionTag versionTag = null;
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
try {
LocalRegion region = (LocalRegion) dm.getExistingCache().getRegion(this.regionName);
if (region != null) {
setClearCountReference(region);
try {
boolean initialized = region.isInitialized();
RegionEntry entry = region.basicGetEntry(this.key);
if (entry != null) {
synchronized (entry) {
// get the value and version under synchronization so they don't change
VersionStamp versionStamp = entry.getVersionStamp();
if (versionStamp != null) {
versionTag = versionStamp.asVersionTag();
}
Object eov = region.getNoLRU(this.key, false, true, true); // OFFHEAP: incrc, copy
// bytes, decrc
if (eov != null) {
if (eov == Token.INVALID || eov == Token.LOCAL_INVALID) {
// nothing?
} else if (dm.cacheTimeMillis() - startTime < timeoutMs) {
if (!region.isExpiredWithRegardTo(this.key, this.ttl, this.idleTime)) {
lastModifiedCacheTime = entry.getLastModified();
if (eov instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) eov;
if (!cd.isSerialized()) {
isSer = false;
ebv = (byte[]) cd.getDeserializedForReading();
ebvLen = ebv.length;
} else {
// don't serialize here if it is not already serialized
Object tmp = cd.getValue();
if (tmp instanceof byte[]) {
byte[] bb = (byte[]) tmp;
ebv = bb;
ebvLen = bb.length;
} else {
ebvObj = tmp;
}
isSer = true;
}
} else if (!(eov instanceof byte[])) {
ebvObj = eov;
isSer = true;
} else {
ebv = (byte[]) eov;
ebvLen = ebv.length;
}
}
} else {
requestorTimedOut = true;
}
}
}
}
authoritative =
region.getDataPolicy().withReplication() && initialized && !region.isDestroyed;
} finally {
removeClearCountReference(region);
}
}
NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId,
this.key, ebv, ebvObj, ebvLen, lastModifiedCacheTime, isSer, requestorTimedOut,
authoritative, dm, versionTag);
} catch (RegionDestroyedException ignore) {
replyWithNull(dm);
} catch (CancelException ignore) {
replyWithNull(dm);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn("Unexpected exception creating net search reply", t);
replyWithNull(dm);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
private void replyWithNull(ClusterDistributionManager dm) {
NetSearchReplyMessage.sendMessage(NetSearchRequestMessage.this.getSender(), processorId,
this.key, null, null, 0, 0, false, false, false, dm, null);
}
}
/**
* The NetSearchReplyMessage is a reply to a NetSearchRequestMessage, and contains the object's
* value.
*/
public static class NetSearchReplyMessage extends HighPriorityDistributionMessage {
private static final byte SERIALIZED = 0x01;
private static final byte REQUESTOR_TIMEOUT = 0x02;
private static final byte AUTHORATIVE = 0x04;
private static final byte VERSIONED = 0x08;
private static final byte PERSISTENT = 0x10;
/** The gemfire id of the SearchLoadAndWrite object waiting for response */
private int processorId;
/** The object value being transferred */
private byte[] value;
private transient Object valueObj; // only used by toData
private transient int valueLen; // only used by toData
/** Object creation time on remote node */
private long lastModified;
/** Is blob serialized? */
private boolean isSerialized;
/** did the request time out at the sender */
private boolean requestorTimedOut;
/**
* Does this member authoritatively know the value? This is used to distinguish a null response
* indicating the region was missing vs. a null value.
*/
private boolean authoritative;
/** the version of the returned entry */
private VersionTag versionTag;
public NetSearchReplyMessage() {}
public static void sendMessage(InternalDistributedMember recipient, int processorId, Object key,
byte[] value, Object valueObj, int valueLen, long lastModified, boolean isSerialized,
boolean requestorTimedOut, boolean authoritative,
ClusterDistributionManager distributionManager, VersionTag versionTag) {
// create a message
NetSearchReplyMessage msg = new NetSearchReplyMessage();
msg.initialize(processorId, value, valueObj, valueLen, lastModified, isSerialized,
requestorTimedOut, authoritative, versionTag);
msg.setRecipient(recipient);
distributionManager.putOutgoing(msg);
}
@SuppressWarnings("hiding")
private void initialize(int procId, byte[] theValue, Object theValueObj, int valueObjLen,
long lastModifiedTime, boolean isserialized, boolean requestorTimedout,
boolean authoritative, VersionTag versionTag) {
this.processorId = procId;
this.value = theValue;
this.valueObj = theValueObj;
this.valueLen = valueObjLen;
this.lastModified = lastModifiedTime;
this.isSerialized = isserialized;
this.requestorTimedOut = requestorTimedout;
this.authoritative = authoritative;
this.versionTag = versionTag;
}
/**
* Invoked on the receiver - which, in this case, was the initiator of the
* NetSearchRequestMessage. This concludes the net request, by communicating an object value.
*/
@Override
protected void process(ClusterDistributionManager dm) {
SearchLoadAndWriteProcessor processor = null;
processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId);
if (processor == null) {
if (logger.isDebugEnabled()) {
logger.debug("NetSearchReplyMessage() SearchLoadAndWriteProcessor {} no longer exists",
processorId);
}
return;
}
long lastModifiedSystemTime = 0;
if (this.lastModified != 0) {
lastModifiedSystemTime = this.lastModified;
}
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(getSender());
}
processor.incomingNetSearchReply(this.value, lastModifiedSystemTime, this.isSerialized,
this.requestorTimedOut, this.authoritative, this.versionTag, getSender());
}
@Override
public int getDSFID() {
return NET_SEARCH_REPLY_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
if (this.valueObj != null) {
DataSerializer.writeObjectAsByteArray(this.valueObj, out);
} else {
DataSerializer.writeByteArray(this.value, this.valueLen, out);
}
out.writeLong(this.lastModified);
byte booleans = 0;
if (this.isSerialized)
booleans |= SERIALIZED;
if (this.requestorTimedOut)
booleans |= REQUESTOR_TIMEOUT;
if (this.authoritative)
booleans |= AUTHORATIVE;
if (this.versionTag != null)
booleans |= VERSIONED;
if (this.versionTag instanceof DiskVersionTag)
booleans |= PERSISTENT;
out.writeByte(booleans);
if (this.versionTag != null) {
InternalDataSerializer.invokeToData(this.versionTag, out);
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.value = DataSerializer.readByteArray(in);
if (this.value != null) {
this.valueLen = this.value.length;
}
this.lastModified = in.readLong();
byte booleans = in.readByte();
this.isSerialized = (booleans & SERIALIZED) != 0;
this.requestorTimedOut = (booleans & REQUESTOR_TIMEOUT) != 0;
this.authoritative = (booleans & AUTHORATIVE) != 0;
if ((booleans & VERSIONED) != 0) {
boolean persistentTag = (booleans & PERSISTENT) != 0;
this.versionTag = VersionTag.create(persistentTag, in);
}
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetSearchReplyMessage for processorId " + processorId
+ ", blob is " +
// this.value
(this.value == null ? "null" : "(" + this.value.length + " bytes)") + " authorative="
+ authoritative + " versionTag=" + this.versionTag;
}
}
/******************************** NetLoadRequestMessage **********************/
public static class NetLoadRequestMessage extends PooledDistributionMessage {
/**
* The object id of the processor object on the initiator node. This will be communicated back
* in the response to enable transferring the result to the initiating VM.
*/
private int processorId;
/** The fully qualified name of the Region */
private String regionName;
/** The object name */
private Object key;
/** Parameter to use when invoking loader */
private Object aCallbackArgument;
/** Amount of time to wait before giving up */
private int timeoutMs;
/** The originator's expiration criteria */
private int ttl, idleTime;
public NetLoadRequestMessage() {}
/**
* Using a new or pooled message instance, create and send the request for object value to the
* specified node.
*/
public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName,
Object key, Object aCallbackArgument, InternalDistributedMember recipient, int timeoutMs,
int ttl, int idleTime) {
// create a message
NetLoadRequestMessage msg = new NetLoadRequestMessage();
msg.initialize(processor, regionName, key, aCallbackArgument, timeoutMs, ttl, idleTime);
msg.setRecipient(recipient);
try {
processor.distributionManager.putOutgoing(msg);
} catch (InternalGemFireException e) {
throw new IllegalArgumentException(
"Message not serializable");
}
}
private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName,
Object theKey, Object callbackArgument, int timeoutMS, int ttlMS, int idleTimeMS) {
this.processorId = processor.processorId;
this.regionName = theRegionName;
this.key = theKey;
this.aCallbackArgument = callbackArgument;
this.timeoutMs = timeoutMS;
this.ttl = ttlMS;
this.idleTime = idleTimeMS;
Assert.assertTrue(processor.region.getScope().isDistributed());
}
/** Invoked on the node that has the object */
@Override
protected void process(ClusterDistributionManager dm) {
doLoad(dm);
}
@Override
public int getDSFID() {
return NET_LOAD_REQUEST_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
out.writeUTF(this.regionName);
DataSerializer.writeObject(this.key, out);
DataSerializer.writeObject(this.aCallbackArgument, out);
out.writeInt(this.timeoutMs);
out.writeInt(this.ttl);
out.writeInt(this.idleTime);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.regionName = in.readUTF();
this.key = DataSerializer.readObject(in);
this.aCallbackArgument = DataSerializer.readObject(in);
this.timeoutMs = in.readInt();
this.ttl = in.readInt();
this.idleTime = in.readInt();
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetLoadRequestMessage for \"" + this.key
+ "\" in region \"" + this.regionName + "\", processorId " + processorId;
}
private void doLoad(ClusterDistributionManager dm) {
long startTime = dm.cacheTimeMillis();
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
try {
InternalCache gfc = dm.getExistingCache();
LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName);
if (region != null && region.isInitialized()
&& (dm.cacheTimeMillis() - startTime < timeoutMs)) {
CacheLoader loader = ((AbstractRegion) region).basicGetLoader();
if (loader != null) {
LoaderHelper loaderHelper = region.loaderHelperFactory.createLoaderHelper(this.key,
this.aCallbackArgument, false, false, null);
CachePerfStats stats = region.getCachePerfStats();
long start = stats.startLoad();
try {
Object o = loader.load(loaderHelper);
// no need to call convertPdxInstanceIfNeeded since we are serializing
// this into the NetLoadRequestMessage. The loaded object will be deserialized
// on the other side and have the correct form in that member.
Assert.assertTrue(o != Token.INVALID && o != Token.LOCAL_INVALID);
NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId,
o, dm, loaderHelper.getArgument(), null, false, false);
} catch (Exception e) {
replyWithException(e, dm);
} finally {
stats.endLoad(start);
}
} else {
replyWithException(new TryAgainException("No loader defined"),
dm);
}
} else {
replyWithException(new TryAgainException(
"Timeout expired or region not ready"),
dm);
}
} catch (RegionDestroyedException rde) {
replyWithException(rde, dm);
} catch (CancelException cce) {
replyWithException(cce, dm);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
replyWithException(new InternalGemFireException(
"Error processing request",
t), dm);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
void replyWithException(Exception e, ClusterDistributionManager dm) {
NetLoadReplyMessage.sendMessage(NetLoadRequestMessage.this.getSender(), processorId, null, dm,
this.aCallbackArgument, e, false, false);
}
}
/**
* The NetLoadReplyMessage is a reply to a RequestMessage, and contains the object's value.
*/
public static class NetLoadReplyMessage extends HighPriorityDistributionMessage {
/** The gemfire id of the SearchLoadAndWrite object waiting for response */
private int processorId;
/** The object value being transferred */
private Object result;
/** Loader parameter returned to sender */
private Object aCallbackArgument;
/** Exception thrown by remote node */
private Exception e;
/** Is blob serialized? */
private boolean isSerialized;
/** ??? */
private boolean requestorTimedOut;
public NetLoadReplyMessage() {}
public static void sendMessage(InternalDistributedMember recipient, int processorId, Object obj,
ClusterDistributionManager distributionManager, Object aCallbackArgument, Exception e,
boolean isSerialized, boolean requestorTimedOut) {
// create a message
NetLoadReplyMessage msg = new NetLoadReplyMessage();
msg.initialize(processorId, obj, aCallbackArgument, e, isSerialized, requestorTimedOut);
msg.setRecipient(recipient);
distributionManager.putOutgoing(msg);
}
private void initialize(int procId, Object obj, Object callbackArgument, Exception exe,
boolean isserialized, boolean requestorTimedout) {
this.processorId = procId;
this.result = obj;
this.e = exe;
this.aCallbackArgument = callbackArgument;
this.isSerialized = isserialized;
this.requestorTimedOut = requestorTimedout;
}
/**
* Invoked on the receiver - which, in this case, was the initiator of the
* NetLoadRequestMessage. This concludes the net request, by communicating an object value.
*/
@Override
protected void process(ClusterDistributionManager dm) {
SearchLoadAndWriteProcessor processor = null;
processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId);
if (processor == null) {
if (logger.isDebugEnabled()) {
logger.debug("NetLoadReplyMessage() SearchLoadAndWriteProcessor no longer exists");
}
return;
}
processor.incomingNetLoadReply(this.result, 0, this.aCallbackArgument, this.e,
this.isSerialized, this.requestorTimedOut);
}
@Override
public int getDSFID() {
return NET_LOAD_REPLY_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
boolean isSerialized = this.isSerialized;
if (result instanceof byte[]) {
DataSerializer.writeByteArray((byte[]) this.result, out);
} else {
DataSerializer.writeObjectAsByteArray(this.result, out);
isSerialized = true;
}
DataSerializer.writeObject(this.aCallbackArgument, out);
DataSerializer.writeObject(this.e, out);
out.writeBoolean(isSerialized);
out.writeBoolean(this.requestorTimedOut);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.result = DataSerializer.readByteArray(in);
this.aCallbackArgument = DataSerializer.readObject(in);
this.e = (Exception) DataSerializer.readObject(in);
this.isSerialized = in.readBoolean();
this.requestorTimedOut = in.readBoolean();
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetLoadReplyMessage for processorId " + processorId
+ ", blob is " + this.result;
}
}
/********************* NetWriteRequestMessage *******************************/
public static class NetWriteRequestMessage extends PooledDistributionMessage {
/**
* The object id of the processor object on the initiator node. This will be communicated back
* in the response to enable transferring the result to the initiating VM.
*/
private int processorId;
/** The fully qualified name of the Region */
private String regionName;
/** The event being sent over to the remote writer */
CacheEvent event;
private int timeoutMs;
/** Action requested by sender */
int action;
public NetWriteRequestMessage() {}
/**
* Using a new or pooled message instance, create and send the request for object value to the
* specified node.
*/
public static void sendMessage(SearchLoadAndWriteProcessor processor, String regionName,
int timeoutMs, CacheEvent event, Set recipients, int action) {
NetWriteRequestMessage msg = new NetWriteRequestMessage();
msg.initialize(processor, regionName, timeoutMs, event, action);
msg.setRecipients(recipients);
processor.distributionManager.putOutgoing(msg);
}
private void initialize(SearchLoadAndWriteProcessor processor, String theRegionName,
int timeoutMS, CacheEvent theEvent, int actionType) {
this.processorId = processor.processorId;
this.regionName = theRegionName;
this.timeoutMs = timeoutMS;
this.event = theEvent;
this.action = actionType;
Assert.assertTrue(processor.region.getScope().isDistributed());
}
@Override
public int getDSFID() {
return NET_WRITE_REQUEST_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
out.writeUTF(this.regionName);
out.writeInt(this.timeoutMs);
DataSerializer.writeObject(this.event, out);
out.writeInt(this.action);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.regionName = in.readUTF();
this.timeoutMs = in.readInt();
this.event = (CacheEvent) DataSerializer.readObject(in);
this.action = in.readInt();
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetWriteRequestMessage " + " for region \""
+ this.regionName + "\", processorId " + processorId;
}
/** Invoked on the node that has the object */
@Override
protected void process(ClusterDistributionManager dm) {
long startTime = dm.cacheTimeMillis();
final InitializationLevel oldLevel =
LocalRegion.setThreadInitLevelRequirement(BEFORE_INITIAL_IMAGE);
try {
InternalCache gfc = dm.getExistingCache();
LocalRegion region = (LocalRegion) gfc.getRegion(this.regionName);
if (region != null && region.isInitialized()
&& (dm.cacheTimeMillis() - startTime < timeoutMs)) {
CacheWriter writer = region.basicGetWriter();
EntryEventImpl entryEvtImpl = null;
RegionEventImpl regionEvtImpl = null;
if (this.event instanceof EntryEventImpl) {
entryEvtImpl = (EntryEventImpl) this.event;
entryEvtImpl.setRegion(region);
Operation op = entryEvtImpl.getOperation();
if (op == Operation.REPLACE) {
entryEvtImpl.setOperation(Operation.UPDATE);
} else if (op == Operation.PUT_IF_ABSENT) {
entryEvtImpl.setOperation(Operation.CREATE);
} else if (op == Operation.REMOVE) {
entryEvtImpl.setOperation(Operation.DESTROY);
}
// [bruce] even if this event was transmitted from another VM, its operation may have
// originated in this VM. PartitionedRegion.put() with a cacheWriter is one
// situation where this can occur
entryEvtImpl.setOriginRemote(event.getDistributedMember() == null
|| !event.getDistributedMember().equals(dm.getDistributionManagerId()));
} else if (this.event instanceof RegionEventImpl) {
regionEvtImpl = (RegionEventImpl) this.event;
regionEvtImpl.region = region;
regionEvtImpl.originRemote = true;
}
if (writer != null) {
if (entryEvtImpl != null) {
entryEvtImpl.setReadOldValueFromDisk(true);
}
try {
switch (action) {
case BEFORECREATE:
writer.beforeCreate(entryEvtImpl);
break;
case BEFOREDESTROY:
writer.beforeDestroy(entryEvtImpl);
break;
case BEFOREUPDATE:
writer.beforeUpdate(entryEvtImpl);
break;
case BEFOREREGIONDESTROY:
writer.beforeRegionDestroy(regionEvtImpl);
break;
case BEFOREREGIONCLEAR:
writer.beforeRegionClear(regionEvtImpl);
break;
default:
break;
}
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId,
dm, true, null, false);
} catch (CacheWriterException cwe) {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId,
dm, false, cwe, true);
} catch (Exception e) {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId,
dm, false, e, false);
} finally {
if (entryEvtImpl != null) {
entryEvtImpl.setReadOldValueFromDisk(false);
}
}
} else {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId,
dm, false,
new TryAgainException(
"No cachewriter defined"),
true);
}
} else {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm,
false,
new TryAgainException("Timeout expired or region not ready"),
true);
}
} catch (RegionDestroyedException ignore) {
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm,
false, null, false);
} catch (DistributedSystemDisconnectedException e) {
// shutdown condition
throw e;
} catch (CancelException cce) {
dm.getCancelCriterion().checkCancelInProgress(cce); // TODO anyway to find the region or
// cache here?
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm,
false, null, false);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
NetWriteReplyMessage.sendMessage(NetWriteRequestMessage.this.getSender(), processorId, dm,
false,
new InternalGemFireException(
"Error processing request",
t),
true);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
}
/**
* The NetWriteReplyMessage is a reply to a NetWriteRequestMessage, and contains the success code
* or exception that is propagated back to the requestor
*/
public static class NetWriteReplyMessage extends HighPriorityDistributionMessage {
/** The gemfire id of the SearchLoadAndWrite object waiting for response */
private int processorId;
/** Indicates whether the write succeeded */
private boolean netWriteSucceeded;
/** Exception thrown by remote node */
private Exception e;
/** Is the exception a cacheLoaderException */
private boolean cacheWriterException;
public NetWriteReplyMessage() {}
public static void sendMessage(InternalDistributedMember recipient, int processorId,
ClusterDistributionManager distributionManager, boolean netWriteSucceeded, Exception e,
boolean cacheWriterException) {
// create a message
NetWriteReplyMessage msg = new NetWriteReplyMessage();
msg.initialize(processorId, netWriteSucceeded, e, cacheWriterException);
msg.setRecipient(recipient);
distributionManager.putOutgoing(msg);
}
private void initialize(int procId, boolean netwriteSucceeded, Exception except,
boolean cacheWriterExcept) {
this.processorId = procId;
this.netWriteSucceeded = netwriteSucceeded;
this.e = except;
this.cacheWriterException = cacheWriterExcept;
}
/**
* Invoked on the receiver - which, in this case, was the initiator of the
* NetWriteRequestMessage. This concludes the net write request, by communicating an object
* value.
*/
@Override
protected void process(ClusterDistributionManager dm) {
SearchLoadAndWriteProcessor processor = null;
processor = (SearchLoadAndWriteProcessor) getProcessorKeeper().retrieve(processorId);
if (processor == null) {
if (logger.isDebugEnabled()) {
logger.debug("NetWriteReplyMessage() SearchLoadAndWriteProcessor no longer exists");
}
return;
}
processor.incomingNetWriteReply(this.netWriteSucceeded, this.e, this.cacheWriterException);
}
@Override
public int getDSFID() {
return NET_WRITE_REPLY_MESSAGE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
out.writeBoolean(this.netWriteSucceeded);
DataSerializer.writeObject(this.e, out);
out.writeBoolean(this.cacheWriterException);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.netWriteSucceeded = in.readBoolean();
this.e = (Exception) DataSerializer.readObject(in);
this.cacheWriterException = in.readBoolean();
}
@Override
public String toString() {
return "SearchLoadAndWriteProcessor.NetWriteReplyMessage for processorId " + processorId;
}
}
}