/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.partitioned;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheException;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.HeapDataOutputStream;
import org.apache.geode.internal.cache.BucketDump;
import org.apache.geode.internal.cache.BucketRegion;
import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.InitialImageOperation;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
import org.apache.geode.internal.cache.VersionTagHolder;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.cache.versions.VersionTag;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* A {@link PartitionMessage} sent to a peer to fetch, in bulk, the entries of one or more buckets
* of a partitioned region, either for an explicit per-bucket key list or for whole buckets
* optionally filtered by a regular expression. Replies are streamed back in chunks by
* {@link FetchBulkEntriesReplyMessage} and assembled by {@link FetchBulkEntriesResponse}.
*
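* <p>
* A minimal usage sketch based on this file's own API (the {@code recipient}, {@code region} and
* {@code bucketIds} values are assumed to be supplied by the caller):
*
* <pre>{@code
* FetchBulkEntriesResponse response =
*     FetchBulkEntriesMessage.send(recipient, region, null, bucketIds, null, true);
* BucketDump[] dumps = response.waitForEntries();
* }</pre>
*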
* @since GemFire 8.0
*/
public class FetchBulkEntriesMessage extends PartitionMessage {
private static final Logger logger = LogService.getLogger();
private HashSet<Integer> bucketIds;
private String regex;
/**
* Map of bucket id to the set of keys to be fetched from that bucket.
*/
private HashMap<Integer, HashSet> bucketKeys;
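// Discriminator values written on the wire (see toData/fromData) telling the receiver whether an
// explicit per-bucket key list or whole buckets were requested. Note that REGEX is declared but
// never assigned to 'keys'; the regex string is always serialized separately.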
private static final byte ALL_KEYS = (byte) 0;
private static final byte KEY_LIST = (byte) 1;
private static final byte REGEX = (byte) 2;
private byte keys;
private boolean allowTombstones;
public FetchBulkEntriesMessage() {}
private FetchBulkEntriesMessage(InternalDistributedMember recipient, int regionId,
ReplyProcessor21 processor, HashMap<Integer, HashSet> bucketKeys, HashSet<Integer> bucketIds,
String regex, boolean allowTombstones) {
super(recipient, regionId, processor);
this.bucketKeys = bucketKeys;
this.bucketIds = bucketIds;
this.regex = regex;
this.keys = bucketKeys != null ? KEY_LIST : ALL_KEYS;
this.allowTombstones = allowTombstones;
}
/**
* Sends a PartitionedRegion message to fetch the entries of the given buckets.
*
* @param recipient the member that the fetch-entries message is sent to
* @param r the PartitionedRegion that contains the buckets
* @param bucketKeys map of bucket id to the set of keys to fetch from that bucket; may be null,
*        in which case all keys of the buckets in bucketIds are returned
* @param bucketIds the identity of the buckets that contain the entries to be returned; used
*        only when bucketKeys is null
* @param regex the regular expression to be evaluated for selecting keys; may be null
* @param allowTombstones whether destroyed entries should be returned as tombstones
* @return the processor used to read the returned entries
* @throws ForceReattemptException if the peer is no longer available
*/
public static FetchBulkEntriesResponse send(InternalDistributedMember recipient,
PartitionedRegion r, HashMap<Integer, HashSet> bucketKeys, HashSet<Integer> bucketIds,
String regex, boolean allowTombstones) throws ForceReattemptException {
Assert.assertTrue(recipient != null, "FetchBulkEntriesMessage NULL recipient");
FetchBulkEntriesResponse p = new FetchBulkEntriesResponse(r.getSystem(), r, recipient);
FetchBulkEntriesMessage m = new FetchBulkEntriesMessage(recipient, r.getPRId(), p, bucketKeys,
bucketIds, regex, allowTombstones);
m.setTransactionDistributed(r.getCache().getTxManager().isDistributed());
Set failures = r.getDistributionManager().putOutgoing(m);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException(
String.format("Failed sending < %s >", m));
}
return p;
}
@Override
protected boolean operateOnPartitionedRegion(ClusterDistributionManager dm, PartitionedRegion pr,
long startTime) throws CacheException, ForceReattemptException {
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "FetchBulkEntriesMessage operateOnRegion: {}",
pr.getFullPath());
}
FetchBulkEntriesReplyMessage.sendReply(pr, getSender(), getProcessorId(), dm, this.bucketKeys,
this.bucketIds, regex, this.allowTombstones, startTime);
return false;
}
@Override
protected void appendFields(StringBuilder buff) {
super.appendFields(buff);
buff.append("; bucketId=").append(this.bucketIds);
buff.append("; recipient=").append(this.getRecipient());
}
@Override
public int getDSFID() {
return PR_FETCH_BULK_ENTRIES_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.keys = DataSerializer.readByte(in);
if (this.keys == KEY_LIST) {
this.bucketKeys = DataSerializer.readHashMap(in);
} else if (this.keys == ALL_KEYS) {
this.bucketIds = DataSerializer.readHashSet(in);
}
this.regex = DataSerializer.readString(in);
this.allowTombstones = DataSerializer.readPrimitiveBoolean(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
DataSerializer.writeByte(this.keys, out);
if (this.keys == KEY_LIST) {
DataSerializer.writeHashMap(this.bucketKeys, out);
} else if (this.keys == ALL_KEYS) {
DataSerializer.writeHashSet(this.bucketIds, out);
}
DataSerializer.writeString(this.regex, out);
DataSerializer.writePrimitiveBoolean(this.allowTombstones, out);
}
public static class FetchBulkEntriesReplyMessage extends ReplyMessage {
/** Whether this message is the last of a series of chunk responses */
boolean lastInSeries;
/** Array holding chunk of entries received */
transient byte[] chunk;
/** Stream holding chunk of entries to send */
transient HeapDataOutputStream chunkStream;
/** Sequence number of this chunk message */
private int msgNum;
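/** Ids of buckets whose entries could not be fetched; sent only with the last message of the series */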
private HashSet<Integer> failedBucketIds;
@Override
public Version[] getSerializationVersions() {
return null;
}
/**
* Empty constructor to conform to DataSerializable interface
*/
public FetchBulkEntriesReplyMessage() {}
private FetchBulkEntriesReplyMessage(InternalDistributedMember dest, int processorId,
HeapDataOutputStream chunk, int msgNum, boolean lastInSeries) {
setRecipient(dest);
setProcessorId(processorId);
this.lastInSeries = lastInSeries;
this.chunkStream = chunk;
this.msgNum = msgNum;
}
public static void sendReply(PartitionedRegion pr, final InternalDistributedMember recipient,
final int processorId, final DistributionManager dm,
final HashMap<Integer, HashSet> bucketKeys, final HashSet<Integer> bucketIds, String regex,
boolean allowTombstones, long startTime) throws ForceReattemptException {
PartitionedRegionDataStore ds = pr.getDataStore();
if (ds == null) {
return;
}
ArrayList<BucketRegion> maps = new ArrayList<BucketRegion>();
HashSet<Integer> failedBuckets = new HashSet<Integer>();
Set<Integer> bucketIdSet = null;
if (bucketKeys != null) {
bucketIdSet = bucketKeys.keySet();
} else { // bucketIds != null
bucketIdSet = bucketIds;
}
for (int id : bucketIdSet) {
try {
maps.add(ds.handleRemoteGetEntries(id));
} catch (ForceReattemptException fre) {
failedBuckets.add(id);
}
}
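// Stream the requested entries back in chunks of roughly CHUNK_SIZE_IN_BYTES. Each chunk holds
// one or more runs of: a bucket id (int), (key, value, versionTag) triples, then a null key as
// end-of-bucket marker followed by a boolean telling whether that bucket continues in a later
// chunk. A lone bucket id of -1 marks a chunk carrying no entries at all.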
HeapDataOutputStream mos = new HeapDataOutputStream(
InitialImageOperation.CHUNK_SIZE_IN_BYTES + 2048, recipient.getVersionObject());
Iterator<BucketRegion> mapsIterator = maps.iterator();
BucketRegion map = null;
Iterator it = null;
boolean keepGoing = true;
boolean lockAcquired = false;
boolean writeFooter = false;
boolean lastMsgSent = false;
boolean needToWriteBucketInfo = true;
int msgNum = 0;
while (mapsIterator.hasNext()) {
if (map != null && lockAcquired) {
try {
map.releaseDestroyLock();
// TODO: consider taking bucketCreationLock.getWriteLock() or pr.BucketLock instead
} catch (CancelException e) {
// ignore: the cache is closing
} finally {
lockAcquired = false;
}
}
map = mapsIterator.next();
if (map.isBucketDestroyed()) {
failedBuckets.add(map.getId());
continue;
}
try {
map.acquireDestroyLock();
lockAcquired = true;
} catch (CancelException e) {
if (logger.isDebugEnabled()) {
logger.debug("sendReply: acquireDestroyLock failed due to cache closure, region = {}",
map.getFullPath());
}
}
try {
if (bucketKeys != null) {
it = bucketKeys.get(map.getId()).iterator();
} else { // bucketIds != null
if (regex == null) {
it = new HashSet(map.keySet(allowTombstones)).iterator();
} else {
it = map.getKeysWithInterest(InterestType.REGULAR_EXPRESSION, regex, allowTombstones)
.iterator();
}
}
while (it.hasNext()) {
Object key = it.next();
VersionTagHolder clientEvent = new VersionTagHolder();
Object value = map.get(key, null, true, true, true, null, clientEvent, allowTombstones);
if (needToWriteBucketInfo) {
DataSerializer.writePrimitiveInt(map.getId(), mos);
needToWriteBucketInfo = false;
writeFooter = true;
}
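// Measure the serialized size of this entry; it is used below as a rough estimate of the next
// entry's size when deciding whether the chunk is full.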
int entrySize = mos.size();
DataSerializer.writeObject(key, mos);
VersionTag versionTag = clientEvent.getVersionTag();
if (versionTag != null) {
versionTag.replaceNullIDs(map.getVersionMember());
}
DataSerializer.writeObject(value, mos);
DataSerializer.writeObject(versionTag, mos);
entrySize = mos.size() - entrySize;
// If no more space OR no more entries in bucket, write end-of-bucket marker.
if ((mos.size() + entrySize) >= InitialImageOperation.CHUNK_SIZE_IN_BYTES
|| !it.hasNext()) {
DataSerializer.writeObject(null, mos);
DataSerializer.writePrimitiveBoolean(it.hasNext(), mos);
needToWriteBucketInfo = true;
writeFooter = false; // Be safe
}
// If no more space in chunk, send it.
if ((mos.size() + entrySize) >= InitialImageOperation.CHUNK_SIZE_IN_BYTES) {
// Send as last message if no more data
boolean lastMsg = !(it.hasNext() || mapsIterator.hasNext());
++msgNum;
FetchBulkEntriesReplyMessage reply =
new FetchBulkEntriesReplyMessage(recipient, processorId, mos, msgNum, lastMsg);
if (lastMsg) {
reply.failedBucketIds = failedBuckets;
}
Set failures = dm.putOutgoing(reply);
keepGoing = (failures == null) || (failures.size() == 0);
if (lastMsg && keepGoing) {
lastMsgSent = true;
}
mos.reset();
} // else still enough space
} // while (for each key)
if (!keepGoing) {
throw new ForceReattemptException("Failed to send response");
}
} catch (IOException ioe) {
throw new ForceReattemptException(
"Unable to send response to fetch-entries request",
ioe);
} finally {
if (lockAcquired) {
try {
map.releaseDestroyLock();
} catch (CancelException e) {
// ignore: the cache is closing
} finally {
lockAcquired = false;
}
}
}
} // while (for each map)
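// Always flush a final lastInSeries reply so the receiver learns the total chunk count and the
// set of failed buckets; an otherwise empty final chunk is marked with a bucket id of -1.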
if (!lastMsgSent) {
if (mos.size() == 0) {
try {
DataSerializer.writePrimitiveInt(-1, mos);
} catch (IOException ioe) {
throw new ForceReattemptException(
"Unable to send response to fetch-entries request",
ioe);
}
} else if (writeFooter) {
try {
DataSerializer.writeObject(null, mos); // end of the current bucket's entries in this response
DataSerializer.writePrimitiveBoolean(false, mos); // no more entries of current bucket
} catch (IOException ioe) {
throw new ForceReattemptException(
"Unable to send response to fetch-entries request",
ioe);
}
}
++msgNum;
FetchBulkEntriesReplyMessage reply =
new FetchBulkEntriesReplyMessage(recipient, processorId, mos, msgNum, true);
reply.failedBucketIds = failedBuckets;
Set failures = dm.putOutgoing(reply);
if (failures != null && failures.size() > 0) {
throw new ForceReattemptException("Failed to send response");
}
}
if (lockAcquired) {
try {
map.releaseDestroyLock();
} catch (CancelException e) {
// ignore
} finally {
lockAcquired = false;
}
}
}
/**
* Processes this message. This method is invoked by the receiver of the message.
*
* @param dm the distribution manager that is processing the message.
*/
@Override
public void process(final DistributionManager dm, final ReplyProcessor21 p) {
final long startTime = getTimestamp();
FetchBulkEntriesResponse processor = (FetchBulkEntriesResponse) p;
if (processor == null) {
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "FetchBulkEntriesReplyMessage processor not found");
}
return;
}
processor.processChunkResponse(this);
if (logger.isTraceEnabled(LogMarker.DM_VERBOSE)) {
logger.trace(LogMarker.DM_VERBOSE, "{} processed {}", processor, this);
}
dm.getStats().incReplyMessageTime(DistributionStats.getStatTime() - startTime);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeBoolean(this.lastInSeries);
DataSerializer.writePrimitiveInt(this.msgNum, out);
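// The outgoing chunk is written from the HeapDataOutputStream as a byte array; fromData reads it
// back into the transient 'chunk' field on the receiving side.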
DataSerializer.writeObjectAsByteArray(this.chunkStream, out);
DataSerializer.writeHashSet(this.failedBucketIds, out);
}
@Override
public int getDSFID() {
return PR_FETCH_BULK_ENTRIES_REPLY_MESSAGE;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.lastInSeries = in.readBoolean();
this.msgNum = DataSerializer.readPrimitiveInt(in);
this.chunk = DataSerializer.readByteArray(in);
this.failedBucketIds = DataSerializer.readHashSet(in);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("FetchBulkEntriesReplyMessage ").append("processorid=").append(this.processorId);
if (getSender() != null) {
sb.append(",sender=").append(this.getSender());
}
sb.append(",lastInSeries=").append(lastInSeries);
if (chunk != null) {
sb.append(",size=").append(chunk.length);
} else if (chunkStream != null) {
sb.append(",size=").append(chunkStream.size());
}
if (getException() != null) {
sb.append(",exception=").append(getException());
}
return sb.toString();
}
}
/**
* A processor to capture the value returned by
* {@link org.apache.geode.internal.cache.partitioned.FetchBulkEntriesMessage}
*
* @since GemFire 8.0
*/
public static class FetchBulkEntriesResponse extends ReplyProcessor21 {
private final PartitionedRegion pr;
private final HashMap<Integer, HashMap<Object, Object>> returnValue;
private final HashMap<Integer, HashMap<Object, VersionTag>> returnVersions = new HashMap<>();
private final Map<VersionSource, VersionSource> canonicalMembers =
new ConcurrentHashMap<VersionSource, VersionSource>();
/** lock used to synchronize chunk processing */
private final Object endLock = new Object();
/** number of chunks processed */
private volatile int chunksProcessed;
/** whether the last chunk has been processed */
private volatile boolean lastChunkReceived;
private HashSet<Integer> failedBucketIds;
private ArrayList<Integer> receivedBuckets = new ArrayList<Integer>();
private int expectedChunks;
private InternalDistributedMember recipient;
public FetchBulkEntriesResponse(InternalDistributedSystem ds, final PartitionedRegion pr,
final InternalDistributedMember recipient) {
super(ds, Collections.singleton(recipient));
this.pr = pr;
this.recipient = recipient;
this.returnValue = new HashMap<Integer, HashMap<Object, Object>>();
}
void processChunkResponse(FetchBulkEntriesReplyMessage msg) {
boolean doneProcessing = false;
if (msg.getException() != null) {
process(msg);
} else {
boolean deserializingKey = true;
try {
ByteArrayInputStream byteStream = new ByteArrayInputStream(msg.chunk);
DataInputStream in = new DataInputStream(byteStream);
Object key;
int currentId;
final boolean isTraceEnabled = logger.isTraceEnabled(LogMarker.DM_VERBOSE);
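// Parse the chunk, mirroring the format written by sendReply: a bucket id of -1 means the chunk
// carries no entries; otherwise read (key, value, versionTag) triples until a null key, then a
// boolean telling whether this bucket continues in a later chunk.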
while (in.available() > 0) {
currentId = DataSerializer.readPrimitiveInt(in);
if (currentId == -1) {
break;
}
while (in.available() > 0) {
deserializingKey = true;
key = DataSerializer.readObject(in);
if (key != null) {
deserializingKey = false;
Object value = DataSerializer.readObject(in);
VersionTag versionTag = DataSerializer.readObject(in);
if (versionTag != null) {
// Fix for 47260 - canonicalize the member ids to avoid an OOME
if (canonicalMembers.containsKey(versionTag.getMemberID())) {
versionTag.setMemberID(canonicalMembers.get(versionTag.getMemberID()));
} else {
canonicalMembers.put(versionTag.getMemberID(), versionTag.getMemberID());
}
}
synchronized (returnValue) {
HashMap<Object, Object> valueMap = returnValue.get(currentId);
HashMap<Object, VersionTag> versionMap = returnVersions.get(currentId);
if (valueMap != null) {
valueMap.put(key, value);
} else {
valueMap = new HashMap<Object, Object>();
valueMap.put(key, value);
returnValue.put(currentId, valueMap);
}
if (versionMap != null) {
versionMap.put(key, versionTag);
} else {
versionMap = new HashMap<Object, VersionTag>();
versionMap.put(key, versionTag);
returnVersions.put(currentId, versionMap);
}
}
} else {
// a null key marks the end of this bucket's entries in the current chunk
boolean bucketHasMore = DataSerializer.readPrimitiveBoolean(in);
synchronized (this.returnValue) {
if (!bucketHasMore) {
this.receivedBuckets.add(currentId);
}
}
break;
}
} // inner while
} // outer while
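// Chunks may arrive in any order; the lastInSeries message carries the total chunk count, so
// processing completes only once that many chunks have been seen.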
synchronized (this.endLock) {
this.chunksProcessed = this.chunksProcessed + 1;
if (msg.lastInSeries) {
this.expectedChunks = msg.msgNum;
this.failedBucketIds = msg.failedBucketIds;
}
if (this.expectedChunks == this.chunksProcessed) {
doneProcessing = true;
this.lastChunkReceived = true;
}
if (isTraceEnabled) {
logger.trace(LogMarker.DM_VERBOSE,
"{} chunksProcessed={}, lastChunkReceived={}, done={}", this, this.chunksProcessed,
this.lastChunkReceived, doneProcessing);
}
}
} catch (Exception e) {
if (deserializingKey) {
processException(new ReplyException(
"Error deserializing keys",
e));
} else {
processException(new ReplyException(
"Error deserializing values",
e)); // for bug 41202
}
checkIfDone(); // fix for hang in 41202
}
// if all chunks have been received, wake up the waiting thread
if (doneProcessing) {
process(msg);
}
} // else msg.getException() == null
}
/**
* @return Array of BucketDumps
* @throws ForceReattemptException if the peer is no longer available
*/
public BucketDump[] waitForEntries() throws ForceReattemptException {
try {
waitForRepliesUninterruptibly();
} catch (ReplyException e) {
Throwable t = e.getCause();
if (t instanceof CancelException) {
logger.debug("FetchBulkEntriesResponse got remote cancellation; forcing reattempt. {}",
t.getMessage(), t);
throw new ForceReattemptException(
"FetchKeysResponse got remote cancellation; forcing reattempt.",
t);
} else if (t instanceof ForceReattemptException) {
// Not sure this is necessary, but it is possible for
// FetchBulkEntriesMessage to marshal a ForceReattemptException, so...
throw new ForceReattemptException(
"Peer requests reattempt", t);
}
e.handleCause();
}
if (!this.lastChunkReceived) {
throw new ForceReattemptException(
"No replies received");
}
BucketDump[] dumps = new BucketDump[this.receivedBuckets.size()];
for (int i = 0; i < this.receivedBuckets.size(); i++) {
int id = this.receivedBuckets.get(i);
dumps[i] = new BucketDump(id, recipient, null, returnValue.get(id), returnVersions.get(id));
}
return dumps;
}
public HashSet<Integer> getFailedBucketIds() {
return this.failedBucketIds;
}
}
@Override
public Version[] getSerializationVersions() {
return null;
}
}