/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.persistence.PersistentReplicatesOfflineException;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DirectReplyProcessor;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.ByteArrayDataInput;
import org.apache.geode.internal.DataSerializableFixedID;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.ha.ThreadIdentifier;
import org.apache.geode.internal.cache.partitioned.PutAllPRMessage;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList;
import org.apache.geode.internal.cache.tx.RemotePutAllMessage;
import org.apache.geode.internal.cache.versions.DiskVersionTag;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.annotations.Released;
import org.apache.geode.internal.offheap.annotations.Retained;
import org.apache.geode.internal.offheap.annotations.Unretained;
/**
* Handles distribution of a Region.putAll operation.
*
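* A minimal usage sketch on the originating member; {@code distribute()} is inherited from the
* DistributedCacheOperation hierarchy and is assumed here rather than defined in this class, and
* the variable names are illustrative only:
*
* <pre>
* DistributedPutAllOperation dpao = new DistributedPutAllOperation(baseEvent, entryCount, false);
* for (EntryEventImpl entryEvent : entryEvents) {
*   dpao.addEntry(entryEvent); // one PutAllEntryData per entry, in distribution order
* }
* dpao.distribute(); // ships PutAllMessage(s) to the other members
* dpao.freeOffHeapResources(); // release any lazily created per-entry events
* </pre>
*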
* @since GemFire 5.0
*/
public class DistributedPutAllOperation extends AbstractUpdateOperation {
private static final Logger logger = LogService.getLogger();
protected final PutAllEntryData[] putAllData;
public int putAllDataSize;
protected boolean isBridgeOp = false;
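// Per-entry flag bits serialized by PutAllEntryData.toData()/fromData(); several of them
// (e.g. FILTER_ROUTING, VERSION_TAG, USED_FAKE_EVENT_ID, HAS_TAILKEY) also indicate which
// optional fields follow the flags byte on the wire.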
static final byte USED_FAKE_EVENT_ID = 0x01;
static final byte NOTIFY_ONLY = 0x02;
static final byte FILTER_ROUTING = 0x04;
static final byte VERSION_TAG = 0x08;
static final byte POSDUP = 0x10;
static final byte PERSISTENT_TAG = 0x20;
static final byte HAS_CALLBACKARG = 0x40;
static final byte HAS_TAILKEY = (byte) 0x80;
// flags for CachedDeserializable; additional flags can be combined
// with these if required
static final byte IS_CACHED_DESER = 0x1;
static final byte IS_OBJECT = 0x2;
// private boolean containsCreate = false;
public DistributedPutAllOperation(CacheEvent event, int size, boolean isBridgeOp) {
super(event, ((EntryEventImpl) event).getEventTime(0L));
this.putAllData = new PutAllEntryData[size];
this.putAllDataSize = 0;
this.isBridgeOp = isBridgeOp;
}
/**
* Returns whether this operation is a bridge (client) operation.
*/
public boolean isBridgeOperation() {
return this.isBridgeOp;
}
public PutAllEntryData[] getPutAllEntryData() {
return putAllData;
}
public void setPutAllEntryData(PutAllEntryData[] putAllEntryData) {
for (int i = 0; i < putAllEntryData.length; i++) {
putAllData[i] = putAllEntryData[i];
}
this.putAllDataSize = putAllEntryData.length;
}
/**
* Add an entry that this putall operation should distribute.
*/
public void addEntry(PutAllEntryData putAllEntry) {
this.putAllData[this.putAllDataSize] = putAllEntry;
this.putAllDataSize += 1;
// cachedEvents.add(ev);
}
/**
* Add an entry that this putall operation should distribute.
*/
public void addEntry(EntryEventImpl ev) {
this.putAllData[this.putAllDataSize] = new PutAllEntryData(ev);
this.putAllDataSize += 1;
// cachedEvents.add(ev);
}
/**
* Add an entry that this putall operation should distribute. This method handles a special case:
* the callback will be invoked afterwards (the hasSeenEvent() case), so the callbacks-invoked
* status must be set beforehand.
*/
public void addEntry(EntryEventImpl ev, boolean newCallbackInvoked) {
this.putAllData[this.putAllDataSize] = new PutAllEntryData(ev);
this.putAllData[this.putAllDataSize].setCallbacksInvoked(newCallbackInvoked);
this.putAllDataSize += 1;
// cachedEvents.add(ev);
}
/**
* Add an entry for a partitioned region bucket's message.
*
* @param ev event to be added
* @param bucketId the bucket the message is for
*/
public void addEntry(EntryEventImpl ev, Integer bucketId) {
this.putAllData[this.putAllDataSize] = new PutAllEntryData(ev);
this.putAllData[this.putAllDataSize].setBucketId(bucketId);
this.putAllDataSize += 1;
// cachedEvents.add(ev);
}
/**
* Set whether the entries are using fake event ids.
*
* @param status whether the entries are using fake event ids
*/
public void setUseFakeEventId(boolean status) {
for (int i = 0; i < putAllDataSize; i++) {
putAllData[i].setUsedFakeEventId(status);
}
}
/**
* In the originating cache, this returns an iterator on the list of events caused by the putAll
* operation. This is cached for listener notification purposes. The iterator is guaranteed to
* return events in the order they are present in putAllData[]
*/
public Iterator eventIterator() {
return new Iterator() {
int position = 0;
@Override
public boolean hasNext() {
return DistributedPutAllOperation.this.putAllDataSize > position;
}
@Override
@Unretained
public Object next() {
@Unretained
EntryEventImpl ev = getEventForPosition(position);
position++;
return ev;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
public void freeOffHeapResources() {
// I do not use eventIterator here because it forces the lazy creation of EntryEventImpl by
// calling getEventForPosition.
for (int i = 0; i < this.putAllDataSize; i++) {
PutAllEntryData entry = this.putAllData[i];
if (entry != null && entry.event != null) {
entry.event.release();
}
}
}
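/**
* Returns the event for the entry at the given position in putAllData, creating it lazily on
* first access and caching it on the entry. The returned event is not retained by the caller;
* cached events are released by freeOffHeapResources().
*/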
@Unretained
public EntryEventImpl getEventForPosition(int position) {
PutAllEntryData entry = this.putAllData[position];
if (entry == null) {
return null;
}
if (entry.event != null) {
return entry.event;
}
LocalRegion region = (LocalRegion) this.event.getRegion();
@Retained
EntryEventImpl ev = EntryEventImpl.create(region, entry.getOp(), entry.getKey(),
null/* value */, this.event.getCallbackArgument(), false /* originRemote */,
this.event.getDistributedMember(), this.event.isGenerateCallbacks(), entry.getEventID());
boolean returnedEv = false;
try {
ev.setPossibleDuplicate(entry.isPossibleDuplicate());
if (entry.versionTag != null && region.getConcurrencyChecksEnabled()) {
VersionSource id = entry.versionTag.getMemberID();
if (id != null) {
entry.versionTag.setMemberID(ev.getRegion().getVersionVector().getCanonicalId(id));
}
ev.setVersionTag(entry.versionTag);
}
entry.event = ev;
returnedEv = true;
Object entryValue = entry.getValue(region.getCache());
if (entryValue == null
&& ev.getRegion().getAttributes().getDataPolicy() == DataPolicy.NORMAL) {
ev.setLocalInvalid(true);
}
ev.setNewValue(entryValue);
ev.setOldValue(entry.getOldValue());
CqService cqService = region.getCache().getCqService();
if (cqService.isRunning() && !entry.getOp().isCreate() && !ev.hasOldValue()) {
ev.setOldValueForQueryProcessing();
}
ev.setInvokePRCallbacks(!entry.isNotifyOnly());
if (getBaseEvent().getContext() != null) {
ev.setContext(getBaseEvent().getContext());
}
ev.callbacksInvoked(entry.isCallbacksInvoked());
ev.setTailKey(entry.getTailKey());
return ev;
} finally {
if (!returnedEv) {
ev.release();
}
}
}
public EntryEventImpl getBaseEvent() {
return getEvent();
}
/**
* Data that represents a single entry being putall'd.
*/
public static class PutAllEntryData {
final Object key;
private Object value;
private final Object oldValue;
private final Operation op;
private EventID eventID;
transient EntryEventImpl event;
private Integer bucketId = Integer.valueOf(-1);
protected transient boolean callbacksInvoked = false;
public FilterRoutingInfo filterRouting;
// One flags byte for all booleans
protected byte flags = 0x00;
// TODO: Yogesh, this should be initialized and sent on the wire only when
// parallel WAN is enabled
private Long tailKey = 0L;
public VersionTag versionTag;
transient boolean inhibitDistribution;
/**
* Constructor to use when preparing to send putall data out
*/
public PutAllEntryData(EntryEventImpl event) {
this.key = event.getKey();
this.value = event.getRawNewValueAsHeapObject();
Object oldValue = event.getRawOldValueAsHeapObject();
if (oldValue == Token.NOT_AVAILABLE || Token.isRemoved(oldValue)) {
this.oldValue = null;
} else {
this.oldValue = oldValue;
}
this.op = event.getOperation();
this.eventID = event.getEventId();
this.tailKey = event.getTailKey();
this.versionTag = event.getVersionTag();
setNotifyOnly(!event.getInvokePRCallbacks());
setCallbacksInvoked(event.callbacksInvoked());
setPossibleDuplicate(event.isPossibleDuplicate());
setInhibitDistribution(event.getInhibitDistribution());
}
/**
* Constructor to use when receiving a putall from someone else
*/
public PutAllEntryData(DataInput in, EventID baseEventID, int idx, Version version,
ByteArrayDataInput bytesIn) throws IOException, ClassNotFoundException {
this.key = DataSerializer.readObject(in);
byte flgs = in.readByte();
if ((flgs & IS_OBJECT) != 0) {
this.value = DataSerializer.readObject(in);
} else {
byte[] bb = DataSerializer.readByteArray(in);
if ((flgs & IS_CACHED_DESER) != 0) {
this.value = new FutureCachedDeserializable(bb);
} else {
this.value = bb;
}
}
this.oldValue = null;
this.op = Operation.fromOrdinal(in.readByte());
this.flags = in.readByte();
if ((this.flags & FILTER_ROUTING) != 0) {
this.filterRouting = (FilterRoutingInfo) DataSerializer.readObject(in);
}
if ((this.flags & VERSION_TAG) != 0) {
boolean persistentTag = (this.flags & PERSISTENT_TAG) != 0;
this.versionTag = VersionTag.create(persistentTag, in);
}
if (isUsedFakeEventId()) {
this.eventID = new EventID();
InternalDataSerializer.invokeFromData(this.eventID, in);
} else {
this.eventID = new EventID(baseEventID, idx);
}
if ((this.flags & HAS_TAILKEY) != 0) {
this.tailKey = DataSerializer.readLong(in);
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder(50);
sb.append("(").append(getKey()).append(",").append(this.value).append(",")
.append(getOldValue());
if (this.bucketId > 0) {
sb.append(", b").append(this.bucketId);
}
if (versionTag != null) {
sb.append(versionTag);
// sb.append(",v").append(versionTag.getEntryVersion()).append(",rv"+versionTag.getRegionVersion());
}
if (filterRouting != null) {
sb.append(", ").append(filterRouting);
}
sb.append(")");
return sb.toString();
}
void setSender(InternalDistributedMember sender) {
if (this.versionTag != null) {
this.versionTag.replaceNullIDs(sender);
}
}
/**
* Used to serialize this instance's data to <code>out</code>. If changes are made to this method
* make sure that it is backwards compatible by creating toDataPreXX methods. Also make sure
* that the callers to this method are backwards compatible by creating toDataPreXX methods for
* them even if they are not changed. <br>
* Callers for this method are: <br>
* {@link PutAllMessage#toData(DataOutput)} <br>
* {@link PutAllPRMessage#toData(DataOutput)} <br>
* {@link RemotePutAllMessage#toData(DataOutput)} <br>
*/
public void toData(final DataOutput out) throws IOException {
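// Wire format (mirrored by the DataInput constructor above): the key, a value-encoding byte
// followed by the value bytes, the operation ordinal, the flags byte, and then the optional
// fields (filterRouting, versionTag, fake eventID, tailKey) selected by the flag bits.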
Object key = this.key;
final Object v = this.value;
DataSerializer.writeObject(key, out);
if (v instanceof byte[] || v == null) {
out.writeByte(0);
DataSerializer.writeByteArray((byte[]) v, out);
} else if (v instanceof CachedDeserializable) {
CachedDeserializable cd = (CachedDeserializable) v;
out.writeByte(IS_CACHED_DESER);
DataSerializer.writeByteArray(cd.getSerializedValue(), out);
} else {
out.writeByte(IS_CACHED_DESER);
DataSerializer.writeObjectAsByteArray(v, out);
}
out.writeByte(this.op.ordinal);
byte bits = this.flags;
if (this.filterRouting != null)
bits |= FILTER_ROUTING;
if (this.versionTag != null) {
bits |= VERSION_TAG;
if (this.versionTag instanceof DiskVersionTag) {
bits |= PERSISTENT_TAG;
}
}
// TODO: Yogesh, this should be conditional; make sure it is sent on the wire only
// when parallel WAN is enabled
bits |= HAS_TAILKEY;
out.writeByte(bits);
if (this.filterRouting != null) {
DataSerializer.writeObject(this.filterRouting, out);
}
if (this.versionTag != null) {
InternalDataSerializer.invokeToData(this.versionTag, out);
}
if (isUsedFakeEventId()) {
// fake event id should be serialized
InternalDataSerializer.invokeToData(this.eventID, out);
}
// TODO: Yogesh, this should be conditional; make sure it is sent on the wire only
// when parallel WAN is enabled
DataSerializer.writeLong(this.tailKey, out);
}
/**
* Returns the key
*/
public Object getKey() {
return this.key;
}
/**
* Returns the value
*/
public Object getValue(InternalCache cache) {
Object result = this.value;
if (result instanceof FutureCachedDeserializable) {
FutureCachedDeserializable future = (FutureCachedDeserializable) result;
result = future.create(cache);
this.value = result;
}
return result;
}
/**
* Returns the old value
*/
public Object getOldValue() {
return this.oldValue;
}
public Long getTailKey() {
return this.tailKey;
}
public void setTailKey(Long key) {
this.tailKey = key;
}
/**
* Returns the operation
*/
public Operation getOp() {
return this.op;
}
public EventID getEventID() {
return this.eventID;
}
/**
* change event id for the entry
*
* @param eventId new event id
*/
public void setEventId(EventID eventId) {
this.eventID = eventId;
}
/**
* change bucket id for the entry
*
* @param bucketId new bucket id
*/
public void setBucketId(Integer bucketId) {
this.bucketId = bucketId;
}
/**
* get bucket id for the entry
*
* @return bucket id
*/
public Integer getBucketId() {
return this.bucketId;
}
/**
* Change the event id into a fake event id. The algorithm replaces the thread id with
* bucketId * MAX_THREAD_PER_CLIENT + oldThreadId, so the original thread id can be derived
* from the log.
*
* @return whether a fake event id is in use (false if the entry has no bucket id)
*/
public boolean setFakeEventID() {
if (bucketId.intValue() < 0) {
return false;
}
if (!isUsedFakeEventId()) {
// assign a fake, large thread id. Bucket ids start from 0, so to distinguish the fake id
// from real thread ids, the bucket id starts from 1 within the fake thread id
long threadId = ThreadIdentifier.createFakeThreadIDForBulkOp(bucketId.intValue(),
eventID.getThreadID());
this.eventID = new EventID(eventID.getMembershipID(), threadId, eventID.getSequenceID());
this.setUsedFakeEventId(true);
}
return true;
}
public boolean isUsedFakeEventId() {
return (flags & USED_FAKE_EVENT_ID) != 0;
}
public void setUsedFakeEventId(boolean usedFakeEventId) {
if (usedFakeEventId) {
flags |= USED_FAKE_EVENT_ID;
} else {
flags &= ~(USED_FAKE_EVENT_ID);
}
}
public boolean isNotifyOnly() {
return (flags & NOTIFY_ONLY) != 0;
}
public void setNotifyOnly(boolean notifyOnly) {
if (notifyOnly) {
flags |= NOTIFY_ONLY;
} else {
flags &= ~(NOTIFY_ONLY);
}
}
boolean isPossibleDuplicate() {
return (this.flags & POSDUP) != 0;
}
public void setPossibleDuplicate(boolean possibleDuplicate) {
if (possibleDuplicate) {
flags |= POSDUP;
} else {
flags &= ~(POSDUP);
}
}
public boolean isInhibitDistribution() {
return this.inhibitDistribution;
}
public void setInhibitDistribution(boolean inhibitDistribution) {
this.inhibitDistribution = inhibitDistribution;
}
public boolean isCallbacksInvoked() {
return this.callbacksInvoked;
}
public void setCallbacksInvoked(boolean callbacksInvoked) {
this.callbacksInvoked = callbacksInvoked;
}
}
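/**
* A serializable list of the version tags for a putAll's entries. The tags are written
* compactly: a member ID shared by several tags is written only once (see toData/fromData).
*/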
public static class EntryVersionsList extends ArrayList<VersionTag>
implements DataSerializableFixedID, Externalizable {
public EntryVersionsList() {
// Do nothing
}
public EntryVersionsList(int size) {
super(size);
}
public static EntryVersionsList create(DataInput in)
throws IOException, ClassNotFoundException {
EntryVersionsList newList = new EntryVersionsList();
InternalDataSerializer.invokeFromData(newList, in);
return newList;
}
private boolean extractVersion(PutAllEntryData entry) {
VersionTag versionTag = entry.versionTag;
// version tag can be null if only keys are sent in InitialImage.
if (versionTag != null) {
add(versionTag);
// Add entry without version tag in entries array.
entry.versionTag = null;
return true;
}
return false;
}
private VersionTag<VersionSource> getVersionTag(int index) {
VersionTag tag = null;
if (this.size() > 0) {
tag = get(index);
}
return tag;
}
/**
* replace null membership IDs in version tags with the given member ID. VersionTags received
* from a server may have null IDs because they were operations performed by that server. We
* transmit them as nulls to cut costs, but have to do the swap on the receiving end (in the
* client)
*
*/
public void replaceNullIDs(DistributedMember sender) {
for (VersionTag versionTag : this) {
if (versionTag != null) {
versionTag.replaceNullIDs((InternalDistributedMember) sender);
}
}
}
@Override
public int getDSFID() {
return DataSerializableFixedID.PUTALL_VERSIONS_LIST;
}
static final byte FLAG_NULL_TAG = 0;
static final byte FLAG_FULL_TAG = 1;
static final byte FLAG_TAG_WITH_NEW_ID = 2;
static final byte FLAG_TAG_WITH_NUMBER_ID = 3;
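// toData() canonicalizes member IDs: the first tag carrying a given member ID is written in
// full with FLAG_TAG_WITH_NEW_ID and the ID is assigned a number; later tags with the same ID
// are written without it as FLAG_TAG_WITH_NUMBER_ID followed by that number, and fromData()
// maps the number back to the ID.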
@Override
public void toData(DataOutput out) throws IOException {
int flags = 0;
boolean hasTags = false;
if (this.size() > 0) {
flags |= 0x04;
hasTags = true;
for (VersionTag tag : this) {
if (tag != null) {
if (tag instanceof DiskVersionTag) {
flags |= 0x20;
}
break;
}
}
}
if (logger.isTraceEnabled(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE)) {
logger.trace(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE, "serializing {} with flags 0x{}",
this, Integer.toHexString(flags));
}
out.writeByte(flags);
if (hasTags) {
InternalDataSerializer.writeUnsignedVL(this.size(), out);
Object2IntOpenHashMap ids = new Object2IntOpenHashMap(this.size());
int idCount = 0;
for (VersionTag tag : this) {
if (tag == null) {
out.writeByte(FLAG_NULL_TAG);
} else {
VersionSource id = tag.getMemberID();
if (id == null) {
out.writeByte(FLAG_FULL_TAG);
InternalDataSerializer.invokeToData(tag, out);
} else {
int idNumber = ids.getInt(id);
if (idNumber == 0) {
out.writeByte(FLAG_TAG_WITH_NEW_ID);
idNumber = ++idCount;
ids.put(id, idNumber);
InternalDataSerializer.invokeToData(tag, out);
} else {
out.writeByte(FLAG_TAG_WITH_NUMBER_ID);
tag.toData(out, false);
tag.setMemberID(id);
InternalDataSerializer.writeUnsignedVL(idNumber - 1, out);
}
}
}
}
}
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
int flags = in.readByte();
boolean hasTags = (flags & 0x04) == 0x04;
boolean persistent = (flags & 0x20) == 0x20;
if (logger.isTraceEnabled(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE)) {
logger.trace(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE,
"deserializing a InitialImageVersionedObjectList with flags 0x{}",
Integer.toHexString(flags));
}
if (hasTags) {
int size = (int) InternalDataSerializer.readUnsignedVL(in);
if (logger.isTraceEnabled(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE)) {
logger.trace(LogMarker.INITIAL_IMAGE_VERSIONED_VERBOSE, "reading {} version tags", size);
}
List<VersionSource> ids = new ArrayList<VersionSource>(size);
for (int i = 0; i < size; i++) {
byte entryType = in.readByte();
switch (entryType) {
case FLAG_NULL_TAG:
add(null);
break;
case FLAG_FULL_TAG:
add(VersionTag.create(persistent, in));
break;
case FLAG_TAG_WITH_NEW_ID:
VersionTag tag = VersionTag.create(persistent, in);
ids.add(tag.getMemberID());
add(tag);
break;
case FLAG_TAG_WITH_NUMBER_ID:
tag = VersionTag.create(persistent, in);
int idNumber = (int) InternalDataSerializer.readUnsignedVL(in);
tag.setMemberID(ids.get(idNumber));
add(tag);
break;
}
}
}
}
@Override
public void writeExternal(ObjectOutput out) throws IOException {
toData(out);
}
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
fromData(in);
}
@Override
public Version[] getSerializationVersions() {
return null;
}
}
@Override
protected FilterRoutingInfo getRecipientFilterRouting(Set cacheOpRecipients) {
// for putAll, we need to determine the routing information for each event and
// create a consolidated routing object representing all events that can be
// used for distribution
CacheDistributionAdvisor advisor;
LocalRegion region = (LocalRegion) this.event.getRegion();
if (region instanceof PartitionedRegion) {
advisor = ((PartitionedRegion) region).getCacheDistributionAdvisor();
} else if (region.isUsedForPartitionedRegionBucket()) {
advisor = ((BucketRegion) region).getPartitionedRegion().getCacheDistributionAdvisor();
} else {
advisor = ((DistributedRegion) region).getCacheDistributionAdvisor();
}
FilterRoutingInfo consolidated = new FilterRoutingInfo();
for (int i = 0; i < this.putAllData.length; i++) {
@Unretained
EntryEventImpl ev = getEventForPosition(i);
if (ev != null) {
FilterRoutingInfo eventRouting = advisor.adviseFilterRouting(ev, cacheOpRecipients);
if (eventRouting != null) {
consolidated.addFilterInfo(eventRouting);
}
putAllData[i].filterRouting = eventRouting;
}
}
// we need to create routing information for each PUT event
return consolidated;
}
@Override
protected FilterInfo getLocalFilterRouting(FilterRoutingInfo frInfo) {
// long start = NanoTimer.getTime();
FilterProfile fp = getRegion().getFilterProfile();
if (fp == null) {
return null;
}
// this will set the local FilterInfo in the events
if (this.putAllData != null && this.putAllData.length > 0) {
fp.getLocalFilterRoutingForPutAllOp(this, this.putAllData);
}
// long finish = NanoTimer.getTime();
// InternalDistributedSystem.getDMStats().incjChannelUpTime(finish-start);
return null;
}
@Override
protected CacheOperationMessage createMessage() {
EntryEventImpl event = getBaseEvent();
PutAllMessage msg = new PutAllMessage();
msg.eventId = event.getEventId();
msg.context = event.getContext();
return msg;
}
/**
* Create PutAllPRMessage for notify only (to adjunct nodes)
*
* @param bucketId the bucket to create the message for
*/
public PutAllPRMessage createPRMessagesNotifyOnly(int bucketId) {
final EntryEventImpl event = getBaseEvent();
PutAllPRMessage prMsg = new PutAllPRMessage(bucketId, putAllDataSize, true,
event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
if (event.getContext() != null) {
prMsg.setBridgeContext(event.getContext());
}
// will not recover event id here
for (int i = 0; i < putAllDataSize; i++) {
prMsg.addEntry(putAllData[i]);
}
return prMsg;
}
/**
* Create PutAllPRMessages for the primary buckets out of this DPAO.
*
* @return a HashMap of PutAllPRMessages keyed by bucket id
*/
public HashMap createPRMessages() {
// getFilterRecipients(Collections.EMPTY_SET); // establish filter recipient routing information
HashMap prMsgMap = new HashMap();
final EntryEventImpl event = getBaseEvent();
for (int i = 0; i < putAllDataSize; i++) {
Integer bucketId = putAllData[i].bucketId;
PutAllPRMessage prMsg = (PutAllPRMessage) prMsgMap.get(bucketId);
if (prMsg == null) {
prMsg = new PutAllPRMessage(bucketId.intValue(), putAllDataSize, false,
event.isPossibleDuplicate(), !event.isGenerateCallbacks(), event.getCallbackArgument());
prMsg
.setTransactionDistributed(event.getRegion().getCache().getTxManager().isDistributed());
// set the dpao's context (original sender) into each PutAllPRMessage;
// the dpao's event's context could be null if it's a P2P putAll in a PR
if (event.getContext() != null) {
prMsg.setBridgeContext(event.getContext());
}
}
// Modify the event id: assign a new thread id and a new sequence id. We have to set a fake
// event id here because we cannot derive the old event id from baseId+idx as we did in the
// DR's PutAllMessage.
putAllData[i].setFakeEventID();
// we only save the reference in prMsg. No duplicate copy
prMsg.addEntry(putAllData[i]);
prMsgMap.put(bucketId, prMsg);
}
return prMsgMap;
}
@Override
protected void initMessage(CacheOperationMessage msg, DirectReplyProcessor proc) {
super.initMessage(msg, proc);
PutAllMessage m = (PutAllMessage) msg;
// if concurrency checks are enabled and this is not a replicated
// region we need to see if any of the entries have no versions and,
// if so, cull them out and send a 1-hop message to a replicate that
// can generate a version for the operation
RegionAttributes attr = this.event.getRegion().getAttributes();
if (attr.getConcurrencyChecksEnabled() && !attr.getDataPolicy().withReplication()
&& attr.getScope() != Scope.GLOBAL) {
if (attr.getDataPolicy() == DataPolicy.EMPTY) {
// all entries are without version tags
boolean success = RemotePutAllMessage.distribute((EntryEventImpl) this.event,
this.putAllData, this.putAllDataSize);
if (success) {
m.callbackArg = this.event.getCallbackArgument();
m.putAllData = new PutAllEntryData[0];
m.putAllDataSize = 0;
m.skipCallbacks = !event.isGenerateCallbacks();
return;
} else if (!getRegion().getGenerateVersionTag()) {
// Fix for #45934. We can't continue if we need versions and we failed
// to distribute versionless entries.
throw new PersistentReplicatesOfflineException();
}
} else {
// some entries may have Create ops - these will not have version tags
PutAllEntryData[] versionless = selectVersionlessEntries();
if (logger.isTraceEnabled()) {
logger.trace("Found these versionless entries: {}", Arrays.toString(versionless));
}
if (versionless.length > 0) {
boolean success = RemotePutAllMessage.distribute((EntryEventImpl) this.event, versionless,
versionless.length);
if (success) {
versionless = null;
PutAllEntryData[] versioned = selectVersionedEntries();
if (logger.isTraceEnabled()) {
logger.trace("Found these remaining versioned entries: {}",
Arrays.toString(versioned));
}
m.callbackArg = this.event.getCallbackArgument();
m.putAllData = versioned;
m.putAllDataSize = versioned.length;
m.skipCallbacks = !event.isGenerateCallbacks();
return;
} else if (!getRegion().getGenerateVersionTag()) {
// Fix for #45934. We can't continue if we need versions and we failed
// to distribute versionless entries.
throw new PersistentReplicatesOfflineException();
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("All entries have versions, so using normal DPAO message");
}
}
}
}
m.callbackArg = this.event.getCallbackArgument();
m.putAllData = this.putAllData;
m.putAllDataSize = this.putAllDataSize;
m.skipCallbacks = !event.isGenerateCallbacks();
}
@Override
protected boolean shouldAck() {
// bug #45704 - a RemotePutAllOp's DPAO in another server conflicts with a lingering DPAO from
// the same thread, so we require an ACK if concurrency checks are enabled to make sure that
// the previous op has finished first.
return super.shouldAck() || getRegion().getConcurrencyChecksEnabled();
}
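/**
* Returns the entries that carry no valid version tag and are not inhibited from distribution;
* initMessage() distributes these via RemotePutAllMessage so that a replicate can generate
* versions for them.
*/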
private PutAllEntryData[] selectVersionlessEntries() {
int resultSize = this.putAllData.length;
for (int i = 0; i < this.putAllData.length; i++) {
PutAllEntryData p = this.putAllData[i];
if (p == null || p.isInhibitDistribution()) {
resultSize--;
} else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
resultSize--;
}
}
PutAllEntryData[] result = new PutAllEntryData[resultSize];
int ri = 0;
for (int i = 0; i < this.putAllData.length; i++) {
PutAllEntryData p = this.putAllData[i];
if (p == null || p.isInhibitDistribution()) {
continue; // skip blanks
} else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
continue; // skip versioned
}
// what remains is versionless
result[ri++] = p;
}
return result;
}
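/**
* Returns the entries that already carry a valid version tag (and are not inhibited from
* distribution); these are the entries that stay in this member's PutAllMessage when the
* versionless entries are distributed separately.
*/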
private PutAllEntryData[] selectVersionedEntries() {
int resultSize = 0;
for (int i = 0; i < this.putAllData.length; i++) {
PutAllEntryData p = this.putAllData[i];
if (p == null || p.isInhibitDistribution()) {
continue; // skip blanks
} else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
resultSize++;
}
}
PutAllEntryData[] result = new PutAllEntryData[resultSize];
int ri = 0;
for (int i = 0; i < this.putAllData.length; i++) {
PutAllEntryData p = this.putAllData[i];
if (p == null || p.isInhibitDistribution()) {
continue; // skip blanks
} else if (p.versionTag != null && p.versionTag.hasValidVersion()) {
result[ri++] = p;
}
}
return result;
}
/**
* Version tags are gathered from local operations and remote operation responses. This method
* stores all of them in the given list.
*/
protected void fillVersionedObjectList(VersionedObjectList list) {
for (PutAllEntryData entry : this.putAllData) {
if (entry.versionTag != null) {
list.addKeyAndVersion(entry.key, entry.versionTag);
}
}
}
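/**
* The cache operation message that carries the putAll entries and the base event id to the
* other members; each receiver replays the entries against its region via basicOperateOnRegion.
*/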
public static class PutAllMessage extends AbstractUpdateMessage {
protected PutAllEntryData[] putAllData;
protected int putAllDataSize;
protected transient ClientProxyMembershipID context;
protected boolean skipCallbacks;
protected EventID eventId = null;
protected static final short HAS_BRIDGE_CONTEXT = UNRESERVED_FLAGS_START;
protected static final short SKIP_CALLBACKS = (short) (HAS_BRIDGE_CONTEXT << 1);
/** test to see if this message holds any data */
public boolean isEmpty() {
return this.putAllData.length == 0;
}
/**
* Note that this is a "dummy" event since this message contains a list of entries, each of
* which has its own event. The key thing needed in this event is the region. This is the event
* that gets passed to basicOperateOnRegion.
*/
@Override
@Retained
protected InternalCacheEvent createEvent(DistributedRegion rgn) throws EntryNotFoundException {
// Gester: We have to specify eventId for the message of MAP
@Retained
EntryEventImpl event = EntryEventImpl.create(rgn, Operation.PUTALL_UPDATE /* op */,
null /* key */, null/* value */, this.callbackArg, true /* originRemote */, getSender());
if (this.context != null) {
event.context = this.context;
}
event.setPossibleDuplicate(this.possibleDuplicate);
event.setEventId(this.eventId);
return event;
}
@Override
public void appendFields(StringBuilder sb) {
super.appendFields(sb);
if (eventId != null) {
sb.append("; eventId=").append(this.eventId);
}
sb.append("; entries=").append(this.putAllDataSize);
if (putAllDataSize <= 20) {
// 20 is a size for test
sb.append("; entry values=").append(Arrays.toString(this.putAllData));
}
}
/**
* Does the "put" of one entry for a "putall" operation. Note it calls back to
* AbstractUpdateOperation.UpdateMessage#basicOperationOnRegion
*
* @param entry the entry being put
* @param rgn the region the entry is put in
*/
public void doEntryPut(PutAllEntryData entry, DistributedRegion rgn) {
@Released
EntryEventImpl ev = PutAllMessage.createEntryEvent(entry, getSender(), this.context, rgn,
this.possibleDuplicate, this.needsRouting, this.callbackArg, true, skipCallbacks);
// we don't need to set the old value here because the msg is from remote; the local old value
// will be obtained in the next step
try {
super.basicOperateOnRegion(ev, rgn);
} finally {
if (ev.hasValidVersionTag() && !ev.getVersionTag().isRecorded()) {
if (rgn.getVersionVector() != null) {
rgn.getVersionVector().recordVersion(getSender(), ev.getVersionTag());
}
}
ev.release();
}
}
/**
* create an event for a PutAllEntryData element
*
* @return the event to be used in applying the element
*/
@Retained
public static EntryEventImpl createEntryEvent(PutAllEntryData entry,
InternalDistributedMember sender, ClientProxyMembershipID context, DistributedRegion rgn,
boolean possibleDuplicate, boolean needsRouting, Object callbackArg, boolean originRemote,
boolean skipCallbacks) {
final Object key = entry.getKey();
EventID evId = entry.getEventID();
@Retained
EntryEventImpl ev = EntryEventImpl.create(rgn, entry.getOp(), key, null/* value */,
callbackArg, originRemote, sender, !skipCallbacks, evId);
boolean returnedEv = false;
try {
if (context != null) {
ev.context = context;
}
Object entryValue = entry.getValue(rgn.getCache());
if (entryValue == null && rgn.getDataPolicy() == DataPolicy.NORMAL) {
ev.setLocalInvalid(true);
}
ev.setNewValue(entryValue);
ev.setPossibleDuplicate(possibleDuplicate);
ev.setVersionTag(entry.versionTag);
// if (needsRouting) {
// FilterProfile fp = rgn.getFilterProfile();
// if (fp != null) {
// FilterInfo fi = fp.getLocalFilterRouting(ev);
// ev.setLocalFilterInfo(fi);
// }
// }
if (entry.filterRouting != null) {
InternalDistributedMember id = rgn.getMyId();
ev.setLocalFilterInfo(entry.filterRouting.getFilterInfo(id));
}
// Set the tailKey for the secondary bucket here. The tail key was updated by the primary.
ev.setTailKey(entry.getTailKey());
returnedEv = true;
return ev;
} finally {
if (!returnedEv) {
ev.release();
}
}
}
@Override
protected void basicOperateOnRegion(EntryEventImpl ev, final DistributedRegion rgn) {
for (int i = 0; i < putAllDataSize; ++i) {
if (putAllData[i].versionTag != null) {
checkVersionTag(rgn, putAllData[i].versionTag);
}
}
rgn.syncBulkOp(new Runnable() {
@Override
public void run() {
final boolean isDebugEnabled = logger.isDebugEnabled();
for (int i = 0; i < putAllDataSize; ++i) {
if (isDebugEnabled) {
logger.debug("putAll processing {} with {} sender={}", putAllData[i],
putAllData[i].versionTag, sender);
}
putAllData[i].setSender(sender);
doEntryPut(putAllData[i], rgn);
}
}
}, ev.getEventId());
}
@Override
public int getDSFID() {
return PUT_ALL_MESSAGE;
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.eventId = (EventID) DataSerializer.readObject(in);
this.putAllDataSize = (int) InternalDataSerializer.readUnsignedVL(in);
this.putAllData = new PutAllEntryData[this.putAllDataSize];
if (this.putAllDataSize > 0) {
final Version version = InternalDataSerializer.getVersionForDataStreamOrNull(in);
final ByteArrayDataInput bytesIn = new ByteArrayDataInput();
for (int i = 0; i < this.putAllDataSize; i++) {
this.putAllData[i] = new PutAllEntryData(in, eventId, i, version, bytesIn);
}
boolean hasTags = in.readBoolean();
if (hasTags) {
EntryVersionsList versionTags = EntryVersionsList.create(in);
for (int i = 0; i < this.putAllDataSize; i++) {
this.putAllData[i].versionTag = versionTags.get(i);
}
}
}
if ((flags & HAS_BRIDGE_CONTEXT) != 0) {
this.context = DataSerializer.readObject(in);
}
this.skipCallbacks = (flags & SKIP_CALLBACKS) != 0;
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
DataSerializer.writeObject(this.eventId, out);
InternalDataSerializer.writeUnsignedVL(this.putAllDataSize, out);
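// Version tags are pulled out of each entry and serialized once as an EntryVersionsList so
// that shared member IDs are not repeated per entry; fromData() reattaches them.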
if (this.putAllDataSize > 0) {
EntryVersionsList versionTags = new EntryVersionsList(putAllDataSize);
boolean hasTags = false;
for (int i = 0; i < this.putAllDataSize; i++) {
if (!hasTags && putAllData[i].versionTag != null) {
hasTags = true;
}
VersionTag<?> tag = putAllData[i].versionTag;
versionTags.add(tag);
putAllData[i].versionTag = null;
this.putAllData[i].toData(out);
this.putAllData[i].versionTag = tag;
}
out.writeBoolean(hasTags);
if (hasTags) {
InternalDataSerializer.invokeToData(versionTags, out);
}
}
if (this.context != null) {
DataSerializer.writeObject(this.context, out);
}
}
@Override
protected short computeCompressedShort(short s) {
s = super.computeCompressedShort(s);
if (this.context != null)
s |= HAS_BRIDGE_CONTEXT;
if (this.skipCallbacks)
s |= SKIP_CALLBACKS;
return s;
}
public ClientProxyMembershipID getContext() {
return this.context;
}
public PutAllEntryData[] getPutAllEntryData() {
return this.putAllData;
}
}
}