blob: de1309deaf3efacc6cb22a6da74fd883a331568d [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache.tier.sockets;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.GemFireIOException;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.cache.query.CqQuery;
import com.gemstone.gemfire.cache.query.internal.cq.InternalCqQuery;
import com.gemstone.gemfire.cache.util.ObjectSizer;
import com.gemstone.gemfire.internal.DSCODE;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Sendable;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.CachedDeserializable;
import com.gemstone.gemfire.internal.cache.CachedDeserializableFactory;
import com.gemstone.gemfire.internal.cache.EntryEventImpl;
import com.gemstone.gemfire.internal.cache.Token;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.NewValueImporter;
import com.gemstone.gemfire.internal.cache.EntryEventImpl.SerializedCacheValueImpl;
import com.gemstone.gemfire.internal.cache.EnumListenerEvent;
import com.gemstone.gemfire.internal.cache.EventID;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.WrappedCallbackArgument;
import com.gemstone.gemfire.internal.cache.ha.HAContainerRegion;
import com.gemstone.gemfire.internal.cache.lru.Sizeable;
import com.gemstone.gemfire.internal.cache.tier.MessageType;
import com.gemstone.gemfire.internal.cache.versions.VersionTag;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.offheap.SimpleMemoryAllocatorImpl;
/**
* Class <code>ClientUpdateMessageImpl</code> is a message representing a cache
* operation that is sent from a server to an interested client.
*
* @author Barry Oglesby
*
* @since 4.2
*/
public class ClientUpdateMessageImpl implements ClientUpdateMessage, Sizeable, NewValueImporter
{
private static final long serialVersionUID = 7037106666445312400L;
/** Logger for this class. */
private static final Logger logger = LogService.getLogger();
/**
 * The operation performed (e.g. AFTER_CREATE, AFTER_UPDATE, AFTER_DESTROY,
 * AFTER_INVALIDATE, AFTER_REGION_DESTROY)
 */
protected EnumListenerEvent _operation;
/**
 * The name of the <code>Region</code> that was updated. The full
 * constructor stores the region's full path here.
 */
private String _regionName;
/**
 * The key that was updated
 */
private Object _keyOfInterest;
/**
 * The new value
 */
private Object _value;
/**
 * Whether the value is a serialized object or just a byte[];
 * <code>0x01</code> means it is a serialized object.
 */
protected byte _valueIsObject;
/**
 * The callback argument
 */
protected Object _callbackArgument;
/**
 * The membership id of the originator of the event
 */
protected ClientProxyMembershipID _membershipId;
/**
 * The event id of the event
 */
protected EventID _eventIdentifier;
/**
 * Whether this message may be conflated. The full constructor sets it to
 * true only for an AFTER_UPDATE operation on a region that has conflation
 * enabled.
 */
private boolean _shouldConflate = false;
/**
 * To determine if this client message is part of InterestList.
 */
private volatile boolean _isInterestListPassed;
/**
 * To determine if this client message is part of CQs.
 */
private volatile boolean _hasCqs = false;
/**
 * Map from client id to the CQs satisfied for that client. Created lazily
 * the first time CQs are added to this message.
 */
private ClientCqConcurrentMap _clientCqs = null;
/**
 * Clients satisfying the interest list that want values
 */
private volatile Set<ClientProxyMembershipID> _clientInterestList;
/**
 * Clients satisfying the interest list that want invalidations
 */
private volatile Set<ClientProxyMembershipID> _clientInterestListInv;
/**
 * To determine if the message is the result of a netLoad.
 * If it is a net load, the message is not delivered to the client that
 * requested the load.
 */
private transient boolean _isNetLoad = false;
/**
 * Represents the changed bytes of this event's _value.
 *
 * @since 6.1
 */
private byte[] deltaBytes = null;
/** The version tag of the event; may be null. */
private VersionTag versionTag;
/* All constant memory overheads added up into a single value. */
private static final int CONSTANT_MEMORY_OVERHEAD;
/**
 * Creates a message that carries neither delta bytes nor a version tag.
 * Delegates to the full constructor, passing <code>null</code> for both.
 *
 * @param operation
 *          The operation performed (e.g. AFTER_CREATE, AFTER_UPDATE,
 *          AFTER_DESTROY, AFTER_INVALIDATE, AFTER_REGION_DESTROY)
 * @param region
 *          The <code>Region</code> that was updated
 * @param keyOfInterest
 *          The key that was updated
 * @param value
 *          The new value
 * @param valueIsObject
 *          false if value is an actual byte[] that isn't serialized info
 * @param callbackArgument
 *          The callback argument
 * @param memberId
 *          membership id of the originator of the event
 * @param eventIdentifier
 *          EventID of this message
 */
public ClientUpdateMessageImpl(EnumListenerEvent operation,
    LocalRegion region, Object keyOfInterest, Object value,
    byte valueIsObject, Object callbackArgument,
    ClientProxyMembershipID memberId, EventID eventIdentifier) {
  this(operation,
      region,
      keyOfInterest,
      value,
      null, // no delta bytes
      valueIsObject,
      callbackArgument,
      memberId,
      eventIdentifier,
      null); // no version tag
}
/**
 * Full constructor.
 *
 * @param operation
 *          The operation performed
 * @param region
 *          The <code>Region</code> that was updated; its full path is stored
 *          as the region name and its conflation setting is consulted
 * @param keyOfInterest
 *          The key that was updated
 * @param value
 *          The new value
 * @param delta
 *          The changed bytes of the value, or null
 * @param valueIsObject
 *          false if value is an actual byte[] that isn't serialized info
 * @param callbackArgument
 *          The callback argument
 * @param memberId
 *          membership id of the originator of the event
 * @param eventIdentifier
 *          EventID of this message
 * @param versionTag
 *          The version tag of the event, or null
 */
public ClientUpdateMessageImpl(EnumListenerEvent operation,
    LocalRegion region, Object keyOfInterest, Object value,
    byte[] delta, byte valueIsObject, Object callbackArgument,
    ClientProxyMembershipID memberId, EventID eventIdentifier,
    VersionTag versionTag) {
  // The operation must be assigned before isUpdate() is consulted below.
  _operation = operation;
  _regionName = region.getFullPath();
  _keyOfInterest = keyOfInterest;
  _value = value;
  _valueIsObject = valueIsObject;
  _callbackArgument = callbackArgument;
  _membershipId = memberId;
  _eventIdentifier = eventIdentifier;
  // Only updates on a conflation-enabled region are conflatable.
  _shouldConflate = isUpdate() && region.getEnableConflation();
  deltaBytes = delta;
  this.versionTag = versionTag;
}
/**
 * Constructor used by ClientInstantiatorMessage. Only the operation, the
 * originator and the event id are set; every other field keeps its default.
 *
 * @param operation
 *          The operation performed (e.g. AFTER_CREATE, AFTER_UPDATE,
 *          AFTER_DESTROY, AFTER_INVALIDATE, AFTER_REGION_DESTROY)
 * @param memberId
 *          membership id of the originator of the event
 * @param eventIdentifier
 *          EventID of this message
 */
protected ClientUpdateMessageImpl(EnumListenerEvent operation,
    ClientProxyMembershipID memberId, EventID eventIdentifier) {
  _operation = operation;
  _membershipId = memberId;
  _eventIdentifier = eventIdentifier;
}
/**
 * Default constructor. Leaves every field at its declared default.
 */
public ClientUpdateMessageImpl() {
  // intentionally empty
}
/**
 * @return the name of the region that was updated
 */
public String getRegionName()
{
  return _regionName;
}
/**
 * @return the key that was updated
 */
public Object getKeyOfInterest()
{
  return _keyOfInterest;
}
/**
 * @return the operation this message represents
 */
public EnumListenerEvent getOperation()
{
  return _operation;
}
/**
 * @return the new value carried by this message
 */
public Object getValue()
{
  return _value;
}
/**
 * @return true if the value flag equals 0x01, i.e. the value is a
 *         serialized object rather than a plain byte[]
 */
public boolean valueIsObject()
{
  return _valueIsObject == 0x01;
}
/**
 * Returns the callback argument supplied with the event.
 *
 * @return the callback argument
 */
public Object getCallbackArgument()
{
  return _callbackArgument;
}
/// Conflatable interface methods ///
/**
 * Determines whether or not to conflate this message. This method answers
 * true IFF the message's operation is AFTER_UPDATE and its region has
 * conflation enabled. Otherwise it answers false. Messages whose operation
 * is AFTER_CREATE, AFTER_DESTROY, AFTER_INVALIDATE or AFTER_REGION_DESTROY
 * are not conflated.
 *
 * @return Whether to conflate this message
 */
public boolean shouldBeConflated() {
  // The decision is taken once, in the full constructor: only an update on
  // a region with conflation enabled is conflatable.
  return _shouldConflate;
}
/**
 * @return the region name used for conflation; the same as the region name
 *         of this message
 */
public String getRegionToConflate() {
  return _regionName;
}
/**
 * @return the key used for conflation; the same as the key of interest
 */
public Object getKeyToConflate() {
  return _keyOfInterest;
}
/**
 * @return the value used for conflation; the same as the message value
 */
public Object getValueToConflate() {
  return _value;
}
/**
 * Replaces the value carried by this message.
 *
 * @param value
 *          the latest value
 */
public void setLatestValue(Object value) {
  // Open question from the original author: does this also need to set
  // _valueIsObject?
  _value = value;
}
/// End Conflatable interface methods ///
/**
 * @return the membership id of the originator of the event
 */
public ClientProxyMembershipID getMembershipId() {
  return _membershipId;
}
/**
 * Returns the unique event identifier for the event corresponding to this
 * message.
 *
 * @return the unique event identifier for the event corresponding to this
 *         message.
 */
public EventID getEventId() {
  return _eventIdentifier;
}
/**
 * @return the version tag of the event, possibly null
 */
public VersionTag getVersionTag()
{
  return versionTag;
}
/**
 * @return true if the operation is AFTER_CREATE
 */
public boolean isCreate() {
  return EnumListenerEvent.AFTER_CREATE == _operation;
}
/**
 * @return true if the operation is AFTER_UPDATE
 */
public boolean isUpdate() {
  return EnumListenerEvent.AFTER_UPDATE == _operation;
}
/**
 * @return true if the operation is AFTER_DESTROY
 */
public boolean isDestroy() {
  return EnumListenerEvent.AFTER_DESTROY == _operation;
}
/**
 * @return true if the operation is AFTER_INVALIDATE
 */
public boolean isInvalidate() {
  return EnumListenerEvent.AFTER_INVALIDATE == _operation;
}
/**
 * @return true if the operation is AFTER_REGION_DESTROY
 */
public boolean isDestroyRegion() {
  return EnumListenerEvent.AFTER_REGION_DESTROY == _operation;
}
/**
 * @return true if the operation is AFTER_REGION_CLEAR
 */
public boolean isClearRegion() {
  return EnumListenerEvent.AFTER_REGION_CLEAR == _operation;
}
/**
 * @return true if the operation is AFTER_REGION_INVALIDATE
 */
public boolean isInvalidateRegion() {
  return EnumListenerEvent.AFTER_REGION_INVALIDATE == _operation;
}
/**
 * @return always false for this implementation
 */
public boolean isClientCompatible() {
  return false;
}
/**
 * Not supported by this class: the MessageDispatcher uses
 * getMessage(CacheClientProxy, byte[]) instead.
 *
 * @throws Error
 *           always
 */
public Message getMessage(CacheClientProxy proxy, boolean notify) throws IOException {
  throw new Error("ClientUpdateMessage.getMessage(proxy) should not be invoked");
}
/**
 * Returns a <code>Message</code> generated from the fields of this
 * <code>ClientUpdateMessage</code>, in the wire format matching the version
 * reported by the given proxy.
 *
 * @param proxy
 *          the proxy of the client the message is built for; supplies the
 *          client version and the client's conflation setting
 * @param latestValue
 *          Object containing the latest value to use. This could be the
 *          original value if conflation is not enabled, or it could be a
 *          conflated value if conflation is enabled.
 * @return a <code>Message</code> generated from the fields of this
 *         <code>ClientUpdateMessage</code>
 * @throws IOException
 *           if the client version is older than GFE_57
 * @see com.gemstone.gemfire.internal.cache.tier.sockets.Message
 */
protected Message getMessage(CacheClientProxy proxy, byte[] latestValue) throws IOException {
  final Version clientVersion = proxy.getVersion();

  // Conflation applies when the client forces it on, or when the client
  // defers to the server and this message is conflatable.
  final boolean conflation = (proxy.clientConflation == HandShake.CONFLATION_ON)
      || (proxy.clientConflation == HandShake.CONFLATION_DEFAULT && shouldBeConflated());

  byte[] serializedValue = latestValue;
  if (latestValue == null && (this.deltaBytes == null || isCreate())) {
    // Either no delta could be extracted, or this is a CREATE, for which the
    // full value is always sent; so a serialized value is needed.
    // NOTE(review): latestValue is known to be null here, so this serializes
    // null and stores the result in _value - confirm that is intended.
    serializedValue = CacheServerHelper.serialize(latestValue);
    this._value = serializedValue;
  }

  if (clientVersion.compareTo(Version.GFE_70) >= 0) {
    return getGFE70Message(proxy, serializedValue, conflation, clientVersion);
  }
  if (clientVersion.compareTo(Version.GFE_65) >= 0) {
    return getGFE65Message(proxy, serializedValue, conflation, clientVersion);
  }
  if (clientVersion.compareTo(Version.GFE_61) >= 0) {
    return getGFE61Message(proxy, serializedValue, conflation, clientVersion);
  }
  if (clientVersion.compareTo(Version.GFE_57) >= 0) {
    // The oldest format receives the caller's value, not the serialized one.
    return getGFEMessage(proxy.getProxyID(), latestValue, clientVersion);
  }
  throw new IOException(
      "Unsupported client version for server-to-client message creation: "
      + clientVersion);
}
/**
 * Builds the message for clients whose version is at least GFE_57 but older
 * than GFE_61 (see the dispatch in getMessage(CacheClientProxy, byte[])).
 * This format carries neither a delta flag nor a version tag.
 *
 * @param proxyId
 *          the id of the client the message is built for
 * @param latestValue
 *          the value to send for create and update operations
 * @param clientVersion
 *          the version of the receiving client
 * @return the populated message, ending with the event id part
 * @throws IOException
 *           if adding a part to the message fails
 * @throws InternalGemFireError
 *           if the operation is none of the supported kinds
 */
protected Message getGFEMessage(ClientProxyMembershipID proxyId,
    byte[] latestValue, Version clientVersion) throws IOException {
  // Look the client's CQ names up exactly once. Calling getCqs() a second
  // time for the length could return null if the client's CQs are removed
  // concurrently, which would fail with a NullPointerException.
  final String[] cqNames = this._hasCqs ? getCqs(proxyId) : null;
  final boolean clientHasCq = cqNames != null;
  // Two parts per CQ plus one part for the count written by addCqsToMessage.
  final int cqMsgParts = clientHasCq ? (cqNames.length * 2) + 1 : 0;

  final Message message;
  if (isCreate() || isUpdate()) {
    // Single read of the volatile field so the null check and the lookup
    // see the same set.
    final Set<ClientProxyMembershipID> invalidateClients = this._clientInterestListInv;
    if (invalidateClients != null && invalidateClients.contains(proxyId)) {
      // Notify all - do not send the value
      message = new Message(6, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
      message.addStringPart(this._regionName);
      // Currently serializing the key here instead of when the message
      // is put in the queue so that it can be conflated later
      message.addStringOrObjPart(this._keyOfInterest);
      message.addObjPart(this._callbackArgument);
      message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
      // CQ status is always reported as false in this branch.
      message.addObjPart(Boolean.FALSE);
    } else {
      // Notify by subscription - send the value
      message = new Message(7 + cqMsgParts, clientVersion);
      message.setMessageType(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      message.addStringPart(this._regionName);
      message.addStringOrObjPart(this._keyOfInterest);
      message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      message.addObjPart(this._callbackArgument);
      message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
      message.addObjPart(Boolean.valueOf(clientHasCq));
      if (clientHasCq) {
        this.addCqsToMessage(proxyId, message);
      }
    }
  } else if (isDestroy() || isInvalidate()) {
    message = new Message(6 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroy() ? MessageType.LOCAL_DESTROY : MessageType.LOCAL_INVALIDATE);
    message.addStringPart(this._regionName);
    message.addStringOrObjPart(this._keyOfInterest);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroyRegion() || isClearRegion() || isInvalidateRegion()) {
    // The three region-level operations differ only in their message type.
    message = new Message(4 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroyRegion() ? MessageType.LOCAL_DESTROY_REGION
        : (isClearRegion() ? MessageType.CLEAR_REGION : MessageType.INVALIDATE_REGION));
    message.addStringPart(this._regionName);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else {
    throw new InternalGemFireError("Don't know what kind of message");
  }

  message.setTransactionId(0);
  // Add the EventId since 5.1 (used to prevent duplicate events
  // received on the client side after a failover)
  message.addObjPart(this._eventIdentifier);
  return message;
}
/**
 * Builds the message for clients whose version is at least GFE_61 but older
 * than GFE_65 (see the dispatch in getMessage(CacheClientProxy, byte[])).
 * Compared with the older format, create and update messages carry a
 * boolean delta flag ahead of the value, and an update may send the delta
 * bytes instead of the full value.
 *
 * @param proxy
 *          the proxy of the client the message is built for
 * @param latestValue
 *          the serialized value to send, or null if it still has to be
 *          taken from this message's own value
 * @param conflation
 *          whether conflation applies for this client; a delta is never
 *          sent when it does
 * @param clientVersion
 *          the version of the receiving client
 * @return the populated message, ending with the event id part
 * @throws IOException
 *           if serializing the value or adding a part fails
 * @throws InternalGemFireError
 *           if the operation is none of the supported kinds
 */
protected Message getGFE61Message(CacheClientProxy proxy, byte[] latestValue,
    boolean conflation, Version clientVersion) throws IOException {
  final ClientProxyMembershipID proxyId = proxy.getProxyID();
  // Look the client's CQ names up exactly once. Calling getCqs() a second
  // time for the length could return null if the client's CQs are removed
  // concurrently, which would fail with a NullPointerException.
  final String[] cqNames = this._hasCqs ? getCqs(proxyId) : null;
  final boolean clientHasCq = cqNames != null;
  // Two parts per CQ plus one part for the count written by addCqsToMessage.
  final int cqMsgParts = clientHasCq ? (cqNames.length * 2) + 1 : 0;

  final Message message;
  if (isCreate() || isUpdate()) {
    // Single read of the volatile field so the null check and the lookup
    // see the same set.
    final Set<ClientProxyMembershipID> invalidateClients = this._clientInterestListInv;
    if (invalidateClients != null && invalidateClients.contains(proxyId)) {
      // Notify all - do not send the value
      message = new Message(6, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
      message.addStringPart(this._regionName);
      // Currently serializing the key here instead of when the message
      // is put in the queue so that it can be conflated later
      message.addStringOrObjPart(this._keyOfInterest);
      message.addObjPart(this._callbackArgument);
      // Interest list status.
      message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
      // CQ status is always reported as false in this branch.
      message.addObjPart(Boolean.FALSE);
    } else {
      final boolean clientInterested = isClientInterested(proxyId);
      // Notify by subscription - send the value
      message = new Message(8 + cqMsgParts, clientVersion);
      message.setMessageType(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      message.addStringPart(this._regionName);
      message.addStringOrObjPart(this._keyOfInterest);
      if (isCreate()) {
        message.addObjPart(Boolean.FALSE); // NO delta
        // Add the value (which has already been serialized)
        message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      } else if (this.deltaBytes != null
          && !conflation
          && !proxy.isMarkerEnqueued()
          && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) {
        // Send only the delta.
        message.addObjPart(Boolean.TRUE);
        message.addBytesPart(this.deltaBytes);
        proxy.getStatistics().incDeltaMessagesSent();
      } else {
        message.addObjPart(Boolean.FALSE);
        byte[] fullValue = latestValue;
        if (fullValue == null) {
          // Fall back to this message's own value, serializing it (and
          // caching the serialized form) if that has not happened yet.
          if (!(this._value instanceof byte[])) {
            this._value = CacheServerHelper.serialize(this._value);
          }
          fullValue = (byte[]) this._value;
        }
        message.addRawPart(fullValue, (this._valueIsObject == 0x01));
      }
      message.addObjPart(this._callbackArgument);
      // Interest list status.
      message.addObjPart(Boolean.valueOf(clientInterested));
      // CQ status.
      message.addObjPart(Boolean.valueOf(clientHasCq));
      if (clientHasCq) {
        this.addCqsToMessage(proxyId, message);
      }
    }
  } else if (isDestroy() || isInvalidate()) {
    message = new Message(6 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroy() ? MessageType.LOCAL_DESTROY : MessageType.LOCAL_INVALIDATE);
    message.addStringPart(this._regionName);
    message.addStringOrObjPart(this._keyOfInterest);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroyRegion() || isClearRegion() || isInvalidateRegion()) {
    // The three region-level operations differ only in their message type.
    message = new Message(4 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroyRegion() ? MessageType.LOCAL_DESTROY_REGION
        : (isClearRegion() ? MessageType.CLEAR_REGION : MessageType.INVALIDATE_REGION));
    message.addStringPart(this._regionName);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else {
    throw new InternalGemFireError("Don't know what kind of message");
  }

  message.setTransactionId(0);
  // Add the EventId since 5.1 (used to prevent duplicate events
  // received on the client side after a failover)
  message.addObjPart(this._eventIdentifier);
  return message;
}
/**
 * Builds the message for clients whose version is at least GFE_65 but older
 * than GFE_70 (see the dispatch in getMessage(CacheClientProxy, byte[])).
 * Compared with the 6.1 format, a LOCAL_INVALIDATE message sent to a client
 * with CQs also carries CQ information, preceded by an int part naming the
 * base operation.
 *
 * @param proxy
 *          the proxy of the client the message is built for
 * @param p_latestValue
 *          the serialized value to send, or null if it still has to be
 *          taken from this message's own value
 * @param conflation
 *          whether conflation applies for this client; a delta is never
 *          sent when it does
 * @param clientVersion
 *          the version of the receiving client
 * @return the populated message, ending with the event id part
 * @throws IOException
 *           if serializing the value or adding a part fails
 * @throws InternalGemFireError
 *           if the operation is none of the supported kinds
 */
protected Message getGFE65Message(CacheClientProxy proxy,
    byte[] p_latestValue, boolean conflation, Version clientVersion) throws IOException {
  byte[] latestValue = p_latestValue;
  final ClientProxyMembershipID proxyId = proxy.getProxyID();
  // Look the client's CQ names up exactly once. Calling getCqs() a second
  // time for the length could return null if the client's CQs are removed
  // concurrently, which would fail with a NullPointerException.
  final String[] cqNames = this._hasCqs ? getCqs(proxyId) : null;
  final boolean clientHasCq = cqNames != null;
  // Two parts per CQ plus one part for the count written by addCqsToMessage.
  int cqMsgParts = clientHasCq ? (cqNames.length * 2) + 1 : 0;

  final Message message;
  if (isCreate() || isUpdate()) {
    // Single read of the volatile field so the null check and the lookup
    // see the same set.
    final Set<ClientProxyMembershipID> invalidateClients = this._clientInterestListInv;
    if (invalidateClients != null && invalidateClients.contains(proxyId)) {
      // Client is registered for invalidates.
      if (cqMsgParts > 0) {
        cqMsgParts++; // To store base operation type for CQ.
      }
      message = new Message(6 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
      message.addStringPart(this._regionName);
      // Currently serializing the key here instead of when the message
      // is put in the queue so that it can be conflated later
      message.addStringOrObjPart(this._keyOfInterest);
    } else {
      // Notify by subscription - send the value
      message = new Message(8 + cqMsgParts, clientVersion);
      message.setMessageType(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      message.addStringPart(this._regionName);
      message.addStringOrObjPart(this._keyOfInterest);
      if (isCreate()) {
        message.addObjPart(Boolean.FALSE); // NO delta
        // Add the value (which has already been serialized)
        message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      } else if (this.deltaBytes != null
          && !conflation
          && !proxy.isMarkerEnqueued()
          && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) {
        // Send only the delta.
        message.addObjPart(Boolean.TRUE);
        message.addBytesPart(this.deltaBytes);
        proxy.getStatistics().incDeltaMessagesSent();
      } else {
        message.addObjPart(Boolean.FALSE);
        if (latestValue == null) {
          // Fall back to this message's own value, serializing it (and
          // caching the serialized form) if that has not happened yet.
          if (!(this._value instanceof byte[])) {
            this._value = CacheServerHelper.serialize(this._value);
          }
          latestValue = (byte[]) this._value;
        }
        message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      }
    }
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      if (message.getMessageType() == MessageType.LOCAL_INVALIDATE) {
        // in case of invalidate, set the region operation type.
        message.addIntPart(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      }
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroy() || isInvalidate()) {
    if (isDestroy()) {
      message = new Message(6 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_DESTROY);
    } else {
      if (clientHasCq) {
        cqMsgParts++; // To store the region operation for CQ
      }
      message = new Message(6 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
    }
    message.addStringPart(this._regionName);
    message.addStringOrObjPart(this._keyOfInterest);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      if (isInvalidate()) {
        // This is to take care when invalidate message is getting sent
        // to the Client. See the code for create/update operation.
        message.addIntPart(MessageType.LOCAL_INVALIDATE);
      }
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroyRegion() || isClearRegion() || isInvalidateRegion()) {
    // The three region-level operations differ only in their message type.
    message = new Message(4 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroyRegion() ? MessageType.LOCAL_DESTROY_REGION
        : (isClearRegion() ? MessageType.CLEAR_REGION : MessageType.INVALIDATE_REGION));
    message.addStringPart(this._regionName);
    message.addObjPart(this._callbackArgument);
    // CQ status.
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else {
    throw new InternalGemFireError("Don't know what kind of message");
  }

  message.setTransactionId(0);
  // Add the EventId since 5.1 (used to prevent duplicate events
  // received on the client side after a failover)
  message.addObjPart(this._eventIdentifier);
  return message;
}
/**
 * Builds the message for clients whose version is at least GFE_70 (see the
 * dispatch in getMessage(CacheClientProxy, byte[])). Compared with the 6.5
 * format, entry-level messages (create, update, destroy, invalidate) carry
 * the version tag as an extra part right after the callback argument.
 *
 * @param proxy
 *          the proxy of the client the message is built for
 * @param p_latestValue
 *          the serialized value to send, or null if it still has to be
 *          taken from this message's own value
 * @param conflation
 *          whether conflation applies for this client; a delta is never
 *          sent when it does
 * @param clientVersion
 *          the version of the receiving client
 * @return the populated message, ending with the event id part
 * @throws IOException
 *           if serializing the value or adding a part fails
 * @throws InternalGemFireError
 *           if the operation is none of the supported kinds
 */
protected Message getGFE70Message(CacheClientProxy proxy,
    byte[] p_latestValue, boolean conflation, Version clientVersion) throws IOException {
  byte[] latestValue = p_latestValue;
  final ClientProxyMembershipID proxyId = proxy.getProxyID();
  // Look the client's CQ names up exactly once. Calling getCqs() a second
  // time for the length could return null if the client's CQs are removed
  // concurrently, which would fail with a NullPointerException.
  final String[] cqNames = this._hasCqs ? getCqs(proxyId) : null;
  final boolean clientHasCq = cqNames != null;
  // Two parts per CQ plus one part for the count written by addCqsToMessage.
  int cqMsgParts = clientHasCq ? (cqNames.length * 2) + 1 : 0;

  final Message message;
  if (isCreate() || isUpdate()) {
    // Single read of the volatile field so the null check and the lookup
    // see the same set.
    final Set<ClientProxyMembershipID> invalidateClients = this._clientInterestListInv;
    if (invalidateClients != null && invalidateClients.contains(proxyId)) {
      // Client is registered for invalidates.
      if (cqMsgParts > 0) {
        cqMsgParts++; // To store base operation type for CQ.
      }
      message = new Message(7 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
      message.addStringPart(this._regionName);
      message.addStringOrObjPart(this._keyOfInterest);
    } else {
      // Notify by subscription - send the value
      message = new Message(9 + cqMsgParts, clientVersion);
      message.setMessageType(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      message.addStringPart(this._regionName);
      message.addStringOrObjPart(this._keyOfInterest);
      if (isCreate()) {
        message.addObjPart(Boolean.FALSE); // NO delta
        // Add the value (which has already been serialized)
        message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      } else if (this.deltaBytes != null
          && !conflation
          && !proxy.isMarkerEnqueued()
          && !proxy.getRegionsWithEmptyDataPolicy().containsKey(_regionName)) {
        // Send only the delta.
        message.addObjPart(Boolean.TRUE);
        message.addBytesPart(this.deltaBytes);
        proxy.getStatistics().incDeltaMessagesSent();
      } else {
        message.addObjPart(Boolean.FALSE);
        if (latestValue == null) {
          // Fall back to this message's own value, serializing it (and
          // caching the serialized form) if that has not happened yet.
          if (!(this._value instanceof byte[])) {
            this._value = CacheServerHelper.serialize(this._value);
          }
          latestValue = (byte[]) this._value;
        }
        message.addRawPart(latestValue, (this._valueIsObject == 0x01));
      }
    }
    message.addObjPart(this._callbackArgument);
    if (this.versionTag != null) {
      this.versionTag.setCanonicalIDs(proxy.getCache().getDistributionManager());
    }
    message.addObjPart(this.versionTag);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      if (message.getMessageType() == MessageType.LOCAL_INVALIDATE) {
        // in case of invalidate, set the region operation type.
        message.addIntPart(isCreate() ? MessageType.LOCAL_CREATE : MessageType.LOCAL_UPDATE);
      }
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroy() || isInvalidate()) {
    if (isDestroy()) {
      message = new Message(7 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_DESTROY);
    } else {
      if (clientHasCq) {
        cqMsgParts++; // To store the region operation for CQ
      }
      message = new Message(7 + cqMsgParts, clientVersion);
      message.setMessageType(MessageType.LOCAL_INVALIDATE);
    }
    message.addStringPart(this._regionName);
    message.addStringOrObjPart(this._keyOfInterest);
    message.addObjPart(this._callbackArgument);
    message.addObjPart(this.versionTag);
    message.addObjPart(Boolean.valueOf(isClientInterested(proxyId)));
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      if (isInvalidate()) {
        // This is to take care when invalidate message is getting sent
        // to the Client. See the code for create/update operation.
        message.addIntPart(MessageType.LOCAL_INVALIDATE);
      }
      this.addCqsToMessage(proxyId, message);
    }
  } else if (isDestroyRegion() || isClearRegion() || isInvalidateRegion()) {
    // The three region-level operations differ only in their message type;
    // they carry no version tag.
    message = new Message(4 + cqMsgParts, clientVersion);
    message.setMessageType(isDestroyRegion() ? MessageType.LOCAL_DESTROY_REGION
        : (isClearRegion() ? MessageType.CLEAR_REGION : MessageType.INVALIDATE_REGION));
    message.addStringPart(this._regionName);
    message.addObjPart(this._callbackArgument);
    // CQ status.
    message.addObjPart(Boolean.valueOf(clientHasCq));
    if (clientHasCq) {
      this.addCqsToMessage(proxyId, message);
    }
  } else {
    throw new InternalGemFireError("Don't know what kind of message");
  }

  message.setTransactionId(0);
  // Add the EventId since 5.1 (used to prevent duplicate events
  // received on the client side after a failover)
  message.addObjPart(this._eventIdentifier);
  return message;
}
/**
 * Reports whether the event behind this message was caused by a net load.
 *
 * @return boolean true if the event is due to net load.
 */
public boolean isNetLoad()
{
  return _isNetLoad;
}
/**
 * Records whether the event behind this message was caused by a net load.
 *
 * @param isNetLoad boolean true if the event is due to net load.
 */
public void setIsNetLoad(boolean isNetLoad)
{
  _isNetLoad = isNetLoad;
}
/**
 * Checks whether at least one CQ is recorded for the given client.
 *
 * @param clientId
 *          the client to look up
 * @return boolean true if cq info is present for the given proxy.
 */
public boolean hasCqs(ClientProxyMembershipID clientId) {
  if (this._clientCqs == null) {
    return false;
  }
  final CqNameToOp clientCqInfo = this._clientCqs.get(clientId);
  return clientCqInfo != null && !clientCqInfo.isEmpty();
}
/**
 * Reports the message-wide CQ flag, regardless of client.
 *
 * @return boolean true if cq info is present.
 */
public boolean hasCqs()
{
  return _hasCqs;
}
/**
 * Returns the names of the cqs recorded for the given client.
 *
 * @param clientId
 *          the client to look up
 * @return cqNames, or null if no CQ map exists or the client has no
 *         (non-empty) entry in it
 */
public String[] getCqs(ClientProxyMembershipID clientId) {
  if (this._clientCqs == null) {
    return null;
  }
  final CqNameToOp clientCqInfo = this._clientCqs.get(clientId);
  if (clientCqInfo == null || clientCqInfo.isEmpty()) {
    return null;
  }
  return clientCqInfo.getNames();
}
/**
 * @return the map from client id to that client's CQs; null until CQs are
 *         added or set
 */
public ClientCqConcurrentMap getClientCqs()
{
  return _clientCqs;
}
/**
 * Add cqs for the given client, replacing any entry already stored for it.
 * The CQ map is created, and the CQ flag raised, on first use.
 *
 * @param clientId
 *          the client the CQs belong to
 * @param filteredCqs
 *          the CQs to record for that client
 */
public void addClientCqs(ClientProxyMembershipID clientId, CqNameToOp filteredCqs) {
  final boolean firstUse = (this._clientCqs == null);
  if (firstUse) {
    this._clientCqs = new ClientCqConcurrentMap();
    this._hasCqs = true;
  }
  this._clientCqs.put(clientId, filteredCqs);
}
/**
 * Records a single CQ name and its operation for the given client.
 * <p>
 * The first CQ for a client is kept in a compact {@link CqNameToOpSingleEntry}.
 * When a further CQ arrives and the existing container reports itself full,
 * the container is replaced by a {@link CqNameToOpHashMap} seeded from it.
 *
 * @param clientId the proxy id of the client the CQ belongs to
 * @param cqName the name of the CQ
 * @param cqEvent the CQ operation to associate with {@code cqName}
 */
public void addClientCq(ClientProxyMembershipID clientId, String cqName, Integer cqEvent) {
  if (this._clientCqs == null) {
    this._clientCqs = new ClientCqConcurrentMap();
    this._hasCqs = true;
  }
  final CqNameToOp existing = this._clientCqs.get(clientId);
  if (existing == null) {
    // First CQ for this client: use the small single-entry container.
    this._clientCqs.put(clientId, new CqNameToOpSingleEntry(cqName, cqEvent));
    return;
  }
  if (!existing.isFull()) {
    existing.add(cqName, cqEvent);
    return;
  }
  // Only the single-entry container ever reports itself full, hence the cast.
  final CqNameToOp grown = new CqNameToOpHashMap((CqNameToOpSingleEntry) existing);
  grown.add(cqName, cqEvent);
  this._clientCqs.put(clientId, grown);
}
/**
 * Appends the CQ parts for one client to an outgoing message: first an int
 * part holding twice the number of CQs, then whatever parts the client's
 * {@link CqNameToOp} adds. Nothing is appended if there is no CQ map or no
 * entry for the client.
 *
 * @param proxyId the client whose CQs are to be written
 * @param message the message receiving the parts
 */
private void addCqsToMessage(ClientProxyMembershipID proxyId, Message message) {
  if (this._clientCqs == null) {
    return;
  }
  final CqNameToOp registered = this._clientCqs.get(proxyId);
  if (registered == null) {
    return;
  }
  message.addIntPart(registered.size() * 2);
  registered.addToMessage(message);
}
/**
 * Removes one CQ from the given client's entry and drops the client's key
 * from the CQ map once no CQs remain for it. Does nothing if the client has
 * no entry.
 *
 * @param clientId the proxy id of the client that owns the CQ
 * @param cqToClose the CQ whose name is to be removed
 */
public void removeClientCq(ClientProxyMembershipID clientId, InternalCqQuery cqToClose) {
  final CqNameToOp registered = getClientCq(clientId);
  if (registered == null) {
    return;
  }
  registered.delete(cqToClose.getName());
  // remove clientId key if no more cqs exist for this clientId
  if (registered.isEmpty()) {
    this._clientCqs.remove(clientId);
  }
}
/**
 * Sets the name of the region that was updated.
 *
 * @param regionName the region name to store on this message
 */
public void setRegionName(String regionName) {
  _regionName = regionName;
}
/**
 * Assigns the event id, but only when none is set yet; if this message
 * already has an event id the call is ignored.
 *
 * @param eventId the event id to adopt when the current one is {@code null}
 * @see HAEventWrapper#fromData(DataInput)
 * @see HAContainerRegion#get(Object)
 */
public void setEventIdentifier(EventID eventId) {
  if (this._eventIdentifier != null) {
    return;
  }
  this._eventIdentifier = eventId;
}
/**
 * Installs the per-client CQ map, but only when this message does not have
 * one yet; an existing map is never replaced. The message-wide CQ flag is
 * not touched by this method.
 *
 * @param clientCqs the CQ map to adopt when the current one is {@code null}
 * @see HAEventWrapper#fromData(DataInput)
 * @see HAContainerRegion#get(Object)
 */
public void setClientCqs(ClientCqConcurrentMap clientCqs) {
  if (this._clientCqs != null) {
    return;
  }
  this._clientCqs = clientCqs;
}
/*
private void writeCqInfo(ObjectOutput out) throws IOException {
// Write Client CQ Size
out.writeInt(this._clientCqs.size());
// For each client.
Iterator entries = this._clientCqs.entrySet().iterator();
while (entries.hasNext()) {
Map.Entry entry = (Map.Entry)entries.next();
// Write ProxyId.
ClientProxyMembershipID proxyId = (ClientProxyMembershipID)entry.getKey();
proxyId.toData(out);
HashMap cqs = (HashMap)entry.getValue();
// Write CQ size for each Client.
out.writeInt(cqs.size());
Iterator clients = cqs.entrySet().iterator();
while (clients.hasNext()) {
Map.Entry client = (Map.Entry)clients.next();
// Write CQ Name.
String cq = (String)client.getKey();
out.writeObject(cq);
// Write CQ OP.
int cqOp = ((Integer)client.getValue()).intValue();
out.writeInt(cqOp);
}
} // while
}
*/
/*
private void readCqInfo(ObjectInput in) throws IOException,
ClassNotFoundException {
// Read Client CQ Size
int numClientIds = in.readInt();
this._clientCqs = new HashMap();
// For each Client.
for (int cCnt=0; cCnt < numClientIds; cCnt++){
ClientProxyMembershipID proxyId = new ClientProxyMembershipID();
// Read Proxy id.
proxyId.fromData(in);
// read CQ size for each Client.
int numCqs = in.readInt();
HashMap cqs = new HashMap();
for (int cqCnt=0; cqCnt < numCqs; cqCnt++){
// Get CQ Name and CQ Op.
// Read CQ Name.
String cqName = (String)in.readObject();
int cqOp = in.readInt();
// Read CQ Op.
cqs.put(cqName, Integer.valueOf(cqOp));
}
this._clientCqs.put(proxyId, cqs);
}
}
*/
/**
 * Adds a group of interested clients to one of the two interest lists.
 * <p>
 * NOTE(review): when the target list does not exist yet, the supplied set is
 * stored by reference rather than copied, so a later call that finds the list
 * present will {@code addAll} into the set object passed by the earlier caller.
 *
 * @param clientIds the ids of the interested clients
 * @param receiveValues {@code true} to add to the list of clients interested
 *        in updates, {@code false} to add to the list of clients interested
 *        in invalidates
 */
public void addClientInterestList(Set clientIds, boolean receiveValues) {
  if (receiveValues) {
    if (this._clientInterestList != null) {
      this._clientInterestList.addAll(clientIds);
    } else {
      this._clientInterestList = clientIds;
    }
    return;
  }
  if (this._clientInterestListInv != null) {
    this._clientInterestListInv.addAll(clientIds);
  } else {
    this._clientInterestListInv = clientIds;
  }
}
/**
 * Adds a single interested client to one of the two interest lists. The
 * target list is never modified in place: a new {@link HashSet} containing
 * the old members plus {@code clientId} is built and then swapped in.
 *
 * @param clientId the id of the interested client
 * @param receiveValues {@code true} to add to the list of clients interested
 *        in updates, {@code false} to add to the list of clients interested
 *        in invalidates
 */
public void addClientInterestList(ClientProxyMembershipID clientId, boolean receiveValues) {
  // This happens under synchronization on HAContainer.
  final HashSet<ClientProxyMembershipID> replacement;
  if (receiveValues) {
    replacement = (this._clientInterestList != null)
        ? new HashSet<ClientProxyMembershipID>(this._clientInterestList)
        : new HashSet<ClientProxyMembershipID>();
    replacement.add(clientId);
    this._clientInterestList = replacement;
  } else {
    replacement = (this._clientInterestListInv != null)
        ? new HashSet<ClientProxyMembershipID>(this._clientInterestListInv)
        : new HashSet<ClientProxyMembershipID>();
    replacement.add(clientId);
    this._clientInterestListInv = replacement;
  }
}
/**
 * Checks whether the client appears in either interest list.
 *
 * @param clientId the client to look up
 * @return {@code true} if the client is in the updates list or in the
 *         invalidates list
 */
public boolean isClientInterested(ClientProxyMembershipID clientId) {
  if (this._clientInterestList != null && this._clientInterestList.contains(clientId)) {
    return true;
  }
  return this._clientInterestListInv != null
      && this._clientInterestListInv.contains(clientId);
}
/**
 * Checks whether the client appears in the updates interest list.
 *
 * @param clientId the client to look up
 * @return {@code true} if the list exists and contains the client
 */
public boolean isClientInterestedInUpdates(ClientProxyMembershipID clientId) {
  if (this._clientInterestList == null) {
    return false;
  }
  return this._clientInterestList.contains(clientId);
}
/**
 * Checks whether the client appears in the invalidates interest list.
 *
 * @param clientId the client to look up
 * @return {@code true} if the list exists and contains the client
 */
public boolean isClientInterestedInInvalidates(ClientProxyMembershipID clientId) {
  if (this._clientInterestListInv == null) {
    return false;
  }
  return this._clientInterestListInv.contains(clientId);
}
/**
 * Best-effort deserialization of a serialized value, used when rendering
 * this message as text.
 *
 * @param serializedBytes the bytes to decode with {@link DataSerializer#readObject}
 * @return the decoded object, or {@code serializedBytes} itself if decoding
 *         throws any {@link Exception}
 */
protected Object deserialize(byte[] serializedBytes)
{
  try {
    final ByteArrayInputStream rawBytes = new ByteArrayInputStream(serializedBytes);
    return DataSerializer.readObject(new DataInputStream(rawBytes));
  }
  catch (Exception ignored) {
    // This is a debugging method so ignore all exceptions like
    // ClassNotFoundException and fall back to the raw bytes.
    return serializedBytes;
  }
}
/**
 * Renders the main fields of this message for logging.
 * <p>
 * The value is included only when trace logging is enabled; a {@code byte[]}
 * value is passed through {@link #deserialize(byte[])} first so that the
 * decoded object is printed when decoding succeeds.
 *
 * @return a {@code ClientUpdateMessageImpl[...]} description of this message
 */
@Override
public String toString()
{
  // The builder never leaves this method, so the unsynchronized
  // StringBuilder is used instead of StringBuffer.
  final StringBuilder text = new StringBuilder();
  text.append("ClientUpdateMessageImpl[")
      .append("op=").append(this._operation)
      .append(";region=").append(this._regionName)
      .append(";key=").append(this._keyOfInterest);
  if (logger.isTraceEnabled()) {
    text.append(";value=").append(
        (this._value instanceof byte[]) ? deserialize((byte[])this._value)
            : this._value);
  }
  text
      .append(";isObject=").append(_valueIsObject)
      .append(";cbArg=").append(this._callbackArgument)
      .append(";memberId=").append(this._membershipId)
      .append(";eventId=").append(_eventIdentifier)
      .append(";shouldConflate=").append(_shouldConflate)
      .append(";versionTag=").append(this.versionTag)
      .append(";hasCqs=").append(this._hasCqs)
      // skip _logger :-)
      .append("]");
  return text.toString();
}
/**
 * Returns the fixed id under which this class is serialized.
 *
 * @return the {@code CLIENT_UPDATE_MESSAGE} constant
 */
public int getDSFID() {
return CLIENT_UPDATE_MESSAGE;
}
/**
 * Writes this message to {@code out}. The field order here must mirror the
 * read order in {@code fromData}.
 * <p>
 * The event id and the per-client CQ map are not written by this method
 * (only the {@code _hasCqs} flag is).
 * <p>
 * The interest lists are declared as {@code Set} and may have been supplied
 * by a caller as any {@code Set} implementation, so they are converted to a
 * {@link HashSet} when necessary instead of being blindly cast, which would
 * throw {@link ClassCastException} for a non-{@code HashSet} set.
 *
 * @param out the stream to write to
 * @throws IOException if writing to {@code out} or serializing the value fails
 */
public void toData(DataOutput out) throws IOException
{
  out.writeByte(_operation.getEventCode());
  DataSerializer.writeString(_regionName, out);
  DataSerializer.writeObject(_keyOfInterest, out);
  // The value always goes on the wire as a byte[]; serialize it first if it
  // is not one already.
  if (_value instanceof byte[]) {
    DataSerializer.writeByteArray((byte[])_value, out);
  }
  else {
    DataSerializer.writeByteArray(CacheServerHelper.serialize(_value), out);
  }
  out.writeByte(_valueIsObject);
  DataSerializer.writeObject(_membershipId, out);
  //DataSerializer.writeObject(_eventIdentifier,out);
  out.writeBoolean(_shouldConflate);
  out.writeBoolean(_isInterestListPassed);
  DataSerializer.writeByteArray(this.deltaBytes, out);
  out.writeBoolean(_hasCqs);
  //if (_hasCqs) {
  //  DataSerializer.writeHashMap(this._clientCqs, out);
  //}
  DataSerializer.writeObject(_callbackArgument, out);
  DataSerializer.writeHashSet(toWireHashSet(this._clientInterestList), out);
  DataSerializer.writeHashSet(toWireHashSet(this._clientInterestListInv), out);
  DataSerializer.writeObject(this.versionTag, out);
}

/**
 * Returns {@code ids} as a {@link HashSet}: the same instance when it is
 * {@code null} or already a {@code HashSet}, otherwise a {@code HashSet} copy.
 */
private static HashSet<?> toWireHashSet(Set<?> ids) {
  if (ids == null || ids instanceof HashSet) {
    return (HashSet<?>) ids;
  }
  return new HashSet<Object>(ids);
}
/**
 * Restores this message from {@code in}, reading the fields in the order in
 * which {@code toData} writes them.
 * <p>
 * The event id and the per-client CQ map are not read here. When a
 * {@link CacheClientNotifier} instance is available, each non-null interest
 * list is passed through its {@code getProxyIDs} before being stored.
 *
 * @param in the stream to read from
 * @throws IOException if reading from {@code in} fails
 * @throws ClassNotFoundException if a class of a deserialized object cannot be found
 */
public void fromData(DataInput in) throws IOException, ClassNotFoundException
{
  final byte eventCode = in.readByte();
  this._operation = EnumListenerEvent.getEnumListenerEvent(eventCode);
  this._regionName = DataSerializer.readString(in);
  this._keyOfInterest = DataSerializer.readObject(in);
  this._value = DataSerializer.readByteArray(in);
  this._valueIsObject = in.readByte();
  this._membershipId = ClientProxyMembershipID.readCanonicalized(in);
  //this._eventIdentifier = (EventID)DataSerializer.readObject(in);;
  this._shouldConflate = in.readBoolean();
  this._isInterestListPassed = in.readBoolean();
  this.deltaBytes = DataSerializer.readByteArray(in);
  this._hasCqs = in.readBoolean();
  //if (this._hasCqs) {
  //  this._clientCqs = DataSerializer.readHashMap(in);
  //}
  this._callbackArgument = DataSerializer.readObject(in);

  final CacheClientNotifier notifier = CacheClientNotifier.getInstance();

  // Clients interested in updates.
  HashSet updateIds = DataSerializer.readHashSet(in);
  if (updateIds != null && notifier != null) { // use canonical IDs in servers
    updateIds = (HashSet)notifier.getProxyIDs(updateIds);
  }
  this._clientInterestList = updateIds;

  // Clients interested in invalidates.
  HashSet invalidateIds = DataSerializer.readHashSet(in);
  if (invalidateIds != null && notifier != null) {
    invalidateIds = (HashSet)notifier.getProxyIDs(invalidateIds);
  }
  this._clientInterestListInv = invalidateIds;

  this.versionTag = (VersionTag)DataSerializer.readObject(in);
}
/**
 * Peels every {@link WrappedCallbackArgument} layer off the callback argument.
 *
 * @return the innermost callback argument that is not itself a
 *         {@code WrappedCallbackArgument}; may be {@code null}
 */
private Object getOriginalCallbackArgument() {
  Object unwrapped = this._callbackArgument;
  while (unwrapped instanceof WrappedCallbackArgument) {
    unwrapped = ((WrappedCallbackArgument)unwrapped).getOriginalCallbackArg();
  }
  return unwrapped;
}
/*
* Statically calculate constant overhead for ClientUpdateMessageImpl
* instance.
*/
static {
// The sizes of the following variables are calculated:
// - primitive and object instance variable references
//
// The sizes of the following variables are not calculated:
// - the key because it is a reference
// - the region and regionName because they are references
// - the operation because it is a reference
// - the membershipId because it is a reference
// - the logger because it is a reference
// - the keyOfInterest because it is a reference
// - the clientCqs because it is a reference
// - the clientInterestList because it is a reference
// - the eventIdentifier because it is a reference
// The size of instances of the following internal datatypes were estimated
// using a NullDataOutputStream and hardcoded into this method:
// - the id (an instance of EventId)
int size = 0;
// Add overhead for this instance.
size += Sizeable.PER_OBJECT_OVERHEAD;
// Add object references
//_operation reference = 4 bytes
//_regionName reference = 4 bytes
//_keyOfInterest reference = 4 bytes
//_value reference = 4 bytes
//_callbackArgument reference = 4 bytes
//_membershipId reference = 4 bytes
//_eventIdentifier reference = 4 bytes
//_logger reference = 4 bytes
//_clientCqs reference = 4 bytes
//_clientInterestList reference = 4 bytes
size += 40;
// Add primitive references
// byte _valueIsObject = 1 byte
// boolean _shouldConflate = 1 byte
// boolean _isInterestListPassed = 1 byte
// boolean _hasCqs = 1 byte
// boolean _isNetLoad = 1 byte
size += 5;
// not sure on the kind on wrapper is around callbackArgument
// The callback argument (a GatewayEventCallbackArgument wrapping an Object
// which is the original callback argument)
// The hardcoded value below represents the GatewayEventCallbackArgument
// and was estimated using a NullDataOutputStream
size += Sizeable.PER_OBJECT_OVERHEAD + 194; // do we need it
// add overhead for callback Argument
size += Sizeable.PER_OBJECT_OVERHEAD;
// total overhead
CONSTANT_MEMORY_OVERHEAD = size;
}
/**
 * Estimates the memory footprint of this message.
 *
 * @return the constant per-instance overhead, plus the size of the value
 *         when it is non-null, plus the size of the unwrapped callback argument
 */
public int getSizeInBytes() {
  // The value (a byte[])
  final int valueBytes = (this._value == null)
      ? 0
      : CachedDeserializableFactory.calcMemSize(this._value);
  // The sizeOf call gets the size of the input callback argument.
  final int callbackBytes = sizeOf(getOriginalCallbackArgument());
  return CONSTANT_MEMORY_OVERHEAD + valueBytes + callbackBytes;
}
/**
 * Estimates the size of a callback argument.
 *
 * @param obj the object to size; may be {@code null}
 * @return 0 for {@code null}; the {@code ObjectSizer.DEFAULT} size for a
 *         {@code String}; 4 for an {@code Integer}; 8 for a {@code Long};
 *         otherwise the {@code CachedDeserializableFactory} size minus one
 *         per-object overhead
 */
private int sizeOf(Object obj) {
  if (obj == null) {
    return 0;
  }
  if (obj instanceof String) {
    return ObjectSizer.DEFAULT.sizeof(obj);
  }
  if (obj instanceof Integer) {
    return 4; // estimate
  }
  if (obj instanceof Long) {
    return 8; // estimate
  }
  return CachedDeserializableFactory.calcMemSize(obj) - Sizeable.PER_OBJECT_OVERHEAD;
}
/* (non-Javadoc)
 * @see com.gemstone.gemfire.internal.cache.tier.sockets.ClientUpdateMessage#needsNoAuthorizationCheck()
 *
 * This implementation always returns false.
 */
public boolean needsNoAuthorizationCheck() {
return false;
}
/**
 * Looks up the CQ entry stored for a client.
 *
 * @param clientId the client to look up
 * @return the client's CQ name-to-operation entry, or {@code null} if there
 *         is no CQ map or no entry for the client
 */
@Override
public CqNameToOp getClientCq(ClientProxyMembershipID clientId) {
  return (this._clientCqs == null) ? null : this._clientCqs.get(clientId);
}
/**
 * This implementation always returns {@code null}.
 */
@Override
public Version[] getSerializationVersions() {
return null;
}
/**
 * A {@link ConcurrentHashMap} from client id to that client's CQs. It adds no
 * behavior; it exists as a distinct class so that instances can be easily
 * identified in heap dumps.
 * The concurrency level on these should be 1 to keep their memory footprint down.
 */
public static class ClientCqConcurrentMap extends ConcurrentHashMap<ClientProxyMembershipID, CqNameToOp> {
  /**
   * Creates a map with initial capacity 16, load factor 1.0 and
   * concurrency level 1.
   */
  public ClientCqConcurrentMap() {
    this(16, 1.0f, 1);
  }

  /**
   * Creates a map with explicit sizing; the arguments are passed straight to
   * the {@link ConcurrentHashMap} constructor.
   */
  public ClientCqConcurrentMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    super(initialCapacity, loadFactor, concurrencyLevel);
  }
}
/**
 * A collection of CQ-name to CQ-operation pairs.
 * Replaces what used to be a HashMap<String, Integer>.
 */
public interface CqNameToOp extends Sendable {
  /** Returns true if no pairs are held. */
  boolean isEmpty();

  /**
   * Returns true if calling add would fail.
   */
  boolean isFull();

  /** Adds the held pairs to the given message as parts. */
  void addToMessage(Message message);

  /** Returns the number of pairs held. */
  int size();

  /** Returns the CQ names of the held pairs. */
  String[] getNames();

  /** Associates the given operation with the given CQ name. */
  void add(String name, Integer op);

  /** Removes the pair with the given CQ name, if held. */
  void delete(String name);
}
/**
 * Contains either zero or one String to int tuples.
 * This is a common case and this impl has a much smaller
 * memory footprint than a HashMap with one entry.
 * <p>
 * A {@code null} name marks the container as empty.
 */
public static class CqNameToOpSingleEntry implements CqNameToOp {
  private String name;
  private int op;

  /**
   * Creates a container holding the single given pair.
   *
   * @param name the CQ name
   * @param op the CQ operation; must not be {@code null}
   */
  public CqNameToOpSingleEntry(String name, Integer op) {
    this.name = name;
    this.op = op.intValue();
  }

  /**
   * Writes this container in the same form as a HASH_MAP with zero or one
   * entries.
   */
  @Override
  public void sendTo(DataOutput out) throws IOException {
    // When serialized it needs to look just as if writeObject was called on a HASH_MAP
    out.writeByte(DSCODE.HASH_MAP);
    final int entryCount = size();
    InternalDataSerializer.writeArrayLength(entryCount, out);
    if (entryCount <= 0) {
      return;
    }
    DataSerializer.writeObject(this.name, out);
    DataSerializer.writeObject(Integer.valueOf(this.op), out);
  }

  @Override
  public boolean isEmpty() {
    return null == this.name;
  }

  /** Adds the name as a string part and the op as an int part, unless empty. */
  @Override
  public void addToMessage(Message message) {
    if (isEmpty()) {
      return;
    }
    message.addStringPart(this.name);
    message.addIntPart(this.op);
  }

  @Override
  public int size() {
    if (isEmpty()) {
      return 0;
    }
    return 1;
  }

  @Override
  public String[] getNames() {
    return isEmpty() ? new String[0] : new String[]{this.name};
  }

  /**
   * Stores the pair when empty, or updates the op when {@code name} equals
   * the name already held.
   *
   * @throws IllegalStateException if a different name is already held
   */
  @Override
  public void add(String name, Integer op) {
    final boolean vacant = isEmpty();
    if (!vacant && !this.name.equals(name)) {
      throw new IllegalStateException("tried to add to a full CqNameToOpSingleEntry");
    }
    if (vacant) {
      this.name = name;
    }
    this.op = op.intValue();
  }

  /** Empties the container if {@code name} equals the name held. */
  @Override
  public void delete(String name) {
    if (!name.equals(this.name)) {
      return;
    }
    this.name = null;
  }

  /** Full as soon as one pair is held. */
  @Override
  public boolean isFull() {
    if (isEmpty()) {
      return false;
    }
    return true;
  }
}
/**
 * Basically just a HashMap<String, Integer> but limits itself to the
 * CqNameToOp interface.
 */
public static class CqNameToOpHashMap extends HashMap<String, Integer> implements CqNameToOp {
  /**
   * Creates an empty map with the given initial capacity and a load factor
   * of 1.0.
   */
  public CqNameToOpHashMap(int initialCapacity) {
    super(initialCapacity, 1.0f);
  }

  /**
   * Creates a map seeded with the pair held by the given single-entry
   * container. An empty container yields an empty map; its {@code null}
   * name and leftover op are not copied in as a bogus entry.
   *
   * @param se the single-entry container to copy from
   */
  public CqNameToOpHashMap(CqNameToOpSingleEntry se) {
    super(2, 1.0f);
    if (!se.isEmpty()) {
      add(se.name, Integer.valueOf(se.op));
    }
  }

  @Override
  public void sendTo(DataOutput out) throws IOException {
    // When serialized it needs to look just as if writeObject was called on a HASH_MAP
    out.writeByte(DSCODE.HASH_MAP);
    DataSerializer.writeHashMap(this, out);
  }

  /** Returns the keys of this map as an array. */
  @Override
  public String[] getNames() {
    return keySet().toArray(new String[size()]);
  }

  /** Adds, for every entry, the CQ name as a string part followed by the op as an int part. */
  @Override
  public void addToMessage(Message message) {
    for (Entry<String, Integer> pair : entrySet()) {
      // Add CQ Name.
      message.addStringPart(pair.getKey());
      // Add CQ Op.
      message.addIntPart(pair.getValue().intValue());
    }
  }

  @Override
  public void add(String name, Integer op) {
    put(name, op);
  }

  @Override
  public void delete(String name) {
    remove(name);
  }

  /** Never full: {@code add} always succeeds. */
  @Override
  public boolean isFull() {
    return false;
  }
}
// NewValueImporter methods
/**
 * This implementation always returns {@code true}.
 * NOTE(review): presumably tells the caller to hand over the new value in
 * serialized form - confirm against NewValueImporter.
 */
@Override
public boolean prefersNewSerialized() {
return true;
}
/**
 * This implementation always returns {@code false}.
 */
@Override
public boolean isUnretainedNewReferenceOk() {
return false;
}
/**
 * Accepts a new value supplied as an object, serializes it with
 * {@code CacheServerHelper.serialize} and stores the resulting bytes as
 * this message's value.
 *
 * @param nv the new value
 * @param isSerialized must be {@code true}
 * @throws IllegalStateException if {@code isSerialized} is {@code false}
 * @throws GemFireIOException if serializing {@code nv} fails
 */
@Override
public void importNewObject(Object nv, boolean isSerialized) {
  if (!isSerialized) {
    throw new IllegalStateException("Expected importNewBytes to be called.");
  }
  final byte[] serialized;
  try {
    serialized = CacheServerHelper.serialize(nv);
  } catch (IOException cause) {
    throw new GemFireIOException("Exception serializing entry value", cause);
  }
  this._value = serialized;
}
/**
 * Accepts a new value supplied as bytes and stores it as this message's value.
 *
 * @param nv the new value bytes
 * @param isSerialized {@code false} if {@code nv} is a plain byte[] value
 *        rather than a serialized object; in that case the
 *        {@code _valueIsObject} flag is cleared to 0x00
 */
@Override
public void importNewBytes(byte[] nv, boolean isSerialized) {
  this._value = nv;
  if (isSerialized) {
    return;
  }
  // The value is already a byte[]. Set _valueIsObject flag to 0x00
  // (not an object)
  this._valueIsObject = 0x00;
}
}