blob: e8c4fa1c6b137b20b85927b159cfb50621f0ba99 [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.CacheWriterException;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.EntryExistsException;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.RegionDestroyedException;
import com.gemstone.gemfire.cache.client.PoolFactory;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DM;
import com.gemstone.gemfire.distributed.internal.DirectReplyProcessor;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem;
import com.gemstone.gemfire.distributed.internal.ReplyException;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.ReplySender;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.Assert;
import com.gemstone.gemfire.internal.ByteArrayDataInput;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.NanoTimer;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.BucketRegion;
import com.gemstone.gemfire.internal.cache.DataLocationException;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.EntryVersionsList;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.KeyWithRegionContext;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegionDataStore;
import com.gemstone.gemfire.internal.cache.PutAllPartialResultException;
import com.gemstone.gemfire.internal.cache.PutAllPartialResultException.PutAllPartialResult;
import com.gemstone.gemfire.internal.cache.ha.ThreadIdentifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.VersionedObjectList;
import com.gemstone.gemfire.internal.cache.versions.ConcurrentCacheModificationException;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
/**
* A Partitioned Region update message. Meant to be sent only to
* a bucket's primary owner. In addition to updating an entry it is also used to
* send Partitioned Region event information.
*
* @author Gester Zhou
* @since 6.0
*/
public final class PutAllPRMessage extends PartitionMessageWithDirectReply
{
private static final Logger logger = LogService.getLogger();
private PutAllEntryData[] putAllPRData;
private int putAllPRDataSize = 0;
private Integer bucketId;
/** An additional object providing context for the operation, e.g., for BridgeServer notification */
ClientProxyMembershipID bridgeContext;
/** true if no callbacks should be invoked */
private boolean skipCallbacks;
private Object callbackArg;
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
protected static final short FETCH_FROM_HDFS = (SKIP_CALLBACKS << 1);
//using the left most bit for IS_PUT_DML, the last available bit
protected static final short IS_PUT_DML = (short) (FETCH_FROM_HDFS << 1);
private transient InternalDistributedSystem internalDs;
/** whether direct-acknowledgement is desired */
private transient boolean directAck = false;
/**
* state from operateOnRegion that must be preserved for transmission
* from the waiting pool
*/
transient boolean result = false;
transient VersionedObjectList versions = null;
/** whether this operation should fetch oldValue from HDFS */
private boolean fetchFromHDFS;
private boolean isPutDML;
/**
* Empty constructor to satisfy {@link DataSerializer}requirements
*/
public PutAllPRMessage() {
}
public PutAllPRMessage(int bucketId, int size, boolean notificationOnly,
boolean posDup, boolean skipCallbacks, Object callbackArg, boolean fetchFromHDFS, boolean isPutDML) {
this.bucketId = Integer.valueOf(bucketId);
putAllPRData = new PutAllEntryData[size];
this.notificationOnly = notificationOnly;
this.posDup = posDup;
this.skipCallbacks = skipCallbacks;
this.callbackArg = callbackArg;
initTxMemberId();
this.fetchFromHDFS = fetchFromHDFS;
this.isPutDML = isPutDML;
}
public void addEntry(PutAllEntryData entry) {
this.putAllPRData[this.putAllPRDataSize++] = entry;
}
public void initMessage(PartitionedRegion r, Set recipients, boolean notifyOnly, DirectReplyProcessor p) {
setInternalDs(r.getSystem());
setDirectAck(false);
this.resetRecipients();
if (recipients != null) {
setRecipients(recipients);
}
this.regionId = r.getPRId();
this.processor = p;
this.processorId = p==null? 0 : p.getProcessorId();
if (p != null && this.isSevereAlertCompatible()) {
p.enableSevereAlertProcessing();
}
this.notificationOnly = notifyOnly;
}
@Override
public boolean isSevereAlertCompatible() {
// allow forced-disconnect processing for all cache op messages
return true;
}
public void setPossibleDuplicate(boolean posDup) {
this.posDup = posDup;
}
// this method made unnecessary by entry versioning in 7.0 but kept here for merging
// public void saveKeySet(PutAllPartialResult partialKeys) {
// partialKeys.addKeysAndVersions(this.versions);
// }
public int getSize() {
return putAllPRDataSize;
}
public Set getKeys() {
Set keys = new HashSet(getSize());
for (int i=0; i<putAllPRData.length; i++) {
if (putAllPRData[i] != null) {
keys.add(putAllPRData[i].getKey());
}
}
return keys;
}
/**
* Sends a PartitionedRegion PutAllPRMessage to the recipient
* @param recipient the member to which the put message is sent
* @param r the PartitionedRegion for which the put was performed
* @return the processor used to await acknowledgement that the update was
* sent, or null to indicate that no acknowledgement will be sent
* @throws ForceReattemptException if the peer is no longer available
*/
public PartitionResponse send(DistributedMember recipient, PartitionedRegion r)
throws ForceReattemptException
{
//Assert.assertTrue(recipient != null, "PutAllPRMessage NULL recipient"); recipient can be null for event notifications
Set recipients = Collections.singleton(recipient);
PutAllResponse p = new PutAllResponse(r.getSystem(), recipients);
initMessage(r, recipients, false, p);
setTransactionDistributed(r.getCache().getTxManager().isDistributed());
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.send: recipient is {}, msg is {}", recipient, this);
}
Set failures =r.getDistributionManager().putOutgoing(this);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException("Failed sending <" + this + ">");
}
return p;
}
public void setBridgeContext(ClientProxyMembershipID contx) {
Assert.assertTrue(contx != null);
this.bridgeContext = contx;
}
public int getDSFID() {
return PR_PUTALL_MESSAGE;
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.bucketId = Integer.valueOf((int)InternalDataSerializer
.readSignedVL(in));
if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
this.bridgeContext = DataSerializer.readObject(in);
}
Version sourceVersion = InternalDataSerializer.getVersionForDataStream(in);
if (sourceVersion.compareTo(Version.GFE_81) >= 0) {
this.callbackArg = DataSerializer.readObject(in);
}
this.putAllPRDataSize = (int)InternalDataSerializer.readUnsignedVL(in);
this.putAllPRData = new PutAllEntryData[putAllPRDataSize];
if (this.putAllPRDataSize > 0) {
final Version version = InternalDataSerializer
.getVersionForDataStreamOrNull(in);
final ByteArrayDataInput bytesIn = new ByteArrayDataInput();
for (int i = 0; i < this.putAllPRDataSize; i++) {
this.putAllPRData[i] = new PutAllEntryData(in, null, i, version,
bytesIn);
}
boolean hasTags = in.readBoolean();
if (hasTags) {
EntryVersionsList versionTags = EntryVersionsList.create(in);
for (int i = 0; i < this.putAllPRDataSize; i++) {
this.putAllPRData[i].versionTag = versionTags.get(i);
}
}
}
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
if (bucketId == null) {
InternalDataSerializer.writeSignedVL(-1, out);
} else {
InternalDataSerializer.writeSignedVL(bucketId.intValue(), out);
}
if (this.bridgeContext != null) {
DataSerializer.writeObject(this.bridgeContext, out);
}
Version receiverVersion = InternalDataSerializer.getVersionForDataStream(out);
if (receiverVersion.compareTo(Version.GFE_81) >= 0) {
DataSerializer.writeObject(this.callbackArg, out);
}
InternalDataSerializer.writeUnsignedVL(this.putAllPRDataSize, out);
if (this.putAllPRDataSize > 0) {
EntryVersionsList versionTags = new EntryVersionsList(putAllPRDataSize);
boolean hasTags = false;
// get the "keyRequiresRegionContext" flag from first element assuming
// all key objects to be uniform
final boolean requiresRegionContext =
(this.putAllPRData[0].getKey() instanceof KeyWithRegionContext);
for (int i = 0; i < this.putAllPRDataSize; i++) {
// If sender's version is >= 7.0.1 then we can send versions list.
if (!hasTags && putAllPRData[i].versionTag != null) {
hasTags = true;
}
VersionTag<?> tag = putAllPRData[i].versionTag;
versionTags.add(tag);
putAllPRData[i].versionTag = null;
putAllPRData[i].toData(out, requiresRegionContext);
putAllPRData[i].versionTag = tag;
// PutAllEntryData's toData did not serialize eventID to save
// performance for DR, but in PR,
// we pack it for each entry since we used fake eventID
}
out.writeBoolean(hasTags);
if (hasTags) {
InternalDataSerializer.invokeToData(versionTags, out);
}
}
}
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
if (this.bridgeContext != null) s |= HAS_BRIDGE_CONTEXT;
if (this.skipCallbacks) s |= SKIP_CALLBACKS;
if (this.fetchFromHDFS) s |= FETCH_FROM_HDFS;
if (this.isPutDML) s |= IS_PUT_DML;
return s;
}
@Override
protected void setBooleans(short s, DataInput in) throws IOException,
ClassNotFoundException {
super.setBooleans(s, in);
this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0);
this.fetchFromHDFS = ((s & FETCH_FROM_HDFS) != 0);
this.isPutDML = ((s & IS_PUT_DML) != 0);
}
@Override
public EventID getEventID() {
if (this.putAllPRData.length > 0) {
return this.putAllPRData[0].getEventID();
}
return null;
}
/**
* This method is called upon receipt and make the desired changes to the
* PartitionedRegion Note: It is very important that this message does NOT
* cause any deadlocks as the sender will wait indefinitely for the
* acknowledgement
*/
@Override
protected final boolean operateOnPartitionedRegion(DistributionManager dm,
PartitionedRegion r, long startTime) throws EntryExistsException,
ForceReattemptException, DataLocationException
{
boolean sendReply = true;
InternalDistributedMember eventSender = getSender();
long lastModified = 0L;
try {
result = doLocalPutAll(r, eventSender, lastModified);
}
catch (ForceReattemptException fre) {
sendReply(getSender(), getProcessorId(), dm,
new ReplyException(fre), r, startTime);
return false;
}
if (sendReply) {
sendReply(getSender(), getProcessorId(), dm, null, r, startTime);
}
return false;
}
/* we need a event with content for waitForNodeOrCreateBucket() */
public EntryEventImpl getFirstEvent(PartitionedRegion r) {
if (putAllPRDataSize == 0) {
return null;
}
EntryEventImpl ev = EntryEventImpl.create(r,
putAllPRData[0].getOp(),
putAllPRData[0].getKey(),
putAllPRData[0].getValue(),
this.callbackArg,
false /* originRemote */,
getSender(),
true/* generate Callbacks */,
putAllPRData[0].getEventID());
return ev;
}
@Override
protected Object clone() throws CloneNotSupportedException {
// TODO Auto-generated method stub
return super.clone();
}
/**
* This method is called by both operateOnPartitionedRegion() when processing a remote msg
* or by sendMsgByBucket() when processing a msg targeted to local Jvm.
* PartitionedRegion Note: It is very important that this message does NOT
* cause any deadlocks as the sender will wait indefinitely for the
* acknowledgment
* @param r partitioned region
* eventSender the endpoint server who received request from client
* lastModified timestamp for last modification
* @return If succeeds, return true, otherwise, throw exception
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IMSE_DONT_CATCH_IMSE")
public final boolean doLocalPutAll(PartitionedRegion r, InternalDistributedMember eventSender, long lastModified)
throws EntryExistsException,
ForceReattemptException,DataLocationException
{
boolean didPut=false;
long clientReadTimeOut = PoolFactory.DEFAULT_READ_TIMEOUT;
if (r.hasServerProxy()) {
clientReadTimeOut = r.getServerProxy().getPool().getReadTimeout();
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage: doLocalPutAll: clientReadTimeOut is {}", clientReadTimeOut);
}
}
DistributedPutAllOperation dpao = null;
EntryEventImpl baseEvent = null;
BucketRegion bucketRegion = null;
PartitionedRegionDataStore ds = r.getDataStore();
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
try {
if (!notificationOnly) {
// bucketRegion is not null only when !notificationOnly
bucketRegion = ds.getInitializedBucketForId(null, bucketId);
this.versions = new VersionedObjectList(this.putAllPRDataSize, true, bucketRegion.getAttributes().getConcurrencyChecksEnabled());
// create a base event and a DPAO for PutAllMessage distributed btw redundant buckets
baseEvent = EntryEventImpl.create(
bucketRegion, Operation.PUTALL_CREATE,
null, null, this.callbackArg, true, eventSender, !skipCallbacks, true);
// set baseEventId to the first entry's event id. We need the thread id for DACE
baseEvent.setEventId(putAllPRData[0].getEventID());
if (this.bridgeContext != null) {
baseEvent.setContext(this.bridgeContext);
}
baseEvent.setPossibleDuplicate(this.posDup);
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.doLocalPutAll: eventSender is {}, baseEvent is {}, msg is {}",
eventSender, baseEvent, this);
}
dpao = new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
}
// Fix the updateMsg misorder issue
// Lock the keys when doing postPutAll
Object keys[] = new Object[putAllPRDataSize];
final boolean keyRequiresRegionContext = r.keyRequiresRegionContext();
for (int i = 0; i < putAllPRDataSize; ++i) {
keys[i] = putAllPRData[i].getKey();
if (keyRequiresRegionContext) {
((KeyWithRegionContext)keys[i]).setRegionContext(r);
}
}
if (!notificationOnly) {
try {
if(putAllPRData.length > 0) {
if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("attempting to locate version tags for retried event");
}
// bug #48205 - versions may have already been generated for a posdup event
// so try to recover them before wiping out the eventTracker's record
// of the previous attempt
for (int i=0; i<putAllPRDataSize; i++) {
if (putAllPRData[i].versionTag == null) {
putAllPRData[i].versionTag = bucketRegion.findVersionTagForClientBulkOp(putAllPRData[i].getEventID());
if (putAllPRData[i].versionTag != null) {
putAllPRData[i].versionTag.replaceNullIDs(bucketRegion.getVersionMember());
}
}
}
}
EventID eventID = putAllPRData[0].getEventID();
ThreadIdentifier membershipID = new ThreadIdentifier(
eventID.getMembershipID(), eventID.getThreadID());
bucketRegion.recordBulkOpStart(membershipID);
}
bucketRegion.waitUntilLocked(keys);
boolean lockedForPrimary = false;
final HashMap succeeded = new HashMap();
PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
Object key = keys[0];
try {
bucketRegion.doLockForPrimary(false);
lockedForPrimary = true;
/* The real work to be synchronized, it will take long time. We don't
* worry about another thread to send any msg which has the same key
* in this request, because these request will be blocked by foundKey
*/
for (int i=0; i<putAllPRDataSize; i++) {
EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,putAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks, this.isPutDML);
try {
key = ev.getKey();
ev.setPutAllOperation(dpao);
// set the fetchFromHDFS flag
ev.setFetchFromHDFS(this.fetchFromHDFS);
// make sure a local update inserts a cache de-serializable
ev.makeSerializedNewValue();
// ev.setLocalFilterInfo(r.getFilterProfile().getLocalFilterRouting(ev));
// ev will be added into dpao in putLocally()
// oldValue and real operation will be modified into ev in putLocally()
// then in basicPutPart3(), the ev is added into dpao
try {
didPut = r.getDataView().putEntryOnRemote(ev, false, false, null, false, lastModified, true);
if (didPut && logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.doLocalPutAll:putLocally success for {}", ev);
}
} catch (ConcurrentCacheModificationException e) {
didPut = true;
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.doLocalPutAll:putLocally encountered concurrent cache modification for {}", ev, e);
}
}
putAllPRData[i].setTailKey(ev.getTailKey());
if (!didPut) { // make sure the region hasn't gone away
r.checkReadiness();
ForceReattemptException fre = new ForceReattemptException(
"unable to perform put in PutAllPR, but operation should not fail");
fre.setHash(ev.getKey().hashCode());
throw fre;
} else {
succeeded.put(putAllPRData[i].getKey(), putAllPRData[i].getValue());
this.versions.addKeyAndVersion(putAllPRData[i].getKey(), ev.getVersionTag());
}
} finally {
ev.release();
}
} // for
} catch (IllegalMonitorStateException ex) {
ForceReattemptException fre = new ForceReattemptException(
"unable to get lock for primary, retrying... ");
throw fre;
} catch (CacheWriterException cwe) {
// encounter cacheWriter exception
partialKeys.saveFailedKey(key, cwe);
} finally {
try {
// Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
// So we have to manually set useFakeEventId for this DPAO
dpao.setUseFakeEventId(true);
r.checkReadiness();
bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
} finally {
if (lockedForPrimary) {
bucketRegion.doUnlockForPrimary();
}
}
}
if (partialKeys.hasFailure()) {
partialKeys.addKeysAndVersions(this.versions);
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage: partial keys applied, map to bucket {}'s keys: {}. Applied {}",
bucketId, Arrays.toString(keys), succeeded);
}
throw new PutAllPartialResultException(partialKeys);
}
} catch(RegionDestroyedException e) {
ds.checkRegionDestroyedOnBucket(bucketRegion ,true, e);
} finally {
bucketRegion.removeAndNotifyKeys(keys);
}
} else {
for (int i=0; i<putAllPRDataSize; i++) {
EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i,putAllPRData,notificationOnly,bridgeContext,posDup,skipCallbacks, this.isPutDML);
try {
ev.setOriginRemote(true);
if (this.callbackArg != null) {
ev.setCallbackArgument(this.callbackArg);
}
r.invokePutCallbacks(ev.getOperation().isCreate() ? EnumListenerEvent.AFTER_CREATE
: EnumListenerEvent.AFTER_UPDATE, ev, r.isInitialized(), true);
} finally {
ev.release();
}
}
}
} finally {
if (baseEvent != null) baseEvent.release();
if (dpao != null) dpao.freeOffHeapResources();
}
return true;
}
public VersionedObjectList getVersions() {
return this.versions;
}
@Override
public boolean canStartRemoteTransaction() {
return true;
}
public static EntryEventImpl getEventFromEntry(LocalRegion r,
InternalDistributedMember myId, InternalDistributedMember eventSender,
int idx, DistributedPutAllOperation.PutAllEntryData[] data,
boolean notificationOnly, ClientProxyMembershipID bridgeContext,
boolean posDup, boolean skipCallbacks, boolean isPutDML) {
PutAllEntryData prd = data[idx];
//EntryEventImpl ev = EntryEventImpl.create(r,
// prd.getOp(),
// prd.getKey(), null/* value */, null /* callbackArg */,
// false /* originRemote */,
// eventSender,
// true/* generate Callbacks */,
// prd.getEventID());
EntryEventImpl ev = EntryEventImpl.create(r, prd.getOp(), prd.getKey(), prd
.getValue(), null, false, eventSender, !skipCallbacks, prd.getEventID());
boolean evReturned = false;
try {
if (prd.getValue() == null
&& ev.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
ev.setLocalInvalid(true);
}
ev.setNewValue(prd.getValue());
ev.setOldValue(prd.getOldValue());
if (bridgeContext != null) {
ev.setContext(bridgeContext);
}
ev.setInvokePRCallbacks(!notificationOnly);
ev.setPossibleDuplicate(posDup);
if (prd.filterRouting != null) {
ev.setLocalFilterInfo(prd.filterRouting.getFilterInfo(myId));
}
if (prd.versionTag != null) {
prd.versionTag.replaceNullIDs(eventSender);
ev.setVersionTag(prd.versionTag);
}
//ev.setLocalFilterInfo(r.getFilterProfile().getLocalFilterRouting(ev));
if(notificationOnly){
ev.setTailKey(-1L);
} else {
ev.setTailKey(prd.getTailKey());
}
ev.setPutDML(isPutDML);
evReturned = true;
return ev;
} finally {
if (!evReturned) {
ev.release();
}
}
}
// override reply processor type from PartitionMessage
PartitionResponse createReplyProcessor(PartitionedRegion r, Set recipients, Object key) {
return new PutAllResponse(r.getSystem(), recipients);
}
// override reply message type from PartitionMessage
@Override
protected void sendReply(InternalDistributedMember member, int procId, DM dm, ReplyException ex, PartitionedRegion pr, long startTime) {
// if (!result && getOperation().isCreate()) {
// System.err.println("DEBUG: put returning false. ifNew=" + ifNew
// +" ifOld="+ifOld + " message=" + this);
// }
if (pr != null) {
if (startTime > 0) {
pr.getPrStats().endPartitionMessagesProcessing(startTime);
}
if (!pr.getConcurrencyChecksEnabled() && this.versions != null) {
this.versions.clear();
}
}
PutAllReplyMessage.send(member, procId, getReplySender(dm), this.result, this.versions, ex);
}
@Override
protected final void appendFields(StringBuffer buff)
{
super.appendFields(buff);
buff.append("; putAllPRDataSize=").append(putAllPRDataSize)
.append("; bucketId=").append(bucketId);
if (this.bridgeContext != null) {
buff.append("; bridgeContext=").append(this.bridgeContext);
}
buff.append("; directAck=")
.append(this.directAck);
for (int i=0; i<putAllPRDataSize; i++) {
// buff.append("; entry"+i+":").append(putAllPRData[i]);
buff.append("; entry"+i+":").append(putAllPRData[i].getKey())
.append(",").append(putAllPRData[i].versionTag);
}
}
public final InternalDistributedSystem getInternalDs()
{
return internalDs;
}
public final void setInternalDs(InternalDistributedSystem internalDs)
{
this.internalDs = internalDs;
}
public final void setDirectAck(boolean directAck)
{
this.directAck = directAck;
}
@Override
protected boolean mayAddToMultipleSerialGateways(DistributionManager dm) {
return _mayAddToMultipleSerialGateways(dm);
}
public static final class PutAllReplyMessage extends ReplyMessage {
/** Result of the PutAll operation */
boolean result;
VersionedObjectList versions;
@Override
public boolean getInlineProcess() {
return true;
}
/**
* Empty constructor to conform to DataSerializable interface
*/
public PutAllReplyMessage() {
}
private PutAllReplyMessage(int processorId, boolean result, VersionedObjectList versions, ReplyException ex) {
super();
this.versions = versions;
this.result = result;
setProcessorId(processorId);
setException(ex);
}
/** Send an ack */
public static void send(InternalDistributedMember recipient, int processorId,
ReplySender dm, boolean result, VersionedObjectList versions, ReplyException ex) {
Assert.assertTrue(recipient != null, "PutAllReplyMessage NULL reply message");
PutAllReplyMessage m = new PutAllReplyMessage(processorId, result, versions, ex);
m.setRecipient(recipient);
dm.putOutgoing(m);
}
/**
* Processes this message. This method is invoked by the receiver
* of the message.
* @param dm the distribution manager that is processing the message.
*/
@Override
public void process(final DM dm, final ReplyProcessor21 rp) {
final long startTime = getTimestamp();
if (rp == null) {
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "{}: processor not found", this);
}
return;
}
if (rp instanceof PutAllResponse) {
PutAllResponse processor = (PutAllResponse)rp;
processor.setResponse(this);
}
rp.process(this);
if (logger.isTraceEnabled(LogMarker.DM)) {
logger.trace(LogMarker.DM, "{} processed {}", rp, this);
}
dm.getStats().incReplyMessageTime(NanoTimer.getTime()-startTime);
}
@Override
public int getDSFID() {
return PR_PUTALL_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in)
throws IOException, ClassNotFoundException {
super.fromData(in);
this.result = in.readBoolean();
this.versions = (VersionedObjectList)DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
out.writeBoolean(this.result);
DataSerializer.writeObject(this.versions, out);
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("PutAllReplyMessage ")
.append("processorid=").append(this.processorId)
.append(" returning ").append(this.result)
.append(" exception=").append(getException())
.append(" versions= ").append(this.versions);
return sb.toString();
}
}
/**
* A processor to capture the value returned by {@link PutAllPRMessage}
* @author Gester Zhou
* @since 5.8
*/
public static class PutAllResponse extends PartitionResponse {
private volatile boolean returnValue;
private VersionedObjectList versions;
public PutAllResponse(InternalDistributedSystem ds, Set recipients) {
super(ds, recipients, false);
}
public void setResponse(PutAllReplyMessage response) {
this.returnValue = response.result;
if (response.versions != null) {
this.versions = response.versions;
this.versions.replaceNullIDs(response.getSender());
}
}
/**
* @return the result of the remote put operation
* @throws ForceReattemptException if the peer is no longer available
* @throws CacheException if the peer generates an error
*/
public PutAllResult waitForResult() throws CacheException,
ForceReattemptException {
try {
waitForCacheException();
}
catch (ForceReattemptException e) {
throw e;
}
// try {
// waitForRepliesUninterruptibly();
// }
// catch (ReplyException e) {
// Throwable t = e.getCause();
// if (t instanceof CacheClosedException) {
// throw new PartitionedRegionCommunicationException("Put operation received an exception", t);
// }
// e.handleAsUnexpected();
// }
return new PutAllResult(this.returnValue, this.versions);
}
}
public static class PutAllResult {
/** the result of the put operation */
public boolean returnValue;
/** version information for the changes made to the cache */
public VersionedObjectList versions;
public PutAllResult(boolean flag, VersionedObjectList versions) {
this.returnValue = flag;
this.versions = versions;
}
@Override
public String toString() {
return "PutAllResult("+this.returnValue+", "+this.versions+")";
}
}
}