package org.apache.geode.cache.client.internal;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.client.AllConnectionsInUseException;
import org.apache.geode.cache.client.ServerConnectivityException;
import org.apache.geode.cache.client.ServerOperationException;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.CachedDeserializable;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.RegionEntry;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.Message;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.cache.versions.VersionStamp;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.serialization.ByteArrayDataInput;
import org.apache.geode.logging.internal.log4j.api.LogService;
* Does a region put (or create) on a server
* @since GemFire 5.7
public class PutOp {
private static final Logger logger = LogService.getLogger();
* Does a region put on a server using connections from the given pool to communicate with the
* server.
* @param pool the pool to use to communicate with the server.
* @param region the region to do the put on
* @param key the entry key to do the put on
* @param value the entry value to put
* @param event the event for this put
* @param callbackArg an optional callback arg to pass to any cache callbacks
public static Object execute(ExecutablePool pool, LocalRegion region, Object key, Object value,
byte[] deltaBytes, EntryEventImpl event, Operation operation, boolean requireOldValue,
Object expectedOldValue, Object callbackArg, boolean prSingleHopEnabled) {
PutOpImpl op = new PutOpImpl(region, key, value, deltaBytes, event, operation, requireOldValue,
expectedOldValue, callbackArg, false/* donot send full obj; send delta */,
if (prSingleHopEnabled) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
ServerLocation server =
cms.getBucketServerLocation(region, Operation.UPDATE, key, value, callbackArg);
if (server != null) {
try {
PoolImpl poolImpl = (PoolImpl) pool;
boolean onlyUseExistingCnx = (poolImpl.getMaxConnections() != -1
&& poolImpl.getConnectionCount() >= poolImpl.getMaxConnections());
return pool.executeOn(new ServerLocation(server.getHostName(), server.getPort()), op,
true, onlyUseExistingCnx);
} catch (AllConnectionsInUseException ignored) {
} catch (ServerConnectivityException e) {
if (e instanceof ServerOperationException) {
throw e; // fixed 44656
Object result = pool.execute(op);
if (op.getMessage().isRetry()) {
return result;
public static Object execute(ExecutablePool pool, String regionName, Object key, Object value,
byte[] deltaBytes, EntryEventImpl event, Operation operation,
boolean requireOldValue,
Object expectedOldValue, Object callbackArg,
boolean prSingleHopEnabled) {
AbstractOp op = new PutOpImpl(regionName, key, value, deltaBytes, event, operation,
requireOldValue, expectedOldValue, callbackArg, false/* donot send full obj; send delta */,
return pool.execute(op);
* This is a unit test method. It does a region put on a server using the given connection from
* the given pool to communicate with the server. Do not call this method if the value is Delta
* instance.
* @param con the connection to use
* @param pool the pool to use to communicate with the server.
* @param regionName the name of the region to do the put on
* @param key the entry key to do the put on
* @param value the entry value to put
* @param event the event for this put
* @param callbackArg an optional callback arg to pass to any cache callbacks
public static void execute(Connection con, ExecutablePool pool, String regionName, Object key,
Object value, EntryEventImpl event, Object callbackArg, boolean prSingleHopEnabled) {
AbstractOp op = new PutOpImpl(regionName, key, value, null, event, Operation.CREATE, false,
null, callbackArg, false /* donot send full Obj; send delta */, prSingleHopEnabled);
pool.executeOn(con, op);
public static final byte HAS_OLD_VALUE_FLAG = 0x01;
public static final byte OLD_VALUE_IS_OBJECT_FLAG = 0x02;
public static final byte HAS_VERSION_TAG = 0x04;
private PutOp() {
// no instances allowed
protected static class PutOpImpl extends AbstractOp {
private Object key;
private LocalRegion region;
* the operation will have either a region or a regionName. Names seem to be used by unit tests
* to exercise operations without creating a real region
private String regionName;
private Object value;
private boolean deltaSent = false;
private EntryEventImpl event;
private Object callbackArg;
private boolean prSingleHopEnabled;
private boolean requireOldValue;
private Object expectedOldValue;
public PutOpImpl(String regionName, Object key, Object value, byte[] deltaBytes,
EntryEventImpl event, Operation op, boolean requireOldValue, Object expectedOldValue,
Object callbackArg, boolean respondingToInvalidDelta, boolean prSingleHopEnabled) {
this(regionName, key, value, deltaBytes, event, op, requireOldValue, expectedOldValue,
callbackArg, respondingToInvalidDelta, respondingToInvalidDelta, prSingleHopEnabled);
PutOpImpl(Region region, Object key, Object value, byte[] deltaBytes,
EntryEventImpl event, Operation op, boolean requireOldValue, Object expectedOldValue,
Object callbackArg, boolean sendFullObj, boolean prSingleHopEnabled) {
this(region.getFullPath(), key, value, deltaBytes, event, op, requireOldValue,
callbackArg, false, sendFullObj, prSingleHopEnabled);
this.region = (LocalRegion) region;
private PutOpImpl(String regionName, Object key, Object value, byte[] deltaBytes,
EntryEventImpl event, Operation op, boolean requireOldValue, Object expectedOldValue,
Object callbackArg, boolean respondingToInvalidDelta, boolean sendFullObj,
boolean prSingleHopEnabled) {
7 + (callbackArg != null ? 1 : 0) + (expectedOldValue != null ? 1 : 0));
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("PutOpImpl constructing message for {}; operation={}", event.getEventId(),
this.key = key;
this.callbackArg = callbackArg;
this.event = event;
this.value = value;
this.regionName = regionName;
this.prSingleHopEnabled = prSingleHopEnabled;
this.requireOldValue = requireOldValue;
this.expectedOldValue = expectedOldValue;
getMessage().addStringPart(regionName, true);
int flags = 0;
if (requireOldValue)
flags |= 0x01;
if (expectedOldValue != null)
flags |= 0x02;
if (expectedOldValue != null) {
if (respondingToInvalidDelta) {
// Add message part for sending either delta or full value
if (!sendFullObj && deltaBytes != null && op == Operation.UPDATE) {
deltaSent = true;
if (isDebugEnabled) {
logger.debug("PutOp: Sending delta for key {}", this.key);
} else if (value instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) value;
if (!cd.isSerialized()) {
// it is a byte[]
} else {
Object cdValue = cd.getValue();
if (cdValue instanceof byte[]) {
getMessage().addRawPart((byte[]) cdValue, true);
} else {
} else {
if (callbackArg != null) {
protected Object processResponse(Message msg) throws Exception {
throw new UnsupportedOperationException(
"processResponse should not be invoked in PutOp. Use processResponse(Message, Connection)");
* Process a response that contains an ack.
* @param msg the message containing the response
* @param con Connection on which this op is executing
* @throws Exception if response could not be processed or we received a response with a server
* exception.
* @since GemFire 6.1
protected Object processResponse(Message msg, Connection con) throws Exception {
processAck(msg, con);
if (prSingleHopEnabled) {
Part part = msg.getPart(0);
byte[] bytesReceived = part.getSerializedForm();
if (bytesReceived[0] != ClientMetadataService.INITIAL_VERSION
&& bytesReceived.length == ClientMetadataService.SIZE_BYTES_ARRAY_RECEIVED) {
if (region != null) {
ClientMetadataService cms = region.getCache().getClientMetadataService();
byte myVersion =
cms.getMetaDataVersion(region, Operation.UPDATE, key, value, callbackArg);
if (myVersion != bytesReceived[0] || isAllowDuplicateMetadataRefresh()) {
cms.scheduleGetPRMetaData(region, false, bytesReceived[1]);
if (msg.getMessageType() == MessageType.REPLY && msg.getNumberOfParts() > 1) {
int flags = msg.getPart(1).getInt();
int partIdx = 2;
Object oldValue = null;
if ((flags & HAS_OLD_VALUE_FLAG) != 0) {
oldValue = msg.getPart(partIdx++).getObject();
if ((flags & OLD_VALUE_IS_OBJECT_FLAG) != 0 && oldValue instanceof byte[]) {
ByteArrayDataInput din = new ByteArrayDataInput((byte[]) oldValue);
oldValue = DataSerializer.readObject(din);
// if the server has versioning we will attach it to the client's event
// here so it can be applied to the cache
if ((flags & HAS_VERSION_TAG) != 0) {
VersionTag tag = (VersionTag) msg.getPart(partIdx).getObject();
// we use the client's ID since we apparently don't track the server's ID in connections
tag.replaceNullIDs((InternalDistributedMember) con.getEndpoint().getMemberId());
checkForDeltaConflictAndSetVersionTag(tag, con);
return oldValue;
return null;
void checkForDeltaConflictAndSetVersionTag(VersionTag versionTag, Connection connection)
throws Exception {
RegionEntry regionEntry = ((EntryEventImpl) event).getRegionEntry();
if (regionEntry == null) {
VersionStamp versionStamp = regionEntry.getVersionStamp();
if (deltaSent && versionTag.getEntryVersion() > versionStamp.getEntryVersion() + 1) {
// Delta can't be applied, need to get full value.
if (logger.isDebugEnabled()) {
logger.debug("Version is out of order. Need to get from server to perform delta update.");
Object object = getFullValue(connection);
} else {
Object getFullValue(Connection connection) throws Exception {
GetOp.GetOpImpl getOp =
new GetOp.GetOpImpl(region, key, callbackArg, prSingleHopEnabled, event);
return getOp.attempt(connection);
* Process a response that contains an ack.
* @param msg the message containing the response
* @param con Connection on which this op is executing
* @throws Exception if response could not be processed or we received a response with a server
* exception.
* @since GemFire 6.1
private void processAck(Message msg, Connection con) throws Exception {
final int msgType = msg.getMessageType();
// Update delta stats
if (deltaSent && region != null) {
if (msgType != MessageType.REPLY) {
Part part = msg.getPart(0);
if (msgType == MessageType.PUT_DELTA_ERROR) {
if (logger.isDebugEnabled()) {
logger.debug("PutOp: Sending full value as delta failed on server...");
AbstractOp op = new PutOpImpl(regionName, key, value, null, event,
Operation.CREATE, requireOldValue, expectedOldValue, callbackArg,
true /* send full obj */, prSingleHopEnabled);
if (region != null) {
} else if (msgType == MessageType.EXCEPTION) {
String s = ": While performing a remote " + "put";
throw new ServerOperationException(s, (Throwable) part.getObject());
// Get the exception toString part.
// This was added for c++ thin client and not used in java
} else if (isErrorResponse(msgType)) {
throw new ServerOperationException(part.getString());
} else {
throw new InternalGemFireError(
"Unexpected message type " + MessageType.getString(msgType));
protected boolean isErrorResponse(int msgType) {
return msgType == MessageType.PUT_DATA_ERROR;
protected long startAttempt(ConnectionStats stats) {
return stats.startPut();
protected void endSendAttempt(ConnectionStats stats, long start) {
stats.endPutSend(start, hasFailed());
protected void endAttempt(ConnectionStats stats, long start) {
stats.endPut(start, hasTimedOut(), hasFailed());
public String toString() {
return "PutOp:" + key;