blob: 5a282a1765db24c614d9d384df7653c5ee5b3fd3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryExistsException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.PoolFactory;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.ReplySender;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.DataLocationException;
import org.apache.geode.internal.cache.DistributedPutAllOperation;
import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList;
import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EnumListenerEvent;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.PutAllPartialResultException;
import org.apache.geode.internal.cache.PutAllPartialResultException.PutAllPartialResult;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.versions.ConcurrentCacheModificationException;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A Partitioned Region update message. Meant to be sent only to a bucket's primary owner. In
* addition to updating an entry it is also used to send Partitioned Region event information.
*
* @since GemFire 6.0
*/
public class PutAllPRMessage extends PartitionMessageWithDirectReply {
private static final Logger logger = LogService.getLogger();
private PutAllEntryData[] putAllPRData;
private int putAllPRDataSize = 0;
private Integer bucketId;
/**
* An additional object providing context for the operation, e.g., for BridgeServer notification
*/
ClientProxyMembershipID bridgeContext;
/** true if no callbacks should be invoked */
private boolean skipCallbacks;
private Object callbackArg;
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
protected static final short SKIP_CALLBACKS = (HAS_BRIDGE_CONTEXT << 1);
private transient InternalDistributedSystem internalDs;
/** whether direct-acknowledgement is desired */
private transient boolean directAck = false;
/**
* state from operateOnRegion that must be preserved for transmission from the waiting pool
*/
transient boolean result = false;
transient VersionedObjectList versions = null;
/**
* Empty constructor to satisfy {@link DataSerializer}requirements
*/
public PutAllPRMessage() {}
public PutAllPRMessage(int bucketId, int size, boolean notificationOnly, boolean posDup,
boolean skipCallbacks, Object callbackArg) {
this.bucketId = bucketId;
putAllPRData = new PutAllEntryData[size];
this.notificationOnly = notificationOnly;
this.posDup = posDup;
this.skipCallbacks = skipCallbacks;
this.callbackArg = callbackArg;
initTxMemberId();
}
public void addEntry(PutAllEntryData entry) {
this.putAllPRData[this.putAllPRDataSize++] = entry;
}
public void initMessage(PartitionedRegion r, Set recipients, boolean notifyOnly,
DirectReplyProcessor p) {
setInternalDs(r.getSystem());
setDirectAck(false);
this.resetRecipients();
if (recipients != null) {
setRecipients(recipients);
}
this.regionId = r.getPRId();
this.processor = p;
this.processorId = p == null ? 0 : p.getProcessorId();
if (p != null && this.isSevereAlertCompatible()) {
p.enableSevereAlertProcessing();
}
this.notificationOnly = notifyOnly;
}
@Override
public boolean isSevereAlertCompatible() {
// allow forced-disconnect processing for all cache op messages
return true;
}
public void setPossibleDuplicate(boolean posDup) {
this.posDup = posDup;
}
// this method made unnecessary by entry versioning in 7.0 but kept here for merging
// public void saveKeySet(PutAllPartialResult partialKeys) {
// partialKeys.addKeysAndVersions(this.versions);
// }
public int getSize() {
return putAllPRDataSize;
}
public Set getKeys() {
Set keys = new HashSet(getSize());
for (int i = 0; i < putAllPRData.length; i++) {
if (putAllPRData[i] != null) {
keys.add(putAllPRData[i].getKey());
}
}
return keys;
}
/**
* Sends a PartitionedRegion PutAllPRMessage to the recipient
*
* @param recipient the member to which the put message is sent
* @param r the PartitionedRegion for which the put was performed
* @return the processor used to await acknowledgement that the update was sent, or null to
* indicate that no acknowledgement will be sent
* @throws ForceReattemptException if the peer is no longer available
*/
public PartitionResponse send(DistributedMember recipient, PartitionedRegion r)
throws ForceReattemptException {
// Assert.assertTrue(recipient != null, "PutAllPRMessage NULL recipient"); recipient can be null
// for event notifications
Set recipients = Collections.singleton(recipient);
PutAllResponse p = new PutAllResponse(r.getSystem(), recipients);
initMessage(r, recipients, false, p);
setTransactionDistributed(r.getCache().getTxManager().isDistributed());
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.send: recipient is {}, msg is {}", recipient, this);
}
Set failures = r.getDistributionManager().putOutgoing(this);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException("Failed sending <" + this + ">");
}
return p;
}
public void setBridgeContext(ClientProxyMembershipID contx) {
Assert.assertTrue(contx != null);
this.bridgeContext = contx;
}
@Override
public int getDSFID() {
return PR_PUTALL_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.bucketId = (int) InternalDataSerializer.readSignedVL(in);
if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
this.bridgeContext = DataSerializer.readObject(in);
}
this.callbackArg = DataSerializer.readObject(in);
this.putAllPRDataSize = (int) InternalDataSerializer.readUnsignedVL(in);
this.putAllPRData = new PutAllEntryData[putAllPRDataSize];
if (this.putAllPRDataSize > 0) {
final Version version = InternalDataSerializer.getVersionForDataStreamOrNull(in);
final ByteArrayDataInput bytesIn = new ByteArrayDataInput();
for (int i = 0; i < this.putAllPRDataSize; i++) {
this.putAllPRData[i] = new PutAllEntryData(in, context, null, i, version, bytesIn);
}
boolean hasTags = in.readBoolean();
if (hasTags) {
EntryVersionsList versionTags = EntryVersionsList.create(in);
for (int i = 0; i < this.putAllPRDataSize; i++) {
this.putAllPRData[i].versionTag = versionTags.get(i);
}
}
}
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
if (bucketId == null) {
InternalDataSerializer.writeSignedVL(-1, out);
} else {
InternalDataSerializer.writeSignedVL(bucketId, out);
}
if (this.bridgeContext != null) {
DataSerializer.writeObject(this.bridgeContext, out);
}
DataSerializer.writeObject(this.callbackArg, out);
InternalDataSerializer.writeUnsignedVL(this.putAllPRDataSize, out);
if (this.putAllPRDataSize > 0) {
EntryVersionsList versionTags = new EntryVersionsList(putAllPRDataSize);
boolean hasTags = false;
for (int i = 0; i < this.putAllPRDataSize; i++) {
// If sender's version is >= 7.0.1 then we can send versions list.
if (!hasTags && putAllPRData[i].versionTag != null) {
hasTags = true;
}
VersionTag<?> tag = putAllPRData[i].versionTag;
versionTags.add(tag);
putAllPRData[i].versionTag = null;
putAllPRData[i].toData(out, context);
putAllPRData[i].versionTag = tag;
// PutAllEntryData's toData did not serialize eventID to save
// performance for DR, but in PR,
// we pack it for each entry since we used fake eventID
}
out.writeBoolean(hasTags);
if (hasTags) {
InternalDataSerializer.invokeToData(versionTags, out);
}
}
}
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
if (this.bridgeContext != null)
s |= HAS_BRIDGE_CONTEXT;
if (this.skipCallbacks)
s |= SKIP_CALLBACKS;
return s;
}
@Override
protected void setBooleans(short s, DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.setBooleans(s, in, context);
this.skipCallbacks = ((s & SKIP_CALLBACKS) != 0);
}
@Override
public EventID getEventID() {
if (this.putAllPRData.length > 0) {
return this.putAllPRData[0].getEventID();
}
return null;
}
/**
* This method is called upon receipt and make the desired changes to the PartitionedRegion Note:
* It is very important that this message does NOT cause any deadlocks as the sender will wait
* indefinitely for the acknowledgement
*/
@Override
protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion pr,
long startTime) throws EntryExistsException, ForceReattemptException, DataLocationException {
boolean sendReply = true;
InternalDistributedMember eventSender = getSender();
long lastModified = 0L;
try {
result = doLocalPutAll(pr, eventSender, lastModified);
} catch (ForceReattemptException fre) {
sendReply(getSender(), getProcessorId(), dm, new ReplyException(fre), pr, startTime);
return false;
}
if (sendReply) {
sendReply(getSender(), getProcessorId(), dm, null, pr, startTime);
}
return false;
}
/* we need a event with content for waitForNodeOrCreateBucket() */
@Retained
public EntryEventImpl getFirstEvent(PartitionedRegion r) {
if (putAllPRDataSize == 0) {
return null;
}
@Retained
EntryEventImpl ev = EntryEventImpl.create(r, putAllPRData[0].getOp(), putAllPRData[0].getKey(),
putAllPRData[0].getValue(r.getCache()), this.callbackArg, false /* originRemote */,
getSender(), true/* generate Callbacks */, putAllPRData[0].getEventID());
return ev;
}
@Override
protected Object clone() throws CloneNotSupportedException {
// TODO Auto-generated method stub
return super.clone();
}
/**
* This method is called by both operateOnPartitionedRegion() when processing a remote msg or by
* sendMsgByBucket() when processing a msg targeted to local Jvm. PartitionedRegion Note: It is
* very important that this message does NOT cause any deadlocks as the sender will wait
* indefinitely for the acknowledgment
*
* @param r partitioned region eventSender the endpoint server who received request from client
* lastModified timestamp for last modification
* @return If succeeds, return true, otherwise, throw exception
*/
@edu.umd.cs.findbugs.annotations.SuppressWarnings("IMSE_DONT_CATCH_IMSE")
public boolean doLocalPutAll(PartitionedRegion r, InternalDistributedMember eventSender,
long lastModified)
throws EntryExistsException, ForceReattemptException, DataLocationException {
boolean didPut = false;
long clientReadTimeOut = PoolFactory.DEFAULT_READ_TIMEOUT;
if (r.hasServerProxy()) {
clientReadTimeOut = r.getServerProxy().getPool().getReadTimeout();
if (logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage: doLocalPutAll: clientReadTimeOut is {}", clientReadTimeOut);
}
}
DistributedPutAllOperation dpao = null;
@Released
EntryEventImpl baseEvent = null;
BucketRegion bucketRegion = null;
PartitionedRegionDataStore ds = r.getDataStore();
InternalDistributedMember myId = r.getDistributionManager().getDistributionManagerId();
try {
if (!notificationOnly) {
// bucketRegion is not null only when !notificationOnly
bucketRegion = ds.getInitializedBucketForId(null, bucketId);
this.versions = new VersionedObjectList(this.putAllPRDataSize, true,
bucketRegion.getAttributes().getConcurrencyChecksEnabled());
// create a base event and a DPAO for PutAllMessage distributed btw redundant buckets
baseEvent = EntryEventImpl.create(bucketRegion, Operation.PUTALL_CREATE, null, null,
this.callbackArg, true, eventSender, !skipCallbacks, true);
// set baseEventId to the first entry's event id. We need the thread id for DACE
baseEvent.setEventId(putAllPRData[0].getEventID());
if (this.bridgeContext != null) {
baseEvent.setContext(this.bridgeContext);
}
baseEvent.setPossibleDuplicate(this.posDup);
if (logger.isDebugEnabled()) {
logger.debug(
"PutAllPRMessage.doLocalPutAll: eventSender is {}, baseEvent is {}, msg is {}",
eventSender, baseEvent, this);
}
dpao = new DistributedPutAllOperation(baseEvent, putAllPRDataSize, false);
}
Object[] keys = getKeysToBeLocked();
if (!notificationOnly) {
boolean locked = false;
try {
if (putAllPRData.length > 0) {
if (this.posDup && bucketRegion.getConcurrencyChecksEnabled()) {
if (logger.isDebugEnabled()) {
logger.debug("attempting to locate version tags for retried event");
}
// bug #48205 - versions may have already been generated for a posdup event
// so try to recover them before wiping out the eventTracker's record
// of the previous attempt
for (int i = 0; i < putAllPRDataSize; i++) {
if (putAllPRData[i].versionTag == null) {
putAllPRData[i].versionTag =
bucketRegion.findVersionTagForClientBulkOp(putAllPRData[i].getEventID());
if (putAllPRData[i].versionTag != null) {
putAllPRData[i].versionTag.replaceNullIDs(bucketRegion.getVersionMember());
}
}
}
}
EventID eventID = putAllPRData[0].getEventID();
ThreadIdentifier membershipID =
new ThreadIdentifier(eventID.getMembershipID(), eventID.getThreadID());
bucketRegion.recordBulkOpStart(membershipID, eventID);
}
locked = bucketRegion.waitUntilLocked(keys);
boolean lockedForPrimary = false;
final HashMap succeeded = new HashMap();
PutAllPartialResult partialKeys = new PutAllPartialResult(putAllPRDataSize);
Object key = keys[0];
try {
bucketRegion.doLockForPrimary(false);
lockedForPrimary = true;
/*
* The real work to be synchronized, it will take long time. We don't worry about
* another thread to send any msg which has the same key in this request, because these
* request will be blocked by foundKey
*/
for (int i = 0; i < putAllPRDataSize; i++) {
@Released
EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i, putAllPRData,
notificationOnly, bridgeContext, posDup, skipCallbacks);
try {
key = ev.getKey();
ev.setPutAllOperation(dpao);
// make sure a local update inserts a cache de-serializable
ev.makeSerializedNewValue();
// ev.setLocalFilterInfo(r.getFilterProfile().getLocalFilterRouting(ev));
// ev will be added into dpao in putLocally()
// oldValue and real operation will be modified into ev in putLocally()
// then in basicPutPart3(), the ev is added into dpao
try {
didPut = r.getDataView().putEntryOnRemote(ev, false, false, null, false,
lastModified, true);
if (didPut && logger.isDebugEnabled()) {
logger.debug("PutAllPRMessage.doLocalPutAll:putLocally success for {}", ev);
}
} catch (ConcurrentCacheModificationException e) {
didPut = true;
if (logger.isDebugEnabled()) {
logger.debug(
"PutAllPRMessage.doLocalPutAll:putLocally encountered concurrent cache modification for {}",
ev, e);
}
}
putAllPRData[i].setTailKey(ev.getTailKey());
if (!didPut) { // make sure the region hasn't gone away
r.checkReadiness();
ForceReattemptException fre = new ForceReattemptException(
"unable to perform put in PutAllPR, but operation should not fail");
fre.setHash(ev.getKey().hashCode());
throw fre;
} else {
succeeded.put(putAllPRData[i].getKey(), putAllPRData[i].getValue(r.getCache()));
this.versions.addKeyAndVersion(putAllPRData[i].getKey(), ev.getVersionTag());
}
} finally {
ev.release();
}
} // for
} catch (IllegalMonitorStateException ignore) {
throw new ForceReattemptException("unable to get lock for primary, retrying... ");
} catch (CacheWriterException cwe) {
// encounter cacheWriter exception
partialKeys.saveFailedKey(key, cwe);
} finally {
doPostPutAll(r, dpao, bucketRegion, lockedForPrimary);
}
if (partialKeys.hasFailure()) {
partialKeys.addKeysAndVersions(this.versions);
if (logger.isDebugEnabled()) {
logger.debug(
"PutAllPRMessage: partial keys applied, map to bucket {}'s keys: {}. Applied {}",
bucketId, Arrays.toString(keys), succeeded);
}
throw new PutAllPartialResultException(partialKeys);
}
} catch (RegionDestroyedException e) {
ds.checkRegionDestroyedOnBucket(bucketRegion, true, e);
} finally {
if (locked) {
bucketRegion.removeAndNotifyKeys(keys);
}
}
} else {
for (int i = 0; i < putAllPRDataSize; i++) {
EntryEventImpl ev = getEventFromEntry(r, myId, eventSender, i, putAllPRData,
notificationOnly, bridgeContext, posDup, skipCallbacks);
try {
ev.setOriginRemote(true);
if (this.callbackArg != null) {
ev.setCallbackArgument(this.callbackArg);
}
r.invokePutCallbacks(ev.getOperation().isCreate() ? EnumListenerEvent.AFTER_CREATE
: EnumListenerEvent.AFTER_UPDATE, ev, r.isInitialized(), true);
} finally {
ev.release();
}
}
}
} finally {
if (baseEvent != null)
baseEvent.release();
if (dpao != null)
dpao.freeOffHeapResources();
}
return true;
}
Object[] getKeysToBeLocked() {
// Fix the updateMsg misorder issue
// Lock the keys when doing postPutAll
Object keys[] = new Object[putAllPRDataSize];
for (int i = 0; i < putAllPRDataSize; ++i) {
keys[i] = putAllPRData[i].getKey();
}
return keys;
}
void doPostPutAll(PartitionedRegion r, DistributedPutAllOperation dpao,
BucketRegion bucketRegion, boolean lockedForPrimary) {
try {
// Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
// So we have to manually set useFakeEventId for this DPAO
dpao.setUseFakeEventId(true);
r.checkReadiness();
bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
r.checkReadiness();
} finally {
if (lockedForPrimary) {
bucketRegion.doUnlockForPrimary();
}
}
}
public VersionedObjectList getVersions() {
return this.versions;
}
@Override
public boolean canStartRemoteTransaction() {
return true;
}
@Retained
public static EntryEventImpl getEventFromEntry(InternalRegion r, InternalDistributedMember myId,
InternalDistributedMember eventSender, int idx, PutAllEntryData[] data,
boolean notificationOnly, ClientProxyMembershipID bridgeContext, boolean posDup,
boolean skipCallbacks) {
PutAllEntryData prd = data[idx];
// EntryEventImpl ev = EntryEventImpl.create(r,
// prd.getOp(),
// prd.getKey(), null/* value */, null /* callbackArg */,
// false /* originRemote */,
// eventSender,
// true/* generate Callbacks */,
// prd.getEventID());
Object prdValue = prd.getValue(r.getCache());
@Retained
EntryEventImpl ev = EntryEventImpl.create(r, prd.getOp(), prd.getKey(), prdValue, null, false,
eventSender, !skipCallbacks, prd.getEventID());
boolean evReturned = false;
try {
if (prdValue == null && ev.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
ev.setLocalInvalid(true);
}
ev.setNewValue(prdValue);
ev.setOldValue(prd.getOldValue());
if (bridgeContext != null) {
ev.setContext(bridgeContext);
}
ev.setInvokePRCallbacks(!notificationOnly);
ev.setPossibleDuplicate(posDup);
if (prd.filterRouting != null) {
ev.setLocalFilterInfo(prd.filterRouting.getFilterInfo(myId));
}
if (prd.versionTag != null) {
prd.versionTag.replaceNullIDs(eventSender);
ev.setVersionTag(prd.versionTag);
}
// ev.setLocalFilterInfo(r.getFilterProfile().getLocalFilterRouting(ev));
if (notificationOnly) {
ev.setTailKey(-1L);
} else {
ev.setTailKey(prd.getTailKey());
}
evReturned = true;
return ev;
} finally {
if (!evReturned) {
ev.release();
}
}
}
// override reply processor type from PartitionMessage
PartitionResponse createReplyProcessor(PartitionedRegion r, Set recipients, Object key) {
return new PutAllResponse(r.getSystem(), recipients);
}
// override reply message type from PartitionMessage
@Override
protected void sendReply(InternalDistributedMember member, int procId, DistributionManager dm,
ReplyException ex, PartitionedRegion pr, long startTime) {
// if (!result && getOperation().isCreate()) {
// System.err.println("DEBUG: put returning false. ifNew=" + ifNew
// +" ifOld="+ifOld + " message=" + this);
// }
if (pr != null) {
if (startTime > 0) {
pr.getPrStats().endPartitionMessagesProcessing(startTime);
}
if (!pr.getConcurrencyChecksEnabled() && this.versions != null) {
this.versions.clear();
}
}
PutAllReplyMessage.send(member, procId, getReplySender(dm), this.result, this.versions, ex);
}
@Override
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; putAllPRDataSize=").append(putAllPRDataSize).append("; bucketId=")
.append(bucketId);
if (this.bridgeContext != null) {
buff.append("; bridgeContext=").append(this.bridgeContext);
}
buff.append("; directAck=").append(this.directAck);
for (int i = 0; i < putAllPRDataSize; i++) {
buff.append("; entry").append(i).append(":").append(putAllPRData[i].getKey()).append(",")
.append(putAllPRData[i].versionTag);
}
}
public void setInternalDs(InternalDistributedSystem internalDs) {
this.internalDs = internalDs;
}
public void setDirectAck(boolean directAck) {
this.directAck = directAck;
}
@Override
protected boolean mayNotifySerialGatewaySender(ClusterDistributionManager dm) {
return notifiesSerialGatewaySender(dm);
}
@Override
public String toString() {
StringBuilder buff = new StringBuilder();
String className = getClass().getName();
buff.append(className.substring(className.indexOf(PN_TOKEN) + PN_TOKEN.length())); // partition.<foo>
buff.append("(prid="); // make sure this is the first one
buff.append(this.regionId);
// Append name, if we have it
String name = null;
try {
PartitionedRegion pr = PartitionedRegion.getPRFromId(this.regionId);
if (pr != null) {
name = pr.getFullPath();
}
} catch (Exception ignore) {
/* ignored */
name = null;
}
if (name != null) {
buff.append(" (name = \"").append(name).append("\")");
}
appendFields(buff);
buff.append(" ,distTx=");
buff.append(this.isTransactionDistributed);
buff.append(" ,putAlldatasize=");
buff.append(this.putAllPRDataSize);
// [DISTTX] TODO Disable this
buff.append(" ,putAlldata=");
buff.append(Arrays.toString(this.putAllPRData));
buff.append(")");
return buff.toString();
}
public static class PutAllReplyMessage extends ReplyMessage {
/** Result of the PutAll operation */
boolean result;
VersionedObjectList versions;
@Override
public boolean getInlineProcess() {
return true;
}
/**
* Empty constructor to conform to DataSerializable interface
*/
public PutAllReplyMessage() {}
private PutAllReplyMessage(int processorId, boolean result, VersionedObjectList versions,
ReplyException ex) {
super();
this.versions = versions;
this.result = result;
setProcessorId(processorId);
setException(ex);
}
/** Send an ack */
public static void send(InternalDistributedMember recipient, int processorId, ReplySender dm,
boolean result, VersionedObjectList versions, ReplyException ex) {
Assert.assertTrue(recipient != null, "PutAllReplyMessage NULL reply message");
PutAllReplyMessage m = new PutAllReplyMessage(processorId, result, versions, ex);
m.setRecipient(recipient);
dm.putOutgoing(m);
}
/**
* Processes this message. This method is invoked by the receiver of the message.
*
* @param dm the distribution manager that is processing the message.
*/
@Override
public void process(final DistributionManager dm, final ReplyProcessor21 rp) {
final long startTime = getTimestamp();
if (rp == null) {
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "{}: processor not found", this);
}
return;
}
if (rp instanceof PutAllResponse) {
PutAllResponse processor = (PutAllResponse) rp;
processor.setResponse(this);
}
rp.process(this);
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", rp, this);
}
dm.getStats().incReplyMessageTime(NanoTimer.getTime() - startTime);
}
@Override
public int getDSFID() {
return PR_PUTALL_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.result = in.readBoolean();
this.versions = (VersionedObjectList) DataSerializer.readObject(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeBoolean(this.result);
DataSerializer.writeObject(this.versions, out);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("PutAllReplyMessage ").append("processorid=").append(this.processorId)
.append(" returning ").append(this.result).append(" exception=").append(getException())
.append(" versions= ").append(this.versions);
return sb.toString();
}
}
/**
* A processor to capture the value returned by {@link PutAllPRMessage}
*
* @since GemFire 5.8
*/
public static class PutAllResponse extends PartitionResponse {
private volatile boolean returnValue;
private VersionedObjectList versions;
public PutAllResponse(InternalDistributedSystem ds, Set recipients) {
super(ds, recipients, false);
}
public void setResponse(PutAllReplyMessage response) {
this.returnValue = response.result;
if (response.versions != null) {
this.versions = response.versions;
this.versions.replaceNullIDs(response.getSender());
}
}
/**
* @return the result of the remote put operation
* @throws ForceReattemptException if the peer is no longer available
* @throws CacheException if the peer generates an error
*/
public PutAllResult waitForResult() throws CacheException, ForceReattemptException {
waitForCacheException();
return new PutAllResult(this.returnValue, this.versions);
}
}
public static class PutAllResult {
/** the result of the put operation */
public boolean returnValue;
/** version information for the changes made to the cache */
public VersionedObjectList versions;
public PutAllResult(boolean flag, VersionedObjectList versions) {
this.returnValue = flag;
this.versions = versions;
}
@Override
public String toString() {
return "PutAllResult(" + this.returnValue + ", " + this.versions + ")";
}
}
}